血缘元数据采集开放标准:OpenLineage Integrations Apache Spark Quickstart with Jupyter

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

Apache Spark

该集成已知适用于最新 Spark 版本以及其他 Apache Spark 3.*。如需获取受支持版本的最新信息,请查看此处。

该集成通过 OpenLineageSparkListener 使用 SparkListener 接口,提供全面的监控方案。 它会监听 SparkContext 发出的事件,提取与作业和数据集相关的元数据,并利用 RDD 和 DataFrame 的依赖图。方法可有效从各种数据源收集信息,包括文件系统源(如 S3 和 GCS)、JDBC 后端以及 Redshift 和 BigQuery 等数据仓库。

使用 Jupyter 快速入门

Quickstart with Jupyter

如果您已安装 Docker Desktop 和 git,体验 Spark 集成将非常简单。

若您使用 macOS Monterey (macOS 12),开始前需先释放 5000 端口,方法为关闭 AirPlay Receiver。

在工作空间克隆 OpenLineage 项目:

git clone https://github.com/OpenLineage/OpenLineage

进入 spark 集成目录($OPENLINEAGE_ROOT/integration/spark)并执行:

docker-compose up

该命令将启动 Marquez(作为 OpenLineage 客户端)与 Jupyter Spark notebook,均运行于 localhost:8888。
启动后,notebook 容器日志会列出包含访问令牌的 URL,例如:

notebook_1  |     To access the notebook, open this file in a browser:
notebook_1  |         file:///home/jovyan/.local/share/jupyter/runtime/nbserver-9-open.html
notebook_1  |     Or copy and paste one of these URLs:
notebook_1  |         http://abc12345d6e:8888/?token=XXXXXX
notebook_1  |      or http://127.0.0.1:8888/?token=XXXXXX

从您自己的日志中复制以 127.0.0.1 为主机名的 URL(token 与示例不同),粘贴到浏览器即可看到一个空白可用的 Jupyter notebook 环境。

image
环境就绪后,点击 notebooks 目录,再点击 New 按钮,新建一个 Python 3 notebook。

image

在首个单元格中粘贴以下内容:

from pyspark.sql import SparkSessionspark = (SparkSession.builder.master('local').appName('sample_spark').config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener').config('spark.jars.packages', 'io.openlineage:openlineage-spark:{{PREPROCESSOR:OPENLINEAGE_VERSION}}').config('spark.openlineage.transport.type', 'console').getOrCreate())

Spark 上下文启动后,将日志级别设为 INFO

spark.sparkContext.setLogLevel("INFO")

接着创建一个 Spark 表:

spark.createDataFrame([{'a': 1, 'b': 2},{'a': 3, 'b': 4}
]).write.mode("overwrite").saveAsTable("temp")

命令将以日志形式输出 OpenLineage 事件:

22/08/01 06:15:49 INFO ConsoleTransport: {"eventType":"START","eventTime":"2022-08-01T06:15:49.671Z","run":{"runId":"204d9c56-6648-4d46-b6bd-f4623255d324","facets":{"spark_unknown":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunFacet","inputs":[{"description":{"@class":"org.apache.spark.sql.execution.LogicalRDD","id":1,"streaming":false,"traceEnabled":false,"canonicalizedPlan":false},"inputAttributes":[],"outputAttributes":[{"name":"a","type":"long","metadata":{}},{"name":"b","type":"long","metadata":{}}]}]},"spark.logicalPlan":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunFacet","plan":[{"class":"org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand","num-children":1,"table":{"product-class":"org.apache.spark.sql.catalyst.catalog.CatalogTable","identifier":{"product-class":"org.apache.spark.sql.catalyst.TableIdentifier","table":"temp"},"tableType":{"product-class":"org.apache.spark.sql.catalyst.catalog.CatalogTableType","name":"MANAGED"},"storage":{"product-class":"org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat","compressed":false,"properties":null},"schema":{"type":"struct","fields":[]},"provider":"parquet","partitionColumnNames":[],"owner":"","createTime":1659334549656,"lastAccessTime":-1,"createVersion":"","properties":null,"unsupportedFeatures":[],"tracksPartitionsInCatalog":false,"schemaPreservesCase":true,"ignoredProperties":null},"mode":null,"query":0,"outputColumnNames":"[a, b]"},{"class":"org.apache.spark.sql.execution.LogicalRDD","num-children":0,"output":[[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"a","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":6,"jvmId":"6a1324ac-917e-4e22-a0b9-84a5f80694ad"},"qualifier":[]}],[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"b","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":7,"jvmId":"6a1324ac-917e-4e22-a0b9-84a5f80694ad"},"qualifier":[]}]],"rdd":null,"outputPartitioning":{"product-class":"org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning","numPartitions":0},"outputOrdering":[],"isStreaming":false,"session":null}]},"spark_version":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunFacet","spark-version":"3.1.2","openlineage-spark-version":"0.12.0-SNAPSHOT"}}},"job":{"namespace":"default","name":"sample_spark.execute_create_data_source_table_as_select_command","facets":{}},"inputs":[],"outputs":[{"namespace":"file","name":"/home/jovyan/notebooks/spark-warehouse/temp","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"a","type":"long"},{"name":"b","type":"long"}]},"lifecycleStateChange":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet","lifecycleStateChange":"CREATE"}},"outputFacets":{}}],"producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunEvent"}

生成的 JSON 包含输出数据集的名称与位置 {"namespace":"file","name":"/home/jovyan/notebooks/spark-warehouse/temp"、Schema 字段 [{"name":"a","type":"long"},{"name":"b","type":"long"}] 等。

更全面的演示(将 Spark 事件与 Marquez 后端集成)可在我们的博客查看:使用 OpenLineage 和 Apache Spark 追踪数据血缘

关于 OpenLineage

About OpenLineage

OpenLineage 是一个用于数据血缘收集和分析的开放框架。其核心是一个可扩展的规范,系统可通过该规范与血缘元数据实现互操作。

设计

OpenLineage 是一个用于血缘元数据收集的 开放标准,旨在为执行中的 作业 记录元数据。

该标准定义了 数据集作业运行 实体的通用模型,这些实体通过一致的命名策略进行唯一标识。核心模型通过 Facet 实现高度可扩展。Facet 是用户定义的元数据,可用于丰富实体。我们鼓励您先熟悉下面的核心模型:
image

OpenLineage 如何惠及生态系统

下面,我们说明了从多个来源、调度器和/或数据处理框架收集血缘元数据的挑战,随后概述了定义 开放标准 用于血缘元数据收集的设计优势。

之前

image

  • 每个项目都必须自行实现自定义的元数据收集集成,从而造成重复劳动。
  • 集成是外部的,可能会随着底层调度器和/或数据处理框架的新版本而中断,要求项目确保 向后 兼容性。

使用 OpenLineage

image

  • 集成工作可在项目间 共享
  • 集成可以 推送 到底层调度器和/或数据处理框架;不再需要追赶并确保兼容性!

范围

OpenLineage 定义了正在运行作业及其对应事件的元数据。
可配置的后端允许用户选择将事件发送到的协议。
image

核心模型

image
Facet 是一个附加到某个核心实体的原子元数据片段。
有关更多详细信息,请参阅规范。

规范

规范 使用 OpenAPI 定义,并允许通过自定义 Facet 进行扩展。

集成

OpenLineage 仓库包含与多个系统的集成。

  • Apache Airflow
  • Apache Flink
  • Apache Spark
  • Dagster
  • dbt
  • SQL

相关项目

  • Marquez:Marquez 是一个 LF AI & DATA 项目,用于收集、聚合和可视化数据生态系统的元数据。它是 OpenLineage API 的参考实现。

    • OpenLineage collection implementation
  • Egeria:Egeria 开放元数据和治理。一个元数据总线。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/96882.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/96882.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智能之数学基础:离散随机变量和连续随机变量

本文重点 随机变量是概率论与统计学中的核心概念,用于将随机现象的抽象结果转化为可量化的数值。根据取值特性的不同,随机变量可分为离散型和连续型两大类。 在前面的课程中我们学习了随机变量,随机变量可以理解为一个函数,通过这个函数我们就可以将随机试验中的结果数值…

SQL语句(查询)

单表查询 常量查询 让我们来看一个具体的 SQL 代码和结果示例,假设有一张名为 orders 的数据表,它存储了订单信息,包括订单编号(order_id)、商品单价(unit_price)、购买数量(quantit…

Java 大视界 -- Java 大数据机器学习模型在金融市场波动预测与资产配置动态调整中的应用

Java 大视界 -- Java 大数据机器学习模型在金融市场波动预测与资产配置动态调整中的应用引言:正文:一、Java 构建的金融数据处理架构1.1 多源数据实时融合与清洗1.2 跨市场数据关联(风险传导分析)二、Java 驱动的市场波动预测模型…

基于muduo库的图床云共享存储项目(一)

基于muduo库的图床云共享存储项目(一)项目简介整体架构项目依赖基础组件muduo库Channel类Poller / EpollPoller 类EventLoopAcceptor类FastDfsJSON的使用项目简介 当前所实现的项目是一个基于muduo库的图床云共享存储项目,他的主要的功能就是…

数字化转型三阶段:从信息化、数字化到数智化的战略进化

企业的数字化转型包括信息化、数字化、数智化三个阶段,并非一个阶段结束才能进入到下一个阶段。01信息化→业务数据化信息化是将企业在生产经营过程中产生的业务信息进行记录、储存和管理,通过电子终端呈现,便于信息的传播与沟通。信息化是对…

SpringBoot如何获取系统Controller名称和方法名称

这种代码里面的Controller和里面的方法怎么获取代码:/*** 获取所有Controller名称*/ApiDescription("获取所有Controller名称")PostMapping("/getControllerNames")public Result getControllerNames() {return dataDesensitizationRulesServic…

(二十二)深入了解AVFoundation-编辑:视频变速功能-实战在Demo中实现视频变速

一. 引言视频变速(Speed Ramp)是视频编辑中最常见的特效之一:慢动作(Slow Motion):强调细节,让观众捕捉到肉眼难以察觉的瞬间;快动作(Fast Motion)&#xff1…

MCP零基础学习(7)|实战指南:构建论文分析智能体

在之前的教程中,我们已经介绍了 MCP(Model Context Protocol)的基本概念及其核心组件。在本篇教程中,我们将通过一个实际案例,演示如何运用 MCP 构建一个能够分析学术论文的智能体。这个智能体将具备读取 PDF 文件、提…

Unity URP半透明物体自身交叠解决方案

前言 在 Unity 的通用渲染管线(URP)中,处理半透明物体的自身交叠是一个常见挑战。当半透明物体(如玻璃、水或透明材质)的某些部分相互重叠时,可能会出现渲染顺序问题,导致视觉瑕疵。 对惹&…

哈希算法入门:深入浅出讲明白HASH哈希算法

一、先搞懂:哈希算法到底是 “啥玩意儿”?咱们先别碰复杂概念,从你每天都会遇到的事说起 —— 你会发现,“哈希思维” 其实早就藏在生活里了。(一)生活中的 “哈希例子”:给东西 “贴标签、找位…

Vuex 和 Pinia 各自的优点

核心总结(一句话概括) Vuex:Vue 官方曾经的状态管理标准解决方案,成熟稳定,概念清晰,但语法稍显冗长。Pinia:Vue 官方推荐的新一代状态管理库,API 设计极其简洁,完美支持…

几种方式实现文件自动上传到服务器共享文件夹

文章目录一、方案核心逻辑二、详细实现步骤(以Windows系统为例)1. 确认服务器共享文件夹的“访问权限”(前提)2. 选择“传输触发方式”(按需求选实时/周期)(1)周期传输(如…

Milvus介绍及多模态检索实践

1、核心组件 1.1 Collection (集合) 可以用一个图书馆的比喻来理解 Collection: Collection (集合): 相当于一个图书馆,是所有数据的顶层容器。一个 Collection 可以包含多个 Partition,每个 Partition 可以包含多个 Entity。 Partition (分区…

第二十三天-LCD液晶显示实验

一、LCD结构体定义LCD为LCD_TypeDef类型的指针,指向0x6C000000的地址空间(bank1分区4的地址范围)。为什么需要并上0x000007FE呢?因为虽然驱动SRAM的时序和16位8080接口时序(驱动LCD时序)很像,但…

SQL性能调优

MySQL出现性能差的原因有哪些? 可能是 SOL查询使用了全表扫描,也可能是查询语句过于复杂,如多表 IOIN 或嵌套子查询。 也有可能是单表数据量过大。 通常情况下,添加索引就能解决大部分性能问题。对于一些热点数据,还可以通过增加…

dapo:开源大规模llm强化学习系统的突破与实现

本文由「大千AI助手」原创发布,专注用真话讲AI,回归技术本质。拒绝神话或妖魔化。搜索「大千AI助手」关注我,一起撕掉过度包装,学习真实的AI技术! ✨ 1. dapo概述:开源llm强化学习系统的重要突破 dapo&…

【车载开发系列】ParaSoft集成测试环境配置(五)

【车载开发系列】ParaSoft集成测试环境配置(五) 【车载开发系列】ParaSoft集成测试环境配置(五) 【车载开发系列】ParaSoft集成测试环境配置(五) 一. 剥离硬件环境的设置 二. 灵活使用编译开关 三. 导入修改后的bdf文件 四. 自动生成底层桩函数 五. 开始跑集成测试用例 六…

大模型(一)什么是 MCP?如何使用 Charry Studio 集成 MCP?

目录一、什么是 MCP?1.1 🤔 开始之前的思考1.2 MCP 的定义1.3 MCP 结构二、MCP 的使用2.1 uv 的安装2.2 MCP 广场2.3 MCP 的配置2.4 MCP 的依赖安装2.5 Charry Studio2.6 测试结果背景: MCP 这个概念大概是 2025 年上半年火起来的&#xff0c…

源码导航页

一、Python捕捉动作发送到Unity驱动模型跟着动(获取源码) 二、AI输入法源码(获取源码) 三、Java企业级后台管理系统-登录授权角色菜单(获取源码) 四、Jetson实现纯视觉导航(获取源码&#xff09…

HTTP/2 性能提升的核心原因

一、协议架构优化‌‌二进制分帧(Binary Framing)‌HTTP/2 将传统文本格式的报文(如请求头、数据体)拆分为独立的二进制帧(Frame),每个帧包含流标识符(Stream ID)&#x…