高级Kafka应用之流处理

40 Kafka Streams与其他流处理平台的差异在哪里?

什么是流处理平台?

“Streaming Systems”一书是这么定义“流处理平台”的:流处理平台(Streaming System)是处理无限数据集(Unbounded Dataset)的数据处理引擎,而流处理是与批处理(Batch Processing)相对应的。

所谓的无限数据,是指数据永远没有尽头。流处理平台是专门处理这种数据集的系统或框架。当然,这并不是说批处理系统不能处理这种无限数据集,只是通常情况下,它更擅长处理有限数据集(Bounded Dataset)。

那流处理和批处理究竟该如何区分呢?下面这张图应该能帮助你快速且直观地理解它们的区别。

在这里插入图片描述
现在我来详细解释一下流处理和批处理的区别。

长期以来,流处理给人的印象通常是低延时,但是结果不准确。每来一条消息,它就能计算一次结果,但由于它处理的大多是无界数据,可能永远也不会结束,因此在流处理中,我们很难精确描述结果何时是精确的。理论上,流处理的计算结果会不断地逼近精确结果。

但是,它的竞争对手批处理则正好相反。批处理能提供准确的计算结果,但往往延时很高。

因此,业界的大神们扬长避短,将两者结合在一起使用。一方面,利用流处理快速地给出不那么精确的结果;另一方面,依托于批处理,最终实现数据一致性。这就是所谓的Lambda架构

延时低是个很好的特性,但如果计算结果不准确,流处理是无法完全替代批处理的。所谓计算结果准确,在教科书或文献中有个专属的名字,叫正确性(Correctness)。可以这么说,目前难以实现正确性是流处理取代批处理的最大障碍,而实现正确性的基石是精确一次处理语义(Exactly Once Semantics,EOS)。

这里的精确一次是流处理平台能提供的一类一致性保障。常见的一致性保障有三类:

  • 至多一次(At most once)语义:消息或事件对应用状态的影响最多只有一次。
  • 至少一次(At least once)语义:消息或事件对应用状态的影响最少一次。
  • 精确一次(Exactly once)语义:消息或事件对应用状态的影响有且只有一次。

注意,我这里说的都是对应用状态的影响。对于很多有副作用(Side Effect)的操作而言,实现精确一次语义几乎是不可能的。举个例子,假设流处理中的某个步骤是发送邮件操作,当邮件发送出去后,倘若后面出现问题要回滚整个流处理流程,已发送的邮件是没法追回的,这就是所谓的副作用。当你的流处理逻辑中存在包含副作用的操作算子时,该操作算子的执行是无法保证精确一次处理的。因此,我们通常只是保证这类操作对应用状态的影响精确一次罢了。后面我们会重点讨论Kafka Streams是如何实现EOS的。

我们今天讨论的流处理既包含真正的实时流处理,也包含微批化(Microbatch)的流处理。所谓的微批化,其实就是重复地执行批处理引擎来实现对无限数据集的处理。典型的微批化实现平台就是Spark Streaming。

Kafka Streams的特色

相比于其他流处理平台,Kafka Streams最大的特色就是它不是一个平台,至少它不是一个具备完整功能的平台,比如其他框架中自带的调度器和资源管理器,就是Kafka Streams不提供的。

Kafka官网明确定义Kafka Streams是一个Java客户端库你可以使用这个库来构建高伸缩性、高弹性、高容错性的分布式应用以及微服务。

使用Kafka Streams API构建的应用就是一个普通的Java应用程序。你可以选择任何熟悉的技术或框架对其进行编译、打包、部署和上线。

在我看来,这是Kafka Streams与Storm、Spark Streaming或Flink最大的区别。

Java客户端库的定位既可以说是特色,也可以说是一个缺陷。目前Kafka Streams在国内推广缓慢的一个重要原因也在于此。毕竟,很多公司希望它是一个功能完备的平台,既能提供流处理应用API,也能提供集群资源管理域调度方面的能力。所以,这个定位到底是特色还是缺陷,仁者见仁、智者见智吧。

Kafka Streams与其他框架的差异

接下来,我从应用部署、上下游数据源、协调方式和消息语义保障4个方面,详细分析一下Kafka Streams与其他框架的差异。

应用部署

首先,我们从流处理应用部署方式上对Kafka Streams及其他框架进行区分。

我们刚刚提到过,Kafka Streams应用需要开发人员自行打包和部署,你甚至可以将Kafka Streams应用嵌入到其他Java应用中。因此,作为开发者的你,除了要开发代码之外,还要自行管理Kafka Streams应用的生命周期,要么将其打包成独立的jar包单独运行,要么将流处理逻辑嵌入到微服务中,开放给其他服务调用。

但不论是哪种部署方式,你需要自己处理,不要指望Kafka Streams帮你做这些事情。

相反地,其他流处理平台则提供了完整的部署方案。我以Apache Flink为例来解释一下。在Flink中,流处理应用会被建模成单个的流处理计算逻辑,并封装进Flink的作业中。类似地,Spark中也有作业的概念,而在Storm中则叫拓扑。作业的生命周期由框架来管理,特别是在Flink中,Flink框架自行负责管理作业,包括作业的部署和更新等。这些都无需应用开发人员干预。

另外,Flink这类框架都存在资源管理器的角色。一个作业所需的资源完全由框架层的资源管理器来支持。常见的资源管理器,如YARN、Kubernetes、Mesos等,比较新的流处理框架(如Spark、Flink等)都是支持的。像Spark和Flink这样的框架,也支持Standalone集群的方式,即不借助于任何已有的资源管理器,完全由集群自己来管理资源。这些都是Kafka Streams无法提供的。

因此,从应用部署方面来看,Kafka Streams更倾向于将部署交给开发人员来做,而不是依赖于框架自己实现。

上下游数据源

简单来说,Kafka Streams目前只支持从Kafka读数据以及向Kafka写数据。在没有Kafka Connect组件的支持下,Kafka Streams只能读取Kafka集群上的主题数据,在完成流处理逻辑后也只能将结果写回到Kafka主题上。

反观Spark Streaming和Flink这类框架,它们都集成了丰富的上下游数据源连接器(Connector),比如常见的连接器MySQL、ElasticSearch、HBase、HDFS、Kafka等。如果使用这些框架,你可以很方便地集成这些外部框架,无需二次开发。

当然,由于开发Connector通常需要同时掌握流处理框架和外部框架,因此在实际使用过程中,Connector的质量参差不齐,在具体使用的时候,你可以多查查对应的jira官网,看看有没有明显的“坑”,然后再决定是否使用。

在这个方面,我是有前车之鉴的。曾经,我使用过一个Connector,我发现它在读取Kafka消息向其他系统写入的时候似乎总是重复消费。费了很多周折之后,我才发现这是一个已知的Bug,而且早就被记录在jira官网上了。因此,我推荐你多逛下jira,也许能提前避开一些“坑”。

总之,目前Kafka Streams只支持与Kafka集群进行交互,它没有提供开箱即用的外部数据源连接器。

协调方式

在分布式协调方面,Kafka Streams应用依赖于Kafka集群提供的协调功能,来提供高容错性和高伸缩性。

Kafka Streams应用底层使用了消费者组机制来实现任意的流处理扩缩容。应用的每个实例或节点,本质上都是相同消费者组下的独立消费者,彼此互不影响。它们之间的协调工作,由Kafka集群Broker上对应的协调者组件来完成。当有实例增加或退出时,协调者自动感知并重新分配负载。

我画了一张图来展示每个Kafka Streams实例内部的构造,从这张图中,我们可以看出,每个实例都由一个消费者实例、特定的流处理逻辑,以及一个生产者实例组成,而这些实例中的消费者实例,共同构成了一个消费者组。

在这里插入图片描述
通过这个机制,Kafka Streams应用同时实现了高伸缩性和高容错性,而这一切都是自动提供的,不需要你手动实现。

而像Flink这样的框架,它的容错性和扩展性是通过专属的主节点(Master Node)全局来协调控制的。

Flink支持通过ZooKeeper实现主节点的高可用性,避免单点失效:某个节点出现故障会自动触发恢复操作。这种全局性协调模型对于流处理中的作业而言非常实用,但不太适配单独的流处理应用程序。原因就在于它不像Kafka Streams那样轻量级,应用程序必须要实现特定的API来开启检查点机制(checkpointing),同时还需要亲身参与到错误恢复的过程中。

应该这样说,在不同的场景下,Kafka Streams和Flink这种重量级的协调模型各有优劣。

消息语义保障

精确一次处理语义(Exactly Once Semantics,EOS)

我们刚刚提到过EOS,目前很多流处理框架都宣称它们实现了EOS,也包括Kafka Streams本身。关于精确一次处理语义,有一些地方需要澄清一下。

实际上,当把Spark、Flink与Kafka结合使用时,如果不使用Kafka在0.11.0.0版本引入的幂等性Producer和事务型Producer,这些框架是无法实现端到端的EOS的。

因为这些框架与Kafka是相互独立的,彼此之间没有任何语义保障机制。但如果使用了事务机制,情况就不同了。这些外部系统利用Kafka的事务机制,保障了消息从Kafka读取到计算再到写入Kafka的全流程EOS。这就是所谓的端到端精确一次处理语义。

之前Spark和Flink宣称的EOS都是在各自的框架内实现的,无法实现端到端的EOS。只有使用了Kafka的事务机制,它们对应的Connector才有可能支持端到端精确一次处理语义。

Spark官网上明确指出了用户若要实现与Kafka的EOS,必须自己确保幂等输出和位移保存在同一个事务中。如果你不能自己实现这套机制,那么就要依赖于Kafka提供的事务机制来保证。

而Flink在Kafka 0.11之前也宣称提供EOS,不过是有前提条件的,即每条消息对Flink应用状态的影响有且只有一次。

举个例子,如果你使用Flink从Kafka读取消息,然后不加任何处理直接写入到MySQL,那么这个操作就是无状态的,此时Flink无法保证端到端的EOS。

换句话说,Flink最后写入到MySQL的Kafka消息可能有重复的。当然,Flink社区自1.4版本起正式实现了端到端的EOS,其基本设计思想正是基于Kafka 0.11幂等性Producer的两阶段提交机制。

两阶段提交(2PC)机制是一种分布式事务机制,用于实现分布式系统上跨多个节点事务的原子性提交。下面这张图来自于神书“Designing Data-Intensive Applications”中关于2PC讲解的章节。它清晰地描述了一次成功2PC的过程。在这张图中,两个数据库参与到分布式事务的提交过程中,它们各自做了一些变更,现在需要使用2PC来保证两个数据库的变更被原子性地提交。如图所示,2PC被分为两个阶段:Prepare阶段和Commit阶段。只有完整地执行了这两个阶段,这个分布式事务才算是提交成功。
在这里插入图片描述
分布式系统中的2PC常见于数据库内部实现或以XA事务的方式供各种异质系统使用。Kafka也借鉴了2PC的思想,在Kafka内部实现了基于2PC的事务机制。

但是,对于Kafka Streams而言,情况就不同了。它天然支持端到端的EOS,因为它本来就是和Kafka紧密相连的。

下图展示了一个典型的Kafka Streams应用的执行逻辑。
在这里插入图片描述
通常情况下,一个Kafka Streams需要执行5个步骤:

  1. 读取最新处理的消息位移;
  2. 读取消息数据;
  3. 执行处理逻辑;
  4. 将处理结果写回到Kafka;
  5. 保存位置信息。

这五步的执行必须是原子性的,否则无法实现精确一次处理语义。

在设计上,Kafka Streams在底层大量使用Kafka事务机制和幂等性Producer来实现多分区的原子性写入,又因为它只能读写Kafka,因此Kafka Streams很容易地就实现了端到端的EOS。

总之,虽然Flink自1.4版本也提供与Kafka的EOS,但从适配性来考量的话,应该说Kafka Streams与Kafka的适配性是最好的。

41 Kafka Streams DSL开发实例

DSL,也就是Domain Specific Language,意思是领域特定语言。它提供了一组便捷的API帮助我们实现流式数据处理逻辑。今天,我就来分享一些Kafka Streams中的DSL开发方法以及具体实例。

Kafka Streams背景介绍

流处理平台是专门处理无限数据集的引擎。就Kafka Streams而言,它仅仅是一个客户端库。所谓的Kafka Streams应用,就是调用了Streams API的普通Java应用程序。只不过在Kafka Streams中,流处理逻辑是用拓扑来表征的。

一个拓扑结构本质上是一个有向无环图(DAG),它由多个处理节点(Node)和连接节点的多条边组成,如下图所示:
在这里插入图片描述
图中的节点也称为处理单元或Processor,它封装了具体的事件处理逻辑。Processor在其他流处理平台也被称为操作算子。常见的操作算子包括转换(map)、过滤(filter)、连接(join)和聚合(aggregation)等。后面我会详细介绍几种常见的操作算子。

大体上,Kafka Streams开放了两大类API供你定义Processor逻辑。

第1类就是我刚刚提到的DSL,它是声明式的函数式API,使用起来感觉和SQL类似,你不用操心它的底层是怎么实现的,你只需要调用特定的API告诉Kafka Streams你要做什么即可。
举个简单的例子,你可以看看下面这段代码,尝试理解下它是做什么的。

movies.filter((title, movie) -> movie.getGenre().equals("动作片")).xxx()...

这段代码虽然用了Java 8的Lambda表达式,但从整体上来看,它要做的事情应该还是很清晰的:它要从所有Movie事件中过滤出影片类型是“动作片”的事件。这就是DSL声明式API的实现方式。

第2类则是命令式的低阶API,称为Processor API。比起DSL,这组API提供的实现方式更加灵活。你可以编写自定义的算子来实现一些DSL天然没有提供的处理逻辑。事实上,DSL底层也是用Processor API实现的。

目前,Kafka Streams DSL提供的API已经很丰富了,基本上能够满足我们大部分的处理逻辑需求,我今天重点介绍一下DSL的使用方法。

不论是用哪组API实现,所有流处理应用本质上都可以分为两类:有状态的(Stateful)应用和无状态的(Stateless)应用。

有状态的应用指的是应用中使用了类似于连接聚合时间窗口(Window)的API。一旦调用了这些API,你的应用就变为有状态的了,也就是说你需要让Kafka Streams帮你保存应用的状态。

无状态的应用是指在这类应用中,某条消息的处理结果不会影响或依赖其他消息的处理。常见的无状态操作包括事件转换以及刚刚那个例子中的过滤等。

关键概念

了解了这些背景之后,你还需要掌握一些流处理领域内的关键概念,即流、表以及流表二元性,还有时间和时间窗口。

流表二元性

流就是一个永不停止(至少理论上是这样的)的事件序列,而表和关系型数据库中的概念类似,是一组行记录。在流处理领域,两者是有机统一的:流在时间维度上聚合之后形成,表在时间维度上不断更新形成,这就是所谓的流表二元性。流表二元性在流处理领域内的应用是Kafka框架赖以成功的重要原因之一。

下面这张图展示了表转换成流,流再转换成表的全过程。
在这里插入图片描述
刚开始时,表中只有一条记录“张三:1”。将该条记录转成流,变成了一条事件。接着,表增加了新记录“李四:1”。针对这个变更,流中也增加了对应的新事件。之后,表中张三的对应值,从1更新为2,流也增加了相应的更新事件。最后,表中添加了新数据“王五:1”,流也增加了新记录。至此,表转换成流的工作就完成了。

从这个过程中我们可以看出,流可以看作是表的变更事件日志(Changelog)。与之相反的是,流转换成表的过程,可以说是这个过程的逆过程:我们为流中的每条事件打一个快照(Snapshot),就形成了表。

流和表的概念在流处理领域非常关键。在Kafka Streams DSL中,流用KStream表示,而表用KTable表示。

Kafka Streams还定义了GlobalKTable。本质上它和KTable都表征了一个表,里面封装了事件变更流,但是它和KTable的最大不同在于,当Streams应用程序读取Kafka主题数据到GlobalKTable时,它会读取主题所有分区的数据,而对KTable而言,Streams程序实例只会读取部分分区的数据,这主要取决于Streams实例的数量。

时间

在流处理领域内,精确定义事件时间是非常关键的:一方面,它是决定流处理应用能否实现正确性的前提;另一方面,流处理中时间窗口等操作依赖于时间概念才能正常工作。

常见的时间概念有两类:事件发生时间(Event Time)和事件处理时间(Processing Time)。理想情况下,我们希望这两个时间相等,即事件一旦发生就马上被处理,但在实际场景中,这是不可能的,Processing Time永远滞后于Event Time,,而且滞后程度又是一个高度变化,无法预知,就像“Streaming Systems”一书中的这张图片所展示的那样:
在这里插入图片描述
该图中的45°虚线刻画的是理想状态,即Event Time等于Processing Time,而粉色的曲线表征的是真实情况,即Processing Time落后于Event Time,而且落后的程度(Lag)不断变化,毫无规律。

如果流处理应用要实现结果的正确性,就必须要使用基于Event Time的时间窗口,而不能使用基于Processing Time的时间窗口。

时间窗口

所谓的时间窗口机制,就是将流数据沿着时间线切分的过程。常见的时间窗口包括:固定时间窗口(Fixed Windows)、滑动时间窗口(Sliding Windows)和会话窗口(Session Windows)。Kafka Streams同时支持这三类时间窗口。在后面的例子中,我会详细介绍如何使用Kafka Streams API实现时间窗口功能。

运行WordCount实例

关于Kafka Streams及其DSL的基本概念我都阐述完了,下面我给出大数据处理领域的Hello World实例:WordCount程序。

每个大数据处理框架第一个要实现的程序基本上都是单词计数。我们来看下Kafka Streams DSL如何实现WordCount。我先给出完整代码,稍后我会详细介绍关键部分代码的含义以及运行它的方法。

package kafkalearn.demo.wordcount;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public final class WordCountDemo {public static void main(final String[] args) {final Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-demo");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");final StreamsBuilder builder = new StreamsBuilder();final KStream<String, String> source = builder.stream("wordcount-input-topic");final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("wordcount-output-topic", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("wordcount-stream-demo-jvm-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0)}
}

在程序开头,我构造了一个Properties对象实例,对Kafka Streams程序的关键参数进行了赋值,比如application id、bootstrap servers和默认的KV序列化器(Serializer)和反序列化器(Deserializer)。其中,application id是Kafka Streams应用的唯一标识,必须要显式地指定。默认的KV序列化器、反序列化器是为消息的Key和Value进行序列化和反序列化操作的。

接着,我构造了一个StreamsBuilder对象,并使用该对象实例创建了一个KStream,这个KStream从名为wordcount-input-topic的Kafka主题读取消息。该主题消息由一组单词组成,单词间用空格分割,比如zhangsan lisi wangwu。

由于我们要进行单词计数,所以就需要将消息中的单词提取出来。有了前面的概念介绍,你应该可以猜到,KTable是很合适的存储结构,因此,下一步就是将刚才的这个KStream转换成KTable。

我们先对单词进行分割,这里我用到了flatMapValues方法,代码中的Lambda表达式实现了从消息中提取单词的逻辑。由于String.split()方法会返回多个单词,因此我们使用flatMapValues而不是mapValues。原因是,前者能够将多个元素“打散”成一组单词,而如果使用后者,我们得到的就不是一组单词,而是多组单词了。

这些都做完之后,程序调用groupBy方法对单词进行分组。由于是计数,相同的单词必须被分到一起,然后就是调用count方法对每个出现的单词进行统计计数,并保存在名为counts的KTable对象中。

最后,我们将统计结果写回到Kafka中。由于KTable是表,是静态的数据,因此这里要先将其转换成KStream,然后再调用to方法写入到名为wordcount-output-topic的主题中。此时,counts中事件的Key是单词,而Value是统计个数,因此我们在调用to方法时,同时指定了Key和Value的序列化器,分别是字符串序列化器和长整型序列化器。

至此,Kafka Streams的流计算逻辑就编写完了,接下来就是构造KafkaStreams实例并启动它了。通常来说,这部分的代码都是类似的,即调用start方法启动整个流处理应用,以及配置一个JVM关闭钩子(Shutdown Hook)实现流处理应用的关闭等。

总体来说,Kafka Streams DSL实现WordCount的方式还是很简单的,仅仅调用几个操作算子就轻松地实现了分布式的单词计数实时处理功能。事实上,现在主流的实时流处理框架越来越倾向于这样的设计思路,即通过提供丰富而便捷的开箱即用操作算子,简化用户的开发成本,采用类似于搭积木的方式快捷地构建实时计算应用。

待启动该Java程序之后,你需要创建出对应的输入和输出主题,并向输入主题不断地写入符合刚才所说的格式的单词行,之后,你需要运行下面的命令去查看输出主题中是否正确地统计了你刚才输入的单词个数:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic wordcount-output-topic \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

开发API

介绍了具体的例子之后,我们来看下Kafka Streams还提供了哪些功能强大的API。我们可以重点关注两个方面,一个是常见的操作算子,另一个是时间窗口API。

常见操作算子

操作算子的丰富程度和易用性是衡量流处理框架受欢迎程度的重要依据之一。Kafka Streams DSL提供了很多开箱即用的操作算子,大体上分为两大类:无状态算子和有状态算子。下面我就向你分别介绍几个经常使用的算子。

在无状态算子中,filter的出场率是极高的。它执行的就是过滤的逻辑。依然拿WordCount为例,假设我们只想统计那些以字母s开头的单词的个数,我们可以在执行完flatMapValues后增加一行代码,代码如下:

.filter(((key, value) -> value.startsWith("s")))

另一个常见的无状态算子当属map一族了。Streams DSL提供了很多变体,比如map、mapValues、flatMap和flatMapValues。我们已经见识了flatMapValues的威力,其他三个的功能也是类似的,只是所有带Values的变体都只对消息体执行转换,不触及消息的Key,而不带Values的变体则能修改消息的Key。

举个例子,假设当前消息没有Key,而Value是单词本身,现在我们想要将消息变更成这样的KV对,即Key是单词小写,而Value是单词长度,那么我们可以调用map方法,代码如下:

KStream<String, Integer> transformed = stream.map((key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

最后,我再介绍一组调试用的无状态算子:print和peek。Streams DSL支持你使用这两个方法查看你的消息流中的事件。这两者的区别在于,print是终止操作,一旦你调用了print方法,后面就不能再调用任何其他方法了,而peek则允许你在查看消息流的同时,依然能够继续对其进行处理,比如下面这两段代码所示:

stream.print(Printed.toFile("streams.out").withLabel("debug"));
stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value)).map(...);

常见的有状态操作算子主要涉及聚合(Aggregation)方面的操作,比如计数、求和、求平均值、求最大最小值等。Streams DSL目前只提供了count方法用于计数,其他的聚合操作需要你自行使用API实现。

假设我们有个消息流,每条事件就是一个单独的整数,现在我们想要对其中的偶数进行求和,那么Streams DSL中的实现方法如下:

final KTable<Integer, Integer> sumOfEvenNumbers = input.filter((k, v) -> v % 2 == 0).selectKey((k, v) -> 1).groupByKey().reduce((v1, v2) -> v1 + v2);

我简单解释一下selectKey调用。由于我们要对所有事件中的偶数进行求和,因此需要把这些消息的Key都调整成相同的值,因此这里我使用selectKey指定了一个Dummy Key值,即上面这段代码中的数值1。它没有任何含义,仅仅是让所有消息都赋值上这个Key而已。真正核心的代码在于reduce调用,它是执行求和的关键逻辑。

时间窗口实例

前面说过,Streams DSL支持3类时间窗口。前两类窗口通过TimeWindows.of方法来实现,会话窗口通过SessionWindows.with来实现。

假设在刚才的WordCount实例中,我们想每一分钟统计一次单词计数,那么需要在调用count之前增加下面这行代码:

.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))

同时,你还需要修改counts的类型,此时它不再是KTable了,而变成了KTable,因为引入了时间窗口,所以,事件的Key也必须要携带时间窗口的信息。除了这两点变化,WordCount其他部分代码都不需要修改。

可见,Streams DSL在API封装性方面还是做得很好的,通常你只需要增加或删减几行代码,就能实现处理逻辑的修改了。

小结

  • 流表二元性:流就是一个永不停止(至少理论上是这样的)的事件序列,而表是一组行记录。流在时间维度上聚合之后形成表,表在时间维度上不断更新形成流。
  • 时间概念:常见的有两类,分别是事件发生时间和事件处理时间。在实际场景中,事件处理时间永远滞后于事件发生时间。
  • 时间窗口机制:将流数据沿着时间线切分的过程常见的时间窗口包括固定时间窗口、滑动时间窗口和会话窗口。
  • 常见操作算子:无状态算子中的filter、map一族、print和peek;有状态算子中涉及聚合方面的操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/921029.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/921029.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Custom SRP - LOD and Reflections

1 LOD Groups 场景中对象越多,场景就越丰富,但是过多的对象,也会增加 CPU 和 GPU 的负担.同时如果对象最终渲染在屏幕上后覆盖的像素太少,就会产生模糊不清的像素点/噪点.如果能够不渲染这些过小的对象,就能解决噪点问题,同时释放 CPU GPU,去处理更重要的对象. 裁剪掉这些对象…

【Linux篇章】互联网身份密码:解密 Session 与 Cookie 的隐藏玩法和致命漏洞!

本篇摘要 本篇将承接上篇HTTP讲解&#xff08; 戳我查看 &#xff09;遗留的关于Cookie与Session的介绍&#xff0c;在本篇&#xff0c;将会介绍Cookie的由来&#xff0c;作用&#xff0c;以及缺点等&#xff0c;进而引出Session&#xff0c;最后介绍一下它们的性质等&#xf…

Postman接口测试工具:高效管理测试用例与环境变量,支持断言验证及团队协作同步

之前跟你们聊过能搭知识网络的 Obsidian&#xff0c;今天换个偏向接口测试的方向 —— 给你们安利一个 Github 上的「Postman」&#xff0c;它是个接口测试工具&#xff0c;官网能直接下载&#xff08;Postman: The Worlds Leading API Platform | Sign Up for Free&#xff09…

可可图片编辑 HarmonyOS 上架应用分享

可可图片编辑 HarmonyOS 上架应用分享 介绍 可可图片编辑 原名 图片编辑大师&#xff0c;因为上架审核的时候 &#xff0c;提示与一些已有应用重名&#xff0c;为了避免冲突&#xff0c;需要改名字&#xff0c;所以苦心思考了一分钟&#xff0c;就调整成 可可图片编辑。 应用…

Notepad++近期版本避雷

近期Notepad若干版本存在投毒事件&#xff0c;虽然也欢迎大家使用替代软件&#xff0c;但是Notepad作为一款开源软件&#xff0c;如有需要也可以继续白嫖使用&#xff0c;但是请务必避开若干埋雷版本&#xff01; 经检查&#xff0c;部分版本在帮助菜单中加入了有关tw的部分个人…

【lucene核心】impacts的由来

在 Lucene 的 Impact 概念&#xff08;出现在 ImpactsEnum / Impact 对象里&#xff09;中&#xff1a;字段 含义 freq 当前 term 在该文档中出现了多少次&#xff08;即词频 term frequency&#xff09;。 norm 当前 文档在该字段中的长度因子&#xff08;即之前 norms 里保存…

基于Echarts+HTML5可视化数据大屏展示-惠民服务平台

效果展示代码结构&#xff1a;主要代码实现 index.html布局 <!doctype html> <html><head><meta charset"utf-8"><title>双数智慧公卫-传染病督导平台</title><meta http-equiv"refresh" content"60;urlhttps…

【Flink】DataStream API:执行环境、执行模式、触发程序执行

目录执行环境getExecutionEnvironmentcreateLocalEnvironmentcreateRemoteEnvironment执行模式流执行模式&#xff08;Streaming&#xff09;批执行模式&#xff08;Batch&#xff09;自动模式&#xff08;AutoMatic&#xff09;触发程序执行DataStream API是Flink的核心层API&…

CentOS7.6

腾讯云服务器 腾讯云 产业智变云启未来 - 腾讯 服务器在控制台显示 点击进入面板&#xff0c;显示所有信息 现在来安装桌面的远程控制软件 宝塔SSH终端:一款同时支持SSH和SFTP客户端的免费软件! 点击立即下载 在云服务器的实例列表复制公网ip 密码就是服务器的密码&#xff…

前端架构知识体系:常见图片格式详解与最佳实践

前端开发必备&#xff1a; 在前端开发中&#xff0c;合理选择图片格式直接影响网页加载性能、用户体验和带宽成本。本文将系统梳理常见图片格式&#xff0c;分析它们的优缺点、压缩原理、兼容性和推荐使用场景&#xff0c;并提供前端优化实战建议。1. JPEG / JPG 全称&#xff…

ARM的编程模型

ARM的编程模型 ARM 的编程模型指的是从程序员&#xff08;特别是汇编程序员和编译器设计者&#xff09;视角所看到的 ARM 处理器架构。它定义了程序员可以使用的资源、数据操作方式以及规则&#xff0c;主要包括&#xff1a;寄存器组、数据类型、内存访问方式、执行状态和异常处…

最大熵强化学习相比传统强化学习,有什么缺点?

要理解最大熵强化学习&#xff08;MaxEnt RL&#xff09;相比传统强化学习&#xff08;如DQN、PPO、DDPG等&#xff09;的缺点&#xff0c;首先需要明确两者的核心差异&#xff1a;传统RL的目标是“最大化累积奖励”&#xff0c;而MaxEnt RL在该目标基础上额外增加了“最大化策…

python生成器与协程深度剖析

目录 生成器 传统列表 vs 生成器对比 yield机制深度解析 生成器的高级用法 协程的演进:从yield到async/await 基于yield的协程 现代async/await语法 协程的错误处理和超时控制 异步生成器与异步迭代器 异步生成器 异步迭代器实现 实战案例:异步爬虫框架设计 生成器…

论文解读:基于 77 GHz FMCW 毫米波雷达的舱内占位检测

毫米波 (mm-Wave) 雷达是汽车应用&#xff08;例如高级驾驶辅助系统 (ADAS)&#xff09;的一种解决方案。本研究探索了商用毫米波雷达技术在车内应用领域的应用。本文提出了一种基于 77 GHz 毫米波雷达的车辆占用检测器框架。本研究采用了德州仪器 (Texas Instruments) 的多输入…

进程优先级(Process Priority)

&#x1f381;个人主页&#xff1a;工藤新一 &#x1f50d;系列专栏&#xff1a;C面向对象&#xff08;类和对象篇&#xff09; &#x1f31f;心中的天空之城&#xff0c;终会照亮我前方的路 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 文章目录进…

OpenCV的轮廓检测

1. 轮廓检测的基本概念轮廓是图像中连续的、闭合的曲线段&#xff0c;代表物体的边界&#xff08;如圆形的轮廓是一条闭合曲线&#xff09;。OpenCV 的轮廓检测通过 cv2.findContours() 实现&#xff0c;可用于形状识别、物体计数、图像分割等场景。2. 核心函数与参数&#xff…

亚信安全亮相鸿蒙生态大会2025 携手鸿蒙生态绘就万物智联新蓝图

8 月30 日&#xff0c;以 “新场景・新体验” 为主题的鸿蒙生态大会 2025 在深圳福田会展中心隆重开幕。本次大会由全球智慧物联网联盟&#xff08;GIIC&#xff09;主办、鸿蒙生态服务&#xff08;深圳&#xff09;有限公司承办&#xff0c;旨在搭建全球鸿蒙生态伙伴的高层次交…

Linux内核进程管理子系统有什么第四十回 —— 进程主结构详解(36)

接前一篇文章&#xff1a;Linux内核进程管理子系统有什么第三十九回 —— 进程主结构详解&#xff08;35&#xff09; 本文内容参考&#xff1a; Linux内核进程管理专题报告_linux rseq-CSDN博客 《趣谈Linux操作系统 核心原理篇&#xff1a;第三部分 进程管理》—— 刘超 《…

面试问题:进程和线程,编译步骤,const,map和unordered_map,深入理解unordered_map

目录 进程和线程的区别 const修饰指针(左边内容&#xff0c;右边指向) 1. const 修饰指针指向的内容&#xff08;指向常量&#xff09; 2. const 修饰指针本身&#xff08;常量指针&#xff09; 3. const 同时修饰指针本身和指向的内容&#xff08;指向常量的常量指针&…

利用棒棒糖图探索Office (US)的IMDB评分

利用棒棒糖图探索Office (US)的IMDB评分 import numpy as np import pandas as pd import matplotlib.colors as mc import matplotlib.image as image import matplotlib.pyplot as pltfrom matplotlib.cm import ScalarMappable from matplotlib.lines import Line2D from m…