目录
概述
Redis分布式锁的实现方式
1. 基于SETNX命令(String类型)
2. 使用SET命令的NX和EX参数(推荐方式)
3. 基于Lua脚本实现复杂逻辑
4. RedLock算法(多节点Redis实现)
Redisson的分布式锁
Redisson的锁类型
Redisson可重入锁的实现
Redis其他实现分布式锁的方式
1. 基于Sorted Set实现公平锁
2. 基于List实现分布式锁
总结
实际应用场景
场景一:库存超卖控制
业务场景
分布式锁解决方案
关键实现细节
实际开发注意事项
场景二:订单防重复提交
业务场景
分布式锁解决方案
关键实现细节
实际开发注意事项
场景三:促销活动限流与防超卖
业务场景
分布式锁解决方案
关键实现细节
实际开发注意事项
分布式锁在电商中的最佳实践
概述
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。下面详细介绍Redis分布式锁的实现方式以及Redisson的锁类型。
Redis分布式锁的实现方式
1. 基于SETNX命令(String类型)
最基本的实现方式是使用Redis的SETNX
(SET if Not eXists)命令:
# 使用Python和Redis-py库实现简单分布式锁
import redis
import timedef acquire_lock(redis_client, lock_name, acquire_timeout=10, lock_timeout=10):end_time = time.time() + acquire_timeoutlock_value = str(time.time() + lock_timeout) # 锁的过期时间while time.time() < end_time:# 使用SETNX尝试获取锁if redis_client.setnx(lock_name, lock_value):# 设置锁的过期时间,防止死锁redis_client.expire(lock_name, lock_timeout)return True# 检查锁是否过期current_value = redis_client.get(lock_name)if current_value and float(current_value) < time.time():# 锁已过期,尝试竞争并获取锁old_value = redis_client.getset(lock_name, lock_value)if old_value == current_value:redis_client.expire(lock_name, lock_timeout)return Truetime.sleep(0.1) # 短暂休眠避免频繁重试return Falsedef release_lock(redis_client, lock_name):redis_client.delete(lock_name)
这种方式的问题是:
- 锁的释放缺乏原子性
- 不支持可重入
- 没有锁失效机制(需手动设置过期时间)
2. 使用SET命令的NX和EX参数(推荐方式)
Redis 2.6.12版本后,SET
命令支持原子化操作:
# 使用SET命令的NX和EX参数实现分布式锁
def acquire_lock(redis_client, lock_name, acquire_timeout=10, lock_timeout=10):end_time = time.time() + acquire_timeoutwhile time.time() < end_time:# 使用SET命令原子化操作:NX表示只有键不存在时才设置,EX设置过期时间result = redis_client.set(lock_name, "locked", nx=True, ex=lock_timeout)if result:return Truetime.sleep(0.1)return False
这种方式解决了原子性问题,但仍不支持可重入。
3. 基于Lua脚本实现复杂逻辑
使用Lua脚本可以实现更复杂的原子操作,例如锁的释放:
# 使用Lua脚本确保锁释放的原子性
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("del", KEYS[1])
elsereturn 0
end
"""def release_lock(redis_client, lock_name, lock_value):return redis_client.eval(RELEASE_SCRIPT, 1, lock_name, lock_value)
4. RedLock算法(多节点Redis实现)
为了提高可靠性,Redis作者提出了RedLock算法,基于多个Redis节点:
# RedLock算法的简化实现
def redlock_acquire(redis_clients, lock_name, acquire_timeout=10, lock_timeout=10):start_time = time.time()lock_value = str(uuid.uuid4()) # 唯一标识锁acquired_locks = 0for client in redis_clients:if time.time() > start_time + acquire_timeout:breakif client.set(lock_name, lock_value, nx=True, px=lock_timeout):acquired_locks += 1# 需要获得多数节点的锁才算成功if acquired_locks >= (len(redis_clients) // 2 + 1):return lock_value# 获取锁失败,释放已获得的锁for client in redis_clients:client.eval(RELEASE_SCRIPT, 1, lock_name, lock_value)return False
Redisson的分布式锁
Redisson是一个基于Redis的Java驻内存数据网格(In-Memory Data Grid),它提供了分布式锁的实现。
Redisson的锁类型
Redisson的分布式锁是可重入锁,但从锁的性质上来说,它属于悲观锁。因为:
- 它在获取锁后会阻塞其他线程的访问
- 遵循"先获取锁再操作"的悲观策略
Redisson可重入锁的实现
Redisson的可重入锁通过Hash结构实现:
// Redisson可重入锁的内部实现(简化版)
public class RedissonLock {// 锁的名称private String lockName;// 当前线程IDprivate String threadId;// 重入次数private int reentrantCount = 0;// 尝试获取锁public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();// 使用Hash结构存储锁信息// HSET lockName threadId 1Long currentLockCount = redisClient.hset(lockName, threadId, 1);if (currentLockCount == 0) {// 锁已存在,检查是否是当前线程持有String currentValueStr = redisClient.hget(lockName, threadId);if (currentValueStr != null) {int count = Integer.parseInt(currentValueStr);// 锁重入redisClient.hset(lockName, threadId, count + 1);reentrantCount = count + 1;return true;}return false;}// 设置锁的过期时间redisClient.expire(lockName, leaseTime, unit);reentrantCount = 1;return true;}// 释放锁public void unlock() {String threadId = Thread.currentThread().getId();// 获取当前锁的重入次数String currentValueStr = redisClient.hget(lockName, threadId);if (currentValueStr == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ threadId + " thread-id: " + Thread.currentThread().getId());}int count = Integer.parseInt(currentValueStr);if (count == 1) {// 完全释放锁redisClient.hdel(lockName, threadId);return;}// 减少重入次数redisClient.hset(lockName, threadId, count - 1);reentrantCount = count - 1;}
}
Redis其他实现分布式锁的方式
1. 基于Sorted Set实现公平锁
# 使用Sorted Set实现公平锁
def acquire_fair_lock(redis_client, lock_name, client_id, acquire_timeout=10):end_time = time.time() + acquire_timeoutwhile time.time() < end_time:# 使用当前时间戳作为分数timestamp = time.time()# ZADD命令添加成员,NX确保只有不存在时才添加result = redis_client.zadd(lock_name, {client_id: timestamp}, nx=True)if result:# 获取锁成功return True# 检查自己是否是队列中的第一个first_client = redis_client.zrange(lock_name, 0, 0)if first_client and first_client[0] == client_id:return Truetime.sleep(0.01)return False
2. 基于List实现分布式锁
# 使用List实现分布式锁
def acquire_lock_with_list(redis_client, lock_name, acquire_timeout=10):end_time = time.time() + acquire_timeoutwhile time.time() < end_time:# LPUSH返回列表的长度result = redis_client.lpush(lock_name, "locked")if result == 1:# 设置过期时间redis_client.expire(lock_name, 10)return Truetime.sleep(0.1)return False
总结
Redis实现分布式锁的核心思想是利用其原子操作和单线程特性。不同的实现方式适用于不同的场景:
- 简单场景:使用SET命令的NX和EX参数
- 复杂场景:使用Redisson等成熟框架
- 高可靠性场景:使用RedLock算法
Redisson的分布式锁属于悲观锁范畴,同时支持可重入特性,是企业级应用中常用的分布式锁实现方案。在电商平台中,分布式锁是保障数据一致性和业务正确性的关键技术。以下是三个典型场景及实际开发中的使用方法:
实际应用场景
场景一:库存超卖控制
业务场景
用户下单时需要扣减库存,若多个用户同时购买同一商品,可能导致库存超卖(如库存1件,却被10个用户同时下单成功)。
分布式锁解决方案
# 使用Redis分布式锁防止库存超卖
def deduct_stock(product_id, quantity):lock_name = f"stock_lock:{product_id}"with redisson_client.getLock(lock_name): # 获取分布式锁# 查询当前库存current_stock = redis_client.get(f"stock:{product_id}")if current_stock >= quantity:# 扣减库存redis_client.decr(f"stock:{product_id}", quantity)return Truereturn False
关键实现细节
- 锁粒度:使用商品ID作为锁名,确保同一商品的库存操作互斥
- 原子性:库存查询和扣减操作必须在锁内完成
- 超时设置:设置合理的锁超时时间(如30秒),防止死锁
实际开发注意事项
- 使用Redisson的可重入锁,避免同一线程重复获取锁时阻塞
- 结合Lua脚本进一步优化库存扣减逻辑,确保原子性
- 库存预扣减机制:先扣减缓存库存,异步同步到数据库
场景二:订单防重复提交
业务场景
用户点击提交订单按钮后,由于网络延迟等原因可能重复提交,导致创建多个相同订单。
分布式锁解决方案
// 使用Redis分布式锁防止订单重复提交
public Response createOrder(OrderRequest request) {String orderToken = request.getOrderToken(); // 前端生成的唯一标识RLock lock = redissonClient.getLock("order:" + orderToken);try {// 尝试获取锁,1秒后自动释放boolean isLocked = lock.tryLock(100, 1000, TimeUnit.MILLISECONDS);if (!isLocked) {return Response.failure("请勿重复提交订单");}// 处理订单创建逻辑Order order = orderService.createOrder(request);return Response.success(order);} catch (InterruptedException e) {Thread.currentThread().interrupt();return Response.failure("系统繁忙,请稍后再试");} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}
}
关键实现细节
- 唯一标识:前端生成唯一订单Token(如UUID),随请求一起传递
- 短超时:设置较短的锁超时时间(如1秒),防止用户正常操作被长时间阻塞
- 幂等性:结合数据库唯一索引,确保订单号唯一性
实际开发注意事项
- 前端防抖处理:按钮点击后禁用,防止用户手动重复点击
- 锁的粒度控制:按订单Token加锁,而非全局锁
- 异常处理:确保锁在异常情况下仍能释放
场景三:促销活动限流与防超卖
业务场景
限时秒杀、优惠券领取等促销活动中,大量并发请求可能导致系统崩溃或资源超发。
分布式锁解决方案
# 使用Redis分布式锁和计数器实现秒杀限流
def seckill(product_id, user_id):lock_name = f"seckill_lock:{product_id}"with redisson_client.getLock(lock_name):# 检查活动是否结束if not is_activity_active(product_id):return "活动已结束"# 检查用户是否已参与if redis_client.sismember(f"seckill_users:{product_id}", user_id):return "每个用户限参与一次"# 检查库存stock = redis_client.get(f"seckill_stock:{product_id}")if stock <= 0:return "已售罄"# 扣减库存redis_client.decr(f"seckill_stock:{product_id}")# 记录用户参与redis_client.sadd(f"seckill_users:{product_id}", user_id)return "秒杀成功"
关键实现细节
- 双重检查:先检查库存再获取锁,减少锁竞争
- 用户限制:使用Set记录已参与用户,防止重复参与
- 库存预热:活动开始前将库存加载到Redis
- 异步处理:订单创建等耗时操作放入MQ异步处理
实际开发注意事项
- 热点商品处理:对热门商品采用多节点RedLock或分段锁
- 熔断机制:当并发过高时自动拒绝请求,保护系统
- 降级策略:库存为0时直接返回,不再加锁
分布式锁在电商中的最佳实践
- 合理设置锁超时时间:根据业务处理时间设置合理的超时值,避免死锁
- 锁粒度控制:尽量使用细粒度锁(如按商品ID、订单ID),避免全局锁
- 异常处理:使用try-finally确保锁释放,或依赖Redisson的看门狗机制
- 性能优化:
-
- 减少锁内操作时间
- 使用读写锁分离读操作
- 采用分段锁处理热点数据
- 监控与报警:监控锁的持有时间、等待队列长度等指标,及时发现异常
分布式锁是电商系统的重要组成部分,但需结合业务场景选择合适的实现方式,并注意性能与可靠性的平衡。