消息队列处理模式:流式与批处理的艺术

🌊 消息队列处理模式:流式与批处理的艺术

📌 深入解析现代分布式系统中的数据处理范式

一、流式处理:实时数据的"活水"

在大数据时代,流式处理已成为实时分析的核心技术。它将数据视为无限的流,而非有限的集合,实现了毫秒级的数据处理响应。

1️⃣ Kafka Streams核心概念

Kafka Streams是构建在Kafka之上的客户端库,提供了强大的流处理能力:

// Kafka Streams应用示例
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders-topic");// 过滤出大额订单并转换为通知消息
KStream<String, Notification> notifications = orders.filter((key, order) -> order.getAmount() > 10000).mapValues(order -> new Notification("大额订单提醒", order));// 输出到通知主题
notifications.to("notifications-topic");

核心抽象

  • KStream:代表无界、连续的记录流
  • KTable:可更新的数据表视图,支持查询
  • GlobalKTable:全局分布式表,适合小规模数据关联

2️⃣ 窗口计算与状态管理

流处理中,窗口是处理时间维度数据的关键机制:

窗口类型特点应用场景
滚动窗口固定大小,不重叠每分钟订单统计
滑动窗口固定大小,可重叠最近5分钟热门商品
会话窗口动态大小,基于活动间隔用户行为分析

状态存储

// 配置状态存储
StoreBuilder<KeyValueStore<String, Long>> countStore =Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("counts"),Serdes.String(),Serdes.Long());// 注册状态存储
builder.addStateStore(countStore);// 使用状态存储进行计算
orders.process(() -> new OrderProcessor(), "counts");

3️⃣ Exactly-Once实现

Kafka Streams通过事务和幂等生产者实现了端到端的精确一次语义:

// 配置Exactly-Once语义
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

实现原理

  • 消费者偏移量与处理结果在同一事务中提交
  • 幂等生产者确保重试不会导致重复
  • 事务协调器管理跨分区的原子性

二、批处理:大规模数据的"蓄水池"

批处理适合处理大量历史数据,或者定期执行的数据处理任务。

1️⃣ 消息积压处理策略

当消息堆积时,系统面临巨大压力,需要合理的处理策略:

// 消费者配置批量拉取
Properties props = new Properties();
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50MB

积压处理最佳实践

  • 临时扩容:增加消费者实例和分区数
  • 跳过非关键消息:设置过滤条件,优先处理重要消息
  • 批量压缩存储:将积压消息归档,延后处理

2️⃣ 消费者并行度调整

合理的并行度设计是批处理性能的关键:

// 动态调整消费者线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy()
);// 根据积压量动态调整线程数
if (getLagSize() > 10000) {executor.setCorePoolSize(executor.getCorePoolSize() + 5);
}

并行度优化公式

  • 理想并行度 = min(分区数, 可用CPU核心数 × (1 + I/O等待比率))
  • 消费者实例数 ≤ 分区数(避免资源浪费)

3️⃣ 背压控制机制

背压(Backpressure)是处理生产速度超过消费速度的关键机制:

// RxJava背压示例
Flowable.create(emitter -> {// 消息源for (Message msg : messageSource) {if (emitter.isCancelled()) return;// 检查背压while (!emitter.requested() > 0) {Thread.sleep(100);}emitter.onNext(msg);}emitter.onComplete();
}, BackpressureStrategy.BUFFER)
.onBackpressureBuffer(10000, () -> log.warn("缓冲区已满"))
.observeOn(Schedulers.io(), false, 512)
.subscribe(message -> process(message));

背压策略对比

策略描述适用场景
缓冲使用队列暂存过多消息短暂峰值,内存充足
丢弃丢弃无法处理的消息非关键数据,如监控
限流降低生产者发送速率关键业务,不允许丢失
采样只处理部分消息统计分析类应用

三、流批融合:未来的趋势

现代数据处理框架正在打破流处理和批处理的界限:

// Flink流批统一处理示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 批处理模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 或流处理模式
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 相同的代码,不同的执行模式
DataStream<Order> orders = env.fromSource(KafkaSource.<Order>builder().setTopics("orders").setValueOnlyDeserializer(new OrderDeserializer()).build(),WatermarkStrategy.noWatermarks(),"Kafka Orders"
);orders.keyBy(Order::getCustomerId).window(TumblingEventTimeWindows.of(Time.minutes(5))).aggregate(new OrderAggregator()).sinkTo(new DatabaseSink());

融合优势

  • 统一的编程模型,降低开发复杂度
  • 灵活切换处理模式,适应不同场景
  • 充分利用历史数据增强实时分析

🔍 关注我,每周解锁更多分布式系统与消息队列的技术干货!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/909032.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一起学习swin-transformer(一)

Transform学习链接 从零开始设计Transformer模型&#xff08;1/2&#xff09;——剥离RNN&#xff0c;保留Attention-CSDN博客 Transformer-PyTorch实战项目——文本分类_transformer文本分类 pytorch-CSDN博客 从零开始设计Transformer模型&#xff08;2/2&#xff09;——…

PyQt常用控件的使用:QFileDialog、QMessageBox、QTreeWidget、QRadioButton等

文章目录 一、控件常用函数介绍二、QFileDialog&#xff08;文件类操作&#xff09;三、QMessageBox(对话框)四、QTreeWidget&#xff08;树结构类操作&#xff09;4.1 树结构的初始化4.2 递归读取完整树结构4.3 两QTreeWidget滑轮同步滑动4.4 信号槽绑定 五、QCombox改写下拉多…

校园导航系统核心技术解析:高精度定位与 AR 实景导航的应用实践

本文面向校园信息化建设者、技术开发者及教育行业数字化转型关注者&#xff0c;旨在解析如何通过 “高精度定位 AR/VR 场景化服务” 技术体系&#xff0c;破解校区因建筑复杂、人流密集导致的寻路效率低下问题&#xff0c;探讨如何利用现有技术解决校园内导航难题&#xff0c;…

java大文件分段下载

后端代码 package com.jy.jy.controller;import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.a…

antd-vue - - - - - a-table排序

antd-vue - - - - - a-table排序 1. 重点代码:2. 代码示例&#xff1a;3. 进阶版写法 1. 重点代码: sorter: {compare: (a, b) > a.columnsKeys - b.columnsKeys,multiple: 1, },解析&#xff1a; compare: 自定义排序函数&#xff0c;用于比较两个对象。 multiple: 排序优…

【AI】模型vs算法(以自动驾驶为例)

模型vs算法&#xff08;以自动驾驶为例&#xff09; 一、自动驾驶的核心任务二、以自动驾驶为例&#xff0c;模型vs算法的实际分工1. 感知环节&#xff1a;“看懂”周围环境&#xff08;如识别行人、车道线、车辆&#xff09;2. 预测环节&#xff1a;“预判”其他交通参与者的行…

机器学习与深度学习19-线性代数02

目录 前文回顾6.协方差矩阵与主成分分析7.矩阵的奇异值分解8.神经网络的前向传播和反向传播9.矩阵的迹10.特征工程的多项式特征扩展 前文回顾 上一篇文章链接&#xff1a;地址 6.协方差矩阵与主成分分析 协方差矩阵是一个对称矩阵&#xff0c;用于衡量随机变量之间的线性相关…

青藏高原ASTER_GDEM数据集(2011)

共享方式&#xff1a;开放获取数据大小&#xff1a;73.69 GB数据时间范围&#xff1a;2012-04-08 — 2012-05-08元数据更新时间&#xff1a;2021-10-15 数据集摘要 ASTER Global Digital Elevation Model &#xff08;ASTER GDEM&#xff09;是美国航空航天局 &#xff08;NAS…

代码随想录训练营二十六天| 654.最大二叉树 617.合并二叉树 700.二叉搜索树的搜索 98.验证二叉搜索树

654.最大二叉树&#xff1a; 文档讲解&#xff1a;代码随想录|654.最大二叉树 视频讲解&#xff1a;又是构造二叉树&#xff0c;又有很多坑&#xff01;| LeetCode&#xff1a;654.最大二叉树_哔哩哔哩_bilibili 状态&#xff1a;已做出 思路&#xff1a; 这道题目要求使用给定…

临时抱佛脚v2

术语解释 多范式 (Multi-paradigm) 指支持多种编程范式&#xff0c;如面向对象编程和函数式编程&#xff0c;允许开发者根据需求选择最合适的风格。 函数式编程 (Functional Programming) 一种编程范式&#xff0c;将计算视为数学函数的求值&#xff0c;强调不变性、无副作用…

MCGS和1200plc变量表格式编辑

设备编辑窗口---设备信息导出---另存为xx.CSV文件 在上面导出的表格基础上编辑 本体位的编辑&#xff1a; db数据块位编辑 db数据块int类型 (4.14应改为4.140,0不省略) db数据块real类型 通道号&#xff0c;地址均按顺序排列 &#xff0c;寄存期地址最后一位0不能省略&#…

Android高性能音频与图形开发:OpenSL ES与OpenGL ES最佳实践

引言 在移动应用开发中&#xff0c;音频和图形处理是提升用户体验的关键要素。本文将深入探讨Android平台上两大核心多媒体API&#xff1a;OpenSL ES&#xff08;音频&#xff09;和OpenGL ES&#xff08;图形&#xff09;&#xff0c;提供经过生产环境验证的优化实现方案。 …

GaussDB分布式数据库调优方法总结:从架构到实践的全链路优化指南

GaussDB分布式数据库调优方法总结&#xff1a;从架构到实践的全链路优化指南 GaussDB作为华为自主研发的分布式数据库&#xff0c;基于MPP&#xff08;大规模并行处理&#xff09;架构设计&#xff0c;支持存储与计算分离、列存/行存混合引擎、向量化执行等核心技术&#xff0…

NLP学习路线图(三十九):对话系统

在人工智能领域,自然语言处理(NLP)无疑是推动人机交互革命的核心引擎。当清晨的闹钟响起,你轻声一句“小爱同学,关掉闹钟”;当开车迷路时说“嘿Siri,导航到最近加油站”;当深夜向客服机器人询问订单状态时——我们已在不知不觉中与对话系统建立了千丝万缕的联系。这类系…

Cambridge Pixel为警用反无人机系统(C-UAS)提供软件支持

警用 C-UAS 系统受益于 Cambridge Pixel 和 OpenWorks Engineering 的技术合作。 作为雷达数据处理和雷达目标跟踪的专家公司&#xff0c;Cambridge Pixel宣布与OpenWorks Engineering 合作&#xff0c;为警用系统提供先进的C-UAS系统。OpenWorks Engineering以创新的光学系统和…

【ArcGIS Pro微课1000例】0072:如何自动保存编辑内容及保存工程?

文章目录 一、自动保存编辑内容二、自动保存工程在使用ArcGIS或者ArcGIS Pro时,经常会遇到以下报错,无论点击【发送报告】,还是【不发送】,软件都会强制退出,这时如果对所操作没有保存,就会前功尽弃。 此时,自动保存工作就显得尤为重要,接下来讲解两种常见的自动保存方…

进行性核上性麻痹健康护理指南:全方位照护之道

进行性核上性麻痹&#xff08;PSP&#xff09;是一种罕见的神经系统变性疾病&#xff0c;会严重影响患者的生活质量。做好健康护理&#xff0c;能在一定程度上缓解症状&#xff0c;提高患者生活质量。 ​饮食护理是基础。患者常伴有吞咽困难&#xff0c;饮食应选择质地均匀、易…

第二节:Vben Admin v5 (vben5) Python-Flask 后端开发详解(附源码)

目录 前言项目准备项目结构应用创建应用工厂`vben5-admin-backend/app/__init__.py` 文件`vben5-admin-backend/app/config.py` 文件`vben5-admin-backend/app/.env` 文件`vben5-admin-backend/app/logging_config.py` 文件`vben5-admin-backend/app/start.py` 文件`vben5-admi…

从零打造前沿Web聊天组件:从设计到交互

作者现在制作一款网页端聊天室&#xff08;青春版&#xff09;&#xff0c;之前一直有这个想法&#xff0c;现在总算是迈出了第一步开始制作了… 雄关漫道真如铁&#xff0c;而今迈步从头越&#xff01; 启程 当前已经完成左侧聊天室列表显示&#xff0c;通过http://localhos…

计算机网络 : 传输层协议UDP与TCP

计算机网络 &#xff1a; 传输层协议UDP与TCP 目录 计算机网络 &#xff1a; 传输层协议UDP与TCP引言1. 传输层协议UDP1.2 UDP协议段格式1.3 UDP的特点1.4 面向数据报1.5 UDP的缓冲区1.6 基于UDP的应用层协议及使用注意事项 2. 传输层协议TCP2.1 再谈端口号2.2 TCP协议段格式2.…