feat:opcua协议读写

This commit is contained in:
2026-03-16 19:20:19 +08:00
parent fdfdc71f79
commit e152fb68a9
3 changed files with 824 additions and 137 deletions

View File

@@ -109,7 +109,7 @@ public enum PointTypeFlagEnum {
* @return {@link PointTypeFlagEnum}
*/
public static PointTypeFlagEnum ofCode(String code) {
Optional<PointTypeFlagEnum> any = Arrays.stream(PointTypeFlagEnum.values()).filter(type -> type.getCode().equals(code)).findFirst();
Optional<PointTypeFlagEnum> any = Arrays.stream(PointTypeFlagEnum.values()).filter(type -> type.getCode().equals(code.toLowerCase())).findFirst();
return any.orElse(null);
}

View File

@@ -5,6 +5,7 @@ import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -12,18 +13,22 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.WriteValue;
import org.nl.common.exception.CommonException;
import org.nl.iot.core.driver.bo.AttributeBO;
import org.nl.iot.core.driver.bo.DeviceBO;
import org.nl.iot.core.driver.bo.MetadataEventDTO;
import org.nl.iot.core.driver.bo.SiteBO;
import org.nl.iot.core.driver.entity.RValue;
import org.nl.iot.core.driver.entity.WResponse;
import org.nl.iot.core.driver.entity.WValue;
import org.nl.iot.core.driver.enums.MetadataOperateTypeEnum;
import org.nl.iot.core.driver.enums.PointTypeFlagEnum;
import org.nl.iot.core.driver.service.DriverCustomService;
import org.nl.iot.modular.iot.entity.IotConfig;
import org.nl.iot.modular.iot.entity.IotConnect;
import org.springframework.stereotype.Service;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
@@ -46,6 +51,21 @@ public class OpcUaProtocolDriverImpl implements DriverCustomService {
connectMap = new ConcurrentHashMap<>(16);
}
// @PreDestroy
// public void destroy() {
// // 应用关闭时清理所有连接
// log.info("Cleaning up OPC UA connections...");
// connectMap.values().forEach(client -> {
// try {
// client.disconnect().get(5, TimeUnit.SECONDS);
// } catch (Exception e) {
// log.warn("Error disconnecting OPC UA client during cleanup: {}", e.getMessage());
// }
// });
// connectMap.clear();
// log.info("OPC UA connections cleanup completed");
// }
@Override
public void schedule() {
@@ -58,30 +78,84 @@ public class OpcUaProtocolDriverImpl implements DriverCustomService {
// When the device is updated or deleted, remove the corresponding connection handle
if (MetadataOperateTypeEnum.DELETE.equals(operateType) || MetadataOperateTypeEnum.UPDATE.equals(operateType)) {
connectMap.remove(metadataEvent.getConnectId());
OpcUaClient client = connectMap.remove(metadataEvent.getConnectId());
if (client != null) {
try {
client.disconnect().get(5, TimeUnit.SECONDS);
log.info("Disconnected OPC UA client for device: {}", metadataEvent.getConnectId());
} catch (Exception e) {
log.warn("Error disconnecting OPC UA client: {}", e.getMessage());
}
}
}
}
@Override
public RValue read(IotConnect connect, IotConfig config) {
return new RValue(config, connect, readValue(getConnector(connect), config));
public RValue read(DeviceBO device, SiteBO point) {
return readValue(device, point);
}
@Override
public Boolean write(DeviceBO device, WValue wValue) {
OpcUaClient client = getConnector(device);
return writeValue(client, device, wValue);
}
@Override
public List<RValue> batchRead(DeviceBO device, List<SiteBO> point) {
return batchReadValue(getConnector(device), device, point);
}
@Override
public List<WResponse> batchWrite(DeviceBO device, List<WValue> wValue) {
int maxRetries = 2; // 最大重试次数
for (int retry = 0; retry <= maxRetries; retry++) {
try {
OpcUaClient client = getConnector(device);
return batchWriteNodes(client, wValue);
} catch (Exception e) {
log.error("批量写入失败 (重试 {}/{}): {}", retry + 1, maxRetries + 1, e.getMessage());
// 如果是会话相关错误,清除缓存的连接并重试
if (e.getMessage().contains("SessionClosed") || e.getMessage().contains("Bad_SessionClosed")) {
log.warn("检测到会话关闭错误,清除缓存连接并重试");
connectMap.remove(device.getId());
if (retry < maxRetries) {
continue;
}
}
// 最后一次重试失败,为每个写入值创建错误结果
if (retry == maxRetries) {
List<WResponse> errorResponses = new ArrayList<>();
for (WValue wVal : wValue) {
errorResponses.add(new WResponse(false, wVal, e.getMessage()));
}
return errorResponses;
}
}
}
// 理论上不会到达这里
List<WResponse> errorResponses = new ArrayList<>();
for (WValue wVal : wValue) {
errorResponses.add(new WResponse(false, wVal, "未知错误"));
}
return errorResponses;
}
/**
* 获取 OPC UA 客户端连接
*
* @param deviceId 设备ID, 用于标识唯一的设备连接
* @param driverConfig 驱动配置信息, 包含连接 OPC UA 服务器所需的参数
* @return OpcUaClient 返回与指定设备关联的 OPC UA 客户端实例
* @throws CommonException 如果连接 OPC UA 服务器失败, 抛出此异常
*/
private OpcUaClient getConnector(IotConnect connect) {
log.debug("OPC UA server connection info: {}", connect);
OpcUaClient opcUaClient = connectMap.get(connect.getId().toString());
private OpcUaClient getConnector(DeviceBO device) {
log.debug("OPC UA server connection info: {}", device);
OpcUaClient opcUaClient = connectMap.get(device.getId());
if (Objects.isNull(opcUaClient)) {
JSONObject driverConfig = JSONObject.parseObject(connect.getProperties());
String host = connect.getHost();
int port = connect.getPort();
JSONObject driverConfig = JSONObject.parseObject(device.getProperties());
String host = device.getHost();
int port = device.getPort();
String path = driverConfig.getString("path");
String url = String.format("opc.tcp://%s:%s%s", host, port, path);
try {
@@ -91,13 +165,15 @@ public class OpcUaProtocolDriverImpl implements DriverCustomService {
configBuilder -> configBuilder
.setIdentityProvider(new AnonymousProvider()) // 使用匿名身份验证
.setRequestTimeout(Unsigned.uint(5000)) // 设置请求超时时间为 5000 毫秒
.setSessionTimeout(Unsigned.uint(60000)) // 设置会话超时时间为 60 秒
.build()
);
connectMap.put(connect.getId().toString(), opcUaClient);
connectMap.put(device.getId(), opcUaClient);
log.info("Created new OPC UA client for device: {}", device.getId());
} catch (UaException e) {
connectMap.entrySet().removeIf(next -> next.getKey().equals(connect.getId().toString()));
log.error("Failed to connect OPC UA client: {}", e.getMessage(), e);
throw new CommonException(e.getMessage());
connectMap.entrySet().removeIf(next -> next.getKey().equals(device.getId()));
log.error("Failed to create OPC UA client: {}", e.getMessage(), e);
throw new IllegalArgumentException(e.getMessage());
}
}
return opcUaClient;
@@ -106,139 +182,208 @@ public class OpcUaProtocolDriverImpl implements DriverCustomService {
/**
* 读取 OPC UA 节点的值
*
* @param client OPC UA 客户端实例
* @param config 点位配置信息
* @param device 设备信息
* @param point 点位配置信息
* @return 读取到的节点值
* @throws CommonException 如果读取操作失败, 抛出此异常
*/
private String readValue(OpcUaClient client, IotConfig config) {
private RValue readValue(DeviceBO device, SiteBO point) {
int maxRetries = 2; // 最大重试次数
for (int retry = 0; retry <= maxRetries; retry++) {
try {
OpcUaClient client = getConnector(device);
NodeId nodeId = NodeId.parse(point.getRegisterAddress());
// 确保客户端已连接
client.connect().get(10, TimeUnit.SECONDS);
// 直接使用同步方式读取值,设置更长的超时时间
DataValue dataValue = client.readValue(0.0, TimestampsToReturn.Both, nodeId).get(10, TimeUnit.SECONDS);
// 检查读取状态
if (dataValue.getStatusCode().isGood()) {
Object value = dataValue.getValue().getValue();
if (value != null) {
return new RValue(device, point, value.toString(), "");
} else {
return new RValue(device, point, null, "读取数据为空!");
}
} else {
return new RValue(device, point, null, "读取数据失败: " + dataValue.getStatusCode().getValue());
}
} catch (Exception e) {
log.error("读取 OPC UA 节点失败 (重试 {}/{}): {}", retry + 1, maxRetries + 1, e.getMessage());
// 如果是会话相关错误,清除缓存的连接并重试
if (e.getMessage().contains("SessionClosed") || e.getMessage().contains("Bad_SessionClosed")) {
log.warn("检测到会话关闭错误,清除缓存连接并重试");
connectMap.remove(device.getId());
if (retry < maxRetries) {
continue;
}
}
// 最后一次重试失败
if (retry == maxRetries) {
return new RValue(device, point, null, e.getMessage());
}
}
}
return new RValue(device, point, null, "读取失败:超过最大重试次数");
}
/**
* 批量读取
* @param connector
* @param device
* @param point
* @return
*/
private List<RValue> batchReadValue(OpcUaClient connector, DeviceBO device, List<SiteBO> points) {
return batchReadNodes(connector, device, points);
}
/**
* 核心:批量读取节点值
* @param client OPC UA客户端
* @param device 设备信息
* @param points 点位列表
* @return 读取结果列表顺序与nodeIds一致
*/
private List<RValue> batchReadNodes(OpcUaClient client, DeviceBO device, List<SiteBO> points) {
List<RValue> list = new ArrayList<>();
int maxRetries = 2; // 最大重试次数
for (int retry = 0; retry <= maxRetries; retry++) {
try {
// 确保客户端已连接
client.connect().get(10, TimeUnit.SECONDS);
// 异步批量读取readValues是Milo推荐的批量读值API
List<NodeId> nodeIds = buildNodeIds(points);
CompletableFuture<List<DataValue>> future = client.readValues(
0.0, // MaxAge=0强制读取最新值
TimestampsToReturn.Both, // 返回源时间戳+服务器时间戳
nodeIds // 批量节点列表
);
// 10秒超时获取结果异步转同步新手友好
List<DataValue> dataValues = future.get(10, TimeUnit.SECONDS);
// 转成List<RValue>
for (int i = 0; i < dataValues.size(); i++) {
DataValue dataValue = dataValues.get(i);
SiteBO site = getSiteByNodeId(points, nodeIds.get(i));
// 检查读取状态
if (dataValue.getStatusCode().isGood()) {
Object value = dataValue.getValue().getValue();
if (value != null) {
list.add(new RValue(device, site, value.toString(), null));
} else {
list.add(new RValue(device, site, null, "读取数据为空"));
}
} else {
list.add(new RValue(device, site, null, "读取数据失败: " + dataValue.getStatusCode().getValue()));
}
}
// 成功读取,跳出重试循环
break;
} catch (Exception e) {
log.error("批量读取 OPC UA 节点失败 (重试 {}/{}): {}", retry + 1, maxRetries + 1, e.getMessage());
// 如果是会话相关错误,清除缓存的连接并重新获取
if (e.getMessage().contains("SessionClosed") || e.getMessage().contains("Bad_SessionClosed")) {
log.warn("检测到会话关闭错误,清除缓存连接并重试");
connectMap.remove(device.getId());
if (retry < maxRetries) {
// 重新获取连接
client = getConnector(device);
continue;
}
}
// 最后一次重试失败,为每个点位创建错误结果
if (retry == maxRetries) {
list.clear(); // 清空之前可能的部分结果
for (SiteBO point : points) {
list.add(new RValue(device, point, null, e.getMessage()));
}
}
}
}
return list;
}
/**
* 批量写入核心方法
*/
private List<WResponse> batchWriteNodes(OpcUaClient client, List<WValue> wValues) {
List<WResponse> responses = new ArrayList<>();
try {
NodeId nodeId = getNode(config);
// 确保客户端已连接
client.connect().get(10, TimeUnit.SECONDS);
// 直接使用同步方式读取值,设置更长的超时时间
DataValue dataValue = client.readValue(0.0, TimestampsToReturn.Both, nodeId).get(10, TimeUnit.SECONDS);
// 检查读取状态
if (dataValue.getStatusCode().isGood()) {
Object value = dataValue.getValue().getValue();
return value != null ? value.toString() : null;
} else {
log.error("Read opc ua value failed with status: {}", dataValue.getStatusCode());
throw new CommonException("Read failed with status: " + dataValue.getStatusCode());
List<DataValue> dateValues = new ArrayList<>();
List<NodeId> nodeIds = new ArrayList<>();
for (WValue wValue : wValues) {
nodeIds.add(NodeId.parse(wValue.getPoint().getRegisterAddress()));
dateValues.add(buildDataValueByType(wValue));
}
} catch (InterruptedException e) {
log.error("Read opc ua value error: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
throw new CommonException(e.getMessage());
} catch (ExecutionException | TimeoutException e) {
log.error("Read opc ua value error: {}", e.getMessage(), e);
throw new CommonException(e.getMessage());
CompletableFuture<List<StatusCode>> future = client.writeValues(nodeIds, dateValues);
// 10秒超时获取结果
List<StatusCode> statusCodeList = future.get(10, TimeUnit.SECONDS);
// 转成WResponse
for (int i = 0; i < statusCodeList.size(); i++) {
StatusCode statusCode = statusCodeList.get(i);
responses.add(new WResponse(statusCode.isGood(), wValues.get(i), statusCode.isGood() ? "" : statusCode.toString()));
}
} catch (Exception e) {
log.error("批量写入 OPC UA 节点失败: {}", e.getMessage());
throw new RuntimeException(e);
}
}
@Override
public Boolean write(IotConnect connect, IotConfig config, WValue wValue) {
OpcUaClient client = getConnector(connect);
return writeValue(client, config, wValue);
return responses;
}
/**
* 写入 OPC UA 节点的值
*
* @param client OPC UA 客户端实例
* @param config 点位配置信息
* @param wValue 写入值
* @return 写入操作是否成功
* @throws CommonException 如果写入操作失败, 抛出此异常
*/
private boolean writeValue(OpcUaClient client, IotConfig config, WValue wValue) {
private boolean writeValue(OpcUaClient client, DeviceBO device, WValue wValue) {
try {
NodeId nodeId = getNode(config);
NodeId nodeId = NodeId.parse(wValue.getPoint().getRegisterAddress());
// 确保客户端已连接,设置超时时间
client.connect().get(10, TimeUnit.SECONDS);
return writeNode(client, nodeId, wValue);
} catch (InterruptedException e) {
log.error("Write opc ua value error: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
throw new CommonException(e.getMessage());
} catch (ExecutionException | TimeoutException e) {
log.error("Write opc ua value error: {}", e.getMessage(), e);
} catch (Exception e) {
log.error("写入 opc ua value 出错: {}", e.getMessage(), e);
// 连接异常时,移除缓存的连接
connectMap.remove(device.getId());
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new CommonException(e.getMessage());
}
}
public SiteBO getSiteByNodeId(List<SiteBO> points, NodeId nodeId) {
return points.stream().filter(p -> NodeId.parse(p.getRegisterAddress()).toString().equals(nodeId.toString())).findFirst().orElse(null);
}
/**
* 根据点位配置获取 OPC UA 节点
*
* @param pointConfig 点位配置信息, 包含命名空间和标签
* @return OPC UA 节点标识
* 组装nodeids
* @param points
* @return
*/
private NodeId getNode(IotConfig config) {
public List<NodeId> buildNodeIds(List<SiteBO> points) {
// 解析地址ns=3;s=Temperature
return parseAddress(config.getRegisterAddress());
}
/**
* 解析地址字符串返回NodeId对象
* @param address 地址字符串格式要求ns=3;s=xxx
* @return 包含namespace(固定3)和tag的NodeId对象
* @throws IllegalArgumentException 输入格式错误或ns值不为3时抛出异常
*/
public static NodeId parseAddress(String address) {
// 校验输入不为空
if (address == null || address.trim().isEmpty()) {
throw new IllegalArgumentException("地址字符串不能为空");
List<NodeId> list = new ArrayList<>();
for (SiteBO point : points) {
list.add(NodeId.parse(point.getRegisterAddress()));
}
// 按分号拆分字符串
String[] parts = address.split(";");
if (parts.length != 2) {
throw new IllegalArgumentException("地址格式错误正确格式应为ns=x;s=xxx");
}
int namespace = -1;
String tag = null;
// 遍历拆分后的部分提取ns和s的值
for (String part : parts) {
String[] keyValue = part.split("=");
if (keyValue.length != 2) {
throw new IllegalArgumentException("地址格式错误键值对格式应为key=value");
}
String key = keyValue[0].trim();
String value = keyValue[1].trim();
switch (key) {
case "ns":
// 解析ns的值并校验是否为3
try {
namespace = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("ns的值必须是数字", e);
}
if (namespace != 3) {
throw new IllegalArgumentException("ns的值必须等于3");
}
break;
case "s":
// 提取s对应的标签值
tag = value;
break;
default:
throw new IllegalArgumentException("不支持的键:" + key + "仅支持ns和s");
}
}
// 校验s的值不为空
if (tag == null || tag.isEmpty()) {
throw new IllegalArgumentException("s的标签值不能为空");
}
// 返回NodeId对象
return new NodeId(namespace, tag);
return list;
}
/**
@@ -247,10 +392,6 @@ public class OpcUaProtocolDriverImpl implements DriverCustomService {
* @param client OPC UA 客户端实例
* @param nodeId OPC UA 节点标识
* @param wValue 待写入的值
* @return 写入操作是否成功
* @throws ExecutionException 写入操作失败时抛出
* @throws InterruptedException 写入操作被中断时抛出
* @throws TimeoutException 写入操作超时时抛出
*/
private boolean writeNode(OpcUaClient client, NodeId nodeId, WValue wValue) throws ExecutionException, InterruptedException, TimeoutException {
PointTypeFlagEnum valueType = PointTypeFlagEnum.ofCode(wValue.getType());
@@ -295,4 +436,34 @@ public class OpcUaProtocolDriverImpl implements DriverCustomService {
}
return false;
}
private DataValue buildDataValueByType(WValue wValue) {
PointTypeFlagEnum valueType = PointTypeFlagEnum.ofCode(wValue.getType());
if (Objects.isNull(valueType)) {
throw new CommonException("Unsupported type of " + wValue.getType());
}
return switch (valueType) {
case INT -> {
int intValue = wValue.getValueByClass(Integer.class);
yield new DataValue(new Variant(intValue));
}
case LONG -> {
long longValue = wValue.getValueByClass(Long.class);
yield new DataValue(new Variant(longValue));
}
case FLOAT -> {
float floatValue = wValue.getValueByClass(Float.class);
yield new DataValue(new Variant(floatValue));
}
case DOUBLE -> {
double doubleValue = wValue.getValueByClass(Double.class);
yield new DataValue(new Variant(doubleValue));
}
case BOOLEAN -> {
boolean booleanValue = wValue.getValueByClass(Boolean.class);
yield new DataValue(new Variant(booleanValue));
}
default -> new DataValue(new Variant(wValue.getValue()));
};
}
}