add:信号处理
This commit is contained in:
36
nl-task/pom.xml
Normal file
36
nl-task/pom.xml
Normal file
@@ -0,0 +1,36 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.nl</groupId>
|
||||
<artifactId>nl-tool-platform</artifactId>
|
||||
<version>3.0.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nl-task</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>jakarta.annotation</groupId>
|
||||
<artifactId>jakarta.annotation-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.nl</groupId>
|
||||
<artifactId>nl-common</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,60 @@
|
||||
package org.nl.task.modular.core.schedule;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.nl.task.modular.task.handler.AbstractDeviceHandler;
|
||||
import org.nl.task.modular.task.manage.DeviceManager;
|
||||
import org.nl.task.modular.task.model.SignalEvent;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DeviceSignalSchedule {
|
||||
|
||||
@Autowired
|
||||
private DeviceManager deviceManager;
|
||||
|
||||
/**
|
||||
* 定时消费所有设备事件队列
|
||||
*/
|
||||
@Scheduled(fixedDelay = 3000)
|
||||
public void processSignalQueue() {
|
||||
Map<String, AbstractDeviceHandler<?>> handlers = deviceManager.getHandlers();
|
||||
for (AbstractDeviceHandler<?> handler : handlers.values()) {
|
||||
BlockingQueue<SignalEvent> queue = handler.getEngine().getQueue();
|
||||
consumeQueue(handler, queue);
|
||||
}
|
||||
}
|
||||
|
||||
private void consumeQueue(AbstractDeviceHandler<?> handler, BlockingQueue<SignalEvent> queue) {
|
||||
SignalEvent event;
|
||||
while ((event = queue.peek()) != null) {
|
||||
try {
|
||||
handleEvent(event);
|
||||
queue.poll();
|
||||
removeEventIndex(handler, event);
|
||||
log.info("处理信号成功,设备号:{}, 信号:{}, 信号值:{}", event.getDeviceCode(), event.getAlias(), event.getValue());
|
||||
} catch (Exception e) {
|
||||
log.error("处理信号事件失败, deviceCode={}, alias={}, value={}", event.getDeviceCode(), event.getAlias(), event.getValue(), e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理信号事件
|
||||
*/
|
||||
private void handleEvent(SignalEvent event) {
|
||||
|
||||
}
|
||||
|
||||
private void removeEventIndex(AbstractDeviceHandler<?> handler, SignalEvent event) {
|
||||
String key = event.getDeviceCode() + ":" + event.getAlias();
|
||||
handler.getEngine().getEventIndex().remove(key, event);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package org.nl.task.modular.task.annotation;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface DeviceType {
|
||||
String value();
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package org.nl.task.modular.task.annotation;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
@Target(ElementType.FIELD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface EnqueueWhen {
|
||||
String[] values();
|
||||
|
||||
boolean change() default true;
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package org.nl.task.modular.task.annotation;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
@Target(ElementType.FIELD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface SignalAlias {
|
||||
String value();
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package org.nl.task.modular.task.config;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import org.nl.task.modular.task.annotation.DeviceType;
|
||||
import org.nl.task.modular.task.handler.AbstractDeviceHandler;
|
||||
import org.nl.task.modular.task.manage.DeviceManager;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Configuration
|
||||
public class DeviceRegisterConfig {
|
||||
|
||||
@Autowired(required = false)
|
||||
private DeviceManager deviceManager;
|
||||
@Autowired
|
||||
private List<AbstractDeviceHandler<?>> handlers;
|
||||
|
||||
@PostConstruct
|
||||
public void registerHandlers() {
|
||||
for (AbstractDeviceHandler<?> handler : handlers) {
|
||||
DeviceType deviceType = handler.getClass().getAnnotation(DeviceType.class);
|
||||
if (deviceType != null) {
|
||||
deviceManager.register(deviceType.value(), handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package org.nl.task.modular.task.engine;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.nl.task.modular.task.model.SignalEvent;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@Getter
|
||||
public class SignalEngine<T> {
|
||||
|
||||
/**
|
||||
* key 设备号
|
||||
* value 设备信号对象
|
||||
*/
|
||||
private final Map<String, T> deviceSignalMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 设备对应的触发事件
|
||||
*/
|
||||
private final BlockingQueue<SignalEvent> queue = new LinkedBlockingQueue<>(1000);
|
||||
|
||||
/**
|
||||
* 去重, 防止同一设备同一信号值多次投递到队列中
|
||||
*/
|
||||
private final Map<String, SignalEvent> eventIndex = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
|
||||
public void enqueue(String deviceCode, String alias, Object value) {
|
||||
String key = deviceCode + ":" + alias;
|
||||
SignalEvent newEvent = SignalEvent
|
||||
.builder()
|
||||
.deviceCode(deviceCode)
|
||||
.alias(alias)
|
||||
.value(value)
|
||||
.timestamp(System.currentTimeMillis())
|
||||
.build();
|
||||
SignalEvent oldEvent = eventIndex.put(key, newEvent);
|
||||
if (oldEvent != null) {
|
||||
queue.remove(oldEvent);
|
||||
}
|
||||
queue.offer(newEvent);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package org.nl.task.modular.task.handler;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.nl.task.modular.task.annotation.EnqueueWhen;
|
||||
import org.nl.task.modular.task.annotation.SignalAlias;
|
||||
import org.nl.task.modular.task.engine.SignalEngine;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.*;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@Getter
|
||||
public abstract class AbstractDeviceHandler<T> {
|
||||
|
||||
protected final SignalEngine<T> engine = new SignalEngine<>();
|
||||
|
||||
private final Class<T> signalClass;
|
||||
|
||||
private final Map<String, Field> fieldMap = new HashMap<>();
|
||||
|
||||
private final Map<String, BiConsumer<T, Object>> setterMap = new HashMap<>();
|
||||
|
||||
private final Map<String, EnqueueWhen> ruleMap = new HashMap<>();
|
||||
|
||||
protected AbstractDeviceHandler(Class<T> signalClass) {
|
||||
this.signalClass = signalClass;
|
||||
initMetadata();
|
||||
}
|
||||
|
||||
private void initMetadata() {
|
||||
for (Field field : signalClass.getDeclaredFields()) {
|
||||
field.setAccessible(true);
|
||||
SignalAlias aliasAnn = field.getAnnotation(SignalAlias.class);
|
||||
String alias;
|
||||
if (aliasAnn == null || aliasAnn.value().isEmpty()) {
|
||||
alias = field.getName();
|
||||
} else {
|
||||
alias = aliasAnn.value();
|
||||
}
|
||||
fieldMap.put(alias, field);
|
||||
setterMap.put(alias, (obj, val) -> {
|
||||
try {
|
||||
field.set(obj, val);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
EnqueueWhen rule = field.getAnnotation(EnqueueWhen.class);
|
||||
if (rule != null) {
|
||||
ruleMap.put(alias, rule);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void syncSignals(String deviceCode, Map<String, Object> signals) {
|
||||
T plcSignal = engine.getDeviceSignalMap().computeIfAbsent(deviceCode, k -> newInstance());
|
||||
signals.forEach((alias, value) -> {
|
||||
BiConsumer<T, Object> setter = setterMap.get(alias);
|
||||
if (setter == null) {
|
||||
return;
|
||||
}
|
||||
Object oldValue = getOldValue(plcSignal, alias);
|
||||
boolean changed = !Objects.equals(oldValue, value);
|
||||
if (changed) {
|
||||
setter.accept(plcSignal, value);
|
||||
}
|
||||
//TODO 需要判断一下,如果当前变化值,不在EnqueueWhen注解范围内
|
||||
// 就需要从队列里将之前的给移出队列
|
||||
EnqueueWhen rule = ruleMap.get(alias);
|
||||
boolean enqueue = false;
|
||||
if (rule != null) {
|
||||
if (rule.values().length == 0) {
|
||||
enqueue = !rule.change() || changed;
|
||||
} else {
|
||||
for (String v : rule.values()) {
|
||||
if (Objects.equals(String.valueOf(value), v)) {
|
||||
enqueue = !rule.change() || changed;
|
||||
if (enqueue) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (enqueue) {
|
||||
engine.enqueue(deviceCode, alias, value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Object getOldValue(T plcSignal, String alias) {
|
||||
try {
|
||||
Field field = fieldMap.get(alias);
|
||||
if (field == null) {
|
||||
return null;
|
||||
}
|
||||
return field.get(plcSignal);
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private T newInstance() {
|
||||
try {
|
||||
return signalClass.getDeclaredConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package org.nl.task.modular.task.handler;
|
||||
|
||||
import lombok.Data;
|
||||
import org.nl.task.modular.task.annotation.DeviceType;
|
||||
import org.nl.task.modular.task.annotation.EnqueueWhen;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@DeviceType("CONVEYOR")
|
||||
@Component
|
||||
public class ConveyorHandler extends AbstractDeviceHandler<ConveyorHandler.PlcSignal> {
|
||||
|
||||
public ConveyorHandler() {
|
||||
super(PlcSignal.class);
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class PlcSignal {
|
||||
|
||||
@EnqueueWhen(values = {"5","2","6"})
|
||||
private int mode;
|
||||
|
||||
@EnqueueWhen(values = {"0"})
|
||||
private int move;
|
||||
|
||||
private int action;
|
||||
|
||||
private int error;
|
||||
|
||||
private int task;
|
||||
|
||||
private String barcode;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package org.nl.task.modular.task.handler;
|
||||
|
||||
import lombok.Data;
|
||||
import org.nl.task.modular.task.annotation.DeviceType;
|
||||
import org.nl.task.modular.task.annotation.EnqueueWhen;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@DeviceType("STACKER")
|
||||
@Component
|
||||
public class StackerHandler extends AbstractDeviceHandler<StackerHandler.PlcSignal> {
|
||||
|
||||
public StackerHandler() {
|
||||
super(PlcSignal.class);
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class PlcSignal {
|
||||
|
||||
@EnqueueWhen(values = {"5"})
|
||||
private int mode;
|
||||
|
||||
private int move;
|
||||
|
||||
private int action;
|
||||
|
||||
private int error;
|
||||
|
||||
private int task;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package org.nl.task.modular.task.manage;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.nl.task.modular.task.handler.AbstractDeviceHandler;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class DeviceManager {
|
||||
|
||||
private final Map<String, AbstractDeviceHandler<?>> handlerMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<String, AbstractDeviceHandler<?>> deviceIndex = new ConcurrentHashMap<>();
|
||||
|
||||
public void register(String deviceType, AbstractDeviceHandler<?> handler) {
|
||||
handlerMap.put(deviceType, handler);
|
||||
}
|
||||
|
||||
public AbstractDeviceHandler<?> getHandler(String deviceType) {
|
||||
return handlerMap.get(deviceType);
|
||||
}
|
||||
|
||||
public Map<String, AbstractDeviceHandler<?>> getHandlers() {
|
||||
return handlerMap;
|
||||
}
|
||||
|
||||
public void registerDevice(String deviceType, String deviceCode) {
|
||||
AbstractDeviceHandler<?> handler = handlerMap.get(deviceType);
|
||||
if (handler == null) {
|
||||
return;
|
||||
}
|
||||
deviceIndex.put(deviceCode, handler);
|
||||
}
|
||||
|
||||
public void registerDevice(String deviceType, List<String> deviceCodeList) {
|
||||
AbstractDeviceHandler<?> handler = handlerMap.get(deviceType);
|
||||
if (handler == null) {
|
||||
log.warn("未找到对应设备类型{}的Handler", deviceType);
|
||||
return;
|
||||
}
|
||||
deviceCodeList.stream().forEach(deviceCode -> {
|
||||
deviceIndex.put(deviceCode, handler);
|
||||
});
|
||||
log.info("注册设备信息:设备类型:{}, 设备号:{}", deviceType, deviceCodeList);
|
||||
}
|
||||
|
||||
public void setSignal(String deviceCode, Map<String, Object> signals) {
|
||||
AbstractDeviceHandler<?> handler = deviceIndex.get(deviceCode);
|
||||
if (handler == null) {
|
||||
log.warn("未找到对应设备{}的Handler", deviceCode);
|
||||
return;
|
||||
}
|
||||
handler.syncSignals(deviceCode, signals);
|
||||
}
|
||||
|
||||
public <T> T getSignal(String deviceCode) {
|
||||
AbstractDeviceHandler<T> handler = (AbstractDeviceHandler<T>) deviceIndex.get(deviceCode);
|
||||
if (handler == null) {
|
||||
return null;
|
||||
}
|
||||
return handler.getEngine().getDeviceSignalMap().get(deviceCode);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package org.nl.task.modular.task.model;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
|
||||
@Value
|
||||
@Builder
|
||||
public class SignalEvent {
|
||||
|
||||
String deviceCode;
|
||||
|
||||
String alias;
|
||||
|
||||
Object value;
|
||||
|
||||
long timestamp;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package org.nl.task.modular.task.test;
|
||||
|
||||
import cn.dev33.satoken.annotation.SaIgnore;
|
||||
import org.nl.task.modular.task.handler.ConveyorHandler;
|
||||
import org.nl.task.modular.task.handler.StackerHandler;
|
||||
import org.nl.task.modular.task.manage.DeviceManager;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api/test")
|
||||
@Validated
|
||||
public class TestController {
|
||||
|
||||
@Autowired
|
||||
private DeviceManager deviceManager;
|
||||
|
||||
@PostMapping("/register")
|
||||
@SaIgnore
|
||||
public void register(@RequestBody Map<String, Object> map) {
|
||||
deviceManager.registerDevice("CONVEYOR", Arrays.asList("CONVEYOR1", "CONVEYOR2"));
|
||||
deviceManager.registerDevice("STACKER", Arrays.asList("STACKER1"));
|
||||
Map<String, Object> conveyor1 = new HashMap<>();
|
||||
conveyor1.put("mode", 5);
|
||||
conveyor1.put("move", 0);
|
||||
conveyor1.put("action", 0);
|
||||
conveyor1.put("barcode","T000000001");
|
||||
deviceManager.setSignal("CONVEYOR1", conveyor1);
|
||||
Map<String, Object> conveyor2 = new HashMap<>();
|
||||
conveyor2.put("mode", 2);
|
||||
conveyor2.put("move", 1);
|
||||
conveyor2.put("action", 1);
|
||||
deviceManager.setSignal("CONVEYOR2", conveyor2);
|
||||
Map<String, Object> stacker1 = new HashMap<>();
|
||||
stacker1.put("mode", 5);
|
||||
stacker1.put("move", 1);
|
||||
stacker1.put("action", 1);
|
||||
deviceManager.setSignal("STACKER1", stacker1);
|
||||
ConveyorHandler.PlcSignal signal = deviceManager.getSignal("CONVEYOR1");
|
||||
System.out.println(signal.toString());
|
||||
signal = deviceManager.getSignal("CONVEYOR2");
|
||||
System.out.println(signal.toString());
|
||||
StackerHandler.PlcSignal signal2 = deviceManager.getSignal("STACKER1");
|
||||
System.out.println(signal2);
|
||||
}
|
||||
}
|
||||
@@ -60,6 +60,12 @@
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.nl</groupId>
|
||||
<artifactId>nl-task</artifactId>
|
||||
<version>3.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.nl</groupId>
|
||||
<artifactId>nl-iot</artifactId>
|
||||
|
||||
@@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@@ -31,6 +32,7 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
@Slf4j
|
||||
@RestController
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
public class Application {
|
||||
|
||||
/* 解决druid 日志报错:discard long time none received connection:xxx */
|
||||
|
||||
Reference in New Issue
Block a user