01 背景
Lancer 是 B 站的实时流式传输平台,承载全站服务端、客户端的数据上报 / 采集、传输、集成工作,秒级提早,作为数仓入口是 B 站数据平台的生命线。目前每日峰值 5000w/s rps, 3PB/ 天, 4K+ 条流的数据同步能力。
服务如此大的数据规模,对产品的可靠性、可扩展性和可维护性提出了很高的要求。流式传输的实现是一个很有挑战的事件,聚焦快、准、稳的需要, Lancer 整体演进经验了大管道模型、BU 粒度管道模型、单流单作业模型三个阶段的演进,上面咱们娓娓道来。
02 关键词阐明
logid:每个业务方上报的数据流以 logid 进行标识,logid 是数据在传输 + 集成过程中的元信息标识。
数据源:数据进入到 lancer 的入口,例如:log-agent,bfe-agent,flink cdc
lancer-gateway(数据网关):接收数据上报的网关。
数据缓冲层:也叫做外部 kafka,用于解耦数据上报和数据散发。
lancer-collector(数据散发层):也叫做数据同步,能够依据理论场景实现不同端到端的数据同步。
03 技术演进
整个 B 站流式数据传输架构的演进大抵经验了三个阶段。
3.1 架构 V1.0- 基于 flume 的
大管道数据传输架构(2019 之前)
B 站流式传输架构建设之初,数据流量和数据流条数绝对较少,因而采纳了全站的数据流混合在一个管道中进行解决,基于 flume 二次定制化的数据传输架构,架构如下:
- 整个架构从数据生成到落地分为:数据源、数据网关、数据缓冲、数据散发层。
- 数据上报端根本采纳 sdk 的形式间接发送 http 和 grpc 申请上报。
- 数据网关 lancer-gateway 是基于 flume 二次迭代的数据网关,用于承载数据的上报,反对两种协定:http 用于承载公网数据上报(web/app),grpc 用于承载 IDC 内服务端数据上报。
- 数据缓冲层应用 kafka 实现,用于解耦数据上报和数据散发。
- 数据散发层 lancer-collector 同样是基于 flume 二次迭代的数据散发层,用于将数据从缓冲层同步到 ODS。
v1.0 架构在应用中暴露出一些的痛点:
1. 数据源端对于数据上报的可控性和容错性较差,例如:
- 数据网关故障状况下,数据源端短少缓存能力,不能间接反压,存在数据失落隐患。
- 重 SDK:SDK 中须要增加各种适配逻辑以应答上报异常情况
2. 整体架构是一个大管道模型,资源的划分和隔离不明确,整体保护老本高,本身故障隔离性差。
3. 基于 flume 二次迭代的一些缺点:
- 逻辑简单,性能差,咱们须要的性能绝对繁多
- hdfs 散发场景,不反对 exactly once 语义,每次重启,会导致数据大量反复
3.2 架构 V2.0-BU 粒度的
管道化架构(2020-2021)
针对 v1.0 的缺点,咱们引入了架构 v2.0,架构如下:
此架构的要害细节如下:
1. 强化了数据上报源端的边缘可控能力
- 服务器上部署 log-agent 承载服务端数据上报。
- cdn 上部署 bfe-agent 用于承载公网(web 端、app 端)数据上报。
- log-agent/bfe-agent 中集成数据缓冲、预聚合、流控、重试、降级等能力,数据上报 sdk 只需专一数据的生成和上报逻辑。
- agent 端基于 logid 的 BU 属性,将数据路由到不同的管道。
2. 数据管道以 BU 为粒度搭建,管道间资源隔离,每个管道蕴含整套独立的残缺数据传输链路,并且数据管道反对基于 airflow 疾速搭建。故障隔离做到 BU 级别。
3. 数据网关降级到自研 lancer-gateway2.0,逻辑精简,反对流控反压,并且适配 kafka failover,基于 k8s 进行部署。
4. hdfs 散发基于 flink jar 进行实现:反对 exactly once 语义保障。
V2.0 架构绝对于 v1.0,重点晋升了数据上报边缘的可控力、BU 粒度管道间的资源划分和隔离性。然而随着 B 站流式数据传输规模的疾速减少,对数据传输的时效性、老本、品质也提出了越来越高的要求,V2.0 也逐步暴露出了一些缺点:
1. logid 级别隔离性差:
- 单个管道外部某个 logid 流量陡增,几倍甚至几十倍,仍然会造成整个管道的数据散发提早,
- 单个管道内散发层组件故障重启,例如:hdfs 散发对应的 flink jar 作业挂掉重启,从 checkpoint 复原,此管道内所有的 logid 的 hdfs 散发都会存在归档提早隐患。
2. 网关是异步发送模型,极其状况下(组件解体),存在数据失落危险。
3. ods 层部分热点 / 故障影响放大
- 因为散发层一个作业同时散发多个 logid,这种大作业模型更易受到 ods 层部分热点的影响,例如:hdfs 某个 datanode 热点,会导致某个散发作业整体写阻塞,进而影响到此散发作业的其余 logid,kafka 散发同理。
- hdfs 单个文件块的所有正本生效,会导致对应散发工作整体挂掉重启。
4. hdfs 小文件问题放大
- hdfs 散发对应的 flink jar 作业为了保障吞吐,整体设置的并发度绝对较大。因而对于管道内的所有 logid,同一时刻都会关上并发度大小的文件数,对于流量低的 logid,就会造成小文件数量变大的问题。
针对上述痛点,最间接的解决思路就是整体架构做进一步的隔离,以单 logid 为维度实现数据传输 + 散发。面临的挑战次要有以下几个方面:
- 如何保障全链路以 logid 为单位进行隔离,如何在资源应用可控的状况下正当控流并且保障数据流之间的隔离性
- 须要与内部零碎进行大量的交互,如何适配内部零碎的各种问题:部分热点、故障
- 集成作业的数量指数级减少,如何保障高性能、稳定性的同时并且高效的进行治理、运维、品质监控。
3.3 架构 V3.0- 基于 Flink SQL 的
单流单作业数据集成计划
在 V3.0 架构中,咱们对整体传输链路进行了单作业单数据流隔离革新,并且基于 Flink SQL 撑持数据散发场景。架构如下:
相比 v2.0, 资源池容量治理上仍然以 BU 为粒度,然而每个 logid 的传输和散发互相独立,互不影响。具体逻辑如下:
- agent:整体上报 SDK 和 agent 接管 + 发送逻辑依照 logid 进行隔离革新,logid 间采集发送互相隔离。
- lancer-gateway3.0:logid 的申请解决之间互相隔离,当 kafka 发送碰壁,间接反压给 agent 端,上面具体介绍。
- 数据缓冲层:每个 logid 对应一个独立的外部 kafka topic,实现数据的缓冲。
- 数据散发层:散发层对每个 logid 的启动独立的 flink sql 作业进行数据的散发,单个 logid 解决碰壁,只会导致当个 logid 的数据沉积。
相较于之前的实现,v3.0 架构具备以下的劣势:
1. 可靠性:
- 性能品质上整顿链路能够保证数据不失落,网关层以同步形式发送数据,能够保证数据被长久化到外部 kafka;flink 反对状态复原和 exactly once 的语义,同样保证数据不丢。
2. 可维护性上:
- 隔离性上 logid 之间互相隔离,一个 logid 呈现问题,其余 logid 不受影响。
- 资源分配以 logid 为最小单位,能够准确管制单个 logid 的资源应用。
3. 可扩展性:
- 能够以单个 logid 为单位灵便管控:灵便的扩缩资源
04 V3.0 架构具体实现
咱们重点介绍下,以后 V3.0 构造各个分层的实现。
4.1 数据上报边缘层
4.1.1 log-agent
基于 go 自研,插件化架构,部署于物理机,牢靠、高效的反对服务端数据上报。
工夫架构分为收集、解决、发送三层,具备以下次要个性:
- 反对文件采集和 unix sock 两种数据上报形式
- 与网关 GRPC 通信:ACK+ 退却重试 + 流控
- 整体上报 SDK 和 agent 接管 + 发送逻辑依照 logid 进行隔离革新,单 logid 解决互相隔离:每个 logid 启动独立的 pipeline 进行采集、解析、发送。
- 网关基于服务发现,自适应网关的调整
- 发送碰壁状况下,基于磁盘进行本地沉积
- logid 粒度的埋点监控,实时监控数据的解决状态
- CGroup 资源限度:CPU + 内存
- 数据聚合发送,晋升传输效率
- 反对物理机和容器日志此采集,配置随利用公布,自适应配置的增、删、改。
4.1.2 bfe-agent
基于 go 自研,部署于 cdn,用于承载公网数据上报。
边缘 cdn 节点,cdn 服务器上部署 nginx 和 bfe-agent,bfe-agent 整体实现架构与 log-agent 相似,对于 web 和 app 端数据上报申请 QPS 高、突发性强的特点,次要强化了以下能力:
- 应答流量陡增:基于边缘节点的本地缓冲起到削峰作用
- 策略(降级、流控)前置,加强可控力
- logid 级别分流隔离, 反对等级划分
- 聚合压缩回传以晋升数据传输效率、降低成本,回源 QPS 升高 90% 以上。
4.2 数据上报网关层
v3.0 计划中,数据数据网关的架构如下:
数据网关性能个性如下:
- kafka 的通用代理层:反对 grpc /http 协定
- 基于 kafka send callback 实现了同步发送模型,保证数据不丢:数据写入 kafka 后,再对申请返回 ack
- 申请不拆分:基于 agent 的聚合机制,只反对单次申请单条记录,因而一条记录对应一条缓存层 kakfa 的音讯
- lancer-gateway3.0 依据申请的 topic 信息,发送申请到对应的 kafka 集群
- lancer-gateway3.0 适配 kafka 集群的部分热点:反对 partition 动静剔除
- logid 与 topic 一一对应,解决流程中互相隔离:一个 topic 发送碰壁,不影响其余的 topic
整个数据网关中的实现难点是:单 gateway 承载多 logid 解决的过程中如何保障隔离性和公平性,咱们参考了 Golang 中 GMP 的机制,整体数据流程如下:
1. 收到的申请,会把申请放到 logid 对应的申请队列,如果队列满,间接拒绝请求
2. 每个 kafka 集群,会初始化一个 N 大小的 kafka producer pool,其中每个 producer 会遍历所有的队列,进行数据的发送。
3. 对于每个 logid 的申请队列,会从两个保护限度资源的占用,以保障公平性和隔离性
- 限度当个 logid 队列绑定的 producer 数量
- 基于工夫片限定当个 producer 服务于单个队列的工夫长度
4.3 数据上报散发层
随着 flink 在实时计算畛域的成熟,其高性能、低提早、exactly once 语义保障、批流一体、丰盛的数据源反对、沉闷的社区等劣势,促使咱们抉择了以 flink sql 作为数据散发层的解决方案。以后咱们次要反对了 kafka→hive, kafka→kafka, cdc→kafka->hudi/hive 三种场景:
1. kafka→hive
- 以流式形式,实时导入数据到 hive。
- file rolling on check,保障 exactly once。
- 依照 event time 写入分区和归档,归档提早小于 15min
- 反对 text+lzo(行存)和 orc+zstd(列存)两种存储格局。
- 反对上游作业增量同步。
2. kafka→kafka
- 以流式形式,反对数据的实时同步
- 反对 kafka header metadata 信息的透传
3. cdc→kafka->hudi/hive
- 以实时流的形式同步全量和增量数据,整个 cdc 的应用场景分为两个环节
- cdc → kafka
- 基于 cdc 2.1,同步 mysql 的全量和增量 binlog 同步
- 单 sql 作业反对分库分表、多库多表的同步。
- 反对依据 db 和 table 自定义策略分流到不同的数据缓冲层 kafka topic
- kafka→hudi/hive
- 生产单 topic 同步到单张 hudi/hive 表,反对 event_time 落分区。
- 保证数据最终一致性
05 Flink connector 性能迭代
在 Flink SQL 数据散发场景的反对中,针对咱们遇到的理论需要,对社区原生 connector 进行了对应的优化,咱们针对性的介绍下。
5.1 hive sink connector 优化
断流空分区提交
背景:B 站离线作业的拉起依赖上游分区提交,HDFS 分区提交的判断依赖于作业整体 watermark 的推动,然而某些 logid 在断流的状况下,如何进行分区的提交呢
解决办法:
如图所示:当所有的 StreamFileWriter 间断两次 checkpoint 内未解决任何数据的状况下,StreamingFileCommiter 会断定产生了断流,依照以后工夫提交分区。
反对上游增量数据同步
背景:传统形式 ods 到 dwd 的数据同步只有当 ods 层分区 ready 之后才会开始,时效性较差,如何减速数据的同步?
解决办法:
- 不依赖 ods 层分区 ready,当 ods 目录中文件生成后,即可开始数据的解决,以增量的形式读取数据文件。
- 通过 HDFS 的 list 操作来获取须要读取的文件,对 NameNode 压力较大,为此咱们提供了文件 list 列表索引(包含文件名和数据条数),上游只须要读取索引,即可获取增量文件列表。
- 实现中索引文件状态被长久化到 state 中,snapshot 中生成.inflight 状态临时文件,notifyCheckpointComplete 中将文件 rename 成 commit 正式文件, 提供 exactly once 语义保障。
- 上游作业读取文件索引,反对 ods 到 dwd 的增量数据同步。
orc+zstd
背景:相较于行式存储,列式存储在压缩比上有着显著的劣势。
解决办法:反对 orc+zstd, 通过测试,相较于 text+lzo,空间节俭在 40% 以上。
hdfs 异步 close
背景:snapshot 阶段 flush 数据,close 文件常常因为个别文件慢连累整体吞吐。
解决办法:
- 将 close 超时的文件扔到异步队列中。也就是 close 的动作不会去梗塞整个主链路的解决,晋升 hdfs 部分热点状况下的吞吐。异步 close 文件列表保留到 pendingPartsForCurrentCheckpoint,并且长久化到 state 当中。故障复原时,也能持续对文件进行敞开。
- 异步 close 的引入,会引入分区提前创立的隐患,为此引入了对于 bucket 状态的判断。对于某分区,只有当隶属于此分区的所有 bucket 中的 pendingPartsForCurrentCheckpoint 为空(所有文件都进行了敞开),才在 commit 算子中进行分区的提交。
小文件合并
背景:rolling on checkpoint 的滚动策略,会导致文件数量的收缩,对 namenode 产生较大的压力。
解决办法:
- 引入了小文件合并性能,在 checkpoint 实现后,由 Streaming writer 的 notifyCheckpointComplete 办法触发合并操作,向上游发送 EndCheckpoint 信号。
- coordinator 收到每个 writer 的 EndCheckpoint 后,开始进行文件的分组,封装成一个个 compactunit 播送上游,全副 unit 发送完之后,再播送 EndCompaction。
- compact operator 找到属于本人的工作后开始解决,当收到 EndCompaction 后,往上游发送分区提交信息。
5.2 kafka connector 优化
反对 protobuf format
背景:用户有解决 protobuf 格局数据的需要
解决办法:
- 应用 protoc 生成 java 类,打包 jar,上传到实时计算平台。
- 实现对应的 DeserializationSchema 和 SerializationSchema,动静加载 pb 类并通过反射调用办法,实现 pb bytes 与 RowData 的互转。
kafka sink 反对自定义分流
背景:用户心愿在一个 sql 作业中依据须要,灵便定制将音讯发送到指定 kafka 集群和 topic。
解决办法:
- 反对用户自定义 udf,灵便抉择 sql 中的字段作为 udf 的入参,在 udf 外部,用户依据业务场景定制逻辑,返回 topic 或者 broker list。最终 sink 外部发送到对应的 kafka 集群和 topic。
- kakfa sink 外部动静加载 udf,通过反射机制实时获取对应的 broker 和 topic,同时反对后果的缓存。
-
例子:
CREATE TABLE sink_test ( broker_name_arg varchar, topic_name_arg varchar, message string, t1 string ) WITH('bootstrapServers' = 'BrokerUdf(broker_name_arg)', // 依据 broker_name_arg 作为 udf 参数计算 brokers 'bootstrapServers.udf.class' = 'com.bilibili.lancer.udf.BrokerUdf', // 获取 brokers Udf 'topic' = 'TopicUdf(broker_name_arg, topic_name_arg)', // 依据 broker_name_arg 和 topic_name_arg 作为 udf 参数计算 topic 'topic.udf.class' = 'com.bilibili.lancer.udf.TopicUdf', // 计算 topoc Udf 'udf.cache.min' = '1', // 缓存工夫 'exclude.udf.field' = 'false', // udf 的相干字段是否输入 'connector' = 'kafka-diversion' );
5.3 cdc connector 优化
sql 场景下多库多表场景反对
背景:原生的 flink cdc source 在单个 sql 工作中,只能同步雷同 DDL 定义的表,如果须要同步异构 DDL,不得不启动多个独立的 job 进行同步。这样会存在资源的额定开销。
解决办法:
-
sql 定义去 DDL:
原生 flink cdc source 会对所有监听到的数据在反序列化时依据 sql ddl 定义做 column 转换和解析,以 RowData 的模式传给上游。咱们在 cdc-source 中新增了一种的 format 形式:changelog bytes 序列化形式。该 format 在将数据反序列化时在不再进行 column 转换和解析,而是将所有 column 间接转换为 changelog-json 二进制传输,外层将该二进制数据间接封装成 RowData 再传给上游。对上游通明,上游在生产 kafka 数据的时候能够间接通过 changelog-json 反序列化进行数据解析。并且因为该改变缩小了一次 column 的转换和解析工作,通过理论测试下来发现除主动感知 schema 变更外还能晋升 1 倍的吞。在 kafka sink connector 中,依据 db 和 table 进行分流,能够反对发送到不同的 topic。
-
扩大 metadata,增加 sequence:
将增量数据同步到 kafka 中,因为 kafka 存在多分区,因而必然会导致音讯乱序问题。因而须要提供一个单任务内严格枯燥递增的 sequence,用于上游消费者进行排序,保证数据的最终一致性。最终咱们提取 binlog 中的 gtid 作为 binlog 音讯的 sequence id,通过 metadata 的形式裸露解决来,写入 kafka record 的 header 中,对于全量数据,sequence 设置为 0。
断流场景分区提交反对
背景:因为整个 cdc 计划存在上游和上游两个独立的 job,并且都是基于 event time 推动 watermark 做分区的提交,上游 watermark 的推动碰壁可能受到数据失常断流或者上游作业异样两种起因的影响,如果正确判断呢?
解决办法:
- 在 cdc source connector 内定义一种新类型的 record HeartbeatRecord,此 record 工夫为以后工夫。当发现某张表数据进行发送时,定期 mock 心跳数据进行发送。失常断流状况下,上游作业能够依据心跳信息失常推动 watermark,并且能够过滤抛弃此信息。
-
最终 cdc connector sql 样例:
CREATE TABLE mysql_binlog ( host_name STRING METADATA FROM 'host_name' VIRTUAL, db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP(3) METADATA FROM 'op_ts' VIRTUAL, sequence BIGINT METADATA FROM 'sequence' VIRTUAL, // sequence 严格枯燥递增 heartbeat BOOLEAN METADATA FROM 'heartbeat'VIRTUAL, // 对于心跳信息标识为 true mtime TIMESTAMP(3) METADATA FROM 'mtime'VIRTUAL, // 提取 mtime,用于上游推动 watermark id BIGINT NOT NULL, filed_list BYTES NOT NULL, // 去 DDL,在 source 外部数据全副依照 changelog-json 格局进行序列化、PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'xxxx', 'port' = '3552', 'username' = 'datacenter_cdc', 'password' = 'xxx', 'database-name' = 'xxx', 'debezium.format' = 'bytes', 'table-name' = 'xxx', 'server-time-zone' = 'Asia/Shanghai', 'heartbeat.enable'='true', 'scan.incremental.snapshot.chunk.size' = '80960' );
06 架构稳定性优化
为了保障流式传输稳固和高效运行,咱们在以下几个方面做了一些优化,别离介绍下:
6.1 管道热点优化
作业在失常运行的过程中,常常遇到部分热点问题,例如 kafka/hdfs io 热点导致部分并行度生产速度降落或者写入碰壁、yarn 队列机器 load 不平均导致作业局部并行度生产能力不如,尽管起因多种多样,然而实质看,这些问题的一个共性就是因为部分热点导致部分数据提早。针对这个问题,咱们别离从部分流量调度和全局流量调度两个维度进行优化。
部分流量调度
部分流量调度的优化思路是在单个 producer 和 task 外部,分区之间进行流量的重调配。目前在两个点就行了优化:
-
bsql Task manager 外部 subtask 上下游通信优化:
集成作业并没有 keyby 的需要,基于 Flink Credit-based Flow Control 反压机制,能够通过 Backlog Size 判断上游工作的解决负载,那么咱们就能够将 Round-robin 发送的形式批改为依据 Channel 的 Backlog Size 信息抉择负载更低的上游 Channel 发送的形式。留神:此种策略只有 source 和 sink 端之间是 rebalance/rescale 时,才有成果。会造成肯定的序列化开销,然而测试下来能够承受。
-
kafka producer partition 主动剔除机制:
kafka producer 在发送数据 callback 异样(绝大多数是 timeout)超出肯定的阈值,会将对应 tp 从 available partition list 中进行剔除,后续 record 将不再发送到剔除的 tp。同时,被剔除 tp 后续将进行恢复性测试,如果数据能够失常发送,将从新放入到 available partition list 中。目前此机制在 flink kafka sink connector 和规范 kafka client 都进行了实现。
全局流量调度
全局流量调度的优化思路是整个传输链路层级之间的流量调配,目前咱们将生产者 (lancer-gateway) 与消费者 (flink sql kafka source) 进行联动,当消费者呈现 tp 生产 lag 的状况,通过注册黑名单(lag partition)到 zookeeper,上游生产者感知黑名单,进行向高 lag partition 中持续发送数据。
Flink kafka source 中基于 flink AggregateFunction 机制,kafka source subtask 上报 lag 到 job manager,job manager 基于全局 lag 判断注册黑名单到 zookeeper
黑名单判断逻辑:当单 tp lag > min(全局 lag 平均值,全局 lag 中位数) 倍数 && 单 tp lag 大于 lag 绝对值,其中 “ 单 tp lag 大于 lag 绝对值 ” 是为了躲避此机制过于敏感,” 单 tp lag > min(全局 lag 平均值,全局 lag 中位数) 倍数 ” 用于筛选出头部的 lag tp。为了避免黑名单比例过大,黑名单剔除的 tp 数量下限不得大于全副 tp 数量的肯定比例。
部分流量调度和全局流量调度在管道热点优化成果上存在肯定的互补性,然而也各有劣势。
6.2 全链路埋点品质监控
数据品质是重要一环,通常数据品质蕴含完整性、时效性、准确性、一致性、唯一性等方面,对于数据传输场景,当面咱们重点关注完整性和时效性两个方面
整体品质计划大抵蕴含监控数据采集和规定配置两个大的方向,整体架构如下:
监控数据采集
咱们自研了 trace 零碎:以 logid 为单位,咱们在数据处理流程中的每一层都进行了监控埋点
- 每层埋点蕴含三个方面:接管、发送、外部谬误。所有埋点数据以数据创立工夫(ctime)进行窗口对齐,并且通过更新 utime 以统计层间和层内的解决耗时。
- 通过监控埋点能够实时统计出:端到端、层级间、层级外部的数据处理耗时、完整性、谬误数。
- 以后计划缺点:flink sql 挂掉从 ck 复原,监控数据不能保障幂等,后续须要进一步改良。
监控报警规定
咱们针对数据流进行了分级,每个等级指定了不同的保障级别(SLA),SLA 破线,报警告诉 oncall 同学解决。
提早归档报警:hdfs 分区提交提早,触发报警。
实时完整性监控:基于 trace 数据,实时监控端到端的完整性,接管条数 / 落地条数
离线数据完整性:hdfs 分区 ready 后,触发 dqc 规定运行,比照接管条数(trace 数据)/ 落地条数(hive 查问条数)
传输提早监控:基于 trace 数据,计算端到端数据传输提早的分位数。
DQC 阻塞:离线数据完整性异样后,阻塞上游作业的调度。
6.3 kafka 同步断流反复优化
绝对比 2.0 计划中 flume 计划,基于 flink sql 的 kafka 到 kafka 的实现计划显著的一个变动就是作业的重启、故障复原会导致整体的断流和肯定比例的数据反复(从 checkpoint 复原),因而如何升高用户对此类问题的感知,至关重要。
首先梳理下可能造成问题呈现的起因:1)作业降级重启 2)task manager 故障 3)job manager 故障 4)checkpoint 间断失败,同时依据 flink job 整体提交流程,影响作业复原速度的关键环节是资源的申请。根据上述剖析和针对性测试,针对 kafka 同步场景的断流反复采纳了如下优化形式:
- checkpoint interval 设置成 10s:升高从 checkpoint 复原导致的数据反复比例
- 基于 session 模式提交作业:作业重启无需反复申请资源
- jobmanager.execution.failover-strategy=region,单个 tm 挂掉后,只复原对应的 region,不必复原整个作业。集成作业 DAG 绝对简略,能够尽量躲避 rebalance 的呈现,升高复原的比例。
- 应用小资源粒度 task manager(2core cpu,8GB memory,2 slot):等同资源规模下,tm 数量变多,单 tm 挂掉影响水平显著变低。
- 针对高优作业冗余 task manager:冗余一个 tm,当单个 tm 挂掉状况下,流量简直没受影响
- 基于 zookeeper 实现 job manager ha:在开启 jm ha 后,jm 挂掉工作未断流
- 针对 checkpoint 间断失败的场景,咱们引入了 regional checkpoint,以 region(而不是整个 topology)作为 checkpoint 治理的单位,避免个别 task 的 ck 失败造成整个作业的失败,能够无效避免在个别 task 的 ck 间断失败的状况下须要回溯的数据量,减小集群稳定(网络,HDFS IO 等)对 checkpoint 的影响
通过上述优化,通过测试一个(50core,400GB memory,50 slot)规模的作业,优化成果如下:
6.4 kafka 流量动静 failover 能力
为了保证数据及时上报,Lancer 对于数据缓冲层的 kafka 的发送成功率依赖性很高,常常遇到的 case 是高峰期或者流量抖动导致的 kafka 写入瓶颈。参考 Netflix Hystrix 熔断原理,咱们在网关层实现了一种动静 kafka failover 机制:网关能够依据实时的数据发送状况计算熔断率,依据熔断率将流量在 normal kafka 和 failover kafka 之间动静调节。
- 基于滑动工夫窗口计算熔断比例:滑动窗口的大小为 10,每个窗口中统计 1s 内胜利和失败的次数。
- 熔断器状态:敞开 / 关上 / 半开,熔断率 =fail_total/sum_total , 为防止极其状况流量全切到 failover,熔断率须要有一个下限配置。熔断后的降级策略:normal kafka 熔断后尝试切 failover,failover kafka 如果也熔断的话就切回 normal
- 判断逻辑:
6.5 全链路流控、反压、降级
从端上上报到数据落地的整个流程中,为了保障稳定性和可控性,除了前述伎俩,咱们还引入了整体流控、反压、降级等技术手段,上面综合介绍下。
从后向前,分为几个环节:
1. 数据散发层:
- 如果呈现生产提早,数据反压到数据缓冲层 kafka
- 单作业外部通过 backlog 反压做 subtask 之间的流量平衡
2. 数据网关层:
- 如果写入 kafka 提早,间接返回流控码(429)给数据上报端
- 数据网关层和数据散发层之间通过 kafka tp 级别流控调度适配部分 tp 解决提早。
3. 数据上报层:
- 适配数据网关的流控返回:做退却重试
- 基于本地磁盘进行数据的沉积
- 配置动静推送失效被动采样 / 降级沉积
6.6 开发阶段品质验证
为了在开发阶段保障整体服务的正确性和稳定性,开发阶段咱们设计了一套残缺的测试框架。
- 新版本上线之前,咱们会同时双跑新旧两条作业链路,将数据别离落入两张 hive 表,并且进行全分区的条数和内容 md5 校验,校验后果以小时级别 / 天级别报表的模式收回。此测试框架保障了版本迭代的过程中,端到端的正确性。
- 同时为了保障异样极其状况下数据的准确性,咱们也引入了混沌测试,被动注入一些异样。异样包含:job manager 挂掉,taskmanager 挂掉、作业随机重启、部分热点、脏数据等等。
07 将来瞻望
- 链路架构降级,接入公司级的数据网关(Databus),架构对立并且能够涵盖更多的数据上报场景。
- 云原生,拥抱 K8S,面向用户 quota 治理,并且实现主动资源 AutoScale。
- 拥抱批流一体,强化增量化集成,笼罩离线批集成场景,打造对立基于 Flink 的统一化集成框架。