共计 12674 个字符,预计需要花费 32 分钟才能阅读完成。
定义
flume 全称 Apache Flume
- 技术角度:应用 Java 语言开发的一个分布式、高牢靠、高可用中间件
- 我的项目角度:最早是 Cloudera 提供的日志收集零碎,当初是 Apache 软件基金会(ASF)的顶级我的项目,是 Hadoop 生态圈中的一个组件。以后 Flume 有两个版本 Flume 0.9X 版本的统称 Flume-og,Flume1.X 版本的统称 Flume-ng。因为 Flume-ng 通过重大重构,与 Flume-og 有很大不同,应用时请留神辨别。
- 产品角度:用来收集、聚合、转移不同起源的大量日志数据到地方数据仓库的工具(Flume 是一种日志采集工具)
官网文档:https://flume.apache.org/rele…
民间中文网:https://flume.liyifeng.org/
应用背景
为什么要进行日志数据采集?
在当初的数据技术时代中,数据有着不可代替的位置,抛开数据谈大数据服务就是瞎扯,没有数据作撑持的大数据平台就是一个空壳。数据是所有数据分析、数据挖掘、大数据处理、ai 算法的外围。
在目前的来看,绝大多数公司或者组织做大数据处理时,他们的数据来源于:设施收集、数据库、日志、爬虫等等。
“只有在程序出问题当前才会晓得打一个好的日志有如许重要”,因而当初许多公司的业务平台都有十分严格的日志标准,每天都会产生大量的日志数据,因而这些日志数据中就隐含着微小的价值。
罕用的日志数据采集工具
类型 | Logstash | Filebeat | Flume | Fluentd |
运行环境 | java | go | java | ruby |
资源环境 | 大 | 小 | 小 | 十分小 |
配置 | 简略 | 简略 | 较简单 | 简单 |
可靠性 | 不稳固 | 稳固 | 高牢靠 | 牢靠 |
初步意识并应用 flume
用一个故事了解:有一个池子,它一头进水,另一头出水,进水口能够配置各种管子,出水口也能够配置各种管子,能够有多个进水口、多个出水口。水术语称为 Event,进水口术语称为 Source、出水口术语成为 Sink、池子术语成为 Channel,Source+Channel+Sink,术语称为 Agent。如果有须要,还能够把多个 Agent 连起来。
flume 的名词具体介绍
- Event
数据在 flume 外部是以 Event 封装的模式存在的。因而 source 组件在获取到原始数据后,须要封装成 Event 后发送到 channel 中,而后 sink 从 channel 取出 Event 后,依据配置要求再转成其余的模式进行数据输入。
Event 封装的对象次要有两局部:Headers 和 Body;
Headers 是一个汇合 Map 类型,用于存储元数据(如标记、形容等)
Body 就是一个字节数组,装载具体的数据内容
- agent
flume 最外围的角色就是 agent。flume 日志采集零碎是由一个个 agent 连接起来的数据传输通道。
对于每个 agent 来说就是一个独立的守护过程(JVM), 它负责从数据源接收数据,并发送到下一个目的地。
agent 外部有三个重要的组件:source,channel,sink
- source
Source 是数据的收集端,负责将数据捕捉后进行非凡的格式化,将数据封装到事件(event)里,而后将事件推入 Channel 中。并将接管的数据以 Event 的模式传递给一个或多个 channel。Flume 提供多种数据接管形式,比方间接读取本地文件,Avro,Thrift 等。
- channel
channel 是一种短暂的存储容器,它从 source 处接管到 event 格局数据后进行缓存,直到被生产掉。它在 source 和 sink 之间起到了桥梁作用,channel 是一个残缺的事务,这一点保障了数据在收发时的一致性,并且能够和任意数量的 source 和 sink 连贯。
反对的类型有:FileSystem Channel,Memory channel,Kafka channel 等。
- sink
sink 将数据存储到集中存储器比方 Hbase 和 HDFS, 它从 channels 生产数据 (events) 并将其传递给指标地. 指标地可能是另一个 sink, 也可能 HDFS,HBase.
Flume 启动基本上是基于本地配置文件的(遵循 Java properties 文件格式的文本文件),目前还有一个实验性的性能能够基于 Zookeeper 的在线配置。
基于以上这些,能够进行一个简略的 demo,实时监控服务器 /root/tmp.txt 文件,将收集到的信息打印到控制台。
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/tmp.txt
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
flume 启动命令bin/flume-ng agent -n a1 -c conf -f conf/example.conf -Dflume.root.logger=INFO,console
flume 组件详解
Source 配置
Source | Desc |
Avro Source | 通过监听一个网络端口来承受数据,而且承受的数据必须是应用 avro 序列化框架序列化后的数据; |
Thrift Source | 通过监听一个网络端口来承受数据,而且承受的数据必须是应用 thrift 序列化框架序列化后的数据; |
HTTP Source | 监听一个网络端口,通过 http post/get 来接收数据,GET 形式目前还只是实验性的;该 source 基于 Jetty 9.4,能够解析为 JSON 或 Blob |
NetCat Source | 监听一个网络端口,获取通过 TCP 或 UDP 协定的数据 |
Kafka Source | Kafka Source 就是一个 Apache Kafka 消费者,它从 Kafka 的 topic 中读取音讯。如果运行了多个 Kafka Source,则能够把它们配置到同一个消费者组,以便每个 source 都读取一组惟一的 topic 分区。 |
Exec Source | 启动一个用户所指定的 linux shell 命令; 采集这个 linux shell 命令的规范输入,作为收集到的数据,转为 event 写入 channel |
Spooling Directory Source | 监督一个指定的文件夹,如果文件夹下有没采集过的新文件,则将这些新文件中的数据采集,并转成 event 写入 channel; 留神:spooling 目录中的文件必须是不可变的,而且是不能重名的!否则,source 会 loudly fail! |
Taildir Source | 监督指定目录下的一批文件,只有某个文件中有新写入的行,则会被 tail 到; 它会记录每一个文件所 tail 到的地位,记录到一个指定的 positionfile 保留目录中,格局为 json(如果须要的时候,能够人为批改,就能够让 source 从任意指定的地位开始读取数据); 它对采集实现的文件,不会做任何批改 |
Syslog Sources | 读取系统日志数据生成 event。 |
JMS Source | 从 JMS 指标(例如队列或主题)读取音讯; 作为 JMS 应用程序,它应可与任何 JMS 提供程序一起应用,但仅通过 ActiveMQ 的测试; 留神,应该应用 plugins.d 目录(首选),命令行上的–classpath 或通过 flume-env.sh 中的 FLUME_CLASSPATH 变量将提供的 JMS jar 蕴含在 Flume 类门路中 |
Stress Source | StressSource 是一个外部负载生成 Source 的实现,对于压力测试十分有用。能够配置每个 Event 的大小(headers 为空)、也能够配置总共发送 Event 数量以及发送胜利的 Event 最大数量。 |
Custom Source | 自定义 source |
Channel 配置
Channel | Desc |
Memory Channel | event 存储在内存中,且能够配置最大值。对于须要高吞吐而且能够容忍数据失落的状况下,能够抉择该 channel |
File Channel | event 被缓存在本地磁盘文件中;可靠性高,不会失落; |
Kafka Channel | agent 利用 kafka 作为 channel 数据缓存;kafka channel 要跟 kafka source、kafka sink 区别开来;kafka channel 在利用时,能够没有 source |
JDBC Channel | event 被长久到数据库中,目前反对 derby. 实用于可复原的场景 |
Spillable Memory Channel | event 存储在内存和磁盘上。内存充当主存储,磁盘充当溢出。这个 channel 目前是实验性的,不倡议用于生产环境。 |
Custom Channel | 自定义 channel |
Sink 配置
Sink | Desc |
Null Sink | 间接抛弃 |
Logger Sink | 数据输入到日志中,通常用于 debug |
File Roll Sink | 数据存储到本地文件系统 |
HDFS Sink | 数据被最终发往 hdfs;能够生成 text 文件或 sequence 文件,而且反对压缩;反对生成文件的周期性 roll 机制:基于文件 size,或者工夫距离,或者 event 数量;指标门路,能够应用动静通配符替换,比方用 %D 代表以后日期;当然,它也能从 event 的 header 中,取到一些标记来作为通配符替换 |
Hive Sink | 可将 text 或 json 数据间接存储到 hive 分区表 |
HBaseSink | 数据存储到 hbase 中 |
ElasticSearchSink | 间接存储到 es 中 |
Kafka Sink | 存储到 kafka 中 |
Kite Dataset Sink | 将事件写入 Kite 数据集。该接收器将反序列化每个传入事件的主体,并将后果记录存储在 Kite 数据集中。它通过按 URI 加载数据集来确定指标数据集。(kite 是专门来操纵大数据集的,能够通过 kite 将数据存储到 hdfs、local、hive、hbase 中,并且还提供了 partition 分区机制,放慢数据访问速度。并且 kite 反对 avro、parquet、csv、json 等几种存储数据的形式。) |
MorphlineSolrSink | (Solr 是一个独立的企业级搜寻应用服务器)该接收器从 Flume 事件中提取数据,对其进行转换,并将其简直实时地加载到 Apache Solr 服务器中,后者再为最终用户或搜寻应用程序提供查问 |
Avro Sink | avro sink 用来向 avro source 发送 avro 序列化数据,这样就能够实现 agent 之间的级联 |
Thrift Sink | 同 avro sink |
HTTP Sink | 将接管到的数据通过 post 申请产生到近程服务,event 内容作为申请体发送 |
IRC Sink | 因特网中继聊天(Internet Relay Chat),个别称为互联网中继聊天,简称:IRC。它是由芬兰人 Jarkko Oikarinen 于 1988 年独创的一种网络聊天协定。 |
Custom Sink | 自定义 sink |
拦截器 Interceptor
拦截器,就是工作在 source 之后,能够从 source 取得 event,做一个逻辑解决,而后再返回解决之后的 event。这也就能够让用户不须要改变 source 代码的状况下,插入一些解决逻辑。学过 java 的同学对拦截器应该比较清楚了。具体类型如下:
Interceptor | Desc |
host | 往 event 的 header 中插入主机名信息 |
timestamp | 向 event 中,写入一个 kv 到 header 里;key 的名称能够随便配置,value 就是以后工夫戳 |
uuid | 用于在每个 event header 中生成一个 uuid 字符串 |
static 动态属性写入拦截器 | 让用户往 event 中增加一个自定义的 header,key-value 模式的,当然这个 kv 在配置文件中是写死的 |
remove_header 删除属性拦截器 | 这个拦截器能够删除 Event header 外面的属性,能够是一个或多个。 |
search_replace 查找 - 替换拦截器 | 该拦截器基于 Java 正则表达式提供简略的基于字符串的搜寻和替换性能;相似于 Java 中的 Matcher.replaceAll 办法 |
regex_filter 正则过滤拦截器 | 这个拦截器会把 Event 的 body 当做字符串来解决,并用配置的正则表达式来匹配。能够配置指定被匹配到的 Event 抛弃还是没被匹配到的 Event 抛弃。 |
regex_extractor 正则提取拦截器 | 这个拦截器会应用正则表达式从 Event 内容体中获取一组值并与配置的 key 组成 n 个键值对,而后放入 Event 的 header 中,Event 的 body 不会有任何更改。 |
Morphline 实时荡涤拦截器 | 此拦截器通过 morphline 配置文件 过滤 Event,配置文件定义了一系列转换命令,用于将记录从一个命令传递到另一个命令。 |
custom type as FQCN | 自定义实现拦截器 |
Source 通道选择器(Flume Channel Selectors)
一个 source 能够对接多个 channel,那么问题来了,source 的数据是怎么在多个 channel 之间进行传递的呢?这就是 selector 的性能了,通过 selector 选择器依据策略能够将 event 从 source 传递到指定的 channel 中去。具体的 selector 选择器类型如下表格:
Selector | Desc |
replicating 复制选择器 | 默认的选择器,将 event 进行复制分发给上游所有的节点 |
Multiplexing 多路复用选择器 | 多路选择器,能够依据 event 中的一个指定 key 对应的 value 来决定这条音讯会被写入到那个哪个 channel 中 |
Custom Selector | 自定义选择器 |
Sink 组逻辑处理器(Flume Sink Processors)
一个 agent 中,多个 sink 能够被组装到一个组中,而数据在组内多个 sink 之间发送。接管处理器能够在组内提供负载平衡的性能,或者是在长期故障的状况下实现从一个接收器转移到另一个接收器上。具体的接收器模式见下表格:
Processor | Desc |
default | 默认的接管处理器仅承受一个 sink,当然用户也没有必要为了一个 sink 去创立 processor |
Failover | 故障转移模式,即一个组内只有优先级高的 sink 在工作,而其余的 sink 处于期待中 |
load_balance | 负载平衡模式,容许 channel 中的数据在一组 sink 中的多个 sink 之间进行轮转,具体的策略有:round-robin(轮流发送);random(随记发送) |
Custom processor | 目前还不反对自定义 Sink 组逻辑处理器 |
flume 进阶
flume 的高可靠性
事务是 flume 高可靠性的保障
首先咱们先来理解一下消息传递的可靠性保障的三种形式:
- At-Least-Once
- At-Most-Once
- Exactly-Once
基本上所有工具的应用用户都心愿工具框架能保障音讯 Exactly-once,这样就不用在设计实现上思考音讯的失落或者反复的解决场景。然而事实上很少有工具和框架能做到这一点,真正能做到这一点所付出的老本往往很大,或者带来的额定影响反而让你感觉不值得。假如 Flume 真的做到了 Exactly-once,那势必升高了稳定性和吞吐量,所以 Flume 抉择的策略是 At-least-once。
Flume 采纳的是 At-Least-Once 策略,这里并不是说 Flume 任意一个组件就能够实现这种策略,而是通过三个组件 (source,channel,sink) 之间高低投递音讯来保障,然而如果抉择了 Memory Channel 的话,这个就不敢打包票能实现 At-Least-Once 了。Flume 要保障 at-least-once 的根底就是 Transaction。
- Source 到 Channel 是事务性的
- Channel 到 Sink 也是事务性的
flume 的事务工作流:
这里再详解一下上图的流程,Flume 的 transaction 是有生命周期的,别离是 start、commit、rollback 和 close.
当 source 往 channel 投递事件的时候,会首先调用 start 办法开启事务,将 event 怼入 putList 中,此时 putList 会包裹在一层事务中(为了数据传输的原子性),当 putList 的数据问题怼入 channel,进行事务 commit 确认,当 putList 中的数据怼入到 channel 失败的时候(通常体现为 channel 中的数据已满了,putList 中的数据没方法在放进 channel 外面,这里须要嗔怪设定 channel capacity 的那个家伙),进行事务 rollback 回滚(会将以后的 putList 清空,source 将方才提交的音讯事件向源头进行 ack 确认,须要上游具备音讯重发性能),而后 close 事务。
sink 生产 channel 也是同样的模式,惟一不同的中央是 sink 在往指标源实现写入后才会对事务进行 commit, 通常为上游的存储系统宕机导致信息无奈收回,此时会执行事务回滚(会将以后的 takeList 清空,并将 takeList 的信息回滚到 channel 中,如果此时 channel 也是满的,间接报错)。
数据可能会反复
Flume 保障事件至多一次被送到它们的目的地,只有一次倾力写数据,且不存在任何类型的故障事件只被写一次。然而像网络超时或局部写入存储系统的谬误,可能导致事件不止被写一次,因为 Flume 将重试写操作直到它们齐全胜利。
flume 的分布式
flume 灵便的 source 和 sink 组件,使得多个 agent 之间能够进行数据通信,这也是 flume 扩展性和分布式的根底,能够反对大量的部署计划。Flume 内置了专门的 RPC sink-source 对。对于 agent-to-agent 通信,首选的 RPC sink-RPC source 对是 Avro Sink-Avro Source 对。要从其它的 Flume Agent 或客户端接收数据,Agent 接收数据能够被配置为应用 Avro Source,且发送数据的 Agent 必须配置用来运行 Avro Sink。
日志收集场景中比拟常见的是数百个日志生产者发送数据到几个日志消费者 Agent 上,而后消费者 Agent 负责把数据发送到存储系统。例如从数百个 web 服务器收集的日志发送到十几个 Agent 上,而后由十几个 Agent 写入到 HDFS 集群。
日志生产者到日志消费者的数据传输通常应用负载平衡的形式进步整个集群的吞吐量和稳定性。
如果生产工夫的服务器数量继续减少,该层从应用程序服务器接收数据的 agent 数量也须要减少。这意味着在某种程度上,它可能须要减少后续层来减少缓冲容量,以适应数据产量的减少。
Flume 反对多路复用数据流到一个或多个目的地。这是通过应用一个流的[多路复用器](multiplexer)来实现的,它能够 复制 或者 抉择(多路复用)数据流到一个或多个 channel 上。
应用 flume 的 channel 选择器能够依据须要将数据复制或分组发送到不同的目的地
依据官网的倡议:向 HDFS 发送数据的 Agent 数量尽量不要超过 8 个;下层和上层的数量比率不要超过 32:1。
Flume 真的适宜你吗?
如果你须要将文本日志数据提取到 Hadoop / HDFS 中,那么 Flume 最合适不过了。然而,对于其余状况,你最好看看以下倡议:
Flume 旨在通过绝对稳固,可能简单的拓扑部署来传输和收集定期生成的 Event 数据。“Event 数据”定义十分宽泛,对于 Flume 来说一个 Event 就是一个一般的字节数组而已。Event 大小有一些限度,它不能比你的内存或者服务器硬盘还大,理论应用中 Flume Event 能够是文本或图片的任何文件。要害的是这些 Event 应该是以间断的流的形式一直生成的。如果你的数据不是定期生成的(比方你将大量的数据批量加载到 Hadoop 集群中),尽管 Flume 能够做这个事件,然而有点“杀鸡用牛刀”的感觉,这并不是 Flume 所善于和喜爱的工作形式。Flume 喜爱绝对稳固的拓扑构造,但也不是说永远变化无穷,Flume 能够解决拓扑中的更改而又不失落数据,还能够解决因为故障转移或者配置的定期从新加载。如果你的拓扑构造每天都会变动,那么 Flume 可能就无奈失常的工作了,毕竟重新配置也是须要肯定思考和开销的。
Flume 的拓扑设计
FLume 的监控
Flume 作为一个弱小的数据收集工具,尽管性能十分弱小实用,然而却无奈看到 flume 收集数据的详细信息,所以咱们须要一个能展现 flume 实时收集数据动静信息的界面,包含 flume 胜利收集的日志数量、胜利发送的日志数量、flume 启动工夫、进行工夫、以及 flume 一些具体的配置信息,像通道容量等,于是顺利成章的监控能帮咱们做到这些,有了这些数据,在遇到数据收集瓶颈或者数据失落的时候,通过剖析监控数据来剖析、解决问题。
source 监控项:
指标项 | 阐明 |
OpenConnectionCount | 目前与客户端或 sink 放弃连贯的总数量 |
AppendBatchAcceptedCount | 胜利提交到 channel 的批次的总数量 |
AppendBatchReceivedCount | 接管到事件批次的总数量 |
AppendAcceptedCount | 逐条录入的次数 |
AppendReceivedCount | 每批只有一个事件的事件总数量 |
EventAcceptedCount | 胜利写出到 channel 的事件总数量 |
EventReceivedCount | 目前为止 source 曾经接管到的事件总数量 |
StartTime | source 启动时的毫秒值工夫 |
StopTime | source 进行时的毫秒值工夫,为 0 示意始终在运行 |
channel 监控项:
指标项 | 阐明 |
EventPutAttemptCount | Source 尝试写入 Channe 的事件总次数 |
EventPutSuccessCount | 胜利写入 channel 且提交的事件总次数 |
EventTakeAttemptCount | sink 尝试从 channel 拉取事件的总次数 |
EventTakeSuccessCount | sink 胜利从 channel 读取事件的总数量 |
ChannelSize | 目前 channel 中事件的总数量 |
ChannelCapacity | channel 的容量 |
ChannelFillPercentage | channel 已填入的百分比 |
StartTime | channel 启动时的毫秒值工夫 |
StopTime | channel 进行时的毫秒值工夫,为 0 示意始终在运行 |
sink 监控项:
指标项 | 阐明 |
ConnectionCreatedCount | 创立的连贯数量 |
ConnectionClosedCount | 敞开的连贯数量 |
ConnectionFailedCount | 因为谬误敞开的连贯数量 |
BatchEmptyCount | 批量解决 event 的个数为 0 的数量 - 示意 source 写入数据的速度比 sink 解决数据的速度慢 |
BatchUnderflowCount | 批量解决 event 的个数小于批处理大小的数量 |
BatchCompleteCount | 批量解决 event 的个数等于批处理大小的数量 |
EventDrainAttemptCount | sink 尝试写出到存储的事件总数量 |
EventDrainSuccessCount | sink 胜利写出到存储的事件总数量 |
StartTime | channel 启动时的毫秒值工夫 |
StopTime | channel 进行时的毫秒值工夫,为 0 示意始终在运行 |
- JMX Reporting
Java 治理扩大(Java Management Extension,JMX)是从 jdk1.4 开始的,但从 1.5 时才加到 jdk 外面,并把 API 放到 java.lang.management 包外面。MX 监控能够通过在 flume-env.sh 脚本中批改 JAVA_OPTS 环境变量中的 JMX 参数来开启
如果一个 Java 对象能够由一个遵循 JMX 标准的管理器利用治理,那么这个 Java 对象就能够称为一个可由 JMX 治理的资源。 - JSON Reporting
Flume 也反对以 JSON 格局报告运行指标。为了对外提供这些报告数据,Flume 会在某个端口(可自定义)上运行一个 web 服务来提供这些数据。 - Ganglia Reporting
Ganglia 是 UC Berkeley 发动的一个开源集群监督可视化工具,设计用于测量数以千计的节点。Ganglia 的外围蕴含 gmond、gmetad 以及一个 Web 前端。次要是用来监控零碎性能,如:cpu、mem、硬盘利用率,I/ O 负载、网络流量状况等,通过曲线很容易见到每个节点的工作状态,对正当调整、调配系统资源,进步零碎整体性能起到重要作用。 - Custom Reporting
能够通过编写本人的执行报告服务向其余零碎报告运行指标。报告类必须实现 org.apache.flume.instrumentation.MonitorService 接口。
我的项目实际
kafka 集群作为 channel 的计划
Memory Channel 有很大的丢数据危险,而且容量个别,File Channel 尽管能缓存更多的音讯,但如果缓存下来的音讯还没写入 Sink,此时 Agent 呈现故障则 File Channel 中的音讯一样不能被持续应用,直到该 Agent 复原。而 Kafka Channel 容量大,容错能力强。
有了 Kafka Channel 能够在日志收集层只配置 Source 组件和 Kafka 组件,不须要再配置 Sink 组件,缩小了日志收集层启动的过程数,无效升高服务器内存、磁盘等资源的使用率。而日志汇聚层,能够只配置 Kafka Channel 和 Sink,不须要再配置 Source。
- 业务代码中实时发送日志
- 业务服务器装置 flume 读取本地日志文件,发送日志
flume Collector 配置文件:
# Name the components on this agent
a1.sources = r1
# a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node4
a1.sources.r1.port = 4444
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.channels.c1.kafka.topic = testflume
a1.channels.c1.kafka.consumer.group.id = test-consumer-group
a1.sources.r1.channels = c1
本地读取文件发送 flume Collector 的配置文件:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = ./taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /root/tmp1.txt
a1.sources.r1.filegroups.f2 = /root/tmp2.txt
a1.sources.r1.fileHeader = false
a1.sources.r1.batchSize = 1
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node4
a1.sinks.k1.port = 4444
a1.sinks.k1.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node5
a1.sinks.k2.port = 4444
a1.sinks.k2.channel = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random
Logback 配置 flume
业务零碎中向 flume 发送日志通常要联合日志框架(Log4j、Logback 等)一起应用,SpringBoot 天生自带 logback,咱们这里只须要引入一个 logback-flume 依赖即可。
<dependency>
<groupId>com.teambytes.logback</groupId>
<artifactId>logback-flume-appender_2.10</artifactId>
<version>0.0.9</version>
</dependency>
而后在 logback 的配置文件中进行相干配置:
增加一个 appender,配置向 flume 发送音讯的相干信息,配置多个 flumeAgent 时默认会应用负载平衡的形式进行发送
<appender name="flumeTest" class="com.teambytes.logback.flume.FlumeLogstashV1Appender">
<flumeAgents>
192.168.197.130:4444,192.168.197.131:4444
</flumeAgents>
<flumeProperties>
connect-timeout=4000;
request-timeout=8000
</flumeProperties>
<batchSize>100</batchSize>
<reportingWindow>1000</reportingWindow>
<additionalAvroHeaders>
myHeader = myValue
</additionalAvroHeaders>
<application>wk's Application</application>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %message%n%ex</pattern>
</layout>
</appender>
而后启用这个 appender 即可
<logger name="com" level="info">
<appender-ref ref="flumeTest"/>
<appender-ref ref="fileAppender"/>
</logger>