diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/device.xls b/wcs/nladmin-system/src/main/java/org/nl/acs/device.xls index 1dc1fdff..69c0ee5b 100644 Binary files a/wcs/nladmin-system/src/main/java/org/nl/acs/device.xls and b/wcs/nladmin-system/src/main/java/org/nl/acs/device.xls differ diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/dto/OpcDto.java b/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/dto/OpcDto.java index 37f526fb..f2524a82 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/dto/OpcDto.java +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/dto/OpcDto.java @@ -32,6 +32,11 @@ public class OpcDto implements Serializable { */ private String opc_host; + /** + * MQTT订阅主题 需要与OPCServer保持一致 + */ + private String topic; + /** * 用户名 */ diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/OpcServiceImpl.java b/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/OpcServiceImpl.java index 2f0a93d0..00cb4fa4 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/OpcServiceImpl.java +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/OpcServiceImpl.java @@ -4,6 +4,7 @@ package org.nl.acs.device.service.impl; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.RequiredArgsConstructor; @@ -85,6 +86,9 @@ public class OpcServiceImpl implements OpcService { public List queryAll(Map whereJson) { WQLObject wo = WQLObject.getWQLObject("acs_opc"); JSONArray arr = wo.query().getResultJSONArray(0); + if (ObjectUtil.isEmpty(arr)) { + return new ArrayList<>(); + } List list = arr.toJavaList(OpcDto.class); return list; } @@ -119,7 +123,7 @@ public class OpcServiceImpl implements OpcService { dto.setCreate_time(now); WQLObject wo = WQLObject.getWQLObject("acs_opc"); - JSONObject json = (JSONObject) JSONObject.toJSON(dto); + JSONObject json = (JSONObject) JSONObject.toJSON(dto); wo.insert(json); } @@ -136,7 +140,7 @@ public class OpcServiceImpl implements OpcService { dto.setUpdate_by(currentUsername); WQLObject wo = WQLObject.getWQLObject("acs_opc"); - JSONObject json = (JSONObject) JSONObject.toJSON(dto); + JSONObject json = (JSONObject) JSONObject.toJSON(dto); wo.update(json); } diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/device_driver/DeviceDriver.java b/wcs/nladmin-system/src/main/java/org/nl/acs/device_driver/DeviceDriver.java index 3f702c59..ccf631e3 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/acs/device_driver/DeviceDriver.java +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/device_driver/DeviceDriver.java @@ -5,6 +5,9 @@ import org.nl.acs.opc.Device; import java.util.List; public interface DeviceDriver { + + + default String getDeviceCode() { return this.getDevice().getDevice_code(); } diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceExecuteAutoRun.java b/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceExecuteAutoRun.java index bfb40096..7e7295f0 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceExecuteAutoRun.java +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceExecuteAutoRun.java @@ -77,7 +77,7 @@ public class DeviceExecuteAutoRun extends AbstractAutoRunnable { for (int i = 0; !OpcStartTag.is_run; ++i) { log.info("设备执行线程等待opc同步线程..."); Thread.sleep(1000L); - if (i > 60) { + if (i > 20) { log.info("设备执行线程放弃等待opc同步线程..."); break; } diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java b/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java index f5476f9b..49fd24e3 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java @@ -79,7 +79,7 @@ public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerC if (OpcConfig.opc_item_read_using_callback) { this.runNew(); } else { - this.runOld(); + //this.runOld(); } } diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/task/service/impl/TaskServiceImpl.java b/wcs/nladmin-system/src/main/java/org/nl/acs/task/service/impl/TaskServiceImpl.java index 06e8b1c7..4ac7504e 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/acs/task/service/impl/TaskServiceImpl.java +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/task/service/impl/TaskServiceImpl.java @@ -590,42 +590,43 @@ public class TaskServiceImpl implements TaskService, ApplicationAutoInitial { @Override public void createInst(String ids) throws Exception { - TaskDto acsTask = this.findById(ids); - if (acsTask == null) throw new BadRequestException("被删除或无权限,操作失败!"); - List tasksByLinkNum = this.findByLinkNumFromCache(acsTask.getLink_num()); - String link_no = CodeUtil.getNewCode("LINK_NO"); - if (tasksByLinkNum.size() == 1) { - TaskDto taskDto = tasksByLinkNum.get(0); - Instruction instruction = instructionService.findByTaskCodeFromCache(taskDto.getTask_code()); - if (instruction != null) { - throw new BadRequestException("单任务有指令未完成!指令号:" + instruction.getInstruction_code()); - } - Instruction dto = instructionService.createInstDtoByTask(taskDto, link_no); - instructionService.create(dto); - taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode()); - this.update(taskDto); - } - if (tasksByLinkNum.size() == 2) { - TaskDto taskDto1 = tasksByLinkNum.get(0); - Instruction inst1 = instructionService.findByTaskCodeFromCache(taskDto1.getTask_code()); - if (inst1 != null) { - throw new BadRequestException("双任务存有指令未完成!指令号:" + inst1.getInstruction_code()); - } - TaskDto taskDto2 = tasksByLinkNum.get(1); - Instruction inst2 = instructionService.findByTaskCodeFromCache(taskDto2.getTask_code()); - if (inst2 != null) { - throw new BadRequestException("双任务有指令未完成!指令号:" + inst2.getInstruction_code()); - } - Instruction dto1 = instructionService.createInstDtoByTask(taskDto1, link_no); - Instruction dto2 = instructionService.createInstDtoByTask(taskDto2, link_no); - instructionService.createTwoInst(dto1, dto2); - taskDto1.setTask_status(StatusEnum.TASK_RUNNING.getCode()); - taskDto1.setUpdate_time(DateUtil.now()); - this.update(taskDto1); - taskDto2.setTask_status(StatusEnum.TASK_RUNNING.getCode()); - taskDto2.setUpdate_time(DateUtil.now()); - this.update(taskDto2); - } + throw new BadRequestException("创建指令失败!"); +// TaskDto acsTask = this.findById(ids); +// if (acsTask == null) throw new BadRequestException("被删除或无权限,操作失败!"); +// List tasksByLinkNum = this.findByLinkNumFromCache(acsTask.getLink_num()); +// String link_no = CodeUtil.getNewCode("LINK_NO"); +// if (tasksByLinkNum.size() == 1) { +// TaskDto taskDto = tasksByLinkNum.get(0); +// Instruction instruction = instructionService.findByTaskCodeFromCache(taskDto.getTask_code()); +// if (instruction != null) { +// throw new BadRequestException("单任务有指令未完成!指令号:" + instruction.getInstruction_code()); +// } +// Instruction dto = instructionService.createInstDtoByTask(taskDto, link_no); +// instructionService.create(dto); +// taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode()); +// this.update(taskDto); +// } +// if (tasksByLinkNum.size() == 2) { +// TaskDto taskDto1 = tasksByLinkNum.get(0); +// Instruction inst1 = instructionService.findByTaskCodeFromCache(taskDto1.getTask_code()); +// if (inst1 != null) { +// throw new BadRequestException("双任务存有指令未完成!指令号:" + inst1.getInstruction_code()); +// } +// TaskDto taskDto2 = tasksByLinkNum.get(1); +// Instruction inst2 = instructionService.findByTaskCodeFromCache(taskDto2.getTask_code()); +// if (inst2 != null) { +// throw new BadRequestException("双任务有指令未完成!指令号:" + inst2.getInstruction_code()); +// } +// Instruction dto1 = instructionService.createInstDtoByTask(taskDto1, link_no); +// Instruction dto2 = instructionService.createInstDtoByTask(taskDto2, link_no); +// instructionService.createTwoInst(dto1, dto2); +// taskDto1.setTask_status(StatusEnum.TASK_RUNNING.getCode()); +// taskDto1.setUpdate_time(DateUtil.now()); +// this.update(taskDto1); +// taskDto2.setTask_status(StatusEnum.TASK_RUNNING.getCode()); +// taskDto2.setUpdate_time(DateUtil.now()); +// this.update(taskDto2); +// } } @Override diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/ItemsDataAccessor.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/ItemsDataAccessor.java new file mode 100644 index 00000000..6cfd1380 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/ItemsDataAccessor.java @@ -0,0 +1,17 @@ +package org.nl.acs.udw.mqttUdw; + +import org.nl.acs.udw.UnifiedData; +import org.nl.acs.udw.mqttUdw.service.ItemData; + +import java.util.List; + +/** + * @author onepiece + */ +public interface ItemsDataAccessor { + List getAllKey(); + + Object getValue(String key); + + void setValue(String key, Object value); +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/ItemsProcessService.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/ItemsProcessService.java new file mode 100644 index 00000000..11919509 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/ItemsProcessService.java @@ -0,0 +1,37 @@ +package org.nl.acs.udw.mqttUdw; + +import cn.hutool.core.util.ObjectUtil; +import org.nl.acs.udw.UnifiedData; +import org.nl.acs.udw.mqttUdw.service.ItemData; +import org.nl.acs.udw.mqttUdw.service.ItemUnit; +import org.nl.acs.udw.service.impl.UnifiedDataUnit; + +import java.util.List; + +/** + * @author onepiece + */ +public interface ItemsProcessService { + /** + * 获取所有的key + * + * @return + */ + List getAllUnifiedKey(); + + /** + * 根据key获取数据单元 + * + * @param key + * @return + */ + ItemUnit getItemUnit(String key); + + ItemData getItemData(String var1, String var2); + + Object getValue(String var1, String var2); + + void setValue(String var1, String var2, Object var3); + + List getAllDataKey(String var1); +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/factory/ItemDataAccessorFactory.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/factory/ItemDataAccessorFactory.java new file mode 100644 index 00000000..0d8e63d3 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/factory/ItemDataAccessorFactory.java @@ -0,0 +1,18 @@ +package org.nl.acs.udw.mqttUdw.factory; + +import org.nl.acs.udw.mqttUdw.ItemsDataAccessor; +import org.nl.acs.udw.mqttUdw.service.ItemsDataAccessorImpl; + +/** + * @author onepiece + */ +public class ItemDataAccessorFactory { + + private static final ItemsDataAccessorImpl itemsDataAccessor = new ItemsDataAccessorImpl(); + + public static ItemsDataAccessor getItemsDataAccessor(String unified_key) { + itemsDataAccessor.setUnifiedKey(unified_key); + itemsDataAccessor.setItemsProcess(ItemsProcessServiceFactory.getItemsUnifyProcess()); + return itemsDataAccessor; + } +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/factory/ItemsProcessServiceFactory.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/factory/ItemsProcessServiceFactory.java new file mode 100644 index 00000000..1f05a861 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/factory/ItemsProcessServiceFactory.java @@ -0,0 +1,17 @@ +package org.nl.acs.udw.mqttUdw.factory; + +import org.nl.acs.udw.mqttUdw.ItemsProcessService; +import org.nl.acs.udw.mqttUdw.service.ItemsProcessImpl; + +/** + * @author onepiece + */ +public class ItemsProcessServiceFactory { + + private static final ItemsProcessService itemsProcess = new ItemsProcessImpl(); + + + public static ItemsProcessService getItemsUnifyProcess() { + return itemsProcess; + } +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemData.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemData.java new file mode 100644 index 00000000..78d3f905 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemData.java @@ -0,0 +1,28 @@ +package org.nl.acs.udw.mqttUdw.service; + +import lombok.Data; + +import java.util.Date; + +/** + * @author onepiece + */ +@Data +public class ItemData { + private Object value; + private Date last_modify_date; + + public ItemData() { + this.last_modify_date = new Date(); + } + + public ItemData(Object value) { + this.value = value; + this.last_modify_date = new Date(); + } + + public void changeValue(Object value) { + this.value = value; + this.last_modify_date = new Date(); + } +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemUnit.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemUnit.java new file mode 100644 index 00000000..d92bc435 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemUnit.java @@ -0,0 +1,25 @@ +package org.nl.acs.udw.mqttUdw.service; + +import lombok.Data; +import org.nl.acs.udw.UnifiedData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author onepiece + */ +@Data +public class ItemUnit { + private String unifiedKey; + private Map storage = new ConcurrentHashMap<>(); + private Map> history = new ConcurrentHashMap<>(); + + public ItemUnit(String unifiedKey) { + this.unifiedKey = unifiedKey; + } + +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemsDataAccessorImpl.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemsDataAccessorImpl.java new file mode 100644 index 00000000..ce037395 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemsDataAccessorImpl.java @@ -0,0 +1,39 @@ +package org.nl.acs.udw.mqttUdw.service; + +import org.nl.acs.udw.mqttUdw.ItemsDataAccessor; +import org.nl.acs.udw.mqttUdw.ItemsProcessService; + +import java.util.List; + +/** + * @author onepiece + */ +public class ItemsDataAccessorImpl implements ItemsDataAccessor { + + private String unified_key; + private ItemsProcessService itemsProcess; + + + public void setUnifiedKey(String unified_key) { + this.unified_key = unified_key; + } + + public void setItemsProcess(ItemsProcessService itemsProcess) { + this.itemsProcess = itemsProcess; + } + + @Override + public List getAllKey() { + return this.itemsProcess.getAllDataKey(this.unified_key); + } + + @Override + public Object getValue(String key) { + return this.itemsProcess.getValue(this.unified_key, key); + } + + @Override + public void setValue(String key, Object value) { + this.itemsProcess.setValue(this.unified_key, key, value); + } +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemsProcessImpl.java b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemsProcessImpl.java new file mode 100644 index 00000000..948eb9cd --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/acs/udw/mqttUdw/service/ItemsProcessImpl.java @@ -0,0 +1,125 @@ +package org.nl.acs.udw.mqttUdw.service; + +import lombok.extern.slf4j.Slf4j; +import org.nl.acs.udw.UdwConfig; +import org.nl.acs.udw.UnifiedDataAppService; +import org.nl.acs.udw.mqttUdw.ItemsProcessService; +import org.nl.modules.common.exception.BadRequestException; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author onepiece + */ +@Slf4j +public class ItemsProcessImpl implements ItemsProcessService { + private Map factory = new ConcurrentHashMap<>(); + + public ItemsProcessImpl() { + } + + + @Override + public List getAllUnifiedKey() { + return new ArrayList(this.factory.keySet()); + } + + @Override + public ItemUnit getItemUnit(String unified_key) { + ItemUnit itemUnit = this.factory.get(unified_key); + return itemUnit == null ? null : itemUnit; + } + + @Override + public List getAllDataKey(String unified_key) { + ItemUnit itemUnit = (ItemUnit) this.factory.get(unified_key); + if (itemUnit == null) { + return new ArrayList(); + } else { + Map storage = itemUnit.getStorage(); + return new ArrayList(storage.keySet()); + } + } + + @Override + public ItemData getItemData(String unified_key, String key) { + ItemUnit itemUnit = this.getItemUnit(unified_key); + if (itemUnit == null) { + return null; + } else { + Map storage = itemUnit.getStorage(); + return (ItemData) storage.get(key); + } + } + + @Override + public Object getValue(String unified_key, String key) { + ItemData itemData = this.getItemData(unified_key, key); + return itemData == null ? null : itemData.getValue(); + } + + + @Override + public void setValue(String unified_key, String key, Object value) { + this.setValue(unified_key, key, value, false, true); + } + + + public synchronized void setValue(String unified_key, String key, Object value, boolean save, boolean is_log) { + if (unified_key == null) { + throw new BadRequestException(""); + //throw new BusinessException(SystemMessage.cant_be_empty, new Object[]{"unified_key"}); + } else if (key == null) { + throw new BadRequestException(""); + //throw new BusinessException(SystemMessage.cant_be_empty, new Object[]{"key"}); + } else { + if (!this.factory.containsKey(unified_key)) { + this.factory.put(unified_key, new ItemUnit(unified_key)); + } + + ItemUnit itemUnit = (ItemUnit) this.factory.get(unified_key); + Map storage = itemUnit.getStorage(); + if (!storage.containsKey(key)) { + storage.put(key, new ItemData()); + } + + ItemData itemData = (ItemData) storage.get(key); + if (!UnifiedDataAppService.isEquals(itemData.getValue(), value)) { + Map> history = itemUnit.getHistory(); + List historyItemDatas = (List) history.get(key); + if (historyItemDatas == null) { + history.put(key, new ArrayList()); + } + + ItemData historyData = new ItemData(); + historyData.setLast_modify_date(itemData.getLast_modify_date()); + historyData.setValue(itemData.getValue()); + + while (((List) history.get(key)).size() > UdwConfig.max_history_length) { + ((List) history.get(key)).remove(UdwConfig.max_history_length); + } + + ((List) history.get(key)).add(0, historyData); + Object oldvalue = itemData.getValue(); + itemData.changeValue(value); + if (save) { + /*PersistenceService persistenceService = PersistenceServiceFactory.getPersistenceService(); + persistenceService.saveData(unified_key, key, StringUtl.getString(value)); + if (is_log) { + this.businessLogger.setResource(unified_key, unified_key); + this.businessLogger.setMaterial(key, key); + this.businessLogger.setContainer(StringUtl.getString(value)); + this.businessLogger.log("统一数据源中: unit: {}, key: {}, 值: {} 更改为 {}。", new Object[]{unified_key, key, oldvalue, value}); + }*/ + } + + if (is_log && key != null && !key.endsWith("heartbeat") && !key.endsWith("distancex") && !key.endsWith("distancey") && !key.endsWith("Xwz") && !key.endsWith("Ywz") && !key.endsWith("Zwz")) { + log.trace("统一数据源中: unit: {}, key: {}, 值: {} 更改为 {}。", new Object[]{unified_key, key, oldvalue, value}); + } + } + + } + } + +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/ItemUtil.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/ItemUtil.java new file mode 100644 index 00000000..0eb2506a --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/ItemUtil.java @@ -0,0 +1,48 @@ +package org.nl.config.mqtt2; + +import org.nl.acs.device_driver.DeviceDriver; +import org.nl.acs.device_driver.driver.AbstractDeviceDriver; +import org.nl.acs.opc.Device; +import org.nl.acs.opc.DeviceAppService; +import org.nl.config.mqtt2.msg.DeviceItemData; +import org.nl.modules.wql.util.SpringContextHolder; + +import java.util.List; +import java.util.Optional; + +/** + * @Description TODO + * @Author Gengby + * @Date 2024/3/18 + */ +public class ItemUtil { + + /** + * 心跳item名称 + */ + private static final String HEARTBEAT = "heartbeat"; + + /** + * 获取指定符合索引 + * + * @param text itemId + * @param character 指定符合 + * @param n 第几个富豪 + * @return 索引 + */ + public static int nthIndexOf(String text, String character, int n) { + int position = -1; + do { + position = text.indexOf(character, position + 1); + } while (n-- > 1 && position != -1); + return position; + } + + public static boolean hasHeartbeat(String itemId) { + return itemId.contains(HEARTBEAT); + } + + public static void setIsOnline(DeviceAppService deviceAppService, String deviceCode, List items) { + } + +} \ No newline at end of file diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MQServer.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MQServer.java deleted file mode 100644 index e2c55619..00000000 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MQServer.java +++ /dev/null @@ -1,78 +0,0 @@ -package org.nl.config.mqtt2; - -import org.apache.commons.lang3.StringUtils; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.nl.config.mqtt2.callback.PublishCallback; -import org.nl.config.mqtt2.callback.SubsribeCallback; -import org.nl.config.mqtt2.config.MqttConfig; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import java.util.UUID; - -/* - * @author ZZQ - * @Date 2024/1/31 14:07 - * - */ -@Service -@ConditionalOnProperty(name = "mqtt.active", havingValue = "true") -public class MQServer { - - @Autowired - private MqttConfig mqttConfig; - - public static MqttClient subsribeClient; - - public static MqttClient publishClient; - - @PostConstruct - public void init() throws Exception { - this.initSubsribe(); - this.initPublish(); - } - - - public static void sendMsg(String topic,String body,int qos,Boolean retained){ - try { - if (publishClient!=null && StringUtils.isNotEmpty(body)){ - MqttMessage message = new MqttMessage(); - message.setQos(qos); - message.setRetained(retained); - message.setPayload(body.getBytes()); - publishClient.publish(topic,message); - } - }catch (Exception ex){ - ex.printStackTrace(); - } - } - - public void shutdown() throws Exception { - subsribeClient.disconnect(); - publishClient.disconnect(); - } - - private void initSubsribe() throws Exception { - if (subsribeClient!=null){ - System.out.println("重新连接"); - subsribeClient.disconnect(); - } - subsribeClient = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence()); - subsribeClient.connect(mqttConfig.getOption()); - subsribeClient.setCallback(new SubsribeCallback()); - subsribeClient.subscribe(mqttConfig.getTopics()); - } - private void initPublish() throws Exception { - if (publishClient!=null){ - System.out.println("重新连接"); - publishClient.disconnect(); - } - publishClient = new MqttClient(mqttConfig.getUrl(), UUID.randomUUID().toString(), new MemoryPersistence()); - publishClient.connect(mqttConfig.getOption()); - publishClient.setCallback(new PublishCallback()); - } -} diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MqttServer.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MqttServer.java new file mode 100644 index 00000000..010c39d3 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/MqttServer.java @@ -0,0 +1,412 @@ +package org.nl.config.mqtt2; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import io.micrometer.core.instrument.util.NamedThreadFactory; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.nl.acs.auto.initial.ApplicationAutoInitial; +import org.nl.acs.device.service.OpcService; +import org.nl.acs.device.service.dto.OpcDto; +import org.nl.acs.device_driver.driver.AbstractDeviceDriver; +import org.nl.acs.opc.Device; +import org.nl.acs.opc.DeviceAppService; +import org.nl.acs.opc.OpcConfig; +import org.nl.acs.opc.OpcServerService; +import org.nl.acs.udw.mqttUdw.ItemsDataAccessor; +import org.nl.acs.udw.mqttUdw.factory.ItemDataAccessorFactory; +import org.nl.config.mqtt2.config.MqttConfig; +import org.nl.config.mqtt2.msg.DeviceItemData; +import org.nl.modules.lucene.service.LuceneExecuteLogService; +import org.nl.modules.lucene.service.dto.LuceneLogDto; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * @Description TODO + * @Author Gengby + * @Date 2024/3/15 + */ +@Component +@ConditionalOnProperty(name = "mqtt.active", havingValue = "true") +@ConditionalOnBean({LuceneExecuteLogService.class, OpcServerService.class, DeviceAppService.class}) +@Slf4j +public class MqttServer implements MqttCallback, ApplicationAutoInitial { + + @Autowired + private MqttConfig mqttConfig; + @Autowired + private RedisTemplate redisTemplate; + @Autowired + private LuceneExecuteLogService logService; + @Autowired + private OpcService opcService; + @Autowired + private DeviceAppService deviceAppService; + + /** + * 根据topic存储订阅消息 + */ + private final Map> topicMap = new ConcurrentHashMap<>(); + + /** + * 存储设备点位信息 + */ + private static final ItemsDataAccessor ACCESSOR_VALUE = ItemDataAccessorFactory.getItemsDataAccessor(OpcConfig.udw_opc_value_key); + + /** + * 发布TOPIC + */ + private static final String PUBLISH_TOPIC = "publishTopic"; + + /** + * 重连开启一个新的线程 + */ + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + /** + * 重连延迟时间,单位:毫秒 + */ + private static final long RECONNECT_DELAY = 5000L; + + /** + * 重连标记 + */ + private volatile boolean reconnectingFlag = false; + + /** + * 存储Redis数据Key + */ + private static final String KEY = "opc:items"; + + /** + * 心跳item名称 + */ + private static final String HEARTBEAT = "heartbeat"; + + /** + * MQTT客户端 + */ + private MqttClient mqttClient; + + /** + * MQTT QOS级别 这里默认的使用的是QOS_2 + * 0 最多交付一次 可能导致消息丢失 + * 1 最少交付一次 可能导致消息重复 + * 2 只交付一次 保证消息不会丢失和重复 + */ + private static final int QOS_0 = 0; + private static final int QOS_1 = 1; + private static final int QOS_2 = 2; + + /** + * 线程池核心线程数量 + */ + private static final int CORE_POOL_SIZE = 1; + + /** + * 线程池最大线程数量 + */ + private static final int MAX_POOL_SIZE = 10; + + /** + * 非核心线程存活时间 + */ + private static final int KEEP_ALIVE_TIME = 10; + + /** + * 用来存储等待执行任务的阻塞队列的大小 + */ + private static final int WORK_QUEUE_SIZE = 100; + + /** + * 队列 + */ + private static final Queue MSG_QUEUE = new LinkedBlockingQueue<>(); + + /** + * 线程工厂 + */ + private final ThreadFactory threadFactory = new NamedThreadFactory("MqttThreadFactory"); + + /** + * 线程池 + */ + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + KEEP_ALIVE_TIME, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(WORK_QUEUE_SIZE), + threadFactory, + (r, executor) -> MSG_QUEUE.add(r) + ); + + + @Override + public void autoInitial() throws Exception { + this.init(); + } + + + private void init() throws Exception { + this.initTopicMap(); + this.initMqttClient(); + this.run(); + } + + /** + * 初始化topicMap + *

+ * 从数据库中查询所有的topic + * 并为每个topic初始化一个队列,分别存储其订阅的消息 + */ + private void initTopicMap() { + log.info("init topicMap..."); + List opcDtos = opcService.queryAll(null); + //根据配置OPC信息动态调整核心线程池个数 + threadPool.setCorePoolSize(opcDtos.size() + 1); + threadPool.setMaximumPoolSize(opcDtos.size() + 10); + opcDtos.forEach(opcDto -> topicMap.put(opcDto.getTopic(), new LinkedBlockingQueue<>())); + log.info("init topicMap finish"); + } + + /** + * 初始化MQTT客户端 + * + * @throws Exception + */ + private void initMqttClient() throws Exception { + log.info("init mqtt client..."); + if (mqttClient != null) { + mqttClient.disconnect(); + } + mqttClient = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence()); + mqttClient.connect(mqttConfig.getOption()); + mqttClient.setCallback(this); + mqttClient.subscribe(mqttConfig.getTopics(), mqttConfig.getQoss()); + log.info("init mqtt client finish"); + } + + + private void run() { + topicMap.forEach((topic, values) -> threadPool.execute(() -> { + try { + log.info("create topic thread:{}", topic); + while (true) { + String message = values.take(); + execute(topic, message); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("topic:{} , msg handle failed: {}", topic, e.getMessage()); + } + })); + } + + + private void execute(String topic, String body) { + JSONObject msg = JSONObject.parseObject(body); + JSONArray msgValues = msg.getJSONArray("values"); + List itemDataList = msgValues.toJavaList(DeviceItemData.class); + Map> map = itemDataList.stream().collect(Collectors.groupingBy(deviceData -> { + int thirdDotIndex = ItemUtil.nthIndexOf(deviceData.getId(), ".", 3); + return deviceData.getId().substring(0, thirdDotIndex); + })); + map.forEach((itemId, items) -> { + int startIndex = ItemUtil.nthIndexOf(itemId, ".", 2) + 1; + items.stream().anyMatch(item -> ItemUtil.hasHeartbeat(item.getId())); + String deviceCode = itemId.substring(startIndex); + Optional vOptional = items.stream() + .filter(item -> item.getId().contains(HEARTBEAT)) + .map(DeviceItemData::isQ) + .findFirst(); + vOptional.ifPresent(q -> { + Device device = deviceAppService.findDeviceByCode(deviceCode); + if (device != null && device.getDeviceDriver() instanceof AbstractDeviceDriver) { + AbstractDeviceDriver deviceDriver = (AbstractDeviceDriver) device.getDeviceDriver(); + if (!Objects.equals(deviceDriver.getOnline(), q)) { + deviceDriver.setOnline(q); + log.info("device:{},online status:{}", deviceCode, q); + } + } + }); + setValue(deviceCode, items); + }); + System.out.println("线程名称:'" + Thread.currentThread() + "',接收到消息" + topic + "-" + body + "-"); + } + + + /** + * 更改内存中的值 + * + * @param deviceCode + * @param value + */ + private void setValue(String deviceCode, List value) { + value.forEach(deviceData -> { + logService.deviceExecuteLog(new LuceneLogDto(deviceCode, "MQTT上报,信号:" + deviceData.getId() + ",由" + ACCESSOR_VALUE.getValue(deviceData.getId()) + "->" + deviceData.getV() + ",信号健康值:" + deviceData.isQ())); + ACCESSOR_VALUE.setValue(deviceData.getId(), deviceData.getV()); + }); + } + + + /** + * 断开连接执行逻辑 + * + * @param cause + */ + @Override + public void connectionLost(Throwable cause) { + log.info("mqtt client break link"); + if (!reconnectingFlag) { + reconnectingFlag = true; + scheduler.execute(() -> { + while (!mqttClient.isConnected()) { + try { + if (!mqttClient.isConnected()) { + mqttClient.reconnect(); + log.info("mqtt client again success"); + } + } catch (MqttException e) { + log.error("mqtt client again failed: {}", e.getMessage()); + try { + Thread.sleep(RECONNECT_DELAY); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + reconnectingFlag = false; + }); + } + } + + + /** + * 订阅topic + * + * @param topic 主题 + * @param message 内容 + * @throws Exception + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + //接收到消息:先放队列,根据不同topic处理不同处理dd逻辑 + if (topicMap.containsKey(topic)) { + topicMap.get(topic).add(new String(message.getPayload())); + System.out.println("接收到消息---" + topic + "内容:" + new String(message.getPayload())); + } else { + log.info("topic---{} does not exist", topic); + } + } + + + /** + * 发布结果处理 + * + * @param token + */ + @SneakyThrows + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + boolean complete = token.isComplete(); + MqttMessage message = token.getMessage(); + String[] topics = token.getTopics(); + log.info("message publish result ,topic:{},message:{},complete:{}", Arrays.toString(topics), new String(message.getPayload()), complete); + } + + /** + * 消息发布 + * + * @param body MQTT下发数据的格式是 + * [ + * {"id":"opc_code.plc_code.device_code.item","v":value}, + * {"id":"opc_code.plc_code.device_code.item","v":value} + * ] + */ + public void sendMsg(String body) { + try { + synchronized (this) { + if (mqttClient != null && StringUtils.isNotEmpty(body)) { + MqttMessage message = new MqttMessage(); + message.setQos(QOS_2); + message.setRetained(true); + message.setPayload(body.getBytes()); + mqttClient.publish(PUBLISH_TOPIC, message); + } + } + } catch (Exception ex) { + log.error("message publish failed:{}", ex.getMessage()); + } + } + + + @PostConstruct + public void start() { + loadItems(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("----close threadPool-----"); + try { + shutdown(); + unloadItems(); + threadPool.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + } + + /** + * 关闭MQTT客户端连接 + * + * @throws Exception + */ + public void shutdown() throws Exception { + log.info("close mqtt client..."); + mqttClient.disconnect(); + log.info("close mqtt client finish"); + + log.info("close scheduler..."); + scheduler.shutdown(); + log.info("close scheduler finish..."); + } + + /** + * 从redis写入内存 item数据 + */ + private void loadItems() { + log.info("loading..."); + Map entries = redisTemplate.opsForHash().entries(KEY); + entries.forEach((key, value) -> ACCESSOR_VALUE.setValue((String) key, value)); + log.info("load finish, delete key..."); + redisTemplate.delete(KEY); + log.info("delete finish!"); + } + + /** + * 从内存写出到redis item数据 + */ + private void unloadItems() { + log.info("unloading..."); + List allKey = ACCESSOR_VALUE.getAllKey(); + Map map = new HashMap<>(); + allKey.forEach(key -> map.put(key, ACCESSOR_VALUE.getValue(key))); + log.info("unload finish, put key..."); + redisTemplate.opsForHash().putAll(KEY, map); + log.info("put finish!"); + } + +} \ No newline at end of file diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/PublishDemo.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/PublishDemo.java deleted file mode 100644 index 995a0bb1..00000000 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/PublishDemo.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.nl.config.mqtt2; - -import cn.dev33.satoken.annotation.SaIgnore; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.RequestMapping; - -/* - * @author ZZQ - * @Date 2024/1/31 17:12 - */ -//@RestController -public class PublishDemo { - - @Autowired - MQServer server; - - @RequestMapping("/publish") - @SaIgnore - public void send(String topic,String msg){ - MQServer.sendMsg(topic,msg,1,true); - } - - @RequestMapping("/init") - @SaIgnore - public void d222() throws Exception { - server.init(); - } -} diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/callback/PublishCallback.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/callback/PublishCallback.java deleted file mode 100644 index 023db756..00000000 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/callback/PublishCallback.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.nl.config.mqtt2.callback; - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -/* - * @author ZZQ - * @Date 2024/1/31 13:55 - */ -public class PublishCallback implements MqttCallback { - - @Override - public void connectionLost(Throwable throwable) { - //重连,调用服务初始化init - } - - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - //接收到消息:先放队列,根据不同topic处理不同处理逻辑 - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - //消息接收成功后 - boolean complete = iMqttDeliveryToken.isComplete(); - System.out.println("消息处理结果:"+complete); - } -} diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/callback/SubsribeCallback.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/callback/SubsribeCallback.java deleted file mode 100644 index 68316daf..00000000 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/callback/SubsribeCallback.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.nl.config.mqtt2.callback; - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.nl.config.mqtt2.msg.MsgPoolManager; -import org.nl.config.mqtt2.msg.MsgWorker; - -/* - * @author ZZQ - * @Date 2024/1/31 13:55 - */ -public class SubsribeCallback implements MqttCallback { - - @Override - public void connectionLost(Throwable throwable) { - //重连,调用服务初始化init - } - - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - //接收到消息:先放队列,根据不同topic处理不同处理dd逻辑 - System.out.println("接收到消息topic:"+topic+"——"+new String(mqttMessage.getPayload())); - MsgPoolManager.hander(new MsgWorker(topic,new String(mqttMessage.getPayload()))); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - //消息接收如果处理失败允许在这里重试 - } -} diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/config/MqttConfig.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/config/MqttConfig.java index 4ab39b02..c009e251 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/config/MqttConfig.java +++ b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/config/MqttConfig.java @@ -4,7 +4,14 @@ import lombok.Data; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import javax.annotation.PostConstruct; @@ -29,6 +36,8 @@ public class MqttConfig { private String[] topics; + private int[] qoss; + private int timeout; private int keepalive; @@ -36,23 +45,22 @@ public class MqttConfig { private MqttConnectOptions option; @PostConstruct - public void initOption(){ + public void initOption() { MqttConnectOptions options = new MqttConnectOptions(); // 设定清除会话信息,true时,每次连接都会建立新的会话,false时,服务端会保留会话信息 - options.setCleanSession(true); - //options.setCleanSession(true); + options.setCleanSession(false); // 设定重连机制,设定为true时,mqtt的重连机制会启动,当mqtt client掉线之后它会进入重连 options.setAutomaticReconnect(true); - if (!username.equals("a")){ - options.setUserName(username); - } - if (!password.equals("a")){ - options.setPassword(password.toCharArray()); - } + options.setUserName(username); + options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); + // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 //options.setWill("willTopic", WILL_DATA, 2, false); - option=options; + option = options; } + + + } diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/DeviceItemData.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/DeviceItemData.java new file mode 100644 index 00000000..37f73e93 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/DeviceItemData.java @@ -0,0 +1,18 @@ +package org.nl.config.mqtt2.msg; + +import lombok.Data; + +import java.util.List; + +/** + * @Description TODO + * @Author Gengby + * @Date 2024/3/5 + */ +@Data +public class DeviceItemData { + private String id; + private T v; + private boolean q; + private long t; +} \ No newline at end of file diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/MsgPoolManager.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/MsgPoolManager.java deleted file mode 100644 index 5d9ba37c..00000000 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/MsgPoolManager.java +++ /dev/null @@ -1,78 +0,0 @@ -package org.nl.config.mqtt2.msg; - -import org.nl.config.mqtt2.MQServer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import java.util.Queue; -import java.util.concurrent.*; - -/* - * @author ZZQ - * @Date 2024/1/31 15:19 - * 待确认:1.所有消走线程还是先走队列;2.同一个topic消息处理需要保证消费顺序3.消息写时订阅也会订阅毁掉会拿到自己写的信息 - */ -@Component -@ConditionalOnBean(value = MQServer.class) -public class MsgPoolManager { - - @Autowired - MQServer mqServer; - // 线程池维护线程的最少数量 - private final static int CORE_POOL_SIZE = 2; - // 线程池维护线程的最大数量 - private final static int MAX_POOL_SIZE = 20 ; - // 线程池维护线程所允许的空闲时间 - private final static int KEEP_ALIVE_TIME = 10; - // 线程池 所使用的缓存队列大小 - private final static int WORK_QUEUE_SIZE = 1000; - - - private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); - - - static Queue msgQueue = new LinkedBlockingQueue<>(); - - static ThreadPoolExecutor threadPool =null; - - static ScheduledFuture scheduledFuture =null; - - static { - - } - - - public static void hander(MsgWorker r){ - threadPool.execute(r); - } - - @PostConstruct - public void start() { - threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE, - KEEP_ALIVE_TIME, TimeUnit.SECONDS,new ArrayBlockingQueue<>(WORK_QUEUE_SIZE), (r, executor) -> msgQueue.add(r)); - -// scheduledFuture = scheduler.scheduleAtFixedRate(() -> { -// // 判断缓存队列是否存在记录 -// System.out.println("定时任务启动"+msgQueue.size()); -// if(!msgQueue.isEmpty()){ -// if(threadPool.getQueue().size() < WORK_QUEUE_SIZE){ -// Runnable worker = msgQueue.poll(); -// threadPool.execute(worker); -// } -// } -// },0,1,TimeUnit.SECONDS); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("----关闭线程池-----"); - try { - mqServer.shutdown(); - threadPool.shutdown(); -// scheduler.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - })); - } - -} diff --git a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/MsgWorker.java b/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/MsgWorker.java deleted file mode 100644 index d826fad7..00000000 --- a/wcs/nladmin-system/src/main/java/org/nl/config/mqtt2/msg/MsgWorker.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.nl.config.mqtt2.msg; - -/* - * @author ZZQ - * @Date 2024/1/31 15:29 - * 消息处理线程:通过线程池处理:如果需要用到具体topic的handler:可以传入 - */ -public class MsgWorker implements Runnable{ - /** - * topic - */ - public String topic; - /** - * 消息体 - */ - private String body; - - @Override - public void run() { - System.out.println("接收到消息"+topic+"-"+body); - } - - public MsgWorker(String topic, String body) { - this.topic = topic; - this.body = body; - } -} diff --git a/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateAgvOneInst.java b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateAgvOneInst.java index 8ffb2290..5209d728 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateAgvOneInst.java +++ b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateAgvOneInst.java @@ -5,7 +5,6 @@ import org.nl.acs.acsEnum.StatusEnum; import org.nl.acs.instruction.service.InstructionService; import org.nl.acs.task.service.TaskService; import org.nl.acs.task.service.dto.TaskDto; -import org.nl.modules.system.util.CodeUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -32,14 +31,13 @@ public class AutoCreateAgvOneInst { * @throws Exception */ public void run() throws Exception { - List list = taskService.queryAll("task_status = '0' and (agv_system_type = '2' or agv_system_type = '3')"); + List list = taskService.queryAll("task_status = '0' and agv_system_type = '2'"); for (int i = 0; i < list.size(); i++) { TaskDto taskDto = list.get(i); - String link_no = CodeUtil.getNewCode("LINK_NO"); try { - instructionService.create(instructionService.createInstDtoByTask(taskDto, link_no)); + instructionService.autoCreateOneInst(instructionService.createInstDtoByTask(taskDto)); } catch (Exception e) { - log.error("自动创建指令失败:任务号{},失败原因:{}", taskDto.getTask_code(), e.getMessage()); + log.error("自动创建叉车指令失败:任务号{},失败原因:{}", taskDto.getTask_code(), e.getMessage()); taskDto.setRemark(e.getMessage()); taskService.updateByCodeFromCache(taskDto); continue; @@ -47,7 +45,7 @@ public class AutoCreateAgvOneInst { //创建指令后修改任务状态 taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode()); taskService.update(taskDto); - log.info("自动创建指令成功,任务号:{}", taskDto.getTask_code()); + log.info("自动创建叉车指令成功,任务号:{}", taskDto.getTask_code()); } } } diff --git a/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateHrInst.java b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateHrInst.java new file mode 100644 index 00000000..bcc0db43 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateHrInst.java @@ -0,0 +1,60 @@ +package org.nl.modules.quartz.task; + +import lombok.extern.slf4j.Slf4j; +import org.nl.acs.acsEnum.StatusEnum; +import org.nl.acs.instruction.service.InstructionService; +import org.nl.acs.task.service.TaskService; +import org.nl.acs.task.service.dto.TaskDto; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 自动创建海柔AGV指令 + * + * @author GengBy + */ +@Slf4j +@Component +public class AutoCreateHrInst { + + @Autowired + private InstructionService instructionService; + @Autowired + private TaskService taskService; + + /** + * 海柔最大指令数量 + */ + private static final int MAX_INST_NUM = 3; + + /** + * 查询海柔AGV任务并生成对应的指令 + * + * @throws Exception + */ + public void run() throws Exception { + List list = taskService.queryAll("task_status = '0' and agv_system_type = '3'"); + for (int i = 0; i < list.size(); i++) { + TaskDto taskDto = list.get(i); + try { + int maxNum = instructionService.querySameDeviceCodeForHrMaxNum(taskDto.getStart_point_code(), taskDto.getNext_point_code()); + if (maxNum == MAX_INST_NUM) { + log.info("海柔同一点位指令数量大于允许生成的最大指令数量:" + MAX_INST_NUM); + continue; + } + instructionService.autoCreateHrInst(instructionService.createInstDtoByTask(taskDto)); + } catch (Exception e) { + log.error("自动创建海柔指令失败:任务号{},失败原因:{}", taskDto.getTask_code(), e.getMessage()); + taskDto.setRemark(e.getMessage()); + taskService.updateByCodeFromCache(taskDto); + continue; + } + //创建指令后修改任务状态 + taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode()); + taskService.update(taskDto); + log.info("自动创建海柔指令成功,任务号:{}", taskDto.getTask_code()); + } + } +} diff --git a/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateInst.java b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateInst.java index 46fa5662..c3d5e6c5 100644 --- a/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateInst.java +++ b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateInst.java @@ -114,7 +114,7 @@ public class AutoCreateInst { //生产指令关联编号 String link_no = CodeUtil.getNewCode("LINK_NO"); - Instruction instDto = instructionService.createInstDtoByTask(taskDto, link_no); + Instruction instDto = instructionService.createInstDtoByTask(taskDto); try { instructionService.create(instDto); @@ -152,9 +152,8 @@ public class AutoCreateInst { //生产指令关联编号 String link_no = CodeUtil.getNewCode("LINK_NO"); - - Instruction instDto1 = instructionService.createInstDtoByTask(taskDto1, link_no); - Instruction instDto2 = instructionService.createInstDtoByTask(taskDto2, link_no); + Instruction instDto1 = instructionService.createInstDtoByTask(taskDto1); + Instruction instDto2 = instructionService.createInstDtoByTask(taskDto2); try { instructionService.createTwoInst(instDto1, instDto2); diff --git a/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateNuoBaoInst.java b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateNuoBaoInst.java new file mode 100644 index 00000000..7b01e149 --- /dev/null +++ b/wcs/nladmin-system/src/main/java/org/nl/modules/quartz/task/AutoCreateNuoBaoInst.java @@ -0,0 +1,130 @@ +package org.nl.modules.quartz.task; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.nl.acs.acsEnum.StatusEnum; +import org.nl.acs.instruction.service.InstructionService; +import org.nl.acs.instruction.service.dto.Instruction; +import org.nl.acs.opc.DeviceAppService; +import org.nl.acs.task.service.TaskService; +import org.nl.acs.task.service.dto.TaskDto; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * 自动创建诺宝AGV指令 + * + * @author GengBy + */ +@Slf4j +@Component +public class AutoCreateNuoBaoInst { + + @Autowired + private TaskService taskService; + @Autowired + private InstructionService instructionService; + @Autowired + private DeviceAppService deviceAppService; + + + /** + * 单任务个数 + */ + public static final Integer TASK_SIZE_1 = 1; + /** + * 双任务个数 + */ + public static final Integer TASK_SIZE_2 = 2; + + /** + * 自动创建诺宝AGV指令 + */ + public void run() throws Exception { + + //查询所有诺宝AGV任务 + List list = taskService.queryAll("task_status = '0' and agv_system_type = '1' "); + + if (ObjectUtil.isEmpty(list)) { + return; + } + //根据LINK_NUM分组任务 + Map> link_num_tasks = Optional + .ofNullable(list) + .orElse(new ArrayList<>()) + .stream() + .collect(Collectors.groupingBy(TaskDto::getLink_num)); + //遍历分组任务 如果任务数为2则是下发双任务,如果任务数为1 则下发单任务 + for (Map.Entry> task_values : link_num_tasks.entrySet()) { + List tasks = + Optional + .ofNullable(task_values.getValue()) + .orElse(new ArrayList<>()) + .stream() + .sorted((t1, t2) -> t1.getIs_send().compareTo(t2.getIs_send())) + .collect(Collectors.toList()); + + + //如果分组后任务数量为1 下发单任务,分组后任务数量为2 下发双任务 + if (tasks.size() == TASK_SIZE_1) { + TaskDto taskDto = tasks.get(0); + //判断任务是否需要立即下发AGV 不需要则是双任务,等待系统下发第二条任务后,走下发双任务的逻辑 + if (StrUtil.equals(taskDto.getIs_send(), StatusEnum.NO_SEND.getCode())) { + continue; + } + + Instruction instDto = instructionService.createInstDtoByTask(taskDto); + + try { + instructionService.autoCreateTwoInst(instDto, null); + } catch (Exception e) { + taskDto.setRemark(e.getMessage()); + taskService.updateByCodeFromCache(taskDto); + continue; + } + + //创建指令后修改任务状态 + taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode()); + taskService.update(taskDto); + + } else if (tasks.size() == TASK_SIZE_2) { + + TaskDto taskDto1 = tasks.get(0); + TaskDto taskDto2 = tasks.get(1); + + //关联编号一致 任务类型不一致 不生成任务 + if (!taskDto1.getTask_type().equals(taskDto2.getTask_type())) { + continue; + } + + Instruction instDto1 = instructionService.createInstDtoByTask(taskDto1); + Instruction instDto2 = instructionService.createInstDtoByTask(taskDto2); + + try { + instructionService.autoCreateTwoInst(instDto1, instDto2); + } catch (Exception e) { + taskDto1.setRemark(e.getMessage()); + taskService.updateByCodeFromCache(taskDto1); + taskDto2.setRemark(e.getMessage()); + taskService.updateByCodeFromCache(taskDto2); + continue; + } + + //创建指令后修改任务状态 + taskDto1.setTask_status(StatusEnum.TASK_RUNNING.getCode()); + taskService.update(taskDto1); + taskDto2.setTask_status(StatusEnum.TASK_RUNNING.getCode()); + taskService.update(taskDto2); + + } + } + } + +}