1. 问题背景
通过 flume 间接上传实时数据到 hdfs,会常遇到的一个问题就是小文件,须要调参数来设置,往往在生产环境参数大小也不同
1.flume 滚动配置为何不起作用?
2. 通过源码剖析得出什么起因?
3. 该如何解决 flume 小文件?
2. 过程剖析
接着上一篇,https://blog.csdn.net/hu_lichao/article/details/110358689
自己在测试 hdfs 的 sink,发现 sink 端的文件滚动配置项起不到任何作用,配置如下:
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://linux121:9000/user/data/logs/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=XXX
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
这里配置的是 60 秒,文件滚动一次,也就每隔 60 秒,会新产生一个文件【前提,flume 的 source 端有数据来】这里留神 useLocalTimeStamp 是应用本地工夫戳来对 hdfs 上的目录来命名,这个属性的目标就是相当于工夫戳的拦截器,否则 %Y 等等这些货色都辨认不了
要么用下面这个属性,要么用工夫戳拦截器。然而当我启动 flume 的时候,运行十几秒,一直写入数据,发现 hdfs 端频繁的产生文件,每隔几秒就有新文件产生而且在 flume 的日志输入能够频繁看到这句:
[WARN] Block Under-replication detected. Rotating file.
只有有这句,就会产生一个新的文件,意思就是检测到复制块正在滚动文件,联合源码看下:
private boolean shouldRotate() {
boolean doRotate = false;
if (writer.isUnderReplicated()) {
this.isUnderReplicated = true;
doRotate = true;
} else {this.isUnderReplicated = false;}
if ((rollCount > 0) && (rollCount <= eventCounter)) {LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
doRotate = true;
}
if ((rollSize > 0) && (rollSize <= processSize)) {LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
doRotate = true;
}
return doRotate;
}
这是判断是否滚动文件,然而这外面的第一判断条件是判断是否以后的 HDFSWriter 正在复制块
public boolean isUnderReplicated() {
try {int numBlocks = getNumCurrentReplicas();
if (numBlocks == -1) {return false;}
int desiredBlocks;
if (configuredMinReplicas != null) {desiredBlocks = configuredMinReplicas;} else {desiredBlocks = getFsDesiredReplication();
}
return numBlocks < desiredBlocks;
} catch (IllegalAccessException e) {logger.error("Unexpected error while checking replication factor", e);
} catch (InvocationTargetException e) {logger.error("Unexpected error while checking replication factor", e);
} catch (IllegalArgumentException e) {logger.error("Unexpected error while checking replication factor", e);
}
return false;
}
通过读取的配置复制块数量和以后正在复制的块比拟,判断是否正在被复制
if (shouldRotate()) {
boolean doRotate = true;
if (isUnderReplicated) {
if (maxConsecUnderReplRotations > 0 &&
consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
doRotate = false;
if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {LOG.error("Hit max consecutive under-replication rotations ({});" +
"will not continue rolling files under this path due to" +
"under-replication", maxConsecUnderReplRotations);
}
} else {LOG.warn("Block Under-replication detected. Rotating file.");
}
consecutiveUnderReplRotateCount++;
} else {consecutiveUnderReplRotateCount = 0;}
以上办法,入口是 shouldRotate()
办法,也就是如果你配置了 rollcount
,rollsize
大于 0,会依照你的配置来滚动的,然而在入口进来后,发现,又去判断了是否有块在复制;外面就读取了一个固定变量maxConsecUnderReplRotations
=30,也就是正在复制的块,最多之能滚动出 30 个文件,如果超过了 30 次,该数据块如果还在复制中,那么数据也不会滚动了,doRotate
=false
,不会滚动了,所以有的人发现自己一旦运行一段时间,会呈现 30 个文件,再联合下面的源码看一下:如果你配置了 10 秒滚动一次,写了 2 秒,恰好这时候该文件内容所在的块在复制中,那么尽管没到 10 秒,仍然会给你滚动文件的,文件大小,事件数量的配置同理了。
为了解决上述问题,咱们只有让程序感知不到写的文件所在块正在复制就行了,怎么做呢??只有让 isUnderReplicated
() 办法始终返回 false
就行了,该办法是通过以后正在被复制的块和配置中读取的复制块数量比拟的,咱们能改的就只有配置项中复制块的数量,而官网给出的 flume 配置项中有该项
hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
默认读的是 hadoop
中的 dfs
.replication
属性,该属性默认值是 3,这里咱们也不去该 hadoop 中的配置,在 flume 中增加上述属性为 1 即可
残缺配置如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile =
/data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
# 配置文件滚动形式(文件大小 32M)a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向 hdfs 上刷新的 event 的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 应用本地工夫
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这样程序就永远不会因为文件所在块的复制而滚动文件了,只会依据你的配置项来滚动文件了。。。。
3. 总结
设置 minBlockReplicas
=1 的时候能够保障会按你设置的几个参数来达到不会产生过多的小文件,因为这个参数在读取时候优先级较高,会首先判断到有没有 Hdfs 的正本复制,导致滚动文件的异样,另外flume
接入数据时候能够通过过滤器尽可能把一些齐全用不到的数据进行过滤,荡涤时候就 省事一些了。
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注