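Environment Setup
Make sure the project includes the Spring Boot, Spring Cloud, Kafka, and MyBatis dependencies. A typical Maven dependency configuration is shown below. Dependencies without an explicit version are assumed to be managed by the Spring Boot parent POM and, for spring-cloud-starter, by the spring-cloud-dependencies BOM in dependencyManagement: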
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Starter -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter</artifactId>
    </dependency>
    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- MyBatis -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- Database driver (e.g. MySQL) -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
</dependencies>
Configuring Kafka
Configure the Kafka properties in application.yml (or the equivalent keys in application.properties):
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
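For readers who know the plain Kafka client better than Spring's property names, the sketch below lists the native consumer properties that the consumer section above corresponds to. The class name KafkaConsumerProps is hypothetical and exists only for illustration. Spring Boot builds the real configuration itself, so this code is not needed in the application.

```java
import java.util.Properties;

// Illustration only: native Kafka consumer property names that correspond
// to the spring.kafka.* keys in the YAML above.
class KafkaConsumerProps {

    static Properties consumerProps() {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers
        props.setProperty("bootstrap.servers", "localhost:9092");
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", "my-group");
        // spring.kafka.consumer.auto-offset-reset
        props.setProperty("auto.offset.reset", "earliest");
        // spring.kafka.consumer.key-deserializer / value-deserializer
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each native property as key=value.
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The value earliest for auto.offset.reset means a consumer group with no committed offset starts reading from the beginning of the topic.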
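Configuring MyBatis
Configure MyBatis and the data source in application.yml. If these settings live in the same file as the Kafka settings, merge the datasource block under the existing top-level spring key rather than declaring spring twice: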
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test
    username: root
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
mybatis:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.example.demo.model
Creating the Kafka Producer and Consumer
Define a Kafka producer service for sending messages:
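import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends the message to the given topic (asynchronous; the result is not checked here).
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}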
Define a Kafka consumer service for receiving messages:
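import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // Invoked for every record published to "my-topic".
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}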
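The consumer above receives each message as a plain String, so a message has to be parsed before it can be stored. The sketch below shows one way to do that, assuming a hypothetical "name,email" payload format that the original configuration does not define. The UserPayload class is illustrative only. A real project would more likely use JSON.

```java
// Illustration only: parses a hypothetical "name,email" message payload.
class UserPayload {
    final String name;
    final String email;

    UserPayload(String name, String email) {
        this.name = name;
        this.email = email;
    }

    // Splits on the first comma and rejects malformed input early,
    // so bad messages never reach the database layer.
    static UserPayload parse(String message) {
        if (message == null) {
            throw new IllegalArgumentException("message is null");
        }
        String[] parts = message.split(",", 2);
        if (parts.length != 2
                || parts[0].trim().isEmpty()
                || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException("expected \"name,email\" but got: " + message);
        }
        return new UserPayload(parts[0].trim(), parts[1].trim());
    }

    public static void main(String[] args) {
        UserPayload p = parse("Alice, alice@example.com");
        System.out.println(p.name + " / " + p.email);
    }
}
```

Inside consume(String message), the parsed fields could then be copied into the entity that is passed to the mapper.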
Creating the MyBatis Mapper and Entity
Define an entity class:
public class User {
    private Long id;
    private String name;
    private String email;

    // getters and setters
}
Create the MyBatis mapper interface:
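import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface UserMapper {

    @Select("SELECT * FROM users WHERE id = #{id}")
    User findById(Long id);

    // useGeneratedKeys writes the auto-generated primary key back into user.id.
    @Insert("INSERT INTO users(name, email) VALUES(#{name}, #{email})")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    void insert(User user);
}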
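Integrating the Business Logic
Combine Kafka and MyBatis in the business logic. In the example below, the service saves a user to the database and then publishes a Kafka message announcing the save. Note that the consumer defined earlier listens on my-topic, so it will not receive these user-topic messages unless its topics value is changed: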
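import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private KafkaProducerService kafkaProducerService;

    // Persists the user first, then publishes a notification message.
    public void processUser(User user) {
        userMapper.insert(user);
        kafkaProducerService.sendMessage("user-topic", "User saved: " + user.getName());
    }
}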
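The order of the two calls in processUser matters: the message is published only if the insert succeeds. The sketch below reproduces that flow with plain Java so it can be run without Kafka or a database. UserStore, MessageSender, UserFlow, and UserFlowDemo are hypothetical stand-ins for UserMapper, KafkaProducerService, and UserService, and they use constructor injection instead of field injection so that fakes can be passed in.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for UserMapper.insert(User).
interface UserStore {
    void insert(String name, String email);
}

// Hypothetical stand-in for KafkaProducerService.sendMessage(topic, message).
interface MessageSender {
    void send(String topic, String message);
}

// Same flow as UserService.processUser: persist first, then publish.
class UserFlow {
    private final UserStore store;
    private final MessageSender sender;

    UserFlow(UserStore store, MessageSender sender) {
        this.store = store;
        this.sender = sender;
    }

    void processUser(String name, String email) {
        store.insert(name, email); // if this throws, nothing is published
        sender.send("user-topic", "User saved: " + name);
    }
}

class UserFlowDemo {
    // Runs the flow against in-memory fakes and returns the published messages.
    static List<String> run(String name, String email, boolean failInsert) {
        List<String> sent = new ArrayList<>();
        UserStore store = (n, e) -> {
            if (failInsert) {
                throw new IllegalStateException("insert failed");
            }
        };
        UserFlow flow = new UserFlow(store, (topic, msg) -> sent.add(topic + ": " + msg));
        try {
            flow.processUser(name, email);
        } catch (IllegalStateException ex) {
            // Insert failed, so the send step was never reached.
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run("Alice", "alice@example.com", false));
        System.out.println(run("Bob", "bob@example.com", true));
    }
}
```

The reverse failure is not covered by this ordering: if the insert succeeds and the send fails, the row stays in the database with no message published.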
Testing
Write a simple test controller to verify that the integration works:
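import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class TestController {

    @Autowired
    private UserService userService;

    // POST /api/user with a JSON body such as {"name": "...", "email": "..."}
    @PostMapping("/user")
    public String saveUser(@RequestBody User user) {
        userService.processUser(user);
        return "User saved successfully!";
    }
}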
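To exercise the endpoint without an external tool, a small client built on the JDK's java.net.http API (Java 11 or later) can post a user to it. This is a sketch under assumptions: the base URL http://localhost:8080 is Spring Boot's default port and may differ in your setup, and the SaveUserClient class and sample values are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustration only: posts a user to the /api/user endpoint defined above.
class SaveUserClient {

    // Builds the POST request. The JSON is assembled by simple concatenation,
    // so name and email must not contain quotes or backslashes.
    static HttpRequest buildRequest(String baseUrl, String name, String email) {
        String json = "{\"name\":\"" + name + "\",\"email\":\"" + email + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/user"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed base URL; adjust to wherever the application is listening.
        HttpRequest request = buildRequest("http://localhost:8080", "Alice", "alice@example.com");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            System.out.println("Request failed (is the application running?): " + e);
        }
    }
}
```

If the integration works, the response body is the string returned by saveUser, and a new row appears in the users table.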
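Notes
- Make sure the Kafka broker is started and running normally.
- Make sure the database is configured correctly and that the table structure matches the entity class.
- With Spring Boot auto-configuration, having spring-kafka on the classpath is enough for @KafkaListener to work. Add the @EnableKafka annotation to a configuration class only if Kafka is configured without Boot's auto-configuration.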
With the steps above, Spring Cloud, Kafka, and MyBatis can be integrated to send and receive messages and to perform database operations.