diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java index 61add6a1c..65f5d3f38 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java @@ -1,316 +1,85 @@ package org.nl.acs.opc; -import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSON; -import lombok.extern.slf4j.Slf4j; -import org.apache.lucene.util.NamedThreadFactory; -import org.nl.acs.AcsConfig; +import cn.hutool.core.util.ObjectUtil; import org.nl.acs.auto.run.AbstractAutoRunnable; import org.nl.acs.opc.service.dto.OpcServerManageDto; -import org.nl.acs.udw.UnifiedDataAccessor; -import org.nl.acs.udw.UnifiedDataAccessorFactory; -import org.nl.acs.udw.UnifiedDataAppService; -import org.nl.common.enums.LogTypeEnum; -import org.nl.config.SpringContextHolder; -import org.nl.config.lucene.service.dto.LuceneLogDto; -import org.nl.system.service.param.ISysParamService; -import org.openscada.opc.lib.da.Group; -import org.openscada.opc.lib.da.Item; -import org.openscada.opc.lib.da.ItemState; +import org.nl.config.thread.TheadFactoryName; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.*; -import java.util.regex.Pattern; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * OPC设备同步启动 + * @author 20220102CG\noblelift */ @Component -@Slf4j public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { - static boolean isRun = true; + + public static boolean isRun = false; + ExecutorService executorService = Executors.newCachedThreadPool(); @Autowired private DeviceAppService deviceAppService; @Autowired private OpcServerManageService opcServerManageService; -// @Autowired -// LuceneExecuteLogService lucene; - - static ExecutorService executorService; - public static Map opcServersConfig; - public static Map itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); - - static boolean canRefreshOpcEntity = true; - private long lastRefreshOpcEntityTime; - static UnifiedDataAccessor udw; - private static Map canReadOpcValues; - private static volatile Map opcCodeOpcEntityMapping; - - public DeviceOpcSynchronizeAutoRun() { - this.lastRefreshOpcEntityTime = 0L; - } + @Override public String getCode() { return DeviceOpcSynchronizeAutoRun.class.getSimpleName(); } + @Override public String getName() { return "opc设备同步器"; } - static Group getGroup(String opcCode) throws Exception { - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); - return opcServerService.getServer(opcCode); - } - - static void submitTimeLimitTask(Runnable runnable, String opcCode) { - CompletableFuture future = CompletableFuture.runAsync(runnable, executorService); - -// try { -// future.get(10L, TimeUnit.SECONDS); -// } catch (InterruptedException var9) { -// Thread.currentThread().interrupt(); -// } catch (ExecutionException var10) { -// var10.printStackTrace(); -// } catch (TimeoutException var11) { -// itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { -// udw.setValue(key, (Object) null); -// }); -// canReadOpcValues = new ConcurrentHashMap<>(); -// System.out.println("opc设备同步器 任务执行超时,取消任务..."); -// future.cancel(true); -// } finally { -// canRefreshOpcEntity = true; -// if (opcCode != null) { -// canReadOpcValues.put(opcCode, true); -// } -// -// } - } - - private ExecutorService createThreadPool() { - ThreadPoolExecutor executor = new ThreadPoolExecutor(32, 32, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("opc-sync")); - executor.allowCoreThreadTimeOut(true); - return executor; - } - - public void autoRun() { - OpcStartTag.is_run = true; - opcServersConfig = this.opcServerManageService.queryAllServerMap(); - executorService = this.createThreadPool(); - opcCodeOpcEntityMapping = new ConcurrentHashMap(); - itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { - udw.setValue(key, (Object) null); - }); - canRefreshOpcEntity = true; - canReadOpcValues.clear(); - - while (true) { - this.refreshOpcEntity(); - Iterator var1 = opcServersConfig.keySet().iterator(); - - while (var1.hasNext()) { - String opcCode = (String) var1.next(); - submitTimeLimitTask(() -> { - boolean in = false; - try { - if (canReadOpcValues.computeIfAbsent(opcCode, (key) -> true)) { - in = true; - canReadOpcValues.put(opcCode, false); - this.readOpcValues(opcCode); - } - } catch (Exception var3) { - var3.printStackTrace(); - } finally { - canRefreshOpcEntity = true; - if (opcCode != null && in) { - canReadOpcValues.put(opcCode, true); - } - } - }, opcCode); - } - - ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond); - } - } - - private void readOpcValues(String opcCode) throws Exception { - synchronized (opcCode.intern()) { - OpcEntity opcEntity = (OpcEntity) opcCodeOpcEntityMapping.get(opcCode); - if (opcEntity != null) { - if (opcEntity.getItems().size() != 0) { - long begin = System.currentTimeMillis(); - if (log.isTraceEnabled()) { - log.trace("opc {} 开始计时{}", opcCode, begin); - } - - new HashMap(); - - Map itemStatus; - try { - itemStatus = opcEntity.readAll(); - } catch (Exception var15) { - itemStatus = opcEntity.readDividually(); - } - - long end = System.currentTimeMillis(); - long duration = end - begin; - if (log.isTraceEnabled()) { - log.trace("opc {} 读取耗时:{}", opcCode, duration); - } - - if (duration > 1000L) { - log.warn("opc {} 读取超时 : {}", opcCode, duration); - } - -// boolean allNull = itemStatus.entrySet().stream().map((map) -> { -// return OpcUtl.getValue((Item)map.getKey(), (ItemState)map.getValue()); -// }).allMatch(Objects::isNull); -// if (allNull) { -// opcEntity.getItems().clear(); -// } - - UnifiedDataAccessor udw = opcEntity.getUdw(); - - - Set items = itemStatus.keySet(); - Iterator var18 = items.iterator(); - - while (var18.hasNext()) { - Item item = (Item) var18.next(); - ItemState itemState = (ItemState) itemStatus.get(item); - Object nowValue = OpcUtl.getValue(item, itemState); - String itemId = item.getId(); - Object historyValue = udw.getValue(itemId); - if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && historyValue != null) { - log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality()); - } - if (!UnifiedDataAppService.isEquals(nowValue, historyValue)) { - OpcItemDto itemDto = (OpcItemDto) itemCodeOpcItemDtoMapping.get(itemId); - if (true) { - this.logItemChanged(itemId, udw, nowValue, itemDto); - } - udw.setValue(itemId, nowValue); - } - - } + @Override + public void autoRun() throws Exception { + { + isRun = true; + Map servers = this.opcServerManageService.queryAllServerMap(); + Map>> pros; + do { + Thread.sleep(1000L); + pros = this.deviceAppService.findAllFormatProtocolFromDriver(); + } while (ObjectUtil.isEmpty(pros)); + Set keys = pros.keySet(); + Iterator var4 = keys.iterator(); + //代码执行一次 + while (var4.hasNext()) { + String key = (String) var4.next(); + List> list = (List) pros.get(key); + OpcServerManageDto opcServer = (OpcServerManageDto) servers.get(key); + Iterator var8 = list.iterator(); + while (var8.hasNext()) { + List groupProtols = (List) var8.next(); + DeviceOpcProtocolRunable runable = new DeviceOpcProtocolRunable(); + runable.setProtocols(groupProtols); + runable.setOpcServer(opcServer); + this.executorService.submit(runable); } } - } - } - private void refreshOpcEntity() { - if (canRefreshOpcEntity) { - canRefreshOpcEntity = false; - long now = System.currentTimeMillis(); - if (now - this.lastRefreshOpcEntityTime >= 20000L) { - this.lastRefreshOpcEntityTime = now; - submitTimeLimitTask(() -> { - try { - Map>> protocol = this.deviceAppService.findAllFormatProtocolFromDriver(); - Iterator var2 = protocol.entrySet().iterator(); + // 同步无光电设备信号 + //Map>> pros1 = this.deviceAppService.findAllFormatProtocolFromDriver(); + //List opcDrivers = this.deviceAppService.findDeviceDriver(DeviceDriver.class); - while (var2.hasNext()) { - Entry>> stringListEntry = (Entry) var2.next(); - String opcCode = (String) stringListEntry.getKey(); - List> opcItemDtos = (List) stringListEntry.getValue(); - ((OpcEntity) opcCodeOpcEntityMapping.computeIfAbsent(opcCode, OpcEntity::new)).reload(opcItemDtos); - } - } catch (Exception var6) { - var6.printStackTrace(); - } finally { - canRefreshOpcEntity = true; - } - - }, (String) null); + while (true) { + Thread.sleep(3000L); } } } - private void logMessage(String errorMessage) { - try { -// issueLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]); -// businessLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]); - } catch (Exception var5) { - var5.printStackTrace(); - } - - } - + @Override public void after() { - OpcStartTag.is_run = false; - opcCodeOpcEntityMapping.values().forEach((opcEntity) -> { - opcEntity.cleanUdwCache(); - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); - opcServerService.cleanGroups(opcEntity.getOpcCode()); - }); - opcCodeOpcEntityMapping = new ConcurrentHashMap(); - itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); - executorService.shutdownNow(); - } - - private void logItemChanged(String itemId, UnifiedDataAccessor accessor_value, Object value, OpcItemDto itemDto) { - ISysParamService paramService = SpringContextHolder.getBean(ISysParamService.class); - Object his = accessor_value.getValue(itemId); - List relate_items = itemDto.getRelate_items(); - if (relate_items != null && !relate_items.isEmpty()) { - StringBuilder sb = new StringBuilder(); - Iterator var8 = relate_items.iterator(); - - while (var8.hasNext()) { - String relate = (String) var8.next(); - Object obj = accessor_value.getValue(relate); - sb.append("key:" + relate + "value: " + obj + ";"); - } - if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { - // 存在上次点位值为null情况 则不记录日志 - if(!(his instanceof Float) && !(value instanceof Float)){ - LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), - String.valueOf(his), String.valueOf(value)); - luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); - String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue(); - if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){ - log.info("{}", JSON.toJSONString(luceneLogDto)); - } - } - } - } else { - -// if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { -// if(!(his instanceof Float) && !(value instanceof Float)){ -// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(), itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), -// String.valueOf(his), String.valueOf(value)); -// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); -// log.info("{}", JSON.toJSONString(luceneLogDto)); -// } -// } - - if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { - if(!(his instanceof Float) && !(value instanceof Float)){ - LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), - String.valueOf(his), String.valueOf(value)); - luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); - String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue(); - if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){ - log.info("{}", JSON.toJSONString(luceneLogDto)); - } - } - } - - } - } - - static { - udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); - canReadOpcValues = new ConcurrentHashMap(); - opcCodeOpcEntityMapping = new ConcurrentHashMap(); - } - - public static boolean isNumeric(String str) { - return Pattern.compile("^[0-9]+$").matcher(str).matches(); + isRun = false; + this.executorService.shutdownNow(); + this.executorService = Executors.newCachedThreadPool(); } } diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcEntity.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcEntity.java index b291a5616..f88549001 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcEntity.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcEntity.java @@ -1,3 +1,4 @@ +/* package org.nl.acs.opc; import org.nl.acs.opc.DeviceOpcSynchronizeAutoRun; @@ -178,3 +179,4 @@ public class OpcEntity { return this.opcCode; } } +*/