feat:spring event 事件机制推送变化的消息

This commit is contained in:
2026-03-26 14:52:31 +08:00
parent de0c031b2b
commit dbe1b156c7
7 changed files with 105 additions and 2 deletions

View File

@@ -1,16 +1,20 @@
package org.nl.iot.core.cache;
import cn.hutool.core.util.ObjectUtil;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.nl.iot.core.driver.bo.ConnectSiteDO;
import org.nl.iot.core.driver.bo.DeviceConnectionBO;
import org.nl.iot.core.driver.bo.DeviceInfoReadCacheDo;
import org.nl.iot.core.driver.bo.SiteBO;
import org.nl.iot.core.driver.entity.RValue;
import org.nl.iot.core.event.SignalValueEvent;
import org.nl.iot.modular.influxdb.service.PlcSignalService;
import org.nl.iot.modular.iot.service.IotConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
import java.util.*;
@@ -26,7 +30,7 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Component
public class MetadataCacheManager implements CommandLineRunner {
public class MetadataCacheManager implements CommandLineRunner, ApplicationEventPublisherAware {
@Autowired
private IotConfigService configService;
@@ -43,6 +47,13 @@ public class MetadataCacheManager implements CommandLineRunner {
*/
private final Map<String, RValue> signalValueCache = new ConcurrentHashMap<>();
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void setApplicationEventPublisher(@NonNull ApplicationEventPublisher publisher) {
this.applicationEventPublisher = publisher;
}
@Override
public void run(String... args) throws Exception {
log.info("开始加载IoT元数据到内存...");
@@ -226,18 +237,33 @@ public class MetadataCacheManager implements CommandLineRunner {
if (ObjectUtil.isNotEmpty(oldValue) && oldValue.getValue().equals(newRValue.getValue())) {
continue;
}
// 存储内存
putSignalValue(key, newRValue);
// 加入存储influxdb数据列表
needSaveValues.add(newRValue);
}
}
// 异步批量存储到influxDB
if (!needSaveValues.isEmpty()) {
log.info("存储变化的信号数据到influxDB{}", needSaveValues);
plcSignalService.batchSaveSignalToInfluxDBAsync(needSaveValues)
.exceptionally(throwable -> {
log.error("异步保存信号到InfluxDB失败设备: {}, 数量: {}", deviceKey, needSaveValues.size(), throwable);
return null;
});
}
// 发布消息
if (!needSaveValues.isEmpty()) {
List<SignalValueEvent.SignalData> signalDataList = new ArrayList<>();
for (RValue needSaveValue : needSaveValues) {
String alias = needSaveValue.getSiteBO().getAlias();
String key = deviceKey + "." + alias;
signalDataList.add(new SignalValueEvent.SignalData(key, needSaveValue.getValue(), needSaveValue.getExceptionMessage()));
}
// 推送
log.info("推送变化的信号数据到event{}", signalDataList);
applicationEventPublisher.publishEvent(new SignalValueEvent(this, signalDataList));
}
}
/**

View File

@@ -1,4 +1,4 @@
package org.nl.iot.config;
package org.nl.iot.core.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;

View File

@@ -0,0 +1,63 @@
package org.nl.iot.core.event;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.time.Clock;
import java.time.LocalDateTime;
import java.util.List;
/**
* 信号推送事件
* @Author: liyongde
* @Date: 2026/3/26 14:35
*/
@EqualsAndHashCode(callSuper = true)
@Getter
@Setter
public class SignalValueEvent extends ApplicationEvent {
private List<SignalData> signalDataList;
/**
* 推送时间
*/
private LocalDateTime sendTime;
public SignalValueEvent(Object source, List<SignalData> signalDataList) {
super(source);
this.signalDataList = signalDataList;
this.sendTime = LocalDateTime.now();
}
/**
* 信号数据内部类
*/
@Getter
@Setter
public static class SignalData {
/**
* 设备.信号名
*/
private String code;
/**
* 值
*/
private String value;
/**
* 异常信息
*/
private String exceptionMessage;
// 内部类构造方法
public SignalData(String code, String value, String exceptionMessage) {
this.code = code;
this.value = value;
this.exceptionMessage = exceptionMessage;
}
}
}

View File

@@ -0,0 +1,6 @@
/**
* spring事件机制后续可以放到API模块
* @Author: liyongde
* @Date: 2026/3/26 14:18
*/
package org.nl.iot.core.event;

View File

@@ -39,6 +39,7 @@ public class AutoReadAllSignalTaskRunner implements CommonTimerTaskRunner {
return;
}
// 同步顺序读取
// todo: 转换异步
for (Map.Entry<String, DeviceInfoReadCacheDo> entry : allDeviceInfoCache.entrySet()) {
// 获取Map的Key这里Key是设备ID和实体中的deviceId可能相同/不同)
String key = entry.getKey();

View File

@@ -34,6 +34,12 @@ public class PlcSignal {
@Column(tag = true, name = "tag")
private String tag;
/**
* 异常信息
*/
@Column(tag = true, name = "message")
private String message;
/**
* 信号值Field存变化数据
*/

View File

@@ -50,6 +50,7 @@ public class PlcSignalServiceImpl implements PlcSignalService {
PlcSignal plcSignal = PlcSignal.builder()
.device(siteBO.getDeviceCode())
.tag(siteBO.getAlias())
.message(needSaveValue.getExceptionMessage())
.value(needSaveValue.getValue())
.changeTime(Instant.now())
.build();