令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录

  • 前言
  • 限流+限制并发的实际理解
  • 限流
    • 令牌桶
      • 代码实现
      • 结果分析
      • 令牌桶lua的模拟实现原理
      • 总结:
    • 滑动窗口
      • 代码实现
      • 结果分析
      • lua脚本原理解析
  • 限并发
    • 分布式信号量
      • 代码实现
      • 结果分析
      • lua脚本实现原理
  • 双注解去实现限流 并发
      • 结果分析:
  • 实际业务去理解体会
  • 统一注解实现版本
  • 思考为什么不能用令牌桶 去做 限制并发 分布式信号量去做限流?
  • 总结

前言

本文
1、讲解什么是限流、限并发
2、限流 限并发的业务场景
3、限流的常用实现 限制并发的常用实现
4、双注解去实现限流限并发
5、运用双注解版本去解决一个实际业务例子
6、更近一步 使用一个注解去统一限流限并发

限流+限制并发的实际理解

理解“限流”和“限制并发”最形象的方式是——把系统当成一个餐厅厨房,然后把用户请求比作“点单”。

概念比喻说明
限流控制每秒进门的顾客数量比如每秒只允许5个顾客点单,不然就让他们在门外等
限制并发控制后厨同时能做几道菜厨房只有6个炉灶,所以一次最多做6份菜,超出就排队等灶空出来

迁移到真实系统上

限流就是打到接口上的请求
限制并发就是限制“同时执行关键逻辑的线程数”

限流

令牌桶

桶中可以最多装 N 个令牌。

每隔 T 时间就往桶里加一个令牌(速率)。

每次访问操作前,必须“拿”一个令牌。

如果桶是空的(没令牌了)→ 拒绝访问(限流)。

代码实现

public class TokenBucketTest {@Testpublic void testConcurrentRateLimit() throws InterruptedException {// 初始化 RedissonConfig config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redissonClient = Redisson.create(config);// 获取限流器对象String key = "rate:test:user123";RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);// 设置限流规则:每秒最多 2 个请求 (也就是每秒生成两个令牌)rateLimiter.trySetRate(RateType.OVERALL, 2, 1, RateIntervalUnit.SECONDS);int threadCount = 10;CountDownLatch latch = new CountDownLatch(threadCount);ExecutorService executor = Executors.newFixedThreadPool(threadCount);for (int i = 0; i < threadCount; i++) {int id = i + 1;executor.submit(() -> {try {boolean allowed = rateLimiter.tryAcquire();if (allowed) {System.out.println("请求 " + id + " 成功通过限流器");} else {System.out.println("请求 " + id + " 被限流拒绝");}} finally {latch.countDown();}});}latch.await();executor.shutdown();redissonClient.shutdown();}
}

结果分析

可以预见的是2个请求拿到 剩下的8个被拒绝(10个请求是在瞬间完成的 还没来得及补充令牌呢 )
在这里插入图片描述

令牌桶lua的模拟实现原理

假设redis中 key 为rate_limiter:user_123

-- 当前时间
now = redis.call('TIME')-- 获取桶状态(剩余 token、上次更新时间)
tokens = redis.call('GET', 'rate_limiter:user_123')-- 计算时间间隔,补充新 token
new_tokens = (now - last_time) * rate
tokens = min(max_tokens, tokens + new_tokens)-- 如果有令牌,扣除一个并允许访问
if tokens >= 1 thentokens = tokens - 1redis.call('SET', key, tokens)return true
elsereturn false
end

实际在 redissonClient.getRateLimiter(key);中的实现和上述lua脚本类似用String类型的数据去表示令牌的数量
在这里插入图片描述

总结:

   (RateType.OVERALL, 2, 1, RateIntervalUnit.SECONDS)

令牌桶理解为

你第 0 秒放进来了 2 个 token

如果你立即连续拿了 2 个,桶就空了

等到第 0.5 秒,补充 1 个;第 1 秒再补充 1 个;最多恢复回 2 个(桶容量上限)

滑动窗口

思想:

来一个人,你记下他进入的时间戳
你翻一下记录,把1 分钟前之前的人全划掉(过期了)
看看还有几个人在 1 分钟窗口里
如果不到 10 个 —— 放进来!
如果已经 10 个 —— 不让进!

代码实现

两部分 一个是手写的service 一个是测试类 代码有点长 可以直接复制到idea中去查看【注意这里当前有一个大量请求在一瞬间发起导致重复时间戳的问题 这个问题在后面我会解决 你先在这里有一个意识 当前的代码有问题】
当前是在一分钟内 限制10个请求

@Service
public class RedisLuaRateLimiter {// 改用 StringRedisTemplate,天然支持字符串序列化@Resourceprivate StringRedisTemplate stringRedisTemplate;// 原始的 Lua 脚本(去掉调试代码)private static final String SLIDING_WINDOW_LUA ="local key = KEYS[1]\n" +"local now = tonumber(ARGV[1])\n" +"local window = tonumber(ARGV[2])\n" +"local limit = tonumber(ARGV[3])\n" +"\n" +"-- 检查参数有效性\n" +"if not now or not window or not limit then\n" +"    return 0\n" +"end\n" +"\n" +"-- 移除过期元素\n" +"redis.call('ZREMRANGEBYSCORE', key, 0, now - window)\n" +"\n" +"-- 获取当前数量\n" +"local count = redis.call('ZCARD', key)\n" +"\n" +"-- 判断是否超过限制\n" +"if count < limit then\n" +"    -- 添加当前请求\n" +"    redis.call('ZADD', key, now, now)\n" +"    -- 设置过期时间\n" +"    redis.call('PEXPIRE', key, window + 1000)\n" +"    return 1\n" +"else\n" +"    return 0\n" +"end";/*** 执行限流逻辑(使用默认参数 在一分钟内 限制10个请求)*/public boolean isAllowed(String userKey) {//当前是在一分钟内 限制10个请求return isAllowed(userKey, 60000, 10);}/*** 执行限流逻辑*/public boolean isAllowed(String userKey, long windowMs, int limit) {try {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptText(SLIDING_WINDOW_LUA);redisScript.setResultType(Long.class);// 使用纳秒时间戳和纳秒窗口 当前这里是用的 毫秒 是否存在问题呢?long now = System.currentTimeMillis();long windowNanos = windowMs ;  // 可以转换为纳秒String[] args = {String.valueOf(now),String.valueOf(windowNanos),  // 传递纳秒窗口String.valueOf(limit)};System.out.println("=== 限流检查 ===");System.out.println("userKey: " + userKey);System.out.println("now: " + now);System.out.println("windowMs: " + windowMs);System.out.println("limit: " + limit);System.out.println("args: " + Arrays.toString(args));// 使用 StringRedisTemplate 执行Long result = stringRedisTemplate.execute(redisScript,Collections.singletonList(userKey),args);System.out.println("result: " + result);boolean allowed = result != null && result == 1;System.out.println("是否允许: " + (allowed ? "是" : " 否"));return allowed;} catch (Exception e) {System.err.println("限流器异常: " + e.getMessage());e.printStackTrace();return true;}}/*** 获取剩余配额*/public long getRemainingQuota(String userKey, long windowMs, int limit) {try {long now = System.currentTimeMillis();long cutoff = now - windowMs;// 清理过期数据stringRedisTemplate.opsForZSet().removeRangeByScore(userKey, cutoff, now - windowMs);// 获取当前数量Long count = stringRedisTemplate.opsForZSet().zCard(userKey);if (count == null) {count = 0L;}return Math.max(0, limit - count);} catch (Exception e) {System.err.println("获取剩余配额异常: " + e.getMessage());return limit;}}/*** 获取默认配置下的剩余配额*/public long getRemainingQuota(String userKey) {return getRemainingQuota(userKey, 60000, 10);}/*** 重置计数器*/public void reset(String userKey) {try {stringRedisTemplate.delete(userKey);System.out.println("已重置计数器: " + userKey);} catch (Exception e) {System.err.println("重置失败: " + e.getMessage());}}/*** 查看当前状态(用于调试)*/public void showStatus(String userKey, long windowMs) {try {long now = System.currentTimeMillis();long cutoff = now - windowMs;// 获取所有元素Set<ZSetOperations.TypedTuple<String>> elements =stringRedisTemplate.opsForZSet().rangeWithScores(userKey, 0, -1);System.out.println("=== " + userKey + " 状态 ===");System.out.println("当前时间: " + now);System.out.println("截止时间: " + cutoff);System.out.println("窗口大小: " + windowMs + "ms");if (elements != null && !elements.isEmpty()) {System.out.println("有效元素:");for (ZSetOperations.TypedTuple<String> element : elements) {long timestamp = element.getScore().longValue();boolean isValid = timestamp > cutoff;System.out.println("  时间戳: " + timestamp + " (有效: " + isValid + ")");}} else {System.out.println("无元素");}Long totalCount = stringRedisTemplate.opsForZSet().zCard(userKey);System.out.println("总计数: " + (totalCount != null ? totalCount : 0));} catch (Exception e) {System.err.println("查看状态失败: " + e.getMessage());}}
}
@SpringBootTest
class SlidingWindowTest {@Autowiredprivate RedisLuaRateLimiter rateLimiter;@Testvoid testDebugRateLimit() {String userKey = "debug_test_" + System.currentTimeMillis();System.out.println(" 开始调试测试...");// 重置计数器rateLimiter.reset(userKey);// 查看初始状态rateLimiter.showStatus(userKey, 60000);// 进行几次单独的请求测试for (int i = 1; i <= 15; i++) {System.out.println("\n--- 第 " + i + " 次请求 ---");boolean allowed = rateLimiter.isAllowed(userKey);System.out.println("结果: " + (allowed ? " 允许" : "限流"));// 每5次查看一下状态if (i % 5 == 0) {rateLimiter.showStatus(userKey, 60000);}}}
}

结果分析

在一分钟内限制10个请求 那么可以预见的是 应该第11个请求就不能进入了,结果竟然是第15个请求都进入了?WHY????

在这里插入图片描述

这里就是是我上文提到的时间戳重复的问题。要解决这个问题 我们必须首先搞清楚 这个lua脚本在做什么事情 窗口是如何滑动的

lua脚本原理解析

            "local key = KEYS[1]\n" +"local now = tonumber(ARGV[1])\n" +"local window = tonumber(ARGV[2])\n" +"local limit = tonumber(ARGV[3])\n" +"\n" +"-- 检查参数有效性\n" +"if not now or not window or not limit then\n" +"    return 0\n" +"end\n" +"\n" +"-- 移除过期元素\n" +"redis.call('ZREMRANGEBYSCORE', key, 0, now - window)\n" +"\n" +"-- 获取当前数量\n" +"local count = redis.call('ZCARD', key)\n" +"\n" +"-- 判断是否超过限制\n" +"if count < limit then\n" +"    -- 添加当前请求\n" +"    redis.call('ZADD', key, now, now)\n" +"    -- 设置过期时间\n" +"    redis.call('PEXPIRE', key, window + 1000)\n" +"    return 1\n" +"else\n" +"    return 0\n" +"end";

其实这里就是在控制一个窗口

新请求到达


判断当前时间戳 → 清理60秒外的数据(ZREMRANGEBYSCORE)


计算最近60秒的请求次数(ZCARD)

├─ 如果小于10次 → 记录当前时间 + 放行(ZADD + return 1)

└─ 如果大于等于10次 → 拒绝请求(return 0)

也就是每一次都是ZADD 一个保存时间戳的有序集合(Sorted Set)然后统计在某个时间窗口下的集合元素个数 如果大于了10次 就拒绝 然后这个窗口边滑动边统计数据。你可以明白这里的思想了吗?

来一个人,你记下他进入的时间戳
你翻一下记录,把1 分钟前之前的人全划掉(过期了)
看看还有几个人在 1 分钟窗口里
如果不到 10 个 —— 放进来!
如果已经 10 个 —— 不让进!

而上文15次请求进入的数据流转如下:

1-5次请求后的存储状态
ZADD key 1749016198740 1749016198740  → 新增,count=1
ZADD key 1749016198763 1749016198763  → 新增,count=2  
ZADD key 1749016198763 1749016198763  → 覆盖,count=2 (不变)
ZADD key 1749016198764 1749016198764  → 新增,count=3
ZADD key 1749016198764 1749016198764  → 覆盖,count=3 (不变)实际存储:
Score           Member
17490161987401749016198740
17490161987631749016198763
17490161987641749016198764总计数:3 (不是5)6-10次请求后的存储状态
继续添加:
ZADD key 1749016198766 1749016198766  → 新增,count=4
ZADD key 1749016198767 1749016198767  → 新增,count=5
ZADD key 1749016198767 1749016198767  → 覆盖,count=5 (不变)
ZADD key 1749016198768 1749016198768  → 新增,count=6
ZADD key 1749016198769 1749016198769  → 新增,count=7实际存储:
Score           Member
17490161987401749016198740
17490161987631749016198763
17490161987641749016198764
17490161987661749016198766
17490161987671749016198767
17490161987681749016198768
17490161987691749016198769总计数:7 (验证了测试结果!)11-15次请求后的存储状态
继续添加:
ZADD key 1749016198771 1749016198771  → 新增,count=8
ZADD key 1749016198771 1749016198771  → 覆盖,count=8 (不变)
ZADD key 1749016198771 1749016198771  → 覆盖,count=8 (不变)
ZADD key 1749016198772 1749016198772  → 新增,count=9
ZADD key 1749016198772 1749016198772  → 覆盖,count=9 (不变)最终存储:
Score           Member
17490161987401749016198740
17490161987631749016198763
17490161987641749016198764
17490161987661749016198766
17490161987671749016198767
17490161987681749016198768
17490161987691749016198769
17490161987711749016198771
17490161987721749016198772总计数:9 (< 10,所以都被允许了!)

你发现其实是因为每次请求太快了 时间戳精度不够高导致连续的请求是一样的时间戳 然后就被覆盖了。所以解决办法就是 使用更高精度纳秒而不是毫秒

// 使用纳秒时间戳和纳秒窗口 当前这里是用的 毫秒 是否存在问题呢?long now = System.currentTimeMillis();long windowNanos = windowMs ;  // 可以转换为纳秒// 使用纳秒时间戳和纳秒窗口long now = System.nanoTime();long windowNanos = windowMs * 1_000_000L;  // 转换为纳秒

这样就可以决绝这个问题

限并发

回忆 区别并发和限流

限流就是打到接口上的请求
限制并发就是限制“同时执行关键逻辑的线程数”

自然而然会想到使用信号量的概念去限制最大执行的线程数

分布式信号量

你有一个写字楼,只有 3 间厕所(许可数 = 3)
每个线程 = 每个员工
每个员工来上厕所(acquire())
上完了释放厕所(release())
存在一个手动释放的过程相比 【令牌桶 和滑动窗口】

代码实现

public class RedissonSemaphoreTest {@Testpublic void testDistributedSemaphore() throws InterruptedException {// 1. 初始化 Redis 客户端Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redissonClient = Redisson.create(config);// 2. 获取分布式信号量对象String key = "semaphore:test:resource";RSemaphore semaphore = redissonClient.getSemaphore(key);// 3. 设置最多允许3个线程同时进入(初始化许可数)semaphore.trySetPermits(3); // 若 key 不存在则设置,否则保持现有// 4. 创建10个线程模拟并发int threadCount = 10;ExecutorService pool = Executors.newFixedThreadPool(threadCount);CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {final int id = i + 1;pool.submit(() -> {try {System.out.println("线程 " + id + " 尝试获取许可...");semaphore.acquire(); // 阻塞直到拿到许可System.out.println("线程 " + id + "  成功获取许可,开始工作");// 模拟执行耗时任务Thread.sleep(1000);System.out.println("线程 " + id + " 释放许可");semaphore.release(); // 释放许可} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}});}// 等待所有线程结束latch.await();pool.shutdown();redissonClient.shutdown();}
}

结果分析

【以下显示了最多三个线程拿到资源 释放资源后 才有等待的其他的线程去拿到】
线程 1 尝试获取许可...
线程 2 尝试获取许可...
线程 3 尝试获取许可...
线程 4 尝试获取许可...
线程 5 尝试获取许可...
线程 6 尝试获取许可...
线程 7 尝试获取许可...
线程 8 尝试获取许可...
线程 9 尝试获取许可...
线程 10 尝试获取许可...线程 3 ✅ 成功获取许可,开始工作
线程 2 ✅ 成功获取许可,开始工作
线程 1 ✅ 成功获取许可,开始工作线程 2 🔁 释放许可
线程 3 🔁 释放许可
线程 1 🔁 释放许可线程 7 ✅ 成功获取许可,开始工作
线程 9 ✅ 成功获取许可,开始工作
线程 8 ✅ 成功获取许可,开始工作线程 9 🔁 释放许可
线程 8 🔁 释放许可
线程 7 🔁 释放许可线程 6 ✅ 成功获取许可,开始工作
线程 4 ✅ 成功获取许可,开始工作
线程 5 ✅ 成功获取许可,开始工作线程 6 🔁 释放许可
线程 4 🔁 释放许可
线程 5 🔁 释放许可线程 10 ✅ 成功获取许可,开始工作
线程 10 🔁 释放许可

可以看到只有最多三个线程去执行关键代码

lua脚本实现原理

-- ========== 自定义Lua实现详解 ==========-- 获取信号量的完整Lua脚本
local semaphore_key = KEYS[1]           -- "semaphore:video_transcode"
local max_permits = tonumber(ARGV[1])   -- 5 (最大许可数)
local permit_id = ARGV[2]               -- "uuid-12345" (许可证ID)
local expire_time = tonumber(ARGV[3])   -- 1654321200 (过期时间戳)
local timeout_seconds = tonumber(ARGV[4]) -- 600 (超时秒数)
local current_time = tonumber(ARGV[5])  -- 当前时间戳--1: 清理过期的许可证
-- 删除分数(过期时间) <= 当前时间的所有成员
redis.call('ZREMRANGEBYSCORE', semaphore_key, 0, current_time)--2: 检查当前许可证数量
-- ZCARD返回有序集合的成员数量
local current_count = redis.call('ZCARD', semaphore_key)--3: 判断是否可以获取新许可
if current_count < max_permits then-- 可以获取:添加新的许可证到有序集合redis.call('ZADD', semaphore_key, expire_time, permit_id)-- 设置整个key的过期时间(防止内存泄漏)redis.call('EXPIRE', semaphore_key, timeout_seconds * 2)return permit_id  -- 返回许可证ID(成功)
elsereturn nil        -- 返回空(失败)
end-- ========== 数据结构演示 ==========-- 假设max_permits = 3,当前时间 = 1000
-- 初始状态: semaphore:video_transcode = {}--1个请求获取许可:
-- ZADD semaphore:video_transcode 1600 uuid-001
-- 状态: {uuid-001: 1600}  -> 当前count = 1--2个请求获取许可:
-- ZADD semaphore:video_transcode 1650 uuid-002  
-- 状态: {uuid-001: 1600, uuid-002: 1650}  -> 当前count = 2--3个请求获取许可:
-- ZADD semaphore:video_transcode 1700 uuid-003
-- 状态: {uuid-001: 1600, uuid-002: 1650, uuid-003: 1700}  -> 当前count = 3--4个请求获取许可:
-- current_count = 3, max_permits = 3
-- 3 >= 3,拒绝请求,返回nil-- 当时间到达1650时,uuid-002过期:
-- ZREMRANGEBYSCORE semaphore:video_transcode 0 1650
-- 状态: {uuid-003: 1700}  -> 当前count = 1,可以接受新请求

实际在 redissonClient.getSemaphore(key) 中 最终redis的数据是这样的在这里插入图片描述
哦 原来就是一个String类型的数据去表示数量啊

思考一下为什么还是3?

因为是获取+释放的过程 每次都正常释放了 所以信号量还是为3

双注解去实现限流 并发

以上我已经将限流和限并发讲明白了,接下来就是给出一个工具类模版代码,让你在业务代码中直接使用注解能够使用上这样的工具。

注解:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessControl {enum Strategy {RATE_LIMIT,    // 限流:单位时间限制请求次数CONCURRENCY    // 限并发:同时最大线程数}String key(); // Redis Keyint permits(); // 最大令牌数或并发数long interval() default 1; // 限流时的单位时间长度(默认1)TimeUnit timeUnit() default TimeUnit.SECONDS; // 限流时的单位时间单位Strategy strategy(); // 控制策略
}

切面:

@Aspect
@Component
public class AccessControlAspect {@Autowiredprivate RedissonClient redissonClient;@Around("@annotation(accessControl)")public Object handleAccessControl(ProceedingJoinPoint pjp, AccessControl accessControl) throws Throwable {String key = accessControl.key();int permits = accessControl.permits();AccessControl.Strategy strategy = accessControl.strategy();if (strategy == AccessControl.Strategy.CONCURRENCY) {return handleSemaphore(key, permits, pjp);} else {return handleRateLimiter(key, permits, accessControl, pjp);}}private Object handleSemaphore(String key, int permits, ProceedingJoinPoint pjp) throws Throwable {RSemaphore semaphore = redissonClient.getSemaphore(key);semaphore.trySetPermits(permits);semaphore.acquire();try {return pjp.proceed();} finally {semaphore.release();}}private Object handleRateLimiter(String key, int permits, AccessControl accessControl, ProceedingJoinPoint pjp) throws Throwable {RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);rateLimiter.trySetRate(RateType.OVERALL,permits,accessControl.interval(),toRateUnit(accessControl.timeUnit()));boolean acquired = rateLimiter.tryAcquire(1);if (!acquired) {throw new RuntimeException("请求过快,请稍后再试");}return pjp.proceed();}private RateIntervalUnit toRateUnit(TimeUnit timeUnit) {switch (timeUnit) {case SECONDS: return RateIntervalUnit.SECONDS;case MINUTES: return RateIntervalUnit.MINUTES;case HOURS:   return RateIntervalUnit.HOURS;case DAYS:    return RateIntervalUnit.DAYS;case MILLISECONDS: return RateIntervalUnit.MILLISECONDS;default: throw new IllegalArgumentException("不支持的时间单位");}}
}

服务类:

@Service
public class TwoStrategyInOneAnotationService {@AccessControl(key = "video:concurrent", permits = 3, strategy = AccessControl.Strategy.CONCURRENCY)public void transcode(String videoName) throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 正在转码: " + videoName);Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " 转码完成: " + videoName);}@AccessControl(key = "sms:rate", permits = 5, strategy = AccessControl.Strategy.RATE_LIMIT, interval = 1, timeUnit = TimeUnit.SECONDS)public void sendSms(String phone) {System.out.println("发送短信到: " + phone);}
}

测试类:

@SpringBootTest
public class TwoStrategyInOneAnnotationTest {@Autowiredprivate TwoStrategyInOneAnotationService taskService;@Testpublic void testConcurrency() throws InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(10);CountDownLatch latch = new CountDownLatch(10);for (int i = 0; i < 10; i++) {final int id = i;pool.submit(() -> {try {taskService.transcode("video-" + id);} catch (Exception e) {System.out.println("转码失败:" + e.getMessage());} finally {latch.countDown();}});}latch.await();pool.shutdown();}@Testpublic void testRateLimit() {for (int i = 0; i < 10; i++) {try {taskService.sendSms("1380000" + i);} catch (Exception e) {System.out.println("短信失败:" + e.getMessage());}}}
}

结果分析:

限流:不超过5个请求
在这里插入图片描述
限并发:只有最大三个线程去执行
在这里插入图片描述

实际业务去理解体会

场景描述:
1、用户上传视频,系统进行转码处理。每个转码任务需要消耗大量CPU资源,平均耗时10秒。
2、当前我的计算机是一个8核的CPU
3、限流配置(RateLimiter):每秒允许 10个请求
4、限并发配置(Semaphore):可以并发转码任务数为 核数 =8。

@Service
public class TranscodeService {@AccessControl(key = "transcode:rate", permits = 10, strategy = AccessControl.Strategy.RATE_LIMIT)@AccessControl(key = "transcode:semaphore", permits = 8, strategy = AccessControl.Strategy.CONCURRENCY)public void transcode(String filename) throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 开始转码视频:" + filename);Thread.sleep(10_000); // 模拟耗时10秒System.out.println(Thread.currentThread().getName() + " 转码完成:" + filename);}
}

统一注解实现版本

当前还可以统一上面的双注解为一个注解 当前只需要简单调整注解和切面部分的代码使其同时支持两种策略就好。

注解:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessControl {enum LimitType {RATE_LIMIT,       // 仅限流CONCURRENCY,      // 仅限并发ALL               // 同时限流 + 并发控制}String key();  // Redis key 前缀LimitType limitType() default LimitType.ALL;// 限流配置int rateLimitPermits() default 0; // 每秒允许通过次数long rateLimitInterval() default 1;TimeUnit rateLimitUnit() default TimeUnit.SECONDS;// 并发配置int concurrencyPermits() default 0;
}

切面类:

@Aspect
@Component
public class AccessControlAspect {@Autowiredprivate RedissonClient redissonClient;@Around("@annotation(control)")public Object handle(ProceedingJoinPoint pjp, AccessControl control) throws Throwable {String key = control.key();AccessControl.LimitType type = control.limitType();boolean acquiredRate = true;boolean acquiredSemaphore = true;RRateLimiter rateLimiter = null;RSemaphore semaphore = null;// 限流逻辑if (type == AccessControl.LimitType.RATE_LIMIT || type == AccessControl.LimitType.ALL) {rateLimiter = redissonClient.getRateLimiter(key + ":ratelimit");rateLimiter.trySetRate(RateType.OVERALL,control.rateLimitPermits(),control.rateLimitInterval(),toRateUnit(control.rateLimitUnit()));acquiredRate = rateLimiter.tryAcquire(1);if (!acquiredRate) {throw new RuntimeException("请求过快,请稍后再试");}}// 并发逻辑if (type == AccessControl.LimitType.CONCURRENCY || type == AccessControl.LimitType.ALL) {semaphore = redissonClient.getSemaphore(key + ":semaphore");semaphore.trySetPermits(control.concurrencyPermits());acquiredSemaphore = semaphore.tryAcquire();if (!acquiredSemaphore) {throw new RuntimeException("系统忙,请稍后重试");}}try {return pjp.proceed();} finally {if (semaphore != null && acquiredSemaphore) {semaphore.release();}}}private RateIntervalUnit toRateUnit(TimeUnit unit) {switch (unit) {case SECONDS: return RateIntervalUnit.SECONDS;case MINUTES: return RateIntervalUnit.MINUTES;case HOURS:   return RateIntervalUnit.HOURS;case DAYS:    return RateIntervalUnit.DAYS;case MILLISECONDS: return RateIntervalUnit.MILLISECONDS;default: throw new IllegalArgumentException("不支持的时间单位");}}
}

服务类:

@Service
public class VideoService {@AccessControl(key = "video:transcode",rateLimitPermits = 6,concurrencyPermits = 6,limitType = AccessControl.LimitType.ALL)public void transcode(String name) throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 转码中:" + name);Thread.sleep(10_000);System.out.println(Thread.currentThread().getName() + " 转码完成:" + name);}
}

思考为什么不能用令牌桶 去做 限制并发 分布式信号量去做限流?

核心区别:什么时候释放资源

滑动窗口/令牌桶:自动释放(基于时间)

令牌消耗后就没了,不需要"归还"
系统按时间自动补充令牌

分布式信号量:手动释放(基于资源占用)

信号量必须"借用"后"归还"
归还时机由处理时长决定
天然控制并发:同时最多N个请求持有信号量

用信号量去控制流量QPS 会存在

释放太快,无法限制频率
例如:1秒内可能获取/释放几千次

用滑动窗口/令牌桶去控制并发线程存在:

令牌消耗了,无法表示"正在占用资源"
系统继续补充令牌,无法限制并发数
因为存在获得释放的过程 才能表示有多少并发线程 如果只去表示获取 不去表示释放 则不能控制并发线程的数量 正是信号量的“获得”“释放”符合去控制并发的数量。

总结

这是一篇对限流限并发的介绍,希望你有所收获。

感谢每一位看到最后的你,我知道这篇文章比较长,全程都是楼主【代码丰】手打 实验的,希望能收获点赞和收藏 谢谢小伙伴的支持,让我们一路变强

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/82727.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…

Linux中shell编程表达式和数组讲解

一、表达式 1.1 测试表达式 样式1: test 条件表达式 样式2: [ 条件表达式 ] 注意&#xff1a;以上两种方法的作用完全一样&#xff0c;后者为常用。但后者需要注意方括号[、]与条件表达式之间至少有一个空格。test跟 [] 的意思一样条件成立&#xff0c;状态返回值是0条件不成…

深入了解JavaScript当中如何确定值的类型

JavaScript是一种弱类型语言&#xff0c;当你给一个变量赋了一个值&#xff0c;该值是什么类型的&#xff0c;那么该变量就是什么类型的&#xff0c;并且你还可以给一个变量赋多种类型的值&#xff0c;也不会报错&#xff0c;这就是JavaScript的内部机制所决定的&#xff0c;那…

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信拓扑与操作 BR/EDR(经典蓝牙)和 BLE

目录 1. BR/EDR&#xff08;经典蓝牙&#xff09;网络结构微微网&#xff08;Piconet&#xff09;散射网&#xff08;Scatternet&#xff09;蓝牙 BR/EDR 拓扑结构示意图 2. BLE&#xff08;低功耗蓝牙&#xff09;网络结构广播器与观察者&#xff08;Broadcaster and Observer…

C++虚函数表(虚表Virtual Table,简称vtable、VFT)(编译器为支持运行时多态(动态绑定)而自动生成的一种内部数据结构)虚函数指针vptr

文章目录 **1. 虚函数表的核心概念**- **虚函数表&#xff08;vtable&#xff09;**&#xff1a;- **虚函数指针&#xff08;vptr&#xff09;**&#xff1a; **2. 虚函数表的生成与工作流程****生成时机**- **当一个类中至少有一个虚函数时**&#xff0c;编译器会为该类生成一…

使用Python和TensorFlow实现图像分类

最近研学过程中发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击链接跳转到网站人工智能及编程语言学习教程。读者们可以通过里面的文章详细了解一下人工智能及其编程等教程和学习方法。下面开始对正文内容的…

Unity UI 性能优化--Sprite 篇

&#x1f3af; Unity UI 性能优化终极指南 — Sprite篇 &#x1f9e9; Sprite 是什么&#xff1f;—— 渲染的基石与性能的源头 在Unity的2D渲染管线中&#xff0c;Sprite 扮演着至关重要的角色。它不仅仅是2D图像资源本身&#xff0c;更是GPU进行渲染批处理&#xff08;Batch…

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g

vue中加载Cesium地图(天地图、高德地图)

目录 1、将下载的Cesium包移动至public下 2、首先需要将Cesium.js和widgets.css文件引入到 3、 新建Cesium.js文件&#xff0c;方便在全局使用 4、新建cesium.vue文件&#xff0c;展示三维地图 1、将下载的Cesium包移动至public下 npm install cesium后​​​​​​​ 2、…

Elasticsearch的插件(Plugin)系统介绍

Elasticsearch的插件(Plugin)系统是一种扩展机制,允许用户通过添加自定义功能来增强默认功能,而无需修改核心代码。插件可以提供从分析器、存储后端到安全认证、机器学习等各种功能,使Elasticsearch能够灵活适应不同的应用场景和业务需求。 一、插件的核心特点 模块化扩展…

基于 openEuler 22.03 LTS SP1 构建 DPDK 22.11.8 开发环境指南

基于 openEuler 22.03 LTS SP1 构建 DPDK 22.11.8 开发环境指南 本文详细介绍了在 openEuler 22.03 LTS SP1 操作系统上构建 DPDK 22.11.8 开发环境的完整流程。DPDK 20 版本之后采用 mesonninja 的编译方式&#xff0c;与早期版本有所不同。本文内容也可作为其他 Linux 发行版…

微服务网关SpringCloudGateway+SaToken鉴权

目录 概念 前置知识回顾 拿到UserInfo 用于自定义权限和角色的获取逻辑 最后进行要进行 satoken 过滤器全局配置 概念 做权限认证的时候 我们首先要明确两点 我们需要的角色有几种 我们需要的权限有几种 角色 分两种 ADMIN 管理员 &#xff1a;可管理商品 CUSTIOMER 普通…

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…

Dify中聊天助手、agent、文本生成、chatflow、工作流模式解读分析与对比

一次解读 1. 聊天助手 (Chat Assistant) 情景定位 (Situation): 你需要创建一个可以与用户进行多轮对话的AI应用&#xff0c;例如客服机器人、信息查询助手、或一个特定领域的虚拟专家。目标明确 (Purpose): 核心目标是理解并响应用户的连续提问&#xff0c;维持对话的上下文…

使用Node.js分片上传大文件到阿里云OSS

阿里云OSS的分片上传&#xff08;Multipart Upload&#xff09;是一种针对大文件优化的上传方式&#xff0c;其核心流程和关键特性如下&#xff1a; 1. ‌核心流程‌ 分片上传分为三个步骤&#xff1a; 初始化任务‌&#xff1a;调用InitiateMultipartUpload接口创建上传任务…

C++ if语句完全指南:从基础到工程实践

一、选择结构在程序设计中的核心地位 程序流程控制如同城市交通网络&#xff0c;if语句则是这个网络中的决策枢纽。根据ISO C标准&#xff0c;选择结构占典型项目代码量的32%-47%&#xff0c;其正确使用直接影响程序的&#xff1a; 逻辑正确性 执行效率 可维护性 安全边界 …

【大模型LLM学习】Flash-Attention的学习记录

【大模型LLM学习】Flash-Attention的学习记录 0. 前言1. flash-attention原理简述2. 从softmax到online softmax2.1 safe-softmax2.2 3-pass safe softmax2.3 Online softmax2.4 Flash-attention2.5 Flash-attention tiling 0. 前言 Flash Attention可以节约模型训练和推理时间…

python打卡day46@浙大疏锦行

知识点回顾&#xff1a; 不同CNN层的特征图&#xff1a;不同通道的特征图什么是注意力&#xff1a;注意力家族&#xff0c;类似于动物园&#xff0c;都是不同的模块&#xff0c;好不好试了才知道。通道注意力&#xff1a;模型的定义和插入的位置通道注意力后的特征图和热力图 内…

JavaSec-SPEL - 表达式注入

简介 SPEL(Spring Expression Language)&#xff1a;SPEL是Spring表达式语言&#xff0c;允许在运行时动态查询和操作对象属性、调用方法等&#xff0c;类似于Struts2中的OGNL表达式。当参数未经过滤时&#xff0c;攻击者可以注入恶意的SPEL表达式&#xff0c;从而执行任意代码…

SpringCloud——OpenFeign

概述&#xff1a; OpenFeign是基于Spring的声明式调用的HTTP客户端&#xff0c;大大简化了编写Web服务客户端的过程&#xff0c;用于快速构建http请求调用其他服务模块。同时也是spring cloud默认选择的服务通信工具。 使用方法&#xff1a; RestTemplate手动构建: // 带查询…