Flink中Kafka连接器的基本应用

文章目录

    • 前言
    • Kafka连接器基础案例演示
      • 前置说明和环境准备步骤
      • Kafka连接器基本配置
      • 关联数据源
      • 映射转换
      • 案例效果演示
    • 基于Kafka连接器同步数据到MySQL
      • 案例说明
      • 前置准备
      • Kafka连接器消费位点调整
      • 映射转换与数据投递
      • MysqlSlink持久化收集器数据
      • 最终效果演示
    • 小结
    • 参考

前言

本文将基于内置kafka连接器演示如何使用kafka内置流收集器的api完成Kafka数据的采集,同时我们也会给出一个收集Kafka数据流数据保存到MySQL的示例,希望对你有帮助。

Kafka连接器基础案例演示

前置说明和环境准备步骤

本案例将基于Kafka投递的单词(用逗号分隔),通过flink完成抽取,切割为独立单词,并完成词频统计,例如我们输入hello,world,最终控制台就会输出hello,1world,1

在正式演示之前,笔者介绍一些flink的使用版本:

<flink.version>1.16.0</flink.version>

对应还有下面这些依赖分别用于:

  1. 使用Kafka连接器
  2. 使用hutool的jdbc连接器
  3. MySQL驱动包
 <!-- CSV Format for Kafka (因为你的配置中用了 'format' = 'csv') --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- JDBC Connector (用于你的 spend_report 表写入 MySQL) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version> <!-- 推荐使用 8.0.x 版本 --></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency>

完成这些后我们将Kafka等相关环境准备好就可以着手编码工作了。

Kafka连接器基本配置

首先我们基于StreamExecutionEnvironment 初始化环境构建配置:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后我们就可以基于内置的KafkaSource的建造者模式完成如Kafka连接器的构建:

  1. setBootstrapServers设置Kafka地址为broker字符串配置的ip和端口号
  2. setTopics设置消费的主题为input-topic
  3. setGroupId当前kafka消费者组为my-group
  4. setStartingOffsets设置为从最早偏移量开始消费
  5. setValueOnlyDeserializer设置收到Kafka数据时直接反序列化为字符串

对应的代码如下所示:

	//基于建造者模式完成Kafka连接器的配置KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers)//设置Kafka server端地址.setTopics("input-topic") //指定消费的Topic为input-topic.setGroupId("my-group")//设置消费组ID为my-group.setStartingOffsets(OffsetsInitializer.earliest())//设置从Kafka的最开始位置开始消费.setValueOnlyDeserializer(new SimpleStringSchema())// 设置数据直接反序列化为字符串.build();

这里需要补充一下关于Kafka消费位点的设置,flink已经内置了如下几种消费位点的设置,对应的代码配置示例如下,读者可参阅并进行配置:

KafkaSource.builder()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/82457.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode 刷题记录 11 —— 二叉树第二弹

本系列为笔者的 Leetcode 刷题记录&#xff0c;顺序为 Hot 100 题官方顺序&#xff0c;根据标签命名&#xff0c;记录笔者总结的做题思路&#xff0c;附部分代码解释和疑问解答&#xff0c;01~07为C语言&#xff0c;08及以后为Java语言。 01 二叉树的层序遍历 /*** Definition…

【R语言科研绘图】

R语言在绘制SCI期刊图像时具有显著优势&#xff0c;以下从功能、灵活性和学术适配性三个方面分析其适用性&#xff1a; 数据可视化库丰富 R语言拥有ggplot2、lattice、ggpubr等专业绘图包&#xff0c;支持生成符合SCI期刊要求的高分辨率图像&#xff08;如TIFF/PDF格式&#…

【Node.js】Web开发框架

个人主页&#xff1a;Guiat 归属专栏&#xff1a;node.js 文章目录 1. Node.js Web框架概述1.1 Web框架的作用1.2 Node.js主要Web框架生态1.3 框架选择考虑因素 2. Express.js2.1 Express.js概述2.2 基本用法2.2.1 安装Express2.2.2 创建基本服务器 2.3 路由2.4 中间件2.5 请求…

PDF 转 JPG 图片小工具:CodeBuddy 助力解决转换痛点

本文所使用的 CodeBuddy 免费下载链接&#xff1a;腾讯云代码助手 CodeBuddy - AI 时代的智能编程伙伴 前言 在数字化办公与内容创作的浪潮中&#xff0c;将 PDF 文件转换为 JPG 图片格式的需求日益频繁。无论是学术文献中的图表提取&#xff0c;还是宣传资料的视觉化呈现&am…

Linux 文件系统层次结构

Linux 的文件系统遵循 Filesystem Hierarchy Standard (FHS) 标准&#xff0c;其目录结构是层次化的&#xff0c;每个目录都有明确的用途。以下是 Linux 中部分目录的作用解析&#xff1a; 1. 根目录 / 作用&#xff1a;根目录是整个文件系统的顶层目录&#xff0c;所有其他目…

密码学标准(Cryptography Standards)介绍

密码学标准(Cryptography Standards)是为确保信息安全传输、存储和处理而制定的一系列技术规范和协议,广泛应用于通信、金融、互联网等领域。以下从分类、主流标准、应用场景和发展趋势四个方面进行详细介绍: 一、密码学标准的分类 密码学标准可根据技术原理和应用场景分…

ubuntu 22.04安装和使用docker介绍

docker安装和使用 准备环境常见的docker操作linux系统常用的配置卸载docker 准备环境 本机环境&#xff1a; Linux yz-MS-7E06 6.8.0-59-generic #61~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue Apr 15 17:03:15 UTC 2 x86_64 x86_64 x86_64 GNU/Linux安装依赖软件&#xff1a;…

obsidian 中的查找和替换插件,支持正则

最近用着 obsidian 时&#xff0c;发现想要在当前文档中 查找和替换 内容时&#xff0c;没有自动查找和替换的功能&#xff0c;去插件市场查找也没有发现好用的插件&#xff0c;那就自己写一个吧。 全程用的 AI 来写的&#xff0c;当然&#xff0c;我对 JS/CSS/TypeScript 等没…

针对vue项目的webpack优化攻略

一、开发阶段优化 1. 热更新加速&#xff08;HMR&#xff09; // vue.config.js module.exports {devServer: {hot: true, // 开启热更新injectClient: true, // 自动注入HMR客户端watchOptions: {ignored: /node_modules/, // 忽略node_modules变化aggregateTimeout: 300…

BTC官网关注巨鲸12亿美元平仓,XBIT去中心化交易平台表现稳定

在全球加密货币市场波动加剧的背景下&#xff0c;2025年5月25日传出重磅消息。据今日最新国际报道&#xff0c;知名巨鲸James Wynn完全平仓价值12亿美元的BTC多头仓位&#xff0c;整体盈利约845万美元&#xff0c;此举引发市场广泛关注。与此同时&#xff0c;收益型稳定币市场迎…

在WPF中添加动画背景

在WPF中添加动画背景 在WPF中创建动画背景可以大大增强应用程序的视觉效果。以下是几种实现动画背景的方法&#xff1a; 方法1&#xff1a;使用动画ImageBrush&#xff08;图片轮播&#xff09; <Window x:Class"AnimatedBackground.MainWindow"xmlns"htt…

单点击登录sso实现

一、单点登录&#xff08;SSO&#xff09;是什么&#xff1f; 核心定义 单点登录&#xff08;Single Sign-On&#xff0c;SSO&#xff09;是一种身份认证解决方案&#xff0c;允许用户通过一次登录访问多个相互信任的应用系统。其核心逻辑是统一认证中心与分布式会话管理&…

JavaWebsocket-demo

Websocket客户端 pom依赖 <dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.4.0</version></dependency>客户端代码片段 Component Slf4j public class PositionAlarmL…

Java Collection(集合) 接口

Date: 2025-05-21 20:21:32 author: lijianzhan Java 集合框架提供了一组接口和类&#xff0c;以实现各种数据结构和算法。 以下是关于 Java 集合的核心内容说明&#xff1a; /*** Java Collection Framework 说明&#xff1a;** 在 Java 中&#xff0c;集合&#xff08;Collec…

让MySQL更快:EXPLAIN语句详尽解析

前言 在数据库性能调优中&#xff0c;SQL 查询的执行效率是影响系统整体性能的关键因素之一。MySQL 提供了强大的工具——EXPLAIN 语句&#xff0c;帮助开发者和数据库管理员深入分析查询的执行计划&#xff0c;从而发现潜在的性能瓶颈并进行针对性优化。 EXPLAIN 语句能够模…

Java基础 Day20

一、HashSet 集合类 1、简介 HashSet 集合底层采取哈希表存储数据 底层是HashMap 不能使存取有序 JDK8之前的哈希表是数组和链表&#xff0c;头插法 JDK8之后的哈希表是数组、链表和红黑树&#xff0c;尾插法 2、存储元素 &#xff08;1&#xff09;如果要保证元素的唯…

2505C++,32位转64位

原文 假设有个想要将一个32位值传递给一个带64位值的函数的函数.你不关心高32位的内容,因为该值是传递给回调函数的直通值,回调函数会把它截断为32位值. 因此,你都担心编译器一般生成的将32位值扩展到64位值的那条指令的性能影响. 我怀疑这条指令不是程序中的性能瓶颈. 我想出…

光伏电站及时巡检:守护清洁能源的“生命线”

在“双碳”目标驱动下&#xff0c;光伏电站作为清洁能源的主力军&#xff0c;正以年均20%以上的装机增速重塑全球能源格局。然而&#xff0c;这些遍布荒漠、屋顶的“光伏矩阵”并非一劳永逸的能源提款机&#xff0c;其稳定运行高度依赖精细化的巡检维护。山东枣庄触电事故、衢州…

C++初阶-list的使用2

目录 1.std::list::splice的使用 2.std::list::remove和std::list::remove_if的使用 2.1remove_if函数的简单介绍 基本用法 函数原型 使用函数对象作为谓词 使用普通函数作为谓词 注意事项 复杂对象示例 2.2remove与remove_if的简单使用 3.std::list::unique的使用 …

OpenHarmony平台驱动使用(一),ADC

OpenHarmony平台驱动使用&#xff08;一&#xff09; ADC 概述 功能简介 ADC&#xff08;Analog to Digital Converter&#xff09;&#xff0c;即模拟-数字转换器&#xff0c;可将模拟信号转换成对应的数字信号&#xff0c;便于存储与计算等操作。除电源线和地线之外&#…