血缘元数据采集开放标准:OpenLineage Integrations Compatibility Tests Structure

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

在 Spark 中使用 OpenLineage

改编自 Michael Collado 的博文

本指南基于早期版本的集成开发,可能需要根据最新版本进行调整。

将 OpenLineage 添加到 Spark 非常简单,这得益于 Spark 的 SparkListener 接口。OpenLineage 通过实现 SparkListener 并收集 Spark 应用程序内部执行的作业信息来与 Spark 集成。要激活监听器,请将以下属性添加到集群的 spark-defaults.conf 文件中的 Spark 配置中,或者通过 spark-submit 命令将它们添加到特定作业的提交中:

spark.jars.packages     io.openlineage:openlineage-spark:{{PREPROCESSOR:OPENLINEAGE_VERSION}}
spark.extraListeners    io.openlineage.spark.agent.OpenLineageSparkListener

激活后,监听器需要知道将血缘事件报告到哪里以及作业的命名空间。将以下附加配置行添加到 spark-defaults.conf 文件或 Spark 提交脚本中:

spark.openlineage.transport.url      {your.openlineage.host}
spark.openlineage.transport.type     {your.openlineage.transport.type}
spark.openlineage.namespace {your.openlineage.namespace}

使用 OpenLineage 运行 Spark

前提条件

  • Docker Desktop
  • git
  • Google Cloud Service 账户
  • Google Cloud Service 账户 JSON 密钥文件

注意:你的 Google Cloud 账户应拥有 BigQuery 的访问权限以及对你的 GCS 存储桶的读写权限。建议为你的密钥文件起一个容易记住的名称(bq-spark-demo.json)。最后,如果使用 macOS Monterey (macOS 12),则需要通过禁用 AirPlay 接收器释放端口 5000。

使用说明

克隆 OpenLineage 项目,导航到 spark 目录,并为你的 Google Cloud Service 凭据创建一个目录:

git clone https://github.com/OpenLineage/OpenLineage
cd integration/spark
mkdir -p docker/notebooks/gcs

将你的 Google Cloud Service 凭据文件复制到该目录中,然后运行:

docker-compose up

这将启动一个带有 Spark 的 Jupyter notebook,以及一个已安装的 Marquez API 端点,用于报告血缘。一旦 notebook 服务器启动并运行,你应在日志中看到如下内容:

notebook_1  | [I 21:43:39.014 NotebookApp] Jupyter Notebook 6.4.4 is running at:
notebook_1  | [I 21:43:39.014 NotebookApp] http://082cb836f1ec:8888/?token=507af3cf9c22f627f6c5211d6861fe0804d9f7b19a93ca48
notebook_1  | [I 21:43:39.014 NotebookApp]  or http://127.0.0.1:8888/?token=507af3cf9c22f627f6c5211d6861fe0804d9f7b19a93ca48
notebook_1  | [I 21:43:39.015 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).

从你自己的日志中复制以 127.0.0.1 作为主机名的 URL(令牌将与此不同)并将其粘贴到浏览器窗口中。你应该会看到一个空白的 Jupyter notebook 环境,准备就绪。

Jupyter notebook 环境

点击 notebooks 目录,然后点击 New 按钮创建一个新的 Python 3 notebook。

Jupyter 新 notebook

在窗口的第一个单元格中粘贴以下文本。更新 GCP 项目和存储桶名称以及服务账户凭据文件,然后运行代码:

from pyspark.sql import SparkSession
import urllib.request# 下载 BigQuery 和 GCS 的依赖项
gc_jars = ['https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.1.1/gcs-connector-hadoop3-2.1.1-shaded.jar','https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/bigquery-connector/hadoop3-1.2.0/bigquery-connector-hadoop3-1.2.0-shaded.jar','https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.22.2/spark-bigquery-with-dependencies_2.12-0.22.2.jar']files = [urllib.request.urlretrieve(url)[0] for url in gc_jars]# 设置为你自己的项目和存储桶
project_id = 'bq-openlineage-spark-demo'
gcs_bucket = 'bq-openlineage-spark-demo-bucket'
credentials_file = '/home/jovyan/notebooks/gcs/bq-spark-demo.json'spark = (SparkSession.builder.master('local').appName('openlineage_spark_test').config('spark.jars', ",".join(files))# 安装并设置 OpenLineage 监听器.config('spark.jars.packages', 'io.openlineage:openlineage-spark:{{PREPROCESSOR:OPENLINEAGE_VERSION}}').config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener').config('spark.openlineage.transport.url', 'http://marquez-api:5000').config('spark.openlineage.transport.type', 'http').config('spark.openlineage.namespace', 'spark_integration')# 配置 Google 凭据和项目 ID.config('spark.executorEnv.GCS_PROJECT_ID', project_id).config('spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS', '/home/jovyan/notebooks/gcs/bq-spark-demo.json').config('spark.hadoop.google.cloud.auth.service.account.enable', 'true').config('spark.hadoop.google.cloud.auth.service.account.json.keyfile', credentials_file).config('spark.hadoop.fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem').config('spark.hadoop.fs.AbstractFileSystem.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS').config("spark.hadoop.fs.gs.project.id", project_id).getOrCreate())

以上内容大部分是用于在 notebook 环境中安装 BigQuery 和 GCS 库的样板代码。这还设置了配置参数,告诉库使用哪个 GCP 项目以及如何与 Google 进行身份验证。特定于 OpenLineage 的参数是已提到的四个:spark.jars.packagesspark.extraListenersspark.openlineage.hostspark.openlineage.namespace。在此,主机已配置为由 Docker 启动的 marquez-api 容器。

配置好 OpenLineage 后,是时候获取一些数据了。以下代码用两个 COVID-19 公共数据集的数据填充 Spark DataFrame。在 notebook 中创建一个新单元格并粘贴以下内容:

from pyspark.sql.functions import expr, colmask_use = spark.read.format('bigquery') \.option('parentProject', project_id) \.option('table', 'bigquery-public-data:covid19_nyt.mask_use_by_county') \.load() \.select(expr("always + frequently").alias("frequent"),expr("never + rarely").alias("rare"),"county_fips_code")opendata = spark.read.format('bigquery') \.option('parentProject', project_id) \.option('table', 'bigquery-public-data.covid19_open_data.covid19_open_data') \.load() \.filter("country_name == 'United States of America'") \.filter("date == '2021-10-31'") \.select("location_key",expr('cumulative_deceased/(population/100000)').alias('deaths_per_100k'),expr('cumulative_persons_fully_vaccinated/(population - population_age_00_09)').alias('vaccination_rate'),col('subregion2_code').alias('county_fips_code'))
joined = mask_use.join(opendata, 'county_fips_code')joined.write.mode('overwrite').parquet(f'gs://{gcs_bucket}/demodata/covid_deaths_and_mask_usage/')

以上内容的一些背景:covid19_open_data 表被过滤为仅包含 2021 年 10 月 31 日的美国数据。deaths_per_100k 数据点使用现有的 cumulative_deceasedpopulation 列计算,vaccination_rate 使用总人口减去 9 岁以下人口计算,因为当时他们没有资格接种疫苗。对于 mask_use_by_county 数据,“rarely” 和 “never” 数据合并为一个数字,“frequently” 和 “always” 也是如此。然后存储来自两个数据集的选定列。

现在,在 notebook 中添加一个单元格并粘贴此行:

spark.read.parquet(f'gs://{gcs_bucket}/demodata/covid_deaths_and_mask_usage/').count()

notebook 应打印警告和堆栈跟踪(可能是调试语句),然后返回总共 3142 条记录。

现在管道已运行,可用于血缘收集。

与 OpenLineage 仓库一起提供的 docker-compose.yml 文件仅包括 Jupyter notebook 和 Marquez API。为了直观地探索血缘,请启动 Marquez web 项目。在不终止现有 docker 容器的情况下,在新终端中运行以下命令:

docker run --network spark_default -p 3000:3000 -e MARQUEZ_HOST=marquez-api -e MARQUEZ_PORT=5000 -e WEB_PORT=3000 --link marquez-api:marquez-api marquezproject/marquez-web:0.19.1

接下来,打开新的浏览器标签页并导航到 http://localhost:3000,其外观应如下所示:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

注意:spark_integration 命名空间是自动选择的,因为没有其他命名空间可用。UI 的作业页面上列出了三个作业。它们都以 openlineage_spark_test 开头,这是在构建 notebook 的第一个单元格时传递给 SparkSession 的 appName。每个查询执行或 RDD 操作都表示为一个不同的作业,操作的名称附加到应用程序名称以形成作业名称。点击 openlineage_spark_test.execute_insert_into_hadoop_fs_relation_command 节点会调出我们 notebook 的血缘图:

Marquez 作业图

该图显示 openlineage_spark_test.execute_insert_into_hadoop_fs_relation_command 作业从两个输入数据集 bigquery-public-data.covid19_nyt.mask_use_by_countybigquery-public-data.covid19_open_data.covid19_open_data 读取,并写入第三个数据集 /demodata/covid_deaths_and_mask_usage。该第三个数据集的命名空间缺失,但完全限定名称为 gs://<your_bucket>/demodata/covid_deaths_and_mask_usage

底部栏显示从 Spark 作业收集的一些有趣数据。向上拖动栏可展开视图以便更仔细地查看。

Marquez 作业切面

从 Spark 作业始终收集的两个切面是 spark_versionspark.logicalPlan。第一个仅报告执行 Spark 的版本以及 openlineage-spark 库的版本。这有助于调试作业运行。

第二个切面是作业运行时 Spark 报告的序列化优化 LogicalPlan。Spark 的查询优化会对查询作业的执行时间和效率产生巨大影响。跟踪查询计划如何随时间变化可以显著帮助调试生产环境中的慢查询或 OutOfMemory 错误。

点击第一个 BigQuery 数据集可提供有关数据的信息:

Marquez BigQuery 数据集

可以看到数据集的架构以及数据源。

关于写入 GCS 的数据集的类似信息也可用:

Marquez 输出数据集

与 BigQuery 数据集一样,可以看到输出架构和数据源——在此情况下为 gs:// 方案和写入的存储桶名称。

除了架构之外,还可以看到统计切面,报告输出记录数和字节数为 -1。

底部栏上的 VERSIONS 标签会显示多个版本(如果有的话)(此处不是这种情况)。点击版本会显示相同架构和统计切面,但它们特定于所选版本。

Marquez 输出数据集版本

在生产环境中,此数据集将有许多版本,因为每次作业运行时都会创建数据集的新版本。这允许跟踪统计和架构随时间的变化,有助于调试慢作业或数据质量问题以及作业失败。

UI 中的最终作业是 HashAggregate 作业。这表示末尾调用的 count() 方法以显示数据集中的记录数。这可以很容易地是 toPandas() 调用或读取和处理该数据的其他作业——也许将输出存回 GCS 或更新 Postgres 数据库,发布新模型等。无论输出存储在何处,OpenLineage 集成都允许查看整个血缘图,统一对象存储、关系数据库和更传统数据仓库中的数据集。

结论

OpenLineage 的 Spark 集成为用户提供了对存储在 S3、GCS 和 Azure Blob Storage 等对象存储以及 BigQuery 和 Postgres 等关系数据库中的数据集图的洞察。现在支持 Spark 3.1,OpenLineage 在 Databricks、EMR 和 Dataproc 集群等更多环境中提供可见性。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/95500.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/95500.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

执行一条select语句期间发生了什么?

首先是连接器的工作&#xff0c;嗯&#xff0c;与客户端进行TCP三次握手建立连接&#xff0c;校验客户端的用户名和密码&#xff0c;如果用户名和密码都对了&#xff0c;那么就会检查该用户的权限&#xff0c;之后执行的所有SQL语句都是基于该权限接着客户端就可以向数据库发送…

element el-select 默认选中数组的第一个对象

背景&#xff1a;在使用element组件的时候&#xff0c;我们期望默认选中第一个数值。这里我们默认下拉列表绑定的lable是中文文字&#xff0c;value绑定的是数值。效果展示&#xff1a;核心代码&#xff1a;<template><el-select v-model"selectValue" plac…

【论文阅读】LightThinker: Thinking Step-by-Step Compression (EMNLP 2025)

论文题目&#xff1a;LightThinker: Thinking Step-by-Step Compression 论文来源&#xff1a;EMNLP 2025&#xff0c;CCF B 论文作者&#xff1a; 论文链接&#xff1a;https://arxiv.org/abs/2502.15589 论文源码&#xff1a;https://github.com/zjunlp/LightThinker 一、…

ABAQUS多尺度纤维增强混凝土二维建模

本案例是通过ABAQUS对论文Study on the tensile and compressive mechanical properties of multi-scale fiber-reinforced concrete: Laboratory test and mesoscopic numerical simulation&#xff08;https://doi.org/10.1016/j.jobe.2024.108852&#xff09;中纤维增强混凝…

C++ ---- 模板的半特化与函数模板的偏特化

在 C 中&#xff0c;模板提供了一种强大的泛型编程方式&#xff0c;使得我们能够编写类型无关的代码。然而&#xff0c;在实际使用中&#xff0c;有时我们需要根据具体的类型或类型组合对模板进行定制&#xff0c;这时就需要用到模板的特化。本文将介绍半模板特化和函数模板的偏…

为何 React JSX 循环需要使用 key

key 是 React 用于识别列表中哪些子元素被改变、添加或删除的唯一标识符 它帮助 React 更高效、更准确地更新和重新渲染列表 1、核心原因&#xff1a;Diff算法与性能优化 React 的核心思想之一是通过虚拟 DOM (Virtual DOM) 来减少对真实 DOM 的直接操作&#xff0c;从而提升性…

Jetson AGX Orin平台R36.3.0版本1080P25fps MIPI相机图像采集行缺失调试记录

1.前言 主板:AGX Orin 官方开发套件 开发版本: R36.3.0版本 相机参数如下: 相机硬件接口: 2. 梳理大致开发流程 核对线序/定制相机转接板 编写camera driver驱动 编写camera dts配置文件 调camera参数/测试出图 前期基本流程就不多介绍了直接讲正题 3. 问题描述 …

力扣hot100:螺旋矩阵(边界压缩,方向模拟)(54)

在解决螺旋矩阵问题时&#xff0c;我们需要按照顺时针螺旋顺序遍历矩阵&#xff0c;并返回所有元素。本文将分享两种高效的解决方案&#xff1a;边界收缩法和方向模拟法。题目描述边界收缩法边界收缩法通过定义四个边界&#xff08;上、下、左、右&#xff09;来模拟螺旋遍历的…

[嵌入式embed][Qt]Qt5.12+Opencv4.x+Cmake4.x_用Qt编译linux-Opencv库 测试

[嵌入式embed][Qt]Qt5.12Opencv4.xCmake4.x_用Qt编译linux-Opencv库 & 测试前文:准备环境安装qt-opencv必备库git-clone opencv库编译opencv库特殊:opencv编译的include,编译出来后多嵌套了一层文件夹,手工处理下改为include/opencv2测试demo新建项目QOpencv3.promain.cpp百…

百度智能云「智能集锦」自动生成短剧解说,三步实现专业级素材生产

备受剪辑压力困扰的各位自媒体老板、MCN 同学们、投放平台大佬们&#xff0c;解放双手和大脑的好机会它来了&#xff01; 在这个数字化飞速发展的时代&#xff0c;智能技术正以前所未有的速度改变着我们的生活与工作方式。百度智能云&#xff0c;作为智能科技的引领者&#xf…

FPGA笔试面试常考问题及答案汇总

经历了无数的笔试面试之后&#xff0c;不知道大家有没有发现FPGA的笔试面试还是有很多共通之处和规律可循的。所以一定要掌握笔试面试常考的问题。FPGA设计方向&#xff08;部分题目&#xff09;1. 什么是同步逻辑和异步逻辑&#xff1f;同步逻辑 是指在同一个时钟信号的控制下…

从0开始的github学生认证并使用copilot教程(超详细!)

目录 一.注册github账号 1.1、仅仅是注册 1.2、完善你的profile 二、Github 学生认证 邮箱 学校名称 How do you plan to use Github? Upload Proof 学校具体信息 一.注册github账号 1.1、仅仅是注册 1.用如QQ邮箱的第三方邮箱注册github 再添加.edu结尾的教育邮箱&…

自动驾驶叉车与 WMS 集成技术方案:数据交互、协议适配与系统对接实现

自动驾驶叉车与仓库管理系统&#xff08;WMS&#xff09;是现代物流自动化的核心。当这两项技术协同工作时&#xff0c;仓库将实现前所未有的效率、准确性和可扩展性。以下是利用其集成实现最佳效果的方法。 为何集成至关重要 仓库管理在当今运营中扮演着至关重要的角色&…

“企业版维基百科”Confluence

“企业版维基百科”Confluence Confluence 是一款由澳大利亚公司 Atlassian 开发的企业级团队协作与知识管理软件。您可以把它理解为一个功能非常强大的 “企业版维基百科” 或 “团队知识库”。 它的核心目标是帮助团队在一个统一的平台上创建、共享、组织和讨论项目文档、会议…

QT去除显示的红色和黄色下划线的办法

在使用 Qt Creator 开发项目时,有时候会遇到这样的情况: 代码明明没有错误,但编辑器里却出现了红色或黄色的下划线提示,甚至让人误以为代码有问题。其实,这通常是 Qt Creator 的代码模型没有及时更新 导致的,而不是项目本身的错误。 为什么会出现红色和黄色下划线? 红…

域内的权限提升

CVE-2020-1472域内有一个服务&#xff1a;MS-NRPC&#xff08;建立与域控安全通道&#xff09;&#xff0c;可利用此漏洞获取域管访问权限。检测这个漏洞能不能打&#xff0c;能打之后&#xff0c;将域控的机器hash置空&#xff0c;密码为空&#xff0c;那么你就可以通过空的ha…

一键掌握服务器健康状态与安全风险

一键掌握服务器健康状态与安全风险 在服务器运维工作中,定期对系统进行全面检查是保障服务稳定运行的关键环节。手动检查不仅耗时费力,还容易遗漏关键指标。今天我将为大家介绍一款功能全面的系统综合巡检工具,只需一键运行,即可完成系统状态、性能、安全等多维度检查,并…

线性代数第一讲—向量组

文章目录考纲术语向量组的线性表示与线性相关判别线性相关性的七大定理极大线性无关组、等价向量组、向量组的秩等价矩阵和等价向量组向量空间基本概念基变换、坐标变换 考纲术语 n维向量n维行向量n维列向量分量向量相等向量的加法向量的数乘向量的内积正交向量的模单位向量标准…

涉私数据安全与可控匿名化利用机制研究(下)

文章目录前言三、可信数据空间支撑可控匿名化机制&#xff08;一&#xff09;基于政府可信根的可控匿名化&#xff08;二&#xff09;可信数据空间“中国模式”保障数据全生命周期合规可控&#xff08;三&#xff09;可控匿名化对大模型数据可逆风险的防御机制前言 尽管《个人…

More Effective C++ 条款25:将构造函数和非成员函数虚拟化

More Effective C 条款25&#xff1a;将构造函数和非成员函数虚拟化核心思想&#xff1a;通过虚拟构造函数和非成员函数&#xff0c;实现运行时的多态行为&#xff0c;允许在不知道对象具体类型的情况下创建新对象或执行操作&#xff0c;增强代码的灵活性和扩展性。 &#x1f6…