使用pyflink编写demo并将任务提交到yarn集群

目录

背景

一、pyflink安装

二、编写demo程序

三、提交yarn前准备

 四、提交任务

五、踩坑记录

1、提交任务时客户端出现语法错误

2、提交任务时客户端出现lzma包找不到

3、提交任务时客户端出现“org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM”

5、提交任务时taskmanager上报错出现Permission denied

六、总结


背景

        最近项目需要,学习研究使用python来开发flink任务,在此做相关笔记。

一、pyflink安装

        在本地执行如下命令,需要注意的是,pyflink必须要求python版本大于等于3.6,我本地是3.12

创建虚拟环境

python -m venv venv

切换虚拟环境

venv/bin/activate

执行安装命令

pip install apache-flink==1.14

 安装成功后如下

二、编写demo程序

        这里直copy官方教程

import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().get_configuration().set_string("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

在本地执行该脚本,输出如下

三、提交yarn前准备

准备一台linux服务器,并装有flink客户端(我使用的版本是flink1.14.2,这里不说客户端如何安装了,下载包解压安装即可)

在服务器上搭建pyflink运行环境,参考第一章节

将demo程序上传到该服务器上

其中env为python虚拟环境目录,py_env.zip为将env使用zip进行压缩的文件

 四、提交任务

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python

看到终端打印如下日志

访问yarn集群web管理页面,在running下看到有对应的任务时,即表示任务已经提交到yarn集群

查看任务详情

五、踩坑记录

1、提交任务时客户端出现语法错误

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]File "main.py", line 55ds.print()^
SyntaxError: invalid syntax

解决方法:上述问题排查发现是flink客户端版本差异导致,编写的demo代码,和flink客户端版本要一致,否则会出现一些莫名其妙的问题,统一调整flink版本为一致,包括pyflink,flink客户端等

2、提交任务时客户端出现lzma包找不到

Traceback (most recent call last):File "/home/zhubao/env/lib/python3.7/site-packages/fastavro/read.py", line 2, in <module>from . import _readFile "fastavro/_read.pyx", line 11, in init fastavro._readFile "/home/master/python3/lib/python3.7/lzma.py", line 27, in <module>from _lzma import *
ModuleNotFoundError: No module named '_lzma'

解决方法:该错误表明系统缺少Python的LZMA压缩模块依赖(_lzma),这是Python标准库中处理.xz压缩文件的底层C扩展模块,需要进行安装

# 使用root权限执行如下命令
yum install -y xz-devel python-backports-lzma
# 使用普通用户执行安装lzma包命令
pip install backports.lzma -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn

安装完成后,需要对lzma文件进行修改,找到lzma.py文件,一般在$PYTHON_HOME/lib/python3.7目录下(根据实际版本),主要是加上try except

修改完成后保存退出,重新执行解决该问题

3、提交任务时客户端出现“org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM”

Traceback (most recent call last):File "main.py", line 99, in <module>word_count(known_args.input, known_args.output)File "main.py", line 39, in word_countds = env.from_collection(word_count_data)File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 958, in from_collectionreturn self._from_collection(collection, type_info)File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 981, in _from_collectionj_input_format = PythonTypeUtils.getCollectionInputFormat(File "/home/zhubao/env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1550, in __getattr__"{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM

解决方法:还是版本不匹配导致,请确保pyflink与客户端版本一致

Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Traceback (most recent call last):File "<string>", line 1, in <module>
ImportError: No module named pyflink

解决方法:在flink配置文件中,加上python执行环境,打开flink-conf.yaml文件,一般在$FLINK_HOME/conf目录下,编辑该文件,在末尾加上python执行环境

python.client.executable: /home/zhubao/env/bin/python

5、提交任务时taskmanager上报错出现Permission denied

Caused by: java.io.IOException: Cannot run program "/home/zhubao/env/bin/python": error=13, Permission denied

解决方法:找了一些方案,但最终通过将整个python执行环境打包提交到yarn上解决,方法如下

将python环境达成zip包

zip -r py_env.zip /home/zhubao/env/

提交任务命令增加指定环境包与执行环境

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python

六、总结

        以上是使用pyflink进行flink任务开发,以及将任务提交到yarn集群方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/86844.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/86844.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 3 最基础核心知识详解

Vue3作为现代前端主流框架&#xff0c;是前后端开发者都应当掌握的核心技能。本篇文章将带你了解vue3的基础核心知识&#xff0c;适合学习与复习 一、Vue 3 应用创建 1.1 创建Vue应用的基本步骤 // main.js import { createApp } from vue // 1. 导入createApp函数 import …

Bootstrap 5学习教程,从入门到精通,Bootstrap 5 Flex 布局语法知识点及案例(27)

Bootstrap 5 Flex 布局语法知识点及案例 Bootstrap 5 提供了强大的 Flexbox 工具集&#xff0c;让布局变得更加简单灵活。以下是 Bootstrap 5 Flex 布局的完整知识点和详细案例代码。 一、Flex 布局基础语法 1. 启用 Flex 布局 <div class"d-flex">我是一个…

HarmonyOS 5智能单词应用开发:记忆卡(附:源码

一、应用概述与核心价值 在语言学习过程中&#xff0c;单词记忆是基础也是难点。本文介绍的智能单词记忆卡应用通过创新的交互设计和科学的学习模式&#xff0c;帮助用户高效记忆单词。应用采用ArkUI框架开发&#xff0c;主要特点包括&#xff1a; 双模式学习系统&#xff1a…

LeetCode--38.外观数列

前言&#xff1a;之前我不是说&#xff0c;我后续可能会讲一下递归吗&#xff0c;现在它来了&#xff0c;这道题会用到回溯的方法&#xff0c;并且比较纯粹哦 解题思路&#xff1a; 1.获取信息&#xff1a;&#xff08;下面这些信息差不多是力扣上面的题目信息了&#xff0c;所…

服务器的安装与安全设置

1&#xff1a;安装操作系统 1、创建虚拟机Win49&#xff08;49为序号&#xff09;&#xff0c;并安装Windows Server 2019操作系统 参考配置&#xff1a;安装系统的分区大小为20GB&#xff0c;其余分区暂不划分&#xff0c; 文件系统格式为NTFS&#…

Sensodrive SensoJoint机器人力控关节模组抗振动+Sensodrive力反馈系统精准对接

Sensodrive成立于2003年&#xff0c;起源于德国航空航天中心&#xff08;DLR&#xff09;的LBR项目。公司由一批传感器技术专家创立&#xff0c;专注于高精度工业扭矩传感器的研发。凭借二十余年的技术积累&#xff0c;Sensodrive将DLR轻型机器人扭矩技术引入工业领域&#xff…

【AI实践】Mac一天熟悉AI模型智能体应用(百炼版)

25.6.29增加Gummy 实时/一句话语音识别25.6.28增加Qwen TTS本地音频和实时播报 背景 准备环境 MacOS M1电脑&#xff08;其他M系列芯片也可以&#xff09; 为了方便python的使用环境&#xff0c;使用Miniconda&#xff1a;下载链接&#xff1a;Download Anaconda Distribution…

WEB安全--Java安全--jsp webshell免杀1

1.1、BCEL ClassLoader 介绍&#xff08;仅适用于BCEL 6.0以下&#xff09;&#xff1a; BCEL&#xff08;Apache Commons BCEL™&#xff09;是一个用于分析、创建和操纵Java类文件的工具库&#xff1b;BCEL的类加载器在解析类名时会对ClassName中有$$BCEL$$标识的类做特殊处…

Valkey与Redis评估对比:开源替代方案的技术演进

#作者&#xff1a;朱雷 文章目录 1 概述1.1内存数据结构存储核心特性1.2主流内存数据结构存储设计与适用场景1.3目前主流内存数据结构存储对比 2 Valkey 说明2.1 哨兵架构设计2.2 集群架构设计2.3 valkey 使用企业和业内生态‌ 3 评估指标4 评估结果 1 概述 内存数据结构存储…

华为云Flexus+DeepSeek征文 | 基于华为云ModelArts Studio安装NoteGen AI笔记应用程序

华为云FlexusDeepSeek征文 | 基于华为云ModelArts Studio安装NoteGen AI笔记应用程序 引言一、ModelArts Studio平台介绍华为云ModelArts Studio简介ModelArts Studio主要特点 二、NoteGen介绍NoteGen简介主要特点 三、安装NoteGen工具下载NoteGen软件安装NoteGen工具 四、开通…

BUUCTF在线评测-练习场-WebCTF习题[BJDCTF2020]Easy MD51-flag获取、解析

解题思路 打开靶场&#xff0c;有个提交框&#xff0c;输入后url会出现我们提交的参数password http://a48577ed-9a1c-4751-aba0-ae99f1eb8143.node5.buuoj.cn:81/leveldo4.php?password123 查看源码并没用发现什么猫腻&#xff0c;抓包在响应头发现了猫腻 hint: select * …

面向对象三大特性深度解析:封装、继承与多态

面向对象三大特性深度解析&#xff1a;封装、继承与多态 思维导图概览 #mermaid-svg-v2u0XIzKotjyXYei {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-v2u0XIzKotjyXYei .error-icon{fill:#552222;}#mermaid-svg-v2…

mmap映射物理内存之三invalid cache

目录 流程设计 invalid 命令 内核态invalid 内核态invalid&#xff0c;用户态mmap物理地址 PAN机制 PAN机制历程 硬件支持 ARMv8.1-PAN 特性 Linux 内核的适配 软件模拟 PAN&#xff08;SW PAN&#xff09; 背景 Linux 的实现 总结 前述刷新cache的流程也同样可…

记忆化搜索(dfs+memo)无环有向图

这是一道可以当作板子的极简记忆化搜索 建立a 是邻接表&#xff0c;其中 a[x] 存储从节点 x 出发能到达的所有节点。 b[x] 记录从节点 x 出发的所有边的权重之和。根据数学原理&#xff0c;我们很容易发现&#xff0c;一个根&#xff08;起点&#xff09;的期望&#xff0c;等…

使用AI豆包写一个车辆信息管理页面

记录一个基本的车辆信息管理页面&#xff0c;由豆包撰写完成&#xff0c;只需要微调页面即可。 主要功能是车辆信息的查询、新增、编辑&#xff0c;项目用到了uniapp、vue3、ts、uni-ui、z-paging 页面效果如下&#xff1a; 以上界面均由豆包生成&#xff0c;完成度非常高&am…

《HarmonyOSNext应用防崩指南:30秒定位JS Crash的破案手册》

《HarmonyOSNext应用防崩指南&#xff1a;30秒定位JS Crash的破案手册》 ##Harmony OS Next ##Ark Ts ##教育 本文适用于教育科普行业进行学习&#xff0c;有错误之处请指出我会修改。 &#x1f4a5; 哇哦&#xff01;JS Crash崩溃日志完全解析手册 当你的应用突然闪退时&am…

阅读笔记(3) 单层网络:回归(下)

阅读笔记(3) 单层网络:回归(下) 该笔记是DataWhale组队学习计划&#xff08;共度AI新圣经&#xff1a;深度学习基础与概念&#xff09;的Task03 以下内容为个人理解&#xff0c;可能存在不准确或疏漏之处&#xff0c;请以教材为主。 1. 为什么书上要提到决策理论&#xff1f; …

Mac OS系统每次开机启动后,提示:输入密码来解锁磁盘“Data”,去除提示的解决方法

问题描述&#xff1a; Mac mini外接了一个磁盘&#xff08;EX_Mac&#xff09;为默认使用的系统盘&#xff0c;内置的硬盘&#xff08;Macintosh HD&#xff09;为Mac mini自带的系统盘 外置硬盘系统每次开机都会挂载内置磁盘&#xff0c;同时会提示需要输入密码来解锁磁盘“…

CSS Flex 布局中flex-shrink: 0使用

flex-shrink: 0 是 CSS Flexbox 布局中的一个关键属性&#xff0c;用于禁止弹性项目&#xff08;flex item&#xff09;在容器空间不足时被压缩。以下是详细解释和示例&#xff1a; 核心作用 当容器的可用空间小于所有弹性项目的总宽度&#xff08;或高度&#xff09;时&#…

WHERE 子句中使用子查询:深度解析与最佳实践

&#x1f50d; WHERE 子句中使用子查询&#xff1a;深度解析与最佳实践 在 WHERE 子句中使用子查询是 SQL 的高阶技巧&#xff0c;可实现动态条件过滤。以下是全面指南&#xff0c;涵盖语法、类型、陷阱及优化策略&#xff1a; &#x1f4dc; 一、基础语法结构 SELECT 列 FR…