简介:唯品会 Flink 的容器化实际利用,Flink SQL 平台化建设,以及在实时数仓和试验平台上的利用案例。
转自 dbaplus 社群公众号
作者:王康,唯品会数据平台高级开发工程师
自 2017 年起,为保障外部业务在平时和大促期间的安稳运行,唯品会就开始基于 Kubernetes 深刻打造高性能、稳固、牢靠、易用的实时计算平台,当初的平台反对 Flink、Spark、Storm 等支流框架。
本文将分为五个方面,分享唯品会 Flink 的容器化实际利用以及产品化教训:
- 倒退概览
- Flink 容器化实际
- Flink SQL 平台化建设
- 利用案例
- 将来布局
一、倒退概览
1、集群规模
在集群规模方面,咱们有 2000+ 的物理机,次要部署 Kubernetes 异地双活的集群,利用 Kubernetes 的 namespaces,labels 和 taints 等实现业务隔离以及初步的计算负载隔离。
Flink 工作数、Flink SQL 工作数、Storm 工作数、Spark 工作数,这些线上实时利用加起来有 1000 多个。目前咱们次要反对 Flink SQL 这一块,因为 SQL 化是一个趋势,所以咱们要反对 SQL 工作的上线平台。
2、平台架构
咱们从下往上进行解析实时计算平台的整体架构:
- 资源调度层(最底层)
实际上是用 deployment 的模式运行 Kubernetes 上,平台尽管反对 yarn 调度,然而 yarn 调度与批工作共享资源,所以支流工作还是运行在 Kubernetes 上的。并且,yarn 调度这一层次要是离线部署的一套 yarn 集群。在 2017 年的时候,咱们自研了 Flink on Kubernetes 的一套计划,因为底层调度分了两层,所以在大促资源缓和的时候,实时跟离线就能够做一个资源的借调。
- 存储层
次要用来反对公司外部基于 Kafka 的实时数据 vms,基于 binlog 的 vdp 数据和原生 Kafka 作为音讯总线,状态存储在 HDFS 上,数据次要存入 Redis、MySQL、HBase、Kudu、HDFS、ClickHouse 等。
- 计算引擎层
次要是 Flink、Storm、Spark,目前主推的是 Flink,每个框架会都会反对几个版本的镜像以满足不同的业务需要。
- 实时平台层
次要提供作业配置、调度、版本治理、容器监控、job 监控、告警、日志等性能,提供多租户的资源管理(quota,label 治理)以及 Kafka 监控。资源配置也分为大促日和平时日,大促的资源和平时的资源是不一样的,资源的权限管控也是不一样的。在 Flink 1.11 版本之前,平台自建元数据管理系统为 Flink SQL 治理 schema;从 1.11 版本开始,则是通过 Hive metastore 与公司元数据管理系统交融。
- 应用层
次要是反对实时大屏、举荐、试验平台、实时监控和实时数据荡涤的一些场景。
二、Flink 容器化实际
1、容器化计划
下面是实时平台 Flink 容器化的架构图。Flink 容器化其实是基于 Standalone 模式部署的。
咱们的部署模式共有 Client、Job Manager、Task Manager 三个角色,每一个角色都会有一个 Deployment 来管制。
用户通过平台上传工作 jar 包、配置等,存储于 HDFS 上。同时由平台保护的配置、依赖等也存储在 HDFS 上,当 pod 启动时,就会进行拉取等初始化操作。
Client 中主过程是一个由 go 开发的 agent,当 Client 启动时,会首先查看集群状态,当集群筹备好后,从 HDFS 上拉取 jar 包,再向这个集群提交工作。Client 的次要工作是做容错,它次要性能还有监控工作状态,做 savepoint 等操作。
通过部署在每台物理机上的 smart-agent 采集容器的指标写入 m3,以及通过 Flink 暴漏的接口将 metrics 写入 prometheus,联合 grafana 展现。同样通过部署在每台物理机上的 vfilebeat 采集挂载进去的相干日志写入 es,在 dragonfly 能够实现日志检索。
1)Flink 平台化
在实际过程中,肯定要联合具体场景和易用性,再去思考做平台化工作。
2)Flink 稳定性
在咱们利用部署以及运行过程中,异样是不可避免的,这时候平台就须要做一些保障工作在出现异常情况后,仍旧放弃稳定性的一些策略。
- pod 的衰弱和可用:
由 livenessProbe 和 readinessProbe 检测,同时指定 pod 的重启策略,Kubernetes 自身能够做一个 pod 的拉起。
- Flink 工作产生异样时:
Flink 有自已自身的一套 restart 策略和 failover 机制,这是它的第一层保障。
在 Client 中会定时监控 Flink 状态,同时将最新的 checkpoint 地址更新到本人的缓存中,并汇报到平台,而后固化到 MySQL 中。当 Flink 无奈再重启时,由 Client 从新从最新的胜利 checkpoint 提交工作。这是它的第二层保障。
这一层将 checkpoint 固化到 MySQL 中后,就不再应用 Flink HA 机制了,少了 zk 的组件依赖。
以后两层无奈重启时或集群出现异常时,由平台主动从固化到 MySQL 中的最新 checkpoint 从新拉起一个集群,提交工作,这是它的第三层保障。
- 机房容灾:
用户的 jar 包,checkpoint 都做了异地双 HDFS 存储。
异地双机房双集群。
2、Kafka 监控计划
Kafka 监控是工作监控里十分重要的一个环节,整体的流程如下:
平台提供监控 Kafka 沉积,用户在界面上,能够配置本人的 Kafka 监控,告知在怎么的集群,以及用户生产 message 等配置信息。能够从 MySQL 中将用户 Kafka 监控配置提取后,再通过 jmx 监控 Kafka,这样的信息采集之后,写入上游 Kafka,再通过另一个 Flink 工作实时监控告警,同时将这些数据同步写入 ck 外面,从而反馈给咱们的用户(这里也能够不必 ck,用 Prometheus 去做监控也是能够的,但 ck 会更加适宜),最初再用 Grafana 组件去展现给用户。
三、Flink SQL 平台化建设
有了后面 Flink 的容器化计划之后,就要开始 Flink SQL 平台化建设了。大家都晓得,这样流式的 api 开发起来,还是有肯定的老本的。Flink 必定是比 Storm 快的,也绝对比较稳定、容易一些,然而对于一些用户,特地是 Java 开发的一些同学来说,做这个是有肯定门槛的。
Kubernetes 的 Flink 容器化实现当前,不便了 Flink api 利用的公布,然而对于 Flink SQL 的工作依然不够便当。于是平台提供了更加不便的在线编辑公布、SQL 治理等一栈式开发平台。
1、Flink SQL 计划
平台的 Flink SQL 计划如上图所示,工作公布零碎与元数据管理系统是齐全解耦的。
1)Flink SQL 工作公布平台化
在实际过程中,须要思考易用性,做平台化工作,主操作界面如下图所示:
- Flink SQL 的版本治理、语法校验、拓扑图治理等;
- UDF 通用和工作级别的治理,反对用户自定义 udf;
- 提供参数化的配置界面,不便用户上线工作。
下图是一个用户界面配置的例子:
下图是一个集群配置的范例:
2)元数据管理
平台在 1.11 之前通过构建本人的元数据管理系统 UDM,MySQL 存储 Kafka,Redis 等 schema,通过自定义 catalog 买通 Flink 与 UDM,从而实现元数据管理。
在 1.11 之后,Flink 集成 Hive 逐步欠缺,平台重构了 Flink SQL 框架,并通过部署一个 SQL-gateway service 服务,两头调用本人保护的 SQL-Client jar 包,从而与离线元数据买通,实现了实时离线元数据的对立,为之后的流批一体打好了根底。
在元数据管理系统创立的 Flink 表操作界面如下图所示:创立 Flink 表的元数据,长久化到 Hive 里,Flink SQL 启动时从 Hive 里读取对应表的 table schema 信息。
2、Flink SQL 相干实际
平台对于官网原生反对或者不反对的 connector 进行整合和开发,镜像和 connector,format 等相干依赖进行解耦,能够快捷的进行更新与迭代。
1)Flink SQL 相干实际
Flink SQL 次要分为以下三层:
- connector 层
反对 VDP connector 读取 source 数据源;
反对 Redis string、hash 等数据类型的 sink & 维表关联;
反对 kudu connector & catalog & 维表关联;
反对 protobuf format 解析实时荡涤数据;
反对 vms connector 读取 source 数据源;
反对 ClickHouse connector sink 分布式表 & 本地表高 TPS 写入;
Hive connector 反对数坊 Watermark Commit Policy 分区提交策略 & array、decimal 等简单数据类型。
- runtime 层
次要反对拓扑图执行打算批改;
维表关联 keyBy 优化 cache 晋升查问性能;
维表关联提早 join。
- 平台层
Hive UDF;
反对 json HLL 相干处理函数;
反对 Flink 运行相干参数设置如 minibatch、聚合优化参数;
Flink 降级 hadoop3。
2)拓扑图执行打算批改
针对现阶段 SQL 生成的 stream graph 并行度无奈批改等问题,平台提供可批改的拓扑预览批改相干参数。平台会将解析后的 FlinkSQL 的 excution plan json 提供给用户,利用 uid 保障算子的唯一性,批改每个算子的并行度,chain 策略等,也为用户解决反压问题提供办法。例如针对 ClickHouse sink 小并发大批次的场景,咱们反对批改 ClickHouse sink 并行度,source 并行度 = 72,sink 并行度 = 24,进步 ClickHouse sink tps。
3)维表关联 keyBy 优化 cache
针对维表关联的状况,为了升高 IO 申请次数,升高维表数据库读压力,从而升高提早,进步吞吐,有以下三种措施:
上面是维表关联 KeyBy 优化 cache 的图:
在优化之前的时候,维表关联 LookupJoin 算子和失常算子 chain 在一起,优化之间维表关联 Lookup Join 算子和失常算子不 chain 在一起,将 join key 作为 hash 策略的 key。
采纳这种形式优化后,例如原来的 3000W 数据量维表,10 个 TM 节点,每个节点都要缓存 3000W 的数据,总共须要缓存 3 亿的量。而通过 keyBy 优化之后,每个 TM 节点只须要缓存 3000W/10 = 300W 的数据量,总共缓存的数据量只有 3000W,这十分大程度缩小了缓存数据量。
4)维表关联提早 join
维表关联中,有很多业务场景,在维表数据新增数据之前,支流数据曾经产生 join 操作,会呈现关联不上的状况。因而,为了保证数据的正确,将关联不上的数据进行缓存,进行提早 join。
最简略的做法是,在维表关联的 function 里设置重试次数和重试距离,这个办法会增大整个流的提早,但支流 qps 不高的状况下,能够解决问题。
减少提早 join 的算子,当 join 维表未关联时,先缓存起来,依据设置重试次数和重试距离从而进行提早的 join。
四、利用案例
1、实时数仓
1)实时数据入仓
实时数仓次要分为三个过程:
- 流量数据一级 Kafka 进行实时数据荡涤后,能够写到二级荡涤 Kafka,次要是 protobuf 格局,再通过 Flink SQL 写入 Hive 5min 表,以便做后续的准实时 ETL,减速 ods 层数据源的筹备工夫。
- MySQL 业务库的数据,通过 VDP 解析造成 binlog cdc 音讯流,再通过 Flink SQL 写入 Hive 5min 表,同时会提交到自定义分区,再把分区状态汇报到服务接口,最初再做一个离线的调度。
- 业务零碎通过 VMS API 产生业务 Kafka 音讯流,通过 Flink SQL 解析之后写入 Hive 5min 表。能够反对 string、json、csv 等音讯格局。
应用 Flink SQL 做流式数据入仓是十分不便的,而且 1.12 版本曾经反对了小文件的主动合并,解决了大数据层一个十分广泛的痛点。
咱们自定义分区提交策略,以后分区 ready 时候会调一下实时平台的分区提交 api,在离线调度定时调度通过这个 api 查看分区是否 ready。
采纳 Flink SQL 对立入仓计划当前,咱们可取得以下成绩:
首先咱们不仅解决了以往 Flume 计划不稳固的问题,用户也能够实现自助入仓,大大降低入仓工作的保护老本,稳定性也能够失去保障。
其次咱们还晋升了离线数仓的时效性,从小时级升高至 5min 粒度入仓,时效性能够加强。
2)实时指标计算
- 实时利用生产荡涤后 Kafka,通过 Redis 维表、api 等形式关联,再通过 Flink window 增量计算 UV,长久化写到 HBase 里。
- 实时利用生产 VDP 音讯流之后,通过 Redis 维表、api 等形式关联,再通过 Flink SQL 计算出销售额等相干指标,增量 upsert 到 kudu 里,不便依据 range 分区批量查问,最终通过数据服务对实时大屏提供最终服务。
以往指标计算通常采纳 Storm 形式,这个形式须要通过 api 定制化开发,采纳这样 Flink 计划当前,咱们能够取得了以下成绩:
将计算逻辑切到 Flink SQL 上,升高计算工作口径变动快,解决批改上线周期慢等问题;
切换至 Flink SQL 能够做到疾速批改,并且实现疾速上线,升高了保护的老本。
3)实时离线一体化 ETL 数据集成
具体的流程如下图所示:
Flink SQL 在最近的版本中继续强化了维表 join 的能力,不仅能够实时关联数据库中的维表数据,还能关联 Hive 和 Kafka 中的维表数据,能灵便满足不同工作负载和时效性的需要。
基于 Flink 弱小的流式 ETL 的能力,咱们能够对立在实时层做数据接入和数据转换,而后将明细层的数据回流到离线数仓中。
咱们通过将 presto 外部应用的 HyperLogLog(前面简称 HLL)实现引入到 Spark UDAF 函数里,买通 HLL 对象在 Spark SQL 与 presto 引擎之间的互通。如 Spark SQL 通过 prepare 函数生成的 HLL 对象,不仅能够在 Spark SQL 里 merge 查问而且能够在 presto 里进行 merge 查问。
具体流程如下:
UV 近似计算示例:
2、试验平台(Flink 实时数据入 OLAP)
唯品会试验平台是通过配置多维度剖析和下钻剖析,提供海量数据的 A/B-test 试验成果剖析的一体化平台。一个试验是由一股流量(比方用户申请)和在这股流量上进行的绝对比照试验的批改组成。试验平台对于海量数据查问有着低提早、低响应、超大规模数据(百亿级)的需要。
整体数据架构如下:
- 离线数据是通过 waterdrop 导入到 ClickHouse 外面去;
- 实时数据通过 Flink SQL 将 Kafka 里的数据荡涤解析开展等操作之后,通过 Redis 维表关联商品属性,通过分布式表写入到 ClickHouse,而后通过数据服务 adhoc 查问,通过数据服务提供对外的接口。
业务数据流如下:
咱们的试验平台有一个很重要的 ES 场景,咱们上线一个利用场景后,如果我想看成果如何,包含上线产生的曝光、点击、加购、珍藏是怎么的。咱们须要把每一个数据的明细,比如说分流的一些数据,依据场景分区,写到 ck 外面去。
咱们通过 Flink SQL Redis connector,反对 Redis 的 sink、source 维表关联等操作,能够很不便地读写 Redis,实现维表关联,维表关联内可配置 cache,极大进步利用的 TPS。通过 Flink SQL 实现实时数据流的 pipeline,最终将大宽表 sink 到 CK 里,并依照某个字段粒度做 murmurHash3_64 存储,保障雷同用户的数据都存在同一 shard 节点组内,从而使得 ck 大表之间的 join 变成 local 本地表之间的 join,缩小数据 shuffle 操作,晋升 join 查问效率。
五、将来布局
1、进步 Flink SQL 易用性
Flink SQL 对于 Hive 用户来说,应用起来还是有一点不一样的中央。不论是 Hive,还是 Spark SQL,都是批量解决的一个场景。
所以以后咱们的 Flink SQL 调试起来仍有很多不不便的中央,对于做离线 Hive 的用户来说还有肯定的应用门槛,例如手动配置 Kafka 监控、工作的压测调优。所以如何能让用户的应用门槛降至最低,让用户只须要懂 SQL 或者懂业务,把 Flink SQL 外面的概念对用户屏蔽掉,简化用户的应用流程,是一个比拟大的挑战。
未来咱们思考做一些智能监控,通知用户当前任务存在的问题,不须要用户去学习太多的货色,尽可能自动化并给用户一些优化倡议。
2、数据湖 CDC 剖析计划落地
一方面,咱们做数据湖次要是为了解决咱们 binlog 实时更新的场景,目前咱们的 VDP binlog 音讯流,通过 Flink SQL 写入到 Hive ods 层,以减速 ods 层数据源的筹备工夫,然而会产生大量反复音讯去重合并。咱们会思考 Flink + 数据湖的 cdc 入仓计划来做增量入仓。
另一方面咱们心愿通过数据湖,来代替咱们 Kudu,咱们这边一部分重要的业务在用 Kudu。尽管 Kudu 没有大量的应用,但鉴于 Kudu 的运维比个别的数据库运维简单得多、比拟小众,并且像订单打宽之后的 Kafka 音讯流、以及聚合后果都须要十分强的实时 upsert 能力,所以咱们就开始调研 CDC+ 数据湖这种解决方案,用这种计划的增量 upsert 能力来替换 kudu 增量 upsert 场景。
Q&A
Q1:vdp connector 是 MySQL binlog 读取吗?和 canal 是一种工具吗?
A1:vdp 是公司 binlog 同步的一个组件,将 binlog 解析之后发送到 Kafka。是基于 canal 二次开发的。咱们定义了一个 cdc format 能够对接公司的 vdp Kafka 数据源,与 Canal CDC format 有点相似。目前没有开源,使咱们公司用的 binlog 的一个同步计划。
Q2 : uv 数据输入到 HBase,销售数据输入到 kudu,输入到了不同的数据源,次要是因为什么采取的这种策略?
A2:kudu 的利用场景没有 HBase 这么宽泛。uv 实时写入的 TPS 比拟高,HBase 比拟适宜单条查问的场景,写入 HBase 高吞吐 + 低提早,小范畴查问提早低;kudu 的话具备一些 OLAP 的个性,能够存订单类明细,列存减速,联合 Spark、presto 等做 OLAP 剖析。
Q3 : 请问一下,你们怎么解决的 ClickHouse 的数据更新问题?比方数据指标更新。
A3 : ck 的更新是异步 merge,只能在同一 shard 同一节点同一分区内异步 merge,是弱一致性。对于指标更新场景不太倡议应用 ck。如果在 ck 里有更新强需要的场景,能够尝试 AggregatingMergeTree 解决方案,用 insert 替换 update,做字段级的 merge。
Q4:binlog 写入怎么保证数据的去重和一致性?
A4 : binlog 目前还没有写入 ck 的场景,这个计划看起来不太成熟。不倡议这么做,能够用采纳 CDC + 数据湖的解决方案。
Q5 : 如果 ck 各个节点写入不平衡,怎么去监控,怎么解决?怎么样看数据歪斜呢?
A5:能够通过 ck 的 system.parts 本地表监控每台机器每个表每个分区的写入数据量以及 size,来查看数据分区,从而定位到某个表某台机器某个分区。
Q6 : 你们在实时平台是如何做工作监控或者健康检查的?又是如何在出错后主动复原的?当初用的是 yarn-application 模式吗?存在一个 yarn application 对应多个 Flink job 的状况吗?
A6 : 对于 Flink 1.12+ 版本,反对了 PrometheusReporter 形式裸露一些 Flink metrics 指标,比方算子的 watermark、checkpoint 相干的指标如 size、耗时、失败次数等要害指标,而后采集、存储起来做工作监控告警。
Flink 原生的 restart 策略和 failover 机制,作为第一层的保障。
在 Client 中会定时监控 Flink 状态,同时将最新的 checkpoint 地址更新到本人的缓存中,并汇报到平台,固化到 MySQL 中。当 Flink 无奈再重启时,由 Client 从新从最新的胜利 checkpoint 提交工作。作为第二层保障。这一层将 checkpoint 固化到 MySQL 中后,就不再应用 Flink HA 机制了,少了 zk 的组件依赖。
以后两层无奈重启时或集群出现异常时,由平台主动从固化到 MySQL 中的最新 chekcpoint 从新拉起一个集群,提交工作,作为第三层保障。
咱们反对 yarn-per-job 模式,次要基于 Flink on Kubernetes 模式部署 standalone 集群。
Q7 : 目前你们大数据平台上所有的组件都是容器化的还是混合的?
A7:目前咱们实时这一块的组件 Flink、Spark、Storm、Presto 等计算框架实现了容器化,详情可看上文 1.2 平台架构。
Q8 :kudu 不是在 Kubernetes 上跑的吧?
A8:kudu 不是在 Kubernetes 上运行,这个目前还没有特地成熟的计划。并且 kudu 是基于 cloudera manager 运维的,没有上 Kubernetes 的必要。
Q9 : Flink 实时数仓维度表存到 ck 中,再去查问 ck,这样的计划能够吗?
A9:这是能够的,是能够值得尝试的。事实表与维度表数据都能够存,能够依照某个字段做哈希(比方 user_id),从而实现 local join 的成果。
原文链接
本文为阿里云原创内容,未经容许不得转载。