feat: 新增influxdb数据库

This commit is contained in:
2026-03-24 13:50:50 +08:00
parent 1ff30fd30d
commit ae95d4dfa3
10 changed files with 512 additions and 10 deletions

View File

@@ -28,35 +28,36 @@
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
<version>0.6.16</version>
</dependency>
<!-- Java COM opc da 相关 -->
<dependency>
<groupId>org.jinterop</groupId>
<artifactId>j-interop</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-api</artifactId>
<version>0.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-s7</artifactId>
<version>0.13.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-modbus</artifactId>
<version>0.13.0</version>
<scope>runtime</scope>
</dependency>
<!-- InfluxDB 2.x 官方Java客户端 -->
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,11 +1,13 @@
package org.nl.iot.core.cache;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.nl.iot.core.driver.bo.ConnectSiteDO;
import org.nl.iot.core.driver.bo.DeviceConnectionBO;
import org.nl.iot.core.driver.bo.DeviceInfoReadCacheDo;
import org.nl.iot.core.driver.bo.SiteBO;
import org.nl.iot.core.driver.entity.RValue;
import org.nl.iot.modular.influxdb.service.PlcSignalService;
import org.nl.iot.modular.iot.service.IotConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
@@ -29,6 +31,9 @@ public class MetadataCacheManager implements CommandLineRunner {
@Autowired
private IotConfigService configService;
@Autowired
private PlcSignalService plcSignalService;
private final Map<String, DeviceInfoReadCacheDo> deviceInfoCache = new ConcurrentHashMap<>();
/**
@@ -183,6 +188,14 @@ public class MetadataCacheManager implements CommandLineRunner {
// ==================== 信号值缓存方法 ====================
/**
* 存储单个信号值
* @param key connectCode.deviceCode.信号别名
* @param rValue 信号值对象
*/
public void putSignalValue(String key, RValue rValue) {
signalValueCache.put(key, rValue);
}
/**
* 存储单个信号值
* @param deviceKey connectCode.deviceCode
@@ -203,13 +216,22 @@ public class MetadataCacheManager implements CommandLineRunner {
if (rValues == null || rValues.isEmpty()) {
return;
}
for (RValue rValue : rValues) {
if (rValue != null && rValue.getSiteBO() != null) {
String alias = rValue.getSiteBO().getAlias();
putSignalValue(deviceKey, alias, rValue);
Map<String, RValue> cacheSnapshot = getAllSignalValues();
List<RValue> needSaveValues = new ArrayList<>();
for (RValue newRValue : rValues) {
if (newRValue != null && newRValue.getSiteBO() != null) {
String alias = newRValue.getSiteBO().getAlias();
String key = deviceKey + "." + alias;
RValue oldValue = cacheSnapshot.get(key);
if (ObjectUtil.isNotEmpty(oldValue) && oldValue.getValue().equals(newRValue.getValue())) {
continue;
}
putSignalValue(key, newRValue);
needSaveValues.add(newRValue);
}
}
// 批量存储到influxDB
plcSignalService.batchSaveSignalToInfluxDB(needSaveValues);
}
/**

View File

@@ -0,0 +1,83 @@
package org.nl.iot.core.influxDB;
import com.influxdb.client.*;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* influx2.0
* @Author: liyongde
* @Date: 2026/3/24 10:38
* @Modified By:
*/
@Slf4j
@Configuration
public class InfluxDB2Config {
@Value("${influx.url}")
private String url;
@Value("${influx.token}")
private String token;
@Value("${influx.org}")
private String org;
@Value("${influx.bucket}")
private String bucket;
/**
* 初始化InfluxDB 2.x客户端
* 修复通过OkHttpClient配置超时替代不存在的set超时方法
*/
@Bean
public InfluxDBClient influxDBClient() {
// 创建客户端传入自定义OkHttp配置
InfluxDBClient client = InfluxDBClientFactory.create(
url,
token.toCharArray(),
org,
bucket
);
// 测试连接
client.ping();
log.info("InfluxDB 2.x 连接成功");
return client;
}
/**
* 写入API开启批量写入提升性能
*/
@Bean
public WriteApi writeApi(InfluxDBClient client) {
// 批量写入配置缓冲1000条、500ms刷新、最大5000条
WriteOptions writeOptions = WriteOptions.builder()
.batchSize(1000)
.flushInterval(500)
.bufferLimit(5000)
.build();
return client.makeWriteApi(writeOptions);
}
/**
* 查询API
*/
@Bean
public QueryApi queryApi(InfluxDBClient client) {
return client.getQueryApi();
}
/**
* 项目关闭时销毁客户端
*/
@PreDestroy
public void destroy() {
log.info("InfluxDB 2.x 连接关闭");
influxDBClient().close();
}
}

View File

@@ -0,0 +1,105 @@
package org.nl.iot.core.influxDB;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import jakarta.annotation.Resource;
import org.nl.iot.modular.influxdb.entity.PlcSignal;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
/**
* 工具类
* @Author: liyongde
* @Date: 2026/3/24 10:43
* @Modified By:
*/
@Component
public class InfluxDB2Util {
@Value("${influx.bucket}")
private String bucket;
@Value("${influx.org}")
private String org;
@Resource
private WriteApi writeApi;
@Resource
private QueryApi queryApi;
/**
* 批量写入PLC信号
*/
public void insertBatch(List<PlcSignal> datas) {
writeApi.writeMeasurements(WritePrecision.NS, datas);
}
/**
* 单条写入PLC信号
*/
public void insertOne(PlcSignal data) {
insertOne(bucket, org, data);
}
/**
* 单条写入PLC信号
*/
public void insertOne(String bucket, String org, PlcSignal data) {
writeApi.writeMeasurement(bucket, org, WritePrecision.NS, data);
}
/**
* PLC专用Flux查询结果转换
*/
public List<PlcSignal> queryPlcFlux(String flux) {
List<FluxTable> tables = queryApi.query(flux);
return tables.stream()
.flatMap(table -> table.getRecords().stream())
.map(this::convertToPlcEntity)
.collect(Collectors.toList());
}
/**
* 执行统计查询,返回总数
* @param flux Flux查询语句
* @return 统计结果
*/
public long queryCount(String flux) {
List<FluxTable> tables = queryApi.query(flux);
if (tables.isEmpty()) {
return 0L;
}
// 获取count结果
return tables.stream()
.flatMap(table -> table.getRecords().stream())
.mapToLong(record -> {
Object value = record.getValue();
if (value instanceof Number) {
return ((Number) value).longValue();
}
return 0L;
})
.sum();
}
/**
* Flux结果转PLC实体
*/
private PlcSignal convertToPlcEntity(FluxRecord record) {
return PlcSignal.builder()
.device(record.getValueByKey("device").toString())
.tag(record.getValueByKey("tag").toString())
.value(record.getValueByKey("value").toString())
.changeTime(record.getTime())
.build();
}
}

View File

@@ -0,0 +1,112 @@
package org.nl.iot.modular.influxdb.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* 分页结果封装类
* @param <T> 数据类型
* @Author: liyongde
* @Date: 2026/3/24
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PageResult<T> {
/**
* 当前页数据列表
*/
private List<T> records;
/**
* 总记录数
*/
private long total;
/**
* 当前页码从1开始
*/
private int page;
/**
* 每页大小
*/
private int size;
/**
* 总页数
*/
private int totalPages;
/**
* 是否有下一页
*/
private boolean hasNext;
/**
* 是否有上一页
*/
private boolean hasPrevious;
/**
* 是否为第一页
*/
private boolean isFirst;
/**
* 是否为最后一页
*/
private boolean isLast;
/**
* 构建分页结果
* @param records 当前页数据
* @param total 总记录数
* @param page 当前页码
* @param size 每页大小
* @param <T> 数据类型
* @return 分页结果
*/
public static <T> PageResult<T> of(List<T> records, long total, int page, int size) {
int totalPages = (int) Math.ceil((double) total / size);
return PageResult.<T>builder()
.records(records)
.total(total)
.page(page)
.size(size)
.totalPages(totalPages)
.hasNext(page < totalPages)
.hasPrevious(page > 1)
.isFirst(page == 1)
.isLast(page >= totalPages)
.build();
}
/**
* 空分页结果
* @param page 当前页码
* @param size 每页大小
* @param <T> 数据类型
* @return 空分页结果
*/
public static <T> PageResult<T> empty(int page, int size) {
return PageResult.<T>builder()
.records(List.of())
.total(0L)
.page(page)
.size(size)
.totalPages(0)
.hasNext(false)
.hasPrevious(page > 1)
.isFirst(page == 1)
.isLast(true)
.build();
}
}

View File

@@ -0,0 +1,48 @@
package org.nl.iot.modular.influxdb.entity;
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
/**
* plc信号实体
* @Author: liyongde
* @Date: 2026/3/24 10:53
* @Modified By:
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Measurement(name = "plc_signal")
public class PlcSignal {
/**
* 设备名称Tag索引
*/
@Column(tag = true, name = "device")
private String device;
/**
* 信号点位/标签Tag索引
*/
@Column(tag = true, name = "tag")
private String tag;
/**
* 信号值Field存变化数据
*/
@Column(name = "value")
private String value;
/**
* 变化时间戳InfluxDB自动维护
*/
@Column(timestamp = true)
private Instant changeTime;
}

View File

@@ -0,0 +1,28 @@
package org.nl.iot.modular.influxdb.service;
import org.nl.iot.core.driver.entity.RValue;
import java.util.List;
/**
* influxdb存储信号变化值
* @Author: liyongde
* @Date: 2026/3/24 13:11
*/
public interface PlcSignalService {
/**
* 存储变化值
* @param device
* @param tag
* @param newValue
* @return
*/
void savePlcChangeSignal(String device, String tag, String newValue);
/**
* 批量存储
* @param needSaveValues
*/
void batchSaveSignalToInfluxDB(List<RValue> needSaveValues);
}

View File

@@ -0,0 +1,52 @@
package org.nl.iot.modular.influxdb.service.impl;
import jakarta.annotation.Resource;
import org.nl.iot.core.driver.bo.SiteBO;
import org.nl.iot.core.driver.entity.RValue;
import org.nl.iot.core.influxDB.InfluxDB2Util;
import org.nl.iot.modular.influxdb.entity.PlcSignal;
import org.nl.iot.modular.influxdb.service.PlcSignalService;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
*
* @Author: liyongde
* @Date: 2026/3/24 13:11
*/
@Service
public class PlcSignalServiceImpl implements PlcSignalService {
@Resource
private InfluxDB2Util influxDB2Util;
@Override
public void savePlcChangeSignal(String device, String tag, String newValue) {
PlcSignal plcSignal = PlcSignal.builder()
.device(device)
.tag(tag)
.value(newValue)
.changeTime(Instant.now())
.build();
influxDB2Util.insertOne(plcSignal);
}
@Override
public void batchSaveSignalToInfluxDB(List<RValue> needSaveValues) {
List<PlcSignal> plcSignals = new ArrayList<>();
for (RValue needSaveValue : needSaveValues) {
SiteBO siteBO = needSaveValue.getSiteBO();
PlcSignal plcSignal = PlcSignal.builder()
.device(siteBO.getDeviceCode())
.tag(siteBO.getAlias())
.value(needSaveValue.getValue())
.changeTime(Instant.now())
.build();
plcSignals.add(plcSignal);
}
}
}