文章目录
- Redisson的读写锁使用
- 加锁源码分析
- 释放锁源码分析:
- Redisson一次加多个锁
- RedissonMultiLock加锁源码分析:
- RedissonMultiLock释放锁源码分析:
- RCountDownLatch介绍:
- RCountDownLatch源码分析:
- RSemaphore分布式信号量
- RSmaphore源码分析:
Redisson的读写锁使用
Redisson 的读写锁是 RReadWriteLock,它是基于 Redis 实现的可重入的分布式读写锁,支持跨线程、跨 JVM 的并发读写控制。
分布式锁的使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();final RReadWriteLock lock = redisson.getReadWriteLock("lock");lock.writeLock().tryLock();Thread t = new Thread() {public void run() {RLock r = lock.readLock();r.lock();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}r.unlock();};};t.start();t.join();lock.writeLock().unlock();t.join();redisson.shutdown();}
分布式锁源码分析:
RedissonReadWriteLock 的实现
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);}//读锁@Overridepublic RLock readLock() {return new RedissonReadLock(commandExecutor, getName());}//写锁@Overridepublic RLock writeLock() {return new RedissonWriteLock(commandExecutor, getName());}}
加锁源码分析
RedissonLock.lock()源码分析:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//线程ID绑定long threadId = Thread.currentThread().getId();//尝试获取锁(非阻塞)//tryAcquire 是执行一段 Lua 脚本,核心做法://如果 Redis 上的锁 key 不存在,则设置并返回 null;//如果存在,返回剩余 TTL;//所以:返回 null 表示获取成功,非 null 表示锁被占用(还剩多少 ms 过期)。Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加锁成功if (ttl == null) {return;}//订阅锁释放事件,进入阻塞等待CompletableFuture<RedissonLockEntry> future = subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;//根据 interruptibly 阻塞当前线程if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}//循环尝试重新获取锁try {while (true) {//尝试加锁ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加锁成功if (ttl == null) {//结束break;}// // 阻塞等待 ttl 时间// 这是 Redisson 的“自旋式阻塞重试”机制;//每次重试之前,都会判断锁是否释放;//若未释放,根据 TTL 设置 CountDownLatch.tryAcquire(ttl),阻塞一段时间;//如果 ttl < 0,表示未知 TTL,则完全阻塞等待锁释放信号;if (ttl >= 0) {try {//如果在这段时间内其他线程释放了锁,会收到 Redis 发布的消息,唤醒 latch,线程会提前返回。//如果到时间 latch 还没被释放,tryAcquire 会返回 false,再进入下一轮 while 循环继续尝试。//这就是典型的“阻塞 + 自旋”模式,节省 CPU,又不失活性。entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// ttl < 0 的分支:表示锁没有设置过期时间(或 Redis 中已被人为删除)if (interruptibly) {entry.getLatch().acquire();//// 可中断阻塞等待} else {entry.getLatch().acquireUninterruptibly();//不可中断的阻塞}}}} finally {//finally 中取消订阅unsubscribe(entry, threadId);}
// get(lockAsync(leaseTime, unit));}
尝试获取锁的源码分析RedissonLock.tryAcquireAsync():
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//尝试执行 Redis 脚本加锁RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {//如果指定了 leaseTime > 0,则锁将在此时间后自动过期(无续约机制)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//否则使用默认锁时间(如 30s)+ 开启「自动续约」ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {//watch Dog 现成定时给没有设置过期时间的锁加锁,默认10s加一次,最长为30sscheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}
区分读锁和写锁
读加锁RedissonReadLock.tryLockInnerAsync():
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'read'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('set', KEYS[2] .. ':1', 1); " +"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" +"redis.call('set', key, 1); " +"redis.call('pexpire', key, ARGV[1]); " +"local remainTime = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +"return nil; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));}
写加锁RedissonWriteLock.tryLockInnerAsync():
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName()),unit.toMillis(leaseTime), getLockName(threadId));}
释放锁源码分析:
释放锁RedissonBaseLock.unlockInnerAsync():
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//获取锁释放通知标识 ID 用于创建一个“临时 Redis key”,帮助其他线程判断锁是否真正释放并通知它们String id = getServiceManager().generateId();//获取 Redisson 的集群/主从配置,包括超时、重试等参数。MasterSlaveServersConfig config = getServiceManager().getConfig();//计算一个“最大超时值”,用于后续 unlockInnerAsync(threadId, id, timeout) 调用://这个值等于:每次请求 timeout + 每次重试的 delay × 重试次数;//是 Redisson 估算出这次解锁流程最大可能执行时长,在后续通知订阅者的时候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//调用 真正的解锁逻辑(重载版本),传入://threadId:验证是否为持锁线程;//id:解锁通知标识;//timeout:通知监听器等待的超时时间;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解锁操作完成后,继续处理解锁通知 key 的删除逻辑。CommandAsyncExecutor ce = commandExecutor;// 判断当前是否在批量命令上下文中。如果是,则重新包一层 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//删除解锁通知 key(如:redisson_lock__unlock_latch__{id})//这个 key 是在 unlock 的 Lua 脚本中设置的,用于通知等待线程“锁释放”是否成功。//为什么要显式删掉它?//因为订阅者在收到解锁通知后,会去检查这个 key 是否存在;//不存在 = 真释放成功;//为了避免它一直存在,占用内存,Redisson 主动清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}
读锁释放锁RedissonReadLock.unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[5]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return nil; " +"end; " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +"if (lockExists == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter == 0) then " +"redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" +"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +"if (redis.call('hlen', KEYS[1]) > 1) then " +"local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " +"if maxRemainTime > 0 then " +"redis.call('pexpire', KEYS[1], maxRemainTime); " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0; " +"end;" + "if mode == 'write' then " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0;" +"end; " +"end; " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return 1; ",Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix, getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}
写锁释放锁RedissonWriteLock.unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return nil; " +"end;" +"if (mode == 'write') then " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +"if (lockExists == 0) then " +"return nil;" +"else " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('hdel', KEYS[1], ARGV[3]); " +"if (redis.call('hlen', KEYS[1]) == 1) then " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"else " +// has unlocked read-locks"redis.call('hset', KEYS[1], 'mode', 'read'); " +"end; " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; "+"end; " +"end; " +"end; "+ "return nil;",Arrays.<Object>asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}
读锁特点
多个线程可以同时获取读锁;
如果有写锁存在,其他线程不能获取读锁;
当前线程已持有写锁时,可以获取读锁(读写重入);
每获取一次读锁,Redisson 都会创建一个 独立的 Redis key,用于设置过期时间;
所有读锁共享一个主 Hash key,如 rwlock:{myKey}。
使用场景:缓存读取、多线程查看共享数据时
写锁特点
只能被一个线程独占;
如果有读锁或写锁,其他线程都必须等待;
支持可重入(同线程反复加锁);
超时释放:设置 leaseTime 或依赖 WatchDog;
Redis 中使用 Hash key 存储锁状态及持有者线程。
Redisson一次加多个锁
RedissonMultiLocks加多个锁的使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient client = Redisson.create();RLock lock1 = client.getLock("lock1");RLock lock2 = client.getLock("lock2");RLock lock3 = client.getLock("lock3");Thread t = new Thread() {public void run() {RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();try {Thread.sleep(3000);} catch (InterruptedException e) {}lock.unlock();};};t.start();t.join(1000);RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();lock.unlock();}
RedissonMultiLock加锁源码分析:
RedissonMultiLock.lock()
public void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {//每个锁默认等待 1500ms,因此基础等待时间是锁数量 × 1500 毫秒;long baseWaitTime = locks.size() * 1500;while (true) {long waitTime;//如果你不传 leaseTime,那就等 baseWaitTime 毫秒;if (leaseTime <= 0) {waitTime = baseWaitTime;} else {//如果你传了 leaseTime,就在一个合理范围内随机等待一段时间;//这个随机机制可以缓解多个线程同时竞争锁时的“惊群效应”(即都在同一时间重试造成 Redis 压力),提高锁的公平性和获取率。waitTime = unit.toMillis(leaseTime);if (waitTime <= baseWaitTime) {waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);} else {waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);}}if (leaseTime > 0) {leaseTime = unit.toMillis(leaseTime);}//尝试在 waitTime 时间内加锁成功;//如果成功,就返回;//如果失败,继续下一轮(外层 while 循环);//若线程在这期间被中断,则抛出 InterruptedException。if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {return;}}}
尝试加锁,本质还是对单个的对象加锁,全部加锁成功,才算成功,一个加锁失败,都需要将加锁成功的释放,防止出现死锁的现象。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }//重新计算内部实际锁定的时间 newLeaseTime。long newLeaseTime = -1;if (leaseTime > 0) {//如果设置了 waitTime,newLeaseTime 会取 waitTime*2,用于确保锁不会因为 续期等待时间过短 提前释放。if (waitTime > 0) {newLeaseTime = unit.toMillis(waitTime)*2;} else {// //否则按用户指定的 leaseTime。newLeaseTime = unit.toMillis(leaseTime);}}//记录当前时间 time,用于后续计算剩余等待时间long time = System.currentTimeMillis();//如果设置了 waitTime,计算还剩多少时间 remainTime。long remainTime = -1;if (waitTime > 0) {remainTime = unit.toMillis(waitTime);}//lockWaitTime 是为了当前线程尝试单个锁时的最大等待时间,内部有可能会缩小单个锁的等待时间,避免整个锁组阻塞时间太久。long lockWaitTime = calcLockWaitTime(remainTime);//获取容忍失败的锁个数(在 RedLock 中可能允许某些节点获取失败)。int failedLocksLimit = failedLocksLimit();//初始化已获取锁的列表。List<RLock> acquiredLocks = new ArrayList<>(locks.size());//遍历所有要加的锁(一般用于 RedLock 或 MultiLock)。for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime <= 0 && leaseTime <= 0) {// 非阻塞锁lockAcquired = lock.tryLock();} else {//如果没有设置等待时间和租约时间,直接尝试非阻塞 tryLock()。//否则尝试阻塞获取锁,最大等待时间是 awaitTime。//如果 Redis 响应超时或异常,则判定加锁失败,并释放当前锁。long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}//判断当前锁是否成功if (lockAcquired) {//加锁成功acquiredLocks.add(lock);} else {//加锁失败 如果失败锁已达上限(如 RedLock 只要求超过半数成功),直接退出;if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}//如果不允许失败if (failedLocksLimit == 0) {//解锁之前已成功的锁(防止部分加锁造成死锁)。unlockInner(acquiredLocks);//如果超时时间是 0,立即返回失败。if (waitTime <= 0) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// 重置迭代器,重新加锁;while (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}//剩余时间调整if (remainTime > 0) {//每次加锁后都要更新剩余的可等待时间;remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();//如果耗尽 waitTime,释放当前已加锁部分,返回失败。if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}//如果指定了锁有效期 leaseTime,则设置所有成功加锁的锁的过期时间。if (leaseTime > 0) {acquiredLocks.stream().map(l -> (RedissonBaseLock) l).map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS)).forEach(f -> f.toCompletableFuture().join());}return true;}
RedissonMultiLock释放锁源码分析:
RedissonMultiLock.unlock()的源码流程
public void unlock() {//循环遍历加锁的对象,进行锁的释放locks.forEach(Lock::unlock);}
RedissonBaseLock.unlockAsync0()
private RFuture<Void> unlockAsync0(long threadId) {CompletionStage<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((res, e) -> {cancelExpirationRenewal(threadId, res);//释放锁报错if (e != null) {if (e instanceof CompletionException) {throw (CompletionException) e;}throw new CompletionException(e);}//返回的响应为nullif (res == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}
RedissonBaseLock.unlockInnerAsync()
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//获取锁释放通知标识 ID 用于创建一个“临时 Redis key”,帮助其他线程判断锁是否真正释放并通知它们String id = getServiceManager().generateId();//获取 Redisson 的集群/主从配置,包括超时、重试等参数。MasterSlaveServersConfig config = getServiceManager().getConfig();//计算一个“最大超时值”,用于后续 unlockInnerAsync(threadId, id, timeout) 调用://这个值等于:每次请求 timeout + 每次重试的 delay × 重试次数;//是 Redisson 估算出这次解锁流程最大可能执行时长,在后续通知订阅者的时候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//调用 真正的解锁逻辑(重载版本),传入://threadId:验证是否为持锁线程;//id:解锁通知标识;//timeout:通知监听器等待的超时时间;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解锁操作完成后,继续处理解锁通知 key 的删除逻辑。CommandAsyncExecutor ce = commandExecutor;// 判断当前是否在批量命令上下文中。如果是,则重新包一层 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//删除解锁通知 key(如:redisson_lock__unlock_latch__{id})//这个 key 是在 unlock 的 Lua 脚本中设置的,用于通知等待线程“锁释放”是否成功。//为什么要显式删掉它?//因为订阅者在收到解锁通知后,会去检查这个 key 是否存在;//不存在 = 真释放成功;//为了避免它一直存在,占用内存,Redisson 主动清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}
执行RedissonLock.unlockInnerAsync()的lua脚本:
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; " +"end; ",Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}
RCountDownLatch介绍:
RCountDownLatch 是 Redisson 对 Redis 实现的 分布式 CountDownLatch 的封装,和 Java 标准库中的 java.util.concurrent.CountDownLatch 类似,但它是跨 JVM、跨服务进程共享状态的,用于在分布式环境下实现线程/服务间的同步控制。
使用场景:
你有多个微服务或线程并发执行,某个主服务需要等待它们全部完成;
替代本地 CountDownLatch,但要求在集群或多机房环境下同步状态;、
多个服务节点要协调完成某些“分布式初始化任务”;
实现“多个客户端等待某个事件发生”这种阻塞机制。
使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();ExecutorService executor = Executors.newFixedThreadPool(2);final RCountDownLatch latch = redisson.getCountDownLatch("latch1");latch.trySetCount(1);executor.execute(new Runnable() {@Overridepublic void run() {latch.countDown();}});executor.execute(new Runnable() {@Overridepublic void run() {try {latch.await(550, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}});executor.shutdown();executor.awaitTermination(10, TimeUnit.SECONDS);}
方法:
trySetCount(long count) 初始化计数值。只能设置一次(如果已存在则返回 false)
await() 阻塞等待直到计数为 0
await(long time, TimeUnit) 指定时间等待,超时返回 false
countDown() 将计数 -1,当计数为 0 时,唤醒所有 await 的线程
getCount() 获取当前计数值
delete() 删除 latch 对象在 Redis 中的 key(非必要)
RCountDownLatch源码分析:
RCountDownLatch.trySetCount()方法,初始化计数值
public boolean trySetCount(long count) {//设置初始的大小,只能设置一次,如果已经设置了则返回0,否则返回1return get(trySetCountAsync(count));}
根据lua脚本设置初始化值,只能设置一次,如果当前的值为0设置成功,返回结果1,否则不需要设置,返回结果0。
public RFuture<Boolean> trySetCountAsync(long count) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if redis.call('exists', KEYS[1]) == 0 then "+ "redis.call('set', KEYS[1], ARGV[2]); "+ "redis.call(ARGV[3], KEYS[2], ARGV[1]); "+ "return 1 "+ "else "+ "return 0 "+ "end",Arrays.asList(getRawName(), getChannelName()),CountDownLatchPubSub.NEW_COUNT_MESSAGE, count, getSubscribeService().getPublishCommand());}
countDown 将计数-1的源码分析:
public void countDown() {get(countDownAsync());}//通过lua脚本,将计数-1,如果当前的减完后的技术小于等于0,删除当前的countdownlatch,如果等于0,则唤醒订阅阻塞的线程,public RFuture<Void> countDownAsync() {return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local v = redis.call('decr', KEYS[1]);" +"if v <= 0 then redis.call('del', KEYS[1]) end;" +"if v == 0 then redis.call(ARGV[2], KEYS[2], ARGV[1]) end;",Arrays.<Object>asList(getRawName(), getChannelName()),CountDownLatchPubSub.ZERO_COUNT_MESSAGE, getSubscribeService().getPublishCommand());
}
RSemaphore分布式信号量
RSemaphore 是 Redisson 对 Java 标准 Semaphore 的分布式实现(java.util.concurrent.Semaphore 的 Redis 版本),用于 限量访问资源 的场景,比如控制并发数、资源配额、限流器等。
功能:
支持分布式 可跨 JVM、服务节点共享资源许可数;
支持可阻塞 acquire() 支持等待直到有许可;
支持可限时 tryAcquire(timeout) 支持超时等待;
支持可释放 release() 归还许可;
支持动态调整许可数量 通过 trySetPermits();
方法功能:
trySetPermits(int permits) 初始化许可总数(只能设置一次)
addPermits(int permits) 增加许可(可重复调用)
acquire() 请求一个许可(阻塞)
tryAcquire() 非阻塞地获取许可,获取不到返回 false
tryAcquire(timeout, unit) 限时等待许可
release() 释放一个许可
availablePermits() 当前可用许可数
delete() 删除 Redis 中的信号量记录
使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();RSemaphore s = redisson.getSemaphore("test");s.trySetPermits(5);s.acquire(3);Thread t = new Thread() {@Overridepublic void run() {RSemaphore s = redisson.getSemaphore("test");s.release();s.release();}};t.start();s.acquire(4);redisson.shutdown();}
RSmaphore源码分析:
public void acquire() throws InterruptedException {//请求得到一个计数acquire(1);}
public void acquire(int permits) throws InterruptedException {//尝试获取计数if (tryAcquire(permits)) {return;}//订阅CompletableFuture<RedissonLockEntry> future = subscribe();semaphorePubSub.timeout(future);RedissonLockEntry entry = commandExecutor.getInterrupted(future);try {while (true) {//尝试获取,如果获取到就结束,否则就阻塞if (tryAcquire(permits)) {return;}//阻塞,等待通知entry.getLatch().acquire();}} finally {//释放订阅信息unsubscribe(entry);}
// get(acquireAsync(permits));}
public RFuture<Boolean> tryAcquireAsync(int permits) {//健壮性校验if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");}//如果适当的数等于0,直接返回成功if (permits == 0) {return new CompletableFutureWrapper<>(true);}return commandExecutor.getServiceManager().execute(() -> {//核心的方法RFuture<Boolean> future = tryAcquireAsync0(permits);return commandExecutor.handleNoSync(future, e -> releaseAsync(permits));});}//执行我们的lua脚本private RFuture<Boolean> tryAcquireAsync0(int permits) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local value = redis.call('get', KEYS[1]); " +"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getRawName()), permits);}