Flume入门案例

1 监控端口数据官网案例

1.1 案例需要:

应用Flume监听一个端口,收集该端口数据,并打印到控制台。

1.2 需要剖析:

1.3 实现步骤:

1.3.1 装置netcat工具
sudo yum install -y nc
1.3.2 判断44444端口是否被占用
sudo netstat -tunlp | grep 44444
1.3.3 创立Flume Agent配置文件flume-netcat-logger.conf

在flume目录下创立job文件夹并进入job文件夹。

mkdir jobcd job/

在job文件夹下创立Flume Agent配置文件flume-netcat-logger.conf。

vim flume-netcat-logger.conf

在flume-netcat-logger.conf文件中增加如下内容。

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 1000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
注:配置文件来源于官网手册http://flume.apache.org/FlumeUserGuide.html
配置文件解析

1.3.4 先开启flume监听端口

第一种写法:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

第二种写法:

bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数阐明:

--conf/-c:示意配置文件存储在conf/目录--name/-n:示意给agent起名为a1--conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。-Dflume.root.logger=INFO,console :-D示意flume运行时动静批改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包含:log、info、warn、error。
1.3.5 应用netcat工具向本机的44444端口发送内容
nc localhost 44444hello atguigu
1.3.6 在Flume监听页面察看接收数据状况

2 实时监控单个文件

2.1 案例需要:

实时监控Hive日志,并上传到HDFS中

2.2 需要剖析:

实时读取本地文件到hdfs

2.3 实现步骤:

2.3.1 Flume要想将数据输入到HDFS,必须持有Hadoop相干jar包


commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷贝到/opt/module/flume/lib文件夹下(flume lib库下)。

2.3.2 创立flume-file-hdfs.conf文件

创立文件

vim flume-file-hdfs.conf

注:要想读取Linux零碎中的文件,就得依照Linux命令的规定执行命令。因为Hive日志在Linux零碎中所以读取文件的类型抉择:exec即execute执行的意思。示意执行Linux命令来读取文件。
增加如下内容

# Name the components on this agenta2.sources = r2a2.sinks = k2a2.channels = c2# Describe/configure the sourcea2.sources.r2.type = execa2.sources.r2.command = tail -F /opt/module/hive/logs/hive.loga2.sources.r2.shell = /bin/bash -c# Describe the sinka2.sinks.k2.type = hdfsa2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H#上传文件的前缀a2.sinks.k2.hdfs.filePrefix = logs-#是否依照工夫滚动文件夹a2.sinks.k2.hdfs.round = true#多少工夫单位创立一个新的文件夹a2.sinks.k2.hdfs.roundValue = 1#从新定义工夫单位a2.sinks.k2.hdfs.roundUnit = hour#是否应用本地工夫戳a2.sinks.k2.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a2.sinks.k2.hdfs.batchSize = 100#多久生成一个新的文件a2.sinks.k2.hdfs.rollInterval = 60#设置每个文件的滚动大小a2.sinks.k2.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a2.sinks.k2.hdfs.rollCount = 0# Use a channel which buffers events in memorya2.channels.c2.type = memorya2.channels.c2.capacity = 10000a2.channels.c2.transactionCapacity = 1000# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2

留神:
对于所有与工夫相干的转义序列,Event Header中必须存在以 “timestamp”的key(除非hdfs.useLocalTimeStamp设置为true,此办法会应用TimestampInterceptor主动增加timestamp)。

a2.sinks.k2.hdfs.useLocalTimeStamp = true
配置文件解析

2.3.3 运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
2.3.4 开启Hadoop和Hive并操作Hive产生日志
sbin/start-dfs.shsbin/start-yarn.shbin/hivehive (default)>
2.3.5 在HDFS上查看文件。

2.4 Execsouce 总结

execsource和异步的source一样,无奈在source向channel中放入event故障时,及时告诉客户端,暂停生成数据!
容易造成数据失落!

解决方案:
  • 须要在产生故障时,及时告诉客户端!
  • 如果客户端无奈暂停,必须有一个数据的缓存机制!
  • 如果心愿数据有强的可靠性保障,能够思考应用SpoolingDirSource或TailDirSource或本人写Source本人管制!

3 监控多个新文件

3.1 案例需要:

应用Flume监听整个目录的文件,并上传至HDFS

3.2 需要剖析:

3.3 实现步骤:

3.3.1 创立配置文件flume-dir-hdfs.conf

创立一个文件

vim flume-dir-hdfs.conf

增加如下内容

a3.sources = r3a3.sinks = k3a3.channels = c3# Describe/configure the sourcea3.sources.r3.type = spooldira3.sources.r3.spoolDir = /opt/module/flume/uploada3.sources.r3.fileSuffix = .COMPLETED#疏忽所有以.tmp结尾的文件,不上传a3.sources.r3.ignorePattern = \\S*\\.tmp# Describe the sinka3.sinks.k3.type = hdfsa3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H#上传文件的前缀a3.sinks.k3.hdfs.filePrefix = upload-#是否依照工夫滚动文件夹a3.sinks.k3.hdfs.round = true#多少工夫单位创立一个新的文件夹a3.sinks.k3.hdfs.roundValue = 1#从新定义工夫单位a3.sinks.k3.hdfs.roundUnit = hour#是否应用本地工夫戳a3.sinks.k3.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a3.sinks.k3.hdfs.batchSize = 100#设置文件类型,可反对压缩a3.sinks.k3.hdfs.fileType = DataStream#多久生成一个新的文件a3.sinks.k3.hdfs.rollInterval = 60#设置每个文件的滚动大小大略是128Ma3.sinks.k3.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memorya3.channels.c3.type = memorya3.channels.c3.capacity = 10000a3.channels.c3.transactionCapacity = 1000# Bind the source and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3

监控多个新文件-配置文件解析

3.3.2 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

阐明:在应用Spooling Directory Source时
不要在监控目录中创立并继续批改文件
上传实现的文件会以.COMPLETED结尾
被监控文件夹每500毫秒扫描一次文件变动

3.3.3 向upload文件夹中增加文件

在/opt/module/flume目录下创立upload文件夹

[atguigu@hadoop102 flume]$ mkdir upload

向upload文件夹中增加文件

touch atguigu.txttouch atguigu.tmptouch atguigu.log
3.3.4 查看HDFS上的数据

3.3.5 期待1s,再次查问upload文件夹
[atguigu@hadoop102 upload]$ ll总用量 0-rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.log.COMPLETED-rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.tmp-rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.txt.COMPLETED

3.4 SpoolingDirSource 总结

  • SpoolingDirSource指定本地磁盘的一个目录为"Spooling(主动收集)"的目录!这个source能够读取目录中新增的文件,将文件的内容封装为event!
  • SpoolingDirSource在读取一整个文件到channel之后,它会采取策略,要么删除文件(是否能够删除取决于配置),要么对文件过程一个实现状态的重命名,这样能够保障source继续监控新的文件!

  • SpoolingDirSource和execsource不同
    SpoolingDirSource是牢靠的!即便flume被杀死或重启,仍然不丢数据!然而为了保障这个个性,付出的代价是,一旦flume发现以下状况,flume就会报错,进行!

    1. 一个文件曾经被放入目录,在采集文件时,不能被批改
    2. 文件的名在放入目录后又被从新应用(呈现了重名的文件)

要求: 必须曾经关闭的文件能力放入到SpoolingDirSource,在同一个SpoolingDirSource中都不能呈现重名的文件!

应用:

必须配置

type    –    The component type name, needs to be spooldir.spoolDir    –    The directory from which to read files from.

4 实时监控多个文件

Exec source实用于监控一个实时追加的文件,但不能保证数据不失落;Spooldir Source可能保证数据不失落,且可能实现断点续传,但提早较高,不能实时监控;而Taildir Source既可能实现断点续传,又能够保证数据不失落,还可能进行实时监控。

4.1 案例需要:

应用Flume监听整个目录的实时追加文件,并上传至HDFS

4.2 需要剖析:

4.3 实现步骤:

4.3.1 创立配置文件flume-taildir-hdfs.conf

创立一个文件

[atguigu@hadoop102 job]$ vim flume-taildir-hdfs.conf

增加如下内容

a3.sources = r3a3.sinks = k3a3.channels = c3# Describe/configure the sourcea3.sources.r3.type = TAILDIRa3.sources.r3.positionFile = /opt/module/flume/tail_dir.jsona3.sources.r3.filegroups = f1a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*# Describe the sinka3.sinks.k3.type = hdfsa3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H#上传文件的前缀a3.sinks.k3.hdfs.filePrefix = upload-#是否依照工夫滚动文件夹a3.sinks.k3.hdfs.round = true#多少工夫单位创立一个新的文件夹a3.sinks.k3.hdfs.roundValue = 1#从新定义工夫单位a3.sinks.k3.hdfs.roundUnit = hour#是否应用本地工夫戳a3.sinks.k3.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a3.sinks.k3.hdfs.batchSize = 100#设置文件类型,可反对压缩a3.sinks.k3.hdfs.fileType = DataStream#多久生成一个新的文件a3.sinks.k3.hdfs.rollInterval = 60#设置每个文件的滚动大小大略是128Ma3.sinks.k3.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memorya3.channels.c3.type = memorya3.channels.c3.capacity = 10000a3.channels.c3.transactionCapacity = 1000# Bind the source and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3
实时监控多个文件-配置文件解析

4.3.2 启动监控文件夹命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
4.3.3 向files文件夹中追加内容

在/opt/module/flume目录下创立files文件夹

[atguigu@hadoop102 flume]$ mkdir files

向upload文件夹中增加文件

[atguigu@hadoop102 files]$ echo hello >> file1.txt[atguigu@hadoop102 files]$ echo atguigu >> file2.txt
4.3.4 查看HDFS上的数据

Taildir阐明:

Taildir Source保护了一个json格局的position File,其会定期的往position File中更新每个文件读取到的最新的地位,因而可能实现断点续传。Position File的格局如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

注:Linux中贮存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来辨认不同的文件,Unix/Linux零碎外部不应用文件名,而应用inode号码来辨认文件。

4.4 TailDirSource 总结

flume ng 1.7版本后提供!

常见问题:

TailDirSource采集的文件,不能随便重命名!如果日志在正在写入时,名称为 xxxx.tmp,写入实现后,滚动,改名为xxx.log,此时一旦匹配规定能够匹配上述名称,就会产生数据的反复采集!

  • Taildir Source 能够读取多个文件最新追加写入的内容!
  • Taildir Source是牢靠的,即便flume呈现了故障或挂掉。Taildir Source在工作时,会将读取文件的最初的地位记录在一个json文件中,一旦agent重启,会从之前曾经记录的地位,继续执行tail操作!
  • Json文件中,地位是能够批改,批改后,Taildir Source会从批改的地位进行tail操作!如果JSON文件失落了,此时会从新从每个文件的第一行,从新读取,这会造成数据的反复!
  • Taildir Source目前只能读文本文件!
必须配置:
channels    –     type    –    The component type name, needs to be TAILDIR.filegroups    –    Space-separated list of file groups. Each file group indicates a set of files to be tailed.filegroups.<filegroupName>    –    Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.