关于阿里云:阿里云-Serverless-异步任务处理系统在数据分析领域的应用

34次阅读

共计 7025 个字符,预计需要花费 18 分钟才能阅读完成。

异步工作解决零碎中的数据分析

数据处理、机器学习训练、数据统计分析是最为常见的一类离线工作。这类工作往往都是通过了一系列的预处理后,由上游对立发送到工作平台进行批量训练及剖析。在解决语言方面,Python 因为其所提供的丰盛的数据处理库,成为了数据畛域最为罕用的语言之一。函数计算原生反对 Python runtime,并反对快捷的引入第三方库,使得应用函数计算异步工作进行解决变得极为不便。

数据分析场景常见诉求

数据分析场景往往具备执行工夫长、并发量大的特点。在离线场景中,往往会定时触发一批大量的数据进行集中处理。因为这种触发个性,业务方往往会对资源利用率(老本)具备较高的要求,冀望可能满足效率的同时,尽量降低成本。具体归纳如下:

  1. 程序开发便捷,对于第三方包及自定义依赖敌对;
  2. 反对长时运行。可能查看执行过程中的工作状态,或登录机器进行操作。如果呈现数据谬误反对手动进行工作;
  3. 资源利用率高,老本最优。

以上诉求非常适合应用函数计算异步工作。

典型案例 – 数据库自治服务

  • 业务根本状况

阿里云团体外部的数据库巡检平台次要用于对 sql 语句的慢查问、日志等进行优化剖析。整个平台工作分为离线训练及在线剖析两类次要工作,其中在线剖析业务的的计算规模达到了上万核,离线业务的每日执行时长也在数万核小时。因为在线剖析、离线训练工夫上的不确定性,很难进步集群整体资源利用率,并且在业务顶峰来时须要极大的弹性算力反对。应用函数计算后,整个业务的架构图如下:

  • 业务痛点及架构演进

数据库巡检平台负责阿里巴巴全网各 Region 的数据库 SQL 优化及剖析工作。Mysql 数据来源于各 Region 的各个集群,并对立在 Region 维度进行一次预聚合及存储。在进行剖析时,因为须要跨 region 的聚合及统计,巡检平台首先尝试在内网搭建大型 Flink 集群进行统计分析工作。然而在理论应用中,遇到了如下问题:

  1. 数据处理算法迭代繁琐。次要体现在算法的部署、测试及公布上。Flink 的 Runtime 能力极大限度了公布周期;
  2. 对于常见的及一些自定义的第三方库,Flink 反对不是很好。算法所依赖的一些机器学习、统计的库在 Flink 官网 Python runtime 中要么没有,要么版本老旧,应用不便,无奈满足要求;
  3. 走 Flink 转发链路较长,Flink 排查问题艰难;
  4. 峰值时弹性速度及资源均较难满足要求。并且整体老本十分高。

在理解了函数计算后,针对 Flink 计算局部进行了算法工作的迁徙工作,将外围训练及统计算法迁徙至函数计算。通过应用函数计算异步工作所提供的相干能力,整个开发、运维及老本失去了极大的晋升。

  • 迁徙函数计算架构后的成果
  1. 迁徙函数计算后,零碎可能残缺承接峰值流量,疾速实现每日剖析及训练任务;
  2. 函数计算丰盛的 Runtime 能力反对了业务的疾速迭代;
  3. 计算上雷同的核数老本变为了原来 Flink 的 1/3。

函数计算异步工作十分实用于这类数据处理工作。函数计算在升高运算资源的老本同时,可能将您从繁冗的平台运维工作中解放出来,专一于算法开发及优化。

函数计算异步工作最佳实际 -Kafka ETL

ETL 是数据处理中较为常见的工作。原始数据或存在于 Kafka 中,或存在于 DB 中,因为业务须要对数据进行解决后转储到其余存储介质(或存回原来的工作队列)。这类业务也属于显著的工作场景。如果您采纳了云上的中间件服务(如云上的 Kafka),您就能够利用函数计算弱小的触发器集成生态便捷的集成 Kafka,而无需关注诸如 Kafka Connector 的部署、错误处理等与业务无关的操作。

ETL 工作场景的需要

一个 ETL 工作往往蕴含 Source、Sink 及处理单元三个局部,因而 ETL 工作除了对算力的要求外,还须要工作零碎具备极强的上下游连贯生态。除此之外,因为数据处理的准确性要求,须要工作解决零碎可能提供工作去重、Exactly Once 的操作语义。并且,对于解决失败的音讯,须要可能进行弥补(如重试、死信队列)的能力。总结如下:

  1. 工作的精确执行:

<!—->

    1. 工作反复触发反对去重;
    2. 工作反对弥补,死信队列;
  1. 工作的上下游:

<!—->

    1. 可能不便的拉取数据,并在解决后将数据传递至其余零碎;
  1. 算子能力的要求:

<!—->

    1. 反对用户自定义算子的能力,可能灵便的执行各种数据处理工作。

Serverless Task 对 ETL 工作的反对

函数计算反对的 Destinationg 性能能够很好的反对 ETL 工作对于便捷连贯上下游、工作精确执行的相干诉求。函数计算丰盛的 Runtime 反对也使得对于数据处理的工作变得极为灵便。在 Kafka ETL 工作解决场景中,咱们次要用到的 Serverless Task 能力如下:

  1. 异步指标配置性能:

<!—->

    1. 通过配置工作胜利指标,反对主动将工作投递至上游零碎(如队列中);
    2. 通过配置工作失败指标,反对死信队列能力,将失败的工作投递至音讯队列,期待后续的弥补解决;
  1. 灵便的算子及第三方库反对:

<!—->

    1. Python 因为其丰盛的统计、运算的第三方库的反对,在数据处理畛域 Python 是用的最为宽泛的语言之一。函数计算的 Python Runtime 反对对第三方库打包,使您可能疾速的进行原型验证及测试上线。

Kafka ETL 工作解决示例

咱们以简略的 ETL 工作解决为例,数据源来自 Kafka,通过函数计算解决后,将工作执行后果及上下游信息推送至音讯服务 MNS。见函数计算局部我的项目源码 [ 1]

  • Kafka 资源筹备
  1. 进入 Kafka 控制台,点击购买实例,之后部署。期待实例部署实现;
  2. 进入创立好的实例中,创立一个测试用 Topic。
  • 指标资源筹备(MNS)

进入 MNS 控制台,别离创立两个队列:

  1. dead-letter-queue:作为死信队列应用。当音讯解决失败后,执行的上下文信息将投递到这里;
  2. fc-etl-processed-message:作为工作胜利执行后的推送指标。

创立实现后,如下图所示:

  • 部署
  1. 下载安装 Serverless Devs:
npm install @serverless-devs/s

具体文档能够参考 Serverless Devs 装置文档 [ 2]

  1. 配置密钥信息:
s config add

具体文档能够参考阿里云密钥配置文档 [ 3]

  1. 进入我的项目,批改 s.yaml 文件中的指标 ARN 为上述创立后的 MNS 队列 ARN,并批改服务角色为已存在的角色;
  2. 部署:s deploy -t s.yaml 
  • 配置 ETL 工作
  • 进入 kafka 控制台 – connector 工作列表标签页,点击创立 Connector;
  1. 在配置完根本信息、源的 Topic 后,配置指标服务。在这外面咱们选择函数计算作为指标:

您能够依据业务需要配置发送批大小及重试次数。至此,咱们已实现工作的根本配置。留神:这外面的发送模式请抉择“异步”模式。

进入到函数计算异步配置页面,咱们能够看到目前的配置如下:

  • 测试 ETL 工作

1. 进入 kafka 控制台 – connector 工作列表标签页,点击测试;填完音讯内容后,点击发送:

  1. 发送多条音讯后,进入到函数控制台。咱们能够看到有多条音讯在执行中。此时咱们抉择应用进行工作的形式来模仿一次工作执行失败:
  1. 进入到音讯服务 MNS 控制台中,咱们能够看到两个先前创立的队列中均有一条可用音讯,别离代表一次执行和失败的工作内容:
  1. 进入到队列详情中,咱们能够看到两条音讯内容。以胜利的音讯内容为例:
{
    "timestamp":1646826806389,
    "requestContext":{
        "requestId":"919889e7-60ff-408f-a0c7-627bbff88456",
        "functionArn":"acs:fc:::services/fc-etl-job.LATEST/functions/fc-job-function",
        "condition":"","approximateInvokeCount":1
    },
    "requestPayload":"[{"key":"k1","offset":1,"overflowFlag":false,"partition":5,"timestamp":1646826803356,"topic":"connector-demo","value":"k1","valueSize":4}]",
    "responseContext":{
        "statusCode":200,
        "functionError":""},"responsePayload":"[\n    {\n        "key": "k1",\n        "offset": 1,\n        "overflowFlag": false,\n        "partition": 5,\n        "timestamp": 1646826803356,\n        "topic": "connector-demo",\n        "value": "k1",\n        "valueSize": 4\n}\n]"
}

在这外面,咱们能够看到 “responsePayload” 这一个 Key 中有函数返回的原始内容。个别状况下咱们会将数据处理的后果作为 response 返回,所以在后续的解决中,能够通过读取 “responsePayload” 来获取解决后的后果。

“requestPayload” 这一个 Key 中是 Kafka 触发函数计算的原始内容,通过读取这条数据中的内容,便能够获取原始数据。

函数计算异步工作最佳实际 - 音视频解决

随着计算机技术和网络的倒退,视频点播技术因其良好的人机交互性和流媒体传输技术倍受教育、娱乐等行业的青眼。以后云计算平台厂商的产品线一直成熟欠缺,如果想要搭建视频点播类利用,间接上云会扫清硬件洽购、技术等各种阻碍。以阿里云为例,典型的解决方案如下:

在该解决方案中,对象存储 OSS 能够反对海量视频存储,采集上传的视频被转码以适配各种终端、CDN 减速终端设备播放视频的速度。此外还有一些内容平安 [ 4] 审查需要,例如鉴黄、鉴恐等。

音视频是典型的长时解决场景,非常适合应用函数计算工作。

音视频解决的需要

在视频点播解决方案中,视频转码是最耗费计算力的一个子系统,尽管您能够应用云上专门的转码服务,但在某些场景下,您仍会抉择本人搭建转码服务,例如:

  • 须要更弹性的视频解决服务。例如,曾经在虚拟机或容器平台上基于 FFmpeg 部署了一套视频解决服务,但想在此基础上 晋升资源利用率,实现具备显著波峰波谷、流量突增状况下的快弹及稳定性;
  • 须要批量疾速解决多个超大的视频。例如,每周五定时产生几百个 4 GB 以上 1080P 的大视频,每个工作可能执行时长达数小时;
  • 对视频解决工作心愿实时 把握进度 ;并在一些呈现谬误的状况下须要登录实例排查问题甚至 进行执行中的工作 防止资源耗费。

Serverless Task 对音视频场景的反对

上述诉求是典型的工作场景。而因为这类工作往往具备波峰波谷的个性,如何进行计算资源的运维,并尽可能的升高其老本,这部分的工作量甚至比理论视频解决业务的工作量还要大。Serverless Task 这一产品状态就是为了解决这类场景而诞生的,通过 Serverless Task,您能够疾速构建高弹性、高可用、低成本免运维的视频解决平台。

在这个场景中,咱们会用到的 Serverless Task 的次要能力如下:

  1. 免运维 & 低成本:计算资源随用随弹,不应用不付费;
  2. 长时执行工作负载敌对:单个实例最长反对 24h 的执行时长;
  3. 工作去重:反对触发端的谬误弥补。对于繁多工作,Serverless Task 可能做到主动去重的能力,执行更牢靠;
  4. 工作可观测:所有执行中、执行胜利、执行失败的工作可追溯,可查问;反对工作的执行历史数据查问、工作日志查问;
  5. 工作可操作:您能够进行、重试工作;
  6. 麻利开发 & 测试:官网反对 S 工具进行自动化一键部署;反对登录运行中函数实例的能力,您能够间接登录实例调试 ffmpeg 等第三方程序,所见即所得。

Serverless -FFmpeg 视频转码

我的项目源码 [ 5] 见文末

  • 部署
  1. 下载安装 Serverless Devs:
npm install @serverless-devs/s

具体文档能够参考 Serverless Devs 装置文档 [ 2]

  1. 配置密钥信息:
s config add

具体文档能够参考阿里云密钥配置文档 [ 3 ]

  1. 初始化我的项目:s init video-transcode -d video-transcode
  2. 进入我的项目并部署:cd video-transcode && s deploy
  • 调用函数
  1. 发动 5 次异步工作函数调用
$ s VideoTranscoder invoke -e '{"bucket":"my-bucket","object":"480P.mp4","output_dir":"a","dst_format":"mov"}' --invocation-type async   --stateful-async-invocation-id my1-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: bf7d7745-886b-42fc-af21-ba87d98e1b1c

$ s VideoTranscoder invoke -e '{"bucket":"my-bucket","object":"480P.mp4","output_dir":"a","dst_format":"mov"}' --invocation-type async   --stateful-async-invocation-id my2-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: edb06071-ca26-4580-b0af-3959344cf5c3

$ s VideoTranscoder invoke -e '{"bucket":"my-bucket","object":"480P.mp4","output_dir":"a","dst_format":"flv"}' --invocation-type async   --stateful-async-invocation-id my3-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: 41101e41-3c0a-497a-b63c-35d510aef6fb

$ s VideoTranscoder invoke -e '{"bucket":"my-bucket","object":"480P.mp4","output_dir":"a","dst_format":"avi"}' --invocation-type async   --stateful-async-invocation-id my4-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: ff48cc04-c61b-4cd3-ae1b-1aaaa1f6c2b2

$ s VideoTranscoder invoke -e '{"bucket":"my-bucket","object":"480P.mp4","output_dir":"a","dst_format":"m3u8"}' --invocation-type async   --stateful-async-invocation-id my5-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: d4b02745-420c-4c9e-bc05-75cbdd2d010f
  1. 登录 FC 控制台 [ 6 ]

能够清晰看出每一次转码工作的执行状况:

    • A 视频是什么时候开始转码的, 什么时候转码完结
    • B 视频转码工作不太合乎预期,我中途能够点击进行调用
    • 通过调用状态过滤和工夫窗口过滤,我能够晓得当初有多少个工作正在执行,历史实现状况是怎么样的
    • 能够追溯每次转码工作执行日志和触发 payload
    • 当您的转码函数有异样时候,会触发 dest-fail 函数的执行,您在这个函数能够增加您自定义的逻辑,比方报警

转码结束后,您也能够登录 OSS 控制台到指定的输入目录查看转码后的视频。

在本地应用该我的项目时,不仅能够部署,还能够进行更多的操作,例如查看日志,查看指标,进行多种模式的调试等,这些操作详情能够参考函数计算组件命令文档 [ 7]

参考链接:

[1] 函数计算局部我的项目源码:

https://github.com/awesome-fc…

[2] Serverless Devs 装置文档:

https://github.com/Serverless-Devs/ServerlessDevs/blob/master/docs/zh/install.md

[3] 阿里云密钥配置文档:

https://github.com/devsapp/fc…

[4] 内容平安:

https://help.aliyun.com/produ…

[5] 我的项目源码:

https://github.com/devsapp/st…

[6] FC 控制台:

https://fcnext.console.aliyun…

[7] 函数计算组件命令文档:

https://github.com/devsapp/fc#%E6%96%87%E6%A1%A3%E7%9B%B8%E5%85%B3

正文完
 0