文章目录
- 需求描述
- 创建项目
- 订单系统(生产者)
- 完善配置
- 声明队列
- 下单接口
- 启动服务
- 物流系统(消费者)
- 完善配置
- 监听队列
- 启动服务
- 格式化发送消息对象
- SimpleMessageConverter
- 定义一个对象
- 生产者代码
- 消费者
- 运行程序
- JSON
- 定义一个对象
- 生产者代码
- 定义转换器
- 消费者代码
- 运行程序
需求描述
用户下单成功之后,通知物流系统,进行发货(只涉及到应用通信,不做具体功能实现)
- 订单系统——生产者
- 物流系统——消费者
创建项目
通常情况下,订单系统和物流系统是不同团队来开发的,是两个独立的应用
- 为了方便演示,就把两个项目创建到一个文件夹下
- 图标没有发生变化,启动类也没有被识别出来
- 因为
Maven
没有被识别出来 - 我们手动加入
Maven
- 在项目目录中右键点击
pom.xml
文件,选择:Add as Maven Project
- 在项目目录中右键点击
- 因为
订单系统(生产者)
完善配置
spring.application.name=order-service
server.port=8080
#amqp://username:password@Ip:port/virtual-host
spring.rabbitmq.addresses=amqp://order:order@127.0.0.1:5672/order
声明队列
package org.example.order.config; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { // 使用简单模式来完成消息发送 @Bean public Queue orderQueue(){ return QueueBuilder.durable("order.create").build(); }
}
下单接口
package org.example.order.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RequestMapping("/order")
@RestController
public class OrderController { // 注入RabbitMQ的客户端 @Autowired private RabbitTemplate rabbitTemplate; // 下单的接口 @RequestMapping("/create") public String create(){ // 参数校验、数据库保存等等...业务代码省略 // 发送消息 // 交换机、队列是什么routingKey就是什么、字符串信息 String orderId = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("", "order.create", "订单信息,订单ID: " + orderId); return "下单成功"; }
}
- 下单成功之后,发送订单消息
启动服务
- 访问接口,模拟下单请求: http://127.0.0.1:8080/order/create
查看消息:
物流系统(消费者)
从 RabbitMQ
中接收消息
完善配置
8080
端口号已经被订单系统占用了,修改物流系统的端口号为 9090
spring.application.name=logistics-service
# 两边的端口号不能一样,他们是同时运行的
server.port=9090
#amqp://username:password@Ip:port/virtual-host
spring.rabbitmq.addresses=amqp://guest:guest@127.0.0.1:5672/order
监听队列
package org.example.logistics.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class OrderListener { @RabbitListener(queues = "order.create") // 指出需要监听的队列的名称 public void handMessage(String orderInfo) { System.out.println("接收到订单消息: " + orderInfo); // 收到消息后的处理,代码省略}
}
启动服务
访问订单系统的接口,模拟下单请求: http://127.0.0.1:8080/order/create
在物流系统的日志中,可以观察到,通过 RabbitMQ
,成功把下单信息传递给了物流系统
格式化发送消息对象
如果通过 RabbitTemplate
发送⼀个对象作为消息, 我们需要对该对象进⾏序列化
SimpleMessageConverter
默认使用的是 SimpleMessageConverter
进行序列化
定义一个对象
package org.example.order.model; import lombok.Data; import java.io.Serializable; @Data
public class OrderInfo implements Serializable { private String orderId; private String name;
}
- 在发送消息的时候,信息需要经过
MessageConverter
进行转换 (默认是SimpleMessageConverter
) SimpleMessageConverter
只支持String
、byte[]
、Serializable
- 此接口将
OrderInfo
序列化,之后才能被正常接收 (OrderInfo
类型不被支持,会报500
错误)
生产者代码
package org.example.order.controller; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.Random;
import java.util.UUID; @RequestMapping("/order")
@RestController
public class OrderController { // 注入RabbitMQ的客户端 @Autowired private RabbitTemplate rabbitTemplate; // 下单的接口 @RequestMapping("/create2") public String create2(){ // 发送对象 OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(UUID.randomUUID().toString()); orderInfo.setName("商品" + new Random().nextInt(100)); rabbitTemplate.convertAndSend("", "order.create", orderInfo); return "下单成功"; }
}
消费者
package org.example.logistics.listener; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
@RabbitListener(queues = "order.create") // 指出需要监听的队列的名称
public class OrderListener { @RabbitHandler public void handMessage(String orderInfo) { System.out.println("接收到订单消息String: " + orderInfo); // 收到消息后的处理,代码省略 }
}
运行程序
访问订单系统的接口,模拟下单请求: http://127.0.0.1:8080/order/create2
观察发送的消息
- 可以看到消息的可读性太差
- 所以我们使用
JSON
序列化
JSON
使用 SimpleMessageConverter
序列化可读性太差,Spring AMQP
推荐使用 JSON
序列化
Spring AMQP
提供了Jsckson2JsonMessageConverter
和MappingJackson2MessageConverter
等转换器- 我们需要把一个
MessageConverter
设置到RabbitTemplate
中
定义一个对象
package org.example.order.model; import lombok.Data; @Data
public class OrderInfo { private String orderId; private String name;
}
生产者代码
package org.example.order.controller; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.Random;
import java.util.UUID; @RequestMapping("/order")
@RestController
public class OrderController { // 注入RabbitMQ的客户端 @Autowired private RabbitTemplate rabbitTemplate; // 下单的接口 @RequestMapping("/create2") public String create2(){ // 发送对象 OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(UUID.randomUUID().toString()); orderInfo.setName("商品" + new Random().nextInt(100)); rabbitTemplate.convertAndSend("", "order.create", orderInfo); return "下单成功"; }
}
- 和前面的使用默认转换器代码一样
定义转换器
package org.example.order.config; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { // 使用简单模式来完成消息发送 @Bean public Queue orderQueue(){ return QueueBuilder.durable("order.create").build(); } /** * 创建一个 rabbitTemplate 对象 * @return */ @Bean public Jackson2JsonMessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jsonMessageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter); return rabbitTemplate; }
}
- 创建出一个
rabbitTemplate
对象进行使用 - 生产者(
order-service
)和消费者(logistics-service
)都需要- 不然还是拿不到消息
消费者代码
package org.example.logistics.listener; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
@RabbitListener(queues = "order.create") // 指出需要监听的队列的名称
public class OrderListener { @RabbitHandler public void handMessage2(OrderInfo orderInfo) { System.out.println("接收到订单消息OrderInfo: " + orderInfo); // 收到消息后的处理,代码省略 }
}
@RabbitListener(queues = "order.create")
可以加在类上,也可以加在方法上,用于定义一个类或者方法作为消息的监听器
@RabbitHandler
是一个方法级别的注解,当使用@RabbitHandler
注解时,这个方法将被调用处理特定的消息
- 根据调用的类型,调用相关方法
运行程序
访问订单系统的接口,模拟下单请求: http://127.0.0.1:8080/order/create2
前面的 String
也还可以接收到,只要模拟下单请求: http://127.0.0.1:8080/order/create 即可
- 归功于
@RabbitHandler
,指哪打哪