消息队列的推拉模式详解:实现原理与代码实战

消息队列是现代分布式系统中不可或缺的中间件,它通过"生产者-消费者"模式实现了系统间的解耦和异步通信。本文将深入探讨消息队列中的两种核心消息传递模式:推送(Push)和拉取(Pull),并通过代码示例展示它们的实现方式。

目录

  1. 消息队列基础概念
  2. 推送(Push)模式详解
  3. 拉取(Pull)模式详解
  4. 推拉模式对比
  5. 主流消息队列的实现方式
  6. 代码实战:实现简单的推拉模式
  7. 总结与最佳实践

消息队列基础概念

消息队列主要由以下组件构成:

  • 生产者(Producer):发送消息到队列的应用程序
  • 消费者(Consumer):从队列接收消息的应用程序
  • 消息代理(Broker):负责存储和转发消息的中间件
  • 队列(Queue):消息的存储区域

推送(Push)模式详解

工作原理

在推送模式中,消息代理(Broker)主动将消息发送给消费者,消费者被动接收。这种模式类似于订报纸 - 报社(生产者)将报纸(消息)送到你家(消费者),你不需要主动去取。

特点

  • 实时性高:消息到达后立即推送给消费者
  • 消费者负载不可控:可能因突发流量压垮消费者
  • 实现复杂度高:需要处理消费者确认、重试等机制

代码示例:RabbitMQ推送模式

import pika# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明队列
channel.queue_declare(queue='push_queue')# 定义回调函数
def callback(ch, method, properties, body):print(f" [x] Received {body}")ch.basic_ack(delivery_tag=method.delivery_tag)# 设置消费者并启用推送模式
channel.basic_consume(queue='push_queue', on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 开始接收推送的消息

拉取(Pull)模式详解

工作原理

在拉取模式中,消费者主动从消息代理请求消息。这类似于去邮局取包裹 - 你需要主动去邮局(消息代理)检查并取回你的包裹(消息)。

特点

  • 消费者控制节奏:可以按自身处理能力获取消息
  • 实现简单:不需要复杂的推送和确认机制
  • 实时性较低:存在一定的延迟
  • 资源消耗:需要轮询或长连接检查新消息

代码示例:Kafka拉取模式

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class PullConsumer {public static void main(String[] args) {// 配置消费者Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "pull-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("pull-topic"));try {while (true) {// 主动拉取消息(100ms超时)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}} finally {consumer.close();}}
}

推拉模式对比

特性推送(Push)模式拉取(Pull)模式
实时性较低
消费者控制力
实现复杂度
资源消耗Broker端压力大消费者端需要轮询
典型应用场景实时通知、即时通讯批量处理、流处理
代表中间件RabbitMQ、ActiveMQKafka、RocketMQ(Pull模式)
消费者负载可能过载可自行调节
消息堆积处理可能导致消费者崩溃消息堆积在Broker,消费者可控

主流消息队列的实现方式

RabbitMQ - 主要采用Push模式

RabbitMQ使用AMQP协议,主要通过推送模式向消费者传递消息。它提供了复杂的确认机制(QoS)来控制推送速率。

Kafka - 采用Pull模式

Kafka采用拉取模式,消费者可以控制读取速度和位置。这种设计适合高吞吐量的日志处理场景。

RocketMQ - 混合模式

RocketMQ支持长轮询(Long Polling),本质上是Pull模式但能达到Push模式的实时性。

代码实战:实现简单的推拉模式

简单推送模式实现

class SimplePushBroker:def __init__(self):self.queues = {}self.consumers = {}def add_queue(self, queue_name):self.queues[queue_name] = []def register_consumer(self, queue_name, callback):if queue_name not in self.consumers:self.consumers[queue_name] = []self.consumers[queue_name].append(callback)def publish(self, queue_name, message):if queue_name not in self.queues:self.add_queue(queue_name)self.queues[queue_name].append(message)# 推送消息给所有消费者if queue_name in self.consumers:for callback in self.consumers[queue_name]:callback(message)# 使用示例
broker = SimplePushBroker()# 消费者回调
def consumer1(msg):print(f"Consumer1 received: {msg}")def consumer2(msg):print(f"Consumer2 received: {msg}")# 注册消费者
broker.register_consumer("test_queue", consumer1)
broker.register_consumer("test_queue", consumer2)# 发布消息
broker.publish("test_queue", "Hello Push Mode!")

简单拉取模式实现

class SimplePullBroker:def __init__(self):self.queues = {}def add_queue(self, queue_name):self.queues[queue_name] = []def publish(self, queue_name, message):if queue_name not in self.queues:self.add_queue(queue_name)self.queues[queue_name].append(message)def pull(self, queue_name):if queue_name in self.queues and self.queues[queue_name]:return self.queues[queue_name].pop(0)return None# 使用示例
broker = SimplePullBroker()
broker.publish("test_queue", "Message 1")
broker.publish("test_queue", "Message 2")# 消费者主动拉取
while True:msg = broker.pull("test_queue")if msg is None:breakprint(f"Received: {msg}")

总结与最佳实践

如何选择推拉模式?

  1. 选择Push模式当

    • 需要低延迟的消息传递
    • 消费者处理能力稳定且足够
    • 消息量不大但实时性要求高
  2. 选择Pull模式当

    • 消费者处理能力有限或变化大
    • 需要批量处理消息
    • 消费者需要控制消费速率

高级模式

  1. 长轮询(Long Polling):结合推拉的优点,消费者发起请求但Broker在有消息时才响应
  2. 混合模式:如RocketMQ的实现,表面是Push但底层是Pull
  3. 背压控制(Backpressure):在Push模式中加入流量控制机制

最佳实践

  1. 监控消息堆积:无论推拉模式,都需要监控队列长度
  2. 合理设置超时:避免消费者挂起或资源浪费
  3. 实现幂等消费:网络问题可能导致消息重发
  4. 考虑消费者分组:提高并行处理能力

消息队列的推拉模式各有优劣,理解它们的原理和实现方式有助于我们在实际项目中做出合理的选择和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/88903.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/88903.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV图像噪点消除五大滤波方法

在数字图像处理中&#xff0c;噪点消除是提高图像质量的关键步骤。本文将基于OpenCV库&#xff0c;详细讲解五种经典的图像去噪滤波方法&#xff1a;均值滤波、方框滤波、高斯滤波、中值滤波和双边滤波&#xff0c;并通过丰富的代码示例展示它们的实际应用效果。 一、图像噪点…

Rust宏和普通函数的区别

Rust 中的宏&#xff08;macro&#xff09;和普通函数有以下核心区别&#xff0c;分别从用途、扩展方式、性能影响和语法特征等多个方面来解释&#xff1a; &#x1f4cc; 1. 定义方式 项目宏函数定义方式macro_rules! 或 macro&#xff08;新版&#xff09;fn 关键字调用方式…

基于Qt C++的影像重采样批处理工具设计与实现

摘要 本文介绍了一种基于Qt C++框架开发的高效影像重采样批处理工具。该工具支持按分辨率(DPI) 和按缩放倍率两种重采样模式,提供多种插值算法选择,具备强大的批量处理能力和直观的用户界面。工具实现了影像处理的自动化流程,显著提高了图像处理效率,特别适用于遥感影像处…

TypeScript 中的 WebSocket 入门

如何开始使用 Typescript 和 React 中的 WebSockets 创建一个简单的聊天应用程序 示例源码&#xff1a;ws 下一篇&#xff1a;https://blog.csdn.net/hefeng_aspnet/article/details/148898147 介绍 WebSocket 是一项我目前还没有在工作中使用过的技术&#xff0c;但我知道…

TMS汽车热管理系统HILRCP解决方案

TMS汽车热管理系统介绍 随着汽车电动化和智能化的发展&#xff0c;整车能量管理内容增多&#xff0c;对汽车能量管理的要求也越来越高&#xff0c;从整车层面出发对各子系统进行能量统筹管理将成为电动汽车未来的发展趋势&#xff0c;其中汽车热管理是整车能量管理的重要组成部…

CCleaner Pro v6.29.11342 绿色便携版

CCleaner Pro v6.29.11342 绿色便携版 CCleaner是Piriform&#xff08;梨子公司&#xff09;最著名广受好评的系统清理优化及隐私保护软件&#xff0c;也是该公司主打和首发产品&#xff0c;它体积小、扫描速度快&#xff0c;具有强大的自定义清理规则扩展能力。CCleaner是一款…

不做手机控APP:戒掉手机瘾,找回专注与自律

在当今数字化时代&#xff0c;手机已经成为我们生活中不可或缺的一部分。然而&#xff0c;过度依赖手机不仅会分散我们的注意力&#xff0c;影响学习和工作效率&#xff0c;还可能对身心健康造成负面影响。为了帮助用户摆脱手机依赖&#xff0c;重拾自律和专注&#xff0c;一款…

Go 语言中的接口

1、接口与鸭子类型 在 Go 语言中&#xff0c;接口&#xff08;interface&#xff09;是一个核心且至关重要的概念。它为构建灵活、可扩展的软件提供了坚实的基础。要深入理解 Go 的接口&#xff0c;我们必须首先了解一个在动态语言中非常普遍的设计哲学——鸭子类型&#xff0…

在项目中如何巧妙使用缓存

缓存 对于经常访问的数据&#xff0c;每次都从数据库&#xff08;硬盘&#xff09;中获取是比较慢&#xff0c;可以利用性能更高的存储来提高系统响应速度&#xff0c;俗称缓存 。合理使用缓存可以显著降低数据库的压力、提高系统性能。 那么&#xff0c;什么样的数据适合缓存…

SLAM中的非线性优化-2D图优化之零空间(十五)

这节在进行讲解SLAM中一个重要概念&#xff0c;零空间&#xff0c;讲它有啥用呢&#xff1f;因为SLAM中零空间的存在&#xff0c;才需要FEJ或固定约束存在&#xff0c;本节内容不属于2D图优化独有&#xff0c;先看看什么是零空间概念&#xff1b;零空间是一个核心概念&#xff…

如何解决本地DNS解析失败问题?以连接AWS ElastiCache Redis为例

在云服务开发中,DNS解析问题常常成为困扰开发者的隐形障碍。本文将通过AWS ElastiCache Redis连接失败的实际案例,详细介绍如何诊断和解决DNS解析问题,帮助你快速恢复服务连接。 引言 在使用 telnet 或 redis-cli 连接 AWS ElastiCache Redis 时,有时会遇到类似以下错误:…

探索钉钉生态中的宜搭:创建与分享应用的新视界

在当今快速发展的数字化时代&#xff0c;企业对于高效协作和信息管理的需求日益增长。作为阿里巴巴集团旗下的智能工作平台&#xff0c;钉钉不仅为企业提供了强大的沟通工具&#xff0c;其开放的生态系统也为用户带来了无限可能。其中&#xff0c;宜搭&#xff08;YiDa&#xf…

深入理解事务和MVCC

文章目录 事务定义并发事务代码实现 MVCC定义核心机制 事务 定义 什么是事务&#xff1f; 事务是指一组操作要么全部成功&#xff0c;要么全部失败的执行单位。 在数据库中&#xff0c;一个事务通常包含一组SQL语句&#xff0c;系统保证这些语句作为一个整体执行。 为什么引…

用 Python 绘制精美雷达图:多维度材料属性对比可视化全指南

&#x1f31f; 为什么选择雷达图&#xff1f;从材料科学到多维数据对比的可视化利器 在科研和数据分析领域&#xff0c;当我们需要同时展示多个维度的数据对比时&#xff0c;传统的柱状图或折线图往往显得力不从心。这时候&#xff0c;雷达图&#xff08;Radar Chart&#xff…

Excel学习03

超级表与图表 Excel中具有超级表的功能。所谓超级表&#xff08;官方名称为“表格”&#xff0c;快捷键CtrlT&#xff09;是Excel中一个强大的数据管理工具&#xff0c;它将普通的数据区域转换为具有只能功能的交互式表格。 这就是表格变为超级表的样子。超级表默认具备冻结窗…

Netflix 网飞的架构演进过程、Java在网飞中的应用|图解

写在前面 上一篇文章中&#xff0c;我们讲解了网飞当前的架构&#xff0c;但网飞的架构并不是一开始就是这样的&#xff0c;而是不断演进发展才是当前的样子。 这篇文章我们就来讲讲网飞架构的演进过程。 第一阶段&#xff1a;Zuul Gateway REST API 使用 Zuul 作为API网关…

使用ros2服务实现人脸检测2-人脸检测功能实现(适合0基础小白)

文章目录 一、用到的库二、使用步骤1.引入库2.获取图片真实路径3.检测人脸4.绘制人脸5.显示结果6.更改setup.py7.完整代码 三、结果展示 一、用到的库 face_recognition&#xff1a;实现在图片中检测人脸。 cv2&#xff1a;显示图片&#xff0c;并且可以在图像中展示检测结果。…

中国农村统计年鉴-Excel版(1985-2024年)

《中国农村统计年鉴》系统收录了全国和各省农村社会经济统计数据&#xff0c;以及近年全国农村主要统计数据&#xff0c;是一部全面反映我国农村社会经济情况的资料性年刊。年鉴内容覆盖农村人口结构、农业产值、主要农产品产量、市场物价、进出口贸易以及收入消费水平等社会经…

golang pprof性能调试工具

简介 pprof是性能调试工具,可以生成类似火焰图、堆栈图,内存分析图等。 整个分析的过程分为两步:1. 导出数据,2. 分析数据。

PPIO × 302.AI:三分钟搭建可共享的聊天机器人

最近&#xff0c;各主流模型厂商频频发布新模型&#xff0c;有一如既往强大的DeepSeek-R1-0528&#xff0c;擅长长输入推理的MiniMax-M1-80k…… 好用的AI大模型这么多&#xff0c;如何才能集成在一个应用自由使用呢&#xff1f;302.AI作为企业级AI应用平台支持各主流模型调用&…