关于数据库:DataLeap的Catalog系统近实时消息同步能力优化

58次阅读

共计 4468 个字符,预计需要花费 12 分钟才能阅读完成。

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

摘要

字节数据中台 DataLeap 的 Data Catalog 零碎通过接管 MQ 中的近实时音讯来同步局部元数据。Apache Atlas 对于实时音讯的生产解决不满足性能要求,外部应用 Flink 工作的解决计划在 ToB 场景中也存在诸多限度,所以团队自研了轻量级异步音讯解决框架,很好的反对了字节外部和火山引擎上同步元数据的诉求。本文定义了需要场景,并具体介绍框架的设计与实现。

背景

动机

字节数据中台 DataLeap 的 Data Catalog 零碎基于 Apache Atlas 搭建,其中 Atlas 通过 Kafka 获取内部零碎的元数据变更音讯。在开源版本中,每台服务器反对的 Kafka Consumer 数量无限,在每日百万级音讯体量下,常常有长延时等问题,影响用户体验。

在 2020 年底,咱们针对 Atlas 的音讯生产局部做了重构,将音讯的生产和解决从后端服务中剥离进去,并编写了 Flink 工作承当这部分工作,比拟好的解决了扩展性和性能问题。然而,到 2021 年年中,团队开始重点投入私有化部署和火山私有云反对,对于 Flink 集群的依赖引入了可维护性的痛点。

在认真的剖析了应用场景和需要,并调研了现成的解决方案后,咱们决定投入人力自研一个音讯解决框架。以后这个框架很好的反对了字节外部以及 ToB 场景中 Data Catalog 对于音讯生产和解决的场景。

本文会具体介绍框架解决的问题,整体的设计,以及实现中的要害决定。

需要定义

应用上面的表格将具体场景定义分明。

相干工作

在启动自研之前,咱们评估了两个比拟相干的计划,别离是 Flink 和 Kafka Streaming。

Flink 是咱们之前生产上应用的计划,在能力上是符合要求的,最次要的问题是长期的可维护性。在私有云场景,那个阶段 Flink 服务在火山云上还没有公布,咱们本人的服务又有严格的工夫线,所以必须思考代替;在私有化场景,咱们不确认客户的环境肯定有 Flink 集群,即便部署的数据底座中带有 Flink,后续的保护也是个头疼的问题。另外一个角度,作为通用流式解决框架,Flink 的大部分性能其实咱们并没有用到,对于单条音讯的流转门路,其实只是简略的读取和解决,应用 Flink 有些“杀鸡用牛刀”了。

另外一个比拟规范的计划是 Kafka Streaming。作为 Kafka 官网提供的框架,对于流式解决的语义有较好的反对,也满足咱们对于轻量的诉求。最终没有采纳的次要思考点是两个:

  • 对于 Offset 的保护不够灵便:咱们的场景不能应用主动提交(会丢音讯),而对于同一个 Partition 中的数据又要求肯定水平的并行处理,应用 Kafka Streaming 的原生接口较难反对。
  • 与 Kafka 强绑定:大部分场景下,咱们团队不是元数据音讯队列的拥有者,也有团队应用 RocketMQ 等提供元数据变更,在应用层,咱们心愿应用同一套框架兼容。

设计

概念阐明

  • MQ Type:Message Queue 的类型,比方 Kafka 与 RocketMQ。后续内容以 Kafka 为主,设计肯定水平兼容其余 MQ。
  • Topic:一批音讯的汇合,蕴含多个 Partition,能够被多个 Consumer Group 生产。
  • Consumer Group:一组 Consumer,同一 Group 内的 Consumer 数据不会反复生产。
  • Consumer:生产音讯的最小单位,属于某个 Consumer Group。
  • Partition:Topic 中的一部分数据,同一 Partition 内音讯有序。同一 Consumer Group 内,一个 Partition 只会被其中一个 Consumer 生产。
  • Event:由 Topic 中的音讯转换而来,局部属性如下。
  • Event Type:音讯的类型定义,会与 Processor 有对应关系;
  • Event Key:蕴含音讯 Topic、Partition、Offset 等元数据,用来对音讯进行 Hash 操作;
  • Processor:音讯解决的单元,针对某个 Event Type 定制的业务逻辑。
  • Task:生产音讯并解决的一条 Pipeline,Task 之间资源是互相独立的。

框架架构

整个框架次要由 MQ Consumer, Message Processor 和 State Manager 组成。

  • MQ Consumer:负责从 Kafka Topic 拉取音讯,并依据 Event Key 将音讯投放到外部队列,如果音讯须要延时生产,会被投放到对应的延时队列;该模块还负责定时查问 State Manager 中记录的音讯状态,并依据返回提交音讯 Offset;上报与音讯生产相干的 Metric。
  • Message Processor:负责从队列中拉取音讯并异步进行解决,它会将音讯的处理结果更新给 State Manager,同时上报与音讯解决相干的 Metric。
  • State Manager:负责保护每个 Kafka Partition 的音讯状态,并裸露以后应提交的 Offset 信息给 MQ Consumer。

实现

线程模型

每个 Task 能够运行在一台或多台实例,倡议部署到多台机器,以取得更好的性能和容错能力。

每台实例中,存在两组线程池:

  • Consumer Pool:负责管理 MQ Consumer Thread 的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个 Thread 平安退出或者超时进行。整体无效 Thread 的下限与 Topic 的 Partition 的总数无关。
  • Processor Pool:负责管理 Message Processor Thread 的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个 Thread 平安退出或者超时进行。能够依据 Event Type 所须要解决的并行度来灵便配置。

两类 Thread 的性质别离如下:

  • Consumer Thread:每个 MQ Consumer 会封装一个 Kafka Consumer,能够生产 0 个或者多个 Partition。依据 Kafka 的机制,当 MQ Consumer Thread 的个数超过 Partition 的个数时,以后 Thread 不会有理论流量。
  • Processor Thread:惟一对应一个外部的队列,并以 FIFO 的形式生产和解决其中的音讯。

StateManager

在 State Manager 中,会为每个 Partition 保护一个优先队列(最小堆),队列中的信息是 Offset,两个优先队列的职责如下:

  • 解决中的队列:一条音讯转化为 Event 后,MQ Consumer 会调用 StateManager 接口,将音讯 Offset 插入该队列。
  • 解决完的队列:一条音讯解决完结或最终失败,Message Processor 会调用 StateManager 接口,将音讯 Offset 插入该队列。

MQ Consumer 会周期性的查看以后能够 Commit 的 Offset,状况枚举如下:

  • 解决中的队列堆顶 < 解决完的队列堆顶或者解决完的队列为空:代表以后生产回来的音讯还在处理过程中,本轮不做 Offset 提交。
  • 解决中的队列堆顶 = 解决完的队列堆顶:示意以后音讯曾经解决完,两边同时出队,并记录以后堆顶为可提交的 Offset,反复查看过程。
  • 解决中的队列堆顶 > 解决完的队列堆顶:异常情况,通常是数据回放到某些中间状态,将解决完的队列堆顶出堆。

留神:当产生 Consumer 的 Rebalance 时,须要将对应 Partition 的队列清空

KeyBy 与 Delay Processing 的反对

因源头的 Topic 和音讯格局有可能不可管制,所以 MQ Consumer 的职责之一是将音讯对立封装为 Event。

依据需要,会从原始音讯中拼装出 Event Key,对 Key 取 Hash 后,雷同后果的 Event 会进入同一个队列,能够保障分区内的此类事件处理程序的稳固,同时将音讯的生产与处了解耦,反对增大外部队列数量来减少吞吐。

Event 中也反对设置是否提早解决属性,能够依据 Event Time 提早固定工夫后处理,须要被提早解决的事件会被发送到有界提早队列中,有界提早队列的实现继承了 DelayQueue,限度 DelayQueue 长度, 达到限定值入队会被阻塞。

异样解决

Processor 在音讯处理过程中,可能遇到各种异常情况,设计框架的动机之一就是为业务逻辑的编写者屏蔽掉这种复杂度。Processor 相干框架的逻辑会与 State Manager 合作,解决异样并充沛裸露状态。比拟典型的异常情况以及解决策略如下:

  • 解决音讯失败:主动触发重试,重试到用户设置的最大次数或默认值后会将音讯失败状态告诉 State Manager。
  • 解决音讯超时:超时对于吞吐影响较大,且通常重试的成果不显著,因而以后策略是不会对音讯重试,间接告诉 State Manager 音讯解决失败。
  • 解决音讯较慢:上游 Topic 存在 Lag,Message Consumer 生产速率大于 Message Processor 解决速率时,音讯会沉积在队列中,达到队列最大长度,Message Consumer 会被阻塞在入队操作,进行拉取音讯,相似 Flink 框架中的背压。

监控

为了不便运维,在框架层面裸露了一组监控指标,并反对用户自定义 Metrics。其中默认反对的 Metrics 如下表所示:

线上运维 Case 举例

理论生产环境运行时,偶然须要做些运维操作,其中最常见的是音讯沉积和音讯重放。

对于 Conusmer Lag 这类问题的解决步骤大抵如下:

  • 查看 Enqueue Time,Queue Length 的监控确定服务内队列是否有沉积。
  • 如果队列有沉积,查看 Process Time 指标,确定是否是某个 Processor 解决慢,如果是,依据指标中的 Tag 确定事件类型等属性特色,判断业务逻辑或者 Key 设置是否正当;全副 Processor 解决慢,能够通过减少 Processor 并行度来解决。
  • 如果队列无沉积,排除网络问题后,能够思考减少 Consumer 并行度至 Topic Partition 下限。

音讯重放被触发的起因通常有两种,要么是业务上须要重放局部数据做补全,要么是遇到了事变须要修复数据。为了应答这种需要,咱们在框架层面反对了依据工夫戳重置 Offset 的能力。具体操作时的步骤如下:

  • 应用服务测裸露的 API,启动一台实例应用新的 Consumer GroupId: {newConsumerGroup} 从某个 startupTimestamp 开始生产
  • 更改全副配置中的 Consumer GroupId 为 {newConsumerGroup}
  • 分批重启所有实例

总结

为了解决字节数据中台 DataLeap 中 Data Catalog 零碎生产近实时元数据变更的业务场景,咱们自研了轻量级音讯解决框架。以后该框架已在字节外部生产环境稳固运行超过 1 年,并反对了火山引擎上的数据地图服务的元数据同步场景,满足了咱们团队的需要。

下一步会依据优先级排期反对 RocketMQ 等其余音讯队列,并继续优化配置动静更新,监控报警,运维自动化等方面。

立刻跳转火山引擎大数据研发治理套件 DataLeap 官网理解详情

正文完
 0