SpringBoot 系列之集成 RabbitMQ 实现高效流量控制

系列博客专栏:

  • JVM系列博客专栏
  • SpringBoot系列博客

Spring Boot 2.2.1 集成 RabbitMQ 实现高效流量控制

在分布式系统中,消息队列是实现异步通信、解耦服务的重要组件。RabbitMQ 作为一款成熟的开源消息队列,广泛应用于各类项目中。本文将结合 Spring Boot 2.2.1,详细介绍如何集成 RabbitMQ 并实现基于队列长度、内存和磁盘的流量控制,同时引入服务端限流配置,进一步提升系统的稳定性与可靠性。

一、RabbitMQ 流量控制的重要性

当消息产生速度过快,超过消息队列的处理能力时,可能会导致队列积压、系统性能下降甚至崩溃。通过流量控制,可以有效限制消息的流入速度,使系统能够在合理的负载下运行,保障服务的稳定性和可靠性。

二、Spring Boot 2.2.1 集成 RabbitMQ 基础配置

1. 引入依赖

pom.xml 文件中添加 Spring Boot AMQP 和 Web 依赖:

<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- JSON处理依赖 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- RabbitMQ测试依赖 --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置文件

application.yml 中配置 RabbitMQ 连接信息和相关参数:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /requested-heartbeat: 30connection-timeout: 10000publisher-confirms: truepublisher-returns: truelistener:simple:acknowledge-mode: autoprefetch: 50concurrency: 3max-concurrency: 10cache:channel:size: 50checkout-timeout: 30000connection:mode: CHANNELsize: 5# 自定义流量控制配置
app:flow-control:max-messages: 1000duration: 5000

3. RabbitMQ 配置类

创建 RabbitMQConfig 类,配置队列、交换机、绑定关系、消息转换器以及 RabbitTemplate:

package com.example.springboot.rabbitmq.configuration;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class RabbitMQConfig {public static final String QUEUE_NAME = "flow.control.queue";public static final String EXCHANGE_NAME = "flow.control.exchange";public static final String ROUTING_KEY = "flow.control.key";// 配置队列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}// 配置交换机@Beanpublic DirectExchange exchange() {return new DirectExchange(EXCHANGE_NAME);}// 绑定队列和交换机@Beanpublic Binding binding(Queue queue, DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}// 配置消息转换器@Beanpublic Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);// 设置mandatory标志,确保消息在无法路由时返回rabbitTemplate.setMandatory(true);// 设置发布确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息发送成功: {}",  correlationData);} else {log.warn("消息发送失败: {}",  cause);}});// 设置返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息被退回: {}", new String(message.getBody()));log.info("回复码: ", replyCode);log.info("回复文本: ", replyText);log.info("交换机: ", exchange);log.info("路由键: ", routingKey);});return rabbitTemplate;}// 配置监听器容器工厂@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter);factory.setConcurrentConsumers(3); // 设置并发消费者数量factory.setMaxConcurrentConsumers(10);factory.setPrefetchCount(50); // 设置 QoSfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认模式return factory;}
}

三、基于队列长度的流量控制

MessageProducer 类中实现基于队列长度的流量控制逻辑:

package com.example.demo.service;import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;private final AtomicInteger messageCount = new AtomicInteger(0);private static final int MAX_MESSAGES = 1000;private volatile boolean flowControlEnabled = false;public void sendMessage(String message) {if (flowControlEnabled) {System.out.println("流量控制已启用,暂停发送消息");return;}if (messageCount.get() >= MAX_MESSAGES) {System.out.println("达到最大消息数量,触发流量控制");enableFlowControl(5000);return;}String correlationId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(correlationId);rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,message,correlationData);messageCount.incrementAndGet();System.out.println("发送消息: " + message + ", 消息ID: " + correlationId);}public void enableFlowControl(long durationMillis) {flowControlEnabled = true;System.out.println("流量控制已启用,持续时间: " + durationMillis + "ms");new Thread(() -> {try {Thread.sleep(durationMillis);} catch (InterruptedException e) {Thread.currentThread().interrupt();}flowControlEnabled = false;messageCount.set(0);System.out.println("流量控制已禁用");}).start();}
}

除了用代码限制外,可以用maxLength设置,示例代码:

 // 配置队列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}

四、x-max-length-bytes 参数详解

x-max-length-bytes 用于限制队列中消息的总字节数。在创建队列时,可以通过代码配置:

@Bean
public Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).maxLengthBytes(1024 * 1024 * 10) // 设置队列消息总字节数上限为10MB.build();
}

当队列中消息的总字节数达到设定的阈值时,后续新消息的处理策略由 x-overflow 参数决定:

  • drop-head:丢弃队列头部的消息,为新消息腾出空间。
  • reject-publish:拒绝接收新消息,并向生产者返回 Basic.Reject 响应。

五、基于内存和磁盘的流量控制

通过配置 RabbitMQ 服务器的内存和磁盘告警阈值,当服务器内存使用或磁盘空间达到阈值时,会自动触发流量控制。例如:

rabbitmqctl set_vm_memory_high_watermark 0.6

此命令将内存高水位线设置为系统内存的 60%。

六、服务端限流配置

1. 基于 Guava 的限流实现

添加 Guava 依赖:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>28.2-jre</version>
</dependency>

使用 RateLimiter 进行限流:

package com.example.demo.service;import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Service;@Service
public class LimitedService {private final RateLimiter rateLimiter = RateLimiter.create(5);public void limitedMethod() {if (rateLimiter.tryAcquire()) {System.out.println("请求被处理");} else {System.out.println("请求被限流");}}
}

七、 消费端限流

默认情况下,如果不进行配置,RabbitMQ会尽可能快速地把队列中的消息发送到消费者。如果消息数量过多,可能会导致OOM或者影响其他进程的正常运行

1. 消费端限流示例

package com.example.springboot.rabbitmq.service;import com.example.springboot.rabbitmq.configuration.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;@Service
@Slf4j
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)@Retryable(value = {IOException.class}, maxAttempts = 3,backoff = @Backoff(delay = 2000, multiplier = 2))public void receiveMessage(Message message, Channel channel) throws IOException {try {if (channel == null || !channel.isOpen()) {log.warn("Channel is closed or null, unable to process message");return;}// 动态设置预取计数channel.basicQos(calculatePrefetchCount());String content = new String(message.getBody());log.info("接收到消息:{} ", content);// 模拟消息处理时间Thread.sleep(100);// 发送消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("消息处理完成");} catch (Exception e) {log.error("处理消息时发生错误: {}", e.getMessage(), e);if (channel != null && channel.isOpen()) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 失败后重新入队}}}// 根据系统负载动态计算预取计数private int calculatePrefetchCount() {double cpuLoad = getSystemCpuLoad();int basePrefetch = 10;return (int) Math.max(1, basePrefetch * (1 - cpuLoad));}// 获取当前系统 CPU 负载private double getSystemCpuLoad() {OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();return osBean.getSystemLoadAverage() / osBean.getAvailableProcessors();}}

八、总结

通过上述配置和代码示例,您可以实现对 RabbitMQ 的高效流量控制,从而提升系统的稳定性和可靠性。合理利用队列长度限制、内存和磁盘流量控制,以及服务端限流策略,可以帮助系统在高负载情况下保持良好的运行状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/83490.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新视讯影视官网入口,影视动漫在线播放网站

新视讯影视是一个免费为广大追剧迷提供在线播放服务的影视平台&#xff0c;深受众多影视爱好者的喜爱。它涵盖了大量免费的VIP电视剧资源、最新上映的大片、好看的综艺节目以及动漫视频&#xff0c;是一个播放速度快、资源多的免费影视网站。用户无需注册或登录&#xff0c;即可…

【使用】【经验】docker 清理未使用的镜像的命令

docker images prune在 Docker 中清理未使用的镜像&#xff08;包括悬空镜像和完全未被引用的镜像&#xff09;&#xff0c;可以使用以下命令&#xff1a; 1. ​删除所有悬空镜像​&#xff08;推荐常用&#xff09; docker image prune​悬空镜像 (dangling images)​​ 是指…

OpenCV CUDA模块图像处理------图像融合函数blendLinear()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 该函数执行 线性融合&#xff08;加权平均&#xff09; 两个图像 img1 和 img2&#xff0c;使用对应的权重图 weights1 和 weights2。 融合公式…

【Typst】6.布局函数

概述 上节我们介绍了文档结构元素的函数&#xff0c;本节介绍一些控制布局使用的函数&#xff0c;掌握他们之后你可以更进一步的控制页面元素的布局。 系列目录 1.Typst概述2.Typst标记语法和基础样式3.Typst脚本语法4.导入、包含和读取5.文档结构元素与函数6.布局函数 对齐…

【音视频】FFmpeg 编码H265

一、概述 实现了读入本地yuv文件&#xff0c;通过libx265编码为H265格式&#xff0c;并存储到本地文件中 二、实现流程 准备文件 在build路径下准备yuv文件 在项目中添加文件参数&#xff0c;输出为h265文件&#xff0c;使用libx265编码 初始化解码器 通过传进来的libx265…

ECreator低代码平台-文件管理器的使用说明

Ecreator是中山华拓信息技术公司旗下的一款低代码平台&#xff0c;主要功能包含&#xff1a;文件管理器&#xff0c;表单数据管理器&#xff0c;仪表盘设计界面&#xff0c;内容页面自定义等功能&#xff0c;可以用于快速低成本的构建网站和企业内部应用。 下面介绍一下文件管…

高考加油!UI界面生成器!

这个高考助力标语生成器具有以下特点&#xff1a; 视觉设计&#xff1a;采用了蓝色为主色调&#xff0c;搭配渐变背景和圆形装饰元素&#xff0c;营造出宁静而充满希望的氛围&#xff0c;非常适合高考主题。 标语生成&#xff1a;内置了超过 100 条精心挑选的高考加油标语&a…

阿姆达尔定律的演进:古斯塔夫森定律

前言 在上一篇文章《使用阿姆达尔定律来提升效率》中提到的阿姆达尔定律前提是假设问题的规模保持不变&#xff0c;并且给定一台速度更快的机器&#xff0c;目标是更快地解决问题。然而&#xff0c;在大多数情况下&#xff0c;这并不完全正确。当有一台更快的机器时&#xff0…

【RabbitMQ】- Channel和Delivery Tag机制

在 RabbitMQ 的消费者代码中&#xff0c;Channel 和 tag 参数的存在是为了实现消息确认机制&#xff08;Acknowledgment&#xff09;和精细化的消息控制。 Channel 参数 作用 Channel 是 AMQP 协议的核心操作接口&#xff0c;通过它可以直接与 RabbitMQ 交互&#xff1a; 手…

核心机制:流量控制

搭配滑动窗口使用的 窗口大小 窗口越大,传输速度就越快,但是也不能无限大,太大了,对于可靠性会有影响 比如发生方以非常快的速度,发送,接收方的处理速度跟不上,也就会导致有效数据被接受方丢弃(又得重传) 流量控制,就是根据接收方的处理能力(如何衡量?),干预到发送方的发送…

深度强化学习赋能城市消防优化,中科院团队提出DRL新方法破解设施配置难题

在城市建设与发展中&#xff0c;地理空间优化至关重要。从工业园区选址&#xff0c;到公共服务设施布局&#xff0c;它都发挥着关键作用。但传统求解方法存在诸多局限&#xff0c;如今&#xff0c;深度学习技术为其带来了新的转机。 近日&#xff0c;在中国地理学会地理模型与…

安科电动机保护器通过ModbusRTU转profinet网关与PLC通讯

安科电动机保护器通过ModbusRTU转profinet网关与PLC通讯 在工业自动化领域&#xff0c;设备间的通信和数据交互至关重要。Modbus作为一种常用的通讯协议&#xff0c;广泛应用于各种工业现场&#xff1b;而Profinet则凭借其高效、实时性&#xff0c;在工业以太网通讯中占据重要…

python直方图

在Python中&#xff0c;绘制直方图&#xff08;Histogram&#xff09;是一项非常常见的任务&#xff0c;通常用于数据可视化&#xff0c;以展示数据的分布情况。Python中有多种库可以绘制直方图&#xff0c;其中最常用的两个库是Matplotlib和Seaborn。此外&#xff0c;Pandas库…

在Oxygen编辑器中使用DeepSeek

罗马尼亚公司研制开发的Oxygen编辑器怎样与国产大模型结合&#xff0c;这是今年我在tcworld大会上给大家的分享&#xff0c;需要ppt的朋友请私信联系 - 1 - Oxygen编辑器中的人工智能助手 Oxygen编辑器是罗马尼亚的Syncro Soft公司开发的一款结构化文档编辑器。 它是用来编写…

neo4j 5.19.0安装、apoc csv导入导出 及相关问题处理

前言 突然有需求需要用apoc 导入 低版本的图谱数据&#xff0c;网上资料又比较少&#xff0c;所以就看官网资料并处理了apoc 导入的一些问题。 相关地址 apoc 官方安装网址 apoc 官方导出csv 教程地址 apoc 官方 导入 csv 地址 docker 安装 执行如下命令启动镜像 doc…

macos常见且应该避免被覆盖的系统环境变量(避免用 USERNAME 作为你的自定义变量名)

文章目录 macos避免用 USERNAME 作为你的自定义变量名macos常见且应该避免被覆盖的系统环境变量 macos避免用 USERNAME 作为你的自定义变量名 问题&#xff1a; 你执行了&#xff1a;export USERNAME“admin” 然后执行&#xff1a;echo ${USERNAME} 输出却是&#xff1a;xxx …

Python训练打卡Day41

简单CNN 知识回顾 数据增强卷积神经网络定义的写法batch归一化&#xff1a;调整一个批次的分布&#xff0c;常用与图像数据特征图&#xff1a;只有卷积操作输出的才叫特征图调度器&#xff1a;直接修改基础学习率 卷积操作常见流程如下&#xff1a; 1. 输入 → 卷积层 → Batch…

【亲测有效】Mybatis-Plus中更新字段为null

Mybatis-Plus中更新字段为null 遇到问题 Mybatis-Plus更新的默认行为如下: Mybatis-Plus默认如果某个字段为null, 默认不更新这个字段, 例如有个Double类型的字段, 当前数据库数据为10, 然后传参时当前字段为null, 实际上Mybatis-Plus是不会覆盖该字段为null的 在传参的时候如…

如何使用插件和子主题添加WordPress自定义CSS(附:常见错误)

您是否曾经想更改网站外观的某些方面&#xff0c;但不知道怎么做&#xff1f;有一个解决方案——您可以将自定义 CSS&#xff08;层叠样式表&#xff09;添加到您的WordPress网站&#xff01; 在本文中&#xff0c;我们将讨论您需要了解的有关CSS的所有知识以及如何使用它来修…

左值引用和右值引用

一、基本概念 左值&#xff08;lvalue&#xff09;和右值&#xff08;rvalue&#xff09; 左值指的是有确定存储位置&#xff08;地址&#xff09;的对象&#xff0c;通常可以出现在赋值语句左侧。例如&#xff1a;变量名、解引用指针得到的对象、数组元素等都属于左值。 右值…