rev:mqtt

This commit is contained in:
2024-03-19 15:46:02 +08:00
parent d7917a9d38
commit 6489ad259c
29 changed files with 1052 additions and 331 deletions

View File

@@ -32,6 +32,11 @@ public class OpcDto implements Serializable {
*/
private String opc_host;
/**
* MQTT订阅主题 需要与OPCServer保持一致
*/
private String topic;
/**
* 用户名
*/

View File

@@ -4,6 +4,7 @@ package org.nl.acs.device.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
@@ -85,6 +86,9 @@ public class OpcServiceImpl implements OpcService {
public List<OpcDto> queryAll(Map whereJson) {
WQLObject wo = WQLObject.getWQLObject("acs_opc");
JSONArray arr = wo.query().getResultJSONArray(0);
if (ObjectUtil.isEmpty(arr)) {
return new ArrayList<>();
}
List<OpcDto> list = arr.toJavaList(OpcDto.class);
return list;
}
@@ -119,7 +123,7 @@ public class OpcServiceImpl implements OpcService {
dto.setCreate_time(now);
WQLObject wo = WQLObject.getWQLObject("acs_opc");
JSONObject json = (JSONObject) JSONObject.toJSON(dto);
JSONObject json = (JSONObject) JSONObject.toJSON(dto);
wo.insert(json);
}
@@ -136,7 +140,7 @@ public class OpcServiceImpl implements OpcService {
dto.setUpdate_by(currentUsername);
WQLObject wo = WQLObject.getWQLObject("acs_opc");
JSONObject json = (JSONObject) JSONObject.toJSON(dto);
JSONObject json = (JSONObject) JSONObject.toJSON(dto);
wo.update(json);
}

View File

@@ -5,6 +5,9 @@ import org.nl.acs.opc.Device;
import java.util.List;
public interface DeviceDriver {
default String getDeviceCode() {
return this.getDevice().getDevice_code();
}

View File

@@ -77,7 +77,7 @@ public class DeviceExecuteAutoRun extends AbstractAutoRunnable {
for (int i = 0; !OpcStartTag.is_run; ++i) {
log.info("设备执行线程等待opc同步线程...");
Thread.sleep(1000L);
if (i > 60) {
if (i > 20) {
log.info("设备执行线程放弃等待opc同步线程...");
break;
}

View File

@@ -79,7 +79,7 @@ public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerC
if (OpcConfig.opc_item_read_using_callback) {
this.runNew();
} else {
this.runOld();
//this.runOld();
}
}

View File

@@ -590,42 +590,43 @@ public class TaskServiceImpl implements TaskService, ApplicationAutoInitial {
@Override
public void createInst(String ids) throws Exception {
TaskDto acsTask = this.findById(ids);
if (acsTask == null) throw new BadRequestException("被删除或无权限,操作失败!");
List<TaskDto> tasksByLinkNum = this.findByLinkNumFromCache(acsTask.getLink_num());
String link_no = CodeUtil.getNewCode("LINK_NO");
if (tasksByLinkNum.size() == 1) {
TaskDto taskDto = tasksByLinkNum.get(0);
Instruction instruction = instructionService.findByTaskCodeFromCache(taskDto.getTask_code());
if (instruction != null) {
throw new BadRequestException("单任务有指令未完成!指令号:" + instruction.getInstruction_code());
}
Instruction dto = instructionService.createInstDtoByTask(taskDto, link_no);
instructionService.create(dto);
taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode());
this.update(taskDto);
}
if (tasksByLinkNum.size() == 2) {
TaskDto taskDto1 = tasksByLinkNum.get(0);
Instruction inst1 = instructionService.findByTaskCodeFromCache(taskDto1.getTask_code());
if (inst1 != null) {
throw new BadRequestException("双任务存有指令未完成!指令号:" + inst1.getInstruction_code());
}
TaskDto taskDto2 = tasksByLinkNum.get(1);
Instruction inst2 = instructionService.findByTaskCodeFromCache(taskDto2.getTask_code());
if (inst2 != null) {
throw new BadRequestException("双任务有指令未完成!指令号:" + inst2.getInstruction_code());
}
Instruction dto1 = instructionService.createInstDtoByTask(taskDto1, link_no);
Instruction dto2 = instructionService.createInstDtoByTask(taskDto2, link_no);
instructionService.createTwoInst(dto1, dto2);
taskDto1.setTask_status(StatusEnum.TASK_RUNNING.getCode());
taskDto1.setUpdate_time(DateUtil.now());
this.update(taskDto1);
taskDto2.setTask_status(StatusEnum.TASK_RUNNING.getCode());
taskDto2.setUpdate_time(DateUtil.now());
this.update(taskDto2);
}
throw new BadRequestException("创建指令失败!");
// TaskDto acsTask = this.findById(ids);
// if (acsTask == null) throw new BadRequestException("被删除或无权限,操作失败!");
// List<TaskDto> tasksByLinkNum = this.findByLinkNumFromCache(acsTask.getLink_num());
// String link_no = CodeUtil.getNewCode("LINK_NO");
// if (tasksByLinkNum.size() == 1) {
// TaskDto taskDto = tasksByLinkNum.get(0);
// Instruction instruction = instructionService.findByTaskCodeFromCache(taskDto.getTask_code());
// if (instruction != null) {
// throw new BadRequestException("单任务有指令未完成!指令号:" + instruction.getInstruction_code());
// }
// Instruction dto = instructionService.createInstDtoByTask(taskDto, link_no);
// instructionService.create(dto);
// taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode());
// this.update(taskDto);
// }
// if (tasksByLinkNum.size() == 2) {
// TaskDto taskDto1 = tasksByLinkNum.get(0);
// Instruction inst1 = instructionService.findByTaskCodeFromCache(taskDto1.getTask_code());
// if (inst1 != null) {
// throw new BadRequestException("双任务存有指令未完成!指令号:" + inst1.getInstruction_code());
// }
// TaskDto taskDto2 = tasksByLinkNum.get(1);
// Instruction inst2 = instructionService.findByTaskCodeFromCache(taskDto2.getTask_code());
// if (inst2 != null) {
// throw new BadRequestException("双任务有指令未完成!指令号:" + inst2.getInstruction_code());
// }
// Instruction dto1 = instructionService.createInstDtoByTask(taskDto1, link_no);
// Instruction dto2 = instructionService.createInstDtoByTask(taskDto2, link_no);
// instructionService.createTwoInst(dto1, dto2);
// taskDto1.setTask_status(StatusEnum.TASK_RUNNING.getCode());
// taskDto1.setUpdate_time(DateUtil.now());
// this.update(taskDto1);
// taskDto2.setTask_status(StatusEnum.TASK_RUNNING.getCode());
// taskDto2.setUpdate_time(DateUtil.now());
// this.update(taskDto2);
// }
}
@Override

View File

@@ -0,0 +1,17 @@
package org.nl.acs.udw.mqttUdw;
import org.nl.acs.udw.UnifiedData;
import org.nl.acs.udw.mqttUdw.service.ItemData;
import java.util.List;
/**
* @author onepiece
*/
public interface ItemsDataAccessor {
List<String> getAllKey();
Object getValue(String key);
void setValue(String key, Object value);
}

View File

@@ -0,0 +1,37 @@
package org.nl.acs.udw.mqttUdw;
import cn.hutool.core.util.ObjectUtil;
import org.nl.acs.udw.UnifiedData;
import org.nl.acs.udw.mqttUdw.service.ItemData;
import org.nl.acs.udw.mqttUdw.service.ItemUnit;
import org.nl.acs.udw.service.impl.UnifiedDataUnit;
import java.util.List;
/**
* @author onepiece
*/
public interface ItemsProcessService {
/**
* 获取所有的key
*
* @return
*/
List<String> getAllUnifiedKey();
/**
* 根据key获取数据单元
*
* @param key
* @return
*/
ItemUnit getItemUnit(String key);
ItemData getItemData(String var1, String var2);
Object getValue(String var1, String var2);
void setValue(String var1, String var2, Object var3);
List<String> getAllDataKey(String var1);
}

View File

@@ -0,0 +1,18 @@
package org.nl.acs.udw.mqttUdw.factory;
import org.nl.acs.udw.mqttUdw.ItemsDataAccessor;
import org.nl.acs.udw.mqttUdw.service.ItemsDataAccessorImpl;
/**
* @author onepiece
*/
public class ItemDataAccessorFactory {
private static final ItemsDataAccessorImpl itemsDataAccessor = new ItemsDataAccessorImpl();
public static ItemsDataAccessor getItemsDataAccessor(String unified_key) {
itemsDataAccessor.setUnifiedKey(unified_key);
itemsDataAccessor.setItemsProcess(ItemsProcessServiceFactory.getItemsUnifyProcess());
return itemsDataAccessor;
}
}

View File

@@ -0,0 +1,17 @@
package org.nl.acs.udw.mqttUdw.factory;
import org.nl.acs.udw.mqttUdw.ItemsProcessService;
import org.nl.acs.udw.mqttUdw.service.ItemsProcessImpl;
/**
* @author onepiece
*/
public class ItemsProcessServiceFactory {
private static final ItemsProcessService itemsProcess = new ItemsProcessImpl();
public static ItemsProcessService getItemsUnifyProcess() {
return itemsProcess;
}
}

View File

@@ -0,0 +1,28 @@
package org.nl.acs.udw.mqttUdw.service;
import lombok.Data;
import java.util.Date;
/**
* @author onepiece
*/
@Data
public class ItemData {
private Object value;
private Date last_modify_date;
public ItemData() {
this.last_modify_date = new Date();
}
public ItemData(Object value) {
this.value = value;
this.last_modify_date = new Date();
}
public void changeValue(Object value) {
this.value = value;
this.last_modify_date = new Date();
}
}

View File

@@ -0,0 +1,25 @@
package org.nl.acs.udw.mqttUdw.service;
import lombok.Data;
import org.nl.acs.udw.UnifiedData;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author onepiece
*/
@Data
public class ItemUnit {
private String unifiedKey;
private Map<String, ItemData> storage = new ConcurrentHashMap<>();
private Map<String, List<ItemData>> history = new ConcurrentHashMap<>();
public ItemUnit(String unifiedKey) {
this.unifiedKey = unifiedKey;
}
}

View File

@@ -0,0 +1,39 @@
package org.nl.acs.udw.mqttUdw.service;
import org.nl.acs.udw.mqttUdw.ItemsDataAccessor;
import org.nl.acs.udw.mqttUdw.ItemsProcessService;
import java.util.List;
/**
* @author onepiece
*/
public class ItemsDataAccessorImpl implements ItemsDataAccessor {
private String unified_key;
private ItemsProcessService itemsProcess;
public void setUnifiedKey(String unified_key) {
this.unified_key = unified_key;
}
public void setItemsProcess(ItemsProcessService itemsProcess) {
this.itemsProcess = itemsProcess;
}
@Override
public List<String> getAllKey() {
return this.itemsProcess.getAllDataKey(this.unified_key);
}
@Override
public Object getValue(String key) {
return this.itemsProcess.getValue(this.unified_key, key);
}
@Override
public void setValue(String key, Object value) {
this.itemsProcess.setValue(this.unified_key, key, value);
}
}

View File

@@ -0,0 +1,125 @@
package org.nl.acs.udw.mqttUdw.service;
import lombok.extern.slf4j.Slf4j;
import org.nl.acs.udw.UdwConfig;
import org.nl.acs.udw.UnifiedDataAppService;
import org.nl.acs.udw.mqttUdw.ItemsProcessService;
import org.nl.modules.common.exception.BadRequestException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author onepiece
*/
@Slf4j
public class ItemsProcessImpl implements ItemsProcessService {
private Map<String, ItemUnit> factory = new ConcurrentHashMap<>();
public ItemsProcessImpl() {
}
@Override
public List<String> getAllUnifiedKey() {
return new ArrayList(this.factory.keySet());
}
@Override
public ItemUnit getItemUnit(String unified_key) {
ItemUnit itemUnit = this.factory.get(unified_key);
return itemUnit == null ? null : itemUnit;
}
@Override
public List<String> getAllDataKey(String unified_key) {
ItemUnit itemUnit = (ItemUnit) this.factory.get(unified_key);
if (itemUnit == null) {
return new ArrayList();
} else {
Map<String, ItemData> storage = itemUnit.getStorage();
return new ArrayList(storage.keySet());
}
}
@Override
public ItemData getItemData(String unified_key, String key) {
ItemUnit itemUnit = this.getItemUnit(unified_key);
if (itemUnit == null) {
return null;
} else {
Map<String, ItemData> storage = itemUnit.getStorage();
return (ItemData) storage.get(key);
}
}
@Override
public Object getValue(String unified_key, String key) {
ItemData itemData = this.getItemData(unified_key, key);
return itemData == null ? null : itemData.getValue();
}
@Override
public void setValue(String unified_key, String key, Object value) {
this.setValue(unified_key, key, value, false, true);
}
public synchronized void setValue(String unified_key, String key, Object value, boolean save, boolean is_log) {
if (unified_key == null) {
throw new BadRequestException("");
//throw new BusinessException(SystemMessage.cant_be_empty, new Object[]{"unified_key"});
} else if (key == null) {
throw new BadRequestException("");
//throw new BusinessException(SystemMessage.cant_be_empty, new Object[]{"key"});
} else {
if (!this.factory.containsKey(unified_key)) {
this.factory.put(unified_key, new ItemUnit(unified_key));
}
ItemUnit itemUnit = (ItemUnit) this.factory.get(unified_key);
Map<String, ItemData> storage = itemUnit.getStorage();
if (!storage.containsKey(key)) {
storage.put(key, new ItemData());
}
ItemData itemData = (ItemData) storage.get(key);
if (!UnifiedDataAppService.isEquals(itemData.getValue(), value)) {
Map<String, List<ItemData>> history = itemUnit.getHistory();
List<ItemData> historyItemDatas = (List) history.get(key);
if (historyItemDatas == null) {
history.put(key, new ArrayList());
}
ItemData historyData = new ItemData();
historyData.setLast_modify_date(itemData.getLast_modify_date());
historyData.setValue(itemData.getValue());
while (((List) history.get(key)).size() > UdwConfig.max_history_length) {
((List) history.get(key)).remove(UdwConfig.max_history_length);
}
((List) history.get(key)).add(0, historyData);
Object oldvalue = itemData.getValue();
itemData.changeValue(value);
if (save) {
/*PersistenceService persistenceService = PersistenceServiceFactory.getPersistenceService();
persistenceService.saveData(unified_key, key, StringUtl.getString(value));
if (is_log) {
this.businessLogger.setResource(unified_key, unified_key);
this.businessLogger.setMaterial(key, key);
this.businessLogger.setContainer(StringUtl.getString(value));
this.businessLogger.log("统一数据源中: unit: {}, key: {}, 值: {} 更改为 {}。", new Object[]{unified_key, key, oldvalue, value});
}*/
}
if (is_log && key != null && !key.endsWith("heartbeat") && !key.endsWith("distancex") && !key.endsWith("distancey") && !key.endsWith("Xwz") && !key.endsWith("Ywz") && !key.endsWith("Zwz")) {
log.trace("统一数据源中: unit: {}, key: {}, 值: {} 更改为 {}。", new Object[]{unified_key, key, oldvalue, value});
}
}
}
}
}

View File

@@ -0,0 +1,48 @@
package org.nl.config.mqtt2;
import org.nl.acs.device_driver.DeviceDriver;
import org.nl.acs.device_driver.driver.AbstractDeviceDriver;
import org.nl.acs.opc.Device;
import org.nl.acs.opc.DeviceAppService;
import org.nl.config.mqtt2.msg.DeviceItemData;
import org.nl.modules.wql.util.SpringContextHolder;
import java.util.List;
import java.util.Optional;
/**
* @Description TODO
* @Author Gengby
* @Date 2024/3/18
*/
public class ItemUtil {
/**
* 心跳item名称
*/
private static final String HEARTBEAT = "heartbeat";
/**
* 获取指定符合索引
*
* @param text itemId
* @param character 指定符合
* @param n 第几个富豪
* @return 索引
*/
public static int nthIndexOf(String text, String character, int n) {
int position = -1;
do {
position = text.indexOf(character, position + 1);
} while (n-- > 1 && position != -1);
return position;
}
public static boolean hasHeartbeat(String itemId) {
return itemId.contains(HEARTBEAT);
}
public static void setIsOnline(DeviceAppService deviceAppService, String deviceCode, List<DeviceItemData> items) {
}
}

View File

@@ -1,78 +0,0 @@
package org.nl.config.mqtt2;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.nl.config.mqtt2.callback.PublishCallback;
import org.nl.config.mqtt2.callback.SubsribeCallback;
import org.nl.config.mqtt2.config.MqttConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.UUID;
/*
* @author ZZQ
* @Date 2024/1/31 14:07
*
*/
@Service
@ConditionalOnProperty(name = "mqtt.active", havingValue = "true")
public class MQServer {
@Autowired
private MqttConfig mqttConfig;
public static MqttClient subsribeClient;
public static MqttClient publishClient;
@PostConstruct
public void init() throws Exception {
this.initSubsribe();
this.initPublish();
}
public static void sendMsg(String topic,String body,int qos,Boolean retained){
try {
if (publishClient!=null && StringUtils.isNotEmpty(body)){
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(body.getBytes());
publishClient.publish(topic,message);
}
}catch (Exception ex){
ex.printStackTrace();
}
}
public void shutdown() throws Exception {
subsribeClient.disconnect();
publishClient.disconnect();
}
private void initSubsribe() throws Exception {
if (subsribeClient!=null){
System.out.println("重新连接");
subsribeClient.disconnect();
}
subsribeClient = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence());
subsribeClient.connect(mqttConfig.getOption());
subsribeClient.setCallback(new SubsribeCallback());
subsribeClient.subscribe(mqttConfig.getTopics());
}
private void initPublish() throws Exception {
if (publishClient!=null){
System.out.println("重新连接");
publishClient.disconnect();
}
publishClient = new MqttClient(mqttConfig.getUrl(), UUID.randomUUID().toString(), new MemoryPersistence());
publishClient.connect(mqttConfig.getOption());
publishClient.setCallback(new PublishCallback());
}
}

View File

@@ -0,0 +1,412 @@
package org.nl.config.mqtt2;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.nl.acs.auto.initial.ApplicationAutoInitial;
import org.nl.acs.device.service.OpcService;
import org.nl.acs.device.service.dto.OpcDto;
import org.nl.acs.device_driver.driver.AbstractDeviceDriver;
import org.nl.acs.opc.Device;
import org.nl.acs.opc.DeviceAppService;
import org.nl.acs.opc.OpcConfig;
import org.nl.acs.opc.OpcServerService;
import org.nl.acs.udw.mqttUdw.ItemsDataAccessor;
import org.nl.acs.udw.mqttUdw.factory.ItemDataAccessorFactory;
import org.nl.config.mqtt2.config.MqttConfig;
import org.nl.config.mqtt2.msg.DeviceItemData;
import org.nl.modules.lucene.service.LuceneExecuteLogService;
import org.nl.modules.lucene.service.dto.LuceneLogDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @Description TODO
* @Author Gengby
* @Date 2024/3/15
*/
@Component
@ConditionalOnProperty(name = "mqtt.active", havingValue = "true")
@ConditionalOnBean({LuceneExecuteLogService.class, OpcServerService.class, DeviceAppService.class})
@Slf4j
public class MqttServer implements MqttCallback, ApplicationAutoInitial {
@Autowired
private MqttConfig mqttConfig;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private LuceneExecuteLogService logService;
@Autowired
private OpcService opcService;
@Autowired
private DeviceAppService deviceAppService;
/**
* 根据topic存储订阅消息
*/
private final Map<String, LinkedBlockingQueue<String>> topicMap = new ConcurrentHashMap<>();
/**
* 存储设备点位信息
*/
private static final ItemsDataAccessor ACCESSOR_VALUE = ItemDataAccessorFactory.getItemsDataAccessor(OpcConfig.udw_opc_value_key);
/**
* 发布TOPIC
*/
private static final String PUBLISH_TOPIC = "publishTopic";
/**
* 重连开启一个新的线程
*/
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
/**
* 重连延迟时间,单位:毫秒
*/
private static final long RECONNECT_DELAY = 5000L;
/**
* 重连标记
*/
private volatile boolean reconnectingFlag = false;
/**
* 存储Redis数据Key
*/
private static final String KEY = "opc:items";
/**
* 心跳item名称
*/
private static final String HEARTBEAT = "heartbeat";
/**
* MQTT客户端
*/
private MqttClient mqttClient;
/**
* MQTT QOS级别 这里默认的使用的是QOS_2
* 0 最多交付一次 可能导致消息丢失
* 1 最少交付一次 可能导致消息重复
* 2 只交付一次 保证消息不会丢失和重复
*/
private static final int QOS_0 = 0;
private static final int QOS_1 = 1;
private static final int QOS_2 = 2;
/**
* 线程池核心线程数量
*/
private static final int CORE_POOL_SIZE = 1;
/**
* 线程池最大线程数量
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 非核心线程存活时间
*/
private static final int KEEP_ALIVE_TIME = 10;
/**
* 用来存储等待执行任务的阻塞队列的大小
*/
private static final int WORK_QUEUE_SIZE = 100;
/**
* 队列
*/
private static final Queue<Runnable> MSG_QUEUE = new LinkedBlockingQueue<>();
/**
* 线程工厂
*/
private final ThreadFactory threadFactory = new NamedThreadFactory("MqttThreadFactory");
/**
* 线程池
*/
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(WORK_QUEUE_SIZE),
threadFactory,
(r, executor) -> MSG_QUEUE.add(r)
);
@Override
public void autoInitial() throws Exception {
this.init();
}
private void init() throws Exception {
this.initTopicMap();
this.initMqttClient();
this.run();
}
/**
* 初始化topicMap
* <p>
* 从数据库中查询所有的topic
* 并为每个topic初始化一个队列分别存储其订阅的消息
*/
private void initTopicMap() {
log.info("init topicMap...");
List<OpcDto> opcDtos = opcService.queryAll(null);
//根据配置OPC信息动态调整核心线程池个数
threadPool.setCorePoolSize(opcDtos.size() + 1);
threadPool.setMaximumPoolSize(opcDtos.size() + 10);
opcDtos.forEach(opcDto -> topicMap.put(opcDto.getTopic(), new LinkedBlockingQueue<>()));
log.info("init topicMap finish");
}
/**
* 初始化MQTT客户端
*
* @throws Exception
*/
private void initMqttClient() throws Exception {
log.info("init mqtt client...");
if (mqttClient != null) {
mqttClient.disconnect();
}
mqttClient = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence());
mqttClient.connect(mqttConfig.getOption());
mqttClient.setCallback(this);
mqttClient.subscribe(mqttConfig.getTopics(), mqttConfig.getQoss());
log.info("init mqtt client finish");
}
private void run() {
topicMap.forEach((topic, values) -> threadPool.execute(() -> {
try {
log.info("create topic thread:{}", topic);
while (true) {
String message = values.take();
execute(topic, message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("topic:{} , msg handle failed: {}", topic, e.getMessage());
}
}));
}
private void execute(String topic, String body) {
JSONObject msg = JSONObject.parseObject(body);
JSONArray msgValues = msg.getJSONArray("values");
List<DeviceItemData> itemDataList = msgValues.toJavaList(DeviceItemData.class);
Map<String, List<DeviceItemData>> map = itemDataList.stream().collect(Collectors.groupingBy(deviceData -> {
int thirdDotIndex = ItemUtil.nthIndexOf(deviceData.getId(), ".", 3);
return deviceData.getId().substring(0, thirdDotIndex);
}));
map.forEach((itemId, items) -> {
int startIndex = ItemUtil.nthIndexOf(itemId, ".", 2) + 1;
items.stream().anyMatch(item -> ItemUtil.hasHeartbeat(item.getId()));
String deviceCode = itemId.substring(startIndex);
Optional<Boolean> vOptional = items.stream()
.filter(item -> item.getId().contains(HEARTBEAT))
.map(DeviceItemData::isQ)
.findFirst();
vOptional.ifPresent(q -> {
Device device = deviceAppService.findDeviceByCode(deviceCode);
if (device != null && device.getDeviceDriver() instanceof AbstractDeviceDriver) {
AbstractDeviceDriver deviceDriver = (AbstractDeviceDriver) device.getDeviceDriver();
if (!Objects.equals(deviceDriver.getOnline(), q)) {
deviceDriver.setOnline(q);
log.info("device:{},online status:{}", deviceCode, q);
}
}
});
setValue(deviceCode, items);
});
System.out.println("线程名称:'" + Thread.currentThread() + "',接收到消息" + topic + "-" + body + "-");
}
/**
* 更改内存中的值
*
* @param deviceCode
* @param value
*/
private void setValue(String deviceCode, List<DeviceItemData> value) {
value.forEach(deviceData -> {
logService.deviceExecuteLog(new LuceneLogDto(deviceCode, "MQTT上报,信号:" + deviceData.getId() + ",由" + ACCESSOR_VALUE.getValue(deviceData.getId()) + "->" + deviceData.getV() + ",信号健康值:" + deviceData.isQ()));
ACCESSOR_VALUE.setValue(deviceData.getId(), deviceData.getV());
});
}
/**
* 断开连接执行逻辑
*
* @param cause
*/
@Override
public void connectionLost(Throwable cause) {
log.info("mqtt client break link");
if (!reconnectingFlag) {
reconnectingFlag = true;
scheduler.execute(() -> {
while (!mqttClient.isConnected()) {
try {
if (!mqttClient.isConnected()) {
mqttClient.reconnect();
log.info("mqtt client again success");
}
} catch (MqttException e) {
log.error("mqtt client again failed: {}", e.getMessage());
try {
Thread.sleep(RECONNECT_DELAY);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
reconnectingFlag = false;
});
}
}
/**
* 订阅topic
*
* @param topic 主题
* @param message 内容
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
//接收到消息先放队列根据不同topic处理不同处理dd逻辑
if (topicMap.containsKey(topic)) {
topicMap.get(topic).add(new String(message.getPayload()));
System.out.println("接收到消息---" + topic + "内容:" + new String(message.getPayload()));
} else {
log.info("topic---{} does not exist", topic);
}
}
/**
* 发布结果处理
*
* @param token
*/
@SneakyThrows
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
boolean complete = token.isComplete();
MqttMessage message = token.getMessage();
String[] topics = token.getTopics();
log.info("message publish result ,topic:{},message:{},complete:{}", Arrays.toString(topics), new String(message.getPayload()), complete);
}
/**
* 消息发布
*
* @param body MQTT下发数据的格式是
* [
* {"id":"opc_code.plc_code.device_code.item","v":value},
* {"id":"opc_code.plc_code.device_code.item","v":value}
* ]
*/
public void sendMsg(String body) {
try {
synchronized (this) {
if (mqttClient != null && StringUtils.isNotEmpty(body)) {
MqttMessage message = new MqttMessage();
message.setQos(QOS_2);
message.setRetained(true);
message.setPayload(body.getBytes());
mqttClient.publish(PUBLISH_TOPIC, message);
}
}
} catch (Exception ex) {
log.error("message publish failed:{}", ex.getMessage());
}
}
@PostConstruct
public void start() {
loadItems();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("----close threadPool-----");
try {
shutdown();
unloadItems();
threadPool.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}));
}
/**
* 关闭MQTT客户端连接
*
* @throws Exception
*/
public void shutdown() throws Exception {
log.info("close mqtt client...");
mqttClient.disconnect();
log.info("close mqtt client finish");
log.info("close scheduler...");
scheduler.shutdown();
log.info("close scheduler finish...");
}
/**
* 从redis写入内存 item数据
*/
private void loadItems() {
log.info("loading...");
Map<Object, Object> entries = redisTemplate.opsForHash().entries(KEY);
entries.forEach((key, value) -> ACCESSOR_VALUE.setValue((String) key, value));
log.info("load finish, delete key...");
redisTemplate.delete(KEY);
log.info("delete finish!");
}
/**
* 从内存写出到redis item数据
*/
private void unloadItems() {
log.info("unloading...");
List<String> allKey = ACCESSOR_VALUE.getAllKey();
Map<String, Object> map = new HashMap<>();
allKey.forEach(key -> map.put(key, ACCESSOR_VALUE.getValue(key)));
log.info("unload finish, put key...");
redisTemplate.opsForHash().putAll(KEY, map);
log.info("put finish!");
}
}

View File

@@ -1,28 +0,0 @@
package org.nl.config.mqtt2;
import cn.dev33.satoken.annotation.SaIgnore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
/*
* @author ZZQ
* @Date 2024/1/31 17:12
*/
//@RestController
public class PublishDemo {
@Autowired
MQServer server;
@RequestMapping("/publish")
@SaIgnore
public void send(String topic,String msg){
MQServer.sendMsg(topic,msg,1,true);
}
@RequestMapping("/init")
@SaIgnore
public void d222() throws Exception {
server.init();
}
}

View File

@@ -1,29 +0,0 @@
package org.nl.config.mqtt2.callback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/*
* @author ZZQ
* @Date 2024/1/31 13:55
*/
public class PublishCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
//重连调用服务初始化init
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//接收到消息先放队列根据不同topic处理不同处理逻辑
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//消息接收成功后
boolean complete = iMqttDeliveryToken.isComplete();
System.out.println("消息处理结果:"+complete);
}
}

View File

@@ -1,31 +0,0 @@
package org.nl.config.mqtt2.callback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.nl.config.mqtt2.msg.MsgPoolManager;
import org.nl.config.mqtt2.msg.MsgWorker;
/*
* @author ZZQ
* @Date 2024/1/31 13:55
*/
public class SubsribeCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
//重连调用服务初始化init
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//接收到消息先放队列根据不同topic处理不同处理dd逻辑
System.out.println("接收到消息topic:"+topic+"——"+new String(mqttMessage.getPayload()));
MsgPoolManager.hander(new MsgWorker(topic,new String(mqttMessage.getPayload())));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//消息接收如果处理失败允许在这里重试
}
}

View File

@@ -4,7 +4,14 @@ import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import javax.annotation.PostConstruct;
@@ -29,6 +36,8 @@ public class MqttConfig {
private String[] topics;
private int[] qoss;
private int timeout;
private int keepalive;
@@ -36,23 +45,22 @@ public class MqttConfig {
private MqttConnectOptions option;
@PostConstruct
public void initOption(){
public void initOption() {
MqttConnectOptions options = new MqttConnectOptions();
// 设定清除会话信息true时每次连接都会建立新的会话false时服务端会保留会话信息
options.setCleanSession(true);
//options.setCleanSession(true);
options.setCleanSession(false);
// 设定重连机制设定为true时,mqtt的重连机制会启动,当mqtt client掉线之后它会进入重连
options.setAutomaticReconnect(true);
if (!username.equals("a")){
options.setUserName(username);
}
if (!password.equals("a")){
options.setPassword(password.toCharArray());
}
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//options.setWill("willTopic", WILL_DATA, 2, false);
option=options;
option = options;
}
}

View File

@@ -0,0 +1,18 @@
package org.nl.config.mqtt2.msg;
import lombok.Data;
import java.util.List;
/**
* @Description TODO
* @Author Gengby
* @Date 2024/3/5
*/
@Data
public class DeviceItemData<T> {
private String id;
private T v;
private boolean q;
private long t;
}

View File

@@ -1,78 +0,0 @@
package org.nl.config.mqtt2.msg;
import org.nl.config.mqtt2.MQServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Queue;
import java.util.concurrent.*;
/*
* @author ZZQ
* @Date 2024/1/31 15:19
* 待确认1.所有消走线程还是先走队列2.同一个topic消息处理需要保证消费顺序3.消息写时订阅也会订阅毁掉会拿到自己写的信息
*/
@Component
@ConditionalOnBean(value = MQServer.class)
public class MsgPoolManager {
@Autowired
MQServer mqServer;
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 2;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 20 ;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 10;
// 线程池 所使用的缓存队列大小
private final static int WORK_QUEUE_SIZE = 1000;
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
static Queue<Runnable> msgQueue = new LinkedBlockingQueue<>();
static ThreadPoolExecutor threadPool =null;
static ScheduledFuture scheduledFuture =null;
static {
}
public static void hander(MsgWorker r){
threadPool.execute(r);
}
@PostConstruct
public void start() {
threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,new ArrayBlockingQueue<>(WORK_QUEUE_SIZE), (r, executor) -> msgQueue.add(r));
// scheduledFuture = scheduler.scheduleAtFixedRate(() -> {
// // 判断缓存队列是否存在记录
// System.out.println("定时任务启动"+msgQueue.size());
// if(!msgQueue.isEmpty()){
// if(threadPool.getQueue().size() < WORK_QUEUE_SIZE){
// Runnable worker = msgQueue.poll();
// threadPool.execute(worker);
// }
// }
// },0,1,TimeUnit.SECONDS);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("----关闭线程池-----");
try {
mqServer.shutdown();
threadPool.shutdown();
// scheduler.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}));
}
}

View File

@@ -1,27 +0,0 @@
package org.nl.config.mqtt2.msg;
/*
* @author ZZQ
* @Date 2024/1/31 15:29
* 消息处理线程通过线程池处理如果需要用到具体topic的handler可以传入
*/
public class MsgWorker implements Runnable{
/**
* topic
*/
public String topic;
/**
* 消息体
*/
private String body;
@Override
public void run() {
System.out.println("接收到消息"+topic+"-"+body);
}
public MsgWorker(String topic, String body) {
this.topic = topic;
this.body = body;
}
}

View File

@@ -5,7 +5,6 @@ import org.nl.acs.acsEnum.StatusEnum;
import org.nl.acs.instruction.service.InstructionService;
import org.nl.acs.task.service.TaskService;
import org.nl.acs.task.service.dto.TaskDto;
import org.nl.modules.system.util.CodeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -32,14 +31,13 @@ public class AutoCreateAgvOneInst {
* @throws Exception
*/
public void run() throws Exception {
List<TaskDto> list = taskService.queryAll("task_status = '0' and (agv_system_type = '2' or agv_system_type = '3')");
List<TaskDto> list = taskService.queryAll("task_status = '0' and agv_system_type = '2'");
for (int i = 0; i < list.size(); i++) {
TaskDto taskDto = list.get(i);
String link_no = CodeUtil.getNewCode("LINK_NO");
try {
instructionService.create(instructionService.createInstDtoByTask(taskDto, link_no));
instructionService.autoCreateOneInst(instructionService.createInstDtoByTask(taskDto));
} catch (Exception e) {
log.error("自动创建指令失败:任务号{},失败原因:{}", taskDto.getTask_code(), e.getMessage());
log.error("自动创建叉车指令失败:任务号{},失败原因:{}", taskDto.getTask_code(), e.getMessage());
taskDto.setRemark(e.getMessage());
taskService.updateByCodeFromCache(taskDto);
continue;
@@ -47,7 +45,7 @@ public class AutoCreateAgvOneInst {
//创建指令后修改任务状态
taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode());
taskService.update(taskDto);
log.info("自动创建指令成功,任务号:{}", taskDto.getTask_code());
log.info("自动创建叉车指令成功,任务号:{}", taskDto.getTask_code());
}
}
}

View File

@@ -0,0 +1,60 @@
package org.nl.modules.quartz.task;
import lombok.extern.slf4j.Slf4j;
import org.nl.acs.acsEnum.StatusEnum;
import org.nl.acs.instruction.service.InstructionService;
import org.nl.acs.task.service.TaskService;
import org.nl.acs.task.service.dto.TaskDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 自动创建海柔AGV指令
*
* @author GengBy
*/
@Slf4j
@Component
public class AutoCreateHrInst {
@Autowired
private InstructionService instructionService;
@Autowired
private TaskService taskService;
/**
* 海柔最大指令数量
*/
private static final int MAX_INST_NUM = 3;
/**
* 查询海柔AGV任务并生成对应的指令
*
* @throws Exception
*/
public void run() throws Exception {
List<TaskDto> list = taskService.queryAll("task_status = '0' and agv_system_type = '3'");
for (int i = 0; i < list.size(); i++) {
TaskDto taskDto = list.get(i);
try {
int maxNum = instructionService.querySameDeviceCodeForHrMaxNum(taskDto.getStart_point_code(), taskDto.getNext_point_code());
if (maxNum == MAX_INST_NUM) {
log.info("海柔同一点位指令数量大于允许生成的最大指令数量:" + MAX_INST_NUM);
continue;
}
instructionService.autoCreateHrInst(instructionService.createInstDtoByTask(taskDto));
} catch (Exception e) {
log.error("自动创建海柔指令失败:任务号{},失败原因:{}", taskDto.getTask_code(), e.getMessage());
taskDto.setRemark(e.getMessage());
taskService.updateByCodeFromCache(taskDto);
continue;
}
//创建指令后修改任务状态
taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode());
taskService.update(taskDto);
log.info("自动创建海柔指令成功,任务号:{}", taskDto.getTask_code());
}
}
}

View File

@@ -114,7 +114,7 @@ public class AutoCreateInst {
//生产指令关联编号
String link_no = CodeUtil.getNewCode("LINK_NO");
Instruction instDto = instructionService.createInstDtoByTask(taskDto, link_no);
Instruction instDto = instructionService.createInstDtoByTask(taskDto);
try {
instructionService.create(instDto);
@@ -152,9 +152,8 @@ public class AutoCreateInst {
//生产指令关联编号
String link_no = CodeUtil.getNewCode("LINK_NO");
Instruction instDto1 = instructionService.createInstDtoByTask(taskDto1, link_no);
Instruction instDto2 = instructionService.createInstDtoByTask(taskDto2, link_no);
Instruction instDto1 = instructionService.createInstDtoByTask(taskDto1);
Instruction instDto2 = instructionService.createInstDtoByTask(taskDto2);
try {
instructionService.createTwoInst(instDto1, instDto2);

View File

@@ -0,0 +1,130 @@
package org.nl.modules.quartz.task;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.nl.acs.acsEnum.StatusEnum;
import org.nl.acs.instruction.service.InstructionService;
import org.nl.acs.instruction.service.dto.Instruction;
import org.nl.acs.opc.DeviceAppService;
import org.nl.acs.task.service.TaskService;
import org.nl.acs.task.service.dto.TaskDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 自动创建诺宝AGV指令
*
* @author GengBy
*/
@Slf4j
@Component
public class AutoCreateNuoBaoInst {
@Autowired
private TaskService taskService;
@Autowired
private InstructionService instructionService;
@Autowired
private DeviceAppService deviceAppService;
/**
* 单任务个数
*/
public static final Integer TASK_SIZE_1 = 1;
/**
* 双任务个数
*/
public static final Integer TASK_SIZE_2 = 2;
/**
* 自动创建诺宝AGV指令
*/
public void run() throws Exception {
//查询所有诺宝AGV任务
List<TaskDto> list = taskService.queryAll("task_status = '0' and agv_system_type = '1' ");
if (ObjectUtil.isEmpty(list)) {
return;
}
//根据LINK_NUM分组任务
Map<String, List<TaskDto>> link_num_tasks = Optional
.ofNullable(list)
.orElse(new ArrayList<>())
.stream()
.collect(Collectors.groupingBy(TaskDto::getLink_num));
//遍历分组任务 如果任务数为2则是下发双任务如果任务数为1 则下发单任务
for (Map.Entry<String, List<TaskDto>> task_values : link_num_tasks.entrySet()) {
List<TaskDto> tasks =
Optional
.ofNullable(task_values.getValue())
.orElse(new ArrayList<>())
.stream()
.sorted((t1, t2) -> t1.getIs_send().compareTo(t2.getIs_send()))
.collect(Collectors.toList());
//如果分组后任务数量为1 下发单任务分组后任务数量为2 下发双任务
if (tasks.size() == TASK_SIZE_1) {
TaskDto taskDto = tasks.get(0);
//判断任务是否需要立即下发AGV 不需要则是双任务,等待系统下发第二条任务后,走下发双任务的逻辑
if (StrUtil.equals(taskDto.getIs_send(), StatusEnum.NO_SEND.getCode())) {
continue;
}
Instruction instDto = instructionService.createInstDtoByTask(taskDto);
try {
instructionService.autoCreateTwoInst(instDto, null);
} catch (Exception e) {
taskDto.setRemark(e.getMessage());
taskService.updateByCodeFromCache(taskDto);
continue;
}
//创建指令后修改任务状态
taskDto.setTask_status(StatusEnum.TASK_RUNNING.getCode());
taskService.update(taskDto);
} else if (tasks.size() == TASK_SIZE_2) {
TaskDto taskDto1 = tasks.get(0);
TaskDto taskDto2 = tasks.get(1);
//关联编号一致 任务类型不一致 不生成任务
if (!taskDto1.getTask_type().equals(taskDto2.getTask_type())) {
continue;
}
Instruction instDto1 = instructionService.createInstDtoByTask(taskDto1);
Instruction instDto2 = instructionService.createInstDtoByTask(taskDto2);
try {
instructionService.autoCreateTwoInst(instDto1, instDto2);
} catch (Exception e) {
taskDto1.setRemark(e.getMessage());
taskService.updateByCodeFromCache(taskDto1);
taskDto2.setRemark(e.getMessage());
taskService.updateByCodeFromCache(taskDto2);
continue;
}
//创建指令后修改任务状态
taskDto1.setTask_status(StatusEnum.TASK_RUNNING.getCode());
taskService.update(taskDto1);
taskDto2.setTask_status(StatusEnum.TASK_RUNNING.getCode());
taskService.update(taskDto2);
}
}
}
}