diff --git a/wms_pro/hd/README.md b/wms_pro/hd/README.md index fad1da5f..f61c439c 100644 --- a/wms_pro/hd/README.md +++ b/wms_pro/hd/README.md @@ -1,3 +1,7 @@ # wms - WMS +### 流程说明 +1.库存变动目前只变动记录不变动库存 +2.sub子流程节点现在拆分是根据载具拆分数据 +3.需要添加一个空的等待节点,现在使用executeflow节点暂替 +4.需要考虑流程中事务问题,手工触发的功能由于在流程中无法反馈给用户到底流程执行的怎么样 diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartClientServer.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartClientServer.java index b4b66e8c..5566942f 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartClientServer.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartClientServer.java @@ -1,19 +1,25 @@ package org.nl.common.websocket.heartSocket.clientSocket; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; +import lombok.SneakyThrows; +import org.nl.common.websocket.heartSocket.serverSocket.HeartServerHandler; import org.nl.config.lucene.remote.AbstraceServer; +import org.nl.config.lucene.remote.coder.LogConsumerHandler; +import java.net.ConnectException; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; @@ -25,7 +31,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; */ public class HeartClientServer extends AbstraceServer { - private static EventLoopGroup group = new NioEventLoopGroup(); + private static EventLoopGroup group = new NioEventLoopGroup(4); public HeartClientServer(SocketAddress address) { super(address); @@ -33,40 +39,66 @@ public class HeartClientServer extends AbstraceServer { @Override public void doOpen() { - server = new Bootstrap(); - server.group(group) -// .option(ChannelOption.SO_KEEPALIVE, true) + HeartClientServer body = this; + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) .channel(NioSocketChannel.class); - - server.handler(new ChannelInitializer() { + bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() - .addLast("client-idle-handler", new IdleStateHandler(0, 5,0 , TimeUnit.SECONDS)) + .addLast("client-idle-handler", new IdleStateHandler(5, 5,5 , TimeUnit.SECONDS)) .addLast( new StringEncoder()) .addLast( new StringDecoder()) - .addLast( new HeartConsumerHandler()); - + .addLast(new HeartConsumerHandler(body)); } }); + server = bootstrap; + Runtime.getRuntime().addShutdownHook(new Thread(() -> this.doDestroy())); + } @Override - public void doClose() { - super.doClose(); + @SneakyThrows + public void doDestroy() { + System.out.println("------客户端停机------"); + if (channel!=null){ + channel.close(); + channel.flush(); + } + channel = null; + Thread.interrupted(); Future bossGroupShutdownFuture = group.shutdownGracefully(); bossGroupShutdownFuture.syncUninterruptibly(); } @Override - public void doConnect() { + public void doConnect() { + System.out.println("-------尝试连接服务器-------"); try { ChannelFuture connect = ((Bootstrap) server).connect(address); - connect.syncUninterruptibly(); - channel = connect.channel(); - } catch (Throwable t) { - this.doClose(); - throw t; + connect.addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()){ + channel = connect.channel(); + pingpong(); + }else { + channelFuture.channel().eventLoop().schedule(() -> doConnect(),6,TimeUnit.SECONDS); + } + }); + } catch (Exception t){ + t.getStackTrace(); + this.doDestroy(); + } + } + public void pingpong() throws InterruptedException { + while (channel!=null && channel.isActive()){ + Thread.sleep(2000); + System.out.println("-----chilend-------"); + ByteBuf log = Unpooled.copiedBuffer("ping--222-", CharsetUtil.UTF_8); + if (channel==null){ + return; + } + channel.writeAndFlush(log); } } } diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartConsumerHandler.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartConsumerHandler.java index e78a1d1f..ce0c72bb 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartConsumerHandler.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/HeartConsumerHandler.java @@ -9,6 +9,7 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; +import org.nl.config.lucene.remote.AbstraceServer; import org.nl.config.lucene.remote.RemoteLogServer; /* @@ -16,11 +17,18 @@ import org.nl.config.lucene.remote.RemoteLogServer; * @Date 2024/1/22 10:24 */ public class HeartConsumerHandler extends SimpleChannelInboundHandler { + private AbstraceServer server; + + public HeartConsumerHandler(HeartClientServer server) { + this.server = server; + } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("断开连接---"); //重新建立 + ctx.close(); + server.doDisConnect(); super.channelInactive(ctx); } @@ -30,21 +38,23 @@ public class HeartConsumerHandler extends SimpleChannelInboundHandler { super.channelActive(ctx); } - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("接收到消息"+msg.toString()); - super.channelRead(ctx, msg); + public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println("客户接收到消息"+msg.toString()); + ByteBuf log = Unpooled.copiedBuffer("ping--21312-", CharsetUtil.UTF_8); + server.channel.writeAndFlush(log); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent stateEvent = (IdleStateEvent) evt; + System.out.println(((IdleStateEvent) evt).state().toString()); if (stateEvent.state() == IdleState.WRITER_IDLE) { //如果五秒内写失败,说明服务网络异常 + ctx.close(); + server.doDisConnect(); } } - super.userEventTriggered(ctx, evt); } } diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/RunClientHeartMain.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/RunClientHeartMain.java index 1be64e36..88276513 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/RunClientHeartMain.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/clientSocket/RunClientHeartMain.java @@ -15,12 +15,6 @@ import java.nio.charset.StandardCharsets; */ public class RunClientHeartMain { public static void main(String[] args) throws InterruptedException { - HeartClientServer heartServer = new HeartClientServer(new InetSocketAddress("127.0.0.1", 20889)); - while (true){ - Thread.sleep(2000); - System.out.println("-----chilend-------"); - ByteBuf log = Unpooled.copiedBuffer("ping--333-", CharsetUtil.UTF_8); - heartServer.channel.writeAndFlush(log); - } + HeartClientServer heartServer = new HeartClientServer(new InetSocketAddress("192.168.10.57", 20889)); } } diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServer.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServer.java index b1c70efd..f59010df 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServer.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServer.java @@ -31,9 +31,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; */ public class HeartServer extends AbstraceServer { - public static Map Heart_Connection = new HashMap(); - - private static EventLoopGroup boss = new NioEventLoopGroup(); private static EventLoopGroup worker = new NioEventLoopGroup(); @@ -45,6 +42,7 @@ public class HeartServer extends AbstraceServer { @Override public void doOpen() { ServerBootstrap bootstrap = new ServerBootstrap(); + AbstraceServer body = this; bootstrap .group(boss, worker) .channel(NioServerSocketChannel.class) @@ -53,17 +51,23 @@ public class HeartServer extends AbstraceServer { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() - .addLast("client-idle-handler", new IdleStateHandler(4, 0, 0, TimeUnit.SECONDS)) + .addLast("client-idle-handler", new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)) .addLast( new StringEncoder()) .addLast( new StringDecoder()) - .addLast(new HeartServerHandler()); + .addLast(new HeartServerHandler(body)); } }); server = bootstrap; + Runtime.getRuntime().addShutdownHook(new Thread(() -> this.doDestroy())); + } @Override - public void doClose(){ + public void doDestroy(){ + System.out.println("--------服务停机--------"); + if (channel != null) { + channel.close(); + } Future bossGroupShutdownFuture = boss.shutdownGracefully(); Future workerGroupShutdownFuture = worker.shutdownGracefully(); bossGroupShutdownFuture.syncUninterruptibly(); diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServerHandler.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServerHandler.java index ed4c9c00..54f0e5b6 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServerHandler.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/HeartServerHandler.java @@ -7,9 +7,11 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; +import org.nl.config.lucene.remote.AbstraceServer; import org.nl.config.lucene.remote.RemoteLogServer; /* @@ -17,26 +19,28 @@ import org.nl.config.lucene.remote.RemoteLogServer; * @Date 2024/1/22 10:24 */ public class HeartServerHandler extends SimpleChannelInboundHandler { + private AbstraceServer server; + + public HeartServerHandler(AbstraceServer server) { + this.server = server; + } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("服务端断开连接-----"); - HeartServer.Heart_Connection.remove(ctx.channel().id().asLongText()); - //重新建立 super.channelInactive(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("服务端收到连接-----连接"); - HeartServer.Heart_Connection.put(ctx.channel().id().asLongText(),ctx.channel()); super.channelActive(ctx); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("接收到消息"+msg.toString()); + System.out.println("接收到消息2"+msg.toString()); } @Override @@ -45,9 +49,8 @@ public class HeartServerHandler extends SimpleChannelInboundHandler { IdleStateEvent stateEvent = (IdleStateEvent) evt; System.out.println(stateEvent.state()); if (stateEvent.state() == IdleState.READER_IDLE) { - //向服务端发送消息 -// ctx.writeAndFlush("ping") -// .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + System.out.println("--------手动关闭连接--------"); + ctx.close(); } } super.userEventTriggered(ctx, evt); diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/RunHeartMain.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/RunHeartMain.java index a08d49a7..bed1da98 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/RunHeartMain.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/common/websocket/heartSocket/serverSocket/RunHeartMain.java @@ -15,7 +15,7 @@ import java.nio.charset.StandardCharsets; */ public class RunHeartMain { public static void main(String[] args) throws InterruptedException { - HeartServer heartServer = new HeartServer(new InetSocketAddress("127.0.0.1", 20889)); + HeartServer heartServer = new HeartServer(new InetSocketAddress("192.168.10.57", 20889)); // while (true){ // Thread.sleep(5000); // for (Channel value : HeartServer.Heart_Connection.values()) { diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/DruidFilter.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/DruidFilter.java index e286abf2..1a77f8b9 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/DruidFilter.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/DruidFilter.java @@ -81,7 +81,7 @@ public class DruidFilter extends FilterEventAdapter { }catch (Exception ex){ log.warn("[-SQL解析异常-][{}]",ex.getMessage()); } -// log.info("[----SQL----][select][执行结果:{}][ SQL: {} ]",result, executeSql); + log.info("[----SQL----][select][执行结果:{}][ SQL: {} ]",result, executeSql); } return rs; } diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/AbstraceServer.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/AbstraceServer.java index 1b990235..ca9a8c06 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/AbstraceServer.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/AbstraceServer.java @@ -17,7 +17,7 @@ public abstract class AbstraceServer { public AbstraceServer(SocketAddress address) { this.address = address; if (channel!=null){ - channel.close(); + doDestroy(); } doOpen(); doConnect(); @@ -27,19 +27,20 @@ public abstract class AbstraceServer { public SocketAddress address; public Channel channel; - private final Lock connectLock = new ReentrantLock(); - public abstract void doOpen(); - public void doClose(){ + public void doDestroy(){ if (channel!=null){ channel.close(); } }; - public abstract void doConnect(); + public abstract void doConnect() ; - public void doDisConnect() throws Throwable{ - channel.close(); + public void doDisConnect(){ + if (channel!=null){ + channel.close(); + doConnect(); + } }; } diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/RemoteLogServer.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/RemoteLogServer.java index c6780dc9..7e0662d6 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/RemoteLogServer.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/RemoteLogServer.java @@ -15,7 +15,6 @@ import org.nl.common.utils.MapOf; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.SmartLifecycle; import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; import java.net.Inet4Address; import java.net.InetSocketAddress; @@ -60,7 +59,7 @@ public class RemoteLogServer implements SmartLifecycle { redisTemplate.opsForValue().set("provider", JSON.toJSONString(config)); Runtime.getRuntime().addShutdownHook(new Thread(() ->{ System.out.println("------服务关闭-升级从变主-------"); - server.doClose(); + server.doDestroy(); redisTemplate.delete("providers"); try { Thread.sleep(5000); diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/coder/LogConsumerHandler.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/coder/LogConsumerHandler.java index 014e36cf..274ce5de 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/coder/LogConsumerHandler.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/coder/LogConsumerHandler.java @@ -15,7 +15,7 @@ public class LogConsumerHandler extends ChannelInboundHandlerAdapter { public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("断开连接---"); RemoteLogServer.LOCAL_LOG = Boolean.TRUE; - RemoteLogServer.server.doClose(); + RemoteLogServer.server.doDestroy(); //重新建立 super.channelInactive(ctx); } diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/ClientServer.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/ClientServer.java index acfc32c9..a86e04db 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/ClientServer.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/ClientServer.java @@ -53,8 +53,8 @@ public class ClientServer extends AbstraceServer { } @Override - public void doClose() { - super.doClose(); + public void doDestroy() { + super.doDestroy(); Future bossGroupShutdownFuture = group.shutdownGracefully(); bossGroupShutdownFuture.syncUninterruptibly(); } @@ -66,7 +66,7 @@ public class ClientServer extends AbstraceServer { connect.syncUninterruptibly(); channel = connect.channel(); } catch (Throwable t) { - this.doClose(); + this.doDestroy(); throw t; } } diff --git a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/RemoteServer.java b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/RemoteServer.java index 1e8629ae..d69b5321 100644 --- a/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/RemoteServer.java +++ b/wms_pro/hd/nladmin-system/src/main/java/org/nl/config/lucene/remote/impl/RemoteServer.java @@ -57,7 +57,7 @@ public class RemoteServer extends AbstraceServer { } @Override - public void doClose(){ + public void doDestroy(){ Future bossGroupShutdownFuture = boss.shutdownGracefully(); Future workerGroupShutdownFuture = worker.shutdownGracefully(); bossGroupShutdownFuture.syncUninterruptibly();