Java SpringBoot 扣子CozeAI SseEmitter流式对话完整实战 打字机效果

书接上回:springBoot 整合 扣子cozeAI 智能体 对话https://blog.csdn.net/weixin_44548582/article/details/147457236

上文实现的是一次性等待得到完整的AI回复内容,但随着问题和AI的逻辑日趋复杂,会明显增加这个等待时间,这对前端用户并不友好,所以需要实现与coze对话的流式、打字机效果。

核心工具:SseEmitter

基本概念

SseEmitter 是 Spring Framework 提供的一个类,用于实现服务器发送事件(Server-Sent Events, SSE)。SSE 是一种允许服务器向客户端推送实时更新的技术,通常用于实现实时通知、数据流传输等功能。SseEmitter 通过 HTTP 长连接保持与客户端的通信,服务器可以持续向客户端发送数据,而客户端则通过 EventSource API 接收这些数据。

实现流式传输的原理

SseEmitter 实现流式传输的核心在于它使用了 HTTP 长连接和分块传输编码(Chunked Transfer Encoding)。当客户端发起 SSE 请求时,服务器会保持连接打开,并通过分块传输的方式逐步发送数据。每个数据块都是一个独立的事件,客户端可以实时接收并处理这些事件。

实现打字机效果的原理

打字机效果是指文本逐字或逐行显示的效果。通过 SseEmitter,可以实现这种效果。服务器可以逐步发送文本的每个字符或每行,客户端接收到数据后立即追加显示,从而模拟出打字机的效果。

实战代码

application.yml配置
# Tomcat
server:port: 9210#扣子参数
coze:clientId: xxxxxxxxxxxxxpublicKeyId: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyprivateKeyFilePath: 本地或服务器绝对路径/private_key.pemwwwBase: https://www.coze.cnapiBase: https://api.coze.cn# 智能体IDbotId: zzzzzzzzzzzzzzzzzzzzzzzzzzzz
扣子参数配置类 
/*** 扣子参数配置类* @Author: Tenk*/
@Component
@ConfigurationProperties(prefix = "coze") // 通过yml读取
public class CozeConfig {//OAuth应用IDprivate String clientId;//公钥private String publicKeyId;//私钥证书private String privateKeyFilePath;//coze官网private String wwwBase;//cozeApi请求地址private String apiBase;//智能体botIdprivate String botId;
}
Coze授权工具类
/*** 初始化CozeJWTOAuth授权工具** @url https://github.com/coze-dev/coze-java/blob/main/example/src/main/java/example/auth/JWTOAuthExample.java* @Author: Tenk*/
@Component
public class CozeJWTOAuthUtil {private static final Logger log = LoggerFactory.getLogger(CozeJWTOAuthUtil.class);@Autowiredprivate CozeConfig cozeConfig;@Autowiredprivate RedisService redisService;//CozeAPIprivate JWTOAuthClient oauth;public OAuthToken getAccessToken(Long userId) {OAuthToken accessToken;if (redisService.hasKey(CozeConstants.JWT_TOKEN + userId)) {accessToken = JSONObject.parseObject(redisService.getCacheObject(CozeConstants.JWT_TOKEN + userId).toString(), OAuthToken.class);} else {accessToken = oauth.getAccessToken(userId.toString());redisService.setCacheObject(CozeConstants.JWT_TOKEN + userId, accessToken, 14L, TimeUnit.MINUTES);}return accessToken;}public CozeConfig getCozeConfig() {return cozeConfig;}@PostConstructpublic void init() {this.oauth = createJWTOAuthClient();}public JWTOAuthClient getJWTOAuthClient() {return oauth;}/*** 初始化CozeJWTOAuth** @return*/public CozeAPI createCozeAPIByUser(String accessToken) {return new CozeAPI.Builder().auth(new TokenAuth(accessToken)).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();}public JWTOAuthClient createJWTOAuthClient() {try {//读取coze_private_key_pemString jwtOAuthPrivateKey = new String(Files.readAllBytes(Paths.get(cozeConfig.getPrivateKeyFilePath())), StandardCharsets.UTF_8);oauth = new JWTOAuthClient.JWTOAuthBuilder().clientID(cozeConfig.getClientId()).privateKey(jwtOAuthPrivateKey).publicKey(cozeConfig.getPublicKeyId()).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();} catch (Exception e) {log.error("初始化CozeJWTOAuth失败", e);return null;}return oauth;}
}
SSE服务类 
/*** SSE服务类** @Author: Tenk*/
@Service
public class SseServiceImpl implements SseService {private static final Logger log = LoggerFactory.getLogger(SseServiceImpl.class);/*** k:扣子会话id  v:SseEmitter* 这里一定是使用ConcurrentHashMap,因为他是多线程安全的。*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** k:会话id  v:userId* 这里一定是使用ConcurrentHashMap,因为他是多线程安全的。*/private static Map<String, Long> sseUserMap = new ConcurrentHashMap<>();@Overridepublic SseEmitter connect(String conversationId, Long userId) {SseEmitter sseEmitter;// 判断是否已经存在if (sseEmitterMap.containsKey(conversationId)) {sseEmitter = sseEmitterMap.get(conversationId);} else {// 最多6小时断开连接sseEmitter = new SseEmitter(6 * 60 * 60 * 1000L);}// 连接断开sseEmitter.onCompletion(() -> {disconnect("连接断开", conversationId);});// 连接超时sseEmitter.onTimeout(() -> {disconnect("连接超时", conversationId);});// 连接报错sseEmitter.onError((throwable) -> {disconnect("连接报错", conversationId);});sseEmitterMap.put(conversationId, sseEmitter);sseUserMap.put(conversationId, userId);return sseEmitter;}private static void disconnect(String action, String conversationId) {Long value = sseUserMap.get(conversationId);log.info("sse{},用户userId:{}", action, value);sseEmitterMap.remove(conversationId);sseUserMap.remove(conversationId);}
}
AI 接口 Controller 
/*** AI 接口 Controller* @Author: Tenk*/
@RestController
@RequestMapping("/ai/chats")
public class CozeController {@Autowiredprivate CozeService cozeService;@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;@Autowiredprivate SseService sseService;/*** 向AI发起流式对话请求** @param conversationId 会话ID* @param content        对话内容* @return 对话流*/@GetMapping(value = "/send", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter sendFlowMessage(@RequestParam String conversationId,@RequestParam String content) throws IOException {if (StringUtils.isEmpty(conversationId)) {throw new ServiceException("会话信息缺失", 500);}ChatBo bo = new ChatBo();bo.setUserId(userId);bo.setConversationId(conversationId);bo.setContent(content);SseEmitter emitter = sseService.connect(conversationId, userId);try{cozeService.sendFlowMessage(bo, emitter);}catch (Exception e){e.printStackTrace();// 捕获并发送 SSE 格式的错误emitter.send("{\"status\":\"fail\",\"data\":\""+e.getMessage()+"\"}");emitter.completeWithError(e);}return emitter;}
}
方法实现 Service 
/*** 方法具体实现 Service* @Author: Tenk*/
@Service
public class CozeServiceImpl implements CozeService {// @Autowired Mapper Service ……@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;// 流式消息状态private static final String inProgress = "in-progress"; // 进行中private static final String done = "done";              // 完成/*** 构建 SSE 返回格式** @param status 响应状态(in-progress / done)* @param data   数据内容,可以是 textBuffer 或最终 data 对象* @return 构造好的 JSON 对象*/private JSONObject buildSseResult(String status, Object data) {JSONObject result = new JSONObject();result.put("status", status);result.put("data", data);return result;}@Override@Transactionalpublic void sendFlowMessage(ChatBo bo, SseEmitter emitter) {// 1. 初始化 Coze API 客户端CozeAPI cozeAPI = cozeJWTOAuthUtil.createCozeAPIByUser(cozeJWTOAuthUtil.getAccessToken(bo.getUserId()).getAccessToken());// 2. 构造用户发送的消息CreateMessageReq msgReq = CreateMessageReq.builder().conversationID(bo.getConversationId()).role(MessageRole.USER).content(bo.getContent()).contentType(MessageContentType.TEXT).build();// 整理用户消息,插入消息历史数据表Message userMsg = cozeAPI.conversations().messages().create(msgReq).getMessage();CozeMsgLog userMsgLog = new CozeMsgLog(bo.getUserId(),bo.getConversationId(),userMsg.getBotId(),userMsg.getChatId(),userMsg.getId(),null,bo.getContent(),userMsg.getContentType().getValue(),userMsg.getMetaData().toString(),userMsg.getReasoningContent(),userMsg.getRole().getValue(),userMsg.getSectionId(),MessageType.QUESTION.getValue(),new Date(userMsg.getCreatedAt() * 1000),new Date(userMsg.getUpdatedAt() * 1000));cozeMsgLogService.insertCozeMsgLog(userMsgLog);// 4. 打开 Coze 流式对话Flowable<ChatEvent> chatStream = cozeAPI.chat().stream(CreateChatReq.builder().botID(cozeJWTOAuthUtil.getCozeConfig().getTripBotId()).stream(true).autoSaveHistory(true).conversationID(bo.getConversationId()).userID(bo.getUserId().toString()).messages(Collections.singletonList(userMsg)).build());// 5. 发送初始提示信息try {JSONObject delayJson = new JSONObject();delayJson.put("type", "delay");delayJson.put("delayReason", "开始思考……");emitter.send(buildSseResult(inProgress, delayJson));} catch (IOException e) {log.error("规划行程开始错误", e);throw new ServiceException("规划行程开始错误");}StringBuffer fullContent = new StringBuffer();// 完整 AI 回复文本,包含一些不想给前端的特殊符号List<JSONObject> textBuffer = new ArrayList<>();     // 缓冲 SSE 数据int bufferThreshold = 3;                             // 缓冲阈值,当缓冲列表长度超过此值时,发送给前端// 7. 订阅流式对话事件chatStream.timeout(10, TimeUnit.MINUTES).observeOn(Schedulers.io()).subscribe(event -> {// 增量消息(例如:['H','e','l','l','o',' ','W','o','r','l',……])if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {// 提取增量消息String delta = event.getMessage().getContent();// 逐步拼接成完整消息fullContent.append(delta);// 清洗输出给前端的文本:去除 <、>、[、] 特殊符号,发送给前端,视情况而定,非必须String cleanText= delta.replaceAll("[<>\\[\\]]", "");// TODO 自定义业务逻辑// 实际发送if (!cleanText.isEmpty()) {JSONObject textJson = new JSONObject();textJson.put("type", "text");textJson.put("text", cleanText);synchronized (textBuffer) {// 添加到缓冲列表textBuffer.add(textJson);// 发送缓冲列表if (textBuffer.size() >= bufferThreshold) {/** 示例* {*     "data": [*         {*             "text": "hello world\n",*             "type": "text"*         },*         {*             "text": "### title three\n",*             "type": "text"*         },*         {*             "text": "#### title four\n",*             "type": "text"*         }*     ],*     "status": "in-progress"* }*/emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}}// TODO 自定义业务逻辑}// AI处理、回复完成// event:conversation.message.completed会有两次,&&后的条件是取其中一次,详见 https://www.coze.cn/open/docs/developer_guides/chat_v3#70a1d1bdif (ChatEventType.CONVERSATION_MESSAGE_COMPLETED.equals(event.getEvent()) && MessageType.ANSWER.getValue().equals(event.getMessage().getType().getValue())) {// === 最后一批textBuffer没发的统一发出 ===synchronized (textBuffer) {if (!textBuffer.isEmpty()) {emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}// === 构造最终完成的数据包 ===JSONObject finalData = new JSONObject();finalData.put("xxx", "自定义数据");finalData.put("yyy", "自定义内容");finalData.put("botMessage", fullContent.toString());// 发送状态为 done 的 SSEemitter.send(buildSseResult(done, finalData));// AI回复的内容,插入消息历史数据表Message message = event.getMessage();CozeMsgLog aiMsgLog = new CozeMsgLog(bo.getUserId(),message.getConversationId(),message.getBotId(),message.getChatId(),message.getId(),message.getContent(),finalData.toString(),message.getContentType().getValue(),message.getMetaData() == null ? null : message.getMetaData().toString(),message.getReasoningContent(),message.getRole().getValue(),message.getSectionId(),message.getType().getValue(),new Date(message.getCreatedAt() * 1000),new Date());cozeMsgLogService.insertCozeMsgLog(aiMsgLog);}},error -> {log.error("AI对话异常:{}", error.getMessage(), error);emitter.send(buildSseResult(done, "AI思考失败"));emitter.completeWithError(error);cozeAPI.shutdownExecutor();},() -> {// 释放资源emitter.complete();cozeAPI.shutdownExecutor();});}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/82172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《AVL树完全解析:平衡之道与C++实现》

目录 AVL树的核心概念数据结构与节点定义插入操作与平衡因子更新旋转操作&#xff1a;从理论到代码双旋场景深度剖析平衡检测与测试策略性能分析与工程实践总结 0.前置知识&#xff1a;BS树 代码实现部分对和BS树相似的部分会省略。 1. AVL树的核心概念 1.1 平衡二叉搜索树…

跨平台游戏引擎 Axmol-2.6.0 发布

Axmol 2.6.0 版本是一个以错误修复和功能改进为主的次要LTS长期支持版本 &#x1f64f;感谢所有贡献者及财务赞助者&#xff1a;scorewarrior、peterkharitonov、duong、thienphuoc、bingsoo、asnagni、paulocoutinhox、DelinWorks 相对于2.5.0版本的重要变更&#xff1a; 通…

【Django Serializer】一篇文章详解 Django 序列化器

第一章 Django 序列化器概述 1.1 序列化器的定义 1.1.1 序列化与反序列化的概念 1. 序列化 想象你有一个装满各种物品&#xff08;数据对象&#xff09;的大箱子&#xff08;数据库&#xff09;&#xff0c;但是你要把这些物品通过一个狭窄的管道&#xff08;网络&#xff…

关于spring @Bean里调用其他产生bean的方法

背景 常常见到如下代码 Bean public TestBean testBean() {TestBean t new TestBean();System.out.println("testBean:" t);return t; }Bean public FooBean fooBean() {TestBean t testBean();System.out.println("这里看似是自己new的&#xff0c;但因为…

Level1.7列表

1.7_1列表&#xff08;索引切片&#xff09; #1.列表 students[Bob,Alice,Jim,Mike,Judy] print(students)#2.在列表&#xff08;添加不同数据类型&#xff0c;查看列表是否可以运行&#xff1f;是否为列表类型&#xff1f;&#xff09; students[Bob,Alice,Jim,Mike,Judy,123…

Python爬虫实战:研究Cola框架相关技术

一、Cola 框架概述 Cola 是一款基于 Python 的异步爬虫框架,专为高效抓取和处理大规模数据设计。它结合了 Scrapy 的强大功能和 asyncio 的异步性能优势,特别适合需要高并发处理的爬虫任务。 1.1 核心特性 异步 IO 支持:基于 asyncio 实现非阻塞 IO,大幅提高并发性能模块…

vue2中el-table 实现前端分页

一些接口不分页的数据列表&#xff0c;一次性返回大量数据会导致前端渲染卡顿&#xff0c;接口不做分页的情况下前端可以截取数据来做分页 以下是一个例子&#xff0c;被截取的列表和全量数据在同一个栈内存空间&#xff0c;所以如果有表格内的表单编辑&#xff0c;新的值也会事…

Python + moviepy:根据图片或数据高效生成视频全流程详解

前言 在数据可视化、自媒体内容生产、学术汇报等领域,我们常常需要将一组图片或一段变动的数据,自动合成为视频文件。这样不仅能提升内容表现力,也极大节省了人工操作时间。Python作为数据处理和自动化领域的王者,其`moviepy`库为我们提供了灵活高效的视频生成方案。本文将…

科技赋能,开启现代健康养生新潮流

在科技与生活深度融合的当下&#xff0c;健康养生也迎来了全新的打开方式。无需传统医学的介入&#xff0c;借助现代科学与智能设备&#xff0c;我们能以更高效、精准的方式守护健康。​ 饮食管理步入精准化时代。利用手机上的营养计算 APP&#xff0c;录入每日饮食&#xff0…

Ubuntu24.04 LTS安装java8、mysql8.0

在 Ubuntu 24.04 上安装 OpenJDK OpenJDK 包在 Ubuntu 24.04 的默认存储库中随时可用。 打开终端并运行以下 apt 命令: sudo apt update查看是否已经安装java java --version如果未安装会有提示&#xff0c;直接复制命令安装即可&#xff0c;默认版本&#xff1a; sudo apt in…

深度学习框架显存泄漏诊断手册(基于PyTorch的Memory Snapshot对比分析方法)

点击 “AladdinEdu&#xff0c;同学们用得起的【H卡】算力平台”&#xff0c;H卡级别算力&#xff0c;按量计费&#xff0c;灵活弹性&#xff0c;顶级配置&#xff0c;学生专属优惠。 一、显存泄漏&#xff1a;深度学习开发者的"隐形杀手" 在深度学习模型的训练与推…

Pytorch分布式训练,数据并行,单机多卡,多机多卡

分布式训练 所有代码可以见我github 仓库&#xff1a;https://github.com/xiejialong/ddp_learning.git 数据并行&#xff08;Data Parallelism&#xff0c;DP&#xff09; 跨多个gpu训练模型的最简单方法是使用 torch.nn.DataParallel. 在这种方法中&#xff0c;模型被复制…

【论文阅读】——D^3-Human: Dynamic Disentangled Digital Human from Monocular Vi

文章目录 摘要1 引言2 相关工作3 方法3.1 HmSDF 表示3.2 区域聚合3.3. 变形场3.4. 遮挡感知可微分渲染3.5 训练3.5.1 训练策略3.5.2 重建损失3.5.3 正则化限制 4. 实验4.1 定量评估4.2 定性评价4.3 消融研究4.4 应用程序 5 结论 摘要 我们介绍 D 3 D^{3} D3人&#xff0c;一种…

docker commit除了提交容器成镜像,还能搞什么之修改cmd命令

要让新镜像默认启动时执行 /usr/sbin/sshd -D&#xff0c;需在提交镜像时 ​​显式指定新的启动命令​​。 方法一&#xff1a;提交时通过 --change 覆盖 CMD docker commit --changeCMD ["/usr/sbin/sshd", "-D"] v2 project:v2 方法二&#xff1a;重…

为什么我输入对了密码,还是不能用 su 切换到 root?

“为什么我输入对了密码&#xff0c;还是不能用 su 切换到 root&#xff1f;” 其实这背后可能不是“密码错了”&#xff0c;而是系统不允许你用 su 切 root&#xff0c;即使密码对了。 &#x1f447; 以下是最常见的几个真正原因&#xff1a; ❌ 1. Root 用户没有设置密码&…

转移dp简单数学数论

1.转移dp问题 昨天的练习赛上有一个很好玩的起终点问题&#xff0c;第一时间给出bfs的写法。 但是写到后面发现不行&#xff0c;还得是的dp转移的写法才能完美的解决这道题目。 每个格子可以经过可以不经过&#xff0c;因此它的状态空间是2^&#xff08;n*m&#xff09;&…

IP查询基础介绍

IP 查询原理 IP 地址是网络设备唯一标识&#xff0c;IP 查询通过解析 IP 地址获取地理位置、运营商等信息。目前主流的 IPv4&#xff08;32 位&#xff09;与 IPv6&#xff08;128 位&#xff09;协议&#xff0c;前者理论提供约 43 亿地址&#xff0c;后者地址空间近乎无限。…

Linux命令简介

1 Linux系统的命令概述 在 Linux 操作系统中&#xff0c;凡是在字符操作界面中输入能够完成特定操作和任务的字符串都可以称为命令。严格来说&#xff0c;命令通常只代表实现某一类功能的指令或程序的名称。 1.1 Shell Linux 命令的执行必须依赖于 Shell 命令解释器。Shell …

WebRTC与RTSP|RTMP的技术对比:低延迟与稳定性如何决定音视频直播的未来

引言 音视频直播技术已经深刻影响了我们的生活方式&#xff0c;尤其是在教育、医疗、安防、娱乐等行业中&#xff0c;音视频技术成为了行业发展的重要推动力。近年来&#xff0c;WebRTC作为一种开源的实时通信技术&#xff0c;成为了音视频领域的重要选择&#xff0c;它使得浏览…

多通道振弦式数据采集仪MCU安装指南

设备介绍 数据采集仪 MCU集传统数据采集器与5G/4G,LoRa/RS485两种通信功能与一体的智能数据采集仪。该产品提供振弦、RS-485等的物理接口&#xff0c;能自动采集并存储多种自然资源、建筑、桥梁、城市管廊、大坝、隧道、水利、气象传感器的实时数据&#xff0c;利用现场采集的数…