血缘元数据采集开放标准:OpenLineage Guides 使用 Apache Airflow® 和 OpenLineage + Marquez 入门

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

使用 Apache Airflow® 和 OpenLineage + Marquez 入门

Getting Started with Apache Airflow® and OpenLineage+Marquez

本教程将指导你配置 Apache Airflow® 以将 OpenLineage 事件发送到 Marquez,并通过一个真实的故障排查场景进行探索。

目录

  • 前提条件
  • 获取并启动 Marquez
  • 配置 Airflow 将 OpenLineage 事件发送到 Marquez
  • 编写 Airflow DAG
  • 在 Marquez 中查看已收集的血缘
  • 使用 Marquez 排查失败的 DAG
  • 后续步骤
  • 反馈

前提条件

开始前,请确保已安装:

  • Docker 17.05+
  • Apache Airflow 2.7+ 本地运行。

如需在本地轻松安装并运行 Airflow 以用于开发,请参阅:快速开始。

获取并启动 Marquez

  1. 创建 Marquez 目录,然后通过运行以下命令检出 Marquez 源码:

    MacOS/Linux

    git clone https://github.com/MarquezProject/marquez && cd marquez
    

    Windows

    git config --global core.autocrlf false
    git clone https://github.com/MarquezProject/marquez && cd marquez
    
  2. Airflow 和 Marquez 都需要 5432 端口用于其元数据库,但 Marquez 服务更易于配置。你也可以即时为数据库服务分配一个新端口。要使用 2345 端口启动 Marquez,请运行:

    MacOS/Linux

    ./docker/up.sh --db-port 2345
    

    Windows

    验证 Postgres 和 Bash 是否在 PATH 中,然后运行:

    sh ./docker/up.sh --db-port 2345
    
  3. 要查看 Marquez UI 并验证其运行状态,请打开 http://localhost:3000。该 UI 允许你:

    • 查看跨平台依赖关系,即你可在生态系统中查看生成或消费关键表的工具中的作业。
    • 查看当前和先前作业运行的运行级元数据,使你能够看到作业的最新状态和数据集的更新历史。
    • 获取资源使用情况的高级视图,使你能够查看操作中的趋势。

配置 Airflow 将 OpenLineage 事件发送到 Marquez

  1. 要配置 Airflow 以将 OpenLineage 事件发送到 Marquez,你需要修改本地 Airflow 环境并添加依赖。首先,定义一个 OpenLineage 传输。一种方法是使用环境变量。要使用 http 并将事件发送到本地端口 5000 上运行的 Marquez API,请运行:

    MacOS/Linux

    export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    

    Windows

    set AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    
  2. 你还需要为 Airflow 作业定义一个命名空间。它可以是任意字符串。请运行:

    MacOS/Linux

    export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    

    Windows

    set AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    
  3. 要将所需的 Airflow OpenLineage Provider 包添加到你的 Airflow 环境,请运行:

    MacOS/Linux

    pip install apache-airflow-providers-openlineage
    

    Windows

    pip install apache-airflow-providers-openlineage
    
  4. 要完成本教程,你还需要在 Airflow 中启用本地 Postgres 操作。为此,请运行:

    MacOS/Linux

    pip install apache-airflow-providers-postgres
    

    Windows

    pip install apache-airflow-providers-postgres
    
  5. 在本地 Postgres 实例中创建一个数据库,并使用默认 ID (postgres_default) 创建一个 Airflow Postgres 连接。如需前者帮助,请参阅:Postgres 文档。如需后者帮助,请参阅:管理连接。

编写 Airflow DAG

在此步骤中,你将创建两个新的 Airflow DAG,它们执行简单任务,并将其添加到你现有的 Airflow 实例。counter DAG 每分钟将列值加 1,而 sum DAG 每五分钟计算一次总和。这将形成一个包含两个作业和两个数据集的简单管道。

  1. dags/ 目录下,创建一个名为 counter.py 的文件,并添加以下代码:

    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.utils.dates import days_ago@dag(schedule='*/1 * * * *',start_date=days_ago(1),catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that generates a new count value equal to 1.'
    )def counter():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS counts (value INTEGER);''',)query2 = PostgresOperator(task_id='inc',postgres_conn_id='postgres_default',sql='''INSERT INTO "counts" (value) VALUES (1);''',)query1 >> query2counter()
  2. dags/ 目录下,创建一个名为 sum.py 的文件,并添加以下代码:

    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.utils.dates import days_ago@dag(start_date=days_ago(1),schedule='*/5 * * * *',catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that sums the total of generated count values.'
    )def sum():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS sums (value INTEGER);''')query2 = PostgresOperator(task_id='total',postgres_conn_id='postgres_default',sql='''INSERT INTO sums (value)SELECT SUM(value) FROM counts;''')query1 >> query2sum()
  3. 重启 Airflow 以应用更改。然后,取消暂停两个 DAG。

在 Marquez 中查看已收集的血缘

  1. 要查看 Marquez 从 Airflow 收集的血缘,请访问 http://localhost:3000 打开 Marquez UI。然后,使用左上角搜索栏搜索 counter.inc 作业。要查看 counter.inc 的血缘元数据,请从下拉列表中点击该作业:

    image

  2. 查看 counter.inc 的血缘图,你应看到 <database>.public.counts 作为输出数据集,sum.total 作为下游作业:

    image

使用 Marquez 排查失败的 DAG

  1. 在此步骤中,你将模拟由于跨 DAG 依赖项更改导致的管道中断,并了解来自 OpenLineage + Marquez 的增强血缘如何使架构更改的故障排查变得轻松。

    假设 Team A 拥有 DAG counterTeam A 更新 counter 以将 counts 表中的 values 列重命名为 value_1_to_10,但未将架构更改正确传达给拥有 sum 的团队。

    counter 应用以下更改以模拟破坏性更改:

    query1 = PostgresOperator(
    -   task_id='if_not_exists',
    +   task_id='alter_name_of_column',postgres_conn_id='example_db',sql='''
    -   CREATE TABLE IF NOT EXISTS counts (
    -     value INTEGER
    -   );''',
    +   ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10";
    +   '''
    )
    
    query2 = PostgresOperator(task_id='inc',postgres_conn_id='example_db',sql='''
    -    INSERT INTO counts (value)
    +    INSERT INTO counts (value_1_to_10)VALUES (1)''',
    )
    

    正如 sum 的所有者 Team B 所做的那样,注意 Marquez 中 DataOps 视图的失败运行:

    image

    Team B 只能猜测 DAG 失败的可能原因,因为 DAG 最近没有更改。因此,团队决定检查 Marquez。

  2. 在 Marquez 中,导航到 Datasets 视图,并从右上角的命名空间下拉菜单中选择你的 Postgres 实例。然后,点击 <database>.public.counts 数据集并检查图表。你将在节点上找到架构:

    image

  3. 假设你不认识该列,并希望了解其原始名称及更改时间。点击节点将打开详情抽屉。在那里,使用版本历史查找架构更改的运行:

    image

  4. 在 Airflow 中,通过更新计算计数总和的任务以使用新列名来修复中断的下游 DAG:

    query2 = PostgresOperator(task_id='total',postgres_conn_id='example_db',sql='''
    -    INSERT INTO sums (value)
    -       SELECT SUM(value) FROM counts;
    +       SELECT SUM(value_1_to_10) FROM counts;'''
    )
    
  5. 重新运行 DAG。在 Marquez 中,通过查看 DataOps 视图中最近的运行历史来验证修复:

    image

后续步骤

  • 查看用于收集 Airflow DAG 元数据的 Marquez HTTP API,并学习如何使用 OpenLineage 构建自己的集成。
  • 查看可与 Airflow 一起使用的 openlineage-spark 集成。

反馈

你觉得本指南如何?请在 OpenLineage Slack 或 Marquez Slack 中告诉我们。你也可以通过 提交拉取请求 直接提出更改。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/95435.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/95435.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FPGA|Quartus II 中使用TCL文件进行引脚一键分配

在FPGA设计过程中&#xff0c;合理的引脚分配是确保硬件功能正确实现的关键步骤之一。Quartus II 提供了通过 TCL&#xff08;Tool Command Language&#xff09;脚本自动化引脚分配的功能&#xff0c;这不仅可以大大提高设计效率&#xff0c;还能够确保引脚分配的精确性和可重…

【Docker/Redis】服务端高并发分布式结构演进之路

目录 概述 常见概念 基本概念 应用&#xff08;Application&#xff09;/ 系统&#xff08;System&#xff09; 模块&#xff08;Module&#xff09;/ 组件&#xff08;Component&#xff09; 分布式&#xff08;Distributed&#xff09; 集群&#xff08;Cluster&#x…

【Excel】将一个单元格内​​的多行文本,​​拆分成多个单元格,每个单元格一行​​

​​所有文本都堆积在“prefix”列顶部的同一个单元格里&#xff08;很可能是B10单元格&#xff09;&#xff0c;并且它们是用空格分隔的&#xff0c;而不是换行符。​​因此&#xff0c;您不需要处理换行符&#xff0c;而是需要​​按“空格”进行分列&#xff0c;并且将分列后…

新手SEO操作第一步

内容概要 网站优化对于新手而言&#xff0c;常常感觉无从下手。别担心&#xff0c;这篇文章就是为你量身打造的入门指南。我们将从最基础也是最重要的关键词研究开始讲起&#xff0c;手把手教你如何精准找到目标用户搜索的词。掌握了关键词&#xff0c;接下来就是如何创作出搜索…

【高阶数据结构】秘法(一)——并查集:探索如何高效地管理集合

前言&#xff1a; 前面我们已经学习了简单的数据结构&#xff0c;包括栈与队列、二叉树、红黑树等等&#xff0c;今天我们继续数据结构的学习&#xff0c;但是难度上会逐渐增大&#xff0c;在高阶数据结构中我们要学习的重点是图等 目录 一、并查集的原理 二、并查集的基本操作…

spring boot 整合AI教程

1、pom.xml配置<?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4…

基于SpringBoot2+Vue2开发的储物柜管理系统

角色 管理员&#xff1a;管理系统、用户&#xff0c;管理储物柜用户&#xff1a;借用、归还储物柜&#xff0c;报修故障 技术栈 后端&#xff1a;Springboot2, JWT, PageHelper前端&#xff1a;Vue2数据库&#xff1a;MySQL 核心功能 提供智能储物柜管理&#xff0c;包括用户注…

uniapp中输入金额的过滤(只允许输入数字和小数点)

一、完整代码&#xff1a; <template><view class"numberIndex" :style"{ paddingTop: navbarHeight px }"><view class"custom-navbar" :style"{ paddingTop: statusBarHeight px }"><view class"navbar…

系统科学核心概念辨析及其在人工智能领域的应用研究:一个整合性分析框架

摘要&#xff1a;本文旨在系统性地梳理和辨析系统科学中的核心概念——结构、功能与层级。文章首先追溯系统思想的理论源流&#xff0c;确立其作为一种超越还原论的整体性研究范式。在此基础上&#xff0c;深度剖析系统结构的内在构成&#xff08;组分、框架、动态性&#xff0…

Ubuntu环境下删除Docker镜像与容器、配置静态IP地址

删除Docker镜像与容器删除容器&#xff1a;要删除特定的Docker容器&#xff0c;首先需要停止该容器&#xff1a;docker stop <container_id_or_name>然后可以使用以下命令删除它&#xff1a;docker rm <container_id_or_name>如果要强制删除正在运行的容器&#xf…

零样本视觉模型(DINOv3)

DINOv3是Meta于2025年8月14日发布的第三代自监督视觉基础模型&#xff0c;通过17亿张无标注图像训练&#xff0c;参数规模最大达70亿&#xff0c;首次在密集预测任务中全面超越弱监督模型&#xff0c;成为计算机视觉领域的里程碑。其核心突破在于无需人工标注即可生成高分辨率密…

【机器学习入门】5.2 回归的起源——从身高遗传到线性模型的百年演变

提到 “回归”&#xff0c;很多刚入门的同学会觉得它是个抽象的数学概念&#xff0c;但你可能想不到&#xff0c;这个术语的诞生&#xff0c;竟然源于 19 世纪一位生物学家对 “身高遗传” 的研究。回归分析从 “观察生物现象” 出发&#xff0c;逐步发展成机器学习中预测连续值…

轻型载货汽车变速器设计cad+设计说明书

摘 要 变速器是汽车重要的传动系组成&#xff0c;在较大范围内改变汽车行驶速度的大小和汽车驱动轮上扭矩的大小。变速器能在发动机旋转方向不变的前提下&#xff0c;使汽车倒退行驶&#xff0c;而且利用挡位可以中断动力的传递。所以变速器的结构设计的合理性直接影响到汽车动…

如何对嵌入式软件进行单元测试

ceedling就是一款嵌入式软件测试框架。ceedling是一个用ruby语言编写的C语言自动化测试框架&#xff0c;它集成了Cmock、Unity和Cexception等多个开源项目。在整个ceedling框架中&#xff0c;使用unity进行代码测试&#xff0c;使用CMock生成模拟函数&#xff0c;使用CExceptio…

通义万相Wan2.2-S2V-14B:AI视频生成的革命性突破与实践指南

一张图片+一段音频=电影级数字人视频?这不是魔法,是开源AI技术带来的现实。 近日,阿里巴巴通义万相团队开源了Wan2.2-S2V-14B模型,仅在短短几天内就引发了AI视频生成领域的震动。这个仅需**一张静态图片**和**一段音频**就能生成影视级质量视频的模型,正在改变我们对AI视…

基于 HTML、CSS 和 JavaScript 的智能图像锐化系统

目录 1 前言 2 技术实现 2.1 HTML&#xff1a;构建系统骨架​ 2.2 CSS&#xff1a;打造视觉与交互体验​ 2.3 JavaScript&#xff1a;实现核心锐化逻辑​ 3 代码解析 3.1 数据存储与初始化 3.2 图像加载流程 3.3 锐化算法核心&#xff1a;卷积计算​ 3.4 下载功能实现…

(MySQL)分布式锁

在分布式系统中&#xff0c;多个进程可能会同时对同一资源进行操作&#xff0c;如果没有同步机制&#xff0c;就会造成数据不一致问题。为了避免这种情况&#xff0c;需要分布式锁。Redis 是常见的实现方式&#xff0c;但在某些场景下&#xff0c;我们也可以使用 MySQL 来实现分…

基于RS-485接口的芯片的FPGA驱动程序

1.简介ADM3485E 是一款 3.3V 低功耗数据收发器&#xff0c;具有 15kV 的 ESD&#xff08;静电放电&#xff09; 保护&#xff0c;专为多点总线传输线上的半双工通信设计。它支持平衡数据传输&#xff0c;符合 TIA/EIA 标准 RS-485 和 RS-422 的要求。作为一款半双工收发器&…

SQLSERVER关键字:N

在 SQL Server 中&#xff0c;单独的 N 并不是一个 “关键字”&#xff0c;但它作为前缀有特殊含义 —— 用于标识字符串为 Unicode 字符串&#xff08;对应 NVARCHAR、NCHAR 等 Unicode 数据类型&#xff09;。具体作用当字符串前加 N 前缀时&#xff0c;SQL Server 会将该字符…

【MySQL基础】MySQL核心操作全解析

【MySQL基础】MySQL核心操作全解析前言一、数据库操作&#x1f636;‍&#x1f32b;️1.1 查看数据库&#x1f50d;1.2 创建数据库➕ 1.3 选择数据库&#x1f4cc; 1.4 删除数据库❌ 二、数据表操作&#x1f4cb; 2.1 创建数据表➕ 2.2 查看数据表&#x1f50d; 2.3 查看表结构…