add:分布式日志
This commit is contained in:
@@ -7,43 +7,34 @@ package org.nl.common.lucene;
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||||
import ch.qos.logback.classic.spi.LoggingEvent;
|
|
||||||
import ch.qos.logback.core.AppenderBase;
|
|
||||||
import com.alibaba.ttl.TransmittableThreadLocal;
|
|
||||||
import com.yomahub.tlog.core.enhance.logback.async.AspectLogbackAsyncAppender;
|
import com.yomahub.tlog.core.enhance.logback.async.AspectLogbackAsyncAppender;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.lucene.analysis.Analyzer;
|
import org.nl.common.lucene.netty.RemoteLogServer;
|
||||||
import org.apache.lucene.document.*;
|
|
||||||
import org.apache.lucene.index.IndexWriter;
|
|
||||||
import org.apache.lucene.index.IndexWriterConfig;
|
|
||||||
import org.apache.lucene.store.Directory;
|
|
||||||
import org.apache.lucene.store.FSDirectory;
|
|
||||||
import org.nl.common.domain.Property;
|
|
||||||
import org.nl.common.utils.YmlConfigFileUtil;
|
|
||||||
import org.slf4j.MDC;
|
import org.slf4j.MDC;
|
||||||
import org.wltea.analyzer.lucene.IKAnalyzer;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
public class AsyncLuceneAppender extends AspectLogbackAsyncAppender {
|
public class AsyncLuceneAppender extends AspectLogbackAsyncAppender {
|
||||||
|
|
||||||
|
|
||||||
|
public AsyncLuceneAppender() {
|
||||||
|
RemoteLogServer.asyncLuceneAppender = this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void append(ILoggingEvent event) {
|
protected void append(ILoggingEvent event) {
|
||||||
String traceId = LuceneAppender.traceIdTL.get();
|
String traceId = LuceneAppender.traceIdTL.get();
|
||||||
if (StringUtils.isNotEmpty(traceId)){
|
if (StringUtils.isNotEmpty(traceId)){
|
||||||
MDC.put("traceId",traceId);
|
MDC.put("traceId",traceId);
|
||||||
Map<String, String> mdcPropertyMap = event.getMDCPropertyMap();
|
Map<String, String> mdcPropertyMap = event.getMDCPropertyMap();
|
||||||
if (mdcPropertyMap.getClass().getName().contains("SynchronizedMap")){
|
if (mdcPropertyMap.getClass().getName().contains("SynchronizedMap")){
|
||||||
mdcPropertyMap.put("traceId",traceId);
|
mdcPropertyMap.put("traceId",traceId);
|
||||||
}
|
}
|
||||||
MDC.clear();
|
MDC.clear();
|
||||||
}
|
}
|
||||||
|
RemoteLogServer.writeLog(event);
|
||||||
|
}
|
||||||
|
public void appendSync(ILoggingEvent event){
|
||||||
super.append(event);
|
super.append(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,45 @@
|
|||||||
|
package org.nl.common.lucene.netty;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.AbstractBootstrap;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @author ZZQ
|
||||||
|
* @Date 2024/1/22 10:01
|
||||||
|
*/
|
||||||
|
public abstract class AbstraceServer {
|
||||||
|
|
||||||
|
|
||||||
|
public AbstraceServer(SocketAddress address) {
|
||||||
|
this.address = address;
|
||||||
|
if (channel!=null){
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
doOpen();
|
||||||
|
doConnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbstractBootstrap server;
|
||||||
|
public SocketAddress address;
|
||||||
|
public Channel channel;
|
||||||
|
|
||||||
|
private final Lock connectLock = new ReentrantLock();
|
||||||
|
|
||||||
|
public abstract void doOpen();
|
||||||
|
|
||||||
|
public void doClose(){
|
||||||
|
if (channel!=null){
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public abstract void doConnect();
|
||||||
|
|
||||||
|
public void doDisConnect() throws Throwable{
|
||||||
|
channel.close();
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -0,0 +1,100 @@
|
|||||||
|
package org.nl.common.lucene.netty;
|
||||||
|
|
||||||
|
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.util.CharsetUtil;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.nl.common.lucene.AsyncLuceneAppender;
|
||||||
|
import org.nl.common.lucene.netty.AbstraceServer;
|
||||||
|
import org.nl.common.lucene.netty.impl.ClientServer;
|
||||||
|
import org.nl.common.lucene.netty.impl.RemoteServer;
|
||||||
|
import org.nl.common.utils.MapOf;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.SmartLifecycle;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.net.Inet4Address;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @author ZZQ
|
||||||
|
* @Date 2024/1/22 09:06
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class RemoteLogServer implements SmartLifecycle {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private StringRedisTemplate redisTemplate;
|
||||||
|
|
||||||
|
public static volatile Boolean LOCAL_LOG = Boolean.TRUE;
|
||||||
|
|
||||||
|
private Integer port = 20888;
|
||||||
|
|
||||||
|
public static AbstraceServer server;
|
||||||
|
|
||||||
|
public static AsyncLuceneAppender asyncLuceneAppender;
|
||||||
|
|
||||||
|
public static void writeLog(ILoggingEvent event){
|
||||||
|
if (LOCAL_LOG){
|
||||||
|
asyncLuceneAppender.appendSync(event);
|
||||||
|
}else {
|
||||||
|
ByteBuf log = Unpooled.copiedBuffer(JSON.toJSONString(event), CharsetUtil.UTF_8);
|
||||||
|
server.channel.writeAndFlush(log);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
try {
|
||||||
|
String provider = redisTemplate.opsForValue().get("providers");
|
||||||
|
if (StringUtils.isEmpty(provider)){
|
||||||
|
String ip = Inet4Address.getLocalHost().getHostAddress();
|
||||||
|
Map<String,Object> config = MapOf.of("ip", ip, "port", port);
|
||||||
|
redisTemplate.opsForValue().set("provider", JSON.toJSONString(config));
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread(() ->{
|
||||||
|
System.out.println("------服务关闭-升级从变主-------");
|
||||||
|
server.doClose();
|
||||||
|
redisTemplate.delete("providers");
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
}catch (Exception ex){}
|
||||||
|
}));
|
||||||
|
server = new RemoteServer(new InetSocketAddress(ip, port));
|
||||||
|
LOCAL_LOG =Boolean.TRUE;
|
||||||
|
}else {
|
||||||
|
Map<String,String> map = JSONObject.parseObject(provider, HashMap.class);
|
||||||
|
String ip = map.get("ip");
|
||||||
|
Integer port = Integer.valueOf(map.get("port"));
|
||||||
|
server = new ClientServer(new InetSocketAddress(ip, port));
|
||||||
|
LOCAL_LOG = Boolean.FALSE;
|
||||||
|
}
|
||||||
|
}catch (Exception ex){
|
||||||
|
ex.printStackTrace();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
package org.nl.common.lucene.netty.coder;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.handler.timeout.IdleStateEvent;
|
||||||
|
import org.nl.common.lucene.netty.RemoteLogServer;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @author ZZQ
|
||||||
|
* @Date 2024/1/22 10:24
|
||||||
|
*/
|
||||||
|
public class LogConsumerHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
System.out.println("断开连接---");
|
||||||
|
RemoteLogServer.LOCAL_LOG = Boolean.TRUE;
|
||||||
|
RemoteLogServer.server.doClose();
|
||||||
|
//重新建立
|
||||||
|
super.channelInactive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
System.out.println("连接");
|
||||||
|
super.channelActive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
System.out.println("接收到消息");
|
||||||
|
super.channelRead(ctx, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
|
if (evt instanceof IdleStateEvent){
|
||||||
|
IdleStateEvent stateEvent = (IdleStateEvent) evt;
|
||||||
|
System.out.println(stateEvent.state());
|
||||||
|
}
|
||||||
|
super.userEventTriggered(ctx, evt);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
package org.nl.common.lucene.netty.coder;
|
||||||
|
|
||||||
|
import ch.qos.logback.classic.spi.LoggingEvent;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import org.nl.common.lucene.netty.RemoteLogServer;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @author ZZQ
|
||||||
|
* @Date 2024/1/22 10:24
|
||||||
|
*/
|
||||||
|
public class LogProviderHandler extends SimpleChannelInboundHandler<String> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
System.out.println("断开连接---");
|
||||||
|
super.channelInactive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, String c){
|
||||||
|
LoggingEvent event = JSONObject.parseObject(c, LoggingEvent.class);
|
||||||
|
RemoteLogServer.asyncLuceneAppender.appendSync(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
System.out.println("创建了连接----");
|
||||||
|
super.channelActive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,75 @@
|
|||||||
|
package org.nl.common.lucene.netty.impl;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
import io.netty.handler.codec.LengthFieldPrepender;
|
||||||
|
import io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
import io.netty.util.concurrent.Future;
|
||||||
|
import org.nl.common.lucene.netty.AbstraceServer;
|
||||||
|
import org.nl.common.lucene.netty.coder.LogConsumerHandler;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @author ZZQ
|
||||||
|
* @Date 2024/1/22 10:01
|
||||||
|
*/
|
||||||
|
public class ClientServer extends AbstraceServer {
|
||||||
|
|
||||||
|
private static EventLoopGroup group = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
public ClientServer(SocketAddress address) {
|
||||||
|
super(address);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 非阻塞IO线程组
|
||||||
|
@Override
|
||||||
|
public void doOpen() {
|
||||||
|
server = new Bootstrap();
|
||||||
|
server.group(group)
|
||||||
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||||
|
.option(ChannelOption.TCP_NODELAY, true)
|
||||||
|
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||||
|
.channel(NioSocketChannel.class);
|
||||||
|
|
||||||
|
server.handler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ch.pipeline()
|
||||||
|
.addLast("client-idle-handler", new IdleStateHandler(500, 0,0 , MILLISECONDS))
|
||||||
|
.addLast(new LengthFieldPrepender(2))
|
||||||
|
.addLast( new LogConsumerHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doClose() {
|
||||||
|
super.doClose();
|
||||||
|
Future<?> bossGroupShutdownFuture = group.shutdownGracefully();
|
||||||
|
bossGroupShutdownFuture.syncUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doConnect() {
|
||||||
|
try {
|
||||||
|
ChannelFuture connect = ((Bootstrap) server).connect(address);
|
||||||
|
connect.syncUninterruptibly();
|
||||||
|
channel = connect.channel();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
this.doClose();
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package org.nl.common.lucene.netty.impl;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
import io.netty.channel.*;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
|
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||||
|
import io.netty.handler.codec.LengthFieldPrepender;
|
||||||
|
import io.netty.handler.codec.string.StringDecoder;
|
||||||
|
import io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
import io.netty.util.concurrent.Future;
|
||||||
|
import org.nl.common.lucene.netty.AbstraceServer;
|
||||||
|
import org.nl.common.lucene.netty.coder.LogProviderHandler;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @author ZZQ
|
||||||
|
* @Date 2024/1/22 10:01
|
||||||
|
*/
|
||||||
|
public class RemoteServer extends AbstraceServer {
|
||||||
|
|
||||||
|
private static EventLoopGroup boss = new NioEventLoopGroup();
|
||||||
|
private static EventLoopGroup worker = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
public RemoteServer(SocketAddress address) {
|
||||||
|
super(address);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 非阻塞IO线程组
|
||||||
|
@Override
|
||||||
|
public void doOpen() {
|
||||||
|
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||||
|
bootstrap
|
||||||
|
.group(boss, worker)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
|
||||||
|
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
|
||||||
|
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
|
||||||
|
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ch.pipeline()
|
||||||
|
.addLast("client-idle-handler", new IdleStateHandler(500, 0, 0, MILLISECONDS))
|
||||||
|
.addLast(new LengthFieldBasedFrameDecoder(8089, 0, 2, 0, 2))
|
||||||
|
.addLast(new LengthFieldPrepender(2))
|
||||||
|
.addLast("encode",new StringDecoder())
|
||||||
|
.addLast(new LogProviderHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
server = bootstrap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doClose(){
|
||||||
|
Future<?> bossGroupShutdownFuture = boss.shutdownGracefully();
|
||||||
|
Future<?> workerGroupShutdownFuture = worker.shutdownGracefully();
|
||||||
|
bossGroupShutdownFuture.syncUninterruptibly();
|
||||||
|
workerGroupShutdownFuture.syncUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doConnect() {
|
||||||
|
ChannelFuture future = server.bind(address);
|
||||||
|
boolean ret = future.awaitUninterruptibly(3000, MILLISECONDS);
|
||||||
|
if (ret && future.isSuccess()) {
|
||||||
|
Channel newChannel = future.channel();
|
||||||
|
if (channel != null) {
|
||||||
|
channel.close();
|
||||||
|
channel = newChannel;
|
||||||
|
}
|
||||||
|
} else if (future.cause() != null) {
|
||||||
|
Throwable cause = future.cause();
|
||||||
|
cause.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -2,7 +2,7 @@ spring:
|
|||||||
freemarker:
|
freemarker:
|
||||||
check-template-location: false
|
check-template-location: false
|
||||||
profiles:
|
profiles:
|
||||||
active: prod
|
active: dev3
|
||||||
jackson:
|
jackson:
|
||||||
time-zone: GMT+8
|
time-zone: GMT+8
|
||||||
data:
|
data:
|
||||||
|
|||||||
Reference in New Issue
Block a user