This commit is contained in:
2026-03-04 20:04:47 +08:00
parent 1f91405a40
commit a791da75c0
12 changed files with 481 additions and 280 deletions

View File

@@ -2,6 +2,7 @@ package org.nl.acs.auto.initial;
import cn.hutool.core.util.ObjectUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@@ -33,6 +34,7 @@ public class ApplicationAutoInitialExecuter {
}
List<ApplicationAutoInitial> services = ObjectUtil.isNotEmpty(applicationAutoInitial) ? this.applicationAutoInitial : new ArrayList();
AnnotationAwareOrderComparator.sort(services);
Iterator it = services.iterator();
while (it.hasNext()) {

View File

@@ -11,6 +11,7 @@ import org.nl.common.exception.BadRequestException;
import org.nl.config.language.LangProcess;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import java.util.*;

View File

@@ -37,13 +37,8 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc
public void checkcontrol(Map<String, Object> itemValues) throws Exception {
Group group;
try {
group = opcServerService.getServer(this.getOpcServer());
} catch (Exception e) {
e.printStackTrace();
group = opcServerService.getServer(this.getOpcServer());
}
String opcServer = this.getOpcServer();
Group group = opcServerService.getServer(opcServer);
Map<String, Object> write = new HashMap();
Map<String, Item> readitems = new LinkedHashMap();
List<String> itemsString = new ArrayList();
@@ -60,24 +55,24 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc
}
int i = 0;
while (true) {
//下发信号
boolean check = true;
// 下发信号
try {
if (i == 0) {
control(itemValues);
} else {
controlByNewConn(itemValues);
}
} catch (Exception e) {
e.printStackTrace();
log.warn("checkcontrol 下发失败 opcServer={} 第{}次", opcServer, i + 1, e);
check = false;
}
// ThreadUtl.sleep(1000L);
Map<String, Object> read = new HashMap();
Map<Item, ItemState> itemStatus = null;
boolean check = true;
try {
if (i > 0) {
group = opcServerService.getServer(this.getOpcServer());
group = opcServerService.getServer(opcServer);
itemsString = new ArrayList<>(itemValues.keySet());
Iterator nis = itemsString.iterator();
@@ -206,23 +201,8 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc
this.last_items = this_items;
this.sendTime = date;
/* this.execute_log.setResource(this.getDevice().getCode(), this.getDevice().getName());
this.execute_log.log("原始记录{}->变更为{}", new Object[]{sb, this_items});
OpcServerService opcServerService = OpcServerFactory.getOpcServerService();*/
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerServiceImpl.class);
// 确认不回写 UDW由读循环更新
opcServerService.writeInteger(this.getOpcServer(), itemValues);
UnifiedDataAccessor opcValueAccessor = this.getOpcValueAccessor();
ItemValue[] var17 = itemValues;
int var18 = itemValues.length;
for (int var19 = 0; var19 < var18; ++var19) {
//ItemValue itemValue = var17[var19];
//String code = itemValue.getItem_code();
//Object value = itemValue.getItem_value();
//opcValueAccessor.setValue(code, value);
}
}
return true;
@@ -264,21 +244,8 @@ public class AbstractOpcDeviceDriver extends AbstractDeviceDriver implements Opc
this.last_items = this_items;
this.sendTime = date;
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerServiceImpl.class);
// 确认不回写 UDW由读循环更新
opcServerService.writeIntegerByNewConn(this.getOpcServer(), itemValues);
UnifiedDataAccessor opcValueAccessor = this.getOpcValueAccessor();
ItemValue[] var17 = itemValues;
int var18 = itemValues.length;
for (int var19 = 0; var19 < var18; ++var19) {
//ItemValue itemValue = var17[var19];
//String code = itemValue.getItem_code();
//Object value = itemValue.getItem_value();
//opcValueAccessor.setValue(code, value);
}
}
return true;

View File

@@ -13,6 +13,7 @@ import org.nl.acs.device_driver.DeviceDriverDefination;
import org.nl.acs.device_driver.LinewayDeviceDriver;
import org.nl.acs.device_driver.driver.OpcDeviceDriver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import java.util.*;
@@ -22,6 +23,7 @@ import java.util.*;
*/
@Slf4j
@Service
@Order(1)
public class DeviceAppServiceImpl implements DeviceAppService, ApplicationAutoInitial {
/**
* 所有设备链表

View File

@@ -1,12 +1,10 @@
package org.nl.acs.opc;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.nl.acs.AcsConfig;
import org.nl.acs.instruction.service.InstructionService;
import org.nl.acs.opc.service.dto.OpcServerManageDto;
import org.nl.acs.udw.UnifiedDataAccessor;
import org.nl.acs.udw.UnifiedDataAccessorFactory;
@@ -16,9 +14,13 @@ import org.nl.config.SpringContextHolder;
import org.nl.config.lucene.service.LuceneExecuteLogService;
import org.nl.config.lucene.service.dto.LuceneLogDto;
import org.nl.system.service.param.ISysParamService;
import org.openscada.opc.dcom.common.Result;
import org.openscada.opc.dcom.da.OPCITEMRESULT;
import org.openscada.opc.lib.da.*;
import java.nio.channels.ClosedChannelException;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
@@ -92,227 +94,269 @@ public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerC
private void runOld() {
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService .class);
if (this.OpcServer == null) {
log.warn("[OPC链路] OpcServer 为空,任务退出 thread={}", Thread.currentThread().getName());
return;
}
String opcCode = this.OpcServer.getOpc_code();
if (StrUtil.isEmpty(opcCode)) {
log.warn("[OPC链路] opcCode 为空,任务退出");
return;
}
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class);
String tag = Thread.currentThread().getName() + "," + getOpcGroupID();
log.info("[OPC链路] 任务启动 opcCode={} tag={} items={}", opcCode, tag, this.protocols != null ? this.protocols.size() : 0);
while (true) {
start:
try {
// 1. 清组
if (this.group != null) {
group.clear();
group.remove();
log.trace("清理group...");
}
if (this.server != null) {
server.disconnect();
log.trace("清理server...");
}
// this.server = OpcServerUtl.getServerWithOutException(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain());
this.server = OpcUtl.getServer(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain());
this.server.addStateListener(this);
group = this.server.addGroup();
List<String> itemsString = new ArrayList();
Iterator var3 = this.protocols.iterator();
while (var3.hasNext()) {
OpcItemDto protocol = (OpcItemDto) var3.next();
String item = protocol.getItem_code();
itemsString.add(item);
}
Map<String, Item> itemsMap = new LinkedHashMap();
boolean is_error = false;
StringBuilder err_message = new StringBuilder();
Iterator var6 = itemsString.iterator();
while (var6.hasNext()) {
String string = (String) var6.next();
try {
Item item = group.addItem(string);
itemsMap.put(string, item);
log.trace("添加成功 {}", string);
} catch (Exception var26) {
err_message.append(string + ":" + var26.getMessage());
if (!is_error) {
is_error = true;
}
this.group.clear();
this.group.remove();
} catch (Exception e) {
log.debug("[OPC链路] 清理group异常 opcCode={} {}", opcCode, e.getMessage());
}
this.group = null;
log.info("[OPC链路] 已清理本组 opcCode={} tag={}", opcCode, tag);
}
// 2. 从连接池获取连接+建组
OpcUtl.withOpcCodeLock(opcCode, () -> {
try {
this.server = opcServerService.getServerInstanceByOpcCode(opcCode);
this.server.addStateListener(this);
this.group = this.server.addGroup();
log.info("[OPC链路] 取连接并建组成功 opcCode={} tag={}", opcCode, tag);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
if (this.server == null || this.group == null) {
log.warn("[OPC链路] 取连接或建组失败 opcCode={} tag={}", opcCode, tag);
break start;
}
// 3. 添加Item 先校验item再批量添加item
List<String> requiredItemIds = new ArrayList<>();
Set<String> seen = new LinkedHashSet<>();
for (OpcItemDto protocol : this.protocols) {
if (protocol != null && StrUtil.isNotBlank(protocol.getItem_code()) && seen.add(protocol.getItem_code())) {
requiredItemIds.add(protocol.getItem_code());
}
}
Map<String, Item> itemsMap = new LinkedHashMap<>();
boolean is_error = false;
StringBuilder err_message = new StringBuilder();
if (!requiredItemIds.isEmpty()) {
try {
Map<String, Result<OPCITEMRESULT>> validationResults =
group.validateItems(requiredItemIds.toArray(new String[0]));
Set<String> validItemIds = new HashSet<>();
for (Map.Entry<String, Result<OPCITEMRESULT>> entry : validationResults.entrySet()) {
if (entry.getValue().getErrorCode() == 0) {
validItemIds.add(entry.getKey());
} else {
log.warn("[OPC链路] Item验证失败 opcCode={} item={} 错误码={}", opcCode, entry.getKey(), entry.getValue().getErrorCode());
err_message.append(entry.getKey()).append(":").append(entry.getValue().getErrorCode()).append("; ");
if (!is_error) is_error = true;
}
}
if (!validItemIds.isEmpty()) {
Map<String, Item> added = group.addItems(validItemIds.toArray(new String[0]));
itemsMap.putAll(added);
}
} catch (AddFailedException e) {
itemsMap.putAll(e.getItems());
e.getErrors().forEach((id, code) -> err_message.append(id).append(":").append(code).append("; "));
if (!is_error) is_error = true;
} catch (Exception e) {
err_message.append(e.getMessage());
if (!is_error) is_error = true;
}
}
String tag;
if (is_error) {
tag = err_message.toString();
log.warn("{}:{}", OpcConfig.resource_code, tag);
log.warn("[OPC链路] 部分Item校验/添加失败 opcCode={} tag={} err={}", opcCode, tag, err_message);
}
log.info("[OPC链路] 加Item完成 opcCode={} tag={} total={} ok={}", opcCode, tag, requiredItemIds.size(), itemsMap.size());
if (itemsMap.isEmpty()) {
log.warn("[OPC链路] 本组无可用Item跳过读取 opcCode={} tag={}", opcCode, tag);
ThreadUtl.sleep(1000);
break start;
}
if (!OpcStartTag.is_run) {
OpcStartTag.is_run = true;
}
tag = "";
if (log.isWarnEnabled()) {
tag = Thread.currentThread().getName();
if (this.OpcServer != null) {
tag = tag + "," + this.getOpcGroupID();
}
}
UnifiedDataAccessor accessor_value = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key);
boolean time_out = false;
int readTimeoutMs = OpcConfig.opc_read_timeout_ms != null ? OpcConfig.opc_read_timeout_ms : 2000;
int retryReadDelayMs = OpcConfig.opc_retry_read_delay_ms != null ? OpcConfig.opc_retry_read_delay_ms : 150;
int staggerMaxMs = OpcConfig.opc_stagger_read_max_ms != null ? Math.max(1, OpcConfig.opc_stagger_read_max_ms) : 100;
long lastReadMonitorLog = 0L;
final long READ_MONITOR_INTERVAL_MS = 30_000L;
while (DeviceOpcSynchronizeAutoRun.isRun) {
if (this.server == null || this.group == null) {
log.warn("[OPC链路] 连接或组已空,退出读循环 opcCode={} tag={}", opcCode, tag);
break;
}
// 随机延迟,防止同时读group
ThreadUtl.sleep(ThreadLocalRandom.current().nextInt(0, staggerMaxMs));
long begin = System.currentTimeMillis();
if (log.isTraceEnabled()) {
log.trace("{} 开始记时{}", tag, DateUtil.now());
}
Map<Item, ItemState> itemStatus = group.read(true, (Item[]) itemsMap.values().toArray(new Item[0]));
long end = System.currentTimeMillis();
long duration = end - begin;
if (log.isTraceEnabled()) {
log.trace("{} 读取耗时:{}", tag, duration);
}
if (duration > 1000L) {
if (!time_out) {
log.warn("{} 读取超时 : {}", tag, duration);
}
Map<Item, ItemState> itemStatus = group.read(true, itemsMap.values().toArray(new Item[0]));
long duration = System.currentTimeMillis() - begin;
log.debug("[OPC链路] 读完成 opcCode={} tag={} durationMs={} items={}", opcCode, tag, duration, itemStatus != null ? itemStatus.size() : 0);
if (duration > readTimeoutMs) {
log.warn("[OPC链路] 读取超时 opcCode={} tag={} durationMs={}", opcCode, tag, duration);
time_out = true;
ThreadUtl.sleep(retryReadDelayMs);
itemStatus = group.read(true, itemsMap.values().toArray(new Item[0]));
duration = System.currentTimeMillis() - begin;
} else {
time_out = false;
}
int goodCount = 0;
int badCount = 0;
boolean valueAllNotNull = false;
Set<Item> items = itemStatus.keySet();
Iterator var18 = items.iterator();
while (var18.hasNext()) {
Item item = (Item) var18.next();
ItemState itemState = (ItemState) itemStatus.get(item);
for (Map.Entry<Item, ItemState> entry : (itemStatus != null ? itemStatus.entrySet() : Collections.<Map.Entry<Item, ItemState>>emptySet())) {
Item item = entry.getKey();
ItemState itemState = entry.getValue();
Object value = OpcUtl.getValue(item, itemState);
if (value != null) {
valueAllNotNull = true;
goodCount++;
} else {
log.info("item:{},velue为空value:{}", item.getId(), value);
badCount++;
log.debug("[OPC链路] item值为空 opcCode={} itemId={} quality={}", opcCode, item.getId(), itemState != null ? itemState.getQuality() : null);
}
String itemId = item.getId();
Object his = accessor_value.getValue(itemId);
if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && his != null) {
log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality());
if (itemState != null && !ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && his != null) {
log.warn("[OPC链路] 值不健康 opcCode={} itemId={} quality={}", opcCode, itemId, itemState.getQuality());
valueAllNotNull = false;
}
if (!UnifiedDataAppService.isEquals(value, his)) {
OpcItemDto itemDto = this.getItem(itemId);
if (true) {
this.logItemChanged(itemId, accessor_value, value, itemDto);
}
if(!ObjectUtil.isEmpty(value) || "".equals(value)){
this.logItemChanged(itemId, accessor_value, value, itemDto);
if (!ObjectUtil.isEmpty(value) || "".equals(value)) {
accessor_value.setValue(itemId, value);
}
if(ObjectUtil.isEmpty(value) && !"".equals(value)){
if (ObjectUtil.isEmpty(value) && !"".equals(value)) {
accessor_value.removeValue(itemId);
}
}
}
end = System.currentTimeMillis();
if (log.isTraceEnabled()) {
log.trace("{}", itemsString);
log.trace("{} 计算完成耗时{}", tag, end - begin);
int total = goodCount + badCount;
int unhealthyRatioThreshold = OpcConfig.opc_unhealthy_ratio_threshold_percent != null ? OpcConfig.opc_unhealthy_ratio_threshold_percent : 10;
if (total > 0) {
long now = System.currentTimeMillis();
if (now - lastReadMonitorLog >= READ_MONITOR_INTERVAL_MS) {
lastReadMonitorLog = now;
log.info("[OPC链路] 读监控 opcCode={} tag={} good={} bad={} total={} durationMs={} timeout={}",
opcCode, tag, goodCount, badCount, total, duration, time_out);
} else {
log.debug("[OPC链路] 本组健康统计 opcCode={} tag={} good={} bad={} total={}", opcCode, tag, goodCount, badCount, total);
}
}
ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond);
ThreadUtl.sleep(OpcConfig.opc_loop_interval_ms != null ? OpcConfig.opc_loop_interval_ms.longValue() : 500L);
if (this.error_num != 0) {
this.error_num = 0;
this.message = null;
}
if (!valueAllNotNull) {
++this.all_null;
int unhealthyRatio = total > 0 ? (badCount * 100 / total) : 100;
if (this.all_null < 3) {
if (log.isWarnEnabled()) {
log.warn("OPC数据源: {} 所有内容都为空,检查网络, all_null:{} ,暂定{}s", tag, all_null,3);
}
while (var18.hasNext()) {
Item item = (Item) var18.next();
String itemId = item.getId();
OpcItemDto itemDto = this.getItem(itemId);
LuceneLogDto logDto = LuceneLogDto.builder()
.device_code("OPC数据源")
.content("OPC数据源:" + tag + "内容为:" + itemDto)
.build();
logDto.setLog_level(4);
luceneExecuteLogService.deviceExecuteLog(logDto);
}
ThreadUtl.sleep( 3000);
log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} unhealthyRatio={}% 阈值={}% 暂定3s", opcCode, tag, all_null, unhealthyRatio, unhealthyRatioThreshold);
ThreadUtl.sleep(3000);
break start;
} else if (this.all_null < 6) {
if (log.isWarnEnabled()) {
log.warn(tag + "重新创建server");
log.warn("{} 所有内容都为空, all_null:{} ,暂定{}s", tag, all_null,3);
}
log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} 暂定3s", opcCode, tag, all_null);
ThreadUtl.sleep(3000);
break start;
} else if (this.all_null < 12) {
if (log.isWarnEnabled()) {
log.warn(tag + "重新创建server");
log.warn("{} 所有内容都为空, all_null:{} ,暂定{}s", tag, all_null,3);
}
log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} 暂定3s", opcCode, tag, all_null);
ThreadUtl.sleep(3000);
break start;
} else {
if (log.isWarnEnabled()) {
log.warn("{} 所有内容都为空, all_null:{} ,暂定{}ms", tag, all_null, 5000);
}
log.warn("[OPC链路] 全空/不健康 opcCode={} tag={} all_null={} 暂定5s", opcCode, tag, all_null);
LuceneLogDto logDto = LuceneLogDto.builder()
.device_code("OPC数据源")
.content("OPC数据源:" + tag + "内容为:" + all_null)
.build();
logDto.setLog_level(4);
luceneExecuteLogService.deviceExecuteLog(logDto);
ThreadUtl.sleep((long) (5000));
ThreadUtl.sleep(5000);
break start;
}
// ++this.all_null;
} else {
this.all_null = 0;
}
// break start;
}
log.warn("opc线程停止。。。");
log.info("[OPC链路] 任务正常退出 opcCode={} tag={}", opcCode, tag);
return;
} catch (Exception var27) {
if (this.server != null) {
log.warn("[OPC链路] 任务异常 opcCode={} tag={}", opcCode, tag, var27);
if (this.group != null) {
try {
this.server.disconnect();
} catch (Exception var25) {
this.group.clear();
this.group.remove();
} catch (Exception e) {
log.debug("[OPC链路] 异常时清理group失败 {}", e.getMessage());
}
this.group = null;
}
this.server = null;
if (isConnectionLevelException(var27)) {
log.warn("[OPC链路] 连接级异常,释放连接 opcCode={}", opcCode);
opcServerService.invalidateByOpcCode(opcCode);
ThreadUtl.sleep(200);
}
this.server = null;
if (!DeviceOpcSynchronizeAutoRun.isRun) {
log.warn("opc线程停止2。。。");
log.warn("[OPC链路] 同步器已停止,任务退出 opcCode={} tag={}", opcCode, tag);
return;
}
String error_message = "设备信息同步异常";
if (!StrUtil.equals(this.message, error_message)) {
log.warn(error_message, var27);
log.warn("[OPC链路] {}", error_message, var27);
}
ThreadUtl.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000));
++this.error_num;
if (this.error_num > 3 && !StrUtil.equals(this.message, error_message)) {
if (this.error_num > 3) {
this.message = error_message;
}
}
}
}
private static boolean isConnectionLevelException(Throwable e) {
if (e == null) return false;
Throwable t = e;
while (t != null) {
if (t instanceof ClosedChannelException) return true;
if (t instanceof java.net.SocketTimeoutException) return true;
if (t instanceof java.net.SocketException) return true;
String msg = t.getMessage();
if (msg != null && (msg.contains("0x8001FFFF") || msg.contains("ClosedChannel") || msg.contains("connection reset"))) return true;
t = t.getCause();
}
return false;
}
private void runNew() {
@@ -385,10 +429,20 @@ public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerC
@Override
public void connectionStateChanged(boolean connected) {
if (!connected) {
if (this.group != null) {
try {
this.group.clear();
this.group.remove();
} catch (Exception e) {
log.debug("[OPC链路] 断开时清理group异常 {}", e.getMessage());
}
this.group = null;
}
this.server = null;
log.warn("[OPC链路] 连接断开,已清组 opcCode={} tag={}", this.OpcServer != null ? this.OpcServer.getOpc_code() : null, getOpcGroupID());
} else {
log.info("[OPC链路] 连接已连接 opcCode={} tag={}", this.OpcServer != null ? this.OpcServer.getOpc_code() : null, getOpcGroupID());
}
log.warn("opc server {} {}", this.getOpcGroupID(), connected ? "connected" : "disconnected");
}
private String getOpcGroupID() {

View File

@@ -1,21 +1,19 @@
package org.nl.acs.opc;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.core.support.ThreadPoolBuilder;
import org.nl.acs.auto.run.AbstractAutoRunnable;
import org.nl.acs.opc.service.dto.OpcServerManageDto;
import org.nl.config.thread.TheadFactoryName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE;
@@ -25,8 +23,11 @@ import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_B
* @author 20220102CG\noblelift
*/
@Component
@Slf4j
@Order(100)
public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable {
public static boolean isRun = false;
ExecutorService executorService = ThreadPoolBuilder.newBuilder()
.threadPoolName("deviceOpc_thread")
@@ -67,11 +68,17 @@ public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable {
} while (ObjectUtil.isEmpty(pros));
Set<String> keys = pros.keySet();
Iterator var4 = keys.iterator();
//代码执行一次
int submitted = 0;
int skipped = 0;
while (var4.hasNext()) {
String key = (String) var4.next();
List<List<OpcItemDto>> list = (List) pros.get(key);
OpcServerManageDto opcServer = (OpcServerManageDto) servers.get(key);
if (opcServer == null) {
log.warn("[OPC链路] opcCode 无配置,跳过提交 key={}", key);
skipped++;
continue;
}
Iterator var8 = list.iterator();
while (var8.hasNext()) {
List<OpcItemDto> groupProtols = (List) var8.next();
@@ -79,15 +86,29 @@ public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable {
runable.setProtocols(groupProtols);
runable.setOpcServer(opcServer);
this.executorService.submit(runable);
submitted++;
}
}
log.info("[OPC链路] 任务提交完成 已提交={} 跳过={} opcCodes={}", submitted, skipped, keys.size());
// 同步无光电设备信号
//Map<String, List<List<OpcItemDto>>> pros1 = this.deviceAppService.findAllFormatProtocolFromDriver();
//List<DeviceDriver> opcDrivers = this.deviceAppService.findDeviceDriver(DeviceDriver.class);
long lastMonitorLog = 0;
final long MONITOR_INTERVAL_MS = 60_000;
while (true) {
Thread.sleep(3000L);
long now = System.currentTimeMillis();
if (now - lastMonitorLog >= MONITOR_INTERVAL_MS) {
lastMonitorLog = now;
try {
Map<String, String> stats = OpcUtl.getPoolStatsByOpcCode();
log.info("[OPC链路] 连接池监控 池大小={} 明细={}", stats.size(), stats);
} catch (Exception e) {
log.debug("[OPC链路] 连接池监控异常 {}", e.getMessage());
}
}
}
}
}

View File

@@ -7,6 +7,16 @@ public class OpcConfig {
public static Boolean auto_start_opc = Boolean.valueOf(true);
public static String udw_opc_value_key = "opc_value";
public static Integer synchronized_millisecond = Integer.valueOf(100);
/** 读循环间隔毫秒runOld 用 */
public static Integer opc_loop_interval_ms = Integer.valueOf(500);
/** 读超时阈值(毫秒),超过打超时日志并重试读一次 */
public static Integer opc_read_timeout_ms = Integer.valueOf(2000);
/** 读超时后重试前的延迟(毫秒) */
public static Integer opc_retry_read_delay_ms = Integer.valueOf(150);
/** 读前错峰随机延迟上限毫秒0~该值随机 */
public static Integer opc_stagger_read_max_ms = Integer.valueOf(100);
/** 不健康比例阈值(%),用于日志与告警判断 */
public static Integer opc_unhealthy_ratio_threshold_percent = Integer.valueOf(10);
public static Integer synchronized_exception_wait_second = Integer.valueOf(10);
public static Integer retry_times = Integer.valueOf(3);
public static String sync_issue_type_code = "device_opc_sync";

View File

@@ -2,6 +2,7 @@ package org.nl.acs.opc;
import org.nl.acs.device_driver.driver.ItemValue;
import org.openscada.opc.lib.da.Group;
import org.openscada.opc.lib.da.Server;
/**
* @author ldjun
@@ -16,6 +17,16 @@ public interface OpcServerService {
*/
void reload();
/**
* 按 opcCode 获取连接池中的 Server 实例(单例连接,仅根据 opcCode 取)
*/
Server getServerInstanceByOpcCode(String opcCode);
/**
* 按 opcCode 释放连接(从池移除并 disconnect
*/
void invalidateByOpcCode(String opcCode);
/**
* 获取服务器
* @param var1

View File

@@ -6,6 +6,7 @@ import org.nl.acs.auto.initial.ApplicationAutoInitial;
import org.nl.acs.device_driver.driver.ItemValue;
import org.nl.acs.opc.service.dto.OpcServerManageDto;
import org.openscada.opc.lib.common.NotConnectedException;
import org.nl.common.exception.BadRequestException;
import org.openscada.opc.lib.da.Group;
import org.openscada.opc.lib.da.Server;
import org.openscada.opc.lib.da.UnknownGroupException;
@@ -73,6 +74,38 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn
this.opcServerManageDtos = Collections.synchronizedMap(this.opcServerManageDtos);
}
@Override
public Server getServerInstanceByOpcCode(String opcCode) {
if (StrUtil.isEmpty(opcCode)) {
throw new IllegalArgumentException("opcCode 不能为空");
}
OpcServerManageDto dto = this.opcServerManageDtos.get(opcCode);
if (dto == null) {
log.warn("[OPC链路] opcCode 无配置 opcCode={}", opcCode);
throw new RuntimeException("opcCode 不存在: " + opcCode);
}
try {
return OpcUtl.getOrCreateServerByOpcCode(
opcCode,
dto.getOpc_host(),
StrUtil.trim(dto.getCls_id()),
dto.getUser(),
dto.getPassword(),
StrUtil.trim(dto.getDomain())
);
} catch (BadRequestException e) {
throw new RuntimeException("[OPC链路] 获取连接失败 opcCode=" + opcCode, e);
}
}
@Override
public void invalidateByOpcCode(String opcCode) {
OpcUtl.invalidateByOpcCode(opcCode);
this.servers.remove(opcCode);
this.groups.remove(opcCode);
log.info("[OPC链路] 已释放连接 opcCode={}", opcCode);
}
@Override
public Group getServer(String code) {
synchronized (this.buildLock(code)) {
@@ -117,14 +150,12 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn
throw new RuntimeException(code + "不存在");
}
// if (server!=null){
// server.disconnect();
// server=null;
// }
if (server == null) {
// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
try {
server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
} catch (BadRequestException e) {
throw new RuntimeException("[OPC链路] 获取连接失败 code=" + code, e);
}
}
try {
@@ -133,9 +164,8 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn
this.clearServer(code);
ThreadUtl.sleep(1500L);
log.warn("获取opc出错重新获取", code, var12);
// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
try {
server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
group = server.addGroup(groupName);
} catch (Exception var11) {
throw new RuntimeException(var12);
@@ -156,21 +186,19 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn
@Override
public synchronized Group getServerByNewConn(String code) {
synchronized (this.buildLock(code)) {
Server server = (Server) this.servers.get(code);
if (server != null) {
this.clearServer(code);
}
this.clearServer(code);
OpcServerManageDto dto = (OpcServerManageDto) this.opcServerManageDtos.get(code);
if (dto == null) {
throw new RuntimeException(code + "不存在");
}
// if (server == null) {
// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
// }
Server server;
try {
server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
} catch (BadRequestException e) {
throw new RuntimeException("[OPC链路] 获取连接失败 code=" + code, e);
}
String groupName = code;
Group group = null;
@@ -180,9 +208,8 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn
this.clearServer(code);
ThreadUtl.sleep(2000L);
log.warn("获取opc出错重新获取", code, var12);
// server = OpcServerUtl.getServerWithOutException(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
server = OpcUtl.getServer(dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
try {
server = OpcUtl.getOrCreateServerByOpcCode(code, dto.getOpc_host(), StrUtil.trim(dto.getCls_id()), dto.getUser(), dto.getPassword(), StrUtil.trim(dto.getDomain()));
group = server.addGroup(groupName);
} catch (Exception var11) {
throw new RuntimeException(var12);
@@ -197,37 +224,24 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn
@Override
public synchronized void clearServer(String code) {
try {
Server server = (Server) this.servers.get(code);
if (server != null) {
server.disconnect();
}
} catch (Exception e) {
e.printStackTrace();
log.error("清理server异常,", e.getMessage());
}
OpcUtl.invalidateByOpcCode(code);
this.servers.remove(code);
this.groups.remove(code);
}
@Override
public void cleanGroups(String opcCode) {
Group group = (Group)this.groups.get(opcCode);
Group group = (Group) this.groups.get(opcCode);
if (group != null) {
Server server = group.getServer();
try {
group.remove();
} catch (JIException var5) {
var5.printStackTrace();
log.warn("cleanGroups remove group 异常 opcCode={}", opcCode, var5);
}
OpcUtl.invalidateByOpcCode(opcCode);
this.groups.remove(opcCode);
server.disconnect();
this.servers.remove(opcCode);
}
}
@Override

View File

@@ -18,9 +18,12 @@ import org.openscada.opc.lib.da.*;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 20220102CG\noblelift
@@ -30,10 +33,96 @@ public class OpcUtl {
private static int timeout = 1 * 10 * 1000;
private static String key = "rpc.socketTimeout";
/**
* 按 opcCode 单例连接池key=opcCode, value=Server
*/
private static final Map<String, Server> SERVER_POOL_BY_OPCCODE = new ConcurrentHashMap<>();
/**
* 同一 opcCode 串行化 Server 操作addGroup 等)
*/
private static final Map<String, ReentrantLock> OPCCODE_LOCKS = new ConcurrentHashMap<>();
static {
checkTimeout();
}
/**
* 按 opcCode 获取或创建单例连接(连接池)
*/
public static Server getOrCreateServerByOpcCode(String opcCode, String host, String clsid, String user, String password, String domain)
throws BadRequestException {
if (opcCode == null || opcCode.isEmpty()) {
throw new BadRequestException("opcCode 不能为空");
}
ReentrantLock lock = OPCCODE_LOCKS.computeIfAbsent(opcCode, k -> new ReentrantLock());
lock.lock();
try {
Server server = SERVER_POOL_BY_OPCCODE.get(opcCode);
if (server != null) {
log.debug("[OPC链路] 复用连接 opcCode={}", opcCode);
return server;
}
log.info("[OPC链路] 创建新连接 opcCode={}, host={}", opcCode, host);
server = getServer(host, clsid, user, password, domain);
SERVER_POOL_BY_OPCCODE.put(opcCode, server);
return server;
} finally {
lock.unlock();
}
}
/**
* 按 opcCode 释放连接(从池移除并 disconnect
*/
public static void invalidateByOpcCode(String opcCode) {
if (opcCode == null || opcCode.isEmpty()) {
return;
}
ReentrantLock lock = OPCCODE_LOCKS.get(opcCode);
if (lock != null) {
lock.lock();
}
try {
Server server = SERVER_POOL_BY_OPCCODE.remove(opcCode);
if (server != null) {
try {
server.disconnect();
} catch (Exception e) {
log.warn("[OPC链路] 断开连接异常 opcCode={}", opcCode, e);
}
log.info("[OPC链路] 已释放连接 opcCode={}", opcCode);
}
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 按 opcCode 加锁执行(用于同一连接上的 addGroup 等)
*/
public static void withOpcCodeLock(String opcCode, Runnable action) {
ReentrantLock lock = OPCCODE_LOCKS.computeIfAbsent(opcCode, k -> new ReentrantLock());
lock.lock();
try {
action.run();
} finally {
lock.unlock();
}
}
/**
* 获取连接池状态(监控用)
*/
public static Map<String, String> getPoolStatsByOpcCode() {
Map<String, String> stats = new ConcurrentHashMap<>();
SERVER_POOL_BY_OPCCODE.forEach((code, server) -> {
stats.put(code, server != null ? "connected" : "null");
});
return stats;
}
public static void checkTimeout() {
if (Integer.getInteger(key, 0).intValue() != timeout) {
System.setProperty(key, String.valueOf(timeout));
@@ -50,7 +139,8 @@ public class OpcUtl {
log.info("Group返回下发信号结果" + String.valueOf(e));
// group.write(requests);
} catch (Exception e1) {
throw new BadRequestException("下发信号失败:" + e1.getMessage());
log.error("下发信号失败:{},{}", requests, e1.getMessage());
throw new BadRequestException("下发信号失败:" + e1.getMessage());
}
boolean is_success = true;
@@ -73,37 +163,50 @@ public class OpcUtl {
}
if (!is_success) {
// throw new BusinessException(message.toString());
System.out.println("下发信号失败:" + message.toString());
System.out.println("下发信号失败原因:" + message.toString());
log.info("下发信号失败:" + message.toString());
throw new BadRequestException(message.toString());
}
} catch (BadRequestException arg7) {
throw arg7;
} catch (Exception arg7) {
log.info("下发信号失败:" + arg7.getMessage());
System.out.println("下发信号失败原因" + arg7.getMessage());
throw new BadRequestException(arg7.toString());
throw new BadRequestException("下发信号失败:" + arg7.getMessage());
}
}
/**
* 下发:用 group.addItems 批量加项(库内会跳过已存在项),再写。
* 同一 Group 多次调用不会重复 add 同一 item。
*/
public static void writeValue(Group group, ItemValue... values) throws BadRequestException {
try {
if (values != null && values.length > 0) {
List<WriteRequest> ws = new ArrayList();
ItemValue[] var3 = values;
int var4 = values.length;
for (int var5 = 0; var5 < var4; ++var5) {
ItemValue value = var3[var5];
Item item = group.addItem(value.getItem_code());
ws.add(new WriteRequest(item, getVariant(value.getItem_value())));
}
writeValue(group, (WriteRequest[]) ws.toArray(new WriteRequest[0]));
if (values == null || values.length == 0) {
return;
}
} catch (AddFailedException | JIException var8) {
throw new BadRequestException(var8.toString());
Set<String> seen = new LinkedHashSet<>();
List<String> ids = new ArrayList<>();
for (ItemValue value : values) {
String code = value.getItem_code();
if (code != null && seen.add(code)) {
ids.add(code);
}
}
if (ids.isEmpty()) {
return;
}
Map<String, Item> itemMap = group.addItems(ids.toArray(new String[0]));
List<WriteRequest> ws = new ArrayList<>();
for (ItemValue value : values) {
Item item = itemMap.get(value.getItem_code());
if (item == null) {
throw new BadRequestException("Item 未添加: " + value.getItem_code());
}
ws.add(new WriteRequest(item, getVariant(value.getItem_value())));
}
writeValue(group, ws.toArray(new WriteRequest[0]));
} catch (AddFailedException | JIException e) {
throw new BadRequestException(e.toString());
}
}
@@ -223,8 +326,8 @@ public class OpcUtl {
throws BadRequestException {
checkTimeout();
Server server = null;
if (domain==null){
domain="";
if (domain == null) {
domain = "";
}
try {
@@ -241,8 +344,8 @@ public class OpcUtl {
public static ConnectionInformation getConnection(String host, String clsid, String user, String password,
String domain) {
ConnectionInformation connection = new ConnectionInformation();
if (domain==null){
domain="";
if (domain == null) {
domain = "";
}
connection.setHost(host);
connection.setClsid(clsid);

View File

@@ -39,38 +39,33 @@ public class ReadUtil {
static OpcMapper opcMapper = SpringContextHolder.getBean("opcMapper");
/**
* 按 opc_id 查 acs_opc 得到 opc_code从连接池取 Server复用连接
*/
public static Server getServer(String opc_id) {
//OPC表【acs_opc】
// JSONObject opcObj = WQLObject.getWQLObject("acs_opc").query("opc_id = '" + opc_id + "'").uniqueResult(0);
Opc opc = new LambdaQueryChainWrapper<>(opcMapper)
.eq(Opc::getOpc_id, opc_id)
.one();
//RD1.RD1.1028
// 连接信息
ConnectionInformation ci = new ConnectionInformation();
ci.setHost(opc.getOpc_host());
ci.setDomain(StrUtil.isEmpty(opc.getDomain()) ? " " : opc.getDomain());
ci.setUser(opc.getUser());
ci.setPassword(opc.getPassword());
ci.setClsid(opc.getCls_id());
final Server server = new Server(ci, Executors.newSingleThreadScheduledExecutor());
try {
server.connect();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (JIException e) {
e.printStackTrace();
} catch (AlreadyConnectedException e) {
e.printStackTrace();
if (opc == null) {
throw new BadRequestException("OPC 不存在 opc_id=" + opc_id);
}
return server;
String opcCode = opc.getOpc_code();
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class);
return opcServerService.getServerInstanceByOpcCode(opcCode);
}
public static void write(Map<String, Object> strings, Server server) {
/**
* 使用池内 Server 下发,匿名 Group 用后移除(由池管理连接生命周期)。
*/
public static void write(Map<String, Object> strings, Server server) throws BadRequestException {
Group group;
try {
group = server.addGroup();
} catch (Exception e) {
log.warn("ReadUtil.write addGroup 失败", e);
throw new BadRequestException("下发失败:" + e.getMessage());
}
try {
//Group group = this.opcServerService.getServer(opcServiceCode);
Group group = server.addGroup();
Iterator it = strings.keySet().iterator();
while (it.hasNext()) {
String key = (String) it.next();
@@ -84,11 +79,16 @@ public class ReadUtil {
list.add(write1);
OpcUtl.writeValue(group, (WriteRequest[]) list.toArray(new WriteRequest[0]));
}
server.disconnect();
} catch (Exception e) {
e.printStackTrace();
log.warn("ReadUtil.write 失败", e);
throw new BadRequestException("下发失败:" + e.getMessage());
} finally {
try {
group.remove();
} catch (Exception e) {
log.warn("ReadUtil.write 移除匿名 Group 异常", e);
}
}
}
public static void writeAndCheck(Map<String, Object> strings, String opcServer) {
OpcServerService server =SpringContextHolder.getBean(OpcServerService.class);
@@ -135,10 +135,19 @@ public class ReadUtil {
}
public static Map<String, Object> read(List<String> itemString, Server server) {
HashMap map = new HashMap();
/**
* 使用池内 Server 读取,匿名 Group 用后移除(由池管理连接生命周期)。
*/
public static Map<String, Object> read(List<String> itemString, Server server) throws BadRequestException {
Group group;
try {
Group group = server.addGroup();
group = server.addGroup();
} catch (Exception e) {
log.warn("ReadUtil.read addGroup 失败", e);
throw new BadRequestException("读取失败:" + e.getMessage());
}
try {
HashMap map = new HashMap();
Map<String, Item> items = new LinkedHashMap();
Iterator is = itemString.iterator();
@@ -147,7 +156,7 @@ public class ReadUtil {
try {
items.put(string, group.addItem(string));
} catch (Exception e) {
e.printStackTrace();
log.warn("addItem 失败 item={}", string, e);
}
}
@@ -160,13 +169,17 @@ public class ReadUtil {
Object value = OpcUtl.getValue(key, (ItemState) itemStatus.get(key));
map.put(key.getId(), value);
}
server.disconnect();
return map;
} catch (Exception e) {
e.printStackTrace();
log.warn("ReadUtil.read 失败", e);
throw new BadRequestException("读取失败:" + e.getMessage());
} finally {
try {
group.remove();
} catch (Exception e) {
log.warn("ReadUtil.read 移除匿名 Group 异常", e);
}
}
return map;
}
public static List<Map<String, String>> showAllOpcServer(String host, String user, String password, String domain) throws BadRequestException {

View File

@@ -65,14 +65,14 @@ https://juejin.cn/post/6844903775631572999
<queueSize>512</queueSize>
</appender>
<!--开发环境:打印控制台-->
<!--开发环境:打印控制台和输出到文件-->
<springProfile name="dev">
<root level="info">
<!-- <appender-ref ref="asyncLuceneAppender"/>-->
<!-- <appender-ref ref="asyncFileAppender"/>-->
<appender-ref ref="asyncLuceneAppender"/>
<appender-ref ref="asyncFileAppender"/>
<appender-ref ref="CONSOLE"/>
</root>
<!-- <logger name="jdbc" level="ERROR" additivity="true">
<logger name="jdbc" level="ERROR" additivity="true">
<appender-ref ref="asyncFileAppender"/>
</logger>
<logger name="org.springframework" level="ERROR" additivity="true">
@@ -95,7 +95,10 @@ https://juejin.cn/post/6844903775631572999
</logger>
<logger name="org.jinterop" level="ERROR" additivity="true">
<appender-ref ref="asyncFileAppender"/>
</logger>-->
</logger>
<logger name="org.openscada" level="ERROR" additivity="true">
<appender-ref ref="asyncFileAppender"/>
</logger>
</springProfile>
<!--测试环境:打印控制台-->