Go语言流式输出技术实现-服务器推送事件(Server-Sent Events, SSE)

目录

  • 引言
  • 背景与技术概述
  • 实现技术细节
    • 1. HTTP 头部配置
    • 2. 事件格式与发送
    • 3. 保持连接与刷新
    • 4. 处理连接关闭
      • 4.1 使用上下文管理连接生命周期
      • 4.2 使用通道管理客户端连接
    • 5. 客户端交互
    • 6.demo
    • 7.Go转发大模型流式输出demo

引言

服务器推送事件(Server-Sent Events, SSE)是一种基于 HTTP 的单向数据流技术,允许服务器通过标准 HTTP 连接向客户端推送实时更新。SSE 使用 Content-Type: text/event-stream 头部标识响应内容为事件流,例如大模型流式输出。

背景与技术概述

SSE 是 HTML5 规范的一部分,通过 EventSource API 提供客户端支持。它的主要特点包括:

  • 单向通信: 数据仅从服务器流向客户端,无法通过同一连接反向发送。
  • 自动重连: 客户端在连接断开后会自动尝试重连。
  • 基于 HTTP: 利用现有 HTTP 基础设施,无需额外协议支持。
  • 事件格式: 事件以文本形式发送,每条事件以 data: 开头,结束于两个换行符 \n\n。

在 Go 中,SSE 的实现通常依赖标准库 net/http,也可以结合框架(如 Gin)或第三方库(如 github.com/r3labs/sse)来简化开发。

实现技术细节

1. HTTP 头部配置

服务端必须在响应中设置以下头部:

  • Content-Type: text/event-stream: 标识响应为事件流。
  • Cache-Control: no-cache: 防止浏览器缓存响应,确保实时性。
  • Connection: keep-alive: 保持连接开放,支持持续流式传输。

2. 事件格式与发送

SSE 事件必须遵循特定格式,每条事件包括以下字段:

  • data:: 事件数据,多个 data: 行会被拼接为一条消息。
  • 事件以两个换行符 \n\n 结束,表示一条事件的结束。
    例如,发送一条消息 “Hello, World!” 的格式为:
data: Hello, World!

在 Go 中,事件发送通常通过 http.ResponseWriter 实现。例如,Pascal Allen 的 Medium 文章中使用了 Gin 框架c.SSEvent(“message”, msg) 方法,而 Kelche.co 的示例直接使用 fmt.Fprintf(w, “data: %d \n\n”, rand.Intn(100)) 发送随机数。

3. 保持连接与刷新

为了实现流式输出,服务端需要保持 HTTP 连接开放,通常通过无限循环实现。在每个循环中:

  • 生成或获取事件数据。
  • 写入响应,使用 w.(http.Flusher).Flush() 立即刷新,确保数据实时发送。

例如,Kelche.co 的 randomHandler 函数每 2 秒发送一次随机数:

for{rand.Seed(time.Now().UnixNano())fmt.Fprintf(w,"data: %d \n\n", rand.Intn(100))w.(http.Flusher).Flush()time.Sleep(2* time.Second)
}

4. 处理连接关闭

客户端可能随时断开连接,服务端需检测并安全退出。
例如,可以通过检查 http.ResponseWriter 的状态或使用 Hijack 方法检测连接状态。在实际应用中,推荐使用通道(channel)或上下文(context)管理连接生命周期。

4.1 使用上下文管理连接生命周期

  • 上下文的作用: 上下文可以用来传递取消信号和截止时间。例如,当客户端断开连接时,HTTP 请求的上下文会被取消,服务器可以通过 <-ctx.Done() 检测到。

  • 关键方法:

    • context.Background():创建一个空的根上下文,通常作为父上下文。
    • context.WithCancel(parentCtx):创建一个可手动取消的上下文,cancel() 函数用于取消。
    • context.WithTimeout(parentCtx, duration):创建一个在指定时间后自动取消的上下文,适合设置 SSE 连接的超时。
    • context.WithDeadline(parentCtx, deadline):创建一个在指定截止时间后自动取消的上下文。
  • 在 SSE 中的应用:

    • 在 SSE 处理函数中,使用 ctx := r.Context() 获取 HTTP 请求的上下文。
    • 使用 select 语句监听 <-ctx.Done(),当上下文被取消时(例如客户端断开),执行清理逻辑。
    • 示例代码:
func sseHandler(w http.ResponseWriter, r *http.Request) {ctx := r.Context()for {select {case <-ctx.Done():return // 客户端断开,退出default:// 发送数据fmt.Fprintf(w, "data: message\n\n")w.(http.Flusher).Flush()time.Sleep(2 * time.Second)}}
} 

这种方式确保当客户端断开时,goroutine 可以及时退出,避免资源泄漏。

4.2 使用通道管理客户端连接

  • 通道的作用: 通道可以用来管理多个客户端的连接生命周期,例如添加新客户端、移除断开的客户端和广播消息。

  • 关键结构:

    • addClient:一个通道(如 chan *SSEClient),用于添加新客户端。
    • removeClient:一个通道(如 chan *SSEClient),用于移除断开的客户端。
    • 定义一个 SSEServer 结构体,包含:- clients:一个映射(如 map[*SSEClient]struct{}),存储所有活跃客户端。
    • 每个 SSEClient 包含一个消息通道(如 chan []byte),用于发送数据。
  • 在 SSE 中的应用:

    • 当新客户端连接时,创建一个 SSEClient,初始化其消息通道,并通过 addClient 通道通知服务器。
    • 当客户端断开时,通过 removeClient 通道通知服务器,服务器从 clients 中移除该客户端并关闭其通道。
    • 使用 sync.Mutex 保护 clients 映射的并发访问,确保线程安全。
  • 示例代码:

type SSEClient struct {ID     stringStream chan []byte
}type SSEServer struct {clients      map[*SSEClient]struct{}addClient    chan *SSEClientremoveClient chan *SSEClientmutex        sync.Mutex
}func (s *SSEServer) Run() {for {select {case client := <-s.addClient:s.mutex.Lock()s.clients[client] = struct{}{}s.mutex.Unlock()case client := <-s.removeClient:s.mutex.Lock()delete(s.clients, client)s.mutex.Unlock()close(client.Stream)}}
}

5. 客户端交互

客户端通过 EventSource API 连接到 SSE 端点。例如:

const eventSource = newEventSource("/random");
eventSource.onmessage = function(event){console.log(event.data);// 处理接收到的随机数
};

EventSource 会自动处理重连,适合需要持续更新的场景。

6.demo

package mainimport ("encoding/json""fmt""io""log""net/http""runtime/debug""time""github.com/spf13/cast"
)func main() {defer recovery()http.HandleFunc("/chat/send", Send)fmt.Println("服务器启动在 http://localhost:8080")log.Fatal(http.ListenAndServe(":8080", nil))
}func Send(w http.ResponseWriter, r *http.Request) {// 处理预检请求if r.Method == "OPTIONS" {w.WriteHeader(http.StatusOK)return}body, err := io.ReadAll(r.Body)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}var params SendRequesterr = json.Unmarshal(body, &params)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}demo := []string{"你好","你是谁","你是做什么的","你是怎么工作的","你是在哪座城市","你是什么星座","你是哪个国家的","你是哪个省的","你是哪个市的","你是哪个区的","你是哪个街道的","你是哪个社区的","你是哪个村的",}flusher, ok := w.(http.Flusher) // 获取流式输出器if !ok {http.Error(w, "Streaming unsupported", http.StatusInternalServerError)return}//设置headerw.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")// 流式输出for _, v := range demo {time.Sleep(1 * time.Second)lineData := fmt.Sprintf("data: %s\n\n", v)io.WriteString(w, lineData)flusher.Flush()}
}type SendRequest struct {Msg string `json:"msg"`
}func recovery() {if rec := recover(); rec != nil {log.Printf("Panic Panic occur")if err, ok := rec.(error); ok {log.Printf("PanicRecover Unhandled error: %v\n stack:%v", err.Error(), cast.ToString(debug.Stack()))} else {log.Printf("PanicRecover Panic: %v\n stack:%v", rec, cast.ToString(debug.Stack()))}}
}

在这里插入图片描述
执行一下命令运行:

go mod initgo mod tidygo run main.go

用postman请求localhost:8080/chat/send
在这里插入图片描述

7.Go转发大模型流式输出demo

	sendRequest.Model ="qwen-max"streamResp:=&proto.StreamResp{}qwenClient:= service.NewQwen(sendRequest)qwenClient.QwenStream(streamResp)defer streamResp.HttpResp.Body.Close()// 1. 复制下游服务的响应头for key,values:= range streamResp.HttpResp.Header {for _,value:= range values {w.Header().Add(key, value)}}// 2. 复制下游服务的状态码w.WriteHeader(streamResp.HttpResp.StatusCode)//流式输出// 确保 ResponseWriter 支持 Flusherflusher,ok:= w.(http.Flusher)if!ok {http.Error(w,"Streaming unsupported", http.StatusInternalServerError)return}// 处理流式响应scanner:= bufio.NewScanner(streamResp.HttpResp.Body)for scanner.Scan(){lineData:= scanner.Text()// 将响应数据逐步发送给客户端io.WriteString(w, lineData+"\n\n")flusher.Flush()// 刷新缓冲区}

在这里插入图片描述

在 Go 中实现 Content-Type: text/event-stream 流式输出需设置正确头部、格式化事件数据并保持连接开放。标准库和框架各有优势,开发者可根据需求选择。

  • 推荐参考以下资源深入学习:
    • 使用Go实现实时通信:基于Server-Sent Events (SSE)
    • Go 中的Server-Sent Events:一种高效的实时通信替代方案
    • Server-Sent Events (SSE) in Golang
    • Using server-sent events

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/917607.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/917607.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高端房产管理小程序

系统介绍1、用户端地图找房&#xff1a;对接地图API&#xff0c;地图形式显示周边房源,支持新盘和租房两种模式查询房价走势&#xff1a;城市房价走势&#xff0c;由后台每月录入房源搜索&#xff1a;搜索房源&#xff0c;支持多维度筛选房源类型&#xff1a;新盘销售、房屋租赁…

文本转语音(TTS)脚本

文本转语音(TTS)脚本 概述 generate_voice.py 是一个用于生成语音的Python脚本。该脚本提供了文本转语音(TTS)功能&#xff0c;可以将文本内容转换为语音文件。 功能特性 文本转语音: 将输入的文本转换为语音文件多种语音选项: 支持不同的语音类型和参数批量处理: 可以处理多个…

磁盘管理与分区

磁盘管理 一、磁盘类型 SATA,SCSI,SAS类型的磁盘&#xff0c;在Linux中用sd来表示。 其中第一块硬盘为sda&#xff0c;第二块二sdb&#xff0c;以此类推。 第一块硬盘的第一个分区为sda1。 nvme类型的磁盘&#xff0c;在Linux中使用nvmeXnYpZ进行表示。 X&#xff1a;数字&…

Linux 逻辑卷管理

练习创建物理卷(pv->vg->lv)物理卷&#xff08;PV&#xff09;就像把一块块独立的硬盘&#xff0c;标记成 "可用于搭建 LVM 的积木"&#xff0c;让系统知道这些硬盘可以被 LVM 管理。#把sdb这块硬盘标记为物理卷&#xff08;相当于给这块积木盖章&#xff0c;说…

向日葵参考基因组

向日葵参考基因组升级多个版本 向日葵基因组为油脂代谢、开花调控及菊类植物进化提供新见解-文献精读151-CSDN博客 官网 https://www.sunflowergenome.org/annotations-data/

什么是爬虫协议?

什么是爬虫协议&#xff1f; 爬虫协议&#xff08;Crawl Protocol&#xff09;是指为了有效地收集网页内容而建立的一些规定和标准&#xff0c;用以指导网络爬虫如何在互联网上抓取信息。 爬虫协议主要指的是Robots协议&#xff08;Robots Exclusion Protocol&#xff09;&am…

空间平面旋转与xoy平行

空间平面旋转与xoy平行 法向量 空间平面axbyczd0的其中一个法向量(a,b,c),法向量垂直于空间平面。目标平面平行于xoy的平面为0x0yczd0;其中一个法向量为(0,0,c),c可以为不为0的任意值&#xff0c;取(0,0,1)&#xff0c;目标平面的的法向量垂直于xoy平面 向量叉乘点乘 两个向量的…

odoo reportbro 拖拽式报表设计

报表设计以及下载 在实际业务中应用非常的广泛且频繁。odoo 本身也具有报表设计功能&#xff0c;但都是代码模式。且需要开发人员定制化开发&#xff0c;耗费成本高 所以引入reportbro报表设计就非常的简单快捷。低代码模式 以下以销售报表为例进行演示 报表字段配置报表界面设…

数字信号处理_编程实例1

stem([1,2,3]) 一、初始设置 %% 初始设置 % 清空工作空间&#xff0c;关闭无关页面 clc,clear,close all; % 绘图变量 font_size 12; %全局基础字体大小 axis_size 10; %坐标轴刻度标签字体大小 line_width 2; %绘图线条宽度 legend_size 10.5; %图例字体大小 marker_siz…

Docker 安装部署 OceanBase

1.拉取镜像 docker pull oceanbase/oceanbase-ce:latest2.启动oceanbase容器 docker run -p 2881:2881 --name oceanbase-ce -e MINI_MODE0 -d quay.io/oceanbase/oceanbase-ce3.查看oceanbase初始化的日志信息 docker logs oceanbase-ce4.进入oceanbase容器 docker exec -it o…

【华为机试】685. 冗余连接 II

文章目录685. 冗余连接 II题目描述示例 1&#xff1a;示例 2&#xff1a;提示&#xff1a;解题思路算法分析核心思想算法策略算法对比问题分类流程图并查集环检测流程入度统计与候选边选择情况分析决策树完整算法流程复杂度分析时间复杂度空间复杂度关键实现技巧1. 并查集优化2…

Redis之Hash和List类型常用命令

Redis之Hash和List类型常用命令一、Hash类型详解1. Hash类型的特点2. 常用命令及示例&#xff08;1&#xff09;设置字段值&#xff08;2&#xff09;获取字段值&#xff08;3&#xff09;删除字段&#xff08;4&#xff09;其他常用命令3. 应用场景二、List类型详解1. List类型…

【测试】⾃动化测试概念篇

本节⽬标&#xff1a;⾃动化测试Web⾃动化测试selenium1. ⾃动化1.1 ⾃动化概念⾃动化在⽣活中处处可⻅&#xff0c;⾃动的代替⼈的⾏为完成操作。⾃动洒⽔机&#xff0c;主要通上⽔就可以⾃动化洒⽔并且可以⾃动的旋转。⾃动洗⼿液&#xff0c;免去了⼿动挤压可以⾃动感应出洗…

Java中给List<T> 对象集合去重

Java中给List 对象集合去重List<Student> getStudentList studentMapper.getStudentList();List<Student> distinctInsurance distinctByField(getStudentList, Student::getCertNo);public static <T> List<T> distinctByField(List<T> list…

最小二乘法MSE

最小二乘法MSEx1x2x3x4x5x6x7x8x0y014805-29-31339-41064-14-2-1481-114-1-65-123-32-21305-23105114-81126-15-15-8-157-4-1221-39511-10-243-9-671-87-1404-35101371422-3-7-2-80-6-5-91-3091前景知识: 矩阵相关公式y(339−11430126−395−87422−309)y\begin{pmatrix} 339&a…

Pixel 4D 3.4.4.0 | 支持丰富的壁纸资源,高清画质,高度的个性化设置能力,智能推荐功能

Pixel 4D是一款功能强大且用户体验良好的动态壁纸应用。它提供了丰富的壁纸资源和高清画质&#xff0c;让用户可以轻松找到自己喜欢的壁纸。此外&#xff0c;该应用还具备高度的个性化设置能力&#xff0c;允许用户根据自己的喜好调整壁纸效果。智能推荐功能则能帮助用户发现更…

<PhotoShop><JavaScript><脚本>基于JavaScript,利用脚本实现PS软件批量替换图片,并转换为智能对象?

前言 PhotoShop软件支持JavaScript脚本,来扩展软件的功能,官方本身也提供了一些常用脚本,如图像处理等,同时也支持自定义的JavaScript脚本。 环境配置 系统:windows 平台:visual studio code 语言:JavaScript 软件:PhotoShop 2022 版本:23.2.1 概述 本文利用Java…

【Linux】System V - 基于建造者模式的信号量

目录 信号量和P、V原语 信号量集结构体 信号量操作接口 semget semctl semop 封装Sem 关于建造者模式 信号量和P、V原语 信号量和 P、V 原语由 Dijkstra &#xff08;迪杰斯特拉&#xff09;提出 信号量值含义 S>0: S 表⽰可⽤资源的个数 S0: 表⽰⽆可⽤资源&a…

机器学习(11):岭回归Ridge

岭回归是失损函数通过添加所有权重的平方和的乘积(L2)来惩罚模型的复杂度。均方差除以2是因为方便求导&#xff0c;w_j指所有的权重系数, λ指惩罚型系数&#xff0c;又叫正则项力度特点:岭回归不会将权重压缩到零&#xff0c;这意味着所有特征都会保留在模型中&#xff0c;但它…

调整Idea缓存目录,释放C盘空间

本文使用 Idea2024 Idea 会将一些配置默认缓存在C盘&#xff0c;使用久了会占用大量空间&#xff08;本人的Idea占用了将近5个G&#xff0c;以至于不得不进行迁移&#xff09; 缓存目录主要涉及以下四个目录&#xff0c;四个目录可以分为两组&#xff0c;每组目录必须一起调整 …