fix:修复多线程导致无法锁定同排站点
This commit is contained in:
@@ -1,11 +1,8 @@
|
||||
package org.nl.wms.sch_manage.service.util;
|
||||
|
||||
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.nl.wms.sch_manage.enums.TaskStatus;
|
||||
@@ -17,7 +14,8 @@ import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.sql.Time;
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -30,83 +28,68 @@ import java.util.stream.Collectors;
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class AutoTask {
|
||||
private static final ScheduledExecutorService DELAY_EXECUTOR = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
/**
|
||||
* 任务服务
|
||||
* 延迟任务调度线程池,从单线程改为适中并发,避免任务堆积
|
||||
*/
|
||||
private ScheduledExecutorService delayExecutor;
|
||||
|
||||
@Autowired
|
||||
private ISchBaseTaskService taskService;
|
||||
|
||||
/**
|
||||
* 任务工厂服务
|
||||
*/
|
||||
@Autowired
|
||||
private TaskFactory taskFactory;
|
||||
|
||||
private final RedissonClient redissonClient;
|
||||
//定时任务
|
||||
@SneakyThrows
|
||||
public void run() {
|
||||
RLock lock = redissonClient.getLock(this.getClass().getName());
|
||||
boolean tryLock = lock.tryLock(0, TimeUnit.SECONDS);
|
||||
try {
|
||||
if (tryLock) {
|
||||
sendTask();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (tryLock) {
|
||||
lock.unlock();
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
//无界线程池,线程数
|
||||
this.delayExecutor = Executors.newScheduledThreadPool(4);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (delayExecutor != null && !delayExecutor.isShutdown()) {
|
||||
delayExecutor.shutdown();
|
||||
try {
|
||||
if (!delayExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
delayExecutor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
delayExecutor.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时下发任务
|
||||
* 定时任务
|
||||
*/
|
||||
// private void sendTask() {
|
||||
// List<SchBaseTask> taskList = taskService.list(new LambdaQueryWrapper<SchBaseTask>()
|
||||
// .eq(SchBaseTask::getIs_delete, IOSConstant.IS_DELETE_NO)
|
||||
// .eq(SchBaseTask::getTask_status, TaskStatus.CREATE.getCode()));
|
||||
// if (ObjectUtil.isEmpty(taskList)) {
|
||||
// return;
|
||||
// }
|
||||
// // 找出需要间隔的任务列表
|
||||
// List<SchBaseTask> delayTaskList = taskList.stream()
|
||||
// .filter(r -> "STInTask".equals(r.getConfig_code()))
|
||||
// .collect(Collectors.toList());
|
||||
// List<SchBaseTask> immediatelyTaskList = new ArrayList<>(taskList);
|
||||
// immediatelyTaskList.removeAll(delayTaskList);
|
||||
// //优先发送正常的任务列表
|
||||
// if (ObjectUtil.isNotEmpty(immediatelyTaskList)) {
|
||||
// // 整理下发acs参数
|
||||
// for (SchBaseTask taskDao : immediatelyTaskList) {
|
||||
// //任务优先级
|
||||
// taskDao.setPriority(StringUtils.isBlank(taskDao.getPriority()) ? "1" : taskDao.getPriority());
|
||||
// AbstractTask task = taskFactory.getTask(taskDao.getConfig_code());
|
||||
// task.sendTaskOne(taskDao.getTask_id());
|
||||
// }
|
||||
// }
|
||||
// //间隔3秒发一个任务
|
||||
// if (ObjectUtil.isNotEmpty(delayTaskList)) {
|
||||
// DELAY_EXECUTOR.execute(() -> {
|
||||
// try {
|
||||
// for (SchBaseTask taskDao : delayTaskList) {
|
||||
// //间隔3秒发一个任务
|
||||
// Thread.sleep(2000);
|
||||
// //任务优先级
|
||||
// taskDao.setPriority(StringUtils.isBlank(taskDao.getPriority()) ? "1" : taskDao.getPriority());
|
||||
// AbstractTask task = taskFactory.getTask(taskDao.getConfig_code());
|
||||
// task.sendTaskOne(taskDao.getTask_id());
|
||||
// }
|
||||
// } catch (InterruptedException e) {
|
||||
// Thread.currentThread().interrupt();
|
||||
// log.error("延迟任务被中断", e);
|
||||
// } catch (Exception e) {
|
||||
// log.error("延迟下发任务执行失败", e);
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
public void run() {
|
||||
RLock lock = redissonClient.getLock(this.getClass().getName());
|
||||
boolean locked = false;
|
||||
try {
|
||||
//最多等待 1 秒,锁自动释放时间 30 秒
|
||||
locked = lock.tryLock(2, 30, TimeUnit.SECONDS);
|
||||
if (locked) {
|
||||
sendTask();
|
||||
} else {
|
||||
log.debug("未获取到分布式锁,跳过本次执行");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("获取分布式锁或执行任务异常,可能 Redis 不可用", e);
|
||||
} finally {
|
||||
// 确保释放锁,且只能释放自己持有的锁
|
||||
if (locked && lock.isHeldByCurrentThread()) {
|
||||
try {
|
||||
lock.unlock();
|
||||
} catch (Exception e) {
|
||||
log.error("释放分布式锁失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时下发任务
|
||||
@@ -119,6 +102,7 @@ public class AutoTask {
|
||||
if (ObjectUtil.isEmpty(taskList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 统一设置默认优先级并分组
|
||||
Map<Boolean, List<SchBaseTask>> partitioned = taskList.stream()
|
||||
.peek(task -> {
|
||||
@@ -129,38 +113,35 @@ public class AutoTask {
|
||||
.collect(Collectors.partitioningBy(r -> "STInTask".equals(r.getConfig_code())));
|
||||
List<SchBaseTask> immediatelyTaskList = partitioned.get(false);
|
||||
List<SchBaseTask> delayTaskList = partitioned.get(true);
|
||||
|
||||
// 3. 立即下发任务
|
||||
if (ObjectUtil.isNotEmpty(immediatelyTaskList)) {
|
||||
//log.info("开始下发立即任务, 数量: {}", immediatelyTaskList.size());
|
||||
for (SchBaseTask task : immediatelyTaskList) {
|
||||
try {
|
||||
AbstractTask taskExecutor = taskFactory.getTask(task.getConfig_code());
|
||||
taskExecutor.sendTaskOne(task.getTask_id());
|
||||
// log.info("任务下发成功: taskId={}, configCode={}", task.getTask_id(), task.getConfig_code());
|
||||
} catch (Exception e) {
|
||||
log.error("任务下发失败: taskId={}, configCode={}", task.getTask_id(), task.getConfig_code(), e);
|
||||
}
|
||||
}
|
||||
//log.info("立即任务下发完成");
|
||||
}
|
||||
|
||||
// 4. 延迟下发任务(间隔 7 秒,异步执行)
|
||||
if (ObjectUtil.isNotEmpty(delayTaskList)) {
|
||||
delayTaskList.sort(Comparator.comparing(SchBaseTask::getTask_code));
|
||||
//log.info("开始调度延迟任务, 数量: {}, 间隔: 7秒", delayTaskList.size());
|
||||
long delaySeconds = 0;
|
||||
for (SchBaseTask task : delayTaskList) {
|
||||
DELAY_EXECUTOR.schedule(() -> {
|
||||
delayExecutor.schedule(() -> {
|
||||
try {
|
||||
AbstractTask taskExecutor = taskFactory.getTask(task.getConfig_code());
|
||||
taskExecutor.sendTaskOne(task.getTask_id());
|
||||
log.info("延迟任务下发成功: taskCode={}, configCode={}", task.getTask_code(), task.getConfig_code());
|
||||
//log.info("延迟任务下发成功: taskCode={}, configCode={}", task.getTask_code(), task.getConfig_code());
|
||||
} catch (Exception e) {
|
||||
log.error("延迟任务下发失败: taskId={}, configCode={}", task.getTask_id(), task.getConfig_code(), e);
|
||||
}
|
||||
}, delaySeconds, TimeUnit.SECONDS);
|
||||
delaySeconds += 7;
|
||||
}
|
||||
//log.info("延迟任务已全部提交调度");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,10 @@ package org.nl.wms.system_manage.enums;
|
||||
*/
|
||||
public class SysParamConstant {
|
||||
|
||||
/**
|
||||
* 是否反馈IWMS
|
||||
*/
|
||||
public final static String IS_CONNECT_IWMS = "is_connect_iwms";
|
||||
/**
|
||||
* 是否连接ACS
|
||||
*/
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
package org.nl.wms.wbwms.service.impl;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.http.HttpRequest;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.nl.common.exception.BadRequestException;
|
||||
import org.nl.config.SpringContextHolder;
|
||||
@@ -14,14 +12,12 @@ import org.nl.wms.basedata_manage.service.dao.mapper.MdMeMaterialbaseMapper;
|
||||
import org.nl.wms.system_manage.enums.SysParamConstant;
|
||||
import org.nl.wms.system_manage.service.param.dao.Param;
|
||||
import org.nl.wms.system_manage.service.param.impl.SysParamServiceImpl;
|
||||
import org.nl.wms.warehouse_manage.enums.IOSConstant;
|
||||
import org.nl.wms.warehouse_manage.inAndOut.service.dto.TOWMSMSG;
|
||||
import org.nl.wms.warehouse_manage.service.dto.CheckToWmsMsg;
|
||||
import org.nl.wms.warehouse_manage.service.dto.MoveToWmsMsg;
|
||||
import org.nl.wms.wbwms.enums.WMSConstant;
|
||||
import org.nl.wms.wbwms.service.IWmsToWmsService;
|
||||
import org.nl.wms.wbwms.service.dto.IWmstoWmsResponse;
|
||||
import org.nl.wms.wbwms.service.dto.WmstoIWmsResponse;
|
||||
import org.slf4j.MDC;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -75,11 +71,20 @@ public class IWmsToWmsServiceImpl implements IWmsToWmsService {
|
||||
|
||||
@Override
|
||||
public JSONObject FinishOutTask(TOWMSMSG towmsmsg) {
|
||||
JSONObject result = new JSONObject();
|
||||
// 系统参数类
|
||||
SysParamServiceImpl sysParamService = SpringContextHolder.getBean(SysParamServiceImpl.class);
|
||||
//判断是否反馈IWMS
|
||||
Param isConnectAcs = sysParamService.findByCode(SysParamConstant.IS_CONNECT_IWMS);
|
||||
if (ObjectUtil.isNotEmpty(isConnectAcs)) {
|
||||
if (isConnectAcs.getValue().equals(IOSConstant.IS_DELETE_NO)) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
JSONObject result;
|
||||
// 系统参数类
|
||||
JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(towmsmsg), JSONObject.class);
|
||||
log.info("FinishOutTask请求参数是:-------------------" + jsonObject.toString());
|
||||
String wmsUrl = SpringContextHolder.getBean(SysParamServiceImpl.class).findByCode(SysParamConstant.WMS_URL).getValue();
|
||||
String wmsUrl = sysParamService.findByCode(SysParamConstant.WMS_URL).getValue();
|
||||
wmsUrl = wmsUrl + WMSConstant.INOUT_WMS_API;
|
||||
try {
|
||||
String resultMsg = HttpRequest.post(wmsUrl)
|
||||
|
||||
@@ -351,7 +351,7 @@ public class WmsToIWmsServiceImpl implements WmsToIWmsService {
|
||||
for (SchBasePoint point : list) {
|
||||
JSONObject pointData = new JSONObject();
|
||||
pointData.put("point_code", point.getPoint_code());
|
||||
pointData.put("status", point.getPoint_status().equals(IOSEnum.POINT_STATUS.code("空位")) ? "0" : "1");
|
||||
pointData.put("status", point.getPoint_status().equals(PointStatusEnum.EMPTY_POINT.getCode()) && point.getLock_up()? "0" : "1");
|
||||
dataList.add(pointData);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,8 +70,11 @@ spring:
|
||||
connectionMinimumIdleSize: 8
|
||||
connectionPoolSize: 8
|
||||
address: redis://127.0.0.1:6379
|
||||
idleConnectionTimeout: 10000
|
||||
idleConnectionTimeout: 30000
|
||||
timeout: 3000
|
||||
pingConnectionInterval: 30000
|
||||
retryAttempts: 3
|
||||
retryInterval: 1500
|
||||
|
||||
# 登录相关配置
|
||||
login:
|
||||
|
||||
@@ -11,7 +11,7 @@ spring:
|
||||
freemarker:
|
||||
check-template-location: false
|
||||
profiles:
|
||||
active: dev
|
||||
active: prod
|
||||
jackson:
|
||||
time-zone: GMT+8730 885 969
|
||||
data:
|
||||
|
||||
Reference in New Issue
Block a user