更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群
摘要
字节数据中台 DataLeap 的 Data Catalog 零碎通过接管 MQ 中的近实时音讯来同步局部元数据。Apache Atlas 对于实时音讯的生产解决不满足性能要求,外部应用 Flink 工作的解决计划在 ToB 场景中也存在诸多限度,所以团队自研了轻量级异步音讯解决框架,很好的反对了字节外部和火山引擎上同步元数据的诉求。本文定义了需要场景,并具体介绍框架的设计与实现。
背景
动机
字节数据中台 DataLeap 的 Data Catalog 零碎基于 Apache Atlas 搭建,其中 Atlas 通过 Kafka 获取内部零碎的元数据变更音讯。在开源版本中,每台服务器反对的 Kafka Consumer 数量无限,在每日百万级音讯体量下,常常有长延时等问题,影响用户体验。
在 2020 年底,咱们针对 Atlas 的音讯生产局部做了重构,将音讯的生产和解决从后端服务中剥离进去,并编写了 Flink 工作承当这部分工作,比拟好的解决了扩展性和性能问题。然而,到 2021 年年中,团队开始重点投入私有化部署和火山私有云反对,对于 Flink 集群的依赖引入了可维护性的痛点。
在认真的剖析了应用场景和需要,并调研了现成的解决方案后,咱们决定投入人力自研一个音讯解决框架。以后这个框架很好的反对了字节外部以及 ToB 场景中 Data Catalog 对于音讯生产和解决的场景。
本文会具体介绍框架解决的问题,整体的设计,以及实现中的要害决定。
需要定义
应用上面的表格将具体场景定义分明。
相干工作
在启动自研之前,咱们评估了两个比拟相干的计划,别离是 Flink 和 Kafka Streaming。
Flink 是咱们之前生产上应用的计划,在能力上是符合要求的,最次要的问题是长期的可维护性。在私有云场景,那个阶段 Flink 服务在火山云上还没有公布,咱们本人的服务又有严格的工夫线,所以必须思考代替;在私有化场景,咱们不确认客户的环境肯定有 Flink 集群,即便部署的数据底座中带有 Flink,后续的保护也是个头疼的问题。另外一个角度,作为通用流式解决框架,Flink 的大部分性能其实咱们并没有用到,对于单条音讯的流转门路,其实只是简略的读取和解决,应用 Flink 有些“杀鸡用牛刀”了。
另外一个比拟规范的计划是 Kafka Streaming。作为 Kafka 官网提供的框架,对于流式解决的语义有较好的反对,也满足咱们对于轻量的诉求。最终没有采纳的次要思考点是两个:
- 对于 Offset 的保护不够灵便:咱们的场景不能应用主动提交(会丢音讯),而对于同一个 Partition 中的数据又要求肯定水平的并行处理,应用 Kafka Streaming 的原生接口较难反对。
- 与 Kafka 强绑定:大部分场景下,咱们团队不是元数据音讯队列的拥有者,也有团队应用 RocketMQ 等提供元数据变更,在应用层,咱们心愿应用同一套框架兼容。
设计
概念阐明
- MQ Type:Message Queue 的类型,比方 Kafka 与 RocketMQ。后续内容以 Kafka 为主,设计肯定水平兼容其余 MQ。
- Topic:一批音讯的汇合,蕴含多个 Partition,能够被多个 Consumer Group 生产。
- Consumer Group:一组 Consumer,同一 Group 内的 Consumer 数据不会反复生产。
- Consumer:生产音讯的最小单位,属于某个 Consumer Group。
- Partition:Topic 中的一部分数据,同一 Partition 内音讯有序。同一 Consumer Group 内,一个 Partition 只会被其中一个 Consumer 生产。
- Event:由 Topic 中的音讯转换而来,局部属性如下。
- Event Type:音讯的类型定义,会与 Processor 有对应关系;
- Event Key:蕴含音讯 Topic、Partition、Offset 等元数据,用来对音讯进行 Hash 操作;
- Processor:音讯解决的单元,针对某个 Event Type 定制的业务逻辑。
- Task:生产音讯并解决的一条 Pipeline,Task 之间资源是互相独立的。
框架架构
整个框架次要由 MQ Consumer, Message Processor 和 State Manager 组成。
- MQ Consumer:负责从 Kafka Topic 拉取音讯,并依据 Event Key 将音讯投放到外部队列,如果音讯须要延时生产,会被投放到对应的延时队列;该模块还负责定时查问 State Manager 中记录的音讯状态,并依据返回提交音讯 Offset;上报与音讯生产相干的 Metric。
- Message Processor:负责从队列中拉取音讯并异步进行解决,它会将音讯的处理结果更新给 State Manager,同时上报与音讯解决相干的 Metric。
- State Manager:负责保护每个 Kafka Partition 的音讯状态,并裸露以后应提交的 Offset 信息给 MQ Consumer。
实现
线程模型
每个 Task 能够运行在一台或多台实例,倡议部署到多台机器,以取得更好的性能和容错能力。
每台实例中,存在两组线程池:
- Consumer Pool:负责管理 MQ Consumer Thread 的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个 Thread 平安退出或者超时进行。整体无效 Thread 的下限与 Topic 的 Partition 的总数无关。
- Processor Pool:负责管理 Message Processor Thread 的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个 Thread 平安退出或者超时进行。能够依据 Event Type 所须要解决的并行度来灵便配置。
两类 Thread 的性质别离如下:
- Consumer Thread:每个 MQ Consumer 会封装一个 Kafka Consumer,能够生产 0 个或者多个 Partition。依据 Kafka 的机制,当 MQ Consumer Thread 的个数超过 Partition 的个数时,以后 Thread 不会有理论流量。
- Processor Thread:惟一对应一个外部的队列,并以 FIFO 的形式生产和解决其中的音讯。
StateManager
在 State Manager 中,会为每个 Partition 保护一个优先队列(最小堆),队列中的信息是 Offset,两个优先队列的职责如下:
- 解决中的队列:一条音讯转化为 Event 后,MQ Consumer 会调用 StateManager 接口,将音讯 Offset 插入该队列。
- 解决完的队列:一条音讯解决完结或最终失败,Message Processor 会调用 StateManager 接口,将音讯 Offset 插入该队列。
MQ Consumer 会周期性的查看以后能够 Commit 的 Offset,状况枚举如下:
- 解决中的队列堆顶 < 解决完的队列堆顶或者解决完的队列为空:代表以后生产回来的音讯还在处理过程中,本轮不做 Offset 提交。
- 解决中的队列堆顶 = 解决完的队列堆顶:示意以后音讯曾经解决完,两边同时出队,并记录以后堆顶为可提交的 Offset,反复查看过程。
- 解决中的队列堆顶 > 解决完的队列堆顶:异常情况,通常是数据回放到某些中间状态,将解决完的队列堆顶出堆。
留神:当产生 Consumer 的 Rebalance 时,须要将对应 Partition 的队列清空
KeyBy 与 Delay Processing 的反对
因源头的 Topic 和音讯格局有可能不可管制,所以 MQ Consumer 的职责之一是将音讯对立封装为 Event。
依据需要,会从原始音讯中拼装出 Event Key,对 Key 取 Hash 后,雷同后果的 Event 会进入同一个队列,能够保障分区内的此类事件处理程序的稳固,同时将音讯的生产与处了解耦,反对增大外部队列数量来减少吞吐。
Event 中也反对设置是否提早解决属性,能够依据 Event Time 提早固定工夫后处理,须要被提早解决的事件会被发送到有界提早队列中,有界提早队列的实现继承了 DelayQueue,限度 DelayQueue 长度, 达到限定值入队会被阻塞。
异样解决
Processor 在音讯处理过程中,可能遇到各种异常情况,设计框架的动机之一就是为业务逻辑的编写者屏蔽掉这种复杂度。Processor 相干框架的逻辑会与 State Manager 合作,解决异样并充沛裸露状态。比拟典型的异常情况以及解决策略如下:
- 解决音讯失败:主动触发重试,重试到用户设置的最大次数或默认值后会将音讯失败状态告诉 State Manager。
- 解决音讯超时:超时对于吞吐影响较大,且通常重试的成果不显著,因而以后策略是不会对音讯重试,间接告诉 State Manager 音讯解决失败。
- 解决音讯较慢:上游 Topic 存在 Lag,Message Consumer 生产速率大于 Message Processor 解决速率时,音讯会沉积在队列中,达到队列最大长度,Message Consumer 会被阻塞在入队操作,进行拉取音讯,相似 Flink 框架中的背压。
监控
为了不便运维,在框架层面裸露了一组监控指标,并反对用户自定义 Metrics。其中默认反对的 Metrics 如下表所示:
线上运维 Case 举例
理论生产环境运行时,偶然须要做些运维操作,其中最常见的是音讯沉积和音讯重放。
对于 Conusmer Lag 这类问题的解决步骤大抵如下:
- 查看 Enqueue Time,Queue Length 的监控确定服务内队列是否有沉积。
- 如果队列有沉积,查看 Process Time 指标,确定是否是某个 Processor 解决慢,如果是,依据指标中的 Tag 确定事件类型等属性特色,判断业务逻辑或者 Key 设置是否正当;全副 Processor 解决慢,能够通过减少 Processor 并行度来解决。
- 如果队列无沉积,排除网络问题后,能够思考减少 Consumer 并行度至 Topic Partition 下限。
音讯重放被触发的起因通常有两种,要么是业务上须要重放局部数据做补全,要么是遇到了事变须要修复数据。为了应答这种需要,咱们在框架层面反对了依据工夫戳重置 Offset 的能力。具体操作时的步骤如下:
- 应用服务测裸露的 API,启动一台实例应用新的 Consumer GroupId: {newConsumerGroup} 从某个 startupTimestamp 开始生产
- 更改全副配置中的 Consumer GroupId 为 {newConsumerGroup}
- 分批重启所有实例
总结
为了解决字节数据中台 DataLeap 中 Data Catalog 零碎生产近实时元数据变更的业务场景,咱们自研了轻量级音讯解决框架。以后该框架已在字节外部生产环境稳固运行超过 1 年,并反对了火山引擎上的数据地图服务的元数据同步场景,满足了咱们团队的需要。
下一步会依据优先级排期反对 RocketMQ 等其余音讯队列,并继续优化配置动静更新,监控报警,运维自动化等方面。
立刻跳转火山引擎大数据研发治理套件 DataLeap 官网理解详情