多线程生产者消费者模型实战案例
- 前言
- 业务场景
- 术前准备
- 无锁
- 无事务
- 有事务
- synchronized
- 事务在锁外
- 事务在锁内
- 数据库行锁
- 什么是数据库行锁
- 有事务
- 没有事务
- 乐观锁
- ReentrantLock
- 分布式锁
前言
曾经一直有一个疑惑,就是关于多线程生产者消费者模型的学习过程中,老师给出的案例例如卖票这种场景,都是在代码中定义一个票数这么一个变量,然后让多个线程去消费这个变量。
public class Task implements Runnable{private int ticket = 1000;//由于三个窗口都需要卖票,所以设为全局变量private Lock lock = new ReentrantLock();@Overridepublic void run() {while(ticket > 0){lock.lock();if(ticket > 0){System.out.println(Thread.currentThread().getName() + "正在销售第" + ticket + "张票");ticket--;}if(ticket <= 0){System.out.println(Thread.currentThread().getName() + "票已售完");}lock.unlock();}}
}
然后在new 多个Thread去分别调用这个方法去卖票,以此保障线程安全问题。基于此于是我就产生了一个疑惑,在实际的业务场景中应该是要和数据库操作相关的,那么如果把数据库操作加上,会是什么效果?今天就来测试一下。
业务场景
我们假设这样一个场景,有一个银行账户A,然后大家都可以往这个账户里存钱或者取钱,存取的时候添加记录,并且记录是要连续的。类似下图
术前准备
- 准备两张数据库表,一张银行账户表,其中balance字段就是用来存储当前余额,另一张存取记录表,记录每次存入或支出的金额。可以看到在
t_bank_account
中有个字段叫version 这个字段是我们使用mybatis plus乐观锁的字段。
CREATE TABLE `t_bank_account` (`id` int NOT NULL,`user_id` int NOT NULL COMMENT '用户id',`account` varchar(25) COLLATE utf8mb4_general_ci NOT NULL COMMENT '账号',`bank` varchar(50) COLLATE utf8mb4_general_ci NOT NULL COMMENT '银行',`card_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '银行卡号',`card_type` tinyint DEFAULT NULL COMMENT '卡片类型',`balance` decimal(10,2) DEFAULT NULL COMMENT '余额',`opening_bank` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '开户行',`create_time` datetime DEFAULT NULL,`create_by` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,`update_time` datetime DEFAULT NULL,`update_by` datetime DEFAULT NULL,`version` int DEFAULT NULL,`deleted` tinyint NOT NULL DEFAULT '0',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='银行账户表';CREATE TABLE `t_bank_operation` (`id` int NOT NULL AUTO_INCREMENT,`account` varchar(50) COLLATE utf8mb4_general_ci NOT NULL COMMENT '银行账号',`money` decimal(10,2) NOT NULL COMMENT '发生金额',`flow_type` tinyint NOT NULL COMMENT '资金流向(0转入1转出)',`pre_money` decimal(10,2) NOT NULL COMMENT '发生前金额',`after_money` decimal(10,2) NOT NULL COMMENT '发生后金额',`operator` varchar(20) COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作人',`operation_time` datetime NOT NULL COMMENT '操作时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=939 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
- 初始化一条数据,给该账户设置初始金额为1000元。
INSERT INTO `practice`.`t_bank_account` (`id`, `user_id`, `account`, `bank`, `card_name`, `card_type`, `balance`, `opening_bank`, `create_time`, `create_by`, `update_time`, `update_by`, `version`, `deleted`) VALUES (335478786, 700001, '6222359596616068885', '工商银行', '工商银行贷记卡金卡', 2, 1000.00, '中国工商银行(银川石油城支行)', NULL, '', NULL, NULL, 0, 0);
- 银行账户实体类
@Data
@TableName("t_bank_account")
@ApiModel(value = "BankAccount对象", description = "银行账户表")
public class BankAccount implements Serializable {private static final long serialVersionUID = 1L;private Integer id;/*** 用户id*/@ApiModelProperty("用户id")private Integer userId;/*** 账号*/@ApiModelProperty("账号")private String account;/*** 银行*/@ApiModelProperty("银行")private String bank;/*** 银行卡号*/@ApiModelProperty("卡片名称")private String cardName;/*** 卡片类型*/@ApiModelProperty("卡片类型")private BankCardType cardType;/*** 余额*/@ApiModelProperty("余额")private BigDecimal balance;/*** 开户行*/@ApiModelProperty("开户行")private String openingBank;private LocalDateTime createTime;private String createBy;private LocalDateTime updateTime;private LocalDateTime updateBy;@Versionprivate Integer version;private Integer deleted;
- 操作记录实体类
@Data
@TableName("t_bank_operation")
@ApiModel(value = "BankOperation对象", description = "")
public class BankOperation implements Serializable {private static final long serialVersionUID = 1L;/*** 银行账号*/@TableId(value = "id",type = IdType.AUTO)private Integer id;@ApiModelProperty("银行账号")private String account;/*** 发生金额*/@ApiModelProperty("发生金额")private BigDecimal money;/*** 资金流向(0转入1转出)*/@ApiModelProperty("资金流向(0转入1转出)")private BankFlowType flowType;/*** 发生前金额*/@ApiModelProperty("发生前金额")private BigDecimal preMoney;/*** 发生后金额*/@ApiModelProperty("发生后金额")private BigDecimal afterMoney;/*** 操作人*/@ApiModelProperty("操作人")private String operator;/*** 操作时间*/@ApiModelProperty("操作时间")private LocalDateTime operationTime;}
- service
public interface IBankAccountService extends IService<BankAccount> {//普通查询BankAccount getByAccountNo(String accountNo);//FOR UPDATE 查询BankAccount getByAccountNoForUpdate(String accountNo);//普通加钱void addMoney(String account, double money);//普通减钱void subMoney(String account, double money);//事务加钱void transactionAddMoney(String account, double money);//事务减钱void transactionSubMoney(String account, double money);//synchronized加锁加钱void syncAddMoney(String account, double money);//synchronized加锁减钱void syncSubMoney(String account, double money);//数据库行锁加钱void forUpdateAddMoney(String account, double money);//数据库行锁减钱void forUpdateSubMoney(String account, double money);//数据库行锁没有事务加钱void noTransactionForUpdateAddMoney(String account, double money);//数据库行锁没有事务减钱void noTransactionForUpdateSubMoney(String account, double money);//乐观锁加钱void optimisticLockAddMoney(String account, double money);//乐观锁减钱void optimisticLockSubMoney(String account, double money);//ReentrantLock 加钱void reentrantLockAddMoney(String account, double money);//ReentrantLock 减钱void reentrantLockSubMoney(String account, double money);
}
- serviceImpl
/*** <p>* 银行账户表 服务实现类* </p>** @author baomidou* @since 2025-07-01*/
@Service
public class BankAccountServiceImpl extends ServiceImpl<BankAccountMapper, BankAccount> implements IBankAccountService {@Autowiredprivate IBankOperationService bankOperationService;private final Object Lock = new Object();private final ReentrantLock reentrantLock = new ReentrantLock();@Overridepublic void addMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Overridepublic void subMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}@Override@Transactionalpublic void transactionAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void transactionSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}@Overridepublic BankAccount getByAccountNo(String accountNo) {return getBaseMapper().selectOne(new LambdaQueryWrapper<BankAccount>().eq(BankAccount::getAccount,accountNo));}@Overridepublic void syncAddMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.inflow);}}@Overridepublic void syncSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}@Overridepublic void syncSleepAddMoney(String account, double money) {synchronized (Lock){try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}operation(account,money,BankFlowType.inflow);}}@Overridepublic void syncSleepSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}@Transactionalpublic void operation(String account, double money, BankFlowType bankFlowType) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();if (BankFlowType.inflow.getCode() == bankFlowType.getCode()){bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));}else{bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));}int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程"+(BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "入" : "出")+"账:"+money+ " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setFlowType(bankFlowType);operation.setAfterMoney(bankAccount.getBalance());operation.setOperator((BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "生产者" : "消费者"));operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}}public void operationForUpdate(String account, double money, BankFlowType bankFlowType) {BankAccount bankAccount = getByAccountNoForUpdate(account);BigDecimal preBalance = bankAccount.getBalance();if (BankFlowType.inflow.getCode() == bankFlowType.getCode()){bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));}else{bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));}int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程"+(BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "入" : "出")+"账:"+money+ " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setFlowType(bankFlowType);operation.setAfterMoney(bankAccount.getBalance());operation.setOperator((BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "生产者" : "消费者"));operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}}@Overridepublic BankAccount getByAccountNoForUpdate(String accountNo) {return getBaseMapper().selectOne(new LambdaQueryWrapper<BankAccount>().eq(BankAccount::getAccount,accountNo).last("FOR UPDATE"));}@Override@Transactionalpublic void forUpdateAddMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void forUpdateSubMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.outflow);}@Overridepublic void noTransactionForUpdateAddMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.inflow);}@Overridepublic void noTransactionForUpdateSubMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.outflow);}//使用乐观锁@Overridepublic void optimisticLockAddMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程入账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.inflow);operation.setOperator("生产者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{//乐观锁冲突更新失败重试System.out.println("乐观锁冲突 生产失败 重试");optimisticLockAddMoney(account,money);}}@Overridepublic void optimisticLockSubMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程出账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.outflow);operation.setOperator("消费者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{System.out.println("乐观锁冲突 消费失败 重试");optimisticLockSubMoney(account,money);}}@Overridepublic void reentrantLockAddMoney(String account, double money) {reentrantLock.lock();try {operation(account,money,BankFlowType.inflow);}finally {reentrantLock.unlock();}}@Overridepublic void reentrantLockSubMoney(String account, double money) {reentrantLock.lock();try {operation(account,money,BankFlowType.outflow);}finally {reentrantLock.unlock();}}
}
- 单元测试类
@SpringBootTest
public class BankTest {@Autowiredprivate IBankAccountService bankAccountService;@Testpublic void test() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.addMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.subMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join(); // 等待thread完成thread2.join(); // 等待thread2完成}@Testpublic void transactionTest() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.transactionAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.transactionSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join(); // 等待thread完成thread2.join(); // 等待thread2完成}@Testpublic void syncTest() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 50; i++) {System.out.println("存入100元");bankAccountService.syncAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 50; i++) {System.out.println("消费100元");bankAccountService.syncSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join(); // 等待thread完成thread2.join(); // 等待thread2完成}@Testpublic void testForUpdate() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.forUpdateAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.forUpdateSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join(); // 等待thread完成thread2.join(); // 等待thread2完成System.out.println("执行结束");Thread.sleep(10000);System.out.println("睡眠结束");}@Testpublic void optimisticLockTest() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.optimisticLockAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.optimisticLockSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join(); // 等待thread完成thread2.join(); // 等待thread2完成System.out.println("执行结束");Thread.sleep(10000);System.out.println("睡眠结束");}}
针对上面的代码我们分为以下几种情况来测试
- 不加锁的情况
- 不加锁无事务
- 不加锁有事务
- synchronized
- 事务在锁外
- 事务在锁内
- 数据库行锁FOR UPDATE
- 有事务
- 没有事务
- 乐观锁
- 分布式锁
无锁
无事务
首先我们在无锁并且没有事务的情况测试一下,将实体类中的version字段注释掉,取消使用乐观锁,调用addMoney subMoney
方法在两个线程中分别加钱减钱
-
结果:
-
总结:
在没有锁的情况一定会发生错误,原因也很好介绍,A线程查询到1000元增加100元,B线程也拿到1000元减少100元,B线程的结果覆盖A线程 = 900元,多次发生这种情况,最终价格变得无法预测。
有事务
我们将数据恢复成一千元的初始资金,再清空,同时在方法上新增事务注解,再次使用两个线程分别加钱和减钱。
@Override@Transactionalpublic void transactionAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void transactionSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}
- 结果:仍然错误,理由也很好解释,不论什么样的隔离级别,A线程读到1000元,B线程同样读到1000元,A线程修改后变成1100 B线程修改后变成900,这与事务的隔离级别没有关系,因为查询是共享锁,不论哪个线程都可以读取。
synchronized
事务在锁外
@Override@Transactionalpublic void syncAddMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.inflow);}}@Override@Transactionalpublic void syncSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}
事务在锁内
@Overridepublic void syncAddMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.inflow);}}@Overridepublic void syncSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}@Transactionalpublic void operation(String account, double money, BankFlowType bankFlowType) {//...}
- 结论:
- 事务在锁外是无效的
- 事务在锁内是有效的
- 总结:
- synchronize锁的对象及service单例情况 首先我们这里的
Lock
锁是声明在service实例中的一个new出来的Object对象- 如果这个Lock是static的则锁由类持有,无论哪个线程都要先去获取类的锁。
- 如果非static,则锁由当前对象实例持有,无论哪个线程都要先去获取对象实例的锁。
- 而spring中bean的实例是单例的,因此也就是无论这个Lock是static,或则非static,亦或者
synchronized(this)
效果都一样。因为这些线程都用的同一个service对象实例来调用方法,因此在单元测试的两个线程中都要竞争这把锁,拿到锁才能执行。 - 如果spring中bean是多例的(@Scope(“prototype”)),则非static的Lock和synchronized(this)都将完全失效,每个请求会创建新实例,Lock 是实例级别的,锁完全失效(不同线程用不同的锁)。绝对不能这样做!。
- 事务在锁内与锁外的情况
- 事务在锁内,正常提交,流程:线程A拿到锁,线程B等待,线程A开启事务、执行结束、提交事务、释放锁,线程B拿到锁,读到A提交后的数据,然后开启事务、执行并提交事务;依次执行效果正常。
- 事务在锁外,结果异常,流程:线程A开启事务,线程B开启事务,线程A抢到锁,线程B等待,线程A执行结束,释放锁,此时事务并未提交,因为 @Transactional 是由 Spring AOP 代理控制的,会在方法退出时提交事务,而由于事务的隔离级别,此时B线程拿到锁之后读到的数据并不是A线程修改后的数据,这就导致A将拿到的balance的值为1000的数据+100后=1100,B也将balance的值为1000的数据-100后=900,正确的结果应该是B要拿到的balance的值为1100才对。最终这样重复几次之后结果将无法预测。
- synchronize锁的对象及service单例情况 首先我们这里的
数据库行锁
什么是数据库行锁
关于数据库行锁我们写一个小例子来看看效果
-
第一步我们执行
START TRANSACTION 与 select * from t_bank_account where id =335478786 for update
两条语句,使用FOR UPDATE
在开启的事务中查询,此时我们不执行update 与 commit
我们将这个查询称之为A -
第二步我们新开一个查询B,执行
select * from t_bank_account where id =335478786
此时的结果是正常的,获取到的balance为1000元。原因如下 -
第三步我们在A查询中再执行
update t_bank_account set balance = 12 WHERE id=335478786;
,仍然是不提交不执行commit,此时再执行一次B查询,可以看到B的结果仍然是1000,这是因为受MySQL默认的事务隔离级别可重复读,无法读到未提交的数据,因此A查询的update语句结果B无法读取。 -
第四步我们再执行C查询
SELECT * FROM t_bank_account WHERE id = 335478786 for update ;
此时我们可以发现,相比于B查询,C查询多了for update
,也就是与A查询中的select语句相同,此时的结果是C查询卡主不动,不显示结果,原因就是A查询持有该行的行锁,而C查询也需要这一行的行锁,因此C查询需要在等待A查询释放锁后才能显示查询结果。同时FOR UPDATE语句所持有的锁会一直等到事务提交之后才会释放,因此以上就是通过FOR UPDATE
数据库行锁实现多线程操作账户没有并发问题的原因。 -
第五步我们新开一个查询D,执行
update t_bank_account set balance = 43 WHERE id=335478786;
此时我们发现D查询同样卡主不动,因为FOR UPDATE 的行锁会禁止其他事务修改该条数据。不论是update亦或者delete都无法对该行产生效果。 -
第六步,我们执行A查询的
commit
,然后会发现卡主的C查询和D查询都可以继续正常执行了。
到此我们对FOR UPDATE
有了一个基本的概念,对于FOR UPDATE 的意思我个人的理解就是我要拿着这个查询去修改,如果同样有人要干这个事儿你就先等着
,那么在这个过程中有事务和没有事务又有什么区别和效果呢?我们继续往下看
有事务
@Overridepublic BankAccount getByAccountNoForUpdate(String accountNo) {return getBaseMapper().selectOne(new LambdaQueryWrapper<BankAccount>().eq(BankAccount::getAccount,accountNo).last("FOR UPDATE"));}@Override@Transactionalpublic void forUpdateAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void forUpdateSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}public void operation(String account, double money, BankFlowType bankFlowType) {BankAccount bankAccount = getByAccountNoForUpdate(account);BigDecimal preBalance = bankAccount.getBalance();if (BankFlowType.inflow.getCode() == bankFlowType.getCode()){bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));}else{bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));}int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程"+(BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "入" : "出")+"账:"+money+ " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setFlowType(bankFlowType);operation.setAfterMoney(bankAccount.getBalance());operation.setOperator((BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "存储者" : "消费者"));operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}}
没有事务
@Overridepublic void forUpdateAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Overridepublic void forUpdateSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}public void operation(String account, double money, BankFlowType bankFlowType) {//...}
- 总结:
- 有事务:
- 线程A开启事务,线程B也开启事务,线程A执行FOR UPDAT查询
getByAccountNoForUpdate()
,同时给该行添加行锁,线程B也执行FOR UPDATE查询getByAccountNoForUpdate()
,发现需要等待A线程的事务释放行锁,B线程等待,A线程执行结束提交事务释放行锁(事务提交后(即事务结束时)才会释放
),此时B线程拿到锁,继续执行然后提交事务然后释放锁。
- 线程A开启事务,线程B也开启事务,线程A执行FOR UPDAT查询
- 无事务:
- 无事务情况下的FOR UPDATE 会锁定数据库行,但锁的持有时间仅限当前 SQL 语句的执行期间。如果没有事务,锁会在查询结束后立即释放,其他线程可以立刻修改数据,导致并发问题。在我们的示例中线程A FOR Update 查询并加行锁,查询结束后立即释放锁,此时B线程同样查询并加锁,然后立即释放锁,此时A线程还没有修改,因此导致A线程与B线程拿到的都是1000元余额,最终导致A线程修改为1100,B线程修改为900
- 有事务:
乐观锁
使用mybatis plus的乐观锁插件 version注解实现,在更新记录的时候返回的值大于零即表示成功int row = getBaseMapper().updateById(bankAccount);
否则重试即可,并且这里也可以不添加事务。关于乐观锁与悲观锁的概念与实现这里不多叙述。
@Overridepublic void optimisticLockAddMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程入账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.inflow);operation.setOperator("生产者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{//乐观锁冲突更新失败重试System.out.println("乐观锁冲突 生产失败 重试");optimisticLockAddMoney(account,money);}}@Overridepublic void optimisticLockSubMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程出账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.outflow);operation.setOperator("消费者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{System.out.println("乐观锁冲突 消费失败 重试");optimisticLockSubMoney(account,money);}}
ReentrantLock
与synchronized效果类似,不过ReentrantLock更灵活一些。
分布式锁
这里我们使用基于Redis的分布式锁,所谓的分布式锁就是同样的代码在集群环境中的控制,例如负载均衡策略将不同的请求转发到不同的节点处理加钱或者减钱业务,那么这个时候同样需要保障线程安全问题,如果我们使用synchronized或者ReentrantLock这些锁,只能保证在单节点的情况下不出现并发问题,集群环境并不能保证。
@Overridepublic void setNXAddMoney(String account, double money) {String key = "bank:account:"+account;boolean b = lock(key);if (b){operation(account,money,BankFlowType.inflow);deleteLock(key);}}@Overridepublic void setNXSubMoney(String account, double money) {String key = "bank:account:"+account;boolean b = lock(key);if (b){operation(account,money,BankFlowType.outflow);deleteLock(key);}}/*** 获得锁** @param lock* @return*/public boolean lock(String lock) {return (boolean) redisTemplate.execute((RedisCallback) connection -> {//获取时间毫秒值long expireAt = System.currentTimeMillis() + 600 + 1;//获取锁Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes());if (acquire) {return true;} else {byte[] bytes = connection.get(lock.getBytes());//非空判断if (Objects.nonNull(bytes) && bytes.length > 0) {long expireTime = Long.parseLong(new String(bytes));// 如果锁已经过期if (expireTime < System.currentTimeMillis()) {// 重新加锁,防止死锁byte[] set = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + 600 + 1).getBytes());return Long.parseLong(new String(set)) < System.currentTimeMillis();}}}return false;});}/*** 删除锁** @param key*/public void deleteLock(String key) {redisTemplate.delete(key);}
对于上面的代码表面看起来似乎可以,但是实际上并不行,存在很多问题,首先加钱和减钱的方法在获取不到锁的情况会直接跳过,那么是否可以加入重试机制?
@Overridepublic void setNXSubMoney(String account, double money) {String key = "bank:account:"+account;boolean b = lock(key);if (b){operation(account,money,BankFlowType.outflow);deleteLock(key);}else {setNXSubMoney(account,money);}}
这么虽然也可以,但是可能导致递归过深栈溢出问题,高并发下可能递归过深,抛出 StackOverflowError。可以改为循环重试,当获取不到锁的时候休眠100毫秒再次重试获取锁,直到获取到为止。
while (!lock(key)) {Thread.sleep(100); // 避免CPU空转
}
operation(account, money, flowType);
deleteLock(key);
当然这也并不是完美的解决方案,还会存在很多问题,例如:
- 锁过期时间固定(600ms)
- 问题:如果业务操作超过 200ms,锁可能提前释放,导致其他线程进入。
- 改进:使用 看门狗机制(如 Redisson 的 lockWatchdogTimeout)自动续期。或根据业务耗时动态调整过期时间。
- getSet 非原子性
-
问题:
byte[] set = connection.getSet(lock.getBytes(), newExpireTime.getBytes());
在 getSet 执行间隙,其他线程可能修改锁,导致误判。 -
改进:用 Lua 脚本保证原子性:
if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("set", KEYS[1], ARGV[2]) elsereturn 0 end
-
- 锁释放风险
- 问题:deleteLock 直接删除锁,可能误删其他线程的锁(如果当前线程因 GC 停顿导致锁过期后被其他线程获取)。
- 改进:删除前验证锁的值是否属于当前线程:
public void deleteLock(String key, String expectedValue) {String lockValue = redisTemplate.opsForValue().get(key);if (expectedValue.equals(lockValue)) {redisTemplate.delete(key);} }
- 未处理锁获取失败
- 问题:如果始终无法获取锁,线程会无限重试。
- 改进:增加最大重试次数或超时时间:
int retry = 0; while (!lock(key) && retry++ < 3) {Thread.sleep(100); } if (retry >= 3) {throw new RuntimeException("获取锁失败"); }
推荐直接使用 Redisson(已解决上述所有问题)。