rev:mqttServer

This commit is contained in:
2024-03-28 15:00:11 +08:00
parent f5b9f7290c
commit d28c7a0390

View File

@@ -27,6 +27,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@@ -198,77 +199,143 @@ public class MqttServer implements MqttCallback, ApplicationAutoInitial {
}
/**
* 为每个Topic初始化线程
*/
private void run() {
topicMap.forEach((topic, values) -> threadPool.execute(() -> {
log.info("create topic thread:{}", topic);
while (true) {
try {
String message = values.take();
long start = System.currentTimeMillis();
execute(topic, message);
log.info("处理数据耗时:{},{}", topic, (System.currentTimeMillis() - start) + "毫秒");
} catch (InterruptedException e) {
e.printStackTrace();
execute(topic, values.take());
} catch (Exception e) {
log.error("execute handler failed {}", e.getMessage());
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}));
}
private void execute(String topic, String body) {
JSONObject msg = JSONObject.parseObject(body);
JSONArray msgValues = msg.getJSONArray("values");
List<Item> itemDataList = msgValues.toJavaList(Item.class);
Map<String, List<Item>> map = itemDataList.stream().collect(Collectors.groupingBy(deviceData -> {
int thirdDotIndex = ItemUtil.nthIndexOf(deviceData.getId(), ".", 3);
return deviceData.getId().substring(0, thirdDotIndex);
}));
map.forEach((itemId, items) -> {
int startIndex = ItemUtil.nthIndexOf(itemId, ".", 2) + 1;
items.stream().anyMatch(item -> ItemUtil.hasHeartbeat(item.getId()));
String deviceCode = itemId.substring(startIndex);
Optional<Boolean> vOptional = items.stream()
.filter(item -> item.getId().contains(HEARTBEAT))
.map(Item::isQ)
.findFirst();
vOptional.ifPresent(q -> {
Device device = deviceAppService.findDeviceByCode(deviceCode);
if (device != null && device.getDeviceDriver() instanceof AbstractDeviceDriver) {
AbstractDeviceDriver deviceDriver = (AbstractDeviceDriver) device.getDeviceDriver();
if (!Objects.equals(deviceDriver.getOnline(), q)) {
deviceDriver.setOnline(q);
log.info("device:{},online status:{}", deviceCode, q);
}
}
});
setValue(deviceCode, items);
});
System.out.println("线程名称:'" + Thread.currentThread() + "',接收到消息" + topic + "-" + body + "-");
}
/**
* 更改内存中的值
* 针对topic的消息处理逻辑
*
* @param topic
* @param body
*/
private void execute(String topic, String body) {
JSONObject msg = JSONObject.parseObject(body);
JSONArray msgValues = msg.getJSONArray("values");
List<Item> itemList = msgValues.toJavaList(Item.class);
Map<String, List<Item>> groupItems = itemList.stream().collect(Collectors.groupingBy(item -> item.getId().substring(0, ItemUtil.nthIndexOf(item.getId(), ".", 3))));
groupItems.forEach((itemId, items) -> updateDeviceStatus(itemId.substring(ItemUtil.nthIndexOf(itemId, ".", 2) + 1), items));
System.out.println("线程名称:'" + Thread.currentThread() + "',接收到消息" + topic + "-" + body + "-");
}
/**
* 更新设备数据信息
*
* @param deviceCode
* @param value
* @param items
*/
private void setValue(String deviceCode, List<Item> value) {
value.forEach(deviceData -> {
if (!ObjectUtil.equals(deviceData.getV(), ACCESSOR_VALUE.getValue(deviceData.getId()))) {
logService.deviceExecuteLog(new LuceneLogDto(deviceCode, "MQTT上报,信号:" + deviceData.getId() + ",由" + ACCESSOR_VALUE.getValue(deviceData.getId()) + "->" + deviceData.getV() + ",信号健康值:" + deviceData.isQ()));
public void updateDeviceStatus(String deviceCode, List<Item> items) {
setOnline(deviceCode, items);
setValue(deviceCode, items);
}
/**
* 更新设备在线状态
*
* @param deviceCode
* @param items
*/
public void setOnline(String deviceCode, List<Item> items) {
Optional<Boolean> heartbeatPresent = items.stream()
.filter(item -> item.getId().contains(HEARTBEAT))
.map(Item::isQ)
.findFirst();
heartbeatPresent.ifPresent(q -> {
Device device = deviceAppService.findDeviceByCode(deviceCode);
if (device != null && device.getDeviceDriver() instanceof AbstractDeviceDriver) {
AbstractDeviceDriver deviceDriver = (AbstractDeviceDriver) device.getDeviceDriver();
if (!Objects.equals(deviceDriver.getOnline(), q)) {
deviceDriver.setOnline(q);
log.info("device:{},online status:{}", deviceCode, q);
}
}
ACCESSOR_VALUE.setValue(deviceData.getId(), deviceData.getV());
});
}
/**
* 更新内存中的信号值
*
* @param deviceCode
* @param values
*/
private void setValue(String deviceCode, List<Item> values) {
values.forEach(item -> {
if (!ObjectUtil.equals(item.getV(), ACCESSOR_VALUE.getValue(item.getId()))) {
logService.deviceExecuteLog(new LuceneLogDto(deviceCode, "MQTT上报,信号:" + item.getId() + ",由" + ACCESSOR_VALUE.getValue(item.getId()) + "->" + item.getV() + ",信号健康值:" + item.isQ()));
ACCESSOR_VALUE.setValue(item.getId(), item.getV());
}
});
}
/**
* 订阅topic
*
* @param topic 主题
* @param message 内容
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
//接收到消息先放队列根据不同topic处理不同处理dd逻辑
if (topicMap.containsKey(topic)) {
topicMap.get(topic).add(new String(message.getPayload()));
System.out.println("接收到消息---" + topic + "内容:" + new String(message.getPayload()));
} else {
log.info("topic---{} does not exist", topic);
}
}
/**
* 消息发布
*
* @param body MQTT下发数据的格式是
* [
* {"id":"opc_code.plc_code.device_code.item","v":value},
* {"id":"opc_code.plc_code.device_code.item","v":value}
* ]
*/
public void sendMsg(String body) {
try {
synchronized (this) {
if (mqttClient != null && StringUtils.isNotEmpty(body)) {
MqttMessage message = new MqttMessage();
message.setQos(QOS_2);
message.setRetained(true);
message.setPayload(body.getBytes());
mqttClient.publish(PUBLISH_TOPIC, message);
}
}
} catch (Exception ex) {
log.error("message publish failed:{}", ex.getMessage());
}
}
/**
* 发布结果处理
*
* @param token
*/
@SneakyThrows
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
boolean complete = token.isComplete();
MqttMessage message = token.getMessage();
String[] topics = token.getTopics();
log.info("message publish result ,topic:{},message:{},complete:{}", Arrays.toString(topics), message == null ? "" : message.getPayload(), complete);
}
/**
* 断开连接执行逻辑
@@ -301,97 +368,13 @@ public class MqttServer implements MqttCallback, ApplicationAutoInitial {
}
}
/**
* 订阅topic
*
* @param topic 主题
* @param message 内容
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
//接收到消息先放队列根据不同topic处理不同处理dd逻辑
if (topicMap.containsKey(topic)) {
topicMap.get(topic).add(new String(message.getPayload()));
System.out.println("接收到消息---" + topic + "内容:" + new String(message.getPayload()));
} else {
log.info("topic---{} does not exist", topic);
}
}
/**
* 发布结果处理
*
* @param token
*/
@SneakyThrows
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
boolean complete = token.isComplete();
MqttMessage message = token.getMessage();
String[] topics = token.getTopics();
log.info("message publish result ,topic:{},message:{},complete:{}", Arrays.toString(topics), message == null ? "" : message.getPayload(), complete);
}
/**
* 消息发布
*
* @param body MQTT下发数据的格式是
* [
* {"id":"opc_code.plc_code.device_code.item","v":value},
* {"id":"opc_code.plc_code.device_code.item","v":value}
* ]
*/
public void sendMsg(String body) {
try {
synchronized (this) {
if (mqttClient != null && StringUtils.isNotEmpty(body)) {
MqttMessage message = new MqttMessage();
message.setQos(QOS_2);
message.setRetained(true);
message.setPayload(body.getBytes());
mqttClient.publish(PUBLISH_TOPIC, message);
}
}
} catch (Exception ex) {
log.error("message publish failed:{}", ex.getMessage());
}
}
@PostConstruct
public void start() {
loadItems();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("----close threadPool-----");
try {
shutdown();
threadPool.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}));
}
/**
* 关闭MQTT客户端连接
*
* @throws Exception
*/
public void shutdown() throws Exception {
log.info("close mqtt client...");
mqttClient.disconnect();
log.info("close mqtt client finish");
unloadItems();
log.info("close scheduler...");
scheduler.shutdown();
log.info("close scheduler finish...");
}
/**
* 从redis写入内存 item数据
* 从redis写入内存item数据
*/
private void loadItems() {
log.info("loading...");
@@ -402,6 +385,38 @@ public class MqttServer implements MqttCallback, ApplicationAutoInitial {
log.info("delete finish!");
}
@SneakyThrows
@PreDestroy
public void destroy() {
shutdownMqtt();
unloadItems();
shutdownScheduler();
shutdownThreadPool();
}
/**
* 关闭MQTT客户端连接
*
* @throws Exception
*/
private void shutdownMqtt() throws Exception {
log.info("close mqtt client...");
mqttClient.disconnect();
log.info("close mqtt client finish");
}
private void shutdownScheduler() {
log.info("close scheduler...");
scheduler.shutdown();
log.info("close scheduler finish...");
}
private void shutdownThreadPool() {
log.info("close threadPool...");
threadPool.shutdown();
log.info("close threadPool finish");
}
/**
* 从内存写出到redis item数据
*/