diff --git a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java index 239dc45..0664c20 100644 --- a/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java +++ b/nl-iot/src/main/java/org/nl/iot/core/cache/MetadataCacheManager.java @@ -1,16 +1,20 @@ package org.nl.iot.core.cache; import cn.hutool.core.util.ObjectUtil; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.nl.iot.core.driver.bo.ConnectSiteDO; import org.nl.iot.core.driver.bo.DeviceConnectionBO; import org.nl.iot.core.driver.bo.DeviceInfoReadCacheDo; import org.nl.iot.core.driver.bo.SiteBO; import org.nl.iot.core.driver.entity.RValue; +import org.nl.iot.core.event.SignalValueEvent; import org.nl.iot.modular.influxdb.service.PlcSignalService; import org.nl.iot.modular.iot.service.IotConfigService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.stereotype.Component; import java.util.*; @@ -26,7 +30,7 @@ import java.util.stream.Collectors; */ @Slf4j @Component -public class MetadataCacheManager implements CommandLineRunner { +public class MetadataCacheManager implements CommandLineRunner, ApplicationEventPublisherAware { @Autowired private IotConfigService configService; @@ -43,6 +47,13 @@ public class MetadataCacheManager implements CommandLineRunner { */ private final Map signalValueCache = new ConcurrentHashMap<>(); + private ApplicationEventPublisher applicationEventPublisher; + + @Override + public void setApplicationEventPublisher(@NonNull ApplicationEventPublisher publisher) { + this.applicationEventPublisher = publisher; + } + @Override public void run(String... args) throws Exception { log.info("开始加载IoT元数据到内存..."); @@ -226,18 +237,33 @@ public class MetadataCacheManager implements CommandLineRunner { if (ObjectUtil.isNotEmpty(oldValue) && oldValue.getValue().equals(newRValue.getValue())) { continue; } + // 存储内存 putSignalValue(key, newRValue); + // 加入存储influxdb数据列表 needSaveValues.add(newRValue); } } // 异步批量存储到influxDB if (!needSaveValues.isEmpty()) { + log.info("存储变化的信号数据到influxDB:{}", needSaveValues); plcSignalService.batchSaveSignalToInfluxDBAsync(needSaveValues) .exceptionally(throwable -> { log.error("异步保存信号到InfluxDB失败,设备: {}, 数量: {}", deviceKey, needSaveValues.size(), throwable); return null; }); } + // 发布消息 + if (!needSaveValues.isEmpty()) { + List signalDataList = new ArrayList<>(); + for (RValue needSaveValue : needSaveValues) { + String alias = needSaveValue.getSiteBO().getAlias(); + String key = deviceKey + "." + alias; + signalDataList.add(new SignalValueEvent.SignalData(key, needSaveValue.getValue(), needSaveValue.getExceptionMessage())); + } + // 推送 + log.info("推送变化的信号数据到event:{}", signalDataList); + applicationEventPublisher.publishEvent(new SignalValueEvent(this, signalDataList)); + } } /** diff --git a/nl-iot/src/main/java/org/nl/iot/config/AsyncConfig.java b/nl-iot/src/main/java/org/nl/iot/core/config/AsyncConfig.java similarity index 98% rename from nl-iot/src/main/java/org/nl/iot/config/AsyncConfig.java rename to nl-iot/src/main/java/org/nl/iot/core/config/AsyncConfig.java index e79a7b7..a9269da 100644 --- a/nl-iot/src/main/java/org/nl/iot/config/AsyncConfig.java +++ b/nl-iot/src/main/java/org/nl/iot/core/config/AsyncConfig.java @@ -1,4 +1,4 @@ -package org.nl.iot.config; +package org.nl.iot.core.config; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; diff --git a/nl-iot/src/main/java/org/nl/iot/core/event/SignalValueEvent.java b/nl-iot/src/main/java/org/nl/iot/core/event/SignalValueEvent.java new file mode 100644 index 0000000..db35b9a --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/core/event/SignalValueEvent.java @@ -0,0 +1,63 @@ +package org.nl.iot.core.event; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; + +import java.time.Clock; +import java.time.LocalDateTime; +import java.util.List; + +/** + * 信号推送事件 + * @Author: liyongde + * @Date: 2026/3/26 14:35 + */ +@EqualsAndHashCode(callSuper = true) +@Getter +@Setter +public class SignalValueEvent extends ApplicationEvent { + + private List signalDataList; + + /** + * 推送时间 + */ + private LocalDateTime sendTime; + + public SignalValueEvent(Object source, List signalDataList) { + super(source); + this.signalDataList = signalDataList; + this.sendTime = LocalDateTime.now(); + } + + /** + * 信号数据内部类 + */ + @Getter + @Setter + public static class SignalData { + /** + * 设备.信号名 + */ + private String code; + /** + * 值 + */ + private String value; + /** + * 异常信息 + */ + private String exceptionMessage; + + // 内部类构造方法 + public SignalData(String code, String value, String exceptionMessage) { + this.code = code; + this.value = value; + this.exceptionMessage = exceptionMessage; + } + } + +} diff --git a/nl-iot/src/main/java/org/nl/iot/core/event/package-info.java b/nl-iot/src/main/java/org/nl/iot/core/event/package-info.java new file mode 100644 index 0000000..293a8d3 --- /dev/null +++ b/nl-iot/src/main/java/org/nl/iot/core/event/package-info.java @@ -0,0 +1,6 @@ +/** + * spring事件机制,后续可以放到API模块 + * @Author: liyongde + * @Date: 2026/3/26 14:18 + */ +package org.nl.iot.core.event; \ No newline at end of file diff --git a/nl-iot/src/main/java/org/nl/iot/core/schedule/AutoReadAllSignalTaskRunner.java b/nl-iot/src/main/java/org/nl/iot/core/schedule/AutoReadAllSignalTaskRunner.java index ef22a36..bf60c5c 100644 --- a/nl-iot/src/main/java/org/nl/iot/core/schedule/AutoReadAllSignalTaskRunner.java +++ b/nl-iot/src/main/java/org/nl/iot/core/schedule/AutoReadAllSignalTaskRunner.java @@ -39,6 +39,7 @@ public class AutoReadAllSignalTaskRunner implements CommonTimerTaskRunner { return; } // 同步顺序读取 + // todo: 转换异步 for (Map.Entry entry : allDeviceInfoCache.entrySet()) { // 获取Map的Key(这里Key是设备ID,和实体中的deviceId可能相同/不同) String key = entry.getKey(); diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java index 04d6cad..bb6f432 100644 --- a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/entity/PlcSignal.java @@ -34,6 +34,12 @@ public class PlcSignal { @Column(tag = true, name = "tag") private String tag; + /** + * 异常信息 + */ + @Column(tag = true, name = "message") + private String message; + /** * 信号值(Field,存变化数据) */ diff --git a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java index a1ff004..338c55f 100644 --- a/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java +++ b/nl-iot/src/main/java/org/nl/iot/modular/influxdb/service/impl/PlcSignalServiceImpl.java @@ -50,6 +50,7 @@ public class PlcSignalServiceImpl implements PlcSignalService { PlcSignal plcSignal = PlcSignal.builder() .device(siteBO.getDeviceCode()) .tag(siteBO.getAlias()) + .message(needSaveValue.getExceptionMessage()) .value(needSaveValue.getValue()) .changeTime(Instant.now()) .build();