Flume built-in interceptors

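Official documentation: http://flume.apache.org/FlumeUserGuide.html
Flume ships with interceptors such as the Timestamp Interceptor, Host Interceptor and Static Interceptor that can be used directly.
Taking the Timestamp Interceptor as an example, we add a timestamp to each event's header: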
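Conceptually, the Timestamp Interceptor writes the current time, in epoch milliseconds, into a "timestamp" header of every event that passes through the source. Below is a stdlib-only sketch of that behavior, not Flume's source code. TimestampHeaderSketch and addTimestamp are hypothetical names; the preserveExisting flag mirrors the interceptor's option of the same name (default false), which keeps an already present header instead of overwriting it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stdlib-only imitation of the Timestamp Interceptor's effect
// on one event's headers. Not Flume code.
final class TimestampHeaderSketch {

    static final String HEADER = "timestamp"; // Flume's default header name

    // When preserveExisting is true and the header is already set, leave it alone;
    // otherwise store nowMillis (epoch milliseconds) as a decimal string.
    static Map<String, String> addTimestamp(Map<String, String> headers,
                                            boolean preserveExisting,
                                            long nowMillis) {
        if (preserveExisting && headers.containsKey(HEADER)) {
            return headers;
        }
        headers.put(HEADER, Long.toString(nowMillis));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        addTimestamp(headers, false, System.currentTimeMillis());
        System.out.println(headers); // a map with a single "timestamp" entry
    }
}
```

Passing the time in as a parameter, rather than calling the clock inside, keeps the logic easy to check deterministically.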
1) Create the configuration file
vim timestramp_interceptor_demo.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start Flume

flume-ng agent -n a1 -c conf/ -f jobs/timestramp_interceptor_demo.conf -Dflume.root.logger=INFO,console

3) Test from a second terminal

[v2admin@hadoop10 ~]$ nc localhost 44444
hello world
OK
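If nc is not available, the same test can be driven from Java. This is a minimal sketch with a hypothetical NetcatClientSketch class: it writes one newline-terminated line to the netcat source and reads back the reply line. With the netcat source's default ack-every-event = true, that reply is OK, as in the nc session.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that plays the role of nc for a single line.
final class NetcatClientSketch {

    // Sends `line` plus '\n' (the netcat source turns each line into one event)
    // and returns the server's reply line, or null if it closed without replying.
    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the agent above is running and listening on localhost:44444.
        System.out.println(sendLine("localhost", 44444, "hello world"));
    }
}
```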

4) Check the output in the terminal running the agent

2020-12-26 17:31:33,919 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{timestamp=1608975093917} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

The timestamp added by the interceptor is visible in the event's headers.
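Both parts of that log line can be decoded by hand: the header value is epoch milliseconds, and the body is printed as hex bytes. The sketch below (hypothetical class LoggedEventDecoder) does both with the standard library. It assumes the agent's clock runs in UTC+8 (Asia/Shanghai), which matches the 17:31:33 wall-clock time in the log prefix.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that decodes the LoggerSink output shown above.
final class LoggedEventDecoder {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Converts the "timestamp" header (epoch milliseconds) to wall-clock time in `zone`.
    static String readableTimestamp(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).format(FMT);
    }

    // Turns a space-separated hex dump such as "68 65 6C" back into UTF-8 text.
    static String decodeHexBody(String hex) {
        String[] parts = hex.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // prints 2020-12-26 17:31:33.917
        System.out.println(readableTimestamp(1608975093917L, ZoneId.of("Asia/Shanghai")));
        // prints hello world
        System.out.println(decodeHexBody("68 65 6C 6C 6F 20 77 6F 72 6C 64"));
    }
}
```

The header was set at 17:31:33.917 and the sink logged the event at 17:31:33,919, so the timestamp records when the event passed through the source, not when it was written out.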

Flume custom interceptors

Understanding the Interceptor interface

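public interface Interceptor {
    /**
     * Initialization before the interceptor starts; usually nothing to implement.
     */
    public void initialize();

    /**
     * Processes a single event. Returns the (possibly modified) event,
     * or null if the event should be dropped.
     */
    public Event intercept(Event event);

    /**
     * Processes a batch of events.
     */
    public List<Event> intercept(List<Event> events);

    /**
     * Cleanup work on shutdown.
     */
    public void close();

    /**
     * Builds the Interceptor; outside code uses this Builder to obtain Interceptor instances.
     */
    public interface Builder extends Configurable {
        public Interceptor build();
    }
}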
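The single-event method may return null to drop an event, and Flume's Javadoc requires the batch method's output to be no larger than its input. A common batch implementation therefore calls the single-event method and keeps only the non-null results. The following is a stdlib-only sketch of that pattern. SimpleEvent, MiniInterceptor and DropEmptyBody are hypothetical stand-ins for Flume's Event and Interceptor types, so the example runs without the Flume jars.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.Event: headers plus a raw body.
final class SimpleEvent {
    final Map<String, String> headers = new HashMap<>();
    final byte[] body;

    SimpleEvent(byte[] body) {
        this.body = body;
    }
}

// Hypothetical mirror of the Interceptor contract.
abstract class MiniInterceptor {
    void initialize() {
    }

    // Return the (possibly modified) event, or null to drop it.
    abstract SimpleEvent intercept(SimpleEvent event);

    // Batch version: apply the single-event method, keep only non-null results.
    List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> out = new ArrayList<>(events.size());
        for (SimpleEvent event : events) {
            SimpleEvent kept = intercept(event);
            if (kept != null) {
                out.add(kept);
            }
        }
        return out; // never larger than the input batch
    }

    void close() {
    }
}

// Example filter: drop events whose body is empty.
final class DropEmptyBody extends MiniInterceptor {
    @Override
    SimpleEvent intercept(SimpleEvent event) {
        return event.body.length == 0 ? null : event;
    }
}
```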

Writing a custom interceptor

  • Requirement

Suppose a server produces log lines like these:

qq.com 2020-11-10 11:10 XXXXXXXXXXXXXXXXXXXXXX
baidu.com 2020-10 11:11 XXXXXXXXXXXXXXXXXXXXXx
.....

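Flume should collect this data and write the lines for each domain to a separate location on HDFS.
That requires a custom interceptor that extracts the domain from each line and stores it in an event header: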
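Each line begins with the domain followed by a space, so the routing key is simply the first token. Before wiring this into Flume, the extraction can be checked on its own. The sketch below uses a hypothetical DomainKey class and made-up sample lines in the format above; groupByDomain shows the split that the per-domain HDFS directories should end up reflecting.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: extracts the domain (routing key) from one log line.
final class DomainKey {

    // The domain is everything before the first space, e.g. "qq.com".
    static String domainOf(byte[] body) {
        String line = new String(body, StandardCharsets.UTF_8).trim();
        int space = line.indexOf(' ');
        return space < 0 ? line : line.substring(0, space);
    }

    // Groups lines by domain, preserving the order in which domains first appear.
    static Map<String, List<String>> groupByDomain(List<String> lines) {
        Map<String, List<String>> byDomain = new LinkedHashMap<>();
        for (String line : lines) {
            String domain = domainOf(line.getBytes(StandardCharsets.UTF_8));
            byDomain.computeIfAbsent(domain, k -> new ArrayList<>()).add(line);
        }
        return byDomain;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "qq.com 2020-11-10 11:10 XXXX",
                "baidu.com 2020-11-10 11:11 XXXX",
                "qq.com 2020-11-10 11:12 XXXX");
        System.out.println(groupByDomain(sample).keySet()); // prints [qq.com, baidu.com]
    }
}
```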

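import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class AddHeaderDomainIntecreptor implements Interceptor {

    @Override
    public void initialize() {
        // nothing to initialize
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Decode the body as UTF-8; StandardCharsets avoids the checked
        // UnsupportedEncodingException (and the null body string it could leave behind)
        String strBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // The first space-separated token of each line is the domain, e.g. "qq.com"
        String[] s = strBody.split(" ");
        headers.put("domain", s[0]);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Events are modified in place and never dropped, so the list is returned as-is
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    // Flume instantiates the interceptor through this Builder
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new AddHeaderDomainIntecreptor();
        }

        @Override
        public void configure(Context context) {
            // no configuration properties are used
        }
    }
}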
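To use the class, it has to be packaged into a jar on Flume's classpath (for example, copied into Flume's lib directory) and registered by its fully qualified Builder class name. The HDFS sink can then route on the header with the %{domain} escape in hdfs.path. This is a sketch of the relevant agent properties only; the package name com.example.flume and the path /flume/logs are hypothetical, the source and channel are assumed to be configured as in the earlier example, and the timestamp interceptor is chained in because the %Y-%m-%d escapes need a timestamp header.

```properties
# Chain the built-in timestamp interceptor with the custom one
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
# Hypothetical package; use the real fully qualified name followed by $Builder
a1.sources.r1.interceptors.i2.type = com.example.flume.AddHeaderDomainIntecreptor$Builder

a1.sinks.k1.type = hdfs
# %{domain} is replaced by the event's "domain" header, %Y-%m-%d by its timestamp header
a1.sinks.k1.hdfs.path = /flume/logs/%{domain}/%Y-%m-%d
# Write plain text instead of the default SequenceFile format
a1.sinks.k1.hdfs.fileType = DataStream
```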