【RabbitMQ】工作队列和发布/订阅模式的具体实现

文章目录

  • 建立连接
  • 工作队列模式实现
    • 创建队列和交换机
    • 生产者代码
    • 消费者代码
    • 运行程序
      • 启动消费者
      • 启动生产者
  • 发布/订阅模式实现
    • 创建队列和交换机
    • 生产者代码
      • 创建交换机
      • 声明两个队列
      • 绑定队列和交换机
      • 发送消息
      • 完整代码
    • 消费者代码
      • 完整代码
    • 运行程序
      • 启动生产者
      • 启动消费者

建立连接

我们把建立连接时,创建的连接工厂部分创建成常量,方便后面进行使用

  • rabbitmq 包下,再创建一个 constant
package rabbitmq.constant;  public class Constants {  static public final String HOST = "localhost";  static public final int PORT = 5672;  static public final String USER_NAME = "study";  static public final String PASSWORD = "study";  static public final String VIRTUAL_HOST = "coding ";  
}

工作队列模式实现

和简单模式相比较,工作队列与之不同的就是有多个消费者,其他都一样。所以我们只需要多添加几个消费者即可

创建队列和交换机

Constants 中添加:

// 工作队列模式  
public static final String WORK_QUEUE = "work.queue ";

生产者代码

package rabbitmq.work;  import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Producer {  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 声明队列  channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);  //4. 发送消息  for (int i = 0; i < 10; i++) {  String msg = "hello work queue..." + i;  channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());  }  System.out.println("消息发送成功!");  // 5. 资源释放  channel.close();  connection.close();  }  
}

消费者代码

package rabbitmq.work;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Consumer1 {  public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  // 1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 声明队列  channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);  //4. 消费消息  DefaultConsumer consumer = new DefaultConsumer(channel){  // 从队列中收到消息,就会执行的方法  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  //TODO  System.out.println("接收到消息: " + new String(body));  }  };  channel.basicConsume(Constants.WORK_QUEUE, true, consumer);  // 等待程序执行完成  Thread.sleep(2000);  // 5. 释放资源  
//        channel.close();  
//        connection.close();  }  
}
  • 多个消费者的代码都一样的

运行程序

我们先启动两个消费者,再启动生产者

  • 如果先启动生产者,再启动消费者,由于消息较少,处理较快,那么第一个启动的消费者就会瞬间把 10 条消息消费掉,所以我们先启动两个消费者,再启动生产者

启动消费者

我们将两个消费者启动

  • 我们可以看到 rabbitmq 客户端里面,work.queue 队列已经被创建了出来
  • image.png

启动生产者

在启动消费者之后,我们启动生产者,发送 10 条消息到队列中

  • 我们可以看到,连个该消费者将 10 条消息消费完了image.png

发布/订阅模式实现

在发布/订阅模式中,多了一个 Exchange 角色

Exchange 常见有三种类型,分别代表不同的路由规则

  1. Fanout: 广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe
  2. Direct: 定向,将消息交给符合指定 routingKey 的队列 (Routing 模式)
  3. Topic: 通配符,把消息交给符合 routing pattern(路由模式)的队列(Topics 模式)
    也就分别对应不同的工作模式

image.png

创建队列和交换机

Constants 中添加:

// 发布订阅模式  
public static final String FANOUT_EXCHANGE = "fanout.exchange";  
public static final String FANOUT_QUEUE1 = "fanout.queue1";  
public static final String FANOUT_QUEUE2 = "fanout.queue2";

生产者代码

发布/订阅模式的生产者代码和简单模式类似,只是有些变化

  • 需要声明交换机
  • 需要指出交换机和队列之间的关系

创建交换机

相比于生产者代码和简单模式,这一步是关键的一步。我们需要声明一个交换机,而不是使用默认交换机

channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
  • 我们会使用到 exchangeDeclare() 方法
Exchange.DeclareOk exchangeDeclare(String exchange,  
BuiltinExchangeType type,  
boolean durable,  
boolean autoDelete,  
boolean internal,  
Map<String, Object> arguments) throws IOException;

参数解释:

  1. exchange:交换机名称
  2. type:交换机类型
    • Direct("direct"):定向,直连,routing
    • Fanout("fanout"):扇形(广播),每个队列都能收到消息
    • TOPIC("topic"):通配符
    • HEADERS("headers"):参数匹配(工作时用到的少)
  3. durable:是否持久化
    • true:持久化
    • false:非持久化
    • 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
  4. autoDelete:自动删除
    • 自动删除的前提是至少有一个对类或者交换器与这个交换器绑定,之后所有与这个交换器绑定的对类或交换器都与此解绑
    • 而不是这种理解:当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器
  5. internal:内部使用,一般 false
    • 如果设置为 true,表示内部使用
    • 客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
  6. argument:参数

声明两个队列

// 如果没有一个这样的队列,会自动创建;如果有,则不创建
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);  
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

绑定队列和交换机

channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");  
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
  • 这里会用到 queueBind() 方法
queueBind(String queue, String exchange, String routingKey)

参数解释:

  1. queue:对类名称
  2. exchange:交换机名称
  3. routingKey:路由 key,路由规则
  • 如果交换机类型为 fanoutroutingKey 设置为 “”,表示每个消费者都能收到全部信息

发送消息

String msg = "hello fanout...";  
// 第二个参数 routingKey 为空。因为这是广播模式,交换机收到消息后需要全部转发(绑定的时候设为空,发送的时候也为空  
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());  
System.out.println("消息发送成功!");
  • 这里会用到 basicPublish() 方法
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)

参数解释:

  1. Exchange:交换机名称
  2. routingKey:如果交换机类型为 fanoutroutingKey 设置为 “”,表示每个消费者都能收到全部信息

完整代码

package rabbitmq.fanout;  import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Producer {  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 声明交换机  /*  Exchange.DeclareOk exchangeDeclare(String exchange,        BuiltinExchangeType type,        boolean durable,        boolean autoDelete,        boolean internal,        Map<String, Object> arguments) throws IOException;        参数解释:  exchange:交换机名称  type:交换机类型  DIRECT("direct"),定向,直连,routing  FANOUT("fanout"),扇形(广播),每个队列都能收到消息  TOPIC("topic"),通配符  HEADERS("headers"),参数匹配(工作中用的少)  durable:是否持久化  autoDelete:自动删除  internal:内部使用(一般false)  arguments:参数  */        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);  // 4. 声明队列  channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);  channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);  // 5. 绑定交换机和队列  channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");  channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");  // 6. 发布消息  String msg = "hello fanout...";  // 第二个参数 routingKey 为空。因为这是广播模式,交换机收到消息后需要全部转发(绑定的时候设为空,发送的时候也为空  channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());  System.out.println("消息发送成功!");  // 7. 释放资源  channel.close();  connection.close();  }  
}

消费者代码

主要的步骤为:

  1. 创建 Channel
  2. 接收消息,并处理

完整代码

package rabbitmq.fanout;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Consumer1 {  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  // 2.建立信道  Channel channel = connection.createChannel();  // 3.声明队列(如果队列已经存在,就不会创建;不存在就会创建)  channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);  // 4. 消费信息  DefaultConsumer consumer = new DefaultConsumer(channel) {  // 从队列中收到消息,就会执行的方法  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("接收到消息:" + new String(body));  }  };  channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);  }  }

运行程序

启动生产者

  1. 消息全转发

image.png|424

  • 我们可以看到两个队列中分别有了一条消息
  • 这就是发布订阅模式,他会把收到的消息都转发
  1. 交换机绑定了队列
    image.png|374
  • 这里,我们可以看到交换机和队列之间的绑定关系

启动消费者

消费者 1:

接收到消息:hello fanout...

消费者 2:

接收到消息:hello fanout...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/82868.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/82868.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Codeforces Round 998 (Div. 3)

A. Fibonacciness 题目大意 给你四个数字abde&#xff0c;让你找到一个中间值c&#xff0c;问 a b c a b c abc &#xff0c; b c d b c d bcd &#xff0c; c d e c d e cde 最多能有几个式子成立 解题思路 显然最多就六种情况&#xff0c;暴力枚举即可 代…

火山引擎发展初始

火山引擎是字节跳动旗下的云计算服务品牌&#xff0c;其云服务业务的启动和正式商业化时间线如下&#xff1a; 1. **初期探索&#xff08;2020年之前&#xff09;** 字节跳动在早期为支持自身业务&#xff08;如抖音、今日头条等&#xff09;构建了强大的基础设施和技术中…

【认知思维】光环效应:第一印象的持久力量

什么是光环效应 光环效应&#xff08;Halo Effect&#xff09;是指人们倾向于让对某人或某物的一个显著特质的印象影响对其他特质的评价的认知偏差。简单来说&#xff0c;当我们对某人的一个特质&#xff08;如外表、智力或某项技能&#xff09;形成积极印象时&#xff0c;我们…

Java Solon v3.3.0 发布(国产优秀应用开发基座)

Solon 框架&#xff01; Solon 是新一代&#xff0c;Java 企业级应用开发框架。从零开始构建&#xff08;No Java-EE&#xff09;&#xff0c;有灵活的接口规范与开放生态。采用商用友好的 Apache 2.0 开源协议&#xff0c;是“杭州无耳科技有限公司”开源的根级项目&#xff…

力扣-104.二叉树的最大深度

题目描述 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 class Solution { public:int maxDepth(TreeNode* root) {if(!root){return 0;}return max(maxDepth(root->left), maxDepth(root->…

单反和无反(私人笔记)

① 单反相机&#xff1a; 定义&#xff1a; 单反相机&#xff08;Single-Lens Reflex&#xff0c;SLR&#xff09;是一种带有反光镜结构的数码相机。光线通过镜头进入后&#xff0c;先被反光镜反射到五棱镜/五面镜&#xff0c;再通过取景器进入人眼。按下快门时&#xff0c;反…

超详细讲解C语言转义字符\a \b \r \t \? \n等等

转义字符 C语言有一组字符很特殊&#xff0c;叫做转义字符&#xff0c;顾名思义&#xff0c;改变原来的意思的字符。 1 \? ??)是一个三字母词&#xff0c;在以前的编译器它会被编译为] (??会被编译为[ 因此在以前输入(are you ok ??)就会被编译为are you ok ] 解决这个…

Java Spring MVC -01

SpringMVC 是一种基于 的实现 MVC 设计模式的请求驱动类型的轻量级 Web 框架&#xff0c;属于 Spring FrameWork 的后续产品&#xff0c;已经融合在 Spring Web Flow 中。 First:SpringMVC-01-SpringMVC 概述 SpringMVC 是 Spring 框架的一个模块&#xff0c;用于构建 Web 应…

Spring MessageSource 详解:如何在国际化消息中传递参数

在开发多语言应用程序时,Spring 的 MessageSource 是处理国际化(i18n)文本的核心组件。它允许我们根据用户的 Locale (区域设置) 显示不同的消息。然而,很多时候我们的消息并不是静态的,而是需要包含动态数据,比如用户名、数量、文件名等。这时,我们就需要在获取国际化消…

Datawhale 5月llm-universe 第1次笔记

课程地址&#xff1a;GitHub - datawhalechina/llm-universe: 本项目是一个面向小白开发者的大模型应用开发教程&#xff0c;在线阅读地址&#xff1a;https://datawhalechina.github.io/llm-universe/ 难点&#xff1a;配置conda环境变量 我用的vscode github方法 目录 重要…

基于Java的家政服务平台设计与实现(代码+数据库+LW)

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本家政服务平台就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&a…

Android中LinearLayout线性布局使用详解

Android中LinearLayout线性布局使用详解 LinearLayout&#xff08;线性布局&#xff09;是Android中最基础、最常用的布局之一&#xff0c;它按照水平或垂直方向依次排列子视图。 基本特性 方向性&#xff1a;可以设置为水平(horizontal)或垂直(vertical)排列权重&#xff1…

LVS+keepalived实战案例

目录 部署LVS 安装软件 创建VIP 创建保存规则文件 给RS添加规则 验证规则 部署RS端 安装软件 页面内容 添加VIP 配置系统ARP 传输到rs-2 客户端测试 查看规则文件 实现keepalived 编辑配置文件 传输文件给backup 修改backup的配置文件 开启keepalived服务 …

(C语言)超市管理系统(测试版)(指针)(数据结构)(二进制文件读写)

目录 前言&#xff1a; 源代码&#xff1a; product.h product.c fileio.h fileio.c main.c 代码解析&#xff1a; fileio模块&#xff08;文件&#xff08;二进制&#xff09;&#xff09; 写文件&#xff08;保存&#xff09; 函数功能 代码逐行解析 关键知识点 读文…

ubuntu----100,常用命令2

目录 文件与目录管理系统信息与管理用户与权限管理网络配置与管理软件包管理打包与压缩系统服务与任务调度硬件信息查看系统操作高级工具开发相关其他实用命令 在 Ubuntu 系统中&#xff0c;掌握常用命令可以大幅提升操作效率。以下是一些常用的命令&#xff0c;涵盖了文件管理…

WiFi密码查看器打开软件自动获取数据

相信有很大一部分人都不知道怎么看已经连过的WiFi密码。 你还在手动查询自己的电脑连接过得WiFi密码吗&#xff1f; —————【下 载 地 址】——————— 【本章单下载】&#xff1a;https://drive.uc.cn/s/dbbedf933dad4 【百款黑科技】&#xff1a;https://ucnygalh6…

开目新一代MOM:AI赋能高端制造的破局之道

导读 INTRODUCTION 在高端制造业智能化转型的深水区&#xff0c;企业正面临着个性化定制、多工艺场景、动态生产需求的敏捷响应以及传统MES柔性不足的考验……在此背景下&#xff0c;武汉开目信息技术股份有限公司&#xff08;简称“开目软件”&#xff09;正式发布新一代开目…

Android开发-视图基础

在Android应用开发中&#xff0c;视图&#xff08;View&#xff09;是构建用户界面的基本元素。无论是按钮、文本框还是复杂的自定义控件&#xff0c;它们都是基于View类或其子类实现的。掌握视图的基础知识对于创建功能强大且美观的应用至关重要。本文将深入探讨Android中的视…

无人机信号线被电磁干扰导致停机

问题描述&#xff1a; 无人机飞控和电调之间使用PWM信号控制时候&#xff0c;无人机可以正常起飞&#xff0c;但是在空中悬停的时候会出现某一个电机停机&#xff0c;经排查电调没有启动过流过压等保护&#xff0c;定位到电调和飞控之间的信号线被干扰问题。 信号线被干扰&am…

VSCode设置SSH免密登录

引言 2025年05月13日20:21:14 原来一直用的PyCharn来完成代码在远程服务器上的运行&#xff0c;但是PyCharm时不时同步代码会有问题。因此&#xff0c;尝试用VSCode来完成代码SSH远程运行。由于VSCode每次进行SSH连接的时候都要手动输入密码&#xff0c;为了解决这个问题在本…