定义
flume全称Apache Flume
- 技术角度:应用Java语言开发的一个分布式、高牢靠、高可用中间件
- 我的项目角度:最早是Cloudera提供的日志收集零碎,当初是Apache软件基金会(ASF)的顶级我的项目,是Hadoop生态圈中的一个组件。以后Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。因为Flume-ng通过重大重构,与Flume-og有很大不同,应用时请留神辨别。
- 产品角度:用来收集、聚合、转移不同起源的大量日志数据到地方数据仓库的工具(Flume是一种日志采集工具)
官网文档:https://flume.apache.org/rele...
民间中文网:https://flume.liyifeng.org/
应用背景
为什么要进行日志数据采集?
在当初的数据技术时代中,数据有着不可代替的位置,抛开数据谈大数据服务就是瞎扯,没有数据作撑持的大数据平台就是一个空壳。数据是所有数据分析、数据挖掘、大数据处理、ai算法的外围。
在目前的来看,绝大多数公司或者组织做大数据处理时,他们的数据来源于:设施收集、数据库、日志、爬虫等等。
“只有在程序出问题当前才会晓得打一个好的日志有如许重要”,因而当初许多公司的业务平台都有十分严格的日志标准,每天都会产生大量的日志数据,因而这些日志数据中就隐含着微小的价值。
罕用的日志数据采集工具
类型 | Logstash | Filebeat | Flume | Fluentd |
运行环境 | java | go | java | ruby |
资源环境 | 大 | 小 | 小 | 十分小 |
配置 | 简略 | 简略 | 较简单 | 简单 |
可靠性 | 不稳固 | 稳固 | 高牢靠 | 牢靠 |
初步意识并应用flume
用一个故事了解:有一个池子,它一头进水,另一头出水,进水口能够配置各种管子,出水口也能够配置各种管子,能够有多个进水口、多个出水口。水术语称为Event,进水口术语称为Source、出水口术语成为Sink、池子术语成为Channel,Source+Channel+Sink,术语称为Agent。如果有须要,还能够把多个Agent连起来。
flume的名词具体介绍
- Event
数据在flume外部是以Event封装的模式存在的。因而source组件在获取到原始数据后,须要封装成Event后发送到channel中,而后sink从channel取出Event后,依据配置要求再转成其余的模式进行数据输入。
Event封装的对象次要有两局部:Headers和Body;
Headers是一个汇合Map类型,用于存储元数据(如标记、形容等)
Body就是一个字节数组,装载具体的数据内容
- agent
flume最外围的角色就是agent。flume日志采集零碎是由一个个agent连接起来的数据传输通道。
对于每个agent来说就是一个独立的守护过程(JVM),它负责从数据源接收数据,并发送到下一个目的地。
agent外部有三个重要的组件:source,channel,sink
- source
Source是数据的收集端,负责将数据捕捉后进行非凡的格式化,将数据封装到事件(event) 里,而后将事件推入Channel中。并将接管的数据以Event的模式传递给一个或多个channel。Flume提供多种数据接管形式,比方间接读取本地文件,Avro,Thrift等。
- channel
channel是一种短暂的存储容器,它从source处接管到event格局数据后进行缓存,直到被生产掉。它在source和sink之间起到了桥梁作用,channel是一个残缺的事务,这一点保障了数据在收发时的一致性,并且能够和任意数量的source和sink连贯。
反对的类型有:FileSystem Channel,Memory channel,Kafka channel等。
- sink
sink将数据存储到集中存储器比方Hbase和HDFS,它从channels生产数据(events)并将其传递给指标地. 指标地可能是另一个sink,也可能HDFS,HBase.
Flume启动基本上是基于本地配置文件的(遵循Java properties文件格式的文本文件),目前还有一个实验性的性能能够基于Zookeeper的在线配置。
基于以上这些,能够进行一个简略的demo,实时监控服务器/root/tmp.txt文件,将收集到的信息打印到控制台。
a1.sources = r1a1.channels = c1a1.sinks = k1a1.sources.r1.type = TAILDIRa1.sources.r1.channels = c1a1.sources.r1.filegroups = f1a1.sources.r1.filegroups.f1 = /root/tmp.txta1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sinks.k1.channel = c1
flume启动命令bin/flume-ng agent -n a1 -c conf -f conf/example.conf -Dflume.root.logger=INFO,console
flume组件详解
Source配置
Source | Desc |
Avro Source | 通过监听一个网络端口来承受数据,而且承受的数据必须是应用avro序列化框架序列化后的数据; |
Thrift Source | 通过监听一个网络端口来承受数据,而且承受的数据必须是应用thrift序列化框架序列化后的数据; |
HTTP Source | 监听一个网络端口,通过http post/get来接收数据,GET形式目前还只是实验性的;该source基于Jetty 9.4,能够解析为JSON或Blob |
NetCat Source | 监听一个网络端口,获取通过TCP或UDP协定的数据 |
Kafka Source | Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取音讯。 如果运行了多个Kafka Source,则能够把它们配置到同一个消费者组,以便每个source都读取一组惟一的topic分区。 |
Exec Source | 启动一个用户所指定的linux shell命令;采集这个linux shell命令的规范输入,作为收集到的数据,转为event写入channel |
Spooling Directory Source | 监督一个指定的文件夹,如果文件夹下有没采集过的新文件,则将这些新文件中的数据采集,并转成event写入channel;留神:spooling目录中的文件必须是不可变的,而且是不能重名的!否则,source会loudly fail! |
Taildir Source | 监督指定目录下的一批文件,只有某个文件中有新写入的行,则会被tail到;它会记录每一个文件所tail到的地位,记录到一个指定的positionfile保留目录中,格局为json(如果须要的时候,能够人为批改,就能够让source从任意指定的地位开始读取数据);它对采集实现的文件,不会做任何批改 |
Syslog Sources | 读取系统日志数据生成event。 |
JMS Source | 从JMS指标(例如队列或主题)读取音讯;作为JMS应用程序,它应可与任何JMS提供程序一起应用,但仅通过ActiveMQ的测试;留神,应该应用plugins.d目录(首选),命令行上的–classpath或通过flume-env.sh中的FLUME_CLASSPATH变量将提供的JMS jar蕴含在Flume类门路中 |
Stress Source | StressSource 是一个外部负载生成Source的实现, 对于压力测试十分有用 。能够配置每个Event的大小(headers为空)、也能够配置总共发送Event数量以及发送胜利的Event最大数量。 |
Custom Source | 自定义source |
Channel配置
Channel | Desc |
Memory Channel | event存储在内存中,且能够配置最大值。对于须要高吞吐而且能够容忍数据失落的状况下,能够抉择该channel |
File Channel | event被缓存在本地磁盘文件中;可靠性高,不会失落; |
Kafka Channel | agent利用kafka作为channel数据缓存;kafka channel要跟 kafka source、 kafka sink区别开来;kafka channel在利用时,能够没有source |
JDBC Channel | event被长久到数据库中,目前反对derby.实用于可复原的场景 |
Spillable Memory Channel | event存储在内存和磁盘上。内存充当主存储,磁盘充当溢出。这个channel目前是实验性的,不倡议用于生产环境 。 |
Custom Channel | 自定义channel |
Sink配置
Sink | Desc |
Null Sink | 间接抛弃 |
Logger Sink | 数据输入到日志中,通常用于debug |
File Roll Sink | 数据存储到本地文件系统 |
HDFS Sink | 数据被最终发往hdfs;能够生成text文件或 sequence 文件,而且反对压缩;反对生成文件的周期性roll机制:基于文件size,或者工夫距离,或者event数量;指标门路,能够应用动静通配符替换,比方用%D代表以后日期;当然,它也能从event的header中,取到一些标记来作为通配符替换 |
Hive Sink | 可将text或json数据间接存储到hive分区表 |
HBaseSink | 数据存储到hbase中 |
ElasticSearchSink | 间接存储到es中 |
Kafka Sink | 存储到kafka中 |
Kite Dataset Sink | 将事件写入Kite数据集。该接收器将反序列化每个传入事件的主体,并将后果记录存储在Kite数据集中。它通过按URI加载数据集来确定指标数据集。(kite是专门来操纵大数据集的,能够通过kite将数据存储到hdfs、local、hive、hbase中,并且还提供了partition分区机制,放慢数据访问速度。并且kite反对avro、parquet、csv、json等几种存储数据的形式。) |
MorphlineSolrSink | (Solr是一个独立的企业级搜寻应用服务器)该接收器从Flume事件中提取数据,对其进行转换,并将其简直实时地加载到Apache Solr服务器中,后者再为最终用户或搜寻应用程序提供查问 |
Avro Sink | avro sink用来向avro source发送avro序列化数据,这样就能够实现agent之间的级联 |
Thrift Sink | 同avro sink |
HTTP Sink | 将接管到的数据通过post申请产生到近程服务,event内容作为申请体发送 |
IRC Sink | 因特网中继聊天(Internet Relay Chat),个别称为互联网中继聊天,简称:IRC。它是由芬兰人Jarkko Oikarinen于1988年独创的一种网络聊天协定。 |
Custom Sink | 自定义sink |
拦截器 Interceptor
拦截器,就是工作在source之后,能够从source取得event,做一个逻辑解决,而后再返回解决之后的event。这也就能够让用户不须要改变source代码的状况下,插入一些解决逻辑。学过java的同学对拦截器应该比较清楚了。具体类型如下:
Interceptor | Desc |
host | 往event的header中插入主机名信息 |
timestamp | 向event中,写入一个kv到header里;key的名称能够随便配置,value就是以后工夫戳 |
uuid | 用于在每个event header中生成一个uuid字符串 |
static动态属性写入拦截器 | 让用户往event中增加一个自定义的header,key-value模式的,当然这个kv在配置文件中是写死的 |
remove_header删除属性拦截器 | 这个拦截器能够删除Event header外面的属性,能够是一个或多个。 |
search_replace查找-替换拦截器 | 该拦截器基于Java正则表达式提供简略的基于字符串的搜寻和替换性能;相似于Java中的Matcher.replaceAll办法 |
regex_filter正则过滤拦截器 | 这个拦截器会把Event的body当做字符串来解决,并用配置的正则表达式来匹配。能够配置指定被匹配到的Event抛弃还是没被匹配到的Event抛弃。 |
regex_extractor正则提取拦截器 | 这个拦截器会应用正则表达式从Event内容体中获取一组值并与配置的key组成n个键值对,而后放入Event的header中,Event的body不会有任何更改。 |
Morphline 实时荡涤拦截器 | 此拦截器通过 morphline配置文件 过滤Event,配置文件定义了一系列转换命令,用于将记录从一个命令传递到另一个命令。 |
custom type as FQCN | 自定义实现拦截器 |
Source通道选择器(Flume Channel Selectors)
一个source能够对接多个channel,那么问题来了,source的数据是怎么在多个channel之间进行传递的呢?这就是selector的性能了,通过selector选择器依据策略能够将event从source传递到指定的channel中去。具体的selector选择器类型如下表格:
Selector | Desc |
replicating 复制选择器 | 默认的选择器,将event进行复制分发给上游所有的节点 |
Multiplexing 多路复用选择器 | 多路选择器,能够依据event中的一个指定key对应的value来决定这条音讯会被写入到那个哪个channel中 |
Custom Selector | 自定义选择器 |
Sink组逻辑处理器(Flume Sink Processors)
一个agent中,多个sink能够被组装到一个组中,而数据在组内多个sink之间发送。接管处理器能够在组内提供负载平衡的性能,或者是在长期故障的状况下实现从一个接收器转移到另一个接收器上。具体的接收器模式见下表格:
Processor | Desc |
default | 默认的接管处理器仅承受一个sink,当然用户也没有必要为了一个sink去创立processor |
Failover | 故障转移模式,即一个组内只有优先级高的sink在工作,而其余的sink处于期待中 |
load_balance | 负载平衡模式,容许channel中的数据在一组sink中的多个sink之间进行轮转,具体的策略有:round-robin(轮流发送);random(随记发送) |
Custom processor | 目前还不反对自定义Sink组逻辑处理器 |
flume进阶
flume的高可靠性
事务是flume高可靠性的保障
首先咱们先来理解一下消息传递的可靠性保障的三种形式:
- At-Least-Once
- At-Most-Once
- Exactly-Once
基本上所有工具的应用用户都心愿工具框架能保障音讯 Exactly-once ,这样就不用在设计实现上思考音讯的失落或者反复的解决场景。然而事实上很少有工具和框架能做到这一点,真正能做到这一点所付出的老本往往很大,或者带来的额定影响反而让你感觉不值得。假如 Flume 真的做到了 Exactly-once ,那势必升高了稳定性和吞吐量,所以 Flume 抉择的策略是 At-least-once 。
Flume采纳的是At-Least-Once策略,这里并不是说Flume任意一个组件就能够实现这种策略,而是通过三个组件(source,channel,sink)之间高低投递音讯来保障,然而如果抉择了Memory Channel的话,这个就不敢打包票能实现At-Least-Once了。Flume要保障at-least-once的根底就是Transaction。
- Source到Channel是事务性的
- Channel到Sink也是事务性的
flume的事务工作流:
这里再详解一下上图的流程,Flume的transaction是有生命周期的,别离是start、commit、rollback和close.
当source往channel投递事件的时候,会首先调用start办法开启事务,将event怼入putList中,此时putList会包裹在一层事务中(为了数据传输的原子性),当putList的数据问题怼入channel,进行事务commit确认,当putList中的数据怼入到channel失败的时候(通常体现为channel中的数据已满了,putList中的数据没方法在放进channel外面,这里须要嗔怪设定channel capacity的那个家伙),进行事务rollback回滚(会将以后的putList清空,source将方才提交的音讯事件向源头进行ack确认,须要上游具备音讯重发性能),而后close事务。
sink生产channel也是同样的模式,惟一不同的中央是sink在往指标源实现写入后才会对事务进行commit,通常为上游的存储系统宕机导致信息无奈收回,此时会执行事务回滚(会将以后的takeList清空,并将takeList的信息回滚到channel中,如果此时channel也是满的,间接报错)。
数据可能会反复
Flume 保障事件至多一次被送到它们的目的地,只有一次倾力写数据,且不存在任何类型的故障事件只被写一次。然而像网络超时或局部写入存储系统的谬误,可能导致事件不止被写一次,因为Flume 将重试写操作直到它们齐全胜利。
flume的分布式
flume灵便的source和sink组件,使得多个agent之间能够进行数据通信,这也是flume扩展性和分布式的根底,能够反对大量的部署计划。Flume内置了专门的RPC sink-source对。对于agent-to-agent通信,首选的RPC sink-RPC source对是Avro Sink-Avro Source对。要从其它的Flume Agent或客户端接收数据,Agent接收数据能够被配置为应用Avro Source,且发送数据的Agent必须配置用来运行Avro Sink。
日志收集场景中比拟常见的是数百个日志生产者发送数据到几个日志消费者Agent上,而后消费者Agent负责把数据发送到存储系统。例如从数百个web服务器收集的日志发送到十几个Agent上,而后由十几个Agent写入到HDFS集群。
日志生产者到日志消费者的数据传输通常应用负载平衡的形式进步整个集群的吞吐量和稳定性。
如果生产工夫的服务器数量继续减少,该层从应用程序服务器接收数据的agent数量也须要减少。这意味着在某种程度上,它可能须要减少后续层来减少缓冲容量,以适应数据产量的减少。
Flume反对多路复用数据流到一个或多个目的地。这是通过应用一个流的[多路复用器](multiplexer)来实现的,它能够 复制 或者 抉择(多路复用) 数据流到一个或多个channel上。
应用flume的channel选择器能够依据须要将数据复制或分组发送到不同的目的地
依据官网的倡议:向HDFS发送数据的Agent数量尽量不要超过8个;下层和上层的数量比率不要超过32:1。
Flume真的适宜你吗?
如果你须要将文本日志数据提取到Hadoop / HDFS中,那么Flume最合适不过了。然而,对于其余状况,你最好看看以下倡议:
Flume旨在通过绝对稳固,可能简单的拓扑部署来传输和收集定期生成的Event数据。“Event数据”定义十分宽泛,对于Flume来说一个Event就是一个一般的字节数组而已。 Event大小有一些限度,它不能比你的内存或者服务器硬盘还大,理论应用中Flume Event能够是文本或图片的任何文件。要害的是这些Event应该是以间断的流的形式一直生成的。 如果你的数据不是定期生成的(比方你将大量的数据批量加载到Hadoop集群中),尽管Flume能够做这个事件,然而有点“杀鸡用牛刀”的感觉,这并不是Flume所善于和喜爱的工作形式。 Flume喜爱绝对稳固的拓扑构造,但也不是说永远变化无穷,Flume能够解决拓扑中的更改而又不失落数据,还能够解决因为故障转移或者配置的定期从新加载。如果你的拓扑构造每天都会变动, 那么Flume可能就无奈失常的工作了,毕竟重新配置也是须要肯定思考和开销的。
Flume的拓扑设计
FLume的监控
Flume作为一个弱小的数据收集工具,尽管性能十分弱小实用,然而却无奈看到flume收集数据的详细信息,所以咱们须要一个能展现flume实时收集数据动静信息的界面,包含flume胜利收集的日志数量、胜利发送的日志数量、flume启动工夫、进行工夫、以及flume一些具体的配置信息,像通道容量等,于是顺利成章的监控能帮咱们做到这些,有了这些数据,在遇到数据收集瓶颈或者数据失落的时候,通过剖析监控数据来剖析、解决问题。
source监控项:
指标项 | 阐明 |
OpenConnectionCount | 目前与客户端或sink放弃连贯的总数量 |
AppendBatchAcceptedCount | 胜利提交到channel的批次的总数量 |
AppendBatchReceivedCount | 接管到事件批次的总数量 |
AppendAcceptedCount | 逐条录入的次数 |
AppendReceivedCount | 每批只有一个事件的事件总数量 |
EventAcceptedCount | 胜利写出到channel的事件总数量 |
EventReceivedCount | 目前为止source曾经接管到的事件总数量 |
StartTime | source启动时的毫秒值工夫 |
StopTime | source进行时的毫秒值工夫,为0示意始终在运行 |
channel监控项:
指标项 | 阐明 |
EventPutAttemptCount | Source尝试写入Channe的事件总次数 |
EventPutSuccessCount | 胜利写入channel且提交的事件总次数 |
EventTakeAttemptCount | sink尝试从channel拉取事件的总次数 |
EventTakeSuccessCount | sink胜利从channel读取事件的总数量 |
ChannelSize | 目前channel中事件的总数量 |
ChannelCapacity | channel的容量 |
ChannelFillPercentage | channel已填入的百分比 |
StartTime | channel启动时的毫秒值工夫 |
StopTime | channel进行时的毫秒值工夫,为0示意始终在运行 |
sink监控项:
指标项 | 阐明 |
ConnectionCreatedCount | 创立的连贯数量 |
ConnectionClosedCount | 敞开的连贯数量 |
ConnectionFailedCount | 因为谬误敞开的连贯数量 |
BatchEmptyCount | 批量解决event的个数为0的数量-示意source写入数据的速度比sink解决数据的速度慢 |
BatchUnderflowCount | 批量解决event的个数小于批处理大小的数量 |
BatchCompleteCount | 批量解决event的个数等于批处理大小的数量 |
EventDrainAttemptCount | sink尝试写出到存储的事件总数量 |
EventDrainSuccessCount | sink胜利写出到存储的事件总数量 |
StartTime | channel启动时的毫秒值工夫 |
StopTime | channel进行时的毫秒值工夫,为0示意始终在运行 |
- JMX Reporting
Java 治理扩大(Java Management Extension,JMX)是从jdk1.4开始的,但从1.5时才加到jdk外面,并把API放到java.lang.management包外面。MX监控能够通过在flume-env.sh脚本中批改JAVA_OPTS环境变量中的JMX参数来开启
如果一个 Java 对象能够由一个遵循 JMX 标准的管理器利用治理,那么这个Java 对象就能够称为一个可由 JMX 治理的资源。 - JSON Reporting
Flume也反对以JSON格局报告运行指标。为了对外提供这些报告数据,Flume会在某个端口(可自定义)上运行一个web服务来提供这些数据。 - Ganglia Reporting
Ganglia是UC Berkeley发动的一个开源集群监督可视化工具,设计用于测量数以千计的节点。Ganglia的外围蕴含gmond、gmetad以及一个Web前端。次要是用来监控零碎性能,如:cpu 、mem、硬盘利用率, I/O负载、网络流量状况等,通过曲线很容易见到每个节点的工作状态,对正当调整、调配系统资源,进步零碎整体性能起到重要作用。 - Custom Reporting
能够通过编写本人的执行报告服务向其余零碎报告运行指标。 报告类必须实现org.apache.flume.instrumentation.MonitorService 接口。
我的项目实际
kafka集群作为channel的计划
Memory Channel 有很大的丢数据危险,而且容量个别,File Channel 尽管能缓存更多的音讯,但如果缓存下来的音讯还没写入 Sink,此时 Agent 呈现故障则 File Channel 中的音讯一样不能被持续应用,直到该 Agent 复原。而 Kafka Channel 容量大,容错能力强。
有了 Kafka Channel 能够在日志收集层只配置 Source 组件和 Kafka 组件,不须要再配置 Sink 组件,缩小了日志收集层启动的过程数,无效升高服务器内存、磁盘等资源的使用率。而日志汇聚层,能够只配置 Kafka Channel 和 Sink,不须要再配置 Source。
- 业务代码中实时发送日志
- 业务服务器装置flume读取本地日志文件,发送日志
flume Collector配置文件:
# Name the components on this agenta1.sources = r1# a1.sinks = k1a1.channels = c1# Describe/configure the source a1.sources.r1.type = avroa1.sources.r1.channels = c1a1.sources.r1.bind = node4a1.sources.r1.port = 4444a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092a1.channels.c1.kafka.topic = testflumea1.channels.c1.kafka.consumer.group.id = test-consumer-groupa1.sources.r1.channels = c1
本地读取文件发送flume Collector的配置文件:
# Name the components on this agenta1.sources = r1a1.sinks = k1 k2a1.channels = c1# Describe/configure the source a1.sources.r1.type = TAILDIRa1.sources.r1.positionFile = ./taildir_position.jsona1.sources.r1.filegroups = f1 f2a1.sources.r1.filegroups.f1 = /root/tmp1.txta1.sources.r1.filegroups.f2 = /root/tmp2.txta1.sources.r1.fileHeader = falsea1.sources.r1.batchSize = 1a1.sources.r1.channels = c1a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = avroa1.sinks.k1.hostname = node4a1.sinks.k1.port = 4444a1.sinks.k1.channel = c1a1.sinks.k2.type = avroa1.sinks.k2.hostname = node5a1.sinks.k2.port = 4444a1.sinks.k2.channel = c1a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2a1.sinkgroups.g1.processor.type = load_balancea1.sinkgroups.g1.processor.selector = random
Logback配置flume
业务零碎中向flume发送日志通常要联合日志框架(Log4j、Logback等)一起应用,SpringBoot天生自带logback,咱们这里只须要引入一个logback-flume依赖即可。
<dependency> <groupId>com.teambytes.logback</groupId> <artifactId>logback-flume-appender_2.10</artifactId> <version>0.0.9</version></dependency>
而后在logback的配置文件中进行相干配置:
增加一个appender,配置向flume发送音讯的相干信息,配置多个flumeAgent时默认会应用负载平衡的形式进行发送
<appender name="flumeTest" class="com.teambytes.logback.flume.FlumeLogstashV1Appender"> <flumeAgents> 192.168.197.130:4444,192.168.197.131:4444 </flumeAgents> <flumeProperties> connect-timeout=4000; request-timeout=8000 </flumeProperties> <batchSize>100</batchSize> <reportingWindow>1000</reportingWindow> <additionalAvroHeaders> myHeader = myValue </additionalAvroHeaders> <application>wk's Application</application> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %message%n%ex</pattern> </layout></appender>
而后启用这个appender即可
<logger name="com" level="info"> <appender-ref ref="flumeTest"/> <appender-ref ref="fileAppender"/></logger>