Flink2.0学习笔记:Flink服务器搭建与flink作业提交

一,下载flink:Downloads | Apache Flink,解压后放入IDE工作目录:我这里以1.17版本为例

可以看到,flink后期的版本中没有提供window启动脚本:start-cluster.bat

所以这里要通过windows自带的wsl 系统启动它

打开终端依次运行下列命令完成wsl linux 系统的安装以及jdk的安装

wsl --install
wsl.exe -d Ubuntu
sudo apt update
sudo apt install openjdk-11-jdk -y

之后继续在终端中执行 wsl.exe -d Ubuntu 启动wsl,wsl 默认系统为:Ubuntu,当然也可以切换其他类型的系统,重要的是:wsl会自动挂载windows 目录,这就实现了在wsl上运行windows目录中的项目。

然后 一路cd 到flink bin目录,启动flink:


这里启动前要注意修改flink 的配置:把localhost 统统改为 0.0.0.0,,除jobmanager.rpc.address: 这项要设置为wsl  的ip,不然flink集群选举master会失败: [jobmanager.rpc.address: 172.29.145.42],这样启动后,就可以在本机浏览器输入wsl的ip访问flink服务的web ui了


二,提交flink作业

为了方便测试,这里写一个程序每隔1秒向本机(192.168.0.39) 端口:9999发送数据:“test flink window hallo word”。

package org.example.demo01;import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;public class PushDataTo9999 {private static final String HOST = "192.168.0.39";private static final int PORT = 9999;private static final String DATA = "test flink window hallo word";public static void main(String[] args) {try {System.out.println("Connecting to " + HOST + ":" + PORT);// 创建到WSL的连接try (Socket socket = new Socket(HOST, PORT);OutputStream outputStream = socket.getOutputStream()) {System.out.println("Connected to " + HOST + ":" + PORT);// 持续发送数据while (!Thread.currentThread().isInterrupted() && !socket.isClosed()) {// 获取当前系统时间String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));// 每秒发送一次带时间戳的数据String dataToSend = DATA + " " + currentTime + "\n";outputStream.write(dataToSend.getBytes(StandardCharsets.UTF_8));outputStream.flush();System.out.println("Sent: " + dataToSend.trim());// 等待1秒Thread.sleep(1000);}}} catch (IOException | InterruptedException e) {System.err.println("Error: " + e.getMessage());}}
}

 

然后flink 作业内容为在wsl服务器(172.29.145.42)中 监听本机(192.168.0.39)端口9999,并实时统计每个单词出现的次数,这里注意关闭windows 防火墙

package org.example.demo01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.util.Collector;/*** Hello world!*/
public class App {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置为流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 基本配置env.setParallelism(1); // 设置并行度为1env.disableOperatorChaining(); // 禁用算子链,使执行更清晰// 禁用检查点,因为是简单的演示程序env.getCheckpointConfig().disableCheckpointing();// 创建周期性的数据源
//        DataStream<String> dataStream = env
//                .socketTextStream("localhost", 9999) // 从socket读取数据
//                .name("source-strings")
//                .setParallelism(1);DataStream<String> dataStream = env.addSource(new SocketTextStreamFunction("192.168.0.39", 9999, "\n", 0)).name("socket-source");// 转换算子 keyBy: 按单词分组并计数dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}).name("flatmap-split-words").setParallelism(1).keyBy(tuple -> tuple.f0) // 按单词分组.sum(1) // 计算每个单词的出现次数.print().name("printer-word-count");// 执行任务env.execute("Flink Streaming Java API Hello");}
}

注意pom 需要加入flink的打包插件:

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>org.example.demo01.App</mainClass></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>

通过maven编译,打包后,我们把jar包通过web ui上传到flink 服务端:

点击我们上传的jar,进入提交项:

提交了后作业会自动启动:


作业的print输出可以在taskmanagers中查看:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/93380.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/93380.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL锁的分类

MySQL锁可以按照多个维度进行分类&#xff0c;下面我用最清晰的方式为你梳理所有分类方式&#xff1a;一、按锁的粒度分类&#xff08;最常用分类&#xff09;锁类型作用范围特点适用引擎示例场景表级锁整张表开销小、加锁快&#xff0c;并发度低MyISAM, MEMORY数据迁移、全表统…

电脑上搭建HTTP服务器在局域网内其它客户端无法访问的解决方案

在电脑上开发一套HTTP服务器的程序在调试时&#xff0c;在本机内访问正常&#xff0c;但是在本机外访问就不正常&#xff0c;外部客户端无法访问或无法连接到本机的服务器的问题&#xff0c;这可能涉及网络配置、防火墙、端口转发或服务绑定等问题&#xff0c;在这里提供了解决…

双指针和codetop2(最短路问题BFS)

双指针和codetop21.双指针1.[复写0](https://leetcode.cn/problems/duplicate-zeros/)2.动态规划1.[珠宝的最高价值](https://leetcode.cn/problems/li-wu-de-zui-da-jie-zhi-lcof/description/)2.[解码方法](https://leetcode.cn/problems/decode-ways/)3.[下降路径最小和](ht…

基于K邻近算法(KNN)的数据回归预测模型

一、作品详细简介 1.1附件文件夹程序代码截图 全部完整源代码&#xff0c;请在个人首页置顶文章查看&#xff1a; 学行库小秘_CSDN博客https://blog.csdn.net/weixin_47760707?spm1000.2115.3001.5343 1.2各文件夹说明 1.2.1 main.m主函数文件 该MATLAB代码实现了一个基于…

【123页PPT】化工行业数字化解决方案(附下载方式)

篇幅所限&#xff0c;本文只提供部分资料内容&#xff0c;完整资料请看下面链接 https://download.csdn.net/download/2501_92808859/91654005 资料解读&#xff1a;【123页PPT】化工行业数字化解决方案 详细资料请看本解读文章的最后内容。化工行业作为国民经济的重要支柱之…

c++--文件头注释/doxygen

文件头注释 开源项目&#xff1a; /*** file robot_base.cpp* author Mr.Wu* date 2025-05-28* version 1.0.0* brief Robot basic drive to communicate with controller** copyright Copyright (c) 2025 google.** Licensed under the Apache License, Version 2.…

【教程】笔记本安装FnOS设置合盖息屏不休眠

重装FnOS好几次了&#xff0c;合盖后屏幕关闭但不休眠的问题每次都要网上找参差不齐的教程&#xff0c;麻烦死了&#xff0c;索性记录一下方便以后复制粘贴。 使用root登录 sudo -i修改系统配置文件编辑logind.conf文件&#xff1a; 打开终端&#xff0c;输入以下命令以编辑log…

深入解析 Monkey OCR:本地化、多语言文本识别的利器与实践指南

在信息爆炸的时代&#xff0c;从图片、扫描文档中高效提取结构化文本的需求日益迫切。OCR&#xff08;光学字符识别&#xff09;技术成为解决这一问题的核心工具。尽管市面上有 Abbyy FineReader、Adobe Acrobat 等商业巨头&#xff0c;以及 Tesseract、PaddleOCR 等开源方案&a…

动态规划法 - 53. 最大子数组和

什么是动态规划法&#xff1f; 简单说&#xff0c;动态规划&#xff08;Dynamic Programming&#xff0c;简称 DP&#xff09; 是一种**「把复杂问题拆解成小问题&#xff0c;通过解决小问题来解决大问题」**的方法。 核心思路有两个&#xff1a; 1.拆分问题&#xff1a;把原问…

STM32CUBEMX配置stm32工程

1.新建工程2.选择芯片3.配置各种片上外设和时钟4.创建工程5.根据文件内容进行修改工程注意&#xff1a;最好根据工程规范来做&#xff0c;因为有时我们需要更改配置并重新生成&#xff0c;如果不按规范来会导致部分代码会被系统清除&#xff0c;在工程中中有很多成对的BEGIN和E…

Day07 缓存商品 购物车

缓存菜品问题说明用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大。结果&#xff1a;系统响应慢&#xff0c;用户体验差实现思路通过 Redis 来缓存菜品数据&#xff0c;减少数据库查询操作。缓存逻辑分…

Jenkins(集群与流水线配置)

Jenkins&#xff08;集群与流水线配置&#xff09; Jenkins集群 集群化构建可以提升构建效率&#xff0c;也可以并发在多台机器上执行构建。 安装前提&#xff1a;内存至少512MB、Java 17 以上、Maven环境、Git环境 配置集群步骤 配置节点菜单新建节点查看节点配置状态 新建完节…

深入剖析ROS参数服务器通信机制 ——共享全局数据的“云端仓库”实现原理

​1. 核心概念&#xff1a;分布式数据共享容器​ ​定位​&#xff1a;ROS参数服务器&#xff08;Parameter Server&#xff09;是ROS架构中的全局共享存储系统&#xff0c;相当于机器人的“云端仓库”。 ​作用​&#xff1a; 存储多节点共享的静态配置参数&#xff08;如机器…

21.AlexNet

虽然LeNet在手写数字识别上取得了不错的结果&#xff0c;但是他在对于更大的数据集效果就十分有限。 一方面&#xff0c;对于更大尺寸的图像效果有限 另一方面&#xff0c;对于更多分类的任务效果有限 自LeNet后的十几年&#xff0c;计算机视觉领域步入寒冬&#xff0c;神经网络…

Shell脚本-条件判断相关参数

一、前言在 Shell 脚本编程中&#xff0c;条件判断 是实现流程控制的核心机制之一。无论是判断文件是否存在、字符串是否相等&#xff0c;还是数值大小比较&#xff0c;都离不开条件判断语句。本文将带你全面掌握 Shell 脚本中与条件判断相关的参数和语法&#xff0c;包括&…

何为“低空经济”?

低空经济&#xff08;Low-Altitude Economy&#xff09;是指以1000米以下空域&#xff08;部分场景可延伸至3000米&#xff09;为核心&#xff0c;以无人机&#xff08;UAV&#xff09;、电动垂直起降飞行器&#xff08;eVTOL&#xff09;、直升机、通航飞机等航空器为载体&…

线性代数 | 直观理解一些概念

注&#xff1a;本文为 “线性代数 直观理解概念” 相关合辑。 英文引文&#xff0c;机翻未校。 中文引文&#xff0c;略作重排。 如有内容异常&#xff0c;请看原文。 直观理解线性代数的一些概念 2015-03-06 Updated: 2015-05-09 本文介绍矩阵的一些相关概念的直观理解&…

Spring AI 集成阿里云百炼平台

Spring AI 集成阿里云百炼平台 创建API key 在阿里云百炼平台创建API key设置系统变量。阿里云百炼 api key 创建 API 参考 官方API地址&#xff1a;https://bailian.console.aliyun.com &#xff08;1&#xff09;在阿里云百炼控制台&#xff0c;选择API参考菜单。 API…

Codeforces Round 859 (Div. 4) A - D + F - G2 题解

Codeforces Round 859 (Div. 4) A - D F - G2 题解A. Plus or Minus&#xff08;800 分难度&#xff09; 思路&#xff1a; 直接 if - else 判断。 参考代码&#xff1a; #include<bits/stdc.h> using namespace std; void solve(){int a, b, c;cin >> a >&g…

【Java web】Servlet 详解

一、什么是 Servlet&#xff1f;—— 你不知道的 "网页服务员"想象你走进一家网红书店&#xff08;比如 "在线 Java 书店"&#xff09;&#xff0c;想买一本《Java 编程思想》。你告诉前台服务员你的需求&#xff0c;服务员去仓库找书、包装、收款&#xf…