Redis分布式锁实现

概述

为什么要要分布式锁

在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。
通常,我们以synchronized 、Lock来使用它。Java中的锁,只能保证在同一个JVM进程内中执行
如果需要在分布式集群环境下的话,便需要分布式锁

分布式锁/线程锁/进程锁区别

分布式锁:当多个进程不在同一个系统中(jvm),用分布式锁控制多个进程对资源的访问

线程锁:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段

  • 线程锁只在同一JVM中有效果
  • 因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。

进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁

分布式锁的使用场景

虽然线程间并发问题和进程间并发问题都可以通过分布式锁解决的,但是不推荐这样去做,因为采用分布式锁解决这些小问题是非常消耗资源

分布式锁应该用来解决分布式情况下的多进程并发的问题才最合适

情境:线程A和线程B都共享某个变量X。

  • 如果是单机情况下(单JVM),线程之间共享内存,只要使用线程锁就可以解决并发问题。
  • 如果是分布式情况下(多JVM),线程A和线程B很可能不是在同一JVM中,这样线程锁就无法起到作用了,这时候就要用到分布式锁来解决

分布式锁实现逻辑

分布式锁实现的关键是:在分布式的应用服务器外,搭建一个存储服务器,存储锁的信息

实现要点:

  • 锁信息需要设置过期超时的,不能让一个线程长期占有一个锁而导致死锁
  • 同一时刻只有一个线程可以获取到锁

实现方式:

  1. 数据库乐观锁
  2. 基于Redis的分布式锁

    使用 Redis 实现锁,主要是将状态放到 Redis 当中,利用其原子性,当其他线程访问时,如果 Redis 中已经存在这个状态,就不允许之后的一些操作

  3. 基于ZooKeeper的分布式锁

分布式锁实现要求

锁的实现同时满足以下四个条件

  1. 互斥性在任意时刻,只有一个客户端能持有锁
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。也就是设置一个超时时间
  3. 具有容错性只要大部分的Redis节点正常运行,客户端就可以加锁和解锁
  4. 解铃还须系铃人加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

实现redis分布式锁需要的命令/API

redis命令

# “set if not exits”,若该key-value不存在,则成功加入缓存并且返回1,否则返回0。
setnx(key, value)# 获得key对应的value值,若不存在则返回nil。
get(key)# 先获取key对应的value值,若不存在则返回nil,然后将旧的value更新为新的value。
getset(key, value)# 设置key-value的有效期为seconds秒。
expire(key, seconds)

Set

语法:

  • key: 要设置的键。
  • value: 与键关联的值。
  • EX seconds: 设置键的过期时间(以秒为单位)。
  • PX milliseconds: 设置键的过期时间(以毫秒为单位)。
  • NX: 仅在键不存在时设置键的值。
  • XX: 仅在键已经存在时设置键的值
SET key value [EX seconds] [PX milliseconds] [NX|XX]

使用示例

# 基本使用
SET mykey "Hello, Redis!"# 条件设置(仅在键不存在时保存, 如果 mykey 不存在,返回 (nil)。)
SET mykey "Hello, Redis!" NX  # 条件更新(仅在键已经存在时)
SET mykey "New Value" XX# 设置过期时间(以毫秒为单位) 5000毫秒后过期
SET mykey "Hello, Redis!" PX 5000 
# 设置过期时间(以秒为单位) 5000秒后过期
SET mykey "Hello, Redis!" EX 5000 

总结:

  • SET 命令 是 Redis 中最常用的命令之一,用于存储键值对
  • 通过 EX 和 PX 设置过期时间
  • 通过 NX 和 XX 控制设置的条件`。

Del

在 Redis 中,DEL 命令用于删除一个或多个键。这个命令可以用来清除不再需要的数据。
参数说明

  • key: 要删除的键,可以指定一个或多个键。

返回值:返回被删除的键的数量。如果指定的键不存在,则不会报错,返回值仍然是被删除的键的数量

DEL key [key ...]

示例

# 删除单个键。如果 mykey 存在,返回 1;如果不存在,返回 0。
DEL mykey# 删除多个键。如果 key1、key2 和 key3 中的某些键存在,返回被删除的键的数量。
DEL key1 key2 key3

注意事项

  • 使用 DEL 命令时,如果键不存在,不会报错,返回值仍然是被删除的键的数量
  • DEL 命令是一个 O(1) 操作,但在删除大量键时,可能会影响性能

call

在 Redis 中,CALL 命令并不是一个直接的命令,而是 Lua 脚本中用于调用 Redis 命令的函数

通过 redis.call,你可以在 Lua 脚本中执行 Redis 的原生命令。

redis.call('COMMAND_NAME', arg1, arg2, ...)

详细说明

  • KEYS: 在脚本中,KEYS 是一个数组,包含传递给脚本的所有键
  • ARGV: 你也可以使用 ARGV 数组来传递额外的参数

示例

-- Lua 脚本:将两个键的值相加并返回结果
local value1 = redis.call('GET', KEYS[1])  -- 获取第一个键的值
local value2 = redis.call('GET', KEYS[2])  -- 获取第二个键的值
return value1 + value2  -- 返回两个值的和

总结

  • redis.call 是在 Lua 脚本中执行 Redis 命令的方式。
  • 通过 KEYS 和 ARGV 数组,可以灵活地传递键和参数
  • Lua 脚本的执行是原子性的,可以提高操作的效率

Jedis 接口

pom依赖

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version>
</dependency>

eval()

在 Jedis 中,eval 方法用于执行 Lua 脚本

通过这个方法,你可以在 Redis 服务器上运行 Lua 脚本,从而实现原子操作和复杂的逻辑处理。

String result = jedis.eval(String script, List<String> keys, List<String> args);

参数说明

  1. script: 要执行的 Lua 脚本,作为字符串传入
  2. keys: 需要在脚本中使用的键的列表。键的数量可以在脚本中通过 KEYS 表达式访问。
  3. args: 传递给脚本的参数列表。参数的数量可以在脚本中通过 ARGV 表达式访问。

注意事项

  • 原子性: Lua 脚本在 Redis 中是原子执行的,这意味着在脚本执行期间,其他命令不会干扰。
  • 性能: 使用 Lua 脚本可以减少网络往返次数,提高性能,尤其是在需要执行多个命令时。
  • 调试: Lua 脚本的调试相对较难,因此在编写时要确保逻辑正确。

示例

import redis.clients.jedis.Jedis;public class RedisLuaExample {public static void main(String[] args) {Jedis jedis = new Jedis("localhost");// Lua 脚本:将两个键的值相加并返回结果String script = "return redis.call('GET', KEYS[1]) + redis.call('GET', KEYS[2])";// 需要使用的键List<String> keys = Arrays.asList("key1", "key2");// 执行脚本String result = jedis.eval(script, keys, Collections.emptyList()).toString();System.out.println("Result: " + result);jedis.close();}
}

API(Springboot)

Redisson Pom依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.10.1</version>
</dependency>

redis pom依赖

<!-- 引入redis依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

spring boot使用Redis的操作主要是通过RedisTemplate(或StringRedisTemplate )来实现

RedisTemplate和StringRedisTemplate的区别

RedisTemplate和StringRedisTemplate的区别:

  • 两者的关系是StringRedisTemplate继承RedisTemplate
  • 两者的数据是不共通的;也就是说StringRedisTemplate只能管理StringRedisTemplate里面的数据,RedisTemplate只能管理RedisTemplate中的数据
  • SDR默认采用的序列化策略有两种,一种是String的序列化策略,一种是JDK的序列化策略。
  • StringRedisTemplate默认采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的(StringRedisSerializer)。
  • RedisTemplate默认采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。(JdkSerializationRedisSerializer)

总结:

  • StringRedisTemplate:当你的redis数据库里面本来存的是字符串数据或者你要存取的数据就是字符串类型数据的时候

    Redis当中的数据值是以数组形式显示出来的时候,只能使用RedisTemplate才能获取到里面的数据

  • RedisTemplate:但是如果你的数据是复杂的对象类型,而取出的时候又不想做任何的数据转换,直接从Redis里面取出一个对象。

    Redis当中的数据值是以可读形式显示出来的时候,只能使用StringRedisTemplate才能获取到里面的数据

redisTemplate

// 将锁状态放入 Redis:setIfAbsent如果键不存在则新增,存在则不改变已经有的值。
redisTemplate.opsForValue().setIfAbsent("lockkey", "value"); 
// 设置锁的过期时间
redisTemplate.expire("lockkey", 30000, TimeUnit.MILLISECONDS);//spring-data-redis 2.1 之后版本,加锁的同时设置过期时间,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);// 删除/解锁
redisTemplate.delete("lockkey");// 获取锁
redisTemplate.opsForValue().get("lockkey");

StringRedisTemplate

@Autowired
private StringRedisTemplate stringRedisTemplate;//在设置值的同时指定过期时间, 时间单位 s
stringRedisTemplate.opsForValue().set("key","value",7200, TimeUnit.SECONDS);//删除key对应的键值对
stringRedisTemplate.opsForValue().delete("key");//获取对应key的value
stringRedisTemplate.opsForValue().get("key");

实现redis分布式锁

单节点Redis的分布式锁

如果你的项目中Redis是多机部署的,那么可以尝试使用Redisson实现分布式锁,

加锁 实现

加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间

但是不建议分别使用加锁和设置超时这两个命令去设置值和过期时间,因为违背了原子性,也就是一旦锁被创建,而没有设置过期时间,则锁会一直存在

Jedis 实现
参数说明

第一个为key,我们使用key来当锁,因为key是唯一的

第二个为value,我们传的是requestId,可靠性保证,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成

第三个为NX(键不存在时设置值),这个参数的意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;表示仅在键不存在时设置值

redis在set时,如果原先有值,SET 命令会返回 "OK"

第四个为PX(毫秒为单位),这个参数我们传的是PX,意思是我们要给这个key加一个过期时间的设置,具体时间由第五个参数决定

第五个为expireTime,与第四个参数相呼应,代表key的过期时间大小

代码实现

public class RedisTool {private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";/*** 尝试获取分布式锁** @param jedis      Redis客户端* @param lockKey    锁* @param requestId  请求标识* @param expireTime 超期时间* @return 是否获取成功*/public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {return true;}return false;}
}
redis命令
-- 设置一个键 myLock,值为 12345,并且希望在 5000 毫秒后过期
SET myLock 12345 NX PX 5000
springBoot

但是在spring-data-redis 2.1 之后的版本,便可以直接设置过期时间了

spring-data-redis 2.1 前

spring-data-redis 2.1 前的版本:在jedis当中是有这种原子操作的方法的,需要通过 RedisTemplate 的 execute 方法获取到jedis里操作命令的对象设置

String result = template.execute(new RedisCallback<String>() {@Overridepublic String doInRedis(RedisConnection connection) throws DataAccessException {JedisCommands commands = (JedisCommands) connection.getNativeConnection();return commands.set(key, "锁定的资源", "NX", "PX", 3000);}
});
spring-data-redis 2.1 后

spring-data-redis 2.1 之后的版本

//加锁的同时设置过期时间,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);

解锁实现

解锁的过程就是将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉

为什么不直接删除锁?这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的

Jedis实现
代码
public class RedisTool {private static final Long RELEASE_SUCCESS = 1L;/*** 释放分布式锁** @param jedis     Redis客户端* @param lockKey   锁* @param requestId 请求标识* @return 是否释放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {return true;}return false;}
}
为什么不直接删除

为什么不直接使用jedis.del()方法删除锁?这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的

 jedis.del(lockKey);
为什么不判断是后在删除

如果想判断是不是这个客户端的锁,再去解锁行不行不行,如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。

比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了

 // 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {// 若在此时,这把锁突然不是这个客户端的,则会误解锁jedis.del(lockKey);
}
Springboot
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {// 生成一个唯一的 UUID 作为锁的标识String uuid = UUID.randomUUID().toString();// 获取 Redis 的 ValueOperations 对象ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();// 尝试在 Redis 中设置锁,过期时间为 5 秒Boolean lock = ops.setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS);// 如果成功获取到锁if (lock) {// 获取分类数据Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();// 获取当前锁的值String lockValue = ops.get("lock");// Lua 脚本,用于安全释放锁String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +"    return redis.call(\"del\",KEYS[1])\n" +"else\n" +"    return 0\n" +"end";// 执行 Lua 脚本,释放锁stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);// 返回获取的分类数据return categoriesDb;} else {// 如果未能获取到锁,线程休眠 100 毫秒try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace(); // 打印异常堆栈}// 递归调用,尝试再次获取锁return getCatalogJsonDbWithRedisLock();}
}

多节点的redis分布式锁

概述

如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态

为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期

默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了

高并发场景下的问题

高并发场景下如下问题:

  • 主从切换后,原从库被推举为主库,当在其他请求加锁的时候,连接的redis可能还没有同步到第一次加的锁,造成锁失效。
  • 主库发生故障,加锁完成,还未同步到从节点或者集群中其他节点的时候,当前节点挂掉,锁就丢失了。
  • 两种情况导致出现的原因就是redis的数据同步是异步的

Redisson

相对于Jedis而言,Redisson强大很多。当然了,随之而来的就是它的复杂性。它里面也实现了分布式锁,而且包含多种类型的锁

具体内容:分布式锁和同步器

可重入锁(Reentrant Lock)示例

下述已可重入锁(Reentrant Lock)示例,获取客户端进行加解锁操作如下

public static void main(String[] args) {// 创建 Redis 配置对象Config config = new Config();// 设置 Redis 服务器地址config.useSingleServer().setAddress("redis://127.0.0.1:6379");// 设置 Redis 服务器密码config.useSingleServer().setPassword("redis1234");// 创建 Redisson 客户端实例final RedissonClient client = Redisson.create(config);  // 获取名为 "lock1" 的分布式锁RLock lock = client.getLock("lock1");try {// 尝试获取锁,最多等待 10 秒,锁定 30 秒if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {try {// 在此处执行需要加锁的操作// 例如:处理共享资源或执行关键业务逻辑} finally {// 确保在操作完成后释放锁lock.unlock(); }} else {// 如果无法获取锁,输出提示信息System.out.println("无法获取锁,操作被跳过");}} catch (InterruptedException e) {// 如果线程被中断,恢复中断状态Thread.currentThread().interrupt(); // 输出中断信息System.out.println("线程被中断");}
}
加锁

调用lock方法,定位到lockInterruptibly。在这里,完成了加锁的逻辑。

代码
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 当前线程的 IDlong threadId = Thread.currentThread().getId();// 尝试获取锁,返回值为锁的剩余时间(TTL)Long ttl = tryAcquire(leaseTime, unit, threadId);// 如果 ttl 为空,则证明获取锁成功if (ttl == null) {return; // 成功获取锁,直接返回}// 如果获取锁失败,则订阅到对应这个锁的 channelRFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future); // 同步订阅,等待锁的释放try {while (true) {// 再次尝试获取锁ttl = tryAcquire(leaseTime, unit, threadId);// ttl 为空,说明成功获取锁,跳出循环if (ttl == null) {break; // 成功获取锁,退出循环}// ttl 大于 0 则等待 ttl 时间后继续尝试获取if (ttl >= 0) {// 尝试在 ttl 时间内获取锁getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// ttl 小于 0,表示无限期等待getEntry(threadId).getLatch().acquire();}}} finally {// 取消对 channel 的订阅,确保资源释放unsubscribe(future, threadId);}// get(lockAsync(leaseTime, unit)); // 可能是异步获取锁的逻辑,注释掉的部分
}// 两种处理方式,一种是带有过期时间的锁,一种是不带过期时间的锁。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {// 如果带有过期时间,则按照普通方式获取锁if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 先按照 30 秒的过期时间来执行获取锁的方法RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return; // 如果获取锁失败,直接返回}Long ttlRemaining = future.getNow();// 锁已成功获取if (ttlRemaining == null) {// 开启定时任务以刷新锁的过期时间scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture; // 返回异步获取锁的结果
}// 正执行获取锁的逻辑,它是一段LUA脚本代码,hash数据结构。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     long threadId, RedisStrictCommand<T> command) {// 将过期时间转换为毫秒internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,// 如果锁不存在,则通过 hset 设置它的值,并设置过期时间"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果锁已存在,并且锁是当前线程,则通过 hincrby 给数值递增 1"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果锁已存在,但并非本线程,则返回过期时间 ttl"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
流程图
存在
不存在
是当前线程
不是当前线程
失败
获取失败
获取成功
获取成功
开始
尝试获取锁
判断锁是否存在
设置值和过去时间
加锁成功
结束
判断锁是否为当前线程
1:锁的值递增加1, 2:设置过去时间
加锁失败,返回当前锁的过期时间
获取成功/失败
订阅对应锁的channel
再次尝试获取锁
获取成功/失败
等待channel是否锁信息
解锁

通过调用unlock方法来解锁。

代码如下
// 这是一个公开的异步解锁方法,接受当前线程的 ID 作为参数,返回一个 RFuture<Void> 对象。
public RFuture<Void> unlockAsync(final long threadId) {// 创建一个 Promise 对象,用于异步操作的结果final RPromise<Void> result = new RedissonPromise<Void>();// 调用解锁的内部异步方法RFuture<Boolean> future = unlockInnerAsync(threadId);// 添加监听器以处理解锁操作的结果future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {// 检查解锁操作是否成功if (!future.isSuccess()) {// 如果失败,取消过期时间的续期任务cancelExpirationRenewal(threadId);// 将失败原因传递给 Promiseresult.tryFailure(future.cause());return;}// 获取解锁操作的返回值Boolean opStatus = future.getNow();// 如果返回值为空,表示当前线程未持有锁,抛出异常if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}// 解锁成功,取消刷新过期时间的定时任务if (opStatus) {cancelExpirationRenewal(null);}// 将成功结果传递给 Promiseresult.trySuccess(null);}});// 返回 Promise 对象return result;
}// 执行 Redis 脚本:使用 evalWriteAsync 方法执行 Lua 脚本,进行原子操作。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,//如果锁已经不存在, 发布锁释放的消息"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//通过hincrby递减1的方式,释放一次锁//若剩余次数大于0 ,则刷新过期时间"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +//否则证明锁已经释放,删除key并发布锁释放的消息"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}
流程图
存在
同一个线程
小于等于0
大于0
不是同一个线程
不存在
开始
exist判断锁是否存在
判断解锁线程和当前线程是否为同一个线程
递减解锁,获取剩余次数
剩余次数是否大于等于0
删除key,并发布锁释放信息
结束
解锁成功
解锁失败

redlock

使用流程

redlock的使用流程大致如下:

  1. 客户端获取到当前的时间戳。
  2. 客户端按顺序向部署的N个Redis实例执行加锁操作。在设定时间内,不管加锁成功还是失败,都会继续向下一个实例申请加锁操作
  3. 加锁成功的实例个数>= (N/2) + 1,并且加锁的总耗时要<锁设定的过期时间,Redlock就判断加锁成功,反之就是加锁失败。
  4. 加锁成功了,就继续往下操作,比如操作MySQL资源;若加锁失败,则会向所有节点发起锁释放的操作请求。
设计规则

Redlock的设计规则就是:

  • 客户端要在所有实例上申请加锁,只有保证大多数节点加锁成功了才判定为加锁成功
  • 加锁的总耗时要 < 锁设定的过期时间
  • 释放锁的时候,要向所有节点发起锁释放的请求,不管之前加锁是否成功为了确保只释放自己的锁,需要用前面提到的 Lua 脚本来代替直接使用 DEL 命令进行解锁操作
代码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 实现要点之允许加锁失败节点限制(N-(N/2+1))int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());// 实现要点之遍历所有节点通过EVAL命令执行lua加锁for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {// 对节点尝试加锁lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);} catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 抛出异常表示获取锁失败lockAcquired = false;}if (lockAcquired) {// 成功获取锁集合acquiredLocks.add(lock);} else {// 如果达到了允许加锁失败节点限制,那么break,即此次Redlock加锁失败if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}               }}return true;
}
RedLock 红锁 已经废弃

RedLock 红锁:RedLock 会对集群的每个节点进行加锁,如果大多数(N/2+1)加锁成功了,则认为获取锁成功

  • 这个过程中可能会因为网络问题,或节点超时的问题,影响加锁的性能,故而在最新的 Redisson 版本中中已经正式宣布废弃 RedLock

redisTemplate

为了解决多节点的上述问题,可以使用redisTemplate中的setIfAbsent方法

setIfAbsent方法是原子性的

  • 单个 Redis 实例:在单个 Redis 实例中,setIfAbsent 是原子操作,确保在键不存在时才会设置值。
  • Redis 集群:在 Redis 集群中,setIfAbsent 仍然是原子操作,但它只在同一个分片(slot)内有效。如果不同的节点(分片)之间存在竞争条件,可能会导致不一致的结果

可以在这个方法中,构造一个和可重用锁差不多的代码,及判断当前线程是否为加锁线程,去实现多节点先的分布式锁

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/80464.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件设计师-错题笔记-网络基础知识

1. 解析&#xff1a; 1.子网划分相关知识&#xff1a; 在IPv4地址中&#xff0c;/27表示子网掩码为255.255.255.224&#xff0c;它将一个C类网络&#xff08;默认子网掩码255.255.255.0&#xff09;进一步划分 对于子网掩码255.255.255.224&#xff0c;其对应的二进制为111…

Fine-Tuning Llama2 with LoRA

Fine-Tuning Llama2 with LoRA 1. What is LoRA?2. How does LoRA work?3. Applying LoRA to Llama2 models4. LoRA finetuning recipe in torchtune5. Trading off memory and model performance with LoRAModel ArgumentsReferences https://docs.pytorch.org/torchtune/ma…

python打卡day29

类的装饰器 知识点回顾 类的装饰器装饰器思想的进一步理解&#xff1a;外部修改、动态类方法的定义&#xff1a;内部定义和外部定义 回顾一下&#xff0c;函数的装饰器是 &#xff1a;接收一个函数&#xff0c;返回一个修改后的函数。类也有修饰器&#xff0c;类装饰器本质上确…

十一、STM32入门学习之FREERTOS移植

目录 一、FreeRTOS1、源码下载&#xff1a;2、解压源码 二、移植步骤一&#xff1a;在需要移植的项目中新建myFreeRTOS的文件夹&#xff0c;用于存放FREERTOS的相关源码步骤二&#xff1a;keil中包含相关文件夹和文件引用路径步骤三&#xff1a;修改FreeRTOSConfig.h文件的相关…

2025 年十大网络安全预测

随着我们逐步迈向 2026 年&#xff0c;网络安全领域正处于一个关键的转折点&#xff0c;技术创新与数字威胁以前所未有的复杂态势交织在一起。 地缘政治环境进一步加剧了这些网络安全挑战&#xff0c;国际犯罪组织利用先进的技术能力来追求战略目标。 人工智能在这一不断演变…

Mac 环境下 JDK 版本切换全指南

概要 在 macOS 上安装了多个 JDK 后&#xff0c;可以通过系统自带的 /usr/libexec/java_home 工具来查询并切换不同版本的 Java。只需在终端中执行 /usr/libexec/java_home -V 列出所有已安装的 JDK&#xff0c;然后将你想使用的版本路径赋值给环境变量 JAVA_HOME&#xff0c;…

中级网络工程师知识点6

1.堆叠方式可以共享使用交换机背板带宽&#xff1b;级联方式可以使用双绞线将交换机连接在一起 2.光功率计是专门测量光功率大小的仪器&#xff0c;在对光缆进行检测时&#xff0c;通过在光缆的发送端和接收端分别测量光功率&#xff0c;进而计算出光衰情况。 3.光时域反射计…

动态规划——乌龟棋

题目描述 解题思路 首先这是一个很明显的线性dp的题目&#xff0c;很容易发现规律 数据输入 我们用 h[ N ] 数组存储每一个格子的分数 用 cnt [ ]&#xff0c;数组表示每一中卡片的数目 1&#xff0c;状态表示 因为这里一个有4种跳跃方式可以选择 f[ i ][ a ][ b ][ c ][ d…

C#自定义控件-实现了一个支持平移、缩放、双击重置的图像显示控件

1. 控件概述 这是一个继承自 Control 的自定义控件&#xff0c;主要用于图像的显示和交互操作&#xff0c;具有以下核心功能&#xff1a; 图像显示与缩放&#xff08;支持鼠标滚轮缩放&#xff09;图像平移&#xff08;支持鼠标拖拽&#xff09;视图重置&#xff08;双击重置…

C++ map multimap 容器:赋值、排序、大小与删除操作

概述 map和multimap是C STL中的关联容器&#xff0c;它们存储的是键值对(key-value pairs)&#xff0c;并且会根据键(key)自动排序。两者的主要区别在于&#xff1a; map不允许重复的键multimap允许重复的键 本文将详细解析示例代码中涉及的map操作&#xff0c;包括赋值、排…

AI Agent开发第70课-彻底消除RAG知识库幻觉(4)-解决知识库问答时语料“总重复”问题

开篇 “解决知识库幻觉”系列还在继续,这是因为:如果只是个人玩玩,像自媒体那些说的什么2小时搭一个知识库+deepseek不要太香一类的RAG或者是基于知识库的应用肯定是没法用在企业级落地上的。 我们真的经历过或者正在经历的人都是知道的,怎么可能2小时就搭建完成一个知识…

【DAY22】 复习日

内容来自浙大疏锦行python打卡训练营 浙大疏锦行 仔细回顾一下之前21天的内容 作业&#xff1a; 自行学习参考如何使用kaggle平台&#xff0c;写下使用注意点&#xff0c;并对下述比赛提交代码 kaggle泰坦里克号人员生还预测

【Docker】Docker Compose方式搭建分布式协调服务(Zookeeper)集群

开发分布式应用时,往往需要高度可靠的分布式协调,Apache ZooKeeper 致力于开发和维护开源服务器&#xff0c;以实现高度可靠的分布式协调。具体内容见zookeeper官网。现代应用往往使用云原生技术进行搭建,如何用Docker搭建Zookeeper集群,这里介绍使用Docker Compose方式搭建分布…

若依框架Consul微服务版本

1、最近使用若依前后端分离框架改造为Consul微服务版本 在这里分享出来供大家参考 # Consul微服务配置参数已经放置/bin/Consul微服务配置目录 仓库地址&#xff1a; gitee&#xff1a;https://gitee.com/zlxls/Ruoyi-Consul-Cloud.git gitcode&#xff1a;https://gitcode.c…

BOM知识点

BOM&#xff08;Browser Object Model&#xff09;即浏览器对象模型&#xff0c;是用于访问和操作浏览器窗口的编程接口。以下是一些BOM的知识点总结&#xff1a; 核心对象 • window&#xff1a;BOM的核心对象&#xff0c;代表浏览器窗口。它也是全局对象&#xff0c;所有全…

什么是迁移学习(Transfer Learning)?

什么是迁移学习&#xff08;Transfer Learning&#xff09;&#xff1f; 一句话概括 迁移学习研究如何把一个源领域&#xff08;source domain&#xff09;/源任务&#xff08;source task&#xff09;中获得的知识迁移到目标领域&#xff08;target domain&#xff09;/目标任…

[创业之路-362]:企业战略管理案例分析-3-战略制定-华为使命、愿景、价值观的演变过程

一、华为使命、愿景、价值观的演变过程 1、创业初期&#xff08;1987 - 1994 年&#xff09;&#xff1a;生存导向&#xff0c;文化萌芽 使命愿景雏形&#xff1a;1994年华为提出“10年之后&#xff0c;世界通信行业三分天下&#xff0c;华为将占一份”的宏伟梦想&#xff0c…

Python黑魔法与底层原理揭秘:突破语言边界的深度探索

Python黑魔法与底层原理揭秘&#xff1a;突破语言边界的深度探索 开篇&#xff1a;超越表面的Python Python常被称为"胶水语言"&#xff0c;但其真正的威力在于对底层的高度可控性。本文将揭示那些鲜为人知的Python黑魔法&#xff0c;带你深入CPython实现层面&…

Es的text和keyword类型以及如何修改类型

昨天同事触发定时任务发现es相关服务报了一个序列化问题&#xff0c; 今天早上捕获异常将异常堆栈全部打出来看&#xff0c;才发现是聚合的字段不是keyword类型的问题。 到kibbna命令行执行也是一样的错误 使用 /_mapping查看索引的字段类型&#xff0c;才发现userUniqueid是te…

大语言模型 07 - 从0开始训练GPT 0.25B参数量 - MiniMind 实机训练 预训练 监督微调

写在前面 GPT&#xff08;Generative Pre-trained Transformer&#xff09;是目前最广泛应用的大语言模型架构之一&#xff0c;其强大的自然语言理解与生成能力背后&#xff0c;是一个庞大而精细的训练流程。本文将从宏观到微观&#xff0c;系统讲解GPT的训练过程&#xff0c;…