flume简介

cloudera 公司开源的,贡献给Apache基金会

http://flume.apache.org/

http://archive.cloudera.com/c...

只能运行在linux系统上

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

flume用来高效的收集、聚合、移动大量的日志数据

有一个基于流式的简单的有弹性的传输模型

有一个健壮的可容错的机制

使用简单,可以扩展的数据模型运行使用到在线实时分析应用中

简单体现在flume-agent的配置及传输模型简单

在线实时分析应用中

flume日志的实时采集->sparkStreaming/storm/Flink =>mysql/redis=>实时分析的结果进行报表展示

数据(日志)的移动传输工具:

日志=>系统运行日志、web服务器的访问日志、客户端的用户行为日志、软件的运行操作日志 

可以将数据从数据源中采集并移动到另外一个目的地:

数据源=>系统本地日志文件中的数据、jms、avro端口、kafka、系统本地目录下... 目的地=>hdfs、hive、hbase、kafka、系统本地一个文件中...        

如何将linux本地的一个日志文件中的日志数据采集到hdfs上

  • 脚本+hdfs命令 =>【周期性】上传

        #!/bin/sh    HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2        $HADOOP_HOME/bin/hdfs -put /.../xx.log  /hdfs   

    针对项目初期数据量较少时可以使用 , 没有容灾性及稳定性

  • 采用flume日志采集框架=>【实时】采集一个日志文件中实时追加的日志数据并写入到目的地
    针对不同的应用场景定义并启动对应的flume-agent实例/进程

    source  -- 定义从哪里采集数据      exec类型的source可以借助Linux的shell命令实现实时读取一个日志文件中动态追加的日志数据     avro类型     ……channel  -- 定义了source采集的数据临时存储地       memory 基于内存的管道容器     file 基于磁盘 sink  -- 定义将数据最终写入的-目的地      hdfs类型的sink将数据最终写入到hdfs上      hive类型将数据最终写入到hive表     kafka类型将数据最终写入到kafka分布式消息队列中      ……                             

flume-agent实例的模型

每个flume-agent实例至少由以下三个功能模块组成     source模块          用于监控数据源并进行数据的实时采集,是实时产生数据流的模块        数据源=>系统本地的一个日志文件中、kafka、jms、系统本地的一个目录下、avro端口  。。。         source将采集到的数据提交到channel中    channel模块          用于连接source和sink的管道容器          类似一个队列(FIFO)    sink模块          从channel中拉取take(剪切)数据并最终将数据写入到目的地        目的地=>hdfs、hive、hbase、kafka、avro端口...              event事件:     event事件是flume传输日志数据时基本单元,在flume-agent内部数据都是以事件形式存在         source将采集到的数据封装成一个个的event事件,将事件提交到channel        sink从channel消费事件并将事件中封装的数据最终写入到目的地      event事件的数据结构:header + body              header             是一个map集合类型             内部的key-value为该事件的元数据信息,主要用来区分不同的事件         body             是一个字节数组类型            body为我们真正要传输的数据                

flume的安装使用

                flume-ng-1.6.0-cdh5.14.2安装     1、上次解压flume的安装包         $ tar zxvf  /opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/        $ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2  修改目录名称-可选      2、修改flume配置文件         $ mv conf/flume-env.sh.template conf/flume-env.sh  修改后环境配置文件才能生效        $ vi conf/flume-env.sh             export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112    3、针对不同的场景需求配置对应的java属性配置文件并启动flume-agent进程                  如何启动一个flume-agent进程          $ bin/flume-ng agent  \        --name或-n 当前flume-agent实例的别名  \        --conf或-c 当前flume框架的配置文件目录 \        --conf-file,-f 与当前要启动的flume-agent进程相匹配的java属性配置文件的本地路径                      Usage: bin/flume-ng <command> [options]...                      

案例

案例一:

flume官方简单案例
定义一个flume-agent去监听读取某台服务器上的某个端口中的数据,并将监听读取到的数据最终写入到flume框架自己的日志文件中

# example.conf: A single-node Flume configuration# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = centos01a1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
            提交测试:     $ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties &  确定目标服务器的端口是否已经成功被flume-agent代理进程监听      $ netstat -antp |grep 44444     --查看端口信息     $ ps -ef | grep flume  -- 查看进程信息      安装一个telnet工具并连接服务器端口写入数据      $ sudo yum -y install telnet        发送消息数据      检查flume的日志文件中的数据     $ tail -f logs/flume.log               

案例二:

要求使用flume实时监控读取系统本地一个日志文件中动态追加的日志数据并实时写入到hdfs上的某个目录下

# example.conf: A single-node Flume configuration#同一台Linux上可开启多个flume-agent,但agent别名要区分a2.sources = r2a2.sinks = k2a2.channels = c2# Describe/configure the source#依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停a2.sources.r2.type = exec# tail -F 文件名  即使没有这个-F后面指定的文件,命令也不会停止,容错能力强a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log# Use a channel which buffers events in memorya2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100#声明a2的sinka2.sinks.k2.type = hdfsa2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/webloga2.sinks.k2.hdfs.filePrefix = nginxData# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2

报错首先找logs目录

报错:找不到类

缺少jar包

$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/     $ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/        

案例三:

案例二的优化:

解决生成的文件过多过小的问题(希望文件的大小=128M) 将日志文件按照日期分目录存储(按照天分目录存储)  将生成的日志文件的格式改为Text文本格式 

修改案例二的flume-agent属性文件

# 声明当前flume-agent的别名及当前的flume-agent实例包含的模块的别名和个数a2.sources = s2a2.channels = c2a2.sinks = k2# 定义source模块中的s2的类型及与此类型相关的延伸属性 # exec类型的source可以借助执行一条linux shell命令实现读取linux系统上某个文件中的日志数据,其中 cat是一次性读取,tail可以实现实时读取新增加的数据  # shell属性用来声明要执行的命令的运行环境a2.sources.s2.type = execa2.sources.s2.command = tail -F /opt/nginx/access.log a2.sources.s2.shell = /bin/sh -c# 定义channel模块中的c2的类型及与此类型相关的延伸属性  a2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100# 定义sink模块中的k2的类型及与此类型相关的延伸属性 a2.sinks.k2.type = hdfsa2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d#启用根据时间生成路径中的转义字符的具体的时间值a2.sinks.k2.hdfs.round = true#表示使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = true#设置文件的前缀a2.sinks.k2.hdfs.filePrefix = NgnixLog#设置解决文件过多过小问题a2.sinks.k2.hdfs.rollInterval = 0a2.sinks.k2.hdfs.rollSize = 128000000a2.sinks.k2.hdfs.rollCount = 0#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效a2.sinks.k2.hdfs.minBlockReplicas = 1#批量写入到hdfs上文件中的最大event数量#batchSize的值需要小于等于transactionCapacity的值 #从性能上考虑,最优的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFilea2.sinks.k2.hdfs.fileType = DataStream# 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a2.sinks.k2.hdfs.writeFormat = Text # 将a2中的source及sink模块绑定到对应的channel模块上 # 一个source模块可以同时绑定多个channel模块,但是一个sink模块只能绑定一个唯一的channela2.sources.s2.channels = c2a2.sinks.k2.channel = c2

案例四:

利用flume监控某个目录下的日志文件,当某个目录下出现符合要求的文件名称的文件时,则对文件中的日志数据进行读取,并将数据最终写入到hdfs上

目录    /opt/data/logs        nginx-access.log.2018120309         nginx-access.log.2018120310
# example.conf: A single-node Flume configuration#同一台Linux上可开启多个flume-agent,但agent别名要区分a2.sources = r2a2.sinks = k2a2.channels = c2# Describe/configure the source# includePattern 用正则表达式指定要包含的文件# ignorePattern  用正则表达式指定要忽略的文件a2.sources.r2.type = spooldira2.sources.r2.spoolDir = /home/chen/mylogs# 由于每次读完会给读完的文件增加.COMPLETED从而形成新文件,需要忽略这些文件a2.sources.r2.ignorePattern = ^.*\.COMPLETED$# includePattern和ignorePattern会同时生效a2.sources.r2.includePattern =     ^.*$# Use a channel# file类型更安全# memory类型效率更高a2.channels.c2.type = filea2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data#声明a2的sinka2.sinks.k2.type = hdfsa2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d#启用根据时间生成转义字符的具体的时间值a2.sinks.k2.hdfs.round = true#使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = truea2.sinks.k2.hdfs.filePrefix = nginxData#设置解决文件过多过小问题a2.sinks.k2.hdfs.rollInterval = 0a2.sinks.k2.hdfs.rollSize = 128000000a2.sinks.k2.hdfs.rollCount = 0#写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效a2.sinks.k2.hdfs.minBlockReplicas = 1#批量写入到hdfs上文件中的最大event数量#batchSize的值需要小于等于transactionCapacity的值 #从性能上考虑,最优的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100# fileType定义的是数据流的格式,默认的数据流的格式为SequenceFilea2.sinks.k2.hdfs.fileType = DataStream# 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a2.sinks.k2.hdfs.writeFormat = Text# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2

案列五:

需求:            Nginx服务器集群 -- 10台      每台服务器上都有一个access.log日志文件      需要将每台服务器上的日志文件中追加的日志数据实时读取并写入到hdfs上             思路1:         每台Nginx服务器上启动一个flume-agent             source - exec              channel - mem             sink - hdfs          多个flume-agent同时写入数据到hfds上不利于hdfs的稳定性     思路2:         每台Nginx服务器上启动一个flume-agent             source - exec              channel - mem             sink - avro                   type = avro                 hostname = 主机名                port =  端口号             将数据统一写入到某台服务器某个端口中                     启动一个负责对汇总后的数据统一写入到目的地的flum-agent             source - avro                   type = avro                bind =                 port =             channel - mem             sink - hdfs  

nginxs2flume.properties

# example.conf: A single-node Flume configuration#同一台Linux上可开启多个flume-agent,但agent别名要区分a2.sources = r2a2.sinks = k2a2.channels = c2# Describe/configure the source#依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停a2.sources.r2.type = exec# tail -F 文件名  即使没有这个-F后面指定的文件,命令也不会停止,容错能力强a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log# Use a channel which buffers events in memorya2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100#声明a2的sinka2.sinks.k2.type = avroa2.sinks.k2.hostname = centos01a2.sinks.k2.port = 6666# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2

flume2hdfs.properties

# example.conf: A single-node Flume configuration#同一台Linux上可开启多个flume-agent,但agent别名要区分a3.sources = r3a3.sinks = k3a3.channels = c3# Describe/configure the sourcea3.sources.r3.type = avroa3.sources.r3.bind = centos01a3.sources.r3.port = 6666# Use a channel which buffers events in memorya3.channels.c3.type = memorya3.channels.c3.capacity = 1000a3.channels.c3.transactionCapacity = 100#声明a3的sinka3.sinks.k3.type = hdfsa3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/testa3.sinks.k3.hdfs.writeFormat = Text # Bind the source and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3