Redisson的分布式锁源码分析2

文章目录

  • Redisson的读写锁使用
    • 加锁源码分析
    • 释放锁源码分析:
  • Redisson一次加多个锁
    • RedissonMultiLock加锁源码分析:
    • RedissonMultiLock释放锁源码分析:
  • RCountDownLatch介绍:
    • RCountDownLatch源码分析:
  • RSemaphore分布式信号量
    • RSmaphore源码分析:

Redisson的读写锁使用

Redisson 的读写锁是 RReadWriteLock,它是基于 Redis 实现的可重入的分布式读写锁,支持跨线程、跨 JVM 的并发读写控制。

分布式锁的使用案例:

public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();final RReadWriteLock lock = redisson.getReadWriteLock("lock");lock.writeLock().tryLock();Thread t = new Thread() {public void run() {RLock r = lock.readLock();r.lock();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}r.unlock();};};t.start();t.join();lock.writeLock().unlock();t.join();redisson.shutdown();}

分布式锁源码分析:
RedissonReadWriteLock 的实现

public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);}//读锁@Overridepublic RLock readLock() {return new RedissonReadLock(commandExecutor, getName());}//写锁@Overridepublic RLock writeLock() {return new RedissonWriteLock(commandExecutor, getName());}}

加锁源码分析

RedissonLock.lock()源码分析:

 private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//线程ID绑定long threadId = Thread.currentThread().getId();//尝试获取锁(非阻塞)//tryAcquire 是执行一段 Lua 脚本,核心做法://如果 Redis 上的锁 key 不存在,则设置并返回 null;//如果存在,返回剩余 TTL;//所以:返回 null 表示获取成功,非 null 表示锁被占用(还剩多少 ms 过期)。Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加锁成功if (ttl == null) {return;}//订阅锁释放事件,进入阻塞等待CompletableFuture<RedissonLockEntry> future = subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;//根据 interruptibly 阻塞当前线程if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}//循环尝试重新获取锁try {while (true) {//尝试加锁ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加锁成功if (ttl == null) {//结束break;}//  // 阻塞等待 ttl 时间// 这是 Redisson 的“自旋式阻塞重试”机制;//每次重试之前,都会判断锁是否释放;//若未释放,根据 TTL 设置 CountDownLatch.tryAcquire(ttl),阻塞一段时间;//如果 ttl < 0,表示未知 TTL,则完全阻塞等待锁释放信号;if (ttl >= 0) {try {//如果在这段时间内其他线程释放了锁,会收到 Redis 发布的消息,唤醒 latch,线程会提前返回。//如果到时间 latch 还没被释放,tryAcquire 会返回 false,再进入下一轮 while 循环继续尝试。//这就是典型的“阻塞 + 自旋”模式,节省 CPU,又不失活性。entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// ttl < 0 的分支:表示锁没有设置过期时间(或 Redis 中已被人为删除)if (interruptibly) {entry.getLatch().acquire();//// 可中断阻塞等待} else {entry.getLatch().acquireUninterruptibly();//不可中断的阻塞}}}} finally {//finally 中取消订阅unsubscribe(entry, threadId);}
//        get(lockAsync(leaseTime, unit));}

尝试获取锁的源码分析RedissonLock.tryAcquireAsync():

    private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//尝试执行 Redis 脚本加锁RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {//如果指定了 leaseTime > 0,则锁将在此时间后自动过期(无续约机制)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//否则使用默认锁时间(如 30s)+ 开启「自动续约」ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {//watch Dog 现成定时给没有设置过期时间的锁加锁,默认10s加一次,最长为30sscheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}

区分读锁和写锁
读加锁RedissonReadLock.tryLockInnerAsync():

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'read'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('set', KEYS[2] .. ':1', 1); " +"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" +"redis.call('set', key, 1); " +"redis.call('pexpire', key, ARGV[1]); " +"local remainTime = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +"return nil; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));}

写加锁RedissonWriteLock.tryLockInnerAsync():

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName()),unit.toMillis(leaseTime), getLockName(threadId));}

释放锁源码分析:

释放锁RedissonBaseLock.unlockInnerAsync():

protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//获取锁释放通知标识 ID 用于创建一个“临时 Redis key”,帮助其他线程判断锁是否真正释放并通知它们String id = getServiceManager().generateId();//获取 Redisson 的集群/主从配置,包括超时、重试等参数。MasterSlaveServersConfig config = getServiceManager().getConfig();//计算一个“最大超时值”,用于后续 unlockInnerAsync(threadId, id, timeout) 调用://这个值等于:每次请求 timeout + 每次重试的 delay × 重试次数;//是 Redisson 估算出这次解锁流程最大可能执行时长,在后续通知订阅者的时候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//调用 真正的解锁逻辑(重载版本),传入://threadId:验证是否为持锁线程;//id:解锁通知标识;//timeout:通知监听器等待的超时时间;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解锁操作完成后,继续处理解锁通知 key 的删除逻辑。CommandAsyncExecutor ce = commandExecutor;// 判断当前是否在批量命令上下文中。如果是,则重新包一层 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//删除解锁通知 key(如:redisson_lock__unlock_latch__{id})//这个 key 是在 unlock 的 Lua 脚本中设置的,用于通知等待线程“锁释放”是否成功。//为什么要显式删掉它?//因为订阅者在收到解锁通知后,会去检查这个 key 是否存在;//不存在 = 真释放成功;//为了避免它一直存在,占用内存,Redisson 主动清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}

读锁释放锁RedissonReadLock.unlockInnerAsync()

  protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[5]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return nil; " +"end; " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +"if (lockExists == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter == 0) then " +"redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" +"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +"if (redis.call('hlen', KEYS[1]) > 1) then " +"local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " +"if maxRemainTime > 0 then " +"redis.call('pexpire', KEYS[1], maxRemainTime); " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0; " +"end;" + "if mode == 'write' then " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0;" +"end; " +"end; " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return 1; ",Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix, getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}

写锁释放锁RedissonWriteLock.unlockInnerAsync()

  protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return nil; " +"end;" +"if (mode == 'write') then " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +"if (lockExists == 0) then " +"return nil;" +"else " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('hdel', KEYS[1], ARGV[3]); " +"if (redis.call('hlen', KEYS[1]) == 1) then " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"else " +// has unlocked read-locks"redis.call('hset', KEYS[1], 'mode', 'read'); " +"end; " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; "+"end; " +"end; " +"end; "+ "return nil;",Arrays.<Object>asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}

读锁特点
多个线程可以同时获取读锁;
如果有写锁存在,其他线程不能获取读锁;
当前线程已持有写锁时,可以获取读锁(读写重入);
每获取一次读锁,Redisson 都会创建一个 独立的 Redis key,用于设置过期时间;
所有读锁共享一个主 Hash key,如 rwlock:{myKey}。
使用场景:缓存读取、多线程查看共享数据时

写锁特点
只能被一个线程独占;
如果有读锁或写锁,其他线程都必须等待;
支持可重入(同线程反复加锁);
超时释放:设置 leaseTime 或依赖 WatchDog;
Redis 中使用 Hash key 存储锁状态及持有者线程。

Redisson一次加多个锁

RedissonMultiLocks加多个锁的使用案例:

    public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient client = Redisson.create();RLock lock1 = client.getLock("lock1");RLock lock2 = client.getLock("lock2");RLock lock3 = client.getLock("lock3");Thread t = new Thread() {public void run() {RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();try {Thread.sleep(3000);} catch (InterruptedException e) {}lock.unlock();};};t.start();t.join(1000);RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();lock.unlock();}

RedissonMultiLock加锁源码分析:

RedissonMultiLock.lock()

   public void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
 public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {//每个锁默认等待 1500ms,因此基础等待时间是锁数量 × 1500 毫秒;long baseWaitTime = locks.size() * 1500;while (true) {long waitTime;//如果你不传 leaseTime,那就等 baseWaitTime 毫秒;if (leaseTime <= 0) {waitTime = baseWaitTime;} else {//如果你传了 leaseTime,就在一个合理范围内随机等待一段时间;//这个随机机制可以缓解多个线程同时竞争锁时的“惊群效应”(即都在同一时间重试造成 Redis 压力),提高锁的公平性和获取率。waitTime = unit.toMillis(leaseTime);if (waitTime <= baseWaitTime) {waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);} else {waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);}}if (leaseTime > 0) {leaseTime = unit.toMillis(leaseTime);}//尝试在 waitTime 时间内加锁成功;//如果成功,就返回;//如果失败,继续下一轮(外层 while 循环);//若线程在这期间被中断,则抛出 InterruptedException。if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {return;}}}

尝试加锁,本质还是对单个的对象加锁,全部加锁成功,才算成功,一个加锁失败,都需要将加锁成功的释放,防止出现死锁的现象。

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//        try {
//            return tryLockAsync(waitTime, leaseTime, unit).get();
//        } catch (ExecutionException e) {
//            throw new IllegalStateException(e);
//        }//重新计算内部实际锁定的时间 newLeaseTime。long newLeaseTime = -1;if (leaseTime > 0) {//如果设置了 waitTime,newLeaseTime 会取 waitTime*2,用于确保锁不会因为 续期等待时间过短 提前释放。if (waitTime > 0) {newLeaseTime = unit.toMillis(waitTime)*2;} else {//        //否则按用户指定的 leaseTime。newLeaseTime = unit.toMillis(leaseTime);}}//记录当前时间 time,用于后续计算剩余等待时间long time = System.currentTimeMillis();//如果设置了 waitTime,计算还剩多少时间 remainTime。long remainTime = -1;if (waitTime > 0) {remainTime = unit.toMillis(waitTime);}//lockWaitTime 是为了当前线程尝试单个锁时的最大等待时间,内部有可能会缩小单个锁的等待时间,避免整个锁组阻塞时间太久。long lockWaitTime = calcLockWaitTime(remainTime);//获取容忍失败的锁个数(在 RedLock 中可能允许某些节点获取失败)。int failedLocksLimit = failedLocksLimit();//初始化已获取锁的列表。List<RLock> acquiredLocks = new ArrayList<>(locks.size());//遍历所有要加的锁(一般用于 RedLock 或 MultiLock)。for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime <= 0 && leaseTime <= 0) {// 非阻塞锁lockAcquired = lock.tryLock();} else {//如果没有设置等待时间和租约时间,直接尝试非阻塞 tryLock()。//否则尝试阻塞获取锁,最大等待时间是 awaitTime。//如果 Redis 响应超时或异常,则判定加锁失败,并释放当前锁。long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}//判断当前锁是否成功if (lockAcquired) {//加锁成功acquiredLocks.add(lock);} else {//加锁失败 如果失败锁已达上限(如 RedLock 只要求超过半数成功),直接退出;if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}//如果不允许失败if (failedLocksLimit == 0) {//解锁之前已成功的锁(防止部分加锁造成死锁)。unlockInner(acquiredLocks);//如果超时时间是 0,立即返回失败。if (waitTime <= 0) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// 重置迭代器,重新加锁;while (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}//剩余时间调整if (remainTime > 0) {//每次加锁后都要更新剩余的可等待时间;remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();//如果耗尽 waitTime,释放当前已加锁部分,返回失败。if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}//如果指定了锁有效期 leaseTime,则设置所有成功加锁的锁的过期时间。if (leaseTime > 0) {acquiredLocks.stream().map(l -> (RedissonBaseLock) l).map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS)).forEach(f -> f.toCompletableFuture().join());}return true;}

RedissonMultiLock释放锁源码分析:

RedissonMultiLock.unlock()的源码流程

    public void unlock() {//循环遍历加锁的对象,进行锁的释放locks.forEach(Lock::unlock);}

RedissonBaseLock.unlockAsync0()

    private RFuture<Void> unlockAsync0(long threadId) {CompletionStage<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((res, e) -> {cancelExpirationRenewal(threadId, res);//释放锁报错if (e != null) {if (e instanceof CompletionException) {throw (CompletionException) e;}throw new CompletionException(e);}//返回的响应为nullif (res == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}

RedissonBaseLock.unlockInnerAsync()

protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//获取锁释放通知标识 ID 用于创建一个“临时 Redis key”,帮助其他线程判断锁是否真正释放并通知它们String id = getServiceManager().generateId();//获取 Redisson 的集群/主从配置,包括超时、重试等参数。MasterSlaveServersConfig config = getServiceManager().getConfig();//计算一个“最大超时值”,用于后续 unlockInnerAsync(threadId, id, timeout) 调用://这个值等于:每次请求 timeout + 每次重试的 delay × 重试次数;//是 Redisson 估算出这次解锁流程最大可能执行时长,在后续通知订阅者的时候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//调用 真正的解锁逻辑(重载版本),传入://threadId:验证是否为持锁线程;//id:解锁通知标识;//timeout:通知监听器等待的超时时间;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解锁操作完成后,继续处理解锁通知 key 的删除逻辑。CommandAsyncExecutor ce = commandExecutor;// 判断当前是否在批量命令上下文中。如果是,则重新包一层 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//删除解锁通知 key(如:redisson_lock__unlock_latch__{id})//这个 key 是在 unlock 的 Lua 脚本中设置的,用于通知等待线程“锁释放”是否成功。//为什么要显式删掉它?//因为订阅者在收到解锁通知后,会去检查这个 key 是否存在;//不存在 = 真释放成功;//为了避免它一直存在,占用内存,Redisson 主动清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}

执行RedissonLock.unlockInnerAsync()的lua脚本:

    protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; " +"end; ",Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}

RCountDownLatch介绍:

RCountDownLatch 是 Redisson 对 Redis 实现的 分布式 CountDownLatch 的封装,和 Java 标准库中的 java.util.concurrent.CountDownLatch 类似,但它是跨 JVM、跨服务进程共享状态的,用于在分布式环境下实现线程/服务间的同步控制。

使用场景:
你有多个微服务或线程并发执行,某个主服务需要等待它们全部完成;
替代本地 CountDownLatch,但要求在集群或多机房环境下同步状态;、
多个服务节点要协调完成某些“分布式初始化任务”;
实现“多个客户端等待某个事件发生”这种阻塞机制。

使用案例:

public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();ExecutorService executor = Executors.newFixedThreadPool(2);final RCountDownLatch latch = redisson.getCountDownLatch("latch1");latch.trySetCount(1);executor.execute(new Runnable() {@Overridepublic void run() {latch.countDown();}});executor.execute(new Runnable() {@Overridepublic void run() {try {latch.await(550, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}});executor.shutdown();executor.awaitTermination(10, TimeUnit.SECONDS);}

方法:

trySetCount(long count) 初始化计数值。只能设置一次(如果已存在则返回 false)
await() 阻塞等待直到计数为 0
await(long time, TimeUnit) 指定时间等待,超时返回 false
countDown() 将计数 -1,当计数为 0 时,唤醒所有 await 的线程
getCount() 获取当前计数值
delete() 删除 latch 对象在 Redis 中的 key(非必要)

RCountDownLatch源码分析:

RCountDownLatch.trySetCount()方法,初始化计数值

   public boolean trySetCount(long count) {//设置初始的大小,只能设置一次,如果已经设置了则返回0,否则返回1return get(trySetCountAsync(count));}

根据lua脚本设置初始化值,只能设置一次,如果当前的值为0设置成功,返回结果1,否则不需要设置,返回结果0。

    public RFuture<Boolean> trySetCountAsync(long count) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if redis.call('exists', KEYS[1]) == 0 then "+ "redis.call('set', KEYS[1], ARGV[2]); "+ "redis.call(ARGV[3], KEYS[2], ARGV[1]); "+ "return 1 "+ "else "+ "return 0 "+ "end",Arrays.asList(getRawName(), getChannelName()),CountDownLatchPubSub.NEW_COUNT_MESSAGE, count, getSubscribeService().getPublishCommand());}

countDown 将计数-1的源码分析:

    public void countDown() {get(countDownAsync());}//通过lua脚本,将计数-1,如果当前的减完后的技术小于等于0,删除当前的countdownlatch,如果等于0,则唤醒订阅阻塞的线程,public RFuture<Void> countDownAsync() {return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local v = redis.call('decr', KEYS[1]);" +"if v <= 0 then redis.call('del', KEYS[1]) end;" +"if v == 0 then redis.call(ARGV[2], KEYS[2], ARGV[1]) end;",Arrays.<Object>asList(getRawName(), getChannelName()),CountDownLatchPubSub.ZERO_COUNT_MESSAGE, getSubscribeService().getPublishCommand());
}

RSemaphore分布式信号量

RSemaphore 是 Redisson 对 Java 标准 Semaphore 的分布式实现(java.util.concurrent.Semaphore 的 Redis 版本),用于 限量访问资源 的场景,比如控制并发数、资源配额、限流器等。
功能:
支持分布式 可跨 JVM、服务节点共享资源许可数;
支持可阻塞 acquire() 支持等待直到有许可;
支持可限时 tryAcquire(timeout) 支持超时等待;
支持可释放 release() 归还许可;
支持动态调整许可数量 通过 trySetPermits();
方法功能:
trySetPermits(int permits) 初始化许可总数(只能设置一次)
addPermits(int permits) 增加许可(可重复调用)
acquire() 请求一个许可(阻塞)
tryAcquire() 非阻塞地获取许可,获取不到返回 false
tryAcquire(timeout, unit) 限时等待许可
release() 释放一个许可
availablePermits() 当前可用许可数
delete() 删除 Redis 中的信号量记录
使用案例:

    public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();RSemaphore s = redisson.getSemaphore("test");s.trySetPermits(5);s.acquire(3);Thread t = new Thread() {@Overridepublic void run() {RSemaphore s = redisson.getSemaphore("test");s.release();s.release();}};t.start();s.acquire(4);redisson.shutdown();}

RSmaphore源码分析:

 public void acquire() throws InterruptedException {//请求得到一个计数acquire(1);}
  public void acquire(int permits) throws InterruptedException {//尝试获取计数if (tryAcquire(permits)) {return;}//订阅CompletableFuture<RedissonLockEntry> future = subscribe();semaphorePubSub.timeout(future);RedissonLockEntry entry = commandExecutor.getInterrupted(future);try {while (true) {//尝试获取,如果获取到就结束,否则就阻塞if (tryAcquire(permits)) {return;}//阻塞,等待通知entry.getLatch().acquire();}} finally {//释放订阅信息unsubscribe(entry);}
//        get(acquireAsync(permits));}
    public RFuture<Boolean> tryAcquireAsync(int permits) {//健壮性校验if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");}//如果适当的数等于0,直接返回成功if (permits == 0) {return new CompletableFutureWrapper<>(true);}return commandExecutor.getServiceManager().execute(() -> {//核心的方法RFuture<Boolean> future = tryAcquireAsync0(permits);return commandExecutor.handleNoSync(future, e -> releaseAsync(permits));});}//执行我们的lua脚本private RFuture<Boolean> tryAcquireAsync0(int permits) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local value = redis.call('get', KEYS[1]); " +"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getRawName()), permits);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87470.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87470.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

系统架构设计师论文分享-论软件过程模型及应用

我的软考历程 摘要 2023年2月&#xff0c;我所在的公司通过了研发纱线MES系统的立项&#xff0c;该系统为国内纱线工厂提供SAAS服务&#xff0c;旨在提升纱线工厂的数字化和智能化水平。我在该项目中担任架构设计师&#xff0c;负责该项目的架构设计工作。本文结合我在该项目…

云原生Kubernetes系列 | etcd3.5集群部署和使用

云原生Kubernetes系列 | etcd3.5集群部署和使用 1. etcd集群部署2. etcd集群操作3. 新增etcd集群节点1. etcd集群部署 etcd3.5官网站点:    https://etcd.io/docs/v3.5/op-guide/clustering/    https://etcd.io/docs/v3.5/tutorials/how-to-setup-cluster/ [root@localh…

helm安装配置jenkins

1、k8s1.28.2、helm3.12.0&#xff0c;集群搭建 查看节点运行情况 kubectl get node -o wide openebs部署情况 kubectl get sc -n openebs 2、添加Jenkins Helm仓库 helm repo add jenkins https://charts.jenkins.iohelm repo update# 查看版本 helm search repo -l jen…

Wagtail - Django 内容管理系统

文章目录 一、关于 Wagtail1、项目概览2、相关链接资源3、功能特性 二、安装配置三、使用入门1、快速开始2、兼容性 四、其它社区与支持1、社区资源2、商业支持 开发贡献参考项目参考文献 一、关于 Wagtail 1、项目概览 Wagtail 是一个基于 Django 构建的开源内容管理系统&am…

Spring AI Alibaba 来啦!!!

博客标题&#xff1a;Spring AI Alibaba&#xff1a;深度解析其优势与阿里云生态的无缝集成 引言 随着人工智能技术的快速发展&#xff0c;越来越多的企业和开发者开始关注如何将 AI 技术融入到现有的应用开发框架中。Spring AI 作为 Spring 框架在 AI 领域的扩展&#xff0c;…

【论文阅读39】PINN求边坡内时空变化的地震动响应(位移、速度、加速度)场分布

论文提出了一种基于物理信息神经网络&#xff08;PINN&#xff09;和极限分析上界定理相结合的岩体边坡地震稳定性分析框架&#xff0c;重点考虑了边坡中的预存裂缝对稳定性的影响。 PINN用来求解岩质边坡内随时间和空间变化的地震动响应&#xff08;位移、速度、加速度&#…

驱动开发系列59- 再述如何处理硬件中断

在本文中,我们将重点讨论编写设备驱动程序时一个非常关键的方面:什么是硬件中断,更重要的是,作为驱动开发者,你该如何准确地处理它们。事实上,大量的外设(也就是你可能会为其编写驱动的设备)在需要操作系统或驱动程序立即响应时,通常会通过触发硬件中断的方式发出请求…

【蓝牙】Linux Qt4查看已经配对的蓝牙信息

在Linux系统中使用Qt4查看已配对的蓝牙设备信息&#xff0c;可以基于DBus与BlueZ&#xff08;Linux下的蓝牙协议栈&#xff09;进行交互。以下是一个实现方案&#xff1a; 1. 引入必要的库和头文件 确保项目中包含DBus相关的头文件&#xff0c;并链接QtDBus模块&#xff1a; …

企业客户数据防窃指南:从法律要件到维权实操

作者&#xff1a;邱戈龙、曾建萍 ——上海商业秘密律师 在数字经济时代&#xff0c;客户数据已成为企业最核心的资产之一。然而&#xff0c;数据显示&#xff0c;近三年全国商业秘密侵权案件中&#xff0c;涉及客户信息的案件占比高达42%&#xff0c;但最终进入刑事程序的不足…

WHAT - React Native 中 Light and Dark mode 深色模式(黑暗模式)机制

文章目录 一、Light / Dark Mode 的原理1. 操作系统层2. React Native 如何获取?3. 样式怎么跟着变?二、关键代码示例讲解代码讲解:三、自定义主题四、运行时自动更新五、核心原理一张图组件应用例子最小示例:动态样式按钮的动态样式如何封装一套自定义主题四、如何和 Them…

[25-cv-07396、25-cv-07470]Keith代理Anderson这9张版权图,除此之外原告还有50多个版权!卖家要小心!

Anderson 版权图 案件号&#xff1a;25-cv-07396、25-cv-07470 立案时间&#xff1a;2025年7月2日 原告&#xff1a;Anderson Design Group, Inc. 代理律所&#xff1a;Keith 原告介绍 原告是美国的创意设计公司&#xff0c;成立于1993年&#xff0c;简称ADG&#xff0c;一…

五、代码生成器:gen项目开发

目录 1.新建数据库 2.nacos中配置文件 3.gen项目配置代码 4.前端项目 我们再项目中需要代码生成器,这边自己开发一个gen代码生成器服务。 1.新建数据库 CREATE TABLE `gen_table` (`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT 编号,`table_name` varchar(200) DEF…

UI前端大数据处理安全性保障:数据加密与隐私保护策略

hello宝子们...我们是艾斯视觉擅长ui设计、前端开发、数字孪生、大数据、三维建模、三维动画10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩! 一、引言&#xff1a;大数据时代前端安全的核心挑战 在数据驱动业务发展的今天&#xff0c;U…

基于 alpine 构建 .net 的基础镜像

准备基础镜像 alpine:3.22 完整的 Dockerfile 如下&#xff1a; # 使用官方的 Alpine 3.22 镜像作为基础镜像 FROM --platform$TARGETPLATFORM alpine:3.22 AS builder# 设置环境变量 ENV DEBIAN_FRONTENDnoninteractive# 创建目录结构 WORKDIR /app# 备份原始源文件并更换为…

Blob分析及形态学分析

目录 Blob分析的基本思想&#xff1a; Blob分析主要流程&#xff1a; Blob分析 分割: Binary Threshold 分割: Histogram 分割: 动态阈值 全局阈值与动态局部阈值的比较 形态学处理 连通区域 connetion 形态学算子 特征提取 提取特征 常用相关算子 区域特征&#…

中小河流雨水情监测预报系统解决方案

一、方案概述 中小河流在防洪减灾体系中地位关键&#xff0c;但由于其数量众多、分布广泛&#xff0c;监测预报基础相对薄弱&#xff0c;易引发洪水灾害&#xff0c;威胁沿岸居民生命财产安全。本系统旨在构建完善的中小河流雨水情监测预报体系&#xff0c;提升防洪减灾能力。实…

Abase和ByteKV存储方案对比

Abase 和 ByteKV 是字节跳动内部自研的两款分布式 KV 存储系统&#xff0c;虽然都服务于大规模在线业务&#xff0c;但在设计目标、架构模型、适用场景等方面存在显著差异。以下是核心区别的详细分析&#xff1a; &#x1f527; ‌1. 设计目标与一致性模型‌ ‌Abase‌&#x…

JSON的缩进格式方式和紧凑格式方式

将对象转化为json格式字符串在以缩进的方式显示 HxParamMsg hxCommMsg new HxParamMsg() {name "Tom",age 25 }; string json JsonConvert.SerializeObject(hxCommMsg); var parsed JToken.Parse(json); string data parsed.ToString(Formatting.Indented); // …

设计模式篇:灵活多变的策略模式

引言&#xff1a;从现实世界到代码世界的面向对象在商业策略制定中&#xff0c;企业会根据市场环境选择不同的竞争策略&#xff1b;在军事行动中&#xff0c;指挥官会根据敌情选择不同的战术&#xff1b;在游戏对战中&#xff0c;玩家会根据局势调整作战方式。这种根据情境选择…

Bitvisse SSH Client 安装配置文档

一、软件功能介绍​ Bitvisse SSH Client 是一款功能强大的 SSH 客户端软件&#xff0c;具备以下显著特点&#xff1a;​ 丰富的代理隧道协议支持&#xff1a;支持 socks4、socks4a、socks5 和 http 等多种连接代理隧道协议&#xff0c;为网络连接提供多样选择。​便捷的应用…