Celery 核心概念详解及示例

Celery 核心概念详解及示例

Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,提供对任务队列的操作,并支持任务的调度和异步执行。它常用于深度优化 Web 应用的性能和响应速度,通过将耗时的操作移到后台异步执行,使主程序不被阻塞。

本文将统一介绍 Celery 中的核心概念,并通过示例进行说明,帮助您深入理解并应用 Celery 来构建高性能的异步任务处理系统。


目录

  • Celery 核心概念详解及示例
    • 任务(Task)
      • 示例
    • 工作者(Worker)
      • 启动 Worker
    • 消息代理(Broker)
      • 配置示例
    • 客户端(Client)
      • 任务调用示例
    • 结果后端(Result Backend)
      • 配置示例
      • 获取任务结果
    • 任务队列(Queue)
      • 定义队列中的任务
      • 启动 Worker 监听指定队列
    • 任务路由(Task Routing)
      • 配置路由
    • 并发(Concurrency)
      • 启动 Worker 指定并发方式
    • 定时任务(Periodic Tasks)
      • 配置定时任务
      • 启动 Celery Beat
    • 任务重试与超时(Task Retry and Timeout)
      • 设置任务重试
      • 设置任务超时
    • 任务组与工作流(Task Groups and Workflows)
      • 组任务(Group)
      • 链任务(Chain)
    • 示例项目结构
      • tasks.py
      • app.py
      • worker.sh
    • 总结
    • 参考资料


任务(Task)

任务(Task) 是 Celery 的基本执行单元,表示要执行的具体函数或操作。在 Celery 中,任务通常由装饰器 @task@shared_task 声明。

示例

# tasks.py
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
  • 说明
    • 使用 @app.task 装饰器将函数 add 声明为一个 Celery 任务。
    • 任务可以被工作者执行,并可以异步调用。

工作者(Worker)

工作者(Worker) 是实际执行任务的进程或节点。工作者从消息代理中获取任务,执行任务函数,并返回结果。

启动 Worker

celery -A tasks worker --loglevel=INFO
  • 说明
    • -A tasks:指定 Celery 应用实例的位置。
    • worker:启动一个工作者进程。
    • --loglevel=INFO:设置日志级别。

消息代理(Broker)

消息代理(Broker) 是任务队列的中介,负责接收来自客户端的任务,并将任务分发给工作者。常用的消息代理包括 RedisRabbitMQAmazon SQS 等。

配置示例

# 使用 Redis 作为消息代理
app = Celery('tasks', broker='redis://localhost:6379/0')
  • 说明
    • broker 参数指定消息代理的连接 URL。

客户端(Client)

客户端(Client) 是发送任务到消息代理的应用程序。它可以是 Web 应用、脚本等,使用 Celery 的 API 将任务异步地发送到 Broker。

任务调用示例

# 调用加法任务
from tasks import addresult = add.delay(4, 6)
  • 说明
    • 使用 delay() 方法将任务发送到消息代理,立即返回一个 AsyncResult 对象。
    • 任务将在后台执行,客户端无需等待任务完成。

结果后端(Result Backend)

结果后端(Result Backend) 用于存储任务执行的结果,允许客户端在任务执行完成后获取结果。常用的结果后端包括 RedisRabbitMQ数据库 等。

配置示例

# 使用 Redis 作为结果后端
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

获取任务结果

# 获取任务结果
result = add.delay(4, 6)
print(result.get(timeout=10))  # 输出 10
  • 说明
    • result.get(timeout=10):等待任务完成并获取结果,超时设置为 10 秒。

任务队列(Queue)

任务队列(Queue) 是存储任务的队列,允许将任务分类、路由和分配。队列的使用可以实现任务的隔离和资源的优化。

定义队列中的任务

@app.task(queue='priority_high')
def high_priority_task():pass

启动 Worker 监听指定队列

celery -A tasks worker -Q priority_high --loglevel=INFO
  • 说明
    • -Q priority_high:指定 Worker 只监听名为 priority_high 的队列。

任务路由(Task Routing)

任务路由(Task Routing) 用于将任务分配到特定的队列,可以在任务定义、配置文件或运行时指定。

配置路由

# 配置路由规则
app.conf.task_routes = {'tasks.add': {'queue': 'math_queue'},'tasks.mul': {'queue': 'math_queue'},'tasks.email': {'queue': 'email_queue'},
}
  • 说明
    • addmul 任务路由到 math_queue 队列,email 任务路由到 email_queue 队列。

并发(Concurrency)

Celery 支持多种并发方式,包括多进程、多线程和协程。常用的并发池有 prefork(默认,多进程)、eventlet(协程)、gevent(协程)。

启动 Worker 指定并发方式

# 使用多进程并发,设置并发数为 4
celery -A tasks worker --concurrency=4# 使用 gevent 并发
celery -A tasks worker -P gevent --concurrency=1000
  • 说明
    • --concurrency=4:设置并发的工作进程数。
    • -P gevent:指定并发池为 gevent,适用于 I/O 密集型任务。

定时任务(Periodic Tasks)

定时任务(Periodic Tasks) 是指按照预设的时间间隔或特定时间点自动执行的任务。Celery 提供了 Celery Beat 调度器来管理定时任务。

配置定时任务

from celery.schedules import crontabapp.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},'multiply-at-midnight': {'task': 'tasks.mul','schedule': crontab(hour=0, minute=0),'args': (4, 5),},
}

启动 Celery Beat

celery -A tasks beat --loglevel=INFO
  • 说明
    • Celery Beat 会按照配置的计划周期性地发送任务到 Broker。

任务重试与超时(Task Retry and Timeout)

任务在执行过程中可能失败,Celery 提供了任务重试和超时机制,增强任务的可靠性。

设置任务重试

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def unstable_task(self, *args):try:# 可能出现异常的代码passexcept Exception as exc:# 任务失败后重试raise self.retry(exc=exc)
  • 说明
    • max_retries=3:最大重试次数为 3 次。
    • default_retry_delay=60:每次重试之间的间隔为 60 秒。
    • self.retry(exc=exc):任务失败时抛出 retry 异常,实现重试。

设置任务超时

@app.task(time_limit=300, soft_time_limit=200)
def long_running_task():pass
  • 说明
    • time_limit=300:任务的硬超时时间为 300 秒,超时后工作进程会被杀死。
    • soft_time_limit=200:任务的软超时时间为 200 秒,超时后会抛出 SoftTimeLimitExceeded 异常,可以在任务中捕获并处理。

任务组与工作流(Task Groups and Workflows)

Celery 支持任务的组合执行,包括组任务(Group Tasks)链任务(Chain Tasks)回调(Callbacks),用于构建复杂的工作流。

组任务(Group)

同时执行一组独立的任务,所有任务完成后返回结果列表。

from celery import group# 定义任务组
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()# 获取所有任务的结果
print(result.get())
  • 说明
    • add.s(i, i):生成任务的签名。
    • group():创建任务组。
    • apply_async():异步执行任务组。

链任务(Chain)

顺序执行一系列任务,每个任务的结果作为下一个任务的输入。

from celery import chain# 定义任务链
job = chain(add.s(2, 2) | mul.s(4))result = job.apply_async()# 获取最终结果
print(result.get())  # 输出 (2 + 2) * 4 = 16
  • 说明
    • | 运算符连接任务,表示顺序执行。

示例项目结构

一个简单的 Celery 项目通常包含以下结构:

project/
│
├── tasks.py          # 定义任务
├── app.py            # 定义 Celery 应用
└── worker.sh         # 启动 Worker 的脚本

tasks.py

from app import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y

app.py

from celery import Celeryapp = Celery('project',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0')# 可在此处配置任务路由、定时任务等

worker.sh

#!/bin/bash
celery -A app worker --loglevel=INFO

总结

Celery 通过引入 任务工作者消息代理结果后端任务队列 等核心概念,构建了一个强大的异步任务处理框架。通过合理地配置和使用这些概念,您可以:

  • 异步执行耗时任务,提高应用程序的响应速度。
  • 实现任务的调度和重试,增强系统的可靠性。
  • 优化资源利用率,通过并发和队列管理提升性能。
  • 构建复杂的任务工作流,满足多样化的业务需求。

希望通过本文对 Celery 核心概念的统一介绍和示例说明,您可以更好地理解和应用 Celery,在实际项目中构建高性能的异步任务处理系统。


参考资料

  • Celery 官方文档
  • Celery 入门教程
  • Celery 最佳实践
  • 使用 Celery 构建异步任务队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/83814.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能对联网页小程序的仓颉之旅

#传统楹联遇上AI智能体:我的Cangjie Magic开发纪实 引言:一场跨越千年的数字对话 "云对雨,雪对风,晚照对晴空"。昨天晚上星空璀璨,当我用仓颉语言写下第一个智能对联网页小程序的Agent DSL代码时&#xff0…

《ERP原理与应用教程》第3版习题和答案

ERP原理与应用教程是一门系统介绍企业资源计划(Enterprise Resource Planning, ERP)系统核心理论、技术架构及实施应用的综合性课程。它主要面向管理类、信息类、工程类等专业学生及企业管理者,旨在培养对现代企业信息化管理的理解与实践能力。以下是该课程的详细解析: 一…

SOC-ESP32S3部分:32-LVGL显示框架

飞书文档https://x509p6c8to.feishu.cn/wiki/Ly6ywvphqi6HZlk38vHcz2OgnXg LVGL是一个开源的显示框架,使用它可以加速我们开发带显示屏交互的应用。 IDF对于LVGL的支持一直有更新的,我们可以很方便在组件库中搜索到对应版本的LVGL,并把它添…

原理图与 PCB 设计流程及注意事项

原理图与 PCB 设计流程及注意事项 一、原理图设计 1. 首先,需要创建一个新的项目,在此项目中建立原理图。 2. 接着,在原理图中添加元件和芯片。可以从元件库中挑选所需的元件,如电阻、电容等。既可以在元件库中进行搜索查找&…

LeetCode--23.合并k个升序链表

解题思路: 1.获取信息: 给出了多个升序链表,要求合并成一个升序链表,返回首元结点 2.分析题目: 外面在21题的时候,讲了怎样合并两个升序链表为一个升序链表,不了解的,建议去看一下21…

【国产化适配】如何选择高效合规的安全数据交换系统?

一、安全数据交换系统的核心价值与国产化需求 在数字化转型浪潮中,企业数据流动的频率与规模呈指数级增长,跨网文件传输已成为日常运营的刚需,所以安全数据交换系统也是企业必备的工具。然而,数据泄露事件频发、行业合规要求趋严…

JMM初学

文章目录 1,线程间的同步和通信1.1, 共享内存并发模型 (Shared Memory Model)线程通信机制线程同步机制特点 1.2, 消息传递并发模型 (Message Passing Model)线程通信机制线程同步机制特点 适用场景对比 2,Java内存模型JMM2.0,Java内存模型的基础(1)内存…

【动手学MCP从0到1】2.5 MCP中的Context日志输出、进度汇报和服务端调用客户端的大模型项目实现步骤详解

MCP中的Context 1. Context2. 日志输出2.1 服务端2.2 客户端2.2.1 客户端代码调试2.2.2 客户端全部代码 3. 进度汇报3.1 服务端3.2 客户端3.2.1 客户端代码调试3.2.2 客户端全部代码 4. 模型调用4.1 服务端4.2 客户端4.2.1 客户端代码调试4.2.2 客户端全部代码 1. Context Con…

QT自定义资源管理器

使用qt 和 C实现。还在优化中 项目地址:GitHub - Linda1226/FileResourceManager: 自定义资源管理器 有问题可以交流

[华为eNSP] OSPF综合实验

目录 配置流程 画出拓扑图、标注重要接口IP 配置客户端IP 配置服务端IP 配置服务器服务 配置路由器基本信息:名称和接口IP 配置路由器ospf协议 测试结果 通过配置OSPF路由协议,实现跨多路由器的网络互通,并验证终端设备的访问能力。 …

如何把本地服务器变成公网服务器?内网ip网址转换到外网连接访问

​ 内网IP只能在本地内部网络连接访问,当本地搭建服务器部署好相关网站或应用后,在局域网内可以通过内网IP访问,但在外网是无法直接访问异地内网IP端口应用的,只有公网IP和域名才能实现互联网上的访问。那么需要如何把本地服务器变…

Linux-文件管理及归档压缩

1.根下的目录作用说明: /:Linux系统中所有的文件都在根下/bin:(二进制命令目录)存放常用的用户命令/boot:系统启动时的引导文件(内核的引导配置文件,grub配置文件,内核配置文件) 例…

从零开始的python学习(七)P95+P96+P97+P98+P99+P100+P101

本文章记录观看B站python教程学习笔记和实践感悟,视频链接:【花了2万多买的Python教程全套,现在分享给大家,入门到精通(Python全栈开发教程)】 https://www.bilibili.com/video/BV1wD4y1o7AS/?p6&share_sourcecopy_web&v…

Linux 查找特定字符详细讲解

CentOS 7 中使用 grep 查找特定字符详细笔记​ 一、grep 命令概述​ grep 全称为 Global Regular Expression Print,即全局正则表达式打印,是 CentOS 7 系统中用于文本搜索的核心工具。它基于正则表达式或固定字符串,在文件、标准输入流中进…

uniappx插件nutpi-idcard 开发与使用指南(适配鸿蒙)

uniappx插件nutpi-idcard 开发与使用指南(适配鸿蒙) 前言 nutpi-idcard 是一个基于 UTS (uni-app TypeScript Syntax) 开发的 uni-app 插件适配鸿蒙,主要用于解析身份证号码,提取其中的关键信息,如地区、出生日期、性…

Grafana-ECharts应用讲解(玫瑰图示例)

工具: MySQL 数据库 MySQL Workbench 数据库管理工具(方便编辑数据) Grafana v11.5.2 Business Charts 6.6(原 Echarts插件) 安装 安装 MySQL社区版安装 MySQL Workbench安装 Grafana在 Grafana 插件中搜索 Business Charts 进行安装以上安装步骤网上教程很多,自行搜…

React状态管理Context API + useReducer

在 React 中,Context API useReducer 是一种轻量级的状态管理方案,适合中小型应用或需要跨组件共享复杂状态的场景。它避免了 Redux 的繁琐配置,同时提供了清晰的状态更新逻辑。 1. 基本使用步骤 (1) 定义 Reducer 类似于 Redux 的 reduce…

3 个优质的终端 GitHub 开源工具

1、Oh My Zsh Oh My Zsh 是一个帮助你管理和美化 zsh 终端的开源工具。它让你的终端更炫酷、更高效。安装后,你可以快速使用各种插件和主题,比如常见的 git 命令简化、支持多种编程语言工具等,每次打开终端都会有惊喜。无论你是开发者还是普…

无人机巡检智能边缘计算终端技术方案‌‌——基于EFISH-SCB-RK3588工控机/SAIL-RK3588核心板的国产化替代方案‌

一、方案核心价值‌ ‌实时AI处理‌:6TOPS NPU实现无人机影像的实时缺陷检测(延迟<50ms)‌全国产化‌:芯片、操作系统、算法工具链100%自主可控‌极端环境适配‌:-40℃~85℃稳定运行,IP65防护等…

SpringAI 1.0.0 正式版——利用Redis存储会话(ChatMemory)

官方文档:Chat Memory :: Spring AI Reference 1. 引言 SpringAI 1.0.0 改动了很多地方,本文根据官方的InMemoryChatMemoryRepository实现了自定义的RedisChatMemoryRepository,并使用MessageWindowChatMemory创建ChatMemory 2. 实现 2.1.…