更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群
字节数据中台DataLeap的Data Catalog零碎通过接管MQ中的近实时音讯来同步局部元数据。Apache Atlas对于实时音讯的生产解决不满足性能要求,外部应用Flink工作的解决计划在ToB场景中也存在诸多限度,所以团队自研了轻量级异步音讯解决框架,反对了字节外部和火山引擎上同步元数据的诉求。本文定义了需要场景,并具体介绍框架的设计与实现。

背景

字节数据中台DataLeap的Data Catalog零碎基于Apache Atlas搭建,其中Atlas通过Kafka获取内部零碎的元数据变更音讯。在开源版本中,每台服务器反对的Kafka Consumer数量无限,在每日百万级音讯体量下,常常有长延时等问题,影响用户体验。在2020年底,火山引擎DataLeap研发人员针对Atlas的音讯生产局部做了重构,将音讯的生产和解决从后端服务中剥离进去,并编写了Flink工作承当这部分工作,比拟好的解决了扩展性和性能问题。然而,到2021年年中,团队开始重点投入私有化部署和火山私有云反对,对于Flink集群的依赖引入了可维护性的痛点。在认真的剖析了应用场景和需要,并调研了现成的解决方案后,火山引擎DataLeap研发人员决定投入人力自研一个音讯解决框架。以后这个框架很好的反对了字节外部以及ToB场景中Data Catalog对于音讯生产和解决的场景。本文会具体介绍框架解决的问题,整体的设计,以及实现中的要害决定。

需要定义

应用上面的表格将具体场景定义分明。

相干工作

在启动自研之前,火山引擎DataLeap研发团队评估了两个比拟相干的计划,别离是Flink和Kafka Streaming。Flink是团队之前生产上应用的计划,在能力上是符合要求的,最次要的问题是长期的可维护性。在私有云场景,那个阶段Flink服务在火山云上还没有公布,外部本人的服务又有严格的工夫线,所以必须思考代替;在私有化场景,火山引擎DataLeap研发团队不确认客户的环境肯定有Flink集群,即便部署的数据底座中带有Flink,后续的保护也是个头疼的问题。另外一个角度,作为通用流式解决框架,Flink的大部分性能其实团队并没有用到,对于单条音讯的流转门路,其实只是简略的读取和解决,应用Flink有些“杀鸡用牛刀”了。另外一个比拟规范的计划是Kafka Streaming。作为Kafka官网提供的框架,对于流式解决的语义有较好的反对,也满足团队对于轻量的诉求。最终没有采纳的次要思考点是两个:

  • 对于Offset的保护不够灵便:外部的场景不能应用主动提交(会丢音讯),而对于同一个Partition中的数据又要求肯定水平的并行处理,应用Kafka Streaming的原生接口较难反对。
  • 与Kafka强绑定:大部分场景下,团队不是元数据音讯队列的拥有者,也有团队应用RocketMQ等提供元数据变更,在应用层,团队心愿应用同一套框架兼容。

设计

概念阐明

  • MQ Type:Message Queue的类型,比方Kafka与RocketMQ。后续内容以Kafka为主,设计肯定水平兼容其余MQ。
  • Topic:一批音讯的汇合,蕴含多个Partition,能够被多个Consumer Group生产。
  • Consumer Group:一组Consumer,同一Group内的Consumer数据不会反复生产。
  • Consumer:生产音讯的最小单位,属于某个Consumer Group。
  • Partition:Topic中的一部分数据,同一Partition内音讯有序。同一Consumer Group内,一个Partition只会被其中一个Consumer生产。
  • Event:由Topic中的音讯转换而来,局部属性如下。

    • Event Type:音讯的类型定义,会与Processor有对应关系;
    • Event Key:蕴含音讯Topic、Partition、Offset等元数据,用来对音讯进行Hash操作;
  • Processor:音讯解决的单元,针对某个Event Type定制的业务逻辑。
  • Task:生产音讯并解决的一条Pipeline,Task之间资源是互相独立的。

框架架构


整个框架次要由MQ Consumer, Message Processor和State Manager组成。

  • MQ Consumer:负责从Kafka Topic拉取音讯,并依据Event Key将音讯投放到外部队列,如果音讯须要延时生产,会被投放到对应的延时队列;该模块还负责定时查问State Manager中记录的音讯状态,并依据返回提交音讯Offset;上报与音讯生产相干的Metric。
  • Message Processor:负责从队列中拉取音讯并异步进行解决,它会将音讯的处理结果更新给State Manager,同时上报与音讯解决相干的Metric。
  • State Manager:负责保护每个Kafka Partition的音讯状态,并裸露以后应提交的Offset信息给MQ Consumer。
    下一篇将分享此异步音讯框架的实现过程以及线上运维case举例。

实现

线程模型


每个Task能够运行在一台或多台实例,倡议部署到多台机器,以取得更好的性能和容错能力。每台实例中,存在两组线程池:

  • Consumer Pool:负责管理MQ Consumer Thread的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个Thread平安退出或者超时进行。整体无效Thread的下限与Topic的Partition的总数无关。
  • Processor Pool:负责管理Message Processor Thread的生命周期,当服务启动时,依据配置拉起肯定规模的线程,并在服务敞开时确保每个Thread平安退出或者超时进行。能够依据Event Type所须要解决的并行度来灵便配置。
    两类Thread的性质别离如下:
  • Consumer Thread:每个MQ Consumer会封装一个Kafka Consumer,能够生产0个或者多个Partition。依据Kafka的机制,当MQ Consumer Thread的个数超过Partition的个数时,以后Thread不会有理论流量。
  • Processor Thread:惟一对应一个外部的队列,并以FIFO的形式生产和解决其中的音讯。

StateManager


在State Manager中,会为每个Partition保护一个优先队列(最小堆),队列中的信息是Offset,两个优先队列的职责如下:

  • 解决中的队列:一条音讯转化为Event后,MQ Consumer会调用StateManager接口,将音讯Offset 插入该队列。
  • 解决完的队列:一条音讯解决完结或最终失败,Message Processor会调用StateManager接口,将音讯Offset插入该队列。
    MQ Consumer会周期性的查看以后能够Commit的Offset,状况枚举如下:
  • 解决中的队列堆顶 < 解决完的队列堆顶或者解决完的队列为空:代表以后生产回来的音讯还在处理过程中,本轮不做Offset提交。
  • 解决中的队列堆顶 = 解决完的队列堆顶:示意以后音讯曾经解决完,两边同时出队,并记录以后堆顶为可提交的Offset,反复查看过程。
  • 解决中的队列堆顶 > 解决完的队列堆顶:异常情况,通常是数据回放到某些中间状态,将解决完的队列堆顶出堆。
    留神:当产生Consumer的Rebalance时,须要将对应Partition的队列清空

KeyBy与Delay Processing的反对

因源头的Topic和音讯格局有可能不可管制,所以MQ Consumer的职责之一是将音讯对立封装为Event。
依据需要,会从原始音讯中拼装出Event Key,对Key取Hash后,雷同后果的Event会进入同一个队列,能够保障分区内的此类事件处理程序的稳固,同时将音讯的生产与处了解耦,反对增大外部队列数量来减少吞吐。
Event中也反对设置是否提早解决属性,能够依据Event Time提早固定工夫后处理,须要被提早解决的事件会被发送到有界提早队列中,有界提早队列的实现继承了DelayQueue,限度DelayQueue长度, 达到限定值入队会被阻塞。

异样解决

Processor在音讯处理过程中,可能遇到各种异常情况,设计框架的动机之一就是为业务逻辑的编写者屏蔽掉这种复杂度。Processor相干框架的逻辑会与State Manager合作,解决异样并充沛裸露状态。比拟典型的异常情况以及解决策略如下:

  • 解决音讯失败:主动触发重试,重试到用户设置的最大次数或默认值后会将音讯失败状态告诉State Manager。
  • 解决音讯超时:超时对于吞吐影响较大,且通常重试的成果不显著,因而以后策略是不会对音讯重试,间接告诉State Manager 音讯解决失败。
  • 解决音讯较慢:上游Topic存在Lag,Message Consumer生产速率大于Message Processor解决速率时,音讯会沉积在队列中,达到队列最大长度,Message Consumer 会被阻塞在入队操作,进行拉取音讯,相似Flink框架中的背压。

监控

为了不便运维,在框架层面裸露了一组监控指标,并反对用户自定义Metrics。其中默认反对的Metrics如下表所示:

线上运维case举例

理论生产环境运行时,偶然须要做些运维操作,其中最常见的是音讯沉积和音讯重放。
对于Conusmer Lag这类问题的解决步骤大抵如下:

  • 查看Enqueue Time,Queue Length的监控确定服务内队列是否有沉积。
  • 如果队列有沉积,查看Process Time指标,确定是否是某个Processor解决慢,如果是,依据指标中的Tag 确定事件类型等属性特色,判断业务逻辑或者Key设置是否正当;全副Processor 解决慢,能够通过减少Processor并行度来解决。
  • 如果队列无沉积,排除网络问题后,能够思考减少Consumer并行度至Topic Partition 下限。
    音讯重放被触发的起因通常有两种,要么是业务上须要重放局部数据做补全,要么是遇到了事变须要修复数据。为了应答这种需要,咱们在框架层面反对了依据工夫戳重置Offset的能力。具体操作时的步骤如下:
  • 应用服务测裸露的API,启动一台实例应用新的Consumer GroupId: {newConsumerGroup} 从某个startupTimestamp开始生产
  • 更改全副配置中的 Consumer GroupId 为 {newConsumerGroup}
  • 分批重启所有实例

总结

为了解决字节数据中台DataLeap中Data Catalog零碎生产近实时元数据变更的业务场景,团队自研了轻量级音讯解决框架。以后该框架已在字节外部生产环境稳固运行超过1年,并反对了火山引擎上的数据地图服务的元数据同步场景,满足了团队的需要。
下一步会依据优先级排期反对RocketMQ等其余音讯队列,并继续优化配置动静更新,监控报警,运维自动化等方面。

点击跳转大数据研发治理套件 DataLeap理解更多