Flink2.0学习笔记:Stream API 常用转换算子

EC0720/FLINKTASK-TEST-STREAM/demo at master · stevensu1/EC0720

先看测试效果:控制台

测试效果:监控服务端

主要的转换算子包括:

转换算子 filter:过滤包含“Flink”的输入

转换算子 map: 将每行数据前添加“Processed: ”并转为大写

转换算子 flatMap: 将每行数据拆分为单词

转换算子 sum/keyBy: 按单词分组并计数

转换算子 reduce: 规约合并单词

转换算子 union: 合并两个数据流

主要的代码:

package com.example;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;public class App {public static void main(String[] args) {try {// 创建本地配置Configuration conf = new Configuration();// Web UI 配置conf.setString("rest.bind-port", "8081"); // 设置Web UI端口conf.setString("rest.bind-address", "0.0.0.0"); // 绑定所有网络接口conf.setString("rest.address", "localhost"); // 设置Web UI地址conf.setString("rest.enable", "true"); // 启用REST服务conf.setString("web.submit.enable", "true"); // 允许通过Web UI提交作业conf.setString("web.upload.dir", System.getProperty("java.io.tmpdir")); // 设置上传目录conf.setString("web.access-control-allow-origin", "*"); // 允许跨域访问// 使用配置创建支持Web UI的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 设置为流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 基本配置env.setParallelism(1); // 设置并行度为1env.disableOperatorChaining(); // 禁用算子链,使执行更清晰// 禁用检查点,因为是简单的演示程序env.getCheckpointConfig().disableCheckpointing();// 创建周期性的数据源DataStream<String> text = env.socketTextStream("localhost", 9999) // 从socket读取数据.name("source-strings").setParallelism(1);// 转换算子 filter:过滤包含“Flink”的输入text.filter(line -> line.contains("Flink")).name("filter-flink-strings").setParallelism(1).map(String::toUpperCase).name("uppercase-mapper").setParallelism(1).print().name("printer");// 转换算子 map: 将每行数据前添加“Processed: ”并转为大写text.map(line -> "Processed: " + line.toUpperCase()).name("map-processed-strings").setParallelism(1).print().name("printer-processed");// 转换算子 flatMap: 将每行数据拆分为单词text.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) {for (String word : line.split(" ")) {out.collect(word);}}}).name("flatmap-split-words").setParallelism(1).print().name("printer-split-words");// 转换算子 keyBy: 按单词分组并计数text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}).name("flatmap-split-words").setParallelism(1).keyBy(tuple -> tuple.f0) // 按单词分组.sum(1) // 计算每个单词的出现次数.print().name("printer-word-count");// 转换算子 reduce: 规约合并单词text.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) {for (String word : line.split(" ")) {out.collect(word);}}}).name("flatmap-split-words").setParallelism(1).keyBy(word -> word) // 按单词分组.reduce((word1, word2) -> word1 + ", " + word2) // 合并单词.print().name("printer-word-reduce");// 转换算子 union: 合并两个数据流DataStream<String> anotherText = env.fromSequence(1, Long.MAX_VALUE) // 持续生成数据.map(i -> {try {Thread.sleep(3000); // 每3秒生成一条消息return "Stream2> Auto Message " + i + ": Hello Flink";} catch (InterruptedException e) {return "Stream2> Error occurred";}}).name("source-another-strings").setParallelism(1);// 将两个流合并并处理text.map(str -> "Stream1> " + str) // 为第一个流添加前缀.union(anotherText) // 合并两个数据流.filter(str -> str.contains(":")) // 过滤掉不符合格式的数据.map(str -> {String[] parts = str.split(">");return String.format("%-8s | %s",parts[0].trim() + ">", // 对齐源标识parts[1].trim()); // 消息内容}).print().name("printer-union");// 执行任务env.execute("Flink Streaming Java API Hello");} catch (Exception e) {System.err.println("任务执行失败:" + e.getMessage());e.printStackTrace();}}
}

关于监控服务端集成:REST API |Apache Flink

在引入  本地执行UI支持 的依赖后

    <!-- 本地执行UI支持 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency>

还需要在构建环境时指定 支持Web UI的执行环境

            // 创建本地配置Configuration conf = new Configuration();// Web UI 配置conf.setString("rest.bind-port", "8081"); // 设置Web UI端口conf.setString("rest.bind-address", "0.0.0.0"); // 绑定所有网络接口conf.setString("rest.address", "localhost"); // 设置Web UI地址conf.setString("rest.enable", "true"); // 启用REST服务conf.setString("web.submit.enable", "true"); // 允许通过Web UI提交作业conf.setString("web.upload.dir", System.getProperty("java.io.tmpdir")); // 设置上传目录conf.setString("web.access-control-allow-origin", "*"); // 允许跨域访问// 使用配置创建支持Web UI的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

同时设置为无界处理 :

          // 设置为流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

不然以批处理模式的话,程序执行完就会终止Web UI环境,从而无法进入Web UI界面。不过通常都是打包后发布到专门的fink监控服务器执行任务。

下面是完整依赖:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>demo</artifactId><version>1</version><name>demo</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><flink.version>2.0.0</flink.version></properties><dependencies><!--flink-streaming-java--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink Clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Flink Table API & SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- 本地执行UI支持 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!-- 日志支持 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.6</version></dependency><!-- 测试依赖 --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>com.example.App</mainClass></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins>
</build></project>

关于本地windows ncat服务器搭建,ncat 是 Nmap 软件包的一部分,所以我们需要安装 Nmap:

  1. 从官方网站下载 Nmap 安装程序:

    • 访问 https://nmap.org/download.html
    • 下载 "Latest stable release self-installer" 的 Windows 版本
    • 通常文件名类似 "nmap-7.94-setup.exe"
    • 打开新的 PowerShell 窗口(以使环境变量生效)
    • 运行以下命令来启动 ncat 服务器:ncat -l 9999

然后在另一个窗口中运行 Flink 程序:跑起来监听9999端口后,就可以在PowerShell 窗口输入对应的内容回车 作为程序的输入了

cd FLINKTASK-TEST-STREAM/demo
mvn clean package
java -jar target/demo-1.jar

关于更多概念,我也在持续学习实践中,比如flink内存模型等,希望大家多支持。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/916559.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/916559.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一、Python环境、Jupyter与Pycharm

安装Python由于RAG项目中所需要的Python版本必须高于3.8&#xff0c;经过筛选&#xff0c;最终选择了3.10.11这个版本py --version Python 3.10.11安装过程略过&#xff0c;但对于几个基础的命令作个笔记记录where python找到python启动器的位置D:\>where python C:\Users\x…

Flink CEP 动态模板与规则动态修改实践完全手册

1. Flink CEP:从静态规则到动态江湖 Flink 的复杂事件处理(CEP)库就像一个武功高强的侠客,能从数据流中精准捕获特定模式,堪称流处理界的“降龙十八掌”。但问题来了:传统 CEP 规则通常是写死在代码里的,就像刻在石碑上的武功秘籍,改起来费劲不说,还得重启应用,简直…

vue3.2 + echarts5.6 + ant-design-vue 3.x 实现自定义 echarts 图例

文章目录概要技术细节效果概要 需求需要实现图例移入显示描述说明 故实现自定义图例 技术细节 <template><div class"custom-legend"><divv-for"item in legends":key"item.name"class"legend-item":class"{ i…

【2025年7月25日】TrollStore巨魔商店恢复在线安装

就在今日7月25日&#xff0c;TrollStore的在线安装功能再次变得可用&#xff0c;这对于许多iPhone用户来说无疑是个喜讯。在经历了近三个月的中断后&#xff0c;巨魔商店的企业证书意外的到来了&#xff0c;使得用户能够重新采用在线安装的方式&#xff01; 在线安装地址在文…

【05】C#入门到精通——C# 面向对象、类、静态变量static、类与类之间的调用

文章目录1 引入例子2 创建类2.1 类的访问属性2.2 英雄 特点类2.3 英雄信息打印3 静态变量static4 类 调用 类4.1 非静态 成员函数4.2 静态 成员函数1 引入例子 比如游戏中 描述英雄的角色&#xff0c; 我们可以像下面这样&#xff0c;给每一个英雄特点及拥有技能分别定义变量…

单片机的硬件结构

单片机的硬件结构 一、课程导入 在上一节课《认识单片机》中&#xff0c;我们知道单片机就像一个超级迷你的工厂&#xff0c;有着类似工厂的各个组成部分。而这个 “迷你工厂” 能正常运转&#xff0c;离不开其内部严谨的硬件结构。就像一座大厦&#xff0c;只有基础结构稳固且…

multiprocessing模块使用方法(二)

spawn_main是Python multiprocessing模块的核心内部函数&#xff0c;用于实现spawn启动方法的子进程初始化。以下结合代码Demo详细说明其使用方法和推荐场景。一、spawn_main的功能与定位核心作用&#xff1a; 在spawn模式下启动子进程&#xff0c;负责进程间通信管道的建立和资…

编程与数学 03-002 计算机网络 07_路由算法

编程与数学 03-002 计算机网络 07_路由算法一、静态路由算法&#xff08;一&#xff09;手工配置路由表的方法&#xff08;二&#xff09;静态路由的优缺点二、动态路由算法原理&#xff08;一&#xff09;距离矢量算法&#xff08;如贝尔曼 - 福特算法&#xff09;&#xff08…

使用Python,OpenCV计算跑图的图像彩色度

使用Python&#xff0c;OpenCV计算跑图的图像彩色度 这篇博客将介绍如何计算跑图里最鲜艳的top25图片和最灰暗的top25图片并显示色彩彩色度值展示。 效果图 以下分别是最鲜艳top25和最灰暗top25对比效果图&#xff1a; 最鲜艳top25效果图&#xff1a; 最灰暗top25效果图…

LeetCode 60:排列序列

LeetCode 60&#xff1a;排列序列问题定义与核心挑战 给定整数 n 和 k&#xff0c;返回集合 {1,2,...,n} 的第 k 个字典序排列。直接生成所有排列再遍历到第 k 个的方法&#xff08;时间复杂度 O(n!)&#xff09;会因 n≥10 时阶乘爆炸而超时&#xff0c;因此需要 数学推导 贪…

亚远景-传统功能安全VS AI安全:ISO 8800填补的标准空白与实施难点

一、为什么需要ISO 8800&#xff1a;传统安全标准的“盲区”传统功能安全&#xff08;ISO 26262&#xff09;• 假设&#xff1a;系统行为可被完整规格化&#xff0c;失效模式可枚举&#xff0c;风险可用概率-危害矩阵量化。• 盲区&#xff1a;对“设计意图正确&#xff0c;但…

菜鸟教程 R语言基础运算 注释 和数据类型

菜鸟教程 R语言基础运算 注释 和数据类型 1.注释 注释主要用于一段代码的解析&#xff0c;可以让阅读者更易理解&#xff0c;编程语言的注释会被编译器忽略掉&#xff0c;且不会影响代码的执行。 一般编程语言的注释分为单行注释与多行注释&#xff0c;但是 R 语言只支持单行注…

华为云ELB(弹性负载均衡)持续报异常

华为云ELB&#xff08;弹性负载均衡&#xff09;持续报异常&#xff0c;需结合实例类型&#xff08;共享型/独享型&#xff09;和异常代码进行针对性排查。以下是分步排查建议&#xff1a;一、根据实例类型排查网络配置共享型实例 安全组规则&#xff1a;检查后端服务器安全组是…

《R for Data Science (2e)》免费中文翻译 (第2章) --- Workflow: basics

写在前面 本系列推文为《R for Data Science (2)》的中文翻译版本。所有内容都通过开源免费的方式上传至Github&#xff0c;欢迎大家参与贡献&#xff0c;详细信息见&#xff1a; Books-zh-cn 项目介绍&#xff1a; Books-zh-cn&#xff1a;开源免费的中文书籍社区 r4ds-zh-cn …

开源深度学习新宠:Burn框架助您无忧高效建模

在日新月异的人工智能世界里&#xff0c;各类深度学习框架如雨后春笋般涌现&#xff0c;而Burn&#xff0c;作为新一代的深度学习框架&#xff0c;以其不妥协的灵活性、高效性和可移植性崭露头角。本文将深入探讨Burn的核心功能、应用场景及具体使用方法&#xff0c;帮助您更好…

基于深度学习的图像分割:使用DeepLabv3实现高效分割

前言 图像分割是计算机视觉领域中的一个重要任务&#xff0c;其目标是将图像中的每个像素分配到不同的类别中。近年来&#xff0c;深度学习技术&#xff0c;尤其是卷积神经网络&#xff08;CNN&#xff09;&#xff0c;在图像分割任务中取得了显著的进展。DeepLabv3是一种高效的…

如何高效合并音视频文件(时间短消耗资源少)(二)

英语字幕 1 00:00:06,480 --> 00:00:08,400 Good morning. We have a banger for you2 00:00:08,400 --> 00:00:09,840 today. We&amp;#39;re going to launch chatbt3 00:00:09,840 --> 00:00:11,519 agent. But before jumping into that, I&amp;#39;d4 00…

内网后渗透攻击过程(实验环境)--4、权限维持(2)

用途限制声明&#xff0c;本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具&#xff0c;严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果&#xff0c;作者及发布平台不承担任何责任。渗透测试涉及复杂技…

CentOS 9 配置国内 YUM 源

1.备份 sudo mv /etc/yum.repos.d/centos.repo /etc/yum.repos.d/centos.repo.backup sudo mv /etc/yum.repos.d/centos-addons.repo /etc/yum.repos.d/centos-addons.repo.backup2.创建新文件 vi /etc/yum.repos.d/centos.repo[baseos] nameCentOS Stream $releasever - BaseO…

【算法】递归、搜索与回溯算法入门

文章目录递归什么是递归为什么会用到递归如何理解递归如何写好一个递归搜索 vs 深度优先遍历 vs 深度优先搜索 vs 宽度&#xff08;广度&#xff09;优先遍历 vs 宽度&#xff08;广度&#xff09;优先搜索 vs 暴搜深度优先遍历 vs 深度优先搜索&#xff08;dfs&#xff09;宽度…