流式数据处理实战:用状态机 + scan 优雅过滤 AI 响应中的 `<think>` 标签

流式数据处理实战:用状态机 + scan 优雅过滤 AI 响应中的 <think> 标签

1. 引言:流式数据处理的挑战

在现代 AI 应用开发中,流式 API(如 OpenAI、Claude 等)能实时返回分块数据,提升用户体验。但流式数据存在一个常见问题:如何跨多个分块处理结构化内容?

例如,某些 AI 会在响应中返回 <think>...</think> 标签(内部是 AI 的推理过程,不应展示给用户)。由于流式数据是分块返回的,可能出现:

  • <think></think> 被拆到不同分块
  • 标签不完整(如只收到 <think> 但未收到 </think>

传统的字符串替换(如 replaceAll)无法处理这种情况。本文将介绍如何用 响应式编程(Reactive Programming)scan 操作符 + 状态机(State Machine) 解决该问题。


2. 核心思路:状态机 + scan

2.1 什么是状态机?

状态机(State Machine)是一种基于状态转移的编程模型,适用于处理有状态的数据流。在我们的场景中:

  • 状态insideThink(是否在 <think> 标签内)
  • 事件:遇到 <think></think>

2.2 什么是 scan?

scan 是响应式编程(如 Reactor、RxJava)中的一个操作符,类似于 reduce,但会在每次元素到达时立即发出累积结果

  • 输入Flux<T>(数据流)
  • 输出Flux<State>(带状态的流)

3. 代码实现

3.1 定义状态机

public class ThinkTagState {private boolean insideThink = false;  // 是否在 <think> 标签内private final StringBuilder buffer = new StringBuilder(); // 有效文本缓冲区public ThinkTagState process(String chunk) {StringBuilder output = new StringBuilder();String remaining = chunk;while (!remaining.isEmpty()) {if (!insideThink) {// 查找 <think> 标签int startIdx = remaining.indexOf("<think>");if (startIdx == -1) {output.append(remaining); // 无标签,直接保留break;}// 保留 <think> 之前的内容output.append(remaining.substring(0, startIdx));insideThink = true;remaining = remaining.substring(startIdx + "<think>".length());} else {// 查找 </think> 标签int endIdx = remaining.indexOf("</think>");if (endIdx == -1) {break; // 未闭合,等待后续分块}// 跳过 </think> 之后的内容insideThink = false;remaining = remaining.substring(endIdx + "</think>".length());}}buffer.append(output);return this;}public String getFilteredText() {return buffer.toString();}public boolean isInsideThink() {return insideThink;}
}

3.2 流式处理 Pipeline

Flux<ChatResponseDTO> filteredStream = aiCallService.stream(new Prompt(messages)).scan(new ThinkTagState(), (state, response) -> {String chunk = response.getResult().getOutput().getText();return state.process(chunk); // 更新状态}).filter(state -> !state.getFilteredText().isEmpty()) // 跳过空内容.map(state -> {ChatResponseDTO dto = new ChatResponseDTO();dto.setAnswer(state.getFilteredText());state.buffer.setLength(0); // 清空缓冲区return dto;});

4. 关键点解析

4.1 状态机如何工作?

  • 初始状态insideThink = false
  • 遇到 <think>insideThink = true(跳过后续内容)
  • 遇到 </think>insideThink = false(恢复正常处理)
  • 跨分块处理:如果只有 <think> 没有 </think>,会等待后续分块

4.2 为什么用 scan 而不是 map

  • map 无记忆能力,无法处理跨分块的状态
  • scan 维护状态,适合流式数据的渐进式处理

4.3 性能优化

  • 缓冲区清空:每次 map 后调用 buffer.setLength(0) 避免内存增长
  • 正则替代方案:如果确定标签不跨分块,可用简单 replaceAll

5. 扩展场景

5.1 处理多个标签(如 <reasoning>

enum TagType { THINK, REASONING, NONE }public class MultiTagState {private TagType currentTag = TagType.NONE;// 类似逻辑,但需处理多种标签
}

5.2 结合元数据(Metadata)

// 使用 Tuple2 保留原始响应
.scan(Tuples.of(new ThinkTagState(), null), (tuple, response) -> {ThinkTagState state = tuple.getT1().process(response.getText());return Tuples.of(state, response); // 保留原始响应
})
.map(tuple -> {ChatResponseMetadata metadata = tuple.getT2().getMetadata();// ...
});

6. 总结

方案优点缺点
状态机 + scan精准处理跨分块标签代码稍复杂
简单 replaceAll实现简单无法处理跨分块

推荐选择

  • 如果 AI 响应可能跨分块 → 状态机 + scan
  • 如果标签保证完整 → 正则替换

7. 进一步阅读

  • Reactor 官方文档 - scan
  • 状态机设计模式

希望这篇博文能帮助你优雅处理流式数据!如果有问题,欢迎留言讨论。 🚀

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/89286.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/89286.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【实时Linux实战系列】硬件中断与实时性

在实时系统中&#xff0c;硬件中断是系统响应外部事件的关键机制之一。硬件中断允许系统在执行任务时被外部事件打断&#xff0c;从而快速响应这些事件。然而&#xff0c;中断处理不当可能会导致系统延迟增加&#xff0c;影响系统的实时性。因此&#xff0c;优化中断处理对于提…

基于DTLC-AEC与DTLN的轻量级实时语音降噪系统设计与实现

基于DTLC-AEC与DTLN的轻量级实时语音降噪系统设计与实现 1. 引言 在当今的实时通信应用中,语音质量是影响用户体验的关键因素之一。环境噪声和回声会严重降低语音清晰度,特别是在移动设备和嵌入式系统上。本文将详细介绍如何将两种先进的开源模型——DTLC-AEC(深度学习回声…

基于Hadoop与LightFM的美妆推荐系统设计与实现

文章目录有需要本项目的代码或文档以及全部资源&#xff0c;或者部署调试可以私信博主项目介绍总结每文一语有需要本项目的代码或文档以及全部资源&#xff0c;或者部署调试可以私信博主 项目介绍 本项目旨在基于大数据Hadoop平台和机器学习技术&#xff0c;构建一套面向美妆…

notepad++ 多行复制拼接

如何将中文一 一复制到英文后面按住 ALT ,鼠标左键拖动多行选中中文Ctrl C 复制 在英文的第一行结尾处 Ctrl v 粘贴

【前沿技术动态】【AI总结】Spring Boot 4.0 预览版深度解析:云原生时代的新里程碑

Spring Boot 4.0 预览版深度解析&#xff1a;云原生时代的新里程碑 最低 Java 17&#xff0c;原生支持虚拟线程&#xff0c;性能提升最高800%&#xff0c;Spring Boot 4.0 带来开发体验与运行时性能的全面飞跃 Spring Boot 4.0 的预览版在2025年5月底悄然上线&#xff0c;标志着…

OkHttp 框架封装一个 HTTP 客户端,用于调用外部服务接口

✅ 背景与需求 需要基于 OkHttp 框架封装一个 HTTP 客户端&#xff0c;用于调用外部服务接口&#xff08;如拼团回调&#xff09;&#xff0c;实现以下功能&#xff1a; 动态传入请求地址&#xff08;URL&#xff09;支持 JSON 请求体实现类放在 infrastructure 层的 gateway…

使用Collections.max比较Map<String, Integer>中的最大值

文章目录使用Collections.max比较Map<String, Integer>中的最大值基本方法1. 比较Map的值2. 比较Map的键自定义比较器1. 按值降序排列2. 复杂比较逻辑完整示例代码性能考虑替代方案1. 使用Stream API (Java 8)2. 手动遍历实际应用场景注意事项总结使用Collections.max比较…

鸿蒙状态栏操作

1.鸿蒙设备基础信息 1.1图解 1.1窗口内容规避区域 AvoidArea7 窗口内容规避区域。 窗口内容规避区域。如系统栏区域、刘海屏区域、手势区域、软键盘区域等与窗口内容重叠时&#xff0c;需要窗口内容避让的区域。在规避区无法响应用户点击事件。 除此之外还需注意规避区域的如…

Product Hunt 每日热榜 | 2025-07-17

1. Brain MAX by ClickUp 标语&#xff1a;一款AI应用统治一切&#xff1a;你的知识 语音转文字 介绍&#xff1a;Brain MAX 是 ClickUp 完全原生的桌面应用&#xff0c;旨在提升生产力&#xff0c;帮助你摆脱 AI 的杂乱无章。只需每月 9 美元&#xff0c;就可以使用所有的 …

如何使用VScode使用ssh连接远程服务器不需要输入密码直接登录

ssh-keygen 之后一直默认 回车 确认即可结果 (base) amaxamax:/data/std$ ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/home/amax/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again:…

vue实现el-table-column中自定义label

vue实现el-table-column中自定义label<el-table-columnlabel"操作"align"left"width"50"><template #header><div><el-buttonsize"mini"type"primary"icon"el-icon-plus"circle></el-…

Vue 常用的 ESLint 规则集

对Vue项目来说&#xff0c;Vue 官方通过 eslint-plugin-vue 提供了多个规则集&#xff08;Rule Sets&#xff09;&#xff0c;适用于不同严格度和 Vue 版本。以下是主要的规则集及其对应的 ESLint 插件和用途&#xff1a; 1. Vue 2.x 规则集 适用于 Vue 2 项目&#xff0c;规则…

AbMole小课堂 | Angiotensin II(血管紧张素Ⅱ)在心血管研究中的多元应用

Angiotensin II&#xff08;血管紧张素Ⅱ&#xff0c;AbMole&#xff0c;M6240&#xff09;是一种血管收缩剂&#xff0c;也是肾素-血管紧张素系统 (RAS) 的主要效应肽。Angiotensin II参与动物的血压调节、水电解质平衡等经典生理过程在科研中Angiotensin II被广泛用于动物心血…

【Unity】Mono相关理论知识学习

一种编译技术。优点&#xff1a;支持JIT编译&#xff1a;在运行时将IL编译成机器码。首次执行稍慢&#xff0c;好处在于运行更快&#xff0c;迭代更高效。构建速度快&#xff1a;无需将IL转成C&#xff0c;构建过程省去了IL2CPP的转换和原生编译步骤&#xff0c;适合开发阶段快…

React源码4 三大核心模块之一:Schedule,scheduleUpdateOnFiber函数

scheduler工作阶段在React内部被称为schedule阶段。在《React源码3》&#xff0c;我们已经将update加入队列并返回到了根容器节点root。function updateContainer(element, container, parentComponent, callback) {//前面略过var root enqueueUpdate(current$1, update, lane…

Unity3D + VS2022连接雷电模拟器调试

本文参考了Unity3D Profiler 连接真机和模拟器_unity 连接雷电模拟器-CSDN博客 具体步骤&#xff1a; 1、cmd打开命令窗口&#xff0c;输入adb devices&#xff0c;确认能检测到模拟器 示例&#xff1a;List of devices attached emulator-5554 device 2、…

学习软件测试的第十五天

1.会写测试用例吗&#xff1f;测试用例有什么要素“会的&#xff0c;我写过多个功能测试和接口测试的测试用例。我写用例的时候会根据需求文档或原型图分析测试点&#xff0c;然后从正常流程、异常流程、边界情况等方面设计测试场景。每条用例我都会包含&#xff1a;用例编号、…

C++硬实时调度:原理、实践与最佳方案

在工业自动化、航空航天、医疗设备等领域&#xff0c;系统的实时性往往直接关系到生命安全和财产损失。C作为高性能编程语言&#xff0c;为硬实时系统开发提供了强大支持。本文将深入探讨C硬实时调度的核心技术&#xff0c;从操作系统原理到代码实现的全方位解析。 一、实时系统…

LeetCode 1156.单字符重复子串的最大长度

如果字符串中的所有字符都相同&#xff0c;那么这个字符串是单字符重复的字符串。 给你一个字符串 text&#xff0c;你只能交换其中两个字符一次或者什么都不做&#xff0c;然后得到一些单字符重复的子串。返回其中最长的子串的长度。 示例 1&#xff1a; 输入&#xff1a;text…

K近邻算法的分类与回归应用场景

K近邻算法的分类与回归应用场景 K近邻&#xff08;K-Nearest Neighbors, KNN&#xff09;算法是一种基础但强大的机器学习方法&#xff0c;它既可以用于分类问题&#xff0c;也能解决回归问题。 两者的核心思想都是基于"近朱者赤&#xff0c;近墨者黑"的原理&#xf…