乐趣区

大数据系列Flume入门和认识

1. Flume 简介

  • Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统
  • 支持在日志系统中定制各类数据发送方,用于收集数据
  • Flume 提供对数据进行简单处理,并写到各种数据接收方

2. Flume OG 与 Flume NG

Flume OG:Flume original generation, 即 Flume0.9x 版本

Flume NG:Flume next generation,即 Flume1.x 版本

3. Flume 体系结构

flume 的事件 (agent)

Source: 用来定义采集系统的源头

Channel: 把 Source 采集到的日志进行传输, 处理

Sink: 定义数据的目的地

4. Flume 的安装

  • 准备安装文件

    • apache-flume-1.6.0-bin.tar.gz
  • 解压

    • tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/
  • 重命名

    • mv apache-flume-1.6.0-bin flume
  • 添加环境变量

    # 配置 Flume 的环境变量
    export FLUME_HOME=/opt/flume
    export PATH=$PATH:$FLUME_HOME/bin
  • 配置文件

    [root@uplooking01 /opt/flume/conf]
        mv flume-env.sh.template flume-env.sh
    [root@uplooking01 /opt/flume/conf/flume-env.sh]
        export JAVA_HOME=/opt/jdk        

5. Flume 采集网络端口数据

5.1 定义 flume 的事件配置文件

flume-nc.properties

# flume-nc.conf: 用于监听网络数据的 flume agent 实例的配置文件
############################################
# 对各个组件的描述说明
# 其中 a1 为 agent 的名字
# r1 是 a1 的 source 的代号名字
# c1 是 a1 的 channel 的代号名字
# k1 是 a1 的 sink 的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述 source 的,类型是 netcat 网络
a1.sources.r1.type = netcat
# source 监听的网络 ip 地址和端口号
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 44444

# 用于描述 sink,类型是日志格式
a1.sinks.k1.type = logger


# 用于描述 channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000 个 events 事件
a1.channels.c1.capacity = 1000
# 能够同时对 100 个 events 事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将 a1 中的各个组件建立关联关系,将 source 和 sink 都指向了同一个 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 启动 agent(flume 事件)

    flume-ng agent –name a1 –conf /opt/flume/conf –conf-file /opt/flume/conf/flume-nc.properties -Dflume.root.logger=INFO,console

  • 启动 agent(flume 事件) , 简单化

    flume-ng agent –name a1 –conf-file /opt/flume/conf/flume-nc.properties

6. 安装 nc(瑞士军刀)

rpm -ivh nc-1.84-22.el6.x86_64.rpm

监听端口: nc -lk 4444

连接主机的端口: nc host port

7. 查看文件的命令

cat xxx: 查看文件的全部内容

head -100 xxx: 查看文件内容开始的 100 行 (默认不加 -100 是查看 10 行)

tail -100 xxx: 查看文件内容末尾的 100 行 (默认不加 -100 是查看 10 行)

tail -f xxx: 查看文件内容末尾 10 行, 并且监听文件的变化

8. Flume 基本的组件

8.1 Source

  • 从外界采集各种类型的数据,将数据传递给 Channel
  • 常见采集的数据类型

    • NetCat Source
    • Spooling Directory Source }
    • Exec Source
    • Kafka Source (后面说)
    • HTTP Source

8.2 Channel

  • 一个数据的存储池,中间通道
  • 接受 source 传出的数据,向 sink 指定的目的地传输。Channel 中的数据直到进入到下一个 channel 中或者进入终端才会被删除。当 sink 写入失败后,可以自动重写,不会造成数据丢失,因此很可靠
  • channel 的类型 :

    • Memory Channel (常用) 使用内存作为数据的存储。速度快
    • File Channel 使用文件来作为数据的存储。安全可靠
    • Spillable Memory Channel 用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则 flush 到文件中
    • JDBC Channel 使用 jdbc 数据源来作为数据的存储。
    • Kafka Channel 使用 kafka 服务来作为数据的存储

8.3 Sink

  • 数据的最终的目的地
  • 接受 channel 写入的数据以指定的形式表现出来(或存储或展示)。

    sink 的表现形式很多比如: 打印到控制台、hdfs 上、avro 服务中、文件中等

  • 常见采集的数据类型

    • HDFS Sink
    • Hive Sink
    • Logger Sink
    • Avro Sink
    • HBaseSink
    • File Roll Sink

8.4 Event

event 是 Flume NG 传输的数据的基本单位,也是事务的基本单位

在文本文件,通常是一行记录就是一个 event。

网络消息传输系统中,一条消息就是一个 event。

event 里有 header、body

Event 里面的 header 类型:Map<String, String>

我们可以在 source 中自定义 header 的 key:value,在某些 channel 和 sink 中使用 header

9. Flume 监听命令的执行结果

配置 agent

############################################
# 对各个组件的描述说明
# 其中 a1 为 agent 的名字
# r1 是 a1 的 source 的代号名字
# c1 是 a1 的 channel 的代号名字
# k1 是 a1 的 sink 的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述 source 的,类型是 linux 命令
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F  /opt/flume/conf/logs/test01.txt

# 用于描述 sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述 channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000 个 events 事件
a1.channels.c1.capacity = 1000
# 能够同时对 100 个 events 事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将 a1 中的各个组件建立关联关系,将 source 和 sink 都指向了同一个 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行 agent, 开始采集数据

flume-ng agent –name a1 –conf-file flume-exec.properties

10. 采集目录中新增文件的数据

配置 agent

############################################
# 对各个组件的描述说明
# 其中 a1 为 agent 的名字
# r1 是 a1 的 source 的代号名字
# c1 是 a1 的 channel 的代号名字
# k1 是 a1 的 sink 的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述 source 的,监听的是一个目录
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/flume/conf/logs/aa
a1.sources.r1.fileSuffix = .OK
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true

# 用于描述 sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述 channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000 个 events 事件
a1.channels.c1.capacity = 1000
# 能够同时对 100 个 events 事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将 a1 中的各个组件建立关联关系,将 source 和 sink 都指向了同一个 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1    

运行 agent, 开始采集数据

flume-ng agent –name a1 –conf-file flume-dir.properties

11. 后台运行 flume

nohup flume-ng agent –name a1 –conf-file flume-dir.properties >dev/null 2>&1 &

退出移动版