关于大数据:火山引擎DataLeap基于Apache-Atlas自研异步消息处理框架

21次阅读

共计 4538 个字符,预计需要花费 12 分钟才能阅读完成。

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

字节数据中台 DataLeap 的 Data Catalog 零碎通过接管 MQ 中的近实时音讯来同步局部元数据。Apache Atlas 对于实时音讯的生产解决不满足性能要求,外部应用 Flink 工作的解决计划在 ToB 场景中也存在诸多限度,所以团队自研了轻量级异步音讯解决框架,反对了字节外部和火山引擎上同步元数据的诉求。本文定义了需要场景,并具体介绍框架的设计与实现。

背景

字节数据中台 DataLeap 的 Data Catalog 零碎基于 Apache Atlas 搭建,其中 Atlas 通过 Kafka 获取内部零碎的元数据变更音讯。在开源版本中,每台服务器反对的 Kafka Consumer 数量无限,在每日百万级音讯体量下,常常有长延时等问题,影响用户体验。在 2020 年底,火山引擎 DataLeap 研发人员针对 Atlas 的音讯生产局部做了重构,将音讯的生产和解决从后端服务中剥离进去,并编写了 Flink 工作承当这部分工作,比拟好的解决了扩展性和性能问题。然而,到 2021 年年中,团队开始重点投入私有化部署和火山私有云反对,对于 Flink 集群的依赖引入了可维护性的痛点。在认真的剖析了应用场景和需要,并调研了现成的解决方案后,火山引擎 DataLeap 研发人员决定投入人力自研一个音讯解决框架。以后这个框架很好的反对了字节外部以及 ToB 场景中 Data Catalog 对于音讯生产和解决的场景。本文会具体介绍框架解决的问题,整体的设计,以及实现中的要害决定。

需要定义

应用上面的表格将具体场景定义分明。

相干工作

在启动自研之前,火山引擎 DataLeap 研发团队评估了两个比拟相干的计划,别离是 Flink 和 Kafka Streaming。Flink 是团队之前生产上应用的计划,在能力上是符合要求的,最次要的问题是长期的可维护性。在私有云场景,那个阶段 Flink 服务在火山云上还没有公布,外部本人的服务又有严格的工夫线,所以必须思考代替;在私有化场景,火山引擎 DataLeap 研发团队不确认客户的环境肯定有 Flink 集群,即便部署的数据底座中带有 Flink,后续的保护也是个头疼的问题。另外一个角度,作为通用流式解决框架,Flink 的大部分性能其实团队并没有用到,对于单条音讯的流转门路,其实只是简略的读取和解决,应用 Flink 有些“杀鸡用牛刀”了。另外一个比拟规范的计划是 Kafka Streaming。作为 Kafka 官网提供的框架,对于流式解决的语义有较好的反对,也满足团队对于轻量的诉求。最终没有采纳的次要思考点是两个:

  • 对于 Offset 的保护不够灵便:外部的场景不能应用主动提交(会丢音讯),而对于同一个 Partition 中的数据又要求肯定水平的并行处理,应用 Kafka Streaming 的原生接口较难反对。
  • 与 Kafka 强绑定:大部分场景下,团队不是元数据音讯队列的拥有者,也有团队应用 RocketMQ 等提供元数据变更,在应用层,团队心愿应用同一套框架兼容。

设计

概念阐明

  • MQ Type:Message Queue 的类型,比方 Kafka 与 RocketMQ。后续内容以 Kafka 为主,设计肯定水平兼容其余 MQ。
  • Topic:一批音讯的汇合,蕴含多个 Partition,能够被多个 Consumer Group 生产。
  • Consumer Group:一组 Consumer,同一 Group 内的 Consumer 数据不会反复生产。
  • Consumer:生产音讯的最小单位,属于某个 Consumer Group。
  • Partition:Topic 中的一部分数据,同一 Partition 内音讯有序。同一 Consumer Group 内,一个 Partition 只会被其中一个 Consumer 生产。
  • Event:由 Topic 中的音讯转换而来,局部属性如下。

    • Event Type:音讯的类型定义,会与 Processor 有对应关系;
    • Event Key:蕴含音讯 Topic、Partition、Offset 等元数据,用来对音讯进行 Hash 操作;
  • Processor:音讯解决的单元,针对某个 Event Type 定制的业务逻辑。
  • Task:生产音讯并解决的一条 Pipeline,Task 之间资源是互相独立的。

框架架构

整个框架次要由 MQ Consumer, Message Processor 和 State Manager 组成。

  • MQ Consumer:负责从 Kafka Topic 拉取音讯,并依据 Event Key 将音讯投放到外部队列,如果音讯须要延时生产,会被投放到对应的延时队列;该模块还负责定时查问 State Manager 中记录的音讯状态,并依据返回提交音讯 Offset;上报与音讯生产相干的 Metric。
  • Message Processor:负责从队列中拉取音讯并异步进行解决,它会将音讯的处理结果更新给 State Manager,同时上报与音讯解决相干的 Metric。
  • State Manager:负责保护每个 Kafka Partition 的音讯状态,并裸露以后应提交的 Offset 信息给 MQ Consumer。
    下一篇将分享此异步音讯框架的实现过程以及线上运维 case 举例。

实现

线程模型

每个 Task 能够运行在一台或多台实例,倡议部署到多台机器,以取得更好的性能和容错能力。每台实例中,存在两组线程池:

  • Consumer Pool:负责管理 MQ Consumer Thread 的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个 Thread 平安退出或者超时进行。整体无效 Thread 的下限与 Topic 的 Partition 的总数无关。
  • Processor Pool:负责管理 Message Processor Thread 的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个 Thread 平安退出或者超时进行。能够依据 Event Type 所须要解决的并行度来灵便配置。
    两类 Thread 的性质别离如下:
  • Consumer Thread:每个 MQ Consumer 会封装一个 Kafka Consumer,能够生产 0 个或者多个 Partition。依据 Kafka 的机制,当 MQ Consumer Thread 的个数超过 Partition 的个数时,以后 Thread 不会有理论流量。
  • Processor Thread:惟一对应一个外部的队列,并以 FIFO 的形式生产和解决其中的音讯。

StateManager

在 State Manager 中,会为每个 Partition 保护一个优先队列(最小堆),队列中的信息是 Offset,两个优先队列的职责如下:

  • 解决中的队列:一条音讯转化为 Event 后,MQ Consumer 会调用 StateManager 接口,将音讯 Offset 插入该队列。
  • 解决完的队列:一条音讯解决完结或最终失败,Message Processor 会调用 StateManager 接口,将音讯 Offset 插入该队列。
    MQ Consumer 会周期性的查看以后能够 Commit 的 Offset,状况枚举如下:
  • 解决中的队列堆顶 < 解决完的队列堆顶或者解决完的队列为空:代表以后生产回来的音讯还在处理过程中,本轮不做 Offset 提交。
  • 解决中的队列堆顶 = 解决完的队列堆顶:示意以后音讯曾经解决完,两边同时出队,并记录以后堆顶为可提交的 Offset,反复查看过程。
  • 解决中的队列堆顶 > 解决完的队列堆顶:异常情况,通常是数据回放到某些中间状态,将解决完的队列堆顶出堆。
    留神:当产生 Consumer 的 Rebalance 时,须要将对应 Partition 的队列清空

KeyBy 与 Delay Processing 的反对

因源头的 Topic 和音讯格局有可能不可管制,所以 MQ Consumer 的职责之一是将音讯对立封装为 Event。
依据需要,会从原始音讯中拼装出 Event Key,对 Key 取 Hash 后,雷同后果的 Event 会进入同一个队列,能够保障分区内的此类事件处理程序的稳固,同时将音讯的生产与处了解耦,反对增大外部队列数量来减少吞吐。
Event 中也反对设置是否提早解决属性,能够依据 Event Time 提早固定工夫后处理,须要被提早解决的事件会被发送到有界提早队列中,有界提早队列的实现继承了 DelayQueue,限度 DelayQueue 长度, 达到限定值入队会被阻塞。

异样解决

Processor 在音讯处理过程中,可能遇到各种异常情况,设计框架的动机之一就是为业务逻辑的编写者屏蔽掉这种复杂度。Processor 相干框架的逻辑会与 State Manager 合作,解决异样并充沛裸露状态。比拟典型的异常情况以及解决策略如下:

  • 解决音讯失败:主动触发重试,重试到用户设置的最大次数或默认值后会将音讯失败状态告诉 State Manager。
  • 解决音讯超时:超时对于吞吐影响较大,且通常重试的成果不显著,因而以后策略是不会对音讯重试,间接告诉 State Manager 音讯解决失败。
  • 解决音讯较慢:上游 Topic 存在 Lag,Message Consumer 生产速率大于 Message Processor 解决速率时,音讯会沉积在队列中,达到队列最大长度,Message Consumer 会被阻塞在入队操作,进行拉取音讯,相似 Flink 框架中的背压。

监控

为了不便运维,在框架层面裸露了一组监控指标,并反对用户自定义 Metrics。其中默认反对的 Metrics 如下表所示:

线上运维 case 举例

理论生产环境运行时,偶然须要做些运维操作,其中最常见的是音讯沉积和音讯重放。
对于 Conusmer Lag 这类问题的解决步骤大抵如下:

  • 查看 Enqueue Time,Queue Length 的监控确定服务内队列是否有沉积。
  • 如果队列有沉积,查看 Process Time 指标,确定是否是某个 Processor 解决慢,如果是,依据指标中的 Tag 确定事件类型等属性特色,判断业务逻辑或者 Key 设置是否正当;全副 Processor 解决慢,能够通过减少 Processor 并行度来解决。
  • 如果队列无沉积,排除网络问题后,能够思考减少 Consumer 并行度至 Topic Partition 下限。
    音讯重放被触发的起因通常有两种,要么是业务上须要重放局部数据做补全,要么是遇到了事变须要修复数据。为了应答这种需要,咱们在框架层面反对了依据工夫戳重置 Offset 的能力。具体操作时的步骤如下:
  • 应用服务测裸露的 API,启动一台实例应用新的 Consumer GroupId: {newConsumerGroup} 从某个 startupTimestamp 开始生产
  • 更改全副配置中的 Consumer GroupId 为 {newConsumerGroup}
  • 分批重启所有实例

总结

为了解决字节数据中台 DataLeap 中 Data Catalog 零碎生产近实时元数据变更的业务场景,团队自研了轻量级音讯解决框架。以后该框架已在字节外部生产环境稳固运行超过 1 年,并反对了火山引擎上的数据地图服务的元数据同步场景,满足了团队的需要。
下一步会依据优先级排期反对 RocketMQ 等其余音讯队列,并继续优化配置动静更新,监控报警,运维自动化等方面。

点击跳转大数据研发治理套件 DataLeap 理解更多

正文完
 0