关于flume:Flume简述

数新网络官网已全新上线,欢送点击拜访 数新网络官网_让每个人享受数据的价值 1. 什么是Flume?Flume是什么?咱们从flume的图标就能看出,它是一个水道。Flume的图标上面是水,下面是木头。当咱们在山上的伐木场(source)有很多木头须要运输到山下的时候,如果一棵一棵的去搬的话,太费膂力了,刚好山上有条小溪(channel),而它恰好又是通往山下的,咱们就能够把木头放到小溪里顺流而下,再在山下挖一个水槽(sink),让木头流向这个水槽,咱们须要用的时候就能够在这个水槽外面去取。而flume就是集(source+channel+sink)的一个日志数据采集工具。 编辑 图片来源于官网 2. Flume概述 编辑 图1-1 Flume是一个分布式、牢靠、和高可用的海量日志采集、聚合和传输的零碎。 Flume能够采集文件,socket数据包、文件、文件夹、kafka等各种模式源数据,又能够将采集到的数据(下沉sink)输入到HDFS、hbase、hive、kafka等泛滥内部存储系统中个别的采集需要,通过对flume的简略配置即可实现,Flume针对非凡场景也具备良好的自定义扩大能力。因而,flume能够实用于大部分的日常数据采集场景。 2 Flume版本1.Flume og构造 编辑 图2-1 Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera,随着 Flume 性能的扩大,Flume OG 代码工程臃肿、外围组件设计不合理、外围配置不规范等毛病裸露进去。以后咱们应用的个别是flume ng (apache Flume)。 Flume OG由三个角色节点组成:agent,collector,master组成。agent负责从数据源收集数据,将数据集中到collector,而后汇入数据库,主节点master负责管理,在OG版本应用中,应用稳定性依赖于zookeeper。 2.Flume ng根底构造 编辑 图2-2 Flume NG 只有一种角色的节点:代理节点(agent),没有 collector、master 节点。这是外围组件最外围的变动。去除了 physical nodes、logical nodes 的概念和相干内容。agent 节点的组成也产生了变动。NG agent 由 source、sink、channel 组成。 3 Flume事物及流程Flume事务 编辑 图3-1 Put事务流程将数据先写入 putlist 长期缓冲区,查看channel内存是否足够合并(如果后续没有数据了,将在设定的工夫内合并),channel内存不足,将回滚数据。 Take事务流程将数据推入长期缓冲区takelist,并发送到HDFS,如果数据全副发送胜利,则革除长期缓冲区 takelist,数据发送过程中如果出现异常,rollback 将长期缓冲区 takelist 中的数据归还给 channel 内存队列。 2.Flume agent 流程 编辑 图3-2 在Source接管到数据后,会把数据包装成Event,并把数据交给channel,由ChannelProcessor决定怎么交,交给谁。在ChannelProcessor流程中首先会将数据交给拦截器链,可进行数据清理,解决脏数据。拦截器尽量抉择简略的逻辑,不然会影响流式解决链条,整个流程都会变慢。后将数据交给Channel选择器ChannelSelector,ChannelSelector解决完将数据返还给ChannelProcessor,依据ChannelSelector的抉择后果将数据写入对应的channel。 每个channel都有一个独立的SinkProcessor,sink不是间接拉取数据的,sink是由SinkProcessor决定怎么拉取数据的,默认的SinkProcessor一共有三种,别离是: DefaultSinkProcessor:接管繁多的Sink,不强制用户为Sink创立Processor;LoadBalancingSinkProcessor:负载平衡片处理器提供在多个Sink之间负载平衡的能力;FailoverSinkProcessor:通过配置保护了一个优先级列表。保障每一个无效的事件都会被解决(故障转移)。4 学习总结Flume次要是用来数据采集,将用户的信息以及行为日志保留到HDFS集群中或者本地文件夹中,相当于一个搬运工,flume的单位是agent。学习flume的重点是须要把握source和channel之间的事务、sink和channel之间的事务流程原理,以及source和channel之间的复制和多路复用的原理、sink和channel之间的负载平衡、故障转移等原理。 一 END 一

June 29, 2023 · 1 min · jiezi

关于flume:躬行系列flume安装

环境java环境hadoop环境zookeeper和kafka都要装置实现 组件抉择sourceflume1.7退出的taildir类型的source,能够随时监控文件变动、反对断点续传。是支流的source类型。 channelkafka channel,能将source传入的数据,间接存储到kafka。既保留在磁盘中,进步了可靠性。数据层传输的效率又高。是支流的channel类型 sink步骤/usr/local/apache-flume-1.10.0-bin/conf/flume-env.sh.template文件去掉template后缀配置java home export JAVA_HOME=/usr/local/jdk-11.0.15

November 7, 2022 · 1 min · jiezi

关于flume:flume在k8s下程序发布

企业容器化部署成为趋势,基于k8s的容器编排成为中大企业部署服务的青眼。本文简略介绍k8s下flume程序的部署,flume个别用户大数据中数据的迁徙以及数据接入。 根底镜像构建根底镜像构建次要包含: 1.apache-flume包2.mongoclients包:用于flume连mongo数据库3.lib包:程序依赖4.run.sh:程序启动脚本文件 根底镜像Dockerfile: FROM openjdk:11.0.12-jre# 筹备环境ENV LANGUAGE=zh_CN.UTF-8ENV TZ=Asia/ShanghaiRUN mkdir /apps/WORKDIR /apps/# 增加文件ADD apache-flume-1.9.0-bin /apps/apache-flume-1.9.0-bin# 将 mongo-java-driver-2.6.5.jar、mongo-java-driver-3.12.7.jarADD mongoclients /apps/mongoclients# 将应用程序依赖拷贝到lib下ADD lib /lib/# 将启动命令拷贝到apps上面ADD run.sh /apps/# 启动服务ENV AGENT_NAME=a1ENTRYPOINT ["/bin/sh","-c","bash run.sh"]run.sh启动脚本:启动脚本中会传递MONGO变量,用于指代咱们mongo版本 #!/bin/bashif [ $MONGO -eq 3 ]; then ln -sf /apps/mongoclients/mongo-java-driver-3.12.7.jar /lib/elif [ $MONGO -eq 2 ]; then ln -sf /apps/mongoclients/mongo-java-driver-3.12.7.jar /lib/fi./apache-flume-1.9.0-bin/bin/flume-ng agent \ -Xms2000m -Xmx4000m \ --conf ./apache-flume-1.9.0-bin/conf \ --conf-file ./apache-flume-1.9.0-bin/conf/flume.conf \ -n ${AGENT_NAME:-a1}工程镜像咱们的业务代码是依靠于根底镜像框架,所以咱们须要将咱们的工程代码打包成一个依赖拷贝到flume应用程序框架中去。咱们将Dockerfile寄存在咱们的业务工程代码块中,而后通过mvn编译之后便可实现依赖打包镜像 Dockerfile:xxx-1.0-SNAPSHOT.jar指代咱们的业务代码。 FROM harbor.dc.prod:8000/base/flume:1.0ADD target/xxx-1.0-SNAPSHOT.jar /lib/构建镜像: docker build -t harbor.dc.prod:8000/flume-access:1.0.0 .docker push harbor.dc.prod:8000/flume-access:1.0.0docker容器下服务启动配置文件: ...

June 11, 2022 · 1 min · jiezi

关于flume:Flume组件部署

一、原理1.Linux环境Flume装置配置及应用2.Apache Flume 入门教程3.flume的部署和avro source测试;netcast source测试4.Flume部署及应用5.Flume监听本地Linux-hive日志文件采集到HDFS1.Flume组件Flume的作用: 可用于实时计算和离线计算的数据源采集工具 。Flume 的外围(agent)就是把数据从数据源收集过去,再送到目的地。为了保障高牢靠输送肯定胜利,在送到目的地之前,会先缓存数据,待数据真正达到目的地后,删除本人缓存的数据。Flume : 传输的数据的根本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的根本单位。Event : (蕴含:headers:{} 、body) 从 Source,流向 Channel,再到 Sink,自身为一个 byte 数组,在flume中应用事件作为传输的根本单元。Source : 数据源,用户从数据发生器采集接收数据,source产生数据流,同时会把产生的数据流以Flume的event格局传输到一个或者多个channel。Channel : 就像一个管道(队列),接管 Source 的输入,再推送给 Sink 生产。数据直到进入到下一个Channel中或者进入终端才会被删除。即:直达Event长期存储,在 sources 和 sinks之间起一个连贯作用 。Channel 是一个残缺的事务,这一点保障了数据在收发的时候的一致性. 并且它能够和任意数量的source和sink链接。Sink : 下沉,取出 Channel 中的数据,而后送给内部源(HDFS、HBase)或者其余 Source。 2.装置与配置(1).Flume前提环境java环境(2).解压apache-flume-1.6.0-bin.tar.gz安装包到目标目录下# 解压tar -zxvf apache-flume-1.6.0-bin.tar.gz.tar.gz -C /usr/local# 更改名字cd /usr/localmv apache-flume-1.6.0-bin flume(3).批改配置文件: flume-env.sh# 进入配置文件目录cd /usr/local/flume/conf# 复制配置文件cp flume-env.sh.template flume-env.sh# 批改以下信息vi flume-env.shexport JAVA_HOME=/usr/local/jdk(4).配置环境变量# 关上配置文件vi /etc/profile# 追加以下内容export FLUME_HOME=/usr/local/flumeexport PATH=$PATH:$FLUME_HOME/bin# 申明环境变量,即让环境变量立刻失效source /etc/profile# 应用命令查看Flume版本信息flume-ng version# 执行后果Flume 1.6.0Source code repository: https://git-wip-us.apache.org/repos/asf/flume.gitRevision: 2561a23240a71ba20bf288c7c2cda88f443c2080Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015From source with checksum b29e416802ce9ece3269d34233baf43f(5).配置Flume示例1 -- netcat sourcenetcat source + memory channel + logger sink# 应用Flume的要害就是写配置文件,别离配置Source、Channel、Sink,而后把三者串联起来; 比方这里写一个配置文件$FLUME_HOME/conf/example_netcat.conf,应用netcat source、memory channel、logger sink,# example_netcat.conf内容如下:a1.sources = r1a1.sinks = k1a1.channels = c1a1.sources.r1.type = netcata1.sources.r1.bind = mastera1.sources.r1.port = 55555a1.sinks.k1.type = loggera1.channels.c1.type = memorya1.sources.r1.channels = c1a1.sinks.k1.channel = c1(6).启动Flume agentflume-ng agent 应用ng启动agent--name a1指定的agent别名--conf YYYY/ 指定配置所在的文件夹--conf-file YYYY/XXXXXX 指定配置文件-Dflume.root.logger=INFO,console 可选,指定日志输入级别(输入到控制台)& 可选,Flume在后盾运行# 命令示例,依据指定的配置文件启动Agentflume-ng agent \--name a1 \--conf $FLUME_HOME/conf \--conf-file $FLUME_HOME/conf/example_netcat.conf \-Dflume.root.logger=INFO,console(7).应用telnet输出数据验证telnet master 55555# 启动后输出内容123就能够在Flume看到如下数据:Event: { headers:{} body: 31 32 33 0D 123. }Event是FLume数据传输的根本单元Event = 可选的header + byte array

April 6, 2022 · 1 min · jiezi

关于flume:Flume日志采集框架构成组件

框架结构Flume 的外围是把数据从数据源收集过去,再送到目的地。为了保障输送肯定胜利,在送到目的地之前,会先缓存数据,待数据真正达到目的地后,删除本人缓存的数据。 Flume分布式系统中最外围的角色是agent,flume采集零碎就是由一个个agent所连接起来造成。 flume形成组件ClientClient生产数据,运行在一个独立的线程。 Event一个数据单元,音讯头和音讯体组成。(Events能够是日志记录、 avro 对象等。) FlowEvent从源点达到目标点的迁徙的形象。 Agent构造Flume 运行的外围是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个残缺的数据收集工具,含有三个外围组件,别离是source、 channel、 sink。通过这些组件, Event 能够从一个中央流向另一个中央,如下图所示。 Source Source是数据的收集端,负责将数据捕捉后进行非凡的格式化,将数据封装到事件(event) 里,而后将事件推入Channel中。Flume提供了很多内置的Source, 反对 Avro, log4j, syslog 和 http post(body为json格局)。能够让应用程序同已有的Source间接打交道,如AvroSource,SyslogTcpSource。如果内置的Source无奈满足需要, Flume还反对自定义Source。 Channel Channel是连贯Source和Sink的组件,大家能够将它看做一个数据的缓冲区(数据队列),它能够将事件暂存到内存中也能够长久化到本地磁盘上, 直到Sink解决完该事件。介绍两个较为罕用的Channel, MemoryChannel和FileChannel。 Sink Sink从Channel中取出事件,而后将数据发到别处,能够向文件系统、数据库、 hadoop存数据, 也能够是其余agent的Source。在日志数据较少时,能够将数据存储在文件系统中,并且设定肯定的工夫距离保留数据。 关键词:大数据培训

January 19, 2022 · 1 min · jiezi

关于flume:流处理组件Flume使用攻略

Flume概述 Flume是一种日志采集工具。是一种分布式,牢靠且可用的服务,可用于无效的手机,聚合和挪动大量日志数据,它具备基于流数据的简略灵便架构,它具备可靠性机制和许多故障转移和复原机制,具备弱小的容错能力;它应用简略的可拓展数据模型,容许在线剖析应用程序。 Flume是Hadoop生态圈中的一个组件。次要利用于实时数据的流解决,比方一旦有某事件触发(如本地交易引起的数据改变)能够将实时的日志数据发向Hadoop文件系统HDFS中。 Flume能够将数量宏大的数据从各项数据资源中集中起来存储的工具/服务,或者集中机制。所以它还有较强的缓存作用。 Flume具备较高的容错性。例如当收集数据的速度超过将写入数据的时候,即超过了零碎的写入数据能力的时候,Flume会在数据生产者和数据收容器间做出调整,保障其可能在两者之间提供可推送的安稳数据。 Flume反对多路径流量,多管道接入流量,多管道接出流量。大数据培训例如Flume的数据收回源及目的地能够是不同类别的比方社交媒体, 关系型数据库Hbase,Hdfs 前端控制台也能够是其余流工具如spark stream,Kafka,甚至其它的Flume。 Flume组件介绍 流程图: 用一个故事了解:有一个池子,它一头进水,另一头出水,进水口能够配置各种管子,出水口也能够配置各种管子,能够有多个进水口、多个出水口。水术语称为Event,进水口术语称为Source、出水口术语成为Sink、池子术语成为Channel,Source+Channel+Sink,术语称为Agent。如果有须要,还能够把多个Agent连起来。 Flume环境搭建官网下载地址:http://flume.apache.org/downl... 解压:tar zxf apache-flume-1.8.0-bin.tar.gz 批改配置文件:vim apache-flume-1.8.0-bin/conf/myagent.conf 文件名随便文件里增加如下配置(Sink为HDFS): myagent.sources= eventDirmyagent.channels= memoryChannelmyagent.sinks= eventHDFS myagent.sources.eventDir.type= spooldirmyagent.sources.eventDir.spoolDir= /home/cloudera/eventsmyagent.sources.eventDir.fileHeader= true myagent.channels.memoryChannel.type= memorymyagent.channels.memoryChannel.capacity= 10000myagent.channels.memoryChannel.transactioncapacity= 1000000 myagent.sinks.eventHDFS.type= hdfsmyagent.sinks.eventHDFS.hdfs.fileType= DataStreammyagent.sinks.eventHDFS.hdfs.path= /temp/eventsmyagent.sinks.eventHDFS.hdfs.writeFormat= Textmyagent.sinks.eventHDFS.hdfs.batchSize=10000 myagent.sources.eventDir.channels= memoryChannelmyagent.sinks.eventHDFS.channel= memoryChannel 运行 # 启动hadoop ./sbin/start-dfs.sh # 启动Flume ./bin/flume-ng agent -n spooldir -c conf -f conf/spooldir.conf 参数阐明: -n 指定agent名称(与配置文件中代理的名字雷同) -c 指定flume中配置文件的目录 -f 指定配置文件 -Dflume.root.logger=DEBUG,console 设置日志等级 最初把数据放在/home/cloudera/events外面 ;source组件就能够获取到数据。 附配置文件阐明: myagent.sources= eventDirmyagent.channels= memoryChannelmyagent.sinks= eventHDFS#这里给agent (名字是myagent, 能够是任起的) 中sources、sinks、channels别离起名--eventDir,memoryChannel以及eventHDFS。这样当前就能够援用这些名字在配置文件或命令行中形容各种操作或关系myagent.sources.eventDir.type= spooldir #指定source的类型myagent.sources.eventDir.spoolDir= /home/cloudera/events #Source用来监听一个指定的目录/user/me/eventsmyagent.sources.eventDir.fileHeader= true myagent.channels.memoryChannel.type= memory #设置channel为内存型myagent.channels.memoryChannel.capacity= 10000 #设置内存管道中存储事件数目下限myagent.channels.memoryChannel.transactioncapacity= 1000000 #设置内存管道中传送事件数目下限myagent.sinks.eventHDFS.type= hdfs #设置sink的传输类型myagent.sinks.eventHDFS.hdfs.fileType= DataStream #设置sink接管的文件类型myagent.sinks.eventHDFS.hdfs.path= /temp/events #设置sink接管的文件目的地即HDFS下的门路myagent.sinks.eventHDFS.hdfs.writeFormat= Text #设置sink文件写入格局Textmyagent.sinks.eventHDFS.hdfs.batchSize=10000 #设置一次性写入事件数10000#设置channel = memoryChannel 桥接source和sinkmyagent.sources.eventDir.channels= memoryChannelmyagent.sinks.eventHDFS.channel= memoryChannel拓展(souce为监听网络端口)Source监听能够指定一个网络端口,即只有应用程序向这个端口外面数据,这个source组件就能够获取到信息而后写入到channle。 ...

January 18, 2022 · 1 min · jiezi

关于flume:如何远程调试自定义开发的Flume应用

一、前言Flume作为当下最风行的大数据采集组件之一。其自身领有分布式/高牢靠/高可用等长处,但相比拟于Flink/Spark/Kafka等大数据组件,其对于本地调试的性能反对度并不高,如果咱们没有把握Flume的近程调试要领,就只能不停的进行打日志,部署,打日志,部署这样低效的工作,而这对于程序员来说无异于折磨。所以明天小编就和大家一起来探索Flume的近程调试办法。 二、环境筹备flink官网下载上传服务器并解压。开发自定义Source,这里以简略的读取mysql表数据为demo,局部代码如下:package org.bigwinner.flume.sources;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.EventDeliveryException;import org.apache.flume.PollableSource;import org.apache.flume.conf.Configurable;import org.apache.flume.event.EventBuilder;import org.apache.flume.source.AbstractSource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.*;/** * @author: IT大狮兄 * @date: 2021/8/13 下午9:11 * @version: 1.0.0 * @description: 自定义Source--读取MySQL表的数据 */public class MysqlSource extends AbstractSource implements PollableSource, Configurable { private static final Logger LOGGER = LoggerFactory.getLogger(MysqlSource.class); private String mysqlUrl; private String mysqlUser; private String mysqlPassword; private String mysqlTable; private String mysqlDriver; private Connection conn = null; public Status process() throws EventDeliveryException { String sql = "select * from " + mysqlTable; try { PreparedStatement statement = conn.prepareStatement(sql); ResultSet resultSet = statement.executeQuery(); while (resultSet.next()) { String id = resultSet.getString(1); String uuid = resultSet.getString(2); String iccid = resultSet.getString(3); byte[] eventBytes = new StringBuilder().append(id).append("--") .append(uuid).append("--").append(iccid).toString().getBytes(); Event event = EventBuilder.withBody(eventBytes); getChannelProcessor().processEvent(event); } } catch (SQLException throwables) { throwables.printStackTrace(); } return Status.READY; } public long getBackOffSleepIncrement() { return 0; } public long getMaxBackOffSleepInterval() { return 0; } @Override /** Flume生命周期开始,能够做一些初始化的工作 */ public void start() { LOGGER.info("Mysql source start......"); try { Class.forName(mysqlDriver); conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword); } catch (ClassNotFoundException e) { LOGGER.error("Driver class is not found!"); } catch (SQLException throwables) { LOGGER.error("get the connection error: {}", throwables); } } @Override /** Flume生命周期完结,能够做一些保留等完结前的工作 */ public void stop() { LOGGER.info("Mysql source stop......"); if (conn != null) { try { conn.close(); } catch (SQLException throwables) { LOGGER.error("连贯敞开异样: {}", throwables); } } super.stop(); } /** Flume配置文件读取的办法 */ public void configure(Context context) { mysqlUrl = context.getString("mysql.url", ""); mysqlUser = context.getString("mysql.user", ""); mysqlPassword = context.getString("mysql.password", ""); mysqlTable = context.getString("mysql.table", ""); LOGGER.info("mysql_driver: {} --> mysql_url: {} --> mysql_user: {} --> mysql_password: {} --> mysql_table: {}", mysqlDriver, mysqlUrl, mysqlUser, mysqlPassword, mysqlTable); }}编辑flume agent配置文件,并上传到flume conf目录下a1.sources = s1a1.sinks = k1a1.channels = c1############################## Source##############################自定义MySQL source类a1.sources.s1.type = org.bigwinner.flume.sources.MysqlSourcea1.sources.s1.mysql.driver = com.mysql.jdbc.Drivera1.sources.s1.mysql.url = jdbc:mysql://lsl001:3306/redis_tempa1.sources.s1.mysql.user = superboya1.sources.s1.mysql.password = iamsuperboya1.sources.s1.mysql.table = redis_temp############################## Channel##############################配置file-channel数据管道a1.channels.c1.type = file#最小需要空间a1.channels.c1.minimumRequiredSpace = 3145728#最大文件大小a1.channels.c1.maxFileSize = 2146435071#flume事件指针检查点备份目录a1.channels.c1.checkpointDir = /opt/soft/flume/flume/data/checkpoint#file-channel对event备份到本地的文件目录a1.channels.c1.dataDirs = /opt/soft/flume/data/file-channel-mysql/data#文件管道中的数据容量,单位条数a1.channels.c1.capacity = 200#文件管道中的事务数据容量,单位条数a1.channels.c1.transactionCapacity = 100#检查点备份flume工夫指针的间隔时间a1.channels.c1.checkpointInterval=60000############################## Sink##############################本次测试重点在Source,所以sink用null即可,示意不输入到任何中央a1.sinks.k1.type = nulla1.sources.s1.channels = c1a1.sinks.k1.channel = c1打包,打成成宽依赖包,即蕴含所有依赖。并上传到flume的lib目录下三、环境配置服务器环境配置 ...

August 16, 2021 · 2 min · jiezi

关于flume:数据采集组件Flume基础用法和Kafka集成

本文源码:GitHub || GitEE 一、Flume简介1、根底形容Flume是Cloudera提供的一个高可用的,高牢靠的,分布式的海量日志采集、聚合和传输的零碎,Flume反对在日志零碎中定制各类数据发送方,用于收集数据; 特点:分布式、高可用、基于流式架构,通常用来收集、聚合、搬运不同数据源的大量日志到数据仓库。 2、架构模型 Agent包含三个外围组成,Source、Channel、Sink。Source负责接收数据源,并兼容多种类型,Channel是数据的缓冲区,Sink解决数据输入的形式和目的地。 Event是Flume定义的一个数据流传输的根本单元,将数据从源头送至目的地。 Flume能够设置多级Agent连贯的形式传输Event数据,从最后的source开始到最终sink传送的目标存储系统,如果数量过多会影响传输速率,并且传输过程中单节点故障也会影响整个传输通道。 Flume反对多路复用数据流到一个或多个目的地,这种模式能够将雷同数据复制到多个channel中,或者将不同数据散发到不同的channel中,并且sink能够抉择传送到不同的目的地。 Agent1了解为路由节点负责Channel的Event平衡到多个Sink组件,每个Sink组件分別连贯到独立的Agent上,实现负载平衡和谬误复原的性能。 Flume的应用组合形式做数据聚合,每台服务器部署一个flume节点采集日志数据,再汇聚传输到存储系统,例如HDFS、Hbase等组件,高效且稳固的解决集群数据的采集。 二、装置过程1、安装包apache-flume-1.7.0-bin.tar.gz 2、解压命名[root@hop01 opt]# pwd/opt[root@hop01 opt]# tar -zxf apache-flume-1.7.0-bin.tar.gz[root@hop01 opt]# mv apache-flume-1.7.0-bin flume1.73、配置文件配置门路:/opt/flume1.7/conf mv flume-env.sh.template flume-env.sh4、批改配置增加JDK依赖 vim flume-env.shexport JAVA_HOME=/opt/jdk1.85、环境测试装置netcat工具 sudo yum install -y nc创立工作配置 [root@hop01 flume1.7]# cd job/[root@hop01 job]# vim flume-netcat-test01.conf增加根底工作配置 留神:a1示意agent名称。 # this agenta1.sources = sr1a1.sinks = sk1a1.channels = sc1# the sourcea1.sources.sr1.type = netcata1.sources.sr1.bind = localhosta1.sources.sr1.port = 55555# the sinka1.sinks.sk1.type = logger# events in memorya1.channels.sc1.type = memorya1.channels.sc1.capacity = 1000a1.channels.sc1.transactionCapacity = 100# Bind the source and sinka1.sources.sr1.channels = sc1a1.sinks.sk1.channel = sc1开启flume监听端口 ...

March 5, 2021 · 1 min · jiezi

关于flume:Flume入门与进阶

一.Flume根底二.Flume source三.Flume事务和外部原理四.Flume拦截器

December 30, 2020 · 1 min · jiezi

关于flume:四Flume拦截器

Flume内置拦截器官网文档 http://flume.apache.org/FlumeUserGuide.htmlTimestamp Interceptor、Host Interceptor、Static Interceptor等等,能够间接拿来用,以Timestamp Interceptor为例,在header中增加一个工夫戳:1)创立配置文件vim timestramp_interceptor_demo.conf # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe interceptora1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c12)启动flume ...

December 30, 2020 · 2 min · jiezi

关于flume:三Flume事务和内部原理

1.Flume 事务Flume应用两个独立的事务别离负责从soucrce到channel,以及从channel到sink的事件传递。在Source到Channel之间的叫put事务,在Channel到Sink之间的叫Take事务。事务两个个性就是:胜利了提交,失败了回滚。 1.1 put事务从source到channel过程中,数据在flume中会被封装成Event对象,多个event被放到一个事务中,而后把这个蕴含events的事务放到channel中。 1.事务开始的时候会调用一个doPut办法,doPut办法的会将这批数据batch data,也就是一批event放到putList中。doPut传递的数据的大小能够通过参数bathchSize配置。putList的大小则通过channel的参数transactionCapacity进行配置。 2 当数据胜利寄存到putList之后,调用doCommit()办法,putList中所有的event进入channel()中, 1)胜利则清空putList.2) 不胜利的状况 从putList传输到channel过程出问题,在doCommit提交之后,事务在向channel放的过程中,遇到问题。sink那边取数据速度要比Source这边放数据速度慢,导致channel中的数据积压,这个时候就会造成putList中的数据放不进去。这时会进行事务的回滚操作,调用doRollback办法,doRollback办法会做两个事件: - 1、清空putList中的数据; - 2、抛出channelException异样。当source捕捉到doRollback抛出的异样,就会把方才的一批数据从新采集一下,采集完之后从新走事务的流程。 在数据采集的过程中也有可能呈现问题,同样是调用doRollback办法来对事务进行回滚。1.2 take事务 1.事务开始时,调用doTake办法,将channel中的event提取到(剪切)takeList中,2.如果前面的sink是HDFS Sink,同时在写入HDFS的IO缓冲流中放一份event。3.当takeList中寄存的Event达到约定数量(batchSize) ,就会调用doCommit办法: 胜利执行状况下: 如果是HDFS Sink,那么手动调用IO流的flush办法,将IO流缓冲区的数据写入到HDFS磁盘中,同时清空takeList中的数据失败状况下: 1.网络提早等起因导致传输数据失败,调用doRollback办法来进行回滚,takeList中还有备份数据,所以将takeList中的数据一成不变地还给channel,这时候就实现了事务的回滚。 2.如果takeList数据有一部分传输胜利了,剩下的因为网络提早传输失败了。同样会调用doRollback办法来进行回滚,它会把整个takeList中的数据返回给channel,而后持续进行数据的读写。如此一来,再次进行事务时候,就会存在数据反复的可能。 2.Flume外部原理1). Source采集数据EventBuilder.withBody(body)将数据封装成Event对象,getChannelProcessor().processEvent(event)将数据交给Channel Processor通过源码能够看到,以avro source为例 public Void append(AvroFlumeOGEvent evt) throws AvroRemoteException { ..... Event event = EventBuilder.withBody(evt.getBody().array(), headers); // 将数据封装成Event对象, try { getChannelProcessor().processEvent(event); // 将数据交给Channel Processor this.counterGroup.incrementAndGet("rpc.events"); } catch (ChannelException ex) { return null; } this.counterGroup.incrementAndGet("rpc.successful"); return null; }2)Channel Processor将Event事件传递给拦截器链interceptorChain.intercept(event),而后将数据返回给Channel Processor。3)Channel Processor将拦挡过滤之后的Event事件传递给Channel选择器(Channel Selector)),Channel Selector返回给Channel Processor写入event事件的Channel列表其中Channel Selectors有两种类型: - 1.Replicating Channel Selector : 将source过去的events发往所有的channel(相当于复制多份,默认应用的channel selector) - 2.Multiplexing Channel Selector:能够指定source发过来的events发往的channel ...

December 30, 2020 · 2 min · jiezi

关于flume:二Flume-source

1 Flume source罕用类型1.1 Avro Source1.1.1 概述文档定义:Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. 监听Avro端口,从Avro Client 流中接管Agents当与另一个(前一跳)的flume agent内置的Avro Sink匹配时,塔可能够创立分层收集拓扑。 加粗为必须配置的属性 1.1.2 示例创立配置文件cd jobsvim avro_demo1.conf #定义agent的sources,sinks,channels的名字a1.sources = s1a1.sinks = k1a1.channels = c1#配置sourcea1.sources.s1.channels = c1a1.sources.s1.type = avroa1.sources.s1.bind = hadoop10 # 这里也能够应用ip地址,我这里指定主机名,但要配置好/etc/hostsa1.sources.s1.port = 33333#配置channelsa1.channels.c1.type = memory#配置sinksa1.sinks.k1.channel = c1a1.sinks.k1.type = logger#为sources和sinks绑定channelsa1.sources.s1.channels = c1a1.sinks.k1.channel = c1启动flume ...

December 30, 2020 · 5 min · jiezi

关于flume:一Flume基础

1.Flume概述flume是一个分布式、牢靠、和高可用的海量日志采集、聚合和传输的零碎。反对在日志零碎中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简略解决,并写到各种数据接受方(比方文本、HDFS、Hbase等)的能力 。 2.Flume根底组成 2.1 AgentFlume运行外围是Agent,Agent是一个JVM过程,它以Event(事件)的模式将数据从源头送到目的地。一个Agent含有三个外围组件,别离是source、channel、sink。 2.2 Event数据传输单元,由Header和Body两局部形成,Header用来寄存该event的一些属性,为K-V构造,Body用来寄存该条数据,模式为字节数组。 2.3 SourceSource组件能够解决各种类型、各种格局的日志数据,包含avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy,同样反对自定义source。 2.4 Channel直达Event的一个长期存储,保留由Source组件传递过去的Event。它是Source和Sink之间的缓冲区,且是线程平安的,能够同时解决多个Source的写入和Sink的读取操作。罕用的有两种channel:MemoryChannel和FileChannel。memory channel是内存中的队列,可能实现高速的吞吐,毛病就是机器宕机、程序重启或者挂了都会导致数据失落。File channel 是将事件写入磁盘,可能长久化保留数据 2.5 Sink轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引零碎、或者被发送到另一个Flume Agent,反对自定义sink 3.Flume装置部署Flume装置部署非常简单 # 1. 上传jar包至服务器# 2.创立装置目录sudo mkdir /opt/appssudo chown v2admin:v2admin -R /opt/apps# 3.解压tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/appscd /opt/appsmv apache-flume-1.9.0-bin flume# 4.批改环境变量sudo vim /etc/profile.....// 在最初增加如下export FLUME_HOME=/opt/apps/flumeexport PATH=$PATH:$FLUME_HOME/bin.....source /etc/profile# 5.批改配置文件cd /opt/apps/flume/confcp flume-env.sh.template flume-env.shvim flume-env.sh....export JAVA_HOME=/usr/local/jdk8// 改成本人的java目录....至此部署实现,可通过flume-ng version 查看版本 [v2admin@hadoop10 conf]$ flume-ng versionFlume 1.9.0Source code repository: https://git-wip-us.apache.org/repos/asf/flume.gitRevision: d4fcab4f501d41597bc616921329a4339f73585eCompiled by fszabo on Mon Dec 17 20:45:25 CET 2018From source with checksum 35db629a3bda49d23e9b3690c80737f94 Flume 部署类型4.1 繁多流程这是最简略的部署形式。 ...

December 30, 2020 · 1 min · jiezi

关于flume:Flume入门案例

Flume入门案例1 监控端口数据官网案例1.1 案例需要:应用Flume监听一个端口,收集该端口数据,并打印到控制台。 1.2 需要剖析: 1.3 实现步骤:1.3.1 装置netcat工具sudo yum install -y nc1.3.2 判断44444端口是否被占用sudo netstat -tunlp | grep 444441.3.3 创立Flume Agent配置文件flume-netcat-logger.conf在flume目录下创立job文件夹并进入job文件夹。 mkdir jobcd job/在job文件夹下创立Flume Agent配置文件flume-netcat-logger.conf。 vim flume-netcat-logger.conf在flume-netcat-logger.conf文件中增加如下内容。 # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 1000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1注:配置文件来源于官网手册http://flume.apache.org/FlumeUserGuide.html配置文件解析 ...

December 22, 2020 · 3 min · jiezi

关于flume:Flume拓扑结构Flink进阶认识

Flume拓扑构造1 简略串联Flume Agent连贯此模式不倡议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输零碎。 2 复制和多路复用单source,多channel、sinkFlume反对将事件流向一个或者多个目的地。这种模式能够将雷同数据复制到多个channel中,或者将不同数据散发到不同的channel中,sink能够抉择传送到不同的目的地。 3 负载平衡和故障转移Flume反对应用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor能够实现负载平衡和谬误复原的性能。 4 聚合这种模式是咱们最常见的,也十分实用,日常web利用通常散布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,解决起来也十分麻烦。用flume的这种组合形式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志剖析。

December 15, 2020 · 1 min · jiezi

关于flume:Flume-Agent内部原理Flink进阶认识

Flume Agent外部原理 重要组件:1 ChannelSelectorChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,别离是Replicating(复制)和Multiplexing(多路复用)。ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会依据相应的准则,将不同的Event发往不同的Channel。 2 SinkProcessorSinkProcessor共有三种类型,别离是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessorDefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor能够实现负载平衡的性能,FailoverSinkProcessor能够实现故障转移的性能。

December 15, 2020 · 1 min · jiezi

关于flume:flume初识基本概念

1 定义 Flume是Cloudera提供的一个高可用的,高牢靠的,分布式的海量日志采集、聚合和传输的零碎。Flume基于流式架构,灵便简略。 2 flume根底框架 2.1 AgentAgent是一个JVM过程,它以事件的模式将数据从源头送至目标。 2.2 SourceSource是负责接收数据到Flume Agent的组件。Source组件能够解决各种类型、各种格局的日志数据,包含avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。 2.3 SinkSink一直地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引零碎、或者被发送到另一个Flume Agent。 Sink组件目的地包含hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。 2.4 ChannelChannel是位于Source和Sink之间的缓冲区。因而,Channel容许Source和Sink运作在不同的速率上。Channel是线程平安的,能够同时解决几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel:Memory Channel和File Channel。 Memory Channel是内存中的队列。Memory Channel在不须要关怀数据失落的情景下实用。如果须要关怀数据失落,那么Memory Channel就不应该应用,因为程序死亡、机器宕机或者重启都会导致数据失落。 File Channel将所有事件写到磁盘。因而在程序敞开或机器宕机的状况下不会失落数据。 2.5 Event传输单元,Flume数据传输的根本单元,以Event的模式将数据从源头送至目的地。Event由Header和Body两局部组成,Header用来寄存该event的一些属性,为K-V构造,Body用来寄存该条数据,模式为字节数组。 2.6 Interceptors在Flume中容许应用拦截器对传输中的event进行拦挡和解决!拦截器必须实现org.apache.flume.interceptor.Interceptor接口。拦截器能够依据开发者的设定批改甚至删除event!Flume同时反对拦截器链,即由多个拦截器组合而成!通过指定拦截器链中拦截器的程序,event将依照程序顺次被拦截器进行解决! 2.7 Channel SelectorsChannel Selectors用于source组件将event传输给多个channel的场景。罕用的有replicating(默认)和multiplexing两种类型。replicating负责将event复制到多个channel,而multiplexing则依据event的属性和配置的参数进行匹配,匹配胜利则发送到指定的channel! 2.8 Sink Processors用户能够将多个sink组成一个整体(sink组),Sink Processors可用于提供组内的所有sink的负载平衡性能,或在工夫故障的状况下实现从一个sink到另一个sink的故障转移。

December 15, 2020 · 1 min · jiezi

大数据系列Flume入门和认识

1. Flume简介Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统支持在日志系统中定制各类数据发送方,用于收集数据Flume提供对数据进行简单处理,并写到各种数据接收方2. Flume OG 与Flume NGFlume OG:Flume original generation,即Flume0.9x版本 Flume NG:Flume next generation,即Flume1.x版本 3. Flume体系结构flume的事件(agent) Source: 用来定义采集系统的源头 Channel: 把Source采集到的日志进行传输,处理 Sink:定义数据的目的地 4. Flume的安装准备安装文件 apache-flume-1.6.0-bin.tar.gz解压 tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/重命名 mv apache-flume-1.6.0-bin flume添加环境变量 #配置Flume的环境变量export FLUME_HOME=/opt/flumeexport PATH=$PATH:$FLUME_HOME/bin配置文件 [root@uplooking01 /opt/flume/conf] mv flume-env.sh.template flume-env.sh[root@uplooking01 /opt/flume/conf/flume-env.sh] export JAVA_HOME=/opt/jdk 5. Flume采集网络端口数据5.1 定义flume的事件配置文件flume-nc.properties # flume-nc.conf: 用于监听网络数据的flume agent实例的配置文件############################################# 对各个组件的描述说明# 其中a1为agent的名字# r1是a1的source的代号名字# c1是a1的channel的代号名字# k1是a1的sink的代号名字############################################a1.sources = r1a1.sinks = k1a1.channels = c1# 用于描述source的,类型是netcat网络a1.sources.r1.type = netcat# source监听的网络ip地址和端口号a1.sources.r1.bind = uplooking01a1.sources.r1.port = 44444# 用于描述sink,类型是日志格式a1.sinks.k1.type = logger# 用于描述channel,在内存中做数据的临时的存储a1.channels.c1.type = memory# 该内存中最大的存储容量,1000个events事件a1.channels.c1.capacity = 1000# 能够同时对100个events事件监管事务a1.channels.c1.transactionCapacity = 100# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1启动agent(flume事件)flume-ng agent --name a1 --conf /opt/flume/conf --conf-file /opt/flume/conf/flume-nc.properties -Dflume.root.logger=INFO,console ...

June 14, 2019 · 2 min · jiezi

Flume浅度学习

flume简介cloudera 公司开源的,贡献给Apache基金会 http://flume.apache.org/ http://archive.cloudera.com/c... 只能运行在linux系统上 Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application. flume用来高效的收集、聚合、移动大量的日志数据 ...

May 15, 2019 · 4 min · jiezi