@@ -1,38 +1,31 @@
package org.nl.iot.core.driver.protocol.plcs7 ;
import jakarta.annotation.PostConstruct ;
import lombok.* ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.plc4x.java.DefaultPlcDriverManager ;
import org.apache.plc4x.java.api.PlcConnection ;
import org.apache.plc4x.java.api.messages.PlcReadRequest ;
import org.apache.plc4x.java.api.messages.PlcReadResponse ;
import org.apache.plc4x.java.api.messages.PlcWriteRequest ;
import org.apache.plc4x.java.api.messages.PlcWriteResponse ;
import org.apache.plc4x.java.api.types.PlcResponseCode ;
import org.apache.plc4x.java.api.value.PlcValue ;
import org.nl.common.exception.CommonException ;
import org.nl.iot.core.driver.bo.AttributeBO ;
import org.nl.iot.core.driver.bo.DeviceBO ;
import org.nl.iot.core.driver.bo.MetadataEventDTO ;
import org.nl.iot.core.driver.bo.SiteBO ;
import org.nl.iot.core.driver.entity.RValue ;
import org.nl.iot.core.driver.entity.WResponse ;
import org.nl.iot.core.driver.entity.WValue ;
import org.nl.iot.core.driver.enums.AttributeTypeFlagEnum ;
import org.nl.iot.core.driver.enums.MetadataOperateTypeEnum ;
import org.nl.iot.core.driver.protocol.modbustcp.util.Modbus PlcValueConvertUtil ;
import org.nl.iot.core.driver.protoco l.p lcs7.com.github.s7.PlcS7PointVariable ;
import org.nl.iot.core.driver.protocol.plcs7.com.github.s7.api.S7Connector ;
import org.nl.iot.core.driver.protocol.plcs7.com.github.s7.api.S7Serializer ;
import org.nl.iot.core.driver.util.JavaTo PlcValueConvertUtil ;
import org.nl.iot.core.driver.uti l.P lcValueConvertUtil ;
import org.nl.iot.core.driver.protocol.plcs7.util.PlcS7Utils ;
import org.nl.iot.core.driver.service.DriverCustomService ;
import org.nl.iot.modular.iot.entity.IotConfig ;
import org.springframework.stereotype.Service ;
import java.util.* ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.ReentrantReadWriteLock ;
import java.util.concurrent.* ;
/**
*
@@ -73,16 +66,16 @@ public class PlcS7ProtocolDriverImpl implements DriverCustomService {
@Override
public RValue read ( DeviceBO device , SiteBO point ) {
log . debug ( " Plc S7 Read, connect: {}, config: {} " , device , point ) ;
PlcConnection myS7Connector = getS7Connector ( device ) ;
String tagAddress = " % " + point . getRegisterAddress ( ) + " : " + point . getDataType ( ) ;
log . info ( " 构建读取请求 - 标签名: {}, 地址: {} " , point . getAlias ( ) , tagAddress ) ;
PlcReadRequest readRequest = doBuildReadRequest ( myS7Connector , Collections . singletonList ( point ) ) . build ( ) ;
CompletableFuture < ? extends PlcReadResponse > readFuture = readRequest . execute ( ) ;
PlcReadResponse readResponse ;
try {
readResponse = readFuture . get ( 10 , TimeUnit . SECONDS ) ;
log . debug ( " Plc S7 Read, connect: {}, config: {} " , device , point ) ;
PlcConnection myS7Connector = getS7Connector ( device ) ;
String tagAddress = point . getRegisterAddress ( ) + " : " + point . getDataType ( ) ;
log . info ( " 构建读取请求 - 标签名: {}, 地址: {} " , point . getAlias ( ) , tagAddress ) ;
PlcReadRequest readRequest = doBuildReadRequest ( myS7Connector , Collections . singletonList ( point ) ) . build ( ) ;
CompletableFuture < ? extends PlcReadResponse > readFuture = readRequest . execute ( ) ;
PlcReadResponse readResponse = readFuture . get ( 10 , TimeUnit . SECONDS ) ;
if ( readResponse . getResponseCode ( point . getAlias ( ) ) ! = PlcResponseCode . OK ) {
return new RValue ( device , point , null , String . format (
" 读取 S7 失败,设备编码:%s, 地址: %s, 响应码: %s "
@@ -91,40 +84,50 @@ public class PlcS7ProtocolDriverImpl implements DriverCustomService {
, readResponse . getResponseCode ( point . getAlias ( ) )
) ) ;
}
log . info ( " 读取响应 - 可用标签: {} " , readResponse . getTagNames ( ) ) ;
return new RValue ( device , point , PlcValueConvertUtil . convertPlcValueToString ( readResponse . getPlcValue ( point . getAlias ( ) )
, point . getDataType ( ) ) , null ) ;
} catch ( Exception e ) {
log . error ( " 读取S7数据异常, 设备编码: {},地址:{} " , point . getAlias ( ) , point . getRegisterAddress ( ) , e ) ;
return new RValue ( device , point , null , e . getMessage ( ) ) ;
}
log . info ( " 读取响应 - 可用标签: {} " , readResponse . getTagNames ( ) ) ;
return new RValue ( device , point , ModbusPlcValueConvertUtil . convertPlcValueToString ( readResponse . getPlcValue ( point . getAlias ( ) )
, point . getDataType ( ) ) , null ) ;
}
@Override
public List < RValue > batchRead ( DeviceBO device , List < SiteBO > points ) {
return batchReadValue ( getS7Connector ( device ) , device , points ) ;
try {
return batchReadValue ( getS7Connector ( device ) , device , points ) ;
} catch ( Exception e ) {
log . error ( " 批量读取S7数据异常, 设备ID: {} " , device . getId ( ) , e ) ;
List < RValue > errorResults = new ArrayList < > ( ) ;
for ( SiteBO point : points ) {
errorResults . add ( new RValue ( device , point , null , e . getMessage ( ) ) ) ;
}
return errorResults ;
}
}
@Override
public Boolean write ( DeviceBO device , WValue wValue ) {
return null ;
return writeValue ( getS7Connector ( device ) , device , wValue ) ;
}
@Override
public List < WResponse > batchWrite ( DeviceBO device , List < WValue > wValue ) {
return List . of ( ) ;
return batchWriteValue ( getS7Connector ( device ) , wValue ) ;
}
/**
* 获取 PLC S7 连接器
*/
private PlcConnection getS7Connector ( DeviceBO deviceBO ) {
String deviceId = deviceBO . getId ( ) ;
PlcConnection connection = connectMap . get ( deviceId ) ;
// 检查连接是否有效(核心修复:判断连接是否存在且未关闭)
try {
String deviceId = deviceBO . getId ( ) ;
PlcConnection connection = connectMap . get ( deviceId ) ;
// 检查连接是否有效(核心修复:判断连接是否存在且未关闭)
if ( Objects . isNull ( connection ) | | ! connection . isConnected ( ) ) {
// 旧连接失效,先关闭再移除
if ( Objects . nonNull ( connection ) ) {
@@ -141,27 +144,51 @@ public class PlcS7ProtocolDriverImpl implements DriverCustomService {
connection = new DefaultPlcDriverManager ( ) . getConnection ( connectionUrl ) ;
connectMap . put ( deviceId , connection ) ;
}
return connection ;
} catch ( Exception e ) {
log . error ( " 创建S7连接失败, deviceId: {} " , deviceId , e ) ;
log . error ( " 创建S7连接失败, deviceId: {} " , deviceBO . getId ( ) , e ) ;
throw new CommonException ( " PLC S7 连接失败: " + e . getMessage ( ) ) ;
}
return connection ;
}
public static PlcReadRequest . Builder doBuildReadRequest ( PlcConnection s7Connection , List < SiteBO > points ) {
PlcReadRequest . Builder readBuilder = s7Connection . readRequestBuilder ( ) ;
for ( SiteBO point : points ) {
String tagAddress = " % " + point . getRegisterAddress ( ) + " : " + point . getDataType ( ) ;
// 构建读取请求
String tagName = point . getAlias ( ) ;
// 校验地址格式
if ( ! ModbusPlcValueConvertUtil . containerType ( point . getDataType ( ) ) ) {
log . warn ( " S7数据类型错误: 设备编码: {} " , tagName ) ;
continue ;
try {
PlcReadRequest . Builder readBuilder = s7Connection . readRequestBuilder ( ) ;
for ( SiteBO point : points ) {
try {
String tagAddress = " % " + point . getRegisterAddress ( ) + " : " + point . getDataType ( ) ;
// 构建读取请求
String tagName = point . getAlias ( ) ;
// 校验地址格式
if ( ! PlcValueConvertUtil . containerType ( point . getDataType ( ) ) ) {
log . warn ( " S7数据类型错误: 设备编码: {} " , tagName ) ;
continue ;
}
readBuilder . addTagAddress ( tagName , tagAddress ) ;
} catch ( Exception e ) {
log . error ( " 构建读取请求失败,点位:{} " , point . getAlias ( ) , e ) ;
}
}
readBuilder . addTagAddress ( tagName , tagAddress ) ;
return readBuilder ;
} catch ( Exception e ) {
log . error ( " 构建读取请求失败 " , e ) ;
throw new CommonException ( " 构建S7读取请求失败: " + e . getMessage ( ) ) ;
}
return readBuilder ;
}
public static PlcWriteRequest . Builder doBuildWriteRequest ( PlcConnection modbusMaster , List < WValue > wValues ) {
PlcWriteRequest . Builder writeRequestBuilder = modbusMaster . writeRequestBuilder ( ) ;
// 1. 解析配置
for ( WValue wValue : wValues ) {
SiteBO point = wValue . getPoint ( ) ;
String type = point . getDataType ( ) ;
String tagAddress = " % " + point . getRegisterAddress ( ) + " : " + type ;
// 2. 构建读取请求
String tagName = point . getAlias ( ) ;
// 2. 设置要写入的值
writeRequestBuilder . addTagAddress ( tagName , tagAddress , JavaToPlcValueConvertUtil . convert ( wValue . getValue ( ) , type ) ) ;
}
return writeRequestBuilder ;
}
/**
@@ -171,35 +198,91 @@ public class PlcS7ProtocolDriverImpl implements DriverCustomService {
* @param points
* @return
*/
@SneakyThrows
public List < RValue > batchReadValue ( PlcConnection s7Connection , DeviceBO deviceBO , List < SiteBO > points ) {
// 1. 解析配置
PlcReadRequest . Builder readBuilder = doBuildReadRequest ( s7Connection , points ) ;
// 3. 执行请求
PlcReadRequest readRequest = readBuilder . build ( ) ;
CompletableFuture < ? extends PlcReadResponse > readFuture = readRequest . execute ( ) ;
PlcReadResponse readResponse = readFuture . get ( 10 , TimeUnit . SECONDS ) ;
List < RValue > list = new ArrayList < > ( ) ;
// 4.组装数据
for ( SiteBO point : points ) {
try {
PlcResponseCode responseCode = readResponse . getResponseCode ( point . getAlias ( ) ) ;
// 4. 校验响应码
if ( responseCod e ! = PlcResponseCode . OK ) {
try {
// 1. 解析配置
PlcReadRequest . Builder readBuilder = doBuildReadRequest ( s7Connection , points ) ;
// 3. 执行请求
PlcReadRequest readRequest = readBuilder . build ( ) ;
CompletableFuture < ? extends PlcReadResponse > readFutur e = readRequest . execute ( ) ;
PlcReadResponse readResponse = readFuture . get ( 10 , TimeUnit . SECONDS ) ;
// 4.组装数据
for ( SiteBO point : points ) {
try {
PlcResponseCode responseCode = readResponse . getResponseCode ( point . getAlias ( ) ) ;
// 4. 校验响应码
if ( responseCode ! = PlcResponseCode . OK ) {
list . add ( new RValue ( deviceBO , point , null , String . format (
" 读取S7失败, 设备编码: %s, 地址: %s, 响应码: %s " , point . getAlias ( ) , point . getRegisterAddress ( ) , responseCode
) ) ) ;
continue ;
}
// 5. 取值并转换
PlcValue plcValue = readResponse . getPlcValue ( point . getAlias ( ) ) ;
list . add ( new RValue ( deviceBO , point , PlcValueConvertUtil . convertPlcValueToString ( plcValue , point . getDataType ( ) ) , null ) ) ;
} catch ( Exception e ) {
log . error ( " 处理单个点位数据异常,设备编码:{},地址:{} " , point . getAlias ( ) , point . getRegisterAddress ( ) , e ) ;
list . add ( new RValue ( deviceBO , point , null , String . format (
" 读取S7失败, 设备编码: %s, 地址: %s, 响应码 : %s " , point . getAlias ( ) , point . getRegisterAddress ( ) , responseCode
" 读取S7失败, 设备编码: %s, 地址: %s, 异常信息 : %s " , point . getAlias ( ) , point . getRegisterAddress ( ) , e . getMessage ( )
) ) ) ;
continue ;
}
// 5. 取值并转换
PlcValue plcValue = readResponse . getPlcValue ( point . getAlias ( ) ) ;
list . add ( new RValue ( deviceBO , point , ModbusPlcValueConvertUtil . convertPlcValueToString ( plcValue , point . getDataType ( ) ) , " " ) ) ;
} catch ( Exception e ) {
}
} catch ( Exception e ) {
log . error ( " 批量读取S7数据异常 " , e ) ;
// 如果整个批量读取失败,为所有点位返回异常信息
for ( SiteBO point : points ) {
list . add ( new RValue ( deviceBO , point , null , String . format (
" 读取S7失败, 设备编码: %s, 地址: %s, 响应码 : %s " , point . getAlias ( ) , point . getRegisterAddress ( ) , e . getMessage ( )
" 批量 读取S7失败, 设备编码: %s, 地址: %s, 异常信息 : %s" , point . getAlias ( ) , point . getRegisterAddress ( ) , e . getMessage ( )
) ) ) ;
}
}
return list ;
}
private boolean writeValue ( PlcConnection modbusMaster , DeviceBO deviceBO , WValue wValue ) {
// 1. 解析配置
PlcWriteRequest . Builder writeRequestBuilder = doBuildWriteRequest ( modbusMaster , Collections . singletonList ( wValue ) ) ;
PlcWriteRequest writeRequest = writeRequestBuilder . build ( ) ;
// 3. 执行写入(异步+超时控制)
CompletableFuture < ? extends PlcWriteResponse > writeFuture = writeRequest . execute ( ) ;
PlcWriteResponse coilResponse ;
try {
coilResponse = writeFuture . get ( 10 , TimeUnit . SECONDS ) ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
}
PlcResponseCode responseCode = coilResponse . getResponseCode ( wValue . getPoint ( ) . getAlias ( ) ) ;
return responseCode = = PlcResponseCode . OK ;
}
public static List < WResponse > batchWriteValue ( PlcConnection s7Connection , List < WValue > wValues ) {
List < WResponse > res = new ArrayList < > ( ) ;
PlcWriteRequest . Builder writeRequestBuilder = doBuildWriteRequest ( s7Connection , wValues ) ;
PlcWriteRequest writeRequest = writeRequestBuilder . build ( ) ;
// 执行写入(异步+超时控制)
CompletableFuture < ? extends PlcWriteResponse > writeFuture = writeRequest . execute ( ) ;
PlcWriteResponse coilResponse ;
try {
coilResponse = writeFuture . get ( 10 , TimeUnit . SECONDS ) ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
}
for ( WValue wValue : wValues ) {
try {
PlcResponseCode responseCode = coilResponse . getResponseCode ( wValue . getPoint ( ) . getAlias ( ) ) ;
res . add ( new WResponse ( responseCode = = PlcResponseCode . OK ,
wValue , responseCode = = PlcResponseCode . OK ? " " : String . format (
" 写入S7失败, 设备编码: %s, 地址: %s, 响应码: %s " , wValue . getPoint ( ) . getAlias ( ) ,
wValue . getPoint ( ) . getRegisterAddress ( ) , responseCode
) ) ) ;
} catch ( Exception e ) {
res . add ( new WResponse ( false , wValue , String . format ( " 写入S7失败, 设备编码: %s, 地址: %s, 响应码: %s "
, wValue . getPoint ( ) . getAlias ( ) , wValue . getPoint ( ) . getRegisterAddress ( ) , e . getMessage ( )
) ) ) ;
}
}
return res ;
}
}