Kafka 消息模式实战:从简单队列到流处理(二)

四、Kafka 流处理实战

4.1 Kafka Streams 简介

Kafka Streams 是 Kafka 提供的流处理库,它为开发者提供了一套简洁而强大的 API,用于构建实时流处理应用程序。Kafka Streams 基于 Kafka 的高吞吐量、分布式和容错特性,能够处理大规模的实时数据流,并提供低延迟的处理能力。

Kafka Streams 的设计理念是将流处理逻辑简化为一系列的操作,开发者可以使用类似于 SQL 的语法来定义这些操作,从而实现复杂的流处理任务。它支持有状态和无状态的处理,并且能够自动管理分布式环境下的状态存储和故障恢复。

4.2 流处理拓扑(Topology)

流处理拓扑定义了流处理的逻辑和流程,它是一个有向无环图(DAG),由数据源(Source)、处理器(Processor)和接收器(Sink)组成。

  • 数据源:数据源是拓扑的起点,它从 Kafka 主题中读取数据,并将数据发送给下游的处理器。数据源可以是一个或多个 Kafka 主题。
  • 处理器:处理器是拓扑的核心组件,它对输入的数据进行处理和转换。处理器可以执行各种操作,如过滤、映射、聚合、连接等。一个拓扑中可以包含多个处理器,它们按照顺序依次对数据进行处理。
  • 接收器:接收器是拓扑的终点,它将处理后的结果数据发送到 Kafka 主题或其他外部系统中。接收器可以是一个或多个 Kafka 主题,也可以是其他类型的输出目标,如文件系统、数据库等。

4.3 单词计数示例

下面我们通过一个 Java 代码示例,展示如何使用 Kafka Streams 实现单词计数功能。在这个示例中,我们从一个 Kafka 主题读取文本数据,对每个单词进行计数,并将结果输出到另一个 Kafka 主题。

首先,在 Maven 项目的pom.xml文件中添加 Kafka Streams 依赖:

 

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-streams</artifactId>

<version>3.5.1</version>

</dependency>

接下来,编写实现单词计数功能的代码:

 

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.kstream.KStream;

import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;

import java.util.Properties;

public class WordCountExample {

public static void main(String[] args) {

// 配置Kafka Streams应用

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 构建流处理拓扑

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("input-topic");

KTable<String, Long> wordCounts = source

.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

.filter((key, word) ->!word.isEmpty())

.groupBy((key, word) -> word)

.count();

wordCounts.toStream().to("output-topic",

org.apache.kafka.streams.kstream.Produced.with(Serdes.String(), Serdes.Long()));

// 创建并启动Kafka Streams实例

KafkaStreams streams = new KafkaStreams(builder.build(), props);

streams.start();

// 添加关闭钩子,在程序终止时优雅地关闭Kafka Streams

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}

}

在上述代码中:

  • 首先配置了 Kafka Streams 应用的基本属性,包括应用 ID、Kafka 集群地址以及默认的键和值序列化器。
  • 然后使用StreamsBuilder构建流处理拓扑。从input-topic主题读取数据,将每行文本拆分成单词,过滤掉空单词,按单词分组并计数。
  • 最后将计数结果转换为流,并输出到output-topic主题。
  • 创建并启动KafkaStreams实例,并添加关闭钩子,确保程序在终止时能够优雅地关闭 Kafka Streams。

4.4 高级功能

Kafka Streams 提供了许多高级功能,使其能够满足复杂的实时流处理需求。

窗口操作:窗口操作允许在特定的时间范围内对流数据进行聚合和计算。Kafka Streams 支持固定窗口(Tumbling Window)、滑动窗口(Hopping Window)和会话窗口(Session Window)。例如,使用固定窗口计算每 5 分钟内的订单数量:

 

KTable<Windowed<String>, Long> windowedCounts = source

.groupByKey()

.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))

.count();

连接操作:连接操作可以将多个流或表的数据进行合并。Kafka Streams 支持内连接(Inner Join)、左连接(Left Join)和外连接(Outer Join)。例如,将用户信息表和订单流进行连接,获取每个订单对应的用户信息:

 

KTable<String, User> userTable = builder.table("user-topic");

KStream<String, Order> orderStream = builder.stream("order-topic");

KStream<String, OrderWithUser> joinedStream = orderStream.join(userTable,

(order, user) -> new OrderWithUser(order, user));

状态存储:Kafka Streams 支持有状态处理,能够在处理过程中保存中间状态。状态存储可以保存在内存中或使用 RocksDB 持久化存储。例如,在单词计数示例中,count操作会将计数结果存储在状态存储中,以便后续查询和更新:

 

KTable<String, Long> wordCounts = source

.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

.filter((key, word) ->!word.isEmpty())

.groupBy((key, word) -> word)

.count(Materialized.as("word-count-store"));

容错处理:Kafka Streams 内置了容错机制,能够自动处理数据丢失、节点故障等问题,保证数据处理的一致性和完整性。它会将应用程序的状态保存到 Kafka 中,以便在发生故障时恢复状态。当某个 Kafka Streams 实例发生故障时,其他实例可以接管其工作,继续处理数据,确保流处理任务的连续性。

五、总结与展望

在本次 Kafka 消息模式的探索之旅中,我们从简单队列起步,逐步深入到流处理的复杂领域,全面领略了 Kafka 作为强大分布式消息系统的魅力与实力。

在简单队列场景中,Kafka 展现了其作为消息队列的基础能力。通过搭建 Kafka 和 Zookeeper 环境,我们顺利创建主题,实现了生产者与消费者之间的消息传递。生产者可以灵活地选择同步或异步方式发送消息,消费者则通过自动或手动提交偏移量来确保消息的可靠消费。这种简单而高效的消息队列模式,在许多应用场景中发挥了关键作用,如解耦系统组件、实现异步通信以及流量控制等,为构建稳定、可扩展的应用架构提供了有力支持。

而当我们踏入 Kafka 流处理的世界,更是发现了其无限的潜力。Kafka Streams 提供了一套简洁而强大的 API,使我们能够轻松构建实时流处理应用。通过单词计数示例,我们看到了如何从 Kafka 主题读取数据,对数据进行处理和转换,并将结果输出到其他主题。窗口操作、连接操作、状态存储以及容错处理等高级功能,进一步拓展了 Kafka 流处理的应用范围,使其能够应对各种复杂的实时数据处理需求,如实时监控、实时推荐、欺诈检测等。

展望未来,Kafka 在大数据和实时处理领域的发展前景一片光明。随着技术的不断进步,Kafka 有望在以下几个方面取得更大的突破:

  • 流处理能力持续增强:Kafka Streams 和 KSQL 将不断进化,提供更强大的功能和更高的性能。未来,它们可能会支持更多复杂的流处理任务,以及更多 SQL 特性,使开发者能够更加便捷地处理实时数据流。
  • 云原生支持不断深化:随着 Kubernetes 等云原生技术的普及,Kafka 将更好地融入云原生环境。未来,Kafka 在 Kubernetes 上的部署和管理将变得更加简单,资源利用将更加高效,弹性扩展能力也将进一步增强,为企业在云端构建实时数据处理平台提供更优质的解决方案。
  • 多租户支持更加完善:为了满足多租户环境下的应用需求,Kafka 将进一步增强其安全性和隔离性。通过更细粒度的访问控制和配额管理,Kafka 将确保不同租户之间的数据和资源得到有效隔离,同时提供更好的审计和监控功能,保障多租户环境的稳定运行。
  • 运维和监控工具不断优化:Kafka 将持续提升其运维和监控工具的能力,增强 Kafka Manager、Confluent Control Center 等工具的功能,并与 Prometheus、Grafana 等主流监控系统实现更好的集成,为用户提供更全面、更实时的监控和报警机制,降低 Kafka 集群的运维成本。
  • 存储引擎持续演进:分层存储(Tiered Storage)等新技术的应用,将使 Kafka 能够将数据分层存储到不同的存储介质上,从而降低存储成本并提高存储效率。未来,Kafka 的存储引擎可能会进一步优化,以适应不断增长的数据量和多样化的存储需求。
  • 性能和可靠性进一步提升:Kafka 社区正在考虑引入 Raft 协议来替代目前的 ZooKeeper 协议,这将有望简化 Kafka 的部署和管理,并提供更高的可用性和一致性保障。此外,Kafka 还可能在数据处理速度、容错能力等方面进行优化,以满足对性能和可靠性要求极高的应用场景。
  • 智能数据路由和处理成为趋势:借助机器学习和人工智能技术,Kafka 未来可能会实现智能数据路由和处理。通过动态调整数据路由策略,Kafka 能够更高效地处理和分发数据,提高整个系统的性能和效率,为用户提供更加智能化的实时数据处理服务。

Kafka 作为大数据和实时处理领域的重要工具,将继续引领技术发展的潮流。无论是在简单队列场景还是复杂的流处理应用中,Kafka 都将发挥不可替代的作用,为企业的数字化转型和创新发展提供强大的技术支持。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/83161.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VAS1086Q 奇力科技线性芯片车规用品LED驱动芯片

一、产品概述 名称与定位&#xff1a;VAS1086Q 是奇力科技&#xff08;Chiplead Technology&#xff09;推出的汽车级恒流 LED 驱动器&#xff0c;属于 Value Added Solutions 系列&#xff0c;专为汽车 LED 照明应用提供高性价比方案。 核心功能&#xff1a; 支持 10~400mA 可…

适应性Java用于现代 API:REST、GraphQL 和事件驱动

在快速发展的软件开发领域&#xff0c;REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名&#xff0c;不断适应这些现代范式的需求。随着不断发展的生态系统&#xff0c;Java 在现代 API 方…

浮点数精度问题(CSP38思考)

CSP38的第一题&#xff0c;考到了浮点数的除法&#xff08;当然考完发现其实也可以不涉及浮点数&#xff0c;直接转化为整型&#xff09;&#xff0c;我第一题一直卡到70、80分&#xff0c;故写下此文。 浮点数的运算有精度损失问题&#xff0c;那么应该如何解决和避免呢&#…

F5 – TCP 连接管理:会话、池级和节点级操作

在 F5 BIG-IP 中,您可以在池成员级别或节点级别管理流向服务器的流量。节点级别状态会影响与该节点关联的所有池,而池成员状态则仅限于单个池。了解每种方法以及何时使用它们对于顺利进行维护窗口和流量管理至关重要。 池级状态:启用、禁用、强制离线、移除 在 BIG-IP 配置…

StoreView SQL,让数据分析不受地域限制

作者&#xff1a;章建&#xff08;处知&#xff09; 引言 日志服务 SLS 是云原生观测和分析平台&#xff0c;为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务。SLS 提供了多地域支持【1】&#xff0c;方便用户可以根据数据源就近接入 SLS 服务&#xff0c…

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…

Golang——10、日志处理和正则处理

日志处理和正则处理 1、logx日志处理1.1、logx简介1.2、日志初始化与配置1.3、常用方法1.4、配合defer捕获panic 2、正则处理2.1、正则表达式语法大全2.2、基本匹配2.3、常见函数使用2.4、从html提取汉字demo 1、logx日志处理 1.1、logx简介 logx 是 go-zero 框架中用于日志记…

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…

软件工程:如何做好软件产品

1、什么是产品 从项目到产品 产品&#xff1a;满足行业共性需求的标准产品。即要能够做到配置化的开发&#xff0c;用同一款产品最大限度地满足不同客户的需求&#xff0c;同时让产品具有可以快速响应客户需求变化的能力。 好的产品一定吸收了多个项目的共性&#xff0c;一定是…

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…

sqlsugar WhereIF条件的大于等于和等于查出来的坑

一、如下图所示&#xff0c;当我用 .WhereIF(input.Plancontroltype > 0, u > u.Plancontroltype (DnjqPlancontroltype)input.Plancontroltype) 这里面用等于的时候&#xff0c;返回结果一条数据都没有。 上图中生成的SQL如下&#xff1a; SELECT id AS Id ,code AS …

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…

React从基础入门到高级实战:React 实战项目 - 项目四:企业级仪表盘

React 实战项目&#xff1a;企业级仪表盘 欢迎来到 React 开发教程专栏 的第 29 篇&#xff01;在前 28 篇文章中&#xff0c;我们从 React 的基础概念逐步深入到高级技巧&#xff0c;涵盖了组件设计、状态管理、路由配置、性能优化和实时通信等核心内容。这一次&#xff0c;我…

STM32----IAP远程升级

一、概述&#xff1a; IAP&#xff0c;全称是“In-Application Programming”&#xff0c;中文解释为“在程序中编程”。IAP是一种对通过微控制器的对外接口&#xff08;如USART&#xff0c;IIC&#xff0c;CAN&#xff0c;USB&#xff0c;以太网接口甚至是无线射频通道&#…

模拟搭建私网访问外网、外网访问服务器服务的实践操作

目录 实验环境 实践要求 一、准备工作 1、准备四台虚拟机&#xff0c;分别标号 2、 防火墙额外添加两块网卡&#xff0c;自定义网络连接模式 3、 关闭虚拟机的图形管理工具 4、关闭防火墙 5、分别配置四台虚拟机的IP地址&#xff0c;此处举一个例子&#xff08;使用的临…

删除远程已经不存在但本地仍然存在的Git分支

1. 获取远程分支列表 首先&#xff0c;确保你获取了远程仓库的最新分支信息&#xff1a; git fetch -p -p 参数会自动清理本地仓库中那些在远程已经被删除的分支的引用。 2. 查看本地分支与远程分支的对比 运行以下命令来查看哪些本地分支没有对应的远程分支&#xff1a; …

GIT(AI回答)

在Git中&#xff0c;git push 命令主要用于将本地分支的提交推送到‌远程仓库‌&#xff08;如GitHub、GitLab等&#xff09;。如果你希望将本地分支的改动同步到另一个‌本地分支‌&#xff0c;这不是 git push 的设计目的。以下是正确的替代方法&#xff1a; 方法1&#xff1…

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…

React - 组件通信

组件通信 概念&#xff1a;组件通信就是组件之间数据传递&#xff0c;根据组件嵌套关系不同&#xff0c;有不同的通信方法 父传子 —— 基础实现 实现步骤 父组件传递数据 - 在子组件标签上绑定属性子组件接收数据 - 子组件通过props参数接收数据 声明子组件并使用 //声明子…