rev 优化线程相关

This commit is contained in:
USER-20220102CG\noblelift
2024-07-13 08:35:29 +08:00
parent 3b4fdf64dd
commit b43a390a6f
15 changed files with 115 additions and 58 deletions

View File

@@ -3,6 +3,7 @@ package org.nl;
import cn.dev33.satoken.annotation.SaIgnore;
import com.alicp.jetcache.anno.config.EnableCreateCacheAnnotation;
import com.alicp.jetcache.anno.config.EnableMethodCache;
import org.dromara.dynamictp.core.spring.EnableDynamicTp;
import org.mybatis.spring.annotation.MapperScan;
import org.nl.config.SpringContextHolder;
import org.springframework.boot.SpringApplication;
@@ -24,6 +25,7 @@ import org.springframework.web.bind.annotation.RestController;
*/
@EnableAsync
@RestController
@EnableDynamicTp
@SpringBootApplication(exclude = {
QuartzAutoConfiguration.class,
org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration.class
@@ -33,6 +35,7 @@ import org.springframework.web.bind.annotation.RestController;
@EnableMethodCache(basePackages = "org.nl")
@EnableCreateCacheAnnotation
@MapperScan("org.nl.**.mapper")
public class AppRun {
public static void main(String[] args) {

View File

@@ -69,7 +69,6 @@ public class AgvNdcTwoDeviceDriver extends AbstractDeviceDriver implements Devic
DeviceErrorLogService deviceErrorLogService = SpringContextHolder.getBean(DeviceErrorLogServiceImpl.class);
DeviceService deviceService = SpringContextHolder.getBean(DeviceService.class);
LuceneExecuteLogService luceneExecuteLogService = SpringContextHolder.getBean(LuceneExecuteLogService.class);
final ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
TwoAgvPhase twoAgvPhase = new TwoAgvPhase();
String error_code = "0";

View File

@@ -45,7 +45,6 @@ import org.nl.config.thread.ThreadPoolExecutorUtil;
import org.openscada.opc.lib.da.Server;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 一楼木箱入库站点
@@ -54,7 +53,6 @@ import java.util.concurrent.ThreadPoolExecutor;
@Data
@RequiredArgsConstructor
public class BoxStorageOutConveyorDeviceDriver extends AbstractOpcDeviceDriver implements DeviceDriver, ExecutableDeviceDriver, RouteableDeviceDriver, DeviceStageMonitor, FeedLmsRealFailed {
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
protected ItemProtocol itemProtocol = new ItemProtocol(this);
InstructionService instructionService = SpringContextHolder.getBean(InstructionService.class);

View File

@@ -44,6 +44,7 @@ import org.nl.system.service.param.ISysParamService;
import org.openscada.opc.lib.da.Server;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -53,7 +54,7 @@ import java.util.concurrent.ThreadPoolExecutor;
@Data
@RequiredArgsConstructor
public class BoxSubvolumesConveyorDeviceDriver extends AbstractOpcDeviceDriver implements DeviceDriver, ExecutableDeviceDriver, RouteableDeviceDriver, DeviceStageMonitor, FeedLmsRealFailed {
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
private final static Executor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
protected ItemProtocol itemProtocol = new ItemProtocol(this);
InstructionService instructionService = SpringContextHolder.getBean(InstructionService.class);

View File

@@ -53,7 +53,6 @@ import java.util.concurrent.ThreadPoolExecutor;
@Data
@RequiredArgsConstructor
public class FinishedProductOutBindLableDeviceDriver extends AbstractOpcDeviceDriver implements DeviceDriver, ExecutableDeviceDriver, RouteableDeviceDriver, DeviceStageMonitor, FeedLmsRealFailed {
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
protected ItemProtocol itemProtocol = new ItemProtocol(this);
InstructionService instructionService = SpringContextHolder.getBean(InstructionService.class);

View File

@@ -26,7 +26,6 @@ import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Data
public class ManipulatorCacheDeviceDriver extends AbstractOpcDeviceDriver implements DeviceDriver, ExecutableDeviceDriver, RouteableDeviceDriver, DeviceStageMonitor, FeedLmsRealFailed {
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
protected ItemProtocol itemProtocol = new ItemProtocol(this);
//当前指令

View File

@@ -51,7 +51,6 @@ import java.util.concurrent.ThreadPoolExecutor;
@Data
@RequiredArgsConstructor
public class UnBoxLableConveyorDeviceDriver extends AbstractOpcDeviceDriver implements DeviceDriver, ExecutableDeviceDriver, RouteableDeviceDriver, DeviceStageMonitor, FeedLmsRealFailed {
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
protected ItemProtocol itemProtocol = new ItemProtocol(this);
InstructionService instructionService = SpringContextHolder.getBean(InstructionService.class);

View File

@@ -48,7 +48,7 @@ import org.nl.config.thread.ThreadPoolExecutorUtil;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executor;
/**
* 烘箱-行架机械手
@@ -79,7 +79,7 @@ public class OvenGantryManipulatorDeviceDriver extends AbstractOpcDeviceDriver i
@Autowired
LuceneExecuteLogService luceneExecuteLogService = SpringContextHolder.getBean(LuceneExecuteLogService.class);
final ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
final Executor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
//工作模式
int mode = 0;
@@ -464,12 +464,12 @@ public class OvenGantryManipulatorDeviceDriver extends AbstractOpcDeviceDriver i
list.add(map3);
this.writing(list);
if (startdevice.getDeviceDriver() instanceof HongXiangConveyorDeviceDriver) {
EXECUTOR.submit(() -> {
EXECUTOR.execute(() -> {
toOpenDoor(startdevice);
});
}
if (nextdevice.getDeviceDriver() instanceof HongXiangConveyorDeviceDriver) {
EXECUTOR.submit(() -> {
EXECUTOR.execute(() -> {
toOpenDoor(nextdevice);
});
}
@@ -672,12 +672,12 @@ public class OvenGantryManipulatorDeviceDriver extends AbstractOpcDeviceDriver i
list.add(map3);
this.writing(list);
if (startdevice.getDeviceDriver() instanceof HongXiangConveyorDeviceDriver) {
EXECUTOR.submit(() -> {
EXECUTOR.execute(() -> {
toOpenDoor(startdevice);
});
}
if (nextdevice.getDeviceDriver() instanceof HongXiangConveyorDeviceDriver) {
EXECUTOR.submit(() -> {
EXECUTOR.execute(() -> {
toOpenDoor(nextdevice);
});
}
@@ -938,7 +938,7 @@ public class OvenGantryManipulatorDeviceDriver extends AbstractOpcDeviceDriver i
HongXiangConveyorDeviceDriver hongXiangConveyorDeviceDriver;
if (device.getDeviceDriver() instanceof HongXiangConveyorDeviceDriver) {
hongXiangConveyorDeviceDriver = (HongXiangConveyorDeviceDriver) device.getDeviceDriver();
EXECUTOR.submit(() -> {
EXECUTOR.execute(() -> {
toCloseDoor(device);
});
}
@@ -1040,7 +1040,7 @@ public class OvenGantryManipulatorDeviceDriver extends AbstractOpcDeviceDriver i
LampThreecolorDeviceDriver lampThreecolorDeviceDriver;
if (nextDevice.getDeviceDriver() instanceof HongXiangConveyorDeviceDriver) {
hongXiangConveyorDeviceDriver = (HongXiangConveyorDeviceDriver) nextDevice.getDeviceDriver();
EXECUTOR.submit(() -> {
EXECUTOR.execute(() -> {
toCloseDoor(nextDevice);
});
if (StrUtil.isNotEmpty(taskDto.getOven_time())) {

View File

@@ -7,9 +7,12 @@ import org.nl.acs.device_driver.driver.ExecutableDeviceDriver;
import org.nl.acs.udw.UnifiedDataAccessor;
import org.nl.acs.udw.UnifiedDataAccessorFactory;
import org.nl.config.thread.TheadFactoryName;
import org.nl.config.thread.ThreadPoolExecutorUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
@@ -30,7 +33,10 @@ public class DeviceExecuteAutoRun extends AbstractAutoRunnable {
int multiple = cache_thread;
int loop_time_millions = 100;
ExecutorService executorService;
@Resource
private ThreadPoolExecutor executorService;
Map<String, BlockedRunable> runs;
public DeviceExecuteAutoRun() {
@@ -43,7 +49,7 @@ public class DeviceExecuteAutoRun extends AbstractAutoRunnable {
new ArrayBlockingQueue<>( queueLength),
new TheadFactoryName()
);*/
this.executorService = new ThreadPoolExecutor(
/*this.executorService = new ThreadPoolExecutor(
50,
100,
1L,
@@ -62,7 +68,8 @@ public class DeviceExecuteAutoRun extends AbstractAutoRunnable {
}
}
}
);
);*/
// this.executorService= ThreadPoolExecutorUtil.getPoll();
this.runs = new LinkedHashMap();
this.runs = Collections.synchronizedMap(this.runs);
}
@@ -121,7 +128,7 @@ public class DeviceExecuteAutoRun extends AbstractAutoRunnable {
this.runs.put(deviceDriver.getDeviceCode(), runnable);
}
runnable.setIndex(this.runs);
this.executorService.submit(runnable);
this.executorService.execute(runnable);
}
}

View File

@@ -1,18 +1,24 @@
package org.nl.acs.opc;
import cn.hutool.core.util.ObjectUtil;
import org.dromara.dynamictp.core.support.ThreadPoolBuilder;
import org.nl.acs.auto.run.AbstractAutoRunnable;
import org.nl.acs.opc.service.dto.OpcServerManageDto;
import org.nl.config.thread.TheadFactoryName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE;
/**
* OPC设备同步启动
@@ -22,7 +28,16 @@ import java.util.concurrent.Executors;
public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable {
public static boolean isRun = false;
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService = ThreadPoolBuilder.newBuilder()
.threadPoolName("deviceOpc_thread")
.threadFactory("deviceOpc_thread")
.corePoolSize(2)
.maximumPoolSize(7)
.keepAliveTime(40)
.timeUnit(TimeUnit.SECONDS)
.workQueue(MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), 2000)
.buildDynamic();
@Autowired
private DeviceAppService deviceAppService;
@Autowired
@@ -80,6 +95,14 @@ public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable {
public void after() {
isRun = false;
this.executorService.shutdownNow();
this.executorService = Executors.newCachedThreadPool();
this.executorService = ThreadPoolBuilder.newBuilder()
.threadPoolName("deviceOpc_thread")
.threadFactory("deviceOpc_thread")
.corePoolSize(2)
.maximumPoolSize(7)
.keepAliveTime(40)
.timeUnit(TimeUnit.SECONDS)
.workQueue(MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), 2000)
.buildDynamic();
}
}

View File

@@ -1,6 +1,7 @@
package org.nl.acs.socket;
import cn.hutool.core.util.StrUtil;
import org.dromara.dynamictp.core.support.ThreadPoolBuilder;
import org.nl.acs.auto.run.AbstractAutoRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,6 +15,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE;
/**
* @author 20220102CG\noblelift
@@ -22,7 +26,15 @@ import java.util.concurrent.Executors;
public class SocketListenerAutoRun extends AbstractAutoRunnable implements SocketService {
private static final Logger log = LoggerFactory.getLogger(SocketListenerAutoRun.class);
private ServerSocket serverSocket = null;
private ExecutorService threadPool = Executors.newCachedThreadPool();
private ExecutorService threadPool = ThreadPoolBuilder.newBuilder()
.threadPoolName("socketListener_thread")
.threadFactory("socketListener_thread")
.corePoolSize(2)
.maximumPoolSize(7)
.keepAliveTime(40)
.timeUnit(TimeUnit.SECONDS)
.workQueue(MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), 2000)
.buildDynamic();
private Map<String, Socket> sockets = new HashMap();
private Map<String, OutputStream> outputs = new HashMap();
@@ -91,7 +103,15 @@ public class SocketListenerAutoRun extends AbstractAutoRunnable implements Socke
public void after() {
System.out.println("清理工作。。。。。");
this.threadPool.shutdownNow();
this.threadPool = Executors.newCachedThreadPool();
this.threadPool = ThreadPoolBuilder.newBuilder()
.threadPoolName("socketListener_thread")
.threadFactory("socketListener_thread")
.corePoolSize(2)
.maximumPoolSize(7)
.keepAliveTime(40)
.timeUnit(TimeUnit.SECONDS)
.workQueue(MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), 2000)
.buildDynamic();
this.sockets = new HashMap();
this.outputs = new HashMap();
this.closeSocket(this.serverSocket);

View File

@@ -16,10 +16,12 @@
package org.nl.config.thread;
import org.dromara.dynamictp.core.DtpRegistry;
import org.nl.config.SpringContextHolder;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -31,8 +33,11 @@ import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorUtil {
public static ThreadPoolExecutor getPoll(){
AsyncTaskProperties properties = SpringContextHolder.getBean(AsyncTaskProperties.class);
public static Executor getPoll(){
return DtpRegistry.getExecutor("dtpExecutor1");
/* AsyncTaskProperties properties = SpringContextHolder.getBean(AsyncTaskProperties.class);
return new ThreadPoolExecutor(
properties.getCorePoolSize(),
properties.getMaxPoolSize(),
@@ -40,6 +45,6 @@ public class ThreadPoolExecutorUtil {
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(properties.getQueueCapacity()),
new TheadFactoryName()
);
);*/
}
}

View File

@@ -7,6 +7,8 @@ import lombok.extern.slf4j.Slf4j;
import org.nl.common.utils.RedisUtils;
import org.nl.common.utils.ThrowableUtil;
import org.nl.config.SpringContextHolder;
import org.nl.config.thread.AsyncTaskProperties;
import org.nl.config.thread.TheadFactoryName;
import org.nl.config.thread.ThreadPoolExecutorUtil;
import org.nl.system.service.quartz.ISysQuartzJobService;
import org.nl.system.service.quartz.dao.SysQuartzJob;
@@ -21,8 +23,10 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author: /
@@ -41,8 +45,16 @@ public class ExecutionJob extends TLogQuartzJobBean {
/* @Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor EXECUTOR;*/
static AsyncTaskProperties properties = SpringContextHolder.getBean(AsyncTaskProperties.class);
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolExecutorUtil.getPoll();
private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
properties.getCorePoolSize(),
properties.getMaxPoolSize(),
properties.getKeepAliveSeconds(),
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(properties.getQueueCapacity()),
new TheadFactoryName()
);;
@Override

View File

@@ -7,7 +7,6 @@ spring:
check-template-location: false
profiles:
active: dev
# active: prod
jackson:
time-zone: GMT+8
data:
@@ -44,7 +43,7 @@ spring:
enable_lazy_load_no_trans: true
dynamic:
tp:
enabled: false # 是否启用 dynamictp默认true
enabled: true # 是否启用 dynamictp默认true
enabledBanner: false # 是否启用 控制台banner默认true
enabledCollect: true # 是否开启监控指标采集默认true
collectorTypes: logging,test_collect # 监控数据采集器类型logging | micrometer | internal_logging默认micrometer
@@ -57,30 +56,23 @@ spring:
keepAliveTime: 60
runTimeout: 10000
queueTimeout: 100
notifyItems: # 报警项,不配置自动会按默认值配置(变更通知、容量报警、活性报警、拒绝报警、任务超时报警)
- type: change
enabled: true
- type: capacity # 队列容量使用率,报警项类型,查看源码 NotifyTypeEnum枚举类
enabled: true
threshold: 80 # 报警阈值默认70意思是队列使用率达到70%告警
interval: 120 # 报警间隔单位s默认120
- type: liveness # 线程池活性
enabled: true
threshold: 80 # 报警阈值,默认 70意思是活性达到70%告警
- type: reject # 触发任务拒绝告警
enabled: true
threshold: 100 # 默认阈值10
- type: run_timeout # 任务执行超时告警
enabled: true
threshold: 100 # 默认阈值10
- type: queue_timeout # 任务排队超时告警
enabled: true
threshold: 100 # 默认阈值10
executors: # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量
- threadPoolName: dtpExecutor1 # 线程池名称,必填
threadPoolAliasName: core_thread # 线程池别名,可选
executorType: common # 线程池类型 common、eager、ordered、scheduled、priority默认 common
corePoolSize: 10 # 核心线程数默认1
maximumPoolSize: 30 # 最大线程数默认cpu核数
queueCapacity: 1024 # 队列容量默认1024
queueType: VariableLinkedBlockingQueue # 任务队列查看源码QueueTypeEnum枚举类默认VariableLinkedBlockingQueue
rejectedHandlerType: CallerRunsPolicy # 拒绝策略查看RejectedTypeEnum枚举类默认AbortPolicy
keepAliveTime: 30 # 空闲线程等待超时时间默认60
threadNamePrefix: core_thread # 线程名前缀默认dtp
allowCoreThreadTimeOut: true # 是否允许核心线程池超时默认false
waitForTasksToCompleteOnShutdown: true # 参考spring线程池设计优雅关闭线程池默认true
awaitTerminationSeconds: 5 # 优雅关闭线程池时阻塞等待线程池中任务执行时间默认3单位s
preStartAllCoreThreads: false # 是否预热所有核心线程默认false
runTimeout: 2000 # 任务执行超时阈值单位ms默认0不统计
queueTimeout: 1000 # 任务在队列等待超时阈值单位ms默认0不统计
task:
pool:
# 核心线程池大小
@@ -137,7 +129,7 @@ security:
mybatis-plus:
configuration:
map-underscore-to-camel-case: false
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
mapper-locations:
- classpath:org.nl.**.mapper/*.xml
global-config:

View File

@@ -69,11 +69,11 @@ https://juejin.cn/post/6844903775631572999
<!--开发环境:打印控制台-->
<springProfile name="dev">
<root level="debug">
<appender-ref ref="asyncLuceneAppender"/>
<appender-ref ref="asyncFileAppender"/>
<!-- <appender-ref ref="asyncLuceneAppender"/>-->
<!-- <appender-ref ref="asyncFileAppender"/>-->
<appender-ref ref="CONSOLE"/>
</root>
<logger name="jdbc" level="ERROR" additivity="true">
<!-- <logger name="jdbc" level="ERROR" additivity="true">
<appender-ref ref="asyncFileAppender"/>
</logger>
<logger name="org.springframework" level="ERROR" additivity="true">
@@ -96,7 +96,7 @@ https://juejin.cn/post/6844903775631572999
</logger>
<logger name="org.jinterop" level="ERROR" additivity="true">
<appender-ref ref="asyncFileAppender"/>
</logger>
</logger>-->
</springProfile>
<!--测试环境:打印控制台-->