关于airflow:workflow-之-Dagster-基本用法qbit

35次阅读

共计 3624 个字符,预计需要花费 10 分钟才能阅读完成。

前言

  • 技术栈

    Windows 10
    Python  3.8.10
    poetry  1.2.0
    dagster 1.0.16
    dagit   1.0.16
  • poetry github:https://github.com/python-poe…
  • dagster github:https://github.com/dagster-io…
  • dagster 官网文档:https://docs.dagster.io/

装置

  • 用 poetry 初始化我的项目后在 pyproject.toml 增加以下依赖,而后运行 poetry update

    # 国内镜像源(可选)[[tool.poetry.source]]
    name = "aliyun"
    url = "https://mirrors.aliyun.com/pypi/simple/"
    default = true
    
    [tool.poetry.dependencies]
    python = "^3.8.10"
    dagster = "~1.0.16"
    dagit = "~1.0.16"

测试代码

  • test_dagster.py

    # encoding: utf-8
    # author: qbit
    # date: 2022-11-09
    # summary: 测试 dagster,加减乘除
    from dagster import get_dagster_logger, job, op, asset
    
    loggerDag = get_dagster_logger()
    
    @op
    def OpSeed():
      r"""种子函数,生成初始参数"""
      return (2, 1)
    
    @op
    def OpAdd(seed):
      r"""读入参数做加法"""
      x, y = seed
      result = x + y
      loggerDag.info(f"{x} + {y} = {result}")
      return result
    
    @op
    def OpSubtract(x):
      r"""读入参数减 1"""
      result = x - 1
      loggerDag.info(f"{x} - 1 = {result}")
      return result
    
    @op
    def OpMultiply(x):
      r"""读入参数乘以 2"""
      result = x * 2
      loggerDag.info(f"{x} * 2 = {result}")
      # raise Exception('haha')
      return result
    
    @op
    def OpDivide(x, y):
      r"""读入参数做除法"""
      result = y / x
      loggerDag.info(f"{y} / {x} = {result}")
      return result
    
    @job
    def arithmetic():
      r"""四则运算"""
      seed = OpSeed()
      addResult = OpAdd(seed)
      subResult = OpSubtract(addResult)
      mulResult = OpMultiply(addResult)
      OpDivide(subResult, mulResult)
    
    if __name__ == "__main__":
      result = arithmetic.execute_in_process(run_config={"loggers": {"console": {"config": {"log_level": "info"}}}})

间接运行

  • 间接应用 python 运行

    poetry run python test_dagster.py
  • 运行后果

    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpAdd - 2 + 1 = 3
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpMultiply - 3 * 2 = 6
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpSubtract - 3 - 1 = 2
    2022-11-09 10:07:03 +0800 - dagster - INFO - arithmetic - 9de5e326-f907-4a9f-9049-0d217cbe52c4 - OpDivide - 6 / 2 = 3.0

用 daster 运行

  • 编辑配置文件 run_config.yaml

    loggers:
    console:
      config:
        log_level: INFO
  • 运行命令

    poetry run dagster job execute -f test_dagster.py -c run_config.yaml
  • 运行后果

    2022-11-09 10:13:19 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 23368 - STEP_WORKER_STARTED - Executing step "OpSeed" in subprocess.
    2022-11-09 10:13:22 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 21660 - STEP_WORKER_STARTED - Executing step "OpAdd" in subprocess.
    2022-11-09 10:13:22 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpAdd - 2 + 1 = 3
    2022-11-09 10:13:25 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 23996 - STEP_WORKER_STARTED - Executing step "OpMultiply" in subprocess.
    2022-11-09 10:13:25 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 8072 - STEP_WORKER_STARTED - Executing step "OpSubtract" in subprocess.
    2022-11-09 10:13:25 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpMultiply - 3 * 2 = 6
    2022-11-09 10:13:25 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpSubtract - 3 - 1 = 2
    2022-11-09 10:13:29 +0800 - dagster - DEBUG - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - 21156 - STEP_WORKER_STARTED - Executing step "OpDivide" in subprocess.
    2022-11-09 10:13:29 +0800 - dagster - INFO - arithmetic - 4bb77c69-c625-42e4-8c51-54020ab897a9 - OpDivide - 6 / 2 = 3.0

用 dagit 运行

  • PowerShell 设置环境变量

    $env:DAGSTER_HOME='C:\Users\qbit\dagster\dagster_home'
  • DAGSTER_HOME 创立一个空文件 dagster.yaml
  • 运行命令

    poetry run dagit -f test_dagster.py
  • 用浏览器关上 http://127.0.0.1:3000 查看
  • 点击 Launchpad 标签页,点击 Launch Run 按钮运行,运行后果如下

仓库和工作区

  • 官网文档:https://docs.dagster.io/conce…
  • 创立仓库文件 repo_qbit.py,内容如下

    from dagster import repository
    from test_dagster import arithmetic
    
    @repository
    def hello_world_repository():
      return [arithmetic]
  • 创立工作区文件 workspace.yaml,内容如下

    load_from:
    - python_file: repo_qbit.py
  • dagitdagster-daemon:https://docs.dagster.io/deplo…
  • dagit

    poetry run dagit -h 0.0.0.0 -p 3000

从新运行失败的工作

  • dagster 官网文档:https://docs.dagster.io/guide…

本文出自 qbit snap

正文完
 0