add:语音控制功能-初版待联调测试

This commit is contained in:
zhaoyf
2026-04-20 09:03:34 +08:00
parent f9ca8583fb
commit 6950c6c2c2
8 changed files with 1370 additions and 304 deletions

View File

@@ -1,11 +1,13 @@
package org.nl.wms.system_manage.controller.speech;
import lombok.extern.slf4j.Slf4j;
import org.nl.wms.system_manage.service.speech.SysSpeechService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.Map;
/**
* @author Zhao Ya Fei
@@ -17,6 +19,9 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/speech")
public class SysSpeechController {
@Resource
private SysSpeechService sysSpeechService;
/**
* 健康检查
* @return
@@ -45,8 +50,13 @@ public class SysSpeechController {
return ResponseEntity.noContent().build();
}
/**
* 用户确认文字
* @return
*/
@PostMapping("/confirm")
public ResponseEntity<Object> confirm(){
return ResponseEntity.noContent().build();
public ResponseEntity<Object> confirm(@RequestBody Map<String, Object> body){
sysSpeechService.confirm(body.getOrDefault("text","").toString());
return new ResponseEntity<>(HttpStatus.OK);
}
}

View File

@@ -0,0 +1,171 @@
package org.nl.wms.system_manage.controller.speech;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.nl.common.utils.MapOf;
import org.nl.wms.system_manage.service.speech.api.AsrTtsClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
@ServerEndpoint("/ws/speech")
public class SysSpeechWebSocketEndpoint {
private static AsrTtsClient asrTtsClient;
private final ObjectMapper objectMapper = new ObjectMapper();
// 存储每个会话的上下文
private static final Map<String, SessionContext> sessionContexts = new ConcurrentHashMap<>();
private Session session;
//注入静态属性
@Autowired
public void setAsrTtsClient(AsrTtsClient asrTtsClient) {
SysSpeechWebSocketEndpoint.asrTtsClient = asrTtsClient;
}
@OnOpen
public void onOpen(Session session) {
sessionContexts.put(session.getId(), new SessionContext());
this.session = session;
log.info("WebSocket 监听: {}", session.getId());
}
@OnMessage
public void onTextMessage(String message, Session session) throws IOException {
SessionContext ctx = sessionContexts.get(session.getId());
if (ctx == null) return;
JsonNode json = objectMapper.readTree(message);
String type = json.get("type").asText();
if ("start".equals(type)) {
//TODO 健康检查
asrTtsClient.health().thenAccept(result -> {
Integer resCode = JSON.parseObject(result.toString()).getInteger("code");
if (resCode != 0) {
sendError("语音控制服务异常");
}
}).exceptionally(ex -> {
log.error("健康检查异常: {}", session.getId(), ex);
sendError("语音控制服务异常");
return null;
});
} else if ("end".equals(type)) {
// 录音结束,将 PCM 数据发送给第三方 ASR
byte[] pcmData = ctx.audioBuffer.toByteArray();
//测试
// String resp = JSONObject.toJSONString(MapOf.of("type", "transcription", "text", "识别结果"));
// String resp1 = JSONObject.toJSONString(MapOf.of("type", "response", "text", "系统回复消息"));
// session.getBasicRemote().sendText(resp);
// session.getBasicRemote().sendText(resp1);
//测试结束
// 异步调用第三方服务,避免阻塞 WebSocket 线程
asrTtsClient.recognizeAsync(pcmData, ctx.sampleRate, ctx.channels, ctx.bitDepth)
.thenAccept(result -> {
JSONObject jsonObject = JSON.parseObject(result.toString(), JSONObject.class);
Integer code = jsonObject.getInteger("code");
if (code == null || code != 0) {
String msg = jsonObject.getString("msg");
sendError(msg != null ? msg : "未知错误");
return;
}
JSONObject data = jsonObject.getJSONObject("data");
if (data != null) {
String text = data.getString("text");
sendText(text);
} else {
sendError("语音转文字发生错误");
}
})
.exceptionally(ex -> {
log.error("ASR 异常: {}", session.getId(), ex);
sendError("ASR 异常");
return null;
});
ctx.audioBuffer.reset();
// 清理上下文
// sessionContexts.remove(session.getId());
}
}
@OnMessage
public void onBinaryMessage(ByteBuffer byteBuffer, Session session) {
SessionContext ctx = sessionContexts.get(session.getId());
if (ctx != null) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
try {
ctx.audioBuffer.write(bytes);
System.out.println("已接收音频数据:" + ctx.audioBuffer.size());
} catch (IOException e) {
log.error("写入音频数据异常: {}", session.getId(), e);
}
}
}
@OnClose
public void onClose(Session session) {
sessionContexts.remove(session.getId());
log.info("连接关闭: {}", session.getId());
}
@OnError
public void onError(Session session, Throwable error) {
log.error("语音控制ws连接错误", error);
sessionContexts.remove(session.getId());
}
// 会话上下文内部类
private static class SessionContext {
ByteArrayOutputStream audioBuffer = new ByteArrayOutputStream();
int sampleRate;
int channels;
int bitDepth;
@Override
public String toString() {
return "sampleRate=" + sampleRate + ", channels=" + channels + ", bitDepth=" + bitDepth;
}
}
private void sendText(String message) {
try {
Map<String, Object> map = new HashMap<>();
map.put("type", "result");
map.put("text", message);
// 结果通过 WebSocket 返回给前端
String response = objectMapper.writeValueAsString(map);
session.getBasicRemote().sendText(response);
} catch (IOException e) {
log.error("发送文本消息异常: {}", session.getId(), e);
}
}
private void sendError(String message) {
try {
Map<String, String> map = new HashMap<>();
map.put("type", "error");
map.put("message", message);
session.getBasicRemote().sendText(
objectMapper.writeValueAsString(map));
} catch (IOException e) {
log.error("语音控制:发送错误消息异常: {}", session.getId(), e);
}
}
}

View File

@@ -1,5 +1,7 @@
package org.nl.wms.system_manage.service.speech;
import java.util.Map;
public interface SysSpeechService {
/**
@@ -8,4 +10,25 @@ public interface SysSpeechService {
* @return Boolean
*/
Boolean health();
/**
 * Speech-to-text.
 *
 * @return result map ({@code Map<String, Object>}); the keys are not defined by this
 *         interface — NOTE(review): document them once the implementation is complete
 */
Map<String, Object> asr();
/**
 * Text-to-speech.
 *
 * @return result map ({@code Map<String, Object>}); the keys are not defined by this
 *         interface — NOTE(review): document them once the implementation is complete
 */
Map<String, Object> tts();
/**
 * Confirms a speech-recognition result.
 *
 * @param text the recognized text the user confirmed
 * @return result map ({@code Map<String, Object>}); the keys are not defined by this
 *         interface — NOTE(review): document them once the implementation is complete
 */
Map<String, Object> confirm(String text);
}

View File

@@ -0,0 +1,85 @@
package org.nl.wms.system_manage.service.speech.api;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
 * HTTP client for the external speech service (health check, speech recognition,
 * speech synthesis and result confirmation).
 *
 * <p>Every call runs a blocking Hutool HTTP request inside
 * {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}, i.e. on the common
 * fork-join pool. A request timeout is therefore applied to every call so that an
 * unresponsive service cannot hold pool threads indefinitely.
 */
@Component
public class AsrTtsClient {

    /** Base URL of the speech service. */
    @Value("${vc.base-url:http://127.0.0.1:5000}")
    private String vcBaseUrl;

    /** Connect/read timeout in milliseconds applied to every request. */
    @Value("${vc.timeout-ms:30000}")
    private int vcTimeoutMs;

    /**
     * Health check: {@code GET {base}/health}.
     *
     * @return future completing with the raw response body
     */
    public CompletableFuture<Object> health() {
        return CompletableFuture.supplyAsync(() ->
                HttpRequest.get(vcBaseUrl + "/health")
                        .timeout(vcTimeoutMs)
                        .execute()
                        .body());
    }

    /**
     * Speech-to-text placeholder; currently does nothing.
     * See {@link #recognizeAsync(byte[], int, int, int)} for the working call.
     */
    public static void asr() {
    }

    /**
     * Text-to-speech: {@code POST {base}/api/speech/tts} with a JSON body carrying the text
     * and fixed voice parameters.
     *
     * @param text text to synthesize
     * @return future completing with the raw response bytes
     */
    public CompletableFuture<byte[]> tts(String text) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Object> params = new HashMap<>();
            params.put("text", text);
            params.put("vcn", "xiaoyan");
            params.put("speed", 50);
            params.put("volume", 50);
            params.put("pitch", 50);
            return HttpRequest.post(vcBaseUrl + "/api/speech/tts")
                    .body(JSONObject.toJSONString(params))
                    .contentType("application/json")
                    .timeout(vcTimeoutMs)
                    .execute()
                    .bodyBytes();
        });
    }

    /**
     * Reports the user's confirmation: {@code POST {base}/api/speech/confirm}.
     *
     * @param text      the recognized text
     * @param confirmed whether the user accepted it
     * @return future completing with the raw response body
     */
    public CompletableFuture<Object> confirm(String text, boolean confirmed) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Object> params = new HashMap<>();
            params.put("text", text);
            params.put("confirmed", confirmed);
            return HttpRequest.post(vcBaseUrl + "/api/speech/confirm")
                    .body(JSONObject.toJSONString(params))
                    .contentType("application/json")
                    .timeout(vcTimeoutMs)
                    .execute()
                    .body();
        });
    }

    /**
     * Speech recognition: uploads the audio to {@code POST {base}/api/speech/asr} as a
     * multipart file part named {@code audio}.
     *
     * <p>The three-argument {@code form(name, bytes, fileName)} overload is required: the
     * two-argument {@code form(name, Object)} overload treats its value as an ordinary form
     * field, so a {@code byte[]} passed to it is not uploaded as binary content. The
     * content type is not set manually because a hand-written {@code multipart/form-data}
     * header carries no boundary; Hutool generates the header once a file part is present.
     *
     * <p>NOTE(review): {@code sampleRate}, {@code channels} and {@code bitDepth} are accepted
     * but not transmitted — confirm with the speech service whether it needs them and under
     * which field names. The part file name {@code audio.pcm} is likewise an assumption.
     *
     * @param pcmData    audio bytes to recognize
     * @param sampleRate currently unused
     * @param channels   currently unused
     * @param bitDepth   currently unused
     * @return stage completing with the raw response body
     */
    public CompletionStage<Object> recognizeAsync(byte[] pcmData, int sampleRate, int channels, int bitDepth) {
        return CompletableFuture.supplyAsync(() ->
                HttpRequest.post(vcBaseUrl + "/api/speech/asr")
                        .form("audio", pcmData, "audio.pcm")
                        .timeout(vcTimeoutMs)
                        .execute()
                        .body());
    }
}

View File

@@ -1,9 +1,15 @@
package org.nl.wms.system_manage.service.speech.impl;
import lombok.extern.slf4j.Slf4j;
import org.nl.common.utils.MapOf;
import org.nl.wms.system_manage.service.speech.SysSpeechService;
import org.nl.wms.system_manage.service.speech.api.AsrTtsClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
/**
* @author Zhao Ya Fei
* @date 2026-04-14
@@ -13,8 +19,38 @@ import org.springframework.stereotype.Service;
@Service
public class SysSpeechServiceImpl implements SysSpeechService {
@Resource
private AsrTtsClient asrTtsClient;
@Override
public Boolean health() {
return null;
}
@Override
public Map<String, Object> asr() {
return Collections.emptyMap();
}
@Override
public Map<String, Object> tts() {
return Collections.emptyMap();
}
@Override
public Map<String, Object> confirm(String text) {
if ("".equals( text)){
throw new RuntimeException("确认文本为空");
}
asrTtsClient.confirm(text, true)
.thenAccept(result -> {
log.info("语音控制:用户确认结果: {}", result);
}).exceptionally(ex -> {
log.error("语音控制:用户确认异常: {}", ex);
return null;
});
byte[] audio = asrTtsClient.tts(text).join();
//TODO 音频返回前端播放处理
return Collections.emptyMap();
}
}

View File

@@ -0,0 +1,98 @@
package org.nl.wms.system_manage.service.speech.session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.Session;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Registry of WebSocket sessions keyed by user id, with helpers for pushing text or binary
 * messages to one user or to everyone.
 *
 * <p>Not in use for now (the {@code @Component} annotation is commented out); kept for a
 * possible later need to show task results in real time.
 *
 * <p>The online count is derived from the registry itself rather than kept in a separate
 * counter, so it cannot drift when a user reconnects, when an unknown user is unregistered,
 * or when the same user is unregistered twice.
 */
@Slf4j
//@Component
public class SessionManager {

    /** userId -> WebSocket session. */
    private final Map<String, Session> sessionMap = new ConcurrentHashMap<>();

    /**
     * Registers a session. Single-login policy: a new connection for the same user replaces
     * and closes the previous one.
     *
     * @param userId  user the session belongs to
     * @param session the newly opened session
     */
    public void register(String userId, Session session) {
        Session oldSession = sessionMap.put(userId, session);
        if (oldSession != null && oldSession != session && oldSession.isOpen()) {
            try {
                oldSession.close();
            } catch (IOException e) {
                log.warn("语音控制:关闭用户 {} 旧连接失败", userId, e);
            }
            log.info("语音控制:🔄 用户 {} 旧连接已被踢出", userId);
        }
        log.info("语音控制:✅ 用户 {} 已上线 | 当前在线: {}", userId, getOnlineCount());
    }

    /**
     * Removes whatever session is registered for the user. Does nothing if none is.
     *
     * @param userId user to remove
     */
    public void unregister(String userId) {
        if (sessionMap.remove(userId) != null) {
            log.info("语音控制:❌ 用户 {} 已下线 | 当前在线: {}", userId, getOnlineCount());
        }
    }

    /**
     * Removes the user's entry only if it still refers to the given session. Use this from
     * close/error callbacks so that the callback of a replaced connection does not evict the
     * connection that replaced it.
     *
     * @param userId  user to remove
     * @param session the session that is going away
     */
    public void unregister(String userId, Session session) {
        if (sessionMap.remove(userId, session)) {
            log.info("语音控制:❌ 用户 {} 已下线 | 当前在线: {}", userId, getOnlineCount());
        }
    }

    /**
     * Pushes a text message to a user.
     *
     * @return {@code true} if the message was written; {@code false} if the user has no open
     *         session or the write failed (in which case that session is unregistered)
     */
    public boolean pushText(String userId, String message) {
        Session session = sessionMap.get(userId);
        if (session == null || !session.isOpen()) {
            log.warn("⚠️ 推送失败: 用户 {} 不在线", userId);
            return false;
        }
        try {
            // Writes to one session are serialized; the basic remote is used from any thread.
            synchronized (session) {
                session.getBasicRemote().sendText(message);
            }
            return true;
        } catch (IOException | IllegalStateException e) {
            log.error("📤 推送文本异常: {}", userId, e);
            // The connection is no longer usable; remove exactly this session.
            unregister(userId, session);
            return false;
        }
    }

    /**
     * Pushes binary data (TTS audio, file streams, ...) to a user.
     *
     * @return {@code true} if the data was written; {@code false} if the user has no open
     *         session or the write failed (in which case that session is unregistered)
     */
    public boolean pushBinary(String userId, byte[] data) {
        Session session = sessionMap.get(userId);
        if (session == null || !session.isOpen()) {
            log.warn("⚠️ 推送失败: 用户 {} 不在线", userId);
            return false;
        }
        try {
            synchronized (session) {
                session.getBasicRemote().sendBinary(ByteBuffer.wrap(data));
            }
            return true;
        } catch (IOException | IllegalStateException e) {
            log.error("📤 推送二进制异常: {}", userId, e);
            unregister(userId, session);
            return false;
        }
    }

    /**
     * Broadcasts a text message (system announcements, global state) to every open session.
     * Delivery is best-effort: a failure for one session is logged and the broadcast continues.
     */
    public void broadcastText(String message) {
        sessionMap.forEach((userId, session) -> {
            if (!session.isOpen()) {
                return;
            }
            try {
                synchronized (session) {
                    session.getBasicRemote().sendText(message);
                }
            } catch (IOException | IllegalStateException e) {
                log.warn("📤 广播文本异常: {}", userId, e);
            }
        });
    }

    /** @return the number of currently registered sessions */
    public int getOnlineCount() { return sessionMap.size(); }
}