fix:新增SSE推送任务,及AOP注解

This commit is contained in:
zhouz
2025-11-07 09:48:40 +08:00
parent d8af9d1b65
commit 831f253412
11 changed files with 371 additions and 1 deletions

View File

@@ -0,0 +1,29 @@
package org.nl.b_lms.pdm.screen.contorller;
import cn.dev33.satoken.annotation.SaIgnore;
import org.nl.b_lms.pdm.screen.store.InMemoryStore;
import org.nl.b_lms.sch.task.dao.SchBaseTask;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping(path = "/api/faults", produces = MediaType.APPLICATION_JSON_VALUE)
public class FaultController {
private final InMemoryStore store;
public FaultController(InMemoryStore store) {
this.store = store;
}
@GetMapping
@SaIgnore
public List<SchBaseTask> getFaults(@RequestParam(required = false) Integer limit) {
return store.findFaults();
}
}

View File

@@ -0,0 +1,29 @@
package org.nl.b_lms.pdm.screen.contorller;
import cn.dev33.satoken.annotation.SaIgnore;
import org.nl.b_lms.pdm.screen.store.InMemoryStore;
import org.nl.b_lms.sch.task.dao.SchBaseTask;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping(path = "/api/tasks", produces = MediaType.APPLICATION_JSON_VALUE)
public class ScreenTaskController {
private final InMemoryStore store;
public ScreenTaskController(InMemoryStore store) { this.store = store; }
@GetMapping
@SaIgnore
public List<SchBaseTask> getTasks() {
List<SchBaseTask> tasks = store.findTasks();
return tasks;
}
}

View File

@@ -0,0 +1,35 @@
package org.nl.b_lms.pdm.screen.model;
public class Fault {
private Long id;
private String deviceId;
private String deviceType;
private String faultContent;
private Long occurTs;
private String severity; // info/warn/error
public Fault() {}
public Fault(Long id, String deviceId, String deviceType, String faultContent, Long occurTs, String severity) {
this.id = id; this.deviceId = deviceId; this.deviceType = deviceType; this.faultContent = faultContent;
this.occurTs = occurTs; this.severity = severity;
}
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getDeviceId() { return deviceId; }
public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
public String getDeviceType() { return deviceType; }
public void setDeviceType(String deviceType) { this.deviceType = deviceType; }
public String getFaultContent() { return faultContent; }
public void setFaultContent(String faultContent) { this.faultContent = faultContent; }
public Long getOccurTs() { return occurTs; }
public void setOccurTs(Long occurTs) { this.occurTs = occurTs; }
public String getSeverity() { return severity; }
public void setSeverity(String severity) { this.severity = severity; }
}

View File

@@ -0,0 +1,53 @@
package org.nl.b_lms.pdm.screen.model;
public class ScreenTask {
private Long id;
private String name;
private String type; // AGV搬运/堆垛机操作
private String status; // pending, running, completed, failed, paused
private String priority; // high/medium/low
private Integer progress; // 0-100
private String startLocation;
private String endLocation;
private String creator;
private Long createTs; // epoch millis
private String description;
public ScreenTask() {}
public ScreenTask(Long id, String name, String type, String status, String priority, Integer progress,
String startLocation, String endLocation, String creator, Long createTs, String description) {
this.id = id; this.name = name; this.type = type; this.status = status; this.priority = priority;
this.progress = progress; this.startLocation = startLocation; this.endLocation = endLocation;
this.creator = creator; this.createTs = createTs; this.description = description;
}
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
public String getPriority() { return priority; }
public void setPriority(String priority) { this.priority = priority; }
public Integer getProgress() { return progress; }
public void setProgress(Integer progress) { this.progress = progress; }
public String getStartLocation() { return startLocation; }
public void setStartLocation(String startLocation) { this.startLocation = startLocation; }
public String getEndLocation() { return endLocation; }
public void setEndLocation(String endLocation) { this.endLocation = endLocation; }
public String getCreator() { return creator; }
public void setCreator(String creator) { this.creator = creator; }
public Long getCreateTs() { return createTs; }
public void setCreateTs(Long createTs) { this.createTs = createTs; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
}

View File

@@ -0,0 +1,69 @@
package org.nl.b_lms.pdm.screen.see;
import org.nl.b_lms.pdm.screen.store.InMemoryStore;
import org.nl.b_lms.sch.task.dao.SchBaseTask;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@Component
public class SseBroadcaster {
private final InMemoryStore store;
private final CopyOnWriteArrayList<SseEmitter> taskEmitters = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<SseEmitter> faultEmitters = new CopyOnWriteArrayList<>();
public SseBroadcaster(InMemoryStore store) {
this.store = store;
}
public SseEmitter registerTasksEmitter() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
taskEmitters.add(emitter);
emitter.onCompletion(() -> taskEmitters.remove(emitter));
emitter.onTimeout(() -> taskEmitters.remove(emitter));
return emitter;
}
public SseEmitter registerFaultsEmitter() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
faultEmitters.add(emitter);
emitter.onCompletion(() -> faultEmitters.remove(emitter));
emitter.onTimeout(() -> faultEmitters.remove(emitter));
return emitter;
}
public void pushTasksSnapshot() {
List<SchBaseTask> snapshot = store.findTasks();
pushTasks(snapshot);
}
public void pushFaultsSnapshot() {
List<SchBaseTask> snapshot = store.findFaults();
pushFaults(snapshot);
}
public void pushTasks(List<SchBaseTask> tasks) {
if (taskEmitters.isEmpty()) return;
for (SseEmitter emitter : taskEmitters) {
try {
emitter.send(SseEmitter.event().name("tasks").data(tasks));
} catch (IOException e) {
emitter.completeWithError(e);
}
}
}
public void pushFaults(List<SchBaseTask> faults) {
if (faultEmitters.isEmpty()) return;
for (SseEmitter emitter : faultEmitters) {
try {
emitter.send(SseEmitter.event().name("faults").data(faults));
} catch (IOException e) {
emitter.completeWithError(e);
}
}
}
}

View File

@@ -0,0 +1,22 @@
package org.nl.b_lms.pdm.screen.see.aop;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 在方法或类上标注该注解,被执行后推送最新 SSE 快照。
* 可通过参数控制推送的通道(任务/故障)。
*/
@Target({TYPE, METHOD})
@Retention(RUNTIME)
@Documented
public @interface PushSseSnapshot {
boolean tasks() default true;
boolean faults() default true;
}

View File

@@ -0,0 +1,67 @@
package org.nl.b_lms.pdm.screen.see.aop;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.nl.b_lms.pdm.screen.see.SseBroadcaster;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* 注解驱动的 SSE 推送切面:
* - 方法或类标注 {@link PushSseSnapshot},在成功返回后推送最新快照;
* - 若处于事务中,则在事务提交后推送,确保前端拿到已提交的数据;
* - 否则立即推送。
*/
@Aspect
@Component
public class TaskUpdateBroadcastAspect {
private final SseBroadcaster broadcaster;
public TaskUpdateBroadcastAspect(SseBroadcaster broadcaster) {
this.broadcaster = broadcaster;
}
// 方法级注解:执行成功后触发推送
@AfterReturning(value = "@annotation(pushAnn)", argNames = "jp,pushAnn")
public void afterAnnotatedMethod(JoinPoint jp, PushSseSnapshot pushAnn) {
broadcastAfterCommitOrNow(pushAnn);
}
// 类级注解:类中任意方法成功返回后触发推送
@AfterReturning(value = "@within(org.nl.b_lms.pdm.screen.see.aop.PushSseSnapshot)")
public void afterAnnotatedClass(JoinPoint jp) {
// 若方法本身也标注了注解,则交由方法级 advice 处理,避免重复推送
MethodSignature sig = (MethodSignature) jp.getSignature();
if (sig.getMethod().isAnnotationPresent(PushSseSnapshot.class)) {
return;
}
PushSseSnapshot classAnn = jp.getTarget().getClass()
.getAnnotation(PushSseSnapshot.class);
if (classAnn != null) {
broadcastAfterCommitOrNow(classAnn);
}
}
// 任一写操作成功后,推送最新快照(替代定时器)
private void broadcastAfterCommitOrNow(PushSseSnapshot ann) {
Runnable doPush = () -> {
if (ann.tasks()) {
broadcaster.pushTasksSnapshot();
}
if (ann.faults()) {
try { broadcaster.pushFaultsSnapshot(); } catch (Throwable ignored) {}
}
};
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() { doPush.run(); }
});
} else {
doPush.run();
}
}
}

View File

@@ -0,0 +1,37 @@
package org.nl.b_lms.pdm.screen.see.controller;
import cn.dev33.satoken.annotation.SaIgnore;
import org.nl.b_lms.pdm.screen.see.SseBroadcaster;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
@RequestMapping("/sse")
@CrossOrigin
public class SseController {
private final SseBroadcaster broadcaster;
public SseController(SseBroadcaster broadcaster) { this.broadcaster = broadcaster; }
@GetMapping(value = "/tasks", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@SaIgnore
public SseEmitter streamTasks() {
SseEmitter emitter = broadcaster.registerTasksEmitter();
// 首帧:推送最新快照
broadcaster.pushTasksSnapshot();
return emitter;
}
@GetMapping(value = "/faults", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@SaIgnore
public SseEmitter streamFaults() {
SseEmitter emitter = broadcaster.registerFaultsEmitter();
// 首帧:推送最新快照
broadcaster.pushFaultsSnapshot();
return emitter;
}
}

View File

@@ -0,0 +1,27 @@
package org.nl.b_lms.pdm.screen.store;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.nl.b_lms.sch.task.dao.SchBaseTask;
import org.nl.b_lms.sch.task.service.IschBaseTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class InMemoryStore {
@Autowired
private IschBaseTaskService ischBaseTaskService;
public List<SchBaseTask> findTasks() {
return ischBaseTaskService.list(new LambdaQueryWrapper<SchBaseTask>()
.lt(SchBaseTask::getTask_status, "07"));
}
public List<SchBaseTask> findFaults() {
return ischBaseTaskService.list(new LambdaQueryWrapper<SchBaseTask>()
.lt(SchBaseTask::getTask_status, "07"));
}
}

View File

@@ -13,6 +13,7 @@ import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.nl.b_lms.pdm.screen.see.aop.PushSseSnapshot;
import org.nl.b_lms.sch.point.dao.BstIvtPackageinfoivt;
import org.nl.b_lms.sch.point.service.IbstIvtPackageinfoivtService;
import org.nl.b_lms.sch.task.dao.SchBaseTask;
@@ -105,6 +106,7 @@ public class DjqToKzjhcwTask extends AbstractAcsTask {
@Override
@Transactional(rollbackFor = Exception.class)
@PushSseSnapshot(faults = false)
public void updateTaskStatus(JSONObject taskObj, String status) {
String now = DateUtil.now();
SchBaseTask schBaseTask = taskService.getOne(new LambdaUpdateWrapper<SchBaseTask>().eq(SchBaseTask::getTask_id, taskObj.getString("task_id")), false);

View File

@@ -839,7 +839,7 @@ public class OutBoxManageServiceImpl implements OutBoxManageService {
.findFirst().orElse(null);
if (ObjectUtil.isEmpty(jsonExtMove)) {
throw new BadRequestException("此移库木箱【"+jsonExtMove.getString("storagevehicle_code")+"】没有绑定托盘,请核查");
throw new BadRequestException("此移库木箱【"+item.getString("storagevehicle_code")+"】没有绑定托盘,请核查");
}
});