使用Dagster资产工厂模式高效管理重复ETL任务

本文介绍了如何利用Dagster的资产工厂模式来高效管理和自动化重复的ETL(提取、转换、加载)任务。通过Python函数和YAML配置文件的结合,我们可以轻松地创建和管理大量相似的资产,同时提高代码的可维护性和可配置性。文章还探讨了如何使用Pydantic和Jinja进一步提升配置文件的类型安全性和灵活性。

资产工厂模式简介

在数据工程领域,我们经常需要处理大量相似的ETL任务。例如,从S3下载CSV文件,执行SQL查询,然后将结果上传回S3。这种重复性工作不仅耗时,而且容易出错。幸运的是,Dagster提供了资产工厂模式,可以帮助我们优雅地解决这个问题。

资产工厂模式是一种设计模式,它允许我们通过一个函数接收配置参数,并返回一组Dagster资产定义。这种方法非常适合处理大量相似的资产,因为它可以将重复的逻辑封装在一个函数中,从而减少代码冗余和提高可维护性。

在这里插入图片描述

Python实现资产工厂

让我们通过一个具体的例子来了解如何使用Python实现资产工厂。假设我们有一个团队需要频繁地从S3下载CSV文件,执行SQL查询,然后将结果上传回S3。我们可以定义一个资产工厂函数build_etl_job,它接收S3资源配置、桶名、源文件、目标文件和SQL查询作为参数,并返回一组Dagster资产定义。

import tempfileimport dagster_aws.s3 as s3
import duckdbimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# asset keys cannot contain '.'asset_key = f"etl_{bucket}_{target_object}".replace(".", "_")@dg.asset(name=asset_key)def etl_asset(context):with tempfile.TemporaryDirectory() as root:source_path = f"{root}/{source_object}"target_path = f"{root}/{target_object}"# these steps could be split into separate assets, but# for brevity we will keep them together.# 1. extractcontext.resources.s3.download_file(bucket, source_object, source_path)# 2. transformdb = duckdb.connect(":memory:")db.execute(f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}');")db.query(sql).to_csv(target_path)# 3. loadcontext.resources.s3.upload_file(bucket, target_object, target_path)return dg.Definitions(assets=[etl_asset],resources={"s3": s3_resource},)s3_resource = s3.S3Resource(aws_access_key_id="...", aws_secret_access_key="...")defs = dg.Definitions.merge(build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="raw_transactions.csv",target_object="cleaned_transactions.csv",sql="SELECT * FROM source WHERE amount IS NOT NULL;",),build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="all_customers.csv",target_object="risky_customers.csv",sql="SELECT * FROM source WHERE risk_score > 0.8;",),
)

通过这种方式,我们可以轻松地创建和管理多个相似的ETL任务,只需调用build_etl_job函数并传入不同的配置参数即可。

使用YAML配置资产工厂

虽然Python实现已经大大简化了代码,但我们还可以进一步改进,使配置更加灵活和易于管理。我们可以使用YAML文件来定义ETL任务的配置,并通过Python代码解析YAML文件并创建资产定义。

etl_jobs.yaml
aws:access_key_id: "YOUR_ACCESS_KEY_ID"secret_access_key: "YOUR_SECRET_ACCESS_KEY"
etl_jobs:- bucket: my_bucketsource: raw_transactions.csvtarget: cleaned_transactions.csvsql: SELECT * FROM source WHERE amount IS NOT NULL- bucket: my_bucketsource: all_customers.csvtarget: risky_customers.csvsql: SELECT * FROM source WHERE risk_score > 0.8

然后,我们可以编写一个Python函数load_etl_jobs_from_yaml来解析YAML文件并创建资产定义。

import dagster_aws.s3 as s3
import yamlimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# Code from previous example omittedreturn dg.Definitions()def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:config = yaml.safe_load(open(yaml_path))s3_resource = s3.S3Resource(aws_access_key_id=config["aws"]["access_key_id"],aws_secret_access_key=config["aws"]["secret_access_key"],)defs = []for job_config in config["etl_jobs"]:defs.append(build_etl_job(s3_resource=s3_resource,bucket=job_config["bucket"],source_object=job_config["source"],target_object=job_config["target"],sql=job_config["sql"],))return dg.Definitions.merge(*defs)defs = load_etl_jobs_from_yaml("etl_jobs.yaml")

用Pydantic 和 Jinja提升配置文件的类型安全性和灵活性

虽然YAML配置文件已经大大简化了配置过程,但它仍然存在一些问题,例如缺乏类型检查和安全性。为了解决这些问题,我们可以使用Pydantic来定义配置文件的schema,并使用Jinja来模板化配置文件中的环境变量。

首先定义etl_jobs.yaml文件:

aws:access_key_id: "{{ env.AWS_ACCESS_KEY_ID }}"secret_access_key: "{{ env.AWS_SECRET_ACCESS_KEY }}"etl_jobs:- bucket: my_bucketsource: raw_transactions.csvtarget: cleaned_transactions.csvsql: SELECT * FROM source WHERE amount IS NOT NULL- bucket: my_bucketsource: all_customers.csvtarget: risky_customers.csvsql: SELECT * FROM source WHERE risk_score > 0.8

python实现代码:

import os
from typing import Listimport dagster_aws.s3 as s3
import jinja2
import pydantic
import yamlimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# Code from previous example omittedreturn dg.Definitions()class AwsConfig(pydantic.BaseModel):access_key_id: strsecret_access_key: strdef to_resource(self) -> s3.S3Resource:return s3.S3Resource(aws_access_key_id=self.access_key_id,aws_secret_access_key=self.secret_access_key,)class JobConfig(pydantic.BaseModel):bucket: strsource: strtarget: strsql: strdef to_etl_job(self, s3_resource: s3.S3Resource) -> dg.Definitions:return build_etl_job(s3_resource=s3_resource,bucket=self.bucket,source_object=self.source,target_object=self.target,sql=self.sql,)class EtlJobsConfig(pydantic.BaseModel):aws: AwsConfigetl_jobs: list[JobConfig]def to_definitions(self) -> dg.Definitions:s3_resource = self.aws.to_resource()return dg.Definitions.merge(*[job.to_etl_job(s3_resource) for job in self.etl_jobs])def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:yaml_template = jinja2.Environment().from_string(open(yaml_path).read())config = yaml.safe_load(yaml_template.render(env=os.environ))return EtlJobsConfig.model_validate(config).to_definitions()defs = load_etl_jobs_from_yaml("etl_jobs_with_jinja.yaml")

通过这种方式,我们可以确保配置文件的类型安全性,并且可以轻松地引用环境变量,从而避免在配置文件中硬编码敏感信息。

总结

通过本文的介绍,我们了解了如何使用Dagster的资产工厂模式来高效管理和自动化重复的ETL任务。从Python实现到YAML配置,再到使用Pydantic和Jinja提升配置文件的类型安全性和灵活性,我们一步步地构建了一个强大而灵活的ETL任务管理框架。这种方法不仅可以减少代码冗余,提高可维护性,还可以使配置更加灵活和安全。希望本文能为你在数据工程领域的实践提供有价值的启发和帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/911734.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/911734.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈开源在线客服系统与 APP 集成的技术方案与优劣势

在为移动端 App 接入在线客服系统的过程中,我经历了长时间的技术选型探索。最初,我也曾被一些“技术理想主义”选项所吸引,比如让用户自己研发界面我提供 API 以获得最高自由度,或集成 SDK 以追求原生体验。然而,随着项…

prometheus+grafana+MySQL监控

prometheusgrafanaMySQL监控 环境说明 操作前提: 先去搭建Docker部署prometheusgrafana...这篇文章的系统 Docker部署prometheusgrafana...的参考文章: Docker部署prometheusgrafana…-CSDN博客 在的节点服务器上搭建MySQL数据库(可以采用直…

多模态+类人认知:Embodied AI迈向AGI的三大瓶颈与突破路径

作者:Yequan Wang;Aixin Sun 摘要 AGI常被视为本质上具有具身特性。随着机器人技术和基础人工智能模型的最新进展,我们正站在一个新时代的门槛上——这一时代以日益通用化的具身人工智能系统为标志。本文通过提出一个涵盖五个层级&#xff…

wpf DataTemplate 宽度和控件宽度一样

wpf DataTemplate 宽度和控件宽度一样 在WPF中,如果你想要一个DataTemplate的宽度与它内部控件的宽度一致,你可以使用几种不同的方法来实现这一点。下面是一些常见的方法: 方法1:使用DataTemplate的Width属性 你可以在DataTemplate中直接设置Width属性,使其与内部控件的…

C#上位机实现报警语音播报

我们在开发C#上位机时,有时候会需要将报警信息通过语音进行播报,今天跟大家分享一下具体的实现过程。 一、组件安装 首先我们创建好一个Windows窗体项目,然后添加System.Speech库引用。 点击引用,右击添加引用,在程…

01-StarRocks安装部署FAQ

StarRocks安装部署FAQ 概述 本文档整理了StarRocks安装部署过程中常见的问题和解决方案,涵盖了环境准备、集群部署、配置优化等各个方面,帮助用户快速解决安装部署过程中遇到的问题。 环境准备FAQ Q1: StarRocks对硬件配置有什么要求? A: StarRocks的硬件配置要求如下:…

MinIO入门教程:从零开始搭建方便快捷的分布式对象存储服务

目录 一、MinIO简介二、环境准备三、MinIO服务部署1. 下载指定版本MinIO镜像2. 启动MinIO容器3. 参数详解 四、访问MinIO控制台1. 在浏览器中打开管理控制台:2. 输用户名和密码登录3. 创建存储桶Bucket4. 设置访问权限为公有5. 上传文件6. 访问文件 一、MinIO简介 …

多卡解决报错torch.distributed.elastic.multiprocessing.errors.ChildFailedError的问题

使用多卡运行 Pytorch出现下面的报错: E0619 10:29:15.774000 5065 site-packages/torch/distributed/elastic/multiprocessing/api.py:874] failed (exitcode: -11) local_rank: 0 (pid: 5184) of binary: /root/miniconda3/bin/python Traceback (most recent ca…

Kubernetes 架构的两种节点

前言 Kubernetes 采用主从(master-node)架构模式,主要由主节点,也称 控制平面(Control Plane)和工作节点(node)组成。 master 节点职责: ‌集群管理‌:负责整个集群的全局决策和状态管理API服务‌:通过 kube-apiser…

数据迷雾中的灯塔:奥威BI+AI数据分析如何照亮企业决策之路

决策进化史:从“盲人摸象”到“智能导航” 在每天83%的中国企业所面临的决策场景中,数据往往沉默不语,无法为管理者提供明确的指引。从决策依赖人工统计的“石器时代”(2010年前),到依赖静态报表的“铁器时…

Flutter 与 原生(Android/iOS)通信 Platform Channel

在Flutter中,Platform Channel是实现Flutter与原生平台(Android/iOS)通信的核心机制,其设计遵循轻量级异步通信原则,用于解决Flutter跨平台开发时与原生功能的交互需求。 一、核心作用 Flutter作为跨平台框架&#x…

django调用 paramiko powershell 获取cpu 个数

在Django中调用paramiko库执行PowerShell命令来获取CPU个数,可以通过以下步骤实现: 步骤1:安装paramiko 首先,确保你的Django项目中已经安装了paramiko库。如果尚未安装,可以通过pip安装: pip install pa…

React 表单太卡?也许你用错了控制方式

🎙 欢迎来到《前端达人 播客书单》第 23 期。 视频版(播客风格更精彩) 今天我们聚焦一个「写前端永远逃不掉」的主题:表单处理。 你有没有遇到过这些问题: 表单怎么一改就卡?state 是不是用错了&#xff1…

`customRef` 在实战中的使用:防抖、计算属性缓存和异步数据获取

🤍 前端开发工程师、技术日更博主、已过CET6 🍨 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 🕠 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》、《前端求职突破计划》 🍚 蓝桥云课签约作者、…

腾讯混元3D制作角色模型的教程-3

腾讯混元3D中实现角色骨骼绑定与动画生成的详细操作指南,结合官方功能说明及实操要点整理: ⚙️ 一、前期准备:模型要求 角色姿态规范 仅支持标准T-pose(大字型站立) 的全身人物模型(如卡通角色)。 非标准姿态或非人形模型(如动物、道具)暂不支持自动绑骨。 模型来源…

React 和 Vue 项目中集成基于 Svelte 的 `Bytemd` 库 || @bytemd/react` 底层实现原理

Bytemd 并使用Svelte 框架编写的。Svelte 是一种不同的前端框架,它的核心思想是在编译时将组件代码转换成高效、原生 JavaScript,从而避免运行时虚拟 DOM 的开销。 理解了这一点,我们就可以深入探讨如何在 React 和 Vue 项目中适配 Svelte 编…

【新品解读】高性能紧凑型 RFSoC FPGA 开发平台 AXW22,重塑射频开发体验

如果您正在烦恼如何在有限的物理空间和预算内,依然实现卓越的射频带宽与处理能力,ALINX 基于 AMD RFSoC FPGA 开发板 AXW22 正是为您准备的。 (AMD Zynq UltraScale RFSoC FPGA 射频开发平台 AXW22) 和所有 RFSoC 平台一样&#…

Spring @ModelAttribute注解全解析:数据绑定与模型管理

Spring 的 @ModelAttribute 注解主要用于数据绑定和模型属性管理,支持方法级别和参数级别的应用,以下是其核心特性和使用场景: 🔧 一、核心功能 数据绑定 将 HTTP 请求参数(如表单字段、查询参数)自动绑定到 Java 对象。支持从请求参数、URI 路径变量、请求头等多来源获…

[project-based-learning] 开源贡献指南 | 自动化链接验证 | Issue模板规范

第四章:贡献指南 欢迎回来!在上一章《项目分类体系》中,我们探讨了README.md文件如何通过编程语言和子类别组织教程,从而提升检索效率。 现在已了解教程列表的构成(《教程列表》)、条目编写规范&#xff…

OSCP备战-LordOfTheRoot靶机复现步骤

PDF下载: Target-practice/Range at main szjr123/Target-practice 一、靶机描述 靶机地址:https://www.vulnhub.com/entry/lord-of-the-root-101,129/ 靶机难度:中等(CTF) 靶机描述:这是KoocSec为黑…