Dagster Pipes系列-1:调用外部Python脚本

本文是"Dagster Pipes教程"的第一部分,介绍如何通过Dagster资产调用外部Python脚本并集成到数据管道中。首先,创建Dagster资产subprocess_asset,利用PipesSubprocessClient资源执行外部脚本external_code.py,实现跨进程的数据处理。通过dagster dev启动UI,可在Dagster界面中监控子进程的执行状态和日志输出,包括标准输出(stdout)内容。本文详细讲解了资产定义、资源注入及命令执行的完整流程,为后续修改外部代码以支持Dagster Pipes通信奠定基础。此方法适用于需要将现有脚本集成到Dagster数据管道的场景,提升自动化与可观测性。完成本部分后,读者可继续学习第二部分,掌握如何增强外部脚本与Dagster的交互能力。

教程概述

本教程将指导你完成以下步骤:

  1. 创建一个调用外部Python脚本的Dagster资产
  2. 定义必要的Dagster资源(resources)
  3. 在Dagster UI中运行并查看结果
    在这里插入图片描述

前提条件

在开始之前,请确保你已经:

  • 安装了Dagster
  • 创建了一个名为external_code.py的独立Python脚本,内容如下:
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2],"item_id": [432, 878]})total_orders = len(orders_df)print(f"processing total {total_orders} orders")

第一步:定义Dagster资产

首先,在与external_code.py相同的目录下创建一个名为dagster_code.py的新文件。

1.1 创建资产定义

将以下代码复制到dagster_code.py中:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()

代码解析:

  • 我们创建了一个名为subprocess_asset的资产
  • 使用AssetExecutionContext作为上下文参数,它提供了系统信息如资源、配置和日志记录
  • 指定了PipesSubprocessClient资源
  • 构建了一个命令列表来执行外部脚本
  • 使用pipes_subprocess_client.run()方法在管道会话中同步执行子进程

1.2 从资产调用外部代码

上述代码中的关键部分是:

pipes_subprocess_client.run(command=cmd,context=context
).get_materialize_result()

这段代码做了什么:

  • PipesSubprocessClient资源暴露了一个run方法
  • 当资产执行时,这个方法会在管道会话中同步执行子进程
  • 返回一个PipesClientCompletedInvocation对象
  • 可以使用get_materialize_result()方法访问子进程报告的MaterializeResult事件

第二步:定义Definitions对象

为了让Dagster工具(如CLI、UI和Dagster+)能够加载和访问资产及子进程资源,我们需要创建一个Definitions对象。

dagster_code.py文件末尾添加以下代码:

from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

此时,dagster_code.py文件应该如下所示:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

第三步:从Dagster UI运行子进程

现在,让我们在Dagster UI中执行我们创建的子进程资产。

  1. 在新的命令行会话中运行以下命令启动UI:

    dagster dev -f dagster_code.py
    
  2. 点击右上角的"Materialize"按钮来运行你的代码

  3. 导航到"Run details"页面,在这里你可以看到运行的日志

  4. external_code.py中,我们有一个打印语句将输出到stdout。Dagster会在UI的原始计算日志视图中显示这些内容。

  5. 要查看stdout日志,切换日志部分到stdout:

在这里插入图片描述

下一步

到目前为止,你已经创建了一个调用外部Python脚本的Dagster资产,在子进程中执行了代码,并在Dagster UI中查看了结果。接下来,你将学习如何修改外部代码以与Dagster Pipes配合工作,将信息发送回Dagster。

总结

通过本教程的第一部分,我们实现了:

  • 创建了一个Dagster资产来调用外部Python脚本
  • 配置了必要的资源来支持子进程执行
  • 在Dagster UI中成功运行并查看了结果

这个基础设置为你在后续步骤中实现更复杂的管道通信打下了良好的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/80803.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/80803.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SQL系列】多表关联更新

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

C++进阶学习:STL常用容器--map/multimap容器

1. map 容器基本概念 map 中所有元素都是 pair pair 中第一个元素为 key (键值) 起到索引运用 第二个元素为 value(实值) 所有元素都会根据元素的键值自动排序 本质: map/multimap 属于关联式容器 底层结构是用二…

let,const,var关键字的区别

let,const,var关键字 let,const,var都存在变量提升 它们都存在变量提升但是稍微有点不同 var变量声明会被提升到作用域的顶部,并且会被初始化为 undefinedlet 和 const:变量声明也会被提升到作用域的顶部,但不会被初…

Nuitka 已经不再安全? Nuitka/Cython 打包应用逆向工具 -- pymodhook

pymodhook是一个记录任意对Python模块的调用的库,用于Python逆向分析。 pymodhook库类似于Android的xposed框架,但不仅能记录函数的调用参数和返回值,还能记录模块的类的任意方法调用,以及任意派生对象的访问,基于pyob…

path环境变量满了如何处理,分割 PATH 到 Path1 和 Path2

要正确设置 Path1 的值,你需要将现有的 PATH 环境变量 中的部分路径复制到 Path1 和 Path2 中。以下是详细步骤: 步骤 1:获取当前 PATH 的值 打开环境变量窗口: 按 Win R,输入 sysdm.cpl,点击 确定。在 系…

SEMI E40-0200 STANDARD FOR PROCESSING MANAGEMENT(加工管理标准)-(一)

1 目的 物料(例如晶圆)加工在设备中的自动化管理与控制是实现工厂自动化的关键要素。本标准针对半导体制造环境中与设备内部物料处理相关的通信需求进行了规范。本标准规定了在加工单元接收到的指定材料所应适用的加工方法(例如Etch腔室需要Run哪支Recipe)。它阐述了物料加工的…

【Hadoop】集群搭建实战:超详细保姆级教程

🐇明明跟你说过:个人主页 🏅个人专栏:《大数据前沿:技术与应用并进》🏅 🔖行路有良友,便是天堂🔖 目录 一、引言 1、Hadoop简介 2、Hadoop集群概念 3、 Hadoop 集…

阿里云人工智能大模型通义千问Qwen3开发部署

本文主要描述阿里云人工智能大模型开源社区ModelScope提供的通义千问Qwen3开发部署。 与阿里云一起 轻松实现数智化 让算力成为公共服务:用大规模的通用计算,帮助客户做从前不能做的事情,做从前做不到的规模。让数据成为生产资料:…

24.(vue3.x+vite)引入组件并动态挂载(mount)

示例截图 组件代码: <template><div><div>{{message }}</div>

《Python星球日记》 第56天:循环神经网络(RNN)入门

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 目录 一、序列数据的特点与挑战1. 什么是序列数据?2. 序列数据的挑战二、RNN 的基本结构与前向传播1. RNN的核心理念2. RNN的数学表达3. RNN的前向传…

手写 vue 源码 === computed 实现

目录 计算属性的基本概念 计算属性的核心实现 ComputedRefImpl 类的实现 ReactiveEffect 与计算属性的关系 计算属性的工作流程 1. 创建计算属性 2. 依赖收集过程 3. 嵌套 effect 的处理 4. 更新过程 嵌套 effect 关系图解 依赖关系建立过程 代码实现分析 1. 创建…

【Lattice FPGA 开发】Diamond在线调试Reveal逻辑乱跳的解决

在Vivado中在always块中写逻辑时如果出现always块中的异步复位敏感词在块内部未使用的情况&#xff0c;如下例的rst&#xff1a; always (posedge clk or posedge rst) begin if(~tx_sense_flag)o_rd_adr < d1;else if((o_rd_adr d94) & (bit_cnt d7))o_rd_adr <…

【hadoop】Sqoop数据迁移工具的安装部署

一、Sqoop安装与配置 步骤&#xff1a; 1、使用XFTP将Sqoop安装包sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz发送到master机器的主目录。 2、解压安装包&#xff1a; tar -zxvf ~/sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 3、修改文件夹的名字&#xff0c;将其改为s…

BUUCTF——PYWebsite

BUUCTF——PYWebsite 进入靶场 看看基本信息 没有什么信息 扫个目录看看 http://node5.buuoj.cn:28115/.DS_Store http://node5.buuoj.cn:28115/flag.php http://node5.buuoj.cn:28115/index.html访问flag.php 提示保存购买者的IP 抓包看看 直接XFF伪造一下 X-Forwarded-F…

基于Qt开发的多线程TCP服务端

目录 一、Qt TCP服务端开发环境准备1. 项目配置2. 核心类说明二、服务端搭建步骤详解步骤1:初始化服务端对象步骤2:启动端口监听步骤3:处理客户端连接三、数据通信与状态管理1. 数据收发实现2. 客户端状态监控四、进阶功能扩展1. 多客户端并发处理2. 心跳检测机制五、调试与…

【Tools】VScode使用CMake构建项目

这里写目录标题 vscode 使用 CMake**安装插件**新建CMake项目 vscode 使用 CMake 安装插件 CMake和CMake Tools c等等 CMake插件主要功能是CMake语法高亮、自动补全CMake Tools的功能主要是结合VSCode IDE使用CMake这个工具&#xff0c;比如生成CMake项目、构建CMake项目等…

neo4j图数据库基本概念和向量使用

一.节点 1.新建节点 create (n:GroupProduct {name:都邦高保额团意险,description: "保险产品名称"} ) return n CREATE&#xff1a;Neo4j 的关键字&#xff0c;用于创建新节点或关系。 (n:GroupProduct)&#xff1a; n 是节点的临时别名&#xff08;变量名&#…

2025年渗透测试面试题总结-渗透测试红队面试八(题目+回答)

网络安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 渗透测试红队面试八 二百一十一、常见中间件解析漏洞利用方式 二百一十二、MySQL用户密码存储与加密 …

大语言模型主流架构解析:从 Transformer 到 GPT、BERT

&#x1f4cc; 友情提示&#xff1a; 本文内容由银河易创AI&#xff08;https://ai.eaigx.com&#xff09;创作平台的gpt-4-turbo模型生成&#xff0c;旨在提供技术参考与灵感启发。文中观点或代码示例需结合实际情况验证&#xff0c;建议读者通过官方文档或实践进一步确认其准…

Java设计模式之装饰器模式:从基础到高级的全面解析(万字解析)

装饰器模式(Decorator Pattern)是一种结构型设计模式,它允许向一个现有的对象添加新的功能,同时又不改变其结构。这种模式创建了一个装饰类,用来包装原有的类,并在保持类方法签名完整性的前提下,提供了额外的功能。 一、装饰器模式基础概念 1.1 什么是装饰器模式 装饰…