1 背景

随着Flink实例的迁徙下云以及新增需要接入,自建Flink平台规模逐步壮大,以后总计已超4万核运行在自建的K8S集群中,然而 Flink 工作数的减少,特地是大状态工作,每次Checkpoint 时会产生脉冲式带宽占用,峰值流量超过100Gb/s,晚期应用OSS作为Checkpoint数据存储,单个Bucket 每 1P数据量只有收费带宽10Gb/s,超出局部独自计费,以后规模每月须要减少1x w+/月。

为了管制这部分老本,得物发展了自建HDFS在Flink Checkpoint场景下的落地工作,实现年度老本节俭xxx万元。

此次分享自建HDFS在实时计算checkpoint场景的实践经验,心愿能为读者提供一些参考。

2 Flink Checkpoint 介绍

2.1 Flink里的Checkpoint是什么?

Checkpoint:简略的说,在某一时刻,将 Flink 工作本地机器中存储在状态后端的状态去同步到近程文件存储系统(比方 HDFS)的过程就叫 Checkpoint。

状态后端:做状态数据长久化的工具就叫做状态后端。比方你在 Flink 中见到的 RocksDB、FileSystem 的概念就是指状态后端,再引申一下,也能够了解为:利用中有一份状态数据,把这份状态数据存储到 MySQL 中,这个 MySQL 就能叫做状态后端。

2.2 Checkpoint解决什么问题?

其实在实时计算中的状态的性能次要体现在工作能够做到失败重启后没有数据品质、时效问题。

实时工作个别都是 7x24 小时 Long run 的,挂了之后,就会有以下两个问题,首先给一个理论场景:一个生产上游 Kafka,应用 Set 去重计算 DAU 的实时工作。

数据品质问题:当这个实时工作挂了之后复原,Set空了,这时候工作再持续从上次失败的 Offset 生产 Kafka 产出数据,则产出的数据就是谬误数据了

数据时效问题:一个实时工作,产出的指标是有时效性(次要是时延)要求的。你能够从明天 0 点开始从新生产,然而你回溯数据也是须要工夫的。举例:中午 12 点挂了,实时工作从新回溯 12 个小时的数据能在 1 分钟之内实现嘛?大多数场景下是不能的!个别都要回溯几个小时,这就是实时场景中的数据时效问题。

而 Flink的Checkpoint就是把 Set 定期的存储到近程 HDFS 上,当工作挂了,咱们的工作还能够从 HDFS 下面把这个数据给读回来,接着从最新的一个 Kafka Offset 持续计算就能够,这样即没有数据品质问题,也没有数据时效性问题。

2.3 Checkpoint的运行流程?

  1. JM 定时调度 Checkpoint 的触发,承受到 JM 做 Checkpoint 的申请后,开始做本地 Checkpoint,暂停解决新流入的数据,将新数据缓存起来。
  2. 将工作的本地状态数据,复制到一个近程的长久化存储(HDFS)空间上。
  3. 持续解决新流入的数据,包含方才缓存起来的数据。

3 自建HDFS引入

3.1 为什么用HDFS?

Flink 做为一个成熟的流计算引擎,对外声称能够实现 Exactly Once。为了实现业务上的 Exactly Once,Flink 必定不能丢数据,也就是状态数据必须保障高可靠性,而HDFS作为是一个分布式文件系统,具备高容错率、高吞吐量等个性,是业界应用最宽泛的开源分布式文件系统,针对大状态的Checkpoint工作十分符合,带宽易扩大且老本低廉。

HDFS次要有如下几项特点:

  • 和本地文件系统一样的目录树视图
  • Append Only 的写入(不反对随机写)
  • 程序和随机读
  • 超大数据规模
  • 易扩大,容错率高

3.2 得物自建HDFS架构

架构层面是典型的主从构造,架构见下图,核心思想是将文件依照固定大小进行分片存储,

  • 主节点:称为 NameNode,次要寄存诸如目录树、文件分片信息、分片寄存地位等元数据信息
  • 从节点:称为 DataNode,次要用来存分片数据

比方用户收回了一个1GB的文件写申请给HDFS客户端,HDFS客户端会依据配置(默认是128MB),对这个文件进行切分,HDFS客户端会切分成8个Block,而后询问NameNode应该将这些切分好的Block往哪几台DataNode上写,尔后client端和NameNode调配的多个DataNode形成pipeline管道,开始以packet为单位向Datanode写数据。

4 自建HDFS落地实际

4.1 集群布局

晚期应用OSS的次要瓶颈在于带宽,为了匹配将大状态的工作从OSS迁徙到Hdfs带宽需要,撑持写入流量100Gib+/s,比照OSS的带宽老本,联合到老本与带宽瓶颈思考,外部大数据d2s.5xlarge机型做了一次性能压测,单节点吞吐能达到12Gib/s,按100Gib/s预估,算上Buffer,3正本集群须要xx台机器,满足当初的带宽及写入吞吐需要,最终抉择d2s.5xlarge类型Ecs机器,对应实例详情如下:

4.2 稳定性保障建设

4.2.1 Hdfs组件指标采集

为了确保HDFS集群的稳固和可靠性,撑持线上实时Flink工作Checkpoint,监控告警建设是必不可少的,咱们通过对立的采集程序Hadoop Exporter将集群里各组件的JMX信息换为维度模型,将下述为扁平化的事实指标Jmx数据,转换为维度构造,比方针对NameNode、DataNode,能够间接将指标应用预约义维度,例如:cluster、instance等维度,并存储到Prometheus可能辨认的指标数据,存储为一个二维字典构造,例如: _hadoop_namenode_metrics指标分类(通常是MBean的名称)

4.2.2 指标采集架构

联合以后集群的规模,咱们通过集中是Pull的形式采集架构,只须要启动时指定集群Namenode及Jn的Jmx的url信息,就能采集集群的所有组件的指标信息,这样当有集群扩大或变更时,会主动采集上报到apm里,不便运维,具体采集架构如下图:

4.2.3 监控与告警

监控:基于已采集汇报上的指标数据,目前配置了Namenode、Datanode组件外围指标监控大盘,包含HDFS节点衰弱状态、HDFS服务衰弱状态、数据块衰弱状态、节点的写入吞吐量等指标。

告警:以后监控数据已实现接入公司天眼监控平台,咱们将影响hdfs服务可用性的指标对立配置了告警模版,比方集群总的写入带宽、Callqueue队列、DN存活数量、集群节点根底io值班等,能够动静笼罩多集群,实现定制化告警,更加灵便及不便感知问题,缩小故障止损时长,满足线上HDFS稳定性保障SLA指标。

4.2.4 集群疾速变更能力

随着Hdfs集群规模的减少,在日常运维过程中,如何做到疾速扩、缩容、节点重启及配置变更能力,保障集群具备疾速止损的能力,咱们封装了一整套HDFS的各组件变更能力,包含节点主动上报到cmdb对应利用、集群数据节点maintenance模式疾速无影响重启、日常变配等,并集成到ansible playbook,做到集群扩容在分钟级实现。

4.3 迁徙到HDFS攻克难关

4.3.1 DN 心跳汇报于删除共用一把写锁问题

景象:自建Flink平台大部分大状态工作迁徙后,自建HDFS集群节点整体的水位各个ecs的网络带宽峰值,呈现偶发局部工作因checkpiont 写入失败问题,报错信息如下:

问题定位过程:

  1. 依据客户端日志的堆栈信息,查看Namenode的日志找到对应的文件、块,发现了谬误日志,文件块在写入胜利后不能及时上报,块的状态始终处于not COMPLETE。

这里介绍下Hdfs文件写入流程介绍:

  • 客户端向datanode写入块完结后,datanode通过IBR(增量块汇报)向namenode汇报新写入的块
  • namenode收到汇报后更新文件的块正本数,当文件块正本数>=1时,文件写入状态为COMPLETE
  • 客户端写入完结后一直向namenode询问文件写入状态是否COMPLETE,失败5(默认)次后报错写入失败。
  1. 根据上述写入流程,狐疑问题呈现在IBR阶段,查看Namenode监控指标,Namenode解决块汇报均匀时长<10ms,所以猜想问题出在Datanode端,察看发现,Datanode偶发心跳汇报距离>30s(失常3s一次),Datanode IBR和心跳都是BPServiceActor线程解决,很可能是心跳阻塞了IBR。

  1. 咱们依据猜想的方向,持续定位什么起因导致心跳阻塞了IBR汇报,于是在每台节点上,部署了脚本(见下图),依据Datanode的Jmx指标监听本节点心跳距离,大于10s时就打印Datanode的Jstack。

Datanode 每个节点上的metric信息里蕴含心跳汇报距离的数据。

  1. 剖析多个Jstack代码(具体内容见下),能够发现BPServiceActor线程被CommandProcessingThread线程阻塞,而CommandProcessingThread线程在调用invalidate()办法,而invalidate()是在调用删除操作。
"BP-1732625734-****-1675758643065 heartbeating to ****:8020" #56 daemon prio=5 os_prio=0 tid=0x00007f8fc6417800 nid=0x77e0 waiting on condition [0x00007f8f933f5000]   java.lang.Thread.State: WAITING (parking)        at sun.misc.Unsafe.park(Native Method)        - parking to wait for  <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.writeLock(BPOfferService.java:118)        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.updateActorStatesFromHeartbeat(BPOfferService.java:570)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:699)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:879)        at java.lang.Thread.run(Thread.java:748)   Locked ownable synchronizers:        - None        "Command processor" #54 daemon prio=5 os_prio=0 tid=0x00007f8fc640f800 nid=0x77de runnable [0x00007f8f935f7000]   java.lang.Thread.State: RUNNABLE        at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)        at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)        at java.io.File.isDirectory(File.java:858)        at java.io.File.toURI(File.java:741)        at org.apache.hadoop.hdfs.server.datanode.LocalReplica.getBlockURI(LocalReplica.java:256)        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2133)        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2099)        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActive(BPOfferService.java:738)        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActor(BPOfferService.java:684)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processCommand(BPServiceActor.java:1359)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.lambda$enqueue$2(BPServiceActor.java:1405)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread$$Lambda$75/2086554487.run(Unknown Source)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processQueue(BPServiceActor.java:1332)        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.run(BPServiceActor.java:1315)   Locked ownable synchronizers:        - <0x00000007204cf938> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync)        - <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

联合堆栈信息定位到代码,的确发现processCommandFromActor办法在执行删除(调用invalidate()办法)操作时与心跳汇报updateActorStatesFromHeartbeat办法共用同一把写锁。

class BPOfferService {private final Lock mWriteLock = mReadWriteLock.writeLock();void writeLock() {  mWriteLock.lock();}void writeUnlock() {  mWriteLock.unlock();}void updateActorStatesFromHeartbeat(    BPServiceActor actor,    NNHAStatusHeartbeat nnHaState) {  writeLock();  try {//... 心跳汇报  } finally {    writeUnlock();  }}boolean processCommandFromActor(DatanodeCommand cmd,    BPServiceActor actor) throws IOException {  assert bpServices.contains(actor);// ...省略  writeLock();  try {//...执行删除逻辑  } finally {    writeUnlock();  }}}
  1. 确认问题:查看Namenode审计日志发现,集群继续有大量文件删除(Flink删除过期Checkpoint meta文件)操作,批改Datanode端代码,在调用processCommandFromActive办法超过肯定10s后打印调用时长与CommandAction日志。查看datanode日志发现的确存在删除操作大于30s的状况,由此确认问题就是呈现在删除操作耗时过长影响了Datanode的增量块汇报。

由此确定问题:

删除块操作耗时过长,阻塞datanode心跳,导致IBR被阻塞,块写入胜利后不能及时上报,客户端重试肯定次数后失败抛异样,重试次数由dfs.client.block.write.locateFollowingBlock.retries决定,默认5次,第一次期待0.4s,之后每次期待时长翻倍,5次约为15s左右。

问题解决方案

找到问题就是呈现在BPServiceActor 线程做了太多的事,蕴含FBR、IBR、心跳汇报,而且心跳汇报和删除独特持有一把写锁,那解决方案一个就把这两把锁进行拆分,一个就是将IBR逻辑独自独立进去,不受心跳汇报影响。

而社区3.4.0版本曾经将IBR从BPServiceActor 线程独立进去了,所有咱们最终将HDFS-16016 patch 合并到自建Hdfs3.3.3版本中,IBR不会被invalidate()阻塞,问题失去根治!

5 总结与布局

总结:Oss的流量已从晚期137Gib/s升高到30Gib/s左右(下图一),自建Hdfs集群峰值流量达到120Gb/s(下图二),且安稳运行

整个我的项目已实现全副大状态工作从Oss迁徙到自建Hdfs,以后Hdfs集群规模xx台,老本x w/月,原OSS带宽费用报价1x w/月,相比节俭xx w/月。

将来布局:对于全量 checkpoint 来说,TM 将每个 Checkpoint 外部的数据都写到同一个文件,而对于 RocksDBStateBackend 的增量 Checkpoint 来说,则会将每个 sst 文件写到一个分布式系统的文件内,当作业量很大,且作业的并发很大时,则会对底层 HDFS 造成十分大的压力,

1)大量的 RPC 申请会影响 RPC 的响应工夫。

2)大量文件对 NameNode 内存造成很大压力。

针对下面的问题咱们将来思考引入小文件合并计划升高 HDFS 的压力,包含 RPC 压力以及 NameNode 内存的压力。

*文/希贤


线下流动举荐: 得物技术沙龙「企业合作效率演进之路」(总第19期)
工夫:2023年7月16日 14:00 ~ 2023年7月16日 18:00
地点:(上海杨浦)黄兴路221号互联宝地 C栋5楼(宁国路地铁站1号口出)
流动亮点:在当今竞争日益强烈的商业环境中,企业合作效率成为企业团队胜利的要害。越来越多的企业意识到,通过信息化建设和工具化的反对,能够大幅晋升合作效率,并在行业中获得冲破。本次沙龙将涵盖多个主题,这些主题将为与会者提供丰盛的思考和教训,助力企业合作效率的晋升。通过得物技术沙龙这个交流平台,您将有机会与其余企业的代表一起学习、借鉴彼此的教训和做法。独特探讨企业外部合作效率的最佳实际,驱动企业长期生存和倒退。退出得物技术沙龙,与行业先驱者们一起开启合作效率的新篇章!让咱们独特为合作效率的冲破而致力!
点击报名: 得物技术沙龙「企业合作效率演进之路」(总第19期)

本文属得物技术原创,来源于:得物技术官网
未经得物技术许可严禁转载,否则依法追究法律责任!