利用 Google Guava 的令牌桶限流实现数据处理限流控制

目录

一、令牌桶限流机制原理

二、场景设计与目标

三、核心实现代码(Java)

1. 完整代码实现

四、运行效果分析

五、应用建议


在高吞吐数据处理场景中,如何限制数据处理速率、保护系统资源、防止下游服务过载是系统设计中重要的环节。本文将介绍一种简单实用的限流方式 —— 基于 Google Guava 的令牌桶限流机制(Token Bucket),并通过实际代码演示如何将其应用于数据处理任务中。

一、令牌桶限流机制原理

令牌桶算法(Token Bucket)是一种常见的流量控制算法。其基本原理如下:

  • 系统按照固定速率(如每秒 5 个)往桶中放入令牌;

  • 每次处理数据前,需要从桶中取出一个令牌;

  • 若桶中有令牌,则允许处理数据;

  • 若桶中无令牌,当前请求会被阻塞或丢弃(根据实现策略);

  • 桶容量可设定为最大突发请求数,支持短时间突发。

在 Google 的 Guava 库中,RateLimiter 类就实现了一个基于令牌桶的限流器,它适用于:

  • 接口请求限速;

  • 后台批处理速率控制;

  • 防止后端系统过载等场景。

二、场景设计与目标

我们构建一个数据处理系统,包含以下三个核心组件:

  1. 数据生成器线程:以较高频率(每秒 2 条)不断将数据放入队列。

  2. 数据处理器线程:通过 RateLimiter 每秒只允许处理 1 条数据。

  3. 监控线程:每隔 2 秒打印队列中积压的数据数量,观测限流效果。

三、核心实现代码(Java)

使用 Guava 31+,Maven 依赖如下:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version>
</dependency>

1. 完整代码实现

package google;
​
import com.google.common.util.concurrent.RateLimiter;
​
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
​
public class TokenBucketDataProcessor {
​// 每秒最多处理5个数据private static final RateLimiter rateLimiter = RateLimiter.create(1);
​// 模拟数据管道private static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
​public static void main(String[] args) {
​// 线程1:数据生成线程,每秒生成2条数据Thread producer = new Thread(() -> {int i = 0;while (true) {try {Thread.sleep(500); // 每100ms生成1条数据 -> 每秒10条String data = "data-" + (i++);queue.put(data);System.out.println("[Producer] Generated: " + data);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});
​// 线程2:数据处理线程,受RateLimiter限流影响Thread consumer = new Thread(() -> {while (true) {try {rateLimiter.acquire(1); // 阻塞直到拿到令牌String data = queue.take(); // 取数据System.out.println("[Consumer] Processed: " + data);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});
​// 线程3:监控线程,每2秒输出积压情况Thread monitor = new Thread(() -> {while (true) {try {Thread.sleep(2000);System.out.println("[Monitor] Queue size: " + queue.size());} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});
​producer.start();consumer.start();monitor.setDaemon(true); // 守护线程monitor.start();}
}

四、运行效果分析

预期输出如下:

[Producer] Generated: data-0
[Consumer] Processed: data-0
[Producer] Generated: data-1
[Consumer] Processed: data-1
[Producer] Generated: data-2
[Consumer] Processed: data-2
[Monitor] Queue size: 0
[Producer] Generated: data-3
[Producer] Generated: data-4
[Consumer] Processed: data-3
[Producer] Generated: data-5
[Producer] Generated: data-6
[Consumer] Processed: data-4
[Monitor] Queue size: 2
[Producer] Generated: data-7
[Producer] Generated: data-8
[Consumer] Processed: data-5
[Producer] Generated: data-9
[Producer] Generated: data-10
[Consumer] Processed: data-6
[Monitor] Queue size: 4
[Producer] Generated: data-11
[Producer] Generated: data-12
[Consumer] Processed: data-7
[Producer] Generated: data-13
[Producer] Generated: data-14
[Consumer] Processed: data-8
[Monitor] Queue size: 6
... ...

从输出中可以看到:

  • 数据生产速度 > 数据处理速度;

  • queue.size() 会逐步增长,表明数据被限流处理;

  • RateLimiter.acquire() 自动阻塞了处理线程,使得处理速度不超过 2 qps;

  • 限流处理过程对系统其他线程无影响,灵活、安全、无锁。

五、应用建议

在需要处理数据流或消息流的系统中,控制处理速率是一项必要手段:

  • Guava 的 RateLimiter 实现了高效、线程安全、非阻塞或阻塞可控的令牌桶限流机制;

  • 利用其 acquire() 阻塞等待机制,我们可以方便地将限流逻辑嵌入处理线程中;

  • 搭配 BlockingQueue 和监控线程,我们可以直观观察限流效果,并对系统做动态调优;

  • 适用场景包括但不限于:

    • Kafka 消费处理限流

    • 多线程数据清洗任务控制速率

    • 日志写入、数据库插入频率控制

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/90868.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/90868.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小黑课堂计算机二级 WPS Office题库安装包2.52_Win中文_计算机二级考试_安装教程

软件下载 【名称】&#xff1a;小黑课堂计算机二级 WPS Office题库安装包2.52 【大小】&#xff1a;584M 【语言】&#xff1a;简体中文 【安装环境】&#xff1a;Win10/Win11&#xff08;其他系统不清楚&#xff09; 【迅雷网盘下载链接】&#xff08;务必手机注册&#…

CSS3知识补充

1.伪类和伪元素&#xff1a; 简单的伪类实例 :first-chlid :last-child :only-child :invalid 用户行为伪类 :hover——上面提到过&#xff0c;只会在用户将指针挪到元素上的时候才会激活&#xff0c;一般就是链接元素。:focus——只会在用户使用键盘控制&#xff0c;选…

Spring Retry 异常重试机制:从入门到生产实践

Spring Retry 异常重试机制&#xff1a;从入门到生产实践 适用版本&#xff1a;Spring Boot 3.x spring-retry 2.x 本文覆盖 注解声明式、RetryTemplate 编程式、监听器、最佳实践 与 避坑清单&#xff0c;可直接落地生产。 一、核心坐标 <!-- Spring Boot Starter 已经帮…

VTK交互——CallData

0. 概要 这段代码https://examples.vtk.org/site/Cxx/Interaction/CallData/是一个使用VTK(Visualization Toolkit)库的示例程序,主要演示了自定义事件、回调函数和定时器的使用。程序创建一个旋转球体场景,并通过定时器触发自定义事件来更新计数器。以下是详细解释: 1.…

OCR工具集下载与保姆级安装教程!!

软件下载 软件名称&#xff1a;OCR工具集1.1 软件语言&#xff1a;简体中文 软件大小&#xff1a;78.8M 系统要求&#xff1a;Windows7或更高&#xff0c; 32/64位操作系统 硬件要求&#xff1a;CPU2GHz &#xff0c;RAM4G或更高 盘丨下载&#xff1a;https://tool.nineya…

平时遇到的错误码及场景?404?400?502?都是什么场景下什么含义,该怎么做 ?

✅ 一、常见 HTTP 错误码及含义状态码含义简述类型400Bad Request&#xff1a;请求格式有误客户端错误401Unauthorized&#xff1a;未授权客户端错误403Forbidden&#xff1a;禁止访问客户端错误404Not Found&#xff1a;资源不存在客户端错误405Method Not Allowed&#xff1a…

基于Tornado的WebSocket实时聊天系统:从零到一构建与解析

引言 在当今互联网应用中&#xff0c;实时通信已成为不可或缺的一部分。无论是社交媒体、在线游戏还是协同办公&#xff0c;用户都期待即时、流畅的交互体验。传统的HTTP协议是无状态的、单向的请求-响应模式&#xff0c;客户端发起请求&#xff0c;服务器返回响应&#xff0c…

【语义分割】记录2:yolo系列

图像分割笔记1、源码下载2、数据获取3、环境配置4、模型训练5、模型推理6、模型部署6.1 yolov5_flask学习7、版本上传1、源码下载 git clone https://github.com/ultralytics/ultralytics.gitgit回到对应版本&#xff1a; 方式一&#xff1a;使用 git checkout&#xff08;临…

ubuntu22.04系统 算力4090服务器 病毒防护 查杀等 运维入门(三)clamAV工具离线查杀

以下有免费的4090云主机提供ubuntu22.04系统的其他入门实践操作 地址&#xff1a;星宇科技 | GPU服务器 高性能云主机 云服务器-登录 相关兑换码星宇社区---4090算力卡免费体验、共享开发社区-CSDN博客 兑换码要是过期了&#xff0c;可以私信我获取最新兑换码&#xff01;&a…

微信小程序文件下载与预览功能实现详解

在微信小程序开发中&#xff0c;文件处理是常见需求&#xff0c;尤其是涉及合同、文档等场景。本文将通过一个实际案例&#xff0c;详细讲解如何实现文件的下载、解压、列表展示及预览功能。 功能概述 该页面主要实现了以下核心功能&#xff1a; 列表展示可下载的文件信息支持 …

postgresql执行创建和删除时遇到的问题

删除数据库的时候出现的问题 有连接在占用 postgres=# DROP DATABASE "subgraph-dev"; ERROR: database "subgraph-dev" is being accessed by other users DETAIL: There is 1 other session using the database.强制断开在用的连接 -- 替换 subgraph…

linux 应用层直接操作GPIO的方法

了解&#xff01;你使用的是 Rockchip RK3588S 平台&#xff0c;需要操作 GPIO3_D5_d 这个引脚&#xff08;即 MCU_JTAG_TMS_M1/.../GPIO3_D5_d&#xff09;。以下是基于你提供的系统信息的具体操作步骤&#xff1a;&#x1f50d; 第一步&#xff1a;确认 GPIO 系统编号 在 RK3…

JavaScript核心概念全解析

目录 1. 作用域 (1) 局部作用域 (2) 全局作用域 2. 垃圾回收 (1) 引用计数法 (2) 标记清除法 3. 闭包 (1) 作用 (2) 风险 4. 变量提升 (1) var (2) let 和 const (3) const 5. 函数提升 (1) 函数声明 (2) 函数表达式 6. 函数参数 (1) 动态参数 (2) 剩余参数…

力扣刷题(第一百天)

灵感来源 - 保持更新&#xff0c;努力学习- python脚本学习提莫攻击解题思路初始化总中毒时间 total。遍历每次攻击的时间点&#xff08;从第二个开始&#xff09;&#xff1a;计算当前攻击与前一次攻击的时间间隔 gap。若 gap < duration&#xff0c;则本次中毒时间为 gap&…

JMeter 性能测试实战笔记

JMeter 性能测试实战笔记 本文档是一份详细的 JMeter 指南&#xff0c;涵盖了从创建测试计划、执行测试到解读性能结果的全过程。 一、创建测试计划 一个完整的测试计划是执行性能测试的基础。下面将分步介绍如何创建一个针对文件上传接口的测试场景。 第一步&#xff1a;添加线…

图像处理:第二篇 —— 选择镜头的基础知识及对图像处理的影响

一、图像传感器的典型应用图像处理过程大致可分为如下四步&#xff1a;1.拍 摄 按下快门&#xff0c;拍摄图像2.传 送 将图像数据由照相机传送到控制器。3.处 理 前处理 : 对于图像数据进行加工&#xff0c;使其特征更加明显。测算处理 : 根据图像数据对于损…

Linux 系统文件夹结构及用途说明

Linux 系统采用树形文件结构&#xff0c;每个目录都有明确的功能定位&#xff0c;遵循 FHS&#xff08;Filesystem Hierarchy Standard&#xff09; 标准。以下是新安装系统后主要文件夹的用途&#xff1a;一、根目录&#xff08;/&#xff09;核心文件夹1. /bin&#xff1a;基…

[spring6: HttpSecurity]-全新写法

HttpSecurity HttpSecurity 是 Spring Security 中用于配置基于 HTTP 请求的安全策略的核心构建器&#xff0c;支持细粒度控制请求授权、认证、登录、登出、CSRF、CORS、会话管理等安全功能。 package xyz.idoly.demo;import org.springframework.context.annotation.Bean; imp…

MIPI DSI 转 1LVDS ,分辨率1920*1080.

一款桥接芯片&#xff0c;它接收 MP DSI 输入并发送 LVDS 输出。MlPI DSI 支持至多 4 条通道&#xff0c;每条通道的最大传输速率为 1Gbps&#xff0c;总的最大输入带宽为 4Gbps&#xff0c;并且还支持 MlPI 定义的 ULPS&#xff08;超低功耗状态&#xff09;。LVDS 输出采用 V…

墨者:SQL手工注入漏洞测试(MySQL数据库)

一、SQL手工注入漏洞测试(MySQL数据库) 本文以墨者学院靶场为例&#xff0c;演示MySQL数据库的手工SQL注入全过程。靶场以自己的地址为准&#xff1a;http://124.70.64.48:47777/new_list.php?id1 二、注入原理与流程&#xff08;如下指令去掉了id之前的内容&#xff09; M…