diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/NDCAgvServiceImpl.java b/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/NDCAgvServiceImpl.java index 640abdd..c446c36 100644 --- a/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/NDCAgvServiceImpl.java +++ b/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/NDCAgvServiceImpl.java @@ -35,11 +35,8 @@ import java.util.Map; @RequiredArgsConstructor public class NDCAgvServiceImpl implements NDCAgvService { - private final DeviceAppService deviceAppService; private final ParamService paramService; - private final AcsToWmsService acsToWmsService; - private final DeviceExecuteLogService logServer; Map AGVDeviceStatus = new HashMap(); diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/XianGongAgvServiceImpl.java b/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/XianGongAgvServiceImpl.java index dcb3989..cea00ec 100644 --- a/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/XianGongAgvServiceImpl.java +++ b/acs/nladmin-system/src/main/java/org/nl/acs/agv/server/impl/XianGongAgvServiceImpl.java @@ -21,6 +21,7 @@ import org.nl.acs.opc.DeviceAppService; import org.nl.acs.opc.DeviceType; import org.nl.modules.common.exception.BadRequestException; import org.nl.modules.system.service.ParamService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Arrays; @@ -32,9 +33,12 @@ import java.util.Map; @Service @RequiredArgsConstructor public class XianGongAgvServiceImpl implements XianGongAgvService { - private final DeviceAppService deviceAppService; - private final ParamService paramService; - private final AcsToWmsService acsToWmsService; + @Autowired + private DeviceAppService deviceAppService; + @Autowired + private ParamService paramService; + @Autowired + private AcsToWmsService acsToWmsService; Map AGVDeviceStatus = new HashMap(); @LokiLog(type = LokiLogType.AGV) diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/ConveyorDevice.java b/acs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/ConveyorDevice.java index 9a685f3..32acd3c 100644 --- a/acs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/ConveyorDevice.java +++ b/acs/nladmin-system/src/main/java/org/nl/acs/device/service/impl/ConveyorDevice.java @@ -44,15 +44,15 @@ public class ConveyorDevice { @Override public void run() { System.out.println("test"); - String MODE = "RD1.RD1." + id + ".mode"; - String code = OpcUtl.read(MODE); - if ("2".equals(code)) { - TaskService taskserver = new TaskServiceImpl(); - TaskDto dto = new TaskDto(); - dto.setStart_point_code(id); - dto.setNext_point_code("1002"); - taskserver.create(dto); - } +// String MODE = "RD1.RD1." + id + ".mode"; +// String code = OpcUtl.read(MODE); +// if ("2".equals(code)) { +// TaskService taskserver = new TaskServiceImpl(); +// TaskDto dto = new TaskDto(); +// dto.setStart_point_code(id); +// dto.setNext_point_code("1002"); +// taskserver.create(dto); +// } } } } diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java b/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java index 4a39009..b4fdfe4 100644 --- a/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java +++ b/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java @@ -6,23 +6,32 @@ import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.nl.acs.udw.UnifiedDataAccessor; import org.nl.acs.udw.UnifiedDataAccessorFactory; -import org.openscada.opc.lib.da.Group; -import org.openscada.opc.lib.da.Item; -import org.openscada.opc.lib.da.ItemState; -import org.openscada.opc.lib.da.Server; +import org.nl.acs.udw.UnifiedDataAppService; +import org.nl.modules.wql.util.SpringContextHolder; +import org.openscada.opc.lib.da.*; import java.util.*; + @Slf4j -public class DeviceOpcProtocolRunable implements Runnable { +public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerConnectionStateListener { List protocols; OpcServerManageDto OpcServer; int error_num; String message; + int maxResartNum; + private Server server; + private Group group; + boolean flag = false; + private int all_null; + private Map itemSearchCache; public DeviceOpcProtocolRunable() { this.error_num = 0; + this.all_null = 0; this.message = null; + this.itemSearchCache = new HashMap(); + this.server = null; } public List getProtocols() { @@ -41,32 +50,58 @@ public class DeviceOpcProtocolRunable implements Runnable { this.OpcServer = opcServer; } - OpcItemDto getItem(String item) { - Iterator var2 = this.protocols.iterator(); - OpcItemDto dto; - do { - if (!var2.hasNext()) { - return null; + private OpcItemDto getItem(String item) { + OpcItemDto x = (OpcItemDto) this.itemSearchCache.get(item); + if (x == null) { + Iterator var3 = this.protocols.iterator(); + + while (var3.hasNext()) { + OpcItemDto dto = (OpcItemDto) var3.next(); + if (StrUtil.equals(item, dto.getItem_code())) { + x = dto; + this.itemSearchCache.put(item, dto); + break; + } } + } - dto = (OpcItemDto) var2.next(); - } while (!StrUtil.equals(item, dto.getItem_code())); - - return dto; + return x; } + @Override public void run() { - while (true) { - try { - Server server = OpcServerUtl.getServerWithOutException(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain()); - Group group = server.addGroup(); - List itemsString = new ArrayList(); - Iterator it = this.protocols.iterator(); + if (OpcConfig.opc_item_read_using_callback) { + this.runNew(); + } else { + this.runOld(); + } + } - while (it.hasNext()) { - OpcItemDto protocol = (OpcItemDto) it.next(); + + private void runOld() { + while (true) { + start: + try { + if (this.group != null) { + group.clear(); + group.remove(); + log.trace("清理group..."); + } + if (this.server != null) { + server.disconnect(); + log.trace("清理server..."); + } + + this.server = OpcServerUtl.getServerWithOutException(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain()); + this.server.addStateListener(this); + group = this.server.addGroup(); + List itemsString = new ArrayList(); + Iterator var3 = this.protocols.iterator(); + + while (var3.hasNext()) { + OpcItemDto protocol = (OpcItemDto) var3.next(); String item = protocol.getItem_code(); itemsString.add(item); } @@ -74,47 +109,57 @@ public class DeviceOpcProtocolRunable implements Runnable { Map itemsMap = new LinkedHashMap(); boolean is_error = false; StringBuilder err_message = new StringBuilder(); - Iterator var7 = itemsString.iterator(); + Iterator var6 = itemsString.iterator(); - while (var7.hasNext()) { - String string = (String) var7.next(); + while (var6.hasNext()) { + String string = (String) var6.next(); try { - itemsMap.put(string, group.addItem(string)); + Item item = group.addItem(string); + itemsMap.put(string, item); log.trace("添加成功 {}", string); - } catch (Exception var29) { - err_message.append(string + ":" + var29.getMessage()); + } catch (Exception var26) { + err_message.append(string + ":" + var26.getMessage()); if (!is_error) { is_error = true; } } } + String tag; if (is_error) { - log.info("设备OPC数据同步配置异常"); + tag = err_message.toString(); + log.warn("{}:{}", OpcConfig.resource_code, tag); } if (!OpcStartTag.is_run) { OpcStartTag.is_run = true; } - //线程名 - String tag = Thread.currentThread().getName(); - if (this.OpcServer != null) { - tag = tag + this.OpcServer.getOpc_code(); + tag = ""; + if (log.isWarnEnabled()) { + tag = Thread.currentThread().getName(); + if (this.OpcServer != null) { + tag = tag + this.getOpcGroupID(); + } } - UnifiedDataAccessor accessor_value = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); + UnifiedDataAccessor accessor_value = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); boolean time_out = false; - label97: - while (true) { + while (DeviceOpcSynchronizeAutoRun.isRun) { long begin = System.currentTimeMillis(); + if (log.isTraceEnabled()) { + log.trace("{} 开始记时{}", tag, DateUtil.now()); + } + Map itemStatus = group.read(true, (Item[]) itemsMap.values().toArray(new Item[0])); long end = System.currentTimeMillis(); - log.trace("{} 开始记时{}", tag, DateUtil.now()); long duration = end - begin; - log.trace("{} 读取耗时:{}", tag, duration); + if (log.isTraceEnabled()) { + log.trace("{} 读取耗时:{}", tag, duration); + } + if (duration > 1000L) { if (!time_out) { log.warn("{} 读取超时 : {}", tag, duration); @@ -125,78 +170,197 @@ public class DeviceOpcProtocolRunable implements Runnable { time_out = false; } + boolean valueAllNotNull = false; Set items = itemStatus.keySet(); Iterator var18 = items.iterator(); - while (true) { - Item item; - //当前值 - Object value; - //旧的值 - Object his; - do { - if (!var18.hasNext()) { - end = System.currentTimeMillis(); - log.trace("{}", itemsString); - log.trace("{} 计算完成耗时{}", tag, end - begin); - Thread.sleep((long) OpcConfig.synchronized_millisecond); - if (this.error_num != 0) { - this.error_num = 0; - this.message = null; - } - continue label97; - } - - item = (Item) var18.next(); - ItemState itemState = (ItemState) itemStatus.get(item); - value = OpcUtl.getValue(item, itemState); - his = accessor_value.getValue(item.getId()); - if (!ObjectUtil.equal(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && his != null) { - log.warn("opc 值不健康 item: {}, 状态: {}", item.getId(), itemState.getQuality()); - } - } while (ObjectUtil.equal(value, his));//如果两次的值相等,不走下面的代码 - - OpcItemDto itemDto = this.getItem(item.getId()); - if (itemDto.getNeed_log() != null && itemDto.getNeed_log()) { - StringBuilder sb = new StringBuilder(); - //设备的ITEM项 - List relate_items = itemDto.getRelate_items(); - Iterator var26 = relate_items.iterator(); - - while (var26.hasNext()) { - String relate = (String) var26.next(); - Object obj = accessor_value.getValue(relate); - sb.append("key:" + relate + "value:" + obj + ";"); - } - - log.info("信号{}变更从{}->{};信号快照:{}", new Object[]{item.getId(), his, value, sb}); - + while (var18.hasNext()) { + Item item = (Item) var18.next(); + ItemState itemState = (ItemState) itemStatus.get(item); + Object value = OpcUtl.getValue(item, itemState); + if (value != null) { + valueAllNotNull = true; } - //设置值 - accessor_value.setValue(item.getId(), value); + String itemId = item.getId(); + Object his = accessor_value.getValue(itemId); + if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && his != null) { + log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality()); + valueAllNotNull = true; + } + + if (!UnifiedDataAppService.isEquals(value, his)) { + OpcItemDto itemDto = this.getItem(itemId); + if (true) { + this.logItemChanged(itemId, accessor_value, value, itemDto); + } + if(!ObjectUtil.isEmpty(value)){ + accessor_value.setValue(itemId, value); + } + } + } + + end = System.currentTimeMillis(); + if (log.isTraceEnabled()) { + log.trace("{}", itemsString); + log.trace("{} 计算完成耗时{}", tag, end - begin); + } + + ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond); + if (this.error_num != 0) { + this.error_num = 0; + this.message = null; + } + + if (!valueAllNotNull) { + int random = (new Random()).nextInt(10) + 1; + random *= 1000; + if (this.all_null < 3) { + if (log.isWarnEnabled()) { + log.warn("{} 所有内容都为空, all_null:{} ,暂定{}s", tag, all_null,5000 + random); + } + + ThreadUtl.sleep((long) (5000 + random)); + } else if (this.all_null < 6) { + if (log.isWarnEnabled()) { + log.warn(tag + "重新创建server"); + log.warn("{} 所有内容都为空, all_null:{} ,暂定{}s", tag, all_null,30000 + random); + } +// ThreadUtl.sleep((long) (30000 + random)); + ThreadUtl.sleep((long) ((new Random()).nextInt(3) +1) * 1000); + break start; + } else if (this.all_null < 12) { + if (log.isWarnEnabled()) { + log.warn("{} 所有内容都为空, all_null:{} ,暂定{}ms", tag, all_null, '\uea60' + random); + } + + ThreadUtl.sleep((long) ('\uea60' + random)); + } else { + if (log.isWarnEnabled()) { + log.warn("{} 所有内容都为空, all_null:{} ,暂定{}ms", tag, all_null, 120000 + random); + } + + ThreadUtl.sleep((long) (120000 + random)); + } + + ++this.all_null; + } else { + this.all_null = 0; + } +// break start; + + } + + log.warn("opc线程停止。。。"); + return; + } catch (Exception var27) { + if (this.server != null) { + try { + this.server.disconnect(); + } catch (Exception var25) { } } - } catch (Exception var30) { - String error_message = "设备信息同步异常"; - if (!StrUtil.equals(this.message, error_message)) { - log.warn("", var30); + + this.server = null; + if (!DeviceOpcSynchronizeAutoRun.isRun) { + log.warn("opc线程停止2。。。"); + return; } - try { - Thread.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000)); - } catch (InterruptedException e) { - e.printStackTrace(); + String error_message = "设备信息同步异常"; + if (!StrUtil.equals(this.message, error_message)) { + log.warn(error_message, var27); } + + ThreadUtl.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000)); ++this.error_num; if (this.error_num > 3 && !StrUtil.equals(this.message, error_message)) { - log.info("设备同步通信异常"); this.message = error_message; } } } } + + private void runNew() { + Async20Access accessor = null; + + while (true) { + String opcGroupId = this.getOpcGroupID(); + + try { + if (this.server == null) { + this.server = OpcServerUtl.getServerWithOutException(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain()); + this.server.addStateListener(this); + accessor = new Async20Access(this.server, OpcConfig.synchronized_millisecond, true); + Iterator var9 = this.protocols.iterator(); + + while (var9.hasNext()) { + OpcItemDto protocol = (OpcItemDto) var9.next(); + String itemId = protocol.getItem_code(); + accessor.addItem(itemId, this); + } + + accessor.bind(); + log.info("Async20Access bind {}", opcGroupId); + } + + Thread.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000)); + } catch (Exception var8) { + if (accessor != null) { + try { + log.warn("Async20Access unbind {}", opcGroupId); + accessor.unbind(); + } catch (Exception var7) { + var7.printStackTrace(); + } + + accessor = null; + } + + if (this.server != null) { + try { + this.server.disconnect(); + } catch (Exception var6) { + } + + this.server = null; + } + + if (var8 instanceof InterruptedException) { + log.warn("OPC 同步线程(%s)被中断", opcGroupId); + return; + } + + log.warn("设备信息同步异常", var8); + ThreadUtl.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000)); + String error_message = var8.getMessage(); + if (error_message == null) { + error_message = var8.toString(); + } + + ++this.error_num; + if (this.error_num > 3 && !StrUtil.equals(this.message, error_message)) { + this.message = error_message; + } + } + } + } + + + public void connectionStateChanged(boolean connected) { + if (!connected) { + this.server = null; + } + + log.warn("opc server {} {}", this.getOpcGroupID(), connected ? "connected" : "disconnected"); + } + + private String getOpcGroupID() { + String var10000 = this.OpcServer.getOpc_code(); + return var10000 + "(" + this.protocols.size() + " items)"; + } + public static String formatDuring(long mss) { long days = mss / 86400000L; long hours = mss % 86400000L / 3600000L; @@ -204,4 +368,55 @@ public class DeviceOpcProtocolRunable implements Runnable { long seconds = mss % 60000L / 1000L; return days + " days " + hours + " hours " + minutes + " minutes " + seconds + " seconds "; } + + + public void changed(Item item, ItemState itemState) { + String itemId = item.getId(); + + try { + Object value = OpcUtl.getValue(item, itemState); + UnifiedDataAccessor accessor_value = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); + accessor_value.setValue(itemId, value); + +// if (value != null) { +// if (log.isTraceEnabled()) { +// log.trace("Item {} new value: {}, Timestamp: {}", new Object[]{itemId, itemState.getValue(), itemState.getTimestamp().getTime()}); +// } +// } else if (log.isInfoEnabled()) { +// log.info("Item {} new value: {}, Timestamp: {}, Quality: {}", new Object[]{itemId, itemState.getValue(), itemState.getTimestamp().getTime(), itemState.getQuality()}); +// } + log.trace("Item {} new value: {}, Timestamp: {}", new Object[]{itemId, itemState.getValue(), itemState.getTimestamp().getTime()}); + + OpcItemDto itemDto = this.getItem(itemId); +// if (Boolean.TRUE.equals(itemDto.getNeed_log())) { +// this.logItemChanged(itemId, accessor_value, value, itemDto); +// } + this.logItemChanged(itemId, accessor_value, value, itemDto); + + } catch (Exception var7) { + log.error(itemId, var7); + } + + } + + private void logItemChanged(String itemId, UnifiedDataAccessor accessor_value, Object value, OpcItemDto itemDto) { + Object his = accessor_value.getValue(itemId); + List relate_items = itemDto.getRelate_items(); + if (relate_items != null && !relate_items.isEmpty()) { + StringBuilder sb = new StringBuilder(); + Iterator var8 = relate_items.iterator(); + + while (var8.hasNext()) { + String relate = (String) var8.next(); + Object obj = accessor_value.getValue(relate); + sb.append("key:" + relate + "value:" + obj + ";"); + } + log.warn("设备:{}信号{}变更从{}->{};信号快照:{}", new Object[]{itemDto.getDevice_code(), itemId, his, value, sb}); +// this.businessLogger.setResource(itemDto.getDevice_code(), itemDto.getDevice_name()).log("信号{}变更从{}->{};信号快照:{}", new Object[]{itemId, his, value, sb}); + } else { + log.warn("设备:{}信号{}变更从{}->{};信号快照:{}", new Object[]{itemDto.getDevice_code(), itemId, his, value}); +// this.businessLogger.setResource(itemDto.getDevice_code(), itemDto.getDevice_name()).log("信号{}变更从{}->{}", new Object[]{itemId, his, value}); + } + } + } diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java b/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java index ff07751..16c6c65 100644 --- a/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java +++ b/acs/nladmin-system/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java @@ -1,276 +1,82 @@ package org.nl.acs.opc; -import cn.hutool.core.thread.NamedThreadFactory; import cn.hutool.core.util.ObjectUtil; -import lombok.extern.slf4j.Slf4j; import org.nl.acs.auto.run.AbstractAutoRunnable; -import org.nl.acs.udw.UnifiedDataAccessor; -import org.nl.acs.udw.UnifiedDataAccessorFactory; -import org.nl.acs.udw.UnifiedDataAppService; -import org.nl.modules.wql.util.SpringContextHolder; -import org.openscada.opc.lib.da.Group; -import org.openscada.opc.lib.da.Item; -import org.openscada.opc.lib.da.ItemState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.*; -import java.util.concurrent.*; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * OPC设备同步启动 */ @Component -@Slf4j public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { - static boolean isRun = true; + + public static boolean isRun = false; + ExecutorService executorService = Executors.newCachedThreadPool(); @Autowired private DeviceAppService deviceAppService; @Autowired private OpcServerManageService opcServerManageService; -// @Autowired -// LuceneExecuteLogService lucene; - - static ExecutorService executorService; - public static Map opcServersConfig; - public static Map itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); - - static boolean canRefreshOpcEntity = true; - private long lastRefreshOpcEntityTime; - static UnifiedDataAccessor udw; - private static Map canReadOpcValues; - private static volatile Map opcCodeOpcEntityMapping; - - public DeviceOpcSynchronizeAutoRun() { - this.lastRefreshOpcEntityTime = 0L; - } + @Override public String getCode() { return DeviceOpcSynchronizeAutoRun.class.getSimpleName(); } + @Override public String getName() { return "opc设备同步器"; } - static Group getGroup(String opcCode) throws Exception { - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); - return opcServerService.getServer(opcCode); - } - - static void submitTimeLimitTask(Runnable runnable, String opcCode) { - CompletableFuture future = CompletableFuture.runAsync(runnable, executorService); - -// try { -// future.get(10L, TimeUnit.SECONDS); -// } catch (InterruptedException var9) { -// Thread.currentThread().interrupt(); -// } catch (ExecutionException var10) { -// var10.printStackTrace(); -// } catch (TimeoutException var11) { -// itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { -// udw.setValue(key, (Object) null); -// }); -// canReadOpcValues = new ConcurrentHashMap<>(); -// System.out.println("opc设备同步器 任务执行超时,取消任务..."); -// future.cancel(true); -// } finally { -// canRefreshOpcEntity = true; -// if (opcCode != null) { -// canReadOpcValues.put(opcCode, true); -// } -// -// } - } - - private ExecutorService createThreadPool() { - ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("opc-sync", false)); - executor.allowCoreThreadTimeOut(true); - return executor; - } - - public void autoRun() { - OpcStartTag.is_run = true; - opcServersConfig = this.opcServerManageService.queryAllServerMap(); - executorService = this.createThreadPool(); - opcCodeOpcEntityMapping = new ConcurrentHashMap(); - itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { - udw.setValue(key, (Object) null); - }); - canRefreshOpcEntity = true; - canReadOpcValues.clear(); - - while (true) { - this.refreshOpcEntity(); - Iterator var1 = opcServersConfig.keySet().iterator(); - - while (var1.hasNext()) { - String opcCode = (String) var1.next(); - submitTimeLimitTask(() -> { - boolean in = false; - try { - if (canReadOpcValues.computeIfAbsent(opcCode, (key) -> true)) { - in = true; - canReadOpcValues.put(opcCode, false); - this.readOpcValues(opcCode); - } - } catch (Exception var3) { - var3.printStackTrace(); - } finally { - canRefreshOpcEntity = true; - if (opcCode != null && in) { - canReadOpcValues.put(opcCode, true); - } - } - }, opcCode); - } - - ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond); - } - } - - private void readOpcValues(String opcCode) throws Exception { - synchronized (opcCode.intern()) { - OpcEntity opcEntity = (OpcEntity) opcCodeOpcEntityMapping.get(opcCode); - if (opcEntity != null) { - if (opcEntity.getItems().size() != 0) { - long begin = System.currentTimeMillis(); - if (log.isTraceEnabled()) { - log.trace("opc {} 开始计时{}", opcCode, begin); - } - - Map itemStatus; - try { - itemStatus = opcEntity.readAll(); - } catch (Exception var15) { - itemStatus = opcEntity.readDividually(); - } - - long end = System.currentTimeMillis(); - long duration = end - begin; - if (log.isTraceEnabled()) { - log.trace("opc {} 读取耗时:{}", opcCode, duration); - } - - if (duration > 1000L) { - log.warn("opc {} 读取超时 : {}", opcCode, duration); - } - -// boolean allNull = itemStatus.entrySet().stream().map((map) -> { -// return OpcUtl.getValue((Item)map.getKey(), (ItemState)map.getValue()); -// }).allMatch(Objects::isNull); -// if (allNull) { -// opcEntity.getItems().clear(); -// } - - UnifiedDataAccessor udw = opcEntity.getUdw(); - - - Set items = itemStatus.keySet(); - Iterator var18 = items.iterator(); - - while (var18.hasNext()) { - Item item = (Item) var18.next(); - ItemState itemState = (ItemState) itemStatus.get(item); - Object nowValue = OpcUtl.getValue(item, itemState); - String itemId = item.getId(); - Object historyValue = udw.getValue(itemId); - if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && historyValue != null) { - log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality()); - } - if (!UnifiedDataAppService.isEquals(nowValue, historyValue)) { - OpcItemDto itemDto = (OpcItemDto) itemCodeOpcItemDtoMapping.get(itemId); - if (true) { - this.logItemChanged(itemId, udw, nowValue, itemDto); - } - udw.setValue(itemId, nowValue); - } - - } + @Override + public void autoRun() throws Exception { + { + isRun = true; + Map servers = this.opcServerManageService.queryAllServerMap(); + Map>> pros; + do{ + Thread.sleep(1000L); + pros = this.deviceAppService.findAllFormatProtocolFromDriver(); + }while (ObjectUtil.isEmpty(pros)); + Set keys = pros.keySet(); + Iterator var4 = keys.iterator(); + //代码执行一次 + while (var4.hasNext()) { + String key = (String) var4.next(); + List> list = (List) pros.get(key); + OpcServerManageDto opcServer = (OpcServerManageDto) servers.get(key); + Iterator var8 = list.iterator(); + while (var8.hasNext()) { + List groupProtols = (List) var8.next(); + DeviceOpcProtocolRunable runable = new DeviceOpcProtocolRunable(); + runable.setProtocols(groupProtols); + runable.setOpcServer(opcServer); + this.executorService.submit(runable); } } - } - } - private void refreshOpcEntity() { - if (canRefreshOpcEntity) { - canRefreshOpcEntity = false; - long now = System.currentTimeMillis(); - if (now - this.lastRefreshOpcEntityTime >= 20000L) { - this.lastRefreshOpcEntityTime = now; - submitTimeLimitTask(() -> { - try { - Map>> protocol = this.deviceAppService.findAllFormatProtocolFromDriver(); - Iterator var2 = protocol.entrySet().iterator(); + // 同步无光电设备信号 + //Map>> pros1 = this.deviceAppService.findAllFormatProtocolFromDriver(); + //List opcDrivers = this.deviceAppService.findDeviceDriver(DeviceDriver.class); - while (var2.hasNext()) { - Map.Entry>> stringListEntry = (Map.Entry) var2.next(); - String opcCode = (String) stringListEntry.getKey(); - List> opcItemDtos = (List) stringListEntry.getValue(); - ((OpcEntity) opcCodeOpcEntityMapping.computeIfAbsent(opcCode, OpcEntity::new)).reload(opcItemDtos); - } - } catch (Exception var6) { - var6.printStackTrace(); - } finally { - canRefreshOpcEntity = true; - } - - }, (String) null); + while (true) { + Thread.sleep(3000L); } } } - private void logMessage(String errorMessage) { - try { -// issueLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]); -// businessLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]); - } catch (Exception var5) { - var5.printStackTrace(); - } - - } - + @Override public void after() { - OpcStartTag.is_run = false; - opcCodeOpcEntityMapping.values().forEach((opcEntity) -> { - opcEntity.cleanUdwCache(); - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); - opcServerService.cleanGroups(opcEntity.getOpcCode()); - }); - opcCodeOpcEntityMapping = new ConcurrentHashMap(); - itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); - executorService.shutdownNow(); - } - - private void logItemChanged(String itemCode, UnifiedDataAccessor udw, Object value, OpcItemDto itemDto) { - Object his = udw.getValue(itemCode); - List relate_items = itemDto.getRelate_items(); - if (relate_items != null && !relate_items.isEmpty()) { - StringBuilder sb = new StringBuilder(); - Iterator var8 = relate_items.iterator(); - - while (var8.hasNext()) { - String relate = (String) var8.next(); - Object obj = udw.getValue(relate); - sb.append("key:").append(relate).append("value:").append(obj).append(";"); - } -// if (!itemCode.endsWith("heartbeat") && !itemCode.endsWith("time")) { -// log.warn("{} 信号 {} 发生变更 {} -> {} 信号快照 {}", itemDto.getDevice_code(), itemCode, his, value, sb); -// lucene.deviceExecuteLog(new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(), itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), String.valueOf(his), String.valueOf(value))); -// } - - } else { -// if (!itemCode.endsWith("heartbeat") && !itemCode.endsWith("time")) { -// log.warn("{} 信号 {} 发生变更 {} -> {}", itemDto.getDevice_code(), itemCode, his, value); -// lucene.deviceExecuteLog(new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(), itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), String.valueOf(his), String.valueOf(value))); -// } - } - } - - static { - udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); - canReadOpcValues = new ConcurrentHashMap(); - opcCodeOpcEntityMapping = new ConcurrentHashMap(); + isRun = false; + this.executorService.shutdownNow(); + this.executorService = Executors.newCachedThreadPool(); } } diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcConfig.java b/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcConfig.java index cadf67c..068a8aa 100644 --- a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcConfig.java +++ b/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcConfig.java @@ -4,10 +4,13 @@ public class OpcConfig { public static Boolean auto_start_opc = Boolean.valueOf(true); public static String udw_opc_value_key = "opc_value"; public static Integer synchronized_millisecond = Integer.valueOf(100); - public static Integer synchronized_exception_wait_second = Integer.valueOf(10); + public static Integer synchronized_exception_wait_second = Integer.valueOf(3); public static Integer retry_times = Integer.valueOf(3); public static String sync_issue_type_code = "device_opc_sync"; public static String opc_server_default_group = "group"; public static String resource_code = "opc_sync"; public static String resource_name = "opc同步"; + + //OPC 数据同步是否采用回调机制实现。之前是线程定期全部读,效率低。 + public static Boolean opc_item_read_using_callback = false; } diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcEntity.java b/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcEntity.java deleted file mode 100644 index dc72831..0000000 --- a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcEntity.java +++ /dev/null @@ -1,176 +0,0 @@ -package org.nl.acs.opc; - -import org.nl.acs.udw.UnifiedDataAccessor; -import org.nl.acs.udw.UnifiedDataAccessorFactory; -import org.nl.modules.wql.util.SpringContextHolder; -import org.openscada.opc.lib.da.Group; -import org.openscada.opc.lib.da.Item; -import org.openscada.opc.lib.da.ItemState; - -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -public class OpcEntity { - - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); - private final UnifiedDataAccessor udw; - private Map> items; - private Map> someFailDevices; - private String opcCode; - - public OpcEntity(String opcCode) { - this.udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); - this.items = new ConcurrentHashMap(); - this.someFailDevices = new ConcurrentHashMap(); - this.opcCode = opcCode; - } - - public void reload(List> opcItemDtos) { - Map> itemCodes = new ConcurrentHashMap(); - (opcItemDtos.stream().flatMap(Collection::stream).collect(Collectors.groupingBy(OpcItemDto::getDevice_code))).forEach((deviceCodes, opcItemDtoList) -> { - itemCodes.put(deviceCodes, opcItemDtoList.stream().map(OpcItemDto::getItem_code).collect(Collectors.toList())); - }); - DeviceOpcSynchronizeAutoRun.itemCodeOpcItemDtoMapping.putAll((Map)opcItemDtos.stream().flatMap(Collection::stream).collect(Collectors.toMap(OpcItemDto::getItem_code, (obj) -> { - return obj; - },(k, v) -> k))); - if (this.items.size() == 0) { - itemCodes.values().stream().flatMap(Collection::stream).forEach((key) -> { - this.udw.setValue(key, (Object)null); - }); - this.addItemsIntoGroup(itemCodes); - } else { - if (this.someFailDevices.size() > 0) { - this.reAddDevices(); - } - - } - } - - private void reAddDevices() { - Map> addItems = new ConcurrentHashMap(); - StringBuilder err_message = new StringBuilder(); - this.someFailDevices.forEach((deviceCode, itemCodesList) -> { - itemCodesList.forEach((itemCode) -> { - try { - Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); - ((List)addItems.computeIfAbsent(deviceCode, (key) -> { - return new ArrayList(); - })).add(group.addItem(itemCode)); - } catch (Exception var6) { - err_message.append(itemCode).append(" 添加失败; "); - } - - }); - List deviceItems = (List)addItems.get(deviceCode); - if (deviceItems != null && deviceItems.size() == itemCodesList.size()) { - this.someFailDevices.remove(deviceCode); - } else if (itemCodesList.size() == 0) { - addItems.remove(deviceCode); - } else { - assert deviceItems != null; - - ((List)this.someFailDevices.get(deviceCode)).removeAll(deviceItems); - } - - synchronized(this.opcCode.intern()) { - this.items.putAll(addItems); - } - - if (err_message.length() > 0) { - String errMsg = err_message.toString(); - //this.log.warn("{}:{}", com.wxzd.wcs.opc.OpcConfig.resource_code, errMsg); - } - - }); - } - - private void addItemsIntoGroup(Map> itemCodes) { - try { - Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); - StringBuilder err_message = new StringBuilder(); - Map> items = new ConcurrentHashMap(); - itemCodes.forEach((deviceCode, itemCodesList) -> { - itemCodesList.forEach((itemCode) -> { - try { - ((List)items.computeIfAbsent(deviceCode, (key) -> { - return new ArrayList(); - })).add(group.addItem(itemCode)); - } catch (Exception var7) { - ((List)this.someFailDevices.computeIfAbsent(deviceCode, (key) -> { - return new ArrayList(); - })).add(itemCode); - this.udw.setValue(itemCode, (Object)null); - err_message.append(itemCode).append(" 添加失败; "); - } - - }); - List deviceItems = (List)items.get(deviceCode); - if (deviceItems != null && deviceItems.size() != itemCodesList.size()) { - items.remove(deviceCode); - this.someFailDevices.put(deviceCode, itemCodesList); - } - - }); - synchronized(this.opcCode.intern()) { - this.items = items; - } - - if (err_message.length() > 0) { - String errMsg = err_message.toString(); -// this.log.warn("{}:{}", OpcConfig.resource_code, errMsg); - } - } catch (Exception var8) { - var8.printStackTrace(); - } - - } - - public void cleanUdwCache() { - this.items.values().stream().flatMap(Collection::stream).map(Item::getId).forEach((key) -> { - this.udw.setValue(key, (Object)null); - }); - } - - public Map readAll() throws Exception { - return opcServerService.getServer(this.opcCode).read(true, (Item[])this.items.values().stream().flatMap(Collection::stream).toArray((x$0) -> { - return new Item[x$0]; - })); - } - - public Map readDividually() { - Map result = new HashMap(); - CompletableFuture[] futures = (CompletableFuture[])this.items.entrySet().stream().map((entry) -> { - return CompletableFuture.runAsync(() -> { - try { - Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); - result.putAll(group.read(true, (Item[])((List)entry.getValue()).toArray(new Item[0]))); - } catch (Exception var5) { - String deviceCode = (String)entry.getKey(); - // to do -// this.someFailDevices.put(deviceCode, ((List)entry.getValue()).stream().map(Item::getId).collect(Collectors.toList())); - this.items.remove(deviceCode); - } - - }, DeviceOpcSynchronizeAutoRun.executorService); - }).toArray((x$0) -> { - return new CompletableFuture[x$0]; - }); - CompletableFuture.allOf(futures).join(); - return result; - } - - - public UnifiedDataAccessor getUdw() { - return this.udw; - } - - public Map> getItems() { - return this.items; - } - - public String getOpcCode() { - return this.opcCode; - } -} diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcServerService.java b/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcServerService.java deleted file mode 100644 index 8979a58..0000000 --- a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcServerService.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.nl.acs.opc; - -import org.nl.acs.device_driver.driver.ItemValue; -import org.openscada.opc.lib.da.Group; - -/** - * @author ldjun - * @version 1.0 - * @date 2023年02月01日 11:26 - * @desc desc - */ -public interface OpcServerService { - - void reload(); - - Group getServer(String var1) throws Exception; - - void writeInteger(String var1, ItemValue... var2); - - void clearServer(String var1); - - void cleanGroups(String var1); -} diff --git a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcUtl.java b/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcUtl.java index c38ba5a..26af619 100644 --- a/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcUtl.java +++ b/acs/nladmin-system/src/main/java/org/nl/acs/opc/OpcUtl.java @@ -20,8 +20,6 @@ import java.util.concurrent.Executors; public class OpcUtl { private static int timeout = 1*60*1000; private static String key = "rpc.socketTimeout"; - public static int successNum=0; - public static int errNum=0; static { checkTimeout(); @@ -37,18 +35,7 @@ public class OpcUtl { public static void writeValue(Group group, WriteRequest... requests) throws WDKException { try { - Map e=null; - try{ - e=group.write(requests); - group.write(requests); - }catch (Exception e1){ - try{ - e= group.write(requests); - }catch (Exception e2){ - e= group.write(requests); - } - } - + Map e=group.write(requests); boolean is_success = true; StringBuilder message = new StringBuilder(); Iterator arg4 = e.keySet().iterator(); @@ -69,16 +56,12 @@ public class OpcUtl { } if (!is_success) { - // throw new BusinessException(message.toString()); - System.out.println("下发信号失败:"+message.toString()); - System.out.println("下发信号失败原因:"+message.toString()); log.info("下发信号失败:"+message.toString()); throw new WDKException(message.toString()); } } catch (JIException arg7) { - log.info("下发信号失败:"+arg7.getMessage()); - System.out.println("下发信号失败原因:"+arg7.getMessage()); - throw new WDKException(arg7); + log.info("下发信号失败Exception:"+arg7.getMessage()); + throw new WDKException("下发信号失败Exception:"+arg7); } } @@ -94,7 +77,6 @@ public class OpcUtl { Item item = group.addItem(value.getItem_code()); ws.add(new WriteRequest(item, getVariant(value.getItem_value()))); } - writeValue(group, (WriteRequest[])ws.toArray(new WriteRequest[0])); } @@ -222,15 +204,10 @@ public class OpcUtl { server = new Server(getConnection(host, clsid, user, password, domain), Executors.newSingleThreadScheduledExecutor()); server.connect(); - successNum++; return server; } catch (Exception e) { - errNum++; + System.out.println("server error:"+e.getMessage()); throw new WDKException(e.getMessage()); - }finally{ - System.out.println("successNum:"+successNum); - System.out.println("errNum:"+errNum); -// System.out.println(11); } } @@ -292,18 +269,6 @@ public class OpcUtl { return value; } - public static String read(String item) throws Exception { - System.out.println(item); - Server server = getServer("192.168.81.251", "7bc0cc8e-482c-47ca-abdc-0fe7f9c6e729", "administrator", "Huawei@123", ""); -// String byteItemString = "RD1.RD1.1001.mode"; - Group group = server.addGroup(); - Item byteItem = group.addItem(item); - ItemState itemState = null; - JIVariant value = null; - itemState = byteItem.read(true); - value = itemState.getValue(); - String data = OpcUtl.getValue(byteItem, itemState) + ""; - return data; - } + }