add:心跳

This commit is contained in:
zhangzq
2024-07-09 18:56:46 +08:00
parent 40c7825a96
commit e0c508de34
13 changed files with 110 additions and 63 deletions

View File

@@ -1,3 +1,7 @@
# wms # wms
WMS WMS
### 流程说明
1.库存变动目前只变动记录不变动库存
2.sub子流程节点现在拆分是根据载具拆分数据
3.需要添加一个空的等待节点现在使用executeflow节点暂替
4.需要考虑流程中事务问题,手工触发的功能由于在流程中无法反馈给用户到底流程执行的怎么样

View File

@@ -1,19 +1,25 @@
package org.nl.common.websocket.heartSocket.clientSocket; package org.nl.common.websocket.heartSocket.clientSocket;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelInitializer; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import lombok.SneakyThrows;
import org.nl.common.websocket.heartSocket.serverSocket.HeartServerHandler;
import org.nl.config.lucene.remote.AbstraceServer; import org.nl.config.lucene.remote.AbstraceServer;
import org.nl.config.lucene.remote.coder.LogConsumerHandler;
import java.net.ConnectException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -25,7 +31,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
*/ */
public class HeartClientServer extends AbstraceServer { public class HeartClientServer extends AbstraceServer {
private static EventLoopGroup group = new NioEventLoopGroup(); private static EventLoopGroup group = new NioEventLoopGroup(4);
public HeartClientServer(SocketAddress address) { public HeartClientServer(SocketAddress address) {
super(address); super(address);
@@ -33,40 +39,66 @@ public class HeartClientServer extends AbstraceServer {
@Override @Override
public void doOpen() { public void doOpen() {
server = new Bootstrap(); HeartClientServer body = this;
server.group(group) Bootstrap bootstrap = new Bootstrap();
// .option(ChannelOption.SO_KEEPALIVE, true) bootstrap.group(group)
.channel(NioSocketChannel.class); .channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
server.handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline() ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(0, 5,0 , TimeUnit.SECONDS)) .addLast("client-idle-handler", new IdleStateHandler(5, 5,5 , TimeUnit.SECONDS))
.addLast( new StringEncoder()) .addLast( new StringEncoder())
.addLast( new StringDecoder()) .addLast( new StringDecoder())
.addLast( new HeartConsumerHandler()); .addLast(new HeartConsumerHandler(body));
} }
}); });
server = bootstrap;
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.doDestroy()));
} }
@Override @Override
public void doClose() { @SneakyThrows
super.doClose(); public void doDestroy() {
System.out.println("------客户端停机------");
if (channel!=null){
channel.close();
channel.flush();
}
channel = null;
Thread.interrupted();
Future<?> bossGroupShutdownFuture = group.shutdownGracefully(); Future<?> bossGroupShutdownFuture = group.shutdownGracefully();
bossGroupShutdownFuture.syncUninterruptibly(); bossGroupShutdownFuture.syncUninterruptibly();
} }
@Override @Override
public void doConnect() { public void doConnect() {
System.out.println("-------尝试连接服务器-------");
try { try {
ChannelFuture connect = ((Bootstrap) server).connect(address); ChannelFuture connect = ((Bootstrap) server).connect(address);
connect.syncUninterruptibly(); connect.addListener((ChannelFutureListener) channelFuture -> {
channel = connect.channel(); if (channelFuture.isSuccess()){
} catch (Throwable t) { channel = connect.channel();
this.doClose(); pingpong();
throw t; }else {
channelFuture.channel().eventLoop().schedule(() -> doConnect(),6,TimeUnit.SECONDS);
}
});
} catch (Exception t){
t.getStackTrace();
this.doDestroy();
}
}
public void pingpong() throws InterruptedException {
while (channel!=null && channel.isActive()){
Thread.sleep(2000);
System.out.println("-----chilend-------");
ByteBuf log = Unpooled.copiedBuffer("ping--222-", CharsetUtil.UTF_8);
if (channel==null){
return;
}
channel.writeAndFlush(log);
} }
} }
} }

View File

@@ -9,6 +9,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import org.nl.config.lucene.remote.AbstraceServer;
import org.nl.config.lucene.remote.RemoteLogServer; import org.nl.config.lucene.remote.RemoteLogServer;
/* /*
@@ -16,11 +17,18 @@ import org.nl.config.lucene.remote.RemoteLogServer;
* @Date 2024/1/22 10:24 * @Date 2024/1/22 10:24
*/ */
public class HeartConsumerHandler extends SimpleChannelInboundHandler { public class HeartConsumerHandler extends SimpleChannelInboundHandler {
private AbstraceServer server;
public HeartConsumerHandler(HeartClientServer server) {
this.server = server;
}
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开连接---"); System.out.println("断开连接---");
//重新建立 //重新建立
ctx.close();
server.doDisConnect();
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@@ -30,21 +38,23 @@ public class HeartConsumerHandler extends SimpleChannelInboundHandler {
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收到消息"+msg.toString()); System.out.println("客户接收到消息"+msg.toString());
super.channelRead(ctx, msg); ByteBuf log = Unpooled.copiedBuffer("ping--21312-", CharsetUtil.UTF_8);
server.channel.writeAndFlush(log);
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){ if (evt instanceof IdleStateEvent){
IdleStateEvent stateEvent = (IdleStateEvent) evt; IdleStateEvent stateEvent = (IdleStateEvent) evt;
System.out.println(((IdleStateEvent) evt).state().toString());
if (stateEvent.state() == IdleState.WRITER_IDLE) { if (stateEvent.state() == IdleState.WRITER_IDLE) {
//如果五秒内写失败,说明服务网络异常 //如果五秒内写失败,说明服务网络异常
ctx.close();
server.doDisConnect();
} }
} }
super.userEventTriggered(ctx, evt);
} }
} }

View File

@@ -15,12 +15,6 @@ import java.nio.charset.StandardCharsets;
*/ */
public class RunClientHeartMain { public class RunClientHeartMain {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
HeartClientServer heartServer = new HeartClientServer(new InetSocketAddress("127.0.0.1", 20889)); HeartClientServer heartServer = new HeartClientServer(new InetSocketAddress("192.168.10.57", 20889));
while (true){
Thread.sleep(2000);
System.out.println("-----chilend-------");
ByteBuf log = Unpooled.copiedBuffer("ping--333-", CharsetUtil.UTF_8);
heartServer.channel.writeAndFlush(log);
}
} }
} }

View File

@@ -31,9 +31,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
*/ */
public class HeartServer extends AbstraceServer { public class HeartServer extends AbstraceServer {
public static Map<String, Channel> Heart_Connection = new HashMap();
private static EventLoopGroup boss = new NioEventLoopGroup(); private static EventLoopGroup boss = new NioEventLoopGroup();
private static EventLoopGroup worker = new NioEventLoopGroup(); private static EventLoopGroup worker = new NioEventLoopGroup();
@@ -45,6 +42,7 @@ public class HeartServer extends AbstraceServer {
@Override @Override
public void doOpen() { public void doOpen() {
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
AbstraceServer body = this;
bootstrap bootstrap
.group(boss, worker) .group(boss, worker)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
@@ -53,17 +51,23 @@ public class HeartServer extends AbstraceServer {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline() ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(4, 0, 0, TimeUnit.SECONDS)) .addLast("client-idle-handler", new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
.addLast( new StringEncoder()) .addLast( new StringEncoder())
.addLast( new StringDecoder()) .addLast( new StringDecoder())
.addLast(new HeartServerHandler()); .addLast(new HeartServerHandler(body));
} }
}); });
server = bootstrap; server = bootstrap;
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.doDestroy()));
} }
@Override @Override
public void doClose(){ public void doDestroy(){
System.out.println("--------服务停机--------");
if (channel != null) {
channel.close();
}
Future<?> bossGroupShutdownFuture = boss.shutdownGracefully(); Future<?> bossGroupShutdownFuture = boss.shutdownGracefully();
Future<?> workerGroupShutdownFuture = worker.shutdownGracefully(); Future<?> workerGroupShutdownFuture = worker.shutdownGracefully();
bossGroupShutdownFuture.syncUninterruptibly(); bossGroupShutdownFuture.syncUninterruptibly();

View File

@@ -7,9 +7,11 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import org.nl.config.lucene.remote.AbstraceServer;
import org.nl.config.lucene.remote.RemoteLogServer; import org.nl.config.lucene.remote.RemoteLogServer;
/* /*
@@ -17,26 +19,28 @@ import org.nl.config.lucene.remote.RemoteLogServer;
* @Date 2024/1/22 10:24 * @Date 2024/1/22 10:24
*/ */
public class HeartServerHandler extends SimpleChannelInboundHandler { public class HeartServerHandler extends SimpleChannelInboundHandler {
private AbstraceServer server;
public HeartServerHandler(AbstraceServer server) {
this.server = server;
}
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端断开连接-----"); System.out.println("服务端断开连接-----");
HeartServer.Heart_Connection.remove(ctx.channel().id().asLongText());
//重新建立
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端收到连接-----连接"); System.out.println("服务端收到连接-----连接");
HeartServer.Heart_Connection.put(ctx.channel().id().asLongText(),ctx.channel());
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收到消息"+msg.toString()); System.out.println("接收到消息2"+msg.toString());
} }
@Override @Override
@@ -45,9 +49,8 @@ public class HeartServerHandler extends SimpleChannelInboundHandler {
IdleStateEvent stateEvent = (IdleStateEvent) evt; IdleStateEvent stateEvent = (IdleStateEvent) evt;
System.out.println(stateEvent.state()); System.out.println(stateEvent.state());
if (stateEvent.state() == IdleState.READER_IDLE) { if (stateEvent.state() == IdleState.READER_IDLE) {
//向服务端发送消息 System.out.println("--------手动关闭连接--------");
// ctx.writeAndFlush("ping") ctx.close();
// .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} }
} }
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);

View File

@@ -15,7 +15,7 @@ import java.nio.charset.StandardCharsets;
*/ */
public class RunHeartMain { public class RunHeartMain {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
HeartServer heartServer = new HeartServer(new InetSocketAddress("127.0.0.1", 20889)); HeartServer heartServer = new HeartServer(new InetSocketAddress("192.168.10.57", 20889));
// while (true){ // while (true){
// Thread.sleep(5000); // Thread.sleep(5000);
// for (Channel value : HeartServer.Heart_Connection.values()) { // for (Channel value : HeartServer.Heart_Connection.values()) {

View File

@@ -81,7 +81,7 @@ public class DruidFilter extends FilterEventAdapter {
}catch (Exception ex){ }catch (Exception ex){
log.warn("[-SQL解析异常-][{}]",ex.getMessage()); log.warn("[-SQL解析异常-][{}]",ex.getMessage());
} }
// log.info("[----SQL----][select][执行结果:{}][ SQL: {} ]",result, executeSql); log.info("[----SQL----][select][执行结果:{}][ SQL: {} ]",result, executeSql);
} }
return rs; return rs;
} }

View File

@@ -17,7 +17,7 @@ public abstract class AbstraceServer {
public AbstraceServer(SocketAddress address) { public AbstraceServer(SocketAddress address) {
this.address = address; this.address = address;
if (channel!=null){ if (channel!=null){
channel.close(); doDestroy();
} }
doOpen(); doOpen();
doConnect(); doConnect();
@@ -27,19 +27,20 @@ public abstract class AbstraceServer {
public SocketAddress address; public SocketAddress address;
public Channel channel; public Channel channel;
private final Lock connectLock = new ReentrantLock();
public abstract void doOpen(); public abstract void doOpen();
public void doClose(){ public void doDestroy(){
if (channel!=null){ if (channel!=null){
channel.close(); channel.close();
} }
}; };
public abstract void doConnect(); public abstract void doConnect() ;
public void doDisConnect() throws Throwable{ public void doDisConnect(){
channel.close(); if (channel!=null){
channel.close();
doConnect();
}
}; };
} }

View File

@@ -15,7 +15,6 @@ import org.nl.common.utils.MapOf;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@@ -60,7 +59,7 @@ public class RemoteLogServer implements SmartLifecycle {
redisTemplate.opsForValue().set("provider", JSON.toJSONString(config)); redisTemplate.opsForValue().set("provider", JSON.toJSONString(config));
Runtime.getRuntime().addShutdownHook(new Thread(() ->{ Runtime.getRuntime().addShutdownHook(new Thread(() ->{
System.out.println("------服务关闭-升级从变主-------"); System.out.println("------服务关闭-升级从变主-------");
server.doClose(); server.doDestroy();
redisTemplate.delete("providers"); redisTemplate.delete("providers");
try { try {
Thread.sleep(5000); Thread.sleep(5000);

View File

@@ -15,7 +15,7 @@ public class LogConsumerHandler extends ChannelInboundHandlerAdapter {
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开连接---"); System.out.println("断开连接---");
RemoteLogServer.LOCAL_LOG = Boolean.TRUE; RemoteLogServer.LOCAL_LOG = Boolean.TRUE;
RemoteLogServer.server.doClose(); RemoteLogServer.server.doDestroy();
//重新建立 //重新建立
super.channelInactive(ctx); super.channelInactive(ctx);
} }

View File

@@ -53,8 +53,8 @@ public class ClientServer extends AbstraceServer {
} }
@Override @Override
public void doClose() { public void doDestroy() {
super.doClose(); super.doDestroy();
Future<?> bossGroupShutdownFuture = group.shutdownGracefully(); Future<?> bossGroupShutdownFuture = group.shutdownGracefully();
bossGroupShutdownFuture.syncUninterruptibly(); bossGroupShutdownFuture.syncUninterruptibly();
} }
@@ -66,7 +66,7 @@ public class ClientServer extends AbstraceServer {
connect.syncUninterruptibly(); connect.syncUninterruptibly();
channel = connect.channel(); channel = connect.channel();
} catch (Throwable t) { } catch (Throwable t) {
this.doClose(); this.doDestroy();
throw t; throw t;
} }
} }

View File

@@ -57,7 +57,7 @@ public class RemoteServer extends AbstraceServer {
} }
@Override @Override
public void doClose(){ public void doDestroy(){
Future<?> bossGroupShutdownFuture = boss.shutdownGracefully(); Future<?> bossGroupShutdownFuture = boss.shutdownGracefully();
Future<?> workerGroupShutdownFuture = worker.shutdownGracefully(); Future<?> workerGroupShutdownFuture = worker.shutdownGracefully();
bossGroupShutdownFuture.syncUninterruptibly(); bossGroupShutdownFuture.syncUninterruptibly();