使用pyflink进行kafka实时数据消费

目录

背景

代码demo

 踩坑记录

1、kafka连接器,kafka客户端jar包找不到

2、java模块系统访问限制

3、执行demo任务,一直报错连接kafka topic超时

总结 


背景

        实际项目中经常遇到source是kafka,需要实时消费kafka某个topic中的数据,在此写个demo学习了解。

        环境:本地windows10,python版本3.8.5,pyflink版本1.14.2,kafka版本2.12_2.0.1

代码demo

        代码很简单,在本地启动一个flink任务,消费对端一个kafka的topic,从中取出数据并进行打印,代码如下:

import os
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema# 设置执行环境
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 设置并行度
env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar","file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")# Kafka 消费者配置
kafka_topic = 'zhubao'
kafka_bootstrap_servers = 'xxx:9092'  # Kafka 服务器地址和端口
consumer = FlinkKafkaConsumer(kafka_topic, SimpleStringSchema(), properties={'bootstrap.servers': kafka_bootstrap_servers,'group.id': 'zhubao-group','auto.offset.reset': 'earliest','session.timeout.ms': '120000','request.timeout.ms': '120000','max.poll.interval.ms': '600000'
})# 将 Kafka 数据源添加到执行环境中
data_stream = env.add_source(consumer)# 数据处理逻辑,例如打印数据
data_stream.print()# 启动执行环境
env.execute("Flink Kafka Consumer Example")

使用kafka客户端生产一些数据,在本地则能实时消费到数据 

在本地终端执行上述flink demo任务,看到相关消费结果 

 踩坑记录

1、kafka连接器,kafka客户端jar包找不到

        上述demo代码中,开头需要在程序运行环境中引入两个jar包,分别是kafka连接器和kafka客户端

env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar","file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")

这两个包默认在本地是没有的,需要进行额外的下载,下载地址:

Flink Kafka连接器‌:

  • 下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka_2.12/

Kafka客户端‌:

  • 下载地址:https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/

注意选择对应的版本,我的本地环境是pyflink1.14.2,因此下载的jiar包是对应版本,客户端是3.6.1版本

此外,在add_jars上,需要使用file:///方式,并且最好是绝对路径,否则会报jar包找不到等类似错误

2、java模块系统访问限制

py4j.protocol.Py4JJavaError: An error occurred while calling o10.addSource.
: java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @32cf48b7

        该报错是因为在windows环境Java模块系统(JPMS)的访问限制导致,需通过JVM参数解除模块封装限制。

        解决方法:在程序启动开头,增加JVM启动参数,注意格式,如下:

import os
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"

3、执行demo任务,一直报错连接kafka topic超时

Traceback (most recent call last):File ".\kafkademo.py", line 31, in <module>env.execute("Flink Kafka Consumer Example")File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\datastream\stream_execution_environment.py", line 691, in executereturn JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\java_gateway.py", line 1285, in __call__return_value = get_return_value(File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in decoreturn f(*a, **kw)File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_valueraise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed....at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at akka.actor.Actor.aroundReceive(Actor.scala:537)at akka.actor.Actor.aroundReceive$(Actor.scala:535)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)at akka.actor.ActorCell.invoke(ActorCell.scala:548)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)at akka.dispatch.Mailbox.run(Mailbox.scala:231)at akka.dispatch.Mailbox.exec(Mailbox.scala:243)... 5 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined

        报错信息很长,重点关注最后的

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined

        解决方法:尝试对程序增加各种延长超时时间,但都没有啥效果,最后排查寻找发现,是由于本地hosts未配置kafka的域名映射,需要在本地hosts里增加对应映射关系

添加后,问题得以解决

总结 

        该demo很简单,完成在windows环境上使用pyflink进行连接kafka并进行数据消费,主要在windows上并且使用pyflink做开发,遇到一些奇奇怪怪的问题,在此做个记录,方便后续查阅。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/87123.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/87123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件测试理论框架与发展:分类、原则与质量保障策略

第一章 一、计算机软件的发展分类 早期软件开发的特点&#xff1a; 软件规模小、复杂程度低、开发过程不规范 测试的情况&#xff1a; 测试等同于调试 目的纠正软件的已经知道的故障 投入少&#xff0c;介入晚 成为一种发现软件的活动&#xff08;1957&#xff09; 测试不等于…

未知威胁攻击原理和架构

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 未知威胁&#xff08;Unknown Threats&#xff09;指利用零日漏洞、合法工具滥用、高级逃逸技术等**绕过传统特征检测**的攻击&#xff0c;其核心在于**动态对抗防御体系的认知盲区**。以下从攻击原理、…

基于Netty-WebSocket构建高性能实时通信服务

引言&#xff1a;WebSocket在现代应用中的重要性 在当今实时交互应用盛行的时代&#xff0c;WebSocket协议已成为实现双向通信的核心技术。相比传统的HTTP轮询&#xff0c;WebSocket提供了&#xff1a; 真正的全双工通信极低的延迟&#xff08;毫秒级&#xff09;高效的连接管…

咸虾米项目总结1--const用法

在 UniApp&#xff08;或 Vue 3&#xff09;中&#xff0c;声明一个空对象可使用下面这2种写法&#xff1a; // 写法1 const a ref(null);// 写法2 const a ref({}); 在UniApp中&#xff0c;const a ref()用法概述&#xff1a; 用途&#xff1a; 创建一个响应式引用&#x…

在mac下手动编译迁移的android版webrtc组件

我原先使用的android版webrtc是在linux下编译的&#xff0c;现在因为某些原因需要把整个库迁移到mac下编译。 把代码迁移完后&#xff0c;正常是需要通过gclient sync 重新构建编译环境&#xff0c;但是由于网络限制等方面原因&#xff0c;会导致完成的比较慢。 在摸索一阵后…

Linux 命令:mkdir

Linux mkdir 命令详细教程 一、mkdir 命令的基本功能 mkdir&#xff08;Make Directory&#xff09;是 Linux 系统中用于创建新目录&#xff08;文件夹&#xff09;的基础命令。它支持一次性创建单个或多个目录&#xff0c;以及递归创建多层目录结构&#xff0c;是文件系统操…

Django 数据迁移全解析:makemigrations migrate 常见错误与解决方案

1. 迁移机制与底层原理 在 Django 中&#xff0c;ORM&#xff08;Object-Relational Mapping&#xff09;是连接模型&#xff08;Model&#xff09;和数据库结构的桥梁。Django 鼓励开发者通过编写 Python 类&#xff08;模型&#xff09;来定义业务数据结构&#xff0c;而不是…

SuperGlue:使用图神经网络学习特征匹配

摘要 本文提出了 SuperGlue&#xff0c;一种神经网络&#xff0c;用于通过联合寻找对应关系并排除不可匹配点来匹配两组局部特征。匹配结果通过求解一个可微的最优传输问题来估计&#xff0c;该问题的代价由一个图神经网络预测。我们引入了一种基于注意力的灵活上下文聚合机制…

ssh -T git@github.com失败后解决方案

这个错误表示你的 SSH 连接无法到达 GitHub 服务器。以下是详细解决方案&#xff0c;按照优先级排序&#xff1a; 首选解决方案&#xff1a;使用 SSH over HTTPS&#xff08;端口 443&#xff09; 这是最有效的解决方案&#xff0c;因为许多网络会阻止 22 端口&#xff1a; …

从苹果事件看 ARM PC市场的未来走向

最近&#xff0c;苹果宣布部分搭载 Intel 处理器的 Mac 不再支持最新的 macOS 系统更新&#xff0c;这一消息犹如一颗石子投入平静湖面&#xff0c;激起层层涟漪。它不仅让 Intel 芯片在 Mac 产品线上彻底成为历史&#xff0c;也促使我们重新审视 PC 行业的发展脉络&#xff0c…

vue + element ui 实现超出宽度展示..,鼠标移入显示完整内容

vue element ui 实现超出宽度展示…&#xff0c;鼠标移入显示完整内容 代码理念&#xff1a; 当高度大于对应行数的高度 则说明需要展示"…" 子组件 <template><div class"tooltip"><div ref"tooltipRef" :class"[tooltip…

HarmonyOSNext应用无响应全解析:从机制到实战的卡死问题排查

HarmonyOSNext应用无响应全解析&#xff1a;从机制到实战的卡死问题排查 ##Harmony OS Next ##Ark Ts ##教育 本文适用于教育科普行业进行学习&#xff0c;有错误之处请指出我会修改。 喂喂喂&#xff01;应用卡成PPT了&#xff1f;点啥都没反应&#xff1f;别慌&#xff01…

git 迁移之获取原库所有分支

以下是一个安全的 Bash 脚本&#xff0c;用于将远程 Git 仓库的所有分支检出到本地&#xff08;自动跳过已存在的分支&#xff09;&#xff1a; #!/bin/bash# 获取所有远程分支&#xff08;排除 HEAD&#xff09; remote_branches$(git branch -r | grep -v HEAD\|->)# 循环…

设计模式 | 适配器模式

适配器模式&#xff08;Adapter Pattern&#xff09; 是结构型设计模式中的连接器大师&#xff0c;它允许不兼容接口的类能够协同工作。本文将深入探索适配器模式的核心思想、实现技巧以及在C中的高效实践&#xff0c;解决现实开发中的接口兼容性问题。 为什么需要适配器模式 …

RTL 级机器人电机控制器的 FPGA 设计

借助Verilog&#xff0c;在FPGA中实现了带编码器的两台电机的电机控制系统的RTL级设计。 介绍 借助硬件描述语言 (HDL) Verilog 和 AMD Vivado 设计套件&#xff0c;在 AMD Spartan-7 FPGA 中实现带编码器的两个电机的控制器系统的 RTL 设计。 在这个项目中&#xff0c;使用了搭…

4_Flink CEP

Flink CEP 1、何为CEP&#xff1f; CEP&#xff0c;全称为复杂事件处理&#xff08;Complex Event Processing&#xff09;&#xff0c;是一种用于实时监测和分析数据流的技术。 CEP详细讲解&#xff1a; CEP是基于动态环境的事件流的分析技术&#xff0c;事件是状态变化&am…

容器基础知识2-K8s 和 Docker 的关系与管理逻辑详解

K8s 和 Docker 的关系与管理逻辑详解 一、先搞懂&#xff1a;Docker 和 K8s 分别是做什么的&#xff1f; Docker&#xff08;容器工具&#xff09;&#xff1a;好比「集装箱工厂」&#xff0c;负责把应用和依赖打包成标准化容器&#xff08;类似集装箱&#xff09;&#xff0…

QT MaintenanceTool 登录无法找到 QtAccount 凭据

亲测有效&#xff1a;QT6 Maintenance Tool 登录问题_qt6 maintenancetool-CSDN博客 将ini这个配置文件移出文件夹后&#xff0c;在切换自己账户登录即可

华为云Flexus+DeepSeek征文|利用华为云一键部署 Dify 平台并接入 DeepSeek 大模型,构建长篇文章生成助手

目录 前言 1 华为云一键部署 Dify 平台 1.1 华为云 Dify 平台介绍 1.2 部署过程介绍 1.3 登录 Dify 平台 2 接入华为云 ModelArts Studio 中的 DeepSeek 大模型 3 构建长篇文章生成助手 3.1 简要介绍长篇文章生成助手 3.2 开始节点 3.3 生成标题和大纲&#xff08;LL…

js的一些基础概念总结

1.变量声明 首先js变量声明有三种&#xff0c;var&#xff0c;const&#xff0c;let&#xff0c;这三种变量声明中我们第一优先使用const&#xff0c;需要改变这个值的时候我们用ley&#xff0c;var是尽量不去使用。 那么我们现在来总结一下三种声明变量的区别。首先是var let …