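Streaming real-time data into the data warehouse is something nearly every large company does. As of Flume 1.8, Flume supports the taildir source, which is widely used because of the following characteristics: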

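1. It matches file names in a directory with regular expressions.
2. As soon as data is written to a monitored file, Flume sends it on to the configured sink.
3. It is highly reliable and does not lose data (the read position of each file is recorded in a position file).
4. It does not touch the tracked files in any way: it neither renames nor deletes them.
5. It does not support Windows and cannot read binary files; it reads text files line by line.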
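The reliability points above come from taildir remembering how far it has read in each file. The Java sketch below illustrates only that idea and is not Flume's implementation: `TailSketch` and `readNewLines` are hypothetical names, and the caller is assumed to persist the returned offset (Flume records such positions in the `positionFile`). The file is opened read-only, and the offset only advances past complete lines.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of offset-based tailing (not Flume's code).
class TailSketch {

    /** Result of one poll: the complete lines read and the offset to persist. */
    static final class Poll {
        final List<String> lines;
        final long newOffset;

        Poll(List<String> lines, long newOffset) {
            this.lines = lines;
            this.newOffset = newOffset;
        }
    }

    /** Reads complete '\n'-terminated lines starting at offset; a trailing partial line waits for the next poll. */
    static Poll readNewLines(String path, long offset) throws IOException {
        List<String> lines = new ArrayList<>();
        // "r" = read-only: the tailed file is never modified, renamed or deleted
        try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
            long length = raf.length();
            if (offset > length) {
                offset = 0; // hypothetical policy: the file was truncated, so start over
            }
            byte[] buf = new byte[(int) (length - offset)]; // sketch only: whole unread tail in memory
            raf.seek(offset);
            raf.readFully(buf);
            int lineStart = 0;
            for (int i = 0; i < buf.length; i++) {
                if (buf[i] == '\n') {
                    lines.add(new String(buf, lineStart, i - lineStart, StandardCharsets.UTF_8));
                    lineStart = i + 1;
                }
            }
            // Advance only past complete lines, so a half-written line is not lost
            return new Poll(lines, offset + lineStart);
        }
    }
}
```

After a restart, polling again from the last persisted offset resumes from that position rather than from the start of the file. A real source would read in bounded chunks instead of loading the whole unread tail at once.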

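This article uses open-source Flume as an example to show how to stream data into HDFS, and later build ODS-layer external tables on top of it.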

1.1 Taildir source configuration

```properties
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
```
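In `filegroups.f1`, only the last path component is a regular expression; according to the Flume user guide, regex applies to the file name only, matched within the literal parent directory. The hypothetical helper below shows which file names `.*log` would select; as far as I can tell, Flume requires the whole name to match, and the sketch assumes the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not Flume's code): split a filegroup into
// a literal directory and a file-name regex, then filter names.
class FileGroupMatcher {

    static String directoryOf(String fileGroup) {
        return fileGroup.substring(0, fileGroup.lastIndexOf('/'));
    }

    static String fileNameRegexOf(String fileGroup) {
        return fileGroup.substring(fileGroup.lastIndexOf('/') + 1);
    }

    /** Returns the names the filegroup's regex selects (whole-name match). */
    static List<String> match(String fileGroup, List<String> fileNames) {
        Pattern namePattern = Pattern.compile(fileNameRegexOf(fileGroup));
        List<String> selected = new ArrayList<>();
        for (String name : fileNames) {
            if (namePattern.matcher(name).matches()) {
                selected.add(name);
            }
        }
        return selected;
    }
}
```

With `/opt/hoult/servers/logs/start/.*log`, `start.log.1` is skipped because the whole name must match, but a file named `catalog` would be picked up as well; `.*\.log` is the stricter choice if only `.log` files are wanted.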

1.2 HDFS sink configuration

```properties
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File rolling policy: roll by file size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# Use local time when resolving %Y-%m-%d in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```
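Two of the settings above can be checked by hand: 32 MB is 32 × 1024 × 1024 = 33554432 bytes, and because `rollCount` and `rollInterval` are 0 (disabled), file size is the only roll trigger. With `useLocalTimeStamp = true`, `%Y-%m-%d` in `hdfs.path` is filled in from the agent's clock. The hypothetical `HdfsSinkSketch` below mimics both rules for illustration only; it is not the sink's code and handles just these three escapes.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical illustration of two HDFS sink settings (not Flume's code).
class HdfsSinkSketch {

    // hdfs.rollSize = 33554432 bytes = 32 MB
    static final long ROLL_SIZE = 32L * 1024 * 1024;

    /** Expands only %Y, %m and %d for the given instant in the given time zone. */
    static String expandPath(String pattern, long epochMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone);
        return pattern
                .replace("%Y", String.format("%04d", t.getYear()))
                .replace("%m", String.format("%02d", t.getMonthValue()))
                .replace("%d", String.format("%02d", t.getDayOfMonth()));
    }

    /** With rollCount = 0 and rollInterval = 0, size is the only trigger. */
    static boolean shouldRoll(long bytesWrittenToCurrentFile) {
        return bytesWrittenToCurrentFile >= ROLL_SIZE;
    }
}
```

For example, the instant 1609459199999 ms lands in `2020-12-31/` when the agent runs in UTC but in `2021-01-01/` in Asia/Shanghai. The directory therefore reflects when and where the agent handles the event, not when the log line was produced; section 1.5 addresses exactly that.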

1.3 Agent configuration

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File rolling policy: roll by file size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Use local time when resolving %Y-%m-%d in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
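In the complete configuration, channel and sink sizes have to fit together: the memory channel's `transactionCapacity` may not exceed its `capacity`, and the HDFS sink takes up to `hdfs.batchSize` events in one channel transaction, so `batchSize` should not exceed `transactionCapacity` (here 1000 ≤ 2000 ≤ 100000). Below is a hypothetical pre-flight check for these rules, reading the file with `java.util.Properties`; the keys are hard-coded to the a1/r1/c1/k1 names used here, and the defaults of 100 follow the Flume user guide.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check (not part of Flume) for the a1/r1/c1/k1 agent.
class AgentConfigCheck {

    static List<String> check(String configText) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(configText)); // '#' lines are treated as comments

        // Defaults of 100 follow the Flume user guide for these settings
        long capacity = Long.parseLong(p.getProperty("a1.channels.c1.capacity", "100").trim());
        long txCapacity = Long.parseLong(p.getProperty("a1.channels.c1.transactionCapacity", "100").trim());
        long batchSize = Long.parseLong(p.getProperty("a1.sinks.k1.hdfs.batchSize", "100").trim());

        List<String> problems = new ArrayList<>();
        if (txCapacity > capacity) {
            problems.add("transactionCapacity (" + txCapacity + ") exceeds capacity (" + capacity + ")");
        }
        if (batchSize > txCapacity) {
            problems.add("hdfs.batchSize (" + batchSize + ") exceeds transactionCapacity (" + txCapacity + ")");
        }
        // In this single-pipeline agent the sink must read the channel the source writes to
        List<String> sourceChannels = Arrays.asList(p.getProperty("a1.sources.r1.channels", "").trim().split("\\s+"));
        String sinkChannel = p.getProperty("a1.sinks.k1.channel");
        if (sinkChannel == null || !sourceChannels.contains(sinkChannel.trim())) {
            problems.add("sink k1 is not bound to a channel fed by source r1");
        }
        return problems;
    }
}
```

Running it over the configuration above should return an empty list; raising `hdfs.batchSize` to 5000 would be reported before the agent ever starts taking events.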

Save the configuration above as /opt/hoult/servers/conf/flume-log2hdfs.conf.

1.4 Startup

```shell
flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"
# For settings in flume-env.sh (such as JAVA_OPTS) to take effect, the configuration directory must also be passed with --conf
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console
```

Add the JAVA_OPTS setting above to $FLUME_HOME/conf/flume-env.sh; otherwise an error is reported.

1.5 Using a custom interceptor so the Flume agent uses the timestamp inside the log instead of local time

Test it with a netcat source → logger sink:

```properties
# a1 is the agent name; the source, channel and sink are named r1, c1 and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# Wiring between source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
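The interceptor type is written as `CustomerInterceptor$Builder` because `$` separates a nested class from its enclosing class in the binary class name, and Flume creates the configured builder reflectively (essentially via `Class.forName`). The self-contained demo below uses hypothetical stand-in types rather than Flume's own, to show that the `$` form loads while the dotted form does not.

```java
// Stand-in for Flume's Interceptor.Builder (hypothetical, for illustration only).
interface InterceptorBuilder {
    Object build();
}

// Hypothetical interceptor with a static nested Builder, mirroring CustomerInterceptor.
class DemoInterceptor {
    static class Builder implements InterceptorBuilder {
        @Override
        public Object build() {
            return new DemoInterceptor();
        }
    }
}

class BuilderLoader {
    /** Loads a builder by its binary name, e.g. "pkg.Outer$Builder", and instantiates it. */
    static InterceptorBuilder load(String binaryName) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(binaryName);
        return (InterceptorBuilder) clazz.getDeclaredConstructor().newInstance();
    }
}
```

`DemoInterceptor.Builder.class.getName()` ends in `DemoInterceptor$Builder`, and that string loads; replacing `$` with `.` makes `Class.forName` look for a class `Builder` in a package named `DemoInterceptor` and fail with `ClassNotFoundException`.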

The main code of the interceptor:

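```java
package com.hoult.flume;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {

    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event body
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // Get the event headers
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            // The 7th whitespace-separated field is expected to hold the JSON payload
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Parse the string into a JSON object
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the epoch-millisecond timestamp to a date string (format: yyyyMMdd)
            long timeStamp = Long.parseLong(timestampStr);
            String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            // Malformed line or missing/invalid "time": mark the date as unknown instead of failing
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            // Drop events for which intercept(Event) returned null
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
    }

    // Flume builds the interceptor through this class (configured as CustomerInterceptor$Builder)
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```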
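The date logic of the interceptor can be tested without a running agent. The sketch below is a dependency-free variant under stated assumptions: a regex stands in for fastjson's `JSON.parseObject`, the time zone is a parameter instead of `ZoneId.systemDefault()`, and `LogTimeExtractor` is a hypothetical name. Like the interceptor, it reads the 7th whitespace-separated field and falls back to `unknown`.

```java
import java.time.DateTimeException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, dependency-free version of the interceptor's date extraction.
class LogTimeExtractor {

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
    // Assumption: "time" holds epoch milliseconds, quoted or unquoted
    private static final Pattern TIME_FIELD = Pattern.compile("\"time\"\\s*:\\s*\"?(\\d+)\"?");

    /** Returns yyyyMMdd for the line's timestamp in the given zone, or "unknown". */
    static String logTime(String line, ZoneId zone) {
        String[] fields = line.split("\\s+");
        if (fields.length < 7) {
            return "unknown"; // no 7th field
        }
        Matcher m = TIME_FIELD.matcher(fields[6]);
        if (!m.find()) {
            return "unknown"; // no "time" key in the JSON field
        }
        try {
            long millis = Long.parseLong(m.group(1));
            return FORMATTER.format(Instant.ofEpochMilli(millis).atZone(zone));
        } catch (NumberFormatException | DateTimeException e) {
            return "unknown"; // out-of-range timestamp
        }
    }
}
```

For an invented sample line whose 7th field is `{"time":"1609459200000"}`, `logTime(line, ZoneId.of("UTC"))` gives `20210101`, while `America/New_York` gives `20201231`; passing the zone explicitly makes this dependency visible. As in the interceptor, a JSON payload containing spaces would be split across fields.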

Startup and test

```shell
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf --name a1 -Dflume.root.logger=INFO,console

## Test
telnet linux121 9999
```
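If telnet is not available, a few lines of Java can send a test line to the netcat source. This is a minimal sketch (`NetcatClient` is a hypothetical name, and the 5-second timeout is an arbitrary choice); it relies on the netcat source's default `ack-every-event = true`, under which each received line is answered with `OK`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical telnet replacement: send one line, return the source's reply line.
class NetcatClient {

    static String send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(5000); // fail instead of hanging if no reply arrives
            OutputStream out = socket.getOutputStream();
            // The netcat source splits events on newline
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            return in.readLine(); // "OK" when ack-every-event is on
        }
    }
}
```

For example, `NetcatClient.send("linux121", 9999, line)` should return `OK`, and the logger sink then prints the event, including its `logtime` header, on the agent console.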

Wu Xie (a.k.a. Xiao San Ye), a rookie working in backend development, big data and AI.