Flink数据流高效写入MySQL实战

这段代码展示了如何使用 Apache Flink 将数据流写入 MySQL 数据库,并使用了 JdbcSink 来实现自定义的 Sink 逻辑。以下是对代码的详细解析和说明:

代码结构

  • 包声明package sink
    定义了代码所在的包。

  • 导入依赖
    导入了必要的 Flink 和 JDBC 相关类库,包括:

    • java.sql.PreparedStatement:用于执行 SQL 语句。
    • org.apache.flink.connector.jdbc:Flink 的 JDBC 连接器相关类。
    • org.apache.flink.streaming.api.scala._:Flink 流处理 API。
  • sinkToMysql 对象
    主程序入口,包含 Flink 流处理逻辑和 MySQL Sink 的配置。

package sinkimport java.sql.PreparedStatementimport org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: sink* @author: 赵嘉盟-HONOR* @data: 2023-11-20 15:23* @DESCRIPTION**/
object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink( JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1,u.user)t.setString(2,u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()))env.execute("sinkRedis")}
}

基于scala使用flink将读取到的数据写入到Mysql

  1. data.addSink( JdbcSink.sink(...) ):这行代码将一个JdbcSink添加到Flink的数据流中,用于将数据写入到数据库中。

  2. "insert into clicks values(?,?)":这是SQL语句,表示将用户和URL插入到名为clicks的表中。

  3. new JdbcStatementBuilder[Event] {...}:这是一个匿名内部类,用于构建PreparedStatement对象。在这个类中,我们重写了accept方法,该方法接受一个PreparedStatement对象和一个Event对象,然后将Event对象的user和url属性设置到PreparedStatement对象中。

  4. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()...:这是一个JdbcConnectionOptionsBuilder对象,用于构建数据库连接选项。在这个对象中,我们设置了数据库的URL、驱动名称、用户名和密码。

  5. .build():这是JdbcConnectionOptionsBuilder对象的一个方法,用于构建JdbcConnectionOptions对象。

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment
  • 创建 Flink 流处理环境 StreamExecutionEnvironment
(2) 定义数据流
val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L)
)
  • 使用 fromElements 方法生成一个包含 4 个 Event 对象的流。
(3) 自定义 MySQL Sink
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()
))
  • 使用 JdbcSink.sink 方法将数据写入 MySQL:
    • SQL 语句insert into clicks values(?,?),插入 user 和 url 字段。
    • JdbcStatementBuilder:用于将 Event 对象映射到 SQL 语句的参数。
    • JdbcConnectionOptions:配置 MySQL 连接信息,包括 URL、驱动名称、用户名和密码。
(4) 执行任务
env.execute("sinkRedis")
  • 启动 Flink 流处理任务,任务名称为 sinkRedis

优化版本

异常处理
  • 在 Sink 中添加异常处理逻辑,避免程序因 MySQL 写入失败而崩溃:
    data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").build()
    ))
批量写入
  • 如果需要提高写入性能,可以启用批量写入功能:
    data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 设置批量大小.build()
    ))

优化后的代码

以下是优化后的完整代码:

package sinkimport java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 启用批量写入.build()))env.execute("sinkToMysql")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/89299.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/89299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MATLAB下载安装教程(附安装包)2025最新版(MATLAB R2024b)

文章目录前言一、MATLAB R2024b下载二、MATLAB下载安装教程前言 MATLAB R2024b 的推出,进一步提升了其在工程实践中的实用性和专业性。它不仅提供了更多针对特定工程领域的解决方案,还在性能和兼容性方面进行了显著改进。 本教程将一步一步引导完成 MA…

Linux 基础命令学习,立即上手Linux操作

Linux 基础命令学习本文挑选最常用、最容易上手的 Linux 命令。每条都附带一句话说明 真实示例,直接复制即可练习,零基础也能跟得上。1  先掌握 目录导航:pwd / ls / cdpwd – 显示当前所在目录 pwd # 输出示例 /home/yournamels ‑a…

Android构建流程与Transform任务

1. 完整构建流程概览 1.1 主要构建阶段 预构建阶段 → 代码生成阶段 → 资源处理阶段 → 编译阶段 → Transform阶段 → 打包阶段1.2 详细任务执行顺序 ┌─────────────────────────────────────────────────────────…

CKS认证 | Day6 监控、审计和运行时安全 sysdig、falco、审计日志

一、分析容器系统调用:Sysdig Sysdig:定位是系统监控、分析和排障的工具,在 linux 平台上,已有很多这方面的工具 如tcpdump、htop、iftop、lsof、netstat,它们都能用来分析 linux 系统的运行情况,而且还有…

Redis:持久化配置深度解析与实践指南

🧠 1、简述 Redis 是一款基于内存的高性能键值数据库,为了防止数据丢失,Redis 提供了两种主要的持久化机制:RDB(快照)和 AOF(追加日志)。本文将从原理到配置,再到实际项目…

共创 Rust 十年辉煌时刻:RustChinaConf 2025 赞助与演讲征集正式启动

🚀 共创 Rust 十年辉煌时刻:RustChinaConf 2025 赞助与演讲征集正式启动2025年,是 Rust 编程语言诞生十周年的里程碑时刻。在这个具有历史意义的节点,RustChinaConf 2025 携手 RustGlobal 首次登陆中国,联合 GOSIM HAN…

EMS4100芯祥科技USB3.1高速模拟开关芯片规格介绍

EMS4100一款适用于USB Type-C应用的二通道差分2:1/1:2 USB 3.1高速双向被动开关。该器件支持USB 3.1 Gen 1和Gen 2数据速率,具有高带宽、低串扰、宽供电电压范围等特点。EMS4100芯片内部框架:EMS4100主要特性:2-独立频道1:2/2:1 M…

HTML 常用语义标签与常见搭配详解

一、什么是语义标签&#xff1f; 语义标签是 HTML5 引入的一组具有特定含义的标签&#xff0c;用于描述页面中不同部分的内容类型&#xff0c;如页眉、导航栏、主内容区域、侧边栏、页脚等。相比传统的 <div> 和 <span>&#xff0c;语义标签更具表达力和结构化。 …

迁移学习的概念和案例

迁移学习概念 预训练模型 定义: 简单来说别人训练好的模型。一般预训练模型具备复杂的网络模型结构&#xff1b;一般是在大量的语料下训练完成的。 预训练语言模型的类别&#xff1a; 现在我们接触到的预训练语言模型&#xff0c;基本上都是基于transformer这个模型迭代而来…

DAOS系统架构-RDB

1. 概述 基于Raft共识算法和强大的领导地位策略&#xff0c;pool service和container service可以通过复制其内部的元数据来实现高可用。通过这种方法实现具有副本能力的服务可以容忍少数副本中的任何一个出现故障。通过将每个服务的副本分布在容灾域中&#xff0c;pool servic…

深入GPU硬件架构及运行机制

转自深入GPU硬件架构及运行机制 - 0向往0 - 博客园&#xff0c;基本上是其理解。 一、GPU概述 1.1 GPU是什么&#xff1f; GPU全称是Graphics Processing Unit&#xff0c;图形处理单元。它的功能最初与名字一致&#xff0c;是专门用于绘制图像和处理图元数据的特定芯片&…

数值计算库:Eigen与Boost.Multiprecision全方位解析

在科学计算、工程模拟、机器学习等领域&#xff0c;高效的数值计算能力是构建高性能应用的基石。C作为性能优先的编程语言&#xff0c;拥有众多优秀的数值计算库&#xff0c;其中Eigen和Boost.Multiprecision是两个极具代表性的工具。本文将深入探讨这两个库的核心特性、使用场…

第十八节:第三部分:java高级:反射-获取构造器对象并使用

Class提供的获取类构造器的方法以及获取类构造器的作用代码&#xff1a;掌握获取类的构造器&#xff0c;并对其进行操作 Cat类 package com.itheima.day9_reflect;public class Cat {private String name;private int age;private Cat(String name, int age) {this.name name;…

集中打印和转换Office 批量打印精灵:Word/Excel/PDF 全兼容,效率翻倍

各位办公小能手们&#xff01;你们平时办公的时候&#xff0c;是不是经常要打印一堆文件&#xff0c;烦得要命&#xff1f;别慌&#xff0c;今天我给大家介绍一款超厉害的神器——Office批量打印精灵&#xff01; 软件下载地址安装包 这玩意儿啊&#xff0c;是专门为高效办公设…

docker的搭建

一、安装docker使用以下命令进行安装dockerapt-get install docker.io docker-compose使用以下命令进行查看docker是否开启systemctl status docker由此可见&#xff0c;docker没有打开&#xff0c;进行使用命令打开。systemctl start docker再次查看是否开启。肉眼可见&#x…

数据库管理-第349期 Oracle DB 23.9新特性一览(20250717)

数据库管理349期 2025-07-17数据库管理-第349期 Oracle DB 23.9新特性一览&#xff08;20250717&#xff09;1 JavaScript过程和函数的编译时语法检查2 不再需要JAVASCRIPT上的EXECUTE权限3 GROUP BY ALL4 使用SQL创建并测试UUID5 IVF索引在线重组6 JSON到二元性迁移器&#xf…

将CSDN文章导出为PDF

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处前言在日常学习和技术积累过程中&#xff0c;我们经常会在 CSDN 等技术博客平台上阅读高质量的技术文章。然而&#xff0c;网页阅读…

macOS - Chrome 关闭自动更新

进入 Google 相关资源文件夹 删除 GoogleSoftwareUpdate 文件夹 open ~/Library/Google 部分教程推荐&#xff0c;在 chrome://flags/ 页面设置&#xff0c;但最近没看到 自动更新相关开关。2025-07-13&#xff08;日&#xff09;

Python 模块化编程全解析:模块、包与第三方库管理指南

模块与包 模块化编程是什么&#xff1f;用生活例子秒懂 想象你在搭乐高积木&#xff1a; 每个小积木块都有特定功能&#xff08;比如轮子、窗户、墙壁&#xff09;—— 这就像模块&#xff08;一个.py 文件&#xff0c;封装了函数或类&#xff09;。把相关的积木块装进一个盒…

小白学Python,网络爬虫篇(2)——selenium库

前言 selenium 库是一种用于 Web 应用程序测试的工具&#xff0c;它可以驱动浏览器执行特定操作&#xff0c;自动按照脚本代码做出单击、输入、打开、验证等操作&#xff0c;支持的浏览器包括 IE、Firefox、Safari、Chrome、Opera 等。 与 requests 库不同的是&#xff0c;se…