利用事务钩子函数解决业务异步发送问题
- 一、问题背景
- 二、实现方案
- 1、生产者代码
- 2、消费者代码
- 三、测试与验证
- 1、未开启事务场景
- 2、开启事务场景
- 四、项目结构及源码
一、问题背景
在某项业务中,需要在事务完成后,写入日志到某数据库中。需要要么都成功,要么都失败,而且需要异步实现。在不考虑分布式事务框架下,如何实现这个业务功能呢?
二、实现方案
前提需要启动kafka_2.12-3.9.1内置的zookeeper和kafka。
在kafka创建好topic
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testTopic
可以利用事务钩子函数实现异步发送,保证同时成功和失败。注册事务钩子,在事务提交或回滚后执行。
if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 调用异步方法发送日志System.out.println("事务未开启");// 异步发送日志(解决由于同一个类内部方法调用不会创建代理,所以aop不生效,则@Async注解无作用问题)kafkaSender.send();} else {// 注册事务钩子,在事务提交或回滚后执行TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCompletion(int status) {System.out.println("事务开启并执行完毕");// 异步发送日志(解决由于同一个类内部方法调用不会创建代理,所以aop不生效,则@Async注解无作用问题)kafkaSender.send();}});}
1、生产者代码
KafkaController
@RestController
public class KafkaController {@Autowiredprivate TestService testService;@GetMapping("/send/{type}")public String sendMessageToKafka(@PathVariable int type) {if(type == 1){// 模拟执行事务未开启的业务逻辑testService.executeServiceNoTranscational(type);}else{// 模拟执行事务开启的业务逻辑testService.executeService(type);}//模拟还要执行其他的serviceSystem.out.println("执行其他业务逻辑");return "ok";}
}
KafkaSender
@Component
@Slf4j
public class KafkaSender {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@Asyncpublic void send() {System.out.println("异步发送消息");Map<String, String> messageMap = new HashMap<>();messageMap.put("log", "日志:执行完成");ObjectMapper objectMapper = new ObjectMapper();String data;try {data = objectMapper.writeValueAsString(messageMap);} catch (JsonProcessingException e) {throw new RuntimeException(e);}String key = String.valueOf(UUID.randomUUID());//kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)this.send("testTopic", key, data);}public void send(String topic, String key, String data) {//发送消息CompletableFuture<SendResult<String, Object>> completable = kafkaTemplate.send(topic, key, data);completable.whenCompleteAsync((result, ex) -> {if (null == ex) {log.info(topic + "生产者发送消息成功:" + result.toString());} else {log.info(topic + "生产者发送消息失败:" + ex.getMessage());}});}
}
LogService注册钩子函数,异步发送
@Service
public class LogService {@AutowiredKafkaSender kafkaSender;public void sendLogAsync() {if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 调用异步方法发送日志System.out.println("事务未开启");// 异步发送日志(解决由于同一个类内部方法调用不会创建代理,所以aop不生效,则@Async注解无作用问题)kafkaSender.send();} else {// 注册事务钩子,在事务提交或回滚后执行TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCompletion(int status) {System.out.println("事务开启并执行完毕");// 异步发送日志(解决由于同一个类内部方法调用不会创建代理,所以aop不生效,则@Async注解无作用问题)kafkaSender.send();}});}}
}
TestService
@Service
public class TestService {@AutowiredLogService logService;@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)public void executeService(int type){System.out.println("执行业务逻辑");/*System.out.println("业务执行完成");if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 调用异步方法发送日志System.out.println("事务未开启");}else{System.out.println("事务已开启");}*/logService.sendLogAsync();}public void executeServiceNoTranscational(int type){System.out.println("业务执行完成");if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 调用异步方法发送日志System.out.println("事务未开启");}else{System.out.println("事务已开启");}logService.sendLogAsync();}
}
2、消费者代码
KafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaReceiver listener() {return new KafkaReceiver();}
}
KafkaReceiver
@Component
@Slf4j
public class KafkaReceiver {/*** 下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用","隔开*/@KafkaListener(topics = {"testTopic"})public void receive(ConsumerRecord<?, ?> record){log.info("消费者收到的消息key: " + record.key());log.info("消费者收到的消息value: " + record.value().toString());}
}
三、测试与验证
1、未开启事务场景
生产者执行结果
消费者执行结果
2、开启事务场景
生产者执行结果
消费者执行结果
四、项目结构及源码
源码下载,欢迎star!