第一讲、Kafka 初识与环境搭建

一、Kafka 是什么?

Apache Kafka 是一个分布式的消息队列(Message Queue)流处理平台
它最早由 LinkedIn 开发,后来捐赠给 Apache 基金会,现已广泛应用于日志收集、实时数据管道和大数据处理。

Kafka 的特点:

  • 高吞吐:单机可处理百万级消息/秒。
  • 低延迟:毫秒级别,支持实时应用。
  • 可扩展:天然支持水平扩展,集群规模可随需求增加。
  • 持久化:消息存储在磁盘日志文件中,可回放和追溯。
  • 可靠性:数据副本机制,保证故障恢复。

二、为什么需要消息队列?

在分布式系统中,如果系统间直接同步调用,会遇到:

  1. 耦合度高:调用链复杂,服务间强依赖。
  2. 抗压能力差:高并发流量容易压垮下游。
  3. 响应时间长:耗时任务导致用户体验差。
  4. 数据丢失风险:网络波动或服务宕机时,消息容易丢。

消息队列的优势:

  • 解耦:上游只需把消息放进队列,不关心下游实现。
  • 削峰填谷:高峰请求存入队列,消费者按能力消费。
  • 异步处理:耗时任务后台执行,前端快速响应。
  • 可追溯:消息持久化,可回放,便于数据分析。

三、Kafka 与 RabbitMQ、RocketMQ 对比

特性KafkaRabbitMQRocketMQ
定位分布式日志系统 / 流处理平台传统消息队列分布式消息中间件(阿里出品)
消息模型发布-订阅(Topic/Partition)队列 + 交换机Topic + Tag
吞吐量极高(百万级/秒)中等(万级/秒)高(十万级/秒)
延迟毫秒级毫秒级毫秒级
持久化磁盘顺序写(高效)内存 + 磁盘磁盘存储
场景日志收集、实时数据流传统异步通信金融、电商事务消息

👉 总结:

  • Kafka 擅长大数据、日志流处理
  • RabbitMQ 更适合传统企业系统的异步解耦
  • RocketMQ 常用于金融、电商,强调事务消息

四、Kafka 核心组件

  • Producer(生产者):消息发送方。
  • Consumer(消费者):消息消费方。
  • Broker(代理节点):Kafka 服务器实例,集群由多个 Broker 组成。
  • Topic(主题):消息按主题分类。
  • Partition(分区):一个主题可拆分成多个分区,提升并行能力。
  • Offset(偏移量):消费者在分区里的游标位置。
  • Consumer Group(消费者组):一组消费者共享消费任务,实现负载均衡。

📌 工作原理简图

Producer → Topic →   Partition0 [offset 0,1,2…]Partition1 [offset 0,1,2…]Partition2 [offset 0,1,2…]Consumer Group G1:- Consumer A ← Partition0- Consumer B ← Partition1- Consumer C ← Partition2

Kafka 的工作原理

  1. Producer 把消息发送到 Topic。
  2. Kafka 将消息按分区存储,每个分区内保证顺序。
  3. 消费者组里的消费者按分区消费,每条消息只会被组内一个消费者消费。
  4. Offset 保证消费者能从上次中断的位置继续。

📊 可视化架构图

Producer
Broker
Topic demo.hello
Partition 0
Partition 1
Partition 2
Consumer A (G1)
Consumer B (G1)
Consumer C (G1)

五、本地快速环境搭建

方案一:Mac 安装 Kafka(推荐)

1. 使用 Homebrew 安装

Homebrew 是 Mac 最简单的安装方式:

# 安装 Homebrew(如果尚未安装)
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"# 安装 Kafka(会自动安装 Zookeeper 依赖)
brew install kafka# 查看安装位置
brew --prefix kafka
# 通常在 /opt/homebrew/bin/kafka 或 /usr/local/bin/kafka
2. 启动服务
# 启动 Zookeeper
brew services start zookeeper# 启动 Kafka
brew services start kafka# 或者手动启动(前台运行)
zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建 Topic
kafka-topics --create --topic demo.hello \--bootstrap-server localhost:9092 \--partitions 3 --replication-factor 1# 验证创建成功
kafka-topics --list --bootstrap-server localhost:9092
4. 停止服务
# 停止 Kafka 和 Zookeeper
brew services stop kafka
brew services stop zookeeper

方案二:传统安装(手动下载)

如果不使用 Homebrew,也可以手动安装:

1. 下载
# 下载 Kafka
cd ~/Downloads
wget https://downloads.apache.org/kafka/2.13-3.6.0/kafka_2.13-3.6.0.tgz# 解压
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0# 添加到 PATH(可选)
echo 'export PATH="$HOME/Downloads/kafka_2.13-3.6.0/bin:$PATH"' >> ~/.zshrc
source ~/.zshrc
2. 启动服务
# 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties# 新终端启动 Kafka
bin/kafka-server-start.sh config/server.properties
3. 创建 Topic
bin/kafka-topics.sh --create --topic demo.hello \--bootstrap-server localhost:9092 \--partitions 3 --replication-factor 1

方案三:Docker 一键启动(推荐新手)

如果不想本机安装,可用 Docker 快速拉起单机环境:

1. 创建 docker-compose.yml

在项目根目录创建 docker-compose.yml

version: '3.8'
services:zookeeper:image: bitnami/zookeeper:3.8environment:- ALLOW_ANONYMOUS_LOGIN=yesports:- "2181:2181"kafka:image: bitnami/kafka:3ports:- "9092:9092"environment:- KAFKA_BROKER_ID=1- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092- KAFKA_LISTENERS=PLAINTEXT://:9092depends_on:- zookeeper
2. 启动/停止
# 启动服务
docker compose up -d# 查看日志
docker compose logs -f kafka# 停止服务
docker compose down -v

安装 Python 环境和 kafka-python

1. Python 环境准备
# 检查 Python 版本(推荐 3.7+)
python3 --version# 创建虚拟环境(推荐)
python3 -m venv kafka_env
source kafka_env/bin/activate# 或使用 conda
conda create -n kafka_env python=3.9
conda activate kafka_env
2. 安装 kafka-python
# 基础安装
pip install kafka-python# 如果需要额外功能,可以安装可选依赖
pip install kafka-python[crc32c]  # 更快的CRC32校验# 验证安装
python -c "import kafka; print(f'kafka-python version: {kafka.__version__}')"
3. 环境变量配置
# 设置 Kafka 连接地址(可选)
export KAFKA_BOOTSTRAP=localhost:9092# 添加到 shell 配置文件(永久生效)
echo 'export KAFKA_BOOTSTRAP=localhost:9092' >> ~/.zshrc
source ~/.zshrc

六、案例:Hello Kafka

1. 命令行体验

启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo.hello

输入几条消息:

hello-1
hello-2
hello-3
启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic demo.hello --from-beginning

输出:

hello-1
hello-2
hello-3

🎉 恭喜!你完成了第一个 Kafka "生产-消费"实验。

2. Python 代码实现

我们提供了增强版的 Python 脚本,展示了消息键、头部、批量发送、消息过滤等高级特性:

核心脚本介绍
  • lesson01_hello_producer.py - 增强版生产者,支持消息键、自定义头部、批量发送
  • lesson01_hello_consumer.py - 增强版消费者,支持消息过滤、偏移量管理、错误处理
  • lesson01_consumer_group_demo.py - 消费者组演示,展示分区分配和负载均衡
  • common.py - 公共工具函数,包含生产者和消费者工厂函数
运行示例
# 1. 先启动消费者(观察消息)
python lesson01_hello_consumer.py# 2. 新终端启动生产者(发送消息)
python lesson01_hello_producer.py# 3. 或运行消费者组演示
python lesson01_consumer_group_demo.py
增强版生产者特性

生产者脚本包含两个函数,演示不同场景:

带键消息发送

# 发送带键的消息,确保相同键分配到相同分区
user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]
key = random.choice(user_ids)# 构造消息内容
message = {"id": i + 1,"user_id": key,"message": f"Hello from producer! Message #{i + 1}","timestamp": datetime.now().isoformat(),"source": "lesson01_producer"
}# 添加自定义头部
headers = [("message_type", b"greeting"),("priority", b"normal"),("batch_id", str(i // 3).encode())
]

批量发送优化

# 高吞吐量配置
producer = make_producer(acks="all",               # 等待所有副本确认linger_ms=50,             # 批量发送延迟batch_size=32768,         # 更大的批次compression_type="gzip",  # 启用压缩
)# 预热发送确保连接稳定
producer.send(TOPIC, value={"warmup": True}).get(timeout=30)# 批量异步发送 + 统一flush
futures = [producer.send(TOPIC, value=msg) for msg in messages]
producer.flush(timeout=60)
增强版消费者特性

消息过滤

def filter_messages(self, message):"""消息过滤逻辑"""headers_raw = _headers_map(message)# 只处理 message_type=greetingmsg_type = _hget(headers_raw, "message_type", None)if msg_type is not None and msg_type != "greeting":return False# 只处理特定用户if isinstance(message.value, dict) and "user_id" in message.value:return message.value["user_id"] in {"user_001", "user_002"}return True

优雅关闭和统计

# 信号处理优雅关闭
signal.signal(signal.SIGINT, self.signal_handler)# 实时统计
def print_stats(self):duration = time.time() - self.start_timethroughput = self.message_count / durationprint(f"📊 消费统计: {self.message_count} 条消息, "f"吞吐量: {throughput:.2f} 消息/秒")
预期输出示例

生产者输出

🎬 Kafka Producer 示例开始
⏳ 等待Kafka服务就绪...
✅ Kafka服务已就绪
🚀 启动带键的生产者...
✅ 消息 1 发送成功: topic=demo.hello, partition=1, offset=10, key=user_002
✅ 消息 2 发送成功: topic=demo.hello, partition=0, offset=8, key=user_001
📦 启动批量消息生产者...
🧩 主题 demo.hello 分区: [0, 1, 2] | 压缩: gzip
🔥 预热完成
🎯 批量发送完成:成功 50/50,耗时 2.15s,吞吐 23.3 msg/s

消费者输出

🎬 Kafka Consumer 示例开始
⏳ 等待Kafka服务就绪...
✅ Kafka服务已就绪
🎯 消费者已创建: topic=demo.hello, group_id=lesson01_consumer_group
🚀 开始消费消息... (按 Ctrl+C 停止)📨 消息 #1主题: demo.hello分区: 1偏移量: 10键: user_002头部: {"message_type": "greeting", "priority": "normal"}内容: {"id": 1,"user_id": "user_002","message": "Hello from producer! Message #1","timestamp": "2024-01-15T10:30:15.123456"}👤 用户消息: user_002
配置参数说明

你可以在 common.py 中调整各种参数:

生产者参数

  • acks - 确认级别(0/1/all),影响可靠性
  • enable_idempotence - 幂等性,避免重复消息
  • linger_ms - 批量发送延迟,影响吞吐量
  • batch_size - 批次大小,影响内存使用
  • compression_type - 压缩类型(gzip/snappy/lz4)

消费者参数

  • auto_offset_reset - 偏移量重置策略
    • earliest: 从最早的消息开始消费(适合首次启动或重新处理历史数据)
    • latest: 从最新的消息开始消费(适合只关心新消息的场景)
    • none: 如果没有已提交的偏移量则抛出异常
  • enable_auto_commit - 自动提交偏移量
    • True: 每隔 auto_commit_interval_ms 自动提交,简单但可能重复消费
    • False: 手动提交,可精确控制处理完成后再提交,避免消息丢失
  • max_poll_records - 单次拉取最大消息数
    • 默认500,建议根据消息大小和处理速度调整
    • 数值越大吞吐量越高,但内存占用和处理延迟也会增加
  • session_timeout_ms - 会话超时时间(默认30秒)
    • 消费者多久没有发送心跳就被认为已死亡
    • 过短容易误判,过长影响故障检测速度
  • heartbeat_interval_ms - 心跳间隔时间(默认3秒)
    • 向协调器发送心跳的频率,通常设为 session_timeout_ms 的 1/3
    • 确保消费者存活状态及时同步
参数配置示例

高可靠性消费者配置(金融、订单等关键业务):

consumer = make_consumer(topic="critical_orders",group_id="order_processing_group",auto_offset_reset="earliest",      # 确保不丢失消息enable_auto_commit=False,          # 手动提交,确保处理完才确认max_poll_records=10,               # 小批量处理,降低风险session_timeout_ms=60000,          # 60秒超时,避免网络抖动误判heartbeat_interval_ms=20000        # 20秒心跳,减少网络开销
)

高吞吐量消费者配置(日志、监控等允许少量丢失的场景):

consumer = make_consumer(topic="application_logs",group_id="log_analysis_group", auto_offset_reset="latest",        # 只处理新日志enable_auto_commit=True,           # 自动提交,简化逻辑max_poll_records=500,              # 大批量处理,提升吞吐session_timeout_ms=10000,          # 10秒超时,快速故障检测heartbeat_interval_ms=3000         # 3秒心跳,及时同步状态
)

实时处理消费者配置(消息推送、实时计算等):

consumer = make_consumer(topic="realtime_events",group_id="realtime_processing_group",auto_offset_reset="latest",        # 只处理最新事件enable_auto_commit=False,          # 手动提交,控制处理节奏max_poll_records=50,               # 中等批量,平衡延迟和吞吐session_timeout_ms=15000,          # 15秒超时,快速响应heartbeat_interval_ms=5000         # 5秒心跳,保持连接活跃
)

七、小结

  • Kafka 是一款分布式消息中间件,擅长高吞吐和实时流处理。
  • 消息队列的价值:解耦、削峰、异步、可靠。
  • 与 RabbitMQ、RocketMQ 对比,Kafka 更适合大数据、日志、实时管道
  • 核心概念:Producer、Consumer、Broker、Topic、Partition、Offset。
  • 本地环境可快速启动 Zookeeper + Kafka,并用命令行体验"Hello Kafka"。
  • Python 代码示例展示了如何在实际项目中使用 Kafka。

附录 A:术语速查(Glossary)

  • Topic:消息主题,按业务维度分类。
  • Partition:主题的分片,提升并发度;分区内有序,分区间无序。
  • Offset:偏移量,消费者在分区内的位置游标。
  • Consumer Group:同一组内的消费者共享分区实现水平扩展。
  • Replication Factor:副本数,提高容灾能力。
  • ACKS:生产者写入确认级别(0/1/all)。
  • Idempotence:幂等写入,避免重复消息。
  • Broker:Kafka 服务器实例,负责消息存储和转发。
  • Producer:消息生产者,负责向 Topic 发送消息。
  • Consumer:消息消费者,负责从 Topic 消费消息。

附录 B:常见问题排查(Troubleshooting)

  • 错误:NoBrokersAvailable

    • 检查 Kafka 是否启动、localhost:9092 端口是否可达,或设置 KAFKA_BOOTSTRAP
  • 错误:连接被拒绝或超时

    • 如使用 Docker,确保 KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092;Windows/WLS2 或远程容器需用宿主 IP。
  • 主题不存在/消费不到数据

    • 先用 CLI 创建 demo.hello;或确保 Broker 允许自动创建主题;消费者组首次消费需设置 auto_offset_reset=earliest(已在示例中设置)。
  • 端口占用

    • 停掉本地占用 2181/9092 的进程,或改端口。
  • Docker 容器启动失败

    • 检查 Docker 服务状态,确保有足够内存(建议 4GB+)。
    • 查看容器日志:docker compose logs zookeeperdocker compose logs kafka

附录 C:练习与思考(建议动手)

基础练习

  1. Mac 环境搭建:使用 Homebrew 安装 Kafka,体验 brew services 管理服务的便利性。

  2. 分区分配观察

    # 修改分区数并观察
    kafka-topics --alter --topic demo.hello --partitions 2 --bootstrap-server localhost:9092# 同时启动 2 个消费者(同一组)
    python lesson01_consumer_group_demo.py
    
  3. 消息键实验

    • 运行生产者,观察相同 user_id 的消息如何分配到相同分区
    • 修改 user_ids 列表,测试不同键的分区分配

进阶实验

  1. 性能对比测试

    # 在 lesson01_hello_producer.py 中修改配置
    # 测试1:高可靠性
    producer = make_producer(acks="all", enable_idempotence=True)# 测试2:高吞吐量
    producer = make_producer(acks="1", linger_ms=100, batch_size=65536)# 测试3:无确认(最快但不可靠)
    producer = make_producer(acks="0", linger_ms=0)
    
  2. 压缩效果对比

    # 分别测试不同压缩算法
    compression_types = [None, "gzip", "snappy", "lz4"]
    # 观察吞吐量和网络使用的差异
    
  3. 消息过滤实验

    • 修改消费者的 filter_messages() 函数
    • 测试只消费特定优先级或特定用户的消息
    • 观察过滤对性能的影响
  4. 错误处理和重试

    # 在发送过程中故意制造错误
    # 观察重试机制和幂等性的效果
    try:future = producer.send("nonexistent-topic", value=message)future.get(timeout=5)
    except KafkaError as e:print(f"发送失败:{e}")
    

消费者组实验

  1. 负载均衡观察

    # 启动消费者组演示,观察分区分配
    python lesson01_consumer_group_demo.py# 动态添加/移除消费者,观察 rebalance 过程
    
  2. 偏移量管理

    • 对比自动提交 vs 手动提交的区别
    • 模拟消费者崩溃,观察重启后的消费位置
    • 尝试重置偏移量:
    kafka-consumer-groups --bootstrap-server localhost:9092 \--group lesson01_consumer_group --reset-offsets --to-earliest \--topic demo.hello --execute
    

性能调优实验

  1. 批量大小优化

    # 测试不同的 batch_size 值
    batch_sizes = [1024, 16384, 32768, 65536]
    # 记录每种配置下的吞吐量
    
  2. 预热效果验证

    • 注释掉预热代码,对比首次发送的延迟
    • 观察元数据获取对性能的影响
  3. 连接池优化

    # 测试不同的连接参数
    max_in_flight_requests = [1, 5, 10]
    # 观察对吞吐量和消息顺序的影响
    

思考题与参考答案

  1. 架构设计

Q: 为什么 Kafka 选择"分区内有序,分区间无序"的设计?

A: 这是性能与一致性的巧妙平衡:

  • 分区内有序:保证单分区内消息按发送顺序消费,满足大多数业务场景的顺序需求
  • 分区间无序:允许多分区并行处理,大幅提升吞吐量
  • 设计优势
    • 避免了全局排序的性能瓶颈
    • 通过消息键确保相关消息在同一分区内有序
    • 支持水平扩展,分区数可根据需求调整

Q: 消费者组中的消费者数量超过分区数会发生什么?

A: 会出现消费者闲置情况:

主题有3个分区,但消费者组有5个消费者:
分区0 -> 消费者A ✅
分区1 -> 消费者B ✅  
分区2 -> 消费者C ✅
空闲   -> 消费者D ❌ (无分区分配)
空闲   -> 消费者E ❌ (无分区分配)
  • 最佳实践:消费者数量 ≤ 分区数
  • 动态调整:当消费者退出时,闲置消费者会被重新分配分区

Q: 如何设计一个高可用的 Kafka 集群?

A: 关键要素包括:

# 集群配置示例
集群规模: >= 3 个 Broker (奇数个,便于选举)
副本因子: >= 3 (容忍 1 个 Broker 故障)
分区分布: 副本分散在不同 Broker 上
网络隔离: 不同机架/可用区部署
监控告警: JMX 指标 + 日志监控
备份策略: 定期快照 + 增量备份
  1. 性能权衡

Q: acks=all vs acks=1 vs acks=0 的性能和可靠性差异?

A: 三种模式的对比:

模式可靠性性能使用场景
acks=0最低最高日志收集、指标上报
acks=1中等中等一般业务消息
acks=all最高最低金融交易、订单处理
# 性能测试示例
# acks=0: ~100,000 msg/s, 0% 确保送达
# acks=1: ~50,000 msg/s,  99% 确保送达  
# acks=all: ~20,000 msg/s, 99.99% 确保送达

Q: 启用压缩的CPU开销是否值得?

A: 通常值得,特别是网络带宽有限时:

压缩效果对比(JSON消息):
无压缩:   100MB/s 网络, 0% CPU
gzip:     40MB/s 网络,  5% CPU  (压缩比60%)
snappy:   60MB/s 网络,  2% CPU  (压缩比40%)
lz4:      70MB/s 网络,  1% CPU  (压缩比30%)
  • 推荐:生产环境使用 gzipsnappy
  • 权衡:网络成本 > CPU成本时启用压缩

Q: 批量发送的延迟 vs 吞吐量权衡?

A: 关键在于 linger.ms 的设置:

# 延迟敏感应用
linger_ms=0      # 立即发送,延迟 <1ms,吞吐量较低
linger_ms=5      # 等待5ms,延迟 ~5ms,吞吐量提升50%# 吞吐量优先应用  
linger_ms=50     # 等待50ms,延迟 ~50ms,吞吐量提升200%
linger_ms=100    # 等待100ms,延迟 ~100ms,吞吐量提升300%
  1. 实际应用

Q: 如何选择合适的分区数?

A: 综合考虑多个因素:

分区数计算公式:
分区数 = max(目标吞吐量 / 单分区吞吐量,目标吞吐量 / (消费者数 × 单消费者吞吐量)
)实际建议:
- 起始值:max(预期消费者数, 目标吞吐量MB/s ÷ 10)
- 上限:单Broker建议不超过1000个分区
- 扩展:只能增加不能减少,提前规划

Q: 什么时候使用消息键,什么时候不用?

A: 根据业务需求决定:

使用消息键的场景:

# 1. 需要顺序处理
user_orders = {"user_id": "12345", "order": "..."}
producer.send("orders", key="12345", value=user_orders)# 2. 相关消息聚合
log_events = {"session_id": "abc", "event": "click"}  
producer.send("logs", key="abc", value=log_events)# 3. 负载均衡
producer.send("tasks", key=f"worker_{task_id % 10}", value=task)

不使用消息键的场景:

# 1. 独立事件(随机分布即可)
metrics = {"cpu_usage": 80, "timestamp": "..."}
producer.send("metrics", value=metrics)  # key=None# 2. 需要最大并行度
notifications = {"message": "系统维护通知"}
producer.send("notifications", value=notifications)

Q: 如何处理消费者处理失败的消息?

A: 多种策略组合使用:

# 1. 重试机制
def process_message(message):max_retries = 3for attempt in range(max_retries):try:# 业务处理逻辑handle_business_logic(message.value)return Trueexcept RetryableError as e:if attempt == max_retries - 1:send_to_dlq(message)  # 发送到死信队列time.sleep(2 ** attempt)  # 指数退避except FatalError:send_to_dlq(message)      # 直接进入死信队列return False# 2. 死信队列模式
def send_to_dlq(message):dlq_producer.send("dead_letter_queue",key=message.key,value={"original_topic": message.topic,"original_message": message.value,"error_reason": "processing_failed","failed_at": datetime.now().isoformat()})# 3. 手动确认模式
consumer = make_consumer("my_topic", group_id="my_group",enable_auto_commit=False  # 关闭自动提交
)for message in consumer:try:process_message(message)consumer.commit()  # 处理成功才提交except Exception:# 不提交,消息会重新消费continue

最佳实践组合:

  1. 幂等处理 - 确保重复消费不会产生副作用
  2. 错误分类 - 区分可重试错误和致命错误
  3. 监控告警 - 监控失败率和死信队列大小
  4. 人工介入 - 定期处理死信队列中的消息

扩展挑战

  1. 监控和可观测性

    • 添加消息发送成功率统计
    • 实现消费延迟监控
    • 添加分区级别的吞吐量统计
  2. 错误恢复机制

    • 实现死信队列(DLQ)
    • 添加消息重试机制
    • 实现优雅降级策略
  3. 多环境适配

    • 支持 SASL/SSL 认证
    • 适配云服务(如 Confluent Cloud)
    • 实现配置外部化

完整案例代码

common:

# -*- coding: utf-8 -*-
import os, json, time
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import NoBrokersAvailable, KafkaTimeoutError, KafkaError# 优先用 IPv4,避免 macOS 把 localhost 解析为 ::1 的坑
BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "127.0.0.1:9092")
DEFAULT_COMPRESSION = os.getenv("KAFKA_COMPRESSION", "gzip")  # gzip/snappy/lz4/zstd/none# ─────────────────────────── 基础连通性 ───────────────────────────def wait_kafka(timeout=30):"""等待 Kafka 可用(尝试建立短连接)。可结合上层打印 BOOTSTRAP,便于排查。"""start = time.time()last_err = Nonewhile time.time() - start < timeout:try:KafkaProducer(bootstrap_servers=BOOTSTRAP).close()returnexcept Exception as e:last_err = etime.sleep(1)raise RuntimeError(f"Kafka not available at {BOOTSTRAP}; last error: {last_err}")# ─────────────────────────── Producer ───────────────────────────def _normalize_compression(name: str | None):if not name or str(name).lower() in ("none", "false", "0", ""):return Nonereturn str(name).lower()def make_producer(acks: str = "all",enable_idempotence: bool = False,   # 仅作语义标记;kafka-python 不支持该开关linger_ms: int = 50,                # 适中等待,避免首条卡顿batch_size: int = 64 * 1024,        # 64KB 批compression_type: str | None = DEFAULT_COMPRESSION,security: dict | None = None,       # {username, password}(如接入云厂商时)request_timeout_ms: int = 60_000,retries: int = 8,retry_backoff_ms: int = 300,max_in_flight_requests_per_connection: int = 5,metadata_max_age_ms: int = 10_000,
):"""生产者:稳健默认值- acks="all" 生产更可靠(单副本等价于1)- gzip 默认免安装;snappy/lz4/zstd 需额外依赖- 如需幂等/事务,请使用 confluent-kafka 客户端"""params = dict(bootstrap_servers=BOOTSTRAP,acks=acks,value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),key_serializer=lambda v: None if v is None else str(v).encode("utf-8"),linger_ms=linger_ms,batch_size=batch_size,compression_type=_normalize_compression(compression_type),retries=retries,retry_backoff_ms=retry_backoff_ms,request_timeout_ms=request_timeout_ms,max_in_flight_requests_per_connection=max_in_flight_requests_per_connection,metadata_max_age_ms=metadata_max_age_ms,)# 可选:SASL/SSL(连云服务时开启)# if security:#     params.update(dict(#         security_protocol="SASL_SSL",#         sasl_mechanism="PLAIN",#         sasl_plain_username=security["username"],#         sasl_plain_password=security["password"],#     ))# 处理压缩库缺失的场景:snappy/lz4/zstd 没装则自动降级到 gziptry:return KafkaProducer(**params)except AssertionError as e:msg = str(e).lower()if "compression codec" in msg:# 自动降级到 gzipparams["compression_type"] = "gzip"return KafkaProducer(**params)raisedef warmup_producer(producer: KafkaProducer, topic: str, timeout: int = 30):"""发送一条极小消息并等待 ACK,完成连接/元数据/leader 预热。"""fut = producer.send(topic, value={"__warmup__": True, "ts": datetime.now().isoformat()})fut.get(timeout=timeout)producer.flush(timeout=timeout)# ─────────────────────────── Consumer ───────────────────────────def make_consumer(topic: str,group_id: str | None,auto_offset_reset: str = "earliest",     # 教学用 earliest;生产更常用 latestenable_auto_commit: bool = False,        # 推荐手动提交,先处理后提交max_poll_records: int = 100,security: dict | None = None,session_timeout_ms: int = 10_000,heartbeat_interval_ms: int = 3_000,request_timeout_ms: int = 305_000,fetch_min_bytes: int = 1,fetch_max_wait_ms: int = 500,fetch_max_bytes: int = 50 * 1024 * 1024, # 50MBconsumer_timeout_ms: int = 0,            # >0: 无消息 N ms 后抛 StopIterationretry_backoff_ms: int = 100,reconnect_backoff_ms: int = 50,reconnect_backoff_max_ms: int = 1000,
):"""消费者:稳健默认值(更少超时、更平滑批量)"""params = dict(bootstrap_servers=BOOTSTRAP,group_id=group_id,auto_offset_reset=auto_offset_reset,enable_auto_commit=enable_auto_commit,value_deserializer=lambda v: json.loads(v.decode("utf-8")),key_deserializer=lambda v: None if v is None else v.decode("utf-8"),max_poll_records=max_poll_records,# 稳定性 & 吞吐session_timeout_ms=session_timeout_ms,heartbeat_interval_ms=heartbeat_interval_ms,request_timeout_ms=request_timeout_ms,fetch_min_bytes=fetch_min_bytes,fetch_max_wait_ms=fetch_max_wait_ms,fetch_max_bytes=fetch_max_bytes,# 控制退出consumer_timeout_ms=consumer_timeout_ms,# 退避重连retry_backoff_ms=retry_backoff_ms,reconnect_backoff_ms=reconnect_backoff_ms,reconnect_backoff_max_ms=reconnect_backoff_max_ms,)# if security:#     params.update(dict(#         security_protocol="SASL_SSL",#         sasl_mechanism="PLAIN",#         sasl_plain_username=security["username"],#         sasl_plain_password=security["password"],#     ))return KafkaConsumer(topic, **params)# ─────────────────────────── 小工具 ───────────────────────────def partitions_of(producer: KafkaProducer, topic: str):"""获取主题分区集合(None/空集合 => 元数据未取到或主题不存在)"""return producer.partitions_for(topic) or set()def close_safely(x):try:x.flush(timeout=30) if hasattr(x, "flush") else Noneexcept Exception:passtry:x.close()except Exception:pass

生产者:

#!/usr/bin/env python3
"""
Kafka Producer 示例 - 发送消息到 demo.hello 主题
演示:消息键、自定义头部、错误处理、配置调优
"""
import os, time
import json
import random
from datetime import datetime
from kafka.errors import KafkaTimeoutError, KafkaError
from common import make_producer, BOOTSTRAP,wait_kafkaTOPIC = "demo.hello"def send_messages_with_keys():"""发送带键的消息,演示分区分配"""print("🚀 启动带键的生产者...")# 创建生产者,启用幂等性producer = make_producer(acks="all",  # 等待所有副本确认enable_idempotence=True,  # 启用幂等性,避免重复linger_ms=10,  # 批量发送延迟batch_size=16384,  # 批次大小compression_type="gzip"  # 启用压缩)# 定义一些用户ID作为消息键user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]try:for i in range(10):# 随机选择用户ID作为键key = random.choice(user_ids)# 构造消息内容message = {"id": i + 1,"user_id": key,"message": f"Hello from producer! Message #{i + 1}","timestamp": datetime.now().isoformat(),"source": "lesson01_producer"}# 添加自定义头部headers = [("message_type", b"greeting"),("priority", b"normal"),("batch_id", str(i // 3).encode())]# 发送消息future = producer.send(topic="demo.hello",key=key,value=message,headers=headers)# 等待发送完成并检查结果record_metadata = future.get(timeout=10)print(f"✅ 消息 {i + 1} 发送成功: "f"topic={record_metadata.topic}, "f"partition={record_metadata.partition}, "f"offset={record_metadata.offset}, "f"key={key}")# 模拟消息间隔time.sleep(0.5)except Exception as e:print(f"❌ 发送消息时出错: {e}")finally:# 确保所有消息都发送完成producer.flush()producer.close()print("🔒 生产者已关闭")def send_batch_messages():"""批量发送消息(预热 + 更稳的超时/重试 + gzip)"""print("\n📦 启动批量消息生产者...")print(f"🔌 BOOTSTRAP = {BOOTSTRAP}")# 1) 使用更稳的参数;先用 gzip,排除 snappy 干扰producer = make_producer(acks="all",               # 单副本等价于1,稳定一点linger_ms=50,             # 避免首条等太久batch_size=32768,compression_type="gzip",# 如果你在 common.py 里支持以下键,建议设置;否则忽略# request_timeout_ms=60000,# retries=8,# retry_backoff_ms=300,# max_in_flight_requests_per_connection=5,)# 2) 元数据/Topic 自检parts = producer.partitions_for(TOPIC)if not parts:print(f"❌ 无法获取 {TOPIC} 的分区信息;请先创建主题或检查 Kafka 监听地址。")producer.close()returnprint(f"🧩 主题 {TOPIC} 分区: {sorted(parts)} | 压缩: gzip")# 3) 预热发送(确保连接/元数据/leader 完全就绪)try:producer.send(TOPIC, value={"warmup": True}).get(timeout=30)producer.flush(timeout=30)print("🔥 预热完成")except Exception as e:print(f"❌ 预热失败:{e}")producer.close()return# 4) 准备批量数据messages = [{"batch_id": i // 10,"sequence": i + 1,"content": f"Batch message #{i + 1}","timestamp": datetime.now().isoformat()} for i in range(50)]# 5) 异步发送 + 最后统一 flush;只在 flush 后检查结果start = time.time()futures = [producer.send(TOPIC, value=m) for m in messages]# 6) 等待缓冲区落盘errors = 0try:producer.flush(timeout=60)# 可选:逐条确认(此时元数据/连接稳定,失败概率更低)for i, fut in enumerate(futures, 1):try:fut.get(timeout=10)if i % 10 == 0:print(f"📊 已确认 {i}/{len(messages)} 条")except (KafkaTimeoutError, KafkaError) as e:errors += 1print(f"❌ 第 {i} 条失败:{e}")finally:duration = time.time() - startok = len(messages) - errorstput = ok / duration if duration > 0 else 0print(f"🎯 批量发送完成:成功 {ok}/{len(messages)},耗时 {duration:.2f}s,吞吐 {tput:.1f} msg/s")producer.close()print("🔒 批量生产者已关闭")def main():"""主函数"""print("🎬 Kafka Producer 示例开始")print("=" * 50)# 等待Kafka就绪print("⏳ 等待Kafka服务就绪...")wait_kafka()print("✅ Kafka服务已就绪")# 发送带键的消息send_messages_with_keys()# 发送批量消息send_batch_messages()print("\n🎉 所有消息发送完成!")print("💡 提示:现在可以启动消费者来接收这些消息")if __name__ == "__main__":main()

消费者:

#!/usr/bin/env python3
"""
Kafka Consumer 示例 - 从 demo.hello 主题消费消息
演示:消息过滤、偏移量管理、错误处理、消费者组管理
"""import json
import time
import signal
from datetime import datetime
from common import make_consumer, wait_kafka# ────────────── Headers 工具函数 ──────────────
def _headers_map(msg):"""将 message.headers 转为 {str: bytes} 的字典"""# kafka-python: headers 是 List[Tuple[str, bytes]]return dict(msg.headers or [])def _hget(headers: dict, key: str, default=None, decode=True):"""从 headers 获取 key;默认把 bytes 解码成 str。"""val = headers.get(key, default)if decode and isinstance(val, (bytes, bytearray)):try:return val.decode("utf-8")except Exception:return defaultreturn val# ────────────── 消费者类 ──────────────
class KafkaMessageConsumer:"""Kafka 消息消费者类"""def __init__(self, topic, group_id, auto_commit=True):self.topic = topicself.group_id = group_idself.auto_commit = auto_commitself.running = Trueself.message_count = 0self.start_time = time.time()# 设置信号处理,优雅关闭signal.signal(signal.SIGINT, self.signal_handler)signal.signal(signal.SIGTERM, self.signal_handler)# 创建消费者self.consumer = make_consumer(topic=topic,group_id=group_id,auto_offset_reset="earliest",  # 从最早的消息开始消费(首次启动或重新处理历史数据)enable_auto_commit=auto_commit,# 是否自动提交偏移量(False=手动提交,更安全)max_poll_records=100,          # 每次拉取的最大消息数(平衡吞吐量与内存使用)session_timeout_ms=30000,      # 会话超时时间30秒(心跳丢失多久认为消费者死亡)heartbeat_interval_ms=3000,    # 心跳间隔3秒(通常为session_timeout的1/3))print(f"🎯 消费者已创建: topic={topic}, group_id={group_id}")def signal_handler(self, signum, frame):"""信号处理函数,优雅关闭"""print(f"\n🛑 收到信号 {signum},正在优雅关闭...")self.running = Falsedef process_message(self, message):"""处理单条消息"""try:value = message.valuekey = message.keypartition = message.partitionoffset = message.offsettimestamp = message.timestampheaders_raw = _headers_map(message)  # {str: bytes}headers = {k: (v.decode("utf-8", "ignore") if isinstance(v, (bytes, bytearray)) else v)for k, v in headers_raw.items()}self.message_count += 1# 格式化输出print(f"\n📨 消息 #{self.message_count}")print(f"   主题: {message.topic}")print(f"   分区: {partition}")print(f"   偏移量: {offset}")print(f"   键: {key}")print(f"   时间戳: {datetime.fromtimestamp(timestamp/1000) if timestamp else 'N/A'}")print(f"   头部: {headers}")print(f"   内容: {json.dumps(value, ensure_ascii=False, indent=2)}")# 示例业务逻辑if isinstance(value, dict):if "user_id" in value:print(f"   👤 用户消息: {value['user_id']}")if "batch_id" in value:print(f"   📦 批量消息: 批次 {value['batch_id']}")# 判断优先级priority = _hget(headers_raw, "priority", "normal")if priority == "high":print("   ⚠️ 高优先级消息!")# 模拟处理耗时time.sleep(0.1)except Exception as e:print(f"❌ 处理消息时出错: {e}")def filter_messages(self, message):"""消息过滤逻辑"""try:value = message.valueheaders_raw = _headers_map(message)# 条件1:只处理 message_type=greetingmsg_type = _hget(headers_raw, "message_type", None)if msg_type is not None and msg_type != "greeting":return False# 条件2:只处理特定用户if isinstance(value, dict) and "user_id" in value:return value["user_id"] in {"user_001", "user_002"}return Trueexcept Exception as e:print(f"❌ 消息过滤出错: {e}")return True  # 出错时默认放行def commit_offsets(self):"""手动提交偏移量"""if not self.auto_commit:try:self.consumer.commit()print("💾 偏移量已手动提交")except Exception as e:print(f"❌ 提交偏移量失败: {e}")def print_stats(self):"""打印统计信息"""if self.message_count > 0:duration = time.time() - self.start_timethroughput = self.message_count / durationprint(f"\n📊 消费统计:")print(f"   总消息数: {self.message_count}")print(f"   运行时间: {duration:.2f} 秒")print(f"   吞吐量: {throughput:.2f} 消息/秒")def run(self):"""运行消费者"""print(f"🚀 开始消费消息... (按 Ctrl+C 停止)")print("=" * 60)try:while self.running:# 拉取消息messages = self.consumer.poll(timeout_ms=1000, max_records=10)for _, partition_messages in messages.items():for message in partition_messages:if not self.running:breakif self.filter_messages(message):self.process_message(message)# 定期手动提交偏移量if not self.auto_commit and self.message_count % 10 == 0:self.commit_offsets()if self.message_count > 0 and self.message_count % 20 == 0:print(f"💡 已处理 {self.message_count} 条消息...")except KeyboardInterrupt:print("\n🛑 用户中断,正在关闭...")except Exception as e:print(f"❌ 消费过程中出错: {e}")finally:self.cleanup()def cleanup(self):"""清理资源"""try:self.print_stats()if not self.auto_commit:self.commit_offsets()self.consumer.close()print("🔒 消费者已关闭")except Exception as e:print(f"❌ 清理资源时出错: {e}")# ────────────── 主入口 ──────────────
def main():print("🎬 Kafka Consumer 示例开始")print("=" * 50)topic = "demo.hello"group_id = "lesson01_consumer_group"auto_commit = False  # 手动提交,便于观察print("⏳ 等待Kafka服务就绪...")wait_kafka()print("✅ Kafka服务已就绪")consumer = KafkaMessageConsumer(topic, group_id, auto_commit)try:consumer.run()except Exception as e:print(f"❌ 消费者运行出错: {e}")print("\n👋 消费者示例结束")if __name__ == "__main__":main()

演示demo:

#!/usr/bin/env python3
"""
Kafka 消费者组演示 - 展示分区分配和负载均衡
演示:多个消费者如何共享分区、消费者组的行为
"""import json
import time
import signal
import sys
import threading
from datetime import datetime
from common import make_consumer, wait_kafkaclass GroupConsumer:"""消费者组中的单个消费者"""def __init__(self, consumer_id, topic, group_id, auto_commit=True):self.consumer_id = consumer_idself.topic = topicself.group_id = group_idself.auto_commit = auto_commitself.running = Trueself.message_count = 0self.start_time = time.time()# 创建消费者self.consumer = make_consumer(topic=topic,group_id=group_id,auto_offset_reset="earliest",    # 从最早的消息开始消费enable_auto_commit=auto_commit,  # 自动提交偏移量设置max_poll_records=50,             # 每次拉取50条消息(演示用,实际可调整)session_timeout_ms=30000,        # 30秒会话超时heartbeat_interval_ms=3000       # 3秒心跳间隔)print(f"👤 消费者 {consumer_id} 已创建: group_id={group_id}")def process_message(self, message):"""处理消息"""try:value = message.valuekey = message.keypartition = message.partitionoffset = message.offsetself.message_count += 1# 格式化输出,显示消费者IDprint(f"[{self.consumer_id}] 📨 消息 #{self.message_count} "f"| 分区:{partition} | 偏移量:{offset} | 键:{key}")if isinstance(value, dict):if "user_id" in value:print(f"   👤 用户: {value['user_id']}")if "batch_id" in value:print(f"   📦 批次: {value['batch_id']}")# 模拟处理时间time.sleep(0.2)except Exception as e:print(f"[{self.consumer_id}] ❌ 处理消息出错: {e}")def run(self):"""运行消费者"""print(f"[{self.consumer_id}] 🚀 开始消费...")try:while self.running:messages = self.consumer.poll(timeout_ms=1000, max_records=10)for topic_partition, partition_messages in messages.items():for message in partition_messages:if not self.running:breakself.process_message(message)# 定期打印状态if self.message_count > 0 and self.message_count % 10 == 0:print(f"[{self.consumer_id}] 💡 已处理 {self.message_count} 条消息")except Exception as e:print(f"[{self.consumer_id}] ❌ 运行出错: {e}")finally:self.cleanup()def stop(self):"""停止消费者"""self.running = Falsedef cleanup(self):"""清理资源"""try:if self.message_count > 0:duration = time.time() - self.start_timethroughput = self.message_count / durationprint(f"[{self.consumer_id}] 📊 统计: {self.message_count} 条消息, "f"吞吐量 {throughput:.1f} 消息/秒")if not self.auto_commit:self.consumer.commit()self.consumer.close()print(f"[{self.consumer_id}] 🔒 已关闭")except Exception as e:print(f"[{self.consumer_id}] ❌ 清理出错: {e}")def run_consumer_group(num_consumers=3, topic="demo.hello", group_id="demo_group"):"""运行消费者组"""print(f"🎬 启动消费者组演示")print(f"   主题: {topic}")print(f"   消费者组: {group_id}")print(f"   消费者数量: {num_consumers}")print("=" * 60)# 等待Kafka就绪print("⏳ 等待Kafka服务就绪...")wait_kafka()print("✅ Kafka服务已就绪")# 创建消费者列表consumers = []threads = []try:# 创建并启动所有消费者for i in range(num_consumers):consumer_id = f"Consumer-{i+1}"consumer = GroupConsumer(consumer_id, topic, group_id, auto_commit=False)consumers.append(consumer)# 在新线程中运行消费者thread = threading.Thread(target=consumer.run, daemon=True)threads.append(thread)thread.start()# 间隔启动,便于观察分区分配time.sleep(2)print(f"\n🎯 所有消费者已启动,观察分区分配情况...")print("💡 提示:相同键的消息会分配到相同分区")print("💡 提示:按 Ctrl+C 停止所有消费者")print("-" * 60)# 等待用户中断while True:time.sleep(1)except KeyboardInterrupt:print(f"\n🛑 收到中断信号,正在停止所有消费者...")except Exception as e:print(f"❌ 运行出错: {e}")finally:# 停止所有消费者for consumer in consumers:consumer.stop()# 等待所有线程结束for thread in threads:thread.join(timeout=5)print("\n🎉 消费者组演示结束")def main():"""主函数"""print("🎬 Kafka 消费者组演示")print("=" * 50)# 配置参数topic = "demo.hello"group_id = "lesson01_demo_group"num_consumers = 3  # 建议与主题分区数相同print("📋 演示说明:")print("1. 启动多个消费者,观察分区如何分配")print("2. 相同键的消息会分配到相同分区")print("3. 每个分区只会被组内一个消费者消费")print("4. 观察负载均衡效果")print()try:run_consumer_group(num_consumers, topic, group_id)except Exception as e:print(f"❌ 演示运行出错: {e}")print("\n👋 演示结束")if __name__ == "__main__":main()

kafka1

📖 扩展阅读

  • Kafka 官方文档
  • kafka-python 文档
  • Kafka 设计原理

👉 下一篇课程:Kafka Topic 与 Partition 深入理解 —— 我们将通过实验观察"消息如何分区"和"消费者组如何分工"。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/95517.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/95517.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Conda相关的用法

1、背景 此文主要记录conda的一些用法&#xff0c;大部分命令来自ai搜索以及自己的理解。 2、安装conda 2.1 选择 conda 版本 2.1.1 Anaconda 含有 Conda 大量科学计算包&#xff08;NumPy、Pandas、Matplotlib 等&#xff09;适合数据科学、机器学习初学者下载地址&…

数据库选择有讲究?SQLite、PostgreSQL还是MySQL?

不同规模的项目&#xff0c;数据库选择有讲究。大家好&#xff0c;我是技术支持彼得&#xff0c;每天两眼一睁就是为客户解决问题。在日常使用我们的视频平台时&#xff0c;很多用户会问到数据库选择的问题。今天就来详细说说EasyGBS、EasyNVR和EasyCVR三大平台该如何选择数据库…

在VMware的Win10虚拟机中安装使用ENSP

VMware安装Windows10 安装ENSP及相关软件 把安装ENSP所使用的相关复制到已安装好的Windows10虚拟机中&#xff0c;如下图所示。 安装VirtualBox 安装时请确保路径为英文目录&#xff0c;并在出现"安装设备软件"或"Oracle USB设备"提示时选择安装选项。具…

Go 语言面试题详解之接口 (Interface) 详解一文吃透

自古流传着一个传言...在 Go 语言面试的时候必有人会问接口&#xff08;interface&#xff09;的实现原理。这又是为什么&#xff1f;为何对接口如此执着&#xff1f;实际上&#xff0c;Go 语言的接口设计在整体扮演着非常重要的角色&#xff0c;没有他&#xff0c;很多程序估计…

ansible循环+判断(with,loop,when,if,for)

一、文档核心定位 本文档聚焦Ansible自动化运维中的两大核心功能——循环与判断&#xff0c;通过“功能说明完整Playbook代码”的形式&#xff0c;覆盖循环迭代场景&#xff08;列表、字典、文件等&#xff09;、数据处理过滤器&#xff08;字符串、数字、加密等&#xff09;、…

在linux下使用MySQL常用的命令集合

1. 数据库查看和选择-- 查看所有数据库 SHOW DATABASES;-- 选择使用某个数据库&#xff08;需要修改&#xff1a;your_database_name&#xff09; USE your_database_name;-- 查看当前正在使用的数据库 SELECT DATABASE();说明&#xff1a;your_database_name 替换为你要操作的…

mysy2使用

参考链接 https://blog.csdn.net/qq_36525177/article/details/115279468 介绍 要把linux程序在windows上编译&#xff0c;且最好兼容posix标准&#xff0c;就用msys2。 使用 1、先下载安装&#xff0c;我装在D:\mysy2 2、打开vscode&#xff0c;不要切换目录&#xff0c;…

【Protues仿真】基于AT89C52单片机的温湿度测量

目录 0案例视频效果展示 1DHT11温度湿度传感器 1.1传感器简介 1.2引脚定义&#xff08;从左到右&#xff0c;面对网格面&#xff09; 1.3时序 & 校验&#xff08;原理速览&#xff09; 1.4常见故障排查 2 DHT11温度湿度传感器数据 2.1 DHT11温度湿度传感器数据格式…

JavaScript箭头函数与普通函数:两种工作方式的深度解析

文章目录JavaScript箭头函数与普通函数&#xff1a;两种"工作方式"的深度解析 &#x1f3f9;&#x1f19a;&#x1f468;&#x1f4bc;引言&#xff1a;为什么需要箭头函数&#xff1f;核心区别全景图对比表格&#xff1a;箭头函数 vs 普通函数关系示意图一、this绑定…

蓝光三维扫描技术赋能内衣胸垫设计:从精准制造到个性化体验的革新之旅

在竞争激烈的内衣市场中&#xff0c;产品设计的精准性、舒适度和个性化已成为品牌制胜的关键。传统内衣设计依赖主观经验与样品反复调整&#xff0c;不仅周期长、成本高&#xff0c;且难以实现对复杂胸型的精准适配。为应对这一挑战&#xff0c;某知名内衣品牌采用新拓三维XTOM…

内存保护单元MPU

一、介绍内存保护单元 是一种硬件模块&#xff0c;通常集成在处理器内核中&#xff0c;用于管理和管理对内存的访问&#xff0c;以提高系统的可靠性和安全性。它的核心任务是保护。想象一下&#xff0c;一个操作系统中有多个任务在运行&#xff1a;* 任务A的代码 bug 可能会错误…

【Kubernetes知识点】监控升级,备份及Kustomize管理

目录 1.举例说明K8s中都有哪些常规的维护管理操作。 2.如何升级K8s到新的版本&#xff1f;在升级过程中应该注意哪些事项&#xff1f; 3.解释ETCD及其备份和恢复的过程。 4.Kustomization在Kubernetes中的作用 1.举例说明K8s中都有哪些常规的维护管理操作。 集群状态监控…

《Effective Java》第4条:通过私有构造器强化不可实例化的能力

说明&#xff1a; 关于本博客使用的书籍&#xff0c;源代码Gitee仓库 和 其他的相关问题&#xff0c;请查看本专栏置顶文章&#xff1a;《Effective Java》第0条&#xff1a;写在前面&#xff0c;用一年时间来深度解读《Effective Java》这本书 正文&#xff1a; 原文P15&am…

20.Linux进程信号(一)

信号: 产生->保存->处理一、预备知识信号vs信号量->没有任何关系什么叫做信号&#xff1f;中断我们正在做的事情&#xff0c;是一种事件的异步通知机制。同步和异步理解&#xff1a;同步指事件发生具有一定的顺序性&#xff08;如命名管道中服务端读方式打开会阻塞&am…

【C++】Vector核心实现:类设计到迭代器陷阱

vector 模拟实现代码的核心下面从类设计、核心接口、内存安全、常见陷阱、测试场景5 个维度&#xff0c;提炼需重点掌握的知识点&#xff0c;覆盖面试高频考点与实践易错点&#xff1a;一、类结构与成员变量&#xff08;基础框架&#xff09;vector 的核心是通过三个迭代器&…

并发编程指南 内存模型

文章目录5.1 内存模型5.1.1 对象和内存位置5.1.2 对象、内存位置和并发5.1.3 修改顺序5.1 内存模型 内存模型&#xff1a;一方面是内存布局&#xff0c;另一方面是并发。并发的基本结构很重要&#xff0c;特别是低层原子操作。因为C所有的对象都和内存位置有关&#xff0c;所以…

血缘元数据采集开放标准:OpenLineage Integrations Compatibility Tests Structure

OpenLineage 是一个用于元数据和血缘采集的开放标准&#xff0c;专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业&#xff08;Job&#xff09;、运行实例&#xff08;Run&#xff09;和数据集&#xff08;Dataset&#xff09; 组成的通用模型&#xff0…

执行一条select语句期间发生了什么?

首先是连接器的工作&#xff0c;嗯&#xff0c;与客户端进行TCP三次握手建立连接&#xff0c;校验客户端的用户名和密码&#xff0c;如果用户名和密码都对了&#xff0c;那么就会检查该用户的权限&#xff0c;之后执行的所有SQL语句都是基于该权限接着客户端就可以向数据库发送…

element el-select 默认选中数组的第一个对象

背景&#xff1a;在使用element组件的时候&#xff0c;我们期望默认选中第一个数值。这里我们默认下拉列表绑定的lable是中文文字&#xff0c;value绑定的是数值。效果展示&#xff1a;核心代码&#xff1a;<template><el-select v-model"selectValue" plac…

【论文阅读】LightThinker: Thinking Step-by-Step Compression (EMNLP 2025)

论文题目&#xff1a;LightThinker: Thinking Step-by-Step Compression 论文来源&#xff1a;EMNLP 2025&#xff0c;CCF B 论文作者&#xff1a; 论文链接&#xff1a;https://arxiv.org/abs/2502.15589 论文源码&#xff1a;https://github.com/zjunlp/LightThinker 一、…