一个简单的分布式追踪系统

1. 准备工作

导入必要的库

import contextvars
import time
from typing import Any, Optional, Dict, List, Union
from dataclasses import dataclass, field

2. 定义上下文变量

# 定义两个上下文变量,存储当前 Span 和 Trace
_current_span: contextvars.ContextVar[Optional["Span"]] = contextvars.ContextVar("current_span", default=None
)_current_trace: contextvars.ContextVar[Optional["Trace"]] = contextvars.ContextVar("current_trace", default=None
)

3. 数据模型定义

3.1 SpanContext 类

@dataclass
class SpanContext:"""Span 的上下文信息(用于跨进程传递)"""trace_id: strspan_id: stris_remote: bool = False

3.2 Span 类

@dataclass
class Span:"""表示一个操作的时间段追踪"""name: strcontext: SpanContextparent: Optional["Span"] = Nonestart_time: float = field(default_factory=time.time)end_time: Optional[float] = Noneattributes: Dict[str, Any] = field(default_factory=dict)events: List[Dict[str, Any]] = field(default_factory=list)status: str = "UNSET"def end(self, status: str = "OK") -> None:"""结束 Span 并记录状态"""self.end_time = time.time()self.status = statusdef add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None:"""添加事件到 Span"""self.events.append({"name": name,"timestamp": time.time(),"attributes": attributes or {}})def __enter__(self) -> "Span":"""支持 with 语句"""return selfdef __exit__(self, exc_type, exc_val, exc_tb) -> None:"""自动结束 Span"""self.end("ERROR" if exc_type else "OK")

3.3 Trace 类

@dataclass
class Trace:"""完整的追踪链"""root_span: Spanspans: List[Span] = field(default_factory=list)def add_span(self, span: Span) -> None:"""添加 Span 到 Trace"""self.spans.append(span)

4. 追踪 API 实现

4.1 辅助函数

def generate_id() -> str:"""生成追踪ID(简化版)"""return f"id-{int(time.time() * 1000)}"def get_current_span() -> Optional[Span]:"""获取当前 Span"""return _current_span.get()def get_current_trace() -> Optional[Trace]:"""获取当前 Trace"""return _current_trace.get()

4.2 核心函数

def start_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Span:"""创建并激活一个新 Span:param name: Span 名称:param attributes: 附加属性:return: 新创建的 Span"""parent = get_current_span()context = SpanContext(trace_id=parent.context.trace_id if parent else generate_id(),span_id=generate_id())span = Span(name=name, context=context, parent=parent)if attributes:span.attributes.update(attributes)# 设置当前 Span_current_span.set(span)# 如果是根 Span,则创建 Traceif parent is None:trace = Trace(root_span=span)_current_trace.set(trace)else:trace = get_current_trace()if trace:trace.add_span(span)return spandef end_span(status: str = "OK") -> None:"""结束当前 Span 并返回父 Span"""current = get_current_span()if current:current.end(status)_current_span.set(current.parent)

5. 数据导出器

class ConsoleExporter:"""将追踪数据打印到控制台"""@staticmethoddef export(trace: Trace) -> None:print("\n=== Exporting Trace ===")print(f"Trace ID: {trace.root_span.context.trace_id}")for span in trace.spans:duration = (span.end_time or time.time()) - span.start_timeprint(f"Span: {span.name} ({duration:.3f}s), Status: {span.status}")

6. 使用示例

6.1 同步代码示例

# 示例 1: 同步代码
with start_span("main_operation", {"type": "sync"}):# 当前 Span 是 "main_operation"with start_span("child_operation"):# 当前 Span 是 "child_operation"get_current_span().add_event("processing_start")time.sleep(0.1)get_current_span().add_event("processing_end")# 手动创建 Spanspan = start_span("manual_span")time.sleep(0.05)span.end()# 导出追踪数据
if trace := get_current_trace():ConsoleExporter.export(trace)

=== Exporting Trace ===
Trace ID: id-1751643441896
Span: main_operation (0.152s), Status: OK
Span: child_operation (0.101s), Status: OK
Span: manual_span (0.050s), Status: OK

6.2 异步代码示例(可选)

import asyncioasync def async_task():with start_span("async_operation"):print(f"Current span: {get_current_span().name}")await asyncio.sleep(0.1)async def main():tasks = [async_task() for _ in range(3)]await asyncio.gather(*tasks)# 运行异步示例
asyncio.run(main())

Current span: async_operation
Current span: async_operation
Current span: async_operation

7. 可视化追踪数据(可选)

import matplotlib.pyplot as pltdef visualize_trace(trace: Trace):fig, ax = plt.subplots(figsize=(10, 6))for i, span in enumerate(trace.spans):duration = (span.end_time or time.time()) - span.start_timeax.barh(span.name, duration, left=span.start_time, alpha=0.6)ax.text(span.start_time, i, f"{duration:.3f}s", va='center')ax.set_xlabel('Time')ax.set_title('Trace Visualization')plt.show()if trace := get_current_trace():visualize_trace(trace)

在这里插入图片描述
代码:https://github.com/zhouruiliangxian/Awesome-demo/blob/main/Distributed-Tracing/%E7%AE%80%E6%98%93%E5%88%86%E5%B8%83%E5%BC%8F%E8%BF%BD%E8%B8%AA%E7%B3%BB%E7%BB%9F.ipynb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913114.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913114.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Qt】事件处理、事件分发器、事件过滤器

事件处理 一. 事件事件处理鼠标事件处理按键事件处理定时器事件处理窗口事件处理 二. 事件分发器三. 事件过滤器 虽然 Qt 是跨平台的 C 开发框架,Qt 的很多能力其实是操作系统提供的,只不过 Qt 封装了系统 API,程序是运行在操作系统上的&…

广东省省考备考(第三十八天7.4)——言语理解:逻辑填空(题目训练)

错题解析 本题可从第二空入手,横线处搭配“理论”,且根据“使得”可知,横线处与前文构成因果关系,即“遗传学的空白和古生物证据的缺乏”导致他的理论在某些方面存在不足,A项“捉襟见肘”指拉一拉衣襟,就露…

5G网络切片技术

5G中的网络切片技术是一种通过虚拟化将单一物理网络划分为多个独立、可定制的虚拟网络的技术,旨在满足不同应用场景对网络性能、带宽、时延等需求的差异化要求。以下从技术原理、核心价值、应用场景、实现方式及未来趋势五个维度展开分析:一、技术原理&a…

算法学习笔记:7.Dijkstra 算法——从原理到实战,涵盖 LeetCode 与考研 408 例题

在计算机科学领域,图论算法一直占据着重要地位,其中 Dijkstra 算法作为求解单源最短路径问题的经典算法,被广泛应用于路径规划、网络路由等多个场景。无论是算法竞赛、实际项目开发,还是计算机考研 408 的备考,Dijkstr…

汇编 函数调用栈

前言 网上很多对函数栈的解释,说的不是很清楚感觉,尤其是对到底是谁的栈,以及指令的微小但是很致命的细节没说,特写本文,一是帮助自己记忆,二是为了帮助大家,如有疏忽错误请指正。 核心概念 首先…

基于Apache MINA SSHD配置及应用

Apache MINA SSHD 是一个基于 Java 的 SSH 服务器和客户端实现,它是 Apache MINA 项目的一部分,提供了完整的 SSH 协议支持。 主要特性 SSH 协议支持: 支持 SSH2 协议 兼容大多数 SSH 客户端 支持多种加密算法和密钥交换方法 服务器功能…

Excel 如何让数据自动按要求排序或筛选?

让数据按要求排序和筛选是Excel数据处理的基础核心功能,也是进行有效分析前必做的准备工作。下面我们分开讲解这两个功能。 一、排序 (Sort):让数据井井有条 排序的目的是重新排列数据行的顺序,以便更好地观察和比较。 1. 快速单列排序 (最…

Django 安装使用教程

一、Django 简介 Django 是一个高级 Python Web 框架,鼓励快速开发和简洁实用的设计。它内置 ORM、认证系统、后台管理、表单处理、路由控制等功能,广泛用于开发企业级网站、内容管理系统、电商平台等。 二、环境准备 2.1 安装 Python Django 基于 Py…

前沿交叉:Fluent与深度学习驱动的流体力学计算体系

基础模块 流体力学方程求解 1、不可压缩N-S方程数值解法(有限差分/有限元/伪谱法) Fluent工业级应用:稳态/瞬态流、两相流仿真(圆柱绕流、入水问题) Tecplot流场可视化与数据导出 2、CFD数据的AI预处理 基于P…

五、Flutter动画

目录1. Flutter 中动画的基本概念是什么?2. 解释 AnimationController 和 Tween 的作用3. 如何实现一个补间(Tween)动画?4. 什么是隐式动画?举例说明5. 如何实现自定义复杂动画?1. Flutter 中动画的基本概念…

全网唯一/Qt结合ffmpeg实现手机端采集摄像头推流到rtsp或rtmp/可切换前置后置摄像头/指定分辨率帧率

一、前言说明 之前已经实现了Qt结合ffmpeg在安卓上运行,所有在win上的功能,在安卓上都已经实现,比如编码保存到MP4文件,正常解码音视频文件播放等,唯独还差一个功能,尽管用的不多,但是还是有一…

Install Ubuntu 24.04 System

1.制作安装镜像盘(U盘) 下载rufus制作工具(网址:https://www.xiaomoxz.com/nexus/bi1/rufus4.shtml?bd_vid8643969197265870719) 2. 设置U盘启动: F2进入BIOS 3. Install Ubuntu 24.04 Ubuntu下载地址:…

solidjs 处理复杂类型的响应式

solidjs 处理复杂类型的响应式 在 solidjs 里响应式一般直接用 createSignal 就可以,但 createSignal 一般用于基础数据类型。 虽然复杂类型也是可以使用,但基于起细粒度响应性的特性。 一般复杂的数据使用 createSignal 就不是那么友好了。 所以 cre…

爬虫技术-获取浏览器身份认证信息(如 Cookie、Token、Session 等)

方法一:通过浏览器开发者工具查看和提取 Cookie / Token 📌 示例场景: 你在使用一个网站时已经登录了,想看看这个网站是如何保存你的身份凭证的。 🔧 操作过程: 打开浏览器(例如 Chrome&#xf…

[密码学实战]GMT 0136-2024《密码应用HTTP接口规范》解析

[密码学实战]GM/T 0136-2024《密码应用HTTP接口规范》解析国家密码管理局于2025年7月1日正式实施GM/T 0136-2024标准,该规范首次统一了密码服务的HTTP接口设计,为国产密码技术的规模化应用铺平道路。本文结合标准原文,深入剖析其技术细节并给…

Docker 国内镜像列表(免费长期)

Docker 可用镜像源列表(7月1日更新-长期维护)_dockerhub国内镜像源列表-CSDN博客

BlenderFBXExporter 导出fbx被修改问题

1) 解决增加A节点的问题 https://github.com/A-Ribeiro/CustomBlenderFBXExporter 2)找出blendshape 不一致,生成blendshape key name映射map 文件compare.txt C:\Users\49938\Documents\DazToUnreal\zhang01\UpdatedFBX\zhang01_fix7.fbx…

AI时代下的IT服务管理转型:趋势、挑战与破局之道

近年来,人工智能(AI)与自动化技术的迅猛发展,正以前所未有的速度重塑企业运营的各个层面。特别是在IT服务管理(ITSM)领域,AI的介入不仅提高了问题响应效率,也推动了组织从“被动响应…

三体融合实战:Django+讯飞星火+Colossal-AI的企业级AI系统架构

目录 技术栈关键词:Django 5.0 讯飞星火4.0Ultra Colossal-AI 1.2 WebSocket 联邦学习 ⚡ 核心架构设计 🛠️ 一、Django深度集成讯飞星火API(免费版) 1. 获取API凭证 2. 流式通信改造(解决高并发阻塞&#xff09…

多模态数据融合预警:从IoT传感器到卫星监测的可视化方案升级

你有没有想过,为什么有些城市在暴雨来临时能提前数小时发布内涝预警,而有些地方却只能“等水来了才反应”? 背后的关键,就是多模态数据融合预警系统——它把来自IoT传感器、无人机、地面雷达、气象站、甚至卫星的数据整合在一起&a…