共计 8186 个字符,预计需要花费 21 分钟才能阅读完成。
Flume 入门案例
1 监控端口数据官网案例
1.1 案例需要:
应用 Flume 监听一个端口,收集该端口数据,并打印到控制台。
1.2 需要剖析:
1.3 实现步骤:
1.3.1 装置 netcat 工具
sudo yum install -y nc
1.3.2 判断 44444 端口是否被占用
sudo netstat -tunlp | grep 44444
1.3.3 创立 Flume Agent 配置文件 flume-netcat-logger.conf
在 flume 目录下创立 job 文件夹并进入 job 文件夹。
mkdir job
cd job/
在 job 文件夹下创立 Flume Agent 配置文件 flume-netcat-logger.conf。
vim flume-netcat-logger.conf
在 flume-netcat-logger.conf 文件中增加如下内容。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置文件来源于官网手册 http://flume.apache.org/FlumeUserGuide.html
配置文件解析
1.3.4 先开启 flume 监听端口
第一种写法:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
第二种写法:
bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数阐明:
--conf/-c:示意配置文件存储在 conf/ 目录
--name/-n:示意给 agent 起名为 a1
--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。-Dflume.root.logger=INFO,console:- D 示意 flume 运行时动静批改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包含:log、info、warn、error。
1.3.5 应用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
hello
atguigu
1.3.6 在 Flume 监听页面察看接收数据状况
2 实时监控单个文件
2.1 案例需要:
实时监控 Hive 日志,并上传到 HDFS 中
2.2 需要剖析:
实时读取本地文件到 hdfs
2.3 实现步骤:
2.3.1 Flume 要想将数据输入到 HDFS,必须持有 Hadoop 相干 jar 包
将
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷贝到 /opt/module/flume/lib 文件夹下(flume lib 库下)。
2.3.2 创立 flume-file-hdfs.conf 文件
创立文件
vim flume-file-hdfs.conf
注:要想读取 Linux 零碎中的文件,就得依照 Linux 命令的规定执行命令。因为 Hive 日志在 Linux 零碎中所以读取文件的类型抉择:exec 即 execute 执行的意思。示意执行 Linux 命令来读取文件。
增加如下内容
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否依照工夫滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少工夫单位创立一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#从新定义工夫单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否应用本地工夫戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 1000
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
留神:
对于所有与工夫相干的转义序列,Event Header 中必须存在以“timestamp”的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此办法会应用 TimestampInterceptor 主动增加 timestamp)。
a2.sinks.k2.hdfs.useLocalTimeStamp = true
配置文件解析
2.3.3 运行 Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
2.3.4 开启 Hadoop 和 Hive 并操作 Hive 产生日志
sbin/start-dfs.sh
sbin/start-yarn.sh
bin/hive
hive (default)>
2.3.5 在 HDFS 上查看文件。
2.4 Execsouce 总结
execsource 和异步的 source 一样,无奈在 source 向 channel 中放入 event 故障时,及时告诉客户端,暂停生成数据!
容易造成数据失落!
解决方案:
- 须要在产生故障时,及时告诉客户端!
- 如果客户端无奈暂停,必须有一个数据的缓存机制!
- 如果心愿数据有强的可靠性保障,能够思考应用 SpoolingDirSource 或 TailDirSource 或本人写 Source 本人管制!
3 监控多个新文件
3.1 案例需要:
应用 Flume 监听整个目录的文件,并上传至 HDFS
3.2 需要剖析:
3.3 实现步骤:
3.3.1 创立配置文件 flume-dir-hdfs.conf
创立一个文件
vim flume-dir-hdfs.conf
增加如下内容
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
#疏忽所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = \\S*\\.tmp
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否依照工夫滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少工夫单位创立一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#从新定义工夫单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否应用本地工夫戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可反对压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大略是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
监控多个新文件 - 配置文件解析
3.3.2 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
阐明:在应用 Spooling Directory Source 时
不要在监控目录中创立并继续批改文件
上传实现的文件会以.COMPLETED 结尾
被监控文件夹每 500 毫秒扫描一次文件变动
3.3.3 向 upload 文件夹中增加文件
在 /opt/module/flume 目录下创立 upload 文件夹
[atguigu@hadoop102 flume]$ mkdir upload
向 upload 文件夹中增加文件
touch atguigu.txt
touch atguigu.tmp
touch atguigu.log
3.3.4 查看 HDFS 上的数据
3.3.5 期待 1s,再次查问 upload 文件夹
[atguigu@hadoop102 upload]$ ll
总用量 0
-rw-rw-r--. 1 atguigu atguigu 0 5 月 20 22:31 atguigu.log.COMPLETED
-rw-rw-r--. 1 atguigu atguigu 0 5 月 20 22:31 atguigu.tmp
-rw-rw-r--. 1 atguigu atguigu 0 5 月 20 22:31 atguigu.txt.COMPLETED
3.4 SpoolingDirSource 总结
- SpoolingDirSource 指定本地磁盘的一个目录为 ”Spooling(主动收集)” 的目录!这个 source 能够读取目录中新增的文件,将文件的内容封装为 event!
-
SpoolingDirSource 在读取一整个文件到 channel 之后,它会采取策略,要么删除文件(是否能够删除取决于配置),要么对文件过程一个实现状态的重命名,这样能够保障 source 继续监控新的文件!
-
SpoolingDirSource 和 execsource 不同
SpoolingDirSource 是牢靠的!即便 flume 被杀死或重启,仍然不丢数据!然而为了保障这个个性,付出的代价是,一旦 flume 发现以下状况,flume 就会报错,进行!- 一个文件曾经被放入目录,在采集文件时,不能被批改
- 文件的名在放入目录后又被从新应用(呈现了重名的文件)
要求:必须曾经关闭的文件能力放入到 SpoolingDirSource,在同一个 SpoolingDirSource 中都不能呈现重名的文件!
应用:
必须配置:
type – The component type name, needs to be spooldir.
spoolDir – The directory from which to read files from.
4 实时监控多个文件
Exec source 实用于监控一个实时追加的文件,但不能保证数据不失落;Spooldir Source 可能保证数据不失落,且可能实现断点续传,但提早较高,不能实时监控;而 Taildir Source 既可能实现断点续传,又能够保证数据不失落,还可能进行实时监控。
4.1 案例需要:
应用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
4.2 需要剖析:
4.3 实现步骤:
4.3.1 创立配置文件 flume-taildir-hdfs.conf
创立一个文件
[atguigu@hadoop102 job]$ vim flume-taildir-hdfs.conf
增加如下内容
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否依照工夫滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少工夫单位创立一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#从新定义工夫单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否应用本地工夫戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可反对压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大略是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
实时监控多个文件 - 配置文件解析
4.3.2 启动监控文件夹命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
4.3.3 向 files 文件夹中追加内容
在 /opt/module/flume 目录下创立 files 文件夹
[atguigu@hadoop102 flume]$ mkdir files
向 upload 文件夹中增加文件
[atguigu@hadoop102 files]$ echo hello >> file1.txt
[atguigu@hadoop102 files]$ echo atguigu >> file2.txt
4.3.4 查看 HDFS 上的数据
Taildir 阐明:
Taildir Source 保护了一个 json 格局的 position File,其会定期的往 position File 中更新每个文件读取到的最新的地位,因而可能实现断点续传。Position File 的格局如下:
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
注:Linux 中贮存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来辨认不同的文件,Unix/Linux 零碎外部不应用文件名,而应用 inode 号码来辨认文件。
4.4 TailDirSource 总结
flume ng 1.7 版本后提供!
常见问题:
TailDirSource 采集的文件,不能随便重命名!如果日志在正在写入时,名称为 xxxx.tmp,写入实现后,滚动,改名为 xxx.log,此时一旦匹配规定能够匹配上述名称,就会产生数据的反复采集!
- Taildir Source 能够读取多个文件最新追加写入的内容!
- Taildir Source 是牢靠的,即便 flume 呈现了故障或挂掉。Taildir Source 在工作时,会将读取文件的最初的地位记录在一个 json 文件中,一旦 agent 重启,会从之前曾经记录的地位,继续执行 tail 操作!
- Json 文件中,地位是能够批改,批改后,Taildir Source 会从批改的地位进行 tail 操作!如果 JSON 文件失落了,此时会从新从每个文件的第一行,从新读取,这会造成数据的反复!
- Taildir Source 目前只能读文本文件!
必须配置:
channels –
type – The component type name, needs to be TAILDIR.
filegroups – Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName> – Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.