乐趣区

关于java:分享一个Flink-checkpoint失败的问题和解决办法

本文来自: PerfMa 技术社区

PerfMa(笨马网络)官网

接触 Flink 一段时间了,遇到了一些问题,其中有一个 checkpoint 失败导致作业重启的问题,遇到了很屡次,重启之后个别也能恢复正常,没有太在意,最近 2 天有共事又频繁遇到,这里记录一下解决方案和剖析过程。

咱们的 flink 测试环境有 3 个节点,部署架构是每个 flink 节点上部署一个 HDFS 的 DataNode 节点,hdfs 用于 flink 的 checkpoint 和 savepoint

景象

看日志是说有个 3 个 datanode 活着,文件正本是 1,然而写文件失败

There are 3 datanode(s) running and no node(s) are excluded

网上搜了一下这种报错,没有间接的答案,我看了下 namenode 的日志,没有更多间接的信息

50070 web ui 上看一切正常,datanode 的残余空间都还有很多,使用率不到 10%

我试了一下往 hdfs 上 put 一个文件再 get 下来,都 ok,阐明 hdfs 服务没有问题,datanode 也是通的

日志景象 1

持续前后翻了一下 namenode 的日志,留神到有一些 warning 信息,

这时候狐疑块搁置策略有问题

依照日志提醒关上相应的的 debug 开关
批改

etc/hadoop/log4j.properties

找到

log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN

照抄这个格局,在上面增加

log4j.logger.org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy=DEBUG
log4j.logger.org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor=DEBUG
log4j.logger.org.apache.hadoop.net.NetworkTopology=DEBUG

重启 namenode,而后重跑 flink 作业

日志景象 2

这时候看到的问题是机架感知策略无奈满足,因为咱们没有提供机架映射脚本,默认同一个机架,然而认真想想跟这个应该也没有关系

因为很多生产环境的 hdfs 其实都不配置机架映射脚本,并且导致 checkpoint 失败的问题并不是始终存在,最起码 put/get 文件都是失常的。

这时候开始思考看一下 hdfs 的源码了,依据下面的日志调用栈,先看到 BlockPlacementPolicyDefault 以及相干的 DatanodeDescriptor

这些源码大抵的意思是当给一个块抉择一个 datanode 的时候,要对这个 datanode 进行一些查看,比方看下残余空间,看下忙碌水平

当咱们的问题复现的时候,察看日志会发现一些与此相关的要害信息

这个日志的意思是,存储空间有 43G,调配块理论须要 100 多 M, 然而 scheduled 大小就超过 43G, 因而咱们认为失常的 datanode,namenode 认为它空间有余了

起因

scheduled 大小含意是什么呢?依据代码能够看到 scheduled 大小是块大小跟一个计数器做乘法,计数器代表的其实是新建文件块数量计数器,hdfs 依据这两个参数评估可能须要的存储空间,相当于给每个 datanode 预约了肯定的空间,预约的空间在文件写入后,计算完实在的占用空间后,还会调整回来。

理解这个原理之后,能够判断的是 datanode 在一段时间内被预约了太多的空间。

flink 的 checkpoint 机制能够参考这一篇 https://www.jianshu.com/p/9c5…
大抵的意思是 taskmanager 上的很多工作线程都会写 hdfs

看了下 hdfs 的目录构造,有大量的相似 uuid 命名 checkpoint 文件,同时每个文件都很小

当咱们的作业并发较大时,相应的在 hdfs 上就会创立更多的 checkpoint 文件,只管咱们的文件大小只有几 K,然而在每一个 datanode 预约的空间都是 128M 乘以调配到的文件数量(文件很小,不超过 128M),那么 43G 的空间,最多预约多少文件呢?除一下也就是 300 多个,三个节点也就是最多 900 个,咱们有多个作业,总并发较大,在预留空间齐全开释前,是很容易呈现这个问题的。

之前晓得 hdfs 不适宜存储小文件,起因是大量小文件会导致 inode 耗费以及 block location 这些元数据增长,让 namenode 内存吃紧,这个例子还表明
当 blocksize 设置较大,文件大小却远小于 blocksize 时,大量这种小文件会导致 datanode 间接 ” 不可用 ”。

解决办法

块大小不是集群属性,是文件属性,客户端能够设置的,flink 这时候每个 taskmanager 和 jobmanager 都是 hdfs 的 ” 客户端 ”,依据 flink 文档,咱们能够做如下配置
1、在 conf/flink-conf.yaml 中指定一个 hdfs 的配置文件门路

fs.hdfs.hadoopconf: /home/xxxx/flink/conf

这里跟 flink 的配置文件门路抉择同一个目录

2、放进去 2 个配置文件,一个 core-site.xml 一个是 hdfs-site.xml

core-site.xml 能够不放,如果 checkpoint 和 savepoint 指定了具体的 hdfs 地址的话,

hdfs-site.xml 里加上 blockSize 配置即可,比方这里咱们给它设置为 1M

具体块大小如何设置,须要察看本人的作业状态文件大小本人灵便调整。

重启 flink 集群,提交作业即可,运行时能够察看下 hdfs 的 fsimage 大小,留神不要因为块太小,小文件太多导致元数据过大。

小结

咱们曾经将该问题同步到集群自动化部署脚本中,部署时会专门增加 blocksize 的配置。

flink 这套依赖 hdfs 的 checkpoint 计划对于轻量级的流计算场景稍显臃肿,checkpoint 的分布式存储不论是间接 filesystem 还是 rocksDB 都须要 hdfs, 其实从 checkpoint 原理和数据类型思考,es 应该也是不错的抉择,遗憾的是社区并没有提供这种计划。

一起来学习吧

PerfMa KO 系列课之 JVM 参数【Memory 篇】

一次 StackOverflowError 排查,起因居然和 Dubbo 无关!

退出移动版