1.问题背景

通过flume间接上传实时数据到hdfs,会常遇到的一个问题就是小文件,须要调参数来设置,往往在生产环境参数大小也不同

1.flume滚动配置为何不起作用?

2.通过源码剖析得出什么起因?

3.该如何解决flume小文件?

2. 过程剖析

接着上一篇,https://blog.csdn.net/hu_lichao/article/details/110358689

自己在测试hdfs的sink,发现sink端的文件滚动配置项起不到任何作用,配置如下:

a1.sinks.k1.type=hdfs  a1.sinks.k1.channel=c1  a1.sinks.k1.hdfs.useLocalTimeStamp=true  a1.sinks.k1.hdfs.path=hdfs://linux121:9000/user/data/logs/%Y-%m-%da1.sinks.k1.hdfs.filePrefix=XXX  a1.sinks.k1.hdfs.rollInterval=60  a1.sinks.k1.hdfs.rollSize=0  a1.sinks.k1.hdfs.rollCount=0  a1.sinks.k1.hdfs.idleTimeout=0   

这里配置的是60秒,文件滚动一次,也就每隔60秒,会新产生一个文件【前提,flume的source端有数据来】这里留神 useLocalTimeStamp 是应用本地工夫戳来对hdfs上的目录来命名,这个属性的目标就是相当于工夫戳的拦截器,否则%Y 等等这些货色都辨认不了

要么用下面这个属性,要么用工夫戳拦截器。然而当我启动flume的时候,运行十几秒,一直写入数据,发现hdfs端频繁的产生文件,每隔几秒就有新文件产生而且在flume的日志输入能够频繁看到这句:

[WARN] Block Under-replication detected. Rotating file.

只有有这句,就会产生一个新的文件,意思就是检测到复制块正在滚动文件,联合源码看下:

private boolean shouldRotate() {      boolean doRotate = false;        if (writer.isUnderReplicated()) {            this.isUnderReplicated = true;            doRotate = true;          } else {              this.isUnderReplicated = false;          }          if ((rollCount > 0) && (rollCount <= eventCounter)) {          LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);         doRotate = true;          }          if ((rollSize > 0) && (rollSize <= processSize)) {          LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);           doRotate = true;      }         return doRotate;  }   

这是判断是否滚动文件,然而这外面的第一判断条件是判断是否以后的HDFSWriter正在复制块

public boolean isUnderReplicated() {      try {              int numBlocks = getNumCurrentReplicas();          if (numBlocks == -1) {              return false;          }              int desiredBlocks;          if (configuredMinReplicas != null) {                desiredBlocks = configuredMinReplicas;          } else {               desiredBlocks = getFsDesiredReplication();             }              return numBlocks < desiredBlocks;          } catch (IllegalAccessException e) {          logger.error("Unexpected error while checking replication factor", e);       } catch (InvocationTargetException e) {           logger.error("Unexpected error while checking replication factor", e);          } catch (IllegalArgumentException e) {             logger.error("Unexpected error while checking replication factor", e);         }          return false;  } 

通过读取的配置复制块数量和以后正在复制的块比拟,判断是否正在被复制

if (shouldRotate()) {      boolean doRotate = true;          if (isUnderReplicated) {              if (maxConsecUnderReplRotations > 0 &&              consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {              doRotate = false;                  if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {                   LOG.error("Hit max consecutive under-replication rotations ({}); " +                      "will not continue rolling files under this path due to " +                      "under-replication", maxConsecUnderReplRotations);              }              } else {                LOG.warn("Block Under-replication detected. Rotating file.");          }              consecutiveUnderReplRotateCount++;          } else {              consecutiveUnderReplRotateCount = 0;  }  

以上办法,入口是shouldRotate()办法,也就是如果你配置了rollcount,rollsize大于0,会依照你的配置来滚动的,然而在入口进来后,发现,又去判断了是否有块在复制;外面就读取了一个固定变量maxConsecUnderReplRotations=30,也就是正在复制的块,最多之能滚动出30个文件,如果超过了30次,该数据块如果还在复制中,那么数据也不会滚动了,doRotate=false,不会滚动了,所以有的人发现自己一旦运行一段时间,会呈现30个文件,再联合下面的源码看一下:如果你配置了10秒滚动一次,写了2秒,恰好这时候该文件内容所在的块在复制中,那么尽管没到10秒,仍然会给你滚动文件的,文件大小,事件数量的配置同理了。

为了解决上述问题,咱们只有让程序感知不到写的文件所在块正在复制就行了,怎么做呢??只有让isUnderReplicated()办法始终返回false就行了,该办法是通过以后正在被复制的块和配置中读取的复制块数量比拟的,咱们能改的就只有配置项中复制块的数量,而官网给出的flume配置项中有该项

hdfs.minBlockReplicasSpecify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. 

默认读的是hadoop中的dfs.replication属性,该属性默认值是3,这里咱们也不去该hadoop中的配置,在flume中增加上述属性为1即可

残缺配置如下:

a1.sources = r1a1.sinks = k1a1.channels = c1# taildir sourcea1.sources.r1.type = TAILDIRa1.sources.r1.positionFile =/data/lagoudw/conf/startlog_position.jsona1.sources.r1.filegroups = f1a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*loga1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder# memorychannela1.channels.c1.type = memorya1.channels.c1.capacity = 100000a1.channels.c1.transactionCapacity = 2000# hdfs sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/a1.sinks.k1.hdfs.filePrefix = startlog.# 配置文件滚动形式(文件大小32M)a1.sinks.k1.hdfs.rollSize = 33554432a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.rollInterval = 0a1.sinks.k1.hdfs.idleTimeout = 0a1.sinks.k1.hdfs.minBlockReplicas = 1# 向hdfs上刷新的event的个数a1.sinks.k1.hdfs.batchSize = 1000# 应用本地工夫# a1.sinks.k1.hdfs.useLocalTimeStamp = true# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1 

这样程序就永远不会因为文件所在块的复制而滚动文件了,只会依据你的配置项来滚动文件了。。。。

3.总结

设置minBlockReplicas=1 的时候能够保障会按你设置的几个参数来达到不会产生过多的小文件,因为这个参数在读取时候优先级较高,会首先判断到有没有Hdfs的正本复制,导致滚动文件的异样,另外flume接入数据时候能够通过过滤器尽可能把一些齐全用不到的数据进行过滤,荡涤时候就 省事一些了。
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注