RabbitMQ实践学习笔记

RabbitMQ实践

以下是关于RabbitMQ实践的整理,涵盖常见场景和示例代码(基于Markdown格式)。内容按模块分类,避免步骤词汇,直接提供可操作的方法:

基础连接与队列声明

使用Python的pika库建立连接并声明队列:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')  # 声明持久化队列可添加参数 durable=True

消息发布示例:

channel.basic_publish(exchange='',routing_key='hello',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))  # 消息持久化


工作队列模式

消费者端的公平分发(prefetch)设置:

channel.basic_qos(prefetch_count=1)  # 每次只处理一条消息
channel.basic_consume(queue='task_queue', on_message_callback=callback)

消息确认机制:

def callback(ch, method, properties, body):print("Received %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认


发布/订阅模式

声明扇形交换机:

channel.exchange_declare(exchange='logs', exchange_type='fanout')

临时队列绑定:

result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(exchange='logs', queue=result.method.queue)

路由与主题模式

直连交换机实现路由键过滤:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

主题交换机匹配模式:

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.critical')

RPC远程调用

客户端发送请求并监听回调队列:

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queuechannel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=callback_queue,correlation_id=corr_id,),body=request_body)

服务端响应处理:

def on_request(ch, method, props, body):response = process_request(body)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)


消息TTL与死信队列

设置消息过期时间:

channel.basic_publish(exchange='',routing_key='queue_with_ttl',body='expiring message',properties=pika.BasicProperties(expiration='60000'))  # 60秒后过期

配置死信交换机:

args = {"x-dead-letter-exchange": "dlx_exchange"}
channel.queue_declare(queue='work_queue', arguments=args)


插件与扩展

延迟消息通过插件实现:

headers = {"x-delay": 5000}  # 延迟5秒
channel.basic_publish(exchange='delayed_exchange',routing_key='delayed_queue',body='delayed msg',properties=pika.BasicProperties(headers=headers))

集群与高可用

配置镜像队列策略:

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'  # 同步所有节点

客户端连接多个节点:

hosts = [{'host': 'node1'}, {'host': 'node2'}]
connection = pika.BlockingConnection(pika.ConnectionParameters(hosts))

监控与管理

通过API获取队列状态:

import requests
response = requests.get('http://localhost:15672/api/queues', auth=('guest', 'guest'))

以上示例覆盖了RabbitMQ的核心功能场景,包括消息模式、可靠性保证和扩展功能。实际应用时需根据业务需求调整参数和错误处理机制。更多细节可参考RabbitMQ官方文档和社区最佳实践。

RabbitMQ 源码分析(C++ 客户端)

RabbitMQ 是一个开源的消息代理软件,采用 Erlang 语言编写。C++ 客户端是 RabbitMQ 官方提供的客户端库之一,用于在 C++ 项目中与 RabbitMQ 服务器进行交互。以下是对 RabbitMQ C++ 客户端源码的分析框架和关键点。

源码结构与核心模块

RabbitMQ C++ 客户端的源码主要包含以下几个核心模块:

AMQP 协议实现
C++ 客户端基于 AMQP 0-9-1 协议实现,核心代码位于 amqp.hamqp.c 中。这部分负责协议的编解码、帧的构造与解析。

Socket 通信层
使用系统套接字(Socket)实现与 RabbitMQ 服务器的 TCP 通信。代码位于 amqp_socket.hamqp_socket.c,支持普通 Socket 和 SSL/TLS 加密通信。

连接管理
amqp_connection.h 中定义了连接的生命周期管理,包括连接建立、心跳检测、连接关闭等逻辑。

通道管理
通过 amqp_channel.h 实现多路复用机制,单个 TCP 连接可以支持多个逻辑通道(Channel),每个通道独立处理消息。

消息发布与消费
amqp_queue.hamqp_exchange.h 实现了队列和交换器的声明、绑定操作。消息的发布(basic_publish)和消费(basic_consume)逻辑在 amqp_basic.h 中定义。


关键流程分析

连接建立流程

  1. 调用 amqp_new_connection 创建连接对象。
  2. 通过 amqp_socket_open 建立 Socket 连接。
  3. 发送协议头并协商参数(amqp_login)。

消息发布流程

  1. 声明交换器(amqp_exchange_declare)。
  2. 构造消息属性(amqp_basic_properties_t)。
  3. 调用 amqp_basic_publish 发送消息帧。

消息消费流程

  1. 声明队列并绑定到交换器(amqp_queue_bind)。
  2. 调用 amqp_basic_consume 订阅队列。
  3. 通过 amqp_consume_message 循环接收消息。

核心数据结构

帧(Frame)结构
RabbitMQ 使用帧作为通信的基本单位,帧类型包括:

  • METHOD:传递 AMQP 方法(如 queue.declare)。
  • HEADER:消息属性。
  • BODY:消息内容。

连接状态机
通过 amqp_connection_state_t 管理连接状态,包括协议版本、通道列表、心跳超时等。


示例代码片段

以下是使用 C++ 客户端发布消息的简化代码:

#include <amqp.h>
#include <amqp_tcp_socket.h>void publish_message() {amqp_connection_state_t conn = amqp_new_connection();amqp_socket_t *socket = amqp_tcp_socket_new(conn);amqp_socket_open(socket, "localhost", 5672);amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");amqp_channel_open(conn, 1);amqp_basic_publish(conn, 1, amqp_cstring_bytes("exchange_name"),amqp_cstring_bytes("routing_key"), 0, 0, NULL,amqp_cstring_bytes("Hello, RabbitMQ!"));amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);amqp_connection_close(conn, AMQP_REPLY_SUCCESS);amqp_destroy_connection(conn);
}


性能优化与调试

连接复用
通过通道复用 TCP 连接,减少频繁建立连接的开销。

异步模式
使用 amqp_consume_message 的非阻塞模式,结合事件循环实现高吞吐量。

调试工具

  • 启用 AMQP_DEBUG 宏打印协议帧日志。
  • 使用 Wireshark 抓包分析 AMQP 流量。

参考资源

  1. RabbitMQ 官方文档
  2. AMQP 0-9-1 协议规范
  3. rabbitmq-c 源码仓库

通过分析源码,可以更深入地理解 RabbitMQ 的通信机制和设计思想,便于定制化开发或性能调优。

基于Python和RabbitMQ的基础连接与队列

以下是基于Python和RabbitMQ的基础连接与队列声明的实例代码示例,涵盖不同场景和参数配置:


基础连接与简单队列声明

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
connection.close()

声明持久化队列

channel.queue_declare(queue='durable_queue', durable=True)


声明带自定义参数的队列

channel.queue_declare(queue='custom_queue', arguments={'x-message-ttl': 60000})


声明排他队列

channel.queue_declare(queue='exclusive_queue', exclusive=True)


声明自动删除队列

channel.queue_declare(queue='auto_delete_queue', auto_delete=True)


声明队列并绑定死信交换器

args = {'x-dead-letter-exchange': 'dlx_exchange'}
channel.queue_declare(queue='with_dlx', arguments=args)


声明优先级队列

channel.queue_declare(queue='priority_queue', arguments={'x-max-priority': 10})


声明延迟队列(通过插件)

channel.queue_declare(queue='delayed_queue', arguments={'x-delayed-type': 'direct'})


声明长度限制队列

channel.queue_declare(queue='length_limited', arguments={'x-max-length': 1000})


声明多消费者队列

channel.queue_declare(queue='multi_consumer', durable=True)
channel.basic_qos(prefetch_count=1)  # 公平分发

使用SSL加密连接

ssl_params = pika.SSLOptions(ssl.SSLContext(), 'localhost')
connection = pika.BlockingConnection(pika.ConnectionParameters(port=5671, ssl_options=ssl_params))

连接集群节点

params = pika.ConnectionParameters(host='node1.cluster')
connection = pika.BlockingConnection(params)

使用连接池

from rabbitmq import ConnectionPool
pool = ConnectionPool(pika.ConnectionParameters, max_size=10)

声明队列并返回队列信息

result = channel.queue_declare(queue='passive_queue', passive=True)
print(f"Queue has {result.method.message_count} messages")

声明无参数默认队列

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

使用异步连接适配器

import pika
connection = pika.SelectConnection(pika.ConnectionParameters('localhost'))

连接失败重试机制

parameters = pika.ConnectionParameters(host='localhost',connection_attempts=5,retry_delay=3
)

声明队列并绑定多个路由键

channel.queue_declare(queue='multi_binding')
channel.queue_bind(exchange='logs', queue='multi_binding', routing_key='info')
channel.queue_bind(exchange='logs', queue='multi_binding', routing_key='error')

使用URL参数连接

url = 'amqp://user:pass@host:port/vhost'
parameters = pika.URLParameters(url)
connection = pika.BlockingConnection(parameters)

声明队列时设置TTL

args = {'x-expires': 3600000}  # 1小时后自动删除
channel.queue_declare(queue='temp_queue', arguments=args)

注意事项

  • 所有示例需预先安装pika库:pip install pika
  • 生产环境建议添加异常处理逻辑
  • 连接参数应根据实际RabbitMQ服务器配置调整
  • 队列属性(如durable)需与已存在队列的属性一致,否则会报错

每个示例均可独立运行,建议根据实际需求组合使用不同参数。

RabbitMQ 工作队列模式与 prefetch 设置

RabbitMQ 的工作队列模式(Work Queue)用于在多个消费者之间分发任务。通过 prefetch 参数可以控制消费者未确认消息的最大数量,从而优化任务分配效率。

Python 实现工作队列模式

安装 RabbitMQ 的 Python 客户端库:

pip install pika

生产者代码示例(发送任务):

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)for i in range(10):message = f"Task {i}"channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化)print(f" [x] Sent {message}")connection.close()

消费者代码示例(处理任务):

import pika
import timedef callback(ch, method, properties, body):print(f" [x] Received {body.decode()}")time.sleep(body.count(b'.'))  # 模拟耗时任务print(" [x] Done")ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # 关键 prefetch 设置
channel.basic_consume(queue='task_queue', on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

prefetch 参数详解

prefetch_count=1 表示每个消费者最多只能有一个未确认的消息。该设置能实现:

  • 公平调度:避免某个消费者积压大量消息,而其他消费者空闲
  • 负载均衡:新任务会自动分配给空闲的消费者
  • 流量控制:防止消费者过载

对于需要更高吞吐量的场景,可以适当增大 prefetch_count 值,但需注意:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/92576.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/92576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

量子生成对抗网络:量子计算与生成模型的融合革命

引言&#xff1a;当生成对抗网络遇上量子计算在人工智能与量子计算双重浪潮的交汇处&#xff0c;量子生成对抗网络&#xff08;Quantum Generative Adversarial Networks, QGAN&#xff09;正成为突破经典算力瓶颈的关键技术。传统生成对抗网络&#xff08;GAN&#xff09;在图…

VBA 多个选项,将选中的选项录入当前选中的单元格

1、使用LISTBOX插件&#xff0c;选中后回车录入 维护好数据&#xff0c;并新增一个activeX列表框插件 Private Sub Worksheet_SelectionChange(ByVal Target As Range)If Target.Count > 1 Then Exit SubIf Target.Row > 2 And Target.Row < 10 And Target.Column 2…

【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 主页-微博点赞量Top6实现

大家好&#xff0c;我是java1234_小锋老师&#xff0c;最近写了一套【NLP舆情分析】基于python微博舆情分析可视化系统(flaskpandasecharts)视频教程&#xff0c;持续更新中&#xff0c;计划月底更新完&#xff0c;感谢支持。今天讲解主页-微博点赞量Top6实现 视频在线地址&…

SAP调用外部API

SAP需求将中文字符转化为对应的拼音具体思路,由于sap中没有将中文字符转化为拼音的函数或方法类,则以http请求访问外部服务器发布的API服务,然后获取其返回值即可1.调用外部网站上提供的api缺点:免费次数有限,后需要充值这里是用www格式的json报文*&----------------------…

(12)机器学习小白入门YOLOv:YOLOv8-cls 模型微调实操

YOLOv8-cls 模型微调实操 (1)机器学习小白入门YOLOv &#xff1a;从概念到实践 (2)机器学习小白入门 YOLOv&#xff1a;从模块优化到工程部署 (3)机器学习小白入门 YOLOv&#xff1a; 解锁图片分类新技能 (4)机器学习小白入门YOLOv &#xff1a;图片标注实操手册 (5)机器学习小…

基于Matlab传统图像处理技术的车辆车型识别与分类方法研究

随着计算机视觉和图像处理技术的发展&#xff0c;车辆检测与识别已经成为智能交通系统中的一个重要研究方向。传统图像处理方法通过对图像进行预处理、特征提取、分类与识别&#xff0c;提供了一种无需复杂深度学习模型的解决方案。本研究基于MATLAB平台&#xff0c;采用传统图…

未来趋势:LeafletJS 与 Web3/AI 的融合

引言 LeafletJS 作为一个轻量、灵活的 JavaScript 地图库&#xff0c;以其模块化设计和高效渲染能力在 Web 地图开发中占据重要地位。随着 Web3 和人工智能&#xff08;AI&#xff09;的兴起&#xff0c;地图应用的开发范式正在发生变革。Web3 技术&#xff08;如区块链、去中…

Spring AI 系列之二十一 - EmbeddingModel

之前做个几个大模型的应用&#xff0c;都是使用Python语言&#xff0c;后来有一个项目使用了Java&#xff0c;并使用了Spring AI框架。随着Spring AI不断地完善&#xff0c;最近它发布了1.0正式版&#xff0c;意味着它已经能很好的作为企业级生产环境的使用。对于Java开发者来说…

LFU算法及优化

继上一篇的LRU算法的实现和讲解&#xff0c;这一篇来讲述LFU最近使用频率高的数据很大概率将会再次被使用,而最近使用频率低的数据,将来大概率不会再使用。做法&#xff1a;把使用频率最小的数据置换出去。这种算法更多是从使用频率的角度&#xff08;但是当缓存满时&#xff0…

关于原车一键启动升级手机控车的核心信息及注意事项

想知道如何给原车已经有一键启动功能的车辆加装手机远程启动。这是个很实用的汽车改装需求&#xff0c;尤其适合想在冬天提前热车、夏天提前开空调的车主。一、适配方案与核心功能 ‌升级专车专用4G手机控车模块‌&#xff0c;推荐安装「移动管家YD361-3」系统&#xff0c;该方…

数据结构与算法:类C语言有关操作补充

数据结构与算法:类C语言操作补充 作为老师,我将详细讲解类C语言(如C或C++)中的关键操作,包括动态内存分配和参数传递。这些内容在数据结构与算法中至关重要,例如在实现动态数组、链表或高效函数调用时。我会用通俗易懂的语言和代码示例逐步解释,确保你轻松掌握。内容基…

Go 并发(协程,通道,锁,协程控制)

一.协程&#xff08;Goroutine&#xff09;并发&#xff1a;指程序能够同时执行多个任务的能力&#xff0c;多线程程序在一个核的cpu上运行&#xff0c;就是并发。并行&#xff1a;多线程程序在多个核的cpu上运行&#xff0c;就是并行。并发主要由切换时间片来实现"同时&q…

图机器学习(15)——链接预测在社交网络分析中的应用

图机器学习&#xff08;15&#xff09;——链接预测在社交网络分析中的应用0. 链接预测1. 数据处理2. 基于 node2vec 的链路预测3. 基于 GraphSAGE 的链接预测3.1 无特征方法3.2 引入节点特征4. 用于链接预测的手工特征5. 结果对比0. 链接预测 如今&#xff0c;社交媒体已成为…

每日一算:华为-批萨分配问题

题目描述"吃货"和"馋嘴"两人到披萨店点了一份铁盘&#xff08;圆形&#xff09;披萨&#xff0c;并嘱咐店员将披萨按放射状切成大小相同的偶数个小块。但是粗心的服务员将披萨切成了每块大小都完全不同的奇数块&#xff0c;且肉眼能分辨出大小。由于两人都…

Transfusion,Show-o and Show-o2论文解读

目录 一、Transfusion 1、概述 2、方法 二、Show-o 1、概述 2、方法 3、训练 三、Show-o2 1、概述 2、模型架构 3、训练方法 4、实验 一、Transfusion 1、概述 Transfusion模型应该是Show系列&#xff0c;Emu系列的前传&#xff0c;首次将文本和图像生成统一到单…

聊聊 Flutter 在 iOS 真机 Debug 运行出现 Timed out *** to update 的问题

最近刚好有人在问&#xff0c;他的 Flutter 项目在升级之后出现 Error starting debug session in Xcode: Timed out waiting for CONFIGURATION_BUILD_DIR to update 问题&#xff0c;也就是真机 Debug 时始终运行不了的问题&#xff1a; 其实这已经是一个老问题了&#xff0c…

《R for Data Science (2e)》免费中文翻译 (第1章) --- Data visualization(2)

写在前面 本系列推文为《R for Data Science (2)》的中文翻译版本。所有内容都通过开源免费的方式上传至Github&#xff0c;欢迎大家参与贡献&#xff0c;详细信息见&#xff1a; Books-zh-cn 项目介绍&#xff1a; Books-zh-cn&#xff1a;开源免费的中文书籍社区 r4ds-zh-cn …

【机器学习【9】】评估算法:数据集划分与算法泛化能力评估

文章目录一、 数据集划分&#xff1a;训练集与评估集二、 K 折交叉验证&#xff1a;提升评估可靠性1. 基本原理1.1. K折交叉验证基本原理1.2. 逻辑回归算法与L22. 基于K折交叉验证L2算法三、弃一交叉验证&#xff08;Leave-One-Out&#xff09;1、基本原理2、代码实现四、Shuff…

CodeBuddy三大利器:Craft智能体、MCP协议和DeepSeek V3,编程效率提升的秘诀:我的CodeBuddy升级体验之旅(个性化推荐微服务系统)

&#x1f31f; 嗨&#xff0c;我是Lethehong&#xff01;&#x1f31f; &#x1f30d; 立志在坚不欲说&#xff0c;成功在久不在速&#x1f30d; &#x1f680; 欢迎关注&#xff1a;&#x1f44d;点赞⬆️留言收藏&#x1f680; &#x1f340;欢迎使用&#xff1a;小智初学计…

Spring Boot 整合 Redis 实现发布/订阅(含ACK机制 - 事件驱动方案)

Spring Boot整合Redis实现发布/订阅&#xff08;含ACK机制&#xff09;全流程一、整体架构二、实现步骤步骤1&#xff1a;添加Maven依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter…