目录
1. Kafka 简介
1.1 Kafka 核心概念
(1)消息系统 vs. 流处理平台
(2)核心组件
1.2 Kafka 核心特性
(1)高吞吐 & 低延迟
(2)持久化存储
(3)分布式 & 高可用
(4)水平扩展
(5)流处理能力
1.3 Kafka 典型应用场景
1.4 Kafka 架构示例
数据流示例(订单处理)
1.5 Kafka vs 其他消息队列
2. kafka部署
2.1 创建Namespace
2.2 创建ConfigMap
2.3 创建Headless Service
2.4 创建Statefulset
2.5 部署所有资源
2.6 检查kafka Pod状态
1. Kafka 简介
Apache Kafka 是一个 分布式流处理平台,主要用于构建 高吞吐量、低延迟、可扩展 的实时数据管道和流式应用程序。它最初由 LinkedIn 开发,后成为 Apache 顶级开源项目,广泛应用于大数据、日志聚合、事件驱动架构等领域。
1.1 Kafka 核心概念
(1)消息系统 vs. 流处理平台
-
传统消息队列(如 RabbitMQ):主要用于解耦生产者和消费者,保证消息可靠传递。
-
Kafka:
-
不仅是一个消息队列,还是一个 分布式流存储系统,支持持久化存储和流式计算。
-
适用于 高吞吐、大规模数据流 场景(如日志、指标、事件数据)。
-
(2)核心组件
组件 | 说明 |
---|---|
Producer(生产者) | 向 Kafka 发送消息(如日志、交易数据)。 |
Consumer(消费者) | 从 Kafka 读取并处理消息。 |
Broker(代理) | Kafka 服务器,负责存储和转发消息。 |
Topic(主题) | 消息的分类(类似数据库表),如 orders 、logs 。 |
Partition(分区) | 每个 Topic 可分成多个 Partition,提高并行处理能力。 |
Offset(偏移量) | 每条消息在 Partition 中的唯一 ID(类似数据库主键)。 |
Consumer Group(消费者组) | 多个消费者共同消费一个 Topic,实现负载均衡。 |
ZooKeeper | 管理 Kafka 集群元数据(新版本 Kafka 已逐步移除依赖)。 |
1.2 Kafka 核心特性
(1)高吞吐 & 低延迟
-
支持每秒百万级消息处理(取决于硬件配置)。
-
采用 顺序 I/O(相比随机 I/O 更快)和 零拷贝 技术优化性能。
(2)持久化存储
-
消息默认持久化到磁盘(可配置保留时间),支持 重放(replay) 数据。
-
适用于 事件溯源(Event Sourcing) 和 审计日志。
(3)分布式 & 高可用
-
支持 多副本(Replication),防止数据丢失。
-
自动故障转移(Leader/Follower 机制)。
(4)水平扩展
-
可动态增加 Broker 和 Partition,提升吞吐量。
(5)流处理能力
-
配合 Kafka Streams 或 ksqlDB 可实现实时流计算(如聚合、窗口计算)。
1.3 Kafka 典型应用场景
场景 | 说明 |
---|---|
日志聚合 | 收集应用日志(替代 ELK 中的 Logstash)。 |
消息队列 | 解耦微服务,如订单系统 → 库存系统。 |
实时数据处理 | 结合 Flink/Spark Streaming 做实时分析。 |
事件驱动架构 | 如用户行为追踪、IoT 设备数据采集。 |
Commit Log(提交日志) | 数据库变更捕获(CDC),如 Debezium + Kafka。 |
1.4 Kafka 架构示例
生产者(Producer) → Kafka Cluster(Broker1, Broker2...)↓ 消费者(Consumer Group)→ 实时处理(Flink/Spark)↓存储(HDFS/DB)
数据流示例(订单处理)
-
订单服务(Producer)发送消息到
orders
Topic。 -
库存服务(Consumer)读取
orders
消息,扣减库存。 -
分析服务(Consumer)统计实时销售额。
1.5 Kafka vs 其他消息队列
特性 | Kafka | RabbitMQ | Pulsar |
---|---|---|---|
吞吐量 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ |
延迟 | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
持久化 | 支持(磁盘) | 可选(内存/磁盘) | 支持 |
流处理 | 原生支持(Kafka Streams) | 不支持 | 支持(Pulsar Functions) |
适用场景 | 大数据、日志 | 任务队列、RPC | 多租户、云原生 |
✅ 适用 Kafka 的场景:
-
需要高吞吐、持久化存储的实时数据流(如日志、事件)。
-
流处理(如实时分析、监控)。
❌ 不适用 Kafka 的场景:
-
需要复杂路由(RabbitMQ 更合适)。
-
低延迟任务队列(Redis Streams/RabbitMQ 更好)。
Kafka 已成为现代数据架构的核心组件,广泛应用于大数据、微服务、实时计算等领域。
2. kafka部署
2.1 创建Namespace
kubectl create namespace elk
2.2 创建ConfigMap
vim kafka-configmap.yaml
apiVersion: v1 kind: ConfigMap metadata:name: ldc-kafka-scriptsnamespace: elk data:setup.sh: |- #启动脚本#!/bin/bashexport KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
2.3 创建Headless Service
vim kafka-headless.yaml
apiVersion: v1 kind: Service metadata:name: kafka-headlessnamespace: elk spec:clusterIP: Noneselector:app: kafkaports:- name: brokerport: 9092- name: controllerport: 9093
2.4 创建Statefulset
vim kafka-statefulset.yaml
apiVersion: apps/v1 kind: StatefulSet metadata:name: kafkanamespace: elklabels:app: kafka spec:selector:matchLabels:app: kafkaserviceName: kafka-headlesspodManagementPolicy: Parallelreplicas: 1 #根据资源情况设置实例数,推荐3个副本updateStrategy:type: RollingUpdatetemplate:metadata:labels:app: kafkaspec:affinity:nodeAffinity: #这里做了节点亲和性调度到master节点requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: node-role.kubernetes.io/control-planeoperator: Exists#values:#- mastertolerations:- key: "node-role.kubernetes.io/control-plane"operator: "Exists"effect: "NoSchedule"containers:- name: kafkaimage: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.4.0imagePullPolicy: "IfNotPresent"command:- /opt/leaderchain/setup.shenv:- name: BITNAMI_DEBUGvalue: "true" #详细日志# KRaft settings - name: MY_POD_NAME # 用于生成KAFKA_CFG_NODE_IDvalueFrom:fieldRef:fieldPath: metadata.name - name: KAFKA_CFG_PROCESS_ROLESvalue: "controller,broker"- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: "0@kafka-0.kafka-headless:9093" #修改实例数时要更新- name: KAFKA_KRAFT_CLUSTER_IDvalue: "Jc7hwCMorEyPprSI1Iw4sW" # Listeners - name: KAFKA_CFG_LISTENERSvalue: "PLAINTEXT://:9092,CONTROLLER://:9093"- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: "PLAINTEXT://:9092"- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: "CONTROLLER"- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: "PLAINTEXT"- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"ports:- containerPort: 9092name: broker- containerPort: 9093name: controllerprotocol: TCP volumeMounts:- mountPath: /bitnami/kafkaname: kafka-data- mountPath: /opt/leaderchain/setup.shname: scriptssubPath: setup.shreadOnly: true securityContext:fsGroup: 1001runAsUser: 1001volumes: - configMap:defaultMode: 493name: ldc-kafka-scripts #ConfigMap的名字name: scripts volumeClaimTemplates:- apiVersion: v1kind: PersistentVolumeClaimmetadata:name: kafka-dataspec:accessModes: [ "ReadWriteOnce" ] storageClassName: nfs-client #存储类的名称resources:requests:storage: 1Gi
2.5 部署所有资源
[root@master1 Kafka]# ls kafka-configmap.yaml kafka-headless.yaml kafka-statefulset.yaml [root@master1 Kafka]# kubectl apply -f ./ configmap/ldc-kafka-scripts created service/kafka-headless created statefulset.apps/kafka created
2.6 检查kafka Pod状态
[root@master1 Kafka]# kubectl get pod -n elk NAME READY STATUS RESTARTS AGE filebeat-6db9l 1/1 Running 0 62m filebeat-qllxg 1/1 Running 0 62m filebeat-r5hw7 1/1 Running 0 62m kafka-0 1/1 Running 0 2m2s