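Ingesting real-time streams into the data warehouse is something almost every large company does. Flume has shipped the taildir source since version 1.7, and it is widely used because of the following characteristics: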
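1. It uses regular expressions to match file names in a directory.
2. As soon as data is written to a monitored file, Flume delivers it to the configured sink.
3. It is highly reliable and does not lose data.
4. It does not touch the tracked files in any way: no renaming and no deleting.
5. It does not support Windows and cannot read binary files. It reads text files line by line.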
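This article uses open-source Flume as the example and shows how to land a stream in HDFS; ODS-layer external tables are built on top of that data later.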
1.1 taildir source configuration

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
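To get a feel for which files the `.*log` pattern in `filegroups.f1` would pick up, here is a minimal sketch in plain Java. It is not Flume's implementation. It assumes that the last path segment is treated as a regular expression that has to match the whole file name, and the file names in `main` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class FileGroupMatchSketch {

    // Returns the file names that match the regular expression in full.
    // Assumption: the whole name must match, not just a substring.
    static List<String> matching(String regex, List<String> fileNames) {
        Pattern pattern = Pattern.compile(regex);
        List<String> out = new ArrayList<>();
        for (String name : fileNames) {
            if (pattern.matcher(name).matches()) {
                out.add(name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical file names, not taken from the article.
        List<String> names = Arrays.asList(
                "start0721.log", "start.log.gz", "eventlog", "readme.txt");
        System.out.println(matching(".*log", names));
    }
}
```

Under that assumption a name such as `eventlog` also matches, because `.*log` only requires the name to end in `log`; a stricter pattern such as `.*\.log` would limit the match to files with a `.log` extension.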
1.2 hdfs sink configuration

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File rolling policy (roll when the file reaches 32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# Use the local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true
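Two values in the sink configuration are easier to read with a little arithmetic: `rollSize` is given in bytes, and the `%Y-%m-%d` escape in `hdfs.path` expands to a date directory. The sketch below, in plain Java, only imitates these two details and is not how Flume resolves paths internally; the class and method names are hypothetical, and the time zone is passed in explicitly so the result is reproducible.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class HdfsSinkPathSketch {

    // rollSize is expressed in bytes, so 32 MB is 32 * 1024 * 1024.
    static long megabytesToBytes(long megabytes) {
        return megabytes * 1024L * 1024L;
    }

    // Imitates only the %Y-%m-%d escape: year, month and day of the given instant.
    static String resolveDatePath(String template, long epochMillis, ZoneId zone) {
        String day = DateTimeFormatter.ofPattern("yyyy-MM-dd")
                .withZone(zone)
                .format(Instant.ofEpochMilli(epochMillis));
        return template.replace("%Y-%m-%d", day);
    }

    public static void main(String[] args) {
        System.out.println(megabytesToBytes(32)); // 33554432
        // With useLocalTimeStamp = true the agent's clock supplies the timestamp,
        // which is approximated here by the current time in the default zone.
        System.out.println(resolveDatePath(
                "/user/data/logs/start/logs/start/%Y-%m-%d/",
                System.currentTimeMillis(),
                ZoneId.systemDefault()));
    }
}
```

Because the directory is derived from the agent's clock, an event logged shortly before midnight can land in the next day's directory.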
1.3 Agent configuration

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File rolling policy (roll when the file reaches 32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 1000
# Use the local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Save the configuration as /opt/hoult/servers/conf/flume-log2hdfs.conf.
1.4 Startup

flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

# For the configuration file to take effect, the configuration directory must also be given on the command line
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

The JAVA_OPTS setting above has to be added to $FLUME_HOME/conf/flume-env.sh; otherwise the agent reports an error.
1.5 Using a custom interceptor to replace the Flume agent's local time with the timestamp inside the log

Test with a netcat source → logger sink:

# a1 is the agent name; the source, channel, and sink are named r1, c1, and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# Wiring between source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
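The main code of the interceptor is as follows:

package com.hoult.flume;

import com.alibaba.fastjson.JSON;           // fastjson
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;     // Guava
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the body content
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // Get the headers
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            // The JSON payload is the 7th whitespace-separated field
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Parse the string into a JSON object
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the millisecond timestamp to a date string (format: yyyyMMdd)
            long timeStamp = Long.parseLong(timestampStr);
            String date = formatter.format(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            // Lines that cannot be parsed are kept and tagged as "unknown"
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            // Drop events for which the single-event intercept returned null
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}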
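The core of the interceptor is the step that turns the `time` field of the log line into a `yyyyMMdd` string. The following sketch isolates that step in plain Java so it can be tried without Flume or a JSON library on the classpath. Several things here are assumptions rather than the article's code: the sample log line is invented, the `time` field is pulled out with a regular expression instead of a JSON parser, and the time zone is a parameter instead of `ZoneId.systemDefault()`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogTimeSketch {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
    // Matches "time":1595296800000 as well as "time":"1595296800000".
    private static final Pattern TIME_FIELD = Pattern.compile("\"time\"\\s*:\\s*\"?(\\d+)\"?");

    // Returns the yyyyMMdd date of the "time" field found in the 7th
    // whitespace-separated token, or "unknown" if the line cannot be parsed.
    static String logtimeOf(String line, ZoneId zone) {
        try {
            String json = line.split("\\s+")[6];
            Matcher matcher = TIME_FIELD.matcher(json);
            if (!matcher.find()) {
                return "unknown";
            }
            long millis = Long.parseLong(matcher.group(1));
            return FORMATTER.withZone(zone).format(Instant.ofEpochMilli(millis));
        } catch (RuntimeException e) {
            // Too few tokens or a number that does not fit into a long.
            return "unknown";
        }
    }

    public static void main(String[] args) {
        // Hypothetical log line; the real log format is not shown in the article.
        String line = "2020-07-21 10:00:00.123 [main] INFO com.example.AppStart - "
                + "{\"time\":1595296800000,\"action\":\"start\"}";
        System.out.println(logtimeOf(line, ZoneId.of("UTC")));
        System.out.println(logtimeOf("too short", ZoneId.of("UTC")));
    }
}
```

Note that splitting on whitespace only works when the JSON payload itself contains no spaces, which is also true of the interceptor above.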
Start the agent:

flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf --name a1 -Dflume.root.logger=INFO,console

## Test
telnet linux121 9999
Wu Xie, also known as Xiao San Ye, a rookie knocking about in backend development, big data, and artificial intelligence.