简介:阿里云 EMR OLAP 与 Flink 团队深度单干,反对了 Flink 到 ClickHouse 的 Exactly-Once 写入来保障整个实时数仓数据的准确性。本文介绍了基于 EMR OLAP 的开源实时数仓解决方案。
作者简介
阿里云 EMR-OLAP 团队;次要负责开源大数据 OLAP 引擎的研发,例如 ClickHouse,Starrocks,Trino 等。通过 EMR 产品向阿里云用户提供一站式的大数据 OLAP 解决方案。
内容框架
- 背景
- 机制梳理
- 技术计划
- 测试后果
- 将来布局
一、背景
Flink 和 ClickHouse 别离是实时流式计算和 OLAP 畛域的翘楚,很多互联网、广告、游戏等客户都将两者联结应用于构建用户画像、实时 BI 报表、利用监控指标查问、监控等业务,造成了实时数仓解决方案(如图 -1)。这些业务对数据的准确性要求都非常严格,所以实时数仓整个链路须要保障端到端的 Exactly-Once。
通常来说 Flink 的上游是能够反复读取或者生产的 pull-based 长久化存储(例如 Kafka),要实现 Source 端的 Exactly-Once 只须要回溯 Source 端的读取进度即可。Sink 端的 Exactly-Once 则比较复杂,因为 Sink 是 push-based 的,须要依赖指标输入零碎的事务保障,但社区 ClickHouse 对事务并不反对。
所以针对此状况,阿里云 EMR ClickHouse 与 Flink 团队一起深度研发,反对了 Flink 到 ClickHouse 的 Exactly-Once 写入来保障整个实时数仓数据的准确性。本文将别离介绍下现有机制以及实现计划。
图 -1 实时数仓架构
二 机制梳理
ClickHouse 写入机制
ClickHouse 是一个 MPP 架构的列式 OLAP 零碎(如图 -2),各个节点是对等的,通过 Zookeeper 协同数据,能够通过并发对各个节点写本地表的形式进行大批量的数据导入。
ClickHouse 的 data part 是数据存储的最小单元,ClickHouse 接管到的数据 Block 在写入时,会依照 partition 粒度进行拆分,造成一个或多个 data part。data part 在写入磁盘后,会通过后盾 merge 线程一直的合并,将小块的 data part 合并成大块的 data part,以此升高存储和读取的开销。
在向本地表写入数据时,ClickHouse 首先会写入一个长期的 data part,这个长期 data part 的数据对客户端不可见,之后会间接进行 rename 操作,使这个长期 data part 成为正式 data part,此时数据对客户端可见。简直所有的长期 data part 都会疾速地胜利被 rename 成正式 data part,没有被 rename 胜利的长期 data part 最终将被 ClickHouse 清理策略从磁盘上删除。
通过上述剖析,能够看出 ClickHouse 的数据写入有一个从长期 data part 转为正式 data part 的机制,加以批改能够合乎两阶段提交协定,这是实现分布式系统中事务提交一致性的重要协定。
图 -2 Flink 作业写入 ClickHouse
注:多个 Flink Task 能够写入同一个 shard 或 replica
Flink 写机制
Flink 作为一个分布式解决引擎,提供了基于事务的 Sink 机制,该机制能够保障写入的 Exactly-Once,相应的数据接管方须要提供恪守 XA 标准的 JDBC。因为残缺的 XA 标准相当简单,因而,咱们先对 Flink 的解决机制进行梳理,联合 ClickHouse 的理论状况,确定须要实现的接口范畴。
为了实现分布式写入时的事务提交对立,Flink 借助了 checkpoint 机制。该机制可能周期性地将各个 Operator 中的状态生成快照并进行长久化存储。在 checkpoint 机制中,有一个 Coordinator 角色,用来协调所有 Operator 的行为。从 Operator 的角度来看,一次 checkpoint 有三个阶段,初始化 –> 生成快照 –> 实现 / 废除 checkpoint。从 Coordinator 的角度来看,须要定时触发 checkpoint,以及在所有 Operator 实现快照后,触发 complete 告诉。(参考附录 1)
接下来介绍 Flink 中的 Operator 是如何借助事务和 checkpoint 机制来保障 Exactly-Once,Operator 的残缺执行须要通过 initial、writeData、snapshot、commit 和 close 阶段。
initial 阶段:
从快照中取出上次工作执行时长久化的 xid 记录。快照中次要存储两种 xid,一组是未实现 snapshot 阶段的 xid,一组是曾经实现了 snapshot 的 xid。
接下来对上次未实现 snapshot 的 xid 进行 rollback 操作;对上次曾经实现了 snapshot 但 commit 未胜利的 xid 进行 commit 重试操作。
若上述操作失败,则工作初始化失败,工作停止,进入 close 阶段;若上述操作胜利,则持续。
创立一个新的惟一的 xid,作为本次事务 ID,将其记录到快照中。
应用新生成的 xid,调用 JDBC 提供的 start() 接口。
writeData 阶段:
事务开启后,进入写数据的阶段,Operator 的大部分工夫都会处于这个阶段。在与 ClickHouse 的交互中,此阶段为调用 JDBC 提供的 preparedStatement 的 addBatch() 和 executeBatch() 接口,每次写数据时都会在报文中携带以后 xid。
在写数据阶段,首先将数据写到 Operator 内存中,向 ClickHouse 提交内存中的批量数据有三种触发形式:内存中的数据条数达到 batchsize 的阈值;后盾定时线程每隔一段时间触发主动 flush;在 snapshot 阶段调用 end() 和 prepare() 接口之前会调用 flush 清空缓存。
snapshot 阶段:
以后事务会调用 end() 和 prepare() 接口,期待 commit,并更新快照中的状态。
接下来,会开启一个新的事务,作为本 Task 的下一次 xid,将新事务记录到快照中,并调用 JDBC 提供的 start() 接口开启新事务。
将快照长久化存储。
complete 阶段:
在所有 Operator 的 snapshot 阶段全副失常实现后,Coordinator 会告诉所有 Operator 对曾经胜利的 checkpoint 进行 complete 操作,在与 ClickHouse 的交互中,此阶段为 Operator 调用 JDBC 提供的 commit() 接口对事务进行提交。
close 阶段:
若以后事务尚未进行到 snapshot 阶段,则对以后事务进行 rollback 操作。
敞开所有资源。
从上述流程能够总结出,Flink 通过 checkpoint 和事务机制,将上游数据按 checkpoint 周期宰割成批,保障每一批数据在全副写入实现后,再由 Coordinator 告诉所有 Operator 共同完成 commit 操作。当有 Operator 写入失败时,将会退回到上次胜利的 checkpoint 的状态,并依据快照记录的 xid 对这一批 checkpoint 的所有 xid 进行 rollback 操作。在有 commit 操作失败时,将会重试 commit 操作,依然失败将会交由人工染指解决。
三、技术计划
整体计划
依据 Flink 和 ClickHouse 的写入机制,能够描绘出一个 Flink 到 ClickHouse 的事务写入的时序图(如图 -3)。因为写的是 ClickHouse 的本地表,并且事务的对立提交由 Coordinator 保障,因而 ClickHouse 无需实现 XA 标准中规范的分布式事务,只需实现两阶段提交协定中的多数要害接口,其余接口在 JDBC 侧进行缺省即可。
图 -3 Flink 到 ClickHouse 事务写入的时序图
ClickHouse-Server
状态机
为了实现 ClickHouse 的事务,咱们首先定义一下所要实现的事务容许的几种操作:
- Begin:开启一个事务。
- Write Data:在一个事务内写数据。
- Commit:提交一个事务。
- Rollback:回滚一个未提交的事务。
事务状态:
- Unknown:事务未开启,此时执行任何操作都是非法的。
- Initialized:事务已开启,此时容许所有操作。
- Committing:事务正在被提交,不再容许 Begin/Write Data 两种操作。
- Committed:事务曾经被提交,不再容许任何操作。
- Aborting:事务正在被回滚,不再容许任何操作。
- Aborted:事务曾经被回滚,不再容许任何操作。
残缺的状态机如下图 - 4 所示:
图 -4 ClickHouse Server 反对事务的状态机
图中所有操作均是幂等的。其中,Committing 到 Committed 和 Aborting 到 Aborted 是不须要执行任何操作的,在开始执行 Commit 或 Rollback 时,事务的状态即转成 Committing 或 Aborting;在执行完 Commit 或 Rollback 之后,事务的状态会被设置成 Committed 或 Aborted。
事务处理
Client 通过 HTTP Restful API 拜访 ClickHouse Server,Client 与 ClickHouse Server 间一次残缺事务的交互过程如图 - 5 所示:
图 -5 Clickhouse 事务处理的时序图
失常流程:
- Client 向 ClickHouse 集群任意一个 ClickHouse Server 发送 Begin Transaction 申请,并携带由 Client 生成的全局惟一的 Transaction ID。ClickHouse Server 收到 Begin Transaction 申请时,会向 Zookeeper 注册该 Transaction ID(包含创立 Transaction ID 及子 Znode 节点),并初始化该 Transaction 的状态为 Initialized。
- Client 接管到 Begin Transaction 胜利响应时,能够开始写入数据。当 ClickHouse Server 收到来自 Client 发送的数据时,会生成长期 data part,但不会将其转为正式 data part,ClickHouse Server 会将写入的长期 data part 信息,以 JSON 的模式,记录到 Zookeeper 上该 Transaction 的信息中。
- Client 实现数据的写入后,会向 ClickHouse Server 发送 Commit Transaction 申请。ClickHouse Server 在收到 Commit Transaction 申请后,依据 ZooKeeper 上对应的 Transaction 的 data part 信息,将 ClickHouse Server 本地长期 data part 数据转为正式的 data part 数据,并更新 Transaction 状态为 Committed。Rollback 的过程与 Commit 相似。
异样解决:
- 如果创立 Transaction ID 过程中发现 Zookeeper 中曾经存在雷同 Transaction ID,依据 Zookeeper 中记录的 Transaction 状态进行解决:如果状态是 Unknown 则持续进行解决;如果状态是 Initialized 则间接返回;否则会抛异样。
- 目前实现的事务还不反对分布式事务,只反对单机事务,所以 Client 只能往记录该 Transaction ID 的 ClickHouse Server 节点写数据,如果 ClickHouse Server 接管到到非该节点事务的数据,ClickHouse Server 会间接返回错误信息。
- 与写入数据不同,如果 Commit 阶段 Client 向未记录该 Transaction ID 的 ClickHouse Server 发送了 Commit Transaction 申请,ClickHouse Server 不会返回错误信息,而是返回记录该 Transaction ID 的 ClickHouse Server 地址给 Client,让 Client 端重定向到正确的 ClickHouse Server。Rollback 的过程与 Commit 相似。
ClickHouse-JDBC
依据 XA 标准,残缺的分布式事务机制须要实现大量的标准接口(参考附录 2)。在本设计中,实际上只须要实现大量要害接口,因而,采纳了基于组合的适配器模式,向 Flink 提供基于规范 XA 接口的 XAResource 实现,同时对 ClickHouse Server 屏蔽了不须要反对的接口。
对于 XADataSource 的实现,采纳了基于继承的适配器模式,并针对 Exactly-Once 的个性,批改了局部默认配置,如发送失败的重试次数等参数。
另外,在生产环境中,通常不会通过分布式表,而是通过 SLB 进行数据写入时的负载平衡。在 Exactly-Once 场景中,Flink 侧的 Task 须要放弃针对某一 ClickHouse Server 节点的连贯,因而不能应用 SLB 的形式进行负载平衡。针对这一问题,咱们借鉴了 BalanceClickHouseDataSource 的思路,通过在 URL 中配置多个 IP,并在 properties 配置中将 write_mode 设置为 Random,能够使 XADataSource 在保障 Exactly-Once 的同时,具备负载平衡的能力。
Flink-Connector-ClickHouse
Flink 作为一个流式数据处理引擎,反对向多种数据接收端写入的能力,每种接收端都须要实现特定的 Connector。针对 Exactly-Once,ClickHouse Connector 减少了对于 XADataSource 的选项配置,依据客户端的配置提供 Exactly-Once 性能。
四、测试后果
ClickHouse 事务性能测试
写入 ClickHouse 单批次数据量和总批次雷同,Client 端并发写线程不同性能比拟。由图 - 6 能够看出,无论 ClickHouse 是否开启事务, ClickHouse 的吞吐量都与 Client 端并发写的线程数成正比。开启事务时,ClickHouse 中长期 data part 不会立即被转为正式 data part,所以在事务实现前大量长期 data part 不会参加 ClickHouse merge 过程,升高磁盘 IO 对写性能的影响,所以开启事务写性能较未开启事务写性能更好;但事务内蕴含的批次变多,长期 data part 在磁盘上的增多导致了合并时 CPU 的压力增大,从而影响了写入的性能,开启事务的写性能也会升高。
图 -6 ClickHouse 写入性能压测(一)
- 写入 ClickHouse 总批次 和 Client 端并发写线程雷同,单批次写入 ClickHouse 数据量不同性能比拟。
- 由图 - 7 能够看出,无论 ClickHouse 是否开启事务, ClickHouse 的吞吐量都与单批次数据量大小成正比。开启事务时,每批次数据越小,ClickHouse 的吞吐量受事务是否开启的影响就越大,这是因为每批次写入的工夫在事务处理的占比拟小,事务会对此产生肯定的影响,因而,一次事务蕴含的批次数量越多,越可能缩小事务对写入性能的影响;当事务蕴含批次的增大,事务处理工夫在写入中的占比逐步升高,ClickHouse merge 产生的影响越来越大,从而影响了写入的性能,开启事务较不开启事务写性能更好。
图 -7 ClickHouse 写入性能压测(二)
总体来说,开启事务对写入性能简直没有影响,这个论断是合乎咱们预期的。
Flink 写入 ClickHouse 性能比拟
- 对于雷同数据量和不同 checkpoint 周期,Flink 写入 ClickHouse 总耗时如图 - 8 所示。能够看出,checkpoint 周期对于不开启 Exactly-Once 的工作耗时没有影响。对于开启 Exactly-Once 的工作,在 5s 到 60s 的范畴内,耗时出现一个先升高后增长的趋势。起因是在 checkpoint 周期较短时,开启 Exactly-Once 的 Operator 与 Clickhouse 之间无关事务的交互过于频繁;在 checkpoint 周期较长时,开启 Exactly-Once 的 Operator 须要期待 checkpoint 周期完结能力提交最初一次事务,使数据可见。在本测试中,checkpoint 周期数据仅作为一个参考,生产环境中,须要依据机器规格和数据写入速度进行调整。
- 总体来说,Flink 写入 Clickhouse 时开启 Exactly-Once 个性,性能会稍有影响,这个论断是合乎咱们预期的。
图 -8 Flink 写入 ClickHouse 测试
五、将来布局
该版本 EMR ClickHouse 实现的事务还不是很欠缺,只反对单机事务,不反对分布式事务。分布式系统个别都是通过 Meta Server 来做对立元数据管理来反对分布式事务机制。以后咱们也正在规划设计 ClickHouse MetaServer 来反对分布式事务,同时能够移除 ClickHouse 对 ZooKeeper 的依赖。
原文链接
本文为阿里云原创内容,未经容许不得转载。