文章目录
- 分布式锁
- 分布式锁的实现
- zookeeper 分布式锁原理
- Curator 实现分布式锁API
- 1. InterProcessMutex(分布式可重入互斥锁)
- 2. InterProcessSemaphoreMutex(分布式非可重入互斥锁)
- 3. InterProcessReadWriteLock(分布式读写锁)
- 4. InterProcessSemaphoreV2(分布式信号量)
- 5. MultiSharedLock(多共享锁)
- 方案对比与推荐
- 模拟12306售票案例
- 案例背景
- 实现步骤
- 1. 依赖引入
- 2. ZooKeeper 连接配置
- 3. 票务服务(核心逻辑)
- 4. 模拟并发抢票
- 关键点解析
- 运行结果示例
- 扩展优化
分布式锁
- 在进行单机应用开发时,涉及女并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都在同一个JVM之下,没有任何问题。
- 但当我们的应用是分布式集群工作的情况下,属于多个JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
- 那么就需要一种更高级的锁机制来处理这种跨机器的进程之间的数据同步问题,这就是分布式锁。
分布式锁的实现
- 基于缓存实现分布式锁:redis,memcache
- zookeeper实现分布式锁:Curator
- 数据库层面实现分布式锁: 乐观锁,悲观锁
zookeeper 分布式锁原理
- 核心思想: 当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
- 客户端时,在lock节点下创建 临时顺序 节点。
- 然后获取lock下面的所有子节点,客户端获取到所有的子节点后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
3 如果发现自己创建的节点并非lock所有节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听,监听删除事件。
4 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
Curator 实现分布式锁API
Curator 提供了五种分布式锁方案,每种方案适用于不同的业务场景,以下是具体介绍:
1. InterProcessMutex(分布式可重入互斥锁)
- 特点:
- 可重入:同一线程可多次获取锁,避免死锁。
- 互斥性:基于 ZooKeeper 临时顺序节点实现全局互斥,确保任何时刻只有一个客户端持有锁。
- 自动释放:通过临时节点的特性,客户端宕机时锁自动释放。
- 适用场景:
- 需要线程安全的分布式资源访问(如订单处理、库存扣减)。
- 示例代码:
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock"); lock.acquire(); // 获取锁 try {// 执行业务逻辑 } finally {lock.release(); // 释放锁 }
2. InterProcessSemaphoreMutex(分布式非可重入互斥锁)
- 特点:
- 非可重入:同一线程重复获取锁会阻塞,避免误用导致的死锁。
- 轻量级:相比可重入锁,实现更简单,性能略高。
- 适用场景:
- 需要严格互斥且无需重入的场景(如分布式任务调度)。
- 示例代码:
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, "/locks/nonReentrantLock"); lock.acquire(); // 获取锁 try {// 执行业务逻辑 } finally {lock.release(); // 释放锁 }
3. InterProcessReadWriteLock(分布式读写锁)
- 特点:
- 读写分离:允许多个读锁并发,写锁独占。
- 公平性:基于 ZooKeeper 节点顺序实现公平锁,避免写锁饥饿。
- 适用场景:
- 读多写少的场景(如分布式缓存更新、配置中心)。
- 示例代码:
InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks/rwLock"); InterProcessMutex readLock = rwLock.readLock(); // 读锁 InterProcessMutex writeLock = rwLock.writeLock(); // 写锁 readLock.acquire(); // 获取读锁 try {// 执行读操作 } finally {readLock.release(); // 释放读锁 }
4. InterProcessSemaphoreV2(分布式信号量)
- 特点:
- 资源限制:控制同时访问资源的客户端数量(如许可证数量)。
- 动态调整:可通过 ZooKeeper 节点动态修改信号量值。
- 适用场景:
- 限流控制(如 API 调用限流、连接池管理)。
- 示例代码:
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/locks/semaphore", 3); // 允许3个客户端同时访问 Lease lease = semaphore.acquire(); // 获取信号量 try {// 执行业务逻辑 } finally {semaphore.returnLease(lease); // 释放信号量 }
5. MultiSharedLock(多共享锁)
- 特点:
- 组合锁:允许客户端同时获取多个锁,实现跨资源的原子操作。
- 避免死锁:通过全局顺序获取锁,防止死锁。
- 适用场景:
- 需要跨多个资源协调的场景(如分布式事务)。
- 示例代码:
List<String> lockPaths = Arrays.asList("/locks/resource1", "/locks/resource2"); MultiSharedLock multiLock = new MultiSharedLock(client, lockPaths); multiLock.acquire(); // 获取所有锁 try {// 执行跨资源操作 } finally {multiLock.release(); // 释放所有锁 }
方案对比与推荐
锁类型 | 可重入 | 并发性 | 适用场景 |
---|---|---|---|
InterProcessMutex | 是 | 低 | 需要线程安全的资源访问 |
InterProcessSemaphoreMutex | 否 | 低 | 严格互斥场景 |
InterProcessReadWriteLock | 读是 | 读高/写低 | 读多写少场景 |
InterProcessSemaphoreV2 | 否 | 高(有限制) | 限流控制 |
MultiSharedLock | 是 | 低 | 跨资源原子操作 |
- 推荐选择:
- 默认场景:优先使用
InterProcessMutex
,兼顾安全性和灵活性。 - 高性能读场景:选择
InterProcessReadWriteLock
的读锁。 - 限流场景:使用
InterProcessSemaphoreV2
控制并发量。 - 复杂协调场景:
MultiSharedLock
实现跨资源同步。
- 默认场景:优先使用
模拟12306售票案例
以下是使用 Curator 的 InterProcessMutex
模拟 12306 售票系统 的分布式锁案例,解决高并发下超卖问题:
案例背景
12306 售票系统需要保证:
- 同一车次余票的原子性操作:多个用户同时购票时,不能出现超卖(如余票为 1 时,两个用户同时抢到票)。
- 分布式环境下的线程安全:多个售票服务节点(如不同服务器)同时处理请求时,需通过分布式锁协调。
实现步骤
1. 依赖引入
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency>
2. ZooKeeper 连接配置
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;public class ZkClient {private static final String ZK_ADDRESS = "localhost:2181";private static CuratorFramework client;static {client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new ExponentialBackoffRetry(1000, 3));client.start();}public static CuratorFramework getClient() {return client;}
}
3. 票务服务(核心逻辑)
import org.apache.curator.framework.recipes.locks.InterProcessMutex;public class TicketService {private final InterProcessMutex lock;private int remainingTickets; // 剩余票数(模拟数据库)public TicketService(String lockPath, int initialTickets) {this.lock = new InterProcessMutex(ZkClient.getClient(), lockPath);this.remainingTickets = initialTickets;}// 购票方法public boolean buyTicket(String userId) {try {// 1. 获取分布式锁(阻塞式)if (lock.acquire(10, TimeUnit.SECONDS)) { // 超时时间10秒try {// 2. 检查余票(双重检查,避免锁内耗时)if (remainingTickets <= 0) {System.out.println("用户 " + userId + " 购票失败:票已售罄");return false;}// 3. 模拟业务逻辑(如扣减库存、生成订单)Thread.sleep(50); // 模拟网络延迟或耗时操作// 4. 扣减票数remainingTickets--;System.out.println("用户 " + userId + " 购票成功!剩余票数:" + remainingTickets);return true;} finally {// 5. 释放锁lock.release();}}} catch (Exception e) {e.printStackTrace();}return false;}
}
4. 模拟并发抢票
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class TicketSimulation {public static void main(String[] args) throws InterruptedException {// 初始化票务服务(锁路径为 /tickets/train123,初始票数10)TicketService ticketService = new TicketService("/tickets/train123", 10);// 模拟5个用户并发抢票ExecutorService executor = Executors.newFixedThreadPool(2);for (int i = 1; i <= 5; i++) {final String userId = "User-" + i;executor.execute(() -> ticketService.buyTicket(userId));}executor.shutdown();executor.awaitTermination(1, TimeUnit.MINUTES);ZkClient.getClient().close();}
}
关键点解析
-
分布式锁的作用:
- 确保同一时间只有一个线程能操作余票,避免超卖。
- 使用 ZooKeeper 临时顺序节点实现,客户端宕机时锁自动释放。
-
锁的粒度:
- 锁路径
/tickets/train123
对应具体车次,不同车次互不影响。
- 锁路径
-
超时控制:
lock.acquire(10, TimeUnit.SECONDS)
防止死锁(如客户端崩溃未释放锁)。
-
双重检查:
- 获取锁后再次检查余票,避免锁内耗时导致其他线程重复扣减。
-
性能优化:
- 锁的持有时间尽可能短(仅包裹关键代码段)。
运行结果示例
用户 User-1 购票成功!剩余票数:9
用户 User-2 购票成功!剩余票数:8
...
用户 User-3 购票成功!剩余票数:0
用户 User-2 购票失败:票已售罄
...
扩展优化
-
数据库集成:
- 实际场景中,余票应存储在数据库,通过锁保证分布式事务(如扣减库存和生成订单的原子性)。
-
锁的公平性:
- Curator 的
InterProcessMutex
默认公平锁,按请求顺序获取锁。
- Curator 的
-
Redisson 替代方案:
- 如果使用 Redis,可用 Redisson 的
RLock
实现类似功能。
- 如果使用 Redis,可用 Redisson 的
-
锁重试策略:
- 可通过
RetryNTimes
或RetryUntilElapsed
自定义重试逻辑。
- 可通过
通过 InterProcessMutex
,12306 售票系统能安全处理高并发请求,确保数据一致性。