diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/initial/ApplicationAutoInitialExecuter.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/initial/ApplicationAutoInitialExecuter.java index 8c885400e..e2ac6b0a5 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/initial/ApplicationAutoInitialExecuter.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/initial/ApplicationAutoInitialExecuter.java @@ -2,6 +2,7 @@ package org.nl.acs.auto.initial; import cn.hutool.core.util.ObjectUtil; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.annotation.AnnotationAwareOrderComparator; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -33,6 +34,7 @@ public class ApplicationAutoInitialExecuter { } List services = ObjectUtil.isNotEmpty(applicationAutoInitial) ? this.applicationAutoInitial : new ArrayList(); + AnnotationAwareOrderComparator.sort(services); Iterator it = services.iterator(); while (it.hasNext()) { diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/run/AutoRunServiceImpl.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/run/AutoRunServiceImpl.java index 494f4827f..74764246b 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/run/AutoRunServiceImpl.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/auto/run/AutoRunServiceImpl.java @@ -11,6 +11,7 @@ import org.nl.common.exception.BadRequestException; import org.nl.config.language.LangProcess; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; import java.util.*; diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/device_driver/driver/AbstractOpcDeviceDriver.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/device_driver/driver/AbstractOpcDeviceDriver.java index f97f2f544..b69a61236 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/device_driver/driver/AbstractOpcDeviceDriver.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/device_driver/driver/AbstractOpcDeviceDriver.java @@ -37,13 +37,8 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc public void checkcontrol(Map itemValues) throws Exception { - Group group; - try { - group = opcServerService.getServer(this.getOpcServer()); - } catch (Exception e) { - e.printStackTrace(); - group = opcServerService.getServer(this.getOpcServer()); - } + String opcServer = this.getOpcServer(); + Group group = opcServerService.getServer(opcServer); Map write = new HashMap(); Map readitems = new LinkedHashMap(); List itemsString = new ArrayList(); @@ -60,24 +55,24 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc } int i = 0; while (true) { - //下发信号 + boolean check = true; + // 下发信号 try { if (i == 0) { control(itemValues); } else { controlByNewConn(itemValues); } - } catch (Exception e) { - e.printStackTrace(); + log.warn("checkcontrol 下发失败 opcServer={} 第{}次", opcServer, i + 1, e); + check = false; } // ThreadUtl.sleep(1000L); Map read = new HashMap(); Map itemStatus = null; - boolean check = true; try { if (i > 0) { - group = opcServerService.getServer(this.getOpcServer()); + group = opcServerService.getServer(opcServer); itemsString = new ArrayList<>(itemValues.keySet()); Iterator nis = itemsString.iterator(); @@ -206,23 +201,8 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc this.last_items = this_items; this.sendTime = date; - /* this.execute_log.setResource(this.getDevice().getCode(), this.getDevice().getName()); - this.execute_log.log("原始记录{}->变更为{}", new Object[]{sb, this_items}); - OpcServerService opcServerService = OpcServerFactory.getOpcServerService();*/ - - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerServiceImpl.class); - + // 确认不回写 UDW,由读循环更新 opcServerService.writeInteger(this.getOpcServer(), itemValues); - UnifiedDataAccessor opcValueAccessor = this.getOpcValueAccessor(); - ItemValue[] var17 = itemValues; - int var18 = itemValues.length; - - for (int var19 = 0; var19 < var18; ++var19) { - //ItemValue itemValue = var17[var19]; - //String code = itemValue.getItem_code(); - //Object value = itemValue.getItem_value(); - //opcValueAccessor.setValue(code, value); - } } return true; @@ -264,21 +244,8 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc this.last_items = this_items; this.sendTime = date; - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerServiceImpl.class); - + // 确认不回写 UDW,由读循环更新 opcServerService.writeIntegerByNewConn(this.getOpcServer(), itemValues); - - - UnifiedDataAccessor opcValueAccessor = this.getOpcValueAccessor(); - ItemValue[] var17 = itemValues; - int var18 = itemValues.length; - - for (int var19 = 0; var19 < var18; ++var19) { - //ItemValue itemValue = var17[var19]; - //String code = itemValue.getItem_code(); - //Object value = itemValue.getItem_value(); - //opcValueAccessor.setValue(code, value); - } } return true; diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceAppServiceImpl.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceAppServiceImpl.java index 55fd5088c..b4ca85468 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceAppServiceImpl.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceAppServiceImpl.java @@ -13,6 +13,7 @@ import org.nl.acs.device_driver.DeviceDriverDefination; import org.nl.acs.device_driver.LinewayDeviceDriver; import org.nl.acs.device_driver.driver.OpcDeviceDriver; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; import java.util.*; @@ -22,6 +23,7 @@ import java.util.*; */ @Slf4j @Service +@Order(1) public class DeviceAppServiceImpl implements DeviceAppService, ApplicationAutoInitial { /** * 所有设备链表 diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java index ace2d5198..ac69662f9 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcProtocolRunable.java @@ -1,12 +1,10 @@ package org.nl.acs.opc; -import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.nl.acs.AcsConfig; -import org.nl.acs.instruction.service.InstructionService; import org.nl.acs.opc.service.dto.OpcServerManageDto; import org.nl.acs.udw.UnifiedDataAccessor; import org.nl.acs.udw.UnifiedDataAccessorFactory; @@ -16,9 +14,13 @@ import org.nl.config.SpringContextHolder; import org.nl.config.lucene.service.LuceneExecuteLogService; import org.nl.config.lucene.service.dto.LuceneLogDto; import org.nl.system.service.param.ISysParamService; +import org.openscada.opc.dcom.common.Result; +import org.openscada.opc.dcom.da.OPCITEMRESULT; import org.openscada.opc.lib.da.*; +import java.nio.channels.ClosedChannelException; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Pattern; @@ -92,227 +94,269 @@ public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerC private void runOld() { - OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService .class); + if (this.OpcServer == null) { + log.warn("[OPC链路] OpcServer 为空,任务退出 thread={}", Thread.currentThread().getName()); + return; + } + String opcCode = this.OpcServer.getOpc_code(); + if (StrUtil.isEmpty(opcCode)) { + log.warn("[OPC链路] opcCode 为空,任务退出"); + return; + } + OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); + String tag = Thread.currentThread().getName() + "," + getOpcGroupID(); + log.info("[OPC链路] 任务启动 opcCode={} tag={} items={}", opcCode, tag, this.protocols != null ? this.protocols.size() : 0); + while (true) { start: try { + // 1. 清组 if (this.group != null) { - group.clear(); - group.remove(); - log.trace("清理group..."); - } - if (this.server != null) { - server.disconnect(); - log.trace("清理server..."); - } -// this.server = OpcServerUtl.getServerWithOutException(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain()); - this.server = OpcUtl.getServer(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain()); - this.server.addStateListener(this); - group = this.server.addGroup(); - List itemsString = new ArrayList(); - Iterator var3 = this.protocols.iterator(); - - while (var3.hasNext()) { - OpcItemDto protocol = (OpcItemDto) var3.next(); - String item = protocol.getItem_code(); - itemsString.add(item); - } - - Map itemsMap = new LinkedHashMap(); - boolean is_error = false; - StringBuilder err_message = new StringBuilder(); - Iterator var6 = itemsString.iterator(); - - while (var6.hasNext()) { - String string = (String) var6.next(); - try { - Item item = group.addItem(string); - itemsMap.put(string, item); - log.trace("添加成功 {}", string); - } catch (Exception var26) { - err_message.append(string + ":" + var26.getMessage()); - if (!is_error) { - is_error = true; - } + this.group.clear(); + this.group.remove(); + } catch (Exception e) { + log.debug("[OPC链路] 清理group异常 opcCode={} {}", opcCode, e.getMessage()); + } + this.group = null; + log.info("[OPC链路] 已清理本组 opcCode={} tag={}", opcCode, tag); + } + + // 2. 从连接池获取连接+建组 + OpcUtl.withOpcCodeLock(opcCode, () -> { + try { + this.server = opcServerService.getServerInstanceByOpcCode(opcCode); + this.server.addStateListener(this); + this.group = this.server.addGroup(); + log.info("[OPC链路] 取连接并建组成功 opcCode={} tag={}", opcCode, tag); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + if (this.server == null || this.group == null) { + log.warn("[OPC链路] 取连接或建组失败 opcCode={} tag={}", opcCode, tag); + break start; + } + + // 3. 添加Item 先校验item再批量添加item + List requiredItemIds = new ArrayList<>(); + Set seen = new LinkedHashSet<>(); + for (OpcItemDto protocol : this.protocols) { + if (protocol != null && StrUtil.isNotBlank(protocol.getItem_code()) && seen.add(protocol.getItem_code())) { + requiredItemIds.add(protocol.getItem_code()); + } + } + Map itemsMap = new LinkedHashMap<>(); + boolean is_error = false; + StringBuilder err_message = new StringBuilder(); + if (!requiredItemIds.isEmpty()) { + try { + Map> validationResults = + group.validateItems(requiredItemIds.toArray(new String[0])); + Set validItemIds = new HashSet<>(); + for (Map.Entry> entry : validationResults.entrySet()) { + if (entry.getValue().getErrorCode() == 0) { + validItemIds.add(entry.getKey()); + } else { + log.warn("[OPC链路] Item验证失败 opcCode={} item={} 错误码={}", opcCode, entry.getKey(), entry.getValue().getErrorCode()); + err_message.append(entry.getKey()).append(":").append(entry.getValue().getErrorCode()).append("; "); + if (!is_error) is_error = true; + } + } + if (!validItemIds.isEmpty()) { + Map added = group.addItems(validItemIds.toArray(new String[0])); + itemsMap.putAll(added); + } + } catch (AddFailedException e) { + itemsMap.putAll(e.getItems()); + e.getErrors().forEach((id, code) -> err_message.append(id).append(":").append(code).append("; ")); + if (!is_error) is_error = true; + } catch (Exception e) { + err_message.append(e.getMessage()); + if (!is_error) is_error = true; } } - - String tag; if (is_error) { - tag = err_message.toString(); - log.warn("{}:{}", OpcConfig.resource_code, tag); + log.warn("[OPC链路] 部分Item校验/添加失败 opcCode={} tag={} err={}", opcCode, tag, err_message); + } + log.info("[OPC链路] 加Item完成 opcCode={} tag={} total={} ok={}", opcCode, tag, requiredItemIds.size(), itemsMap.size()); + if (itemsMap.isEmpty()) { + log.warn("[OPC链路] 本组无可用Item,跳过读取 opcCode={} tag={}", opcCode, tag); + ThreadUtl.sleep(1000); + break start; } if (!OpcStartTag.is_run) { OpcStartTag.is_run = true; } - tag = ""; - if (log.isWarnEnabled()) { - tag = Thread.currentThread().getName(); - if (this.OpcServer != null) { - tag = tag + "," + this.getOpcGroupID(); - } - } - UnifiedDataAccessor accessor_value = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); boolean time_out = false; + int readTimeoutMs = OpcConfig.opc_read_timeout_ms != null ? OpcConfig.opc_read_timeout_ms : 2000; + int retryReadDelayMs = OpcConfig.opc_retry_read_delay_ms != null ? OpcConfig.opc_retry_read_delay_ms : 150; + int staggerMaxMs = OpcConfig.opc_stagger_read_max_ms != null ? Math.max(1, OpcConfig.opc_stagger_read_max_ms) : 100; + long lastReadMonitorLog = 0L; + final long READ_MONITOR_INTERVAL_MS = 30_000L; while (DeviceOpcSynchronizeAutoRun.isRun) { + if (this.server == null || this.group == null) { + log.warn("[OPC链路] 连接或组已空,退出读循环 opcCode={} tag={}", opcCode, tag); + break; + } + // 随机延迟,防止同时读group + ThreadUtl.sleep(ThreadLocalRandom.current().nextInt(0, staggerMaxMs)); long begin = System.currentTimeMillis(); - if (log.isTraceEnabled()) { - log.trace("{} 开始记时{}", tag, DateUtil.now()); - } - - Map itemStatus = group.read(true, (Item[]) itemsMap.values().toArray(new Item[0])); - long end = System.currentTimeMillis(); - long duration = end - begin; - if (log.isTraceEnabled()) { - log.trace("{} 读取耗时:{}", tag, duration); - } - - if (duration > 1000L) { - if (!time_out) { - log.warn("{} 读取超时 : {}", tag, duration); - } + Map itemStatus = group.read(true, itemsMap.values().toArray(new Item[0])); + long duration = System.currentTimeMillis() - begin; + log.debug("[OPC链路] 读完成 opcCode={} tag={} durationMs={} items={}", opcCode, tag, duration, itemStatus != null ? itemStatus.size() : 0); + if (duration > readTimeoutMs) { + log.warn("[OPC链路] 读取超时 opcCode={} tag={} durationMs={}", opcCode, tag, duration); time_out = true; + ThreadUtl.sleep(retryReadDelayMs); + itemStatus = group.read(true, itemsMap.values().toArray(new Item[0])); + duration = System.currentTimeMillis() - begin; } else { time_out = false; } + int goodCount = 0; + int badCount = 0; boolean valueAllNotNull = false; - Set items = itemStatus.keySet(); - Iterator var18 = items.iterator(); - - while (var18.hasNext()) { - Item item = (Item) var18.next(); - ItemState itemState = (ItemState) itemStatus.get(item); + for (Map.Entry entry : (itemStatus != null ? itemStatus.entrySet() : Collections.>emptySet())) { + Item item = entry.getKey(); + ItemState itemState = entry.getValue(); Object value = OpcUtl.getValue(item, itemState); if (value != null) { valueAllNotNull = true; + goodCount++; } else { - log.info("item:{},velue为空,value:{}", item.getId(), value); + badCount++; + log.debug("[OPC链路] item值为空 opcCode={} itemId={} quality={}", opcCode, item.getId(), itemState != null ? itemState.getQuality() : null); } - String itemId = item.getId(); Object his = accessor_value.getValue(itemId); - if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && his != null) { - log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality()); + if (itemState != null && !ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && his != null) { + log.warn("[OPC链路] 值不健康 opcCode={} itemId={} quality={}", opcCode, itemId, itemState.getQuality()); valueAllNotNull = false; } - if (!UnifiedDataAppService.isEquals(value, his)) { OpcItemDto itemDto = this.getItem(itemId); - if (true) { - this.logItemChanged(itemId, accessor_value, value, itemDto); - } - if(!ObjectUtil.isEmpty(value) || "".equals(value)){ + this.logItemChanged(itemId, accessor_value, value, itemDto); + if (!ObjectUtil.isEmpty(value) || "".equals(value)) { accessor_value.setValue(itemId, value); } - if(ObjectUtil.isEmpty(value) && !"".equals(value)){ + if (ObjectUtil.isEmpty(value) && !"".equals(value)) { accessor_value.removeValue(itemId); } } } - end = System.currentTimeMillis(); - if (log.isTraceEnabled()) { - log.trace("{}", itemsString); - log.trace("{} 计算完成耗时{}", tag, end - begin); + int total = goodCount + badCount; + int unhealthyRatioThreshold = OpcConfig.opc_unhealthy_ratio_threshold_percent != null ? OpcConfig.opc_unhealthy_ratio_threshold_percent : 10; + if (total > 0) { + long now = System.currentTimeMillis(); + if (now - lastReadMonitorLog >= READ_MONITOR_INTERVAL_MS) { + lastReadMonitorLog = now; + log.info("[OPC链路] 读监控 opcCode={} tag={} good={} bad={} total={} durationMs={} timeout={}", + opcCode, tag, goodCount, badCount, total, duration, time_out); + } else { + log.debug("[OPC链路] 本组健康统计 opcCode={} tag={} good={} bad={} total={}", opcCode, tag, goodCount, badCount, total); + } } - ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond); + ThreadUtl.sleep(OpcConfig.opc_loop_interval_ms != null ? OpcConfig.opc_loop_interval_ms.longValue() : 500L); if (this.error_num != 0) { this.error_num = 0; this.message = null; } if (!valueAllNotNull) { + ++this.all_null; + int unhealthyRatio = total > 0 ? (badCount * 100 / total) : 100; if (this.all_null < 3) { - if (log.isWarnEnabled()) { - log.warn("OPC数据源: {} 所有内容都为空,检查网络, all_null:{} ,暂定{}s", tag, all_null,3); - } - while (var18.hasNext()) { - Item item = (Item) var18.next(); - String itemId = item.getId(); - OpcItemDto itemDto = this.getItem(itemId); - LuceneLogDto logDto = LuceneLogDto.builder() - .device_code("OPC数据源") - .content("OPC数据源:" + tag + "内容为:" + itemDto) - .build(); - logDto.setLog_level(4); - luceneExecuteLogService.deviceExecuteLog(logDto); - } - ThreadUtl.sleep( 3000); + log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} unhealthyRatio={}% 阈值={}% 暂定3s", opcCode, tag, all_null, unhealthyRatio, unhealthyRatioThreshold); + ThreadUtl.sleep(3000); break start; } else if (this.all_null < 6) { - if (log.isWarnEnabled()) { - log.warn(tag + "重新创建server"); - log.warn("{} 所有内容都为空, all_null:{} ,暂定{}s", tag, all_null,3); - } + log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} 暂定3s", opcCode, tag, all_null); ThreadUtl.sleep(3000); break start; } else if (this.all_null < 12) { - if (log.isWarnEnabled()) { - log.warn(tag + "重新创建server"); - log.warn("{} 所有内容都为空, all_null:{} ,暂定{}s", tag, all_null,3); - } + log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} 暂定3s", opcCode, tag, all_null); ThreadUtl.sleep(3000); break start; } else { - if (log.isWarnEnabled()) { - log.warn("{} 所有内容都为空, all_null:{} ,暂定{}ms", tag, all_null, 5000); - } + log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} 暂定5s", opcCode, tag, all_null); LuceneLogDto logDto = LuceneLogDto.builder() .device_code("OPC数据源") .content("OPC数据源:" + tag + "内容为:" + all_null) .build(); logDto.setLog_level(4); luceneExecuteLogService.deviceExecuteLog(logDto); - ThreadUtl.sleep((long) (5000)); + ThreadUtl.sleep(5000); break start; } - -// ++this.all_null; } else { this.all_null = 0; } -// break start; - } - log.warn("opc线程停止。。。"); + log.info("[OPC链路] 任务正常退出 opcCode={} tag={}", opcCode, tag); return; } catch (Exception var27) { - if (this.server != null) { + log.warn("[OPC链路] 任务异常 opcCode={} tag={}", opcCode, tag, var27); + + if (this.group != null) { try { - this.server.disconnect(); - } catch (Exception var25) { + this.group.clear(); + this.group.remove(); + } catch (Exception e) { + log.debug("[OPC链路] 异常时清理group失败 {}", e.getMessage()); } + this.group = null; + } + this.server = null; + + if (isConnectionLevelException(var27)) { + log.warn("[OPC链路] 连接级异常,释放连接 opcCode={}", opcCode); + opcServerService.invalidateByOpcCode(opcCode); + ThreadUtl.sleep(200); } - this.server = null; if (!DeviceOpcSynchronizeAutoRun.isRun) { - log.warn("opc线程停止2。。。"); + log.warn("[OPC链路] 同步器已停止,任务退出 opcCode={} tag={}", opcCode, tag); return; } String error_message = "设备信息同步异常"; if (!StrUtil.equals(this.message, error_message)) { - log.warn(error_message, var27); + log.warn("[OPC链路] {}", error_message, var27); } - ThreadUtl.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000)); ++this.error_num; - if (this.error_num > 3 && !StrUtil.equals(this.message, error_message)) { + if (this.error_num > 3) { this.message = error_message; } } } } + private static boolean isConnectionLevelException(Throwable e) { + if (e == null) return false; + Throwable t = e; + while (t != null) { + if (t instanceof ClosedChannelException) return true; + if (t instanceof java.net.SocketTimeoutException) return true; + if (t instanceof java.net.SocketException) return true; + String msg = t.getMessage(); + if (msg != null && (msg.contains("0x8001FFFF") || msg.contains("ClosedChannel") || msg.contains("connection reset"))) return true; + t = t.getCause(); + } + return false; + } + private void runNew() { @@ -385,10 +429,20 @@ public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerC @Override public void connectionStateChanged(boolean connected) { if (!connected) { + if (this.group != null) { + try { + this.group.clear(); + this.group.remove(); + } catch (Exception e) { + log.debug("[OPC链路] 断开时清理group异常 {}", e.getMessage()); + } + this.group = null; + } this.server = null; + log.warn("[OPC链路] 连接断开,已清组 opcCode={} tag={}", this.OpcServer != null ? this.OpcServer.getOpc_code() : null, getOpcGroupID()); + } else { + log.info("[OPC链路] 连接已连接 opcCode={} tag={}", this.OpcServer != null ? this.OpcServer.getOpc_code() : null, getOpcGroupID()); } - - log.warn("opc server {} {}", this.getOpcGroupID(), connected ? "connected" : "disconnected"); } private String getOpcGroupID() { diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java index 69ee0c4cc..7ae004bc9 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java @@ -1,21 +1,19 @@ package org.nl.acs.opc; import cn.hutool.core.util.ObjectUtil; +import lombok.extern.slf4j.Slf4j; import org.dromara.dynamictp.core.support.ThreadPoolBuilder; import org.nl.acs.auto.run.AbstractAutoRunnable; import org.nl.acs.opc.service.dto.OpcServerManageDto; -import org.nl.config.thread.TheadFactoryName; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import javax.annotation.Resource; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE; @@ -25,8 +23,11 @@ import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_B * @author 20220102CG\noblelift */ @Component +@Slf4j +@Order(100) public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { + public static boolean isRun = false; ExecutorService executorService = ThreadPoolBuilder.newBuilder() .threadPoolName("deviceOpc_thread") @@ -67,11 +68,17 @@ public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { } while (ObjectUtil.isEmpty(pros)); Set keys = pros.keySet(); Iterator var4 = keys.iterator(); - //代码执行一次 + int submitted = 0; + int skipped = 0; while (var4.hasNext()) { String key = (String) var4.next(); List> list = (List) pros.get(key); OpcServerManageDto opcServer = (OpcServerManageDto) servers.get(key); + if (opcServer == null) { + log.warn("[OPC链路] opcCode 无配置,跳过提交 key={}", key); + skipped++; + continue; + } Iterator var8 = list.iterator(); while (var8.hasNext()) { List groupProtols = (List) var8.next(); @@ -79,15 +86,29 @@ public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { runable.setProtocols(groupProtols); runable.setOpcServer(opcServer); this.executorService.submit(runable); + submitted++; } } + log.info("[OPC链路] 任务提交完成 已提交={} 跳过={} opcCodes={}", submitted, skipped, keys.size()); // 同步无光电设备信号 //Map>> pros1 = this.deviceAppService.findAllFormatProtocolFromDriver(); //List opcDrivers = this.deviceAppService.findDeviceDriver(DeviceDriver.class); + long lastMonitorLog = 0; + final long MONITOR_INTERVAL_MS = 60_000; while (true) { Thread.sleep(3000L); + long now = System.currentTimeMillis(); + if (now - lastMonitorLog >= MONITOR_INTERVAL_MS) { + lastMonitorLog = now; + try { + Map stats = OpcUtl.getPoolStatsByOpcCode(); + log.info("[OPC链路] 连接池监控 池大小={} 明细={}", stats.size(), stats); + } catch (Exception e) { + log.debug("[OPC链路] 连接池监控异常 {}", e.getMessage()); + } + } } } } diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcConfig.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcConfig.java index 44eccaabd..21b027467 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcConfig.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcConfig.java @@ -7,6 +7,16 @@ public class OpcConfig { public static Boolean auto_start_opc = Boolean.valueOf(true); public static String udw_opc_value_key = "opc_value"; public static Integer synchronized_millisecond = Integer.valueOf(100); + /** 读循环间隔(毫秒),runOld 用 */ + public static Integer opc_loop_interval_ms = Integer.valueOf(500); + /** 读超时阈值(毫秒),超过打超时日志并重试读一次 */ + public static Integer opc_read_timeout_ms = Integer.valueOf(2000); + /** 读超时后重试前的延迟(毫秒) */ + public static Integer opc_retry_read_delay_ms = Integer.valueOf(150); + /** 读前错峰随机延迟上限(毫秒),0~该值随机 */ + public static Integer opc_stagger_read_max_ms = Integer.valueOf(100); + /** 不健康比例阈值(%),用于日志与告警判断 */ + public static Integer opc_unhealthy_ratio_threshold_percent = Integer.valueOf(10); public static Integer synchronized_exception_wait_second = Integer.valueOf(10); public static Integer retry_times = Integer.valueOf(3); public static String sync_issue_type_code = "device_opc_sync"; diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java index 862068013..0ba2da77e 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java @@ -2,6 +2,7 @@ package org.nl.acs.opc; import org.nl.acs.device_driver.driver.ItemValue; import org.openscada.opc.lib.da.Group; +import org.openscada.opc.lib.da.Server; /** * @author ldjun @@ -16,6 +17,16 @@ public interface OpcServerService { */ void reload(); + /** + * 按 opcCode 获取连接池中的 Server 实例(单例连接,仅根据 opcCode 取) + */ + Server getServerInstanceByOpcCode(String opcCode); + + /** + * 按 opcCode 释放连接(从池移除并 disconnect) + */ + void invalidateByOpcCode(String opcCode); + /** * 获取服务器 * @param var1 diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java index 87059a076..2fccc120d 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java @@ -6,6 +6,7 @@ import org.nl.acs.auto.initial.ApplicationAutoInitial; import org.nl.acs.device_driver.driver.ItemValue; import org.nl.acs.opc.service.dto.OpcServerManageDto; import org.openscada.opc.lib.common.NotConnectedException; +import org.nl.common.exception.BadRequestException; import org.openscada.opc.lib.da.Group; import org.openscada.opc.lib.da.Server; import org.openscada.opc.lib.da.UnknownGroupException; @@ -73,6 +74,38 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn this.opcServerManageDtos = Collections.synchronizedMap(this.opcServerManageDtos); } + @Override + public Server getServerInstanceByOpcCode(String opcCode) { + if (StrUtil.isEmpty(opcCode)) { + throw new IllegalArgumentException("opcCode 不能为空"); + } + OpcServerManageDto dto = this.opcServerManageDtos.get(opcCode); + if (dto == null) { + log.warn("[OPC链路] opcCode 无配置 opcCode={}", opcCode); + throw new RuntimeException("opcCode 不存在: " + opcCode); + } + try { + return OpcUtl.getOrCreateServerByOpcCode( + opcCode, + dto.getOpc_host(), + StrUtil.trim(dto.getCls_id()), + dto.getUser(), + dto.getPassword(), + StrUtil.trim(dto.getDomain()) + ); + } catch (BadRequestException e) { + throw new RuntimeException("[OPC链路] 获取连接失败 opcCode=" + opcCode, e); + } + } + + @Override + public void invalidateByOpcCode(String opcCode) { + OpcUtl.invalidateByOpcCode(opcCode); + this.servers.remove(opcCode); + this.groups.remove(opcCode); + log.info("[OPC链路] 已释放连接 opcCode={}", opcCode); + } + @Override public Group getServer(String code) { synchronized (this.buildLock(code)) { @@ -117,14 +150,12 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn throw new RuntimeException(code + "不存在"); } -// if (server!=null){ -// server.disconnect(); -// server=null; -// } - if (server == null) { -// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); - server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); + try { + server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); + } catch (BadRequestException e) { + throw new RuntimeException("[OPC链路] 获取连接失败 code=" + code, e); + } } try { @@ -133,9 +164,8 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn this.clearServer(code); ThreadUtl.sleep(1500L); log.warn("获取opc出错重新获取", code, var12); -// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); - server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); try { + server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); group = server.addGroup(groupName); } catch (Exception var11) { throw new RuntimeException(var12); @@ -156,21 +186,19 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn @Override public synchronized Group getServerByNewConn(String code) { synchronized (this.buildLock(code)) { - - Server server = (Server) this.servers.get(code); - if (server != null) { - this.clearServer(code); - } + this.clearServer(code); OpcServerManageDto dto = (OpcServerManageDto) this.opcServerManageDtos.get(code); if (dto == null) { throw new RuntimeException(code + "不存在"); } -// if (server == null) { -// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); - server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); -// } + Server server; + try { + server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); + } catch (BadRequestException e) { + throw new RuntimeException("[OPC链路] 获取连接失败 code=" + code, e); + } String groupName = code; Group group = null; @@ -180,9 +208,8 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn this.clearServer(code); ThreadUtl.sleep(2000L); log.warn("获取opc出错重新获取", code, var12); -// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); - server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); try { + server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain())); group = server.addGroup(groupName); } catch (Exception var11) { throw new RuntimeException(var12); @@ -197,37 +224,24 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn @Override public synchronized void clearServer(String code) { - try { - Server server = (Server) this.servers.get(code); - if (server != null) { - server.disconnect(); - } - - } catch (Exception e) { - e.printStackTrace(); - log.error("清理server异常,", e.getMessage()); - } - + OpcUtl.invalidateByOpcCode(code); this.servers.remove(code); this.groups.remove(code); } + @Override public void cleanGroups(String opcCode) { - Group group = (Group)this.groups.get(opcCode); + Group group = (Group) this.groups.get(opcCode); if (group != null) { - Server server = group.getServer(); - try { group.remove(); } catch (JIException var5) { - var5.printStackTrace(); + log.warn("cleanGroups remove group 异常 opcCode={}", opcCode, var5); } - + OpcUtl.invalidateByOpcCode(opcCode); this.groups.remove(opcCode); - server.disconnect(); this.servers.remove(opcCode); } - } @Override diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcUtl.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcUtl.java index 7cc31686d..9073f3dd7 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcUtl.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcUtl.java @@ -18,9 +18,12 @@ import org.openscada.opc.lib.da.*; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; /** * @author 20220102CG\noblelift @@ -30,10 +33,96 @@ public class OpcUtl { private static int timeout = 1 * 10 * 1000; private static String key = "rpc.socketTimeout"; + /** + * 按 opcCode 单例连接池:key=opcCode, value=Server + */ + private static final Map SERVER_POOL_BY_OPCCODE = new ConcurrentHashMap<>(); + /** + * 同一 opcCode 串行化 Server 操作(addGroup 等) + */ + private static final Map OPCCODE_LOCKS = new ConcurrentHashMap<>(); + static { checkTimeout(); } + /** + * 按 opcCode 获取或创建单例连接(连接池) + */ + public static Server getOrCreateServerByOpcCode(String opcCode, String host, String clsid, String user, String password, String domain) + throws BadRequestException { + if (opcCode == null || opcCode.isEmpty()) { + throw new BadRequestException("opcCode 不能为空"); + } + ReentrantLock lock = OPCCODE_LOCKS.computeIfAbsent(opcCode, k -> new ReentrantLock()); + lock.lock(); + try { + Server server = SERVER_POOL_BY_OPCCODE.get(opcCode); + if (server != null) { + log.debug("[OPC链路] 复用连接 opcCode={}", opcCode); + return server; + } + log.info("[OPC链路] 创建新连接 opcCode={}, host={}", opcCode, host); + server = getServer(host, clsid, user, password, domain); + SERVER_POOL_BY_OPCCODE.put(opcCode, server); + return server; + } finally { + lock.unlock(); + } + } + + /** + * 按 opcCode 释放连接(从池移除并 disconnect) + */ + public static void invalidateByOpcCode(String opcCode) { + if (opcCode == null || opcCode.isEmpty()) { + return; + } + ReentrantLock lock = OPCCODE_LOCKS.get(opcCode); + if (lock != null) { + lock.lock(); + } + try { + Server server = SERVER_POOL_BY_OPCCODE.remove(opcCode); + if (server != null) { + try { + server.disconnect(); + } catch (Exception e) { + log.warn("[OPC链路] 断开连接异常 opcCode={}", opcCode, e); + } + log.info("[OPC链路] 已释放连接 opcCode={}", opcCode); + } + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + /** + * 按 opcCode 加锁执行(用于同一连接上的 addGroup 等) + */ + public static void withOpcCodeLock(String opcCode, Runnable action) { + ReentrantLock lock = OPCCODE_LOCKS.computeIfAbsent(opcCode, k -> new ReentrantLock()); + lock.lock(); + try { + action.run(); + } finally { + lock.unlock(); + } + } + + /** + * 获取连接池状态(监控用) + */ + public static Map getPoolStatsByOpcCode() { + Map stats = new ConcurrentHashMap<>(); + SERVER_POOL_BY_OPCCODE.forEach((code, server) -> { + stats.put(code, server != null ? "connected" : "null"); + }); + return stats; + } + public static void checkTimeout() { if (Integer.getInteger(key, 0).intValue() != timeout) { System.setProperty(key, String.valueOf(timeout)); @@ -50,7 +139,8 @@ public class OpcUtl { log.info("Group返回下发信号结果:" + String.valueOf(e)); // group.write(requests); } catch (Exception e1) { - throw new BadRequestException("下发信号失败:" + e1.getMessage()); + log.error("下发信号失败:{},{}", requests, e1.getMessage()); + throw new BadRequestException("下发信号失败:" + e1.getMessage()); } boolean is_success = true; @@ -73,37 +163,50 @@ public class OpcUtl { } if (!is_success) { - // throw new BusinessException(message.toString()); System.out.println("下发信号失败:" + message.toString()); - System.out.println("下发信号失败原因:" + message.toString()); log.info("下发信号失败:" + message.toString()); throw new BadRequestException(message.toString()); } + } catch (BadRequestException arg7) { + throw arg7; } catch (Exception arg7) { log.info("下发信号失败:" + arg7.getMessage()); - System.out.println("下发信号失败原因:" + arg7.getMessage()); - throw new BadRequestException(arg7.toString()); + throw new BadRequestException("下发信号失败:" + arg7.getMessage()); } } + /** + * 下发:用 group.addItems 批量加项(库内会跳过已存在项),再写。 + * 同一 Group 多次调用不会重复 add 同一 item。 + */ public static void writeValue(Group group, ItemValue... values) throws BadRequestException { try { - if (values != null && values.length > 0) { - List ws = new ArrayList(); - ItemValue[] var3 = values; - int var4 = values.length; - - for (int var5 = 0; var5 < var4; ++var5) { - ItemValue value = var3[var5]; - Item item = group.addItem(value.getItem_code()); - ws.add(new WriteRequest(item, getVariant(value.getItem_value()))); - } - - writeValue(group, (WriteRequest[]) ws.toArray(new WriteRequest[0])); + if (values == null || values.length == 0) { + return; } - - } catch (AddFailedException | JIException var8) { - throw new BadRequestException(var8.toString()); + Set seen = new LinkedHashSet<>(); + List ids = new ArrayList<>(); + for (ItemValue value : values) { + String code = value.getItem_code(); + if (code != null && seen.add(code)) { + ids.add(code); + } + } + if (ids.isEmpty()) { + return; + } + Map itemMap = group.addItems(ids.toArray(new String[0])); + List ws = new ArrayList<>(); + for (ItemValue value : values) { + Item item = itemMap.get(value.getItem_code()); + if (item == null) { + throw new BadRequestException("Item 未添加: " + value.getItem_code()); + } + ws.add(new WriteRequest(item, getVariant(value.getItem_value()))); + } + writeValue(group, ws.toArray(new WriteRequest[0])); + } catch (AddFailedException | JIException e) { + throw new BadRequestException(e.toString()); } } @@ -223,8 +326,8 @@ public class OpcUtl { throws BadRequestException { checkTimeout(); Server server = null; - if (domain==null){ - domain=""; + if (domain == null) { + domain = ""; } try { @@ -241,8 +344,8 @@ public class OpcUtl { public static ConnectionInformation getConnection(String host, String clsid, String user, String password, String domain) { ConnectionInformation connection = new ConnectionInformation(); - if (domain==null){ - domain=""; + if (domain == null) { + domain = ""; } connection.setHost(host); connection.setClsid(clsid); diff --git a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/utils/ReadUtil.java b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/utils/ReadUtil.java index 45c28bdd2..4d21b97d6 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/utils/ReadUtil.java +++ b/acs2/nladmin-system/nlsso-server/src/main/java/org/nl/acs/utils/ReadUtil.java @@ -39,38 +39,33 @@ public class ReadUtil { static OpcMapper opcMapper = SpringContextHolder.getBean("opcMapper"); + /** + * 按 opc_id 查 acs_opc 得到 opc_code,从连接池取 Server(复用连接)。 + */ public static Server getServer(String opc_id) { - //OPC表【acs_opc】 -// JSONObject opcObj = WQLObject.getWQLObject("acs_opc").query("opc_id = '" + opc_id + "'").uniqueResult(0); - Opc opc = new LambdaQueryChainWrapper<>(opcMapper) .eq(Opc::getOpc_id, opc_id) .one(); - //RD1.RD1.1028 - // 连接信息 - ConnectionInformation ci = new ConnectionInformation(); - ci.setHost(opc.getOpc_host()); - ci.setDomain(StrUtil.isEmpty(opc.getDomain()) ? " " : opc.getDomain()); - ci.setUser(opc.getUser()); - ci.setPassword(opc.getPassword()); - ci.setClsid(opc.getCls_id()); - final Server server = new Server(ci, Executors.newSingleThreadScheduledExecutor()); - try { - server.connect(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } catch (JIException e) { - e.printStackTrace(); - } catch (AlreadyConnectedException e) { - e.printStackTrace(); + if (opc == null) { + throw new BadRequestException("OPC 不存在 opc_id=" + opc_id); } - return server; + String opcCode = opc.getOpc_code(); + OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); + return opcServerService.getServerInstanceByOpcCode(opcCode); } - public static void write(Map strings, Server server) { + /** + * 使用池内 Server 下发,匿名 Group 用后移除(由池管理连接生命周期)。 + */ + public static void write(Map strings, Server server) throws BadRequestException { + Group group; + try { + group = server.addGroup(); + } catch (Exception e) { + log.warn("ReadUtil.write addGroup 失败", e); + throw new BadRequestException("下发失败:" + e.getMessage()); + } try { - //Group group = this.opcServerService.getServer(opcServiceCode); - Group group = server.addGroup(); Iterator it = strings.keySet().iterator(); while (it.hasNext()) { String key = (String) it.next(); @@ -84,11 +79,16 @@ public class ReadUtil { list.add(write1); OpcUtl.writeValue(group, (WriteRequest[]) list.toArray(new WriteRequest[0])); } - server.disconnect(); } catch (Exception e) { - e.printStackTrace(); + log.warn("ReadUtil.write 失败", e); + throw new BadRequestException("下发失败:" + e.getMessage()); + } finally { + try { + group.remove(); + } catch (Exception e) { + log.warn("ReadUtil.write 移除匿名 Group 异常", e); + } } - } public static void writeAndCheck(Map strings, String opcServer) { OpcServerService server =SpringContextHolder.getBean(OpcServerService.class); @@ -135,10 +135,19 @@ public class ReadUtil { } - public static Map read(List itemString, Server server) { - HashMap map = new HashMap(); + /** + * 使用池内 Server 读取,匿名 Group 用后移除(由池管理连接生命周期)。 + */ + public static Map read(List itemString, Server server) throws BadRequestException { + Group group; try { - Group group = server.addGroup(); + group = server.addGroup(); + } catch (Exception e) { + log.warn("ReadUtil.read addGroup 失败", e); + throw new BadRequestException("读取失败:" + e.getMessage()); + } + try { + HashMap map = new HashMap(); Map items = new LinkedHashMap(); Iterator is = itemString.iterator(); @@ -147,7 +156,7 @@ public class ReadUtil { try { items.put(string, group.addItem(string)); } catch (Exception e) { - e.printStackTrace(); + log.warn("addItem 失败 item={}", string, e); } } @@ -160,13 +169,17 @@ public class ReadUtil { Object value = OpcUtl.getValue(key, (ItemState) itemStatus.get(key)); map.put(key.getId(), value); } - server.disconnect(); - + return map; } catch (Exception e) { - e.printStackTrace(); + log.warn("ReadUtil.read 失败", e); + throw new BadRequestException("读取失败:" + e.getMessage()); + } finally { + try { + group.remove(); + } catch (Exception e) { + log.warn("ReadUtil.read 移除匿名 Group 异常", e); + } } - - return map; } public static List> showAllOpcServer(String host, String user, String password, String domain) throws BadRequestException { diff --git a/acs2/nladmin-system/nlsso-server/src/main/resources/logback-spring.xml b/acs2/nladmin-system/nlsso-server/src/main/resources/logback-spring.xml index 427aaa5df..0b4d1a601 100644 --- a/acs2/nladmin-system/nlsso-server/src/main/resources/logback-spring.xml +++ b/acs2/nladmin-system/nlsso-server/src/main/resources/logback-spring.xml @@ -65,14 +65,14 @@ https://juejin.cn/post/6844903775631572999 512 - + - - + + - + + + +