📋 目录
🚀 ActiveMQ简介
什么是ActiveMQ?
核心概念
🏗️ 基础架构组件
📝 重要概念解释
ActiveMQ vs 其他消息中间件
🔧 环境搭建
1. ActiveMQ服务端安装
Docker方式(推荐初学者)
手动安装方式
2. 验证安装
访问Web管理界面
连接参数
测试连接
🏗️ Spring Boot集成配置
1. 添加依赖
2. 配置文件
3. ActiveMQ配置类
📨 基础消息收发
1. 创建消息实体类
2. 消息生产者服务
3. 消息消费者服务
4. 测试控制器
🎯 消息模式详解
1. 点对点模式(Queue)
特点
实现示例
2. 发布订阅模式(Topic)
特点
实现示例
🚀 高级特性
1. 消息选择器(Message Selector)
基于消息属性过滤
2. 事务消息
JMS事务配置
3. 消息确认机制
手动确认模式
4. 死信队列(DLQ)
死信队列配置
🖥️ Web管理界面
ActiveMQ Web Console使用
1. 访问管理界面
2. 主要功能
队列管理
主题管理
连接监控
消息浏览
2. 自定义监控页面
创建监控控制器
创建监控页面
📊 监控和管理
1. Spring Boot Actuator集成
添加Actuator依赖
配置监控端点
自定义健康指示器
2. 性能监控
消息处理性能监控
3. 日志配置
logback-spring.xml
🎯 实战案例
⚡ 性能优化
💡 最佳实践
❓ 常见问题解决
🚀 ActiveMQ简介
什么是ActiveMQ?
ActiveMQ是Apache软件基金会开发的开源消息中间件,是Java消息服务(JMS)的完整实现,具有高性能、可靠性强、易于使用的特点。
核心概念
🏗️ 基础架构组件
Producer(生产者):负责发送消息
Consumer(消费者):负责接收和处理消息
Broker(代理):消息服务器,负责存储和转发消息
Destination(目的地):消息的目标,包括Queue和Topic
📝 重要概念解释
- Queue(队列):点对点模式,一条消息只能被一个消费者消费
- Topic(主题):发布订阅模式,一条消息可以被多个消费者消费
- JMS(Java Message Service):Java消息服务API标准
- Message(消息):传输的数据单元
- Session(会话):生产和消费消息的上下文
- Connection(连接):客户端与消息服务器的网络连接
ActiveMQ vs 其他消息中间件
特性 | ActiveMQ | RabbitMQ | RocketMQ |
---|---|---|---|
开发语言 | Java | Erlang | Java |
协议支持 | JMS、AMQP、STOMP | AMQP | 自定义协议 |
管理界面 | Web Console | Management UI | Console |
集群支持 | ✅ | ✅ | ✅ |
事务支持 | ✅ | ✅ | ✅ |
消息持久化 | ✅ | ✅ | ✅ |
🔧 环境搭建
1. ActiveMQ服务端安装
Docker方式(推荐初学者)
# 1. 拉取ActiveMQ镜像
docker pull webcenter/activemq:latest# 2. 启动ActiveMQ容器
docker run -d \--name activemq \-p 61616:61616 \-p 8161:8161 \webcenter/activemq:latest# 3. 查看容器状态
docker ps | grep activemq# 4. 查看日志
docker logs activemq
手动安装方式
# 1. 下载ActiveMQ
wget https://archive.apache.org/dist/activemq/5.17.3/apache-activemq-5.17.3-bin.tar.gz# 2. 解压
tar -zxvf apache-activemq-5.17.3-bin.tar.gz# 3. 启动ActiveMQ
cd apache-activemq-5.17.3
./bin/activemq start# 4. 停止ActiveMQ
./bin/activemq stop# 5. 查看状态
./bin/activemq status
2. 验证安装
访问Web管理界面
URL: http://localhost:8161/admin
默认用户名: admin
默认密码: admin
连接参数
JMS连接URL: tcp://localhost:61616
Web管理端口: 8161
JMX端口: 1099
测试连接
# 使用ActiveMQ自带的测试工具
cd apache-activemq-5.17.3# 启动消费者
./bin/activemq consumer# 启动生产者(新开终端)
./bin/activemq producer
🏗️ Spring Boot集成配置
1. 添加依赖
<dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot ActiveMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!-- ActiveMQ连接池 --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- 开发工具 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency>
</dependencies>
2. 配置文件
application.yml
# ActiveMQ配置
spring:activemq:broker-url: tcp://localhost:61616 # ActiveMQ服务器地址user: admin # 用户名password: admin # 密码in-memory: false # 不使用内存模式pool:enabled: true # 启用连接池max-connections: 50 # 最大连接数idle-timeout: 30000 # 空闲超时时间(毫秒)packages:trust-all: true # 信任所有包(开发环境)trusted: com.example.model # 信任的包(生产环境推荐)# JMS配置jms:pub-sub-domain: false # false=Queue模式,true=Topic模式template:default-destination: default-queue # 默认目的地delivery-mode: persistent # 消息持久化模式priority: 100 # 消息优先级(0-255)time-to-live: 36000000 # 消息存活时间(毫秒)receive-timeout: 10000 # 接收超时时间(毫秒)# 应用配置
server:port: 8080# 日志配置
logging:level:org.apache.activemq: INFOorg.springframework.jms: DEBUGcom.example: DEBUG
application.properties(可选)
# ActiveMQ连接配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=false# 连接池配置
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring.activemq.pool.idle-timeout=30000# JMS配置
spring.jms.pub-sub-domain=false
spring.jms.template.default-destination=default-queue
spring.jms.template.delivery-mode=persistent
3. ActiveMQ配置类
基础配置
package com.example.config;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageType;@Configuration
@EnableJms // 启用JMS
public class ActiveMQConfig {@Value("${spring.activemq.broker-url}")private String brokerUrl;@Value("${spring.activemq.user}")private String username;@Value("${spring.activemq.password}")private String password;/*** ActiveMQ连接工厂*/@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory() {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();factory.setBrokerURL(brokerUrl);factory.setUserName(username);factory.setPassword(password);// 信任所有包(开发环境)factory.setTrustAllPackages(true);// 生产环境推荐指定信任的包// factory.setTrustedPackages(Arrays.asList("com.example.model"));return factory;}/*** 连接池工厂*/@Beanpublic PooledConnectionFactory pooledConnectionFactory() {PooledConnectionFactory pooledFactory = new PooledConnectionFactory();pooledFactory.setConnectionFactory(activeMQConnectionFactory());pooledFactory.setMaxConnections(50); // 最大连接数pooledFactory.setIdleTimeout(30000); // 空闲超时return pooledFactory;}/*** JmsTemplate - 用于发送消息*/@Beanpublic JmsTemplate jmsTemplate() {JmsTemplate template = new JmsTemplate();template.setConnectionFactory(pooledConnectionFactory());template.setMessageConverter(jacksonJmsMessageConverter());template.setDeliveryPersistent(true); // 消息持久化template.setSessionTransacted(true); // 启用事务return template;}/*** Queue模式的监听器工厂*/@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(pooledConnectionFactory());factory.setMessageConverter(jacksonJmsMessageConverter());factory.setPubSubDomain(false); // Queue模式factory.setSessionTransacted(true); // 启用事务factory.setConcurrency("3-10"); // 并发消费者数量return factory;}/*** Topic模式的监听器工厂*/@Beanpublic DefaultJmsListenerContainerFactory topicListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(pooledConnectionFactory());factory.setMessageConverter(jacksonJmsMessageConverter());factory.setPubSubDomain(true); // Topic模式factory.setSessionTransacted(true);factory.setConcurrency("3-10");return factory;}/*** 消息转换器 - 支持对象序列化*/@Beanpublic MappingJackson2MessageConverter jacksonJmsMessageConverter() {MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();converter.setTargetType(MessageType.TEXT); // 使用文本消息converter.setTypeIdPropertyName("_type"); // 类型标识属性return converter;}
}
📨 基础消息收发
1. 创建消息实体类
package com.example.model;import java.io.Serializable;
import java.time.LocalDateTime;/*** 用户消息实体*/
public class UserMessage implements Serializable {private static final long serialVersionUID = 1L;private Long id;private String username;private String email;private String action;private LocalDateTime timestamp;private String description;// 无参构造函数(JSON反序列化需要)public UserMessage() {}// 全参构造函数public UserMessage(Long id, String username, String email, String action, String description) {this.id = id;this.username = username;this.email = email;this.action = action;this.description = description;this.timestamp = LocalDateTime.now();}// Getter和Setter方法public Long getId() { return id; }public void setId(Long id) { this.id = id; }public String getUsername() { return username; }public void setUsername(String username) { this.username = username; }public String getEmail() { return email; }public void setEmail(String email) { this.email = email; }public String getAction() { return action; }public void setAction(String action) { this.action = action; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }public String getDescription() { return description; }public void setDescription(String description) { this.description = description; }@Overridepublic String toString() {return "UserMessage{" +"id=" + id +", username='" + username + '\'' +", email='" + email + '\'' +", action='" + action + '\'' +", timestamp=" + timestamp +", description='" + description + '\'' +'}';}
}/*** 订单消息实体*/
public class OrderMessage implements Serializable {private static final long serialVersionUID = 1L;private String orderId;private String userId;private String productName;private Integer quantity;private Double totalPrice;private String status;private LocalDateTime createTime;// 构造函数public OrderMessage() {}public OrderMessage(String orderId, String userId, String productName, Integer quantity, Double totalPrice, String status) {this.orderId = orderId;this.userId = userId;this.productName = productName;this.quantity = quantity;this.totalPrice = totalPrice;this.status = status;this.createTime = LocalDateTime.now();}// Getter和Setter方法省略...@Overridepublic String toString() {return "OrderMessage{" +"orderId='" + orderId + '\'' +", userId='" + userId + '\'' +", productName='" + productName + '\'' +", quantity=" + quantity +", totalPrice=" + totalPrice +", status='" + status + '\'' +", createTime=" + createTime +'}';}
}
2. 消息生产者服务
package com.example.service;import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;import javax.jms.Queue;
import javax.jms.Topic;/*** 消息生产者服务*/
@Service
public class MessageProducerService {@Autowiredprivate JmsTemplate jmsTemplate;/*** 发送简单文本消息到Queue*/public void sendTextMessage(String queueName, String message) {try {jmsTemplate.convertAndSend(queueName, message);System.out.println("✅ 文本消息发送成功到Queue: " + queueName);System.out.println(" 消息内容: " + message);} catch (Exception e) {System.err.println("❌ 文本消息发送失败: " + e.getMessage());throw new RuntimeException("消息发送失败", e);}}/*** 发送用户消息对象到Queue*/public void sendUserMessage(String queueName, UserMessage userMessage) {try {jmsTemplate.convertAndSend(queueName, userMessage);System.out.println("✅ 用户消息发送成功到Queue: " + queueName);System.out.println(" 消息内容: " + userMessage);} catch (Exception e) {System.err.println("❌ 用户消息发送失败: " + e.getMessage());throw new RuntimeException("用户消息发送失败", e);}}/*** 发送订单消息到Queue*/public void sendOrderMessage(String queueName, OrderMessage orderMessage) {try {jmsTemplate.convertAndSend(queueName, orderMessage);System.out.println("✅ 订单消息发送成功到Queue: " + queueName);System.out.println(" 订单ID: " + orderMessage.getOrderId());} catch (Exception e) {System.err.println("❌ 订单消息发送失败: " + e.getMessage());throw new RuntimeException("订单消息发送失败", e);}}/*** 发送消息到Topic(发布订阅模式)*/public void sendMessageToTopic(String topicName, Object message) {try {// 临时设置为Topic模式jmsTemplate.setPubSubDomain(true);jmsTemplate.convertAndSend(topicName, message);// 恢复Queue模式jmsTemplate.setPubSubDomain(false);System.out.println("✅ 消息发布成功到Topic: " + topicName);System.out.println(" 消息内容: " + message);} catch (Exception e) {System.err.println("❌ Topic消息发送失败: " + e.getMessage());throw new RuntimeException("Topic消息发送失败", e);}}/*** 发送带优先级的消息*/public void sendPriorityMessage(String queueName, String message, int priority) {try {jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {messagePostProcessor.setJMSPriority(priority);return messagePostProcessor;});System.out.println("✅ 优先级消息发送成功: 优先级=" + priority);System.out.println(" 消息内容: " + message);} catch (Exception e) {System.err.println("❌ 优先级消息发送失败: " + e.getMessage());}}/*** 发送延时消息*/public void sendDelayMessage(String queueName, String message, long delayTime) {try {jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {messagePostProcessor.setLongProperty("AMQ_SCHEDULED_DELAY", delayTime);return messagePostProcessor;});System.out.println("✅ 延时消息发送成功: 延时=" + delayTime + "毫秒");System.out.println(" 消息内容: " + message);} catch (Exception e) {System.err.println("❌ 延时消息发送失败: " + e.getMessage());}}/*** 批量发送消息*/public void sendBatchMessages(String queueName, String messagePrefix, int count) {try {for (int i = 1; i <= count; i++) {String message = messagePrefix + " #" + i;jmsTemplate.convertAndSend(queueName, message);}System.out.println("✅ 批量消息发送成功: " + count + " 条消息");} catch (Exception e) {System.err.println("❌ 批量消息发送失败: " + e.getMessage());}}
}
3. 消息消费者服务
package com.example.service;import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;/*** 消息消费者服务*/
@Service
public class MessageConsumerService {/*** 消费文本消息 - Queue模式*/@JmsListener(destination = "text.queue")public void receiveTextMessage(String message) {try {System.out.println("📨 接收到文本消息: " + message);// 模拟业务处理processTextMessage(message);System.out.println("✅ 文本消息处理完成");} catch (Exception e) {System.err.println("❌ 文本消息处理失败: " + e.getMessage());throw new RuntimeException("消息处理失败", e);}}/*** 消费用户消息对象 - Queue模式*/@JmsListener(destination = "user.queue")public void receiveUserMessage(UserMessage userMessage) {try {System.out.println("📨 接收到用户消息: " + userMessage);// 根据用户行为进行不同处理switch (userMessage.getAction().toLowerCase()) {case "register":handleUserRegistration(userMessage);break;case "login":handleUserLogin(userMessage);break;case "logout":handleUserLogout(userMessage);break;case "update":handleUserUpdate(userMessage);break;default:System.out.println("🤔 未知的用户行为: " + userMessage.getAction());}System.out.println("✅ 用户消息处理完成");} catch (Exception e) {System.err.println("❌ 用户消息处理失败: " + e.getMessage());throw new RuntimeException("用户消息处理失败", e);}}/*** 消费订单消息 - Queue模式*/@JmsListener(destination = "order.queue")public void receiveOrderMessage(OrderMessage orderMessage) {try {System.out.println("📨 接收到订单消息: " + orderMessage);// 根据订单状态进行处理switch (orderMessage.getStatus().toLowerCase()) {case "created":handleOrderCreated(orderMessage);break;case "paid":handleOrderPaid(orderMessage);break;case "shipped":handleOrderShipped(orderMessage);break;case "completed":handleOrderCompleted(orderMessage);break;case "cancelled":handleOrderCancelled(orderMessage);break;default:System.out.println("🤔 未知的订单状态: " + orderMessage.getStatus());}System.out.println("✅ 订单消息处理完成");} catch (Exception e) {System.err.println("❌ 订单消息处理失败: " + e.getMessage());throw new RuntimeException("订单消息处理失败", e);}}/*** 消费Topic消息 - 发布订阅模式* 多个消费者可以同时接收到同一条消息*/@JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")public void receiveNewsFromTopic(String news) {try {System.out.println("📡 [新闻订阅者1] 接收到新闻: " + news);// 处理新闻消息processNews(news, "订阅者1");System.out.println("✅ [新闻订阅者1] 新闻处理完成");} catch (Exception e) {System.err.println("❌ [新闻订阅者1] 新闻处理失败: " + e.getMessage());}}/*** 另一个Topic消费者*/@JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")public void receiveNewsFromTopic2(String news) {try {System.out.println("📡 [新闻订阅者2] 接收到新闻: " + news);// 处理新闻消息processNews(news, "订阅者2");System.out.println("✅ [新闻订阅者2] 新闻处理完成");} catch (Exception e) {System.err.println("❌ [新闻订阅者2] 新闻处理失败: " + e.getMessage());}}/*** 消费原始JMS消息(可以获取更多消息属性)*/@JmsListener(destination = "raw.message.queue")public void receiveRawMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;String content = textMessage.getText();// 获取消息属性String messageId = message.getJMSMessageID();int priority = message.getJMSPriority();long timestamp = message.getJMSTimestamp();System.out.println("