2.1 本地开发环境搭建
单机模式安装
- 下载与解压:前往Apache Kafka 官网,下载最新稳定版本的 Kafka 二进制包(如
kafka_2.13-3.6.0.tgz
,其中2.13
为 Scala 版本)。解压到本地目录,例如/opt/kafka
:
tar -xzf kafka\_2.13-3.6.0.tgz
mv kafka\_2.13-3.6.0 /opt/kafka
- 配置文件调整:Kafka 的核心配置文件位于
/opt/kafka/config
目录下。
server.properties
:修改关键参数,如listeners=PLAINTEXT://``localhost:9092
指定 Broker 监听地址和端口;log.dirs=/var/lib/kafka-logs
设置消息存储目录;zookeeper.connect=``localhost:2181
(若使用 Zookeeper)配置元数据管理地址。zookeeper.properties
(若未单独安装 Zookeeper):可保持默认配置,默认数据存储目录为/tmp/zookeeper
,端口为2181
。
- 启动服务:依次启动 Zookeeper 和 Kafka Broker:
# 启动Zookeeper(若未单独安装)
/opt/kafka/bin/zookeeper-server-start.sh
/opt/kafka/config/zookeeper.properties# 启动Kafka Broker
/opt/kafka/bin/kafka-server-start.sh
/opt/kafka/config/server.properties
启动后,Kafka 将在localhost:9092
监听 Producer 和 Consumer 的请求。
Docker 容器化部署
使用 Docker Compose 可快速搭建多节点 Kafka 集群,并简化环境管理:
- 创建
docker-compose.yml
文件:
version: '3' # 指定Docker Compose文件版本为3services:zookeeper:image: confluentinc/cp-zookeeper:7.3.0 # 使用Confluent提供的Zookeeper镜像,版本7.3.0environment:ZOOKEEPER_CLIENT_PORT: 2181 # 设置Zookeeper客户端连接端口为2181ZOOKEEPER_TICK_TIME: 2000 # 设置Zookeeper的心跳时间(单位:毫秒)ports:- "2181:2181" # 将容器内的2181端口映射到主机的2181端口kafka:image: confluentinc/cp-kafka:7.3.0 # 使用Confluent提供的Kafka镜像,版本7.3.0depends_on:- zookeeper # 指定Kafka服务依赖于Zookeeper服务environment:KAFKA_BROKER_ID: 1 # 设置Kafka broker的唯一IDKAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # 指定Kafka连接的Zookeeper地址KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT # 定义监听器安全协议映射KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_INTERNAL://localhost:9093 # 定义对外广播的监听器地址KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093 # 定义Kafka监听的地址和端口KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL # 指定broker间通信使用的监听器名称ports:- "9092:9092" # 将容器内的9092端口映射到主机的9092端口
- 启动集群:在包含
docker-compose.yml
的目录下执行:
docker-compose up -d
此配置启动一个单节点 Zookeeper 和一个 Kafka Broker,通过映射本地端口9092
实现外部访问。如需扩展集群,可增加kafka
服务实例并调整配置。
2.2 基础操作入门
命令行工具实战
- 创建 Topic:使用
kafka-topics.sh
命令创建一个名为test_topic
,包含 3 个分区、2 个副本的 Topic:
/opt/kafka/bin/kafka-topics.sh --create \--topic test_topic \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 2
- 生产与消费消息:
- 生产者:通过
kafka-console-producer.sh
向test_topic
发送消息:
/opt/kafka/bin/kafka-console-producer.sh \--topic test_topic \--bootstrap-server localhost:9092
输入消息内容(如Hello, Kafka!
)并回车发送。
- 消费者:使用
kafka-console-consumer.sh
从test_topic
拉取消息,支持从头开始消费或从最新位置消费:
# 从头开始消费
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--from-beginning \--bootstrap-server localhost:9092# 从最新位置消费
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--bootstrap-server localhost:9092
- 查看 Topic 元数据:使用
--describe
参数查看test_topic
的分区分布、Leader 副本等信息:
/opt/kafka/bin/kafka-topics.sh --describe \--topic test_topic \--bootstrap-server localhost:9092
- 消费位移管理:默认情况下,Consumer 自动提交 Offset。如需手动提交,可在消费时添加
--enable-auto-commit=false
参数,并通过commitSync()
或commitAsync()
方法控制提交时机。
2.3 首个 Java 程序:Producer & Consumer
Maven 依赖配置
在pom.xml
中添加 Kafka 客户端依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
Producer 代码示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException; public class KafkaProducerExample {public static void main(String[] args) {// 1. 配置Kafka生产者属性Properties props = new Properties();// 设置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置值的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置消息确认机制:等待所有副本确认(最可靠但最慢)props.put(ProducerConfig.ACKS_CONFIG, "all");// 设置发送失败时的重试次数props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2. 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 3. 创建要发送的消息记录// 参数:topic名称,消息key,消息valueProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key1", "message1");try {// 4. 发送消息(同步方式)// send()返回Future,get()会阻塞直到收到响应RecordMetadata metadata = producer.send(record).get();// 5. 打印消息发送成功的元数据System.out.println("Message sent to partition " + metadata.partition() + " with offset " + metadata.offset());} catch (InterruptedException | ExecutionException e) {// 6. 处理发送过程中可能出现的异常e.printStackTrace();} finally {// 7. 关闭生产者(重要!避免资源泄漏)producer.close();}}
}
Consumer 代码示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties; public class KafkaConsumerExample {public static void main(String[] args) {// 1. 配置Kafka消费者属性Properties props = new Properties();// 设置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置消费者组ID(同一组内的消费者共享消息)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");// 设置键的反序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置值的反序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 注意:默认是自动提交offset,这里我们改为手动提交(见下方commitSync())// 2. 创建Kafka消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅主题(可以订阅多个主题,这里用单例集合订阅单个主题)consumer.subscribe(Collections.singletonList("test_topic"));// 4. 持续轮询消息while (true) {// poll()方法获取消息,参数是等待时间(避免CPU空转)// 返回一批记录(可能包含0到多条消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 5. 处理收到的每条消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}// 6. 手动同步提交offset(确保消息被成功处理后再提交)// 注意:生产环境应考虑错误处理和异步提交(commitAsync)consumer.commitSync(); }// 实际应用中应该添加关闭逻辑(如通过ShutdownHook)// consumer.close();}
}
上述 Java 程序分别实现了消息的生产与消费,通过配置 Producer 和 Consumer 的参数,可灵活控制消息发送策略与消费行为。