From ae95d4dfa372a1b85be4c0bd262693f44d1cff59 Mon Sep 17 00:00:00 2001 From: liyongde <1419499670@qq.com> Date: Tue, 24 Mar 2026 13:50:50 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9Einfluxdb=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nl-iot/pom.xml | 11 +- .../iot/core/cache/MetadataCacheManager.java | 32 ++++- .../nl/iot/core/influxDB/InfluxDB2Config.java | 83 +++++++++++++ .../nl/iot/core/influxDB/InfluxDB2Util.java | 105 ++++++++++++++++ .../modular/influxdb/entity/PageResult.java | 112 ++++++++++++++++++ .../modular/influxdb/entity/PlcSignal.java | 48 ++++++++ .../influxdb/service/PlcSignalService.java | 28 +++++ .../service/impl/PlcSignalServiceImpl.java | 52 ++++++++ nl-web-app/src/main/resources/application.yml | 10 ++ pom.xml | 41 +++++++ 10 files changed, 512 insertions(+), 10 deletions(-) create mode 100644 nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java create mode 100644 nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java create mode 100644 nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PageResult.java create mode 100644 nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java create mode 100644 nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java create mode 100644 nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java diff --git a/nl-iot/pom.xml b/nl-iot/pom.xml index 5a0ac2e..fcded45 100644 --- a/nl-iot/pom.xml +++ b/nl-iot/pom.xml @@ -28,35 +28,36 @@ org.eclipse.milo sdk-client - 0.6.16 org.jinterop j-interop - 2.0.4 org.apache.plc4x plc4j-api - 0.13.0 org.apache.plc4x plc4j-driver-s7 - 0.13.0 runtime org.apache.plc4x plc4j-driver-modbus - 0.13.0 runtime + + + + com.influxdb + influxdb-client-java + \ No newline at end of file diff --git a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java index 3b432b4..26d8e52 100644 --- a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java +++ b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java @@ -1,11 +1,13 @@ package org.nl.iot.core.cache; +import cn.hutool.core.util.ObjectUtil; import lombok.extern.slf4j.Slf4j; import org.nl.iot.core.driver.bo.ConnectSiteDO; import org.nl.iot.core.driver.bo.DeviceConnectionBO; import org.nl.iot.core.driver.bo.DeviceInfoReadCacheDo; import org.nl.iot.core.driver.bo.SiteBO; import org.nl.iot.core.driver.entity.RValue; +import org.nl.iot.modular.influxdb.service.PlcSignalService; import org.nl.iot.modular.iot.service.IotConfigService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; @@ -29,6 +31,9 @@ public class MetadataCacheManager implements CommandLineRunner { @Autowired private IotConfigService configService; + @Autowired + private PlcSignalService plcSignalService; + private final Map deviceInfoCache = new ConcurrentHashMap<>(); /** @@ -183,6 +188,14 @@ public class MetadataCacheManager implements CommandLineRunner { // ==================== 信号值缓存方法 ==================== + /** + * 存储单个信号值 + * @param key connectCode.deviceCode.信号别名 + * @param rValue 信号值对象 + */ + public void putSignalValue(String key, RValue rValue) { + signalValueCache.put(key, rValue); + } /** * 存储单个信号值 * @param deviceKey connectCode.deviceCode @@ -203,13 +216,22 @@ public class MetadataCacheManager implements CommandLineRunner { if (rValues == null || rValues.isEmpty()) { return; } - - for (RValue rValue : rValues) { - if (rValue != null && rValue.getSiteBO() != null) { - String alias = rValue.getSiteBO().getAlias(); - putSignalValue(deviceKey, alias, rValue); + Map cacheSnapshot = getAllSignalValues(); + List needSaveValues = new ArrayList<>(); + for (RValue newRValue : rValues) { + if (newRValue != null && newRValue.getSiteBO() != null) { + String alias = newRValue.getSiteBO().getAlias(); + String key = deviceKey + "." + alias; + RValue oldValue = cacheSnapshot.get(key); + if (ObjectUtil.isNotEmpty(oldValue) && oldValue.getValue().equals(newRValue.getValue())) { + continue; + } + putSignalValue(key, newRValue); + needSaveValues.add(newRValue); } } + // 批量存储到influxDB + plcSignalService.batchSaveSignalToInfluxDB(needSaveValues); } /** diff --git a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java new file mode 100644 index 0000000..8c83f66 --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java @@ -0,0 +1,83 @@ +package org.nl.iot.core.influxDB; + +import com.influxdb.client.*; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * influx2.0 + * @Author: liyongde + * @Date: 2026/3/24 10:38 + * @Modified By: + */ +@Slf4j +@Configuration +public class InfluxDB2Config { + + @Value("${influx.url}") + private String url; + + @Value("${influx.token}") + private String token; + + @Value("${influx.org}") + private String org; + + @Value("${influx.bucket}") + private String bucket; + + /** + * 初始化InfluxDB 2.x客户端 + * 修复:通过OkHttpClient配置超时,替代不存在的set超时方法 + */ + @Bean + public InfluxDBClient influxDBClient() { + + // 创建客户端,传入自定义OkHttp配置 + InfluxDBClient client = InfluxDBClientFactory.create( + url, + token.toCharArray(), + org, + bucket + ); + + // 测试连接 + client.ping(); + log.info("InfluxDB 2.x 连接成功"); + return client; + } + + /** + * 写入API(开启批量写入,提升性能) + */ + @Bean + public WriteApi writeApi(InfluxDBClient client) { + // 批量写入配置:缓冲1000条、500ms刷新、最大5000条 + WriteOptions writeOptions = WriteOptions.builder() + .batchSize(1000) + .flushInterval(500) + .bufferLimit(5000) + .build(); + return client.makeWriteApi(writeOptions); + } + + /** + * 查询API + */ + @Bean + public QueryApi queryApi(InfluxDBClient client) { + return client.getQueryApi(); + } + + /** + * 项目关闭时销毁客户端 + */ + @PreDestroy + public void destroy() { + log.info("InfluxDB 2.x 连接关闭"); + influxDBClient().close(); + } +} \ No newline at end of file diff --git a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java new file mode 100644 index 0000000..301f8c6 --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java @@ -0,0 +1,105 @@ +package org.nl.iot.core.influxDB; + +import com.influxdb.client.QueryApi; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import jakarta.annotation.Resource; +import org.nl.iot.modular.influxdb.entity.PlcSignal; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * 工具类 + * @Author: liyongde + * @Date: 2026/3/24 10:43 + * @Modified By: + */ +@Component +public class InfluxDB2Util { + + @Value("${influx.bucket}") + private String bucket; + + @Value("${influx.org}") + private String org; + + @Resource + private WriteApi writeApi; + + @Resource + private QueryApi queryApi; + + /** + * 批量写入PLC信号 + */ + public void insertBatch(List datas) { + writeApi.writeMeasurements(WritePrecision.NS, datas); + } + + + /** + * 单条写入PLC信号 + */ + public void insertOne(PlcSignal data) { + insertOne(bucket, org, data); + } + + /** + * 单条写入PLC信号 + */ + public void insertOne(String bucket, String org, PlcSignal data) { + writeApi.writeMeasurement(bucket, org, WritePrecision.NS, data); + } + + /** + * PLC专用Flux查询结果转换 + */ + public List queryPlcFlux(String flux) { + List tables = queryApi.query(flux); + return tables.stream() + .flatMap(table -> table.getRecords().stream()) + .map(this::convertToPlcEntity) + .collect(Collectors.toList()); + } + + /** + * 执行统计查询,返回总数 + * @param flux Flux查询语句 + * @return 统计结果 + */ + public long queryCount(String flux) { + List tables = queryApi.query(flux); + if (tables.isEmpty()) { + return 0L; + } + + // 获取count结果 + return tables.stream() + .flatMap(table -> table.getRecords().stream()) + .mapToLong(record -> { + Object value = record.getValue(); + if (value instanceof Number) { + return ((Number) value).longValue(); + } + return 0L; + }) + .sum(); + } + + /** + * Flux结果转PLC实体 + */ + private PlcSignal convertToPlcEntity(FluxRecord record) { + return PlcSignal.builder() + .device(record.getValueByKey("device").toString()) + .tag(record.getValueByKey("tag").toString()) + .value(record.getValueByKey("value").toString()) + .changeTime(record.getTime()) + .build(); + } +} diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PageResult.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PageResult.java new file mode 100644 index 0000000..fd2f9a7 --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PageResult.java @@ -0,0 +1,112 @@ +package org.nl.iot.modular.influxdb.entity; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * 分页结果封装类 + * @param 数据类型 + * @Author: liyongde + * @Date: 2026/3/24 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PageResult { + + /** + * 当前页数据列表 + */ + private List records; + + /** + * 总记录数 + */ + private long total; + + /** + * 当前页码(从1开始) + */ + private int page; + + /** + * 每页大小 + */ + private int size; + + /** + * 总页数 + */ + private int totalPages; + + /** + * 是否有下一页 + */ + private boolean hasNext; + + /** + * 是否有上一页 + */ + private boolean hasPrevious; + + /** + * 是否为第一页 + */ + private boolean isFirst; + + /** + * 是否为最后一页 + */ + private boolean isLast; + + /** + * 构建分页结果 + * @param records 当前页数据 + * @param total 总记录数 + * @param page 当前页码 + * @param size 每页大小 + * @param 数据类型 + * @return 分页结果 + */ + public static PageResult of(List records, long total, int page, int size) { + int totalPages = (int) Math.ceil((double) total / size); + + return PageResult.builder() + .records(records) + .total(total) + .page(page) + .size(size) + .totalPages(totalPages) + .hasNext(page < totalPages) + .hasPrevious(page > 1) + .isFirst(page == 1) + .isLast(page >= totalPages) + .build(); + } + + /** + * 空分页结果 + * @param page 当前页码 + * @param size 每页大小 + * @param 数据类型 + * @return 空分页结果 + */ + public static PageResult empty(int page, int size) { + return PageResult.builder() + .records(List.of()) + .total(0L) + .page(page) + .size(size) + .totalPages(0) + .hasNext(false) + .hasPrevious(page > 1) + .isFirst(page == 1) + .isLast(true) + .build(); + } +} \ No newline at end of file diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java new file mode 100644 index 0000000..04d6cad --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java @@ -0,0 +1,48 @@ +package org.nl.iot.modular.influxdb.entity; + +import com.influxdb.annotations.Column; +import com.influxdb.annotations.Measurement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Instant; + +/** + * plc信号实体 + * @Author: liyongde + * @Date: 2026/3/24 10:53 + * @Modified By: + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Measurement(name = "plc_signal") +public class PlcSignal { + + /** + * 设备名称(Tag索引) + */ + @Column(tag = true, name = "device") + private String device; + + /** + * 信号点位/标签(Tag索引) + */ + @Column(tag = true, name = "tag") + private String tag; + + /** + * 信号值(Field,存变化数据) + */ + @Column(name = "value") + private String value; + + /** + * 变化时间戳(InfluxDB自动维护) + */ + @Column(timestamp = true) + private Instant changeTime; +} diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java new file mode 100644 index 0000000..988ae5e --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java @@ -0,0 +1,28 @@ +package org.nl.iot.modular.influxdb.service; + +import org.nl.iot.core.driver.entity.RValue; + +import java.util.List; + +/** + * influxdb存储信号变化值 + * @Author: liyongde + * @Date: 2026/3/24 13:11 + */ +public interface PlcSignalService { + + /** + * 存储变化值 + * @param device + * @param tag + * @param newValue + * @return + */ + void savePlcChangeSignal(String device, String tag, String newValue); + + /** + * 批量存储 + * @param needSaveValues + */ + void batchSaveSignalToInfluxDB(List needSaveValues); +} diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java new file mode 100644 index 0000000..58734c6 --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java @@ -0,0 +1,52 @@ +package org.nl.iot.modular.influxdb.service.impl; + +import jakarta.annotation.Resource; +import org.nl.iot.core.driver.bo.SiteBO; +import org.nl.iot.core.driver.entity.RValue; +import org.nl.iot.core.influxDB.InfluxDB2Util; +import org.nl.iot.modular.influxdb.entity.PlcSignal; +import org.nl.iot.modular.influxdb.service.PlcSignalService; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +/** + * + * @Author: liyongde + * @Date: 2026/3/24 13:11 + */ +@Service +public class PlcSignalServiceImpl implements PlcSignalService { + + @Resource + private InfluxDB2Util influxDB2Util; + + @Override + public void savePlcChangeSignal(String device, String tag, String newValue) { + PlcSignal plcSignal = PlcSignal.builder() + .device(device) + .tag(tag) + .value(newValue) + .changeTime(Instant.now()) + .build(); + influxDB2Util.insertOne(plcSignal); + } + + @Override + public void batchSaveSignalToInfluxDB(List needSaveValues) { + List plcSignals = new ArrayList<>(); + for (RValue needSaveValue : needSaveValues) { + SiteBO siteBO = needSaveValue.getSiteBO(); + PlcSignal plcSignal = PlcSignal.builder() + .device(siteBO.getDeviceCode()) + .tag(siteBO.getAlias()) + .value(needSaveValue.getValue()) + .changeTime(Instant.now()) + .build(); + plcSignals.add(plcSignal); + } + + } +} diff --git a/nl-web-app/src/main/resources/application.yml b/nl-web-app/src/main/resources/application.yml index bb82c0d..e9f3522 100644 --- a/nl-web-app/src/main/resources/application.yml +++ b/nl-web-app/src/main/resources/application.yml @@ -177,3 +177,13 @@ file: # 文件大小 /M maxSize: 100 avatarMaxSize: 5 +# InfluxDB 2.x 核心配置 +influx: + # 服务地址 + url: http://117.72.202.142:15964/ + # 初始化生成的All-Access令牌 + token: mr8DZXza-WvBtHhK51Jnfh0tNlq_rLyVYW4D9Zyz0IN66RM9KqgS8agV4XWtXekCgGhG866Dtb9Jv0t-N5D0Gw== + # 组织名称 + org: acs + # 数据桶名称(替代1.x的database) + bucket: signals \ No newline at end of file diff --git a/pom.xml b/pom.xml index e5fcb3c..ea6c0da 100644 --- a/pom.xml +++ b/pom.xml @@ -451,6 +451,47 @@ mssql-jdbc 12.4.2.jre11 --> + + + + org.eclipse.milo + sdk-client + 0.6.16 + + + + + org.jinterop + j-interop + 2.0.4 + + + + org.apache.plc4x + plc4j-api + 0.13.0 + + + + org.apache.plc4x + plc4j-driver-s7 + 0.13.0 + runtime + + + + org.apache.plc4x + plc4j-driver-modbus + 0.13.0 + runtime + + + + + com.influxdb + influxdb-client-java + 6.11.0 +