opt:异常重试优化
This commit is contained in:
@@ -0,0 +1,26 @@
|
||||
package org.nl.acs.device_driver.conveyor.belt_conveyor;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
@Configuration
|
||||
@EnableAsync
|
||||
public class AsyncConfig {
|
||||
|
||||
@Bean(name = "retryTaskExecutor")
|
||||
public Executor retryTaskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(5); // 核心线程数
|
||||
executor.setMaxPoolSize(10); // 最大线程数
|
||||
executor.setQueueCapacity(25); // 队列容量
|
||||
executor.setThreadNamePrefix("Retry-"); // 线程名前缀
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@@ -41,6 +41,8 @@ import org.nl.config.SpringContextHolder;
|
||||
import org.nl.config.language.LangProcess;
|
||||
import org.nl.config.lucene.service.LuceneExecuteLogService;
|
||||
import org.nl.config.lucene.service.dto.LuceneLogDto;
|
||||
import org.nl.system.service.dict.ISysDictService;
|
||||
import org.nl.system.service.dict.dao.Dict;
|
||||
import org.nl.system.service.param.ISysParamService;
|
||||
import org.nl.system.service.user.dao.SysUser;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -49,6 +51,7 @@ import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 输送线
|
||||
@@ -68,6 +71,9 @@ public class BeltConveyorDeviceDriver extends AbstractOpcDeviceDriver implements
|
||||
TaskService taskserver = SpringContextHolder.getBean(TaskService.class);
|
||||
@Autowired
|
||||
RouteLineService routeLineService = SpringContextHolder.getBean(RouteLineService.class);
|
||||
@Autowired
|
||||
ISysDictService iSysDictService = SpringContextHolder.getBean(ISysDictService.class);
|
||||
|
||||
@Autowired
|
||||
AcsToWmsService acsToWmsService = SpringContextHolder.getBean(AcsToWmsServiceImpl.class);
|
||||
@Autowired
|
||||
@@ -79,6 +85,9 @@ public class BeltConveyorDeviceDriver extends AbstractOpcDeviceDriver implements
|
||||
@Autowired
|
||||
IbmVehicleRfidService ibmVehicleRfidService = SpringContextHolder.getBean(IbmVehicleRfidService.class);
|
||||
|
||||
@Autowired
|
||||
RetryService retryService = SpringContextHolder.getBean(RetryService.class);
|
||||
|
||||
LuceneExecuteLogService luceneExecuteLogService = SpringContextHolder.getBean("luceneExecuteLogServiceImpl");
|
||||
|
||||
ISysParamService paramService = SpringContextHolder.getBean(ISysParamService.class);
|
||||
@@ -430,179 +439,281 @@ public class BeltConveyorDeviceDriver extends AbstractOpcDeviceDriver implements
|
||||
// }
|
||||
|
||||
/**
|
||||
* 申请任务
|
||||
* 1. 将获取重试次数的逻辑封装成独立方法
|
||||
* 从系统字典中获取,如果获取失败或值不合法,则返回默认值7。
|
||||
*/
|
||||
public synchronized boolean
|
||||
instruction_require() {
|
||||
Date date = new Date();
|
||||
if (this.move != 1) {
|
||||
return false;
|
||||
}
|
||||
if (date.getTime() - this.instruction_require_time.getTime() < (long) this.instruction_require_time_out) {
|
||||
log.trace("触发时间因为小于{}毫秒,而被无视", this.instruction_require_time_out);
|
||||
return false;
|
||||
} else {
|
||||
this.instruction_require_time = date;
|
||||
Device device = deviceAppService.findDeviceByCode(device_code);
|
||||
log.info("device_code"+ device_code);
|
||||
List<String> list = device.getDeviceDriver().getExtraDeviceCodes("link_device_code");
|
||||
log.info("link_device_code_list"+ JSONObject.toJSONString(list));
|
||||
if (CollUtil.isNotEmpty(list)) {
|
||||
//关联的RFID设备
|
||||
String linkDeviceCode = list.get(0);
|
||||
BmVehicleRfid bmVehicleRfid = ibmVehicleRfidService.findByDeviceCode(linkDeviceCode);
|
||||
String rfid = bmVehicleRfid.getRfid();
|
||||
if (!StrUtil.isEmpty(rfid)) {
|
||||
JSONObject param = new JSONObject();
|
||||
param.put("device_code", device_code);
|
||||
param.put("vehicle_code", rfid);
|
||||
LuceneLogDto logDto = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库任务,接口请求参数:" + param)
|
||||
.build();
|
||||
logDto.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto);
|
||||
int i = 0;
|
||||
while (i < 3) {
|
||||
try {
|
||||
HttpResponse httpResponse = acsToWmsService.applyIn(param);
|
||||
JSONObject jsonObject = null;
|
||||
if (ObjectUtil.isNotEmpty(httpResponse)) {
|
||||
String body = httpResponse.body();
|
||||
jsonObject = JSONObject.parseObject(body);
|
||||
}
|
||||
message = "申请入库任务,接口返回参数:" + jsonObject;
|
||||
if (ObjectUtil.isNotNull(jsonObject) && jsonObject.getInteger("status") == 200) {
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库任务成功,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
List list1 = new ArrayList();
|
||||
Map map = new HashMap();
|
||||
map.put("code", "to_command");
|
||||
map.put("value", 5);
|
||||
list1.add(map);
|
||||
this.writing(list1);
|
||||
this.requireSucess = true;
|
||||
break;
|
||||
} else {
|
||||
this.iserror = true;
|
||||
message = "申请入库任务出错,接口返回参数:" + jsonObject;
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库任务出错,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
i++; // 先自增
|
||||
|
||||
// 如果不是最后一次失败,则等待
|
||||
if (i < 3) {
|
||||
try {
|
||||
Thread.sleep(6000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break; // 中断后退出循环
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("申请入库任务第 {} 次尝试发生异常: {}", i + 1, e.getMessage(), e);
|
||||
this.iserror = true; // 标记为错误
|
||||
i++; // 关键:异常也计入重试次数
|
||||
|
||||
// 同样,在异常后也需要等待,然后再重试
|
||||
if (i < 3) {
|
||||
try {
|
||||
Thread.sleep(6000);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
break; // 如果等待被中断,则退出循环
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//代表3次都没有成功
|
||||
if (i == 3) {
|
||||
HttpResponse httpResponse = acsToWmsService.applyInBillError(param);
|
||||
JSONObject jsonObject = null;
|
||||
if (ObjectUtil.isNotEmpty(httpResponse)) {
|
||||
String body = httpResponse.body();
|
||||
jsonObject = JSONObject.parseObject(body);
|
||||
}
|
||||
message = "申请入库申请异常功能,接口返回参数:" + jsonObject;
|
||||
if (ObjectUtil.isNotNull(jsonObject) && jsonObject.getInteger("status") == 200) {
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库申请异常任务成功,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
List list1 = new ArrayList();
|
||||
Map map = new HashMap();
|
||||
map.put("code", "to_command");
|
||||
map.put("value", 5);
|
||||
list1.add(map);
|
||||
this.writing(list1);
|
||||
this.requireSucess = true;
|
||||
} else {
|
||||
this.iserror = true;
|
||||
message = "申请入库申请异常任务出错,接口返回参数:" + jsonObject;
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库申请异常任务出错,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
}
|
||||
}
|
||||
private int getRetryTime() {
|
||||
final String DICT_NAME = "belt_conveyor_retry_time";
|
||||
final int DEFAULT_RETRY_TIME = 7;
|
||||
try {
|
||||
Dict dict = iSysDictService.getDictByName2(DICT_NAME);
|
||||
if (dict != null && StrUtil.isNotBlank(dict.getValue())) {
|
||||
int retryTime = Integer.parseInt(dict.getValue().trim());
|
||||
if (retryTime > 0) {
|
||||
log.debug("设备 {} 从字典获取到重试次数: {}", device_code, retryTime);
|
||||
return retryTime;
|
||||
} else {
|
||||
rfid = CodeUtil.getNewCode("INSTRUCT_NO");
|
||||
log.info("当前RFID没有读到值,无法发起任务。");
|
||||
log.warn("字典 '{}' 的值 {} 不合法,将使用默认重试次数 {}", DICT_NAME, dict.getValue(), DEFAULT_RETRY_TIME);
|
||||
}
|
||||
} else {
|
||||
log.warn("未找到字典 '{}' 或其值为空,将使用默认重试次数 {}", DICT_NAME, DEFAULT_RETRY_TIME);
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
log.error("字典 '{}' 的值格式错误,无法转换为整数。将使用默认重试次数 {}", DICT_NAME, DEFAULT_RETRY_TIME, e);
|
||||
} catch (Exception e) {
|
||||
log.error("获取字典 '{}' 时发生未知异常。将使用默认重试次数 {}", DICT_NAME, DEFAULT_RETRY_TIME, e);
|
||||
}
|
||||
return DEFAULT_RETRY_TIME;
|
||||
}
|
||||
|
||||
/**
|
||||
* 申请任务
|
||||
*/
|
||||
// 2. 移除方法级别的 synchronized,但保持关键部分的同步
|
||||
public synchronized boolean instruction_require() {
|
||||
Date now = new Date();
|
||||
|
||||
// 快速检查:冷却时间和设备状态
|
||||
synchronized (this) {
|
||||
if (this.move != 1) {
|
||||
log.trace("设备 {} 不满足申请任务条件 (move={}),快速返回。", device_code, move);
|
||||
return false;
|
||||
}
|
||||
if (now.getTime() - this.instruction_require_time.getTime() < (long) this.instruction_require_time_out) {
|
||||
log.trace("设备 {} 申请任务过于频繁,触发冷却时间 {}ms,指令被忽略。", device_code, this.instruction_require_time_out);
|
||||
return false;
|
||||
}
|
||||
// 更新最后一次触发时间
|
||||
this.instruction_require_time = now;
|
||||
}
|
||||
|
||||
log.info("开始为设备 {} 申请入库任务。", device_code);
|
||||
|
||||
Device device = deviceAppService.findDeviceByCode(device_code);
|
||||
if (device == null || device.getDeviceDriver() == null) {
|
||||
log.error("设备 {} 或其驱动信息不存在,无法申请任务。", device_code);
|
||||
return false;
|
||||
}
|
||||
List<String> linkDeviceCodes = device.getDeviceDriver().getExtraDeviceCodes("link_device_code");
|
||||
log.info("设备 {} 关联的RFID设备列表: {}", device_code, JSONObject.toJSONString(linkDeviceCodes));
|
||||
|
||||
if (CollUtil.isEmpty(linkDeviceCodes)) {
|
||||
log.warn("设备 {} 未配置关联的RFID设备,无法申请任务。", device_code);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 只处理第一个关联的RFID设备
|
||||
String linkDeviceCode = linkDeviceCodes.get(0);
|
||||
String rfid = null;
|
||||
BmVehicleRfid bmVehicleRfid = ibmVehicleRfidService.findByDeviceCode(linkDeviceCode);
|
||||
if (bmVehicleRfid != null) {
|
||||
rfid = bmVehicleRfid.getRfid();
|
||||
}
|
||||
|
||||
// 如果第一次查询失败,则进行有限次数的同步重试
|
||||
if (StrUtil.isEmpty(rfid)) {
|
||||
int retryTime = getRetryTime();
|
||||
boolean rfidFound = false;
|
||||
for (int i = 0; i < retryTime; i++) {
|
||||
try {
|
||||
log.info("设备 {} 第 {} 次重试查询RFID (关联设备: {}).", device_code, i + 1, linkDeviceCode);
|
||||
bmVehicleRfid = ibmVehicleRfidService.findByDeviceCode(linkDeviceCode);
|
||||
|
||||
if (bmVehicleRfid != null) {
|
||||
rfid = bmVehicleRfid.getRfid();
|
||||
if (StrUtil.isNotBlank(rfid)) {
|
||||
log.info("设备 {} 第 {} 次重试成功获取到RFID: {}", device_code, i + 1, rfid);
|
||||
rfidFound = true;
|
||||
break; // 获取成功,跳出循环
|
||||
}
|
||||
}
|
||||
|
||||
// 如果不是最后一次失败,则等待后重试
|
||||
if (i < retryTime - 1) {
|
||||
long sleepTime = 1000 * (i + 1); // 等待时间递增:1s, 2s, 3s...
|
||||
log.info("设备 {} 第 {} 次重试未获取到有效RFID,将在 {}ms 后重试。", device_code, i + 1, sleepTime);
|
||||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt(); // 响应中断
|
||||
log.warn("设备 {} RFID查询线程被中断,已尝试 {} 次。", device_code, i + 1, e);
|
||||
break; // 中断后退出重试
|
||||
} catch (Exception e) {
|
||||
log.error("设备 {} 第 {} 次查询RFID时发生异常。", device_code, i + 1, e);
|
||||
// 发生异常也进行等待,避免请求风暴
|
||||
if (i < retryTime - 1) {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("设备 {} RFID查询重试等待被中断。", device_code, ie);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!rfidFound) {
|
||||
log.error("设备 {} 重试 {} 次后仍未获取到有效RFID,执行错误处理流程。", device_code, retryTime);
|
||||
String errorRfid = CodeUtil.getNewCode("INSTRUCT_NO");
|
||||
JSONObject param = new JSONObject();
|
||||
param.put("device_code", device_code);
|
||||
param.put("vehicle_code", rfid);
|
||||
HttpResponse httpResponse = acsToWmsService.applyInBillError(param);
|
||||
JSONObject jsonObject = null;
|
||||
if (ObjectUtil.isNotEmpty(httpResponse)) {
|
||||
String body = httpResponse.body();
|
||||
jsonObject = JSONObject.parseObject(body);
|
||||
}
|
||||
message = "申请入库申请异常功能,接口返回参数:" + jsonObject;
|
||||
if (ObjectUtil.isNotNull(jsonObject) && jsonObject.getInteger("status") == 200) {
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库申请异常任务成功,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
List list1 = new ArrayList();
|
||||
Map map = new HashMap();
|
||||
map.put("code", "to_command");
|
||||
map.put("value", 5);
|
||||
list1.add(map);
|
||||
this.writing(list1);
|
||||
this.requireSucess = true;
|
||||
} else {
|
||||
this.iserror = true;
|
||||
message = "申请入库申请异常任务出错,接口返回参数:" + jsonObject;
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库申请异常任务出错,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
}
|
||||
param.put("vehicle_code", errorRfid);
|
||||
applyInBillError(param);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// 成功获取到RFID,准备调用入库接口
|
||||
JSONObject param = new JSONObject();
|
||||
param.put("device_code", device_code);
|
||||
param.put("vehicle_code", rfid);
|
||||
LuceneLogDto logDto = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("设备 " + device_code + " 申请入库任务,接口请求参数:" + param)
|
||||
.build();
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto);
|
||||
|
||||
try {
|
||||
// 第一次同步调用入库接口
|
||||
log.info("设备 {} 第一次调用WMS入库接口。", device_code);
|
||||
HttpResponse httpResponse = acsToWmsService.applyIn(param);
|
||||
JSONObject jsonObject = ObjectUtil.isNotEmpty(httpResponse) ? JSONObject.parseObject(httpResponse.body()) : null;
|
||||
|
||||
if (ObjectUtil.isNotNull(jsonObject) && jsonObject.getInteger("status") == 200) {
|
||||
log.info("设备 {} 第一次调用WMS入库接口成功。", device_code);
|
||||
synchronized (this) {
|
||||
// 确保在同步块内更新requireSucess状态
|
||||
this.requireSucess = true;
|
||||
}
|
||||
handleApplyInSuccess(jsonObject, param);
|
||||
return true;
|
||||
} else {
|
||||
log.warn("设备 {} 第一次调用WMS入库接口失败。响应: {}", device_code, jsonObject);
|
||||
// --- 核心修改:提交给异步重试服务 ---
|
||||
synchronized (this) {
|
||||
// 防止并发条件下的重复提交
|
||||
if (!this.requireSucess) {
|
||||
int maxRetries = getRetryTime() - 1; // 减去已经尝试过的1次
|
||||
if (maxRetries > 0) {
|
||||
log.info("设备 {} 将任务提交给异步重试服务,剩余重试次数: {}", device_code, maxRetries);
|
||||
// 创建参数副本,避免传递this引用导致的潜在内存泄漏
|
||||
JSONObject retryParam = new JSONObject();
|
||||
retryParam.put("device_code", device_code);
|
||||
retryParam.put("vehicle_code", rfid);
|
||||
retryService.submitApplyInRetryTask(device_code, retryParam, 1, maxRetries);
|
||||
// 提交成功后,立即设置为成功,防止重复提交
|
||||
this.requireSucess = true;
|
||||
log.info("设备 {} 任务已成功提交至重试队列,本次申请流程结束。", device_code);
|
||||
} else {
|
||||
log.error("设备 {} 已无重试次数,执行错误处理流程。", device_code);
|
||||
applyInBillError(param);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("设备 {} 第一次调用WMS入库接口时发生严重异常。", device_code, e);
|
||||
// 发生异常也提交给重试服务
|
||||
synchronized (this) {
|
||||
// 双重检查锁定模式,防止并发条件下的重复提交
|
||||
if (!this.requireSucess) {
|
||||
int maxRetries = getRetryTime() - 1;
|
||||
if (maxRetries > 0) {
|
||||
log.info("设备 {} 将任务提交给异步重试服务,剩余重试次数: {}", device_code, maxRetries);
|
||||
// 创建参数副本,避免传递this引用导致的潜在内存泄漏
|
||||
JSONObject retryParam = new JSONObject();
|
||||
retryParam.put("device_code", device_code);
|
||||
retryParam.put("vehicle_code", rfid);
|
||||
retryService.submitApplyInRetryTask(device_code, retryParam, 1, maxRetries);
|
||||
this.requireSucess = true;
|
||||
log.info("设备 {} 任务已成功提交至重试队列,本次申请流程结束。", device_code);
|
||||
} else {
|
||||
log.error("设备 {} 已无重试次数,执行错误处理流程。", device_code);
|
||||
applyInBillError(param);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 注意:这里返回true表示本次申请流程已"处理完毕"(可能是成功或已提交重试),
|
||||
// 不代表WMS入库操作最终成功。最终成功与否由handleApplyInSuccess或RetryService决定。
|
||||
return true;
|
||||
}
|
||||
public void applyInBillError(JSONObject param) {
|
||||
try {
|
||||
HttpResponse httpResponse = acsToWmsService.applyInBillError(param);
|
||||
JSONObject jsonObject = null;
|
||||
if (ObjectUtil.isNotEmpty(httpResponse)) {
|
||||
String body = httpResponse.body();
|
||||
jsonObject = JSONObject.parseObject(body);
|
||||
}
|
||||
message = "申请入库申请异常功能,接口返回参数:" + jsonObject;
|
||||
if (ObjectUtil.isNotNull(jsonObject) && jsonObject.getInteger("status") == 200) {
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库申请异常任务成功,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
List list1 = new ArrayList();
|
||||
Map map = new HashMap();
|
||||
map.put("code", "to_command");
|
||||
map.put("value", 5);
|
||||
list1.add(map);
|
||||
this.writing(list1);
|
||||
this.requireSucess = true;
|
||||
} else {
|
||||
this.iserror = true;
|
||||
message = "申请入库申请异常任务出错,接口返回参数:" + jsonObject;
|
||||
LuceneLogDto logDto2 = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("申请入库申请异常任务出错,接口返回参数:" + jsonObject)
|
||||
.build();
|
||||
logDto2.setLog_level(4);
|
||||
luceneExecuteLogService.deviceExecuteLog(logDto2);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("设备 {} 调用入库异常处理接口时发生严重错误,参数: {}", device_code, param, e);
|
||||
this.iserror = true;
|
||||
this.message = "入库申请失败,且异常处理流程也发生错误。";
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 4. 处理入库申请成功
|
||||
* 当WMS入库接口调用成功时(无论是第一次还是重试后),执行此方法。
|
||||
*/
|
||||
public synchronized void handleApplyInSuccess(JSONObject jsonObject, JSONObject param) {
|
||||
try {
|
||||
log.info("设备 {} 入库申请成功,准备下发执行命令。LMS响应: {}", device_code, jsonObject);
|
||||
|
||||
LuceneLogDto successLog = LuceneLogDto.builder()
|
||||
.device_code(device_code)
|
||||
.content("设备 " + device_code + " 入库申请成功。WMS响应: " + jsonObject)
|
||||
.build();
|
||||
luceneExecuteLogService.deviceExecuteLog(successLog);
|
||||
|
||||
// 下发执行命令
|
||||
List<Map<String, Object>> commandList = new ArrayList<>();
|
||||
Map<String, Object> commandMap = new HashMap<>();
|
||||
commandMap.put("code", "to_command");
|
||||
commandMap.put("value", 5); // 假设5是执行命令
|
||||
commandList.add(commandMap);
|
||||
this.writing(commandList);
|
||||
|
||||
// 标记为成功,防止在冷却期内重复申请
|
||||
this.requireSucess = true;
|
||||
this.iserror = false; // 清除错误状态
|
||||
this.message = ""; // 清除错误信息
|
||||
|
||||
log.info("设备 {} 成功下发入库执行命令,任务申请流程全部完成。", device_code);
|
||||
} catch (Exception e) {
|
||||
log.error("设备 {} 处理入库成功响应时发生异常,可能导致命令下发失败。", device_code, e);
|
||||
// 即使下发命令失败,也认为申请成功,避免死循环重试
|
||||
this.requireSucess = true;
|
||||
}
|
||||
}
|
||||
public void writing(List list) {
|
||||
|
||||
Map<String, Object> itemMap = new HashMap<String, Object>();
|
||||
|
||||
@@ -0,0 +1,105 @@
|
||||
package org.nl.acs.device_driver.conveyor.belt_conveyor;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.http.HttpResponse;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.nl.acs.device.domain.Device;
|
||||
import org.nl.acs.device_driver.DeviceDriver;
|
||||
import org.nl.acs.ext.wms.service.AcsToWmsService;
|
||||
import org.nl.acs.opc.DeviceAppService;
|
||||
import org.nl.config.SpringContextHolder;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
@Service
|
||||
public class RetryService {
|
||||
|
||||
@Autowired
|
||||
private AcsToWmsService acsToWmsService;
|
||||
|
||||
@Autowired
|
||||
private DeviceAppService deviceAppService;
|
||||
|
||||
/**
|
||||
* 异步提交WMS入库申请重试任务
|
||||
*
|
||||
* @param device_code 设备代码
|
||||
* @param param WMS接口请求参数
|
||||
* @param currentRetry 当前重试次数 (从0开始)
|
||||
* @param maxRetries 最大重试次数
|
||||
*/
|
||||
@Async("retryTaskExecutor") // 指定一个专门用于执行重试任务的线程池
|
||||
public void submitApplyInRetryTask(String device_code, JSONObject param, int currentRetry, int maxRetries) {
|
||||
if (currentRetry > maxRetries) {
|
||||
log.error("设备 {} 入库申请重试次数已耗尽 ({}次),任务最终失败。参数: {}",
|
||||
device_code, maxRetries, param);
|
||||
// 这里需要获取对应的设备驱动实例来调用applyInBillError
|
||||
BeltConveyorDeviceDriver driver = getDeviceDriver(device_code);
|
||||
if (driver != null) {
|
||||
driver.applyInBillError(param); // 最终失败,调用错误处理
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 指数退避策略:计算等待时间 (1s, 2s, 4s, 8s, ...)
|
||||
long delay = 6;
|
||||
|
||||
try {
|
||||
log.info("设备 {} 准备进行第 {} 次重试 (共 {} 次),将在 {}s 后执行。",
|
||||
device_code, currentRetry, maxRetries, delay);
|
||||
|
||||
log.info("设备 {} 开始第 {} 次重试调用WMS入库接口。", device_code, currentRetry);
|
||||
HttpResponse httpResponse = acsToWmsService.applyIn(param);
|
||||
JSONObject jsonObject = ObjectUtil.isNotEmpty(httpResponse) ? JSONObject.parseObject(httpResponse.body()) : null;
|
||||
|
||||
if (ObjectUtil.isNotNull(jsonObject) && jsonObject.getInteger("status") == 200) {
|
||||
log.info("设备 {} 第 {} 次重试调用WMS入库接口成功!", device_code, currentRetry );
|
||||
// 获取驱动实例调用成功处理方法
|
||||
BeltConveyorDeviceDriver driver = getDeviceDriver(device_code);
|
||||
if (driver != null) {
|
||||
driver.handleApplyInSuccess(jsonObject, param); // 重试成功,回调驱动的成功处理方法
|
||||
}
|
||||
} else {
|
||||
TimeUnit.SECONDS.sleep(delay);
|
||||
log.warn("设备 {} 第 {} 次重试调用WMS入库接口失败。响应: {}", device_code, currentRetry + 1, jsonObject);
|
||||
// 递归调用,进行下一次重试
|
||||
currentRetry++;
|
||||
submitApplyInRetryTask(device_code, param, currentRetry, maxRetries);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
currentRetry++;
|
||||
log.warn("设备 {} 第 {} 次重试任务被中断。", device_code, currentRetry, e);
|
||||
// 可以选择是否重新提交一次
|
||||
submitApplyInRetryTask(device_code, param, currentRetry, maxRetries);
|
||||
} catch (Exception e) {
|
||||
log.error("设备 {} 第 {} 次重试调用WMS入库接口时发生严重异常。", device_code, currentRetry + 1, e);
|
||||
currentRetry++;
|
||||
// 发生异常也继续重试
|
||||
submitApplyInRetryTask(device_code, param, currentRetry, maxRetries);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据设备代码获取对应的设备驱动实例
|
||||
* @param device_code 设备代码
|
||||
* @return 设备驱动实例,如果未找到返回null
|
||||
*/
|
||||
private BeltConveyorDeviceDriver getDeviceDriver(String device_code) {
|
||||
// 这里需要实现根据device_code获取驱动实例的逻辑
|
||||
// 可能需要从某个容器或Map中获取
|
||||
// 假设通过SpringContextHolder和设备管理服务获取
|
||||
Device device = deviceAppService.findDeviceByCode(device_code);
|
||||
if (device != null) {
|
||||
DeviceDriver driver = device.getDeviceDriver();
|
||||
if (driver instanceof BeltConveyorDeviceDriver) {
|
||||
return (BeltConveyorDeviceDriver) driver;
|
||||
}
|
||||
}
|
||||
log.error("未找到设备 {} 对应的驱动实例", device_code);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -189,7 +189,10 @@ public class AcsToWmsServiceImpl implements AcsToWmsService {
|
||||
luceneLogService.interfaceExecuteLog(luceneLogDto);
|
||||
return result2;
|
||||
|
||||
} finally {
|
||||
} catch (Exception e){
|
||||
log.info("推送LMS异常。异常=【{}】",e.getMessage());
|
||||
return result2;
|
||||
}finally {
|
||||
MDC.remove(log_file_type);
|
||||
}
|
||||
}
|
||||
@@ -203,6 +206,7 @@ public class AcsToWmsServiceImpl implements AcsToWmsService {
|
||||
String wmsurl = paramService.findByCode(AcsConfig.WMSURL).getValue();
|
||||
AddressDto addressDto = addressService.findByCode("applyInBillError");
|
||||
String url = wmsurl + addressDto.getMethods_url();
|
||||
log.info("url"+url);
|
||||
try {
|
||||
result2 = HttpRequest.post(url)
|
||||
.addInterceptor(tLogHutoolhttpInterceptor)
|
||||
|
||||
@@ -5,9 +5,14 @@ import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.io.unit.DataUnit;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.nl.acs.device.domain.Device;
|
||||
import org.nl.acs.device_driver.conveyor.belt_conveyor.BeltConveyorDeviceDriver;
|
||||
import org.nl.acs.device_driver.conveyor.belt_conveyor.RetryService;
|
||||
import org.nl.acs.instruction.domain.Instruction;
|
||||
import org.nl.acs.instruction.enums.InstructionStatusEnum;
|
||||
import org.nl.acs.opc.DeviceAppService;
|
||||
import org.nl.acs.task.enums.TaskTypeEnum;
|
||||
import org.nl.acs.task.service.dto.TaskDto;
|
||||
import org.nl.system.service.user.ISysUserService;
|
||||
@@ -30,6 +35,13 @@ public class ApplicationTest {
|
||||
@Autowired
|
||||
private ISysUserService userService;
|
||||
|
||||
@Autowired
|
||||
private RetryService retryService;
|
||||
|
||||
@Autowired
|
||||
private DeviceAppService deviceAppService;
|
||||
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
|
||||
|
||||
|
||||
@@ -41,6 +53,16 @@ public class ApplicationTest {
|
||||
System.out.println(qzz);
|
||||
}
|
||||
|
||||
@Test
|
||||
void t() throws InterruptedException {
|
||||
Device device = deviceAppService.findDeviceByCode("CRK03");
|
||||
BeltConveyorDeviceDriver beltConveyorDeviceDriver = (BeltConveyorDeviceDriver) device.getDeviceDriver();
|
||||
beltConveyorDeviceDriver.setMove(1);
|
||||
beltConveyorDeviceDriver.setDevice_code("CRK03");
|
||||
beltConveyorDeviceDriver.instruction_require();
|
||||
Thread.sleep(1000*300);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Scheduled(cron = "0/5 * * * * ?")
|
||||
void testOrderTask() {
|
||||
|
||||
Reference in New Issue
Block a user