python中使用高并发分布式队列库celery的那些坑

python中使用高并发分布式队列库celery的那些坑

      • 🌟 简单理解
      • 🛠️ 核心功能
      • 🚀 工作机制
      • 📦 示例代码(使用 Redis 作为 broker)
      • 🔗 常见搭配
      • 📦 我的环境
      • 📦第一个问题
      • 📦第二个问题
        • 原因分析

Celery 是一个用于 分布式任务队列 的 Python 库,常用于处理异步任务(即任务不需要立即执行,后台慢慢做),尤其适合执行定时任务或耗时操作。


🌟 简单理解

Celery 就是让你把“任务”扔到后台执行,而不是阻塞当前程序。


🛠️ 核心功能

功能说明
异步任务执行比如发邮件、处理图片、生成报告等不需要立即完成的操作。
分布式任务调度可以运行在多台服务器上,实现任务负载均衡。
定时任务(周期任务)类似 crontab,可设置任务定时执行(如每天 8 点发日报)。
任务重试机制失败任务可以自动重试,适用于网络波动等场景。
与Django/Flask集成非常适合与这些 Web 框架配合使用,将长耗时任务下放到 Celery。

🚀 工作机制

Celery 一般由以下几部分组成:

  1. Producer(生产者):你写的代码,会将任务“发”出去。
  2. Broker(中间人):任务先存放在消息队列(如 Redis、RabbitMQ)中。
  3. Worker(工人):后台运行的进程,专门“接收”和“执行”这些任务。
  4. Result Backend(结果后端):可选,记录任务结果,如执行成功或失败。

📦 示例代码(使用 Redis 作为 broker)

# tasks.py
from celery import Celeryapp = Celery('mytasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y

运行方式:

celery -A tasks worker --loglevel=info

调用方式(异步执行):

add.delay(3, 5)  # 返回一个异步结果对象

🔗 常见搭配

  • 消息中间件:Redis、RabbitMQ(推荐 Redis 简单易用)
  • Web框架集成:Django、Flask
  • 配合 Flower、Prometheus、Grafana 等工具可实现任务监控

如果你正在开发一个 需要做“异步处理”或“后台任务”的系统,Celery 是 Python 中的主流选择之一。但是该库看似简单,却隐藏着无数坑,本文就带大家了解一下我在使用过程中遇到的那些坑。

📦 我的环境

  • windows 10
  • python 3.12
  • celery 5.5.2

📦第一个问题

执行命令:

celery -A main_async:celery_app worker --loglevel=info

报错:

[2025-05-29 19:40:22,107: INFO/MainProcess] Task main_async.background_content_similarity[4c84e1c8-6a13-4241-8e62-04e17b3884cb] received
[2025-05-29 19:40:22,142: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
billiard.einfo.RemoteTraceback:
"""
Traceback (most recent call last):File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloopresult = (True, prepare_result(fun(*args, **kwargs)))^^^^^^^^^^^^^^^^^^^^File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_tasktasks, accept, hostname = _loc^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)
"""The above exception was the direct cause of the following exception:Traceback (most recent call last):File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloopresult = (True, prepare_result(fun(*args, **kwargs)))^^^^^^^^^^^^^^^^^^^^File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_tasktasks, accept, hostname = _loc^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)

该问题是由于celery的默认并发网络编程线程库引起的,换成eventlet可以解决问题,只需修改启动命令即可,如下:

celery -A main_async:celery_app worker --loglevel=info -P eventlet

📦第二个问题

第二个问题是日志问题,报错类似如下所示:

'LoggingProxy' object has no attribute 'encoding'"
原因分析

Celery 在启动 worker 时,默认会将标准输出和标准错误重定向到其日志系统中。这意味着 sys.stdout 和 sys.stderr 被替换为 LoggingProxy 对象。然而,某些库或代码可能期望这些对象具有标准文件对象的属性,如 encoding 或 fileno,从而导致 AttributeError。

此时只需要将worker_redirect_stdouts参数设置为False即可解决问题,代码如下:

# Celery 配置
celery_app.conf.update(task_serializer="json",accept_content=["json"],result_serializer="json",timezone="Asia/Shanghai",enable_utc=True,include=["main_async"],  # 显式指定任务模块task_track_started=True,  # 跟踪任务开始状态task_ignore_result=False,  # 保存任务结果task_store_errors_even_if_ignored=True,  # 存储错误worker_redirect_stdouts = False	# 禁止将stdout和stderr重定向到当前记录器。
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/82276.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

截图工具 Snipaste V2.10.7(2025.06.2更新)

—————【下 载 地 址】——————— 【​本章下载一】:https://pan.xunlei.com/s/VORklK9hcuoI6n_qgx25jSq2A1?pwde7bi# 【​本章下载二】:https://pan.quark.cn/s/7c62f8f86735 【百款黑科技】:https://ucnygalh6wle.feishu.cn/wiki/…

batch_size 参数最优设置

在深度学习训练中,batch_size(批量大小)的选择是一个需要权衡的问题,既不是越大越好,也不是越小越好,而是需要根据硬件资源、数据规模、模型复杂度和优化目标等因素综合决定。以下是详细分析:

【agent开发】部署LLM(一)

本周基本就是在踩坑,没什么实质性的进展 下载模型文件 推荐一个网站,可以简单计算下模型推理需要多大显存:https://apxml.com/tools/vram-calculator 我的显卡是RTX 4070,有12GB的显存,部署一个1.7B的Qwen3应该问题…

大数据-274 Spark MLib - 基础介绍 机器学习算法 剪枝 后剪枝 ID3 C4.5 CART

点一下关注吧!!!非常感谢!!持续更新!!! 大模型篇章已经开始! 目前已经更新到了第 22 篇:大语言模型 22 - MCP 自动操作 FigmaCursor 自动设计原型 Java篇开…

flutter常用动画

Flutter 动画基础概念 术语解释Animation表示动画的值,通常是一个 double (0.0 ~ 1.0) 或其他数值。AnimationController管理动画的时间进度和状态。需要 Ticker (vsync) 来驱动。Tween定义动画的取值范围,如从 0.0 到 1.0,从红色到蓝色。Cu…

Python打卡DAY43

复习日 作业: kaggle找到一个图像数据集,用cnn网络进行训练并且用grad-cam做可视化 进阶:并拆分成多个文件 我选择ouIntel Image Classification | Kagglezz,该数据集分为六类,包含建筑、森林、冰川、山脉、海洋和街道…

从多巴胺的诱惑到内啡肽的力量 | 个体成长代际教育的成瘾困局与破局之道

注:本文为“多巴胺,内啡肽”相关文章合辑。 图片清晰度受引文原图所限。 略作重排,未整理去重。 如有内容异常,请看原文。 年少偏爱多巴胺,中年才懂内啡肽 摘要 :本文通过生活实例与科学研究相结合的方式…

【音视频】H265 NALU分析

1 H265 概述 H264 与 H265 的区别 传输码率:H264 由于算法优化,可以低于 2Mbps 的速度实现标清数字图像传送;H.265 High Profile 可实现低于 1.5Mbps 的传输带宽下,实现 1080p 全高清视频传输。 编码架构:H.265/HEVC…

Python训练营打卡 Day26

知识点回顾: 函数的定义变量作用域:局部变量和全局变量函数的参数类型:位置参数、默认参数、不定参数传递参数的手段:关键词参数传递参数的顺序:同时出现三种参数类型时 ——————————————————————…

PH热榜 | 2025-05-29

1. Tapflow 2.0 标语:将你的文档转化为可销售的指导手册、操作手册和工作流程。 介绍:Tapflow 2.0将各类知识(包括人工智能、设计、开发、营销等)转化为有条理且可销售的产品。现在你可以导入文件,让人工智能快速为你…

GitHub 趋势日报 (2025年05月30日)

📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 833 agenticSeek 789 prompt-eng-interactive-tutorial 466 ai-agents-for-beginn…

Cesium 8 ,在 Cesium 上实现雷达动画和车辆动画效果,并控制显示和隐藏

目录 ✨前言 一、功能背景 1.1 核心功能概览 1.2 技术栈与工具 二、车辆动画 2.1 模型坐标 2.2 组合渲染 2.3 显隐状态 2.4 模型文件 三、雷达动画 3.1 创建元素 3.2 动画解析 3.3 坐标联动 3.4 交互事件 四、完整代码 4.1 属性参数 4.2 逻辑代码 加载车辆动画…

相机--相机标定

教程 相机标定分类 相机标定分为内参标定和外参标定。 内参标定 目的 作用 原理 外参标定

JS手写代码篇---手写类型判断函数

9、手写类型判断函数 手写完成这个函数:输入一个对象(value),返回它的类型 js中的数据类型: 值类型:String、Number、Boolean、Null、Undefied、Symbol引用类型:Object、Array、Function、RegExp、Date 使用typeOf…

量子物理:初步认识量子物理

核心特点——微观世界与宏观世界的差异 量子物理(又称量子力学)是物理学中描述微观世界(原子、电子、光子等尺度)基本规律的理论框架。它与我们熟悉的经典物理(牛顿力学、电磁学等)有根本性的不同,因为微观粒子的行为展现出许多奇特且反直觉的现象。 简单来说,量子物…

springboot配置cors拦截器与cors解释

文章目录 cors?代码 cors? CORS(跨域资源共享)的核心机制是 由后端服务器(bbb.com)决定是否允许前端(aaa.com)的跨域请求 当浏览器访问 aaa.com 的页面,并向 bbb.com/list 发起请求时&#…

国芯思辰| 同步降压转换器CN2020应用于智能电视,替换LMR33620

在智能电视不断向高画质、多功能、智能化发展的当下,其内部电源管理系统的性能至关重要。同步降压转换器可以为智能电视提供稳定、高效的运行。 国芯思辰CN2020是一款脉宽调制式同步降压转换器。内部集成两个功率MOS管,在4.5~18V宽输入电压范围内可以持…

API 版本控制:使用 ABP vNext 实现版本化 API 系统

🚀API 版本控制:使用 ABP vNext 实现版本化 API 系统 📚 目录 🚀API 版本控制:使用 ABP vNext 实现版本化 API 系统一、背景切入 🧭二、核心配置规则 📋2.1 前置准备:NuGet 包与 usi…

Android高级开发第四篇 - JNI性能优化技巧和高级调试方法

文章目录 Android高级开发第四篇 - JNI性能优化技巧和高级调试方法引言为什么JNI性能优化如此重要?第一部分:JNI性能基础知识JNI调用的性能开销何时使用JNI才有意义? 第二部分:核心性能优化技巧1. 减少JNI调用频率2. 高效的数组操…

小白的进阶之路系列之十----人工智能从初步到精通pytorch综合运用的讲解第三部分

本文将介绍Autograd基础。 PyTorch的Autograd特性是PyTorch灵活和快速构建机器学习项目的一部分。它允许在一个复杂的计算中快速而简单地计算多个偏导数(也称为梯度)。这个操作是基于反向传播的神经网络学习的核心。 autograd的强大之处在于它在运行时动态地跟踪你的计算,…