add:任务定时下发

This commit is contained in:
zhangzhiqiang
2023-03-23 16:22:59 +08:00
parent 8006802b8b
commit 14144e5228
3 changed files with 139 additions and 6 deletions

View File

@@ -80,7 +80,7 @@ public class AgvInstService {
//判断缓存线是不是去深坑清洗
if (devicePoint.getString("next_regin_code").equals(ConstantParam.SK_REGION)){
String deviceSql = nextPointList.stream().map(a -> ((JSONObject) a).getString("device_code")).collect(Collectors.joining("','"));
JSONArray runDevs = WQL.getWO("sch_point").addParamMap(MapOf.of("flag","4", "device_code",deviceSql)).process().getResultJSONArray(0);
JSONArray runDevs = WQL.getWO("sch_point").addParamMap(MapOf.of("flag","4", "device_code","'"+deviceSql+"'")).process().getResultJSONArray(0);
if (runDevs.size()>0){
String device_code = runDevs.getJSONObject(0).getString("device_code");
JSONObject SKPoint = (JSONObject)nextPointList.stream().filter(o -> ((JSONObject) o).getString("device_code").equals(device_code)).findFirst().get();

View File

@@ -1,27 +1,42 @@
package org.nl.wms.sch.tasks;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.nl.common.enums.StatusEnum;
import org.nl.common.utils.IdUtil;
import org.nl.common.utils.MapOf;
import org.nl.modules.wql.WQL;
import org.nl.modules.wql.core.bean.WQLObject;
import org.nl.wms.ext.acs.service.WmsToAcsService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/*
* @author ZZQ
* @Date 2023/3/22 17:14
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class TaskScheduleService {
private ReentrantLock lock = new ReentrantLock();
private final WmsToAcsService wmsToAcsService;
@Scheduled(cron = "0/10 * * * * ?")
public void taskPublish(){
@@ -29,9 +44,74 @@ public class TaskScheduleService {
try {
if (islock){
WQLObject taskTable = WQLObject.getWQLObject("sch_base_task");
JSONArray array = taskTable.query("task_type < '" + StatusEnum.TASK_PUBLISH + "'").getResultJSONArray(0);
JSONArray all = taskTable.query("task_type < '" + StatusEnum.TASK_PUBLISH + "'").getResultJSONArray(0);
//分区域不同区域调用不同acs接口
Map<String, List<Object>> areaCollent = all.stream().collect(Collectors.groupingBy(o -> ((JSONObject) o).getString("product_area")));
for (String area : areaCollent.keySet()) {
List<Object> array = areaCollent.get(area);
if (array.size()>0){
String pointCollect = array.stream().map(o -> ((JSONObject) o).getString("point_code1") + "','" + ((JSONObject) o).getString("point_code2")).collect(Collectors.joining(","));
JSONArray needMergeCollect = WQL.getWO("sch_point").addParamMap(MapOf.of("flag", "5", "point_code", "'" + pointCollect + "'")).process().getResultJSONArray(0);
//区域编号对应point_code
Map<String, List<Object>> pointRegionCollent = needMergeCollect.stream().collect(Collectors.groupingBy(o -> ((JSONObject) o).getString("point_code")));
Set<String> mergePointCollent = needMergeCollect.stream().map(item -> ((JSONObject) item).getString("point_code")).collect(Collectors.toSet());
//合并下发的任务
ArrayList<JSONObject> notMerge = new ArrayList<>();//单独下发的任务
ArrayList<JSONObject> Merge = new ArrayList<>();//单独下发的任务
ArrayList<JSONObject> waitingTask = new ArrayList<>();//可合并,但是只有单条任务
Map<String, String> taskGroupMap = new HashMap<>();
for (Object o : array) {
String taskGroupId = IdUtil.getStringId();
JSONObject task = (JSONObject) o;
String taskId = task.getString("task_id");
if (taskGroupMap.get(taskId) != null){
task.put("task_group_id",taskGroupMap.get(taskId));
continue;
}
String start = task.getString("point_code1");
String end = task.getString("point_code2");
taskGroupMap.put(taskId,taskGroupId);
//如果点位不属于查询出来可以合并点位集合:则单独下发集合
if (mergePointCollent.contains(start)&&mergePointCollent.contains(end)){
notMerge.add(task);
continue;
}
//起点点位是否有多个合并任务点,如果没有则判断终点是否有多个合并任务点;都没有则说明当前起点终点对应区域只有一个任务:放入等待下发集合中
String mergeTargetId = getMergePoint(pointRegionCollent, start,end,array);
if (mergeTargetId == null){
waitingTask.add(task);
continue;
}
//查询到相同区域可以合并的点位集合
taskGroupMap.put(mergeTargetId,taskGroupId);
Merge.add(task);
}
//开始下发处理waitingTask如果创建时间>2分钟则改单独下发
if (waitingTask.size()>1){
List<JSONObject> needPublish = waitingTask.stream().filter(a -> a.getString("create_time") == DateUtil.now()).collect(Collectors.toList());
notMerge.addAll(needPublish);
}
String notMergeID = notMerge.stream().map(a -> ((JSONObject) a).getString("task_id")).collect(Collectors.joining(","));
String waitingTaskId = waitingTask.stream().map(a -> ((JSONObject) a).getString("task_id")).collect(Collectors.joining(","));
String MergeId = Merge.stream().map(a -> ((JSONObject) a).getString("task_id")).collect(Collectors.joining(","));
log.info("TaskScheduleService#taskPublish notMerge:{},waitingTask:{},merge:{}",notMergeID,waitingTaskId,MergeId);
//下发任务:一次暂时下发十条同一个区域任务不超过10条
JSONArray form = new JSONArray();
Merge.addAll(notMerge);
for (JSONObject task : Merge) {
JSONObject param = new JSONObject(MapOf.of("task_id", task.getString("task_id")
, "task_type", task.getString("task_type")
, "start_point_code", task.getString("point_code1")
, "next_point_code", task.getString("point_code2")
, "return_point_code", task.getString("point_code3")
, "task_group_id", task.getString("task_group_id")
, "vehicle_code", task.getString("vehicle_code")
));
form.add(param);
}
wmsToAcsService.issueTaskToAcs(form);
}
}
}
}finally {
if (islock){
@@ -40,4 +120,39 @@ public class TaskScheduleService {
}
}
@NotNull
private String getMergePoint(Map<String, List<Object>> pointRegionCollent, String start,String end,List<Object> tasks) {
JSONObject startPointInfo = (JSONObject) pointRegionCollent.get(start).get(0);
String startPointCollent = startPointInfo.getString("pointCollent");
String[] split = startPointCollent.split(",");
if (split.length>1){
for (String s : split) {
if (!s.equals(start)){
Object task = tasks.stream().filter(a -> ((JSONObject) a).getString("point_code1").equals(s)).findAny().get();
return ((JSONObject)task).getString("task_id");
}
}
}else {
JSONObject endPointInfo = (JSONObject) pointRegionCollent.get(start).get(0);
String endPointCollent = endPointInfo.getString("pointCollent");
split = endPointCollent.split(",");
for (String s : split) {
if (!s.equals(start)){
Object task = tasks.stream().filter(a -> ((JSONObject) a).getString("point_code2").equals(s)).findAny().get();
return ((JSONObject)task).getString("task_id");
}
}
}
return null;
}
public static void main(String[] args) {
JSONArray array = new JSONArray();
for (int i =0;i<10;i++){
JSONObject jsonObject = new JSONObject(MapOf.of("product_area", new Random().nextInt(2) + 1, "name", UUID.randomUUID().toString()));
array.add(jsonObject);
}
}
}

View File

@@ -23,6 +23,7 @@
输入.material_id TYPEAS s_string
输入.vehicle_type TYPEAS s_string
输入.device_code TYPEAS s_string
输入.point_code TYPEAS s_string
[临时表]
--这边列出来的临时表就会在运行期动态创建
@@ -109,10 +110,27 @@
*
FROM
( SELECT * FROM pdm_bi_devicerunstatusrecord
where device_code in (' 输入.device_code ')
where device_code in ( 输入.device_code )
ORDER BY order_num DESC LIMIT 999999 ) a
GROUP BY
device_code
HAVING status_type = '02'
ENDPAGEQUERY
ENDIF
IF 输入.flag = "5"
PAGEQUERY
SELECT
sch_base_region.region_code,
sch_base_point.point_code,
GROUP_CONCAT(b.point_code) as pointCollent
FROM
sch_base_region
LEFT JOIN sch_base_point ON sch_base_region.region_code = sch_base_point.region_code
LEFT JOIN sch_base_point b ON sch_base_region.region_code = b.region_code AND b.point_code IN ( 输入.point_code )
WHERE
sch_base_region.MERGE = '1'
AND sch_base_point.point_code IN ( 输入.point_code )
GROUP BY sch_base_point.point_code;
ENDPAGEQUERY
ENDIF