基于redis的stream结构作为消息队列,实现异步秒杀下单
需求:
-
创建一个Stream类型的消息队列,名为stream.oreders
-
修改之前的秒杀下单Lua脚本,在认定有抢够资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId
-
项目启动后,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
首先,使用命令行来创建消息队列:
其次,需要修改Lua脚本:
------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by 20893.--- DateTime: 2025/6/27 15:48----- 1.参数列表--1.1优惠券IDlocal voucherId = ARGV[1]--1.2用户IDlocal userId = ARGV[2]-- 1.3 订单IDlocal orderId = ARGV[3]--2.数据key--2.1库存keylocal stockKey = 'seckill:stock:' .. voucherId--2.2订单keylocal orderKey = 'seckill:order:' .. voucherId--3.业务逻辑--3.1判断库存是否充足-- redis.call('get',stockKey)中取出的值是String类型,需要将其转化成int类型,调用tonumber()方法if (tonumber(redis.call('get', stockKey))<= 0) then--3.2. 库存不足 返回1return 1end--3.3判断用户是否重复下单if (redis.call('sismember', orderKey, userId) == 1) then--3.4. 重复下单 返回2return 2end--3.5扣减库存redis.call('incrby', stockKey, -1)--3.6记录订单redis.call('sadd', orderKey, userId)--3.7发送消息到队列中,xadd stream.orders * k1 v1 ...redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0
还需要修改Java业务代码:
public Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));//2.判断结果是否为0int r = result.intValue();if (result != 0){//2.1.不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//3.返回订单IDreturn Result.ok(orderId);
最终,根据我们的伪代码进行对异步线程中业务流程的修改
while(true){//尝试今天队列,使用阻塞模式,最长等待2000毫秒Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 >");if(msg == null){//null说明没有消息,继续下一次continue;}try{//处理消息,完成后需要确认消息(ACK)handleMessage(msg);}catch(Exception e){while(true){Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 0");if(msg == null){//null说明没有异常消息,所有消息已确认,结束循环break;}try{//说明有异常消息,再次处理handleMessage(msg);}catch(Exception e){//再次出现异常,记录日志,继续循环continue;}}}}
代码展示:
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//该注解表示在类初始化之后执行@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while ( true){try {//获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//判断消息获取是否成功if(list == null || list.isEmpty()){//2.1如果获取失败,说明没有消息,继续下一次循环continue;}//2.2.解析消息中的数据MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//将map对象转换成订单对象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果获取成功,可以下单handleVoucherOrder(order);//3.ACK确认 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());} catch (Exception e) {log.error("获取订单信息异常",e);handlePendingList();}}}private void handlePendingList() {while( true){try {//获取pending-list中的订单信息 xreadgroup group g1 c1 count 1 streams stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.from("0")));//判断消息获取是否成功if(list == null || list.isEmpty()){//2.1如果获取失败,说明pending-list没有消息,继续下一次循环break;}//2.2.解析消息中的数据MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//将map对象转换成订单对象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果获取成功,可以下单handleVoucherOrder(order);//3.ACK确认 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());} catch (Exception e) {log.error("获取pending-list订单信息异常",e);//如果害怕因为报错导致陷入循环,可以设置休眠时间try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}}
整体业务流程描述:
首先尝试从消息队列中读取数据,如果数据获取失败,直接进行下一次循环,再来读一次消息队列,如果获取成功,说明有订单信息需要处理,就去解析订单信息,完成下单,在进行ack确认。在进行下一次读取,继续循环,如果在处理消息的过程抛出异常,导致该消息没有确认,那么该消息就会进入pending-list,就被catch到,在catch中执行handlePendinglist()函数,在该函数中,首先去pending-list获取未确认消息,如果读到,则解析消息,处理,下单,ack确认,如果没有异常消息,就会直接跳出循环,异常处理结束,如果再抛异常,就再度循环。直到pending-list中所有异常全部处理完成为止。
进行测试:
再次测试:
至此优化秒杀下单的业务需求完成。
希望对大家有所帮助。