共计 8859 个字符,预计需要花费 23 分钟才能阅读完成。
flume 简介
cloudera 公司开源的,贡献给 Apache 基金会
http://flume.apache.org/
http://archive.cloudera.com/c…
只能运行在 linux 系统上
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
flume 用来高效的收集、聚合、移动大量的日志数据
有一个基于流式的简单的有弹性的传输模型
有一个健壮的可容错的机制
使用简单,可以扩展的数据模型运行使用到在线实时分析应用中
简单体现在 flume-agent 的配置及传输模型简单
在线实时分析应用中
flume 日志的实时采集 ->sparkStreaming/storm/Flink =>mysql/redis=> 实时分析的结果进行报表展示
数据(日志)的移动传输工具:
日志 => 系统运行日志、web 服务器的访问日志、客户端的用户行为日志、软件的运行操作日志
可以将数据从数据源中采集并移动到另外一个目的地:
数据源 => 系统本地日志文件中的数据、jms、avro 端口、kafka、系统本地目录下...
目的地 =>hdfs、hive、hbase、kafka、系统本地一个文件中...
如何将 linux 本地的一个日志文件中的日志数据采集到 hdfs 上
-
脚本 +hdfs 命令 =>【周期性】上传
#!/bin/sh HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2 $HADOOP_HOME/bin/hdfs -put /.../xx.log /hdfs
针对项目初期数据量较少时可以使用,没有容灾性及稳定性
-
采用 flume 日志采集框架 =>【实时】采集一个日志文件中实时追加的日志数据并写入到目的地
针对不同的应用场景定义并启动对应的 flume-agent 实例 / 进程source -- 定义从哪里采集数据 exec 类型的 source 可以借助 Linux 的 shell 命令实现实时读取一个日志文件中动态追加的日志数据 avro 类型 …… channel -- 定义了 source 采集的数据临时存储地 memory 基于内存的管道容器 file 基于磁盘 sink -- 定义将数据最终写入的 - 目的地 hdfs 类型的 sink 将数据最终写入到 hdfs 上 hive 类型将数据最终写入到 hive 表 kafka 类型将数据最终写入到 kafka 分布式消息队列中 ……
flume-agent 实例的模型
每个 flume-agent 实例至少由以下三个功能模块组成
source 模块
用于监控数据源并进行数据的实时采集,是实时产生数据流的模块
数据源 => 系统本地的一个日志文件中、kafka、jms、系统本地的一个目录下、avro 端口。。。source 将采集到的数据提交到 channel 中
channel 模块
用于连接 source 和 sink 的管道容器
类似一个队列(FIFO)sink 模块
从 channel 中拉取 take(剪切)数据并最终将数据写入到目的地
目的地 =>hdfs、hive、hbase、kafka、avro 端口...
event 事件:event 事件是 flume 传输日志数据时基本单元,在 flume-agent 内部数据都是以事件形式存在
source 将采集到的数据封装成一个个的 event 事件,将事件提交到 channel
sink 从 channel 消费事件并将事件中封装的数据最终写入到目的地
event 事件的数据结构:header + body
header
是一个 map 集合类型
内部的 key-value 为该事件的元数据信息,主要用来区分不同的事件
body
是一个字节数组类型
body 为我们真正要传输的数据
flume 的安装使用
flume-ng-1.6.0-cdh5.14.2
安装
1、上次解压 flume 的安装包
$ tar zxvf /opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/
$ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2 修改目录名称 - 可选
2、修改 flume 配置文件
$ mv conf/flume-env.sh.template conf/flume-env.sh 修改后环境配置文件才能生效
$ vi conf/flume-env.sh
export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112
3、针对不同的场景需求配置对应的 java 属性配置文件并启动 flume-agent 进程
如何启动一个 flume-agent 进程
$ bin/flume-ng agent \
--name 或 -n 当前 flume-agent 实例的别名 \
--conf 或 -c 当前 flume 框架的配置文件目录 \
--conf-file,-f 与当前要启动的 flume-agent 进程相匹配的 java 属性配置文件的本地路径
Usage: bin/flume-ng <command> [options]...
案例
案例一:
flume 官方简单案例
定义一个 flume-agent 去监听读取某台服务器上的某个端口中的数据,并将监听读取到的数据最终写入到 flume 框架自己的日志文件中
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = centos01
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
提交测试:$ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties &
确定目标服务器的端口是否已经成功被 flume-agent 代理进程监听
$ netstat -antp |grep 44444 -- 查看端口信息
$ ps -ef | grep flume -- 查看进程信息
安装一个 telnet 工具并连接服务器端口写入数据
$ sudo yum -y install telnet
发送消息数据
检查 flume 的日志文件中的数据
$ tail -f logs/flume.log
案例二:
要求使用 flume 实时监控读取系统本地一个日志文件中动态追加的日志数据并实时写入到 hdfs 上的某个目录下
# example.conf: A single-node Flume configuration
#同一台 Linux 上可开启多个 flume-agent,但 agent 别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
#依靠的是 Linux 的命令读取本地文件,Linux 的命令不停止 flume 就不停
a2.sources.r2.type = exec
# tail -F 文件名 即使没有这个 - F 后面指定的文件,命令也不会停止,容错能力强
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
#声明 a2 的 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog
a2.sinks.k2.hdfs.filePrefix = nginxData
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
报错首先找 logs 目录
报错:找不到类
缺少 jar 包
$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
$ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
案例三:
案例二的优化:
解决生成的文件过多过小的问题(希望文件的大小 =128M)将日志文件按照日期分目录存储(按照天分目录存储)将生成的日志文件的格式改为 Text 文本格式
修改案例二的 flume-agent 属性文件
# 声明当前 flume-agent 的别名及当前的 flume-agent 实例包含的模块的别名和个数
a2.sources = s2
a2.channels = c2
a2.sinks = k2
# 定义 source 模块中的 s2 的类型及与此类型相关的延伸属性
# exec 类型的 source 可以借助执行一条 linux shell 命令实现读取 linux 系统上某个文件中的日志数据,其中 cat 是一次性读取,tail 可以实现实时读取新增加的数据
# shell 属性用来声明要执行的命令的运行环境
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /opt/nginx/access.log
a2.sources.s2.shell = /bin/sh -c
# 定义 channel 模块中的 c2 的类型及与此类型相关的延伸属性
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# 定义 sink 模块中的 k2 的类型及与此类型相关的延伸属性
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d
#启用根据时间生成路径中的转义字符的具体的时间值
a2.sinks.k2.hdfs.round = true
#表示使用本地 linux 系统时间戳作为时间基准,否则会自动参考事件的 header 中的时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#设置文件的前缀
a2.sinks.k2.hdfs.filePrefix = NgnixLog
#设置解决文件过多过小问题
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#写入到 hdfs 的最小副本数,不设置会导致上面的三个参数不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1
#批量写入到 hdfs 上文件中的最大 event 数量
#batchSize 的值需要小于等于 transactionCapacity 的值
#从性能上考虑,最优的是 batchSize=transactionCapacity
a2.sinks.k2.hdfs.batchSize = 100
# fileType 定义的是数据流的格式,默认的数据流的格式为 SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 写入到 hdfs 上的文件的格式(序列化方法)# 格式改为 text 后,可以通过 cat 或 text 命令查看文件中的日志内容
a2.sinks.k2.hdfs.writeFormat = Text
# 将 a2 中的 source 及 sink 模块绑定到对应的 channel 模块上
# 一个 source 模块可以同时绑定多个 channel 模块,但是一个 sink 模块只能绑定一个唯一的 channel
a2.sources.s2.channels = c2
a2.sinks.k2.channel = c2
案例四:
利用 flume 监控某个目录下的日志文件,当某个目录下出现符合要求的文件名称的文件时,则对文件中的日志数据进行读取,并将数据最终写入到 hdfs 上
目录
/opt/data/logs
nginx-access.log.2018120309
nginx-access.log.2018120310
# example.conf: A single-node Flume configuration
#同一台 Linux 上可开启多个 flume-agent,但 agent 别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
# includePattern 用正则表达式指定要包含的文件
# ignorePattern 用正则表达式指定要忽略的文件
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /home/chen/mylogs
# 由于每次读完会给读完的文件增加.COMPLETED 从而形成新文件,需要忽略这些文件
a2.sources.r2.ignorePattern = ^.*\.COMPLETED$
# includePattern 和 ignorePattern 会同时生效
a2.sources.r2.includePattern = ^.*$
# Use a channel
# file 类型更安全
# memory 类型效率更高
a2.channels.c2.type = file
a2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data
#声明 a2 的 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d
#启用根据时间生成转义字符的具体的时间值
a2.sinks.k2.hdfs.round = true
#使用本地 linux 系统时间戳作为时间基准,否则会自动参考事件的 header 中的时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.filePrefix = nginxData
#设置解决文件过多过小问题
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#写入到 hdfs 的最小副本数,不设置会导致上面的三个参数不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1
#批量写入到 hdfs 上文件中的最大 event 数量
#batchSize 的值需要小于等于 transactionCapacity 的值
#从性能上考虑,最优的是 batchSize=transactionCapacity
a2.sinks.k2.hdfs.batchSize = 100
# fileType 定义的是数据流的格式,默认的数据流的格式为 SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 写入到 hdfs 上的文件的格式(序列化方法)# 格式改为 text 后,可以通过 cat 或 text 命令查看文件中的日志内容
a2.sinks.k2.hdfs.writeFormat = Text
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
案列五:
需求:Nginx 服务器集群 -- 10 台
每台服务器上都有一个 access.log 日志文件
需要将每台服务器上的日志文件中追加的日志数据实时读取并写入到 hdfs 上
思路 1:每台 Nginx 服务器上启动一个 flume-agent
source - exec
channel - mem
sink - hdfs
多个 flume-agent 同时写入数据到 hfds 上不利于 hdfs 的稳定性
思路 2:每台 Nginx 服务器上启动一个 flume-agent
source - exec
channel - mem
sink - avro
type = avro
hostname = 主机名
port = 端口号
将数据统一写入到某台服务器某个端口中
启动一个负责对汇总后的数据统一写入到目的地的 flum-agent
source - avro
type = avro
bind =
port =
channel - mem
sink - hdfs
nginxs2flume.properties
# example.conf: A single-node Flume configuration
#同一台 Linux 上可开启多个 flume-agent,但 agent 别名要区分
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
#依靠的是 Linux 的命令读取本地文件,Linux 的命令不停止 flume 就不停
a2.sources.r2.type = exec
# tail -F 文件名 即使没有这个 - F 后面指定的文件,命令也不会停止,容错能力强
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
#声明 a2 的 sink
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = centos01
a2.sinks.k2.port = 6666
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
flume2hdfs.properties
# example.conf: A single-node Flume configuration
#同一台 Linux 上可开启多个 flume-agent,但 agent 别名要区分
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = centos01
a3.sources.r3.port = 6666
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
#声明 a3 的 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/test
a3.sinks.k3.hdfs.writeFormat = Text
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3