DataX 框架学习笔记

官方仓库

https://github.com/alibaba/DataX?tab=readme-ov-file

1. 介绍

1.1. 基本介绍:

DadaX 是阿里云 DataWorks 数据集成 的开源版本(异构数据同步、离线数据同步工具 / 平台)。主要抽象为 Reader 和 Writer 插件,管理源数据和目标数据的读和写。

1.2. DataX 3.0 框架(插件式、Reader、writer ):

Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。

  • Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端
  • Framework:Framework 用于连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。目前支持单机多线程的模式完成同步作业任务

1.2.1. 官方例子:

  • 单个 Job 可以分为多个 task,一个 TaskGroup 可以按设置的并发度执行任务。
  • 核心模块介绍:
  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
  3. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  4. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  5. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • DataX调度流程:

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
  3. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  • 数据来源多、封装成网页平台(数据平台)优先选择 DataX (单进程多线程、日志完善)

1.3. 安装使用

  1. 由于 datax 依赖于 java 1.8 及 以上,和 python 2 (3) 均可以,需要提前安装 JDK 和 Conda 环境。
  2. datax 安装:直接下载 DataX工具包:DataX下载地址 下载后解压至本地某个目录,进入bin目录,即可运行同步作业:
$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
# 自检脚本:    
$ python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

2. 案例 Demo

2.1. Stream To Stream Demo

  1. 可以在datax目录下使用 bin/datax.py -r YOURreader -w YOURwriter查看当前组合的 json 配置模板。

  1. 书写配置文件,将配置文件放入 datax/job路径下,执行python bin/datax.py job/demo.json命令。
{"job": {"content": [{"reader": {"name": "streamreader","parameter": {"column": [{"type": "string","value": "xy"},{"type": "string","value": "25"}],"sliceRecordCount": "10"}},"writer": {"name": "streamwriter","parameter": {"encoding": "UTF-8","print": true}}}],"setting": {"speed": {"channel": "1"}}}
}

2.2. 从 Mysql 读取数据存放到 HDFS

  • HDFS 是为大数据而生的分布式文件系统,具备高容错、高吞吐、强扩展性,适合存储海量的结构化和非结构化数据

  • mysqlreader 参数解析:

  • hdfswrite 参数解析:

  • 其参数配置如下:
{"job": {"content": [{"reader": {"name": "mysqlreader",  "parameter": {"column": [],"connection": [{"jdbcUrl": [],"table": [],"querySql":[]}],"password": "","username": "","where": ""}},"writer": {"name": "hdfswriter","parameter": {"column": [],"compress": "","defaultFS": "","fieldDelimiter": "","fileName": "","fileType": "","path": "","writeMode": ""}}}],"setting": {"speed": {"channel": ""}}}
}
  • 如何链路中有部分线程的内容失败,datax 会回滚部分成功的数据。

2.2.1. Mysql (准备) 建库建表

$ mysql -uroot -p
create database datax;
use datax;
create table student(id int, name varchar(20));
// 插入数据
insert into student values(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu');

2.2.2. hdfs 准备

  1. 安装 hdfs 需要前置准备 Java 8+。
  2. 下载并解压 Hadoop:
wget https://downloads.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1.tar.gz
tar -zxvf hadoop-3.4.1.tar.gz -C ~ # 解压到根路径下
cd hadoop-3.4.1
  1. 配置环境变量:
vim ~/.bashrc
# 加入以下内容:
export HADOOP_HOME=你的路径/hadoop-3.4.1
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATHsource ~/.bashrc
  1. 配置 HDFS , 进入 $HADOOP_HOME/etc/hadoop/,修改如下文件 :

core-site.xml:

<configuration><property><name>fs.defaultFS</name><value>hdfs://localhost:9000</value></property>
</configuration>

hdfs-site.xml:

<configuration><property><name>dfs.replication</name><value>1</value>  <!-- 单节点设为 1 --></property><property><name>dfs.namenode.name.dir</name><value>file:///home/YOURUSER/hadoopdata/hdfs/namenode</value></property><property><name>dfs.datanode.data.dir</name><value>file:///home/YOURUSER/hadoopdata/hdfs/datanode</value></property>
</configuration>

替换 youruser 为当前用户名,并确保创建了对应目录:

mkdir -p ~/hadoopdata/hdfs/namenode
mkdir -p ~/hadoopdata/hdfs/datanode

5. 格式化并启动 HDFS,并验证启动。

# 格式化 HDFS
hdfs namenode -format# 启动 NameNode 和 DataNode
start-dfs.sh# 查看 Java 进程(有 namenode 和 datanode 即成功)
jps# 输出类似:
12345 NameNode
12346 DataNode# 测试命令是否可用:
hdfs dfs -mkdir /demo
hdfs dfs -ls /

6. 测试 JSON,以及测试结果

{"job": {"content": [{"reader": {"name": "mysqlreader",  "parameter": {"column": ["id","name"],"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/datax?useUnicode=true&characterEncoding=UTF-8"],"table": ["student"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "int"},{"name": "name","type": "string"}],"defaultFS": "hdfs://localhost:9000","fieldDelimiter": "|","fileName": "student.txt","fileType": "text","path": "/","writeMode": "append"}}}],"setting": {"speed": {"channel": "1"}}}
}

内部有保证数据一致性的判断,当多线程环境执行完后需要对比是否全部任务成功,否则触发回滚机制。

  • 可以看到管理界面下有成功同步的两个 student 文件。

3. DataX 原理分析

3.1. datax.py

  • 配置参数:
  • -j 可以指定 jvm 参数,可以用于配置堆内存。
  • 在该文件中启动了 java 执行 com.alibaba.datax.core.Engine 启动类。

3.2. JobContainor

有完整的生命周期管理(对应上方结构图的描述):

  1. init()
    1. reader 初始化。(根据不同的配置文件)
    2. writer 初始化。
  1. prepare()
    1. 调用插件,做一些基本准备工作,清理等。
  1. split()
    1. task 切分逻辑
  1. schedule()
  2. post()

3.2.1. Task 切分

  • 调整 channel 数量(并发数的确定)

在配置中主要分三个模块 reader writer setting,setting 中可以配置 channel、byte、record(数据条数),源码中选择如果设定了 数据量或带宽的速度,算出来的 channel 以小的为主。直接指定channel 的优先级最低。

3.2.2. Schedule 调度

根据拆分中计算的任务数和并发数取最小值,避免资源浪费。

  • assignFairly 将 task 公平的分配到 taskgroup

3.3. 优化(并发、限流)

  • job.setting.speed.channel : channel 并发数
  • job.setting.speed.record : 全局配置 channel 的 record 限速
  • job.setting.speed.byte:全局配置 channel 的 byte 限速
  • core.transport.channel.speed.record:单个 channel 的 record 限速
  • core.transport.channel.speed.byte:单个 channel 的 byte 限速

3.3.1. 优化1:提升每个 channel 的速度

在 DataX 内部对每个 Channel 会有严格的速度控制

  1. 控制每秒同步的记录数 record。
  2. 每秒同步的字节数 byte,默认的速度限制是 1MB/s,可以根据具体硬件情况设

置这个 byte 速度或者 record 速度,一般设置 byte 速度,比如:我们可以把单个 Channel 的

速度上限配置为 5MB。

3.3.2. 优化 2:提升 DataX Job 内 Channel 并发数

并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)。

提升 job 内 Channel 并发有三种配置方式:

  1. Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速。
  2. Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速。
  3. 直接配置 Channel 个数。

3.3.3. 优化 3:提高 JVM 堆内存

当提升 DataX Job 内 Channel 并发数时,内存的占用会显著增加,因为 DataX 作为数据

交换通道,在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据

交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些 Buffer,为了防止 OOM 等错

误,调大 JVM 的堆内存。

建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。

调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动

的时候,加上对应的参数,如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json

-Xms8G JVM 堆内存 初始大小

-Xmx8G JVM 堆内存 最大大小

设置一致防止内存抖动 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/909462.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MaxCompute的Logview分析详解

文章目录 一、Logview简介1、概述2、标题与功能3、基础信息 二、作业详情1、Job Details2、Fuxi Sensor3、Result①当前作业运行成功&#xff0c;显示的为运行结果。②当前作业运行失败&#xff0c;显示的为失败原因。 4、SourceXML5、SQL Script6、History7、SubStatusHistory…

HTML5白云飘飘动态效果教程

HTML5白云飘飘动态效果教程 这里写目录标题 HTML5白云飘飘动态效果教程效果介绍实现步骤步骤一&#xff1a;创建HTML结构步骤二&#xff1a;设计CSS样式步骤三&#xff1a;添加JavaScript交互 代码解析HTML结构解析CSS样式解析JavaScript功能解析 自定义调整总结 效果介绍 本教…

tcp高难度问题

以下是针对这些问题&#xff0c;在面试场景下&#xff0c;既保证理论扎实、逻辑清晰&#xff0c;又具备交流延展性的回答思路与内容&#xff0c;可根据实际面试节奏和面试官反馈灵活调整展开&#xff1a; 1. 客户端端口号如何确定的&#xff1f; 面试官您好&#xff0c;客户端…

广东省省考备考(第二十八天6.13)—资料分析(第二节课)

基期与现期 官方定义&#xff1a;作为对比参照的是基期&#xff0c;而相对于基期比较的是现期 通俗说法&#xff1a;时间靠前的为基期&#xff0c;时间靠后的为现期 增长量与增长率 增长量用来表述基期量与现期量变化的绝对量&#xff1b; 增长率用来表述基期量与现期量变化…

pytorch 中前向传播和后向传播的自定义函数

系列文章目录 文章目录 系列文章目录一、torch.autograd.function代码实例 在开始正文之前&#xff0c;请各位姥爷动动手指&#xff0c;给小店增加一点访问量吧&#xff0c;点击小店&#xff0c;同时希望我的文章对你的学习有所帮助。本文也很简单&#xff0c;主要讲解pytorch的…

【项目实训#08】HarmonyOS知识图谱前端可视化实现

【项目实训#08】HarmonyOS知识图谱前端可视化实现 文章目录 【项目实训#08】HarmonyOS知识图谱前端可视化实现一、背景简介二、技术方案与架构设计2.1 技术选型2.2 组件架构设计 三、知识图谱可视化组件实现3.1 KGResultTab组件设计组件模板结构不同状态的处理用户交互控制节点…

【软件开发】什么是DSL

什么是DSL DSL&#xff08;Domain-Specific Language&#xff0c;领域特定语言&#xff09;是一种为特定领域或任务设计的编程语言&#xff0c;目的在于提高该领域中的表达能力与开发效率。 1 在脚本语言中的 DSL 是什么&#xff1f; 在脚本语言&#xff08;如 Python、Lua、…

JasperReport生成PDF/A类型文档

当JasperReport导出的文档为PDF/A模式时&#xff0c;该PDF为只读可以防止被修改。 设置导出参数 JRPdfExporter exporter new JRPdfExporter();exporter.setExporterInput(SimpleExporterInput.getInstance(jasperPrints));exporter.setExporterOutput(new SimpleOutputStre…

微信小程序使用画布实现飘落泡泡功能

微信小程序使用画布实现飘落泡泡功能&#xff1a;从组件封装到页面调用的完整实践 先看示例截图&#xff1a; 一、背景与技术选型 在微信小程序中实现类似于飘落的泡泡或者樱花飘落的功能&#xff0c;一般主要有 Canvas 和图片两种方案&#xff1a; &#xff08;1&#xff…

使用STM32设置GPIO中断

使用S™ 32设置GPIO中断 中断示例按键中断实例设计&#xff1a;EXTI0和EXTI9硬件连接分析STM32代码实现代码说明 中断示例 设计一个按键中断的实例。设置两个中断&#xff1a;EXTI0、EXTI9&#xff0c; 在EXTI9的中断服务之程序中实现LED灯的控制 按键中断实例设计&#xff…

解决在微信小程序中view组件下的text和images设置了样式display: flex; align-items: center;对不齐

原始代码的问题 <view style"display: flex; align-items: center;"><text style"line-height: 1;">全国</text><image src"/images/xia.png" style"height: 20rpx; width: 20rpx; display: block;"></im…

归并排序详解:优雅的分治艺术

什么&#xff1f;归并排序&#xff1f;这让博主想起了大学那会被《数据结构与算法》支配的恐惧… 哈哈言归正传&#xff0c;一直想对算法做一个专栏&#xff0c;因为其实工作中很少很少有机会用到算法&#xff0c;倒是很多工具方法底层会使用&#xff0c;工作被各种需求业务“折…

新零售视域下实体与虚拟店融合的技术逻辑与商业模式创新——基于开源AI智能名片与链动2+1模式的S2B2C生态构建

摘要&#xff1a;新零售的核心在于打破线上线下边界&#xff0c;构建“人、货、场”的全场景融合生态。本文提出&#xff0c;实体线下店与虚拟店的协同发展是新零售的重要演进方向&#xff0c;其底层逻辑在于满足消费者作为“现实人”的体验需求与“虚拟人”的效率需求。通过引…

可视化图解算法51:寻找第K大(数组中的第K个最大的元素)

牛客网 面试笔试 TOP101 | LeetCode 215. 数组中的第K个最大元素 1. 题目 描述 有一个整数数组&#xff0c;请你找出数组中第 k 大的数。 给定一个整数数组 a ,同时给定它的大小n和要找的 k &#xff0c;请返回第 k 大的数(包括重复的元素&#xff0c;不用去重)&…

DataWhale-零基础网络爬虫技术(一)

课程链接先给各位 ↓↓↓ &#xff08;点击即可食用.QAQ Datawhale-学用 AI,从此开始 一、引言 还是在笔记的开始&#xff0c;唠唠一些自己的故事 十年前第一次接触网络&#xff0c;也可以说是第一次接触计算机的时候&#xff0c;那时候还是在中学阶段&#xff0c;那时候大…

Linux02

目录 linux常用命令 用户和权限 压缩和解压缩 其他相关命令 Linux中安装常用软件 1.1. jdk的安装 1.1.1. 卸载linux中自带的open-jdk 1.1.2. 把安装包上传到 linux上 1.1.3. 解压安装包 1.1.4. 配置环境变量 1.1.5 验证环境变量 1.3 安装mysql 1.3.1. 检查依赖 1.…

JavaSE超详细笔记-网络编程篇-基于黑马

1. 什么是网络编程【理解】 1.1 概念 在网络通信协议下&#xff0c;不同计算机上运行的程序&#xff0c;进行的数据传输。 应用场景: 即时通信、网游对战、金融证券、国际贸易、邮件、等等。 不管是什么场景&#xff0c;都是计算机跟计算机之间通过网络进行数据传输Java中可以使…

时序数据库Influxdb3 core安装

本文介绍时序数据库Influxdb3 core(开源版本)的安装和简单使用以及调优参数的介绍。 预期&#xff1a; 安装时序数据库Influxdb3 core 创建数据库mydb 写入数据&#xff1b; 使用influxdb3-cli 和 grafana2种方式查询写入的数据 前期准备&#xff1a; linux服务器(本文服…

区间合并:区间合并问题

区间合并&#xff1a;区间合并问题 区间合并 www.acwing.com/problem/content/805/ 按区间的左端点排序 扫描整个区间&#xff0c;在这过程中把可能有交点的区间合并 全包含&#xff1a;不做改动相交&#xff1a;right 后移相离&#xff1a;更新至下一个维护区间 import j…

中国古代数学符号的演进 | 算筹 / 符号 / 算法

注&#xff1a;本文为“中国古代数学符号”相关合辑。 图片清晰度受引文原图所限。 略作重排&#xff0c;未整理去重。 如有内容异常&#xff0c;请看原文。 这个中国古代的数学瑰宝&#xff0c;到底厉害在哪&#xff1f; 原创 朱一文 科普中国 2024 年 07 月 31 日 15:30 北…