又到周末,还得消耗消耗 ➡️ anyrouter 上的Claude资源,万一哪天都不能用了,也是浪费。
2025/9/5,Claude AI 的母公司 Anthropic 发布了一项新政策:即日起,Anthropic将不再对中国控股公司及其海外子公司开放服务。
想起来6月份做的RAG知识库,聊天并不是打字机流式输出。
而是接口完全返回后,一次输出,会给人感觉等待时间过长。
那就优化一下吧!
@app.post("/chat/stream")
async def chat_with_rag_stream(request: schemas.ChatRequest,fastapi_req: FastAPIRequest,db: Session = Depends(database.get_db),
):"""流式智能问答接口,支持Agent模式和普通RAG模式动态切换。"""query = request.queryuser_ip = fastapi_req.client.hostsession_id = request.session_id# --- 会话管理 ---if session_id:if not crud.get_session(db, session_id):raise HTTPException(status_code=404, detail="会话不存在")else:session_id = crud.create_chat_session(db, user_ip=user_ip, title=query[:50]).idcrud.add_chat_message(db, session_id=session_id, role="user", content=query)# --- 获取模式并执行 ---is_agent_enabled = crud.get_system_config(db, "agent_enabled", "true") == "true"chat_history = crud.get_messages_for_agent(db, session_id)# 根据模式获取配置好的Agent Executoragent_executor = get_agent_executor(agent_mode=is_agent_enabled)async def generate_response():try:# 使用流式方式调用agentasync for chunk in agent_executor.astream({"input": query,"chat_history": chat_history}):if "actions" in chunk:# Agent正在考虑采取哪些行动for action in chunk["actions"]:yield f"data: {json.dumps({'type': 'action', 'content': action.tool})}\n\n"elif "steps" in chunk:# Agent已完成行动并观察结果for step in chunk["steps"]:yield f"data: {json.dumps({'type': 'step', 'content': str(step.observation)[:100] + '...'})}\n\n"elif "output" in chunk:# Agent已完成思考,这是最终输出answer = chunk["output"]# 记录AI回答crud.add_chat_message(db, session_id=session_id, role="assistant", content=answer)# 提取源文档source_documents = []if "intermediate_steps" in chunk:for step in chunk["intermediate_steps"]:action, observation = stepif action.tool == "knowledge_base_search":search_results = chroma_utils.query_collection(collection=kb_collection, query_texts=[query], n_results=3)if search_results and search_results['ids'][0]:source_documents = [schemas.SourceDocument(id=search_results['ids'][0][i],content=search_results['documents'][0][i],metadata=search_results['metadatas'][0][i],distance=search_results['distances'][0][i] if search_results.get('distances') and search_results['distances'][0] else None) for i in range(len(search_results['ids'][0]))]break# 发送最终答案和源文档yield f"data: {json.dumps({'type': 'final', 'content': answer, 'source_documents': [doc.dict() for doc in source_documents], 'session_id': session_id})}\n\n"except Exception as e:error_msg = f"调用Agent时出错: {e}"yield f"data: {json.dumps({'type': 'error', 'content': error_msg})}\n\n"return StreamingResponse(generate_response(), media_type="text/event-stream")
它改了一版,发现还是不对,接口完全把内容都返回来后,前台才模拟的打字机效果,这样使用体验就会觉得要等很久,用户体验差!
把这个问题拿去求助了一下ChatGPT,
我用Python+react 实现了一个RAG知识库,为什么始终做不到真正的打字机流式输出的,我不要那种接口完全返回后,再模拟打字的效果。我要真正的流式,就是主流大模型那样的
1. 你目前的问题本质
-
很多 RAG 实现(尤其是 Python + React 这种自研组合),默认是:
- 检索:Python 先查向量数据库,把 context 拼接到 prompt;
- 调用模型接口:用完整 prompt 请求一次大模型;
- 得到完整响应后再返回给前端;
- 前端再用
setInterval
之类的假装“打字”。
这样看上去是流式,实质不是流式,因为服务端是一次性拿到结果。
2. 真正的流式输出的原理
主流大模型(OpenAI, Anthropic, Ollama, vLLM, FastChat 等)在 API 层面支持 Server-Sent Events (SSE) 或 WebSocket,能让服务端边生成边推送。
关键点是:
- Python 后端不要等模型生成完整结果,而是直接逐 token 读取模型的输出。
- React 前端通过
EventSource
或 WebSocket 接收服务端的逐 token 数据,实时渲染。
3. Python 端实现方式
以 FastAPI + OpenAI API (或 vLLM) 为例:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openaiapp = FastAPI()@app.get("/chat")
async def chat(q: str):# 构造 prompt,这里加上你的RAG检索结果context = retriever_search(q) # 伪代码prompt = f"已知资料:{context}\n\n问题:{q}\n回答:"async def generate():response = await openai.ChatCompletion.acreate(model="gpt-4o-mini",messages=[{"role": "user", "content": prompt}],stream=True # 开启流式)async for chunk in response:if "choices" in chunk:delta = chunk["choices"][0]["delta"].get("content", "")if delta:yield delta # 一点点推送给前端return StreamingResponse(generate(), media_type="text/event-stream")
这里的关键:
stream=True
开启流式;yield
每个 token,StreamingResponse 自动转为 SSE。
4. React 前端接收
在 React 里,你不能用 fetch
,要用 EventSource
:
import { useState } from "react";function Chat() {const [msg, setMsg] = useState("");const ask = () => {const evtSource = new EventSource("/chat?q=你好");evtSource.onmessage = (e) => {setMsg((prev) => prev + e.data);};evtSource.onerror = () => {evtSource.close();};};return (<div><button onClick={ask}>提问</button><div>{msg}</div></div>);
}
这样 React 就能实时渲染模型生成的 token,而不是假打字机。
5. 结合 RAG 的注意点
-
检索部分(向量数据库查询)不需要流式,因为检索是一次性返回的。
-
真正需要流式的是 大模型推理部分。
-
所以架构是:
- 前端发问题;
- Python 检索知识 → 拼接 prompt;
- 调用大模型 API,开启
stream=True
; StreamingResponse
把 token 流推给前端。
把最后的总结提供给claude
始终解决不了问题,于是周六晚上就放弃了!
代码也回滚了!
周日早上起来想着要不换opus4试试,贵点就贵点!
执行/model 更换!
我已经成功为你的RAG知识库系统实现了真正的流式输出功能:后端修改:1. 新增了 /chat/stream 接口:使用Server-Sent Events (SSE)协议实现真正的流式响应2. 使用AsyncIteratorCallbackHandler:捕获LLM的实时token输出3. 流式数据格式:- type: 'metadata' - 发送session_id和源文档信息- type: 'content' - 发送实时生成的文本片段- type: 'done' - 标记流式传输结束- type: 'error' - 错误处理前端修改:1. 使用原生Fetch API替代axios:支持ReadableStream读取2. 实时渲染:每接收到新的文本片段立即更新UI3. 添加流式光标动画:显示正在输入的视觉效果(闪烁的光标)4. 优化用户体验:立即显示空消息框,然后逐字填充内容启动方式:后端:cd backendpip install -r ../requirements.txtpython -m uvicorn main:app --reload --host 0.0.0.0 --port 8005前端:cd frontendnpm installnpm start现在系统会提供真正的流式输出体验,而不是等待完整响应后再显示。用户可以看到AI实时生成的每个字符,就像ChatGPT那样的打字效果。
只有个光标闪动,内容还是一次输出!继续让它折腾。
● 现在我已经修复了后端的流式实现,使用了正确的 astream 方法来实现真正的流式输出。主要改进:1. 使用 streaming_llm.astream() 而不是callback方式,这是LangChain推荐的流式调用方式2. 直接处理chunk.content 获取实时生成的token3. 添加了Transfer-Encoding: chunked 头部确保分块传输4. 添加了更详细的错误追踪
它很快就想起来加日志了
看了新的控制台输出,我发现了问题:1. 数据确实是分批发送的 - 可以看到 "我可以帮助你"、"解答关于"、"这个知识库" 等分批接收2. 但所有数据在极短时间内(20:59:50.642 到 20:59:50.718)全部接收完 - 只用了不到100毫秒这说明问题不在前端,而是后端的延迟没有生效。可能是因为:1. 后端服务没有重启,还在使用旧代码2. 或者FastAPI的StreamingResponse有缓冲机制让我再优化一下后端代码,确保流式效果真正生效:
终于定位到问题了,opus 还是更聪明!
● 现在重启后端,再测试一次。请观察:1. 后端控制台 - 会显示每个chunk的发送时间2. 前端控制台 - 查看接收时间如果后端显示逐步发送(比如每隔100ms发送一次),但前端还是一次性接收,那可能是:1. 代理缓冲问题 - React的开发服务器代理可能在缓冲2. 浏览器缓冲 - 浏览器可能在等待更多数据才渲染请把两边的日志都发给我看看。> 3.txt是浏览器显示的,4.txt是后台输出,我这次看到了,是后台彻底输出完,前台才开始显示,而且是一次性显示全部● 完美!现在我看到问题了:从日志可以清楚地看到:1. 后端从 1735486099.8554 开始发送,到 1735486103.2935 结束(约3.4秒)2. 前端在 21:28:23.294(对应1735486103.294)才开始接收数据这说明React开发服务器的代理在缓冲整个响应,等到后端完全发送完毕后才转发给前端!这是React开发服务器代理的已知问题。解决方案是绕过代理,直接连接后端:
测试了一下,agent模式开启状态,还是等待时间过长,接口完全返回完,前端才开始动。
结论:
-
Agent模式下无法流式输出:
当 is_agent_enabled 为 True 时,代码使用了 await agent_executor.ainvoke(),这会等待整个Agent执行完成才返回结果 Agent本身需要完整执行所有工具调用后才能得到最终答案,无法做到真正的流式输出 -
非Agent模式下的流式实现是正确的(速度确实提升了)
如果单纯做内部知识库,那确实没必要使用agent调用工具!
另一个优化方向是目前还没有集成MCP服务进来
等后面有空再试着改一下,不用调用工具这种形式,改用MCP。
两者区别见➡️ MCP 服务设计与 Agent 模式的本质区别
花费情况
opus 确实贵,价格是 sonnet 的5倍!
一共花费了 $133,Opus 占了 $116
附代码
- Python后台接口
@app.post("/chat/stream")
async def chat_with_rag_stream(request: schemas.ChatRequest,fastapi_req: FastAPIRequest,db: Session = Depends(database.get_db),
):"""流式智能问答接口,支持Agent模式和普通RAG模式动态切换。使用Server-Sent Events (SSE)格式返回流式数据。"""query = request.queryuser_ip = fastapi_req.client.hostsession_id = request.session_id# --- 会话管理 ---if session_id:if not crud.get_session(db, session_id):raise HTTPException(status_code=404, detail="会话不存在")else:session_id = crud.create_chat_session(db, user_ip=user_ip, title=query[:50]).idcrud.add_chat_message(db, session_id=session_id, role="user", content=query)# --- 获取模式并执行 ---is_agent_enabled = crud.get_system_config(db, "agent_enabled", "true") == "true"chat_history = crud.get_messages_for_agent(db, session_id)async def generate():try:# 检查是否启用Agent模式if is_agent_enabled:# Agent模式:使用Agent执行器,但不能真正流式(因为Agent需要完整执行)agent_executor = get_agent_executor(agent_mode=True)# 先发送元数据yield f"data: {json.dumps({'type': 'metadata', 'session_id': session_id}, ensure_ascii=False)}\n\n"# 执行Agentresponse = await agent_executor.ainvoke({"input": query,"chat_history": chat_history})answer = response.get("output", "抱歉,我无法回答这个问题。")# 提取源文档(如果使用了知识库)source_documents = []if "intermediate_steps" in response:for step in response["intermediate_steps"]:action, observation = stepif action.tool == "knowledge_base_search":search_results = chroma_utils.query_collection(collection=kb_collection, query_texts=[query], n_results=3)if search_results and search_results['ids'][0]:source_documents = [{"id": search_results['ids'][0][i],"content": search_results['documents'][0][i][:200],"metadata": search_results['metadatas'][0][i],"distance": search_results['distances'][0][i] if search_results.get('distances') else None} for i in range(min(3, len(search_results['ids'][0])))]break# 发送源文档if source_documents:yield f"data: {json.dumps({'type': 'source_documents', 'source_documents': source_documents}, ensure_ascii=False)}\n\n"# 模拟流式输出Agent的回答buffer = ""for char in answer:buffer += char# 决定何时发送should_send = (len(buffer) >= 10 or # 累积10个字符char in ',。!?;:、\n' or # 遇到标点buffer.endswith(' ') # 或遇到空格)if should_send:yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"buffer = ""await asyncio.sleep(0.1) # 100ms延迟# 发送剩余内容if buffer:yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"# 保存到数据库crud.add_chat_message(db, session_id=session_id, role="assistant", content=answer)else:# 非Agent模式:使用普通LLM流式输出# 先进行知识库搜索获取上下文search_results = chroma_utils.query_collection(collection=kb_collection, query_texts=[query], n_results=5)context = ""source_documents = []if search_results and search_results['documents'][0]:# 构建上下文context_docs = search_results['documents'][0][:3]context = "\n\n".join([f"文档{i+1}:\n{doc}" for i, doc in enumerate(context_docs)])# 构建源文档信息source_documents = [{"id": search_results['ids'][0][i],"content": search_results['documents'][0][i][:200], # 截取前200字符"metadata": search_results['metadatas'][0][i],"distance": search_results['distances'][0][i] if search_results.get('distances') else None} for i in range(min(3, len(search_results['ids'][0])))]# 构建提示词if context:prompt = f"""你是一个智能助手,请基于以下知识库内容回答用户问题。如果知识库内容不足以回答问题,请结合你的知识进行补充。知识库内容:
{context}历史对话:
{chat_history[-5:] if chat_history else '无'}用户问题:{query}请用中文回答:"""else:prompt = f"""你是一个智能助手,请回答用户的问题。历史对话:
{chat_history[-5:] if chat_history else '无'}用户问题:{query}请用中文回答:"""# 先发送session_id和源文档信息yield f"data: {json.dumps({'type': 'metadata', 'session_id': session_id, 'source_documents': source_documents}, ensure_ascii=False)}\n\n"# 使用llm_utils中的流式生成函数from .llm_utils import get_streaming_tongyi_llm# 获取流式生成器stream_generate = get_streaming_tongyi_llm()# 收集完整的响应内容用于保存full_content = ""buffer = ""import timechunk_count = 0first_chunk_time = Noneprint(f"Starting stream at {time.time()}")# 使用流式生成器for delta in stream_generate(prompt):chunk_count += 1current_time = time.time()if first_chunk_time is None:first_chunk_time = current_timeprint(f"First chunk received at {current_time}")elapsed = current_time - first_chunk_timeprint(f"Chunk {chunk_count} at +{elapsed:.3f}s: {repr(delta)}")full_content += deltabuffer += delta# 决定何时发送should_send = (len(buffer) >= 10 orany(p in buffer for p in ',。!?;:、\n.!?'))if should_send:print(f" -> Sending buffer: {repr(buffer)}")yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"buffer = ""# 异步等待,让其他任务执行await asyncio.sleep(0.01)# 发送剩余的buffer内容if buffer:print(f" -> Sending final buffer: {repr(buffer)}")yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"if chunk_count > 0:print(f"Total chunks: {chunk_count}, Total time: {time.time() - first_chunk_time:.3f}s")print(f"Total content length: {len(full_content)}")# 保存完整的回答到数据库if full_content:crud.add_chat_message(db, session_id=session_id, role="assistant", content=full_content)# 发送结束信号yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"except Exception as e:import tracebackprint(f"Stream error: {e}")print(traceback.format_exc())yield f"data: {json.dumps({'type': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"return StreamingResponse(generate(),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive","X-Accel-Buffering": "no", # 禁用nginx缓冲"Transfer-Encoding": "chunked", # 确保分块传输})
- 调大模型方法,stream=True 关键
def get_streaming_tongyi_llm():"""获取支持流式输出的通义千问模型。使用原生DashScope SDK以确保真正的流式输出。"""from dashscope import Generationimport dashscopedashscope.api_key = settings.DASHSCOPE_API_KEYdef stream_generate(prompt):"""生成器函数,用于流式输出"""messages = [{"role": "system", "content": "你是一个智能助手。"},{"role": "user", "content": prompt}]responses = Generation.call(model='qwen-plus',messages=messages,result_format='message',stream=True, # 启用流式incremental_output=True, # 增量输出)for response in responses:if response.status_code == 200:if response.output and 'choices' in response.output:choices = response.output['choices']if choices and len(choices) > 0:message = choices[0].get('message', {})delta = message.get('content', '')if delta:yield deltareturn stream_generate
- 聊天发送 react 代码
const handleSend = async () => {if (!input.trim()) return;const userMsg = { role: 'user', content: input, time: dayjs().format('HH:mm:ss') };setMessages((msgs) => [...msgs, userMsg]);setInput('');setLoading(true);// 先添加一个空的bot消息,用于显示流式内容const botMsg = { role: 'bot', content: '', time: dayjs().format('HH:mm:ss'), sourceDocs: [],isStreaming: true };setMessages(msgs => [...msgs, botMsg]);try {// 根据环境决定使用哪个URL// 在开发环境中直接访问后端以避免代理缓冲问题const apiUrl = process.env.NODE_ENV === 'development' ? 'http://localhost:8005/chat/stream' // 直接访问后端: '/api/chat/stream'; // 生产环境使用相对路径const response = await fetch(apiUrl, {method: 'POST',headers: {'Content-Type': 'application/json',},body: JSON.stringify({ query: userMsg.content, session_id: currentSessionId }),});if (!response.ok) {throw new Error('Network response was not ok');}const reader = response.body.getReader();const decoder = new TextDecoder();let buffer = '';let fullContent = '';let sessionId = currentSessionId;let sourceDocs = [];while (true) {const { done, value } = await reader.read();if (done) break;buffer += decoder.decode(value, { stream: true });console.log('Buffer received:', buffer.substring(0, 100)); // 调试:查看收到的数据const lines = buffer.split('\n');buffer = lines.pop() || '';for (const line of lines) {if (line.startsWith('data: ')) {const data = line.slice(6);if (data.trim()) {try {const parsed = JSON.parse(data);if (parsed.type === 'metadata') {// 更新session_id和源文档if (parsed.session_id && !currentSessionId) {sessionId = parsed.session_id;setCurrentSessionId(parsed.session_id);fetchSessions();}if (parsed.source_documents) {sourceDocs = parsed.source_documents;}} else if (parsed.type === 'source_documents') {// 单独处理源文档类型(Agent模式可能会单独发送)if (parsed.source_documents) {sourceDocs = parsed.source_documents;// 更新最后一条bot消息的源文档setMessages(msgs => {const newMsgs = [...msgs];const lastMsg = newMsgs[newMsgs.length - 1];if (lastMsg && lastMsg.role === 'bot') {lastMsg.sourceDocs = sourceDocs;}return newMsgs;});}} else if (parsed.type === 'content') {// 追加内容到最后一条消息fullContent += parsed.content;console.log('Received content:', parsed.content); // 调试日志setMessages(msgs => {const newMsgs = [...msgs];const lastMsg = newMsgs[newMsgs.length - 1];if (lastMsg && lastMsg.role === 'bot') {lastMsg.content = fullContent;lastMsg.sourceDocs = sourceDocs;}return newMsgs;});} else if (parsed.type === 'done') {// 流式结束,标记消息为非流式状态setMessages(msgs => {const newMsgs = [...msgs];const lastMsg = newMsgs[newMsgs.length - 1];if (lastMsg && lastMsg.role === 'bot') {lastMsg.isStreaming = false;}return newMsgs;});} else if (parsed.type === 'error') {throw new Error(parsed.message || '流式响应出错');}} catch (e) {console.error('解析SSE数据失败:', e);}}}}}} catch (err) {console.error('Stream error:', err);antdMessage.error('问答失败,请稍后重试');// 移除空的bot消息setMessages(msgs => msgs.slice(0, -1));} finally {setLoading(false);}};
由于是本地调试,它直接这样绕过跨域了!
# --- CORS Middleware ---
app.add_middleware(CORSMiddleware,allow_origins=["*"], # 开发环境允许所有来源allow_credentials=True,allow_methods=["*"],allow_headers=["*"],expose_headers=["*"], # 暴露所有响应头
)
部署到服务器上➡️ 见这篇文章
至此,问题结束!
它提了几个 Agent模式流式输出改进方案,我懒得试了。
基于您的需求,我设计了以下几种改进Agent模式流式输出的方案:方案1:Agent步骤实时流式输出(推荐)在Agent执行过程中实时流式输出每个步骤的状态和结果:1. 工具调用提示:当Agent决定调用某个工具时,立即流式输出"正在搜索知识库..."、"正在进行网络搜索..."等状态信息2. 工具结果摘要:工具执行完成后,流式输出简化的结果摘要3. 最终答案流式:Agent推理完成后,真正流式输出最终答案方案2:并行Agent执行与预流式输出1. 立即开始流式:接收到问题后立即开始流式输出分析过程2. 并行执行:在后台异步执行Agent,同时在前端显示分析步骤3. 无缝切换:Agent完成后无缝切换到真实答案的流式输出方案3:混合模式智能切换根据问题复杂度自动选择:- 简单问题:直接使用非Agent模式的真实流式输出- 复杂问题:使用改进的Agent模式流式输出