乐趣区

关于flume:Flume入门案例

Flume 入门案例

1 监控端口数据官网案例

1.1 案例需要:

应用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

1.2 需要剖析:

1.3 实现步骤:

1.3.1 装置 netcat 工具
sudo yum install -y nc
1.3.2 判断 44444 端口是否被占用
sudo netstat -tunlp | grep 44444
1.3.3 创立 Flume Agent 配置文件 flume-netcat-logger.conf

在 flume 目录下创立 job 文件夹并进入 job 文件夹。

mkdir job
cd job/

在 job 文件夹下创立 Flume Agent 配置文件 flume-netcat-logger.conf。

vim flume-netcat-logger.conf

在 flume-netcat-logger.conf 文件中增加如下内容。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置文件来源于官网手册 http://flume.apache.org/FlumeUserGuide.html
配置文件解析

1.3.4 先开启 flume 监听端口

第一种写法:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

第二种写法:

bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数阐明:

--conf/-c:示意配置文件存储在 conf/ 目录
--name/-n:示意给 agent 起名为 a1
--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。-Dflume.root.logger=INFO,console:- D 示意 flume 运行时动静批改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包含:log、info、warn、error。
1.3.5 应用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
hello 
atguigu
1.3.6 在 Flume 监听页面察看接收数据状况

2 实时监控单个文件

2.1 案例需要:

实时监控 Hive 日志,并上传到 HDFS 中

2.2 需要剖析:

实时读取本地文件到 hdfs

2.3 实现步骤:

2.3.1 Flume 要想将数据输入到 HDFS,必须持有 Hadoop 相干 jar 包


commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷贝到 /opt/module/flume/lib 文件夹下(flume lib 库下)。

2.3.2 创立 flume-file-hdfs.conf 文件

创立文件

vim flume-file-hdfs.conf

注:要想读取 Linux 零碎中的文件,就得依照 Linux 命令的规定执行命令。因为 Hive 日志在 Linux 零碎中所以读取文件的类型抉择:exec 即 execute 执行的意思。示意执行 Linux 命令来读取文件。
增加如下内容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否依照工夫滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少工夫单位创立一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#从新定义工夫单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否应用本地工夫戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 1000

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

留神:
对于所有与工夫相干的转义序列,Event Header 中必须存在以“timestamp”的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此办法会应用 TimestampInterceptor 主动增加 timestamp)。

a2.sinks.k2.hdfs.useLocalTimeStamp = true
配置文件解析

2.3.3 运行 Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
2.3.4 开启 Hadoop 和 Hive 并操作 Hive 产生日志
sbin/start-dfs.sh
sbin/start-yarn.sh

bin/hive
hive (default)>
2.3.5 在 HDFS 上查看文件。

2.4 Execsouce 总结

execsource 和异步的 source 一样,无奈在 source 向 channel 中放入 event 故障时,及时告诉客户端,暂停生成数据!
容易造成数据失落!

解决方案:
  • 须要在产生故障时,及时告诉客户端!
  • 如果客户端无奈暂停,必须有一个数据的缓存机制!
  • 如果心愿数据有强的可靠性保障,能够思考应用 SpoolingDirSource 或 TailDirSource 或本人写 Source 本人管制!

3 监控多个新文件

3.1 案例需要:

应用 Flume 监听整个目录的文件,并上传至 HDFS

3.2 需要剖析:

3.3 实现步骤:

3.3.1 创立配置文件 flume-dir-hdfs.conf

创立一个文件

vim flume-dir-hdfs.conf

增加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
#疏忽所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = \\S*\\.tmp

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否依照工夫滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少工夫单位创立一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#从新定义工夫单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否应用本地工夫戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可反对压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大略是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

监控多个新文件 - 配置文件解析

3.3.2 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

阐明:在应用 Spooling Directory Source 时
不要在监控目录中创立并继续批改文件
上传实现的文件会以.COMPLETED 结尾
被监控文件夹每 500 毫秒扫描一次文件变动

3.3.3 向 upload 文件夹中增加文件

在 /opt/module/flume 目录下创立 upload 文件夹

[atguigu@hadoop102 flume]$ mkdir upload

向 upload 文件夹中增加文件

touch atguigu.txt
touch atguigu.tmp
touch atguigu.log
3.3.4 查看 HDFS 上的数据

3.3.5 期待 1s,再次查问 upload 文件夹
[atguigu@hadoop102 upload]$ ll
总用量 0
-rw-rw-r--. 1 atguigu atguigu 0 5 月  20 22:31 atguigu.log.COMPLETED
-rw-rw-r--. 1 atguigu atguigu 0 5 月  20 22:31 atguigu.tmp
-rw-rw-r--. 1 atguigu atguigu 0 5 月  20 22:31 atguigu.txt.COMPLETED

3.4 SpoolingDirSource 总结

  • SpoolingDirSource 指定本地磁盘的一个目录为 ”Spooling(主动收集)” 的目录!这个 source 能够读取目录中新增的文件,将文件的内容封装为 event!
  • SpoolingDirSource 在读取一整个文件到 channel 之后,它会采取策略,要么删除文件(是否能够删除取决于配置),要么对文件过程一个实现状态的重命名,这样能够保障 source 继续监控新的文件!

  • SpoolingDirSource 和 execsource 不同
    SpoolingDirSource 是牢靠的!即便 flume 被杀死或重启,仍然不丢数据!然而为了保障这个个性,付出的代价是,一旦 flume 发现以下状况,flume 就会报错,进行!

    1. 一个文件曾经被放入目录,在采集文件时,不能被批改
    2. 文件的名在放入目录后又被从新应用(呈现了重名的文件)

要求:必须曾经关闭的文件能力放入到 SpoolingDirSource,在同一个 SpoolingDirSource 中都不能呈现重名的文件!

应用:

必须配置

type    –    The component type name, needs to be spooldir.
spoolDir    –    The directory from which to read files from.

4 实时监控多个文件

Exec source 实用于监控一个实时追加的文件,但不能保证数据不失落;Spooldir Source 可能保证数据不失落,且可能实现断点续传,但提早较高,不能实时监控;而 Taildir Source 既可能实现断点续传,又能够保证数据不失落,还可能进行实时监控。

4.1 案例需要:

应用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

4.2 需要剖析:

4.3 实现步骤:

4.3.1 创立配置文件 flume-taildir-hdfs.conf

创立一个文件

[atguigu@hadoop102 job]$ vim flume-taildir-hdfs.conf

增加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否依照工夫滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少工夫单位创立一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#从新定义工夫单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否应用本地工夫戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可反对压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大略是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
实时监控多个文件 - 配置文件解析

4.3.2 启动监控文件夹命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
4.3.3 向 files 文件夹中追加内容

在 /opt/module/flume 目录下创立 files 文件夹

[atguigu@hadoop102 flume]$ mkdir files

向 upload 文件夹中增加文件

[atguigu@hadoop102 files]$ echo hello >> file1.txt
[atguigu@hadoop102 files]$ echo atguigu >> file2.txt
4.3.4 查看 HDFS 上的数据

Taildir 阐明:

Taildir Source 保护了一个 json 格局的 position File,其会定期的往 position File 中更新每个文件读取到的最新的地位,因而可能实现断点续传。Position File 的格局如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

注:Linux 中贮存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来辨认不同的文件,Unix/Linux 零碎外部不应用文件名,而应用 inode 号码来辨认文件。

4.4 TailDirSource 总结

flume ng 1.7 版本后提供!

常见问题:

TailDirSource 采集的文件,不能随便重命名!如果日志在正在写入时,名称为 xxxx.tmp,写入实现后,滚动,改名为 xxx.log,此时一旦匹配规定能够匹配上述名称,就会产生数据的反复采集!

  • Taildir Source 能够读取多个文件最新追加写入的内容!
  • Taildir Source 是牢靠的,即便 flume 呈现了故障或挂掉。Taildir Source 在工作时,会将读取文件的最初的地位记录在一个 json 文件中,一旦 agent 重启,会从之前曾经记录的地位,继续执行 tail 操作!
  • Json 文件中,地位是能够批改,批改后,Taildir Source 会从批改的地位进行 tail 操作!如果 JSON 文件失落了,此时会从新从每个文件的第一行,从新读取,这会造成数据的反复!
  • Taildir Source 目前只能读文本文件!
必须配置:
channels    –     
type    –    The component type name, needs to be TAILDIR.
filegroups    –    Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName>    –    Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.
退出移动版