From d28c7a039081e3e578f98e6979a6fbca1d199eaa Mon Sep 17 00:00:00 2001 From: gengby <858962040@qq.com> Date: Thu, 28 Mar 2024 15:00:11 +0800 Subject: [PATCH] rev:mqttServer --- .../java/org/nl/config/mqtt2/MqttServer.java | 285 +++++++++--------- 1 file changed, 150 insertions(+), 135 deletions(-) diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MqttServer.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MqttServer.java index 1680c5cb..1f92b684 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MqttServer.java +++ b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MqttServer.java @@ -27,6 +27,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; @@ -198,77 +199,143 @@ public class MqttServer implements MqttCallback, ApplicationAutoInitial { } + /** + * 为每个Topic初始化线程 + */ private void run() { topicMap.forEach((topic, values) -> threadPool.execute(() -> { log.info("create topic thread:{}", topic); while (true) { try { - String message = values.take(); - long start = System.currentTimeMillis(); - execute(topic, message); - log.info("处理数据耗时:{},{}", topic, (System.currentTimeMillis() - start) + "毫秒"); - } catch (InterruptedException e) { - e.printStackTrace(); + execute(topic, values.take()); } catch (Exception e) { log.error("execute handler failed {}", e.getMessage()); - try { - Thread.sleep(5000); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } } } })); } - private void execute(String topic, String body) { - JSONObject msg = JSONObject.parseObject(body); - JSONArray msgValues = msg.getJSONArray("values"); - List itemDataList = msgValues.toJavaList(Item.class); - Map> map = itemDataList.stream().collect(Collectors.groupingBy(deviceData -> { - int thirdDotIndex = ItemUtil.nthIndexOf(deviceData.getId(), ".", 3); - return deviceData.getId().substring(0, thirdDotIndex); - })); - map.forEach((itemId, items) -> { - int startIndex = ItemUtil.nthIndexOf(itemId, ".", 2) + 1; - items.stream().anyMatch(item -> ItemUtil.hasHeartbeat(item.getId())); - String deviceCode = itemId.substring(startIndex); - Optional vOptional = items.stream() - .filter(item -> item.getId().contains(HEARTBEAT)) - .map(Item::isQ) - .findFirst(); - vOptional.ifPresent(q -> { - Device device = deviceAppService.findDeviceByCode(deviceCode); - if (device != null && device.getDeviceDriver() instanceof AbstractDeviceDriver) { - AbstractDeviceDriver deviceDriver = (AbstractDeviceDriver) device.getDeviceDriver(); - if (!Objects.equals(deviceDriver.getOnline(), q)) { - deviceDriver.setOnline(q); - log.info("device:{},online status:{}", deviceCode, q); - } - } - }); - setValue(deviceCode, items); - }); - System.out.println("线程名称:'" + Thread.currentThread() + "',接收到消息" + topic + "-" + body + "-"); - } - - /** - * 更改内存中的值 + * 针对topic的消息处理逻辑 + * + * @param topic + * @param body + */ + private void execute(String topic, String body) { + JSONObject msg = JSONObject.parseObject(body); + JSONArray msgValues = msg.getJSONArray("values"); + List itemList = msgValues.toJavaList(Item.class); + Map> groupItems = itemList.stream().collect(Collectors.groupingBy(item -> item.getId().substring(0, ItemUtil.nthIndexOf(item.getId(), ".", 3)))); + groupItems.forEach((itemId, items) -> updateDeviceStatus(itemId.substring(ItemUtil.nthIndexOf(itemId, ".", 2) + 1), items)); + System.out.println("线程名称:'" + Thread.currentThread() + "',接收到消息" + topic + "-" + body + "-"); + } + + /** + * 更新设备数据信息 * * @param deviceCode - * @param value + * @param items */ - private void setValue(String deviceCode, List value) { - value.forEach(deviceData -> { - if (!ObjectUtil.equals(deviceData.getV(), ACCESSOR_VALUE.getValue(deviceData.getId()))) { - logService.deviceExecuteLog(new LuceneLogDto(deviceCode, "MQTT上报,信号:" + deviceData.getId() + ",由" + ACCESSOR_VALUE.getValue(deviceData.getId()) + "->" + deviceData.getV() + ",信号健康值:" + deviceData.isQ())); + public void updateDeviceStatus(String deviceCode, List items) { + setOnline(deviceCode, items); + setValue(deviceCode, items); + } + + /** + * 更新设备在线状态 + * + * @param deviceCode + * @param items + */ + public void setOnline(String deviceCode, List items) { + Optional heartbeatPresent = items.stream() + .filter(item -> item.getId().contains(HEARTBEAT)) + .map(Item::isQ) + .findFirst(); + heartbeatPresent.ifPresent(q -> { + Device device = deviceAppService.findDeviceByCode(deviceCode); + if (device != null && device.getDeviceDriver() instanceof AbstractDeviceDriver) { + AbstractDeviceDriver deviceDriver = (AbstractDeviceDriver) device.getDeviceDriver(); + if (!Objects.equals(deviceDriver.getOnline(), q)) { + deviceDriver.setOnline(q); + log.info("device:{},online status:{}", deviceCode, q); + } } - ACCESSOR_VALUE.setValue(deviceData.getId(), deviceData.getV()); }); } + /** + * 更新内存中的信号值 + * + * @param deviceCode + * @param values + */ + private void setValue(String deviceCode, List values) { + values.forEach(item -> { + if (!ObjectUtil.equals(item.getV(), ACCESSOR_VALUE.getValue(item.getId()))) { + logService.deviceExecuteLog(new LuceneLogDto(deviceCode, "MQTT上报,信号:" + item.getId() + ",由" + ACCESSOR_VALUE.getValue(item.getId()) + "->" + item.getV() + ",信号健康值:" + item.isQ())); + ACCESSOR_VALUE.setValue(item.getId(), item.getV()); + } + }); + } + + /** + * 订阅topic + * + * @param topic 主题 + * @param message 内容 + * @throws Exception + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + //接收到消息:先放队列,根据不同topic处理不同处理dd逻辑 + if (topicMap.containsKey(topic)) { + topicMap.get(topic).add(new String(message.getPayload())); + System.out.println("接收到消息---" + topic + "内容:" + new String(message.getPayload())); + } else { + log.info("topic---{} does not exist", topic); + } + } + + /** + * 消息发布 + * + * @param body MQTT下发数据的格式是 + * [ + * {"id":"opc_code.plc_code.device_code.item","v":value}, + * {"id":"opc_code.plc_code.device_code.item","v":value} + * ] + */ + public void sendMsg(String body) { + try { + synchronized (this) { + if (mqttClient != null && StringUtils.isNotEmpty(body)) { + MqttMessage message = new MqttMessage(); + message.setQos(QOS_2); + message.setRetained(true); + message.setPayload(body.getBytes()); + mqttClient.publish(PUBLISH_TOPIC, message); + } + } + } catch (Exception ex) { + log.error("message publish failed:{}", ex.getMessage()); + } + } + + /** + * 发布结果处理 + * + * @param token + */ + @SneakyThrows + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + boolean complete = token.isComplete(); + MqttMessage message = token.getMessage(); + String[] topics = token.getTopics(); + log.info("message publish result ,topic:{},message:{},complete:{}", Arrays.toString(topics), message == null ? "" : message.getPayload(), complete); + } /** * 断开连接执行逻辑 @@ -301,97 +368,13 @@ public class MqttServer implements MqttCallback, ApplicationAutoInitial { } } - - /** - * 订阅topic - * - * @param topic 主题 - * @param message 内容 - * @throws Exception - */ - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - //接收到消息:先放队列,根据不同topic处理不同处理dd逻辑 - if (topicMap.containsKey(topic)) { - topicMap.get(topic).add(new String(message.getPayload())); - System.out.println("接收到消息---" + topic + "内容:" + new String(message.getPayload())); - } else { - log.info("topic---{} does not exist", topic); - } - } - - - /** - * 发布结果处理 - * - * @param token - */ - @SneakyThrows - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - boolean complete = token.isComplete(); - MqttMessage message = token.getMessage(); - String[] topics = token.getTopics(); - log.info("message publish result ,topic:{},message:{},complete:{}", Arrays.toString(topics), message == null ? "" : message.getPayload(), complete); - } - - /** - * 消息发布 - * - * @param body MQTT下发数据的格式是 - * [ - * {"id":"opc_code.plc_code.device_code.item","v":value}, - * {"id":"opc_code.plc_code.device_code.item","v":value} - * ] - */ - public void sendMsg(String body) { - try { - synchronized (this) { - if (mqttClient != null && StringUtils.isNotEmpty(body)) { - MqttMessage message = new MqttMessage(); - message.setQos(QOS_2); - message.setRetained(true); - message.setPayload(body.getBytes()); - mqttClient.publish(PUBLISH_TOPIC, message); - } - } - } catch (Exception ex) { - log.error("message publish failed:{}", ex.getMessage()); - } - } - - @PostConstruct public void start() { loadItems(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("----close threadPool-----"); - try { - shutdown(); - threadPool.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - })); } /** - * 关闭MQTT客户端连接 - * - * @throws Exception - */ - public void shutdown() throws Exception { - log.info("close mqtt client..."); - mqttClient.disconnect(); - log.info("close mqtt client finish"); - unloadItems(); - log.info("close scheduler..."); - scheduler.shutdown(); - log.info("close scheduler finish..."); - } - - /** - * 从redis写入内存 item数据 + * 从redis写入内存item数据 */ private void loadItems() { log.info("loading..."); @@ -402,6 +385,38 @@ public class MqttServer implements MqttCallback, ApplicationAutoInitial { log.info("delete finish!"); } + @SneakyThrows + @PreDestroy + public void destroy() { + shutdownMqtt(); + unloadItems(); + shutdownScheduler(); + shutdownThreadPool(); + } + + /** + * 关闭MQTT客户端连接 + * + * @throws Exception + */ + private void shutdownMqtt() throws Exception { + log.info("close mqtt client..."); + mqttClient.disconnect(); + log.info("close mqtt client finish"); + } + + private void shutdownScheduler() { + log.info("close scheduler..."); + scheduler.shutdown(); + log.info("close scheduler finish..."); + } + + private void shutdownThreadPool() { + log.info("close threadPool..."); + threadPool.shutdown(); + log.info("close threadPool finish"); + } + /** * 从内存写出到redis item数据 */