接收rabbitmq消息

以下是一个使用纯Java(非Spring Boot)接收RabbitMQ消息的完整实现,包含Maven依赖和持续监听消息的循环:

1. 首先添加Maven依赖 (pom.xml)

<dependencies><!-- RabbitMQ Java Client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><!-- 日志框架 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency>
</dependencies>

2. RabbitMQ消息接收器实现

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitMQReceiver {private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);private final ConnectionFactory factory;private Connection connection;private Channel channel;private volatile boolean running = true;public RabbitMQReceiver(String host, int port, String username, String password) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);}public void startListening(String queueName) {try {// 建立连接connection = factory.newConnection();channel = connection.createChannel();// 声明队列(如果不存在则创建)channel.queueDeclare(queueName, true, false, false, null);logger.info("连接到队列: {}", queueName);// 设置每次只接收一条消息(公平分发)channel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);try {// 在这里处理你的业务逻辑processMessage(message);// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {logger.error("消息处理失败", e);// 处理失败时拒绝消息(不重新入队)channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}};// 取消消费者回调CancelCallback cancelCallback = consumerTag -> {logger.warn("消费者被取消: {}", consumerTag);};// 开始消费消息channel.basicConsume(queueName, false, deliverCallback, cancelCallback);logger.info("开始监听消息... (按CTRL+C停止)");// 保持程序运行while (running) {Thread.sleep(1000); // 防止CPU空转}} catch (IOException | TimeoutException | InterruptedException e) {logger.error("RabbitMQ连接失败", e);} finally {closeResources();}}private void processMessage(String message) {// 这里是你的业务逻辑处理logger.info("处理消息: {}", message);// 示例:打印消息长度System.out.println("消息长度: " + message.length());}public void stop() {running = false;logger.info("停止监听...");}private void closeResources() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMQ连接已关闭");} catch (IOException | TimeoutException e) {logger.error("关闭资源时出错", e);}}public static void main(String[] args) {// 配置RabbitMQ连接参数String host = "localhost";int port = 5672;String username = "guest";String password = "guest";String queueName = "my_queue";RabbitMQReceiver receiver = new RabbitMQReceiver(host, port, username, password);// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {receiver.stop();receiver.closeResources();}));// 开始监听receiver.startListening(queueName);}
}

关键功能说明:

  1. 持续监听机制

    while (running) {Thread.sleep(1000); // 防止CPU空转
    }
    

    使用running标志控制循环,优雅退出

  2. 消息处理流程

    • 声明队列确保存在
    • 设置QoS为1(公平分发)
    • 使用DeliverCallback处理消息
    • 手动消息确认(ACK/NACK)
    • 异常处理与错误恢复
  3. 资源管理

    • 使用finally块确保关闭连接
    • 添加Shutdown Hook处理程序终止
    • 线程安全的状态管理(volatile running
  4. 日志记录

    • 使用SLF4J进行日志记录
    • 关键操作都有日志输出

使用说明:

  1. 启动消费者

    mvn compile exec:java -Dexec.mainClass="RabbitMQReceiver"
    
  2. 发送测试消息(使用RabbitMQ管理界面或命令行工具):

    rabbitmqadmin publish exchange=amq.default routing_key=my_queue payload="hello world"
    
  3. 停止程序

    • CTRL+C优雅停止
    • 程序会自动关闭连接

自定义配置:

  1. 修改连接参数

    String host = "your.rabbitmq.host";
    int port = 5672;
    String username = "your_user";
    String password = "your_password";
    String queueName = "your_queue_name";
    
  2. 自定义消息处理
    修改processMessage方法实现你的业务逻辑:

    private void processMessage(String message) {// 示例:解析JSON消息// JSONObject json = new JSONObject(message);// System.out.println("收到订单: " + json.getString("orderId"));// 你的实际业务逻辑
    }
    
  3. 配置调整

    • 修改channel.basicQos()调整预取数量
    • 修改basicNackrequeue参数控制是否重新入队
    • 添加交换机绑定逻辑(如果需要)

这个实现遵循了RabbitMQ最佳实践,包括:

  • 手动消息确认
  • 公平分发(QoS设置)
  • 连接和通道的异常处理
  • 资源清理
  • 优雅关闭机制

如果需要处理更复杂的场景(如多个队列、消息持久化、死信队列等),可以在channel.queueDeclarechannel.basicConsume方法中添加相应参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/86597.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL进阶之旅 Day 23:事务隔离级别与性能优化

【SQL进阶之旅 Day 23】事务隔离级别与性能优化 文章简述 在数据库系统中&#xff0c;事务是确保数据一致性和完整性的核心机制。随着业务复杂度的提升&#xff0c;如何合理设置事务隔离级别以平衡并发性能与数据一致性成为开发人员必须掌握的关键技能。本文深入解析事务隔离级…

六.原型模式

一.原型模式的定义 原型模式是一种创建型设计模式&#xff0c;通过复制现有对象&#xff08;原型&#xff09;生成新对象&#xff0c;避免重复初始化成本。需了解以下关键概念&#xff1a; ‌浅拷贝‌&#xff1a;复制基本类型字段&#xff0c;引用类型字段共享内存地址&#…

【笔记】LoRA 理论与实现|大模型轻量级微调

论文链接&#xff1a;LoRA: Low-Rank Adaptation of Large Language Models 官方实现&#xff1a;microsoft/LoRA 非官方实现&#xff1a;huggingface/peft、huggingface/diffusers 这篇文章要介绍的是一种大模型/扩散模型的微调方法&#xff0c;叫做低秩适应&#xff08;也就是…

Cilium动手实验室: 精通之旅---15.Isovalent Enterprise for Cilium: Network Policies

Cilium动手实验室: 精通之旅---15.Isovalent Enterprise for Cilium: Network Policies 1. 环境信息2. 测试环境部署3. 默认规则3.1 测试默认规则3.2 小测验 4. 网络策略可视化4.1 通过可视化创建策略4.2 小测试 5. 测试策略5.1 应用策略5.2 流量观测5.3 Hubble观测5.4 小测试 …

opencv RGB图像转灰度图

这段代码的作用是将一个 3通道的 RGB 图像&#xff08;CV_8UC3&#xff09;转换为灰度图像&#xff08;CV_8UC1&#xff09;&#xff0c;并使用 OpenCV 的 parallel_for_ 对图像处理进行并行加速。 &#x1f50d; 一、函数功能总结 if (CV_8UC3 img.type()) {// 创建灰度图 d…

React Hooks 的原理、常用函数及用途详解

1. ​​Hooks 是什么&#xff1f;​​ Hooks 是 React 16.8 引入的函数式组件特性&#xff0c;允许在不编写 class 的情况下使用 state 和其他 React 特性&#xff08;如生命周期、副作用等&#xff09;。​​本质是一类特殊函数​​&#xff0c;它们挂载到 React 的调度系统中…

学习路之PHP--webman协程学习

学习路之PHP--webman协程学习 一、准备二、配置三、启动四、使用 协程是一种比线程更轻量级的用户级并发机制&#xff0c;能够在进程中实现多任务调度。它通过手动控制挂起和恢复来实现协程间的切换&#xff0c;避免了进程上下文切换的开销 一、准备 PHP > 8.1 Workerman &g…

linux libusb使用libusb_claim_interface失败(-6,Resource busy)解决方案

linux libusb使用libusb_claim_interface失败&#xff08;-6&#xff0c;Resource busy&#xff09;解决方案 ✅ 问题原因&#x1f6e0;️ 解决方案&#x1f538; 方法一&#xff1a;分离内核驱动 libusb_detach_kernel_driver()&#x1f538; 方法二&#xff1a;使用 usb-devi…

使用mpu6500/6050, PID,互补滤波实现一个简单的飞行自稳控制系统

首先&#xff0c;参考ai给出的客机飞机的比较平稳的最大仰府&#xff0c;偏转&#xff0c;和防滚角度&#xff0c;如下&#xff1a; 客机的最大平稳仰俯&#xff08;Pitch&#xff09;、偏转&#xff08;Yaw&#xff09;和防滚&#xff08;Roll&#xff09;角度&#xff0c;通…

深度解析AD7685ARMZRL7:16位精密ADC在低功耗系统中的设计价值

产品概述 AD7685ARMZRL7是16位逐次逼近型&#xff08;SAR&#xff09;ADC&#xff0c;采用MSOP-10紧凑封装。其核心架构基于电荷再分配技术&#xff0c;支持2.3V至5.5V单电源供电&#xff0c;集成低噪声采样保持电路与内部转换时钟。器件采用伪差分输入结构&#xff08;IN/-&a…

EXCEL 实现“点击跳转到指定 Sheet”的方法

&#x1f4cc; WPS 表格技巧&#xff1a;如何实现点击单元格跳转到指定 Sheet 在使用 WPS 表格&#xff08;或 Excel&#xff09;时&#xff0c;我们经常会希望通过点击一个单元格&#xff0c;直接跳转到工作簿中的另一个工作表&#xff08;Sheet&#xff09;。这在制作目录页…

Python格式化:让数据输出更优雅

Python格式化&#xff1a;让数据输出更优雅 Python的格式化功能能让数据输出瞬间变得优雅又规范。不管是对齐文本、控制数字精度&#xff0c;还是动态填充内容&#xff0c;它都能轻松搞定。 一、基础格式化&#xff1a;从简单拼接开始 1. 百分号&#xff08;%&#xff09;格式…

2025年渗透测试面试题总结-小鹏[实习]安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 小鹏[实习]安全工程师 1. 自我介绍 2. 有没有挖过src&#xff1f; 3. 平时web渗透怎么学的&#xff0c;有…

VSCode科技风主题设计详细指南

1. 科技风设计的核心特点 科技风设计是一种强调未来感、现代感和高科技感的设计风格,在VSCode主题设计中,可以通过以下几个核心特点来体现: 1.1 色彩特点 冷色调为主:蓝色、紫色、青色等冷色调是科技风设计的主要色彩高对比度:深色背景配合明亮的霓虹色,形成强烈的视觉…

android知识总结

Activity启动模式 standard (标准模式) 每次启动该 Activity&#xff08;例如&#xff0c;通过 startActivity()&#xff09;&#xff0c;系统总会创建一个新的实例&#xff0c;并将其放入调用者&#xff08;启动它的那个 Activity&#xff09;所在的任务栈中。 singleTop (栈…

第3章 MySQL数据类型

MySQL数据类型 1、数字数据类型1.1 整数类型1.2 定点类型1.3 浮点类型1.4位值类型1.5 超出范围和溢出处理1.5.1 超出范围处理1.5.2 溢出处理 2、日期和时间数据类型3、字符串数据类型3.1 char和varchar类型3.2 binary和varbinary类型3.3 blob 和 text类型3.4 enum类型3.4.1 创建…

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…

mysql为什么一个表中不能同时存在两个字段自增

背景。设置sort自增。会引发错误 通常自增字段都是用于表示数据的唯一性。数据库限制。需要自定义排序字段大小。

牛客round95D

原题链接&#xff1a;D-小红的区间修改&#xff08;一&#xff09;_牛客周赛 Round 95 题目背景&#xff1a; 初始拥有一个长度10^100元素全为0的数组&#xff0c;进行q查询&#xff0c;每次查询如果区间内的元素都为0就将区间变为首项为 1、公差为 1 的等差数列&#xff1b;否…

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成