10人将获赠CNCF商店$100美元礼券!

你填了吗?

问卷链接(https://www.wjx.cn/jq/9714648...)


作者:Alex Collins

Python 是用户在 Kubernetes 上编写机器学习工作流的风行编程语言。

开箱即用时,Argo 并没有为 Python 提供一流的反对。相同,咱们提供Java、Golang 和 Python API 客户端。

但这对大多数用户来说还不够。许多用户须要一个形象层来增加组件和特定于用例的个性。

明天你有两个抉择。

KFP 编译器+ Python 客户端

Argo 工作流被用作执行 Kubeflow 流水线的引擎。你能够定义一个 Kubeflow 流水线,并在 Python 中将其间接编译到 Argo 工作流中。

而后你能够应用Argo Python 客户端向 Argo 服务器 API 提交工作流。

这种办法容许你利用现有的 Kubeflow 组件。

装置:

pip3 install kfppip3 install argo-workflows

例子:

import kfp as kfpdef flip_coin():    return kfp.dsl.ContainerOp(        name='Flip a coin',        image='python:alpine3.6',        command=['python', '-c', """import randomres = "heads" if random.randint(0, 1) == 0 else "tails"with open('/output', 'w') as f:    f.write(res)        """],        file_outputs={'output': '/output'}    )def heads():    return kfp.dsl.ContainerOp(name='Heads', image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"'])def tails():    return kfp.dsl.ContainerOp(name='Tails', image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"'])@kfp.dsl.pipeline(name='Coin-flip', description='Flip a coin')def coin_flip_pipeline():    flip = flip_coin()    with kfp.dsl.Condition(flip.output == 'heads'):        heads()    with kfp.dsl.Condition(flip.output == 'tails'):        tails()def main():    kfp.compiler.Compiler().compile(coin_flip_pipeline, __file__ + ".yaml")if __name__ == '__main__':    main()

运行这个来创立你的工作流:

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: coin-flip-  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-21T17:17:54.299235',    pipelines.kubeflow.org/pipeline_spec: '{"description": "Flip a coin", "name":      "Coin-flip"}'}  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0}spec:  entrypoint: coin-flip  templates:  - name: coin-flip    dag:      tasks:      - name: condition-1        template: condition-1        when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "heads"'        dependencies: [flip-a-coin]      - name: condition-2        template: condition-2        when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "tails"'        dependencies: [flip-a-coin]      - {name: flip-a-coin, template: flip-a-coin}  - name: condition-1    dag:      tasks:      - {name: heads, template: heads}  - name: condition-2    dag:      tasks:      - {name: tails, template: tails}  - name: flip-a-coin    container:      command:      - python      - -c      - "\nimport random\nres = \"heads\" if random.randint(0, 1) == 0 else \"tails\"\        \nwith open('/output', 'w') as f:\n    f.write(res)        \n        "      image: python:alpine3.6    outputs:      parameters:      - name: flip-a-coin-output        valueFrom: {path: /output}      artifacts:      - {name: flip-a-coin-output, path: /output}  - name: heads    container:      command: [sh, -c, echo "it was heads"]      image: alpine:3.6  - name: tails    container:      command: [sh, -c, echo "it was tails"]      image: alpine:3.6  arguments:    parameters: []  serviceAccountName: pipeline-runner

留神,Kubeflow 不反对这种办法。

你能够应用客户端提交上述工作流程如下:

import yamlfrom argo.workflows.client import (ApiClient,                                   WorkflowServiceApi,                                   Configuration,                                   V1alpha1WorkflowCreateRequest)def main():    config = Configuration(host="http://localhost:2746")    client = ApiClient(configuration=config)    service = WorkflowServiceApi(api_client=client)with open("coin-flip.py.yaml") as f:        manifest: dict = yaml.safe_load(f)del manifest['spec']['serviceAccountName']service.create_workflow('argo', V1alpha1WorkflowCreateRequest(workflow=manifest))if __name__ == '__main__':    main()

Couler

Couler是一个风行的我的项目,它容许你以一种平台无感的形式指定工作流,但它次要反对 Argo 工作流(打算在将来反对 Kubeflow 和 AirFlow):

装置:

pip3 install git+https://github.com/couler-proj/couler

例子:

import couler.argo as coulerfrom couler.argo_submitter import ArgoSubmitterdef random_code():    import randomres = "heads" if random.randint(0, 1) == 0 else "tails"    print(res)def flip_coin():    return couler.run_script(image="python:alpine3.6", source=random_code)def heads():    return couler.run_container(        image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']    )def tails():    return couler.run_container(        image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']    )result = flip_coin()couler.when(couler.equal(result, "heads"), lambda: heads())couler.when(couler.equal(result, "tails"), lambda: tails())submitter = ArgoSubmitter()couler.run(submitter=submitter)

这会创立以下工作流程:

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: couler-example-spec:  templates:    - name: couler-example      steps:        - - name: flip-coin-29            template: flip-coin        - - name: heads-31            template: heads            when: '{{steps.flip-coin-29.outputs.result}} == heads'          - name: tails-32            template: tails            when: '{{steps.flip-coin-29.outputs.result}} == tails'    - name: flip-coin      script:        name: ''        image: 'python:alpine3.6'        command:          - python        source: |import randomres = "heads" if random.randint(0, 1) == 0 else "tails"          print(res)    - name: heads      container:        image: 'alpine:3.6'        command:          - sh          - '-c'          - echo "it was heads"    - name: tails      container:        image: 'alpine:3.6'        command:          - sh          - '-c'          - echo "it was tails"  entrypoint: couler-example  ttlStrategy:    secondsAfterCompletion: 600  activeDeadlineSeconds: 300

点击浏览网站原文。


CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux  Foundation,是非营利性组织。
CNCF(云原生计算基金会)致力于培养和保护一个厂商中立的开源生态系统,来推广云原生技术。咱们通过将最前沿的模式民主化,让这些翻新为公众所用。扫描二维码关注CNCF微信公众号。