分布式锁—Redisson的公平锁

1.Redisson公平锁RedissonFairLock概述

(1)非公平和公平的可重入锁

一.非公平可重入锁

锁被释放后,排队获取锁的线程会重新无序获取锁,没有任何顺序性可言。

二.公平可重入锁

锁被释放后,排队获取锁的线程会按照请求获取锁时候的顺序去获取锁。公平锁可以保证线程获取锁的顺序,与其请求获取锁的顺序是一样的。也就是谁先申请获取到这把锁,谁就可以先获取到这把锁。公平可重入锁会把各个线程的加锁请求进行排队处理,保证先申请获取锁的线程,可以优先获取锁,从而实现所谓的公平性。

三.可重入的非公平锁和公平锁不同点

可重入的非公平锁和公平锁,在整体的技术实现框架上都是一样的。唯一的不同点就是加锁和解锁的逻辑不一样。非公平锁的加锁逻辑,比较简单。公平锁的加锁逻辑,要加入排队机制,保证各个线程排队能按顺序获取锁。

(2)Redisson公平锁的简单使用

Redisson的可重入锁RedissonLock指的是非公平可重入锁,Redisson的公平锁RedissonFairLock指的是公平可重入锁。

Redisson的公平可重入锁实现了java.util.concurrent.locks.Lock接口,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒之后才会继续分配下一个线程。

RedissonFairLock是RedissonLock的子类。RedissonFairLock的锁实现框架,和RedissonLock基本一样。而在获取锁和释放锁的lua脚本中,RedissonFairLock的逻辑才有所区别。

//1.最常见的使用方法
RedissonClient redisson = Redisson.create(config);
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();//2.10秒钟以后自动解锁,无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);//3.尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();//4.Redisson为公平的可重入锁提供了异步执行的相关方法
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

(3)Redisson公平锁的初始化
public class RedissonDemo {public static void main(String[] args) throws Exception {...//创建RedissonClient实例RedissonClient redisson = Redisson.create(config);//获取公平的可重入锁RLock fairLock = redisson.getFairLock("myLock");fairLock.lock();//加锁fairLock.unlock();//释放锁}
}public class Redisson implements RedissonClient {//Redis的连接管理器,封装了一个Config实例protected final ConnectionManager connectionManager;//Redis的命令执行器,封装了一个ConnectionManager实例protected final CommandAsyncExecutor commandExecutor;...protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);//初始化Redis的连接管理器connectionManager = ConfigSupport.createConnectionManager(configCopy);...  //初始化Redis的命令执行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);...}public RLock getFairLock(String name) {return new RedissonFairLock(commandExecutor, name);}...
}public class RedissonFairLock extends RedissonLock implements RLock {private final long threadWaitTime;private final CommandAsyncExecutor commandExecutor;...public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor, name, 60000*5);}public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.threadWaitTime = threadWaitTime;...}...
}public class RedissonLock extends RedissonBaseLock {protected long internalLockLeaseTime;final CommandAsyncExecutor commandExecutor;...public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;//与WatchDog有关的internalLockLeaseTime//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config//通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();...}...
}public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {...protected long internalLockLeaseTime;final String id;final String entryName;final CommandAsyncExecutor commandExecutor;public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();//获取UUIDthis.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;}...
}abstract class RedissonExpirable extends RedissonObject implements RExpirable {RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {super(connectionManager, name);}...
}public abstract class RedissonObject implements RObject {protected final CommandAsyncExecutor commandExecutor;protected String name;protected final Codec codec;public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);}public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {this.codec = codec;this.commandExecutor = commandExecutor;if (name == null) {throw new NullPointerException("name can't be null");}setName(name);}...
}public class ConfigSupport {...//创建Redis的连接管理器public static ConnectionManager createConnectionManager(Config configCopy) {//生成UUIDUUID id = UUID.randomUUID();...if (configCopy.getClusterServersConfig() != null) {validate(configCopy.getClusterServersConfig());//返回ClusterConnectionManager实例return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);}...}...
}public class ClusterConnectionManager extends MasterSlaveConnectionManager {public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {super(config, id);...}...
}public class MasterSlaveConnectionManager implements ConnectionManager {protected final String id;//初始化时为UUIDprivate final Config cfg;protected Codec codec;...protected MasterSlaveConnectionManager(Config cfg, UUID id) {this.id = id.toString();//传入的是UUID...this.cfg = cfg;this.codec = cfg.getCodec();...}public String getId() {return id;}public Codec getCodec() {return codec;}...
}

2.公平锁源码之加锁和排队

(1)加锁时的执行流程

使用Redisson的公平锁RedissonFairLock进行加锁时:首先调用的是RedissonLock的lock()方法,然后会调用RedissonLock的tryAcquire()方法,接着会调用RedissonLock的tryAcquireAsync()方法。

在RedissonLock的tryAcquireAsync()方法中,会调用一个可以被RedissonLock子类重载的tryLockInnerAsync()方法。对于非公平锁,执行到这会调用RedissonLock的tryLockInnerAsync()方法。对于公平锁,执行到这会调用RedissonFairLock的tryLockInnerAsync()方法。

在RedissonFairLock的tryLockInnerAsync()方法中,便执行具体的lua脚本。

public class RedissonDemo {public static void main(String[] args) throws Exception {...//创建RedissonClient实例RedissonClient redisson = Redisson.create(config);//获取公平的可重入锁RLock fairLock = redisson.getFairLock("myLock");fairLock.lock();//加锁fairLock.unlock();//释放锁}
}public class RedissonLock extends RedissonBaseLock {...//不带参数的加锁public void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}//带参数的加锁public void lock(long leaseTime, TimeUnit unit) {try {lock(leaseTime, unit, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//加锁成功if (ttl == null) {return;}//加锁失败...}private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime != -1) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法//公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//对RFuture<Long>类型的ttlRemainingFuture添加回调监听CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {//tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑://加锁成功if (ttlRemaining == null) {if (leaseTime != -1) {//如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务internalLockLeaseTime = unit.toMillis(leaseTime);} else {//创建定时调度任务scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}...
}public class RedissonFairLock extends RedissonLock implements RLock {private final long threadWaitTime;//线程可以等待锁的时间private final CommandAsyncExecutor commandExecutor;private final String threadsQueueName;private final String timeoutSetName;public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor, name, 60000*5);//传入60秒*5=5分钟}public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.threadWaitTime = threadWaitTime;threadsQueueName = prefixName("redisson_lock_queue", name);timeoutSetName = prefixName("redisson_lock_timeout", name);}...@Override<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {long wait = threadWaitTime;if (waitTime != -1) {//将传入的指定的获取锁等待时间赋值给wait变量wait = unit.toMillis(waitTime);}  ...if (command == RedisCommands.EVAL_LONG) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//步骤一:remove stale threads,移除等待超时的线程"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//remove the item from the queue and timeout set NOTE we do not alter any other timeout//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +"end;" +//check if the lock can be acquired now//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//remove this thread from the queue and timeout set//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//decrease timeouts for all waiting in the queue//递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间//zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])//ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//acquire the lock and set the TTL for the lease//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//check if the lock is already held, and this is a re-entry(可重入锁)//步骤四:判断锁是否已经被当前线程持有,KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID + ThreadID;"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//the lock cannot be acquired, check if the thread is already in the queue//步骤五:判断当前获取锁失败的线程是否已经在队列中排队//KEYS[3]是对线程排序的有序集合,ARGV[2]是当前线程的UUID + ThreadID;"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +//the real timeout is the timeout of the prior thread in the queue, //but this is approximately correct, and avoids having to traverse the queue//如果当前获取锁失败的线程已经在队列中排队//那么就返回该线程等待获取锁时,还剩多少时间就超时了,外部代码拿到这个时间会阻塞等待这个时间//ARGV[3]是当前线程获取锁时可以等待的时间,ARGV[4]是当前时间"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +//add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of//the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the threadWaitTime//步骤六:对获取锁失败的线程进行排队处理"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +//如果在队列中排队的最后一个元素不是当前线程"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在队列中排最后的线程,ARGV[2]是当前线程的UUID+线程ID,ARGV[4]是当前时间//因为拥有最大过期时间的线程在队列中是排最后的//所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间//从而保证新加入队列和有序集合的线程的过期时间是最大的//下面这一行会计算出:还有多少时间,当前队列中排最后的线程就会过期,外部代码拿到这个时间会阻塞等待这个时间//这样后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +//下面这一行会计算出:还有多少时间,锁就会过期,外部代码拿到这个时间会阻塞等待这个时间"ttl = redis.call('pttl', KEYS[1]);" +"end;" +//计算当前线程在排队等待锁时的过期时间"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间//然后再把当前线程作为一个元素插入队列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),unit.toMillis(leaseTime),getLockName(threadId),wait,currentTime);}...}...
}

(2)获取公平锁的lua脚本相关参数说明

KEYS[1]是getRawName(),它是一个Hash数据结构的key,也就是锁的名字,比如"myLock"。

KEYS[2]是threadsQueueName,它是一个用来对线程排队的队列的名字,多个客户端线程申请获取锁时,会到这个队列里进行排队。比如"redisson_lock_queue:{myLock}"。

KEYS[3]是timeoutSetName,它是一个用来对线程排序的有序集合的名字,这个有序集合可以自动按照每个数据指定的分数进行排序。比如"redisson_lock_timeout:{myLock}"。

ARGV[1]是leaseTime,代表锁的过期时间。如果leaseTime没有指定,默认就是internalLockLeaseTime = 30秒。

ARGV[2]是getLockName(threadId),代表客户端UUID + 线程ID。

ARGV[3]是threadWaitTime,代表线程可以等待的时间(默认5分钟)。

ARGV[4]是currentTime,代表当前时间。

(3)lua脚本步骤一:进入while循环移除队列和有序集合中等待超时的线程

while循环中首先执行命令:"lindex redisson_lock_queue:{myLock} 0",也就是获取"redisson_lock_queue:{myLock}"这个队列中的第一个元素。一开始该队列是空的,所以什么都获取不到,firstThreadId2为false。此时就会break掉,退出while循环。

如果获取到队列中的第一个元素,那么就会执行zscore命令:从有序集合中获取该元素对应的分数,也就是该元素对应线程的过期时间。如果过期时间比当前时间小,那么就要从队列和有序集合中移除该元素。否则,也会break掉,退出while循环。

//步骤一:remove stale threads,移除等待超时的线程
"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待锁超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//remove the item from the queue and timeout set NOTE we do not alter any other timeout//从有序集合+队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +
"end;" +

(4)lua脚本步骤二:判断当前线程能否获取锁

判断条件一:

首先执行命令"exists myLock",判断锁是否存在。一开始没有线程加过锁,所以判断条件肯定是成立的,该条件为true。

判断条件二:

接着执行命令"exists redisson_lock_queue:{myLock}",看队列是否存在。一开始也没有这个队列,所以这个条件也肯定成立,该条件为true。

判断条件三:

如果有这个队列,则判断队列存在的条件不成立,执行"或"后面的判断。也就是执行命令"lindex redisson_lock_queue:{myLock} 0",判断队列的第一个元素是否是当前线程的UUID + ThreadID。

//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +...
"end;" +

总结当前线程现在可以尝试获取锁的情况如下:

情况一:锁不存在 + 队列也不存在

情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程

(5)lua脚本步骤三:执行获取锁的操作

当判断现在能否尝试获取锁的条件通过后,便会执行如下操作:

步骤一:执行命令"lpop redisson_lock_queue:{myLock}",弹出队列第一个元素。一开始该队列是空的,所以该命令不会进行处理。接着执行命令"zrem redisson_lock_timeout:{myLock} UUID1:ThreadID1",也就是从有序集合中删除UUID1:ThreadID1对应的元素。一开始该有序集合也是空的,所以该命令不会进行处理。

步骤二:执行命令"hset myLock UUID1:ThreadID1 1",进行加锁操作。在设置key为myLock的Hash值中,field为UUID1:ThreadID1的value值为1。接着执行命令"pexpire myLock 30000",设置锁key的过期时间为30秒。

最后返回nil,这样在外层代码中,就会认为加锁成功。于是就会创建一个WatchDog看门狗定时调度任务,10秒后对锁进行检查。如果检查发现当前线程还持有这个锁,那么就重置锁key的过期时间为30秒,并且重新创建一个WatchDog看门狗定时调度任务在10秒后继续进行检查。

//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//remove this thread from the queue and timeout set//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//decrease timeouts for all waiting in the queue//递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间//zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])//ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//acquire the lock and set the TTL for the lease//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;" +

(6)lua脚本步骤四:判断锁是否已经被当前线程持有(可重入锁)

此时会执行命令"hexists myLock UUID:ThreadID"。如果判断条件通过,则说明是持有锁的线程对锁进行了重入。于是会执行命令"hincrby myLock UUID:ThreadID 1",对key为锁名的Hash值中,field为UUID + 线程ID的value值累加1。并且执行命令"pexpire myLock 300000"重置锁key的过期时间。最后返回nil,表示重入加锁成功。

//check if the lock is already held, and this is a re-entry(可重入锁)
//步骤四:判断锁是否已经被当前线程持有,KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID + ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;" +

(7)lua脚本步骤五:判断当前获取锁失败的线程是否已经在队列中排队

通过执行命令"zscore redisson_lock_timeout:{myLock} UUID:ThreadID",获取当前线程在有序集合中的对应的分数,也就是过期时间。如果获取成功则返回:当前线程等待获取锁的超时时间还剩多少,外部代码拿到这个时间会阻塞等待这个时间。

//the lock cannot be acquired, check if the thread is already in the queue
//步骤五:判断当前获取锁失败的线程是否已经在队列中排队
//KEYS[3]是对线程排序的有序集合,ARGV[2]是当前线程的UUID+ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +//the real timeout is the timeout of the prior thread in the queue, //but this is approximately correct, and avoids having to traverse the queue//如果当前获取锁失败的线程已经在队列中排队//那么就返回该线程等待获取锁时,还剩多少时间就超时了,外部代码拿到这个时间会阻塞等待这个时间//ARGV[3]是当前线程获取锁时可以等待的时间,ARGV[4]是当前时间"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +

(8)lua脚本步骤六:对获取锁失败的线程进行排队

首先获取队列中的最后一个元素。因为拥有最大过期时间的线程在队列中是排最后的,所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间。从而保证新加入队列和有序集合的线程的过期时间是最大的。然后获取锁或者队列中排最后的线程剩余的存活时间,接着计算当前线程在排队等待锁时的过期时间。

然后把当前线程作为一个元素插入有序集合,并设置有序集合中该元素的分数为该线程在排队等待锁时的过期时间,接着再把当前线程作为一个元素插入队列尾部。

最后返回锁或者队列中排第一的线程剩余的存活时间ttl给外层代码。如果外层代码拿到的返回值是非null,那么客户端会进入一个while循环。在while循环会每阻塞等待ttl时间再尝试去进行加锁,重新执行lua脚本。

如果队列里没有元素,那么第一个加入队列的线程,会阻塞等待锁的过期时间。如果队列里有元素,那么后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间。

//步骤六:对获取锁失败的线程进行排队处理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在队列中排队的最后一个元素不是当前线程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在队列中排最后的线程,ARGV[2]是当前线程的UUID + 线程ID,ARGV[4]是当前时间//因为拥有最大过期时间的线程在队列中是排最后的//所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间//从而保证新加入队列和有序集合的线程的过期时间是最大的//下面这一行会计算出:还有多少时间,当前队列中排最后的线程就会过期,外部代码拿到这个时间会阻塞等待这个时间//这样后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +//下面这一行会计算出:还有多少时间,锁就会过期,外部代码拿到这个时间会阻塞等待这个时间"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//计算当前线程在排队等待锁时的过期时间
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间
//然后再把当前线程作为一个元素插入队列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",

(9)获取锁失败的第一个线程执行lua脚本的流程

公平锁的核心在于申请加锁时,加锁失败的各个客户端会排队。之后锁被释放时,会依次获取锁,从而实现公平性。

假设此时第一个客户端线程已加锁成功,第二个客户端线程也来尝试加锁,那么会进行如下排队处理。

步骤一:进入while循环,移除等待超时的线程。执行命令"lindex redisson_lock_queue:{myLock} 0",获取队列排第一元素。由于此时队列还是空的,所以获取到的是false,于是退出while循环。

步骤二:判断当前线程现在能否尝试获取锁。因为执行命令"exists myLock",发现锁已经存在了,于是判断不通过。

步骤三:判断锁是否已经被当前线程持有,由于第二个客户端线程的UUID + 线程ID必然不等于第一个客户端线程。所以此时执行命令"hexists myLock UUID2:ThreadID2",发现不存在。所以此处的可重入锁的判断条件也不成立。

步骤四:判断当前获取锁失败的线程是否已经在队列中排队。由于当前线程是第一个获取锁失败的线程,所以判断不通过。

步骤五:接下来进行排队处理。

//对获取锁失败的线程进行排队处理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在队列中排队的最后一个元素不是当前线程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在队列中排最后的线程,ARGV[2]是当前线程的UUID+线程ID,ARGV[4]是当前时间//因为拥有最大过期时间的线程在队列中是排最后的//所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间//从而保证新加入队列和有序集合的线程的过期时间是最大的//下面这一行会计算出:还有多少时间,当前队列中排最后的线程就会过期,外部代码拿到这个时间会阻塞等待这个时间  //这样后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +//下面这一行会计算出:还有多少时间,锁就会过期,外部代码拿到这个时间会阻塞等待这个时间"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//计算当前线程在排队等待锁时的过期时间
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间
//然后再把当前线程作为一个元素插入队列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"

首先执行命令"lindex redisson_lock_queue:{myLock} 0"。也就是从队列中获取最后一个元素,由于此时队列是空,所以获取不到元素。然后执行命令"ttl = pttl myLock",获取锁剩余的存活时间。

接着计算当前线程在排队等待锁时的过期时间。假设myLock剩余的存活时间ttl为20秒,那么timeout = ttl + 5分钟 + 当前时间 = 20秒 + 5分钟 + 10:00:00 = 10:05:20;

然后执行命令"zadd redisson_lock_timeout:{myLock} 10:05:20 UUID2:ThreadID2",这行命令的意思是,在有序集合中插入一个元素。元素值是UUID2:ThreadID2,元素对应的分数是10:05:20。分数会用时间的Long型时间戳来表示,时间越靠后,时间戳就越大。有序集合Sorted Set会自动根据插入的元素分数从小到大进行排序。

接着执行命令"rpush redisson_lock_queue:{myLock} UUID2:TheadID2",这行命令的意思是,将UUID2:ThreadID2插入到队列的尾部。

最后返回ttl给外层代码,也就是返回myLock剩余的存活时间。如果外层代码拿到的ttl是非null,那么客户端会进入一个while循环。在while循环会每阻塞等待ttl时间就尝试进行加锁,重新执行lua脚本。

(10)获取锁失败的第二个线程执行lua脚本的流程

如果此时有第三个客户端线程也来尝试加锁,那么会进行如下排队处理。

步骤一:进入while循环,移除等待超时的线程。执行命令"lindex redisson_lock_queue:{myLock} 0",获取队列排第一元素。此时获取到UUID2:ThreadID2,代表着第二个客户端线程正在队列里排队。

继续执行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",从有序集合中获取UUID2:ThreadID2对应的分数,timeout = 10:05:20。

假设当前时间是10:00:25,那么timeout <= 10:00:25的这个条件不成立,于是退出while循环。

步骤二:判断当前线程现在能否尝试获取锁,发现不能通过。因为执行命令"exists myLock"时,发现锁已经存在。

步骤三:判断锁是否已经被当前线程持有。由于第三个客户端线程的UUID + 线程ID必然不等于第一个客户端线程。所以此时执行命令"hexists myLock UUID3:ThreadID3",发现不存在。所以此处的可重入锁的判断条件也不成立。

步骤四:判断当前获取锁失败的线程是否已经在队列中排队。由于当前线程是第二个获取锁失败的线程,所以判断不通过。

步骤五:接下来进行排队处理。

//对获取锁失败的线程进行排队处理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在队列中排队的最后一个元素不是当前线程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在队列中排最后的线程,ARGV[2]是当前线程的UUID + 线程ID,ARGV[4]是当前时间//因为拥有最大过期时间的线程在队列中是排最后的//所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间//从而保证新加入队列和有序集合的线程的过期时间是最大的//下面这一行会计算出:还有多少时间,当前队列中排最后的线程就会过期,外部代码拿到这个时间会阻塞等待这个时间  //这样后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +//下面这一行会计算出:还有多少时间,锁就会过期,外部代码拿到这个时间会阻塞等待这个时间"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//计算当前线程在排队等待锁时的过期时间
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间
//然后再把当前线程作为一个元素插入队列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"

首先执行命令"lindex redisson_lock_queue:{myLock} 0",获取到队列中的最后一个元素UUID2:ThreadID2。

然后判断条件是否成立:lastThreadId不为false + lastThreadId不是自己。由于此时的ARGV[2] = UUID3:ThreadID3,所以判断条件成立。即在队列里排队的最后一个元素并不是当前尝试获取锁的客户端线程。

于是执行:"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" - 当前时间,也就是获取在队列中排最后的线程还有多少时间就会过期,从而得到ttl。

接着根据ttl计算当前线程在排队等待锁时的过期时间timeout,然后执行zadd和rpush命令对当前线程进行入队和排队,最后返回ttl。

3.公平锁源码之可重入加锁

持有公平锁的客户端重复进行lock.lock(),执行加锁lua脚本的流程如下:

步骤一:进入while循环,移除等待超时的线程。执行命令"lindex redisson_lock_queue:{myLock} 0",获取队列排第一元素。此时获取到UUID2:ThreadID2,代表着第二个客户端线程正在队列里排队。

继续执行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",从有序集合中获取UUID2:ThreadID2对应的分数,timeout = 10:05:20。

假设当前时间是10:00:25,那么timeout <= 10:00:25的这个条件不成立,于是退出while循环。

步骤二:判断当前线程现在能否尝试获取锁,发现不能通过。因为执行命令"exists myLock"时,发现锁已经存在。

步骤三:判断锁是否已经被当前线程持有。由于当前线程的UUID + 线程ID等于持有锁的线程。即此时执行命令"hexists myLock UUID:ThreadID"发现key是存在的,所以此处的可重入锁的判断条件成立。

于是会执行命令"hincrby myLock UUID:ThreadID 1",对key为锁名的Hash值中,key为UUID + 线程ID的Hash值累加1。并且执行命令"pexpire myLock 300000"重置锁key的过期时间。最后返回nil,表示重入加锁成功。

//check if the lock is already held, and this is a re-entry(可重入锁)
//步骤四:判断锁是否已经被当前线程持有,KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID+ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;" +

4.公平锁源码之新旧版本对比

当客户端线程尝试加公平锁失败处于排队状态时,会进入while循环。在while循环中,每次都会等待一段时间,再重新进行尝试加公平锁。

public class RedissonLock extends RedissonBaseLock {...//加锁@Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//线程ID,用来生成设置Hash的值long threadId = Thread.currentThread().getId();//尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//ttl为null说明加锁成功if (ttl == null) {return;}//加锁失败时的处理CompletableFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {//再次尝试获取锁ttl = tryAcquire(-1, leaseTime, unit, threadId);//返回的ttl为null,获取到锁,就退出while循环if (ttl == null) {break;}//返回的ttl不为null,则说明其他客户端或线程还持有锁//那么就利用同步组件Semaphore进行阻塞等待一段ttl的时间if (ttl >= 0) {try {commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {commandExecutor.getNow(future).getLatch().acquire();} else {commandExecutor.getNow(future).getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(commandExecutor.getNow(future), threadId);}}...
}

假设第二个客户端线程第一次加锁是在10:00:00,然后在10:00:15该客户端线程再次发起请求尝试进行加锁,但第一个客户端线程在10:00:00~10:00:15之间一直持有这把锁,此时第二个客户端线程的再次加锁流程如下:

(1)新版本再次加锁失败不会刷新排队分数(等待超时的时间点timeout)

步骤一:进入while循环,移除等待超时的线程。执行命令"lindex redisson_lock_queue:{myLock} 0",获取队列排第一元素。此时获取到UUID2:ThreadID2,代表着第二个客户端线程正在队列里排队。

继续执行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",从有序集合中获取UUID2:ThreadID2对应的分数,比如获取到的timeout = 10:05:20。根据当前时间是10:00:15,那么timeout <= 10:00:15的这个条件不成立,于是退出while循环。

步骤二:判断当前线程现在能否尝试获取锁,发现不能通过。因为执行命令"exists myLock"时,发现锁已经存在。

步骤三:判断锁是否已经被当前线程持有。由于第二个客户端线程的UUID + 线程ID必然不等于第一个客户端线程,所以此时执行命令"hexists myLock UUID2:ThreadID2",发现不存在,所以此处的可重入锁的判断条件也不成立。

步骤四:判断当前获取锁失败的线程是否已经在队列中排队。由于当前线程是第二次尝试获取锁,所以判断通过。然后返回第二个客户端线程等待获取锁时,还剩多少时间就超时,不会刷新排队分数。

//Redisson的3.16.8版本
if (command == RedisCommands.EVAL_LONG) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//步骤一:remove stale threads,移除等待超时的线程"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +"end;" +//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间//zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])//ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//步骤四:判断锁是否已经被当前线程持有(可重入锁),KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID+ThreadID;"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//步骤五:判断当前获取锁失败的线程是否已经在队列中排队//KEYS[3]是对线程排序的有序集合,ARGV[2]是当前线程的UUID + ThreadID;"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +//如果当前获取锁失败的线程已经在队列中排队//那么就返回该线程等待获取锁时,还剩多少时间就超时了,外部代码拿到这个时间会阻塞等待这个时间//ARGV[3]是当前线程获取锁时可以等待的时间,ARGV[4]是当前时间"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +//步骤六:对获取锁失败的线程进行排队处理"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +//如果在队列中排队的最后一个元素不是当前线程"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在队列中排最后的线程,ARGV[2]是当前线程的UUID + 线程ID,ARGV[4]是当前时间//因为拥有最大过期时间的线程在队列中是排最后的//所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间//从而保证新加入队列和有序集合的线程的过期时间是最大的//下面这一行会计算出:还有多少时间,当前队列中排最后的线程就会过期,外部代码拿到这个时间会阻塞等待这个时间 //这样后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +//下面这一行会计算出:还有多少时间,锁就会过期,外部代码拿到这个时间会阻塞等待这个时间"ttl = redis.call('pttl', KEYS[1]);" +"end;" +//计算当前线程在排队等待锁时的过期时间"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间//然后再把当前线程作为一个元素插入队列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),unit.toMillis(leaseTime),getLockName(threadId),wait,//默认是5分钟currentTime);
}

(2)旧版本再次加锁失败会刷新排队分数(等待超时的时间点timeout)

旧版本公平锁的lua脚本如下所示,当第二个客户端线程再次加锁时会再次进入排队逻辑。

首先会出计算队列中的第一个元素还有多少时间就超时,即ttl。然后根据ttl + 传入的等待时间,计算当前线程等待锁的超时时间timeout。

接着执行命令"zadd redisson_lock_timeout:{myLock} timeout UUID2:ThreadID2",刷新有序集合中的同名元素的分数为timeout。客户端线程每次重复尝试加锁,都会将其对应的过期时间往后延长,也就是刷新了排队的分数。

zadd命令在添加存在的元素时,会返回0,但会更新该元素的分数。

//Redisson的3.8.1版本
if (command == RedisCommands.EVAL_LONG) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,//步骤一:移除等待超时的线程"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +"end;" +//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID+ThreadID;"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]); " +"redis.call('zrem', KEYS[3], ARGV[2]); " +//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//步骤四:判断锁是否已经被当前线程持有,KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID+ThreadID;"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//步骤五:对获取锁失败的线程进行排队处理"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +"local ttl; " +//如果在队列中排队的第一个元素不是当前线程"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +//计算队列中第一个元素还有多少时间就超时了"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +"else " +"ttl = redis.call('pttl', KEYS[1]);" +"end; " +//计算当前线程等待锁的超时时间"local timeout = ttl + tonumber(ARGV[3]);" +//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间//然后再把当前线程作为一个元素插入队列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end; " +"return ttl;",Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),//KEYS[1]、KEYS[2]、KEYS[3]internalLockLeaseTime,//ARGV[1]getLockName(threadId),//ARGV[2]currentTime + threadWaitTime,//ARGV[3] = 当前时间 + 5秒currentTime//ARGV[4]);
}

注意:如果仅仅使用有序集合是不行的,因为有序集合的分数在lua脚本执行过程中也会发生变化。旧版本中,客户端线程每次尝试加锁,有序集合中的分数会更新。新版本中,当前线程可以尝试获取锁时,也会遍历更新有序集合中的分数。

此外,有序集合获取第一个元素的时间复杂度比队列要高。如果仅仅使用队列也是不行的,因为需要管理排队线程的等待超时时间。如果没有有序集合,那么就不能移除在队列中排队已超时的线程。当然,为了管理线程的等待超时时间,将有序集合换成两层Hash值也可以。

5.公平锁源码之队列重排

(1)新版本在5分钟后尝试再次加锁才会队列重排

新版本的公平锁中,获取锁失败的线程默认会进入队列最多等待5分钟。

在这5分钟内,该线程不管再次加锁多少次,都不会刷新队列排序和分数。

在这5分钟内,该线程没有进行再次加锁尝试,就会被移出队列和有序集合。所以5分钟后,该线程才尝试再次加锁,那么会重新入队,导致队列重排。

(2)旧版本在5秒后尝试再次加锁就会队列重排

旧版本的公平锁中,获取锁失败的线程默认会进入队列最多等待5秒钟。

在这5秒钟内,该线程只要重新尝试进行加锁,那么就会延长其最多等待时间,也就是刷新有序集合中的排队分数。

在这5秒钟内,该线程没有进行再次加锁尝试,就会被移出队列和有序集合。所以5秒钟后,该线程才尝试再次加锁,那么会重新入队,导致队列重排。

(3)导致队列重排的是lua脚本的步骤一(移除等待超时的线程)

也就是公平锁lua脚本中while循环的作用。

当客户端线程使用RedissonLock的tryAcquire()方法尝试获取公平锁,并且指定了一个获取锁的超时时间时。比如指定客户端线程在队列里排队超过了20秒,就不再尝试获取锁了。如果获取锁的超时时间没有指定,新版本是默认5分钟超时,旧版本是默认5秒后超时。

此时由于这些等待获取锁已超时的线程元素还存在队列和有序集合里,所以可以通过while循环的逻辑来清除这些不再尝试获取锁的客户端线程。

在新版本,随着时间推移,这些等待获取锁超时的线程就会被移出队列。在旧版本,随着时间推移,这些等待获取锁超时的线程只要不再尝试加锁,那么其等待获取锁的超时时间就不会更新被不断延长,就会被移除队列。

如果客户端宕机了,那么客户端就不会重新尝试获取锁。在新版本中,随着时间推移,宕机的客户端线程就会被移出队列。在旧版本中,就不会刷新和延长有序集合中的超时时间分数,这样while循环的逻辑就会将这些宕机的客户端线程从队列中移出。

在新版本中,最多5分钟后,宕机的客户端线程会被移出队列。在旧版本中,最多5秒钟后,宕机的客户端线程就会被移出队列。

因为网络延迟等原因,可能会导致客户端线程等待锁时间过长,从而触发各个客户端线程的排队顺序的重排序。有的客户端如果在队列里等待时间过长,可能就会触发一次队列的重排序。新版本触发重排序的频率是每5分钟,旧版本触发重排序的频率是每5秒。

//步骤一:移除等待超时的线程
"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +
"end;" +

6.公平锁源码之释放锁

(1)释放公平锁的流程

释放公平锁首先调用的还是RedissonLock的unlock()方法。

在RedissonLock的unlock()方法中,会调用get(unlockAsync())。也就是首先调用RedissonBaseLock的unlockAsync()方法,然后调用RedissonObject的get()方法。

其中个RedissonBaseLock的unlockAsync()方法是异步化执行的方法,释放锁的操作是异步执行的。而RedisObject的get()方法会通过RFuture同步等待获取异步执行的结果。所以,可以将get(unlockAsync())理解为异步转同步。

在RedissonBaseLock的unlockAsync()方法中,就会调用公平锁RedissonFairLock的unlockInnerAsync()方法进行释放锁。然后当完成释放锁的处理后,会通过异步去取消定时调度任务。

public class Application {public static void main(String[] args) throws Exception {Config config = new Config();config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");//创建RedissonClient实例RedissonClient redisson = Redisson.create(config);//获取公平的可重入锁RLock fairLock = redisson.getFairLock("myLock");fairLock.lock();fairLock.unlock();...}
}public class RedissonLock extends RedissonBaseLock {...@Overridepublic void unlock() {...//异步转同步//首先调用的是RedissonBaseLock的unlockAsync()方法//然后调用的是RedissonObject的get()方法get(unlockAsync(Thread.currentThread().getId()));...}...
}public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {...@Overridepublic RFuture<Void> unlockAsync(long threadId) {//异步执行释放锁的lua脚本RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {//取消定时调度任务cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);...
}public class RedissonFairLock extends RedissonLock implements RLock {private final long threadWaitTime;private final CommandAsyncExecutor commandExecutor;private final String threadsQueueName;private final String timeoutSetName;public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor, name, 60000*5);}public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.threadWaitTime = threadWaitTime;threadsQueueName = prefixName("redisson_lock_queue", name);timeoutSetName = prefixName("redisson_lock_timeout", name);}@Overrideprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//步骤一:移除等待超时的线程"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +"end;" +//步骤二:判断锁是否还存在,判断key为锁名的Hash值是否存在"if (redis.call('exists', KEYS[1]) == 0) then " +//获取队列中排第一的线程"local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +//ARGV[1]为通知事件的类型"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; " +"end;" +//步骤二:判断锁是否还存在,判断key为UUID+线程ID的Hash值是否存在"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//对key为UUID+线程ID的Hash值还存递减1"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"end; " +"redis.call('del', KEYS[1]); " +"local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +//发布一个事件给在队列中排第一的线程"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; ",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),LockPubSub.UNLOCK_MESSAGE,//ARGV[1]internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());}...
}

(2)释放公平锁的lua脚本分析

步骤一:移除等待超时的线程

首先也会进入while循环,移除等待超时的线程。即获取队列中排第一的线程,判断该线程的过期时间是否已小于当前时间。如果小于当前时间,那么就说明该线程在队列中的排队已经过期,于是便将该线程从有序集合 + 队列中移除。后续如果该线程再次尝试加锁,那么会重新排序 + 重新入队。

步骤二:判断锁是否还存在

如果key为锁名的Hash值已不存在,那么先获取队列中排第一的线程,然后发布一个事件给该线程对应的客户端让其获取锁。

如果key为锁名的Hash值还存在,那么判断field为UUID + 线程ID的映射是否存在。如果field为UUID + 线程ID的映射不存在,那么表示锁已经被释放了,直接返回nil。如果field为UUID + 线程ID的映射存在,那么在key为锁名的Hash值中,对field为UUID + 线程ID的value值递减1。也就是调用Redis的hincrby命令,进行递减1处理。

步骤三:对递减1后的结果进行如下判断处理

如果递减1后的结果大于0,表示线程还在持有锁。对应于持有锁的线程多次重入锁,此时需要重置锁的过期时间。

如果递减1后的结果小于0,表示线程不再持有锁,则删除锁对应的key,并且发布一个事件给在队列中排第一的线程所对应的客户端。

7.公平锁源码之按顺序依次加锁

假设客户端A先持有锁,而客户端B在队列里面是排在客户端C的后面。那么如果客户端A释放了锁后,客户端B和C是如何按顺序加锁的。

(1)锁被释放后,排第二的客户端线程先来加锁

锁被客户端A释放掉,锁key被删除之后,客户端B先来进行尝试加锁。此时客户端B执行的lua脚本步骤二的逻辑:

//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +...
"end;"

首先,执行判断"exists myLock = 0",由于当前锁存在,所以条件不成立。

然后,执行判断"exists redisson_lock_queue:{myLock} = 0",由于队列存在,所以条件不成立。

接着,执行判断"lindex redisson_lock_queue:{myLock} 0 == UUID2:ThreadID2",由于队列存在,但是在队列中排第一的不是客户端B而是客户端C,所以条件不成立,客户端B无法加锁。

由此可见:即使锁释放掉后,多个客户端来尝试加锁也只认队列中排第一的客户端。从而实现按队列的顺序依次获取锁,保证了公平性。

(2)锁被释放后,排第一的客户端线程再来加锁

当在队列中排第一的客户端C此时过来尝试加锁时,就会执行如下步骤三的尝试加锁逻辑:

//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//remove this thread from the queue and timeout set//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//decrease timeouts for all waiting in the queue//递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间//zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])//ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//acquire the lock and set the TTL for the lease//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;"

首先,执行命令"lpop redisson_lock_queue:{myLock}",将队列中的第一个元素弹出来。

然后,执行命令"zrem redisson_lock_timeout:{myLock} UUID3:ThreadID3",将有序集合中客户端C的线程对应的元素给删除掉。

接着,执行"hset myLock UUID3:ThreadID3 1"进行加锁,设置field为UUID + 线程ID的value值为1。

最后,执行命令"pexpire myLock 30000",设置key为锁名的Hash值的过期时间为30000毫秒。

客户端C完成加锁后,客户端C就会从队列中出队,此时排在队头的就是客户端B。

文章转载自:东阳马生架构

原文链接:分布式锁—3.Redisson的公平锁 - 东阳马生架构 - 博客园

体验地址:JNPF快速开发平台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/95462.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/95462.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

上网行为安全概述和组网方案

一、上网行为安全概述1. 背景与需求互联网的双刃剑特性&#xff1a;网络普及改变工作生活方式&#xff0c;业务向互联网迁移。缺乏管理导致风险&#xff1a;带宽滥用、监管困难、信息泄露、网络违法、安全威胁。核心问题&#xff1a;带宽滥用&#xff1a;P2P/流媒体占用70%带宽…

某处卖600的【独角仙】尾盘十分钟短线 尾盘短线思路 手机电脑通用无未来函数

通达信指标【独角仙】尾盘十分钟套装-主图-副图-选古指标&#xff0c;支持手机电脑使用。在股市收盘的前十分钟第二天冲高卖出&#xff0c;信号可以盘中预警也可以尾盘选股&#xff0c;如果要保证信号固定建议是尾盘选股即可&#xff0c;当天信号固定后&#xff0c;不会产生漂移…

日志数据链路的 “搬运工”:Flume 分布式采集的组件分工与原理

flume详解&#xff1a;分布式日志采集的核心原理与组件解析 在大数据体系中&#xff0c;日志采集是数据处理的第一步。Flume 作为 Apache 旗下的分布式日志采集工具&#xff0c;以高可用、高可靠、易扩展的特性&#xff0c;成为处理海量日志数据的首选方案。本文将从 Flume 的…

大消费新坐标中的淘宝大会员

一站式消费需要一站式权益。作者|古廿编辑|杨舟淘宝的大会员体系落地了。8月6日&#xff0c;淘宝首次整合饿了么、飞猪等阿里系平台资源&#xff0c;推出覆盖购物、外卖、出行、旅游的一体化会员体系——用户在三大平台的消费&#xff0c;都能累积淘气值&#xff0c;根据淘气值…

MIME(多用途互联网邮件扩展)

MIME&#xff08;Multipurpose Internet Mail Extensions&#xff09; MIME 是 多用途互联网邮件扩展 的缩写&#xff0c;它最初是为了解决传统电子邮件只能传输纯文本的局限性而设计的&#xff0c;后来逐渐成为互联网中 数据格式标识与传输 的通用标准&#xff0c;被广泛应用…

PHP imagick扩展安装以及应用

Date: 2025-08-13 10:48:12 author: lijianzhan php_imagick是PHP的一个强大的扩展模块&#xff0c;用于调用ImageMagick图像处理库的功能&#xff0c;支持处理JPEG、PNG、GIF等超过185种格式的图像&#xff0c;实现缩放、旋转、动画生成等操作&#xff0c;常用于网页图片动态生…

2025年度14款CRM销售管理系统横向评测

本文深入对比了以下14款CRM销售管理软件&#xff1a;1.纷享销客&#xff1b; 2.Zoho CRM&#xff1b; 3.红圈销售&#xff1b; 4.销帮帮&#xff1b; 5.Salesforce&#xff1b; 6.Pipedrive&#xff1b; 7.Microsoft Dynamics 365&#xff1b; 8.悟空 CRM&#xff1b; 9.励销云…

akamai鼠标轨迹

各位肯定被akamai鼠标轨迹、点击事件、键盘事件&#xff0c;网页交互困扰 那么我们就研究一下鼠标轨迹、点击事件AST解混淆, 拿到解混淆后的代码&#xff0c; 如下&#xff0c;sensor_data就是我们要搞的参数 如何解混淆这里就不赘述了&#xff0c;需要的可以看我上一篇文章&am…

飞算JavaAI开发全流程解析:从自然语言到可运行工程的智能进化

引言 在数字经济时代&#xff0c;企业级应用开发面临着需求多变、交付周期紧、质量要求高的三重挑战。传统Java开发模式依赖人工进行需求确认、架构设计、代码编写和测试验证&#xff0c;导致开发效率低下、沟通成本高企。据统计&#xff0c;一个中等规模的项目需要平均8周完成…

垃圾回收标记算法:三色标记

文章目录1 三色标记流程1.1 初始标记1.2 并发标记1.3 重新标记1.4 清除阶段&#xff08;Sweep&#xff09;1.5 为什么初始标记和重新标记需要STW&#xff0c;而并发标记不需要?2 并发标记的写屏障3 多标问题4.漏标问题4.1 漏标的两个必要条件4.2 解决方案一&#xff1a;增量更…

反射的详解

目录一、反射1.JDK,JRE,JVM的关系2.什么是反射3. 三种获取Class对象(类的字节码)的方式4.Class常用方法5. 获取类的构造器6.反射获取成员变量&使用7.反射获取成员方法8.综合例子一、反射 1.JDK,JRE,JVM的关系 三者是Java运行环境的核心组成部分&#xff0c;从包含关系上看…

Grafana Tempo日志跟踪平台

以下是Grafana Tempo文档的总结&#xff08;基于最新版文档内容&#xff09;&#xff1a; 核心概念 分布式追踪系统&#xff1a;Tempo是开源的分布式追踪后端&#xff0c;专注于高吞吐量、低成本存储和与现有监控生态的深度集成 架构组成&#xff1a; Distributor&#xff1a…

Qt基本控件

Qt 的基本控件是构建用户界面的基础&#xff0c;涵盖了按钮、输入框、容器、显示组件等&#xff0c;适用于传统 Widget 开发&#xff08;基于 QWidget&#xff09;。以下是常用基本控件的分类总结&#xff1a;一、按钮类控件用于触发交互操作&#xff0c;如提交、取消、选择等。…

用Voe3做AI流量视频,条条10W+(附提示词+白嫖方法)

最近 AI 视频的风从大洋彼岸吹过来&#xff0c;Voe3 的技术升级&#xff0c;诞生了很多很有意思的玩法。 比如&#xff1a;AI ASMR 切水果解压视频&#xff0c;卡皮巴拉旅行博主、雪怪 AI Vlog&#xff0c;动物奥运会、第一人称视角穿越古战场直播。 这些视频的流量很好&…

嵌入式学习的第四十八天-中断+OCP原则

一、GIC通用中断控制器 1.GIC通用中断控制器 GIC 是 ARM 公司给 Cortex-A/R 内核提供的一个中断控制器&#xff0c;GIC接收众多外部中断&#xff0c;然后对其进行处理&#xff0c;最终通过VFIQ、VIRQ、FIQ 和 IRQ给内核&#xff1b;这四个 信号的含义如下&#xff1a; VFIQ:虚拟…

一周学会Matplotlib3 Python 数据可视化-绘制条形图(Bar)

锋哥原创的Matplotlib3 Python数据可视化视频教程&#xff1a; 2026版 Matplotlib3 Python 数据可视化 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 课程介绍 本课程讲解利用python进行数据可视化 科研绘图-Matplotlib&#xff0c;学习Matplotlib图形参数基本设置&…

阿里研发效能提升【60篇】

阿里研发效能提升【60篇】 1、建立研发效能提升的系统框架 01、《从DevOps到BizDevOps&#xff0c;研发效能提升的系统方法》 视频版&#xff1a;2021云栖大会云效BizDevOps论坛 文字版&#xff1a;深度 | 从DevOps到BizDevOps, 研发效能提升的系统方法-阿里云开发者社区 …

面试实战 问题二十六 JDK 1.8 核心新特性详解

JDK 1.8 核心新特性详解 1. Lambda表达式 最核心的特性&#xff0c;简化函数式编程&#xff0c;语法&#xff1a;(参数) -> 表达式 // 传统方式 Runnable r1 new Runnable() {Overridepublic void run() {System.out.println("传统方式");} };// Lambda方式 Runn…

STM32H743开发周记问题汇总(串口通讯集中)

溢出错误出现的串口接收过程中&#xff0c;中断接收在溢出后无法进入&#xff0c;需要重点考虑溢出问题&#xff0c;以下是溢出恢复代码波特率115200 优先级0-1 高于定时器 初步诊断是数据流导致的接收溢出问题/*** brief 检查并清除UART溢出错误&#xff08;带状态…

Linux中FTP配置与vsftpd服务部署指南

Linux中FTP配置与vsftpd服务部署指南 一、FTP 核心概念 1、基本定义 文件传输协议&#xff08;FTP&#xff09;&#xff0c;基于 C/S模式 工作。控制端口&#xff1a;21&#xff08;身份验证与指令传输&#xff09; 数据端口&#xff1a;20&#xff08;主动模式数据传输&#x…