diff --git a/nl-iot/pom.xml b/nl-iot/pom.xml
index 5a0ac2e..fcded45 100644
--- a/nl-iot/pom.xml
+++ b/nl-iot/pom.xml
@@ -28,35 +28,36 @@
org.eclipse.milo
sdk-client
- 0.6.16
org.jinterop
j-interop
- 2.0.4
org.apache.plc4x
plc4j-api
- 0.13.0
org.apache.plc4x
plc4j-driver-s7
- 0.13.0
runtime
org.apache.plc4x
plc4j-driver-modbus
- 0.13.0
runtime
+
+
+
+ com.influxdb
+ influxdb-client-java
+
\ No newline at end of file
diff --git a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java
index 3b432b4..26d8e52 100644
--- a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java
+++ b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java
@@ -1,11 +1,13 @@
package org.nl.iot.core.cache;
+import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.nl.iot.core.driver.bo.ConnectSiteDO;
import org.nl.iot.core.driver.bo.DeviceConnectionBO;
import org.nl.iot.core.driver.bo.DeviceInfoReadCacheDo;
import org.nl.iot.core.driver.bo.SiteBO;
import org.nl.iot.core.driver.entity.RValue;
+import org.nl.iot.modular.influxdb.service.PlcSignalService;
import org.nl.iot.modular.iot.service.IotConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
@@ -29,6 +31,9 @@ public class MetadataCacheManager implements CommandLineRunner {
@Autowired
private IotConfigService configService;
+ @Autowired
+ private PlcSignalService plcSignalService;
+
private final Map deviceInfoCache = new ConcurrentHashMap<>();
/**
@@ -183,6 +188,14 @@ public class MetadataCacheManager implements CommandLineRunner {
// ==================== 信号值缓存方法 ====================
+ /**
+ * 存储单个信号值
+ * @param key connectCode.deviceCode.信号别名
+ * @param rValue 信号值对象
+ */
+ public void putSignalValue(String key, RValue rValue) {
+ signalValueCache.put(key, rValue);
+ }
/**
* 存储单个信号值
* @param deviceKey connectCode.deviceCode
@@ -203,13 +216,22 @@ public class MetadataCacheManager implements CommandLineRunner {
if (rValues == null || rValues.isEmpty()) {
return;
}
-
- for (RValue rValue : rValues) {
- if (rValue != null && rValue.getSiteBO() != null) {
- String alias = rValue.getSiteBO().getAlias();
- putSignalValue(deviceKey, alias, rValue);
+ Map cacheSnapshot = getAllSignalValues();
+ List needSaveValues = new ArrayList<>();
+ for (RValue newRValue : rValues) {
+ if (newRValue != null && newRValue.getSiteBO() != null) {
+ String alias = newRValue.getSiteBO().getAlias();
+ String key = deviceKey + "." + alias;
+ RValue oldValue = cacheSnapshot.get(key);
+ if (ObjectUtil.isNotEmpty(oldValue) && oldValue.getValue().equals(newRValue.getValue())) {
+ continue;
+ }
+ putSignalValue(key, newRValue);
+ needSaveValues.add(newRValue);
}
}
+ // 批量存储到influxDB
+ plcSignalService.batchSaveSignalToInfluxDB(needSaveValues);
}
/**
diff --git a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java
new file mode 100644
index 0000000..8c83f66
--- /dev/null
+++ b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java
@@ -0,0 +1,83 @@
+package org.nl.iot.core.influxDB;
+
+import com.influxdb.client.*;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * influx2.0
+ * @Author: liyongde
+ * @Date: 2026/3/24 10:38
+ * @Modified By:
+ */
+@Slf4j
+@Configuration
+public class InfluxDB2Config {
+
+ @Value("${influx.url}")
+ private String url;
+
+ @Value("${influx.token}")
+ private String token;
+
+ @Value("${influx.org}")
+ private String org;
+
+ @Value("${influx.bucket}")
+ private String bucket;
+
+ /**
+ * 初始化InfluxDB 2.x客户端
+ * 修复:通过OkHttpClient配置超时,替代不存在的set超时方法
+ */
+ @Bean
+ public InfluxDBClient influxDBClient() {
+
+ // 创建客户端,传入自定义OkHttp配置
+ InfluxDBClient client = InfluxDBClientFactory.create(
+ url,
+ token.toCharArray(),
+ org,
+ bucket
+ );
+
+ // 测试连接
+ client.ping();
+ log.info("InfluxDB 2.x 连接成功");
+ return client;
+ }
+
+ /**
+ * 写入API(开启批量写入,提升性能)
+ */
+ @Bean
+ public WriteApi writeApi(InfluxDBClient client) {
+ // 批量写入配置:缓冲1000条、500ms刷新、最大5000条
+ WriteOptions writeOptions = WriteOptions.builder()
+ .batchSize(1000)
+ .flushInterval(500)
+ .bufferLimit(5000)
+ .build();
+ return client.makeWriteApi(writeOptions);
+ }
+
+ /**
+ * 查询API
+ */
+ @Bean
+ public QueryApi queryApi(InfluxDBClient client) {
+ return client.getQueryApi();
+ }
+
+ /**
+ * 项目关闭时销毁客户端
+ */
+ @PreDestroy
+ public void destroy() {
+ log.info("InfluxDB 2.x 连接关闭");
+ influxDBClient().close();
+ }
+}
\ No newline at end of file
diff --git a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java
new file mode 100644
index 0000000..301f8c6
--- /dev/null
+++ b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java
@@ -0,0 +1,105 @@
+package org.nl.iot.core.influxDB;
+
+import com.influxdb.client.QueryApi;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.query.FluxRecord;
+import com.influxdb.query.FluxTable;
+import jakarta.annotation.Resource;
+import org.nl.iot.modular.influxdb.entity.PlcSignal;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 工具类
+ * @Author: liyongde
+ * @Date: 2026/3/24 10:43
+ * @Modified By:
+ */
+@Component
+public class InfluxDB2Util {
+
+ @Value("${influx.bucket}")
+ private String bucket;
+
+ @Value("${influx.org}")
+ private String org;
+
+ @Resource
+ private WriteApi writeApi;
+
+ @Resource
+ private QueryApi queryApi;
+
+ /**
+ * 批量写入PLC信号
+ */
+ public void insertBatch(List datas) {
+ writeApi.writeMeasurements(WritePrecision.NS, datas);
+ }
+
+
+ /**
+ * 单条写入PLC信号
+ */
+ public void insertOne(PlcSignal data) {
+ insertOne(bucket, org, data);
+ }
+
+ /**
+ * 单条写入PLC信号
+ */
+ public void insertOne(String bucket, String org, PlcSignal data) {
+ writeApi.writeMeasurement(bucket, org, WritePrecision.NS, data);
+ }
+
+ /**
+ * PLC专用Flux查询结果转换
+ */
+ public List queryPlcFlux(String flux) {
+ List tables = queryApi.query(flux);
+ return tables.stream()
+ .flatMap(table -> table.getRecords().stream())
+ .map(this::convertToPlcEntity)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * 执行统计查询,返回总数
+ * @param flux Flux查询语句
+ * @return 统计结果
+ */
+ public long queryCount(String flux) {
+ List tables = queryApi.query(flux);
+ if (tables.isEmpty()) {
+ return 0L;
+ }
+
+ // 获取count结果
+ return tables.stream()
+ .flatMap(table -> table.getRecords().stream())
+ .mapToLong(record -> {
+ Object value = record.getValue();
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+ return 0L;
+ })
+ .sum();
+ }
+
+ /**
+ * Flux结果转PLC实体
+ */
+ private PlcSignal convertToPlcEntity(FluxRecord record) {
+ return PlcSignal.builder()
+ .device(record.getValueByKey("device").toString())
+ .tag(record.getValueByKey("tag").toString())
+ .value(record.getValueByKey("value").toString())
+ .changeTime(record.getTime())
+ .build();
+ }
+}
diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PageResult.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PageResult.java
new file mode 100644
index 0000000..fd2f9a7
--- /dev/null
+++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PageResult.java
@@ -0,0 +1,112 @@
+package org.nl.iot.modular.influxdb.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * 分页结果封装类
+ * @param 数据类型
+ * @Author: liyongde
+ * @Date: 2026/3/24
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PageResult {
+
+ /**
+ * 当前页数据列表
+ */
+ private List records;
+
+ /**
+ * 总记录数
+ */
+ private long total;
+
+ /**
+ * 当前页码(从1开始)
+ */
+ private int page;
+
+ /**
+ * 每页大小
+ */
+ private int size;
+
+ /**
+ * 总页数
+ */
+ private int totalPages;
+
+ /**
+ * 是否有下一页
+ */
+ private boolean hasNext;
+
+ /**
+ * 是否有上一页
+ */
+ private boolean hasPrevious;
+
+ /**
+ * 是否为第一页
+ */
+ private boolean isFirst;
+
+ /**
+ * 是否为最后一页
+ */
+ private boolean isLast;
+
+ /**
+ * 构建分页结果
+ * @param records 当前页数据
+ * @param total 总记录数
+ * @param page 当前页码
+ * @param size 每页大小
+ * @param 数据类型
+ * @return 分页结果
+ */
+ public static PageResult of(List records, long total, int page, int size) {
+ int totalPages = (int) Math.ceil((double) total / size);
+
+ return PageResult.builder()
+ .records(records)
+ .total(total)
+ .page(page)
+ .size(size)
+ .totalPages(totalPages)
+ .hasNext(page < totalPages)
+ .hasPrevious(page > 1)
+ .isFirst(page == 1)
+ .isLast(page >= totalPages)
+ .build();
+ }
+
+ /**
+ * 空分页结果
+ * @param page 当前页码
+ * @param size 每页大小
+ * @param 数据类型
+ * @return 空分页结果
+ */
+ public static PageResult empty(int page, int size) {
+ return PageResult.builder()
+ .records(List.of())
+ .total(0L)
+ .page(page)
+ .size(size)
+ .totalPages(0)
+ .hasNext(false)
+ .hasPrevious(page > 1)
+ .isFirst(page == 1)
+ .isLast(true)
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java
new file mode 100644
index 0000000..04d6cad
--- /dev/null
+++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java
@@ -0,0 +1,48 @@
+package org.nl.iot.modular.influxdb.entity;
+
+import com.influxdb.annotations.Column;
+import com.influxdb.annotations.Measurement;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Instant;
+
+/**
+ * plc信号实体
+ * @Author: liyongde
+ * @Date: 2026/3/24 10:53
+ * @Modified By:
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@Measurement(name = "plc_signal")
+public class PlcSignal {
+
+ /**
+ * 设备名称(Tag索引)
+ */
+ @Column(tag = true, name = "device")
+ private String device;
+
+ /**
+ * 信号点位/标签(Tag索引)
+ */
+ @Column(tag = true, name = "tag")
+ private String tag;
+
+ /**
+ * 信号值(Field,存变化数据)
+ */
+ @Column(name = "value")
+ private String value;
+
+ /**
+ * 变化时间戳(InfluxDB自动维护)
+ */
+ @Column(timestamp = true)
+ private Instant changeTime;
+}
diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java
new file mode 100644
index 0000000..988ae5e
--- /dev/null
+++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java
@@ -0,0 +1,28 @@
+package org.nl.iot.modular.influxdb.service;
+
+import org.nl.iot.core.driver.entity.RValue;
+
+import java.util.List;
+
+/**
+ * influxdb存储信号变化值
+ * @Author: liyongde
+ * @Date: 2026/3/24 13:11
+ */
+public interface PlcSignalService {
+
+ /**
+ * 存储变化值
+ * @param device
+ * @param tag
+ * @param newValue
+ * @return
+ */
+ void savePlcChangeSignal(String device, String tag, String newValue);
+
+ /**
+ * 批量存储
+ * @param needSaveValues
+ */
+ void batchSaveSignalToInfluxDB(List needSaveValues);
+}
diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java
new file mode 100644
index 0000000..58734c6
--- /dev/null
+++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java
@@ -0,0 +1,52 @@
+package org.nl.iot.modular.influxdb.service.impl;
+
+import jakarta.annotation.Resource;
+import org.nl.iot.core.driver.bo.SiteBO;
+import org.nl.iot.core.driver.entity.RValue;
+import org.nl.iot.core.influxDB.InfluxDB2Util;
+import org.nl.iot.modular.influxdb.entity.PlcSignal;
+import org.nl.iot.modular.influxdb.service.PlcSignalService;
+import org.springframework.stereotype.Service;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ * @Author: liyongde
+ * @Date: 2026/3/24 13:11
+ */
+@Service
+public class PlcSignalServiceImpl implements PlcSignalService {
+
+ @Resource
+ private InfluxDB2Util influxDB2Util;
+
+ @Override
+ public void savePlcChangeSignal(String device, String tag, String newValue) {
+ PlcSignal plcSignal = PlcSignal.builder()
+ .device(device)
+ .tag(tag)
+ .value(newValue)
+ .changeTime(Instant.now())
+ .build();
+ influxDB2Util.insertOne(plcSignal);
+ }
+
+ @Override
+ public void batchSaveSignalToInfluxDB(List needSaveValues) {
+ List plcSignals = new ArrayList<>();
+ for (RValue needSaveValue : needSaveValues) {
+ SiteBO siteBO = needSaveValue.getSiteBO();
+ PlcSignal plcSignal = PlcSignal.builder()
+ .device(siteBO.getDeviceCode())
+ .tag(siteBO.getAlias())
+ .value(needSaveValue.getValue())
+ .changeTime(Instant.now())
+ .build();
+ plcSignals.add(plcSignal);
+ }
+
+ }
+}
diff --git a/nl-web-app/src/main/resources/application.yml b/nl-web-app/src/main/resources/application.yml
index bb82c0d..e9f3522 100644
--- a/nl-web-app/src/main/resources/application.yml
+++ b/nl-web-app/src/main/resources/application.yml
@@ -177,3 +177,13 @@ file:
# 文件大小 /M
maxSize: 100
avatarMaxSize: 5
+# InfluxDB 2.x 核心配置
+influx:
+ # 服务地址
+ url: http://117.72.202.142:15964/
+ # 初始化生成的All-Access令牌
+ token: mr8DZXza-WvBtHhK51Jnfh0tNlq_rLyVYW4D9Zyz0IN66RM9KqgS8agV4XWtXekCgGhG866Dtb9Jv0t-N5D0Gw==
+ # 组织名称
+ org: acs
+ # 数据桶名称(替代1.x的database)
+ bucket: signals
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e5fcb3c..ea6c0da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -451,6 +451,47 @@
mssql-jdbc
12.4.2.jre11
-->
+
+
+
+ org.eclipse.milo
+ sdk-client
+ 0.6.16
+
+
+
+
+ org.jinterop
+ j-interop
+ 2.0.4
+
+
+
+ org.apache.plc4x
+ plc4j-api
+ 0.13.0
+
+
+
+ org.apache.plc4x
+ plc4j-driver-s7
+ 0.13.0
+ runtime
+
+
+
+ org.apache.plc4x
+ plc4j-driver-modbus
+ 0.13.0
+ runtime
+
+
+
+
+ com.influxdb
+ influxdb-client-java
+ 6.11.0
+