如何监听RabbitMQ队列
简单代码实现RabbitMQ消息监听
需要的依赖
<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>x.x.x</version>
</dependency>
消息监听示例
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RabbitMQConfig {private String addresses = "localhost:5672";private String username = "xxx";private String password = "xxx";@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(this.addresses);connectionFactory.setUsername(this.username);connectionFactory.setPassword(this.password);return connectionFactory;}@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("queue1", "queue2");//要监听的队列名称// 设置消息监听器container.setMessageListener((message) -> {log.info("Received message: " + new String(message.getBody()));});return container;}}
手动ACK
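前面的示例使用的是默认的自动ACK(AcknowledgeMode.AUTO):监听器正常执行完毕后,容器会自动确认这条消息。但是有些业务场景需要根据处理结果自行决定确认还是拒绝消息,此时我们需要设置手动ACK(AcknowledgeMode.MANUAL):只有当业务执行成功时,才在代码中显式调用 basicAck 确认这条消息被消费;处理失败时调用 basicNack 拒绝消息。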
@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("queue1", "queue2");//要监听的队列名称// 设置消息监听器container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手动ACK,不设置默认是AUTOcontainer.setMessageListener(messageListener());return container;}@Beanpublic ChannelAwareMessageListener messageListener() {return (message, channel) -> {try {String msg = new String(message.getBody());log.info("Received: " + msg);// 业务逻辑处理boolean success = processMessage(msg);if (success) {// 处理成功,手动ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {// 处理失败,手动NACK(不重新入队)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}} catch (Exception e) {// 异常情况,NACK并重新入队(可选)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}};}/*** 处理消息* @param msg* @return* @throws Exception*/private boolean processMessage(String msg) throws Exception {boolean success = false;int i = RandomUtil.randomInt();if (i % 3 == 0) {success = true;} else if (i % 3 == 1) {success = false; //模拟失败} else {log.error("处理消息:" + msg + ", 模拟发生故障");throw new Exception("模拟发生故障");//模拟异常}log.info("处理消息:" + msg + ", flag:" + success);return success;}