血缘元数据采集开放标准:OpenLineage Guides 在 Airflow 中使用 OpenLineage Proxy

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

在 Airflow 中使用 OpenLineage Proxy

本教程介绍如何将 OpenLineage Proxy 与 Airflow 结合使用。OpenLineage 提供多种集成方案,可在使用 Airflow 集成 时让 Airflow 发出 OpenLineage 事件。本教程将使用 Docker Compose 运行本地 Airflow 实例,并学习如何启用和配置 OpenLineage 以发出数据血缘事件。教程将使用两个后端来查看数据血缘:1)Proxy,2)Marquez。

目录

  • 使用 Docker Compose 搭建本地 Airflow 环境
  • 配置 Marquez
  • 启动所有服务
  • 访问 Airflow UI
  • 运行示例 DAG

使用 Docker Compose 搭建本地 Airflow 环境

Airflow 提供一种便捷方式,通过 Docker Compose 搭建并运行完整环境。因此,在开始本教程前,需先安装以下组件。

前提条件

  • Docker 20.10.0+
  • Docker Desktop
  • Docker Compose
  • Java 11

若使用 macOS Monterey(macOS 12),需通过禁用 AirPlay 接收器释放 5000 端口。若需访问 Marquez Web UI,还需确保 3000 端口空闲。

参考以下指南使用 Docker Compose 搭建并运行 Airflow。

首先,创建一个新目录,用于存放所有工作文件。

mkdir ~/airflow-ol &&
cd ~/airflow-ol

然后,下载我们将要运行的 Docker Compose 文件。

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml'

这将允许向 Docker 容器传递新的环境变量 OPENLINEAGE_URL,OpenLineage 需要该变量才能工作。

接着,创建以下目录,这些目录将被挂载并由启动 Airflow 的 Docker Compose 使用。

mkdir dags &&
mkdir logs &&
mkdir plugins

同时,创建 .env 文件,其中包含 Airflow 用于安装所需额外 Python 包的环境变量。本教程将安装 openlineage-airflow 包。

echo "_PIP_ADDITIONAL_REQUIREMENTS=openlineage-airflow" > .env

还需告知 OpenLineage 将血缘数据发送至何处。

echo "OPENLINEAGE_URL=http://host.docker.internal:4433" >> .env

将后端设置为 host.docker.internal 的原因是我们将在主机而非 Airflow 的 Docker 环境中运行 OpenLineage Proxy。代理将在 4433 端口监听血缘数据。

将 OpenLineage Proxy 配置为接收端

OpenLineage Proxy 是一个简单工具,可轻松搭建并运行以接收 OpenLineage 数据。代理本身不执行任何操作,仅显示接收到的数据。可选地,它还可通过 HTTP 将数据转发至任何兼容的 OpenLineage 后端。

从 git 下载代理代码并构建:

cd ~ &&
git clone https://github.com/OpenLineage/OpenLineage.git &&
cd OpenLineage/proxy/backend &&
./gradlew build

现在,复制 proxy.dev.yml 并按以下内容编辑,保存为 proxy.yml

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.server:applicationConnectors:- type: httpport: ${OPENLINEAGE_PROXY_PORT:-4433}adminConnectors:- type: httpport: ${OPENLINEAGE_PROXY_ADMIN_PORT:-4434}logging:level: ${LOG_LEVEL:-INFO}appenders:- type: consoleproxy:source: openLineageProxyBackendstreams:- type: Console- type: Httpurl: http://localhost:5000/api/v1/lineage

配置 Marquez

最后一步是配置 Marquez 后端。使用 Marquez 的快速开始文档搭建 Marquez 环境。

cd ~ &&
git clone https://github.com/MarquezProject/marquez.git

在 marquez/docker-compose.dev.yml 中,更改 pghero 的端口,以释放 8080 端口供 Airflow 使用:

version: "3.7"
services:api:build: .seed_marquez:build: .pghero:image: ankane/pgherocontainer_name: pgheroports:- "8888:8888"environment:DATABASE_URL: postgres://postgres:password@db:5432

启动所有服务

启动 Marquez

启动 Docker Desktop,然后:

cd ~/marquez &&
./docker/up.sh

启动 OpenLineage proxy

cd ~/OpenLineage/proxy/backend &&
./gradlew runShadow

启动 Airflow

cd ~/airflow-ol
docker-compose up

airflow_dev_setup

此时,Apache Airflow 应已运行,并能够将血缘数据发送至 OpenLineage Proxy,而 OpenLineage Proxy 将数据转发至 Marquez。因此,我们既可以检查数据负载,也可以以图形形式查看血缘数据。

访问 Airflow UI

所有服务启动后,现在可通过浏览器访问 Airflow UI,地址为 http://localhost:8080

初始登录 ID 和密码为 airflow/airflow

运行示例 DAG

登录 Airflow UI 后,会看到启动时已预填充的多个示例 DAG。我们可以运行其中一些,以查看它们生成的 OpenLineage 事件。

运行 Bash Operator

在 DAGs 页面,找到 example_bash_operator

airflow_trigger_dag

点击右侧的 ► 按钮,将弹出提示框。选择 Trigger DAG 手动触发并运行 DAG。

将看到 DAG 运行并最终完成。

检查 OpenLineage 事件

一切完成后,应在 OpenLineage proxy 的控制台中看到多条 JSON 数据负载输出。

INFO  [2022-08-16 21:39:41,411] io.openlineage.proxy.api.models.ConsoleLineageStream: {"eventTime" : "2022-08-16T21:39:40.854926Z","eventType" : "START","inputs" : [ ],"job" : {"facets" : { },"name" : "example_bash_operator.runme_2","namespace" : "default"},"outputs" : [ ],"producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","run" : {"facets" : {"airflow_runArgs" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet","externalTrigger" : true},"airflow_version" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet","airflowVersion" : "2.3.3","openlineageAirflowVersion" : "0.12.0","operator" : "airflow.operators.bash.BashOperator","taskInfo" : "{'_BaseOperator__init_kwargs': {'task_id': 'runme_2', 'params': <***.models.param.ParamsDict object at 0xffff7467b610>, 'bash_command': 'echo \"example_bash_operator__runme_2__20220816\" && sleep 1'}, '_BaseOperator__from_mapped': False, 'task_id': 'runme_2', 'task_group': <weakproxy at 0xffff74676ef0 to TaskGroup at 0xffff7467ba50>, 'owner': '***', 'email': None, 'email_on_retry': True, 'email_on_failure': True, 'execution_timeout': None, 'on_execute_callback': None, 'on_failure_callback': None, 'on_success_callback': None, 'on_retry_callback': None, '_pre_execute_hook': None, '_post_execute_hook': None, 'executor_config': {}, 'run_as_user': None, 'retries': 0, 'queue': 'default', 'pool': 'default_pool', 'pool_slots': 1, 'sla': None, 'trigger_rule': <TriggerRule.ALL_SUCCESS: 'all_success'>, 'depends_on_past': False, 'ignore_first_depends_on_past': True, 'wait_for_downstream': False, 'retry_delay': datetime.timedelta(seconds=300), 'retry_exponential_backoff': False, 'max_retry_delay': None, 'params': <***.models.param.ParamsDict object at 0xffff7467b4d0>, 'priority_weight': 1, 'weight_rule': <WeightRule.DOWNSTREAM: 'downstream'>, 'resources': None, 'max_active_tis_per_dag': None, 'do_xcom_push': True, 'doc_md': None, 'doc_json': None, 'doc_yaml': None, 'doc_rst': None, 'doc': None, 'upstream_task_ids': set(), 'downstream_task_ids': {'run_after_loop'}, 'start_date': DateTime(2021, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'end_date': None, '_dag': <DAG: example_bash_operator>, '_log': <Logger ***.task.operators (INFO)>, 'inlets': [], 'outlets': [], '_inlets': [], '_outlets': [], '_BaseOperator__instantiated': True, 'bash_command': 'echo \"example_bash_operator__runme_2__20220816\" && sleep 1', 'env': None, 'output_encoding': 'utf-8', 'skip_exit_code': 99, 'cwd': None, 'append_env': False}"},"nominalTime" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet","nominalStartTime" : "2022-08-16T21:39:38.005668Z"},"parentRun" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet","job" : {"name" : "example_bash_operator","namespace" : "default"},"run" : {"runId" : "39ad10d1-72d9-3fe9-b2a4-860c651b98b7"}}},"runId" : "313b4e71-9cde-4c83-b641-dd6773bf114b"}
}

检查 Marquez

还可打开浏览器访问 http://localhost:3000 进入 Marquez UI,查看来自 Airflow 的 OpenLineage 事件。

marquez_bash_jobs

运行其他 DAG

由于教程篇幅限制,此处不再运行其他示例 DAG,但你可以尝试运行它们,观察各 DAG 如何发出 OpenLineage 事件。请尝试运行其他示例,如 example_python_operator,它也会发出 OpenLineage 事件。

通常,当 DAG 运行涉及某些被使用或创建的 dataset 时,数据血缘会更加完整和有用。运行这些 DAG 后,你将能看到不同 DAG 和任务如何连接同一数据集,最终形成如下所示的数据血缘图:

以下是目前已具备提取器、能够提取并发出 OpenLineage 事件的 Airflow 算子:

  • PostgresOperator
  • MySqlOperator
  • BigQueryOperator
  • SnowflakeOperator
  • GreatExpectationsOperator
  • PythonOperator

更多可在 Airflow 中运行的 OpenLineage 示例 DAG,请参阅 Apache 示例。

故障排查

  • 若未在 proxy 或 Marquez 中看到任何数据,请检查 Airflow 的任务日志,查看是否出现以下消息:[2022-08-16, 21:23:19 UTC] {factory.py:122} ERROR - Did not find openlineage.yml and OPENLINEAGE_URL is not set。若出现,说明环境变量 OPENLINEAGE_URL 未正确设置,导致 OpenLineage 无法发出任何事件。请确保在通过 docker compose 设置 Airflow 时正确设置了环境变量。
  • 有时 Marquez 可能无响应,无法通过 API 端口 5000 接收数据。若发现收到 Marquez 的 500 响应码,或 Marquez UI 卡死,只需停止并重启 Marquez 即可。

结论

本简短教程介绍了如何搭建并运行一个简单的 Apache Airflow 环境,使其在 DAG 运行期间能够发出 OpenLineage 事件。我们还通过 OpenLineage proxy 与 Marquez 的组合,监控并接收了血缘事件。希望本教程有助于理解如何将 Airflow 与 OpenLineage 结合,以及如何轻松使用 proxy 和 Marquez 监控其数据和最终结果。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/95479.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/95479.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】网络(中)

目录1. 序列化和反序列化1.1 序列化1.2 反序列化2. 网络版本计算器&#xff08;自定义协议&#xff09;3. 再次理解OSI七层模型4. HTTP协议4.1 HTTP协议格式4.2 HTTP的方法4.3 HTTP的状态码4.4 HTTP常见Header4.5 长连接和短连接4.6 Cookie5. HTTPS协议5.1 对称加密和非对称加密…

AI 写作实战:用 GPT-4o+ Claude 3 生成小红书文案,转化率提升 30%

引言・AI 写作开启小红书营销新引擎在社交媒体营销的浪潮中&#xff0c;小红书以其独特的社区氛围和庞大的年轻用户群体&#xff0c;成为品牌推广的关键阵地。然而&#xff0c;撰写既吸引眼球又能高效转化的文案并非易事&#xff0c;传统人工编写不仅耗时费力&#xff0c;还难以…

一个月涨粉30万,Coze智能体一键生成民间传说爆款视频,3分钟上手

最近发现一个账号&#xff0c;用AI将民间传说故事转化为生动视频&#xff0c;短短一个月涨粉30万&#xff0c;条均播放 量破百万。这种视频制作真的需要专业团队吗&#xff1f;今天教大家用Coze智能体工作流&#xff0c;一键生成 爆款民间故事视频&#xff01;工作流功能 用Coz…

Linux arm64 PTE contiguous bit

文章目录一、简介1.1 contiguous PTE1.2 demo二、Linux 内核中的实现2.1 宏定义2.2 __create_pgd_mapping2.2.1 alloc_init_cont_pmdinit_pmd2.2.2 alloc_init_cont_pteinit_pte2.3 hugetlbpage2.3.1 find_num_contig2.3.2 num_contig_ptes2.3.3 huge_pte_offset2.3.4 huge_pte…

深入分析 json2(新)与标准的 jsonrpc的区别

这两个模块都用于实现 JSON 风格的远程过程调用&#xff08;RPC&#xff09;接口&#xff0c;但设计哲学、使用方式、安全性和现代化程度有显著差异。 &#x1f4c2; 对比背景 文件 功能 来源 jsonrpc.py 标准的 JSON-RPC 2.0 兼容接口 Odoo 内核已有逻辑 json2.py 自定…

IO_HW_9_3

一、使用消息队列实现两个程序间的相互通信二、思维导图三、牛客网

fastlio配置与过程中遇到的问题

&#x1f680; Fast-LIO 安装与运行指南 我之前已经创建并使用原有的工作空间 catkin_ws,如果没有创建一个。 使用环境 ubantu20.04 ros1 noetic版本 我作的是要在已有的 ~/catkin_ws 中编译 原版 FAST-LIO&#xff08;来自 HKU-MARS 官方仓库&#xff09;。 最终下载官方文档中…

Python 工具: Windows 带宽监控工具

Python 工具&#xff1a; Windows 带宽监控工具环境介绍会使用的库多线程关键代码&#xff1a;系统流量采集&#xff1a;用 psutil 获取网络数据概念&#xff1a;网络流量的“增量”与“总量”代码中的流量采集逻辑Flask Web框架&#xff1a;搭建后端服务前端部分交互逻辑&…

【Java】Redis(中间件)

一、对Redis的理解Reids是一种基于内存的数据库&#xff0c;对数据的读写操作都在内存中完成&#xff0c;因此读写速度非常快&#xff0c;常用于缓存、消息队列、分布式锁等场景。除此之外&#xff0c;Redis还支持事务、持久化、Lua脚本、多种集群方案&#xff08;主从复制模式…

【题解】洛谷P1776 宝物筛选 [单调队列优化多重背包]

二进制优化还是不够快&#xff0c;如果我们想时间复杂度为 &#xff0c;还得找新的方法。 &#xff08;W 为背包最大可承载量&#xff0c;N 为物品种类数&#xff09; 例题&#xff1a;P1776 宝物筛选 - 洛谷 原来的转移式很普通&#xff1a; 注意到对于每个 &#xff0c;有…

数据结构_循环队列_牺牲一个存储空间_不牺牲额外的存储空间 Circular Queue(C语言实现_超详细)

目录循环队列的引出区别普通队列和循环队列两种循环队列的概念循环队列深入理解题目&#xff1a;此题&#xff0c;分为牺牲一个额外空间和不牺牲一个额外空间不牺牲一个额外空间完成第一步完成第二步完成第三步完成第四步牺牲一个额外空间完成第一步完成第二步完成第三步完成第…

Linux_网络基础

✨✨ 欢迎大家来到小伞的大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;LInux_st 小伞的主页&#xff1a;xiaosan_blog 制作不易&#xff01;点个赞吧&#xff01;&#xff01;谢谢喵&#xff01;&a…

Portainer:Docker可视化管理神器部署与使用攻略

Portainer是一款优秀的Docker可视化管理工具&#xff0c;它提供了简洁美观的Web界面&#xff0c;可以通过点击鼠标轻松管理Docker环境。 一、Portainer简介 Portainer是一个轻量级的Docker管理界面&#xff0c;具有以下特点&#xff1a; 可视化操作&#xff1a;通过Web界面管…

OVITO3.13.1_ Mac中文_材料科学、物理及化学领域设计的数据可视化和分析软件_安装教程

软件下载 【名称】&#xff1a;****OVITO3.13.1Mac中文 【大小】&#xff1a;****154M 【语言】&#xff1a;简体中文 【安装环境】&#xff1a;****mac 【网站下载链接】&#xff1a; https://a-xing.top/3008.html软件应用 软件应用 Ovito能做什么&#xff1f; Ovito的功能十…

MySQL 开发避坑:DROP TABLE 前你必须知道的几件事

MySQL 中删除表主要使用 DROP TABLE 语句。这是一个需要非常谨慎的操作&#xff0c;因为一旦执行&#xff0c;表结构和表中的所有数据都会被永久删除。1. 基本语法&#xff1a;删除单个表sqlDROP TABLE [IF EXISTS] table_name;* DROP TABLE: 核心命令&#xff0c;用于删除表…

浅谈人工智能之阿里云搭建coze平台

浅谈人工智能之阿里云搭建coze平台 一、部署环境准备 阿里云服务器配置要求 ○ 规格&#xff1a;最低2核CPU 4GB内存&#xff08;推荐4核8GB保障流畅运行&#xff09;&#xff0c;作者原先想要利旧&#xff0c;使用了2核2GB的服务器&#xff0c;但是跑不起来&#xff0c;后来自…

ego(2)---初始轨迹生成后的关键点采样

在初始的多项式轨迹生成后&#xff0c;是要经过一个关键点采样&#xff0c;使用关键点来进行后续的 B 样条曲线拟合的。即&#xff1a;初始多项式拟合->关键点采样->B样条拟合关键点采样的思路关键点采样使用时间步长 ts 来在初始轨迹方程中取点。在上一步的初始轨迹生成…

专项智能练习(信息安全防护措施)

3.以下属于网络安全威胁的是&#xff08;A &#xff09;。 A.非授权访问、病毒感染、信息泄露、拒绝网络服务 B.信息泄露、非授权访问、病毒感染、硬盘损坏 C.信息篡改、非授权访问、病毒感染、硬盘损坏 D.网络异常、非授权访问、信息篡改、病毒感染 解析本题考查网络安全威胁。…

ubuntu编译webrtc库

一. 前言 本文介绍在 ubuntu 下如何通过 webrtc 源码编译出指定版本 webrtc.lib 库&#xff08;以 m94 版本为例&#xff09;。 二. 编译步骤 1. 下载depot_tools工具 depot_tools 是 Google 用来管理大型项目代码&#xff08;例如 WebRTC&#xff09;的工具集&#xff0c;它…

基于ZooKeeper实现分布式锁(Spring Boot接入)及与Kafka实现的对比分析

在分布式系统中,多节点对共享资源的并发访问往往会引发数据一致性问题,分布式锁正是解决这一问题的核心组件。本文将从原理出发,详细讲解基于ZooKeeper实现分布式锁的完整流程,提供Spring Boot接入的可运行代码,并深入对比其与Kafka实现分布式锁的异同点及优缺点,帮助开发…