fix: influxdb数据库

This commit is contained in:
2026-03-24 14:47:52 +08:00
parent ae95d4dfa3
commit de0c031b2b
9 changed files with 310 additions and 16 deletions

View File

@@ -0,0 +1,56 @@
package org.nl.iot.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 异步任务配置
*
* @author lyd
* @date 2026/03/24
*/
@Slf4j
@Configuration
public class AsyncConfig implements AsyncConfigurer {
/**
* InfluxDB异步写入线程池
*/
@Bean("influxDbAsyncExecutor")
public Executor influxDbAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(2);
// 最大线程数
executor.setMaxPoolSize(8);
// 队列容量
executor.setQueueCapacity(200);
// 线程名前缀
executor.setThreadNamePrefix("InfluxDB-Async-");
// 线程空闲时间
executor.setKeepAliveSeconds(60);
// 拒绝策略:调用者运行策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间
executor.setAwaitTerminationSeconds(60);
executor.initialize();
log.info("InfluxDB异步线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: {}",
executor.getCorePoolSize(), executor.getMaxPoolSize(), executor.getQueueCapacity());
return executor;
}
@Override
public Executor getAsyncExecutor() {
return influxDbAsyncExecutor();
}
}

View File

@@ -230,8 +230,14 @@ public class MetadataCacheManager implements CommandLineRunner {
needSaveValues.add(newRValue); needSaveValues.add(newRValue);
} }
} }
// 批量存储到influxDB // 异步批量存储到influxDB
plcSignalService.batchSaveSignalToInfluxDB(needSaveValues); if (!needSaveValues.isEmpty()) {
plcSignalService.batchSaveSignalToInfluxDBAsync(needSaveValues)
.exceptionally(throwable -> {
log.error("异步保存信号到InfluxDB失败设备: {}, 数量: {}", deviceKey, needSaveValues.size(), throwable);
return null;
});
}
} }
/** /**

View File

@@ -2,7 +2,6 @@ package org.nl.iot.core.influxDB;
import com.influxdb.client.*; import com.influxdb.client.*;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@@ -13,7 +12,6 @@ import org.springframework.context.annotation.Configuration;
* @Date: 2026/3/24 10:38 * @Date: 2026/3/24 10:38
* @Modified By: * @Modified By:
*/ */
@Slf4j
@Configuration @Configuration
public class InfluxDB2Config { public class InfluxDB2Config {
@@ -29,6 +27,8 @@ public class InfluxDB2Config {
@Value("${influx.bucket}") @Value("${influx.bucket}")
private String bucket; private String bucket;
private InfluxDBClient influxDBClient;
/** /**
* 初始化InfluxDB 2.x客户端 * 初始化InfluxDB 2.x客户端
* 修复通过OkHttpClient配置超时替代不存在的set超时方法 * 修复通过OkHttpClient配置超时替代不存在的set超时方法
@@ -37,7 +37,7 @@ public class InfluxDB2Config {
public InfluxDBClient influxDBClient() { public InfluxDBClient influxDBClient() {
// 创建客户端传入自定义OkHttp配置 // 创建客户端传入自定义OkHttp配置
InfluxDBClient client = InfluxDBClientFactory.create( this.influxDBClient = InfluxDBClientFactory.create(
url, url,
token.toCharArray(), token.toCharArray(),
org, org,
@@ -45,9 +45,8 @@ public class InfluxDB2Config {
); );
// 测试连接 // 测试连接
client.ping(); influxDBClient.ping();
log.info("InfluxDB 2.x 连接成功"); return influxDBClient;
return client;
} }
/** /**
@@ -77,7 +76,8 @@ public class InfluxDB2Config {
*/ */
@PreDestroy @PreDestroy
public void destroy() { public void destroy() {
log.info("InfluxDB 2.x 连接关闭"); if (influxDBClient != null) {
influxDBClient().close(); influxDBClient.close();
}
} }
} }

View File

@@ -5,8 +5,8 @@ import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.domain.WritePrecision;
import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable; import com.influxdb.query.FluxTable;
import jakarta.annotation.Resource;
import org.nl.iot.modular.influxdb.entity.PlcSignal; import org.nl.iot.modular.influxdb.entity.PlcSignal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@@ -28,10 +28,10 @@ public class InfluxDB2Util {
@Value("${influx.org}") @Value("${influx.org}")
private String org; private String org;
@Resource @Autowired
private WriteApi writeApi; private WriteApi writeApi;
@Resource @Autowired
private QueryApi queryApi; private QueryApi queryApi;
/** /**

View File

@@ -1,8 +1,10 @@
package org.nl.iot.modular.influxdb.service; package org.nl.iot.modular.influxdb.service;
import org.nl.iot.core.driver.entity.RValue; import org.nl.iot.core.driver.entity.RValue;
import org.springframework.scheduling.annotation.Async;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
/** /**
* influxdb存储信号变化值 * influxdb存储信号变化值
@@ -21,8 +23,16 @@ public interface PlcSignalService {
void savePlcChangeSignal(String device, String tag, String newValue); void savePlcChangeSignal(String device, String tag, String newValue);
/** /**
* 批量存储 * 批量存储(同步)
* @param needSaveValues * @param needSaveValues
*/ */
void batchSaveSignalToInfluxDB(List<RValue> needSaveValues); void batchSaveSignalToInfluxDB(List<RValue> needSaveValues);
/**
* 批量存储(异步)
* @param needSaveValues
* @return CompletableFuture<Void>
*/
@Async("influxDbAsyncExecutor")
CompletableFuture<Void> batchSaveSignalToInfluxDBAsync(List<RValue> needSaveValues);
} }

View File

@@ -1,26 +1,30 @@
package org.nl.iot.modular.influxdb.service.impl; package org.nl.iot.modular.influxdb.service.impl;
import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j;
import org.nl.iot.core.driver.bo.SiteBO; import org.nl.iot.core.driver.bo.SiteBO;
import org.nl.iot.core.driver.entity.RValue; import org.nl.iot.core.driver.entity.RValue;
import org.nl.iot.core.influxDB.InfluxDB2Util; import org.nl.iot.core.influxDB.InfluxDB2Util;
import org.nl.iot.modular.influxdb.entity.PlcSignal; import org.nl.iot.modular.influxdb.entity.PlcSignal;
import org.nl.iot.modular.influxdb.service.PlcSignalService; import org.nl.iot.modular.influxdb.service.PlcSignalService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
/** /**
* *
* @Author: liyongde * @Author: liyongde
* @Date: 2026/3/24 13:11 * @Date: 2026/3/24 13:11
*/ */
@Slf4j
@Service @Service
public class PlcSignalServiceImpl implements PlcSignalService { public class PlcSignalServiceImpl implements PlcSignalService {
@Resource @Autowired
private InfluxDB2Util influxDB2Util; private InfluxDB2Util influxDB2Util;
@Override @Override
@@ -36,6 +40,10 @@ public class PlcSignalServiceImpl implements PlcSignalService {
@Override @Override
public void batchSaveSignalToInfluxDB(List<RValue> needSaveValues) { public void batchSaveSignalToInfluxDB(List<RValue> needSaveValues) {
if (needSaveValues == null || needSaveValues.isEmpty()) {
return;
}
List<PlcSignal> plcSignals = new ArrayList<>(); List<PlcSignal> plcSignals = new ArrayList<>();
for (RValue needSaveValue : needSaveValues) { for (RValue needSaveValue : needSaveValues) {
SiteBO siteBO = needSaveValue.getSiteBO(); SiteBO siteBO = needSaveValue.getSiteBO();
@@ -47,6 +55,26 @@ public class PlcSignalServiceImpl implements PlcSignalService {
.build(); .build();
plcSignals.add(plcSignal); plcSignals.add(plcSignal);
} }
try {
influxDB2Util.insertBatch(plcSignals);
log.debug("批量保存信号到InfluxDB成功数量: {}", plcSignals.size());
} catch (Exception e) {
log.error("批量保存信号到InfluxDB失败数量: {}", plcSignals.size(), e);
throw e;
}
}
@Override
@Async("influxDbAsyncExecutor")
public CompletableFuture<Void> batchSaveSignalToInfluxDBAsync(List<RValue> needSaveValues) {
try {
batchSaveSignalToInfluxDB(needSaveValues);
log.debug("异步批量保存信号到InfluxDB成功数量: {}", needSaveValues.size());
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("异步批量保存信号到InfluxDB失败数量: {}", needSaveValues.size(), e);
return CompletableFuture.failedFuture(e);
}
} }
} }

View File

@@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@@ -33,6 +34,7 @@ import org.springframework.web.bind.annotation.RestController;
@RestController @RestController
@SpringBootApplication @SpringBootApplication
@EnableScheduling @EnableScheduling
@EnableAsync
public class Application { public class Application {
/* 解决druid 日志报错discard long time none received connection:xxx */ /* 解决druid 日志报错discard long time none received connection:xxx */

View File

@@ -0,0 +1,116 @@
package org.nl;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.nl.iot.core.driver.bo.SiteBO;
import org.nl.iot.core.driver.entity.RValue;
import org.nl.iot.modular.influxdb.service.PlcSignalService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
*
* @Author: liyongde
* @Date: 2026/3/24 13:58
*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class AsyncInfluxDBTest {
@Autowired
private PlcSignalService plcSignalService;
/**
* 测试异步批量保存
*/
@Test
public void testAsyncBatchSave() {
log.info("开始测试异步批量保存...");
// 构造测试数据
List<RValue> testValues = createTestData();
// 记录开始时间
long startTime = System.currentTimeMillis();
// 异步保存
CompletableFuture<Void> future = plcSignalService.batchSaveSignalToInfluxDBAsync(testValues);
// 主线程继续执行其他任务
log.info("异步任务已提交,主线程继续执行其他任务...");
doOtherWork();
// 等待异步任务完成(可选)
future.whenComplete((result, throwable) -> {
long duration = System.currentTimeMillis() - startTime;
if (throwable != null) {
log.error("异步保存失败,耗时: {}ms", duration, throwable);
} else {
log.info("异步保存成功,耗时: {}ms", duration);
}
});
log.info("测试方法执行完成");
}
/**
* 对比同步和异步性能
*/
@Test
public void comparePerformance() {
List<RValue> testValues = createTestData();
// 测试同步方式
long syncStart = System.currentTimeMillis();
plcSignalService.batchSaveSignalToInfluxDB(testValues);
long syncDuration = System.currentTimeMillis() - syncStart;
log.info("同步保存耗时: {}ms", syncDuration);
// 测试异步方式
long asyncStart = System.currentTimeMillis();
CompletableFuture<Void> future = plcSignalService.batchSaveSignalToInfluxDBAsync(testValues);
long asyncSubmitDuration = System.currentTimeMillis() - asyncStart;
log.info("异步任务提交耗时: {}ms", asyncSubmitDuration);
// 等待异步任务完成
future.join();
long asyncTotalDuration = System.currentTimeMillis() - asyncStart;
log.info("异步任务总耗时: {}ms", asyncTotalDuration);
}
private List<RValue> createTestData() {
List<RValue> testValues = new ArrayList<>();
for (int i = 0; i < 100; i++) {
SiteBO siteBO = SiteBO.builder()
.deviceCode("TEST_DEVICE_" + (i % 10))
.alias("TEST_TAG_" + i)
.build();
RValue rValue = RValue.builder()
.siteBO(siteBO)
.value("TEST_VALUE_" + i)
.build();
testValues.add(rValue);
}
return testValues;
}
private void doOtherWork() {
// 模拟其他业务逻辑
try {
Thread.sleep(100);
log.info("其他业务逻辑执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@@ -0,0 +1,76 @@
# 测试环境配置
spring:
profiles:
active: test
main:
allow-circular-references: true
datasource:
dynamic:
datasource:
master:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.81.251:3306/acs3.0?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&useInformationSchema=true&rewriteBatchedStatements=true
username: root
password: "P@ssw0rd."
strict: true
public-key: MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAMWiTVtdXFVrgFHDDKELZM0SywkWY3KjugN90eY5Sogon1j8Y0ClPF7nx3FuE7pAeBKiv7ChIS0vvx/59WUpKmUCAwEAAQ==
data:
redis:
database: 1
host: 127.0.0.1
port: 6379
password: ""
timeout: 10s
jackson:
time-zone: GMT+8
date-format: "yyyy-MM-dd HH:mm:ss"
locale: zh_CN
serialization:
write-dates-as-timestamps: false
# InfluxDB 2.x 测试配置
influx:
# 服务地址
url: http://117.72.202.142:15964/
# 初始化生成的All-Access令牌
token: mr8DZXza-WvBtHhK51Jnfh0tNlq_rLyVYW4D9Zyz0IN66RM9KqgS8agV4XWtXekCgGhG866Dtb9Jv0t-N5D0Gw==
# 组织名称
org: acs
# 数据桶名称替代1.x的database
bucket: signals
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
jdbc-type-for-null: "null"
global-config:
banner: false
enable-sql-runner: true
db-config:
id-type: ASSIGN_ID
logic-delete-field: DELETE_FLAG
logic-delete-value: DELETED
logic-not-delete-value: NOT_DELETE
mapper-locations: classpath*:org/nl/**/mapping/*.xml
type-handlers-package: org.nl.common.handler
# sa-token configuration
sa-token:
token-name: token
timeout: 2592000
active-timeout: -1
is-concurrent: true
is-share: false
max-login-count: -1
token-style: random-32
is-log: false
is-print: false
#########################################
# easy-trans configuration
#########################################
easy-trans:
is-enable-redis: true
is-enable-global: true
is-enable-tile: true
is-enable-cloud: false