乐趣区

关于大数据:万字长文Hadoop入门笔记附资料

大数据迅速倒退,然而 Hadoop 的根底位置始终没有扭转。了解并把握 Hadoop 相干常识对于之后的相干组件学习有着地基的作用。本文整顿了 Hadoop 基础理论常识与罕用组件介绍,尽管有一些组件曾经不太罕用。然而了解第一批组件的相干常识对于当前的学习很有帮忙,将来的很多组件也借鉴了之前的设计理念。

文章较长,倡议珍藏 后浏览。

相干学习材料能够通过上面的形式下载,本文只是整顿了大数据 Hadoop 基本知识,有精力的同学能够通过相干书籍进行更深刻的学习。

本文通过以下章节由浅入深学习,倡议浏览前有肯定的 Linux 根底Java 根底 ,并搭建好 大数据环境。相干常识能够在大数据流动中获取。

一个最简略的大数据系统就是通过,zookeeper 进行协调服务,并通过任务调度对 hive 或者 mr 进行计算工作执行,通过数据传输与内部零碎建立联系。当然架构在不变动,最新的大数据架构远不止于此。但这些根本的组件对于了解大数据的原理十分的有帮忙。

这些组件互相配合,最终造成了 Hadoop 的生态体系。

注释开始~

一、大数据发展史

信息时代数据量的爆炸性增长,让大数据的倒退异样迅速。简略来说大数据是:

1、有海量的数据

2、有对海量数据进行开掘的需要

3、有对海量数据进行开掘的软件工具(hadoop、spark、flink……)

Hadoop 与大数据

HADOOP 最早起源于 Nutch 我的项目。Nutch 的设计指标是构建一个大型的全网搜索引擎,包含网页抓取、索引、查问等性能,但随着抓取网页数量的减少,遇到了重大的可扩展性问题——如何解决数十亿网页的存储和索引问题。

2003 年、2004 年谷歌发表的两篇论文为该问题提供了可行的解决方案。

——分布式文件系统(GFS),可用于解决海量网页的存储。

——分布式计算框架 MAPREDUCE,可用于解决海量网页的索引计算问题。

Nutch 的开发人员实现了相应的开源实现 HDFS 和 MAPREDUCE,并从 Nutch 中剥离成为独立我的项目 HADOOP,到 2008 年 1 月,HADOOP 成为 Apache 顶级我的项目,迎来了它的疾速发展期。

大数据组件

在大数据的倒退中,组件化始终都是一个十分大的趋势。屏蔽简单的底层研发,只关注数据工程与数据分析自身,让大数据得以迅速地倒退。而开源的技术倒退更是让大数据的倒退失去了长足的提高,大量的公司及集体奉献了很多的开源计划。这也让数据采集,荡涤,剖析,利用都变得轻而易举。

Hadoop,Hive,Spark,Flink 等等开源框架一直的倒退呈现。

这些组件相互配合,独特构建起了大数据的平台体系。所以学习好大数据的相干组件常识就十分的重要,也是做好大数据利用的根底。

大数据架构

大数据的技术与利用的倒退同步进行,催生着架构的一直演变。

从离线到实时,从数据仓库到数据湖,从大数据平台到数据中台。有人会说大数据有点夸张,大屏泛滥没有理论利用。然而事物的倒退正是通过了从概念到实际到落地的过程。不得不抵赖,大数据的架构在一直的向更好的方向演变。

大数据倒退

大数据的利用范畴在逐步的扩充,用户画像,举荐零碎等等畛域都是大数据在撑持。而数据治理的倒退让数据安全,数据品质也失去了器重。

将来的大数据,将是大数据 + 数据分析 + 人工智能的结合体,架构和技术都将一直的演进,越来越影响并扭转咱们的生存。

大数据的倒退让大数据相干岗位的需要猛增,大数据工程师,架构师,数据分析师,大数据运维等等都是十分不错的职业抉择。不过要揭示的是大数据的技术倒退迅速,要放弃学习,一直的获取新的常识。

二、分布式协调服务——Zookeeper

在学习 hadoop 组件之前,要先理解下 zookeeper。zookeeper 是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务。

简略的说 zk 解决了分布式系统的一致性问题,能够将须要一致性的数据放在 zk 中,同时 zk 也提供了监听等机制。zk 为 hadoop 分布式的实现提供了保障,所以大家之后不必纠结 hadoop 很多的操作是如何实现的,很多都依赖了 zk。

zk 是什么?

1、Zookeeper 是为别的分布式程序服务的

2、Zookeeper 自身就是一个分布式程序(只有有半数以上节点存活,zk 就能失常服务)

3、Zookeeper 所提供的服务涵盖:主从协调、服务器节点动静高低线、对立配置管理、分布式共享锁、对立名称服务……

4、尽管说能够提供各种服务,然而 zookeeper 在底层其实只提供了两个性能:

a、治理 (存储,读取) 用户程序提交的数据;

b、并为用户程序提供数据节点监听服务;

不仅是大数据畛域,在很多分布式系统中,zk 都有着十分大的利用。

Zookeeper 工作机制

1、Zookeeper:一个 leader,多个 follower 组成的集群

2、全局数据统一:每个 server 保留一份雷同的数据正本,client 无论连贯到哪个 server,数据都是统一的

3、分布式读写,更新申请转发,由 leader 施行

4、更新申请程序进行,来自同一个 client 的更新申请按其发送程序顺次执行

5、数据更新原子性,一次数据更新要么胜利(半数以上节点胜利),要么失败

6、实时性,在肯定工夫范畴内,client 能读到最新数据

Zookeeper 数据结构

1、层次化的目录构造,命名合乎惯例文件系统标准(见下图)

2、每个节点在 zookeeper 中叫做 znode, 并且其有一个惟一的门路标识

3、节点 Znode 能够蕴含数据 (只能存储很小量的数据,<1M; 最好是 1k 字节以内) 和子节点

4、客户端利用能够在节点上设置监视器

zookeeper 的选举机制

(1)Zookeeper 集群中只有超过半数以上的服务器启动,集群能力失常工作;

(2)在集群失常工作之前,myid 小的服务器给 myid 大的服务器投票,直到集群失常工作,选出 Leader;

(3)选出 Leader 之后,之前的服务器状态由 Looking 扭转为 Following,当前的服务器都是 Follower。

zk 命令行操作

运行 zkCli.sh –server <ip> 进入命令行工具

查看 znode 门路 ls /aaa

获取 znode 数据 get /aaa

zk 客户端 API

org.apache.zookeeper.Zookeeper 是客户端入口主类,负责建设与 server 的会话

它提供以下几类次要办法:

性能 形容
create 在本地目录树中创立一个节点
delete 删除一个节点
exists 测试本地是否存在指标节点
get/set data 从指标节点上读取 / 写数据
get/set ACL 获取 / 设置指标节点访问控制列表信息
get children 检索一个子节点上的列表
sync 期待要被传送的数据

三、分布式文件系统——HDFS

HDFS 概念

分而治之:将大文件、大批量文件,分布式寄存在大量服务器上,以便于采取分而治之的形式对海量数据进行运算剖析;

HDFS 是一个文件系统,用于存储文件,通过对立的命名空间——目录树来定位文件;

HDFS 是分布式的,由很多服务器联结起来实现其性能,集群中的服务器有各自的角色;

重要个性

HDFS 中的文件在物理上是分块存储(block),块的大小能够配置;

HDFS 文件系统会给客户端提供一个对立的形象目录树,客户端通过门路来拜访文件,形如:hdfs://namenode:port/dir/file;

目录构造及文件分块地位信息 (元数据) 的治理由 namenode 节点承当——namenode 是 HDFS 集群主节点,负责保护整个 hdfs 文件系统的目录树,以及每一个门路(文件)所对应的 block 块信息;

文件的各个 block 的存储管理由 datanode 节点承当——datanode 是 HDFS 集群从节点,每一个 block 都能够在多个 datanode 上存储多个正本;

HDFS 是设计成适应一次写入,屡次读出的场景,且不反对文件的批改(适宜用来做数据分析,并不适宜用来做网盘利用,因为,不便批改,提早大,网络开销大,老本太高)

HDFS 基本操作

不同的 hadoop 版本,hdfs 操作命令不同。上面是 hadoop3 的操作命令,如果是其余版本要查问对应的操作命令,能够应用 -help 来查看帮忙。

1、查问命令
hdfs dfs -ls / 查问 / 目录下的所有文件和文件夹

hdfs dfs -ls -R 以递归的形式查问 / 目录下的所有文件

2、创立文件夹
hdfs dfs -mkdir /test 创立 test 文件夹

3、创立新的空文件
hdfs dfs -touchz /aa.txt 在 / 目录下创立一个空文件 aa.txt

4、减少文件
hdfs dfs -put aa.txt /test 将当前目录下的 aa.txt 文件复制到 /test 目录下(把 -put 换成 -copyFromLocal 成果一样 -moveFromLocal 会移除本地文件)

5、查看文件内容
hdfs dfs -cat /test/aa.txt 查看 /test 目录下文件 aa.txt 的内容(将 -cat 换成 -text 成果一样)

6、复制文件
hdfs dfs -copyToLocal /test/aa.txt . 将 /test/aa.txt 文件复制到当前目录(. 是指当前目录,也可指定其余的目录)

7、删除文件或文件夹
hdfs dfs -rm -r /test/aa.txt 删除 /test/aa.txt 文件(/test/aa.txt 能够替换成文件夹就是删除文件夹)

8、重命名文件
hdfs dfs -mv /aa.txt /bb.txt 将 /aa.txt 文件重命名为 /bb.txt

9、将源目录中的所有文件排序合并到一个本地文件
hdfs dfs -getmerge / local-file 将 / 目录下的所有文件合并到本地文件 local-file 中

能够拜访 web 端对文件操作有一个直观的意识。拜访 NameNode Web UI 进行查看。

咱们能够了解为咱们通过命令对文件及文件夹进行了操作,但这都是 hdfs 给咱们提供的服务,而 hdfs 底层会将咱们的文件分布式存储。

HDFS 工作机制

能够通过 hdfs 的工作机制来了解一下原理。来理解一下 hdfs 是如何通过指令实现文件存取工作的。

  1. HDFS 集群分为两大角色:NameNode、DataNode (Secondary Namenode)
  2. NameNode 负责管理整个文件系统的元数据
  3. DataNode 负责管理用户的文件数据块
  4. 文件会依照固定的大小(blocksize)切成若干块后分布式存储在若干台 datanode 上
  5. 每一个文件块能够有多个正本,并存放在不同的 datanode 上
  6. Datanode 会定期向 Namenode 汇报本身所保留的文件 block 信息,而 namenode 则会负责放弃文件的正本数量
  7. HDFS 的外部工作机制对客户端放弃通明,客户端申请拜访 HDFS 都是通过向 namenode 申请来进行

写数据

客户端要向 HDFS 写数据,首先要跟 namenode 通信以确认能够写文件并取得接管文件 block 的 datanode,而后,客户端按程序将文件一一 block 传递给相应 datanode,并由接管到 block 的 datanode 负责向其余 datanode 复制 block 的正本。

读数据

客户端将要读取的文件门路发送给 namenode,namenode 获取文件的元信息(次要是 block 的寄存地位信息)返回给客户端,客户端依据返回的信息找到相应 datanode 一一获取文件的 block 并在客户端本地进行数据追加合并从而取得整个文件。

咱们要了解的是 namenode 的工作机制尤其是 元数据管理 机制,这对于当前做数据治理也十分的有帮忙。

Namenode 的工作机制

1、namenode 职责:负责客户端申请的响应, 元数据的治理(查问,批改)。

2、namenode 对数据的治理采纳了三种存储模式:

内存元数据(NameSystem)

磁盘元数据镜像文件

数据操作日志文件(可通过日志运算出元数据)

3、元数据存储形式:

内存中有一份残缺的元数据(内存 meta data)

磁盘有一个“准残缺”的元数据镜像(fsimage)文件(在 namenode 的工作目录中)

用于连接内存 metadata 和长久化元数据镜像 fsimage 之间的操作日志(edits 文件)

4、checkpoint:每隔一段时间,会由 secondary namenode 将 namenode 上积攒的所有 edits 和一个最新的 fsimage 下载到本地,并加载到内存进行 merge(这个过程称为 checkpoint)

Datanode 工作机制

1、Datanode 工作职责:

存储管理用户的文件块数据

定期向 namenode 汇报本身所持有的 block 信息(通过心跳信息上报)

2、Datanode 掉线判断

datanode 过程死亡或者网络故障造成 datanode 无奈与 namenode 通信,namenode 不会立刻把该节点断定为死亡,要通过一段时间。

客户端操作

hdfs 提供了对外的 api,能够进行客户端的操作。咱们只须要引入相干依赖就能够进行操作了。

这里是 java 示例,也有其余语言的操作。这种根本的操作前期的新组件也都有代替的计划,这里次要是相熟为主。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
</dependency>

如果是 windows 下研发,要指定 hadoop 安装包地位,能力引入相干包操作,安装包能够去材料中查看。

示例代码如下:

Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://192.168.137.101:9820"), conf, "root");
fs.copyFromLocalFile(new Path("D:\\aaa.txt"), new Path("/aaa"));
fs.close();

四、分布式运算框架——Mapreduce

Mapreduce 是一个分布式运算程序的编程框架。大略的意思能够了解为对于 hdfs 中的分布式数据,能够通过 Mapreduce 这种分布式的框架形式来进行简单的运算。试想一下,如果手写分布式运算,要进行任务分配,分批执行,再汇总。这是非常复杂的工程,分布式运算框架的作用就是简化这个过程。

Mapreduce 是偏底层的技术,前期的 Hive 框架将 sql 语句转化成 Mapreduce 语句进行执行,来简化操作。前期的 spark,flink 也都是反对 sql 语句的。不过这种分布式估算的思维还是十分的重要,也影响了起初很多框架的运算原理。理论工作中不会遇到,然而要对原理有一个理解。

一个残缺的 mapreduce 程序在分布式运行时有三类实例过程:

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、mapTask:负责 map 阶段的整个数据处理流程

3、ReduceTask:负责 reduce 阶段的整个数据处理流程

执行流程

1、一个 mr 程序启动的时候,最先启动的是 MRAppMaster,MRAppMaster 启动后依据本次 job 的形容信息,计算出须要的 maptask 实例数量,而后向集群申请机器启动相应数量的 maptask 过程

2、maptask 过程启动之后,依据给定的数据切片 (哪个文件的哪个偏移量范畴) 范畴进行数据处理,主体流程为:

a) 利用客户指定的 inputformat 来获取 RecordReader 读取数据,造成输出 KV 对

b) 将输出 KV 对传递给客户定义的 map()办法,做逻辑运算,并将 map()办法输入的 KV 对收集到缓存

c) 将缓存中的 KV 对依照 K 分区排序后一直溢写到磁盘文件

3、MRAppMaster 监控到所有 maptask 过程工作实现之后(真实情况是,某些 maptask 过程解决实现后,就会开始启动 reducetask 去已实现的 maptask 处 fetch 数据),会依据客户指定的参数启动相应数量的 reducetask 过程,并告知 reducetask 过程要解决的数据范畴(数据分区)

4、Reducetask 过程启动之后,依据 MRAppMaster 告知的待处理数据所在位置,从若干台 maptask 运行所在机器上获取到若干个 maptask 输入后果文件,并在本地进行从新归并排序,而后依照雷同 key 的 KV 为一个组,调用客户定义的 reduce()办法进行逻辑运算,并收集运算输入的后果 KV,而后调用客户指定的 outputformat 将后果数据输入到内部存储。

mapreduce 的 shuffle 机制

mapreduce 中,map 阶段解决的数据如何传递给 reduce 阶段,是 mapreduce 框架中最要害的一个流程,这个流程就叫 shuffle;

具体来说:就是将 maptask 输入的处理结果数据,分发给 reducetask,并在散发的过程中,对数据按 key 进行了分区和排序;

Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。

随后将 mr 的程序开发好,并运行即可,这就波及到一个问题。如何运行。

五、资源调度——Yarn

在 hadoop 最开始的版本中,mapreduce 的程序要想运行必须本人进行调度,调配资源。这就导致治理越老越凌乱,Yarn 就呈现了。

Apache Hadoop YARN:Yet Another Resource Negotiator,另一种资源协调者。

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 mapreduce 等运算程序则相当于运行于操作系统之上的应用程序。随着 hadoop 的倒退,yarn 始终是最外围的资源调度核心,将来咱们写的 spark,flink 程序都能够通过 Yarn 来进行调度。

YARN 的重要概念

1、yarn 并不分明用户提交的程序的运行机制

2、yarn 只提供运算资源的调度(用户程序向 yarn 申请资源,yarn 就负责分配资源)

3、yarn 中的主管角色叫 ResourceManager

4、yarn 中具体提供运算资源的角色叫 NodeManager

5、这样一来,yarn 其实就与运行的用户程序齐全解耦,就意味着 yarn 上能够运行各种类型的分布式运算程序(mapreduce 只是其中的一种),比方 mapreduce、spark,flink……

6、所以,spark 等运算框架都能够整合在 yarn 上运行,只有他们各自的框架中有合乎 yarn 标准的资源申请机制即可

Yarn 就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都能够整合在一个物理集群上,进步资源利用率,不便数据共享。

ResourceManager

ResourceManager 是 YARN 中的主节点服务,它负责集群中所有资源的对立治理和作业调度。

简略来讲,ResourceManager 次要实现的性能包含:

  1. 与客户端交互,解决来自客户端的申请;
  2. 启动和治理 ApplicationMaster,并在它运行失败时重新启动它;
  3. 治理 NodeManager,接管来自 NodeManager 的资源汇报信息,并向 NodeManager 下达治理指令(比方杀死 container 等);
  4. 资源管理与调度,接管来自 ApplicationMaster 的资源申请申请,并为之分配资源。

NodeManager

NodeManager 是 YARN 集群中的每个具体节点的资源和工作管理者。NodeManager 的次要性能包含:

  1. 定时向 ResourceManager 汇报本节点上的资源应用状况和各个 Container 的运行状态;
  2. 接管并解决 ApplicationMaster 对 container 的启动、进行等各种申请;
  3. 治理 Container 的生命周期,监控 Container 的资源应用;
  4. 治理工作日志和不同应用程序用到的从属服务(auxiliary service)。

ApplicationMaster

用户提交的每个应用程序均蕴含一个 ApplicationMaster,次要性能包含:

  1. 与 ResourceManager 调度器协商以获取资源;
  2. 将失去的资源进一步调配给外部的工作;
  3. 与 NodeManager 通信以启动或进行工作;
  4. 监控所有工作的运行状态,并在工作运行失败时负责进行容错解决。

Container

Container 是 YARN 中的资源形象,它封装了某个节点上的多个维度的资源,如 CPU、内存、磁盘、网络等。当 ApplicationMaster 向 ResourceManager 申请资源时,ResourceManager 为 ApplicationMaster 返回的资源是用 Container 示意的。

当用户向 YARN 中提交一个应用程序后,YARN 将分两个阶段运行该应用程序:

第一阶段:启动 ApplicationMaster;

第二阶段:由 ApplicationMaster 创立应用程序;为它申请资源,并监控它的整个运行过程,直到运行实现。

第 1 步:

client 读取作业配置信息并创立 Job 的环境,调用 job.waitForCompletion 办法,向集群提交一个 MapReduce 作业。

第 2 步:

资源管理器给任务分配一个新的作业 ID。

第 3 步:

作业的 client 核实作业的输入门路,计算输出文件的分片,将作业的资源 (包含:Jar 包、配置文件,split 信息等) 拷贝到 HDFS 集群上的作业提交目录。

第 4 步:

通过调用资源管理器的 submitApplication()来提交作业。

第 5 步:

当资源管理器收到 submitApplciation()的申请时,就将该申请发给调度器 (scheduler),调度器向 NodeManager 发送一个启动 container 的申请。

第 6 步:

节点管理器 NodeManager 启动 container,外部运行着一个主类为 MRAppMaster 的 Java 利用。其通过发明一些对象来监控作业的进度,失去各个 task 的进度和实现报告。

第 7 步:

而后其通过分布式文件系统 HDFS 来获取由客户端提前计算好的输出 split,而后为每个输出 split 创立一个 map 工作,依据 mapreduce.job.reduces 创立 reduce 工作对象。

第 8 步:

如果不是小作业,那利用管理器向资源管理器申请 container 来运行所有的 map 和 reduce 工作。

这些申请是通过心跳来传输的,包含每个 map 工作的数据地位。比方:寄存输出 split 的主机名和机架(rack),调度器利用这些信息来调度工作,尽量将任务分配给存储数据的节点或雷同机架的节点。

第 9 步:

当一个工作由资源管理器的调度器调配给一个 container 后,AppMaster 通过分割 NodeManager 来启动 container。

第 10 步:

工作由一个主类为 YarnChild 的 Java 利用执行,在运行工作之前首先本地化工作须要的资源。比方:作业配置、JAR 文件以及分布式缓存的所有依赖文件。

第 11 步:

最初,启动并运行 map 或 reduce 工作。

同理在向 yarn 提交 spark 程序时也会按这种形式进行。这就让资源的调度与程序自身拆散。

六、数仓工具——Hive

Hive 是基于 Hadoop 的一个数据仓库工具(离线),能够将结构化的数据文件映射为一张数据库表,并提供类 SQL 查问性能。

Hive 解决了 MapReduce 的简单研发问题,采纳类 SQL 语法学习成本低。

Hive 须要有一个存储元数据的数据库,能够用 mysql 等等。

简略来说,通过 Hive 就能够与 hdfs 文件建设映射关系。咱们只须要通过开发 hivesql 语句,就能够对 hdfs 上的文件进行操作了。

Hive 基本操作

hive 中有一个默认的库:

库名:default

库目录:hdfs://ip:9000/user/hive/warehouse

新建库:

create database db_order;

库建好后,在 hdfs 中会生成一个库目录:

hdfs://hdp20-01:9000/user/hive/warehouse/db_order.db

建表:

use db_order;

create table t_order(id string,create_time string,amount float,uid string);

表建好后,会在所属的库目录中生成一个表目录

/user/hive/warehouse/db_order.db/t_order

只是,这样建表的话,hive 会认为表数据文件中的字段分隔符为 ^A

正确的建表语句为:

create table t_order(id string,create_time string,amount float,uid string)

row format delimited

fields terminated by ‘,’;

这样就指定了,咱们的表数据文件中的字段分隔符为 “,”

删除表:

drop table t_order;

删除表的成果是:

hive 会从元数据库中革除对于这个表的信息;

hive 还会从 hdfs 中删除这个表的表目录;

外部表与内部表

外部表(MANAGED_TABLE):表目录依照 hive 的标准来部署,位于 hive 的仓库目录 /user/hive/warehouse 中

内部表(EXTERNAL_TABLE):表目录由建表用户本人指定

create external table t_access(ip string,url string,access_time string)

row format delimited

fields terminated by ‘,’

location ‘/access/log’;

内部表和外部表的个性差异:

1、外部表的目录在 hive 的仓库目录中 VS 内部表的目录由用户指定

2、drop 一个外部表时:hive 会革除相干元数据,并删除表数据目录

3、drop 一个内部表时:hive 只会革除相干元数据;

一个 hive 的数据仓库,最底层的表,肯定是来自于内部零碎,为了不影响内部零碎的工作逻辑,在 hive 中可建 external 表来映射这些内部零碎产生的数据目录;

而后,后续的 etl 操作,产生的各种表倡议用 managed_table

分区表

分区表的本质是:在表目录中为数据文件创建分区子目录,以便于在查问时,MR 程序能够针对分区子目录中的数据进行解决,缩减读取数据的范畴。

比方,网站每天产生的浏览记录,浏览记录应该建一个表来寄存,然而,有时候,咱们可能只须要对某一天的浏览记录进行剖析

这时,就能够将这个表建为分区表,每天的数据导入其中的一个分区;

当然,每日的分区目录,应该有一个目录名(分区字段)

示例:

create table t_access(ip string,url string,access_time string)
partitioned by(dt string)
row format delimited
fields terminated by ',';

数据导入导出

形式 1:导入数据的一种形式:

手动用 hdfs 命令,将文件放入表目录;

形式 2:在 hive 的交互式 shell 中用 hive 命令来导入本地数据到表目录

hive>load data local inpath ‘/root/order.data.2’ into table t_order;

形式 3:用 hive 命令导入 hdfs 中的数据文件到表目录

hive>load data inpath ‘/access.log’ into table t_access partition(dt=’20210806′);

文件格式

HIVE 反对很多种文件格式:SEQUENCE FILE | TEXT FILE | PARQUET FILE | RC FILE

create table t_pq(movie string,rate int) stored as textfile;

create table t_pq(movie string,rate int) stored as sequencefile;

create table t_pq(movie string,rate int) stored as parquetfile;

七、任务调度——azkaban

azkaban 是一个工作流调度零碎。与之类似的还有 oozie,airflow 等等。

一个残缺的数据分析系统通常都是由大量工作单元组成:

shell 脚本程序,java 程序,mapreduce 程序、hive 脚本等;

各工作单元之间存在工夫先后及前后依赖关系;

为了很好地组织起这样的简单执行打算,须要一个工作流调度零碎来调度执行。

在理论工作中,绝不是一个程序就能搞定所有的。须要分为多个程序运行,还有前后程序,所以任务调度零碎始终存在。也在一直的倒退。

简略的任务调度:间接应用 linux 的 crontab 来定义;

简单的任务调度:开发调度平台

或应用现成的开源调度零碎,比方 ooize、azkaban 等。

Azkaban介绍

Azkaban 是由 Linkedin 开源的一个批量工作流任务调度器。用于在一个工作流内以一个特定的程序运行一组工作和流程。Azkaban 定义了一种 KV 文件格式来建设工作之间的依赖关系,并提供一个易于应用的 web 用户界面保护和跟踪你的工作流。

地址:https://github.com/azkaban/az…

Azkaban 应用

Azkaba 内置的工作类型反对 command、java

1、创立 job 形容文件

vi command.job

command.jobtype=command command=echo ‘hello’

2、将 job 资源文件打包成 zip 文件

zip command.job

3、通过 azkaban 的 web 治理平台创立 project 并上传 job 压缩包

首先创立 project

上传 zip 包

4、启动执行该 job

Command 类型多 job 工作流 flow

1、创立有依赖关系的多个 job 形容

第一个 job:foo.job

foo.jobtype=commandcommand=echo foo

第二个 job:bar.job 依赖 foo.job

bar.jobtype=commanddependencies=foocommand=echo bar

2、将所有 job 资源文件打到一个 zip 包中

3、在 azkaban 的 web 治理界面创立工程并上传 zip 包

4、启动工作流 flow

HDFS 操作工作

1、创立 job 形容文件

fs.jobtype=commandcommand=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz

2、将 job 资源文件打包成 zip 文件

3、通过 azkaban 的 web 治理平台创立 project 并上传 job 压缩包

4、启动执行该 job

八、数据传输 Sqoop

Sqoop 是一个用于在 Hadoop\ 和关系型数据库之间流转数据的一个工具。能够应用 Sqoop 将数据从关系型数据库系统 (RDBMS) 比方 MySQL 或者 Oracle 导入到 hadoop 分布式文件系统 (HDFS) 上,而后数据在 Hadoop MapReduce 上转换,以及将数据导出到 RDBMS 中。
Sqoop 主动实现了下面提到的很多过程,Sqoop 应用 MapReduce 来导入和导出数据,这样既能够提供并行化操作又能够进步容错能力。

Sqoop 是 Apache 软件基金会的一个开源我的项目。能够拜访 http://Sqoop.apache.org 获取,sqoop 目前曾经趋于稳定,从 apache 退休了。

在每天定时定时调度把 mysql 数据传到大数据集群中,或者把 hive 中数据传走时会用到。不过随时数据实时化的要求变高,sqoop 的作用小了很多。然而一些历史数据的导入还是须要的。

Sqoop 应用

Sqoop 提供了一系列的操作工具,应用 Sqoop 须要指定你想要应用的具体工具,以及提供对应的一些参数,应用形式如下。

sqoop tool-name [tool-arguments]

能够应用 sqoop help 命令查看帮忙信息

sqoop help
Available commands:
  codegen            生成 Java 代码
  create-hive-table  依据表构造生成 hive 表
  eval               执行 SQL 语句并返回后果
  export             导出 HDFS 文件到数据库表
  help               帮忙
  import             从数据库导入数据到 HDFS
  import-all-tables  导入数据库所有表到 HDFS
  list-databases     列举所有的 database
  list-tables        列举数据库中的所有表
  version            查看版本信息

能够看到,sqoop 提供的操作工具有 10 个。具体工具的应用帮忙能够 sqoop help (tool-name)或者 sqoop tool-name –help 进行查看。

sqoop-import

import 工具能够用于从 RDBMS 中导入一张表到 HDFS。表中的每一条记录对应生成 HDFS 文件中的每一行。这些记录能够以 text files 或者 Avro 或者 SequenceFiles 格局进行存储。

应用办法如下

$ sqoop-import (generic-args) (import-args)

参数列表 -import 基本参数

参数 形容
–connect < jdbc-uri > JDBC 连贯串
–connection-manager < class-name > 连贯治理类
–driver < class-name > 手动指定 JDBC 驱动类
–hadoop-mapred-home < dir > 能够笼罩 $HADOOP_MAPRED_HOME
–help 应用帮忙
–password-file 指定蕴含明码的文件
-P 执行 import 时会暂停,期待用户手动输出明码
–password < password > 间接将明码写在命令行中
–username < username > 指定用户名
–verbose 显示 Sqoop 工作更多执行信息
–connection-param-file < filename > 可选的参数,用于提供连贯参数
–relaxed-isolation 设置每个 mapmer 的连贯事务隔离

Hive 参数

以下是导入到 Hive 中时可选的参数:

--hive-home <dir>:笼罩 $HIVE_HOME。--hive-import:将表导入 Hive(如果没有设置,则应用 Hive 的默认分隔符。)--hive-overwrite:笼罩 Hive 表中的现有数据。--create-hive-table:如果设置,那么如果存在指标 hivetable,作业将失败。默认状况下,此属性为 false。--hive-table <table-name>:设置导入到 Hive 时要应用的表名。--hive-drop-import-delims:导入到 Hive 时,从字符串字段中删除 \n、\r 和 \01。--hive-delims-replacement:在导入到 Hive 时,将字符串字段中的 \n、\r 和 \01 替换为用户定义的字符串。--hive-partition-key:调配到分区的 Hive 字段的名称。--hive-partition-value <v>:作为该工作导入到 Hive 中的分区键的字符串值。

示例:

bin/sqoop import \
--connect jdbc:mysql://hostname:3306/mydb \
--username root \
--password root \
--table mytable \
--num-mappers 1 \
--hive-import \
--hive-database mydb \
--hive-table mytable \
--fields-terminated-by "\t" \
--delete-target-dir \
--hive-overwrite 

sqoop-export

Sqoop 的 export 工具能够从 HDFS 同步一系列文件数据到 RDBMS 中。应用这个工具的前提是导出指标表在数据库必须存在。导出文件依据用户指定的分隔符转化成一系列的输入记录。
默认的导出操作会将这些记录转化成一系列的 INSERT 语句,依据这些语句将记录插入到关系型数据库中。而在 update 模式下,Sqoop 会生成一系列的 UPDATE 语句,将数据库中曾经存在的记录进行更新。在 call 模式下,Sqoop 会为每一条记录调用一个存储过程来解决。

$ sqoop-export (generic-args) (export-args)

基本参数

* 参数 * * 形容 *
–connect < jdbc-uri > JDBC 连贯串
–connection-manager < class-name > 连贯治理类
–driver < class-name > 手动指定 JDBC 驱动类
–hadoop-mapred-home < dir > 能够笼罩 $HADOOP_MAPRED_HOME
–help 应用帮忙
–password-file 指定蕴含明码的文件
-P 执行 import 时会暂停,期待用户手动输出明码
–password < password > 间接将明码写在命令行中
–username < username > 指定用户名

示例:

$ bin/sqoop export \
--connect jdbc:mysql://hostname:3306/mydb \
--username root \
--password root \
--table mytable \
--num-mappers 1 \
--export-dir /user/hive/warehouse/mydb.db/mytable \
--input-fields-terminated-by "\t"

九、数据收集 -Flume

  • Flume 是一个分布式、牢靠、和高可用的海量日志采集、聚合和传输的零碎
  • 反对在日志零碎中定制各类数据发送方,用于收集数据
  • Flume 提供对数据进行简略解决,并写到各种数据接管方

Flume 是成熟的开源日志采集零碎,且自身就是 hadoop 生态体系中的一员,与 hadoop 体系中的各种框架组件具备天生的亲和力,可扩展性强。

绝对于用 Shell 脚本和 Java 的收集形式,躲避了对日志采集过程中的容错解决不便管制,缩小了开发工作量。

例如对于实时的日志剖析这种场景中,对数据采集局部的可靠性、容错能力要求通常不会十分严苛,因而应用通用的 flume 日志采集框架齐全能够满足需要。

Flume 的配置

装置好 flume 当前须要对其进行配置。

flume 通过事件 (agent) 进行运作,事件下蕴含如下的概念。

Source: 用来定义采集零碎的源头

Channel: 把 Source 采集到的日志进行传输, 解决

Sink: 定义数据的目的地

上面是一个示例。

有一个概念就是,咱们定义了 agent1 这个 agent。

定义了 agent1.sources 的系列设置去执行 tail - F 实时的采集日志数据。

通过 Channel 传输,最初指定 Sink 将日志存入 hdfs。

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure tail -F source1
#应用 exec 作为数据源 source 组件
agent1.sources.source1.type = exec 
#应用 tail - F 命令实时收集新产生的日志数据
agent1.sources.source1.command = tail -F /var/logs/nginx/access_log
agent1.sources.source1.channels = channel1

#configure host for source
#配置一个拦截器插件
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
#应用拦截器插件获取 agent 所在服务器的主机名
agent1.sources.source1.interceptors.i1.hostHeader = hostname

#配置 sink 组件为 hdfs
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
#agent1.sinks.sink1.hdfs.path=hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H%M%S
#指定文件 sink 到 hdfs 上的门路
agent1.sinks.sink1.hdfs.path=
hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M_%hostname
#指定文件名前缀
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 
#指定每批下沉数据的记录条数
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#指定下沉文件按 1G 大小滚动
agent1.sinks.sink1.hdfs.rollSize = 1024*1024*1024
#指定下沉文件按 1000000 条数滚动
agent1.sinks.sink1.hdfs.rollCount = 1000000
#指定下沉文件按 30 分钟滚动
agent1.sinks.sink1.hdfs.rollInterval = 30
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
#应用 memory 类型 channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

随后将 flume 用指定的配置文件启动即可。

bin/flume-ng agent --conf ./conf -f ./conf/weblog.properties.2 -n agent

留神:启动命令中的 -n 参数要给配置文件中配置的 agent 名称

目前市面针对日志采集的有 Flume,Logstash,Filebeat,Fluentd,rsyslog 很多种。但根本的原理是雷同的,要依据公司的状况进行抉择。

本文从大数据实践到罕用的根底组件进行的笔记的整顿,更深刻的 hadoop 理论知识倡议通过书籍进行深刻的浏览学习。而 Spark,Flink 等组件的学习将会通过独自的文章进行笔记整顿。心愿对大家有所帮忙,更多大数据相干常识,请关注 大数据流动~

退出移动版