open webui源码分析8—管道

        我们可以把Open WebUI想象成一个管道系统,数据通过管道和阀门流动。管道作为open webui的插件,可以为数据构建新的通路,可以自定义逻辑和处理数据;阀门是管道的可配置部件,控制数据流过管道时的行为。管道可以理解成用户自定义的模型,并且被追加到用户可见的模型列表中。用户的对话式,可以跟选择大模型一样选择管道。

        管道的引入方式通前面所说的Action和Filter一样,下面以一个使用langchain的管道为例,该管道在发送请求到大模型之前把{'role':'system', 'cotent':'You are a helpful bot'}增加到messages中,具体代码如:

"""
title: LangChain Pipe Function
author: Colby Sawyer @ Attollo LLC (mailto:colby.sawyer@attollodefense.com)
author_url: https://github.com/ColbySawyer7
version: 0.1.0

This module defines a Pipe class that utilizes LangChain
"""

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time

# import LangChain dependencies
from langchain_core.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Ollama
# Uncomment to use OpenAI and FAISS
#from langchain_openai import ChatOpenAI
#from langchain_community.vectorstores import FAISS

class Pipe:
class Valves(BaseModel):
base_url: str = Field(default="http://localhost:11434")
ollama_embed_model: str = Field(default="nomic-embed-text")
ollama_model: str = Field(default="llama3.1")
openai_api_key: str = Field(default="...")
openai_model: str = Field(default="gpt3.5-turbo")
emit_interval: float = Field(
default=2.0, description="Interval in seconds between status emissions"
)
enable_status_indicator: bool = Field(
default=True, description="Enable or disable status indicator emissions"
)

    def __init__(self):
self.type = "pipe"
self.id = "langchain_pipe"
self.name = "LangChain Pipe"
self.valves = self.Valves()
self.last_emit_time = 0
pass

    async def emit_status(
self,
__event_emitter__: Callable[[dict], Awaitable[None]],
level: str,
message: str,
done: bool,
):
current_time = time.time()
if (
__event_emitter__
and self.valves.enable_status_indicator
and (
current_time - self.last_emit_time >= self.valves.emit_interval or done
)
):
await __event_emitter__(
{
"type": "status",
"data": {
"status": "complete" if done else "in_progress",
"level": level,
"description": message,
"done": done,
},
}
)
self.last_emit_time = current_time

    async def pipe(self, body: dict, 
__user__: Optional[dict] = None,
__event_emitter__: Callable[[dict], Awaitable[None]] = None,
__event_call__: Callable[[dict], Awaitable[dict]] = None,
) -> Optional[dict]:
await self.emit_status(
__event_emitter__, "info", "/initiating Chain", False
)

        # ======================================================================================================================================
# MODEL SETUP
# ======================================================================================================================================
# Setup the model for generating responses
# ==========================================================================
# Ollama Usage
_model = Ollama(
model=self.valves.ollama_model,
base_url=self.valves.base_url
)
# ==========================================================================
# OpenAI Usage
# _model = ChatOpenAI(
#     openai_api_key=self.valves.openai_api_key,
#     model=self.valves.openai_model
# )
# ==========================================================================

        # Example usage of FAISS for retreival
# vectorstore = FAISS.from_texts(
#     texts, embedding=OpenAIEmbeddings(openai_api_key=self.valves.openai_api_key)
# )

        # ======================================================================================================================================
# PROMPTS SETUP
# ==========================================================================
_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful bot"),
("human", "{question}")
])
# ======================================================================================================================================
# CHAIN SETUP
# ==========================================================================
# Basic Chain
chain = (
{"question": RunnablePassthrough()}
| _prompt
| _model
| StrOutputParser()
)
# ======================================================================================================================================
# Langchain Calling
# ======================================================================================================================================
await self.emit_status(
__event_emitter__, "info", "Starting Chain", False
)
messages = body.get("messages", [])
# Verify a message is available
if messages:
question = messages[-1]["content"]
try:
# Invoke Chain
response = chain.invoke(question)
# 把调用管道的应答数据追加到表单的messages中
body["messages"].append({"role": "assistant", "content": response})
except Exception as e:
await self.emit_status(__event_emitter__, "error", f"Error during sequence execution: {str(e)}", True)
return {"error": str(e)}
# If no message is available alert user
else:
await self.emit_status(__event_emitter__, "error", "No messages found in the request body", True)
body["messages"].append({"role": "assistant", "content": "No messages found in the request body"})

        await self.emit_status(__event_emitter__, "info", "Complete", True)
return response

        管道增加成功后,出现在聊天页面的模型列表中,用户可以在聊天页面选择使用,效果如下图:

         一、请求报文

        在请求数据中的model就是当前新增的管道名LangChain Pipe,具体如下:

{
"stream": true,
"model": "langchain_pipe",
"messages": [
{
"role": "user",
"content": "床前明月光的下一句"
}
],
"params": {},
"tool_servers": [],
"features": {
"image_generation": false,
"code_interpreter": false,
"web_search": false,
"memory": false
},
"variables": {
"{{USER_NAME}}": "acaluis",
"{{USER_LOCATION}}": "Unknown",
"{{CURRENT_DATETIME}}": "2025-08-22 18:27:04",
"{{CURRENT_DATE}}": "2025-08-22",
"{{CURRENT_TIME}}": "18:27:04",
"{{CURRENT_WEEKDAY}}": "Friday",
"{{CURRENT_TIMEZONE}}": "Etc/GMT-8",
"{{USER_LANGUAGE}}": "zh-CN"
},
"model_item": {
"id": "langchain_pipe",
"name": "LangChain Pipe",
"object": "model",
"created": 1755857251,
"owned_by": "openai",
"pipe": {
"type": "pipe"
},
"actions": [],
"filters": [],
"tags": []
},
"session_id": "Hh2Tyy7FYSQz-I1SAAAg",
"chat_id": "b80cadeb-0389-4464-9fd9-270193f4f3a0",
"id": "618d361e-d70c-4fec-b945-a4be549bd699",
"background_tasks": {
"title_generation": true,
"tags_generation": true,
"follow_up_generation": true
}
}

        二、源码分析

        在会话主流程中,关于pipe的处理入口在generate_chat_completion方法中,相关代码如下:

async def generate_chat_completion(
request: Request,
form_data: dict,
user: Any,
bypass_filter: bool = False,
):

    ……

    if model.get("pipe"): #看这里。请求报文中有pipe属性,所以进入这个分支。
return await generate_function_chat_completion(#下面重点分析该方法代码
request, form_data, user=user, models=models
)

    ……  

        generate_function_chat_completion代码如下:

本方法处理流程如下:

1)确定使用的模型,也就是现在的langchain_pipe

2)设置pipe方法的部分参数到extra_params中

3)从模块中获取管道Function的pipe方法及参数

4)异步执行pipe方法

5)把调用pipe方法的应答包装成openai兼容格式,并以流式返回

async def generate_function_chat_completion(
request, form_data, user, models: dict = {}
):
async def execute_pipe(pipe, params):#该方法一步调用管道中的pipe方法
if inspect.iscoroutinefunction(pipe):
return await pipe(**params)
else:
return pipe(**params)

    async def get_message_content(res: str | Generator | AsyncGenerator) -> str:
if isinstance(res, str):
return res
if isinstance(res, Generator):
return "".join(map(str, res))
if isinstance(res, AsyncGenerator):
return "".join([str(stream) async for stream in res])

    def process_line(form_data: dict, line):
if isinstance(line, BaseModel):
line = line.model_dump_json()
line = f"data: {line}"
if isinstance(line, dict):
line = f"data: {json.dumps(line)}"

        try:
line = line.decode("utf-8")
except Exception:
pass

        if line.startswith("data:"):
return f"{line}\n\n"
else:
line = openai_chat_chunk_message_template(form_data["model"], line)
return f"data: {json.dumps(line)}\n\n"

    def get_pipe_id(form_data: dict) -> str:
pipe_id = form_data["model"]
if "." in pipe_id:
pipe_id, _ = pipe_id.split(".", 1)
return pipe_id

    #本方法用于从form_data,user和extra_params中获取管道的pipe函数参数

    def get_function_params(function_module, form_data, user, extra_params=None):
if extra_params is None:
extra_params = {}

        pipe_id = get_pipe_id(form_data)

        # Get the signature of the function
sig = inspect.signature(function_module.pipe) #获取函数签名

        '''

            body设置为表单,__event_emitter__、__event_caller__和__user__来自

            extra_params

        '''
params = {"body": form_data} | {
k: v for k, v in extra_params.items() if k in sig.parameters
}

        '''

            如果参数中有__user__并且管道函数中有UserValves,则需要根据pipe_id和user_id

            找到用户阀门值,并设置到params["__user__"]["valves"]中

        '''

        if "__user__" in params and hasattr(function_module, "UserValves"):
user_valves = Functions.get_user_valves_by_id_and_user_id(pipe_id, user.id)
try:
params["__user__"]["valves"] = function_module.UserValves(**user_valves)
except Exception as e:
log.exception(e)
params["__user__"]["valves"] = function_module.UserValves()

        return params

    #以下两行代码根据请求中的model确定使用的模型,这里对应就是前面定位的管道

    model_id = form_data.get("model")
model_info = Models.get_model_by_id(model_id)

    '''

        当前model_info信息如下:

                {
"id": "langchain_pipe",
"name": "LangChain Pipe",
"object": "model",
"created": 1755827527,
"owned_by": "openai",
"pipe":{"type": "pipe"},
}

    '''

    metadata = form_data.pop("metadata", {})

    files = metadata.get("files", [])
tool_ids = metadata.get("tool_ids", [])
# tool_ids处理,暂不关注
if tool_ids is None:
tool_ids = []

    __event_emitter__ = None
__event_call__ = None
__task__ = None
__task_body__ = None

    if metadata:#如果元数据不为空,判断其中是否包含了session_id,chat_id和message_id

        #如果上面的判断成立,则创建__event_emitter__和__event_call__
if all(k in metadata for k in ("session_id", "chat_id", "message_id")):
__event_emitter__ = get_event_emitter(metadata)
__event_call__ = get_event_call(metadata)
__task__ = metadata.get("task", None)
__task_body__ = metadata.get("task_body", None)

    extra_params = { #t填充extra_params,用于提供调用pipe方法时的参数
"__event_emitter__": __event_emitter__,
"__event_call__": __event_call__,
"__chat_id__": metadata.get("chat_id", None),
"__session_id__": metadata.get("session_id", None),
"__message_id__": metadata.get("message_id", None),
"__task__": __task__,
"__task_body__": __task_body__,
"__files__": files,
"__user__": user.model_dump() if isinstance(user, UserModel) else {},
"__metadata__": metadata,
"__request__": request,
}
extra_params["__tools__"] = get_tools(#根据tool_ids设置extra_params的__tools__
request,
tool_ids,
user,
{
**extra_params,
"__model__": models.get(form_data["model"], None),
"__messages__": form_data["messages"],
"__files__": files,
},
)

    if model_info:
if model_info.base_model_id:#暂不考虑
form_data["model"] = model_info.base_model_id

        params = model_info.params.model_dump()

        if params:#暂不考虑
system = params.pop("system", None)
form_data = apply_model_params_to_body_openai(params, form_data)
form_data = apply_model_system_prompt_to_body(
system, form_data, metadata, user
)

    pipe_id = get_pipe_id(form_data)#从表单中获取管道对应的函数ID
function_module = get_function_module_by_id(request, pipe_id) #根据ID获取模块

    pipe = function_module.pipe#得到该管道的pipe方法

    #从表单数据、用户和extra_params中找到并组织好调用pipe方法时所有的参数
params = get_function_params(function_module, form_data, user, extra_params)

    if form_data.get("stream", False):#一般是流式应答

        async def stream_content(): #执行管道的pipe方法,并处理流式应答
try:

                 '''

                   重要:管道中调用ollama返回的数据是ndjson

                   ndjson是一种数据交换格式,其中每行包含一个独立的json对象,

                   需要把所有的行合并成一个大的json对象,langchain完成了合并,

                   所以调用管道后,返回的内容就是应答内容

                '''
res = await execute_pipe(pipe, params)

                # Directly return if the response is a StreamingResponse
if isinstance(res, StreamingResponse):
async for data in res.body_iterator:
yield data
return
if isinstance(res, dict):
yield f"data: {json.dumps(res)}\n\n"
return

            except Exception as e:
log.error(f"Error: {e}")
yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n"
return

            if isinstance(res, str): #正常情况进入本分支

                ''' --------把大模型应答内容转换成标准openai格式------------

                   返回数据格式如下:

                    {"data":

                             {

                                   "id", "langchain_pipe-{uuid}",

                                   "created": "当前时间",

                                   "model": "langchain_pipe",    

                                    choices:[

                                                     {

                                                            "index":0,

                                                            "logprobs":None,

                                                            "finish_reason":None

                                                             "delta":{

                                                                 "content": "管道返回内容"

                                                              }

                                                      }

                                         ]

                                }

                          }         

                '''
message = openai_chat_chunk_message_template(form_data["model"], res)
yield f"data: {json.dumps(message)}\n\n"

            if isinstance(res, Iterator):
for line in res:
yield process_line(form_data, line)

            if isinstance(res, AsyncGenerator):
async for line in res:
yield process_line(form_data, line)

            if isinstance(res, str) or isinstance(res, Generator):
finish_message = openai_chat_chunk_message_template(
form_data["model"], ""
)
finish_message["choices"][0]["finish_reason"] = "stop"
yield f"data: {json.dumps(finish_message)}\n\n"
yield "data: [DONE]"

        #调用stream_content()后,以流式应答返回供process_chat_response处理

        return StreamingResponse(stream_content(), media_type="text/event-stream")
else:#非流式请求,不须考虑
try:
res = await execute_pipe(pipe, params) #执行管道的pipe方法

        except Exception as e:
log.error(f"Error: {e}")
return {"error": {"detail": str(e)}}

        if isinstance(res, StreamingResponse) or isinstance(res, dict):
return res
if isinstance(res, BaseModel):
return res.model_dump()

        message = await get_message_content(res)
return openai_chat_completion_message_template(form_data["model"], message)

        process_chat_response的处理逻辑在系列3已经分析,在此不做赘述。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/94532.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/94532.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解 C 语言 hsearch 哈希表:限制、技巧与替代方案

概述 C 语言标准库中的 hsearch 系列函数提供了一套简单易用的哈希表实现,包含在 <search.h> 头文件中。这组函数虽然接口简洁,但在实际使用中存在一些重要的限制和注意事项。本文将深入探讨 hsearch 的功能特点、设计局限,并提供实用的解决方案和替代建议。 hsearc…

Web网络开发 -- HTML和CSS基础

HTML 超文本编辑语言 HTML 介绍 HTML的英文全称是 Hyper Text Markup Language&#xff0c;即超文本标记语言。HTML是由WEB的发明者 Tim Berners-Lee &#xff08;蒂姆伯纳斯李&#xff09;和同事 Daniel W. Connolly于1990年创立的一种标记语言&#xff0c; 它是标准通用化标…

Python爬虫实战:研究开源的高性能代理池,构建电商数据采集和分析系统

1. 绪论 1.1 研究背景与意义 随着互联网技术的飞速发展,网络数据已成为信息时代的核心资源之一。从商业角度看,企业通过分析竞争对手的产品信息、用户评价等数据,可制定更精准的市场营销策略;从学术研究角度,研究者通过爬取社交媒体数据、学术文献等,可开展社会网络分析…

项目设计文档——爬虫项目(爬取天气预报)

一、项目背景以及项目意义 项目背景&#xff1a; 爬虫技术的核心目的是自动化地从互联网上采集&#xff0c;提取和存储数据。网络爬虫是一种自动化程序&#xff0c;用于从互联网上抓取数据并进行处理。C语言因其高效性和接近硬件的特性&#xff0c;常被用于开发高性能的网络爬…

Python 操作 PPT 文件:从新手到高手的实战指南

在日常工作和学习中&#xff0c;PPT 是我们展示信息和进行演示的重要工具。无论是制作报告、演讲还是教学课件&#xff0c;PPT 都扮演着不可或缺的角色。然而&#xff0c;当面对大量重复性的 PPT 编辑任务时&#xff0c;手动操作不仅耗时耗力&#xff0c;还容易出错。幸运的是&…

系统设计中的幂等性

1. 基本概念 幂等性&#xff08;Idempotence&#xff09;是系统设计中经常提到的概念。如果某个操作执行一次或多次都能产生相同的结果&#xff0c;那么它就是幂等的。2. 代码示例 下面这段代码是幂等的。无论你调用多少次&#xff0c;show_my_button 的最终状态都是False。 de…

Pandas vs Polars Excel 数据加载对比报告

📊 Pandas vs Polars Excel 数据加载对比报告 1. 数据基本情况 数据文件:data.xlsx 数据规模:23,670 行 3 列 字段: case_time:日期/时间 case_name:公司名称(字符串) board:所属板块(字符串) 2. 加载方式与代码 Pandas import pandas as pdfrom tools import…

Kafka 为什么具有高吞吐量的特性?

Kafka 高吞吐量原因&#xff1a;面试题总结 在面试中&#xff0c;Kafka 的高吞吐量设计是高频考点&#xff0c;核心需围绕“架构设计”“存储优化”“网络效率”“资源利用”四个维度展开&#xff0c;以下是结构化总结&#xff1a; 一、核心架构&#xff1a;并行化与分层设计分…

MCP 协议原理与系统架构详解—从 Server 配置到 Client 应用

1. MCP MCP&#xff08;Model Context Protocol&#xff0c;模型上下文协议&#xff09;是开发 Claude 模型的(Anthropic)公司推出的一个开放标准协议&#xff0c;就像是一个 “通用插头” 或者 “USB 接口”&#xff0c;制定了统一的规范&#xff0c;不管是连接数据库、第三方…

uniapp安卓真机调试问题解决总结

uniapp安卓真机调试遇到各种连接不上问题&#xff1a; 手机上打开调试数据线不行&#xff0c;换数据线电脑重启手机重启拔出数据线&#xff0c;换个USB插口。

Linux Qt创建和调用so库的详细教程

一、创建so库1.文件-->新建文件或项目-->Library->C Library&#xff0c;如下图2.工程命名为Example3.一直下一步就可以4、工程创建完成&#xff0c;如下图5、删除Example_global.h6、配置.pro文件# 设置输出目录 DESTDIR $$PWD/output #只生成.so文件 CONFIG plugi…

【深度学习】蒙特卡罗方法:原理、应用与未来趋势

作者选择了由 Ian Goodfellow、Yoshua Bengio 和 Aaron Courville 三位大佬撰写的《Deep Learning》(人工智能领域的经典教程&#xff0c;深度学习领域研究生必读教材),开始深度学习领域学习&#xff0c;深入全面的理解深度学习的理论知识。 之前的文章参考下面的链接&#xf…

区块链技术原理(18)-以太坊共识机制

文章目录前言什么是共识&#xff1f;什么是共识机制&#xff1f;共识机制的核心目标共识机制的类型PoW&#xff08;工作量证明&#xff09;协议&#xff1a;&#xff08;2015-2022&#xff09;PoS&#xff08;权益证明&#xff09;协议&#xff1a;&#xff08;PoS&#xff0c;…

java基础(十五)计算机网络

网络模型概述 为了使得多种设备能通过网络相互通信&#xff0c;并解决各种不同设备在网络互联中的兼容性问题&#xff0c;国际标准化组织&#xff08;ISO&#xff09;制定了开放式系统互联通信参考模型&#xff08;OSI模型&#xff09;。与此同时&#xff0c;TCP/IP模型作为实际…

idea将服务封装为一个jar包

你使用的是 IntelliJ IDEA 2018&#xff0c;这个版本虽然不是最新的&#xff0c;但完全支持通过 图形化界面 打 JAR 包&#xff08;无需命令行&#xff09;&#xff0c;非常适合你在公司内部将 Snowflake 模块打包成通用组件。下面我将 手把手、一步一步、图文流程式地教你&…

ZYNQ [Petalinux的运行]

一、下载ubuntu 下载地址很多&#xff0c;这里提供了一个&#xff1a;http://mirrors.aliyun.com/ubuntu-releases/14.04/ 推荐开始浏览器下载之后复制下载链接使用迅雷下载。 二、虚拟机安装Ubuntu vmware中安装Ubutun–这部分不展示 安装ssh sudo apt install openssh-s…

excel 破解工作表密码

破解Excel工作表密码可通过易用宝工具、VBA脚本或修改文件格式实现&#xff0c;具体方法需根据文件类型和密码保护类型选择。 ‌使用易用宝工具&#xff08;推荐&#xff09;‌ 适用于Excel 2007及以上版本&#xff0c;操作简便且无需编程基础&#xff1a; 下载安装Excel易用…

Deepseek + RAGFlow 搭建本地知识库问答系统

Deepseek RAGFlow 搭建本地知识库问答系统原因为什么要本地部署RAG模型和微调模型区别本地部署流程1. 下载 ollama &#xff0c;通过ollama把Deepseek模型下载到本地运行。2. 下载RAGFlow 源代码和 Docker &#xff0c;通过Docker部署RAGFlow。3. 在RAGFlow中构建个人知识库并…

elementui附件上传自定义文件列表,实现传完即可预览、下载、删除,二次封装el-upload

背景当前 elementui 的文件上传组件在上传完文件之后只支持删除&#xff0c;用户希望可以看到附件信息&#xff0c;还可以预览自己刚刚上传但未提交的文件&#xff0c;还希望可以下载&#xff0c;因为公司的下载功能当前是通过 OnlyOffice 实现了文件格式转换&#xff0c;所以我…

linux的conda配置与应用阶段的简单指令备注

1.新建某虚拟环境 conda create -n 虚拟环境名 pythonPython版本号 (-y)2.退出当前虚拟环境 conda deactivate3.查看当前conda环境下所有的虚拟环境 conda info --envs4.查看conda版本和位置 conda --versionwhich conda5.激活某个conda虚拟环境 conda activate 虚拟环境名