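Which Flume source should you pick?
The taildir source, hands down!
1. It resumes where it left off: the positionFile records the read offset of every file.
2. File groups are configurable: each group uses a regular expression to specify multiple files to monitor.
The first point alone puts every other source to shame!
But even the taildir source isn't perfect: it can't monitor directories recursively.
So the only option is to modify the source code... fun, I love it~
To change the source, first read the source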
On startup, Flume's taildir source calls `start()` to initialize itself. That method creates a `ReliableTaildirEventReader`, using the builder pattern:
```java
@Override
public synchronized void start() {
  logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
  try {
    reader = new ReliableTaildirEventReader.Builder()
        .filePaths(filePaths)
        .headerTable(headerTable)
        .positionFilePath(positionFilePath)
        .skipToEnd(skipToEnd)
        .addByteOffset(byteOffsetHeader)
        .cachePatternMatching(cachePatternMatching)
        .recursive(isRecursive) // added by this modification
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .build();
  } catch (IOException e) {
    throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
  }
  // Periodically closes files that have been idle for longer than idleTimeout.
  idleFileChecker = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
  idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(),
      idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);

  // Periodically persists the read offsets to the positionFile.
  positionWriter = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
  positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
      writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);

  super.start();
  logger.debug("TaildirSource started");
  sourceCounter.start();
}
```
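The builder lets `start()` pass a long list of optional settings by name instead of through one huge constructor. A minimal sketch of the pattern, assuming a hypothetical `TailReaderSketch` class that stores only three settings (it is not Flume's `ReliableTaildirEventReader`):

```java
import java.util.Objects;

/** Hypothetical, simplified reader that only stores its settings. */
final class TailReaderSketch {
    private final String positionFilePath;
    private final boolean skipToEnd;
    private final boolean recursive;

    // Private constructor: instances can only be created through the Builder.
    private TailReaderSketch(Builder b) {
        this.positionFilePath = Objects.requireNonNull(b.positionFilePath, "positionFilePath");
        this.skipToEnd = b.skipToEnd;
        this.recursive = b.recursive;
    }

    String positionFilePath() { return positionFilePath; }
    boolean skipToEnd() { return skipToEnd; }
    boolean recursive() { return recursive; }

    static final class Builder {
        private String positionFilePath;
        private boolean skipToEnd = false; // defaults apply when a setter is never called
        private boolean recursive = false;

        Builder positionFilePath(String path) { this.positionFilePath = path; return this; }
        Builder skipToEnd(boolean skip) { this.skipToEnd = skip; return this; }
        Builder recursive(boolean recursive) { this.recursive = recursive; return this; }

        TailReaderSketch build() { return new TailReaderSketch(this); }
    }
}
```

In this sketch, adding an option such as `recursive` costs one field, one setter and one line in the constructor, and callers that never call `.recursive(...)` keep the default `false`, so existing call sites don't have to change.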
The taildir source is a `PollableSource`:
```java
/**
 * A {@link Source} that requires an external driver to poll to determine
 * whether there are {@linkplain Event events} that are available to ingest
 * from the source.
 *
 * @see org.apache.flume.source.EventDrivenSourceRunner
 */
public interface PollableSource extends Source { ... }
```
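This comment says a `PollableSource` needs an external driver to check whether events are available and then pull them; in plain terms, it pulls on a schedule. So Flume isn't necessarily truly real-time: it just keeps checking for new events at short intervals. (The counterpart is the event-driven kind of source, `EventDrivenSource`, which is run by the `EventDrivenSourceRunner` referenced in the `@see` tag.)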
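What "an external driver polls the source" means in practice can be sketched as a loop around `process()`. Everything here is hypothetical and simplified (`PollingSource`, `PollStatus`, `PollingDriver` are not Flume classes); it only mirrors the READY/BACKOFF contract that the taildir `process()` method returns:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the READY/BACKOFF status a pollable source returns. */
enum PollStatus { READY, BACKOFF }

/** Hypothetical minimal contract: each call makes one attempt to pull events. */
interface PollingSource {
    PollStatus process();
}

/**
 * Simplified polling driver (NOT Flume's PollableSourceRunner): calls process()
 * in a loop and, after BACKOFF, sleeps consecutiveBackoffs * backoffIncrementMs,
 * capped at maxBackoffMs.
 */
final class PollingDriver {
    private final PollingSource source;
    private final long backoffIncrementMs;
    private final long maxBackoffMs;
    private final AtomicBoolean running = new AtomicBoolean(true);

    PollingDriver(PollingSource source, long backoffIncrementMs, long maxBackoffMs) {
        this.source = source;
        this.backoffIncrementMs = backoffIncrementMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Asks the loop to exit before its next poll (safe to call from another thread). */
    void stop() {
        running.set(false);
    }

    /** Polls until stop() is called or maxPolls is reached; returns how many polls ran. */
    int run(int maxPolls) throws InterruptedException {
        int polls = 0;
        long consecutiveBackoffs = 0;
        while (running.get() && polls < maxPolls) {
            PollStatus status = source.process();
            polls++;
            if (status == PollStatus.BACKOFF) {
                consecutiveBackoffs++;
                Thread.sleep(Math.min(consecutiveBackoffs * backoffIncrementMs, maxBackoffMs));
            } else {
                consecutiveBackoffs = 0; // progress was made: poll again right away
            }
        }
        return polls;
    }
}
```

In this sketch a source that returns READY is polled again immediately, so for the taildir source it is the `sleep(retryInterval)` inside its own `process()` that sets the pace.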
Each time the taildir source is polled, it runs the `process()` method:
```java
@Override
public Status process() {
  Status status = Status.READY;
  try {
    existingInodes.clear();
    existingInodes.addAll(reader.updateTailFiles());
    for (long inode : existingInodes) {
      TailFile tf = reader.getTailFiles().get(inode);
      if (tf.needTail()) {
        tailFileProcess(tf, true);
      }
    }
    closeTailFiles();
    try {
      TimeUnit.MILLISECONDS.sleep(retryInterval);
    } catch (InterruptedException e) {
      logger.info("Interrupted while sleeping");
    }
  } catch (Throwable t) {
    logger.error("Unable to tail files", t);
    status = Status.BACKOFF;
  }
  return status;
}
```
The key part is these lines:
```java
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
  TailFile tf = reader.getTailFiles().get(inode);
  if (tf.needTail()) {
    tailFileProcess(tf, true);
  }
}
```
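`reader.updateTailFiles()` returns the files being monitored and checks each file's last modification time to decide whether it needs tailing; the loop then tails every file that does.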
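The "does this file need tailing?" decision can be sketched with two numbers per file: the mtime recorded after the last read and the byte offset reached so far. The `TailState` class below is hypothetical and only illustrates the check; comparing the size with the offset as well (so an append within the same mtime second is still noticed) is an assumption of this sketch, not a statement about Flume's `TailFile`:

```java
import java.io.File;

/** Hypothetical per-file bookkeeping for the "needs tailing?" check. */
final class TailState {
    private long lastUpdated; // file mtime (ms) recorded after the last read
    private long pos;         // byte offset consumed so far

    TailState(long lastUpdated, long pos) {
        this.lastUpdated = lastUpdated;
        this.pos = pos;
    }

    /** Tail if the file changed since the last read, or its size no longer matches our offset. */
    boolean needTail(File f) {
        return lastUpdated < f.lastModified() || pos != f.length();
    }

    /** Records progress after reading the file up to newPos. */
    void markRead(File f, long newPos) {
        this.pos = newPos;
        this.lastUpdated = f.lastModified();
    }

    long pos() {
        return pos;
    }
}
```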
So let's step into `reader.updateTailFiles()`:
```java
for (TaildirMatcher taildir : taildirCache) {
  Map<String, String> headers = headerTable.row(taildir.getFileGroup());
  for (File f : taildir.getMatchingFiles()) {
    long inode = getInode(f);
    TailFile tf = tailFiles.get(inode);
    if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
      long startPos = skipToEnd ? f.length() : 0;
      tf = openFile(f, headers, inode, startPos);
      // ... (rest of the method omitted)
```
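The fragment keys tailed files by inode and compares the path stored for that inode with the file's current path, so it matters that a rename (for example log rotation, `app.log` to `app.log.1`) keeps the inode while a newly created file gets a fresh one. A sketch of reading the inode through the JDK's `"unix"` attribute view, with a hypothetical `InodeUtil` helper; it assumes a Unix-like OS and is not a claim about how Flume's `getInode` is written:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Reads a file's inode number through the JDK's "unix" attribute view (Unix-like systems only). */
final class InodeUtil {
    private InodeUtil() {}

    /** Throws UnsupportedOperationException where the "unix" view is unavailable, e.g. on Windows. */
    static long inodeOf(Path p) throws IOException {
        return (Long) Files.getAttribute(p, "unix:ino");
    }
}
```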
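The outer loop in `updateTailFiles()` iterates over one matcher per configured regular expression, and each matcher fetches its matching files through `taildir.getMatchingFiles()`: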
```java
List<File> getMatchingFiles() {
  long now = TimeUnit.SECONDS.toMillis(
      TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
  long currentParentDirMTime = parentDir.lastModified();
  List<File> result;

  // calculate matched files if
  // - we don't want to use cache (recalculate every time) OR
  // - directory was clearly updated after the last check OR
  // - last mtime change wasn't already checked for sure
  //   (system clock hasn't passed that second yet)
  if (!cachePatternMatching ||
      lastSeenParentDirMTime < currentParentDirMTime ||
      !(currentParentDirMTime < lastCheckedTime)) {
    lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache(isRecursive));
    lastSeenParentDirMTime = currentParentDirMTime;
    lastCheckedTime = now;
  }
  return lastMatchedFiles;
}
```
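The three caching conditions are easier to reason about as a pure function. A sketch with a hypothetical `MatchCachePolicy.shouldRelist` taking the same four values as the fields above; `now` is truncated to whole seconds because directory mtimes may only have one-second resolution, so a change made in the current second might not be visible yet:

```java
/** Stand-alone version of the cache check in getMatchingFiles() (hypothetical helper). */
final class MatchCachePolicy {
    private MatchCachePolicy() {}

    /** Returns true when the directory listing must be recomputed. */
    static boolean shouldRelist(boolean cachePatternMatching,
                                long lastSeenParentDirMTime,
                                long currentParentDirMTime,
                                long lastCheckedTime) {
        return !cachePatternMatching                           // caching disabled: always relist
            || lastSeenParentDirMTime < currentParentDirMTime  // directory changed since last listing
            || !(currentParentDirMTime < lastCheckedTime);     // change not yet safely in the past
    }
}
```

One thing worth checking with the recursive change: a directory's mtime changes when entries are added to, removed from or renamed within that directory itself, not when a file appears inside one of its subdirectories. If that holds on your file system, then with `cachePatternMatching` enabled the cache may hide new files in subdirectories until the parent directory itself changes; `a1.sources.r1.cachePatternMatching = false` avoids this at the cost of relisting on every poll.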
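As you can see, `getMatchingFilesNoCache(isRecursive)` is the method that actually collects the matching files, so that's the one to modify!
PS: the `isRecursive` argument here is my addition~
Stepping into it: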
```java
private List<File> getMatchingFilesNoCache() {
  List<File> result = Lists.newArrayList();
  try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
    for (Path entry : stream) {
      result.add(entry.toFile());
    }
  } catch (IOException e) {
    logger.error("I/O exception occurred while listing parent directory." +
        "Files already matched will be returned." + parentDir.toPath(), e);
  }
  return result;
}
```
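The original code uses `Files.newDirectoryStream(parentDir.toPath(), fileFilter)` to iterate over the entries of the parent directory that match the regex and collects them into a list. (It also uses try-with-resources, `try (...)`, so the stream is closed automatically.)
Found the spot, time to change it!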
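A stand-alone sketch of the same idea: list the direct children of one directory whose file name matches a regex, closing the stream via try-with-resources. The filter here (a `PathMatcher` built from `"regex:" + pattern`, applied to the file name only and skipping directories) is my assumption about what `fileFilter` does, and `RegexDirLister` is a hypothetical name:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.List;

final class RegexDirLister {
    private RegexDirLister() {}

    /** Lists non-directory entries directly inside dir whose file name matches regex. */
    static List<Path> list(Path dir, String regex) throws IOException {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("regex:" + regex);
        // Match on the file name only and skip directories.
        DirectoryStream.Filter<Path> filter =
            entry -> matcher.matches(entry.getFileName()) && !Files.isDirectory(entry);
        List<Path> result = new ArrayList<>();
        // try-with-resources closes the stream and its OS directory handle.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, filter)) {
            for (Path entry : stream) {
                result.add(entry);
            }
        }
        return result;
    }
}
```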
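Right below `getMatchingFilesNoCache()` I added an overload that takes a flag. It keeps the original behavior available and makes the method easier to extend: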
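```java
private List<File> getMatchingFilesNoCache(boolean recursion) {
  if (!recursion) {
    return getMatchingFilesNoCache();
  }
  List<File> result = Lists.newArrayList();
  // Traverse the directory tree without recursion: breadth-first, using a queue.
  // An unbounded java.util.ArrayDeque replaces the original ArrayBlockingQueue(10),
  // whose add() throws IllegalStateException once more than 10 directories are pending.
  Deque<File> dirs = new ArrayDeque<>();
  dirs.offer(parentDir);
  while (!dirs.isEmpty()) {
    File dir = dirs.poll();
    // try-with-resources closes each DirectoryStream instead of leaking it.
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath(), fileFilter)) {
      stream.forEach(path -> result.add(path.toFile()));
    } catch (IOException e) {
      logger.error("I/O exception occurred while listing directory. " +
          "Files already matched will be returned. (recursion) " + dir.toPath(), e);
    }
    File[] dirList = dir.listFiles();
    // listFiles() returns null on an I/O error, and `assert` is disabled by default,
    // so check explicitly instead of risking a NullPointerException.
    if (dirList == null) {
      continue;
    }
    for (File f : dirList) {
      if (f.isDirectory()) {
        dirs.offer(f);
      }
    }
  }
  return result;
}
```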
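I traverse the directory tree iteratively rather than recursively: a breadth-first walk that turns the tree into a queue of directories still to visit.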
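For comparison, the JDK can do a similar walk with `Files.walk`, which is depth-first. This is a swapped-in alternative, not what the author used. A sketch with a hypothetical `RecursiveRegexLister` that applies the file-name regex at every level, as the modified matcher does:

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class RecursiveRegexLister {
    private RecursiveRegexLister() {}

    /** Collects regular files under root (up to maxDepth levels) whose file name matches regex. */
    static List<Path> list(Path root, String regex, int maxDepth) throws IOException {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("regex:" + regex);
        // Files.walk does not follow symbolic links unless FileVisitOption.FOLLOW_LINKS is given.
        try (Stream<Path> paths = Files.walk(root, maxDepth)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> matcher.matches(p.getFileName()))
                .collect(Collectors.toList());
        }
    }
}
```

The trade-offs: `maxDepth` bounds the scan cheaply, and not following links avoids cycles (`File.isDirectory()` in the queue-based loop follows symlinks, so a link back to an ancestor would keep it busy forever). On the other hand, an unreadable subdirectory surfaces as an `UncheckedIOException` in the middle of the stream, whereas the queue-based loop logs the error and carries on.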
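With that, the core change is done. Next, the `recursion` parameter has to be wired through.
Now let's follow the trail back to where the value comes from!
I changed the constructors along the way to accept this parameter, but where does the value ultimately come from?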
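Flume calls a source's `configure()` method before `start()`. It copies the settings from the `Context` into the source's fields, and `start()` then hands them to the `reader` and other objects. The new flag is read like this:

```java
isRecursive = context.getBoolean(RECURSIVE, DEFAULT_RECURSIVE);
```

The property name and default value passed to `context` come from `TaildirSourceConfigurationConstants`: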
```java
/** Whether to support recursion. */
public static final String RECURSIVE = "recursive";
public static final boolean DEFAULT_RECURSIVE = false;
```
This `recursive` is the property name you set in the Flume configuration file:
```properties
# Whether to support recursion
a1.sources.r1.recursive = true
```
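In the agent file the property sits under the full component prefix `a1.sources.r1.`, while `configure()` looks up just `recursive`. A simplified sketch of that step, using hypothetical helpers (`ComponentConfigSketch`, `subProperties`, `getBoolean`); Flume's own configuration classes are more involved, and this only mirrors the idea of "keep the keys under the source's prefix, strip it, then read the value with a default":

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical illustration of how a component-level setting reaches configure(). */
final class ComponentConfigSketch {
    private ComponentConfigSketch() {}

    /** Parses properties-file text. */
    static Properties parse(String text) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(text));
        return p;
    }

    /** Keeps only keys starting with prefix and strips the prefix from them. */
    static Map<String, String> subProperties(Properties all, String prefix) {
        Map<String, String> sub = new HashMap<>();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                sub.put(key.substring(prefix.length()), all.getProperty(key).trim());
            }
        }
        return sub;
    }

    /** Missing key -> defaultValue; otherwise only "true" (any case) counts as true. */
    static boolean getBoolean(Map<String, String> ctx, String key, boolean defaultValue) {
        String v = ctx.get(key);
        return v == null ? defaultValue : Boolean.parseBoolean(v);
    }
}
```

Because `Boolean.parseBoolean` accepts only `true`, a typo such as `recursive = ture` quietly falls back to `false` in this sketch, so it's worth double-checking the spelling in the agent file.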
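All done, let's package it and try it out!
I used Maven to package only this one module. I also changed the `artifactId` in the module's pom, adding my name as a souvenir, haha.
Too bad I couldn't write it in Chinese there...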
```xml
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-taildir-source-recursive-by-Wish000</artifactId>
<name>Flume Taildir Source</name>
```
Run `package`, put the resulting jar in Flume's `lib` directory, and remove the original flume-taildir-source***.jar it replaces.
Start it up, test it: success!
The full code is on GitHub: https://github.com/Wish000/me…