Spring Boot 3.x from Beginner to Expert Series: 4.2 Integrating Kafka in Depth


🎯 Introduction to Kafka

Apache Kafka is a distributed streaming platform, used primarily to build real-time data pipelines and streaming applications. It is known for high throughput, low latency, scalability, and fault tolerance.

Core Concepts

  • Producer: sends messages to the Kafka cluster
  • Consumer: reads messages from the Kafka cluster
  • Topic: a named category of messages, similar to a queue
  • Partition: a physical split of a topic that enables parallel processing
  • Broker: a server node in the Kafka cluster
  • Consumer Group: a set of consumers that jointly consume a topic
  • Offset: the position of a message within a partition
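To make the vocabulary concrete, here is a toy, in-memory model (purely illustrative, not the real Kafka protocol or client API): a topic holds numbered partitions, each partition is an append-only log, and every consumer group tracks its own committed offset per partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the concepts above. A record with a given key always lands in
// the same partition, so per-key ordering is preserved; different consumer
// groups read the same log independently.
public class KafkaConcepts {
    static class Topic {
        final List<List<String>> partitions = new ArrayList<>();
        // group id -> committed offset for each partition
        final Map<String, int[]> groupOffsets = new HashMap<>();

        Topic(int partitionCount) {
            for (int i = 0; i < partitionCount; i++) partitions.add(new ArrayList<>());
        }

        // Producer side: pick a partition from the key (simplified hash partitioner)
        long send(String key, String value) {
            int p = Math.abs(key.hashCode() % partitions.size());
            partitions.get(p).add(value);
            return partitions.get(p).size() - 1; // the record's offset
        }

        // Consumer side: read the next record for this group, advancing its offset
        String poll(String group, int partition) {
            int[] offsets = groupOffsets.computeIfAbsent(group, g -> new int[partitions.size()]);
            List<String> log = partitions.get(partition);
            if (offsets[partition] >= log.size()) return null; // caught up
            return log.get(offsets[partition]++);
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic(3);
        topic.send("user-1", "created");
        topic.send("user-1", "updated"); // same key -> same partition, order kept
        int p = Math.abs("user-1".hashCode() % 3);
        System.out.println(topic.poll("group-a", p));
        System.out.println(topic.poll("group-a", p));
        System.out.println(topic.poll("group-b", p)); // independent group offset
    }
}
```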

Core Features

  • High throughput: supports millions of messages per second
  • Low latency: millisecond-level message delivery
  • Durability: messages are persisted to disk
  • Distributed: supports clustering and horizontal scaling
  • Fault tolerance: supports data replication and failure recovery

🚀 Quick Start

1. Add Dependencies

<dependencies>
    <!-- Spring Boot Web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Bean validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- Test dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Kafka test support -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Kafka Configuration

spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092
    # Producer settings
    producer:
      # Number of retries
      retries: 3
      # Batch size in bytes
      batch-size: 16384
      # Total buffer memory in bytes
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Acknowledgment mode
      acks: all
      # Compression codec
      compression-type: gzip
      # Send timeouts
      properties:
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000
    # Consumer settings
    consumer:
      # Consumer group id
      group-id: demo-group
      # Commit offsets manually instead of auto-committing
      enable-auto-commit: false
      # Auto-commit interval (only applies when auto-commit is enabled)
      auto-commit-interval: 1000
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Start from the earliest message when no committed offset exists
      auto-offset-reset: earliest
      # Maximum records per poll
      max-poll-records: 500
      # Maximum fetch wait time in ms
      fetch-max-wait: 500
      # JSON deserialization settings
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
        spring.json.type.mapping: "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto"
    # Listener settings
    listener:
      # Acknowledgment mode
      ack-mode: manual_immediate
      # Number of concurrent consumers
      concurrency: 3
      # Poll timeout in ms
      poll-timeout: 3000
      # type: batch  # enable only if every listener consumes in batches;
      #              # the record-level listeners below would break with it on

# Logging
logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG
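The configuration above expects a broker at localhost:9092. For local development, a single-node broker can be started with Docker; this compose file is a sketch assuming the official apache/kafka image (available from Kafka 3.7 onward, running a KRaft broker on port 9092 with its default configuration; the tag is an example).

```yaml
# docker-compose.yml: single-node Kafka for local development.
# Assumes Docker and the official apache/kafka image; pick a current tag.
services:
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
```

Run `docker compose up -d`, and the Spring Boot application should be able to connect to localhost:9092.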

🔧 Kafka Configuration Class

package com.example.demo.config;

import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Producer configuration
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer configuration
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // JSON deserialization settings
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumer threads
        factory.setConcurrency(3);
        // Manual, immediate offset acknowledgment
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Error handler
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }

    /**
     * User-event consumer factory
     */
    @Bean
    public ConsumerFactory<String, UserEventDto> userEventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserEventDto.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * User-event listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEventDto> userEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEventDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userEventConsumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

📊 Message DTO Classes

1. User Event DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.time.LocalDateTime;

public class UserEventDto {

    @NotBlank(message = "Event ID must not be blank")
    private String eventId;

    @NotBlank(message = "Event type must not be blank")
    private String eventType; // CREATE, UPDATE, DELETE

    @NotNull(message = "User ID must not be null")
    private Long userId;

    private String username;
    private String email;
    private String operation;
    private String operatorId;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    private Object data; // extra payload

    // Constructors
    public UserEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public UserEventDto(String eventId, String eventType, Long userId, String operation) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.userId = userId;
        this.operation = operation;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getOperation() { return operation; }
    public void setOperation(String operation) { this.operation = operation; }
    public String getOperatorId() { return operatorId; }
    public void setOperatorId(String operatorId) { this.operatorId = operatorId; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
    public Object getData() { return data; }
    public void setData(Object data) { this.data = data; }

    @Override
    public String toString() {
        return "UserEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", userId=" + userId +
                ", username='" + username + '\'' +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

2. Order Event DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;

public class OrderEventDto {

    @NotBlank(message = "Event ID must not be blank")
    private String eventId;

    @NotBlank(message = "Event type must not be blank")
    private String eventType; // CREATED, PAID, SHIPPED, DELIVERED, CANCELLED

    @NotNull(message = "Order ID must not be null")
    private Long orderId;

    @NotNull(message = "User ID must not be null")
    private Long userId;

    private String orderNo;
    private BigDecimal totalAmount;
    private String status;
    private List<OrderItem> items;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    // Order line item
    public static class OrderItem {
        private Long productId;
        private String productName;
        private Integer quantity;
        private BigDecimal price;

        // Constructors
        public OrderItem() {}

        public OrderItem(Long productId, String productName, Integer quantity, BigDecimal price) {
            this.productId = productId;
            this.productName = productName;
            this.quantity = quantity;
            this.price = price;
        }

        // Getters and setters
        public Long getProductId() { return productId; }
        public void setProductId(Long productId) { this.productId = productId; }
        public String getProductName() { return productName; }
        public void setProductName(String productName) { this.productName = productName; }
        public Integer getQuantity() { return quantity; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }
        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }
    }

    // Constructors
    public OrderEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public OrderEventDto(String eventId, String eventType, Long orderId, Long userId) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.orderId = orderId;
        this.userId = userId;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getOrderNo() { return orderNo; }
    public void setOrderNo(String orderNo) { this.orderNo = orderNo; }
    public BigDecimal getTotalAmount() { return totalAmount; }
    public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public List<OrderItem> getItems() { return items; }
    public void setItems(List<OrderItem> items) { this.items = items; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "OrderEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", orderId=" + orderId +
                ", userId=" + userId +
                ", orderNo='" + orderNo + '\'' +
                ", totalAmount=" + totalAmount +
                ", status='" + status + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

📤 Message Producer

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Topic name constants
    public static final String USER_EVENTS_TOPIC = "user-events";
    public static final String ORDER_EVENTS_TOPIC = "order-events";
    public static final String NOTIFICATION_TOPIC = "notifications";

    /**
     * Send a user event
     */
    public void sendUserEvent(UserEventDto userEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(USER_EVENTS_TOPIC, userEvent.getUserId().toString(), userEvent);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("User event sent: " + userEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send user event: " + userEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending user event: " + e.getMessage());
        }
    }

    /**
     * Send an order event
     */
    public void sendOrderEvent(OrderEventDto orderEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(ORDER_EVENTS_TOPIC, orderEvent.getOrderId().toString(), orderEvent);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Order event sent: " + orderEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send order event: " + orderEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending order event: " + e.getMessage());
        }
    }

    /**
     * Send a notification message
     */
    public void sendNotification(String message) {
        try {
            String messageId = UUID.randomUUID().toString();
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(NOTIFICATION_TOPIC, messageId, message);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Notification sent: " + messageId
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send notification: " + messageId
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending notification: " + e.getMessage());
        }
    }

    /**
     * Send a message to a specific partition
     */
    public void sendMessageToPartition(String topic, int partition, String key, Object message) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, partition, key, message);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Message sent to partition: partition=" + partition
                            + " offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send to partition: " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending to partition: " + e.getMessage());
        }
    }

    /**
     * Send messages in batch
     */
    public void sendBatchMessages(String topic, List<Object> messages) {
        messages.forEach(message -> {
            String key = UUID.randomUUID().toString();
            kafkaTemplate.send(topic, key, message);
        });
        // Flush the buffer so the messages are sent immediately
        kafkaTemplate.flush();
        System.out.println("Batch of " + messages.size() + " messages sent");
    }
}

📥 Message Consumer

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaConsumerService {

    /**
     * Consume user events
     */
    @KafkaListener(topics = "user-events", groupId = "user-event-group",
            containerFactory = "userEventKafkaListenerContainerFactory")
    public void consumeUserEvent(@Payload UserEventDto userEvent,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                 @Header(KafkaHeaders.OFFSET) long offset,
                                 Acknowledgment acknowledgment) {
        try {
            System.out.println("Received user event: " + userEvent);
            System.out.println("Topic: " + topic + ", Partition: " + partition + ", Offset: " + offset);
            // Business logic for the user event
            processUserEvent(userEvent);
            // Acknowledge the message manually
            acknowledgment.acknowledge();
            System.out.println("User event processed: " + userEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process user event: " + e.getMessage());
            // Retry logic or a dead-letter topic could be added here
        }
    }

    /**
     * Consume order events
     */
    @KafkaListener(topics = "order-events", groupId = "order-event-group")
    public void consumeOrderEvent(ConsumerRecord<String, OrderEventDto> record,
                                  Acknowledgment acknowledgment) {
        try {
            OrderEventDto orderEvent = record.value();
            System.out.println("Received order event: " + orderEvent);
            System.out.println("Key: " + record.key() + ", Partition: " + record.partition()
                    + ", Offset: " + record.offset());
            // Business logic for the order event
            processOrderEvent(orderEvent);
            // Acknowledge the message manually
            acknowledgment.acknowledge();
            System.out.println("Order event processed: " + orderEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process order event: " + e.getMessage());
        }
    }

    /**
     * Consume notification messages
     */
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consumeNotification(@Payload String message,
                                    // Spring Kafka 3.x renamed RECEIVED_MESSAGE_KEY to RECEIVED_KEY
                                    @Header(KafkaHeaders.RECEIVED_KEY) String key,
                                    Acknowledgment acknowledgment) {
        try {
            System.out.println("Received notification: " + message);
            System.out.println("Message key: " + key);
            // Business logic for the notification
            processNotification(message);
            // Acknowledge the message manually
            acknowledgment.acknowledge();
            System.out.println("Notification processed");
        } catch (Exception e) {
            System.err.println("Failed to process notification: " + e.getMessage());
        }
    }

    /**
     * Consume messages in batch.
     * Note: receiving a List requires a batch-enabled container factory
     * (e.g. factory.setBatchListener(true)); the record-level default factory will not work.
     */
    @KafkaListener(topics = "batch-topic", groupId = "batch-group")
    public void consumeBatchMessages(List<ConsumerRecord<String, Object>> records,
                                     Acknowledgment acknowledgment) {
        try {
            System.out.println("Received batch of " + records.size() + " messages");
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println("Processing message: Key=" + record.key()
                        + ", Value=" + record.value()
                        + ", Partition=" + record.partition()
                        + ", Offset=" + record.offset());
                // Handle a single record
                processBatchMessage(record.value());
            }
            // Acknowledge the whole batch at once
            acknowledgment.acknowledge();
            System.out.println("Batch processed");
        } catch (Exception e) {
            System.err.println("Failed to process batch: " + e.getMessage());
        }
    }

    /**
     * Consume from multiple topics
     */
    @KafkaListener(topics = {"user-events", "order-events"}, groupId = "multi-topic-group")
    public void consumeMultiTopicEvents(ConsumerRecord<String, Object> record,
                                        Acknowledgment acknowledgment) {
        try {
            String topic = record.topic();
            Object value = record.value();
            System.out.println("Received multi-topic message: Topic=" + topic + ", Value=" + value);
            // Dispatch by topic
            switch (topic) {
                case "user-events":
                    if (value instanceof UserEventDto) {
                        processUserEvent((UserEventDto) value);
                    }
                    break;
                case "order-events":
                    if (value instanceof OrderEventDto) {
                        processOrderEvent((OrderEventDto) value);
                    }
                    break;
                default:
                    System.out.println("Unknown topic: " + topic);
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Failed to process multi-topic message: " + e.getMessage());
        }
    }

    // Business handlers
    private void processUserEvent(UserEventDto userEvent) {
        // Dispatch on the event type
        switch (userEvent.getEventType()) {
            case "CREATE":
                System.out.println("Handling user-created event: " + userEvent.getUserId());
                // Send welcome email, initialize user data, etc.
                break;
            case "UPDATE":
                System.out.println("Handling user-updated event: " + userEvent.getUserId());
                // Sync user info to other systems
                break;
            case "DELETE":
                System.out.println("Handling user-deleted event: " + userEvent.getUserId());
                // Clean up user-related data
                break;
            default:
                System.out.println("Unknown user event type: " + userEvent.getEventType());
        }
    }

    private void processOrderEvent(OrderEventDto orderEvent) {
        // Dispatch on the event type
        switch (orderEvent.getEventType()) {
            case "CREATED":
                System.out.println("Handling order-created event: " + orderEvent.getOrderId());
                // Deduct stock, send confirmation email, etc.
                break;
            case "PAID":
                System.out.println("Handling order-paid event: " + orderEvent.getOrderId());
                // Update order status, prepare shipment, etc.
                break;
            case "SHIPPED":
                System.out.println("Handling order-shipped event: " + orderEvent.getOrderId());
                // Send tracking info, update status, etc.
                break;
            case "DELIVERED":
                System.out.println("Handling order-delivered event: " + orderEvent.getOrderId());
                // Confirm receipt, prompt for a review, etc.
                break;
            case "CANCELLED":
                System.out.println("Handling order-cancelled event: " + orderEvent.getOrderId());
                // Refund, roll back stock, etc.
                break;
            default:
                System.out.println("Unknown order event type: " + orderEvent.getEventType());
        }
    }

    private void processNotification(String message) {
        System.out.println("Handling notification: " + message);
        // Send email, SMS, push notification, etc.
    }

    private void processBatchMessage(Object message) {
        System.out.println("Handling batch item: " + message);
        // Batch processing logic
    }
}

🎮 Controller Layer

package com.example.demo.controller;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import com.example.demo.service.KafkaProducerService;
import jakarta.validation.Valid;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;
import java.util.*;

@RestController
@RequestMapping("/api/kafka")
@CrossOrigin(origins = "*")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Send a user event
     */
    @PostMapping("/user-events")
    public ResponseEntity<Map<String, String>> sendUserEvent(@RequestBody @Valid UserEventDto userEvent) {
        kafkaProducerService.sendUserEvent(userEvent);
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event sent");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send an order event
     */
    @PostMapping("/order-events")
    public ResponseEntity<Map<String, String>> sendOrderEvent(@RequestBody @Valid OrderEventDto orderEvent) {
        kafkaProducerService.sendOrderEvent(orderEvent);
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event sent");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send a notification message
     */
    @PostMapping("/notifications")
    public ResponseEntity<Map<String, String>> sendNotification(@RequestBody Map<String, String> request) {
        String message = request.get("message");
        kafkaProducerService.sendNotification(message);
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Notification sent");
        return ResponseEntity.ok(response);
    }

    /**
     * Quickly create and send a user event
     */
    @PostMapping("/quick/user-event")
    public ResponseEntity<Map<String, String>> quickUserEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long userId = Long.valueOf(request.get("userId").toString());
        String username = (String) request.get("username");
        String email = (String) request.get("email");

        UserEventDto userEvent = new UserEventDto(
                UUID.randomUUID().toString(),
                eventType,
                userId,
                "API_OPERATION");
        userEvent.setUsername(username);
        userEvent.setEmail(email);
        userEvent.setOperatorId("system");

        kafkaProducerService.sendUserEvent(userEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event created and sent");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Quickly create and send an order event
     */
    @PostMapping("/quick/order-event")
    public ResponseEntity<Map<String, String>> quickOrderEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long orderId = Long.valueOf(request.get("orderId").toString());
        Long userId = Long.valueOf(request.get("userId").toString());
        String orderNo = (String) request.get("orderNo");
        BigDecimal totalAmount = new BigDecimal(request.get("totalAmount").toString());

        OrderEventDto orderEvent = new OrderEventDto(
                UUID.randomUUID().toString(),
                eventType,
                orderId,
                userId);
        orderEvent.setOrderNo(orderNo);
        orderEvent.setTotalAmount(totalAmount);
        orderEvent.setStatus(eventType.toLowerCase());

        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event created and sent");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send messages in batch
     */
    @PostMapping("/batch")
    public ResponseEntity<Map<String, String>> sendBatchMessages(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        @SuppressWarnings("unchecked")
        List<String> messages = (List<String>) request.get("messages");
        List<Object> messageObjects = new ArrayList<>(messages);

        kafkaProducerService.sendBatchMessages(topic, messageObjects);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Batch messages sent");
        response.put("count", String.valueOf(messages.size()));
        return ResponseEntity.ok(response);
    }

    /**
     * Send to a specific partition
     */
    @PostMapping("/partition")
    public ResponseEntity<Map<String, String>> sendToPartition(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        Integer partition = (Integer) request.get("partition");
        String key = (String) request.get("key");
        Object message = request.get("message");

        kafkaProducerService.sendMessageToPartition(topic, partition, key, message);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Message sent to the requested partition");
        response.put("partition", partition.toString());
        return ResponseEntity.ok(response);
    }
}

📊 Best Practices

1. Message Design

  • Design a sensible message format
  • Use versioned message schemas
  • Include the necessary metadata
  • Plan for backward compatibility
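One way to apply these points (a hedged sketch with illustrative names, not part of the demo project above) is to wrap every payload in a small envelope that carries an explicit schema version plus metadata, and to add only optional fields within a version:

```java
import java.time.Instant;
import java.util.Map;

// Sketch of a versioned event envelope. Consumers branch on schemaVersion and
// ignore unknown fields, which keeps old consumers compatible with newer
// producers. All names here are hypothetical.
public class EventEnvelope {
    public final String eventId;              // unique id, also usable for de-duplication
    public final String eventType;            // e.g. "user.created"
    public final int schemaVersion;           // bump only on breaking changes
    public final Instant occurredAt;          // metadata: when the event happened
    public final Map<String, Object> payload; // version-specific body

    public EventEnvelope(String eventId, String eventType, int schemaVersion,
                         Instant occurredAt, Map<String, Object> payload) {
        this.eventId = eventId;
        this.eventType = eventType;
        this.schemaVersion = schemaVersion;
        this.occurredAt = occurredAt;
        this.payload = payload;
    }

    // Backward compatibility: handle every version you have ever published,
    // and route unknown future versions somewhere safe.
    public static String routeBySchema(EventEnvelope e) {
        return switch (e.schemaVersion) {
            case 1 -> "legacy-handler";
            case 2 -> "current-handler";
            default -> "dead-letter";
        };
    }
}
```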

2. Performance Optimization

  • Tune the batch size appropriately
  • Use compression to reduce network transfer
  • Optimize the serialization format
  • Choose a sensible number of partitions
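The first two points map directly onto producer settings. The values below are illustrative starting points to benchmark against your own workload, not universal recommendations:

```yaml
spring:
  kafka:
    producer:
      batch-size: 32768          # larger batches amortize per-request overhead
      compression-type: lz4      # cheap on CPU, large win on network and disk
      properties:
        linger.ms: 10            # wait up to 10 ms to fill a batch before sending
```

Larger `batch-size` plus a non-zero `linger.ms` trades a little latency for throughput; compression compounds the gain because whole batches compress better than single records.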

3. Reliability Guarantees

  • Enable producer acknowledgments
  • Make consumers idempotent
  • Handle duplicate messages
  • Implement a dead-letter queue
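Kafka with manual acks gives at-least-once delivery, so the same event can arrive twice; idempotency means the second delivery is a no-op. A minimal sketch, keyed on eventId (illustrative only: in production the processed-id set would live in a database or cache with a TTL, not in memory):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of consumer-side idempotency. The caller still acknowledges the
// Kafka record either way; only the business logic is skipped on duplicates.
public class IdempotentHandler {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true if the event was handled now, false if it was a duplicate. */
    public boolean handleOnce(String eventId, Runnable businessLogic) {
        if (!processed.add(eventId)) {
            return false; // already seen: skip the side effects
        }
        businessLogic.run();
        return true;
    }
}
```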

4. Monitoring and Operations

  • Monitor message backlog
  • Track consumer lag
  • Monitor cluster health
  • Set up alerting
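Backlog and consumer lag come down to one number per partition: the log-end offset minus the group's committed offset. A sketch of the computation (in practice you would fetch both offsets via Kafka's AdminClient or a metrics exporter; here they are passed in directly for illustration):

```java
import java.util.Map;

// Total consumer lag for one group over one topic: for every partition,
// how far the committed offset trails the end of the log.
public class LagMonitor {
    public static long totalLag(Map<Integer, Long> endOffsets,
                                Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0, e.getValue() - committed); // clamp clock/race artifacts
        }
        return lag;
    }
}
```

Alerting on this value (e.g. sustained growth rather than a fixed threshold) catches stuck or slow consumers before downstream systems notice.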

