乐趣区

关于Flink:Apache-Flink-在汽车之家的应用与实践

本文整顿自汽车之家实时计算平台负责人邸星星在 Flink Forward Asia 2020 分享的议题《Apache Flink 在汽车之家的利用及实际》。次要内容包含:

  1. 背景及现状
  2. AutoStream 平台
  3. 基于 Flink 的实时生态建设
  4. 后续布局

一、背景及现状

1. 第一阶段

在 2019 年之前,汽车之家的大部分实时业务都是运行在 Storm 之上的。Storm 作为晚期支流的实时计算引擎,凭借简略的 Spout 和 Bolt 编程模型以及集群自身的稳定性,俘获了少量用户,咱们在 2016 年搭建了 Storm 平台。

随着实时计算的需要日渐增多,数据规模逐渐增大,Storm 在开发及保护老本上都凸显了有余,这里列举几个痛点:

  1. 开发成本高

    咱们始终是用的 Lambda 架构,会用 T+1 的离线数据修改实时数据,即最终以离线数据为准,所以计算口径实时要和离线齐全保持一致,实时数据开发的需要文档就是离线的 SQL,实时开发人员的外围工作就是把离线的 SQL 翻译成 Storm 代码,期间尽管封装了一些通用的 Bolt 来简化开发,但把离线动辄几百行的 SQL 精准地翻译成代码还是很有挑战的,并且每次运行都要通过打包、上传、重启的一系列的繁琐操作,调试老本很高。

  1. 计算低效

    Storm 对状态反对的不好,通常须要借助 Redis、HBase 这类 kv 存储保护中间状态,咱们之前是强依赖 Redis。比方常见的计算 UV 的场景,最简略的方法是应用 Redis 的 sadd 命令判断 uid 是否为曾经存在,但这种办法会带来很高的网络 IO,同时如果没有提前报备的大促或搞流动导致流量翻倍的状况,很容易把 Redis 内存搞满,运维同学也会被杀个措手不及。同时 Redis 的吞吐能力也限度了整个作业的吞吐量。

  1. 难以保护、治理

    因为采纳编写 Storm 代码形式开发,难以剖析元数据及血缘关系,同时可读性差,计算口径不通明,业务交接老本很高。

  1. 对数仓不敌对

    数据仓库团队是间接对接业务需要的团队,他们更相熟基于 Hive 的 SQL 开发模式,通常都不善于 Storm 作业的开发,这导致一些本来是实时的需要,只能退而求其次抉择 T+1 的形式给出数据。

在这个阶段,咱们反对了最根本的实时计算需要,因为开发门槛比拟高,很多实时业务都是由咱们平台开发来实现,既做平台,又做数据开发,精力扩散很重大。

2. 第二阶段

咱们从 2018 年开始调研 Flink 引擎,其绝对齐备的 SQL 反对,天生对状态的反对吸引了咱们,在通过学习调研后,2019 年初开始设计开发 Flink SQL 平台,并于 2019 年中上线了 AutoStream 1.0 平台。平台上线之初就在仓库团队、监控团队和运维团队得以利用,可能疾速被用户次要得益于以下几点:

  1. 开发、保护成本低:汽车之家大部分的实时工作能够用 Flink SQL + UDF 实现。平台提供罕用的 Source 和 Sink,以及业务开发罕用的 UDF,同时用户能够本人编写 UDF。基于 “SQL + 配置 ” 的形式实现开发,能够满足大部分需要。对于自定义工作,咱们提供方便开发应用的 SDK,助力用户疾速开发自定义 Flink 工作。平台面向的用户曾经不只是业余的数据开发人员了,一般开发、测试、运维人员通过根本的学习都能够在平台上实现日常的实时数据开发工作,实现平台赋能化。数据资产可治理,SQL 语句自身是结构化的,咱们通过解析一个作业的 SQL,联合 source、sink 的 DDL,能够很容易的晓得这个作业的上下游,人造保留血缘关系。
  1. 高性能:Flink 能够齐全基于状态 (内存,磁盘) 做计算,比照之前依赖内部存储做计算的场景,性能晋升巨。在 818 流动压测期间,革新后的程序能够轻松反对原来几十倍流量的实时计算,且横向扩大性能非常良好。
  1. 全面的监控报警:用户将工作托管在平台上,工作的存续由平台负责,用户专一于工作自身的逻辑开发自身即可。对于 SQL 工作,SQL 的可读性极高,便于保护;对于自定义工作,基于咱们 SDK 开发,用户能够更专一于梳理业务逻辑上。不论是 SQL 工作还是 SDK,咱们都内嵌了大量监控,并与报警平台关联,不便用户疾速发现剖析定位并修复工作,进步稳定性。
  1. 赋能业务:反对数仓分层模型,平台提供了良好的 SQL 反对,数仓人员能够借助 SQL,将离线数仓的建设教训利用于实时数仓的建设上,自平台上线后,数仓逐渐开始对接实时计算需要。

痛点:

  1. 易用性有待进步,比方用户无奈自助治理 UDF,只能应用平台内置的 UDF 或者把打好的 jar 包发给平台管理员,通过人工的形式解决上传问题。
  1. 随着平台作业量的高速增长,平台 on-call 老本十分高。首先咱们常常面对一些新用户的根底问题:

    1. 平台的应用问题;
    2. 开发过程中遇到的问题,比方为什么打包报错;
    3. Flink UI 的应用问题;
    4. 监控图形的含意,如何配置报警。

还有一些不太容易疾速给出答案的问题:

  1. Jar 包抵触;
  2. 为什么生产 Kafka 提早;
  3. 工作为什么报错。

尤其是提早问题,咱们常见的数据歪斜,GC,反压问题能够间接疏导用户去 Flink UI 和监控图表下来查看,但有时候还是须要手动去服务器上查看 jmap、jstack 等信息,有时候还须要生成火焰图来帮忙用户定位性能问题。

初期咱们没有和经营团队单干,齐全是咱们开发人员间接对接解决这些问题,尽管期间补充了大量的文档,然而整体上 on-call 老本还是很高。

  1. 在 Kafka 或 Yarn 呈现故障时,没有疾速复原的计划,当面对一些重保业务时,有些顾此失彼。家喻户晓,没有永远稳固,不出故障的环境或组件,当有重大故障呈现时,须要有疾速复原业务的应答计划。
  1. 资源没有正当管控,存在比较严重的资源节约的状况。随着应用平台开发工作的用户一直减少,平台的作业数也一直减少。有些用户不能很好的把控集群资源的应用,经常出现过多申请资源的问题,导致作业运行效率低下甚至闲暇,造成了资源的节约。

在 AutoStream1.0 平台这个阶段,基于 SQL 开发的形式极大地升高了实时开发的门槛,各业务方能够本人实现实时业务的开发,同时数仓同学通过简略的学习后,就开始对接实时业务,将咱们平台方从大量的业务需要中释放出来,让咱们能够分心做平台方面的工作。

3. 以后阶段

针对下面的几个方面,咱们有针对性行的做了以下几点降级:

  1. 引入 Jar Service:反对用户自助上传 UDF jar 包,并在 SQL 片段中自助援用,实现自助治理 UDF。同时自定义作业也能够配置 Jar Service 中的 Jar,面对多个作业共用同一个 Jar 的场景,用户只须要在作业中配置 Jar Service 中的 jar 包门路就能够,防止每次上线都反复上传 Jar 的繁琐操作;
  2. 自助诊断:咱们开发了动静调整日志级别、自助查看火焰图等性能,不便用户本人定位问题,加重咱们日常 on-call 老本;
  3. 作业健康检查性能:从多个维度剖析,为每个 Flink 作业打分,每个低分项都相应的给出倡议;
  4. Flink 作业级别的疾速容灾复原:咱们建设了两套 YARN 环境,每一个 YARN 对应一个独自的 HDFS,两个 HDFS 之前通过 SNAPSHOT 形式进行 Checkpoint 数据的双向复制,同时在平台上减少了切换集群的性能,在一个 YARN 集群不可用的状况下,用户能够自助在平台上,抉择备用集群的 Checkpoint;
  5. Kafka 多集群架构反对:应用咱们自研的 Kafka SDK,反对疾速切换 Kafka 集群;
  6. 对接估算零碎:每个作业占用的资源都间接对应到估算团队,这样肯定水平上保障资源不会被其余团队占用,同时每个团队的估算管理员能够查看估算应用明细,理解本人的估算反对了团队内的哪些业务。

目前用户对平台的应用曾经趋于相熟,同时自助健康检查和自助诊断等性能的上线,咱们平台方的日常 on-call 频率在逐渐升高,开始逐步进入平台建设的良性循环阶段。

4. 利用场景

汽车之家用于做实时计算的数据次要分为三类:

  1. 客户端日志,也就是咱们外部说的点击流日志,包含用户端上报的启动日志、时长日志、PV 日志、点击日志以及各类事件日志,这类次要是用户行为日志,是咱们建设实时数仓中流量宽表、UAS 零碎、实时画像的根底,在这之上还反对了智能搜寻、智能举荐等在线业务;同时根底的流量数据也用于反对各业务线的流量剖析、实时成果统计,反对日常经营决策。
  2. 服务端日志,包含 nginx 日志、各类后端利用产生的日志、各种中间件的日志。这些日志数据次要用于后端服务的衰弱监测、性能监控等场景。
  3. 业务库的实时变更记录,次要有三种:MySQL 的 binlog,SQLServer 的 CDC,TiDB 的 TiCDC 数据,基于这些实时的数据变更记录,咱们通过对各种内容数据的形象与标准,建设了内容中台、资源池等根底服务;也有一些做简略逻辑的业务数据实时统计场景,后果数据用于实时大屏、罗盘等,做数据展示。

以上这三类数据都会实时写入 Kafka 集群,在 Flink 集群中针对不同场景进行计算,后果数据写入到 Redis、MySQL、Elasticsearch、HBase、Kafka、Kylin 等引擎中,用于反对下层利用。

上面列举了一些利用场景:

5. 集群规模

目前 Flink 集群服务器 400+,部署模式为 YARN (80%) 和 Kubernetes,运行作业数 800+,日计算量 1 万亿,峰值每秒解决数据 2000 万条。

二、AutoStream 平台

1. 平台架构

下面是 AutoStream 平台目前的整体架构,次要是以下几局部内容:

  1. AutoStream core System

    这是咱们平台的外围服务,负责对元数据服务、Flink 客户端服务、Jar 治理服务及交互后果查问服务进行整合,通过前端页面把平台性能裸露给用户。

次要包含 SQL 和 Jar 作业的治理、库表信息的治理、UDF 治理、操作记录及历史版本的治理、健康检查、自助诊断、报警治理等模块,同时提供对接内部零碎的能力,反对其余零碎通过接口方式治理库表信息、SQL 作业信息及作业启停操作等。基于 Akka 工作的生命周期治理和调度零碎提供了高效,简略,低提早的操作保障,晋升了用户应用的效率和易用性。

  1. 元数据服务 (Catalog-like Unified Metastore)

    次要对应 Flink Catalog 的后端实现,除了反对根本的库表信息管理外,还反对库表粒度的权限管制,联合咱们本身的特点,反对用户组级别的受权。

底层咱们提供了 Plugin Catalog 机制,既能够用于和 Flink 已有的 Catalog 实现做集成,也能够不便咱们嵌入自定义的 Catalogs,通过 Plugin 机制能够很容易的重用 HiveCatalog,JdbcCatalog 等,从而保障了库表的周期的一致性。

同时元数据服务还负责对用户提交的 DML 语句进行解析,辨认以后作业的依赖的表信息,用于作业的剖析及提交过程,同时能够记录血缘关系。

  1. Jar Service

    平台提供的各类 SDK 在 Jar Service 上进行对立治理,同时用户也能够在平台上把自定义 Jar、UDF jar 等提交到 Jar Service 上对立治理,而后在作业中通过配置或 DDL 援用。

  1. Flink 客户端服务 (Customed Flink Job Client)

    负责把平台上的作业转化成 Flink Job 提交到 Yarn 或 Kubernetes 上,咱们在这一层针对 Yarn 和 Kubernetes 做了形象,对立两种调度框架的行为,对外裸露对立接口及规范化的参数,弱化 Yarn 和 Kubernetes 的差别,为 Flink 作业在两种框架上无缝切换打下了良好的根底。

每个作业的依赖不尽相同,咱们除了对根底依赖的治理以外,还须要反对个性化的依赖。比方不同版本的 SQL SDK,用户自助上传的 Jar、UDF 等,所以不同作业的提交阶段须要做隔离。

咱们采纳的是 Jar service + 过程隔离的形式,通过和 Jar Service 对接,依据作业的类型和配置,选用相应的 Jar,并且提交独自的过程中执行,实现物理隔离。

  1. 后果缓存服务 (Result Cache Serivce)

    是一个繁难的缓存服务,用于 SQL 作业开发阶段的在线调试场景。当咱们剖析出用户的 SQL 语句,将 Select 语句的后果集存入缓存服务中;而后用户能够在平台上通过抉择 SQL 序号 (每个残缺的 SELECT 语句对应一个序号),实时查看 SQL 对应的后果数据,不便用户开发与剖析问题。

  1. 内置 Connectors (Source & Sink)

    最右侧的局部次要是各种 Source、Sink 的实现,有一些是重用 Flink 提供的 connector,有一些是咱们本人开发的 connector。

针对每一种 connector 咱们都增加了必要 Metric,并配置成独自的监控图表,不便用户理解作业运行状况,同时也为定位问题提供数据根据。

2. 基于 SQL 的开发流程

在平台提供以上性能的根底上,用户能够疾速的实现 SQL 作业的开发:

  1. 创立一个 SQL 工作;
  2. 编写 DDL 申明 Source 和 Sink;
  3. 编写 DML,实现次要业务逻辑的实现;
  4. 在线查看后果,若数据合乎预期,增加 INSERT INTO 语句,写入到指定 Sink 中即可。

平台默认会保留 SQL 每一次的变更记录,用户能够在线查看历史版本,同时咱们会记录针对作业的各种操作,在作业维护阶段能够帮忙用户追溯变更历史,定位问题。

上面是一个 Demo,用于统计当天的 PV、UV 数据:

3. 基于 Catalog 的元数据管理

元数据管理的次要内容:

  1. 反对权限管制:除了反对根本的库表信息管理外,还反对表粒度的权限管制,联合咱们本身的特点,反对用户组级别的受权;
  2. Plugin Catalog 机制:能够组合多种其余 Catalog 实现,复用已有的 Catalog;
  3. 库表生命周期行为对立:用户能够抉择平台上的表和底层存储的生命周期对立,防止两边别离保护,反复建表;
  4. 新老版本齐全兼容:因为在 AutoStream 1.0 的时候,咱们没有独自引入 Metastore 服务,此外 1.0 期间的 DDL SQL 解析模块是自研的组件。所以在建设 MetaStore 服务时,须要思考历史作业和历史库表信息兼容的问题。

    1. 对于库表信息,新的 MetaStore 在底层将新版和旧版的库表信息转换成对立的存储格局,从而保障了库表信息的兼容性。
    2. 对于作业,这里咱们通过形象接口,并别离提供 V1Service 和 V2Service 两种实现门路,保障了新老作业在用户层面的兼容。

上面是几个模块和 Metastore 交互的示意图:

4. UDXF 治理

咱们引入了 Jar Service 服务用来治理各种 Jar,包含用户自定义作业、平台外部 SDK 组件、UDXF 等,在 Jar Service 根底上咱们能够很容易的实现 UDXF 的自助治理,在 On k8s 的场景下,咱们提供了对立的镜像,Pod 启动后会从 Jar Service 下载对应的 Jar 到容器外部,用于反对作业的启动。

用户提交的 SQL 中如果蕴含 Function DDL,咱们会在 Job Client Service 中会解析 DDL,下载对应的 Jar 到本地。

为了防止和其余作业有依赖抵触,咱们每次都会独自启动一个子过程来实现作业提交的操作。UDXF Jar 会被并退出到 classpath 中,咱们对 Flink 做了一些批改,作业提交时会把这个 Jar 一并上传到 HDFS 中;同时 AutoSQL SDK 会依据函数名称和类名为以后作业注册 UDF。

5. 监控报警及日志收集

得益于 Flink 欠缺的 Metric 机制,咱们能够不便的增加 Metric,针对 Connector,咱们内嵌了丰盛的 Metric,并配置了默认的监控看板,通过看板能够查看 CPU、内存、JVM、网络传输、Checkpoint、各种 Connector 的监控图表。同时平台和公司的云监控零碎对接,主动生成默认的报警策略,监控存活状态、生产提早等要害指标。同时用户能够在云监控零碎批改默认的报警策略,增加新的报警项实现个性化监控报警。

日志通过云 Filebeat 组件写入到 Elasticsearch 集群,同时凋谢 Kibana 供用户查问。

整体的监控报警及日志收集架构如下:

6. 健康检查机制

随着作业数的高速增长,呈现了很多资源应用不合理的状况,比方后面提到的资源节约的状况。用户大多时候都是在对接新需要,反对新业务,很少回过头来评估作业的资源配置是否正当,优化资源应用。所以平台布局了一版老本评估的模型,也就是当初说的健康检查机制,平台每天会针对作业做多维度的衰弱评分,用户能够随时在平台上查看单个作业的得分状况及最近 30 天的得分变动曲线。

低分作业会在用户登录平台时进行提醒,并且定期发邮件揭示用户进行优化、整改,在优化作业后用户能够被动触发从新评分,查看优化成果。

咱们引入了多维度,基于权重的评分策略,针对 CPU、内存使用率、是否存在闲暇 Slot、GC 状况、Kafka 生产提早、单核每秒解决数据量等多个维度的指标联合计算拓补图进行剖析评估,最终产生一个综合分。

每个低分项都会显示低分的起因及参考范畴,并显示一些领导倡议,辅助用户进行优化。

咱们新增了一个 Metric,用一个 0%~100% 的数字体现 TaskManagner CPU 利用率。这样用户能够直观的评估 CPU 是否存在节约的状况。

上面是作业评分的大抵流程:首先咱们会收集和整顿运行作业的根本信息和 Metrics 信息。而后利用咱们设定好的规定,失去根本评分和根底倡议信息。最初将得分信息和倡议整合,综合评判,得出综合得分和最终的报告。用户能够通过平台查看报告。对于得分较低的作业,咱们会发送报警给作业的归属用户。

7. 自助诊断

如之前提到的痛点,用户定位线上问题时,只能求助于咱们平台方,造成咱们 on-call 工作量很大,同时用户体验也不好,鉴于此,所以咱们上线了以下性能:

  1. 动静批改日志级别:咱们借鉴了 Storm 的批改日志级别的形式,在 Flink 上实现了相似性能,通过扩大 REST API 和 RPC 接口的办法,反对批改指定 Logger 的到某一日志级别,并反对设置一个过期工夫,当过期后,改 Logger 的日志会从新复原为 INFO 级别;
  2. 反对自助查看线程栈和堆内存信息:Flink UI 中曾经反对在线查看线程栈 (jstack),咱们间接复用了这个接口;还额定减少了查看堆内存 (jmap) 的接口,不便用户在线查看;
  3. 反对在线生成、查看火焰图:火焰图是定位程序性能问题的一大利器,咱们利用了阿里的 arthas 组件,为 Flink 减少了在线查看火焰图的能力,用户遇到性能问题时,能够疾速评估性能瓶颈。

8. 基于 Checkpoint 复制的疾速容灾

当实时计算利用在重要业务场景时,单个 Yarn 集群一旦呈现故障且短期内不可复原,那么可能会对业务造成较大影响。

在此背景下,咱们建设了 Yarn 多集群架构,两个独立的 Yarn 各自对应一套独立的 HDFS 环境,checkpoint 数据定期在两个 HDFS 间互相复制。目前 checkpoint 复制的提早稳固在 20 分钟内。

同时,在平台层面,咱们把切换集群的性能间接凋谢给用户,用户能够在线查看 checkpoint 的复制状况,抉择适合的 checkpoint 后 (当然也能够抉择不从 checkpoint 复原) 进行集群切换,而后重启作业,实现作业在集群间的绝对平滑的迁徙。

三、基于 Flink 的实时生态建设

AutoStream 平台的外围场景是反对实时计算开发人员的应用,使实时计算开发变得简略高效、可监控、易运维。同时随着平台的逐步完善,咱们开始摸索如何对 AutoStream 平台进行重用,如何让 Flink 利用在更多场景下。重用 AutoStream 有以下几点劣势:

  1. Flink 自身是优良的分布式计算框架,有着较高的计算性能,良好的容错能力和成熟的状态管理机制,社区蓬勃发展,性能及稳定性有保障;
  2. AutoStream 有着欠缺的监控和报警机制,作业运行在平台上,无需独自对接监控零碎,同时 Flink 对 Metric 反对很敌对,能够不便的增加新的 Metric;
  3. 大量的技术积淀和经营教训,通过两年多的平台建设,咱们在 AutoStream 上曾经实现了较为欠缺的 Flink 作业全生命周期的治理,并建设了 Jar Service 等根底组件,通过简略的下层接口包装,就能够对接其余零碎,让其余零碎具备实时计算的能力;
  4. 反对 Yarn 和 Kubernetes 部署。

基于以上几点,咱们在建设其余零碎时,优先重用 AutoStream 平台,以接口调用的形式进行对接,将 Flink 作业全流程的生命周期,齐全托管给 AutoStream 平台,各零碎优先思考实现本身的业务逻辑即可。

咱们团队内的 AutoDTS (接入及散发工作) 和 AutoKafka (Kafka 集群复制) 零碎目前就是依靠于 AutoStream 建设的。简略介绍一下集成的形式,以 AutoDTS 为例:

  1. 把工作 Flink 化,AutoDTS 上的接入、散发工作,都是以 Flink 作业的模式存在;
  2. 和 AutoStream 平台对接,调用接口实现 Flink 作业的创立、批改、启动、进行等操作。这里 Flink 作业既能够是 Jar,也能够是 SQL 作业;
  3. AutoDTS 平台依据业务场景,建设个性化的前端页面,个性化的表单数据,表单提交后,能够将表单数据存储到 MySQL 中;同时须要把作业信息以及 Jar 包地址等信息组装成 AutoStream 接口定义的格局,通过接口调用在 AutoStream 平台主动生成一个 Flink 工作,同时保留这个 Flink 工作的 ID;
  4. 启动 AutoDTS 的一个接入工作,间接调用 AutoStream 接口就实现了作业的启动。

1. AutoDTS 数据接入散发平台

AutoDTS 零碎次要蕴含两局部性能:

  1. 数据接入:将数据库中的变更数据 (Change log) 实时写入到 Kafka;
  2. 数据散发:将接入到 Kafka 的数据,实时写入到其余存储引擎。

1.1 AutoDTS 数据接入

上面是数据接入的架构图:

咱们保护了基于 Flink 的数据接入 SDK 并定义了对立的 JSON 数据格式,也就是说 MySQL Binlog,SQL Server、TiDB 的变更数据接入到 Kafka 后,数据格式是统一的,上游业务应用时,基于对立格局做开发,无需关注原始业务库的类型。

数据接入到 Kafka Topic 的同时,Topic 会主动注册为一张 AutoStream 平台上的流表,不便用户应用。

数据接入基于 Flink 建设还有一个额定的益处,就是能够基于 Flink 的准确一次语义,低成本的实现准确一次数据接入,这对反对数据准确性要求很高的业务来说,是一个必要条件。

目前咱们在做把业务表中的全量数据接入 Kafka Topic 中,基于 Kafka 的 compact 模式,能够实现 Topic 中同时蕴含存量数据和增量数据。这对于数据散发场景来说是非常敌对的,目前如果想把数据实时同步到其余存储引擎中,须要先基于调度零碎,接入一次全量数据,而后再开启实时散发工作,进行变更数据的实时散发。有了 Compact Topic 后,能够省去全量接入的操作。Flink1.12 版本曾经对 Compact Topic 做反对,引入 upsert-kafka Connector [1]

[1] https://cwiki.apache.org/conf…

上面是一条样例数据:

默认注册到平台上的流表是 Schemaless 的,用户能够用 JSON 相干的 UDF 获取其中的字段数据。

上面是应用流表的示例:

1.2 AutoDTS 数据散发

咱们曾经晓得,接入到 Kafka 中的数据是能够当做一张流表来应用的,而数据散发工作实质上是把这个流表的数据写入到其余存储引擎,鉴于 AutoStream 平台曾经反对多种 Table Sink (Connector),咱们只须要依据用户填写的上游存储的类型和地址等信息,就能够通过拼装 SQL 来实现数据的散发。

通过间接重用 Connector 的形式,最大化的防止了反复开发的工作。

上面是一个散发工作对应的 SQL 示例:

2. Kaka 多集群架构

Kafka 在理论利用中,有些场景是须要做 Kafka 多集群架构反对的,上面列举几个常见的场景:

  • 数据冗余灾备,实时复制数据到另一个备用集群,当一个 Kafka 集群不可用时,能够让利用切换到备用集群,疾速复原业务;
  • 集群迁徙,当机房合同到期,或者上云时,都须要做集群的迁徙,此时须要把集群数据整体复制到新机房的集群,让业务绝对平滑迁徙;
  • 读写拆散场景,应用 Kafka 时,大多数状况都是读多写少,为保证数据写入的稳定性,能够抉择建设 Kafka 读写拆散集群。

咱们目前建设了 Kafka 多集群架构,和 Flink 相干的次要有两块内容:

  1. Kafka 集群间数据复制的程序运行在 Flink 集群中;
  2. 革新了 Flink Kafka Connector,反对疾速切换 Kafka 集群。

2.1 整体架构

先来看一下 Kafka 集群间的数据复制,这是建设多集群架构的根底。咱们是应用 MirrorMaker2 来实现数据复制的,咱们把 MirrorMaker2 革新成一般的 Flink 作业,运行在 Flink 集群中。

咱们引入了 Route Service 和 Kafka SDK,实现客户端疾速切换拜访的 Kafka 集群。

客户端须要依赖咱们本人公布的 Kafka SDK,并且配置中不再指定 bootstrap.servers 参数,而是通过设置 cluster.code 参数来申明本人要拜访的集群。SDK 会依据 cluster.code 参数,拜访 Route Service 获取集群真正的地址,而后创立 Producer/Consumer 开始生产 / 生产数据。

SDK 会监听路由规定的变动,当须要切换集群时,只须要在 Route Service 后盾切换路由规定,SDK 发现路由集群发生变化时,会重启 Producer/Consumer 实例,切换到新集群。

如果是消费者产生了集群切换,因为 Cluster1 和 Cluster2 中 Topic 的 offset 是不同的,须要通过 Offset Mapping Service 来获取以后 Consumer Group 在 Cluster2 中的 offset,而后从这些 Offset 开始生产,实现绝对平滑的集群切换。

2.2 Kafka 集群间的数据复制

咱们应用 MirrorMaker2 来实现集群间的数据复制,MirrorMaker2 是 Kafka 2.4 版本引入的,具体以下个性:

  • 自动识别新的 Topic 和 Partition;
  • 主动同步 Topic 配置:Topic 的配置会主动同步到指标集群;
  • 主动同步 ACL;
  • 提供 Offset 的转换工具:反对依据源集群、指标集群及 Group 信息,获取到该 Group 在指标集群的中对应的 Offset 信息;
  • 反对扩大黑白名单策略:能够灵便定制,动静失效。

clusters = primary, backup

primary.bootstrap.servers = vip1:9091

backup.bootstrap.servers = vip2:9092

primary->backup.enabled = true

backup->primary.enabled = true

这段配置实现 primary 到 backup 集群的双向数据复制,primary 集群中的 topic1 中的数据会复制到 backup 集群中的 primary.topic1 这个 Topic 中,指标集群的 Topic 命名规定是 sourceCluster.sourceTopicName,能够通过实现 ReplicationPolicy 接口来自定义命名策略。

2.3 MirrorMaker2 相干的 Topic 介绍

  • 源集群中的 Topic

    heartbeats: 存储心跳数据;

    mm2-offset-syncs.targetCluster.internal: 存储源集群 (upstreamOffset) 和指标集群的 offset(downstreamOffset) 对应关系。

  • 指标集群中的 Topic

    mm2-configs.sourceCluster.internal:connect 框架自带,用来存储配置;

    mm2-offsets.sourceCluster.internal:connect 框架自带,用来存储 WorkerSourceTask 以后解决的 offset,mm2 场景下是为了以后数据同步到源集群 topic partition 的哪一个 offset,这个更像是 Flink 的 checkpoint 概念;

    mm2-status.sourceCluster.internal:connect 框架自带,用来存储 connector 状态。

下面三个用的都是 connect runtime 模块中的 KafkaBasedLog 工具类,这个工具类能够读写一个 compact 模式的 topic 数据,此时 MirrorMaker2 把 topic 当作 KV 存储应用。

sourceCluster.checkpoints.internal:记录 sourceCluster consumer group 在以后集群对应的 offset,mm2 会定期从源 kafka 集群读取 topic 对应的 consumer group 提交的 offset,并写到指标集群的 sourceCluster.checkpoints.internal topic 中。

2.4 MirrorMaker2 的部署

上面是 MirrorMaker2 作业运行的流程,在 AutoKafka 平台上创立一个数据复制作业,会调用 AutoStream 平台接口,相应的创立一个 MM2 类型的作业。启动作业时,会调用 AutoStream 平台的接口把 MM2 作业提交到 Flink 集群中运行。

2.5 路由服务

Route Service 负责解决客户端的路由申请,依据客户端的信息匹配适合的路由规定,将最终路由后果,也就是集群信息返回给客户端。

反对基于集群名称、Topic、Group、ClientID 以及客户端自定义的参数灵便配置路由规定。

上面的例子就是将 Flink 作业 ID 为 1234 的消费者,路由到 cluster_a1 集群。

2.6 Kafka SDK

应用原生的 kafka-clients 是无奈和 Route Service 进行通信的,客户端须要依赖咱们提供的 Kafka SDK (汽车之家外部开发的 SDK) 能和 Route Service 通信,实现动静路由的成果。

Kafka SDK 实现了 Producer、Consumer 接口,实质是 kafka-clients 的代理,业务做较少的改变就能够引入 Kafka SDK。

业务依赖 Kafka SDK 后,Kafka SDK 会负责和 Route Service 通信,监听路由变动,当发现路由的集群发生变化时,会 close 以后的 Producer/Consumer,创立新的 Producer/Consumer,拜访新的集群。

此外 Kafka SDK 还负责将 Producer、Consumer 的 metric 对立上报到云监控零碎的 prometheus,通过查看平台事后配置好的仪表盘,能够清晰的看到业务的生产、生产状况。

同时 SDK 会收集一些信息,比方利用名称、IP 端口、过程号等,这些信息能够在 AutoKafka 平台上查到,不便咱们和用户独特定位问题。

2.7 Offset Mapping Service

当 Consumer 的路由发生变化并切换集群时,状况有一些简单,因为目前 MirrorMaker2 是先把数据从源集群生产进去,再写入到指标集群的,同一条数据能够确保写入到指标 topic 的雷同分区,然而 offset 和源集群是不同的。

针对这种 offset 不统一的状况,MirrorMaker2 会生产源集群的 __consumer_offsets 数据,加上指标集群对应的 offset,写入到指标集群的 sourceCluster.checkpoints.internal topic 中。

同时,源集群的 mm2-offset-syncs.targetCluster.internal topic 记录了源集群和指标集群 offset 的映射关系,联合这两个 topic,咱们建设了 Offset Mapping Service 来实现指标集群的 offset 的转换工作。

所以当 Consumer 须要切换集群时,会调用 Offset Mapping Service 的接口,获取到指标集群的 offsets,而后被动 seek 到这些地位开始生产,这样实现绝对平滑的集群切换工作。

2.8 Flink 与 Kafka 多集群架构的集成

因为 Kafka SDK 兼容 kafka-clients 的用法,用户只须要更换依赖,而后设置 cluster.code、Flink.id 等参数即可。

当 Producer/Consumer 产生集群切换后,因为创立了新的 Producer/Consumer 实例,Kafka 的 metric 数据没有从新注册,导致 metric 数据无奈失常上报。咱们在 AbstractMetricGroup 类中减少了 unregister 办法,在监听 Producer/Consumer 的切换事件时,从新注册 kafka metrics 就能够了。

至此咱们实现了 Flink 对 Kafka 多集群架构的反对。

四、后续布局

  1. 目前咱们反对的数据统计类场景大多是基于流量数据或用户行为数据的,这些场景对准确一次的语义要求不高,随着目前社区对 Change Log 反对的逐步完善,同时咱们的数据接入体系是反对准确一次语义的,并且正在做业务表全量接入到 Kafka 的性能,所以后续能够实现准确一次的数据统计,反对交易、线索、金融类的统计需要。
  2. 一些公司曾经提出湖仓一体的理念,数据湖技术的确能够解决一些原有数仓架构的痛点,比方数据不反对更新操作,无奈做到准实时的数据查问。目前咱们在做一些 Flink 和 Iceberg、Hudi 集成的一些尝试,后续会在公司寻找场景并落地。

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群;

第一工夫获取最新技术文章和社区动静,请关注公众号~

退出移动版