在 Dify 项目中的 Celery:异步任务的实现与集成

Celery 是一个强大而灵活的分布式任务队列系统,旨在帮助应用程序在后台异步运行耗时的任务,提高系统的响应速度和性能。在 Dify 项目中,Celery 被广泛用于处理异步任务和定时任务,并与其他工具(如 Sentry、OpenTelemetry)集成,实现了任务的监控和追踪。

本文将详细介绍 Celery 在 Dify 项目中的应用,包括其集成方式、使用方法,以及涉及的关键文件和代码片段。


概述

Dify 项目采用 Flask 作为 Web 框架,为了提升系统性能和用户体验,引入了 Celery 来处理耗时的后台任务。通过将任务分配到不同的队列,并使用 Celery 的 Worker 进行异步执行,Dify 实现了任务的解耦和并发处理。同时,项目还集成了 SentryOpenTelemetry,对任务执行进行实时监控和性能追踪。


Celery 的集成与配置

1. 应用工厂 app_factory.py

文件路径:

  • ./api/app_factory.py

内容分析:

# 将 Celery 扩展导入应用工厂
ext_celery,

在 Flask 应用工厂中,Celery 被作为扩展(ext_celery)引入。这意味着在创建 Flask 应用实例时,Celery 也会被初始化并与应用集成。这种方式常用于 Flask 框架,便于统一管理应用的各个部分。


2. Celery 扩展 ext_celery.py

文件路径:

  • ./api/extensions/ext_celery.py

内容分析:

from celery import Celery, Task
from celery.schedules import crontabdef init_celery_app(app):celery_app = Celery(app.import_name)# 更新 Celery 配置celery_app.conf.update(broker_url=app.config['CELERY_BROKER_URL'],result_backend=app.config['CELERY_RESULT_BACKEND'],task_serializer='json',accept_content=['json'],timezone='UTC',enable_utc=True,)# 注册定时任务和导入任务模块celery_app.conf.update(beat_schedule=beat_schedule,imports=imports,)# 将 Celery 实例注册到 Flask 应用app.extensions["celery"] = celery_appreturn celery_app

解释:

  • 初始化 Celery 应用实例:使用 Celery(app.import_name) 创建 Celery 实例。
  • 配置更新:通过 celery_app.conf.update() 设置消息代理、结果后端、任务序列化等配置。
  • 注册到应用扩展:将 Celery 实例添加到 Flask 应用的扩展中,便于全局访问和使用。

3. 配置文件 .envpyproject.toml

文件路径:

  • ./docker/.env
  • ./api/pyproject.toml

内容分析:

# .env 配置
# 使用 Redis 作为 Celery 的消息代理,数据库索引为 1
CELERY_BROKER_URL=redis://localhost:6379/1
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# pyproject.toml 中的依赖
"celery~=5.5.2",
"opentelemetry-instrumentation-celery==0.48b0",

解释:

  • 消息代理与结果后端:在 .env 文件中,指定了 Celery 使用 Redis 作为消息代理和结果后端。
  • 依赖管理:在 pyproject.toml 中,明确了项目对 Celery 及其相关监控工具的依赖,确保了环境的一致性。

任务的定义与处理

1. 任务定义文件 tasks/

文件路径:

  • ./api/tasks/

内容分析:

from celery import shared_task@shared_task
def some_task(args):# 任务的具体实现pass

解释:

  • 使用 shared_task 装饰器:在任务模块中,使用 @shared_task 装饰器定义任务。这种方式无需直接引用 Celery 应用实例,避免了循环导入的问题,提高了模块的独立性和可重用性。
  • 任务类型:任务包括数据处理、邮件发送、操作追踪等,满足项目的多种业务需求。

2. 定时任务 schedule/

文件路径:

  • ./api/schedule/

内容分析:

@app.celery.task(queue="dataset")
def scheduled_task():# 定时任务的逻辑实现pass

解释:

  • 使用 @app.celery.task 装饰器:在定时任务中,直接引用了 Celery 应用实例,便于指定任务的队列和其他配置。
  • 任务调度:定时任务通过 Celery Beat 进行调度,按照预定义的时间间隔自动执行。

3. 操作追踪管理器 ops_trace_manager.py

文件路径:

  • ./api/core/ops/ops_trace_manager.py

内容分析:

def send_to_celery(self, tasks: list[TraceTask]):# 将任务发送到 Celery 队列进行异步处理pass

解释:

  • 异步处理任务:定义了方法,将追踪任务发送到 Celery,利用其异步处理能力,提高系统的性能。

Celery 的启动与运行

1. 启动脚本 entrypoint.sh

文件路径:

  • ./api/docker/entrypoint.sh

内容分析:

# 启动 Celery Worker
exec celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \--loglevel ${LOG_LEVEL:-INFO} --queues ${CELERY_QUEUES:-default}# 启动 Celery Beat(定时任务调度器)
exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}

解释:

  • 启动 Celery Worker:使用 celery -A app.celery worker 命令,指定应用实例为 app.celery,并使用 gevent 并发池,提高异步任务的执行效率。
  • 启动 Celery Beat:使用 celery -A app.celery beat 命令,启动定时任务调度器,按计划执行定时任务。

2. 启动命令示例 README.md

文件路径:

  • ./api/README.md

内容分析:

uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion

解释:

  • 指定任务队列:使用 -Q 参数,指定 Worker 监听的任务队列,如 datasetgenerationmail 等,实现任务的分类处理和资源优化。
  • 设置并发和日志-P gevent 指定并发池,-c 1 设置并发数量,--loglevel INFO 设置日志级别为 INFO,便于监控任务执行情况。

与其他工具的集成

1. Sentry 集成 ext_sentry.py

文件路径:

  • ./api/extensions/ext_sentry.py

内容分析:

from sentry_sdk.integrations.celery import CeleryIntegrationdef init_sentry():sentry_sdk.init(dsn="your_sentry_dsn",integrations=[CeleryIntegration()],# 其他配置)

解释:

  • 错误监控:通过集成 Sentry,Celery 任务执行中的异常将被捕获并发送到 Sentry,方便开发者及时发现和解决问题。

2. OpenTelemetry 集成 ext_otel.py

文件路径:

  • ./api/extensions/ext_otel.py

内容分析:

from opentelemetry.instrumentation.celery import CeleryInstrumentordef init_tracing():# 判断是否为 Celery Worker 进程if not is_celery_worker():CeleryInstrumentor().instrument()else:# 在 Celery Worker 初始化时进行 Instrumentationworker_init.connect(init_celery_worker)def init_celery_worker(*args, **kwargs):CeleryInstrumentor().instrument()

解释:

  • 性能监控:通过 OpenTelemetry,对 Celery 任务的执行进行性能监控和分布式追踪,助力分析系统的瓶颈和优化方向。

数据库支持与依赖管理

1. 数据库迁移 64b051264f32_init.py

文件路径:

  • ./api/migrations/versions/64b051264f32_init.py

内容分析:

op.create_table('celery_taskmeta', ...)
op.create_table('celery_tasksetmeta', ...)# 删除表
op.drop_table('celery_tasksetmeta')
op.drop_table('celery_taskmeta')

解释:

  • 任务状态存储:通过 Alembic 迁移,创建 Celery 用于存储任务元数据和结果的数据库表,实现任务状态的持久化管理。

2. 依赖管理 uv.lock

文件路径:

  • ./api/uv.lock

内容分析:

name = "celery"
version = "5.5.2"
...name = "opentelemetry-instrumentation-celery"
version = "0.48b0"
...

解释:

  • 锁定依赖uv.lock 文件记录了项目的依赖库和版本信息,确保了在不同环境下安装一致的依赖,防止版本冲突。

开发与调试支持

1. VSCode 调试配置 launch.json.example

文件路径:

  • ./api/.vscode/launch.json.example

内容分析:

{"name": "Celery Worker","type": "python","request": "launch","module": "celery","args": ["worker","-A","app.celery","--loglevel=INFO"],"console": "integratedTerminal"
}

解释:

  • 调试支持:提供了 VSCode 的调试配置,方便开发人员在 IDE 中对 Celery Worker 进行调试和测试。

2. 开发脚本 start-worker

文件路径:

  • ./dev/start-worker

内容分析:

#!/bin/bash
# 启动开发环境下的 Celery Worker
celery -A app.celery worker --loglevel=INFO

解释:

  • 快速启动:提供了脚本,简化了开发环境下 Celery Worker 的启动命令,提高了开发效率。

实践建议

  • 任务队列划分:根据任务的性质和资源需求,将任务分配到不同的队列,合理配置 Worker,提高系统的性能和可靠性。
  • 监控与日志:利用 Sentry 和 OpenTelemetry,对任务的执行状态和性能进行实时监控,及时发现潜在问题。
  • 资源优化:根据任务类型(I/O 密集型或 CPU 密集型),选择合适的并发模式(如 geventeventletprefork),优化资源利用率。
  • 依赖管理:通过 pyproject.tomluv.lock 等文件,明确项目的依赖版本,确保环境的一致性。
  • 安全性考虑:在配置文件和环境变量中,注意保护敏感信息,如数据库连接字符串和 Sentry 的 DSN,避免泄漏。

总结

Celery 在 Dify 项目中扮演了关键角色,通过处理异步任务和定时任务,提升了系统的性能和用户体验。通过与 Flask 应用的深度集成,以及与 Sentry、OpenTelemetry 等工具的结合,Dify 实现了对任务的高效管理和监控。

通过对 Celery 在项目中的集成方式、任务定义、启动运行和监控手段的了解,我们可以更好地理解其运作原理,并在实际开发中应用这些经验,提高系统的稳健性和可维护性。


参考资料:

  • Celery 官方文档
  • Flask 与 Celery 的集成
  • Sentry 对 Celery 的支持
  • OpenTelemetry 对 Celery 的支持

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/908053.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pytorch Geometric官方例程pytorch_geometric/examples/link_pred.py环境安装教程及图数据集制作

最近需要训练图卷积神经网络(Graph Convolution Neural Network, GCNN),在配置GCNN环境上总结了一些经验。 我觉得对于初学者而言,图神经网络的训练会有2个难点: ①环境配置 ②数据集制作 一、环境配置 我最初光想…

2025年微信小程序开发:AR/VR与电商的最新案例

引言 微信小程序自2017年推出以来,已成为中国移动互联网生态的核心组成部分。根据最新数据,截至2025年,微信小程序的日活跃用户超过4.5亿,总数超过430万,覆盖电商、社交、线下服务等多个领域(WeChat Mini …

互联网向左,区块链向右

2008年,中本聪首次提出了比特币的设想,这打开了去中心化的大门。 比特币白皮书清晰的描述了去中心化支付的解决方案,并分别从以下几个方面阐述了他的理念: 一、由转账双方点对点的通讯,而不通过中心化的第三方&#xf…

PV操作的C++代码示例讲解

文章目录 一、PV操作基本概念(一)信号量(二)P操作(三)V操作 二、PV操作的意义三、C中实现PV操作的方法(一)使用信号量实现PV操作代码解释: (二)使…

《对象创建的秘密:Java 内存布局、逃逸分析与 TLAB 优化详解》

大家好呀!今天我们来聊聊Java世界里那些"看不见摸不着"但又超级重要的东西——对象在内存里是怎么"住"的,以及JVM这个"超级管家"是怎么帮我们优化管理的。放心,我会用最接地气的方式讲解,保证连小学…

简单实现Ajax基础应用

Ajax不是一种技术,而是一个编程概念。HTML 和 CSS 可以组合使用来标记和设置信息样式。JavaScript 可以修改网页以动态显示,并允许用户与新信息进行交互。内置的 XMLHttpRequest 对象用于在网页上执行 Ajax,允许网站将内容加载到屏幕上而无需…

详解开漏输出和推挽输出

开漏输出和推挽输出 以上是 GPIO 配置为输出时的内部示意图,我们要关注的其实就是这两个 MOS 管的开关状态,可以组合出四种状态: 两个 MOS 管都关闭时,输出处于一个浮空状态,此时他对其他点的电阻是无穷大的&#xff…

Matlab实现LSTM-SVM回归预测,作者:机器学习之心

Matlab实现LSTM-SVM回归预测,作者:机器学习之心 目录 Matlab实现LSTM-SVM回归预测,作者:机器学习之心效果一览基本介绍程序设计参考资料 效果一览 基本介绍 代码主要功能 该代码实现了一个LSTM-SVM回归预测模型,核心流…

Leetcode - 周赛 452

目录 一,3566. 等积子集的划分方案二,3567. 子矩阵的最小绝对差三,3568. 清理教室的最少移动四,3569. 分割数组后不同质数的最大数目 一,3566. 等积子集的划分方案 题目列表 本题有两种做法,dfs 选或不选…

【FAQ】HarmonyOS SDK 闭源开放能力 —Account Kit(5)

1.问题描述: 集成华为一键登录的LoginWithHuaweiIDButton, 但是Button默认名字叫 “华为账号一键登录”,太长无法显示,能否简写成“一键登录”与其他端一致? 解决方案: 问题分两个场景: 一、…

Asp.Net Core SignalR的分布式部署

文章目录 前言一、核心二、解决方案架构三、实现方案1.使用 Azure SignalR Service2.Redis Backplane(Redis 背板方案)3.负载均衡配置粘性会话要求无粘性会话方案(仅WebSockets)完整部署示例(Redis Docker)性能优化技…

L2-054 三点共线 - java

L2-054 三点共线 语言时间限制内存限制代码长度限制栈限制Java (javac)2600 ms512 MB16KB8192 KBPython (python3)2000 ms256 MB16KB8192 KB其他编译器2000 ms64 MB16KB8192 KB 题目描述: 给定平面上 n n n 个点的坐标 ( x _ i , y _ i ) ( i 1 , ⋯ , n ) (x\_i…

【 java 基础知识 第一篇 】

目录 1.概念 1.1.java的特定有哪些? 1.2.java有哪些优势哪些劣势? 1.3.java为什么可以跨平台? 1.4JVM,JDK,JRE它们有什么区别? 1.5.编译型语言与解释型语言的区别? 2.数据类型 2.1.long与int类型可以互转吗&…

高效背诵英语四级范文

以下是结合认知科学和实战验证的 ​​高效背诵英语作文五步法​​,助你在30分钟内牢固记忆一篇作文,特别适配考前冲刺场景: 📝 ​​一、解构作文(5分钟)​​ ​​拆解逻辑框架​​ 用荧光笔标出&#xff…

RHEL7安装教程

RHEL7安装教程 下载RHEL7镜像 通过网盘分享的文件:RHEL 7.zip 链接: https://pan.baidu.com/s/1ExLhdJigj-tcrHJxIca5XA?pwdjrrj 提取码: jrrj --来自百度网盘超级会员v6的分享安装 1.打开VMware,新建虚拟机,选择自定义然后下一步 2.点击…

结构型设计模式之Decorator(装饰器)

结构型设计模式之Decorator(装饰器) 前言: 本案例通过李四举例,不改变源代码的情况下 对“才艺”进行增强。 摘要: 摘要: 装饰器模式是一种结构型设计模式,允许动态地为对象添加功能而不改变其…

Kotlin委托机制使用方式和原理

目录 类委托属性委托简单的实现属性委托Kotlin标准库中提供的几个委托延迟属性LazyLazy委托参数可观察属性Observable委托vetoable委托属性储存在Map中 实践方式双击back退出Fragment/Activity传参ViewBinding和委托 类委托 类委托有点类似于Java中的代理模式 interface Base…

SpringBoot接入Kimi实践记录轻松上手

kimi简单使用 什么是Kimi API 官网:https://platform.moonshot.cn/ Kimi API 并不是一个我所熟知的广泛通用的术语。我的推测是,你可能想问的是关于 API 的一些基础知识。API(Application Programming Interface,应用程序编程接…

书籍在其他数都出现k次的数组中找到只出现一次的数(7)0603

题目 给定一个整型数组arr和一个大于1的整数k。已知arr中只有1个数出现了1次,其他的数都出现了k次,请返回只出现了1次的数。 解答: 对此题进行思路转换,可以将此题,转换成k进制数。 k进制的两个数c和d,…

React 项目初始化与搭建指南

React 项目初始化有多种方式,可以选择已有的脚手架工具快速创建项目,也可以自定义项目结构并使用构建工具实现项目的构建打包流程。 1. 脚手架方案 1.1. Vite 通过 Vite 创建 React 项目非常简单,只需一行命令即可完成。Vite 的工程初始化…