微服务--消息队列mq

1. mq简介

        消息队列是分布式系统中的异步通信中间件,采用"生产者-消费者"模型实现服务间解耦通信

核心作用

  • 服务解耦
  • 异步处理
  • 流量削峰
  • 数据同步
  • 最终一致性

消息队列模式

  • 发布/订阅模式:一对多广播
  • 工作队列模式:竞争消费
  • 死信队列:处理失败消息
  • 延迟队列:定时任务处理
  • 消息回溯:Kafka按offset重新消费

2. mq入门

        使用SpringAMQP实现HelloWorld中的基础消息队列功能,一个生产者,一个队列,一个消费者

2.1 启动mq

        打开mq下载目录,输入命令(rabbitmq-server start)启动

网址localhost:15672访问,账号密码均为guest

2.2 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gaohe</groupId><artifactId>clouddemo</artifactId><packaging>pom</packaging><version>0.0.1-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><name>clouddemo</name><description>clouddemo</description><properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>3.3.3</spring-boot.version><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target></properties><dependencies><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.gaohe.clouddemo.ClouddemoApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>

2.3 在yml配置文件中配置连接信息

spring:rabbitmq:host: localhost # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码

2.4 在publisher中利用RabbitTemplate发送信息到simple.queue队列

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;//    发送消息
@Test
public void test1(){
//        1.发送的队列String queueName1 ="hello.queue";
//        2.发送的消息String msg = "你好我哟一个帽衫";
//        3.发送rabbitTemplate.convertAndSend(queueName1,msg);
}}

2.5 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class HelloLisenner {@RabbitListener(queues = "hello.queue")public void helloQueueLisenner(String msg){System.out.println("helloQueueLisenner"+msg);}@RabbitListener(queues = "hello.queue")public void helloQueueLisenner2(String msg){System.out.println("helloQueueLisenner2"+msg);}}

3.交换机

        Exchange是消息队列系统中的消息路由中枢,负责接收生产者发送的消息并根据特定规则将消息路由到一个或多个队列中。

常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

3.1 路由交换机(FanoutExchange)

  • 在consumer服务创建一个类,添加注解,声明交换机,队列以及绑定关系对象
package com.gaohe.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class FanoutConfig {//    交换机@Beanpublic FanoutExchange fanout1(){return new FanoutExchange("itgaohe.fanout");}//    定义队列@Beanpublic Queue queue1(){return new Queue("fanout.queue1");}//    队列绑定交换机@Beanpublic Binding binding1(FanoutExchange fanout1){return BindingBuilder.bind(queue1()).to(fanout1);}//    定义队列@Beanpublic Queue queue2(){return new Queue("fanout.queue2");}//    队列绑定交换机@Beanpublic Binding binding2(FanoutExchange fanout1){return BindingBuilder.bind(queue2()).to(fanout1);}
}

 

  • 在consumer服务中的监听类中添加方法进行监听
package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutLisenner {@RabbitListener(queues = "fanout.queue1")public void fanoutQueueLisenner(String msg){System.out.println("fanoutQueueLisenner:"+msg);}@RabbitListener(queues = "fanout.queue2")public void fanoutQueueLisenner2(String msg){System.out.println("fanoutQueueLisenner2:"+msg);}
}

 

  • 在publisher服务创建测试类进行测试

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
//        1.发送的队列String exName ="itgaohe.fanout";
//        2.发送的消息String msg = "你好";
//        3.发送rabbitTemplate.convertAndSend(exName,"",msg);}}

3.2 路由交换机(DirectExchange)

        交换机,队列不仅可以单独配置,也可以在监听类使用注解进行配置

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DirectLisenner {@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),key = {"blue","red"}))public void directQueueLisenner(String msg){System.out.println("directQueueLisenner"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),key = {"yellow","red"}))public void directQueueLisenner2(String msg){System.out.println("directQueueLisenner2"+msg);}}

         publisher测试类进行测试

package com.gaohe.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
//        1.发送的队列String exName ="itgaohe.direct";
//        2.发送的消息String msg = "I LOVE YOU ";
//        3.发送rabbitTemplate.convertAndSend(exName,"yellow",msg);}}

3.3 广播交换机(TopicExchange)

  • 监听类
package com.gaohe.consumer.lisenner;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class TopicLisenner {@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue1"),exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),key = {"china.#","#.weather"}))public void directQueueLisenner(String msg){System.out.println("directQueueLisenner"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),key = {"us.#","#.weather"}))public void directQueueLisenner2(String msg){System.out.println("directQueueLisenner2"+msg);}}
  • 测试类
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test4(){
//        1.发送的String exName ="itgaohe.topic";
//        2.发送的消息String msg = "hello world6666";
//        3.发送rabbitTemplate.convertAndSend(exName,"aa.weather",msg);}
}

        用的最多的是路由交换机和广播交换机

4. mq消息转换器

        消息转换器是消息中间件中的数据格式转换层,负责在消息生产/消费过程中实现:

  • Java对象 ↔ 消息体序列化/反序列化

  • 消息属性(headers/properties)的自动处理

  • 不同数据格式间的相互转换

配置消息转换器

  • 父工程导入依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
  • 给提供者和消费者配置消息转换器Bean对象
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class mqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
  • 定义消费者,监听队列并消费消息

  • 测试

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/83723.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第26节 Node.js 事件

Node里很多对象会分发事件&#xff1a; 每次有连接的时候net.Server会分发事件&#xff0c;当文件打开的时候fs.readStream会分发事件。所有能分发事件的对象都是 events.EventEmitter的实例。通过require("events");能访问这个模块。 一般来说&#xff0c;事件名都…

LangChain + MCP + vLLM + Qwen3-32B 构建本地私有化智能体应用

一、私有化智能体应用 在本专栏的前面文章基于Spring AI MCP实现了本地 ChatBI 问答应用&#xff0c;本文还是依据该场景&#xff0c;采用 LangChain vLLM Qwen3-32B MCP 技术栈构建该流程&#xff0c;整体过程如下图所示&#xff1a; 实现效果如下所示&#xff1a; 关于 M…

AKS升级路线最佳实践方案

前言 Kubernetes 社区大约每 4 个月发布次要版本&#xff0c;次要版本包括新增功能和改进。补丁发布更为频繁&#xff08;有时每周都会发布&#xff09;&#xff0c;适用于次要版本中的关键 Bug 修复。修补程序版本包括针对安全漏洞或主要 bug 的修复。对于受支持版本列表以…

树莓派智能小车基本移动实验指导书

1.安装LOBOROBOT库函数 LOBOROBOT.py代码如下&#xff1a; #!/usr/bin/python # -*- coding: utf-8 -*-import time import math import smbus import RPi.GPIO as GPIODir [forward,backward, ]class PCA9685:# Registers/etc.__SUBADR1 0x02__SUBADR2 …

如何对目标检测算法RT-DETR进行创新和改进:突破瓶颈,提升性能!

更多精彩&#xff0c;详见文末~~~ 在目标检测的高速发展中&#xff0c;RT-DETR作为DETR&#xff08;DEtection TRansformer&#xff09;的高效变体&#xff0c;凭借其优异的性能和较快的推理速度&#xff0c;已经成为许多实际应用中的首选算法。然而&#xff0c;尽管RT-DETR在…

Java-String

前言 package com.kjxy.st;public class TestString1 {public static void main(String[] args) {String s1 "hello";String s2 "hello";String s3 new String("hello");String s4 new String("hello");System.out.println(s1 s2…

计算机组成原理——C/存储系统

&#x1f308;个人主页&#xff1a;慢了半拍 &#x1f525; 创作专栏&#xff1a;《史上最强算法分析》 | 《无味生》 |《史上最强C语言讲解》 | 《史上最强C练习解析》|《史上最强C讲解》|《史上最强计组》|《史上最强数据结构》 &#x1f3c6;我的格言&#xff1a;一切只是时…

什么是电输运性能

电输运性能‌是指材料在电场作用下&#xff0c;电子在材料中传输的能力和效率。具体来说&#xff0c;电输运性能包括以下几个方面&#xff1a; ‌电子的自由移动性‌&#xff1a;导体中的电子具有较大的自由移动能力&#xff0c;这是由于导体中的原子或分子结构具有一定的松散…

k3s入门教程(二)部署前后端分离程序

文章目录 部署基础服务部署Redis部署MySQL端口转发测试 运行与构建前后端镜像构建后端镜像 docker build -t ruoyi-admin:v3.8 .构建前端镜像 docker build -t ruoyi-ui:v3.8 .创建私库&#xff0c;推拉镜像 前后端应用部署后端应用部署前端应用部署 启动顺序与初始化容器修改前…

Seata如何与Spring Cloud整合?

&#x1f527; 一、整合核心步骤 1. 启动 Seata Server&#xff08;TC&#xff09; 环境准备&#xff1a; 修改 registry.conf&#xff0c;指定注册中心&#xff08;如 Nacos&#xff09;和配置中心&#xff1a;registry {type "nacos"nacos {serverAddr "l…

Python惰性函数与技术总结-由Deepseek产生

在Python中&#xff0c;惰性&#xff08;Lazy&#xff09;技术指延迟计算直到真正需要结果时才执行&#xff0c;常用于优化内存和性能。以下是常见的惰性函数和技术&#xff1a; 1. 生成器&#xff08;Generators&#xff09; 原理&#xff1a;使用 yield 返回迭代结果&#x…

轮廓 裂缝修复 轮廓修复 填补孔洞 源代码

目录 1. 形态学闭合操作填补小孔洞 完整代码: 使用 Douglas-Peucker 算法对轮廓进行多边形逼近 2.裂缝修复 轮廓修复 轮廓补全 函数封装 调用示例: 1. 形态学闭合操作填补小孔洞 完整代码: import cv2 import numpy as np# 创建模拟图像(白色区域 + 多个不规则黑洞)…

HTTP1.1

HTTP基础知识 HTTP&#xff08;HyperText Transfer Protocol&#xff09;是用于传输超文本 的应用层协议&#xff0c;采用客户端-服务器 模型。 客户端&#xff08;如浏览器&#xff09;发起请求&#xff0c;服务器响应并返回数据。 工作原理 客户端发送HTTP请求至服…

【Linux教程】Linux 生存指南:掌握常用命令,避开致命误操作

Linux 常用操作命令&#xff1a;避免误操作指南 在 Linux 系统中&#xff0c;熟练掌握常用操作命令是高效工作的基础&#xff0c;但同时也要警惕误操作带来的风险。无论是部署程序、配置防火墙、管理端口还是处理进程&#xff0c;一个小小的失误都可能导致系统故障、数据丢失等…

PHP:Web 开发领域的常青树

在当今数字化浪潮中&#xff0c;Web 开发技术日新月异&#xff0c;各种新兴语言和框架层出不穷。然而&#xff0c;PHP 作为一门经典的后端开发语言&#xff0c;依然在 Web 开发领域占据着重要地位&#xff0c;展现出强大的生命力和广泛的应用价值。 PHP 的历史与现状 PHP&…

平均数与倍数

目录 一. 平均数现期平均数基期平均数&#xff08;比较冷门&#xff09;两期平均数-比较平均数的增长量平均数的增长率 二. 倍数基期倍数 \quad 一. 平均数 \quad 现期平均数 \quad \quad \quad \quad \quad \quad \quad \quad \quad \quad 平均数速算技巧&#xff1a;削峰填谷…

一个完整的日志收集方案:Elasticsearch + Logstash + Kibana+Filebeat (三)

现在我们主要完成AI-RAG服务的扩展&#xff0c;利用ES的向量检索能力完成历史聊天记录的存储和向量检索&#xff0c;让ai聊天有记忆。 主要做法是在首次聊天完成后将对话内容写出日志到D:\dev\dev2025\EC0601\logs\chat-his.log 写出日志同时嵌入向量 向量可以从ollama的端点&…

Vue嵌套(多级)路由

一、前言 在构建中大型单页应用(SPA)时,页面结构往往比较复杂,比如仪表盘、用户中心、商品管理等模块通常包含多个子功能页面。为了更好地组织这些页面,Vue Router 提供了嵌套(多级)路由的功能。 通过嵌套路由,我们可以在父级组件中嵌入一个 <router-view> 来展…

Kubernetes 集群安全(身份认证机制、SecurityContext、Network Policy网络策略、预防配置泄露、全面加固集群安全)

Kubernetes 集群安全(身份认证机制、SecurityContext、Network Policy网络策略、预防配置泄露、全面加固集群安全) 一、Kubernetes 身份认证机制 身份认证(Authentication): 在 K8S 中,身份认证是安全访问控制的第一道大门,它的目标是: 确认请求发起者的真实身份 K8…

【VUE3】基于Vue3和Element Plus的递归组件实现多级导航栏

文章目录 前言一、递归的意义二、递归组件的实现——基于element-plus UI的多级导航栏2.1 element-plus Menu菜单官方示例2.2 接口定义2.3 组件递归2.4 父组件封装递归组件 三、完整代码——基于element-plus UI的多级导航栏3.1 组件架构3.2 types.ts3.3 menuTreeItem.vue3.4 i…