From 65df3187e51397d4618f95e5fb0405c15952e8f2 Mon Sep 17 00:00:00 2001 From: zhangzq Date: Mon, 22 Jan 2024 17:30:02 +0800 Subject: [PATCH] =?UTF-8?q?add:=E5=88=86=E5=B8=83=E5=BC=8F=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../nl/common/lucene/AsyncLuceneAppender.java | 27 ++--- .../common/lucene/netty/AbstraceServer.java | 45 ++++++++ .../common/lucene/netty/RemoteLogServer.java | 100 ++++++++++++++++++ .../netty/coder/LogConsumerHandler.java | 44 ++++++++ .../netty/coder/LogProviderHandler.java | 33 ++++++ .../lucene/netty/impl/ClientServer.java | 75 +++++++++++++ .../lucene/netty/impl/RemoteServer.java | 83 +++++++++++++++ .../src/main/resources/config/application.yml | 2 +- 8 files changed, 390 insertions(+), 19 deletions(-) create mode 100644 mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/AbstraceServer.java create mode 100644 mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/RemoteLogServer.java create mode 100644 mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogConsumerHandler.java create mode 100644 mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogProviderHandler.java create mode 100644 mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/ClientServer.java create mode 100644 mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/RemoteServer.java diff --git a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/AsyncLuceneAppender.java b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/AsyncLuceneAppender.java index 32a926a5..37454a61 100644 --- a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/AsyncLuceneAppender.java +++ b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/AsyncLuceneAppender.java @@ -7,43 +7,34 @@ package org.nl.common.lucene; */ import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.classic.spi.LoggingEvent; -import ch.qos.logback.core.AppenderBase; -import com.alibaba.ttl.TransmittableThreadLocal; import com.yomahub.tlog.core.enhance.logback.async.AspectLogbackAsyncAppender; import org.apache.commons.lang3.StringUtils; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.document.*; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.nl.common.domain.Property; -import org.nl.common.utils.YmlConfigFileUtil; +import org.nl.common.lucene.netty.RemoteLogServer; import org.slf4j.MDC; -import org.wltea.analyzer.lucene.IKAnalyzer; -import java.io.IOException; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.Properties; public class AsyncLuceneAppender extends AspectLogbackAsyncAppender { + public AsyncLuceneAppender() { + RemoteLogServer.asyncLuceneAppender = this; + } + @Override protected void append(ILoggingEvent event) { String traceId = LuceneAppender.traceIdTL.get(); if (StringUtils.isNotEmpty(traceId)){ - MDC.put("traceId",traceId); + MDC.put("traceId",traceId); Map mdcPropertyMap = event.getMDCPropertyMap(); if (mdcPropertyMap.getClass().getName().contains("SynchronizedMap")){ mdcPropertyMap.put("traceId",traceId); } MDC.clear(); } + RemoteLogServer.writeLog(event); + } + public void appendSync(ILoggingEvent event){ super.append(event); } diff --git a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/AbstraceServer.java b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/AbstraceServer.java new file mode 100644 index 00000000..b3dad34d --- /dev/null +++ b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/AbstraceServer.java @@ -0,0 +1,45 @@ +package org.nl.common.lucene.netty; + +import io.netty.bootstrap.AbstractBootstrap; +import io.netty.channel.Channel; + +import java.net.SocketAddress; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/* + * @author ZZQ + * @Date 2024/1/22 10:01 + */ +public abstract class AbstraceServer { + + + public AbstraceServer(SocketAddress address) { + this.address = address; + if (channel!=null){ + channel.close(); + } + doOpen(); + doConnect(); + } + + public AbstractBootstrap server; + public SocketAddress address; + public Channel channel; + + private final Lock connectLock = new ReentrantLock(); + + public abstract void doOpen(); + + public void doClose(){ + if (channel!=null){ + channel.close(); + } + }; + + public abstract void doConnect(); + + public void doDisConnect() throws Throwable{ + channel.close(); + }; +} diff --git a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/RemoteLogServer.java b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/RemoteLogServer.java new file mode 100644 index 00000000..cc8fb61c --- /dev/null +++ b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/RemoteLogServer.java @@ -0,0 +1,100 @@ +package org.nl.common.lucene.netty; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.nl.common.lucene.AsyncLuceneAppender; +import org.nl.common.lucene.netty.AbstraceServer; +import org.nl.common.lucene.netty.impl.ClientServer; +import org.nl.common.lucene.netty.impl.RemoteServer; +import org.nl.common.utils.MapOf; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.SmartLifecycle; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; + +/* + * @author ZZQ + * @Date 2024/1/22 09:06 + */ +@Component +public class RemoteLogServer implements SmartLifecycle { + + @Autowired + private StringRedisTemplate redisTemplate; + + public static volatile Boolean LOCAL_LOG = Boolean.TRUE; + + private Integer port = 20888; + + public static AbstraceServer server; + + public static AsyncLuceneAppender asyncLuceneAppender; + + public static void writeLog(ILoggingEvent event){ + if (LOCAL_LOG){ + asyncLuceneAppender.appendSync(event); + }else { + ByteBuf log = Unpooled.copiedBuffer(JSON.toJSONString(event), CharsetUtil.UTF_8); + server.channel.writeAndFlush(log); + } + }; + + @SneakyThrows + @Override + public void start() { + try { + String provider = redisTemplate.opsForValue().get("providers"); + if (StringUtils.isEmpty(provider)){ + String ip = Inet4Address.getLocalHost().getHostAddress(); + Map config = MapOf.of("ip", ip, "port", port); + redisTemplate.opsForValue().set("provider", JSON.toJSONString(config)); + Runtime.getRuntime().addShutdownHook(new Thread(() ->{ + System.out.println("------服务关闭-升级从变主-------"); + server.doClose(); + redisTemplate.delete("providers"); + try { + Thread.sleep(5000); + }catch (Exception ex){} + })); + server = new RemoteServer(new InetSocketAddress(ip, port)); + LOCAL_LOG =Boolean.TRUE; + }else { + Map map = JSONObject.parseObject(provider, HashMap.class); + String ip = map.get("ip"); + Integer port = Integer.valueOf(map.get("port")); + server = new ClientServer(new InetSocketAddress(ip, port)); + LOCAL_LOG = Boolean.FALSE; + } + }catch (Exception ex){ + ex.printStackTrace(); + throw ex; + } + + + } + + @Override + public void stop() { + + } + + @Override + public boolean isRunning() { + return false; + } +} diff --git a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogConsumerHandler.java b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogConsumerHandler.java new file mode 100644 index 00000000..ee5fbc25 --- /dev/null +++ b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogConsumerHandler.java @@ -0,0 +1,44 @@ +package org.nl.common.lucene.netty.coder; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; +import org.nl.common.lucene.netty.RemoteLogServer; + +/* + * @author ZZQ + * @Date 2024/1/22 10:24 + */ +public class LogConsumerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + System.out.println("断开连接---"); + RemoteLogServer.LOCAL_LOG = Boolean.TRUE; + RemoteLogServer.server.doClose(); + //重新建立 + super.channelInactive(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + System.out.println("连接"); + super.channelActive(ctx); + } + + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println("接收到消息"); + super.channelRead(ctx, msg); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent){ + IdleStateEvent stateEvent = (IdleStateEvent) evt; + System.out.println(stateEvent.state()); + } + super.userEventTriggered(ctx, evt); + } +} diff --git a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogProviderHandler.java b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogProviderHandler.java new file mode 100644 index 00000000..6a05facc --- /dev/null +++ b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/coder/LogProviderHandler.java @@ -0,0 +1,33 @@ +package org.nl.common.lucene.netty.coder; + +import ch.qos.logback.classic.spi.LoggingEvent; +import com.alibaba.fastjson.JSONObject; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.nl.common.lucene.netty.RemoteLogServer; + +/* + * @author ZZQ + * @Date 2024/1/22 10:24 + */ +public class LogProviderHandler extends SimpleChannelInboundHandler { + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + System.out.println("断开连接---"); + super.channelInactive(ctx); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String c){ + LoggingEvent event = JSONObject.parseObject(c, LoggingEvent.class); + RemoteLogServer.asyncLuceneAppender.appendSync(event); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + System.out.println("创建了连接----"); + super.channelActive(ctx); + } + +} diff --git a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/ClientServer.java b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/ClientServer.java new file mode 100644 index 00000000..394a6831 --- /dev/null +++ b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/ClientServer.java @@ -0,0 +1,75 @@ +package org.nl.common.lucene.netty.impl; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import org.nl.common.lucene.netty.AbstraceServer; +import org.nl.common.lucene.netty.coder.LogConsumerHandler; + +import java.net.SocketAddress; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/* + * @author ZZQ + * @Date 2024/1/22 10:01 + */ +public class ClientServer extends AbstraceServer { + + private static EventLoopGroup group = new NioEventLoopGroup(); + + public ClientServer(SocketAddress address) { + super(address); + } + + // 非阻塞IO线程组 + @Override + public void doOpen() { + server = new Bootstrap(); + server.group(group) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .channel(NioSocketChannel.class); + + server.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast("client-idle-handler", new IdleStateHandler(500, 0,0 , MILLISECONDS)) + .addLast(new LengthFieldPrepender(2)) + .addLast( new LogConsumerHandler()); + } + }); + } + + @Override + public void doClose() { + super.doClose(); + Future bossGroupShutdownFuture = group.shutdownGracefully(); + bossGroupShutdownFuture.syncUninterruptibly(); + } + + @Override + public void doConnect() { + try { + ChannelFuture connect = ((Bootstrap) server).connect(address); + connect.syncUninterruptibly(); + channel = connect.channel(); + } catch (Throwable t) { + this.doClose(); + throw t; + } + } + + +} diff --git a/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/RemoteServer.java b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/RemoteServer.java new file mode 100644 index 00000000..1b017bef --- /dev/null +++ b/mes/hd/nladmin-system/src/main/java/org/nl/common/lucene/netty/impl/RemoteServer.java @@ -0,0 +1,83 @@ +package org.nl.common.lucene.netty.impl; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import org.nl.common.lucene.netty.AbstraceServer; +import org.nl.common.lucene.netty.coder.LogProviderHandler; + +import java.net.SocketAddress; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/* + * @author ZZQ + * @Date 2024/1/22 10:01 + */ +public class RemoteServer extends AbstraceServer { + + private static EventLoopGroup boss = new NioEventLoopGroup(); + private static EventLoopGroup worker = new NioEventLoopGroup(); + + public RemoteServer(SocketAddress address) { + super(address); + } + + // 非阻塞IO线程组 + @Override + public void doOpen() { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap + .group(boss, worker) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) + .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) + .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast("client-idle-handler", new IdleStateHandler(500, 0, 0, MILLISECONDS)) + .addLast(new LengthFieldBasedFrameDecoder(8089, 0, 2, 0, 2)) + .addLast(new LengthFieldPrepender(2)) + .addLast("encode",new StringDecoder()) + .addLast(new LogProviderHandler()); + } + }); + server = bootstrap; + } + + @Override + public void doClose(){ + Future bossGroupShutdownFuture = boss.shutdownGracefully(); + Future workerGroupShutdownFuture = worker.shutdownGracefully(); + bossGroupShutdownFuture.syncUninterruptibly(); + workerGroupShutdownFuture.syncUninterruptibly(); + } + + @Override + public void doConnect() { + ChannelFuture future = server.bind(address); + boolean ret = future.awaitUninterruptibly(3000, MILLISECONDS); + if (ret && future.isSuccess()) { + Channel newChannel = future.channel(); + if (channel != null) { + channel.close(); + channel = newChannel; + } + } else if (future.cause() != null) { + Throwable cause = future.cause(); + cause.printStackTrace(); + } + } + +} diff --git a/mes/hd/nladmin-system/src/main/resources/config/application.yml b/mes/hd/nladmin-system/src/main/resources/config/application.yml index 36711994..2505ba08 100644 --- a/mes/hd/nladmin-system/src/main/resources/config/application.yml +++ b/mes/hd/nladmin-system/src/main/resources/config/application.yml @@ -2,7 +2,7 @@ spring: freemarker: check-template-location: false profiles: - active: prod + active: dev3 jackson: time-zone: GMT+8 data: