基础
Java Kafka消费者主要通过以下核心类实现:
- KafkaConsumer:消费者的核心类,用于创建消费者对象进行数据消费1
- ConsumerConfig:获取各种配置参数,如果不配置就使用默认值1
- ConsumerRecord:每条数据都要封装成一个ConsumerRecord对象才可以进行消费1
偏移量(Offset)的含义
- Offset 是 Kafka 分区内部的消息序号,唯一标识一条消息在分区内的位置。
- 对于 consumer,offset 代表“下一个要消费的消息”。
- 提交 offset 是容错的关键,当 consumer 崩溃/重启/再均衡后,能从正确位置恢复消费。
- 如果 offset 提交得过早,可能会丢消息;如果太晚,可能会重复消费。
Kafka提供两种主要的消费方式:
(1)手动提交offset方式
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交
props.put(ConsumerConfig.GROUP_ID_CONFIG, "csdn");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("testKafka"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());}consumer.commitAsync(); // 异步提交// 或者使用 consumer.commitSync(); // 同步提交
}
(2)自动提交offset方式
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 开启自动提交
props.put("auto.commit.interval.ms", "1000"); // 自动提交时间间隔while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());}// 不需要手动提交offset
}
手动提交offset有两种具体实现:
- commitSync():同步提交,会失败重试,一直到提交成功1
- commitAsync():异步提交,没有失败重试机制,但延迟较低1
消费者组(Consumer Group)
// 通过group.id配置指定消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "experiment");
消费者组的特性:
- 同一个组中的consumer订阅同样的topic,每个consumer接收topic一些分区中的消息4
- 同一个分区不能被一个组中的多个consumer消费4
Broker连接
// 指定Kafka集群的broker地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
偏移量(Offset)管理
// 记录分区的offset信息
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();// 在处理消息时更新offset
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata")
);// 提交特定的偏移量
consumer.commitAsync(currentOffsets, null);
消费者通过pull(拉)模式从broker中读取数据:
-
轮询机制:通过
consumer.poll(timeout)
方法轮询获取消息 -
批量处理:poll方法返回的是一批数据,不是单条
-
心跳维护:消费者通过向GroupCoordinator发送心跳来维持和群组以及分区的关系
在生产环境中,一般使用手动提交offset方式,因为:
- 手动提交offset取到的数据是可控的
- 可以通过控制提交offset和消费数据的顺序来保证数据的可靠性
- 虽然commitAsync没有失败重试机制,但实际工作中用它比较多,因为延迟较低
KafkaConsumer
类分析
KafkaConsumer
是 Kafka 的客户端核心类之一,用于消费 Kafka 集群中的消息。它支持高可靠的消息消费、自动容错、分区分配与再均衡、消费组(Consumer Group)等机制。
主要成员
- delegate:
KafkaConsumer
实际的大部分操作都委托给内部的ConsumerDelegate
对象。 - CREATOR:用于创建
ConsumerDelegate
的工厂。
主要构造方法
-
支持多种方式初始化 Consumer(通过 Properties、Map,及自定义反序列化器)。
-
构造过程会初始化配置、反序列化器、底层网络客户端等。
主要方法
- subscribe/assign/unsubscribe:主题和分区的订阅与管理。
- poll:拉取消息的核心方法。
- commitSync/commitAsync:手动或自动提交消费位移(offset)。
- seek/seekToBeginning/seekToEnd:手动控制消费位移。
- position/committed/beginningOffsets/endOffsets:查询当前偏移量、提交的偏移量、起始/末尾偏移量等。
- close/wakeup:关闭消费者、唤醒阻塞的 poll 等。
关键代码与核心算法
2.1 订阅与分区分配
-
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
- 通过订阅主题,KafkaConsumer 会自动和 Broker 进行“组协调”,由服务器端分配分区。
- 可指定回调监听分区分配及撤销(ConsumerRebalanceListener)。
-
assign(Collection<TopicPartition> partitions)
- 手动指定消费哪些分区,此时不参与组协调(不属于消费组)。
2.2 拉取消息
- poll(Duration timeout)
- 这是消息获取的核心方法。内部流程大致是:
- 检查当前分区分配、偏移量状态,自动心跳维护。
- 向 Broker 发送 Fetch 请求,获取分配分区的消息数据。
- 更新本地偏移量、缓存消息,返回给用户。
- 处理组再均衡及回调。
poll
同时承担了心跳(维持消费组成员关系)、数据拉取、再均衡等多项职责。
- 这是消息获取的核心方法。内部流程大致是:
2.3 偏移量管理
-
commitSync/commitAsync
- 将消费到的 offset 提交到 Kafka 的 __consumer_offsets 主题。
- commitSync 为同步,commitAsync 为异步,有回调。
-
offsetsForTimes、seek、position、committed
- 提供了丰富的偏移量控制与查询能力。比如按时间查 offset、手动指定 offset、获取当前 offset、已提交 offset 等。
2.4 消费组与再均衡
- KafkaConsumer 通过 group.id 配置参与消费组。
- 在订阅/拉取数据时自动和 Broker 协作,进行分区分配(GroupCoordinator/PartitionAssignor)。
- 当消费组成员变动(增减、订阅主题变更、分区数变更等)时,自动触发 group rebalance。
核心数据结构
3.1 SubscriptionState
-
跟踪当前订阅主题、分区、偏移量等。
3.2 ConsumerConfig
-
消费者的配置参数。
3.3 ConsumerRecords、ConsumerRecord
-
消费到的数据结构(批量和单条消息)。
3.4 TopicPartition
-
主题 + 分区的封装对象。
3.5 OffsetAndMetadata
- 偏移量及其元数据,用于提交/查询 offset。
与 Broker 的交互流程
-
启动/订阅
- consumer 通过 group.id 加入消费组,向 Broker 发送 JoinGroup 请求。
- Broker 分配分区,返回 Assignment。
- consumer 通过 Fetch 请求拉取分配到的分区数据。
-
心跳与再均衡
- consumer 定期发送心跳(Heartbeat)给 Broker,维持消费组成员关系。
- 发生成员变更时,Broker 发起再均衡,consumer 收到分区分配变化的通知。
-
拉取消息
- consumer 向分区主副本 broker 发送 Fetch 请求,拉取新消息。
-
提交偏移量
- consumer 通过 OffsetCommit 请求,将消费进度(offset)提交到 __consumer_offsets 主题。
- 下次重启或再均衡后,会以 committed offset 作为消费起点。
消费组(Consumer Group)机制与关联
- 消费组:同一个 group.id 的多个消费者共同组成消费组。
- 一个分区只能被一个消费组内的 consumer 消费。
- 消费组间互不影响,可以实现广播(多个 group 消费同一 topic)。
- 分区分配算法:Kafka 内置多种分配策略(如 RangeAssignor、RoundRobinAssignor),也支持自定义。
- 组协调器(GroupCoordinator):消费组内所有成员和 broker 的协调节点,负责分配分区和管理组成员状态。
- 组再均衡:消费组成员变动(上线、下线、订阅变更)时,broker 会触发 rebalance,重新分配分区。
总结
KafkaConsumer
封装了和 Kafka broker 的全部交互,包括组管理、分区分配、消息拉取、偏移量提交等。- 关键流程:订阅分区 → 拉取消息 → 提交偏移量 → 处理再均衡。
- 数据结构如 TopicPartition、OffsetAndMetadata 贯穿分区与 offset 的管理。
- 消费组机制保证了分布式消费的高可用、横向扩展性和容错。
网络连接分析
1. 消费组与 Broker 的连接与交互
1.1 消费组与 Broker 的网络连接
- KafkaConsumer 实际的网络连接是通过底层的
KafkaClient
(如NetworkClient
)实现的。 - 每一个 KafkaConsumer 对象会维护一组 TCP 连接,这些连接包括:
- 与每个被消费分区的 leader broker 的连接:用于拉取数据(Fetch)。
- 与 GroupCoordinator(消费组协调者)的连接:用于管理 group membership、分区再均衡、offset 提交等。
KafkaConsumer
→ConsumerDelegate
→KafkaConsumerDelegate
→KafkaClient
(通常是NetworkClient
)- 构造器中初始化 client
连接建立流程
- 启动时,通过
bootstrap.servers
配置连接部分 broker,自动发现集群。 - 加入消费组时,向 GroupCoordinator 发送 JoinGroup、SyncGroup、Heartbeat 等请求,维持“组”状态。
2. 拉取数据的批量处理机制
拉取数据不是“一条一条”
- KafkaConsumer 一次 poll 拉取的是一批 record,而非单个。
- 批量拉取由参数控制:
fetch.min.bytes
:每次最少拉取的字节数fetch.max.bytes
:单次最大拉取的字节数max.poll.records
:每次 poll 返回的最大消息数
这样做的原因:
- 批量拉取能极大提升吞吐量,减少网络和序列化开销。
- 只有在消息极少/网络慢时,才可能一次只拉一个 record,实际场景几乎不会。
关键源码位置
- 拉取主逻辑在
KafkaConsumer.poll()
,内部委托到ConsumerNetworkClient
→NetworkClient
。 - 数据结构:
ConsumerRecords<K, V>
(批量记录集合) - 相关参数解析见 KafkaConsumerConfig
数据拉取的底层流程
- poll() 方法被调用
- 根据分区分配情况,构造 FetchRequest
- 通过
NetworkClient
向每个 leader broker 发送 FetchRequest - Broker 返回批量数据
- 反序列化为
ConsumerRecords<K, V>
,返回给用户
关键代码入口
KafkaConsumer.poll()
ConsumerDelegate.poll()
Fetcher.fetchRecords()
- 底层 socket 通信:
NetworkClient.send()
3. 多线程实现与线程安全
KafkaConsumer 并不是多线程的
- 它本身不是线程安全的,所有方法必须在同一个线程中调用,除了
wakeup()
。 - 官方建议:一个线程一个 Consumer 实例,或者用单线程 poll,其他线程异步处理数据。
源码注释说明
-
KafkaConsumer.java 注释:“The consumer is not thread-safe...”
多线程架构推荐
- 如果需要多线程消费,加一层 queue,把消息分发到多个工作线程,由 poll 线程专门负责与 broker 通信。
4. 复杂和有意思的实现分析
4.1 消费组协调与再均衡
- 消费组成员通过心跳(Heartbeat)、JoinGroup、SyncGroup 维护 membership。
- 分区分配算法在
ConsumerPartitionAssignor
中实现(Range, RoundRobin, Sticky 等)。 - 再均衡期间,consumer 会暂停拉取、等待新分配。
代码位置
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
org.apache.kafka.clients.consumer.internals.Fetcher
4.2 批量拉取的背后——高效网络 IO
- Kafka 的 Fetch API 支持“多分区合并拉取”,同一台 broker 上的多个分区会合并在一次网络请求里。
NetworkClient
负责 socket 通信,异步 IO,支持高并发。Selector
(NIO)负责事件分发,提高并发和效率。
代码位置
org.apache.kafka.clients.NetworkClient
org.apache.kafka.common.network.Selector
4.3 Offset 管理的强一致性
- Offset 提交实际上是把偏移量写入特殊的 topic(__consumer_offsets),由 GroupCoordinator 管理。
- 消费组内 offset 的一致性和再均衡机制确保了“至少一次”语义。
5. 直接源码定位
功能 | 关键类或方法 |
---|---|
网络连接的建立 | KafkaConsumer → ConsumerDelegate → KafkaClient (NetworkClient) |
消费组协调 | ConsumerCoordinator、GroupCoordinator、JoinGroup/SyncGroup |
消息批量拉取 | KafkaConsumer.poll()、Fetcher.fetchRecords() |
多线程相关说明 | KafkaConsumer 注释、wakeup() |
Offset 管理 | commitSync/commitAsync、OffsetCommitRequest、__consumer_offsets |
负载均衡与再均衡 | ConsumerPartitionAssignor、ConsumerCoordinator |
小结
- 每个 Consumer 进程会与需要的 broker 建立 TCP 连接(1个消费组协调,N个分区 leader)。
- 每次 poll 拉取的是“批量数据”,不是一条!由参数决定批量大小。
- KafkaConsumer 本身不是多线程的,多线程要用 queue 解耦。
- 复杂逻辑:消费组协调、分区分配、批量拉取、offset 强一致性,源码分布见上表。
协作
详细说明各个组件及其关系:
-
ClassicKafkaConsumer<K, V>
(客户端)- 角色: 这是用户应用程序直接与之交互的 Kafka 消费者高级 API 的一个实现(遵循经典组协议)。它封装了消费消息的整个生命周期,包括连接到 Kafka 集群、加入消费组、分配分区、拉取消息、提交位移等。
- 主要职责:
- 管理消费者的配置。
- 协调内部组件如
Fetcher
和ConsumerCoordinator
的工作。 - 向用户应用程序提供
poll()
方法来获取消息。 - 处理用户发起的位移提交请求。
- 管理消费者的生命周期(如
subscribe()
,assign()
,close()
)。
- 与
Fetcher
的关系:ClassicKafkaConsumer
拥有并管理一个Fetcher
实例。当poll()
方法被调用时,如果需要拉取数据,ClassicKafkaConsumer
会委托Fetcher
来执行实际的数据拉取操作。 - 与
ConsumerCoordinator
的关系:ClassicKafkaConsumer
拥有并管理一个ConsumerCoordinator
实例(前提是配置了group.id
)。ConsumerCoordinator
负责所有与消费组协调相关的任务。
-
Fetcher<K, V>
(客户端)- 角色: 负责从 Kafka Broker 拉取实际的消息数据。
- 主要职责:
- 根据当前分配给消费者的分区,构建
FetchRequest
。 - 执行请求合并: 将发往同一个 Broker 的多个分区的拉取请求合并成一个网络请求。
- 通过
ConsumerNetworkClient
(网络客户端的底层实现) 将FetchRequest
发送给相应的 Broker。 - 接收
FetchResponse
,解析消息数据。 - 对消息进行反序列化。
- 管理拉取会话 (Fetch Session) 以优化拉取效率 (KIP-227)。
- 将拉取到的数据缓冲起来,供
ClassicKafkaConsumer
的poll()
方法消费。
- 根据当前分配给消费者的分区,构建
- 与
ClassicKafkaConsumer
的关系:Fetcher
是ClassicKafkaConsumer
的一个内部组件,由ClassicKafkaConsumer
创建和控制。 - 与 Broker 的关系:
Fetcher
直接与存储 Topic/Partition 数据的 Broker 进行网络通信,发送拉取请求并接收数据。
-
ConsumerCoordinator
(客户端)- 角色: 负责代表消费者与 Kafka 集群中的
GroupCoordinator
(运行在某个 Broker 上) 进行交互,以管理消费者在消费组中的成员资格和分区分配。 - 主要职责:
- 发现 GroupCoordinator: 找到负责当前消费组的
GroupCoordinator
Broker。 - 加入消费组 (JoinGroup): 发送
JoinGroupRequest
给GroupCoordinator
,表明消费者希望加入该组。 - 同步消费组 (SyncGroup): 在
JoinGroup
成功后(通常由 leader 消费者执行),发送SyncGroupRequest
以获取分配给该消费者的分区列表。非 leader 消费者也发送SyncGroupRequest
来获取分配结果。 - 心跳 (Heartbeat): 定期向
GroupCoordinator
发送心跳,以表明消费者仍然存活,并维持其在消费组中的成员资格以及对所分配分区的占有。 - 位移提交 (OffsetCommit): 将消费者处理过的消息的位移提交给
GroupCoordinator
进行存储。 - 位移拉取 (OffsetFetch): 从
GroupCoordinator
获取已提交的位移。 - 处理 Rebalance 相关的回调(通过
ConsumerRebalanceListener
)。
- 发现 GroupCoordinator: 找到负责当前消费组的
- 与
ClassicKafkaConsumer
的关系:ConsumerCoordinator
是ClassicKafkaConsumer
的一个内部组件,由ClassicKafkaConsumer
创建和控制(如果配置了group.id
)。 - 与
GroupCoordinator
(Broker 端) 的关系:ConsumerCoordinator
通过网络与运行在某个 Broker 上的GroupCoordinator
服务进行通信。
- 角色: 负责代表消费者与 Kafka 集群中的
-
GroupCoordinator
(服务端,运行在 Broker 上)- 角色: Kafka Broker 上的一个服务,负责管理一个或多个消费组。每个消费组都有一个对应的
GroupCoordinator
。 - 主要职责:
- 处理来自
ConsumerCoordinator
(客户端) 的请求,如JoinGroup
,SyncGroup
,Heartbeat
,OffsetCommit
,OffsetFetch
等。 - 维护消费组的元数据,包括组成员列表、每个成员的订阅信息、当前的分区分配方案、已提交的位移等。这些信息通常存储在内部的
__consumer_offsets
topic 中。 - 选举 Leader Consumer: 在消费组内选举一个 Leader Consumer,由 Leader Consumer 负责计算分区分配方案。
- 触发和协调 Rebalance: 当消费组成员发生变化(加入/离开)、订阅的 Topic 分区发生变化时,
GroupCoordinator
会启动 Rebalance 过程。 - 存储和管理消费位移。
- 处理来自
- 与
ConsumerCoordinator
(客户端) 的关系:GroupCoordinator
是ConsumerCoordinator
(客户端) 的服务端对应组件,接收并处理来自客户端的协调请求。 - 与 Broker 的关系:
GroupCoordinator
是 Broker 进程内的一个模块/服务。
- 角色: Kafka Broker 上的一个服务,负责管理一个或多个消费组。每个消费组都有一个对应的
-
Broker (服务端)
- 角色: Kafka 集群中的一个服务器节点。
- 主要职责:
- 存储数据: 存储 Topic 的 Partition 数据(消息日志)。
- 处理生产请求: 接收来自生产者的消息并写入相应的 Partition。
- 处理拉取请求: 响应来自
Fetcher
(消费者客户端) 的数据拉取请求,从磁盘读取消息并返回。 - 运行
GroupCoordinator
服务: 部分 Broker 会运行GroupCoordinator
服务来管理消费组。 - 副本管理: 参与 Partition 的副本同步和 Leader 选举。
- 元数据管理: 维护集群元数据的一部分。
- 与
Fetcher
的关系:Fetcher
从 Broker 拉取消息数据。 - 与
GroupCoordinator
的关系:GroupCoordinator
服务运行在 Broker 内部。
Fetcher
sendFetches 实现了基本发送请求
/*** Set up a fetch request for any node that we have assigned partitions for which doesn't already have* an in-flight fetch or pending fetch data.* @return number of fetches sent*/public synchronized int sendFetches() {final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = prepareFetchRequests();sendFetchesInternal(fetchRequests,(fetchTarget, data, clientResponse) -> {synchronized (Fetcher.this) {handleFetchSuccess(fetchTarget, data, clientResponse);}},(fetchTarget, data, error) -> {synchronized (Fetcher.this) {handleFetchFailure(fetchTarget, data, error);}});return fetchRequests.size();}
总结一下合并的流程:
-
准备阶段 (
prepareFetchRequests()
- 位于AbstractFetch
类中):Fetcher
(通过继承的prepareFetchRequests
方法) 遍历所有当前消费者订阅的、并且已经分配给此消费者的、并且已经知道 leader broker 的分区。- 对于每个这样的分区,它会确定其 leader broker (即
Node
)。 - 它将所有需要从同一个
Node
拉取数据的分区信息收集起来,形成一个FetchSessionHandler.FetchRequestData
对象。 - 最终,
prepareFetchRequests()
返回一个Map<Node, FetchSessionHandler.FetchRequestData>
。这个 Map 的结构本身就体现了合并:每个Node
(Broker) 对应一个FetchRequestData
,这个FetchRequestData
聚合了发往该 Broker 的所有分区的拉取需求。
-
发送阶段 (
sendFetchesInternal()
- 位于Fetcher
类中):sendFetchesInternal
方法接收这个Map
。- 它遍历 Map 中的每一个
Node
(Broker)。 - 对于每一个
Node
,它使用对应的FetchSessionHandler.FetchRequestData
来创建一个FetchRequest.Builder
实例。这个Builder
会被配置为包含该FetchRequestData
中指定的所有 TopicPartition 的拉取信息。 - 然后,为每个
Node
发送一个(且仅一个)FetchRequest
。
合并的本质是将发往同一个 Broker 的多个分区的拉取操作打包到一个网络请求中。 这样做的好处是:
- 减少网络开销: 显著减少了客户端和 Broker 之间的网络往返次数。
- 提高吞吐量: Broker 可以一次性处理来自一个消费者的多个分区的请求,提高了处理效率。
- 降低延迟: 减少了等待多个独立请求响应的时间。