文章目录
- 前言
- 限流+限制并发的实际理解
- 限流
- 令牌桶
- 代码实现
- 结果分析
- 令牌桶lua的模拟实现原理
- 总结:
- 滑动窗口
- 代码实现
- 结果分析
- lua脚本原理解析
- 限并发
- 分布式信号量
- 代码实现
- 结果分析
- lua脚本实现原理
- 双注解去实现限流 并发
- 结果分析:
- 实际业务去理解体会
- 统一注解实现版本
- 思考为什么不能用令牌桶 去做 限制并发 分布式信号量去做限流?
- 总结
前言
本文
1、讲解什么是限流、限并发
2、限流 限并发的业务场景
3、限流的常用实现 限制并发的常用实现
4、双注解去实现限流限并发
5、运用双注解版本去解决一个实际业务例子
6、更近一步 使用一个注解去统一限流限并发
限流+限制并发的实际理解
理解“限流”和“限制并发”最形象的方式是——把系统当成一个餐厅厨房,然后把用户请求比作“点单”。
概念 | 比喻 | 说明 |
---|---|---|
限流 | 控制每秒进门的顾客数量 | 比如每秒只允许5个顾客点单,不然就让他们在门外等 |
限制并发 | 控制后厨同时能做几道菜 | 厨房只有6个炉灶,所以一次最多做6份菜,超出就排队等灶空出来 |
迁移到真实系统上
限流就是打到接口上的请求
限制并发就是限制“同时执行关键逻辑的线程数”
限流
令牌桶
桶中可以最多装 N 个令牌。
每隔 T 时间就往桶里加一个令牌(速率)。
每次访问操作前,必须“拿”一个令牌。
如果桶是空的(没令牌了)→ 拒绝访问(限流)。
代码实现
public class TokenBucketTest {@Testpublic void testConcurrentRateLimit() throws InterruptedException {// 初始化 RedissonConfig config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redissonClient = Redisson.create(config);// 获取限流器对象String key = "rate:test:user123";RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);// 设置限流规则:每秒最多 2 个请求 (也就是每秒生成两个令牌)rateLimiter.trySetRate(RateType.OVERALL, 2, 1, RateIntervalUnit.SECONDS);int threadCount = 10;CountDownLatch latch = new CountDownLatch(threadCount);ExecutorService executor = Executors.newFixedThreadPool(threadCount);for (int i = 0; i < threadCount; i++) {int id = i + 1;executor.submit(() -> {try {boolean allowed = rateLimiter.tryAcquire();if (allowed) {System.out.println("请求 " + id + " 成功通过限流器");} else {System.out.println("请求 " + id + " 被限流拒绝");}} finally {latch.countDown();}});}latch.await();executor.shutdown();redissonClient.shutdown();}
}
结果分析
可以预见的是2个请求拿到 剩下的8个被拒绝(10个请求是在瞬间完成的 还没来得及补充令牌呢 )
令牌桶lua的模拟实现原理
假设redis中 key 为rate_limiter:user_123
-- 当前时间
now = redis.call('TIME')-- 获取桶状态(剩余 token、上次更新时间)
tokens = redis.call('GET', 'rate_limiter:user_123')-- 计算时间间隔,补充新 token
new_tokens = (now - last_time) * rate
tokens = min(max_tokens, tokens + new_tokens)-- 如果有令牌,扣除一个并允许访问
if tokens >= 1 thentokens = tokens - 1redis.call('SET', key, tokens)return true
elsereturn false
end
实际在 redissonClient.getRateLimiter(key);中的实现和上述lua脚本类似用String类型的数据去表示令牌的数量
总结:
(RateType.OVERALL, 2, 1, RateIntervalUnit.SECONDS)
令牌桶理解为
你第 0 秒放进来了 2 个 token
如果你立即连续拿了 2 个,桶就空了
等到第 0.5 秒,补充 1 个;第 1 秒再补充 1 个;最多恢复回 2 个(桶容量上限)
滑动窗口
思想:
来一个人,你记下他进入的时间戳
你翻一下记录,把1 分钟前之前的人全划掉(过期了)
看看还有几个人在 1 分钟窗口里
如果不到 10 个 —— 放进来!
如果已经 10 个 —— 不让进!
代码实现
两部分 一个是手写的service 一个是测试类 代码有点长 可以直接复制到idea中去查看【注意这里当前有一个大量请求在一瞬间发起导致重复时间戳的问题 这个问题在后面我会解决 你先在这里有一个意识 当前的代码有问题】
当前是在一分钟内 限制10个请求
@Service
public class RedisLuaRateLimiter {// 改用 StringRedisTemplate,天然支持字符串序列化@Resourceprivate StringRedisTemplate stringRedisTemplate;// 原始的 Lua 脚本(去掉调试代码)private static final String SLIDING_WINDOW_LUA ="local key = KEYS[1]\n" +"local now = tonumber(ARGV[1])\n" +"local window = tonumber(ARGV[2])\n" +"local limit = tonumber(ARGV[3])\n" +"\n" +"-- 检查参数有效性\n" +"if not now or not window or not limit then\n" +" return 0\n" +"end\n" +"\n" +"-- 移除过期元素\n" +"redis.call('ZREMRANGEBYSCORE', key, 0, now - window)\n" +"\n" +"-- 获取当前数量\n" +"local count = redis.call('ZCARD', key)\n" +"\n" +"-- 判断是否超过限制\n" +"if count < limit then\n" +" -- 添加当前请求\n" +" redis.call('ZADD', key, now, now)\n" +" -- 设置过期时间\n" +" redis.call('PEXPIRE', key, window + 1000)\n" +" return 1\n" +"else\n" +" return 0\n" +"end";/*** 执行限流逻辑(使用默认参数 在一分钟内 限制10个请求)*/public boolean isAllowed(String userKey) {//当前是在一分钟内 限制10个请求return isAllowed(userKey, 60000, 10);}/*** 执行限流逻辑*/public boolean isAllowed(String userKey, long windowMs, int limit) {try {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptText(SLIDING_WINDOW_LUA);redisScript.setResultType(Long.class);// 使用纳秒时间戳和纳秒窗口 当前这里是用的 毫秒 是否存在问题呢?long now = System.currentTimeMillis();long windowNanos = windowMs ; // 可以转换为纳秒String[] args = {String.valueOf(now),String.valueOf(windowNanos), // 传递纳秒窗口String.valueOf(limit)};System.out.println("=== 限流检查 ===");System.out.println("userKey: " + userKey);System.out.println("now: " + now);System.out.println("windowMs: " + windowMs);System.out.println("limit: " + limit);System.out.println("args: " + Arrays.toString(args));// 使用 StringRedisTemplate 执行Long result = stringRedisTemplate.execute(redisScript,Collections.singletonList(userKey),args);System.out.println("result: " + result);boolean allowed = result != null && result == 1;System.out.println("是否允许: " + (allowed ? "是" : " 否"));return allowed;} catch (Exception e) {System.err.println("限流器异常: " + e.getMessage());e.printStackTrace();return true;}}/*** 获取剩余配额*/public long getRemainingQuota(String userKey, long windowMs, int limit) {try {long now = System.currentTimeMillis();long cutoff = now - windowMs;// 清理过期数据stringRedisTemplate.opsForZSet().removeRangeByScore(userKey, cutoff, now - windowMs);// 获取当前数量Long count = stringRedisTemplate.opsForZSet().zCard(userKey);if (count == null) {count = 0L;}return Math.max(0, limit - count);} catch (Exception e) {System.err.println("获取剩余配额异常: " + e.getMessage());return limit;}}/*** 获取默认配置下的剩余配额*/public long getRemainingQuota(String userKey) {return getRemainingQuota(userKey, 60000, 10);}/*** 重置计数器*/public void reset(String userKey) {try {stringRedisTemplate.delete(userKey);System.out.println("已重置计数器: " + userKey);} catch (Exception e) {System.err.println("重置失败: " + e.getMessage());}}/*** 查看当前状态(用于调试)*/public void showStatus(String userKey, long windowMs) {try {long now = System.currentTimeMillis();long cutoff = now - windowMs;// 获取所有元素Set<ZSetOperations.TypedTuple<String>> elements =stringRedisTemplate.opsForZSet().rangeWithScores(userKey, 0, -1);System.out.println("=== " + userKey + " 状态 ===");System.out.println("当前时间: " + now);System.out.println("截止时间: " + cutoff);System.out.println("窗口大小: " + windowMs + "ms");if (elements != null && !elements.isEmpty()) {System.out.println("有效元素:");for (ZSetOperations.TypedTuple<String> element : elements) {long timestamp = element.getScore().longValue();boolean isValid = timestamp > cutoff;System.out.println(" 时间戳: " + timestamp + " (有效: " + isValid + ")");}} else {System.out.println("无元素");}Long totalCount = stringRedisTemplate.opsForZSet().zCard(userKey);System.out.println("总计数: " + (totalCount != null ? totalCount : 0));} catch (Exception e) {System.err.println("查看状态失败: " + e.getMessage());}}
}
@SpringBootTest
class SlidingWindowTest {@Autowiredprivate RedisLuaRateLimiter rateLimiter;@Testvoid testDebugRateLimit() {String userKey = "debug_test_" + System.currentTimeMillis();System.out.println(" 开始调试测试...");// 重置计数器rateLimiter.reset(userKey);// 查看初始状态rateLimiter.showStatus(userKey, 60000);// 进行几次单独的请求测试for (int i = 1; i <= 15; i++) {System.out.println("\n--- 第 " + i + " 次请求 ---");boolean allowed = rateLimiter.isAllowed(userKey);System.out.println("结果: " + (allowed ? " 允许" : "限流"));// 每5次查看一下状态if (i % 5 == 0) {rateLimiter.showStatus(userKey, 60000);}}}
}
结果分析
在一分钟内限制10个请求 那么可以预见的是 应该第11个请求就不能进入了,结果竟然是第15个请求都进入了?WHY????
这里就是是我上文提到的时间戳重复的问题。要解决这个问题 我们必须首先搞清楚 这个lua脚本在做什么事情 窗口是如何滑动的
lua脚本原理解析
"local key = KEYS[1]\n" +"local now = tonumber(ARGV[1])\n" +"local window = tonumber(ARGV[2])\n" +"local limit = tonumber(ARGV[3])\n" +"\n" +"-- 检查参数有效性\n" +"if not now or not window or not limit then\n" +" return 0\n" +"end\n" +"\n" +"-- 移除过期元素\n" +"redis.call('ZREMRANGEBYSCORE', key, 0, now - window)\n" +"\n" +"-- 获取当前数量\n" +"local count = redis.call('ZCARD', key)\n" +"\n" +"-- 判断是否超过限制\n" +"if count < limit then\n" +" -- 添加当前请求\n" +" redis.call('ZADD', key, now, now)\n" +" -- 设置过期时间\n" +" redis.call('PEXPIRE', key, window + 1000)\n" +" return 1\n" +"else\n" +" return 0\n" +"end";
其实这里就是在控制一个窗口
新请求到达
│
▼
判断当前时间戳 → 清理60秒外的数据(ZREMRANGEBYSCORE)
│
▼
计算最近60秒的请求次数(ZCARD)
│
├─ 如果小于10次 → 记录当前时间 + 放行(ZADD + return 1)
│
└─ 如果大于等于10次 → 拒绝请求(return 0)
也就是每一次都是ZADD 一个保存时间戳的有序集合(Sorted Set)然后统计在某个时间窗口下的集合元素个数 如果大于了10次 就拒绝 然后这个窗口边滑动边统计数据。你可以明白这里的思想了吗?
来一个人,你记下他进入的时间戳
你翻一下记录,把1 分钟前之前的人全划掉(过期了)
看看还有几个人在 1 分钟窗口里
如果不到 10 个 —— 放进来!
如果已经 10 个 —— 不让进!
而上文15次请求进入的数据流转如下:
第1-5次请求后的存储状态
ZADD key 1749016198740 1749016198740 → 新增,count=1
ZADD key 1749016198763 1749016198763 → 新增,count=2
ZADD key 1749016198763 1749016198763 → 覆盖,count=2 (不变)
ZADD key 1749016198764 1749016198764 → 新增,count=3
ZADD key 1749016198764 1749016198764 → 覆盖,count=3 (不变)实际存储:
Score Member
1749016198740 → 1749016198740
1749016198763 → 1749016198763
1749016198764 → 1749016198764总计数:3 (不是5!)
第6-10次请求后的存储状态
继续添加:
ZADD key 1749016198766 1749016198766 → 新增,count=4
ZADD key 1749016198767 1749016198767 → 新增,count=5
ZADD key 1749016198767 1749016198767 → 覆盖,count=5 (不变)
ZADD key 1749016198768 1749016198768 → 新增,count=6
ZADD key 1749016198769 1749016198769 → 新增,count=7实际存储:
Score Member
1749016198740 → 1749016198740
1749016198763 → 1749016198763
1749016198764 → 1749016198764
1749016198766 → 1749016198766
1749016198767 → 1749016198767
1749016198768 → 1749016198768
1749016198769 → 1749016198769总计数:7 (验证了测试结果!)
第11-15次请求后的存储状态
继续添加:
ZADD key 1749016198771 1749016198771 → 新增,count=8
ZADD key 1749016198771 1749016198771 → 覆盖,count=8 (不变)
ZADD key 1749016198771 1749016198771 → 覆盖,count=8 (不变)
ZADD key 1749016198772 1749016198772 → 新增,count=9
ZADD key 1749016198772 1749016198772 → 覆盖,count=9 (不变)最终存储:
Score Member
1749016198740 → 1749016198740
1749016198763 → 1749016198763
1749016198764 → 1749016198764
1749016198766 → 1749016198766
1749016198767 → 1749016198767
1749016198768 → 1749016198768
1749016198769 → 1749016198769
1749016198771 → 1749016198771
1749016198772 → 1749016198772总计数:9 (< 10,所以都被允许了!)
你发现其实是因为每次请求太快了 时间戳精度不够高导致连续的请求是一样的时间戳 然后就被覆盖了。所以解决办法就是 使用更高精度纳秒而不是毫秒
// 使用纳秒时间戳和纳秒窗口 当前这里是用的 毫秒 是否存在问题呢?long now = System.currentTimeMillis();long windowNanos = windowMs ; // 可以转换为纳秒// 使用纳秒时间戳和纳秒窗口long now = System.nanoTime();long windowNanos = windowMs * 1_000_000L; // 转换为纳秒
这样就可以决绝这个问题
限并发
回忆 区别并发和限流
限流就是打到接口上的请求
限制并发就是限制“同时执行关键逻辑的线程数”
自然而然会想到使用信号量的概念去限制最大执行的线程数
分布式信号量
你有一个写字楼,只有 3 间厕所(许可数 = 3)
每个线程 = 每个员工
每个员工来上厕所(acquire())
上完了释放厕所(release())
存在一个手动释放的过程相比 【令牌桶 和滑动窗口】
代码实现
public class RedissonSemaphoreTest {@Testpublic void testDistributedSemaphore() throws InterruptedException {// 1. 初始化 Redis 客户端Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redissonClient = Redisson.create(config);// 2. 获取分布式信号量对象String key = "semaphore:test:resource";RSemaphore semaphore = redissonClient.getSemaphore(key);// 3. 设置最多允许3个线程同时进入(初始化许可数)semaphore.trySetPermits(3); // 若 key 不存在则设置,否则保持现有// 4. 创建10个线程模拟并发int threadCount = 10;ExecutorService pool = Executors.newFixedThreadPool(threadCount);CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {final int id = i + 1;pool.submit(() -> {try {System.out.println("线程 " + id + " 尝试获取许可...");semaphore.acquire(); // 阻塞直到拿到许可System.out.println("线程 " + id + " 成功获取许可,开始工作");// 模拟执行耗时任务Thread.sleep(1000);System.out.println("线程 " + id + " 释放许可");semaphore.release(); // 释放许可} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}});}// 等待所有线程结束latch.await();pool.shutdown();redissonClient.shutdown();}
}
结果分析
【以下显示了最多三个线程拿到资源 释放资源后 才有等待的其他的线程去拿到】
线程 1 尝试获取许可...
线程 2 尝试获取许可...
线程 3 尝试获取许可...
线程 4 尝试获取许可...
线程 5 尝试获取许可...
线程 6 尝试获取许可...
线程 7 尝试获取许可...
线程 8 尝试获取许可...
线程 9 尝试获取许可...
线程 10 尝试获取许可...线程 3 ✅ 成功获取许可,开始工作
线程 2 ✅ 成功获取许可,开始工作
线程 1 ✅ 成功获取许可,开始工作线程 2 🔁 释放许可
线程 3 🔁 释放许可
线程 1 🔁 释放许可线程 7 ✅ 成功获取许可,开始工作
线程 9 ✅ 成功获取许可,开始工作
线程 8 ✅ 成功获取许可,开始工作线程 9 🔁 释放许可
线程 8 🔁 释放许可
线程 7 🔁 释放许可线程 6 ✅ 成功获取许可,开始工作
线程 4 ✅ 成功获取许可,开始工作
线程 5 ✅ 成功获取许可,开始工作线程 6 🔁 释放许可
线程 4 🔁 释放许可
线程 5 🔁 释放许可线程 10 ✅ 成功获取许可,开始工作
线程 10 🔁 释放许可
可以看到只有最多三个线程去执行关键代码
lua脚本实现原理
-- ========== 自定义Lua实现详解 ==========-- 获取信号量的完整Lua脚本
local semaphore_key = KEYS[1] -- "semaphore:video_transcode"
local max_permits = tonumber(ARGV[1]) -- 5 (最大许可数)
local permit_id = ARGV[2] -- "uuid-12345" (许可证ID)
local expire_time = tonumber(ARGV[3]) -- 1654321200 (过期时间戳)
local timeout_seconds = tonumber(ARGV[4]) -- 600 (超时秒数)
local current_time = tonumber(ARGV[5]) -- 当前时间戳-- 第1步: 清理过期的许可证
-- 删除分数(过期时间) <= 当前时间的所有成员
redis.call('ZREMRANGEBYSCORE', semaphore_key, 0, current_time)-- 第2步: 检查当前许可证数量
-- ZCARD返回有序集合的成员数量
local current_count = redis.call('ZCARD', semaphore_key)-- 第3步: 判断是否可以获取新许可
if current_count < max_permits then-- 可以获取:添加新的许可证到有序集合redis.call('ZADD', semaphore_key, expire_time, permit_id)-- 设置整个key的过期时间(防止内存泄漏)redis.call('EXPIRE', semaphore_key, timeout_seconds * 2)return permit_id -- 返回许可证ID(成功)
elsereturn nil -- 返回空(失败)
end-- ========== 数据结构演示 ==========-- 假设max_permits = 3,当前时间 = 1000
-- 初始状态: semaphore:video_transcode = {}-- 第1个请求获取许可:
-- ZADD semaphore:video_transcode 1600 uuid-001
-- 状态: {uuid-001: 1600} -> 当前count = 1-- 第2个请求获取许可:
-- ZADD semaphore:video_transcode 1650 uuid-002
-- 状态: {uuid-001: 1600, uuid-002: 1650} -> 当前count = 2-- 第3个请求获取许可:
-- ZADD semaphore:video_transcode 1700 uuid-003
-- 状态: {uuid-001: 1600, uuid-002: 1650, uuid-003: 1700} -> 当前count = 3-- 第4个请求获取许可:
-- current_count = 3, max_permits = 3
-- 3 >= 3,拒绝请求,返回nil-- 当时间到达1650时,uuid-002过期:
-- ZREMRANGEBYSCORE semaphore:video_transcode 0 1650
-- 状态: {uuid-003: 1700} -> 当前count = 1,可以接受新请求
实际在 redissonClient.getSemaphore(key) 中 最终redis的数据是这样的
哦 原来就是一个String类型的数据去表示数量啊
思考一下为什么还是3?
因为是获取+释放的过程 每次都正常释放了 所以信号量还是为3
双注解去实现限流 并发
以上我已经将限流和限并发讲明白了,接下来就是给出一个工具类模版代码,让你在业务代码中直接使用注解能够使用上这样的工具。
注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessControl {enum Strategy {RATE_LIMIT, // 限流:单位时间限制请求次数CONCURRENCY // 限并发:同时最大线程数}String key(); // Redis Keyint permits(); // 最大令牌数或并发数long interval() default 1; // 限流时的单位时间长度(默认1)TimeUnit timeUnit() default TimeUnit.SECONDS; // 限流时的单位时间单位Strategy strategy(); // 控制策略
}
切面:
@Aspect
@Component
public class AccessControlAspect {@Autowiredprivate RedissonClient redissonClient;@Around("@annotation(accessControl)")public Object handleAccessControl(ProceedingJoinPoint pjp, AccessControl accessControl) throws Throwable {String key = accessControl.key();int permits = accessControl.permits();AccessControl.Strategy strategy = accessControl.strategy();if (strategy == AccessControl.Strategy.CONCURRENCY) {return handleSemaphore(key, permits, pjp);} else {return handleRateLimiter(key, permits, accessControl, pjp);}}private Object handleSemaphore(String key, int permits, ProceedingJoinPoint pjp) throws Throwable {RSemaphore semaphore = redissonClient.getSemaphore(key);semaphore.trySetPermits(permits);semaphore.acquire();try {return pjp.proceed();} finally {semaphore.release();}}private Object handleRateLimiter(String key, int permits, AccessControl accessControl, ProceedingJoinPoint pjp) throws Throwable {RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);rateLimiter.trySetRate(RateType.OVERALL,permits,accessControl.interval(),toRateUnit(accessControl.timeUnit()));boolean acquired = rateLimiter.tryAcquire(1);if (!acquired) {throw new RuntimeException("请求过快,请稍后再试");}return pjp.proceed();}private RateIntervalUnit toRateUnit(TimeUnit timeUnit) {switch (timeUnit) {case SECONDS: return RateIntervalUnit.SECONDS;case MINUTES: return RateIntervalUnit.MINUTES;case HOURS: return RateIntervalUnit.HOURS;case DAYS: return RateIntervalUnit.DAYS;case MILLISECONDS: return RateIntervalUnit.MILLISECONDS;default: throw new IllegalArgumentException("不支持的时间单位");}}
}
服务类:
@Service
public class TwoStrategyInOneAnotationService {@AccessControl(key = "video:concurrent", permits = 3, strategy = AccessControl.Strategy.CONCURRENCY)public void transcode(String videoName) throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 正在转码: " + videoName);Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " 转码完成: " + videoName);}@AccessControl(key = "sms:rate", permits = 5, strategy = AccessControl.Strategy.RATE_LIMIT, interval = 1, timeUnit = TimeUnit.SECONDS)public void sendSms(String phone) {System.out.println("发送短信到: " + phone);}
}
测试类:
@SpringBootTest
public class TwoStrategyInOneAnnotationTest {@Autowiredprivate TwoStrategyInOneAnotationService taskService;@Testpublic void testConcurrency() throws InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(10);CountDownLatch latch = new CountDownLatch(10);for (int i = 0; i < 10; i++) {final int id = i;pool.submit(() -> {try {taskService.transcode("video-" + id);} catch (Exception e) {System.out.println("转码失败:" + e.getMessage());} finally {latch.countDown();}});}latch.await();pool.shutdown();}@Testpublic void testRateLimit() {for (int i = 0; i < 10; i++) {try {taskService.sendSms("1380000" + i);} catch (Exception e) {System.out.println("短信失败:" + e.getMessage());}}}
}
结果分析:
限流:不超过5个请求
限并发:只有最大三个线程去执行
实际业务去理解体会
场景描述:
1、用户上传视频,系统进行转码处理。每个转码任务需要消耗大量CPU资源,平均耗时10秒。
2、当前我的计算机是一个8核的CPU
3、限流配置(RateLimiter):每秒允许 10个请求
4、限并发配置(Semaphore):可以并发转码任务数为 核数 =8。
@Service
public class TranscodeService {@AccessControl(key = "transcode:rate", permits = 10, strategy = AccessControl.Strategy.RATE_LIMIT)@AccessControl(key = "transcode:semaphore", permits = 8, strategy = AccessControl.Strategy.CONCURRENCY)public void transcode(String filename) throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 开始转码视频:" + filename);Thread.sleep(10_000); // 模拟耗时10秒System.out.println(Thread.currentThread().getName() + " 转码完成:" + filename);}
}
统一注解实现版本
当前还可以统一上面的双注解为一个注解 当前只需要简单调整注解和切面部分的代码使其同时支持两种策略就好。
注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessControl {enum LimitType {RATE_LIMIT, // 仅限流CONCURRENCY, // 仅限并发ALL // 同时限流 + 并发控制}String key(); // Redis key 前缀LimitType limitType() default LimitType.ALL;// 限流配置int rateLimitPermits() default 0; // 每秒允许通过次数long rateLimitInterval() default 1;TimeUnit rateLimitUnit() default TimeUnit.SECONDS;// 并发配置int concurrencyPermits() default 0;
}
切面类:
@Aspect
@Component
public class AccessControlAspect {@Autowiredprivate RedissonClient redissonClient;@Around("@annotation(control)")public Object handle(ProceedingJoinPoint pjp, AccessControl control) throws Throwable {String key = control.key();AccessControl.LimitType type = control.limitType();boolean acquiredRate = true;boolean acquiredSemaphore = true;RRateLimiter rateLimiter = null;RSemaphore semaphore = null;// 限流逻辑if (type == AccessControl.LimitType.RATE_LIMIT || type == AccessControl.LimitType.ALL) {rateLimiter = redissonClient.getRateLimiter(key + ":ratelimit");rateLimiter.trySetRate(RateType.OVERALL,control.rateLimitPermits(),control.rateLimitInterval(),toRateUnit(control.rateLimitUnit()));acquiredRate = rateLimiter.tryAcquire(1);if (!acquiredRate) {throw new RuntimeException("请求过快,请稍后再试");}}// 并发逻辑if (type == AccessControl.LimitType.CONCURRENCY || type == AccessControl.LimitType.ALL) {semaphore = redissonClient.getSemaphore(key + ":semaphore");semaphore.trySetPermits(control.concurrencyPermits());acquiredSemaphore = semaphore.tryAcquire();if (!acquiredSemaphore) {throw new RuntimeException("系统忙,请稍后重试");}}try {return pjp.proceed();} finally {if (semaphore != null && acquiredSemaphore) {semaphore.release();}}}private RateIntervalUnit toRateUnit(TimeUnit unit) {switch (unit) {case SECONDS: return RateIntervalUnit.SECONDS;case MINUTES: return RateIntervalUnit.MINUTES;case HOURS: return RateIntervalUnit.HOURS;case DAYS: return RateIntervalUnit.DAYS;case MILLISECONDS: return RateIntervalUnit.MILLISECONDS;default: throw new IllegalArgumentException("不支持的时间单位");}}
}
服务类:
@Service
public class VideoService {@AccessControl(key = "video:transcode",rateLimitPermits = 6,concurrencyPermits = 6,limitType = AccessControl.LimitType.ALL)public void transcode(String name) throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 转码中:" + name);Thread.sleep(10_000);System.out.println(Thread.currentThread().getName() + " 转码完成:" + name);}
}
思考为什么不能用令牌桶 去做 限制并发 分布式信号量去做限流?
核心区别:什么时候释放资源
滑动窗口/令牌桶:自动释放(基于时间)
令牌消耗后就没了,不需要"归还"
系统按时间自动补充令牌
分布式信号量:手动释放(基于资源占用)
信号量必须"借用"后"归还"
归还时机由处理时长决定
天然控制并发:同时最多N个请求持有信号量
用信号量去控制流量QPS 会存在
释放太快,无法限制频率
例如:1秒内可能获取/释放几千次
用滑动窗口/令牌桶去控制并发线程存在:
令牌消耗了,无法表示"正在占用资源"
系统继续补充令牌,无法限制并发数
因为存在获得释放的过程 才能表示有多少并发线程 如果只去表示获取 不去表示释放 则不能控制并发线程的数量 正是信号量的“获得”“释放”符合去控制并发的数量。
总结
这是一篇对限流限并发的介绍,希望你有所收获。
感谢每一位看到最后的你,我知道这篇文章比较长,全程都是楼主【代码丰】手打 实验的,希望能收获点赞和收藏 谢谢小伙伴的支持,让我们一路变强