详解RabbitMQ工作模式之工作队列模式

目录

工作队列模式

概念

特点

应用场景

工作原理

注意事项

代码案例

引入依赖

常量类

编写生产者代码

编写消费者1代码

编写消费者2代码

先运行生产者,后运行消费者

先运行消费者,后运行生产者


工作队列模式

概念

在工作队列模式中,一个生产者(producer)将任务发布到队列中,多个消费者(consumer)从队列中获取任务并执行。这种模式的主要目标是提高任务的并行处理能力,从而提高系统的吞吐量和效率。

特点

可以有多个消费者,但一条消息只能被一个消费者获取。
消费者在处理完某条消息后,才会收到下一条消息。
RabbitMQ采用轮询(Round-Robin)或公平分发(Fair Dispatch)的方式将消息发送给消费者。 

应用场景

1.任务分发:将任务分发给多个工作者(消费者),以便并行处理。这对于需要高吞吐量和任务处理效率的应用程序非常有用。例如,图像处理、视频编码、数据转换等应用可以使用工作队列模式来并行处理大量任务。
2.负载均衡:当有多个消费者时,工作队列模式可以用来实现负载均衡。任务将均匀分布给可用的消费者,以确保每个消费者都有工作可做,而且不会超负荷。
3.后台任务处理:在Web应用程序中,后台任务处理是一个常见的需求。工作队列模式可用于处理与Web请求无关的长时间运行任务,而不会影响用户体验。例如,发送电子邮件、生成报告、备份数据等后台任务可以使用工作队列来处理。

工作原理

1.生产者发送任务:生产者将任务封装为消息,并将其发送到RabbitMQ队列中。
2.RabbitMQ分发任务:RabbitMQ根据配置的分发策略(如轮询或公平分发)将任务分发给消费者。
3.消费者处理任务:消费者从队列中获取任务并执行。在处理完任务后,消费者会向RabbitMQ发送确认消息,表示任务已完成。
4.RabbitMQ确认任务完成:在收到消费者的确认消息后,RabbitMQ会将该任务从队列中移除。

注意事项

1.消息确认:为了确保消息不会丢失,消费者在处理完任务后需要向RabbitMQ发送确认消息。如果消费者在处理任务时失败或崩溃,RabbitMQ会将该任务重新分发给其他消费者。
2.负载均衡:RabbitMQ默认采用轮询方式将消息分发给消费者。如果需要更复杂的负载均衡策略,可以考虑使用其他分发策略或自定义交换机类型。
3.错误处理:在生产者和消费者中都需要添加适当的错误处理逻辑,以处理可能出现的异常情况,如连接失败、消息发送失败等。

代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量类
public class Constants {public static final String HOST = "47.98.109.138";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "aaa";//工作队列模式public static final String WORK_QUEUE = "work.queue";
}
编写生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功~");//6. 资源释放channel.close();connection.close();}
}
编写消费者1代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 资源释放
//        channel.close();
//        connection.close();}
}
编写消费者2代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODOSystem.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//        //6. 资源释放
//        channel.close();
//        connection.close();}
}
先运行生产者,后运行消费者

查看管理界面

我们此时会看到,先启动的消费者会消费掉队列中所有的消息。

先运行消费者,后运行生产者

此时我们能看到,两个消费者都能够消费消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/78970.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/78970.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构-非线性结构-二叉树

概述 /** * 术语 * 根节点&#xff08;root node&#xff09;&#xff1a;位于二叉树顶层的节点&#xff0c;没有父节点。 * 叶节点&#xff08;leaf node&#xff09;&#xff1a;没有子节点的节点&#xff0c;其两个指针均指向 None 。 * 边&#xff08;edge&#xff09;&…

芯片笔记 - 手册参数注释

芯片手册参数注释 基础参数外围设备USB OTG&#xff08;On-The-Go&#xff09;以太网存储卡&#xff08;SD&#xff09;SDIO 3.0(Secure Digital Input/Output)GPIO&#xff08;General Purpose Input/Output 通用输入/输出接口&#xff09;ADC&#xff08;Analog to Digital C…

力扣94. 二叉树的中序遍历

94. 二叉树的中序遍历 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2]示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[]示例 3&#xff1a; 输入&#…

深度学习:AI为老年痴呆患者点亮希望之光

引言 随着全球人口老龄化进程的加速&#xff0c;老年痴呆症已成为严重威胁老年人健康和生活质量的公共卫生问题。据世界卫生组织统计&#xff0c;全球每 3 秒钟就有 1 人被诊断为痴呆&#xff0c;预计到 2050 年&#xff0c;全球痴呆患者人数将从目前的约 5000 万激增至 1.52 亿…

抛物线法(二次插值法)

抛物线法简介 抛物线法&#xff08;Quadratic Interpolation Method&#xff09;是一种用于一维单峰函数极值搜索的经典优化方法。该方法通过在区间内选取三个不同的点&#xff0c;拟合一条二次抛物线&#xff0c;并求取这条抛物线的极值点作为新的迭代点&#xff0c;从而逐步…

FreeRTOS如何检测内存泄漏

在嵌入式系统中&#xff0c;内存资源通常非常有限&#xff0c;内存泄漏可能导致系统性能下降甚至崩溃。内存泄漏是指程序分配的内存未被正确释放&#xff0c;逐渐耗尽可用内存。 FreeRTOS作为一种轻量级实时操作系统&#xff08;RTOS&#xff09;&#xff0c;广泛应用于资源受限…

Mockoon 使用教程

文章目录 一、简介二、模拟接口1、Get2、Post 一、简介 1、Mockoon 可以快速模拟API&#xff0c;无需远程部署&#xff0c;无需帐户&#xff0c;免费&#xff0c;跨平台且开源&#xff0c;适合离线环境。 2、支持get、post、put、delete等所有格式。 二、模拟接口 1、Get 左…

如何进行APP安全加固

进行APP安全加固的关键在于代码混淆、加密敏感数据、权限管理、漏洞扫描与修复。其中&#xff0c;代码混淆能有效阻止逆向工程与篡改攻击&#xff0c;提升应用的安全防护能力。通过混淆代码&#xff0c;攻击者难以轻易理解源代码逻辑&#xff0c;从而降低被破解或攻击的风险。 …

【C++】手搓一个STL风格的string容器

C string类的解析式高效实现 GitHub地址 有梦想的电信狗 1. 引言&#xff1a;字符串处理的复杂性 ​ 在C标准库中&#xff0c;string类作为最常用的容器之一&#xff0c;其内部实现复杂度远超表面认知。本文将通过一个简易仿照STL的string类的完整实现&#xff0c;揭示其设…

辰鳗科技朱越洋:紧扣时代契机,全力投身能源转型战略赛道

国家能源局于4月28日出台的《关于促进能源领域民营经济发展若干举措的通知》&#xff08;以下简称《通知》&#xff09;&#xff0c;是继2月民营企业座谈会后深化能源领域市场化改革的关键政策&#xff0c;标志着民营经济在“双碳”目标引领下正式进入能源转型的核心赛道。 自…

Vue实现不同网站之间的Cookie共享功能

前言 最近有小伙伴在聊天室中提到这么一个需求&#xff0c;就是说希望用户在博客首页中登录了之后&#xff0c;可以跳转到管理系统去发布文章。这个需求的话就涉及到了不同网站之间cookie共享的功能&#xff0c;那么咱们就来试着解决一下这个功能。 实现方式 1. 后端做中转 …

在一台服务器上通过 Nginx 配置实现不同子域名访问静态文件和后端服务

一、域名解析配置 要实现通过不同子域名访问静态文件和后端服务&#xff0c;首先需要进行域名解析。在域名注册商或 DNS 服务商处&#xff0c;为你的两个子域名 blog.xxx.com 和 api.xxx.com 配置 A 记录或 CNAME 记录。将它们的 A 记录都指向你服务器的 IP 地址。例如&#x…

Opencv进阶操作:图像拼接

文章目录 前言一、图像拼接的原理1. 特征提取与匹配2. 图像配准3. 图像变换与投影4. 图像融合5. 优化与后处理 二、图像拼接的简单实现&#xff08;案例实现&#xff09;1.引入库2.定义cv_show()函数3.创建特征检测函数detectAndDescribe()4.读取拼接图片5.计算图片特征点及描述…

LLM 论文精读(三)Demystifying Long Chain-of-Thought Reasoning in LLMs

这是一篇2025年发表在arxiv中的LLM领域论文&#xff0c;主要描述了长思维链 Long Chain-of-Thought 对LLM的影响&#xff0c;以及其可能的生成机制。通过大量的消融实验证明了以下几点&#xff1a; 与shot CoT 相比&#xff0c;long CoT 的 SFT 可以扩展到更高的性能上限&…

计算机网络常识:缓存、长短连接 网络初探、URL、客户端与服务端、域名操作 tcp 三次握手 四次挥手

缓存&#xff1a; 缓存是对cpu&#xff0c;内存的一个节约&#xff1a;节约的是网络带宽资源 节约服务器的性能 资源的每次下载和请求都会造成服务器的一个压力 减少网络对资源拉取的延迟 这个就是浏览器缓存的一个好处 表示这个html页面的返回是不要缓存的 忽略缓存 需要每次…

《构建社交应用用户激励引擎:React Native与Flutter实战解析》

React Native凭借其与JavaScript和React的紧密联系&#xff0c;为开发者提供了一个熟悉且灵活的开发环境。在构建用户等级体系时&#xff0c;它能够充分利用现有的前端开发知识和工具。通过将用户在社交应用中的各种行为进行量化&#xff0c;比如发布动态的数量、点赞评论的次数…

接口自动化测试框架详解(pytest+allure+aiohttp+ 用例自动生成)

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 近期准备优先做接口测试的覆盖&#xff0c;为此需要开发一个测试框架&#xff0c;经过思考&#xff0c;这次依然想做点儿不一样的东西。 接口测试是比较讲究效…

Linux-----文件系统

文件大家都知道&#xff0c;前面的我的博客课程也为大家解释了关于文件的打开等&#xff0c;今天我们要谈论的是 文件在没被打开的时候在磁盘中的位置和找到它的方式。 画图为大家展示&#xff1a; 方便理解 我们从下面几个方面入手&#xff1a; 1. 看看物理磁盘 2. 了解一…

C++ set替换vector进行优化

文章目录 demo代码解释&#xff1a; 底层原理1. 二叉搜索树基础2. 红黑树的特性3. std::set 基于红黑树的实现优势4. 插入操作5. 删除操作6. 查找操作 demo #include <iostream> #include <set>int main() {// 创建一个存储整数的std::setstd::set<int> myS…

如何巧妙解决 Too many connections 报错?

1. 背景 在日常的 MySQL 运维中&#xff0c;难免会出现参数设置不合理&#xff0c;导致 MySQL 在使用过程中出现各种各样的问题。 今天&#xff0c;我们就来讲解一下 MySQL 运维中一种常见的问题&#xff1a;最大连接数设置不合理&#xff0c;一旦到了业务高峰期就会出现连接…