Flume, Part 2: Flume Sources


1 Common Flume source types

1.1 Avro Source

1.1.1 Overview

Definition from the documentation:
Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies.

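In other words, the Avro Source listens on an Avro port and receives events from Avro client streams.
When paired with the built-in Avro Sink of another (previous-hop) Flume agent, it can be used to build tiered collection topologies.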

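In the property table of the Flume User Guide, required properties are shown in bold. For the Avro Source these are channels, type, bind, and port.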

1.1.2 Example

Create the configuration file:
cd jobs
vim avro_demo1.conf

# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = avro
# An IP address also works here. A hostname is used in this example, so it must resolve via /etc/hosts.
# This note sits on its own line because a properties file treats a trailing "#" as part of the value.
a1.sources.s1.bind = hadoop10
a1.sources.s1.port = 33333

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and the sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

Start Flume:

flume-ng agent -n a1 -c conf/ -f jobs/avro_demo1.conf -Dflume.root.logger=INFO,console

Open another terminal and use the Avro client that ships with Flume to send log data to the given host and port:

echo "hello world" > avro_log.txt
flume-ng avro-client -c conf/ -H hadoop10 -p 33333 -F avro_log.txt

Observe the output in the agent's terminal:

2020-12-26 10:07:58,494 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 => /192.168.122.10:33333] OPEN
2020-12-26 10:07:58,495 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 => /192.168.122.10:33333] BOUND: /192.168.122.10:33333
2020-12-26 10:07:58,495 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 => /192.168.122.10:33333] CONNECTED: /192.168.122.10:51714
2020-12-26 10:07:58,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: {headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 64                   hello word }
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 :> /192.168.122.10:33333] DISCONNECTED
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 :> /192.168.122.10:33333] UNBOUND
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 :> /192.168.122.10:33333] CLOSED
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.122.10:51714 disconnected.
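
The logger sink prints each event body twice, once as hex bytes and once as text. As a rough illustration (this is not part of Flume), the Python sketch below decodes the hex portion of such a line. The function name decode_logger_body and the regular expression are assumptions based only on the log format shown above. Decoding the body logged in this run gives "hello word", which suggests the file read by the client at capture time held that text rather than "hello world".

```python
import re


def decode_logger_body(log_line: str) -> str:
    """Decode the hex bytes printed after 'body:' into text.

    Hypothetical helper: it assumes the layout seen in the log above,
    i.e. space-separated two-digit hex bytes directly after 'body:'.
    """
    match = re.search(r"body:((?:\s[0-9A-Fa-f]{2}\b)+)", log_line)
    if match is None:
        raise ValueError("no hex body found in log line")
    return bytes.fromhex(match.group(1)).decode("utf-8")


line = ("Event: {headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 64"
        "                   hello word }")
decoded = decode_logger_body(line)
print(decoded)  # prints: hello word
```

Comparing the decoded bytes with what was sent is a quick way to confirm exactly which data reached the sink.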

1.2 Exec Source

1.2.1 Overview

Definition from the documentation:
Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results where as date will probably not – the former two commands produce streams of data where as the latter produces a single event and exits.

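The Exec Source runs a configured Unix (Linux) command and takes the data that command keeps writing to standard output. If the process exits, the Exec Source exits with it and produces no further data.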
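
The behaviour described above can be imitated in a few lines of Python. This is only a sketch of the idea, not Flume's implementation: run_exec_source is a hypothetical name, each stdout line is treated as one event, and a short-lived command stands in for tail -F so that the example terminates.

```python
import subprocess
import sys


def run_exec_source(command):
    """Yield one event body per stdout line until the child process exits."""
    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,  # stderr is discarded in this sketch
        text=True,
    )
    try:
        for line in proc.stdout:
            yield line.rstrip("\n")
    finally:
        # Reached when the process ends or the consumer stops early.
        if proc.poll() is None:
            proc.terminate()
        proc.stdout.close()
        proc.wait()


# Stand-in for `tail -F`: emits two lines and exits, which ends the "source".
demo_command = [sys.executable, "-c",
                "print('hello world'); print('this is my demo')"]
events = list(run_exec_source(demo_command))
print(events)
```

Once the child process ends, the generator ends too, which mirrors why a one-shot command such as date yields a single event and nothing more.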

1.2.2 Example

1) Create the configuration file:
touch exec_demo.conf
vim exec_demo.conf

# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/v2admin/exec_demo.log

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and the sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2) Create a log file under /home/v2admin:
touch exec_demo.log
3) Start Flume:
flume-ng agent -n a1 -c conf/ -f jobs/exec_demo.conf -Dflume.root.logger=INFO,console
4) Open another terminal and append content to exec_demo.log:

[v2admin@hadoop10 ~]$ echo "hello world" >> exec_demo.log 
[v2admin@hadoop10 ~]$ echo "this is my demo" >> exec_demo.log 

5) Observe the log in the agent's terminal:

2020-12-26 10:30:12,616 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: {headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }
2020-12-26 10:30:37,329 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: {headers:{} body: 74 68 69 73 20 69 73 20 6D 79 20 64 65 6D 6F    this is my demo }

1.3 Spooling Directory Source

1.3.1 Overview

From the official documentation:

This source lets you ingest data by placing files to be ingested into a "spooling" directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion by default is indicated by renaming the file or it can be deleted or the trackerDir is used to keep track of processed files.
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:

1. If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
2. If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.

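The Spooling Directory Source watches the configured directory for newly added files and reads the data out of them. It is not suited, however, to watching and syncing log files that are still being appended to in real time.
In the property table of the Flume User Guide, properties in bold are required and the rest are optional. For this source the required ones are channels, type, and spoolDir.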
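
To make the spooling contract concrete, here is a simplified Python sketch of the idea: read each pending file completely, then rename it with a completion suffix, and fail loudly if that target name already exists. It is an illustration under stated assumptions rather than Flume's actual logic. drain_spool_dir is a hypothetical name, the check is cruder than Flume's, and only the .COMPLETED suffix is taken from the log output in this article.

```python
import os
import tempfile

COMPLETED_SUFFIX = ".COMPLETED"


def drain_spool_dir(spool_dir):
    """Read every pending file line by line, then mark it as completed."""
    events = []
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.join(spool_dir, name)
        if name.endswith(COMPLETED_SUFFIX) or not os.path.isfile(path):
            continue
        with open(path, encoding="utf-8") as handle:
            events.extend(line.rstrip("\n") for line in handle)
        target = path + COMPLETED_SUFFIX
        if os.path.exists(target):
            # Simplified stand-in for the "file name re-used" failure.
            raise RuntimeError(f"file name re-used: {target}")
        os.rename(path, target)
    return events


with tempfile.TemporaryDirectory() as spool_dir:
    log_path = os.path.join(spool_dir, "spooldir.log")
    with open(log_path, "w", encoding="utf-8") as handle:
        handle.write("hello world\n")
    first_pass = drain_spool_dir(spool_dir)
    remaining = sorted(os.listdir(spool_dir))

    # Re-creating the same name collides with spooldir.log.COMPLETED.
    with open(log_path, "w", encoding="utf-8") as handle:
        handle.write("hello world\n")
    try:
        drain_spool_dir(spool_dir)
        reuse_detected = False
    except RuntimeError:
        reuse_detected = True

print(first_pass, remaining, reuse_detected)
```

Giving every dropped file a unique name, for instance by adding a timestamp as the documentation suggests, avoids the collision.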

1.3.2 Example

1) Create the configuration file:
touch spooldir_demo.conf
vim spooldir_demo.conf

# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /home/v2admin/demo
a1.sources.s1.fileHeader = true

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and the sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2) Create the demo directory under /home/v2admin:
mkdir demo
3) Start Flume:

flume-ng agent -n a1 -c conf/ -f jobs/spooldir_demo.conf -Dflume.root.logger=INFO,console

4) Open another terminal and write a file into the spooling directory:

cd /home/v2admin/demo
echo "hello world" >> spooldir.log

5) Observe the output in the agent's terminal:

2020-12-26 11:28:40,815 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2020-12-26 11:28:40,815 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /home/v2admin/demo/spooldir.log to /home/v2admin/demo/spooldir.log.COMPLETED
2020-12-26 11:28:40,817 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: {headers:{file=/home/v2admin/demo/spooldir.log} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

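6) Files in the spooling directory must not be edited again, and their names must not be reused. The first spooldir.log has already been renamed to spooldir.log.COMPLETED, so running the same command again creates a new file under the old name:

# Run the same command again
echo "hello world" >> spooldir.log

Observe the result: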

 [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:296)] FATAL: Spool Directory source s1: {spoolDir: /home/v2admin/demo}: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/v2admin/demo/spooldir.log.COMPLETED
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:528)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:475)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:386)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:263)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1.4 Taildir Source

1.4.1 Overview

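The Taildir Source is suited to watching multiple files that are appended to in real time, and it can resume reading from where it left off after a restart.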
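
Resuming works because the last read offset of each file is persisted. The Python sketch below illustrates that mechanism in a much reduced form. It is not Flume's code: read_new_lines is a hypothetical name, the position file holds a single JSON object instead of Flume's list of entries, and a partially written last line is not handled.

```python
import json
import os
import tempfile


def read_new_lines(log_path, position_path):
    """Return lines appended since the offset saved in the position file."""
    offset = 0
    if os.path.exists(position_path):
        with open(position_path, encoding="utf-8") as handle:
            offset = json.load(handle)["pos"]
    with open(log_path, "rb") as handle:
        handle.seek(offset)        # skip everything already delivered
        data = handle.read()
        new_offset = handle.tell()
    with open(position_path, "w", encoding="utf-8") as handle:
        json.dump({"pos": new_offset, "file": log_path}, handle)
    return data.decode("utf-8").splitlines()


with tempfile.TemporaryDirectory() as work_dir:
    log_path = os.path.join(work_dir, "helloworld.log")
    position_path = os.path.join(work_dir, "pos.json")

    with open(log_path, "ab") as handle:
        handle.write(b"hello world\n")
    first_read = read_new_lines(log_path, position_path)

    with open(log_path, "ab") as handle:
        handle.write(b"this is a demo\n")
    # A fresh call stands in for a restarted agent: it resumes at the saved offset.
    second_read = read_new_lines(log_path, position_path)

print(first_read)
print(second_read)
```

Because the offset lives on disk rather than in memory, the second call delivers only the newly appended line.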

1.4.2 Example

1) Create the configuration file:
vim taildir_demo.conf

# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = TAILDIR
a1.sources.s1.positionFile = /home/v2admin/demo/pos.log
a1.sources.s1.filegroups = f
a1.sources.s1.filegroups.f = /home/v2admin/demo/f/.*.log.*

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and the sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2) Create the f directory under /home/v2admin/demo:
mkdir f
3) Start Flume:

flume-ng agent -n a1 -c conf/ -f jobs/taildir_demo.conf -Dflume.root.logger=INFO,console

4) Change into the /home/v2admin/demo/f directory and write to a log file:

echo "hello world" > helloworld.log
echo "this is a demo" >> helloworld.log

5) Observe the output in the agent's terminal:

2020-12-26 11:55:45,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: {headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }
2020-12-26 11:57:03,089 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: {headers:{} body: 74 68 69 73 20 69 73 20 61 20 64 65 6D 6F       this is a demo }

6) Inspect pos.log:

[{"inode":73056560,"pos":27,"file":"/home/v2admin/demo/f/helloworld.log"}]
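
The pos value can be checked by hand. The short Python sketch below parses the position file content shown above and compares pos with the number of bytes the two echo commands wrote. It assumes that each echo appended a trailing newline and that the file contains nothing else.

```python
import json

position_file_content = (
    '[{"inode":73056560,"pos":27,'
    '"file":"/home/v2admin/demo/f/helloworld.log"}]'
)
entries = json.loads(position_file_content)

# `echo` appends a newline to each line it writes.
written = "hello world\n" + "this is a demo\n"
expected_pos = len(written.encode("utf-8"))

for entry in entries:
    print(entry["file"], entry["inode"], entry["pos"])
print(expected_pos)  # prints: 27
```

The 12 bytes of the first line plus the 15 bytes of the second give 27, so pos is the byte offset up to which the file has been read.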

For more details, see the documentation: http://flume.apache.org/FlumeUserGuide.html
