结束了一个超级消耗周末,满安排之健身+梅溪湖游泳+做饭喝酒+羽毛球赛
完全力竭了,久久不能恢复过来,暂停健身安排了 端午后再继续
今日完成记录
Time | Plan | 完成情况 |
---|---|---|
7:30 - 8:10 | 有氧爬坡 | √ |
9:00 - 11:00 | RabbitMQ学习 | √ |
12:00 - 14:30 | 继续RabbitMQ学习 | √ |
14:30 - 17:30 | 小论文2 | √ |
RabbitMQ
整体架构以及核心概念
rabbitmq由消息生产者publisher、消息消费者consumer以及消息代理server broker构成,生产者生产消息,消息代理根据规则将消息放在消息队列中,消费者从消息队列消费消息。
什么是rabbitmq server broker?
rabbitmq server broker是一种消息代理(message broker),它由exchange交换机、queue消息队列和binding路由规则组成,规定了消息如何传递。
为什么需要broker?(broker的好处)
- 解耦生产者和消费者,实现异步通信!
- 提供灵活的路由策略(一对一【直连】,一对多【主题、扇出】)
- 支持消息持久化、重试等高级功能
什么是virtual host?
(简称vhost)
vhost是RabbitMQ的逻辑隔离机制,类似于文件系统中的目录或者数据库中的schema。它允许在同一个rabbitmq server上创建多个独立环境,每个vhost有自己的exchange、queue、binding和权限,彼此互不干扰。
vhost和集群的关系?
- vhost是逻辑隔离,集群式物理部署
- 集群中所有节点都会同步vhost定义,但队列数据默认只存储在创建它的节点上(除非配置镜像队列)
vhost能跨服务器共享吗?
- 不能。vhost是单服务器内的逻辑概念,不同mq server之间vhost完全独立。
快速实战(Java)
这里直接使用spring提供的基于RabbitMQ的消息收发模板:SpringAMQP
maven依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
环境配置
基本配置如下,部署rabbitmq的主机ip、端口、业务对应虚拟主机、用户名、密码
spring:rabbitmq:host: 192.168.4.144port: 5672virtual-host: /hmallusername: hmallpassword: 123321
发送信息
@Autowired
RabbitMQTemplate rt;String queueName = "hello.queue";
String msg = "hello mq";
// 使用
rt.convertAndSend(queueName, msg);
消费消息
@RabbitListener(queues = "hello.queue")
public void ListenSimpleQueue(String msg){System.out.println(msg);
}
上面两部分中的消息,并非一定是String类型,任意类型都可以,SpringAMQP会自动进行类型转换。
WorkQueues模型
任务模型,就是让多个消费者绑定到同一个队列,共同消费队列中的消息。
(1)接下来代码实现两个消费者绑定在同一个队列上以每秒50条消息的速度消费消息,并且由一个生产者以每秒钟50条消息的速度生产消息。
生产者
@SpringBootTest
public class MQTest {@Autowiredprivate RabbitTemplate rt;@Testvoid testSendMQHello() throws InterruptedException {for(int i = 0; i < 50; i ++ ) {Thread.sleep(20);rt.convertAndSend("hello.queue", "hello, dame mq + " + i + " 号消息");}}
}
消费者
@Slf4j
@Component
public class MyMQListener {@RabbitListener(queues = "hello.queue")public void listenSimpleQueue1(String msg){System.out.println("消费者1收到了simple.queue的消息 :【" + msg +"】");try {Thread.sleep(20);} catch (InterruptedException e) {}}@RabbitListener(queues = "hello.queue")public void listenSimpleQueue2(String msg){System.out.println("消费者2收到了simple.queue的消息:【" + msg +"】");try {Thread.sleep(20);} catch (InterruptedException e) {}}
}
结果如下:两个消费者依次消费生产者产生的消息
(2)实际上部署在不同设备上的代码执行速度会因为设备性能而表现有所差异,因此接下来修改消费者代码,将一个消费者消费速度改为5个消息一秒钟,观察一下结果
@RabbitListener(queues = "hello.queue")public void listenSimpleQueue2(String msg){System.out.println("消费者2收到了simple.queue的消息:【" + msg +"】");try {Thread.sleep(200);} catch (InterruptedException e) {}}
结果如下,我们发现消费者2消费速度下降了以后,它们消费的编号依旧没有区别,也就是队列分配给不同性能的消费者相同的消息量,不考虑消费者的实际消费能力,这样会导致性能差的机器持续阻塞很多消息,而性能好的机器空闲。
(3)SpringAMQP中提供了prefetch的设置,可以限制消费者获取消息队列中消息的数量
spring:rabbitmq:listener:simple:prefetch: 1
设置后测试,结果如下:可见消费能力强的消费者1不再是固定地消费编号为02468的消息,而是消费了02345678,也就是每个消费者每次只能取一条消息,消费完成后再取下一个消息。
交换机类型
交换机没有存储消息的能力,只负责转发消息,因此如果没有任何队列和交换机绑定,或者没有符合路由规则的队列,那么消息会丢失。
交换机一共有四种:
- Fanout:扇出型,广播给所有绑定到交换机的队列
- Direct:订阅型,基于RouteKey发送给订阅了消息的队列
- Topic:主题订阅,根据通配符匹配RoutingKey进行转发
- Headers:头匹配
Fanout交换机
如图,fanout交换机接收到生产者的消息会将其路由给所有绑定的消息队列,消息会在每个消息队列中被消费一次
Direct交换机
如图,direct exchange会根据消息的routing key 匹配相同的key发送到消息队列中
Topic交换机
如图,Topic Exchange也是使用routing key匹配binding key,但是允许bindingkey使用通配符。