多线程生产者消费者模型实战案例

多线程生产者消费者模型实战案例

  • 前言
  • 业务场景
  • 术前准备
  • 无锁
    • 无事务
    • 有事务
  • synchronized
    • 事务在锁外
    • 事务在锁内
  • 数据库行锁
    • 什么是数据库行锁
    • 有事务
    • 没有事务
  • 乐观锁
  • ReentrantLock
  • 分布式锁

前言

曾经一直有一个疑惑,就是关于多线程生产者消费者模型的学习过程中,老师给出的案例例如卖票这种场景,都是在代码中定义一个票数这么一个变量,然后让多个线程去消费这个变量。

public class Task implements Runnable{private int ticket = 1000;//由于三个窗口都需要卖票,所以设为全局变量private Lock lock = new ReentrantLock();@Overridepublic void run() {while(ticket > 0){lock.lock();if(ticket > 0){System.out.println(Thread.currentThread().getName() + "正在销售第" + ticket + "张票");ticket--;}if(ticket <= 0){System.out.println(Thread.currentThread().getName() + "票已售完");}lock.unlock();}}
}

然后在new 多个Thread去分别调用这个方法去卖票,以此保障线程安全问题。基于此于是我就产生了一个疑惑,在实际的业务场景中应该是要和数据库操作相关的,那么如果把数据库操作加上,会是什么效果?今天就来测试一下。

业务场景

我们假设这样一个场景,有一个银行账户A,然后大家都可以往这个账户里存钱或者取钱,存取的时候添加记录,并且记录是要连续的。类似下图
在这里插入图片描述

术前准备

  • 准备两张数据库表,一张银行账户表,其中balance字段就是用来存储当前余额,另一张存取记录表,记录每次存入或支出的金额。可以看到在t_bank_account中有个字段叫version 这个字段是我们使用mybatis plus乐观锁的字段。
CREATE TABLE `t_bank_account` (`id` int NOT NULL,`user_id` int NOT NULL COMMENT '用户id',`account` varchar(25) COLLATE utf8mb4_general_ci NOT NULL COMMENT '账号',`bank` varchar(50) COLLATE utf8mb4_general_ci NOT NULL COMMENT '银行',`card_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '银行卡号',`card_type` tinyint DEFAULT NULL COMMENT '卡片类型',`balance` decimal(10,2) DEFAULT NULL COMMENT '余额',`opening_bank` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '开户行',`create_time` datetime DEFAULT NULL,`create_by` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,`update_time` datetime DEFAULT NULL,`update_by` datetime DEFAULT NULL,`version` int DEFAULT NULL,`deleted` tinyint NOT NULL DEFAULT '0',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='银行账户表';CREATE TABLE `t_bank_operation` (`id` int NOT NULL AUTO_INCREMENT,`account` varchar(50) COLLATE utf8mb4_general_ci NOT NULL COMMENT '银行账号',`money` decimal(10,2) NOT NULL COMMENT '发生金额',`flow_type` tinyint NOT NULL COMMENT '资金流向(0转入1转出)',`pre_money` decimal(10,2) NOT NULL COMMENT '发生前金额',`after_money` decimal(10,2) NOT NULL COMMENT '发生后金额',`operator` varchar(20) COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作人',`operation_time` datetime NOT NULL COMMENT '操作时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=939 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  • 初始化一条数据,给该账户设置初始金额为1000元。
INSERT INTO `practice`.`t_bank_account` (`id`, `user_id`, `account`, `bank`, `card_name`, `card_type`, `balance`, `opening_bank`, `create_time`, `create_by`, `update_time`, `update_by`, `version`, `deleted`) VALUES (335478786, 700001, '6222359596616068885', '工商银行', '工商银行贷记卡金卡', 2, 1000.00, '中国工商银行(银川石油城支行)', NULL, '', NULL, NULL, 0, 0);
  • 银行账户实体类
@Data
@TableName("t_bank_account")
@ApiModel(value = "BankAccount对象", description = "银行账户表")
public class BankAccount implements Serializable {private static final long serialVersionUID = 1L;private Integer id;/*** 用户id*/@ApiModelProperty("用户id")private Integer userId;/*** 账号*/@ApiModelProperty("账号")private String account;/*** 银行*/@ApiModelProperty("银行")private String bank;/*** 银行卡号*/@ApiModelProperty("卡片名称")private String cardName;/*** 卡片类型*/@ApiModelProperty("卡片类型")private BankCardType cardType;/*** 余额*/@ApiModelProperty("余额")private BigDecimal balance;/*** 开户行*/@ApiModelProperty("开户行")private String openingBank;private LocalDateTime createTime;private String createBy;private LocalDateTime updateTime;private LocalDateTime updateBy;@Versionprivate Integer version;private Integer deleted;
  • 操作记录实体类

@Data
@TableName("t_bank_operation")
@ApiModel(value = "BankOperation对象", description = "")
public class BankOperation implements Serializable {private static final long serialVersionUID = 1L;/*** 银行账号*/@TableId(value = "id",type = IdType.AUTO)private Integer id;@ApiModelProperty("银行账号")private String account;/*** 发生金额*/@ApiModelProperty("发生金额")private BigDecimal money;/*** 资金流向(0转入1转出)*/@ApiModelProperty("资金流向(0转入1转出)")private BankFlowType flowType;/*** 发生前金额*/@ApiModelProperty("发生前金额")private BigDecimal preMoney;/*** 发生后金额*/@ApiModelProperty("发生后金额")private BigDecimal afterMoney;/*** 操作人*/@ApiModelProperty("操作人")private String operator;/*** 操作时间*/@ApiModelProperty("操作时间")private LocalDateTime operationTime;}
  • service

public interface IBankAccountService extends IService<BankAccount> {//普通查询BankAccount getByAccountNo(String accountNo);//FOR UPDATE 查询BankAccount getByAccountNoForUpdate(String accountNo);//普通加钱void addMoney(String account, double money);//普通减钱void subMoney(String account, double money);//事务加钱void transactionAddMoney(String account, double money);//事务减钱void transactionSubMoney(String account, double money);//synchronized加锁加钱void syncAddMoney(String account, double money);//synchronized加锁减钱void syncSubMoney(String account, double money);//数据库行锁加钱void forUpdateAddMoney(String account, double money);//数据库行锁减钱void forUpdateSubMoney(String account, double money);//数据库行锁没有事务加钱void noTransactionForUpdateAddMoney(String account, double money);//数据库行锁没有事务减钱void noTransactionForUpdateSubMoney(String account, double money);//乐观锁加钱void optimisticLockAddMoney(String account, double money);//乐观锁减钱void optimisticLockSubMoney(String account, double money);//ReentrantLock 加钱void reentrantLockAddMoney(String account, double money);//ReentrantLock 减钱void reentrantLockSubMoney(String account, double money);
}
  • serviceImpl
/*** <p>* 银行账户表 服务实现类* </p>** @author baomidou* @since 2025-07-01*/
@Service
public class BankAccountServiceImpl extends ServiceImpl<BankAccountMapper, BankAccount> implements IBankAccountService {@Autowiredprivate IBankOperationService bankOperationService;private final Object Lock = new Object();private final ReentrantLock reentrantLock = new ReentrantLock();@Overridepublic void addMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Overridepublic void subMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}@Override@Transactionalpublic void transactionAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void transactionSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}@Overridepublic BankAccount getByAccountNo(String accountNo) {return getBaseMapper().selectOne(new LambdaQueryWrapper<BankAccount>().eq(BankAccount::getAccount,accountNo));}@Overridepublic void syncAddMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.inflow);}}@Overridepublic void syncSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}@Overridepublic void syncSleepAddMoney(String account, double money) {synchronized (Lock){try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}operation(account,money,BankFlowType.inflow);}}@Overridepublic void syncSleepSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}@Transactionalpublic void operation(String account, double money, BankFlowType bankFlowType) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();if (BankFlowType.inflow.getCode() == bankFlowType.getCode()){bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));}else{bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));}int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程"+(BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "入" : "出")+"账:"+money+ " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setFlowType(bankFlowType);operation.setAfterMoney(bankAccount.getBalance());operation.setOperator((BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "生产者" : "消费者"));operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}}public void operationForUpdate(String account, double money, BankFlowType bankFlowType) {BankAccount bankAccount = getByAccountNoForUpdate(account);BigDecimal preBalance = bankAccount.getBalance();if (BankFlowType.inflow.getCode() == bankFlowType.getCode()){bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));}else{bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));}int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程"+(BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "入" : "出")+"账:"+money+ " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setFlowType(bankFlowType);operation.setAfterMoney(bankAccount.getBalance());operation.setOperator((BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "生产者" : "消费者"));operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}}@Overridepublic BankAccount getByAccountNoForUpdate(String accountNo) {return getBaseMapper().selectOne(new LambdaQueryWrapper<BankAccount>().eq(BankAccount::getAccount,accountNo).last("FOR UPDATE"));}@Override@Transactionalpublic void forUpdateAddMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void forUpdateSubMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.outflow);}@Overridepublic void noTransactionForUpdateAddMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.inflow);}@Overridepublic void noTransactionForUpdateSubMoney(String account, double money) {operationForUpdate(account,money,BankFlowType.outflow);}//使用乐观锁@Overridepublic void optimisticLockAddMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程入账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.inflow);operation.setOperator("生产者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{//乐观锁冲突更新失败重试System.out.println("乐观锁冲突 生产失败 重试");optimisticLockAddMoney(account,money);}}@Overridepublic void optimisticLockSubMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程出账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.outflow);operation.setOperator("消费者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{System.out.println("乐观锁冲突 消费失败 重试");optimisticLockSubMoney(account,money);}}@Overridepublic void reentrantLockAddMoney(String account, double money) {reentrantLock.lock();try {operation(account,money,BankFlowType.inflow);}finally {reentrantLock.unlock();}}@Overridepublic void reentrantLockSubMoney(String account, double money) {reentrantLock.lock();try {operation(account,money,BankFlowType.outflow);}finally {reentrantLock.unlock();}}
}
  • 单元测试类

@SpringBootTest
public class BankTest {@Autowiredprivate IBankAccountService bankAccountService;@Testpublic void test() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.addMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.subMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join();  // 等待thread完成thread2.join(); // 等待thread2完成}@Testpublic void transactionTest() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.transactionAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.transactionSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join();  // 等待thread完成thread2.join(); // 等待thread2完成}@Testpublic void syncTest() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 50; i++) {System.out.println("存入100元");bankAccountService.syncAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 50; i++) {System.out.println("消费100元");bankAccountService.syncSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join();  // 等待thread完成thread2.join(); // 等待thread2完成}@Testpublic void testForUpdate() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.forUpdateAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.forUpdateSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join();  // 等待thread完成thread2.join(); // 等待thread2完成System.out.println("执行结束");Thread.sleep(10000);System.out.println("睡眠结束");}@Testpublic void optimisticLockTest() throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("存入100元");bankAccountService.optimisticLockAddMoney("6222359596616068885", 100);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 20; i++) {System.out.println("消费100元");bankAccountService.optimisticLockSubMoney("6222359596616068885", 100);}});thread.start();thread2.start();thread.join();  // 等待thread完成thread2.join(); // 等待thread2完成System.out.println("执行结束");Thread.sleep(10000);System.out.println("睡眠结束");}}

针对上面的代码我们分为以下几种情况来测试

  • 不加锁的情况
    • 不加锁无事务
    • 不加锁有事务
  • synchronized
    • 事务在锁外
    • 事务在锁内
  • 数据库行锁FOR UPDATE
    • 有事务
    • 没有事务
  • 乐观锁
  • 分布式锁

无锁

无事务

首先我们在无锁并且没有事务的情况测试一下,将实体类中的version字段注释掉,取消使用乐观锁,调用addMoney subMoney方法在两个线程中分别加钱减钱

  • 结果:
    在这里插入图片描述

  • 总结:
    在没有锁的情况一定会发生错误,原因也很好介绍,A线程查询到1000元增加100元,B线程也拿到1000元减少100元,B线程的结果覆盖A线程 = 900元,多次发生这种情况,最终价格变得无法预测。

有事务

我们将数据恢复成一千元的初始资金,再清空,同时在方法上新增事务注解,再次使用两个线程分别加钱和减钱。

    @Override@Transactionalpublic void transactionAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void transactionSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}
  • 结果:仍然错误,理由也很好解释,不论什么样的隔离级别,A线程读到1000元,B线程同样读到1000元,A线程修改后变成1100 B线程修改后变成900,这与事务的隔离级别没有关系,因为查询是共享锁,不论哪个线程都可以读取。

synchronized

事务在锁外

    @Override@Transactionalpublic void syncAddMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.inflow);}}@Override@Transactionalpublic void syncSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}

事务在锁内

@Overridepublic void syncAddMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.inflow);}}@Overridepublic void syncSubMoney(String account, double money) {synchronized (Lock){operation(account,money,BankFlowType.outflow);}}@Transactionalpublic void operation(String account, double money, BankFlowType bankFlowType) {//...}
  • 结论:
    • 事务在锁外是无效的
    • 事务在锁内是有效的
  • 总结:
    • synchronize锁的对象及service单例情况 首先我们这里的Lock锁是声明在service实例中的一个new出来的Object对象
      • 如果这个Lock是static的则锁由类持有,无论哪个线程都要先去获取类的锁。
      • 如果非static,则锁由当前对象实例持有,无论哪个线程都要先去获取对象实例的锁。
    • 而spring中bean的实例是单例的,因此也就是无论这个Lock是static,或则非static,亦或者synchronized(this)效果都一样。因为这些线程都用的同一个service对象实例来调用方法,因此在单元测试的两个线程中都要竞争这把锁,拿到锁才能执行。
    • 如果spring中bean是多例的(@Scope(“prototype”)),则非static的Lock和synchronized(this)都将完全失效,每个请求会创建新实例,Lock 是实例级别的,锁完全失效(不同线程用不同的锁)。绝对不能这样做!。
    • 事务在锁内与锁外的情况
      • 事务在锁内,正常提交,流程:线程A拿到锁,线程B等待,线程A开启事务、执行结束、提交事务、释放锁,线程B拿到锁,读到A提交后的数据,然后开启事务、执行并提交事务;依次执行效果正常。
      • 事务在锁外,结果异常,流程:线程A开启事务,线程B开启事务,线程A抢到锁,线程B等待,线程A执行结束,释放锁,此时事务并未提交,因为 @Transactional 是由 Spring AOP 代理控制的,会在方法退出时提交事务,而由于事务的隔离级别,此时B线程拿到锁之后读到的数据并不是A线程修改后的数据,这就导致A将拿到的balance的值为1000的数据+100后=1100,B也将balance的值为1000的数据-100后=900,正确的结果应该是B要拿到的balance的值为1100才对。最终这样重复几次之后结果将无法预测。

数据库行锁

什么是数据库行锁

关于数据库行锁我们写一个小例子来看看效果

在这里插入图片描述

  • 第一步我们执行START TRANSACTION 与 select * from t_bank_account where id =335478786 for update两条语句,使用FOR UPDATE在开启的事务中查询,此时我们不执行update 与 commit 我们将这个查询称之为A

  • 第二步我们新开一个查询B,执行select * from t_bank_account where id =335478786 此时的结果是正常的,获取到的balance为1000元。原因如下在这里插入图片描述

  • 第三步我们在A查询中再执行update t_bank_account set balance = 12 WHERE id=335478786; ,仍然是不提交不执行commit,此时再执行一次B查询,可以看到B的结果仍然是1000,这是因为受MySQL默认的事务隔离级别可重复读,无法读到未提交的数据,因此A查询的update语句结果B无法读取。

  • 第四步我们再执行C查询SELECT * FROM t_bank_account WHERE id = 335478786 for update ; 此时我们可以发现,相比于B查询,C查询多了for update,也就是与A查询中的select语句相同,此时的结果是C查询卡主不动,不显示结果,原因就是A查询持有该行的行锁,而C查询也需要这一行的行锁,因此C查询需要在等待A查询释放锁后才能显示查询结果。同时FOR UPDATE语句所持有的锁会一直等到事务提交之后才会释放,因此以上就是通过FOR UPDATE数据库行锁实现多线程操作账户没有并发问题的原因。

  • 第五步我们新开一个查询D,执行update t_bank_account set balance = 43 WHERE id=335478786; 此时我们发现D查询同样卡主不动,因为FOR UPDATE 的行锁会禁止其他事务修改该条数据。不论是update亦或者delete都无法对该行产生效果。

  • 第六步,我们执行A查询的commit,然后会发现卡主的C查询和D查询都可以继续正常执行了。

到此我们对FOR UPDATE有了一个基本的概念,对于FOR UPDATE 的意思我个人的理解就是我要拿着这个查询去修改,如果同样有人要干这个事儿你就先等着,那么在这个过程中有事务和没有事务又有什么区别和效果呢?我们继续往下看

有事务

    @Overridepublic BankAccount getByAccountNoForUpdate(String accountNo) {return getBaseMapper().selectOne(new LambdaQueryWrapper<BankAccount>().eq(BankAccount::getAccount,accountNo).last("FOR UPDATE"));}@Override@Transactionalpublic void forUpdateAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Override@Transactionalpublic void forUpdateSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}public void operation(String account, double money, BankFlowType bankFlowType) {BankAccount bankAccount = getByAccountNoForUpdate(account);BigDecimal preBalance = bankAccount.getBalance();if (BankFlowType.inflow.getCode() == bankFlowType.getCode()){bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));}else{bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));}int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程"+(BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "入" : "出")+"账:"+money+ " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setFlowType(bankFlowType);operation.setAfterMoney(bankAccount.getBalance());operation.setOperator((BankFlowType.inflow.getCode() == bankFlowType.getCode() ? "存储者" : "消费者"));operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}}

没有事务

	@Overridepublic void forUpdateAddMoney(String account, double money) {operation(account,money,BankFlowType.inflow);}@Overridepublic void forUpdateSubMoney(String account, double money) {operation(account,money,BankFlowType.outflow);}public void operation(String account, double money, BankFlowType bankFlowType) {//...}
  • 总结:
    • 有事务:
      • 线程A开启事务,线程B也开启事务,线程A执行FOR UPDAT查询getByAccountNoForUpdate(),同时给该行添加行锁,线程B也执行FOR UPDATE查询getByAccountNoForUpdate(),发现需要等待A线程的事务释放行锁,B线程等待,A线程执行结束提交事务释放行锁(事务提交后(即事务结束时)才会释放),此时B线程拿到锁,继续执行然后提交事务然后释放锁。
    • 无事务:
      • 无事务情况下的FOR UPDATE 会锁定数据库行,但锁的持有时间仅限当前 SQL 语句的执行期间。如果没有事务,锁会在查询结束后立即释放,其他线程可以立刻修改数据,导致并发问题。在我们的示例中线程A FOR Update 查询并加行锁,查询结束后立即释放锁,此时B线程同样查询并加锁,然后立即释放锁,此时A线程还没有修改,因此导致A线程与B线程拿到的都是1000元余额,最终导致A线程修改为1100,B线程修改为900

乐观锁

使用mybatis plus的乐观锁插件 version注解实现,在更新记录的时候返回的值大于零即表示成功int row = getBaseMapper().updateById(bankAccount); 否则重试即可,并且这里也可以不添加事务。关于乐观锁与悲观锁的概念与实现这里不多叙述。

@Overridepublic void optimisticLockAddMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().add(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程入账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.inflow);operation.setOperator("生产者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{//乐观锁冲突更新失败重试System.out.println("乐观锁冲突 生产失败 重试");optimisticLockAddMoney(account,money);}}@Overridepublic void optimisticLockSubMoney(String account, double money) {BankAccount bankAccount = getByAccountNo(account);BigDecimal preBalance = bankAccount.getBalance();bankAccount.setBalance(bankAccount.getBalance().subtract(BigDecimal.valueOf(money)));int row = getBaseMapper().updateById(bankAccount);System.out.println(Thread.currentThread() + " 线程出账:"+money + " 当前余额:"+ bankAccount.getBalance().toPlainString());if (row > 0 ){//添加资金记录BankOperation operation = new BankOperation();operation.setAccount(account);operation.setMoney(BigDecimal.valueOf(money));operation.setPreMoney(preBalance);operation.setAfterMoney(bankAccount.getBalance());operation.setFlowType(BankFlowType.outflow);operation.setOperator("消费者");operation.setOperationTime(LocalDateTime.now());bankOperationService.save(operation);}else{System.out.println("乐观锁冲突 消费失败 重试");optimisticLockSubMoney(account,money);}}

ReentrantLock

与synchronized效果类似,不过ReentrantLock更灵活一些。

分布式锁

这里我们使用基于Redis的分布式锁,所谓的分布式锁就是同样的代码在集群环境中的控制,例如负载均衡策略将不同的请求转发到不同的节点处理加钱或者减钱业务,那么这个时候同样需要保障线程安全问题,如果我们使用synchronized或者ReentrantLock这些锁,只能保证在单节点的情况下不出现并发问题,集群环境并不能保证。

@Overridepublic void setNXAddMoney(String account, double money) {String key = "bank:account:"+account;boolean b = lock(key);if (b){operation(account,money,BankFlowType.inflow);deleteLock(key);}}@Overridepublic void setNXSubMoney(String account, double money) {String key = "bank:account:"+account;boolean b = lock(key);if (b){operation(account,money,BankFlowType.outflow);deleteLock(key);}}/*** 获得锁** @param lock* @return*/public boolean lock(String lock) {return (boolean) redisTemplate.execute((RedisCallback) connection -> {//获取时间毫秒值long expireAt = System.currentTimeMillis() + 600 + 1;//获取锁Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes());if (acquire) {return true;} else {byte[] bytes = connection.get(lock.getBytes());//非空判断if (Objects.nonNull(bytes) && bytes.length > 0) {long expireTime = Long.parseLong(new String(bytes));// 如果锁已经过期if (expireTime < System.currentTimeMillis()) {// 重新加锁,防止死锁byte[] set = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + 600 + 1).getBytes());return Long.parseLong(new String(set)) < System.currentTimeMillis();}}}return false;});}/*** 删除锁** @param key*/public void deleteLock(String key) {redisTemplate.delete(key);}

对于上面的代码表面看起来似乎可以,但是实际上并不行,存在很多问题,首先加钱和减钱的方法在获取不到锁的情况会直接跳过,那么是否可以加入重试机制?

	@Overridepublic void setNXSubMoney(String account, double money) {String key = "bank:account:"+account;boolean b = lock(key);if (b){operation(account,money,BankFlowType.outflow);deleteLock(key);}else {setNXSubMoney(account,money);}}

这么虽然也可以,但是可能导致递归过深栈溢出问题,高并发下可能递归过深,抛出 StackOverflowError。可以改为循环重试,当获取不到锁的时候休眠100毫秒再次重试获取锁,直到获取到为止。

while (!lock(key)) {Thread.sleep(100); // 避免CPU空转
}
operation(account, money, flowType);
deleteLock(key);

当然这也并不是完美的解决方案,还会存在很多问题,例如:

  • 锁过期时间固定(600ms)
    • 问题:如果业务操作超过 200ms,锁可能提前释放,导致其他线程进入。
    • 改进:使用 看门狗机制(如 Redisson 的 lockWatchdogTimeout)自动续期。或根据业务耗时动态调整过期时间。
  • getSet 非原子性
    • 问题:
      byte[] set = connection.getSet(lock.getBytes(), newExpireTime.getBytes());
      在 getSet 执行间隙,其他线程可能修改锁,导致误判。

    • 改进:用 Lua 脚本保证原子性:

      if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("set", KEYS[1], ARGV[2])
      elsereturn 0
      end
      
  • 锁释放风险
    • 问题:deleteLock 直接删除锁,可能误删其他线程的锁(如果当前线程因 GC 停顿导致锁过期后被其他线程获取)。
    • 改进:删除前验证锁的值是否属于当前线程:
      public void deleteLock(String key, String expectedValue) {String lockValue = redisTemplate.opsForValue().get(key);if (expectedValue.equals(lockValue)) {redisTemplate.delete(key);}
      }
      
  • 未处理锁获取失败
    • 问题:如果始终无法获取锁,线程会无限重试。
    • 改进:增加最大重试次数或超时时间:
      int retry = 0;
      while (!lock(key) && retry++ < 3) {Thread.sleep(100);
      }
      if (retry >= 3) {throw new RuntimeException("获取锁失败");
      }
      

推荐直接使用 Redisson(已解决上述所有问题)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87363.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87363.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

青少年编程与数学 02-022 专业应用软件简介 03 三维建模及动画软件:Autodesk Maya

青少年编程与数学 02-022 专业应用软件简介 03 三维建模及动画软件&#xff1a;Autodesk Maya 一、什么是三维建模二、什么是计算机动画三、三维建模及动画设计软件的发展历程&#xff08;一&#xff09;早期探索阶段&#xff08;20世纪60年代 - 80年代&#xff09;&#xff08…

获得 OCM 大师证书学习历练

当我站在山城重庆的洪崖洞前&#xff0c;看着璀璨的夜景倒映在嘉陵江上&#xff0c;手中紧握着 OCM 大师证书&#xff0c;那一刻&#xff0c;备考时的艰辛与考试时的紧张都化作了满满的成就感。这段在重庆获得 OCM 大师证书的经历&#xff0c;就像一场充满挑战与惊喜的冒险&…

srs-gb28181 与 SRS 5.0 对 GB28181 国标支持

srs-gb28181 是基于 SRS 4.0/5.0 的国标&#xff08;GB28181&#xff09;扩展分支&#xff0c;而 SRS 5.0 官方版本也逐步增强了对 GB28181 的支持。以下是两者的主要区别&#xff1a; 1. 功能支持对比 功能srs-gb28181&#xff08;扩展分支&#xff09;SRS 5.0&#xff08;官…

算法第18天|继续二叉树:修剪二叉搜索树、将有序数组转化为二叉搜索树、把二叉搜索树转换为累加树

今日总结&#xff1a; 1、修剪二叉搜索树&#xff08;重点思考如何修剪&#xff09; &#xff08;1&#xff09;递归的返回值是什么&#xff1f;&#xff08;与插入、删除一样&#xff09; &#xff08;2&#xff09;递归的单层逻辑一定要缕清&#xff08;3中情况讨论&#xff…

C# 多线程(三)线程池

目录 1.通过TPL使用线程池 2.不使用TPL进入线程池的办法 异步委托 3.线程池优化技术 最小线程数的工作原理 每当启动一个新线程时&#xff0c;系统都需要花费数百微秒来分配资源&#xff0c;例如创建独立的局部变量栈空间。默认情况下&#xff0c;每个线程还会占用约1…

学习笔记(29):训练集与测试集划分详解:train_test_split 函数深度解析

学习笔记(29):训练集与测试集划分详解&#xff1a;train_test_split 函数深度解析 一、为什么需要划分训练集和测试集&#xff1f; 在机器学习中&#xff0c;模型需要经历两个核心阶段&#xff1a; 训练阶段&#xff1a;用训练集数据学习特征与目标值的映射关系&#xff08;…

【全网唯一】自动化编辑器 Windows版纯本地离线文字识别插件

目的 自动化编辑器超轻量级RPA工具&#xff0c;零代码制作RPA自动化任务&#xff0c;解放双手&#xff0c;释放双眼&#xff0c;轻松玩游戏&#xff0c;刷任务。本篇文章主要讲解下自动化编辑器的TomatoOCR纯本地离线文字识别Windows版插件如何使用和集成。 准备工作 1、下载自…

GitHub 2FA绑定

GitHub 2FA绑定 作为全球最大的代码托管平台&#xff0c;GitHub对账号安全的重视程度不断提升——自2023年3月起&#xff0c;GitHub已要求所有在GitHub.com上贡献代码的用户必须启用双因素身份验证&#xff08;2FA&#xff09;。如果你是符合条件的用户&#xff0c;会收到一封…

pytest fixture基础大全详解

一、介绍 作用 fixture主要有两个作用&#xff1a; 复用测试数据和环境&#xff0c;可以减少重复的代码&#xff1b;可以在测试用例运行前和运行后设置和清理资源&#xff0c;避免对测试结果产生影响&#xff0c;同时也可以提高测试用例的运行效率。 优势 pytest框架的fix…

Unity知识点-Renderer常用材质变量

本篇总结了Unity中renderer的3种常用的材质相关的变量&#xff1a;renderer.material,renderer.sharedMaterial,renderer.MaterialPropertyBlock。以及三者对SRPBatcher的影响。 一.介绍及对比 1.概念介绍 1.material 定义&#xff1a;material 是Render组件&#xff08;如…

【算法】​​如何判断时间复杂度?

文章目录 1. 什么是时间复杂度&#xff1f;为什么需要时间复杂度&#xff1f; 2. 常见时间复杂度对比3. 如何分析时间复杂度&#xff1f;&#xff08;Java版&#xff09;&#x1f539; 步骤1&#xff1a;找出基本操作&#x1f539; 步骤2&#xff1a;分析循环结构&#xff08;1…

MySQL使用C语言连接

文章目录 版本查看以及编译mysql接口介绍初始化链接数据库下发mysql命令mysql_query获取执行结果mysql_store_result获取结果行数mysql_num_rows获取结果列数mysql_num_fields获取列名mysql_fetch_fields获取结果内容mysql_fetch_row关闭mysql链接mysql_closeC语言操作mysql查看…

坚持每日Codeforces三题挑战:Day 7 - 题目详解(2025-06-11,难度:1200,1300,1500)

每天坚持写三道题第七天&#xff1a; Problem - A - Codeforces 1200 Problem - B - Codeforces 1300 Problem - A - Codeforces 1500 目录 题目一: 题目大意: 解题思路: 代码(C): 题目二: 题目大意: 解题思路: 代码(C): 题目三: 题目大意: 解题思路: 代码(C): …

洛谷 P4305:[JLOI2011] 不重复数字 ← unordered_set

【题目来源】 https://www.luogu.com.cn/problem/P4305 【题目描述】 给定 n 个数&#xff0c;要求把其中重复的去掉&#xff0c;只保留第一次出现的数。 【输入格式】 第一行一个整数 T&#xff0c;表示数据组数。 对于每组数据&#xff0c;第一行一个整数 n。第二行 n 个数…

STM32固件升级设计——SPIFLASH模拟U盘升级固件

目录 概述 一、功能描述 1、BootLoader部分&#xff1a; 2、APP部分&#xff1a; 二、BootLoader程序制作 1、分区定义 2、 主函数 3、配置USB 4、配置fatfs文件系统 5、程序跳转 三、APP程序制作 四、工程配置&#xff08;默认KEIL5&#xff09; 五、运行测试 六…

解锁阿里云日志服务SLS:云时代的日志管理利器

引言&#xff1a;开启日志管理新篇 在云计算时代&#xff0c;数据如同企业的血液&#xff0c;源源不断地产生并流动。从用户的每一次点击&#xff0c;到系统后台的每一个操作&#xff0c;数据都在记录着企业运营的轨迹。而在这些海量的数据中&#xff0c;日志数据占据着至关重…

Keye-VL-8B-Preview:由快手 Kwai Keye 团队精心打造的尖端多模态大语言模型

&#x1f525; News 2025.06.26 &#x1f31f; 我们非常自豪地推出Kwai Keye-VL&#xff0c;这是快手Kwai Keye团队精心打造的前沿多模态大语言模型。作为快手先进技术生态中的核心AI产品&#xff0c;Keye在视频理解、视觉感知和推理任务方面表现卓越&#xff0c;树立了新的性…

Web前端之JavaScript实现图片圆环、圆环元素根据角度指向圆心、translate、rotate

MENU 前言效果HtmlStyleJavaScript 前言 代码段创建了一个由6个WiFi图标组成的圆形排列&#xff0c;每个图标均匀分布在圆周上。 效果 Html 代码 <div class"ring"><div class"item"><img class"img" src"../image/icon/W…

1 Studying《Computer Vision: Algorithms and Applications 2nd Edition》11-15

目录 Chapter 11 Structure from motion and SLAM 11.1 几何内禀校准 11.2 姿态估计 11.3 从运动中获得的双帧结构 11.4 从运动中提取多帧结构 11.5 同步定位与建图&#xff08;SLAM&#xff09; 11.6 额外阅读 Chapter 12 Depth estimation 12.1 极点几何 12.2 稀疏…

phpstudy 可以按照mysql 数据库

phpstudy 可以按照mysql 数据库 PHPStudy&#xff08;小皮面板&#xff09;是一款专为开发者设计的集成环境工具&#xff0c;涵盖服务器配置、开发环境搭建、网站部署等多项功能。以下是其核心用途及优势的详细解析&#xff1a; 一、开发环境快速搭建 一站式集成环境集成Apa…