rev 更新opcua
This commit is contained in:
@@ -446,6 +446,24 @@
|
||||
<artifactId>swagger-annotations</artifactId>
|
||||
<version>1.5.22</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.11</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!--Milo客户端的依赖-->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.milo</groupId>
|
||||
<artifactId>sdk-client</artifactId>
|
||||
<version>0.6.3</version>
|
||||
</dependency>
|
||||
<!--Milo客户端的依赖-->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.milo</groupId>
|
||||
<artifactId>sdk-server</artifactId>
|
||||
<version>0.6.3</version>
|
||||
</dependency>
|
||||
<!--<dependency>
|
||||
<groupId>xerces</groupId>
|
||||
<artifactId>xercesImpl</artifactId>
|
||||
|
||||
@@ -1325,8 +1325,7 @@ public class DeviceServiceImpl extends CommonServiceImpl<DeviceMapper, Device> i
|
||||
itemString.add(json.getString("code"));
|
||||
|
||||
}
|
||||
Server server = ReadUtil.getServer(opc_id);
|
||||
final Map<String, Object> readList = ReadUtil.read(itemString, server);
|
||||
final Map<String, Object> readList = ReadUtil.read(itemString);
|
||||
|
||||
JSONArray result = new JSONArray();
|
||||
for (int i = 0; i < dbItems.size(); i++) {
|
||||
@@ -1354,8 +1353,7 @@ public class DeviceServiceImpl extends CommonServiceImpl<DeviceMapper, Device> i
|
||||
itemMap.put(json.getString("code"), json.getString("dbw_value"));
|
||||
}
|
||||
}
|
||||
Server server = ReadUtil.getServer(opc_id);
|
||||
ReadUtil.write(itemMap, server);
|
||||
ReadUtil.write(itemMap);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -5,6 +5,19 @@ import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
||||
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
|
||||
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
|
||||
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
|
||||
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
|
||||
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
|
||||
import org.eclipse.milo.opcua.stack.core.UaException;
|
||||
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
||||
import org.nl.acs.AcsConfig;
|
||||
import org.nl.acs.instruction.service.InstructionService;
|
||||
import org.nl.acs.opc.service.dto.OpcServerManageDto;
|
||||
@@ -18,7 +31,11 @@ import org.nl.config.lucene.service.dto.LuceneLogDto;
|
||||
import org.nl.system.service.param.ISysParamService;
|
||||
import org.openscada.opc.lib.da.*;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
||||
@@ -319,68 +336,159 @@ public class DeviceOpcProtocolRunable implements Runnable, DataCallback, ServerC
|
||||
|
||||
|
||||
private void runNew() {
|
||||
Async20Access accessor = null;
|
||||
try{
|
||||
OpcUaClient client = createClient();
|
||||
client.connect().get();
|
||||
managedSubscriptionEvent(client);
|
||||
} catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
String opcGroupId = this.getOpcGroupID();
|
||||
private final static String endPointUrl = "opc.tcp://127.0.0.1:49320";
|
||||
|
||||
try {
|
||||
if (this.server == null) {
|
||||
// this.server = OpcServerUtl.getServerWithOutException(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain());
|
||||
this.server = OpcUtl.getServer(this.OpcServer.getOpc_host(), this.OpcServer.getCls_id(), this.OpcServer.getUser(), this.OpcServer.getPassword(), this.OpcServer.getDomain());
|
||||
this.server.addStateListener(this);
|
||||
accessor = new Async20Access(this.server, OpcConfig.synchronized_millisecond, true);
|
||||
Iterator var9 = this.protocols.iterator();
|
||||
private static OpcUaClient createClient() throws Exception {
|
||||
//opc ua服务端地址
|
||||
Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
|
||||
Files.createDirectories(securityTempDir);
|
||||
if (!Files.exists(securityTempDir)) {
|
||||
throw new Exception("unable to create security dir: " + securityTempDir);
|
||||
}
|
||||
return OpcUaClient.create(endPointUrl, endpoints -> endpoints.stream().filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri())).findFirst(), configBuilder -> configBuilder.setApplicationName(LocalizedText.english("eclipse milo opc-ua client")).setApplicationUri("urn:eclipse:milo:examples:client")
|
||||
//访问方式
|
||||
.setIdentityProvider(new AnonymousProvider()).setRequestTimeout(UInteger.valueOf(5000)).build());
|
||||
}
|
||||
|
||||
while (var9.hasNext()) {
|
||||
OpcItemDto protocol = (OpcItemDto) var9.next();
|
||||
String itemId = protocol.getItem_code();
|
||||
accessor.addItem(itemId, this);
|
||||
}
|
||||
/**
|
||||
* 批量订阅
|
||||
*
|
||||
* @param client
|
||||
* @throws Exception
|
||||
*/
|
||||
private void managedSubscriptionEvent(OpcUaClient client) throws Exception {
|
||||
final CountDownLatch eventLatch = new CountDownLatch(1);
|
||||
//添加订阅监听器,用于处理断线重连后的订阅问题
|
||||
client.getSubscriptionManager().addSubscriptionListener(new DeviceOpcProtocolRunable.CustomSubscriptionListener(client));
|
||||
//处理订阅业务
|
||||
handlerNode(client);
|
||||
//持续监听
|
||||
eventLatch.await();
|
||||
}
|
||||
|
||||
accessor.bind();
|
||||
log.info("Async20Access bind {}", opcGroupId);
|
||||
}
|
||||
|
||||
Thread.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000));
|
||||
} catch (Exception var8) {
|
||||
if (accessor != null) {
|
||||
try {
|
||||
log.warn("Async20Access unbind {}", opcGroupId);
|
||||
accessor.unbind();
|
||||
} catch (Exception var7) {
|
||||
var7.printStackTrace();
|
||||
}
|
||||
/**
|
||||
* 处理订阅业务
|
||||
*
|
||||
* @param client OPC UA客户端
|
||||
*/
|
||||
public void handlerNode(OpcUaClient client) {
|
||||
try {
|
||||
//创建订阅
|
||||
ManagedSubscription subscription = ManagedSubscription.create(client);
|
||||
|
||||
accessor = null;
|
||||
}
|
||||
//你所需要订阅的key
|
||||
List<String> key = new ArrayList<>();
|
||||
|
||||
if (this.server != null) {
|
||||
try {
|
||||
this.server.disconnect();
|
||||
} catch (Exception var6) {
|
||||
}
|
||||
List<String> itemsString = new ArrayList();
|
||||
Iterator var3 = this.protocols.iterator();
|
||||
|
||||
this.server = null;
|
||||
}
|
||||
|
||||
if (var8 instanceof InterruptedException) {
|
||||
log.warn("OPC 同步线程(%s)被中断", opcGroupId);
|
||||
return;
|
||||
}
|
||||
|
||||
log.warn("设备信息同步异常", var8);
|
||||
ThreadUtl.sleep((long) (OpcConfig.synchronized_exception_wait_second * 1000));
|
||||
String error_message = var8.getMessage();
|
||||
if (error_message == null) {
|
||||
error_message = var8.toString();
|
||||
}
|
||||
|
||||
++this.error_num;
|
||||
if (this.error_num > 3 && !StrUtil.equals(this.message, error_message)) {
|
||||
this.message = error_message;
|
||||
}
|
||||
while (var3.hasNext()) {
|
||||
OpcItemDto protocol = (OpcItemDto) var3.next();
|
||||
String item = protocol.getItem_code();
|
||||
key.add(item);
|
||||
}
|
||||
|
||||
|
||||
List<NodeId> nodeIdList = new ArrayList<>();
|
||||
for (String s : key) {
|
||||
nodeIdList.add(new NodeId(2, s));
|
||||
}
|
||||
|
||||
//监听
|
||||
List<ManagedDataItem> dataItemList = subscription.createDataItems(nodeIdList);
|
||||
|
||||
UnifiedDataAccessor accessor_value = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key);
|
||||
|
||||
if (!OpcStartTag.is_run) {
|
||||
OpcStartTag.is_run = true;
|
||||
}
|
||||
|
||||
for (ManagedDataItem managedDataItem : dataItemList) {
|
||||
managedDataItem.addDataValueListener((t) -> {
|
||||
if(ObjectUtil.isNotEmpty(managedDataItem.getNodeId().getIdentifier())){
|
||||
//managedDataItem.getNodeId().getIdentifier() ZJXXHJ.ZJXXHJ.ZXQ_05.to_command
|
||||
//System.out.println(managedDataItem.getNodeId().getIdentifier().toString() + ":" + t.getValue().getValue());
|
||||
//需要判断连接是否正常
|
||||
if( t.getStatusCode().isGood()){
|
||||
//字符串存在为空的情况
|
||||
if(ObjectUtil.isNotEmpty(t.getValue().getValue())){
|
||||
if(!ObjectUtil.equal(accessor_value.getValue(managedDataItem.getNodeId().getIdentifier().toString()),t.getValue().getValue().toString())){
|
||||
OpcItemDto itemDto = this.getItem(managedDataItem.getNodeId().getIdentifier().toString());
|
||||
this.logItemChanged(managedDataItem.getNodeId().getIdentifier().toString(), accessor_value, OpcUtl.getValue(t.getValue().getValue()),itemDto);
|
||||
accessor_value.setValue(managedDataItem.getNodeId().getIdentifier().toString(), OpcUtl.getValue(t.getValue().getValue()));
|
||||
}
|
||||
} else {
|
||||
if(!ObjectUtil.equal(accessor_value.getValue(managedDataItem.getNodeId().getIdentifier().toString()),t.getValue().getValue())){
|
||||
OpcItemDto itemDto = this.getItem(managedDataItem.getNodeId().getIdentifier().toString());
|
||||
this.logItemChanged(managedDataItem.getNodeId().getIdentifier().toString(), accessor_value, OpcUtl.getValue(t.getValue()),itemDto);
|
||||
accessor_value.setValue(managedDataItem.getNodeId().getIdentifier().toString(), OpcUtl.getValue(t.getValue().getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 自定义订阅监听
|
||||
*/
|
||||
private class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener {
|
||||
|
||||
private OpcUaClient client;
|
||||
|
||||
CustomSubscriptionListener(OpcUaClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
|
||||
// System.out.println("onKeepAlive");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStatusChanged(UaSubscription subscription, StatusCode status) {
|
||||
// System.out.println("onStatusChanged");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPublishFailure(UaException exception) {
|
||||
// System.out.println("onPublishFailure");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNotificationDataLost(UaSubscription subscription) {
|
||||
// System.out.println("onNotificationDataLost");
|
||||
}
|
||||
|
||||
/**
|
||||
* 重连时 尝试恢复之前的订阅失败时 会调用此方法
|
||||
* @param uaSubscription 订阅
|
||||
* @param statusCode 状态
|
||||
*/
|
||||
@Override
|
||||
public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) {
|
||||
System.out.println("恢复订阅失败 需要重新订阅");
|
||||
//在回调方法中重新订阅
|
||||
handlerNode(client);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,5 +17,5 @@ public class OpcConfig {
|
||||
/**
|
||||
* OPC 数据同步是否采用回调机制实现。之前是线程定期全部读,效率低。
|
||||
*/
|
||||
public static Boolean opc_item_read_using_callback = false;
|
||||
public static Boolean opc_item_read_using_callback = true;
|
||||
}
|
||||
|
||||
@@ -5,6 +5,10 @@ import cn.hutool.core.date.TimeInterval;
|
||||
import cn.hutool.core.util.NumberUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.dynamictp.core.DtpRegistry;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UByte;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
|
||||
import org.jinterop.dcom.common.JIException;
|
||||
import org.jinterop.dcom.core.*;
|
||||
import org.nl.acs.device_driver.driver.ItemValue;
|
||||
@@ -130,6 +134,24 @@ public class OpcUtl {
|
||||
}
|
||||
}
|
||||
|
||||
public static Object getValue(Object value) throws BadRequestException {
|
||||
try{
|
||||
if(value instanceof UByte){
|
||||
return Integer.valueOf(value.toString());
|
||||
} else if (value instanceof UInteger){
|
||||
return Integer.valueOf(value.toString());
|
||||
} else if (value instanceof ByteString) {
|
||||
return value.toString();
|
||||
} else if (value instanceof UShort) {
|
||||
return Integer.valueOf(value.toString());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new BadRequestException(e.getMessage());
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
|
||||
public static Object getValue(Item item, ItemState itemState) throws BadRequestException {
|
||||
if (NumberUtil.compare(itemState.getQuality(), Short.valueOf(QualityTypeValue.OPC_QUALITY_GOOD)) != 0) {
|
||||
if (item != null) {
|
||||
|
||||
@@ -0,0 +1,270 @@
|
||||
package org.nl.acs.opc;
|
||||
|
||||
import cn.hutool.core.util.NumberUtil;
|
||||
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
||||
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
|
||||
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
|
||||
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
|
||||
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
|
||||
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
|
||||
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
|
||||
import org.eclipse.milo.opcua.stack.core.AttributeId;
|
||||
import org.eclipse.milo.opcua.stack.core.Identifiers;
|
||||
import org.eclipse.milo.opcua.stack.core.UaException;
|
||||
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
||||
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
|
||||
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
|
||||
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
|
||||
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
|
||||
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class TestOpcUA {
|
||||
// private final static String endPointUrl = "opc.tcp://192.168.10.60:49320";
|
||||
|
||||
private final static String endPointUrl = "opc.tcp://127.0.0.1:49320";
|
||||
|
||||
|
||||
/**
|
||||
* 创建OPC UA客户端
|
||||
*/
|
||||
private static OpcUaClient createClient() throws Exception {
|
||||
//opc ua服务端地址
|
||||
Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
|
||||
Files.createDirectories(securityTempDir);
|
||||
if (!Files.exists(securityTempDir)) {
|
||||
throw new Exception("unable to create security dir: " + securityTempDir);
|
||||
}
|
||||
return OpcUaClient.create(endPointUrl, endpoints -> endpoints.stream().filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri())).findFirst(), configBuilder -> configBuilder.setApplicationName(LocalizedText.english("eclipse milo opc-ua client")).setApplicationUri("urn:eclipse:milo:examples:client")
|
||||
//访问方式
|
||||
.setIdentityProvider(new AnonymousProvider()).setRequestTimeout(UInteger.valueOf(500)).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 遍历树形节点
|
||||
*
|
||||
* @param client OPC UA客户端
|
||||
* @param uaNode 节点
|
||||
* @throws Exception
|
||||
*/
|
||||
private static void browseNode(OpcUaClient client, UaNode uaNode) throws Exception {
|
||||
List<? extends UaNode> nodes;
|
||||
if (uaNode == null) {
|
||||
nodes = client.getAddressSpace().browseNodes(Identifiers.ObjectsFolder);
|
||||
} else {
|
||||
nodes = client.getAddressSpace().browseNodes(uaNode);
|
||||
}
|
||||
for (UaNode nd : nodes) {
|
||||
//排除系统行性节点,这些系统性节点名称一般都是以"_"开头
|
||||
if (Objects.requireNonNull(nd.getBrowseName().getName()).contains("_")) {
|
||||
continue;
|
||||
}
|
||||
System.out.println("Node= " + nd.getBrowseName().getName());
|
||||
browseNode(client, nd);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取节点数据
|
||||
*
|
||||
* @param client OPC UA客户端
|
||||
* @throws Exception
|
||||
*/
|
||||
private static void readNode(OpcUaClient client) throws Exception {
|
||||
int namespaceIndex = 2;
|
||||
String identifier = "DDJ1.DDJ1.DDJ1.to_command";
|
||||
//节点
|
||||
NodeId nodeId = new NodeId(namespaceIndex, identifier);
|
||||
//读取节点数据
|
||||
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
|
||||
//标识符
|
||||
identifier = String.valueOf(nodeId.getIdentifier());
|
||||
System.out.println(identifier + ": " + String.valueOf(value.getValue().getValue()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入节点数据
|
||||
*
|
||||
* @param client
|
||||
* @throws Exception
|
||||
*/
|
||||
private static void writeNodeValue(OpcUaClient client) {
|
||||
//节点
|
||||
NodeId nodeId = new NodeId(2, "DDJ1.DDJ1.DDJ1.to_command");
|
||||
Object i = 112233;
|
||||
DataValue nowValue = null;
|
||||
|
||||
if(NumberUtil.isNumber(String.valueOf(i))){
|
||||
Long data = Long.valueOf(i.toString());
|
||||
nowValue = new DataValue(new Variant(Long.valueOf(data)), null, null);
|
||||
}
|
||||
//创建数据对象,此处的数据对象一定要定义类型,不然会出现类型错误,导致无法写入
|
||||
//写入节点数据
|
||||
try{
|
||||
StatusCode statusCode = client.writeValue(nodeId, nowValue).join();
|
||||
System.out.println(statusCode);
|
||||
System.out.println("结果:" + statusCode.isGood());
|
||||
} catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅(单个)
|
||||
*
|
||||
* @param client
|
||||
* @throws Exception
|
||||
*/
|
||||
private static void subscribe(OpcUaClient client) throws Exception {
|
||||
AtomicInteger a = new AtomicInteger();
|
||||
//创建发布间隔1000ms的订阅对象
|
||||
client.getSubscriptionManager().createSubscription(1000.0).thenAccept(t -> {
|
||||
//节点
|
||||
NodeId nodeId = new NodeId(2, "T1.T1.T1");
|
||||
ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
|
||||
//创建监控的参数
|
||||
MonitoringParameters parameters = new MonitoringParameters(UInteger.valueOf(a.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true);
|
||||
//创建监控项请求
|
||||
//该请求最后用于创建订阅。
|
||||
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
|
||||
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
|
||||
requests.add(request);
|
||||
//创建监控项,并且注册变量值改变时候的回调函数。
|
||||
t.createMonitoredItems(TimestampsToReturn.Both, requests, (item, id) -> item.setValueConsumer((it, val) -> {
|
||||
System.out.println("nodeid :" + it.getReadValueId().getNodeId());
|
||||
System.out.println("value :" + val.getValue().getValue());
|
||||
}));
|
||||
}).get();
|
||||
|
||||
//持续订阅
|
||||
Thread.sleep(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量订阅
|
||||
*
|
||||
* @param client
|
||||
* @throws Exception
|
||||
*/
|
||||
// private static void managedSubscriptionEvent(OpcUaClient client) throws Exception {
|
||||
// final CountDownLatch eventLatch = new CountDownLatch(1);
|
||||
//
|
||||
// //处理订阅业务
|
||||
// handlerNode(client);
|
||||
//
|
||||
// //持续监听
|
||||
// eventLatch.await();
|
||||
// }
|
||||
|
||||
/**
|
||||
* 处理订阅业务
|
||||
*
|
||||
* @param client OPC UA客户端
|
||||
*/
|
||||
private static void handlerNode(OpcUaClient client) {
|
||||
try {
|
||||
//创建订阅
|
||||
ManagedSubscription subscription = ManagedSubscription.create(client);
|
||||
|
||||
//你所需要订阅的key
|
||||
List<String> key = new ArrayList<>();
|
||||
key.add("T1.T1.T1");
|
||||
key.add("T2.T2.A1.T1");
|
||||
|
||||
List<NodeId> nodeIdList = new ArrayList<>();
|
||||
for (String s : key) {
|
||||
nodeIdList.add(new NodeId(2, s));
|
||||
}
|
||||
|
||||
//监听
|
||||
List<ManagedDataItem> dataItemList = subscription.createDataItems(nodeIdList);
|
||||
for (ManagedDataItem managedDataItem : dataItemList) {
|
||||
managedDataItem.addDataValueListener((t) -> {
|
||||
System.out.println(managedDataItem.getNodeId().getIdentifier().toString() + ":" + t.getValue().getValue().toString());
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 自定义订阅监听
|
||||
*/
|
||||
private static class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener {
|
||||
|
||||
private OpcUaClient client;
|
||||
|
||||
CustomSubscriptionListener(OpcUaClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
|
||||
System.out.println("onKeepAlive");
|
||||
}
|
||||
|
||||
public void onStatusChanged(UaSubscription subscription, StatusCode status) {
|
||||
System.out.println("onStatusChanged");
|
||||
}
|
||||
|
||||
public void onPublishFailure(UaException exception) {
|
||||
System.out.println("onPublishFailure");
|
||||
}
|
||||
|
||||
public void onNotificationDataLost(UaSubscription subscription) {
|
||||
System.out.println("onNotificationDataLost");
|
||||
}
|
||||
|
||||
/**
|
||||
* 重连时 尝试恢复之前的订阅失败时 会调用此方法
|
||||
* @param uaSubscription 订阅
|
||||
* @param statusCode 状态
|
||||
*/
|
||||
public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) {
|
||||
System.out.println("恢复订阅失败 需要重新订阅");
|
||||
//在回调方法中重新订阅
|
||||
handlerNode(client);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量订阅
|
||||
*
|
||||
* @param client
|
||||
* @throws Exception
|
||||
*/
|
||||
private static void managedSubscriptionEvent(OpcUaClient client) throws Exception {
|
||||
final CountDownLatch eventLatch = new CountDownLatch(1);
|
||||
|
||||
//添加订阅监听器,用于处理断线重连后的订阅问题
|
||||
client.getSubscriptionManager().addSubscriptionListener(new CustomSubscriptionListener(client));
|
||||
|
||||
//处理订阅业务
|
||||
handlerNode(client);
|
||||
|
||||
//持续监听
|
||||
eventLatch.await();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
OpcUaClient client = createClient();
|
||||
client.connect().get();
|
||||
// browseNode(client, null);
|
||||
// readNode(client);
|
||||
writeNodeValue(client);
|
||||
// subscribe(client);
|
||||
// managedSubscriptionEvent(client);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,11 +1,18 @@
|
||||
package org.nl.acs.utils;
|
||||
|
||||
import cn.hutool.core.util.NumberUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
||||
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
|
||||
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
||||
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
|
||||
import org.jinterop.dcom.common.JIException;
|
||||
import org.jinterop.dcom.core.JIVariant;
|
||||
import org.nl.acs.device.device_driver.standard_inspect.ItemDto;
|
||||
@@ -25,6 +32,9 @@ import org.openscada.opc.lib.list.Category;
|
||||
import org.openscada.opc.lib.list.ServerList;
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@@ -64,6 +74,61 @@ public class ReadUtil {
|
||||
return server;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建OPC UA客户端
|
||||
*/
|
||||
private static OpcUaClient createClient() throws Exception {
|
||||
String endPointUrl = "opc.tcp://127.0.0.1:49320";
|
||||
|
||||
//opc ua服务端地址
|
||||
Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
|
||||
Files.createDirectories(securityTempDir);
|
||||
if (!Files.exists(securityTempDir)) {
|
||||
throw new Exception("unable to create security dir: " + securityTempDir);
|
||||
}
|
||||
return OpcUaClient.create(endPointUrl, endpoints -> endpoints.stream().filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri())).findFirst(), configBuilder -> configBuilder.setApplicationName(LocalizedText.english("eclipse milo opc-ua client")).setApplicationUri("urn:eclipse:milo:examples:client")
|
||||
//访问方式
|
||||
.setIdentityProvider(new AnonymousProvider()).setRequestTimeout(UInteger.valueOf(5000)).build());
|
||||
}
|
||||
|
||||
|
||||
public static void write(Map<String, Object> itemString) {
|
||||
HashMap map = new HashMap();
|
||||
OpcUaClient client = null;
|
||||
try {
|
||||
client = createClient();
|
||||
client.connect().get();
|
||||
Iterator it = itemString.keySet().iterator();
|
||||
|
||||
while (it.hasNext()) {
|
||||
String key = (String) it.next();
|
||||
Object o = itemString.get(key);
|
||||
if (o == null || "".equals(o)) {
|
||||
break;
|
||||
}
|
||||
//节点
|
||||
NodeId nodeId = new NodeId(2, key);
|
||||
//创建数据对象,此处的数据对象一定要定义类型,不然会出现类型错误,导致无法写入
|
||||
DataValue nowValue = null;
|
||||
if(NumberUtil.isNumber(String.valueOf(o))){
|
||||
nowValue = new DataValue(new Variant(Integer.valueOf(o.toString())), null, null);
|
||||
} else if (NumberUtil.isNumber(String.valueOf(o))) {
|
||||
nowValue = new DataValue(new Variant(Double.valueOf(o.toString())), null, null);
|
||||
} else {
|
||||
nowValue = new DataValue(new Variant(String.valueOf(o)), null, null);
|
||||
}
|
||||
//写入节点数据
|
||||
StatusCode statusCode = client.writeValue(nodeId, nowValue).join();
|
||||
System.out.println("结果:" + statusCode.isGood());
|
||||
}
|
||||
client.disconnect();
|
||||
}catch (Exception e) {
|
||||
client.disconnect();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void write(Map<String, Object> strings, Server server) {
|
||||
try {
|
||||
//Group group = this.opcServerService.getServer(opcServiceCode);
|
||||
@@ -96,6 +161,32 @@ public class ReadUtil {
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static Map<String, Object> read(List<String> itemString) {
|
||||
HashMap map = new HashMap();
|
||||
OpcUaClient client = null;
|
||||
try {
|
||||
client = createClient();
|
||||
client.connect().get();
|
||||
int namespaceIndex = 2;
|
||||
Iterator it = itemString.iterator();
|
||||
while (it.hasNext()) {
|
||||
String identifier = (String)it.next();
|
||||
//节点
|
||||
NodeId nodeId = new NodeId(namespaceIndex, identifier);
|
||||
//读取节点数据
|
||||
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
|
||||
map.put(identifier, String.valueOf(value.getValue().getValue()));
|
||||
}
|
||||
client.disconnect();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static JSONArray getWriteableItemDtos() {
|
||||
List<ItemDto> list = ItemProtocol.getWriteableItemDtos();
|
||||
JSONArray array = JSONArray.parseArray(JSON.toJSONString(list));
|
||||
|
||||
Reference in New Issue
Block a user