Which Flume source should you choose?
The Taildir Source is the first choice!
1. Resumable reads: the positionFile records the offset of each tailed file, so it can pick up where it left off after a restart
2. Configurable file groups: each group uses a regular expression to match multiple files to monitor
Point 1 alone puts every other source to shame!
The one imperfection of this otherwise great Taildir Source is that it cannot monitor directories recursively.
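For context, a typical Taildir Source configuration looks roughly like this (the agent name, group names and paths below are made up for illustration); note that each filegroup regex only matches file names directly under its parent directory, so log files sitting in subdirectories are never picked up:

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/app/.*log
a1.sources.r1.filegroups.f2 = /var/log/nginx/access.log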
So the only option is to modify the source code... fun, I like it~

To change the source code, first read the source code

When the Taildir Source starts, Flume calls its start() method for initialization. Inside, it creates a ReliableTaildirEventReader using the builder pattern:

@Override
public synchronized void start() {
    logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
    try {
        reader = new ReliableTaildirEventReader.Builder()
                .filePaths(filePaths)
                .headerTable(headerTable)
                .positionFilePath(positionFilePath)
                .skipToEnd(skipToEnd)
                .addByteOffset(byteOffsetHeader)
                .cachePatternMatching(cachePatternMatching)
                .recursive(isRecursive)
                .annotateFileName(fileHeader)
                .fileNameHeader(fileHeaderKey)
                .build();
    } catch (IOException e) {
        throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
    }
    idleFileChecker = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
    idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(),
            idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);
    positionWriter = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
    positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
            writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);
    super.start();
    logger.debug("TaildirSource started");
    sourceCounter.start();
}

The Taildir Source is a PollableSource:

/**
 * A {@link Source} that requires an external driver to poll to determine
 * whether there are {@linkplain Event events} that are available to ingest
 * from the source.
 *
 * @see org.apache.flume.source.EventDrivenSourceRunner
 */
public interface PollableSource extends Source {
    ...

What this comment means is that a PollableSource needs an external driver to poll it, check whether there are events available to ingest, and pull them. In plain terms: periodic polling. So Flume is not necessarily truly real-time; it simply keeps checking for new events at short intervals. (The counterpart is the event-driven kind, see EventDrivenSourceRunner.)
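To make the "external driver" idea concrete, here is a minimal sketch of such a polling loop. This is not Flume's actual PollableSourceRunner; the backoff handling is simplified and the class, method and parameters are made up for illustration:

import org.apache.flume.PollableSource;

// A minimal sketch of the "external driver" idea -- NOT Flume's actual
// PollableSourceRunner, whose backoff policy is more elaborate.
public class PollingDriverSketch {
    public static void drive(PollableSource source, long backoffMillis) throws Exception {
        while (!Thread.currentThread().isInterrupted()) {
            PollableSource.Status status = source.process();  // pull whatever is available now
            if (status == PollableSource.Status.BACKOFF) {
                Thread.sleep(backoffMillis);                   // nothing to ingest, wait before retrying
            }
            // on READY, poll again right away
        }
    }
}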
When the Taildir Source is polled for events, the method that gets called is process():

@Override
public Status process() {
    Status status = Status.READY;
    try {
        existingInodes.clear();
        existingInodes.addAll(reader.updateTailFiles());
        for (long inode : existingInodes) {
            TailFile tf = reader.getTailFiles().get(inode);
            if (tf.needTail()) {
                tailFileProcess(tf, true);
            }
        }
        closeTailFiles();
        try {
            TimeUnit.MILLISECONDS.sleep(retryInterval);
        } catch (InterruptedException e) {
            logger.info("Interrupted while sleeping");
        }
    } catch (Throwable t) {
        logger.error("Unable to tail files", t);
        status = Status.BACKOFF;
    }
    return status;
}

The key part is these few lines:

existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
    TailFile tf = reader.getTailFiles().get(inode);
    if (tf.needTail()) {
        tailFileProcess(tf, true);
    }
}
reader.updateTailFiles() collects the files that need to be monitored; each one is then processed: its last-modified time is checked to decide whether it needs tailing, and if it does, it gets tailed.
Now let's step into reader.updateTailFiles():
for (TaildirMatcher taildir : taildirCache) {
    Map<String, String> headers = headerTable.row(taildir.getFileGroup());
    for (File f : taildir.getMatchingFiles()) {
        long inode = getInode(f);
        TailFile tf = tailFiles.get(inode);
        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
            long startPos = skipToEnd ? f.length() : 0;
            tf = openFile(f, headers, inode, startPos);
            ...

It iterates over the matchers, one per configured regular expression, and each matcher collects its matching files via taildir.getMatchingFiles():

List<File> getMatchingFiles() {
    long now = TimeUnit.SECONDS.toMillis(
            TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
    long currentParentDirMTime = parentDir.lastModified();
    List<File> result;

    // calculate matched files if
    // - we don't want to use cache (recalculate every time) OR
    // - directory was clearly updated after the last check OR
    // - last mtime change wasn't already checked for sure
    //   (system clock hasn't passed that second yet)
    if (!cachePatternMatching ||
            lastSeenParentDirMTime < currentParentDirMTime ||
            !(currentParentDirMTime < lastCheckedTime)) {
        lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache(isRecursive));
        lastSeenParentDirMTime = currentParentDirMTime;
        lastCheckedTime = now;
    }
    return lastMatchedFiles;
}

You can see that getMatchingFilesNoCache(isRecursive) is the method that actually collects the matching files, which makes it the method that needs to be modified!
PS: the isRecursive argument here is something I added~
Step into it:

private List<File> getMatchingFilesNoCache() {
    List<File> result = Lists.newArrayList();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
        for (Path entry : stream) {
            result.add(entry.toFile());
        }
    } catch (IOException e) {
        logger.error("I/O exception occurred while listing parent directory. " +
                "Files already matched will be returned. " + parentDir.toPath(), e);
    }
    return result;
}

The original code uses Files.newDirectoryStream(parentDir.toPath(), fileFilter) to collect every file directly under the parent directory that matches the regular expression. (It also uses the try-with-resources syntax.)
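As a standalone illustration of that JDK API (the directory path and pattern below are made up), Files.newDirectoryStream takes a directory plus a filter and yields only the direct children that pass the filter; it never descends into subdirectories, which is exactly why the stock source behaves the way it does:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class DirectoryStreamDemo {
    public static void main(String[] args) throws IOException {
        // same idea as a filegroup regex: match file names ending in .log
        Pattern pattern = Pattern.compile(".*\\.log");
        DirectoryStream.Filter<Path> filter =
                entry -> pattern.matcher(entry.getFileName().toString()).matches();

        List<Path> matched = new ArrayList<>();
        // only lists direct children of /var/log/app; subdirectories are not entered
        try (DirectoryStream<Path> stream =
                     Files.newDirectoryStream(Paths.get("/var/log/app"), filter)) {
            for (Path entry : stream) {
                matched.add(entry);
            }
        }
        matched.forEach(p -> System.out.println(p));
    }
}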


Found the spot. Time to change it!

Right below this getMatchingFilesNoCache() method I added an overloaded version, which keeps the original intact and leaves room for extension:

private List<File> getMatchingFilesNoCache(boolean recursion) {
    if (!recursion) {
        return getMatchingFilesNoCache();
    }
    List<File> result = Lists.newArrayList();
    // traverse the directory tree without recursion, using a queue
    Queue<File> dirs = new ArrayBlockingQueue<>(10);
    dirs.offer(parentDir);
    while (dirs.size() > 0) {
        File dir = dirs.poll();
        try {
            DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath(), fileFilter);
            stream.forEach(path -> result.add(path.toFile()));
        } catch (IOException e) {
            logger.error("I/O exception occurred while listing parent directory. " +
                    "Files already matched will be returned. (recursion)" + parentDir.toPath(), e);
        }
        File[] dirList = dir.listFiles();
        assert dirList != null;
        for (File f : dirList) {
            if (f.isDirectory()) {
                dirs.add(f);
            }
        }
    }
    return result;
}

I traverse the directory tree without recursion: the tree is flattened onto a queue, i.e., a breadth-first traversal.
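A possible variation, shown only as a sketch and not the code that went into the patch: an unbounded ArrayDeque instead of a fixed-capacity ArrayBlockingQueue removes any risk of the queue filling up on deep directory trees, and try-with-resources closes every DirectoryStream. The parentDir, fileFilter, logger and Lists members are assumed to be the same ones the class already has, and the method name is hypothetical:

// Sketch of an alternative breadth-first walk (assumes the surrounding class's
// parentDir, fileFilter and logger fields, plus java.util.ArrayDeque/Deque imports).
private List<File> getMatchingFilesNoCacheBfs() {
    List<File> result = Lists.newArrayList();
    Deque<File> dirs = new ArrayDeque<>();     // unbounded, so offer() never fails
    dirs.offer(parentDir);
    while (!dirs.isEmpty()) {
        File dir = dirs.poll();
        // collect matching files directly under this directory
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath(), fileFilter)) {
            for (Path entry : stream) {
                result.add(entry.toFile());
            }
        } catch (IOException e) {
            logger.error("I/O exception while listing " + dir.toPath(), e);
        }
        // queue up subdirectories for the following rounds
        File[] children = dir.listFiles(File::isDirectory);
        if (children != null) {
            for (File child : children) {
                dirs.offer(child);
            }
        }
    }
    return result;
}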
At this point the core change is done. Next, the recursion parameter has to be wired through.


After the fancy divider, let's follow the trail upstream!

The parameter has to be added to the constructors all along the call chain, but where does its value ultimately come from?
When a Flume source starts, its configure() method is called, which applies the contents of the Context to the reader and the other objects:
isRecursive = context.getBoolean(RECURSIVE, DEFAULT_RECURSIVE);
The configuration key and its default value come from TaildirSourceConfigurationConstants:

/**
 * Whether to support recursion.
 */
public static final String RECURSIVE = "recursive";
public static final boolean DEFAULT_RECURSIVE = false;
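Putting the plumbing together: the flag read in configure() is handed to the reader's Builder, which in turn passes it to every TaildirMatcher. The snippet below is only a sketch of the Builder-side glue, not a verbatim excerpt, so the exact signatures may differ slightly:

// Sketch (not verbatim) of the Builder-side change that makes
// .recursive(isRecursive) in start() possible: a new field with a fluent setter.
public static class Builder {
    private boolean recursive = DEFAULT_RECURSIVE;   // defaults to false

    public Builder recursive(boolean recursive) {
        this.recursive = recursive;
        return this;
    }
    // ... the other builder methods and build() stay as they are, except that
    // build() forwards `recursive` into the reader, which passes it to each
    // TaildirMatcher, where it ends up as the isRecursive flag used by
    // getMatchingFiles() / getMatchingFilesNoCache(isRecursive).
}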

The string recursive here is exactly the property name used in the Flume configuration file:

# Whether to support recursion
a1.sources.r1.recursive = true

All done. Let's package it and give it a try!

I used Maven to package just this one module. I also tweaked the module's pom, changing the artifactId to include my own name as a little keepsake, haha.
Too bad you can't write Chinese in a pom...

<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-taildir-source-recursive-by-Wish000</artifactId>
<name>Flume Taildir Source</name>

Run the package goal, drop the resulting jar into Flume's lib directory, and replace the original flume-taildir-source***.jar.
Start it up, run a test: success!

The full code is on GitHub: https://github.com/Wish000/me...