From de0c031b2b96c94989da5c60f8736dc094402286 Mon Sep 17 00:00:00 2001 From: liyongde <1419499670@qq.com> Date: Tue, 24 Mar 2026 14:47:52 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20influxdb=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/nl/iot/config/AsyncConfig.java | 56 +++++++++ .../iot/core/cache/MetadataCacheManager.java | 10 +- .../nl/iot/core/influxDB/InfluxDB2Config.java | 16 +-- .../nl/iot/core/influxDB/InfluxDB2Util.java | 6 +- .../influxdb/service/PlcSignalService.java | 12 +- .../service/impl/PlcSignalServiceImpl.java | 32 ++++- .../src/main/java/org/nl/Application.java | 2 + .../test/java/org/nl/AsyncInfluxDBTest.java | 116 ++++++++++++++++++ .../src/test/resources/application-test.yml | 76 ++++++++++++ 9 files changed, 310 insertions(+), 16 deletions(-) create mode 100644 nl-iot/src/main/java/org/nl/iot/config/AsyncConfig.java create mode 100644 nl-web-app/src/test/java/org/nl/AsyncInfluxDBTest.java create mode 100644 nl-web-app/src/test/resources/application-test.yml diff --git a/nl-iot/src/main/java/org/nl/iot/config/AsyncConfig.java b/nl-iot/src/main/java/org/nl/iot/config/AsyncConfig.java new file mode 100644 index 0000000..e79a7b7 --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/config/AsyncConfig.java @@ -0,0 +1,56 @@ +package org.nl.iot.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 异步任务配置 + * + * @author lyd + * @date 2026/03/24 + */ +@Slf4j +@Configuration +public class AsyncConfig implements AsyncConfigurer { + + /** + * InfluxDB异步写入线程池 + */ + @Bean("influxDbAsyncExecutor") + public Executor influxDbAsyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(2); + // 最大线程数 + executor.setMaxPoolSize(8); + // 队列容量 + executor.setQueueCapacity(200); + // 线程名前缀 + executor.setThreadNamePrefix("InfluxDB-Async-"); + // 线程空闲时间 + executor.setKeepAliveSeconds(60); + // 拒绝策略:调用者运行策略 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 等待所有任务结束后再关闭线程池 + executor.setWaitForTasksToCompleteOnShutdown(true); + // 等待时间 + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + + log.info("InfluxDB异步线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: {}", + executor.getCorePoolSize(), executor.getMaxPoolSize(), executor.getQueueCapacity()); + + return executor; + } + + @Override + public Executor getAsyncExecutor() { + return influxDbAsyncExecutor(); + } +} \ No newline at end of file diff --git a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java index 26d8e52..239dc45 100644 --- a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java +++ b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java @@ -230,8 +230,14 @@ public class MetadataCacheManager implements CommandLineRunner { needSaveValues.add(newRValue); } } - // 批量存储到influxDB - plcSignalService.batchSaveSignalToInfluxDB(needSaveValues); + // 异步批量存储到influxDB + if (!needSaveValues.isEmpty()) { + plcSignalService.batchSaveSignalToInfluxDBAsync(needSaveValues) + .exceptionally(throwable -> { + log.error("异步保存信号到InfluxDB失败,设备: {}, 数量: {}", deviceKey, needSaveValues.size(), throwable); + return null; + }); + } } /** diff --git a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java index 8c83f66..0431d8b 100644 --- a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java +++ b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Config.java @@ -2,7 +2,6 @@ package org.nl.iot.core.influxDB; import com.influxdb.client.*; import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,7 +12,6 @@ import org.springframework.context.annotation.Configuration; * @Date: 2026/3/24 10:38 * @Modified By: */ -@Slf4j @Configuration public class InfluxDB2Config { @@ -29,6 +27,8 @@ public class InfluxDB2Config { @Value("${influx.bucket}") private String bucket; + private InfluxDBClient influxDBClient; + /** * 初始化InfluxDB 2.x客户端 * 修复:通过OkHttpClient配置超时,替代不存在的set超时方法 @@ -37,7 +37,7 @@ public class InfluxDB2Config { public InfluxDBClient influxDBClient() { // 创建客户端,传入自定义OkHttp配置 - InfluxDBClient client = InfluxDBClientFactory.create( + this.influxDBClient = InfluxDBClientFactory.create( url, token.toCharArray(), org, @@ -45,9 +45,8 @@ public class InfluxDB2Config { ); // 测试连接 - client.ping(); - log.info("InfluxDB 2.x 连接成功"); - return client; + influxDBClient.ping(); + return influxDBClient; } /** @@ -77,7 +76,8 @@ public class InfluxDB2Config { */ @PreDestroy public void destroy() { - log.info("InfluxDB 2.x 连接关闭"); - influxDBClient().close(); + if (influxDBClient != null) { + influxDBClient.close(); + } } } \ No newline at end of file diff --git a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java index 301f8c6..b9ca227 100644 --- a/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java +++ b/nl-iot/src/main/java/org/nl/iot/core/influxDB/InfluxDB2Util.java @@ -5,8 +5,8 @@ import com.influxdb.client.WriteApi; import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; -import jakarta.annotation.Resource; import org.nl.iot.modular.influxdb.entity.PlcSignal; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -28,10 +28,10 @@ public class InfluxDB2Util { @Value("${influx.org}") private String org; - @Resource + @Autowired private WriteApi writeApi; - @Resource + @Autowired private QueryApi queryApi; /** diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java index 988ae5e..c098b2a 100644 --- a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/PlcSignalService.java @@ -1,8 +1,10 @@ package org.nl.iot.modular.influxdb.service; import org.nl.iot.core.driver.entity.RValue; +import org.springframework.scheduling.annotation.Async; import java.util.List; +import java.util.concurrent.CompletableFuture; /** * influxdb存储信号变化值 @@ -21,8 +23,16 @@ public interface PlcSignalService { void savePlcChangeSignal(String device, String tag, String newValue); /** - * 批量存储 + * 批量存储(同步) * @param needSaveValues */ void batchSaveSignalToInfluxDB(List needSaveValues); + + /** + * 批量存储(异步) + * @param needSaveValues + * @return CompletableFuture + */ + @Async("influxDbAsyncExecutor") + CompletableFuture batchSaveSignalToInfluxDBAsync(List needSaveValues); } diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java index 58734c6..a1ff004 100644 --- a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java @@ -1,26 +1,30 @@ package org.nl.iot.modular.influxdb.service.impl; -import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; import org.nl.iot.core.driver.bo.SiteBO; import org.nl.iot.core.driver.entity.RValue; import org.nl.iot.core.influxDB.InfluxDB2Util; import org.nl.iot.modular.influxdb.entity.PlcSignal; import org.nl.iot.modular.influxdb.service.PlcSignalService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; /** * * @Author: liyongde * @Date: 2026/3/24 13:11 */ +@Slf4j @Service public class PlcSignalServiceImpl implements PlcSignalService { - @Resource + @Autowired private InfluxDB2Util influxDB2Util; @Override @@ -36,6 +40,10 @@ public class PlcSignalServiceImpl implements PlcSignalService { @Override public void batchSaveSignalToInfluxDB(List needSaveValues) { + if (needSaveValues == null || needSaveValues.isEmpty()) { + return; + } + List plcSignals = new ArrayList<>(); for (RValue needSaveValue : needSaveValues) { SiteBO siteBO = needSaveValue.getSiteBO(); @@ -47,6 +55,26 @@ public class PlcSignalServiceImpl implements PlcSignalService { .build(); plcSignals.add(plcSignal); } + + try { + influxDB2Util.insertBatch(plcSignals); + log.debug("批量保存信号到InfluxDB成功,数量: {}", plcSignals.size()); + } catch (Exception e) { + log.error("批量保存信号到InfluxDB失败,数量: {}", plcSignals.size(), e); + throw e; + } + } + @Override + @Async("influxDbAsyncExecutor") + public CompletableFuture batchSaveSignalToInfluxDBAsync(List needSaveValues) { + try { + batchSaveSignalToInfluxDB(needSaveValues); + log.debug("异步批量保存信号到InfluxDB成功,数量: {}", needSaveValues.size()); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + log.error("异步批量保存信号到InfluxDB失败,数量: {}", needSaveValues.size(), e); + return CompletableFuture.failedFuture(e); + } } } diff --git a/nl-web-app/src/main/java/org/nl/Application.java b/nl-web-app/src/main/java/org/nl/Application.java index df5d3f1..dcd0591 100644 --- a/nl-web-app/src/main/java/org/nl/Application.java +++ b/nl-web-app/src/main/java/org/nl/Application.java @@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.Environment; +import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @@ -33,6 +34,7 @@ import org.springframework.web.bind.annotation.RestController; @RestController @SpringBootApplication @EnableScheduling +@EnableAsync public class Application { /* 解决druid 日志报错:discard long time none received connection:xxx */ diff --git a/nl-web-app/src/test/java/org/nl/AsyncInfluxDBTest.java b/nl-web-app/src/test/java/org/nl/AsyncInfluxDBTest.java new file mode 100644 index 0000000..bf1d5c5 --- /dev/null +++ b/nl-web-app/src/test/java/org/nl/AsyncInfluxDBTest.java @@ -0,0 +1,116 @@ +package org.nl; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.nl.iot.core.driver.bo.SiteBO; +import org.nl.iot.core.driver.entity.RValue; +import org.nl.iot.modular.influxdb.service.PlcSignalService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * + * @Author: liyongde + * @Date: 2026/3/24 13:58 + */ +@Slf4j +@RunWith(SpringRunner.class) +@SpringBootTest(classes = Application.class) +public class AsyncInfluxDBTest { + @Autowired + private PlcSignalService plcSignalService; + + /** + * 测试异步批量保存 + */ + @Test + public void testAsyncBatchSave() { + log.info("开始测试异步批量保存..."); + + // 构造测试数据 + List testValues = createTestData(); + + // 记录开始时间 + long startTime = System.currentTimeMillis(); + + // 异步保存 + CompletableFuture future = plcSignalService.batchSaveSignalToInfluxDBAsync(testValues); + + // 主线程继续执行其他任务 + log.info("异步任务已提交,主线程继续执行其他任务..."); + doOtherWork(); + + // 等待异步任务完成(可选) + future.whenComplete((result, throwable) -> { + long duration = System.currentTimeMillis() - startTime; + if (throwable != null) { + log.error("异步保存失败,耗时: {}ms", duration, throwable); + } else { + log.info("异步保存成功,耗时: {}ms", duration); + } + }); + + log.info("测试方法执行完成"); + } + + /** + * 对比同步和异步性能 + */ + @Test + public void comparePerformance() { + List testValues = createTestData(); + + // 测试同步方式 + long syncStart = System.currentTimeMillis(); + plcSignalService.batchSaveSignalToInfluxDB(testValues); + long syncDuration = System.currentTimeMillis() - syncStart; + log.info("同步保存耗时: {}ms", syncDuration); + + // 测试异步方式 + long asyncStart = System.currentTimeMillis(); + CompletableFuture future = plcSignalService.batchSaveSignalToInfluxDBAsync(testValues); + long asyncSubmitDuration = System.currentTimeMillis() - asyncStart; + log.info("异步任务提交耗时: {}ms", asyncSubmitDuration); + + // 等待异步任务完成 + future.join(); + long asyncTotalDuration = System.currentTimeMillis() - asyncStart; + log.info("异步任务总耗时: {}ms", asyncTotalDuration); + } + + private List createTestData() { + List testValues = new ArrayList<>(); + + for (int i = 0; i < 100; i++) { + SiteBO siteBO = SiteBO.builder() + .deviceCode("TEST_DEVICE_" + (i % 10)) + .alias("TEST_TAG_" + i) + .build(); + + RValue rValue = RValue.builder() + .siteBO(siteBO) + .value("TEST_VALUE_" + i) + .build(); + + testValues.add(rValue); + } + + return testValues; + } + + private void doOtherWork() { + // 模拟其他业务逻辑 + try { + Thread.sleep(100); + log.info("其他业务逻辑执行完成"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/nl-web-app/src/test/resources/application-test.yml b/nl-web-app/src/test/resources/application-test.yml new file mode 100644 index 0000000..3511516 --- /dev/null +++ b/nl-web-app/src/test/resources/application-test.yml @@ -0,0 +1,76 @@ +# 测试环境配置 +spring: + profiles: + active: test + main: + allow-circular-references: true + datasource: + dynamic: + datasource: + master: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://192.168.81.251:3306/acs3.0?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&useInformationSchema=true&rewriteBatchedStatements=true + username: root + password: "P@ssw0rd." + strict: true + public-key: MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAMWiTVtdXFVrgFHDDKELZM0SywkWY3KjugN90eY5Sogon1j8Y0ClPF7nx3FuE7pAeBKiv7ChIS0vvx/59WUpKmUCAwEAAQ== + data: + redis: + database: 1 + host: 127.0.0.1 + port: 6379 + password: "" + timeout: 10s + jackson: + time-zone: GMT+8 + date-format: "yyyy-MM-dd HH:mm:ss" + locale: zh_CN + serialization: + write-dates-as-timestamps: false + +# InfluxDB 2.x 测试配置 +influx: + # 服务地址 + url: http://117.72.202.142:15964/ + # 初始化生成的All-Access令牌 + token: mr8DZXza-WvBtHhK51Jnfh0tNlq_rLyVYW4D9Zyz0IN66RM9KqgS8agV4XWtXekCgGhG866Dtb9Jv0t-N5D0Gw== + # 组织名称 + org: acs + # 数据桶名称(替代1.x的database) + bucket: signals + +mybatis-plus: + configuration: + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + jdbc-type-for-null: "null" + global-config: + banner: false + enable-sql-runner: true + db-config: + id-type: ASSIGN_ID + logic-delete-field: DELETE_FLAG + logic-delete-value: DELETED + logic-not-delete-value: NOT_DELETE + mapper-locations: classpath*:org/nl/**/mapping/*.xml + type-handlers-package: org.nl.common.handler + +# sa-token configuration +sa-token: + token-name: token + timeout: 2592000 + active-timeout: -1 + is-concurrent: true + is-share: false + max-login-count: -1 + token-style: random-32 + is-log: false + is-print: false + +######################################### +# easy-trans configuration +######################################### +easy-trans: + is-enable-redis: true + is-enable-global: true + is-enable-tile: true + is-enable-cloud: false \ No newline at end of file