Preface
- Tech stack: Windows 10, Python 3.8.10, poetry 1.2.0, dagster 1.0.16, dagit 1.0.16
- poetry on GitHub: https://github.com/python-poe…
- dagster on GitHub: https://github.com/dagster-io…
- dagster official documentation: https://docs.dagster.io/
Installation
- After initializing the project with poetry, add the following dependencies to pyproject.toml, then run poetry update:

    # Mirror source inside mainland China (optional)
    [[tool.poetry.source]]
    name = "aliyun"
    url = "https://mirrors.aliyun.com/pypi/simple/"
    default = true

    [tool.poetry.dependencies]
    python = "^3.8.10"
    dagster = "~1.0.16"
    dagit = "~1.0.16"
Test code
- test_dagster.py

    # encoding: utf-8
    # author: qbit
    # date: 2022-11-09
    # summary: test dagster with add, subtract, multiply, divide
    from dagster import get_dagster_logger, job, op

    loggerDag = get_dagster_logger()


    @op
    def OpSeed():
        r"""Seed op: produces the initial parameters"""
        return (2, 1)


    @op
    def OpAdd(seed):
        r"""Add the two seed values"""
        x, y = seed
        result = x + y
        loggerDag.info(f"{x} + {y} = {result}")
        return result


    @op
    def OpSubtract(x):
        r"""Subtract 1 from the input"""
        result = x - 1
        loggerDag.info(f"{x} - 1 = {result}")
        return result


    @op
    def OpMultiply(x):
        r"""Multiply the input by 2"""
        result = x * 2
        loggerDag.info(f"{x} * 2 = {result}")
        # raise Exception('haha')
        return result


    @op
    def OpDivide(x, y):
        r"""Divide the second input by the first"""
        result = y / x
        loggerDag.info(f"{y} / {x} = {result}")
        return result


    @job
    def arithmetic():
        r"""The four arithmetic operations"""
        seed = OpSeed()
        addResult = OpAdd(seed)
        subResult = OpSubtract(addResult)
        mulResult = OpMultiply(addResult)
        OpDivide(subResult, mulResult)


    if __name__ == "__main__":
        result = arithmetic.execute_in_process(
            run_config={"loggers": {"console": {"config": {"log_level": "info"}}}}
        )
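To make the shape of the job easier to see, here is a minimal plain-Python sketch of the same data flow, with no dagster involved. The function names (seed, add, subtract, multiply, divide, run_arithmetic) are hypothetical stand-ins for the ops above, not part of the original code. It only illustrates that the output of the add step fans out to two branches, which then meet again in the divide step.

```python
# Plain-Python mirror of the arithmetic job's data flow (illustrative only;
# these names are assumptions and do not use the dagster API).

def seed():
    # Corresponds to OpSeed: the initial pair of parameters.
    return (2, 1)


def add(pair):
    # Corresponds to OpAdd.
    x, y = pair
    return x + y


def subtract(x):
    # Corresponds to OpSubtract.
    return x - 1


def multiply(x):
    # Corresponds to OpMultiply.
    return x * 2


def divide(x, y):
    # Same argument order as OpDivide: the second argument is divided by the first.
    return y / x


def run_arithmetic():
    add_result = add(seed())
    sub_result = subtract(add_result)   # branch 1 consumes the add result
    mul_result = multiply(add_result)   # branch 2 consumes the same add result
    return divide(sub_result, mul_result)


print(run_arithmetic())
```

In the dagster version, the two branches do not depend on each other, so the framework is free to schedule them in either order.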
Run directly
- Run directly with python:

    poetry run python test_dagster.py

- Output:

    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpAdd - 2 + 1 = 3
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpMultiply - 3 * 2 = 6
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpSubtract - 3 - 1 = 2
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpDivide - 6 / 2 = 3.0
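Each INFO line in the output above appears to follow the pattern timestamp, logger name, level, job name, run ID, op name, message, separated by " - ". The following is a rough sketch of pulling those fields apart. The field layout is inferred from the sample output only and is an assumption, not a documented format; OpLog and parse_info_lines are hypothetical names introduced for illustration.

```python
from typing import List, NamedTuple


class OpLog(NamedTuple):
    timestamp: str
    job: str
    run_id: str
    op: str
    message: str


def parse_info_lines(lines: List[str]) -> List[OpLog]:
    """Extract op-level INFO records from console log lines.

    Assumes the 7-field layout seen in the sample output; lines at other
    levels (which can carry extra fields) are skipped.
    """
    records = []
    for line in lines:
        # Limit the split to 6 so a message such as "3 - 1 = 2" stays whole.
        parts = line.strip().split(" - ", 6)
        if len(parts) == 7 and parts[2] == "INFO":
            timestamp, _logger, _level, job, run_id, op, message = parts
            records.append(OpLog(timestamp, job, run_id, op, message))
    return records


SAMPLE = [
    "2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpAdd - 2 + 1 = 3",
    "2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpSubtract - 3 - 1 = 2",
]

for record in parse_info_lines(SAMPLE):
    print(record.op, "->", record.message)
```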
Run with dagster
- Create the config file run_config.yaml:

    loggers:
      console:
        config:
          log_level: INFO

- Run the command:

    poetry run dagster job execute -f test_dagster.py -c run_config.yaml

- Output:

    2022-11-09 10:13:19 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 23368 - STEP_WORKER_STARTED - Executing step "OpSeed" in subprocess.
    2022-11-09 10:13:22 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 21660 - STEP_WORKER_STARTED - Executing step "OpAdd" in subprocess.
    2022-11-09 10:13:22 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpAdd - 2 + 1 = 3
    2022-11-09 10:13:25 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 23996 - STEP_WORKER_STARTED - Executing step "OpMultiply" in subprocess.
    2022-11-09 10:13:25 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 8072 - STEP_WORKER_STARTED - Executing step "OpSubtract" in subprocess.
    2022-11-09 10:13:25 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpMultiply - 3 * 2 = 6
    2022-11-09 10:13:25 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpSubtract - 3 - 1 = 2
    2022-11-09 10:13:29 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 21156 - STEP_WORKER_STARTED - Executing step "OpDivide" in subprocess.
    2022-11-09 10:13:29 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpDivide - 6 / 2 = 3.0
Run with dagit
- Set the environment variable in PowerShell:

    $env:DAGSTER_HOME='C:\Users\qbit\dagster\dagster_home'

- Create an empty file named dagster.yaml in the DAGSTER_HOME directory
- Run the command:

    poetry run dagit -f test_dagster.py

- Open http://127.0.0.1:3000 in a browser
- Open the Launchpad tab and click the Launch Run button to start the run
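The DAGSTER_HOME preparation step above (create the directory, then an empty dagster.yaml inside it) can also be scripted. This is a small sketch using only the standard library; init_dagster_home is a hypothetical helper name, and the demo writes to a temporary directory rather than the path used in this article. Setting the DAGSTER_HOME environment variable itself still has to be done in the shell as shown.

```python
import tempfile
from pathlib import Path


def init_dagster_home(home: Path) -> Path:
    """Create the directory and an empty dagster.yaml inside it.

    Safe to call repeatedly: an existing directory or file is left untouched.
    """
    home.mkdir(parents=True, exist_ok=True)
    config = home / "dagster.yaml"
    config.touch(exist_ok=True)
    return config


if __name__ == "__main__":
    # Demo against a throwaway location; substitute your own DAGSTER_HOME path.
    demo_home = Path(tempfile.mkdtemp()) / "dagster_home"
    print(init_dagster_home(demo_home))
```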
Repositories and workspaces
- Official documentation: https://docs.dagster.io/conce…
- Create the repository file repo_qbit.py with the following content:

    from dagster import repository
    from test_dagster import arithmetic


    @repository
    def hello_world_repository():
        return [arithmetic]

- Create the workspace file workspace.yaml with the following content:

    load_from:
      - python_file: repo_qbit.py
- dagit and dagster-daemon: https://docs.dagster.io/deplo…
- Start dagit:

    poetry run dagit -h 0.0.0.0 -p 3000
Re-running failed jobs
- dagster official documentation: https://docs.dagster.io/guide…
This article is from qbit snap