关于sql:Flink-在唯品会的实践

120次阅读

共计 7191 个字符,预计需要花费 18 分钟才能阅读完成。

简介:Flink 在唯品会的容器化实际利用以及产品化教训。

唯品会自 2017 年开始基于 k8s 深刻打造高性能、稳固、牢靠、易用的实时计算平台,反对唯品会外部业务在平时以及大促的安稳运行。现平台反对 Flink、Spark、Storm 等支流框架。本文次要分享 Flink 的容器化实际利用以及产品化教训。内容包含:

  1. 倒退概览
  2. Flink 容器化实际
  3. Flink SQL 平台化建设
  4. 利用案例
  5. 将来布局

GitHub 地址
https://github.com/apache/flink
欢送大家给 Flink 点赞送 star~

一、倒退概览

平台反对公司外部所有部门的实时计算利用。次要的业务包含实时大屏、举荐、试验平台、实时监控和实时数据荡涤等。

1.1 集群规模

平台现有异地双机房双集群,具备 2000 多的物理机节点,利用 k8s 的 namespaces,labels 和 taints 等,实现业务隔离以及初步的计算负载隔离。目前线上实时利用有大略 1000 个,平台最近次要反对 Flink SQL 工作的上线。

1.2 平台架构

  • 上图是唯品会实时计算平台的整体架构。
  • 最底层是计算工作节点的资源调度层,理论是以 deployment 的模式运行在 k8s 上,平台尽管反对 yarn 调度,然而 yarn 调度是与批工作共享资源,所以支流工作还是运行在 k8s 上。
  • 存储层这一层,反对公司外部基于 kafka 实时数据 vms,基于 binlog 的 vdp 数据和原生 kafka 作为音讯总线,状态存储在 hdfs 上,数据次要存入 redis,mysql,hbase,kudu,clickhouse 等。
  • 计算引擎层,平台反对 Flink,Spark,Storm 支流框架容器化,提供了一些框架的封装和组件等。每个框架会都会反对几个版本的镜像满足不同的业务需要。
  • 平台层提供作业配置、调度、版本治理、容器监控、job 监控、告警、日志等性能,提供多租户的资源管理(quota,label 治理),提供 kafka 监控。在 Flink 1.11 版本之前,平台自建元数据管理系统为 Flink SQL 治理 schema,1.11 版本开始,通过 hive metastore 与公司元数据管理系统交融。

最上层就是各个业务的应用层。

二、Flink 容器化实际

2.1 容器化实际

上图是实时平台 Flink 容器化的架构。Flink 容器化是基于 standalone 模式部署的。

  • 部署模式共有 client,jobmanager 和 taskmanager 三个角色,每一个角色都由一个 deployment 管制。
  • 用户通过平台上传工作 jar 包,配置等,存储于 hdfs 上。同时由平台保护的配置,依赖等也存储在 hdfs 上,当 pod 启动时,会进行拉取等初始化操作。
  • client 中主过程是一个由 go 开发的 agent,当 client 启动时,会首先查看集群状态,当集群 ready 后,从 hdfs 上拉取 jar 包向 Flink 集群提交工作。同时,client 的次要性能还有监控工作状态,做 savepoint 等操作。
  • 通过部署在每台物理机上的 smart – agent 采集容器的指标写入 m3,以及通过 Flink 暴漏的接口将 metrics 写入 prometheus,联合 grafana 展现。同样通过部署在每台物理机上的 vfilebeat 采集挂载进去的相干日志写入 es,在 dragonfly 能够实现日志检索。

■ Flink 平台化

在实际过程中,联合具体场景以及易用性思考,做了平台化工作。

  • 平台的工作配置与镜像,Flink 配置,自定义组件等解耦合,现阶段平台反对 1.7、1.9、1.11、1.12 等版本。
  • 平台反对流水线编译或上传 jar、作业配置、告警配置、生命周期治理等,从而缩小用户的开发成本。
  • 平台开发了容器级别的如火焰图等调优诊断的页面化性能,以及登陆容器的性能,反对用户进行作业诊断。

■ Flink 稳定性

在利用部署和运行过程中,不可避免的会出现异常。以下是平台保障工作在出现异常情况后的稳定性做的策略。

  • pod 的衰弱和可用,由 livenessProbe 和 readinessProbe 检测,同时指定 pod 的重启策略。
  • Flink 工作异样时:

    1.Flink 原生的 restart 策略和 failover 机制,作为第一层的保障。
    2. 在 client 中会定时监控 Flink 状态,同时将最新的 checkpoint 地址更新到本人的缓存中,并汇报到平台,固化到 MySQL 中。当 Flink 无奈再重启时,由 client 从新从最新的胜利 checkpoint 提交工作。作为第二层保障。这一层将 checkpoint 固化到 MySQL 中后,就不再应用 Flink HA 机制了,少了 zk 的组件依赖。
    3. 以后两层无奈重启时或集群出现异常时,由平台主动从固化到 MySQL 中的最新 chekcpoint 从新拉起一个集群,提交工作,作为第三层保障。

  • 机房容灾:

    • 用户的 jar 包,checkpoint 都做了异地双 HDFS 存储
    • 异地双机房双集群

2.2 kafka 监控计划

kafka 监控是咱们的工作监控里绝对重要的一部分,整体监控流程如下所示。

平台提供监控 kafka 沉积,生产 message 等配置信息,从 MySQL 中将用户 kafka 监控配置提取后,通过 jmx 监控 kafka,写入上游 kafka,再通过另一个 Flink 工作实时监控,同时将这些数据写入 ck,从而展现给用户。

三、Flink SQL 平台化建设

基于 k8s 的 Flink 容器化实现当前,不便了 Flink api 利用的公布,然而对于 Flink SQL 的工作依然不够便捷。于是平台提供了更加不便的在线编辑公布、SQL 治理等一栈式开发平台。

3.1 Flink SQL 计划

平台的 Flink SQL 计划如上图所示,工作公布零碎与元数据管理系统齐全解耦。

■ Flink SQL 工作公布平台化

在实际过程中,联合易用性思考,做了平台化工作,主操作界面如下图所示:

  • Flink SQL 的版本治理,语法校验,拓扑图治理等;
  • UDF 通用和工作级别的治理,反对用户自定义 UDF;
  • 提供参数化的配置界面,不便用户上线工作。

■ 元数据管理

平台在 1.11 之前通过构建本人的元数据管理系统 UDM,MySQL 存储 kafka,redis 等 schema,通过自定义 catalog 买通 Flink 与 UDM,从而实现元数据管理。1.11 之后,Flink 集成 hive 逐步欠缺,平台重构了 FlinkSQL 框架,通过部署一个 SQL – gateway service 服务,两头调用本人保护的 SQL – client jar 包,从而与离线元数据买通,实现了实时离线元数据对立,为之后的流批一体做好工作。在元数据管理系统创立的 Flink 表操作界面如下所示,创立 Flink 表的元数据,长久化到 hive 里,Flink SQL 启动时从 hive 里读取对应表的 table schema 信息。

3.2 Flink SQL 相干实际

平台对于官网原生反对或者不反对的 connector 进行整合和开发,镜像和 connector,format 等相干依赖进行解耦,能够快捷的进行更新与迭代。

■ FLINK SQL 相干实际

  • connector 层,现阶段平台反对官网反对的 connector,并且构建了 redis,kudu,clickhouse,vms,vdp 等平台外部的 connector。平台构建了外部的 pb format,反对 protobuf 实时荡涤数据的读取。平台构建了 kudu,vdp 等外部 catalog,反对间接读取相干的 schema,不必再创立 ddl。
  • 平台层次要是在 UDF、罕用运行参数调整、以及降级 hadoop3。
  • runntime 层次要是反对拓扑图执行打算批改、维表关联 keyBy cache 优化等

■ 拓扑图执行打算批改

针对现阶段 SQL 生成的 stream graph 并行度无奈批改等问题,平台提供可批改的拓扑预览批改相干参数。平台会将解析后的 FlinkSQL 的 excution plan json 提供给用户,利用 uid 保障算子的唯一性,批改每个算子的并行度,chain 策略等,也为用户解决反压问题提供办法。例如针对 clickhouse sink 小并发大批次的场景,咱们反对批改 clickhouse sink 并行度,source 并行度 = 72,sink 并行度 = 24,进步 clickhouse sink tps。

■ 维表关联 keyBy 优化 cache

针对维表关联的状况,为了升高 IO 申请次数,升高维表数据库读压力,从而升高提早,进步吞吐,有以下几种措施:

  • 当维表数据量不大时,通过全量维表数据缓存在本地,同时 ttl 管制缓存刷新的时候,这能够极大的升高 IO 申请次数,但会要求更多但内存空间。
  • 当维表数据量很大时,通过 async 和 LRU cache 策略,同时 ttl 和 size 来管制缓存数据的生效工夫和缓存大小,能够进步吞吐率并升高数据库的读压力。
  • 当维表数据量很大同时支流 qps 很高时,能够开启把维表 join 的 key 作为 hash 的条件,将数据进行分区,即在 calc 节点的分区策略是 hash,这样上游算子的 subtask 的维表数据是独立的,不仅能够进步命中率,也可升高内存应用空间。

优化之前维表关联 LookupJoin 算子和失常算子 chain 在一起。

优化之间维表关联 LookupJoin 算子和失常算子不 chain 在一起,将 join key 作为 hash 策略的 key。采纳这种形式优化之后,例如原先 3000W 数据量的维表,10 个 TM 节点,每个节点都要缓存 3000W 的数据,总共须要缓存 3000W * 10 = 3 亿的量。而通过 keyBy 优化之后,每个 TM 节点只须要缓存 3000W / 10 = 300W 的数据量,总共缓存的数据量只有 3000W,大大减少缓存数据量。

■ 维表关联提早 join

维表关联中,有很多业务场景,在维表数据新增数据之前,支流数据曾经产生 join 操作,会呈现关联不上的状况。因而,为了保证数据的正确,将关联不上的数据进行缓存,进行提早 join。

最简略的做法是,在维表关联的 function 里设置重试次数和重试距离,这个办法会增大整个流的提早,但支流 qps 不高的状况下,能够解决问题。

减少提早 join 的算子,当 join 维表未关联时,先缓存起来,依据设置重试次数和重试距离从而进行提早的 join。

四、利用案例

4.1 实时数仓

■ 实时数据入仓

  • 流量数据一级 kafka 通过实时荡涤之后,写到二级荡涤 kafka,次要是 protobuf 格局,再通过 Flink SQL 写入 hive 5min 表,以便做后续的准实时 ETL,减速 ods 层数据源的筹备工夫。
  • MySQL 业务库的数据,通过 VDP 解析造成 binlog cdc 音讯流,再通过 Flink SQL 写入 hive 5min 表。
  • 业务零碎通过 VMS API 产生业务 kafka 音讯流,通过 Flink SQL 解析之后写入 hive 5min 表。反对 string、json、csv 等音讯格局。
  • 应用 Flink SQL 做流式数据入仓,十分的不便,而且 1.12 版本曾经反对了小文件的主动合并,解决了小文件的痛点。
  • 咱们自定义分区提交策略,以后分区 ready 时候会调一下实时平台的分区提交 api,在离线调度定时调度通过这个 api 查看分区是否 ready。

采纳 Flink SQL 对立入仓计划当前,咱们能够取得的收益:可解决以前 Flume 计划不稳固的问题,而且用户可自助入仓,大大降低入仓工作的保护老本。晋升了离线数仓的时效性,从小时级升高至 5min 粒度入仓。

■ 实时指标计算

  • 实时利用生产荡涤后 kafka,通过 redis 维表、api 等形式关联,再通过 Flink window 增量计算 UV,长久化写到 Hbase 里。
  • 实时利用生产 VDP 音讯流之后,通过 redis 维表、api 等形式关联,再通过 Flink SQL 计算出销售额等相干指标,增量 upsert 到 kudu 里,不便依据 range 分区批量查问,最终通过数据服务对实时大屏提供最终服务。

以往指标计算通常采纳 Storm 形式,须要通过 api 定制化开发,采纳这样 Flink 计划当前,咱们能够取得的收益:将计算逻辑切到 Flink SQL 上,升高计算工作口径变动快,批改上线周期慢等问题。切换至 Flink SQL 能够做到疾速批改,疾速上线,升高保护老本。

■ 实时离线一体化 ETL 数据集成

Flink SQL 在最近的版本中继续强化了维表 join 的能力,不仅能够实时关联数据库中的维表数据,当初还能关联 Hive 和 Kafka 中的维表数据,能灵便满足不同工作负载和时效性的需要。

基于 Flink 弱小的流式 ETL 的能力,咱们能够对立在实时层做数据接入和数据转换,而后将明细层的数据回流到离线数仓中。

咱们通过将 presto 外部应用的 HyperLogLog (前面简称 HLL) 实现引入到 Spark UDAF 函数里,买通 HLL 对象在 Spark SQL 与 presto 引擎之间的互通,如 Spark SQL 通过 prepare 函数生成的 HLL 对象,不仅能够在 Spark SQL 里 merge 查问而且能够在 presto 里进行 merge 查问。具体流程如下:

UV 近似计算示例:

Step 1: Spark SQL 生成 HLL 对象

insert overwrite dws\_goods\_uv partition (dt=’${dt}’,hm=’${hm}’) AS select goods\_id, estimate\_prepare(mid) as pre\_hll from dwd\_table\_goods group by goods\_id where dt = ${dt} and hm = ${hm}

Step 2: Spark SQL 通过 goods\_id 维度的 HLL 对象 merge 成品牌维度

insert overwrite dws\_brand\_uv partition (dt=’${dt}’,hm=’${hm}’) AS select b.brand\_id, estimate\_merge(pre\_hll) as merge\_hll from dws\_table\_brand A left join dim\_table\_brand\_goods B on A.goods\_id = B.goods\_id where dt = ${dt} and hm = ${hm}

Step 3: Spark SQL 查问品牌维度的 UV

select brand\_id, estimate\_compute(merge\_hll) as uv from dws\_brand\_uv where dt = ${dt}

Step 4: presto merge 查问 park 生成的 HLL 对象

select brand\_id,cardinality(merge(cast(merge\_hll AS HyperLogLog))) uv from dws\_brand\_uv group by brand\_id

所以基于实时离线一体化 ETL 数据集成的架构,咱们能取得的收益:

  • 对立了根底公共数据源;
  • 晋升了离线数仓的时效性;
  • 缩小了组件和链路的保护老本。

4.2 试验平台(Flink 实时数据入 OLAP)

唯品会试验平台是通过配置多维度剖析和下钻剖析,提供海量数据的 A / B – test 试验成果剖析的一体化平台。一个试验是由一股流量(比方用户申请)和在这股流量上进行的绝对比照试验的批改组成。试验平台对于海量数据查问有着低提早、低响应、超大规模数据 (百亿级) 的需要。整体数据架构如下:

通过 Flink SQL 将 kafka 里的数据荡涤解析开展等操作之后,通过 redis 维表关联商品属性,通过分布式表写入到 clickhouse,而后通过数据服务 adhoc 查问。业务数据流如下:

咱们通过 Flink SQL redis connector,反对 redis 的 sink、source 维表关联等操作,能够很不便的读写 redis,实现维表关联,维表关联内可配置 cache,极大进步利用的 TPS。通过 Flink SQL 实现实时数据流的 pipeline,最终将大宽表 sink 到 CK 里,并依照某个字段粒度做 murmurHash3\_64 存储,保障雷同用户的数据都存在同一 shard 节点组内,从而使得 ck 大表之间的 join 变成 local 本地表之间的 join, 缩小数据 shuffle 操作,晋升 join 查问效率。

五、将来布局

5.1 进步 Flink SQL 易用性

以后咱们的 Flink SQL 调试起来很有很多不不便的中央,对于做离线 hive 用户来说还有肯定的应用门槛,例如手动配置 kafka 监控、工作的压测调优,如何能让用户的应用门槛升高至最低,是一个比拟大的挑战。未来咱们思考做一些智能监控通知用户当前任务存在的问题,尽可能自动化并给用户一些优化倡议。

5.2 数据湖 CDC 剖析计划落地

目前咱们的 VDP binlog 音讯流,通过 Flink SQL 写入到 hive ods 层,以减速 ods 层数据源的筹备工夫,然而会产生大量反复音讯去重合并。咱们会思考 Flink + 数据湖的 cdc 入仓计划来做增量入仓。此外,像订单打宽之后的 kafka 音讯流、以及聚合后果都须要十分强的实时 upsert 能力,目前咱们次要是用 kudu,然而 kudu 集群,比拟独立小众,保护老本高,咱们会调研数据湖的增量 upsert 能力来替换 kudu 增量 upsert 场景。

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群~


流动举荐:

仅需 99 元即可体验阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版!点击下方链接理解流动详情:https://www.aliyun.com/product/bigdata/sc?utm\_content=g\_1000250506

版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

正文完
 0