Flume内置拦截器
官网文档 http://flume.apache.org/FlumeUserGuide.html
Timestamp Interceptor、Host Interceptor、Static Interceptor等等,能够间接拿来用,
以Timestamp Interceptor为例,在header中增加一个工夫戳:
1)创立配置文件
vim timestramp_interceptor_demo.conf
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe interceptora1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
2)启动flume
flume-ng agent -n a1 -c conf/ -f jobs/timestramp_interceptor_demo.conf -Dflume.root.logger=INFO,console
3)另启一个终端测试
[v2admin@hadoop10 ~]$ nc localhost 44444hello worldOK
4)在监听处察看
2020-12-26 17:31:33,919 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{timestamp=1608975093917} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world } // headers中能够看到工夫戳
Flume自定义拦截器
意识Interceptor接口
public interface Interceptor { /** * 运行前的初始化,个别不须要实现 */ public void initialize(); /** * 解决单个event */ public Event intercept(Event event); /** * 批量解决event */ public List<Event> intercept(List<Event> events); /** * 退出时做一些敞开工作 */ public void close(); /** * 构建Interceptor对象,内部应用这个Builder来获取Interceptor对象。 */ public interface Builder extends Configurable { public Interceptor build(); }}
自定义一个拦截器
- 需要
有这样一server,产生这样的日志信息
qq.com 2020-11-10 11:10 XXXXXXXXXXXXXXXXXXXXXXbaidu.com 2020-10 11:11 XXXXXXXXXXXXXXXXXXXXXx.....
flume采集数据,实现辨别不同域名的内容,落在hdfs上
那么须要自定义一个拦截器
public class AddHeaderDomainIntecreptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); byte[] body = event.getBody(); String strBody = null; try { strBody = new String(body,"utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } String[] s = strBody.split(" "); headers.put("domain",s[0]); return event; } @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new AddHeaderDomainIntecreptor(); } @Override public void configure(Context context) { } }}