「Flink」算子主要方法介绍

背景:

上期文章主要讲了Flink项目搭建的一些方法,其中对于数据流的处理很大一部分是通过算子来进行计算和处理的,算子也是Flink中功能非常庞大,且很重要的一部分。

算子介绍:

算子在Flink的开发者文档中是这样介绍的:通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。这简单总结就有点类似于Flink的一些API,来对数据流进行操作处理。

算子介绍目录:

主要介绍几个在日常开发中,比较常用的几个算子方法:

1.FlatMap

2.Filter

3.Window

4.join

5.coGroup

1.FlatMap

flatMap是输入一个元素同时产生零个、一个或多个元素。通常在日常开发中用于对于数据流的初步处理和合并,将数据流转换成我们希望输入的数据格式

方法举例:

dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out)throws Exception {for(String word: value.split(" ")){out.collect(word);}}
});

日常使用举例:

/// 将binglog获取的dataChangInfo格式转换成OrderInfo业务格式
dataStream1.flatMap(new FlatMapFunction<DataChangeInfo, OrderInfo>() {@Overridepublic void flatMap(DataChangeInfo dataChangeInfo, Collector<OrderInfo> collector) throws Exception {OrderInfo orderInfo = JSONObject.parseObject(dataChangeInfo.getAfterData(), OrderInfo.class);log.info("订单数据:{}", orderInfo);collector.collect(orderInfo);}
});
2.Filter

对数据流进行过滤操作,将一些脏数据或者我们不希望流入的数据进行排除处理
使用举例:

/// 筛选出订单状态小于等于40的订单数据
orderInfoSingleOutputStream.filter(new FilterFunction<OrderInfo>() {@Overridepublic boolean filter(OrderInfo orderInfo) throws Exception {if (orderInfo.getStatus() <= 40){return true;}return false;}
});
3.Window

Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组。就类似于上期文章所讲述的窗口,具体介绍可以查看上期文章「Flink」Flink项目搭建方法介绍;

/// 先通过keyby设置主键
/// 然后设置一个以事件时间为标定,设一个5秒的窗口
dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))); 
4.Join

根据指定的 key 和窗口 join 两个数据流。
这个方法通常用在两个数据流需要通过某个key值进行合并的时候,比如订单主表和订单副表需要通过orderId进行数据合并的时候,进行数据处理。

方法举例:

dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))).apply (new JoinFunction () {...});

日常使用举例:

DataStream<OrderOutputInfo> outputInfoDataStream = orderInfoSingleOutputStream.join(orderCodeInfoSingleOutputStream).where(OrderInfo::getId).equalTo(OrderCodeInfo::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<OrderInfo, OrderCodeInfo, OrderOutputInfo>() {@Overridepublic OrderOutputInfo join(OrderInfo orderInfo, OrderCodeInfo orderCodeInfo) throws Exception {OrderOutputInfo orderOutputInfo = new OrderOutputInfo();orderOutputInfo.setId(orderInfo.getId());orderOutputInfo.setStatus(orderInfo.getStatus());orderOutputInfo.setCode(orderCodeInfo.getCode());orderOutputInfo.setCreate_time(orderInfo.getCreate_time());log.info("输出数据:{}", orderOutputInfo);return orderOutputInfo;}});

在这里插入图片描述

通过断点,其实可以发现,数据并不是按照一批一批进行输出的,而是根据key,进行一条一条的输出的,这个需要注意写入库的方法,以免对数据库写入产生较大的压力。
然后该方法会发现一个弊端,那就是如果不在事件窗口期输入的,那么无法匹配到对应的数据行,那么就会出现数据无法输出,数据丢失的情况,使用outside,官方推荐的侧输出,也无法有效输出,这时候比较推荐下面这个方法Cogroup,可以通过自定义的方法进行对未匹配的数据进行输出报错;

5.CoGroup

根据指定的 key 和窗口将两个数据流组合在一起。
CoGroup和Join是个类似的方法,但是CoGroup的数据处理方法里面可以有迭代器,然后在实际数据处理过程中可以通过判断迭代器,从而实现对于未匹配成功的订单进行打印输出。

方法举例:

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))).apply (new CoGroupFunction () {...});

日常使用举例:

orderInfoSingleOutputStream.coGroup(orderCodeInfoSingleOutputStream).where(OrderInfo::getId).equalTo(OrderCodeInfo::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new CoGroupFunction<OrderInfo, OrderCodeInfo, OrderOutputInfo>() {@Overridepublic void coGroup(Iterable<OrderInfo> iterable, Iterable<OrderCodeInfo> iterable1, Collector<OrderOutputInfo> collector) throws Exception {if(iterable.iterator().hasNext() && iterable1.iterator().hasNext()){OrderInfo orderInfo = iterable.iterator().next();OrderCodeInfo orderCodeInfo = iterable1.iterator().next();System.out.println("匹配成功的订单ID:" + orderInfo.getId() + "  订单创建时间:" + orderInfo.getCreate_time() + "  status " + orderInfo.getStatus());System.out.println("=============================");OrderOutputInfo orderOutputInfo = new OrderOutputInfo();orderOutputInfo.setId(orderInfo.getId());orderOutputInfo.setStatus(orderInfo.getStatus());orderOutputInfo.setCode(orderCodeInfo.getCode());orderOutputInfo.setCreate_time(orderInfo.getCreate_time());collector.collect(orderOutputInfo);}else if(iterable.iterator().hasNext() && !iterable1.iterator().hasNext()){OrderInfo order = iterable.iterator().next();System.out.println("订单未找到匹配的订单-----------Code:"+ order.getId());} else  if(!iterable.iterator().hasNext() && iterable1.iterator().hasNext()){OrderCodeInfo orderCodeInfo = iterable1.iterator().next();System.out.println("未找到匹配的Code订单-----------Code:" + orderCodeInfo.getId() );}}});

数据输出日志:
可以看到数据也是一条条匹配后输出,无法匹配的数据也会在窗口结束后进行输出展示或告警。

在这里插入图片描述

总结:

以上几个算子方法就是平时日常开发中比较常用且好用的方法,大家可以结合各自的业务场景,进行挑选使用。

相关链接

Flink

Flink开发者文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/88200.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/88200.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3405. 统计恰好有 K 个相等相邻元素的数组数目

3405. 统计恰好有 K 个相等相邻元素的数组数目 给你三个整数 n &#xff0c;m &#xff0c;k 。长度为 n 的 好数组 arr 定义如下&#xff1a; arr 中每个元素都在 闭 区间 [1, m] 中。恰好 有 k 个下标 i &#xff08;其中 1 < i < n&#xff09;满足 arr[i - 1] arr…

Spring AI 项目实战(十):Spring Boot + AI + DeepSeek 构建智能合同分析技术实践(附完整源码)

系列文章 序号文章名称1Spring AI 项目实战(一):Spring AI 核心模块入门2Spring AI 项目实战(二):Spring Boot + AI + DeepSeek 深度实战(附完整源码)3Spring AI 项目实战(三):Spring Boot + AI + DeepSeek 打造智能客服系统(附完整源码)4

impala中时间戳转(DATE)指定格式的字符串

注意i&#xff1a;注意大小写 timestamp\date–>string SELECT now(),from_timestamp(now(),yyyyMMdd);string->timestamp SELECT 20230710,to_timestamp(20230710,yyyyMMdd);日期加减 select 20231201,from_timestamp(date_add(to_timestamp(20231201,yyyyMMdd),1),…

百度下拉框出词技术解密:72小时出下拉词软件原理分享

如何才能刷下拉词&#xff1f;这个问题一直是企业做流量时最纠结的问题&#xff0c;百度下拉词作为百度搜索体验中的一项智能化功能&#xff0c;极大地方便了用户快速完成搜索&#xff0c;也成为了企业在搜索引擎优化&#xff08;SEO&#xff09;策略中的重要流量入口。通过研究…

上海人工智能实验室明珠湖会议首开,解答AI前沿疑问,推进科学智能

在通用人工智能&#xff08;AGI&#xff09;探索如火如荼的当下&#xff0c;如何加速突破&#xff1f;如何凝练关键问题、孕育颠覆性创新&#xff1f;2025年6月13日&#xff0c;上海人工智能实验室主任、首席科学家&#xff0c;清华大学惠妍讲席教授周伯文在首届明珠湖会议&…

BeyondCompare安装(永久免费使用+全网最详细版)

一.下载&#xff1a; 官网下载&#xff08;速度较慢&#xff09;&#xff1a; https://www.scootersoftware.com/download.php 阿里云盘&#xff08;不限速&#xff09; https://www.alipan.com/s/WaG1z54BQ2U 二.安装&#xff08;无脑下一步即可&#xff09; 三.永久免费…

如何用AI开发完整的小程序<7>—让AI微调UI排版

上一节我们介绍了如何让AI修改整体UI视觉效果。 不过有时候AI调整的并不理想&#xff0c;一些UI的布局还是需要微调。 比如已经实现的这个开始页面&#xff0c;我觉得标题太高了&#xff0c;这时候可以自己调&#xff0c;也可以让AI单独调&#xff0c;下面详细介绍。 一、手动…

64-Oracle Redo Log

小伙伴们&#xff0c;关于数据库的redo log相信大家都操作很多次了,且这是OCM考试必考内容。Oracle Redo Log是一种特殊的日志文件&#xff0c;用于完整地记录数据库中所有数据变更的详细信息。当数据库执行插如、更新或删除等更新操作&#xff0c;这些操作并不会立刻写入数据库…

hive集群优化和治理常见的问题答案

Hive 集群优化与治理常见问题答案合集 &#x1f42d;1. Q&#xff1a;Hive中如何优化大表Join操作&#xff1f; A&#xff1a; 使用Map Join&#xff08;小表Join大表时&#xff09;避免Reduce阶段。启用自动Map Join&#xff08;设置hive.auto.convert.jointrue&#xff09;…

C#采集电脑硬件(CPU、GPU、硬盘、内存等)温度和使用状况

这是采集出来的Json&#xff0c;部分电脑&#xff08;特别是笔记本&#xff09;无法获取到&#xff1a; {"HardwareList": [{"Name": "MITX-6999","Type": "主板","Sensors": [],"WmiReport": null}, …

C3新增特性

✅ 一、选择器&#xff08;Selectors&#xff09; 1. 属性选择器 [attr^value]: 匹配属性值以特定字符串开头的元素。[attr$value]: 匹配属性值以特定字符串结尾的元素。[attr*value]: 匹配属性值包含特定字符串的元素。 2. 子元素和兄弟元素选择器 :nth-child(n): 匹配父元…

报错 @import “~element-ui/packages/theme-chalk/src/index“;

报错 import "~element-ui/packages/theme-chalk/src/index"; 具体报错报错原因 具体报错 SassError: Can’t find stylesheet to import. import “~element-ui/packages/theme-chalk/src/index”; src\views\login\theme\element-variables.scss 8:9 root stylesh…

ESLint从入门到实战

引言 作为前端开发者&#xff0c;你是否遇到过这样的情况&#xff1a;团队成员写出的代码风格各异&#xff0c;有人喜欢用分号&#xff0c;有人不用&#xff1b;有人用双引号&#xff0c;有人用单引号&#xff1b;代码评审时总是在纠结这些格式问题而不是业务逻辑&#xff1f;…

vue3实现markdown文档转HTML并可更换样式

vue3实现markdown文档转HTML 安装marked npm install marked<template><!-- 后台可添加样式编辑器 --><div class"markdown-editor" :class"{ fullscreen: isFullscreen, preview-mode: isPreviewMode }"><div class"editor-c…

Temu 实时获取商品动态:一个踩坑后修好的抓数脚本笔记

Temu 作为一个增长迅猛的购物平台&#xff0c;其商品价格、库存等信息&#xff0c;对许多做运营分析的小伙伴来说非常有参考价值。 我在写这个小工具的时候&#xff0c;踩了很多坑&#xff0c;特别记录下来&#xff0c;希望对你有用。 初版代码&#xff1a;想当然的“直接来一下…

【软考高级系统架构论文】论数据分片技术及其应用

论文真题 数据分片就是按照一定的规则,将数据集划分成相互独立、 正交的数据子集,然后将数据子集分布到不同的节点上。通过设计合理的数据分片规则,可将系统中的数据分布在不同的物理数据库中,达到提升应用系统数据处理速度的目的。 请围绕“论数据分片技术及其应用”论题…

VR飞夺泸定桥沉浸式历史再现​

当你戴上 VR 设备开启这场震撼人心的 VR 飞夺泸定桥体验&#xff0c;瞬间就会被拉回到 1935 年那个战火纷飞的 VR 飞夺泸定桥的岁月&#xff0c;置身于泸定桥的西岸 。映入眼帘的是一座由 13 根铁索组成的泸定桥&#xff0c;它横跨在波涛汹涌的大渡河上&#xff0c;桥下江水咆哮…

libwebsockets编译

#安装 libwebsocket git clone https://github.com/warmcat/libwebsockets && \ mkdir libwebsockets/build && cd libwebsockets/build && \ cmake -DMAKE_INSTALL_PREFIX:PATH/usr -DCMAKE_C_FLAGS"-fpic" .. && \ make &&…

使用docker部署epg节目单,同时管理自己的直播源

配置 Docker 环境 拉取镜像并运行&#xff1a; docker run -d \--name php-epg \-v /etc/epg:/htdocs/data \-p 5678:80 \--restart unless-stopped \taksss/php-epg:latest 默认数据目录为 /etc/epg &#xff0c;根据需要自行修改 默认端口为 5678 &#xff0c;根据需要自行修…

H5新增属性

✅ 一、表单相关新增属性&#xff08;Form Attributes&#xff09; 这些属性增强了表单功能&#xff0c;提升用户体验和前端验证能力。 1. placeholder 描述&#xff1a;在输入框为空时显示提示文本。示例&#xff1a; <input type"text" placeholder"请输…