共计 2970 个字符,预计需要花费 8 分钟才能阅读完成。
Mara-pipelines 是一个轻量级的数据转换框架,具备通明和低复杂性的特点。其余特点如下:
- 基于非常简单的 Python 代码就能实现流水线开发。
- 应用 PostgreSQL 作为数据处理引擎。
- 有 Web 界面可视化剖析流水线执行过程。
- 基于 Python 的 multiprocessing 单机流水线执行。不须要分布式工作队列。轻松调试和输入日志。
- 基于老本的优先队列:首先运行具备较高老本(基于记录的运行工夫)的节点。
此外,在 Mara-pipelines 的 Web 界面中,你不仅能够查看和治理流水线及其工作节点,你还能够间接触发这些流水线和节点,十分好用:
1. 装置
因为应用了大量的依赖,Mara-pipelines 并不适用于 Windows,如果你须要在 Windows 上应用 Mara-pipelines,请应用 Docker 或者 Windows 下的 linux 子系统。
应用 pip 装置 Mara-pipelines:
pip install mara-pipelines
或者:
pip install git+https://github.com/mara/mara-pipelines.git
2. 应用示例
这是一个根底的流水线演示,由三个相互依赖的节点组成,包含 工作 1(ping_localhost), 子流水线 (sub_pipeline), 工作 2(sleep):
# 留神,这个示例中应用了局部国外的网站,如果无法访问,请变更为国内网站。from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively
pipeline = Pipeline(
id='demo',
description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')
pipeline.add(Task(id='ping_localhost', description='Pings localhost',
commands=[RunBash('ping -c 3 localhost')]))
sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')
for host in ['google', 'amazon', 'facebook']:
sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
commands=[RunBash(f'ping -c 3 {host}.com')]))
sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
commands=[RunBash('ping foo')]), ['ping_amazon'])
pipeline.add(sub_pipeline, ['ping_localhost'])
pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
commands=[RunBash('sleep 2')]), ['sub_pipeline'])
能够看到,Task 蕴含了多个 commands,这些 commands 会用于真正地执行动作。
而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:
pipeline.add(sub_pipeline, ['ping_localhost'])
则表明必须执行完 ping_localhost 才会执行 sub_pipeline.
为了运行这个流水线,须要配置一个 PostgreSQL 数据库来存储运行时信息、运行输入和增量解决状态:
import mara_db.auto_migration
import mara_db.config
import mara_db.dbs
mara_db.config.databases \
= lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}
mara_db.auto_migration.auto_discover_models_and_migrate()
如果 PostgresSQL 正在运行并且账号密码正确,输入如下所示(创立了一个蕴含多个表的数据库):
Created database "postgresql+psycopg2://root@localhost/example_etl_mara"
CREATE TABLE data_integration_file_dependency (node_path TEXT[] NOT NULL,
dependency_type VARCHAR NOT NULL,
hash VARCHAR,
timestamp TIMESTAMP WITHOUT TIME ZONE,
PRIMARY KEY (node_path, dependency_type)
);
.. more tables
为了运行这个流水线,你须要:
from mara_pipelines.ui.cli import run_pipeline
run_pipeline(pipeline)
这将运行单个流水线节点及其 (sub_pipeline
) 所依赖的所有节点:
run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)
3.Web 界面
我认为 mara-pipelines 最有用的是他们提供了基于 Flask 管控流水线的 Web 界面。
对于每条流水线,他们都有一个页面显示:
- 所有子节点的图以及它们之间的依赖关系
- 流水线的总体运行工夫图表以及过来 30 天内最低廉的节点(可配置)
- 所有流水线节点及其均匀运行工夫和由此产生的排队优先级的表
- 流水线最初一次运行的输入和工夫线
对于每个工作,都有一个页面显示
- 流水线中工作的上游和上游
- 最近 30 天内工作的运行工夫
- 工作的所有命令
- 工作最初运行的输入
此外,流水线和工作能够间接从网页端调用运行,这是十分棒的特点:
咱们的文章到此就完结啦,如果你喜爱明天的 Python 实战教程,能够关注公众号:Python 编程学习圈 ,理解更多编程技术常识!