使用 FastAPI 的 WebSockets 和 Elasticsearch 来构建实时应用

作者:来自 Elastic Jeffrey Rengifo

学习如何使用 FastAPI WebSockets 和 Elasticsearch 构建实时应用程序。

更多阅读:使用 FastAPI 构建 Elasticsearch API

想要获得 Elastic 认证吗?看看下一次 Elasticsearch Engineer 培训什么时候开始!

Elasticsearch 拥有许多新功能,可以帮助你为你的使用场景构建最佳搜索解决方案。深入学习我们的示例笔记本,了解更多内容,开始免费的云试用,或者立即在本地机器上尝试 Elastic。


WebSockets 是一种同时双向通信协议。它的理念是客户端和服务器可以保持一个打开的连接,同时互相发送消息,从而尽可能降低延迟。这种方式常见于实时应用,比如聊天、活动通知或交易平台,在这些场景中延迟是关键,并且存在持续的信息交换。

想象一下你创建了一个消息应用,想在用户收到新消息时通知他们。你可以每隔 5 或 10 秒通过发送 HTTP 请求轮询服务器,直到有新消息,或者你可以保持一个 WebSockets 连接,让服务器推送一个事件,客户端监听后在消息到达时立即显示通知标记。

在这种情况下,Elasticsearch 能够在数据集上实现快速而灵活的搜索,使其非常适合需要即时结果的实时应用。

在这篇文章中,我们将使用 FastAPI 的 WebSockets 功能和 Elasticsearch 创建一个实时应用程序。

先决条件

  • Python 版本 3.x
  • 一个 Elasticsearch 实例(自托管或 Elastic Cloud 上)
  • 一个具有写权限的 Elasticsearch API key

本文使用的所有代码可以在这里找到。

使用场景

为了向你展示如何将 WebSockets 与 FastAPI 和 Elasticsearch 一起使用,我们将采用一个使用场景:作为店主的你,想在某个查询被执行时通知所有用户,以吸引他们的注意力。这模拟了搜索驱动应用中的实时互动,比如促销活动或产品兴趣提醒。

在这个使用场景中,我们将构建一个应用,客户可以搜索产品,并在其他用户执行了在监控列表中的搜索时收到通知。

用户 A 搜索 “Kindle”,用户 B 会实时收到通知。

数据摄取

在这一部分,我们将创建索引映射,并使用一个 Python 脚本摄取所需的数据。你可以在博客仓库中找到以下脚本。

摄取脚本

创建一个名为 ingest_data.py 的新文件,其中包含用于处理数据摄取的 Python 逻辑。

安装 Elasticsearch 库以处理对 Elasticsearch 的请求:

pip install elasticsearch -q

现在导入依赖,并使用 API key 和 Elasticsearch 端点 URL 初始化 Elasticsearch 客户端。

import json
import osfrom elasticsearch import Elasticsearches_client = Elasticsearch(hosts=[os.environ["ELASTICSEARCH_ENDPOINT"]],api_key=os.environ["ELASTICSEARCH_API_KEY"],
)

创建一个方法,在名为 “products” 的索引下设置索引映射。

PRODUCTS_INDEX = "products"def create_products_index():try:mapping = {"mappings": {"properties": {"product_name": {"type": "text"},"price": {"type": "float"},"description": {"type": "text"},}}}es_client.indices.create(index=PRODUCTS_INDEX, body=mapping)print(f"Index {PRODUCTS_INDEX} created successfully")except Exception as e:print(f"Error creating index: {e}")

现在使用 bulk API 加载产品文档,将它们推送到 Elasticsearch。数据将位于项目仓库中的 NDJSON 文件中。

def load_products_from_ndjson():try:if not os.path.exists("products.ndjson"):print("Error: products.ndjson file not found!")returnproducts_loaded = 0with open("products.ndjson", "r") as f:for line in f:if line.strip():product_data = json.loads(line.strip())es_client.index(index=PRODUCTS_INDEX, body=product_data)products_loaded += 1print(f"Successfully loaded {products_loaded} products into Elasticsearch")except Exception as e:print(f"Error loading products: {e}")

最后,调用已创建的方法。

if __name__ == "__main__":create_products_index()load_products_from_ndjson()

在终端中使用以下命令运行脚本。

python ingest_data.py

完成后,让我们继续构建应用。

Index products created successfully
Successfully loaded 25 products into Elasticsearch

WebSockets 应用

为了提高可读性,应用的界面将简化。完整的应用仓库可以在这里找到。

该图展示了 WebSocket 应用如何与 Elasticsearch 和多个用户交互的高级概览。

应用结构

|-- websockets_elasticsearch_app
|-- ingest_data.py
|-- index.html
|-- main.py

安装并导入依赖

安装 FastAPI 和 WebSocket 支持。Uvicorn 将作为本地服务器,Pydantic 用于定义数据模型,Elasticsearch 客户端允许脚本连接到集群并发送数据。

pip install websockets fastapi pydantic uvicorn -q

FastAPI 提供了易用、轻量且高性能的工具来构建 web 应用,而 Uvicorn 作为 ASGI 服务器来运行它。Pydantic 在 FastAPI 内部用于数据验证和解析,使定义结构化数据更容易。WebSockets 提供了低级协议支持,使服务器和客户端之间能够实现实时双向通信。之前安装的 Elasticsearch Python 库将在此应用中用于处理数据检索。

现在,导入构建后端所需的库。

import json
import os
import uvicorn
from datetime import datetime
from typing import Dict, Listfrom elasticsearch import Elasticsearch
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field

Elasticsearch 客户端

定义 Elasticsearch 端点和 API key 的环境变量,并实例化一个 Elasticsearch 客户端来处理与 Elasticsearch 集群的连接。

os.environ["ELASTICSEARCH_ENDPOINT"] = getpass("Insert the Elasticsearch endpoint here: "
)
os.environ["ELASTICSEARCH_API_KEY"] = getpass("Insert the Elasticsearch API key here: ")es_client = Elasticsearch(hosts=[os.environ["ELASTICSEARCH_ENDPOINT"]],api_key=os.environ["ELASTICSEARCH_API_KEY"],
)PRODUCTS_INDEX = "products"

数据模型和应用设置

现在是创建 FastAPI 实例的时候了,它将处理 REST API 和 WebSocket 路由。然后,我们将使用 Pydantic 定义几个数据模型。

  • Product 模型描述每个产品的结构。
  • SearchNotification 模型定义我们将发送给其他用户的消息。
  • SearchResponse 模型定义 Elasticsearch 结果的返回方式。

这些模型有助于在整个应用中保持一致性和可读性,并在代码 IDE 中提供数据验证、默认值和自动补全。

app = FastAPI(title="Elasticsearch - FastAPI with websockets")class Product(BaseModel):product_name: strprice: floatdescription: strclass SearchNotification(BaseModel):session_id: strquery: strtimestamp: datetime = Field(default_factory=datetime.now)class SearchResponse(BaseModel):query: strresults: List[Dict]total: int

WebSockets 端点设置

当用户连接到 /ws 端点时,WebSocket 连接会保持打开状态并添加到全局列表中。这允许服务器即时向所有连接的客户端广播消息。如果用户断开连接,他们的连接将被移除。

# Store active WebSocket connections
connections: List[WebSocket] = []@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await websocket.accept()connections.append(websocket)print(f"Client connected. Total connections: {len(connections)}")try:while True:await websocket.receive_text()except WebSocketDisconnect:connections.remove(websocket)print(f"Client disconnected. Total connections: {len(connections)}")

搜索端点

现在让我们查看发生实时交互的代码。

当用户执行搜索时,会查询 Elasticsearch 并返回结果。同时,如果查询在全局监控列表中,所有其他已连接用户会收到通知,提示有人找到了其中的某个产品。通知中包含查询内容。

session_id 参数用于避免将通知发送回发起搜索的用户。

@app.get("/search")
async def search_products(q: str, session_id: str = "unknown"):# List of search terms that should trigger a notificationWATCH_LIST = ["iphone", "kindle"]try:query_body = {"query": {"bool": {"should": [{"match": {"product_name": q}},{"match_phrase": {"description": q}},],"minimum_should_match": 1,}},"size": 20,}response = es_client.search(index=PRODUCTS_INDEX, body=query_body)results = []for hit in response["hits"]["hits"]:product = hit["_source"]product["score"] = hit["_score"]results.append(product)results_count = response["hits"]["total"]["value"]# Only send notification if the search term matchesif q.lower() in WATCH_LIST:notification = SearchNotification(session_id=session_id, query=q, results_count=results_count)for connection in connections.copy():try:await connection.send_text(json.dumps({"type": "search","session_id": session_id,"query": q,"timestamp": notification.timestamp.isoformat(),}))except:connections.remove(connection)return SearchResponse(query=q, results=results, total=results_count)except Exception as e:status_code = getattr(e, "status_code", 500)return HTTPException(status_code=status_code, detail=str(e))

注意:session_id 仅基于当前时间戳以简化处理 —— 在生产环境中,你需要使用更可靠的方法。

客户端

为了展示应用流程,创建一个前端页面,使用简单的 HTML,包括搜索输入框、结果区域和用于通知的对话框。

<!DOCTYPE html>
<html lang="en"><body><h1>🛍️ TechStore - Find Your Perfect Product</h1><form onsubmit="event.preventDefault(); searchProducts();"><p><label for="searchQuery">Search Products:</label><br /><inputtype="text"id="searchQuery"placeholder="Search for phones, laptops, headphones..."size="50"required /><button type="submit">🔍 Search</button></p></form><!-- HTML Dialog for notifications --><dialog id="notificationDialog"><div><h2>🔔 Live Search Activity</h2><p id="notificationMessage"></p><p><button onclick="closeNotification()" autofocus>OK</button></p></div></dialog><div id="searchResults"><h2>Search Results</h2></div><script>...</script></body>
</html>

通知使用了 元素用于演示,但在真实应用中,你可能会使用 toast 或小徽章来显示。在实际场景中,这类通知可用于显示有多少用户正在搜索某些产品、提供库存实时更新,或突出显示返回成功结果的热门搜索查询。

Script 标签

在 标签内,包含将前端连接到后端 WebSocket 端点的逻辑。让我们看看下面的代码片段。

let ws = null;
let sessionId = null;window.onload = function () {sessionId = "session_" + Date.now();connectWebSocket();
};

页面加载时,会生成一个唯一的 session ID 并连接到 WebSocket。

function connectWebSocket() {ws = new WebSocket("ws://localhost:8000/ws");ws.onopen = function () {console.log("Connected to WebSocket");};ws.onmessage = function (event) {try {const notification = JSON.parse(event.data);if (notification.type === "search") {showSearchNotification(notification);}} catch (error) {console.error("Error parsing notification:", error);}};ws.onclose = function () {console.log("Disconnected from WebSocket");};ws.onerror = function (error) {console.error("WebSocket error:", error);};
}

函数 connectWebSocket 使用 ws = new WebSocket("ws://localhost:8000/ws") 建立 WebSocket 连接。语句 ws.onopen 通知后端已创建新连接。然后,ws.onmessage 监听其他用户在商店中搜索时发送的通知。

function showSearchNotification(notification) {// Skip notifications from the same session (same browser window)if (notification.session_id === sessionId) {return;}const dialog = document.getElementById("notificationDialog");const messageElement = document.getElementById("notificationMessage");messageElement.innerHTML = `<p><strong>Hot search alert!</strong> Other users are looking for <em>"${notification.query}"</em> right now.</p>`;// Show the notification dialogdialog.showModal();
}function closeNotification() {const dialog = document.getElementById("notificationDialog");dialog.close();
}

函数 showSearchNotification 在屏幕上显示通过 WebSockets 接收到的通知,而 closeNotification 函数用于关闭 showSearchNotification 显示的消息。

async function searchProducts() {const query = document.getElementById("searchQuery").value.trim();const response = await fetch(`/search?q=${encodeURIComponent(query)}&session_id=${encodeURIComponent(sessionId)}`);const data = await response.json();if (response.ok) {displaySearchResults(data);} else {throw new Error(data.error || "Search failed");}
}function displaySearchResults(data) {const resultsDiv = document.getElementById("searchResults");let html = `<h2>Found ${data.total} products for "${data.query}"</h2>`;data.results.forEach((product) => {html += `<ul><li><strong>${product.product_name}</strong></li><li>💰 $${product.price.toFixed(2)}</li><li>${product.description}</li>
</ul>`;});resultsDiv.innerHTML = html;
}

searchProducts() 函数将用户的查询发送到后端,并通过调用 displaySearchResults 函数更新结果区域中匹配的产品。

渲染视图和主方法

最后,在浏览器访问应用时渲染 HTML 页面并启动服务器。

@app.get("/")
async def get_main_page():return FileResponse("index.html")if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)

运行应用

使用 uvicorn 运行 FastAPI 应用。

uvicorn main:app --host 0.0.0.0 --port 8000

现在应用已上线!

INFO:     Started server process [61820]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

测试应用

访问 localhost:8000/ 渲染应用视图,并观察控制台的情况:

INFO:     127.0.0.1:53422 - "GET / HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 53425) - "WebSocket /ws" [accepted]
Client connected. Total connections: 1
INFO:     connection open

当视图被打开时,服务器会收到一个 WebSocket 连接。每打开一个新页面,都会增加一个连接。例如,如果你在三个不同的浏览器标签中打开页面,你将在控制台看到三个连接:

INFO:     ('127.0.0.1', 53503) - "WebSocket /ws" [accepted]
Client connected. Total connections: 2
INFO:     connection open
INFO:     ('127.0.0.1', 53511) - "WebSocket /ws" [accepted]
Client connected. Total connections: 3
INFO:     connection open

如果关闭一个标签,对应的连接也会关闭:

Client disconnected. Total connections: 2
INFO:     connection closed

当有多个活跃客户端连接时,如果一个用户搜索了某个产品,并且该搜索词在监控列表中,其他已连接的客户端将实时收到通知。

可选步骤是使用 Tailwind 应用一些样式。这可以改善 UI,使其看起来现代且视觉上更吸引人。完整的带有更新 UI 的代码可以在这里找到。

结论

在本文中,我们学习了如何使用 Elasticsearch 和 FastAPI 基于搜索创建实时通知。我们选择了一个固定的产品列表来发送通知,但你可以探索更多自定义流程,让用户选择自己想要接收通知的产品或查询,甚至使用 Elasticsearch 的 percolate 查询根据产品规格配置通知。

我们还尝试了一个接收通知的单用户池。使用 WebSockets,你可以选择向所有用户广播,或者选择特定用户。一个常见的模式是定义用户可以订阅的 “消息组”,就像群聊一样。

原文:Using FastAPI’s WebSockets and Elasticsearch to build a real-time app - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/94163.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/94163.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为云ModelArts+Dify AI:双剑合璧使能AI应用敏捷开发

引言:AI应用开发的敏捷化转型需求 随着大语言模型(LLM)技术的迅猛发展,企业与开发者对AI应用开发的敏捷化转型需求日益凸显,亟需将大模型能力快速转化为实际业务价值。传统AI开发模式中,复杂的模型工程化、流程编排和部署维护工作往往需要专业技术团队支撑,典型痛点包括…

网络实践——Socket编程UDP

文章目录Socket编程UDPUDP接口的使用铺垫socketrecvform && sendtobind字节序转化使用(Tips)实践部分version_1echo_serverversion_2dict_serverversion_3chat_serverSocket编程UDP 在了解了相关的网络基础知识后&#xff0c;我们不会像学系统知识一样&#xff0c;先学…

GD32 波形发生器,三角波,正弦波,方波。AD9833+MCP410生成和MCU自身的DAC生成。波形,频率,振幅可以通过按键和OLED调整输出。

DIY一个简易的信号发生器驱动板&#xff0c;主要是三角波和正弦波&#xff0c;方波。主板有两个通道能输出波形&#xff0c;CH0由AD9833MCP410AD8051放大电路组成&#xff0c;理论可以生成0.1-12.5MHZ的频率信号&#xff0c;单电源振幅范围是1-9V。CH1由MCU外设DAC生成的信号&a…

VS2022的MFC中关联使用控制台并用printf输出调试信息

前言 MFC一般在调试的时候&#xff0c;可以在IDE中方便的看到调试的信息。但是&#xff0c;有时候运行的时候也要看调试的信息怎么办&#xff1f;最好如同在Console&#xff08;控制台&#xff09;程序中输出一般的方便&#xff0c;可以么&#xff1f;可以的。 一、设置 1.1、加…

ZKmall模块商城的推荐数据体系:从多维度采集到高效存储的实践

在电商领域&#xff0c;个性化推荐已成为提升用户体验与转化效率的核心手段。ZKmall 模块商城基于用户行为、商品属性与交易数据&#xff0c;构建了一套完整的推荐算法体系&#xff0c;而数据采集的全面性与存储的高效性是该体系的基础。本文将聚焦推荐算法的 “数据输入端”&a…

Qt + windows+exe+msvc打包教程

目录 1. Qt + windows+exe+msvc打包教程1 1.1. Enigma Virtual Box下载⏬1 1.2. Enigma Virtual Box安装2 1.3. Qt 打包成独立exe教程6 1.3.1. Qt项目创建6 1.3.2. Qt项目编译13 1.3.3. Qt 项目打包 windeployqt命令14 1.3.4. Qt 项目打包 Enigma Virtual Box工具18 Q…

大语言模型应用开发——利用OpenAI函数与LangChain结合从文本构建知识图谱搭建RAG应用全流程

概述 从文本等非结构化数据中提取结构化信息并非新鲜事物&#xff0c;但大语言模型&#xff08;LLMs&#xff09;为该领域带来了重大变革。以往需要机器学习专家团队策划数据集并训练自定义模型&#xff0c;如今只需访问LLM即可实现&#xff0c;显著降低了技术门槛&#xff0c…

Vue3+Spring Boot技术栈,前端提交混合表单数据(普通字段+文件字段),上传文件,后端插入数据,将文件保存到数据库

一、技术栈1、前端 Vue3 Element Plus TypeSprict2、后端 Spring Boot 3.2.12 Mybatis Plus3、模型特点3.1、表格展示列表数据3.2、行点击&#xff0c;弹出对话框3.3、前端使用 FormData 提交混合表单数据&#xff0c;包含普通字段和文件字段3.4、文件对应数据库结构类型为 …

【Qt开发】Qt的背景介绍(四)

目录 1 -> Qt Hello World 程序 1.1 -> 使用“按钮”实现 1.1.1 -> 纯代码方式实现 1.1.2 -> 可视化操作实现 1.2 -> 使用“标签”实现 1.2.1 -> 纯代码方式实现 1.2.2 -> 可视化操作实现 2 -> 项目文件解析 2.1 -> .pro文件解析 2.2 -&g…

Linux驱动开发笔记(六)——pinctrl GPIO

开发板&#xff1a;imx6ull mini 虚拟机&#xff1a;VMware17 ubuntu&#xff1a;ubuntu20.04 视频&#xff1a;第8.1讲 pinctrl和gpio子系统试验-pincrl子系统详解_哔哩哔哩_bilibili 文档&#xff1a;《【正点原子】I.MX6U嵌入式Linux驱动开发指南.pdf》四十五章 这一章…

SpringBoot 快速上手:从环境搭建到 HelloWorld 实战

在 Java 开发领域&#xff0c;Spring 框架占据着举足轻重的地位&#xff0c;但它复杂的配置曾让不少开发者望而却步。SpringBoot 的出现&#xff0c;如同为 Spring 框架装上了 “加速器”&#xff0c;以 “约定大于配置” 的理念简化了开发流程。本文将从环境准备、Maven 配置入…

图、最小生成树与最短路径

目录 并查集 并查集实现 图 概念 图的存储结构 邻接矩阵 邻接表 无向图 有向图 图的遍历 广度优先遍历 深度优先遍历 最小生成树 Kruskal算法&#xff08;克鲁斯卡尔算法&#xff09; Prim算法&#xff08;普利姆算法&#xff09; 最短路径 单源最短路径--Dij…

互联网电商新生态:开源AI智能名片、链动2+1模式与S2B2C商城小程序的融合赋能

摘要&#xff1a;本文聚焦互联网电商领域&#xff0c;探讨在当下直播电商蓬勃发展的背景下&#xff0c;开源AI智能名片、链动21模式以及S2B2C商城小程序如何相互融合&#xff0c;为创业者、企业和淘宝主播IP等电商参与者带来新的发展机遇。通过分析各要素的特点与优势&#xff…

企业车辆|基于SprinBoot+vue的企业车辆管理系统(源码+数据库+文档)

企业车辆管理系统 基于SprinBootvue的企业车辆管理系统 一、前言 二、系统设计 三、系统功能设计 系统功能实现 后台模块实现 管理员模块实现 驾驶员模块实现 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博…

自学嵌入式第二十五天:数据结构-队列、树

一、队列队列是只允许一段进行插入&#xff0c;另一端进行删除操作的线性表&#xff1b;允许插入的一端叫队尾&#xff0c;允许删除的一端叫对头&#xff1b;先进先出&#xff1b;用于解决速度不匹配&#xff08;例如一快一慢&#xff09;&#xff0c;做缓冲用&#xff1b;二、…

MySQL索引原理与优化全解析

1、MySQL索引是什么&#xff1f; 在关系数据库中&#xff0c;索引是一种单独的、物理的对数据库表中一列或多列的值进行排序的一种存储结构&#xff0c;它是某个表中一列或若干列值的集合和相应的指向表中物理标志这些值的数据页的逻辑指针清单。索引的作用相当于图书的目录&a…

模型对话状态管理方法详解

模型对话状态管理方法详解 目录 简介手动管理对话状态构建对话历史追加响应内容 API 支持的自动化对话状态管理使用 previous_response_id 链接话轮 Token 及上下文窗口管理上下文窗口定义与限制Token 计数与工具 安全与合规注意事项结语1. 简介 在多轮对话场景中&#xff0c;合…

GPT-5 上线风波深度复盘:从口碑两极到策略调整,OpenAI 的变与不变

摘要&#xff1a; 近日&#xff0c;备受瞩目的 GPT-5 正式上线&#xff0c;却意外地在社区引发了两极化争议。面对技术故障与用户质疑&#xff0c;OpenAI 迅速推出一系列补救措施。本文将深度复盘此次发布风波&#xff0c;解析其背后的技术挑战与应对策略&#xff0c;并探讨这一…

【Android】使用FragmentManager动态添加片段

三三要成为安卓糕手 上一篇文章&#xff0c;我们是在xml中静态添加fragment&#xff0c;但是一些修改或者其他事情是做不了的&#xff1b; 本章我们达成在java代码中灵活添加、删除、替换fragment操作 一&#xff1a;核心代码展示 简单做一个这种页面public class FragmentActi…

MiniOB环境部署开发(使用开源学堂)

整体思路&#xff1a; 1.使用开源学堂在线编程环境开发MiniOB编译环境 2.使用vscode进行代码调试和开发以及上传到仓库 MiniOB源码&#xff1a;https://github.com/oceanbase/miniob MiniOB文档&#xff1a;MiniOB 介绍 - MiniOB 数据库大赛官网&#xff1a;OceanBase 社区…