Introduction

  • Tech stack

    Windows 10
    Python  3.8.10
    poetry  1.2.0
    dagster 1.0.16
    dagit   1.0.16
  • poetry GitHub: https://github.com/python-poe...
  • dagster GitHub: https://github.com/dagster-io...
  • dagster official documentation: https://docs.dagster.io/

Installation

  • After initializing the project with poetry, add the following dependencies to pyproject.toml, then run poetry update

    # Mirror source inside China (optional)
    [[tool.poetry.source]]
    name = "aliyun"
    url = "https://mirrors.aliyun.com/pypi/simple/"
    default = true

    [tool.poetry.dependencies]
    python = "^3.8.10"
    dagster = "~1.0.16"
    dagit = "~1.0.16"
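The dependency entries use poetry's tilde (`~1.0.16`) and caret (`^3.8.10`) constraints. As a rough illustration of which versions they admit, here is a minimal sketch; the helper names `parse_version`, `bounds` and `allows` are made up for this example, and it only covers full three-part versions, not the complete poetry grammar.

```python
def parse_version(text):
    """Turn "1.0.16" into (1, 0, 16); only plain three-part versions are handled."""
    parts = tuple(int(p) for p in text.split("."))
    if len(parts) != 3:
        raise ValueError(f"expected MAJOR.MINOR.PATCH, got {text!r}")
    return parts


def bounds(constraint):
    """Return (lower, upper) for "~X.Y.Z" or "^X.Y.Z"; upper is exclusive."""
    operator, lower = constraint[0], parse_version(constraint[1:])
    if operator == "~":
        # Tilde with a full version: only patch-level updates are allowed.
        upper = (lower[0], lower[1] + 1, 0)
    elif operator == "^":
        # Caret: bump the left-most non-zero part and zero out the rest.
        index = next((i for i, part in enumerate(lower) if part != 0), 2)
        upper = lower[:index] + (lower[index] + 1,) + (0,) * (2 - index)
    else:
        raise ValueError(f"unsupported operator in {constraint!r}")
    return lower, upper


def allows(constraint, version):
    """Check whether a concrete version satisfies the constraint."""
    lower, upper = bounds(constraint)
    return lower <= parse_version(version) < upper


if __name__ == "__main__":
    print(bounds("~1.0.16"))   # dagster / dagit constraint from pyproject.toml
    print(bounds("^3.8.10"))   # python constraint from pyproject.toml
```

Under these rules `dagster = "~1.0.16"` stays on the 1.0.x line, while `python = "^3.8.10"` accepts any later 3.x release.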

Test code

  • test_dagster.py

    # encoding: utf-8
    # author: qbit
    # date: 2022-11-09
    # summary: test dagster with add, subtract, multiply and divide

    from dagster import get_dagster_logger, job, op

    loggerDag = get_dagster_logger()


    @op
    def OpSeed():
        r""" Seed function: produces the initial parameters """
        return (2, 1)


    @op
    def OpAdd(seed):
        r""" Read the parameters and add them """
        x, y = seed
        result = x + y
        loggerDag.info(f"{x} + {y} = {result}")
        return result


    @op
    def OpSubtract(x):
        r""" Read the parameter and subtract 1 """
        result = x - 1
        loggerDag.info(f"{x} - 1 = {result}")
        return result


    @op
    def OpMultiply(x):
        r""" Read the parameter and multiply it by 2 """
        result = x * 2
        loggerDag.info(f"{x} * 2 = {result}")
        # raise Exception('haha')
        return result


    @op
    def OpDivide(x, y):
        r""" Read the parameters and divide y by x """
        result = y / x
        loggerDag.info(f"{y} / {x} = {result}")
        return result


    @job
    def arithmetic():
        r""" The four arithmetic operations """
        seed = OpSeed()
        addResult = OpAdd(seed)
        subResult = OpSubtract(addResult)
        mulResult = OpMultiply(addResult)
        OpDivide(subResult, mulResult)


    if __name__ == "__main__":
        result = arithmetic.execute_in_process(
            run_config={"loggers": {"console": {"config": {"log_level": "info"}}}})

Run directly

  • Run directly with python

    poetry run python test_dagster.py
  • Output

    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpAdd - 2 + 1 = 3
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpMultiply - 3 * 2 = 6
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpSubtract - 3 - 1 = 2
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpDivide - 6 / 2 = 3.0
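To make the data flow behind these log lines easier to follow, the same computation can be written as ordinary function calls. This is only a dagster-free sketch for checking the arithmetic; the lowercase function names are invented for this example and are not part of test_dagster.py.

```python
# Plain-Python mirror of the arithmetic job (no dagster involved).
# The function names are hypothetical; the logic follows the ops in test_dagster.py.

def seed():
    """Produce the initial parameters, like OpSeed."""
    return (2, 1)


def add(pair):
    """Add the two seed values, like OpAdd."""
    x, y = pair
    return x + y


def subtract(x):
    """Subtract 1, like OpSubtract."""
    return x - 1


def multiply(x):
    """Multiply by 2, like OpMultiply."""
    return x * 2


def divide(x, y):
    """Divide y by x, keeping the argument order of OpDivide."""
    return y / x


def arithmetic_plain():
    add_result = add(seed())               # 2 + 1 = 3
    sub_result = subtract(add_result)      # 3 - 1 = 2
    mul_result = multiply(add_result)      # 3 * 2 = 6
    return divide(sub_result, mul_result)  # 6 / 2 = 3.0


if __name__ == "__main__":
    print(arithmetic_plain())
```

Note that the add result feeds both the subtract and the multiply step, which is why those two ops do not depend on each other in the job.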

Run with dagster

  • Edit the config file run_config.yaml

    loggers:
      console:
        config:
          log_level: INFO
  • Run the command

    poetry run dagster job execute -f test_dagster.py -c run_config.yaml
  • Output

    2022-11-09 10:13:19 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 23368 - STEP_WORKER_STARTED - Executing step "OpSeed" in subprocess.
    2022-11-09 10:13:22 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 21660 - STEP_WORKER_STARTED - Executing step "OpAdd" in subprocess.
    2022-11-09 10:13:22 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpAdd - 2 + 1 = 3
    2022-11-09 10:13:25 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 23996 - STEP_WORKER_STARTED - Executing step "OpMultiply" in subprocess.
    2022-11-09 10:13:25 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 8072 - STEP_WORKER_STARTED - Executing step "OpSubtract" in subprocess.
    2022-11-09 10:13:25 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpMultiply - 3 * 2 = 6
    2022-11-09 10:13:25 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpSubtract - 3 - 1 = 2
    2022-11-09 10:13:29 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 21156 - STEP_WORKER_STARTED - Executing step "OpDivide" in subprocess.
    2022-11-09 10:13:29 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpDivide - 6 / 2 = 3.0
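In this run every step is executed in a subprocess, and OpMultiply and OpSubtract both start at 10:13:25. That matches the shape of the job: both depend only on OpAdd. The sketch below groups the ops into dependency levels to show which steps are free to run side by side. It illustrates the graph only and is not how dagster's executor is implemented; `DEPENDENCIES` is transcribed by hand from the job body, and `dependency_levels` is a hypothetical helper.

```python
# Group the ops of the arithmetic job into dependency levels.
# DEPENDENCIES maps each op to its upstream ops, copied by hand from test_dagster.py.
DEPENDENCIES = {
    "OpSeed": [],
    "OpAdd": ["OpSeed"],
    "OpSubtract": ["OpAdd"],
    "OpMultiply": ["OpAdd"],
    "OpDivide": ["OpSubtract", "OpMultiply"],
}


def dependency_levels(deps):
    """Return a list of levels; each level depends only on earlier levels."""
    remaining = {node: set(upstream) for node, upstream in deps.items()}
    levels = []
    while remaining:
        # Nodes whose upstream set is empty are ready to run.
        ready = sorted(node for node, upstream in remaining.items() if not upstream)
        if not ready:
            raise ValueError("dependency cycle detected")
        levels.append(ready)
        for node in ready:
            del remaining[node]
        # Mark the finished nodes as satisfied for everything still waiting.
        for upstream in remaining.values():
            upstream.difference_update(ready)
    return levels


if __name__ == "__main__":
    for depth, names in enumerate(dependency_levels(DEPENDENCIES)):
        print(depth, names)
```

Ops that land in the same level have no dependency on each other, so an executor that runs steps in separate processes may start them together.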

Run with dagit

  • Set the environment variable in PowerShell

    $env:DAGSTER_HOME='C:\Users\qbit\dagster\dagster_home'
  • Create an empty file dagster.yaml in the DAGSTER_HOME directory
  • Run the command

    poetry run dagit -f test_dagster.py
  • Open http://127.0.0.1:3000 in a browser to view it
  • Click the Launchpad tab, then click the Launch Run button to run the job and view the result
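The DAGSTER_HOME preparation from the first steps of this section (create the directory, put an empty dagster.yaml in it, set the variable) can also be scripted. The following is a minimal sketch using only the standard library; `init_dagster_home` is a hypothetical helper, and the path under the user's home directory is an assumption modelled on the PowerShell example.

```python
import os
from pathlib import Path


def init_dagster_home(home):
    """Create the directory and an empty dagster.yaml, then set DAGSTER_HOME."""
    home_path = Path(home)
    home_path.mkdir(parents=True, exist_ok=True)
    config_file = home_path / "dagster.yaml"
    config_file.touch(exist_ok=True)  # an existing file is left untouched
    # Only affects this process and its children, similar to $env: in PowerShell.
    os.environ["DAGSTER_HOME"] = str(home_path)
    return config_file


if __name__ == "__main__":
    # Assumed location; adjust to wherever DAGSTER_HOME should live.
    print(init_dagster_home(Path.home() / "dagster" / "dagster_home"))
```

Because the variable is set only for the current process, dagit would have to be started from that same process (or the variable set again in the shell) to pick it up.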

Repositories and workspaces

  • Official documentation: https://docs.dagster.io/conce...
  • Create the repository file repo_qbit.py with the following content

    from dagster import repository

    from test_dagster import arithmetic


    @repository
    def hello_world_repository():
        return [arithmetic]
  • Create the workspace file workspace.yaml with the following content

    load_from:
      - python_file: repo_qbit.py
  • dagit and dagster-daemon: https://docs.dagster.io/deplo...
  • dagit

    poetry run dagit -h 0.0.0.0 -p 3000

Re-running failed jobs

  • dagster official documentation: https://docs.dagster.io/guide...
This article is from qbit snap