第5篇、 Kafka 数据可靠性与容错机制

在分布式消息队列系统中,数据可靠性容错能力 是核心指标。Kafka 作为高吞吐、可扩展的流式处理平台,依靠副本复制、Leader 选举和 ISR 机制,保证了在节点故障时消息依然能够可靠传输与消费。

📚 目录

理论基础

  • 一、数据复制机制与 ISR

    • 核心概念介绍
      • 1. 分区副本(Partition Replica)
      • 2. Leader 与 Follower
      • 3. ISR(In-Sync Replica)
    • 工作机制
    • 数据流向图
  • 二、Leader 选举流程

    • 核心概念介绍
      • 1. Controller(控制器)
      • 2. Leader 选举触发条件
      • 3. 选举规则与流程
    • 选举优势
  • 三、Broker 宕机后的恢复

    • 故障恢复流程图
  • 四、min.insync.replicas 配置与数据丢失风险

    • 核心概念介绍
      • 1. min.insync.replicas 参数
      • 2. Producer acks 参数
      • 3. 数据丢失风险分析
    • 配置建议
    • 性能与可靠性权衡
    • ISR 状态变化图

实践案例

  • 五、案例演示:关停一个 Broker

    • 1. 环境准备
    • 2. 启动 Producer & Consumer
    • 3. 模拟故障
    • 4. 观察现象
    • 5. Broker 恢复
  • 六、总结

代码实战

  • 七、动手实战:可靠性配置 + 故障注入(含代码)
    • 1) 环境准备
    • 2) 创建 Topic(3 副本 + 要求至少 2 个副本同步)
    • 3) 代码实现与运行
      • 生产者代码(可靠性配置)
      • 消费者代码(可靠性监控)
      • 运行命令
    • 4) 故障注入与现象观察
    • 5) 小结与实践建议

可视化工具

  • 八、交互式可视化:实时查看 ISR 与发送/消费统计
    • 1) 指标服务代码(Flask)
    • 2) 可视化页面代码
    • 3) 建议的演示流程

本文将从以下几个方面展开:

  • 数据复制机制与 ISR(In-Sync Replica)
  • Leader 选举流程
  • Broker 宕机后的恢复过程
  • min.insync.replicas 配置与数据丢失风险
  • 案例演示:关停一个 Broker,观察副本切换与消费情况

重要术语解释

在深入讨论之前,先了解几个关键术语:

LEO(Log End Offset):日志结束偏移量,表示分区中最后一条消息的偏移量位置。

HW(High Watermark):高水位线,表示消费者可见的最大偏移量,HW ≤ LEO。

Replication Factor:副本因子,指定每个分区需要多少个副本。

Controller:Kafka 集群的控制器,负责管理分区和副本的分配。

ZooKeeper:Kafka 的元数据存储和协调服务(Kafka 2.8+ 版本开始支持 KRaft 模式,不再依赖 ZooKeeper)。


一、数据复制机制与 ISR

核心概念介绍

1. 分区副本(Partition Replica)

定义:Kafka 中每个分区都有多个副本,分布在不同的 Broker 上,用于提供数据冗余和容错能力。

特点

  • 每个分区有一个 Leader 和若干个 Follower
  • Leader 负责处理所有读写请求
  • Follower 只负责从 Leader 拉取数据并同步
  • 副本数量由 replication-factor 参数控制
2. Leader 与 Follower

Leader

  • 处理所有客户端的读写请求
  • 维护分区的元数据信息
  • 负责向 Follower 推送数据变更

Follower

  • 被动接收 Leader 的数据同步
  • 不处理客户端请求
  • 在 Leader 故障时可能被选为新的 Leader
3. ISR(In-Sync Replica)

定义:ISR 是"同步副本集合",包含所有与 Leader 保持同步状态的副本。

同步条件

  • Follower 的 LEO(Log End Offset)与 Leader 的 HW(High Watermark)差距在阈值内
  • 默认阈值由 replica.lag.time.max.ms 控制(通常为 10 秒)

ISR 的作用

  • 数据一致性保证:只有 ISR 中的副本才被认为是"安全"的
  • Leader 选举:新 Leader 只能从 ISR 中选择
  • 写入确认:Producer 的 acks=all 需要等待 ISR 中所有副本确认

工作机制

  • Leader 写入 → ISR 跟进:只有 ISR 副本完成写入,消息才被确认
  • Follower 滞后处理:若某个 Follower 落后过多,会被移出 ISR,避免影响整体可用性
  • 动态调整:ISR 会根据副本的同步状态动态调整

这样,Kafka 保证了即使部分 Broker 宕机,只要 ISR 中至少有一个副本存活,数据就不会丢失。

数据流向图

Consumer 客户端
ISR 管理
Kafka 集群
Broker 1 (Leader)
Broker 2 (Follower)
Broker 3 (Follower)
Producer 客户端
1. 发送消息
acks=all
2. 写入本地日志
3. 拉取数据
4. 写入本地日志
3. 拉取数据
4. 写入本地日志
5. 更新 HW
6. 报告 LEO
6. 报告 LEO
7. 检查同步状态
8. 确认写入成功
9. 可见消息
Consumer
ISR: Leader, Follower1, Follower2
High Watermark
Follower Broker 2
Follower Log 2
Follower Broker 1
Follower Log 1
Local Log
Producer
Leader Broker

数据流向说明

  1. Producer 发送:Producer 向 Leader 发送消息,设置 acks=all
  2. Leader 写入:Leader 将消息写入本地日志
  3. Follower 拉取:Follower 从 Leader 拉取新消息
  4. Follower 写入:Follower 将消息写入各自的本地日志
  5. 更新 HW:Leader 根据 Follower 的 LEO 更新 HW
  6. 报告 LEO:Follower 向 Leader 报告自己的 LEO
  7. ISR 检查:Leader 检查所有副本的同步状态
  8. 确认成功:当 ISR 中所有副本都同步后,向 Producer 确认
  9. Consumer 消费:Consumer 只能看到 HW 以下的消息

二、Leader 选举流程

核心概念介绍

1. Controller(控制器)

定义:Controller 是 Kafka 集群中的一个特殊 Broker,负责管理整个集群的元数据和协调工作。

职责

  • 监控集群中所有 Broker 的状态
  • 管理分区的 Leader 和 Follower 分配
  • 处理 Broker 的加入和离开事件
  • 协调 Leader 选举过程

选举机制

  • 集群启动时,所有 Broker 竞争成为 Controller
  • 使用 ZooKeeper 的临时节点机制确保只有一个 Controller
  • Controller 故障时,其他 Broker 会重新选举新的 Controller
2. Leader 选举触发条件

自动触发

  • Leader 所在的 Broker 宕机
  • Leader 所在的 Broker 网络分区
  • Leader 副本数据损坏

手动触发

  • 管理员主动触发分区重新分配
  • 集群扩容或缩容
3. 选举规则与流程

选举规则

  1. 优先从 ISR 中选择:新 Leader 必须来自 ISR 集合
  2. 选择第一个可用副本:在 ISR 中选择第一个可用的副本作为新 Leader
  3. 避免脏读:确保新 Leader 的数据与原 Leader 一致

选举流程

  1. 检测故障:Controller 检测到 Leader 失效
  2. 更新 ISR:从 ISR 中移除失效的副本
  3. 选择新 Leader:从剩余 ISR 中选择一个作为新 Leader
  4. 更新元数据:通知所有 Broker 更新分区元数据
  5. 客户端感知:Producer 和 Consumer 自动感知 Leader 变化

选举优势

👉 数据一致性保证:基于 ISR 的 Leader 选举确保新 Leader 上的数据与原 Leader 一致,避免数据丢失

👉 快速故障恢复:选举过程通常在几秒内完成,最小化服务中断时间

👉 自动容错:无需人工干预,系统自动处理故障和恢复


三、Broker 宕机后的恢复

Broker 故障是分布式系统的常见情况。Kafka 的恢复过程大致如下:

  1. 故障发生:Leader Broker 宕机,分区不可写。

  2. Leader 切换:Controller 从 ISR 中选出新的 Leader。

  3. 客户端感知

    • Producer 会自动更新元数据,继续向新 Leader 写入。
    • Consumer 自动订阅新的 Leader 进行消费。
  4. Broker 恢复上线

    • 重新加入集群。
    • 从新的 Leader 拉取缺失的数据,追上进度后重新加入 ISR。

这样,Kafka 在 Broker 失效和恢复时,能够保证 高可用 + 数据不丢失

故障恢复流程图

ProducerLeader BrokerFollower Broker 1Follower Broker 2ControllerConsumer正常状态:3个Broker都在线发送消息同步数据同步数据确认同步确认同步写入成功确认故障发生:Leader Broker宕机连接断开连接断开连接断开Controller检测故障并选举新Leader检测到Leader失效检测到Leader失效选举为新Leader更新元数据客户端自动感知Leader变化重新连接新Leader重新连接新Leader继续处理请求继续提供数据原Leader恢复重新加入集群从新Leader同步数据追上进度后重新加入ISRProducerLeader BrokerFollower Broker 1Follower Broker 2ControllerConsumer

故障恢复阶段说明

  1. 正常状态:3个Broker都在线,数据正常同步
  2. 故障检测:Controller检测到Leader Broker宕机
  3. Leader选举:从ISR中选择新的Leader
  4. 元数据更新:通知所有Broker更新分区元数据
  5. 客户端重连:Producer和Consumer自动连接到新Leader
  6. 服务恢复:新Leader开始处理读写请求
  7. 原Leader恢复:原Leader重新加入集群并同步数据

四、min.insync.replicas 配置与数据丢失风险

核心概念介绍

1. min.insync.replicas 参数

定义min.insync.replicas 是 Topic 级别的配置参数,指定 Producer 使用 acks=all 时,至少需要多少个副本确认写入才算成功。

作用机制

  • 当 ISR 中的副本数量 < min.insync.replicas 时,acks=all 的写入会被拒绝
  • 这是一个"安全阀",防止在副本数量不足时冒险写入
  • 默认值为 1,生产环境建议设置为 2 或更高
2. Producer acks 参数

acks=0

  • Producer 不等待任何确认
  • 最高性能,但可能丢失数据
  • 适用于对数据丢失容忍度高的场景

acks=1

  • 等待 Leader 确认写入
  • 平衡性能和可靠性
  • 在 Leader 故障时可能丢失数据

acks=all

  • 等待 ISR 中所有副本确认
  • 最高可靠性,但性能较低
  • 需要配合 min.insync.replicas 使用
3. 数据丢失风险分析

风险场景

  1. ISR 副本不足

    场景:3 副本 Topic,min.insync.replicas=2
    情况:2 个 Broker 宕机,ISR 只剩 1 个副本
    结果:acks=all 写入被拒绝,避免数据丢失
    
  2. 配置不当

    场景:min.insync.replicas=1
    情况:Leader 写入后立即宕机,Follower 未同步
    结果:数据丢失
    
  3. 网络分区

    场景:Leader 与 Follower 网络隔离
    情况:Leader 继续写入,Follower 无法同步
    结果:可能导致数据不一致
    

配置建议

生产环境最佳实践

# Topic 配置
replication-factor=3
min.insync.replicas=2# Producer 配置
acks=all
retries=10
enable.idempotence=true

配置说明

  • 副本数 ≥3:提供足够的容错能力
  • min.insync.replicas ≥2:确保至少 2 个副本同步
  • Producer 使用 acks=all:等待所有 ISR 副本确认
  • 启用幂等性:避免重复消息

性能与可靠性权衡

可靠性优先

  • min.insync.replicas=2acks=all
  • 适合金融、支付等对数据一致性要求极高的场景

性能优先

  • min.insync.replicas=1acks=1
  • 适合日志收集、监控数据等对丢失容忍度高的场景

平衡配置

  • min.insync.replicas=2acks=all
  • 在大多数场景下提供最佳的性能与可靠性平衡

ISR 状态变化图

集群启动
Leader宕机
Follower1宕机
Follower2宕机
Controller检测
移除故障副本
新Leader开始工作
移除故障副本
ISR仍有2个副本
移除故障副本
ISR仍有2个副本
原Leader重新上线
Follower1重新上线
Follower2重新上线
从新Leader拉取数据
从Leader拉取数据
从Leader拉取数据
追上进度
恢复完整ISR
正常状态
Broker1故障
Broker2故障
Broker3故障
选举新Leader
更新ISR
服务恢复
继续服务
Broker1恢复
Broker2恢复
Broker3恢复
数据同步
重新加入ISR
ISR: [Leader, Follower1, Follower2]
状态: 3个副本同步
写入: acks=all 成功
ISR: [Leader, Follower2]
状态: 2个副本同步
写入: acks=all 成功
ISR: [新Leader, Follower2]
状态: 2个副本同步
写入: acks=all 成功

ISR 状态说明

  • 正常状态:3个副本都在ISR中,数据完全同步
  • 单点故障:1个Follower故障,ISR减少但服务继续
  • Leader故障:需要选举新Leader,短暂服务中断
  • 多点故障:如果ISR副本数 < min.insync.replicas,写入被拒绝
  • 故障恢复:故障Broker重新上线后需要同步数据才能重新加入ISR

五、案例演示:关停一个 Broker

我们可以通过一个小实验观察 Kafka 的容错性。

1. 环境准备

  • 3 个 Broker(broker1, broker2, broker3)
  • 主题 demo_topic,副本数 = 3

2. 启动 Producer & Consumer

# Producer
kafka-console-producer.sh --broker-list localhost:9092 --topic demo_topic# Consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo_topic --from-beginning

3. 模拟故障

# 停掉一个 Broker,例如 broker1
systemctl stop kafka@broker1

4. 观察现象

  • Controller 会自动触发 Leader 重选
  • Producer 仍然可以继续写入消息(可能有短暂抖动)。
  • Consumer 会自动切换到新 Leader 继续消费。

5. Broker 恢复

systemctl start kafka@broker1

恢复后,broker1 会 从当前 Leader 拉取缺失的数据,追上 ISR 进度,再次加入副本集合。


六、总结

Kafka 的数据可靠性与容错机制可以总结为:

  • 副本机制:保证分区多副本存储,避免单点故障。
  • ISR 集合:确保只有数据一致的副本才参与写入确认。
  • Leader 选举:在 Broker 宕机时快速切换,保证可用性。
  • min.insync.replicas:合理配置可在可靠性与性能间找到平衡。

通过本篇案例,我们看到 Kafka 即便在 Broker 故障时,也能保证 高可用与数据可靠性,这正是它被广泛应用于大规模流式处理场景的原因。


要不要我帮你画一张 Kafka 副本与 ISR 切换流程图(类似上一篇你用过的图示)?这样博客会更直观。


七、动手实战:可靠性配置 + 故障注入(含代码)

本节提供可运行的示例与故障注入步骤,帮助你直观看到 acks=allmin.insync.replicas 与 ISR 的影响。

1) 环境准备

python3 -m venv .venv && source .venv/bin/activate
pip install kafka-python

2) 创建 Topic(3 副本 + 要求至少 2 个副本同步)

kafka-topics.sh --create \--topic demo-reliability \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 3 \--config min.insync.replicas=2

验证与观察 ISR:

kafka-topics.sh --describe --topic demo-reliability --bootstrap-server localhost:9092

3) 代码实现与运行

生产者代码(可靠性配置)
# lesson_five/producer_reliability.py
import argparse
import json
import sys
import time
from typing import Optionalfrom kafka import KafkaProducer
from kafka.errors import KafkaErrordef parse_args() -> argparse.Namespace:parser = argparse.ArgumentParser(description="Kafka reliability demo producer")parser.add_argument("--bootstrap", default="localhost:9092", help="Kafka bootstrap servers")parser.add_argument("--topic", default="demo-reliability", help="Target topic")parser.add_argument("--count", type=int, default=50, help="Number of messages to send")parser.add_argument("--acks",choices=["all", "1"],default="all",help="Producer acks setting (all or 1)",)parser.add_argument("--sleep-ms",type=int,default=50,help="Sleep between sends in milliseconds (for readability)",)return parser.parse_args()def create_producer(bootstrap_servers: str, acks: str) -> KafkaProducer:# kafka-python uses acks as int or 'all'; map string accordinglyacks_opt: Optional[object]acks_opt = "all" if acks == "all" else 1return KafkaProducer(bootstrap_servers=bootstrap_servers,acks=acks_opt,retries=10,  # retry on transient errorslinger_ms=5,value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),key_serializer=lambda v: v.encode("utf-8") if v is not None else None,)def main() -> int:args = parse_args()producer = create_producer(args.bootstrap, args.acks)print(f"Producer starting: bootstrap={args.bootstrap}, topic={args.topic}, count={args.count}, acks={args.acks}")errors = 0for i in range(args.count):key = f"user-{i % 3}"value = {"index": i, "ts": int(time.time() * 1000), "key": key}future = producer.send(args.topic, key=key, value=value)try:metadata = future.get(timeout=10)print(f"SENT ok: i={i}, partition={metadata.partition}, offset={metadata.offset}, acks={args.acks}")except KafkaError as e:errors += 1print(f"SENT error: i={i}, error={repr(e)}", file=sys.stderr)if args.sleep_ms > 0:time.sleep(args.sleep_ms / 1000.0)producer.flush()print(f"Done. total={args.count}, errors={errors}, acks={args.acks}")producer.close()return 0 if errors == 0 else 1if __name__ == "__main__":raise SystemExit(main())
消费者代码(可靠性监控)
# lesson_five/consumer_reliability.py
import argparse
from typing import Listfrom kafka import KafkaConsumer, TopicPartitiondef parse_args() -> argparse.Namespace:parser = argparse.ArgumentParser(description="Kafka reliability demo consumer")parser.add_argument("--bootstrap", default="localhost:9092", help="Kafka bootstrap servers")parser.add_argument("--topic", default="demo-reliability", help="Topic to consume")parser.add_argument("--group", default="reliability-group", help="Consumer group id")parser.add_argument("--auto-offset-reset", default="earliest", choices=["earliest", "latest"], help="Auto offset reset policy")parser.add_argument("--enable-auto-commit", action="store_true", help="Enable auto commit (default false)")parser.add_argument("--max-records", type=int, default=50, help="Print up to N messages then exit (0 means infinite)")return parser.parse_args()def main() -> int:args = parse_args()consumer = KafkaConsumer(args.topic,bootstrap_servers=args.bootstrap,group_id=args.group,enable_auto_commit=args.enable_auto_commit,auto_offset_reset=args.auto_offset_reset,value_deserializer=lambda v: v.decode("utf-8", "ignore"),)print(f"Consumer starting: bootstrap={args.bootstrap}, topic={args.topic}, group={args.group}, auto_commit={args.enable_auto_commit}")# Print initial assignment if availableconsumer.poll(timeout_ms=200)assignment: List[TopicPartition] = list(consumer.assignment())if assignment:parts = ", ".join([f"{tp.topic}-{tp.partition}" for tp in assignment])print(f"Assigned partitions: {parts}")seen = 0for message in consumer:print(f"RECV partition={message.partition}, offset={message.offset}, key={message.key}, value={message.value}")if not args.enable_auto_commit:consumer.commit()if args.max_records and args.max_records > 0:seen += 1if seen >= args.max_records:breakconsumer.close()print("Consumer closed")return 0if __name__ == "__main__":raise SystemExit(main())
运行命令
# 启动消费者(建议先启动,便于观察早期消息)
python lesson_five/consumer_reliability.py --topic demo-reliability# 另起终端,启动生产者(默认 acks=all,发送 50 条)
python lesson_five/producer_reliability.py --topic demo-reliability --count 50 --acks all

生产者参数说明

  • --acks:可选 all1(默认 all
  • --count:发送条数(默认 50)
  • --bootstrap:Kafka 地址(默认 localhost:9092
  • --sleep-ms:发送间隔毫秒数(默认 50ms)

4) 故障注入与现象观察

下述步骤可在本地多 Broker 环境(3 节点)或容器环境中完成。

  1. 先保持 3 个 Broker 正常,确认 ISR 列表包含 3 个副本,生产者以 acks=all 发送,应全部成功。
  2. 停掉其中 1 个 Broker(示例命令二选一):
# systemd(按你的服务名替换)
sudo systemctl stop kafka@broker1# 或 Docker Compose(按你的服务名替换)
docker compose stop kafka-1
  1. 再次 describe,此时 ISR 可能为 2。继续以 acks=all 发送,仍可成功(偶有短暂抖动)。
  2. 再停掉第 2 个 Broker,使 ISR=1。此时:
    • acks=all 发送将失败,常见错误为 NotEnoughReplicasNotEnoughReplicasAfterAppend
    • 若切换为 acks=1 发送,可能成功返回,但在随后 Leader 故障时存在数据丢失风险(不建议生产使用)。
  3. 依次启动被关停的 Broker:
sudo systemctl start kafka@broker1
# 或
docker compose start kafka-1
  1. 等待副本追上进度重新加入 ISR,再次 describe 可见 ISR 恢复。生产与消费恢复稳定。

5) 小结与实践建议

  • 生产环境建议:replication-factor >= 3min.insync.replicas >= 2、生产者 acks=all
  • 在 ISR 数量不足时,acks=all 会拒绝写入而非“冒险”成功,这是避免数据丢失的关键机制。
  • Python 的 kafka-python 不提供生产端幂等/事务能力,如需更强语义(EOS),可选用 confluent-kafka-python 并开启 enable.idempotence 与事务 API(需要 Kafka 端到端配合)。

进阶练习:把生产者 --acks 切换为 1all,在不同 ISR 场景下对比成功率与风险;同时观察消费者端分区与位移的变化。


八、交互式可视化:实时查看 ISR 与发送/消费统计

为了更直观地观察副本 ISR 的变化与发送/消费成效,提供了一个轻量级的可视化页面与本地指标服务:

1) 指标服务代码(Flask)

# lesson_five/metrics_server.py
import json
import threading
import time
from collections import deque, defaultdict
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional, Tuplefrom flask import Flask, jsonify, request
from kafka import KafkaAdminClient
from kafka.errors import KafkaErrorapp = Flask(__name__)@dataclass
class Counters:sent_ok: int = 0sent_err: int = 0recv: int = 0metrics_lock = threading.Lock()
metrics: Dict[str, Counters] = defaultdict(Counters)
recent_errors: Deque[Tuple[float, str]] = deque(maxlen=200)def record_error(msg: str) -> None:with metrics_lock:recent_errors.append((time.time(), msg))def inc(topic: str, key: str, value: int = 1) -> None:with metrics_lock:c = metrics[topic]if key == "sent_ok":c.sent_ok += valueelif key == "sent_err":c.sent_err += valueelif key == "recv":c.recv += value@app.route("/api/metrics")
def api_metrics():with metrics_lock:data = {t: {"sent_ok": c.sent_ok, "sent_err": c.sent_err, "recv": c.recv} for t, c in metrics.items()}errs = list(recent_errors)return jsonify({"metrics": data, "errors": errs, "ts": int(time.time() * 1000)})def describe_topic(bootstrap: str, topic: str) -> Optional[dict]:try:admin = KafkaAdminClient(bootstrap_servers=bootstrap, client_id="lesson5-metrics")# kafka-python AdminClient doesn't expose describe directly; use _client for metadatamd = admin._client.clustermd.request_update()# wait a bit for metadatadeadline = time.time() + 5while time.time() < deadline and not md.topics():time.sleep(0.1)if topic not in md.topics():return Noneparts = md.partitions_for_topic(topic) or []partitions = []for p in parts:leader = md.leader_for_partition(topic, p)replicas = md.replicas_for_partition(topic, p) or []isr = md.in_sync_replicas_for_partition(topic, p) or []partitions.append({"partition": p,"leader": getattr(leader, "id", leader),"replicas": [getattr(n, "id", n) for n in replicas],"isr": [getattr(n, "id", n) for n in isr],})brokers = [getattr(b, "nodeId", getattr(b, "id", None)) for b in md.brokers()]return {"topic": topic, "brokers": brokers, "partitions": partitions}except Exception as e:record_error(f"describe_topic error: {e}")return None@app.route("/api/isr")
def api_isr():bootstrap = request.args.get("bootstrap", "localhost:9092")topic = request.args.get("topic", "demo-reliability")data = describe_topic(bootstrap, topic)if not data:return jsonify({"ok": False, "error": "topic not found or metadata unavailable"}), 404return jsonify({"ok": True, "data": data, "ts": int(time.time() * 1000)})@app.route("/api/mark", methods=["POST"])
def api_mark():try:body = request.get_json(force=True) or {}except Exception:body = {}topic = body.get("topic", "demo-reliability")kind = body.get("kind", "sent_ok")  # sent_ok | sent_err | recvvalue = int(body.get("value", 1))inc(topic, kind, value)return jsonify({"ok": True})@app.route("/")
def root():return jsonify({"ok": True, "endpoints": ["/api/metrics", "/api/isr?topic=...", "/api/mark"]})def run(host: str = "127.0.0.1", port: int = 5005):app.run(host=host, port=port, debug=False)if __name__ == "__main__":run()

启动服务

python lesson_five/metrics_server.py

API 接口说明

  • GET /api/isr?bootstrap=...&topic=...:查询 Topic 的分区 Leader/Replicas/ISR
  • GET /api/metrics:返回发送/接收累计计数与错误列表
  • POST /api/mark:可手动上报一条计数(示例页面不需要手动调用)

2) 可视化页面代码

<!-- lesson_five/visualization.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8" /><meta name="viewport" content="width=device-width, initial-scale=1.0" /><title>Kafka ISR 与可靠性可视化</title><link rel="preconnect" href="https://cdn.jsdelivr.net" /><script src="https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js"></script><style>:root {--bg: #0f172a;--panel: #111827;--muted: #9ca3af;--text: #e5e7eb;--accent: #60a5fa;--good: #34d399;--warn: #f59e0b;--bad: #ef4444;--border: #374151;}html, body {margin: 0;padding: 0;background: var(--bg);color: var(--text);font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial;}.container { max-width: 1200px; margin: 0 auto; padding: 24px; }h1, h2 { font-weight: 600; letter-spacing: 0.2px; }h1 { font-size: 24px; margin: 0 0 12px; }h2 { font-size: 18px; margin: 24px 0 12px; }.panel { background: var(--panel); border: 1px solid var(--border); border-radius: 10px; padding: 16px; margin-bottom: 16px; }.row { display: flex; gap: 12px; flex-wrap: wrap; align-items: center; }label { color: var(--muted); }input, select { background: #0b1220; color: var(--text); border: 1px solid var(--border); border-radius: 8px; padding: 8px 10px; }input[type="text"]{ min-width: 220px; }button { background: #1f2937; color: var(--text); border: 1px solid var(--border); border-radius: 8px; padding: 8px 12px; cursor: pointer; }button:hover { border-color: var(--accent); }.grid { display: grid; grid-template-columns: 1fr; gap: 16px; }@media (min-width: 980px) { .grid { grid-template-columns: 1.1fr 0.9fr; } }.diagram { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; overflow: auto; }.logs { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; height: 200px; overflow: auto; font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; font-size: 12px; line-height: 1.5; }.stats { display: grid; grid-template-columns: repeat(3, 1fr); gap: 12px; }.stat { background: #0b1220; border: 1px solid var(--border); border-radius: 10px; padding: 12px; text-align: center; }.stat h3 { margin: 0 0 6px; font-size: 12px; color: var(--muted); font-weight: 500; }.stat .v { font-size: 20px; font-weight: 700; }</style>
</head>
<body><div class="container"><h1>Kafka ISR 与可靠性可视化</h1><div class="panel"><div class="row"><label>Bootstrap</label><input id="bootstrap" type="text" value="localhost:9092" /><label>Topic</label><input id="topic" type="text" value="demo-reliability" /><label>Metrics API</label><input id="api" type="text" value="http://127.0.0.1:5005" /><button id="refresh">刷新 ISR</button></div></div><div class="grid"><div class="panel"><h2>分区与 ISR</h2><div id="isr" class="diagram"></div></div><div class="panel"><h2>发送/接收统计</h2><div class="stats"><div class="stat"><h3>发送成功</h3><div id="sent_ok" class="v">0</div></div><div class="stat"><h3>发送失败</h3><div id="sent_err" class="v">0</div></div><div class="stat"><h3>消费条数</h3><div id="recv" class="v">0</div></div></div></div></div><div class="panel"><h2>近期错误</h2><div id="logs" class="logs"></div></div></div><script>(function ensureMermaid(cb){function ok(){ try{ mermaid.initialize({ startOnLoad: false, theme: 'dark', securityLevel: 'loose' }); cb(); }catch(e){ console.error(e); } }if (window.mermaid) return ok();const s = document.createElement('script');s.src = 'https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js';s.onload = ok; s.onerror = ok; document.head.appendChild(s);})(function(){ /* ready */ });const isrDiv = document.getElementById('isr');const logsDiv = document.getElementById('logs');const sentOkEl = document.getElementById('sent_ok');const sentErrEl = document.getElementById('sent_err');const recvEl = document.getElementById('recv');const bootstrapInput = document.getElementById('bootstrap');const topicInput = document.getElementById('topic');const apiInput = document.getElementById('api');const refreshBtn = document.getElementById('refresh');function log(line){const t = new Date().toLocaleTimeString();logsDiv.innerText += `[${t}] ${line}\n`;logsDiv.scrollTop = logsDiv.scrollHeight;}async function renderMermaid(targetEl, def) {try {const id = `m-${Math.random().toString(36).slice(2)}`;const out = await mermaid.render(id, def);targetEl.innerHTML = out.svg || '';} catch (e) {targetEl.innerHTML = `<pre style="white-space:pre-wrap;color:#fca5a5">渲染失败: ${String(e)}</pre>`;}}function buildIsrDiagram(data){// data: {brokers:number[], partitions:[{partition, leader, replicas[], isr[]}]}]let def = 'graph TD\n';def += '  subgraph "Topic ' + data.topic + '"\n';for (const p of data.partitions){const isr = p.isr.join(',');const leader = p.leader;const label = `P${p.partition} (L:${leader} | ISR:[${isr}])`;def += `    P${p.partition}["${label}"]\n`;}def += '  end\n';for (const b of data.brokers){def += `  B${b}((Broker ${b}))\n`;}for (const p of data.partitions){def += `  P${p.partition} --> B${p.leader}\n`;for (const r of p.replicas){ def += `  P${p.partition} -.replica.-> B${r}\n`; }}return def;}async function fetchIsr(){const api = apiInput.value.replace(/\/$/, '');const topic = encodeURIComponent(topicInput.value);const bootstrap = encodeURIComponent(bootstrapInput.value);try{const res = await fetch(`${api}/api/isr?topic=${topic}&bootstrap=${bootstrap}`);if (!res.ok) throw new Error('isr http ' + res.status);const j = await res.json();if (!j.ok) throw new Error('isr api not ok');const def = buildIsrDiagram(j.data);await renderMermaid(isrDiv, def);log('刷新 ISR 成功');}catch(e){ log('刷新 ISR 失败: ' + e); }}async function fetchMetrics(){const api = apiInput.value.replace(/\/$/, '');try{const res = await fetch(`${api}/api/metrics`);if (!res.ok) throw new Error('metrics http ' + res.status);const j = await res.json();const t = topicInput.value;const m = (j.metrics && j.metrics[t]) || {sent_ok:0, sent_err:0, recv:0};sentOkEl.innerText = m.sent_ok;sentErrEl.innerText = m.sent_err;recvEl.innerText = m.recv;// errors listconst errs = j.errors || [];if (errs.length){logsDiv.innerText = '';for (const [ts, line] of errs){const tline = new Date(ts*1000).toLocaleTimeString();logsDiv.innerText += `[${tline}] ${line}\n`;}}}catch(e){ /* ignore transient */ }}refreshBtn.addEventListener('click', fetchIsr);setInterval(fetchMetrics, 1000);setInterval(fetchIsr, 5000);setTimeout(fetchIsr, 300);</script>
</body>
</html>

使用说明

  • 填写 Kafka BootstrapTopicMetrics API 地址(默认 localhost:9092http://127.0.0.1:5005
  • 点击"刷新 ISR"或等待定时刷新,页面会展示每个分区的 Leader 与 ISR 列表,并通过 Mermaid 图连线到 Broker
  • 右侧看板实时展示:发送成功、发送失败、消费条数

统计的来源:

  • producer_reliability.pyconsumer_reliability.py 可在业务处理处通过 HTTP 调用 /api/mark 上报计数;
  • 本示例为了最少侵入,先使用“外部观察 + 手动触发”模式,推荐你按需将上报逻辑嵌入到生产/消费流程中。

3) 建议的演示流程

  1. 启动指标服务与可视化页面。
  2. 启动消费者:
python lesson_five/consumer_reliability.py --topic demo-reliability
  1. 启动生产者(默认 acks=all):
python lesson_five/producer_reliability.py --topic demo-reliability --count 50 --acks all
  1. 在多 Broker 环境下,依照“七、动手实战”的故障注入步骤逐一停/启 Broker:
    • 页面左侧 ISR 图会随 describe 结果变化(每 5s 刷新一次);
    • 发送失败计数会在 ISR < min.insync.replicasacks=all 场景上升;
    • 重新启动 Broker 后,ISR 追上恢复,可再次观察指标变化。

你也可以将生产者与消费者脚本中对成功/失败、消费计数的位置,调用 POST /api/mark 进行自动上报,使统计更精确。
image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/96248.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/96248.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Excel表格如何制作?【图文详解】表格Excel制作教程?电脑Excel表格制作?

一、问题背景 在日常办公中&#xff0c;无论是统计数据、整理报表&#xff0c;还是记录信息&#xff0c;Excel表格都是必不可少的工具。 但对新手来说&#xff0c;打开Excel后面对空白的单元格&#xff0c;常常不知道从何下手——不知道怎么选表格范围、怎么加边框让表格显形、…

阿里兵临城下,美团迎来至暗时刻?

9月10日&#xff0c;赶在阿里巴巴成立26周年之际&#xff0c;高德地图推出了首个基于用户行为产生的榜单“高德扫街榜”&#xff0c;被定义为“阿里生活服务超级新入口”&#xff0c;试图重新构建一套线下服务的信用体系。 上线第二天&#xff0c;就有媒体报道称“使用高德扫街…

Android逆向学习(十一) IDA动态调试Android so文件

Android逆向学习&#xff08;十一&#xff09; IDA动态调试Android so文件 一、 写在前面 这是吾爱破解论坛正己大大的第12个教程&#xff0c;并且发现一个神奇的事情&#xff0c;正己大大的教程竟然没有第11个&#xff0c;感觉很奇怪 写这个博客的主要原因是希望提供一种新的解…

Django全栈班v1.03 Linux常用命令 20250911 下午

课程定位 命令行 ! 黑客专属。 这套视频带你从Linux小白到命令行大师&#xff0c;涵盖文件管理文本处理系统监控网络操作。 零基础也能30分钟掌握程序员必备的技能。 课程亮点 1、零基础友好&#xff1a;从最基础的ls&#xff0c;cd命令开始&#xff0c;循序渐进 2、实战导向&a…

离线应用开发:Service Worker 与缓存

引言&#xff1a;离线应用开发在 Electron 中的 Service Worker 与缓存核心作用与必要性 在 Electron 框架的开发实践中&#xff0c;离线应用开发是提升用户体验和应用可用性的关键技术&#xff0c;特别是使用 Service Worker 实现缓存和离线功能&#xff0c;结合 Node.js 处理…

英发睿能闯关上市:业绩波动明显,毅达创投退出,临场“移民”

撰稿|张君来源|贝多商业&贝多财经近日&#xff0c;四川英发睿能科技股份有限公司&#xff08;下称“英发睿能”&#xff09;递交招股书&#xff0c;报考在港交所上市。据贝多商业&贝多财经了解&#xff0c;英发睿能还于9月3日披露《整体协调人公告&#xff0d;委任&…

Elixir通过Onvif协议控制IP摄像机,ExOnvif库给视频流叠加字符

Elixir 通过 ExOnvif 库&#xff0c;Onvif 协议可以控制IP摄像机等设备&#xff0c;这篇文章记录&#xff1a;使用ExOnvif库&#xff0c;给视频流叠加文字&#xff0c;使用ExOnvif库的接口模块&#xff1a;ExOnvif.Media、ExOnvif.Media2。 ExOnvif官方文档 此文章内容&#xf…

线程安全相关的注解

主要有下面三个加在类上的线程安全相关的注解。一.Immutable标记一个类为不可变的。这意味着该类的实例在构造完成后&#xff0c;其状态&#xff08;数据&#xff09;永远不能被更改。实现不可变性的严格条件&#xff08;Java内存模型中的定义&#xff09;&#xff1a;所有字段…

基于Springboot + vue3实现的在线智慧考公系统

项目描述本系统包含管理员、教师、用户三个角色。管理员角色&#xff1a;用户管理&#xff1a;管理系统中所有用户的信息&#xff0c;包括添加、删除和修改用户。配置管理&#xff1a;管理系统配置参数&#xff0c;如上传图片的路径等。权限管理&#xff1a;分配和管理不同角色…

赋能高效设计:12套中后台管理信息系统通用原型框架

中后台管理信息系统是企业数字化转型的核心引擎&#xff0c;肩负着提升运营效率、赋能精准决策的重任。面对多样化的业务场景和复杂的逻辑需求&#xff0c;如何快速、高质量地完成系统设计与原型构建&#xff0c;成为产品、设计与开发团队共同面临的挑战。 为此&#xff0c;一套…

LangGraph中ReAct模式的深度解析:推理与行动的完美融合——从理论到实践的智能Agent构建指南

在人工智能的演进历程中&#xff0c;ReAct&#xff08;Reasoning and Acting&#xff09;模式无疑是最具革命性的突破之一。它不仅仅是一种技术实现&#xff0c;更是对智能Agent思维模式的深刻重构。而LangGraph&#xff0c;作为这一理念的优秀实践者&#xff0c;将ReAct模式演…

蜂窝物联网模组在换电柜场景的发展前景分析

蜂窝物联网模组在换电柜场景中正迎来爆发式增长机遇&#xff0c;特别是在Cat.1技术路线主导的市场格局下&#xff0c;其应用价值已从基础通信服务拓展至安全监测、智能管理、电池溯源等核心领域&#xff0c;成为换电柜行业标准化、智能化升级的关键技术支撑。随着2025年新国标全…

机器学习之K折交叉验证

为了更好的评估机器学习训练出模型的泛化能力&#xff0c;即避免模型在训练集上表现良好&#xff0c;但在未见过的数据上表现不佳&#xff08;即过拟合&#xff09;&#xff0c;同时也减少了单一训练/测试集划分带来的随机性影响。一、什么是K折交叉验证&#xff1f;1、将数据集…

详细解读k8s的kind中service与pod的区别

Pod 是运行应用实例的“容器”&#xff0c;而 Service 是访问这些 Pod 的“稳定网络门户”。Pod&#xff08;容器组&#xff09;1. 核心概念&#xff1a; Pod 是 Kubernetes 中可以创建和管理的最小、最简单的计算单元。一个 Pod 代表集群上正在运行的一个工作负载实例。2. 职责…

python---PyInstaller(将Python脚本打包为可执行文件)

在Python开发中&#xff0c;我们常需要将脚本分享给不熟悉Python环境的用户。此时&#xff0c;直接提供.py文件需要对方安装Python解释器和依赖库&#xff0c;操作繁琐。PyInstaller作为一款主流的Python打包工具&#xff0c;能将脚本及其依赖打包为单个可执行文件&#xff08;…

利用归并算法对链表进行排序

/*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *next) : val(x), next(next) {}* };这里是链…

论文阅读_大模型情绪分析预测股票趋势

英文名称&#xff1a;Stock Price Trend Prediction using Emotion Analysis of Financial Headlines with Distilled LLM Model 中文名称&#xff1a;利用蒸馏大型语言模型对财务新闻标题情绪分析以预测股价趋势 链接: https://dl.acm.org/doi/pdf/10.1145/3652037.3652076作…

websocket和socket区别

websocket和socket区别&#xff0c;这是一个非常经典的问题。简单来说&#xff0c;Socket 是构建网络通信的工具和基础&#xff0c;而 WebSocket 是建立在它之上的一种具体的通信协议。可以把它们的关系想象成&#xff1a;Socket 像是修路和建立交通规则的基础工程。它定义了车…

网络复习1

1.网络协议栈 一般一个主机内的应用&#xff08;进程&#xff09;进行通信&#xff0c;直接在操作系统层面进行 进程交互即可。而不同位置两台主机进行通信需要通过网线传输信号&#xff0c;因此 这些通信的数据为网络数据&#xff0c;而网络数据进程传输必须从应用层依次向下…

AFSim2.9.0学习笔记 —— 4.2、ArkSIM文件结构介绍及项目结构整理

&#x1f514; AFSim2.9.0 相关技术、疑难杂症文章合集&#xff08;掌握后可自封大侠 ⓿_⓿&#xff09;&#xff08;记得收藏&#xff0c;持续更新中…&#xff09; 若还没有下载AFSim2.9.0完整软件或源码&#xff0c;请先进入本人另篇文章了解下载。 文章概要 本文主要对上篇…