共计 52036 个字符,预计需要花费 131 分钟才能阅读完成。
此套面试题来自于各大厂的实在面试题及常问的知识点,如果能了解吃透这些问题,你的大数据能力将会大大晋升,进入大厂不可企及
温习大数据面试题,看这一套就够了!
本文目录:
一、Hadoop
二、Hive
三、Spark
四、Kafka
五、HBase
六、Flink
七、数仓业务方面
八、算法
本文因内容较多,带目录的 PDF 版查看是比拟不便的:2022 年最弱小数据面试宝典 PDF 版
Hadoop
Hadoop 中常问的就三块,第一:分布式存储(HDFS);第二:分布式计算框架(MapReduce);第三:资源调度框架(YARN)。
1. 请说下 HDFS 读写流程
这个问题尽管见过无数次,面试官问过无数次,还是有不少面试者不能残缺的说进去,所以请务必记住。并且很多问题都是从 HDFS 读写流程中引申进去的。
HDFS 写流程:
- Client 客户端发送上传申请,通过 RPC 与 NameNode 建设通信,NameNode 查看该用户是否有上传权限,以及上传的文件是否在 HDFS 对应的目录下重名,如果这两者有任意一个不满足,则间接报错,如果两者都满足,则返回给客户端一个能够上传的信息;
- Client 依据文件的大小进行切分,默认 128M 一块,切分实现之后给 NameNode 发送申请第一个 block 块上传到哪些服务器上;
-
NameNode 收到申请之后,依据网络拓扑和机架感知以及正本机制进行文件调配,返回可用的 DataNode 的地址;
注:Hadoop 在设计时思考到数据的平安与高效, 数据文件默认在 HDFS 上寄存三份, 存储策略为本地一份,同机架内其它某一节点上一份, 不同机架的某一节点上一份。
- 客户端收到地址之后与服务器地址列表中的一个节点如 A 进行通信,实质上就是 RPC 调用,建设 pipeline,A 收到申请后会持续调用 B,B 在调用 C,将整个 pipeline 建设实现,逐级返回 Client;
- Client 开始向 A 上发送第一个 block(先从磁盘读取数据而后放到本地内存缓存 ), 以 packet(数据包,64kb)为单位,A 收到一个 packet 就会发送给 B,而后 B 发送给 C,A 每传完一个 packet 就会放入一个应答队列期待应答;
- 数据被宰割成一个个的 packet 数据包在 pipeline 上顺次传输,在 pipeline 反向传输中,一一发送 ack(命令正确应答),最终由 pipeline 中第一个 DataNode 节点 A 将 pipelineack 发送给 Client;
- 当一个 block 传输实现之后, Client 再次申请 NameNode 上传第二个 block,NameNode 从新抉择三台 DataNode 给 Client。
HDFS 读流程:
- Client 向 NameNode 发送 RPC 申请。申请文件 block 的地位;
- NameNode 收到申请之后会检查用户权限以及是否有这个文件,如果都合乎,则会视状况返回局部或全副的 block 列表,对于每个 block,NameNode 都会返回含有该 block 正本的 DataNode 地址;这些返回的 DataNode 地址,会依照集群拓扑构造得出 DataNode 与客户端的间隔,而后进行排序,排序两个规定:网络拓扑构造中距离 Client 近的排靠前;心跳机制中超时汇报的 DataNode 状态为 STALE,这样的排靠后;
- Client 选取排序靠前的 DataNode 来读取 block,如果客户端自身就是 DataNode, 那么将从本地间接获取数据 ( 短路读取个性);
- 底层上实质是建设 Socket Stream(FSDataInputStream),反复的调用父类 DataInputStream 的 read 办法,直到这个块上的数据读取结束;
- 当读完列表的 block 后,若文件读取还没有完结,客户端会持续向 NameNode 获取下一批的 block 列表;
- 读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时呈现谬误,客户端会告诉 NameNode,而后再从下一个领有该 block 正本的 DataNode 持续读;
- read 办法是并行的读取 block 信息,不是一块一块的读取 ;NameNode 只是返回 Client 申请蕴含块的 DataNode 地址, 并不是返回申请块的数据;
- 最终读取来所有的 block 会合并成一个残缺的最终文件;
2. HDFS 在读取文件的时候,如果其中一个块忽然损坏了怎么办
客户端读取完 DataNode 上的块之后会进行 checksum 验证,也就是把客户端读取到本地的块与 HDFS 上的原始块进行校验,如果发现校验后果不统一,客户端会告诉 NameNode,而后再 从下一个领有该 block 正本的 DataNode 持续读。
3. HDFS 在上传文件的时候,如果其中一个 DataNode 忽然挂掉了怎么办
客户端上传文件时与 DataNode 建设 pipeline 管道,管道的正方向是客户端向 DataNode 发送的数据包,管道反向是 DataNode 向客户端发送 ack 确认,也就是正确接管到数据包之后发送一个已确认接管到的应答。
当 DataNode 忽然挂掉了,客户端接管不到这个 DataNode 发送的 ack 确认,客户端会告诉 NameNode,NameNode 查看该块的正本与规定的不符,NameNode 会告诉 DataNode 去复制正本,并将挂掉的 DataNode 作下线解决,不再让它参加文件上传与下载。
4. NameNode 在启动的时候会做哪些操作
NameNode 数据存储在内存和本地磁盘,本地磁盘数据存储在fsimage 镜像文件和 edits 编辑日志文件。
首次启动 NameNode:
- 格式化文件系统,为了生成 fsimage 镜像文件;
-
启动 NameNode:
- 读取 fsimage 文件,将文件内容加载进内存
- 期待 DataNade 注册与发送 block report
-
启动 DataNode:
- 向 NameNode 注册
- 发送 block report
- 查看 fsimage 中记录的块的数量和 block report 中的块的总数是否雷同
-
对文件系统进行操作(创立目录,上传文件,删除文件等):
- 此时内存中曾经有文件系统扭转的信息,然而磁盘中没有文件系统扭转的信息,此时会将这些扭转信息写入 edits 文件中,edits 文件中存储的是文件系统元数据扭转的信息。
第二次启动 NameNode:
- 读取 fsimage 和 edits 文件;
- 将 fsimage 和 edits 文件合并成新的 fsimage 文件;
- 创立新的 edits 文件,内容开始为空;
- 启动 DataNode。
5. Secondary NameNode 理解吗,它的工作机制是怎么的
Secondary NameNode 是合并 NameNode 的 edit logs 到 fsimage 文件中;
它的具体工作机制:
- Secondary NameNode 询问 NameNode 是否须要 checkpoint。间接带回 NameNode 是否查看后果;
- Secondary NameNode 申请执行 checkpoint;
- NameNode 滚动正在写的 edits 日志;
- 将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode;
- Secondary NameNode 加载编辑日志和镜像文件到内存,并合并;
- 生成新的镜像文件 fsimage.chkpoint;
- 拷贝 fsimage.chkpoint 到 NameNode;
- NameNode 将 fsimage.chkpoint 重新命名成 fsimage;
所以如果 NameNode 中的元数据失落,是能够从 Secondary NameNode 复原一部分元数据信息的,但不是全副,因为 NameNode 正在写的 edits 日志还没有拷贝到 Secondary NameNode,这部分复原不了。
6. Secondary NameNode 不能复原 NameNode 的全副数据,那如何保障 NameNode 数据存储平安
这个问题就要说 NameNode 的高可用了,即 NameNode HA。
一个 NameNode 有单点故障的问题,那就配置双 NameNode,配置有两个关键点,一是必须要保障这两个 NameNode 的元数据信息必须要同步的,二是一个 NameNode 挂掉之后另一个要立马补上。
- 元数据信息同步在 HA 计划中采纳的是“共享存储”。每次写文件时,须要将日志同步写入共享存储,这个步骤胜利能力认定写文件胜利。而后备份节点定期从共享存储同步日志,以便进行主备切换。
- 监控 NameNode 状态采纳 zookeeper,两个 NameNode 节点的状态寄存在 zookeeper 中,另外两个 NameNode 节点别离有一个过程监控程序,施行读取 zookeeper 中有 NameNode 的状态,来判断以后的 NameNode 是不是曾经 down 机。如果 Standby 的 NameNode 节点的 ZKFC 发现主节点曾经挂掉,那么就会强制给本来的 Active NameNode 节点发送强制敞开申请,之后将备用的 NameNode 设置为 Active。
如果面试官再问 HA 中的 共享存储 是怎么实现的晓得吗?
能够进行解释下:NameNode 共享存储计划有很多,比方 Linux HA, VMware FT, QJM 等,目前社区曾经把由 Clouderea 公司实现的基于 QJM(Quorum Journal Manager)的计划合并到 HDFS 的 trunk 之中并且作为 默认的共享存储 实现。
基于 QJM 的共享存储系统 次要用于保留 EditLog,并不保留 FSImage 文件。FSImage 文件还是在 NameNode 的本地磁盘上。
QJM 共享存储的根本思维来自于 Paxos 算法,采纳多个称为 JournalNode 的节点组成的 JournalNode 集群来存储 EditLog。每个 JournalNode 保留同样的 EditLog 正本。每次 NameNode 写 EditLog 的时候,除了向本地磁盘写入 EditLog 之外,也会并行地向 JournalNode 集群之中的每一个 JournalNode 发送写申请,只有大多数的 JournalNode 节点返回胜利就认为向 JournalNode 集群写入 EditLog 胜利。如果有 2N+ 1 台 JournalNode,那么依据大多数的准则,最多能够容忍有 N 台 JournalNode 节点挂掉。
7. 在 NameNode HA 中,会呈现脑裂问题吗?怎么解决脑裂
假如 NameNode1 以后为 Active 状态,NameNode2 以后为 Standby 状态。如果某一时刻 NameNode1 对应的 ZKFailoverController 过程产生了“假死”景象,那么 Zookeeper 服务端会认为 NameNode1 挂掉了,依据后面的主备切换逻辑,NameNode2 会代替 NameNode1 进入 Active 状态。然而此时 NameNode1 可能依然处于 Active 状态失常运行,这样 NameNode1 和 NameNode2 都处于 Active 状态,都能够对外提供服务。这种状况称为脑裂。
脑裂对于 NameNode 这类对数据一致性要求十分高的零碎来说是灾难性的,数据会产生错乱且无奈复原。zookeeper 社区对这种问题的解决办法叫做 fencing,中文翻译为隔离,也就是想方法把旧的 Active NameNode 隔离起来,使它不能失常对外提供服务。
在进行 fencing 的时候,会执行以下的操作:
- 首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 办法,看能不能把它转换为 Standby 状态。
-
如果 transitionToStandby 办法调用失败,那么就执行 Hadoop 配置文件之中预约义的隔离措施,Hadoop 目前次要提供两种隔离措施,通常会抉择 sshfence:
- sshfence:通过 SSH 登录到指标机器上,执行命令 fuser 将对应的过程杀死;
- shellfence:执行一个用户自定义的 shell 脚本来将对应的过程隔离。
8. 小文件过多会有什么危害,如何防止
Hadoop 上大量 HDFS 元数据信息存储在 NameNode 内存中, 因而过多的小文件必定会压垮 NameNode 的内存。
每个元数据对象约占 150byte,所以如果有 1 千万个小文件,每个文件占用一个 block,则 NameNode 大概须要 2G 空间。如果存储 1 亿个文件,则 NameNode 须要 20G 空间。
不言而喻的解决这个问题的办法就是合并小文件, 能够抉择在客户端上传时执行肯定的策略先合并, 或者是应用 Hadoop 的 CombineFileInputFormat\<K,V\>
实现小文件的合并。
9. 请说下 HDFS 的组织架构
-
Client:客户端
- 切分文件。文件上传 HDFS 的时候,Client 将文件切分成一个一个的 Block,而后进行存储
- 与 NameNode 交互,获取文件的地位信息
- 与 DataNode 交互,读取或者写入数据
- Client 提供一些命令来治理 HDFS,比方启动敞开 HDFS、拜访 HDFS 目录及内容等
-
NameNode:名称节点,也称主节点,存储数据的元数据信息,不存储具体的数据
- 治理 HDFS 的名称空间
- 治理数据块(Block)映射信息
- 配置正本策略
- 解决客户端读写申请
-
DataNode:数据节点,也称从节点。NameNode 下达命令,DataNode 执行理论的操作
- 存储理论的数据块
- 执行数据块的读 / 写操作
-
Secondary NameNode:并非 NameNode 的热备。当 NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务
- 辅助 NameNode,分担其工作量
- 定期合并 Fsimage 和 Edits,并推送给 NameNode
- 在紧急情况下,可辅助复原 NameNode
10. 请说下 MR 中 Map Task 的工作机制
简略概述:
inputFile 通过 split 被切割为多个 split 文件,通过 Record 按行读取内容给 map(本人写的解决逻辑的办法)
,数据被 map 解决完之后交给 OutputCollect 收集器,对其后果 key 进行分区(默认应用的 hashPartitioner),而后写入 buffer,每个 map task 都有一个内存缓冲区(环形缓冲区),寄存着 map 的输入后果,当缓冲区快满的时候须要将缓冲区的数据以一个临时文件的形式溢写到磁盘,当整个 map task 完结后再对磁盘中这个 maptask 产生的所有临时文件做合并,生成最终的正式输入文件,而后期待 reduce task 的拉取。
具体步骤:
1) 读取数据组件 InputFormat (默认 TextInputFormat) 会通过 getSplits 办法对输出目录中的文件进行逻辑切片布局失去 block,有多少个 block 就对应启动多少个 MapTask。
2) 将输出文件切分为 block 之后,由 RecordReader 对象 (默认是 LineRecordReader) 进行读取,以 \n 作为分隔符, 读取一行数据, 返回 \<key,value\>,Key 示意每行首字符偏移值,Value 示意这一行文本内容。
3) 读取 block 返回 \<key,value\>, 进入用户本人继承的 Mapper 类中,执行用户重写的 map 函数,RecordReader 读取一行这里调用一次。
4) Mapper 逻辑完结之后,将 Mapper 的每条后果通过 context.write 进行 collect 数据收集。在 collect 中,会先对其进行分区解决,默认应用 HashPartitioner。
5) 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区(默认 100M),缓冲区的作用是 批量收集 Mapper 后果,缩小磁盘 IO 的影响。咱们的 Key/Value 对以及 Partition 的后果都会被写入缓冲区。当然,写入之前,Key 与 Value 值都会被序列化成字节数组。
6) 当环形缓冲区的数据达到溢写比列 (默认 0.8),也就是 80M 时,溢写线程启动, 须要对这 80MB 空间内的 Key 做排序 (Sort)。排序是 MapReduce 模型默认的行为,这里的排序也是对序列化的字节做的排序。
7) 合并溢写文件,每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner),如果 Mapper 的输入后果真的很大,有屡次这样的溢写产生,磁盘上相应的就会有多个临时文件存在。当整个数据处理完结之后开始对磁盘中的临时文件进行 Merge 合并,因为最终的文件只有一个写入磁盘,并且为这个文件提供了一个索引文件,以记录每个 reduce 对应数据的偏移量。
11. 请说下 MR 中 Reduce Task 的工作机制
简略形容:
Reduce 大抵分为 copy、sort、reduce 三个阶段,重点在前两个阶段。
copy 阶段蕴含一个 eventFetcher 来获取已实现的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,别离为 inMemoryMerger 和 onDiskMerger,别离将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。待数据 copy 实现之后,copy 阶段就实现了。
开始进行 sort 阶段,sort 阶段次要是执行 finalMerge 操作,纯正的 sort 阶段,实现之后就是 reduce 阶段,调用用户定义的 reduce 函数进行解决。
具体步骤:
1) Copy 阶段:简略地拉取数据。Reduce 过程启动一些数据 copy 线程(Fetcher),通过 HTTP 形式申请 maptask 获取属于本人的文件(map task 的分区会标识每个 map task 属于哪个 reduce task,默认 reduce task 的标识从 0 开始)。
2) Merge 阶段:在近程拷贝数据的同时,ReduceTask 启动了两个后盾线程对内存和磁盘上的文件进行合并,以避免内存应用过多或磁盘上文件过多。
merge 有三种模式:内存到内存;内存到磁盘;磁盘到磁盘。默认状况下第一种模式不启用。当内存中的数据量达到肯定阈值,就间接启动内存到磁盘的 merge。与 map 端相似,这也是溢写的过程,这个过程中如果你设置有 Combiner,也是会启用的,而后在磁盘中生成了泛滥的溢写文件。内存到磁盘的 merge 形式始终在运行,直到没有 map 端的数据时才完结,而后启动第三种磁盘到磁盘的 merge 形式生成最终的文件。
3) 合并排序:把扩散的数据合并成一个大的数据后,还会再对合并后的数据排序。
4) 对排序后的键值对调用 reduce 办法:键相等的键值对调用一次 reduce 办法,每次调用会产生零个或者多个键值对,最初把这些输入的键值对写入到 HDFS 文件中。
12. 请说下 MR 中 Shuffle 阶段
shuffle 阶段分为四个步骤:顺次为:分区,排序,规约,分组,其中前三个步骤在 map 阶段实现,最初一个步骤在 reduce 阶段实现。
shuffle 是 Mapreduce 的外围,它散布在 Mapreduce 的 map 阶段和 reduce 阶段。个别把从 Map 产生输入开始到 Reduce 获得数据作为输出之前的过程称作 shuffle。
- Collect 阶段:将 MapTask 的后果输入到默认大小为 100M 的环形缓冲区,保留的是 key/value,Partition 分区信息等。
- Spill 阶段:当内存中的数据量达到肯定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前须要对数据进行一次排序的操作,如果配置了 combiner,还会将有雷同分区号和 key 的数据进行排序。
- MapTask 阶段的 Merge:把所有溢出的临时文件进行一次合并操作,以确保一个 MapTask 最终只产生一个两头数据文件。
- Copy 阶段:ReduceTask 启动 Fetcher 线程到曾经实现 MapTask 的节点上复制一份属于本人的数据,这些数据默认会保留在内存的缓冲区中,当内存的缓冲区达到肯定的阀值的时候,就会将数据写到磁盘之上。
- ReduceTask 阶段的 Merge:在 ReduceTask 近程复制数据的同时,会在后盾开启两个线程对内存到本地的数据文件进行合并操作。
- Sort 阶段:在对数据进行合并的同时,会进行排序操作,因为 MapTask 阶段曾经对数据进行了部分的排序,ReduceTask 只需保障 Copy 的数据的最终整体有效性即可。
Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
缓冲区的大小能够通过参数调整, 参数:mapreduce.task.io.sort.mb
默认 100M
13. Shuffle 阶段的数据压缩机制理解吗
在 shuffle 阶段,能够看到数据通过大量的拷贝,从 map 阶段输入的数据,都要通过网络拷贝,发送到 reduce 阶段,这一过程中,波及到大量的网络 IO,如果数据可能进行压缩,那么数据的发送量就会少得多。
hadoop 当中反对的压缩算法:
gzip、bzip2、LZO、LZ4、Snappy,这几种压缩算法综合压缩和解压缩的速率,谷歌的 Snappy 是最优的,个别都抉择 Snappy 压缩。谷歌出品,必属精品。
14. 在写 MR 时,什么状况下能够应用规约
规约(combiner)是不可能影响工作的运行后果的部分汇总,实用于求和类,不适用于求平均值,如果 reduce 的输出参数类型和输入参数的类型是一样的,则规约的类能够应用 reduce 类,只须要在驱动类中指明规约的类即可。
15. YARN 集群的架构和工作原理晓得多少
YARN 的根本设计思维是将 MapReduce V1 中的 JobTracker 拆分为两个独立的服务:ResourceManager 和 ApplicationMaster。
ResourceManager 负责整个零碎的资源管理和调配,ApplicationMaster 负责单个应用程序的的治理。
1) ResourceManager:
RM 是一个全局的资源管理器,负责整个零碎的资源管理和调配,它次要由两个局部组成:调度器(Scheduler)和应用程序管理器(Application Manager)。
调度器依据容量、队列等限度条件,将零碎中的资源分配给正在运行的应用程序,在保障容量、公平性和服务等级的前提下,优化集群资源利用率,让所有的资源都被充分利用应用程序管理器负责管理整个零碎中的所有的应用程序,包含应用程序的提交、与调度器协商资源以启动 ApplicationMaster、监控 ApplicationMaster 运行状态并在失败时重启它。
2) ApplicationMaster:
用户提交的一个应用程序会对应于一个 ApplicationMaster,它的次要性能有:
- 与 RM 调度器协商以取得资源,资源以 Container 示意。- 将失去的工作进一步调配给外部的工作。- 与 NM 通信以启动 / 进行工作。- 监控所有的外部工作状态,并在工作运行失败的时候从新为工作申请资源以重启工作。
3) NodeManager:
NodeManager 是每个节点上的资源和工作管理器,一方面,它会定期地向 RM 汇报本节点上的资源应用状况和各个 Container 的运行状态;另一方面,他接管并解决来自 AM 的 Container 启动和进行申请。
4) Container:
Container 是 YARN 中的资源形象,封装了各种资源。一个应用程序会调配一个 Container,这个应用程序只能应用这个 Container 中形容的资源。不同于 MapReduceV1 中槽位 slot 的资源封装,Container 是一个动静资源的划分单位,更能充分利用资源。
16. YARN 的工作提交流程是怎么的
当 jobclient 向 YARN 提交一个应用程序后,YARN 将分两个阶段运行这个应用程序:一是启动 ApplicationMaster; 第二个阶段是由 ApplicationMaster 创立应用程序,为它申请资源,监控运行直到完结。
具体步骤如下:
1) 用户向 YARN 提交一个应用程序,并指定 ApplicationMaster 程序、启动 ApplicationMaster 的命令、用户程序。
2) RM 为这个应用程序调配第一个 Container,并与之对应的 NM 通信,要求它在这个 Container 中启动应用程序 ApplicationMaster。
3) ApplicationMaster 向 RM 注册,而后拆分为外部各个子工作,为各个外部工作申请资源,并监控这些工作的运行,直到完结。
4) AM 采纳轮询的形式向 RM 申请和支付资源。
5) RM 为 AM 分配资源,以 Container 模式返回。
6) AM 申请到资源后,便与之对应的 NM 通信,要求 NM 启动工作。
7) NodeManager 为工作设置好运行环境,将工作启动命令写到一个脚本中,并通过运行这个脚本启动工作。
8) 各个工作向 AM 汇报本人的状态和进度,以便当工作失败时能够重启工作。
9) 应用程序实现后,ApplicationMaster 向 ResourceManager 登记并敞开本人。
17. YARN 的资源调度三种模型理解吗
在 Yarn 中有三种调度器能够抉择:FIFO Scheduler,Capacity Scheduler,Fair Scheduler。
Apache 版本的 hadoop 默认应用的是 Capacity Scheduler 调度形式。CDH 版本的默认应用的是 Fair Scheduler 调度形式
FIFO Scheduler(先来先服务):
FIFO Scheduler 把利用按提交的程序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的利用进行分配资源,待最头上的利用需要满足后再给下一个调配,以此类推。
FIFO Scheduler 是最简略也是最容易了解的调度器,也不须要任何配置,但它并不适用于共享集群。大的利用可能会占用所有集群资源,这就导致其它利用被阻塞,比方有个大工作在执行,占用了全副的资源,再提交一个小工作,则此小工作会始终被阻塞。
Capacity Scheduler(能力调度器):
对于 Capacity 调度器,有一个专门的队列用来运行小工作,然而为小工作专门设置一个队列会事后占用肯定的集群资源,这就导致大工作的执行工夫会落后于应用 FIFO 调度器时的工夫。
Fair Scheduler(偏心调度器):
在 Fair 调度器中,咱们不须要事后占用肯定的系统资源,Fair 调度器会为所有运行的 job 动静的调整系统资源。
比方:当第一个大 job 提交时,只有这一个 job 在运行,此时它取得了所有集群资源;当第二个小工作提交后,Fair 调度器会调配一半资源给这个小工作,让这两个工作偏心的共享集群资源。
须要留神的是,在 Fair 调度器中,从第二个工作提交到取得资源会有肯定的提早,因为它须要期待第一个工作开释占用的 Container。小工作执行实现之后也会开释本人占用的资源,大工作又取得了全副的系统资源。最终的成果就是 Fair 调度器即失去了高的资源利用率又能保障小工作及时实现。
Hive
1. Hive 外部表和内部表的区别
未被 external 润饰的是外部表,被 external 润饰的为内部表。
区别:
1) 外部表数据由 Hive 本身治理,内部表数据由 HDFS 治理;
2) 外部表数据存储的地位是 hive.metastore.warehouse.dir
(默认:/user/hive/warehouse
),内部表数据的存储地位由本人制订(如果没有 LOCATION,Hive 将在 HDFS 上的/user/hive/warehouse
文件夹下以内部表的表名创立一个文件夹,并将属于这个表的数据寄存在这里);
3) 删除外部表会间接删除元数据(metadata)及存储数据;删除内部表仅仅会删除元数据,HDFS 上的文件并不会被删除。
2. Hive 有索引吗
Hive 反对索引(3.0 版本之前),然而 Hive 的索引与关系型数据库中的索引并不相同,比方,Hive 不反对主键或者外键。并且 Hive 索引提供的性能很无限,效率也并不高,因而 Hive 索引很少应用。
- 索引实用的场景:
实用于不更新的动态字段。免得总是重建索引数据。每次建设、更新数据后,都要重建索引以构建索引表。
- Hive 索引的机制如下:
hive 在指定列上建设索引,会产生一张索引表(Hive 的一张物理表),外面的字段包含:索引列的值、该值对应的 HDFS 文件门路、该值在文件中的偏移量。
Hive 0.8 版本后引入 bitmap 索引处理器,这个处理器实用于去重后,值较少的列(例如,某字段的取值只可能是几个枚举值)
因为索引是用空间换工夫,索引列的取值过多会导致建设 bitmap 索引表过大。
留神:Hive 中每次有数据时须要及时更新索引,相当于重建一个新表,否则会影响数据查问的效率和准确性,Hive 官网文档曾经明确示意 Hive 的索引不举荐被应用,在新版本的 Hive 中曾经被废除了。
扩大:Hive 是在 0.7 版本之后反对索引的,在 0.8 版本后引入 bitmap 索引处理器,在 3.0 版本开始移除索引的性能,取而代之的是 2.3 版本开始的物化视图,主动重写的物化视图代替了索引的性能。
3. 运维如何对 Hive 进行调度
- 将 hive 的 sql 定义在脚本当中;
- 应用 azkaban 或者 oozie 进行工作的调度;
- 监控任务调度页面。
4. ORC、Parquet 等列式存储的长处
ORC 和 Parquet 都是高性能的存储形式,这两种存储格局总会带来存储和性能上的晋升。
Parquet:
- Parquet 反对嵌套的数据模型,相似于 Protocol Buffers,每一个数据模型的 schema 蕴含多个字段,每一个字段有三个属性:反复次数、数据类型和字段名。
反复次数能够是以下三种:required(只呈现 1 次),repeated(呈现 0 次或屡次),optional(呈现 0 次或 1 次)。每一个字段的数据类型能够分成两种:
group(简单类型)和 primitive(根本类型)。 - Parquet 中没有 Map、Array 这样的简单数据结构,然而能够通过 repeated 和 group 组合来实现的。
- 因为 Parquet 反对的数据模型比拟涣散,可能一条记录中存在比拟深的嵌套关系,如果为每一条记录都保护一个相似的树状结可能会占用较大的存储空间,因而 Dremel 论文中提出了一种高效的对于嵌套数据格式的压缩算法:Striping/Assembly 算法。通过 Striping/Assembly 算法,parquet 能够应用较少的存储空间示意简单的嵌套格局,并且通常 Repetition level 和 Definition level 都是较小的整数值,能够通过 RLE 算法对其进行压缩,进一步升高存储空间。
- Parquet 文件是以二进制形式存储的,是不能够间接读取和批改的,Parquet 文件是自解析的,文件中包含该文件的数据和元数据。
ORC:
- ORC 文件是自描述的,它的元数据应用 Protocol Buffers 序列化,并且文件中的数据尽可能的压缩以升高存储空间的耗费。
- 和 Parquet 相似,ORC 文件也是以二进制形式存储的,所以是不能够间接读取,ORC 文件也是自解析的,它蕴含许多的元数据,这些元数据都是同构 ProtoBuffer 进行序列化的。
- ORC 会尽可能合并多个离散的区间尽可能的缩小 I / O 次数。
- ORC 中应用了更加准确的索引信息,使得在读取数据时能够指定从任意一行开始读取,更细粒度的统计信息使得读取 ORC 文件跳过整个 row group,ORC 默认会对任何一块数据和索引信息应用 ZLIB 压缩,因而 ORC 文件占用的存储空间也更小。
- 在新版本的 ORC 中也退出了对 Bloom Filter 的反对,它能够进一
步晋升谓词下推的效率,在 Hive 1.2.0 版本当前也退出了对此的支
持。
5. 数据建模用的哪些模型?
1. 星型模型
星形模式 (Star Schema) 是最罕用的维度建模形式。星型模式是以事实表为核心,所有的维度表间接连贯在事实表上,像星星一样。
星形模式的维度建模由一个事实表和一组维表成,且具备以下特点:
a. 维表只和事实表关联,维表之间没有关联;
b. 每个维表主键为单列,且该主键搁置在事实表中,作为两边连贯的外键;
c. 以事实表为外围,维表围绕外围呈星形散布。
2. 雪花模型
雪花模式 (Snowflake Schema) 是对星形模式的扩大。雪花模式的维度表能够领有其余维度表的,尽管这种模型相比星型更标准一些,然而因为这种模型不太容易了解,保护老本比拟高,而且性能方面须要关联多层维表,性能比星型模型要低。
3. 星座模型
星座模式是星型模式延长而来,星型模式是基于一张事实表的,而 星座模式是基于多张事实表的,而且共享维度信息。后面介绍的两种维度建模办法都是多维表对应单事实表,但在很多时候维度空间内的事实表不止一个,而一个维表也可能被多个事实表用到。在业务倒退前期,绝大部分维度建模都采纳的是星座模式。
数仓建模具体介绍可查看:通俗易懂数仓建模
6. 为什么要对数据仓库分层?
- 用空间换工夫,通过大量的预处理来晋升利用零碎的用户体验(效率),因而数据仓库会存在大量冗余的数据。
- 如果不分层的话,如果源业务零碎的业务规定发生变化将会影响整个数据荡涤过程,工作量微小。
- 通过数据分层治理能够简化数据荡涤的过程,因为把原来一步的工作分到了多个步骤去实现,相当于把一个简单的工作拆成了多个简略的工作,把一个大的黑盒变成了一个白盒,每一层的解决逻辑都绝对简略和容易了解,这样咱们比拟容易保障每一个步骤的正确性,当数据产生谬误的时候,往往咱们只须要部分调整某个步骤即可。
数据仓库具体介绍可查看:万字详解整个数据仓库建设体系
7. 应用过 Hive 解析 JSON 串吗
Hive 解决 json 数据总体来说有两个方向的路走:
- 将 json 以字符串的形式整个入 Hive 表,而后通过应用 UDF 函数解析曾经导入到 hive 中的数据,比方应用
LATERAL VIEW json_tuple
的办法,获取所须要的列名。 - 在导入之前将 json 拆成各个字段,导入 Hive 表的数据是曾经解析过的。这将须要应用第三方的
SerDe。
具体介绍可查看:Hive 解析 Json 数组超全解说
8. sort by 和 order by 的区别
order by 会对输出做全局排序,因而只有一个 reducer(多个 reducer 无奈保障全局有序)只有一个 reducer,会导致当输出规模较大时,须要较长的计算工夫。
sort by 不是全局排序,其在数据进入 reducer 前实现排序.
因而,如果用 sort by 进行排序,并且设置 mapred.reduce.tasks>1,则sort by 只保障每个 reducer 的输入有序,不保障全局有序。
9. 数据歪斜怎么解决
数据歪斜问题次要有以下几种:
- 空值引发的数据歪斜
- 不同数据类型引发的数据歪斜
- 不可拆分大文件引发的数据歪斜
- 数据收缩引发的数据歪斜
- 表连贯时引发的数据歪斜
- 的确无奈缩小数据量引发的数据歪斜
以上歪斜问题的具体解决方案可查看:Hive 千亿级数据歪斜解决方案
留神:对于 left join 或者 right join 来说,不会对关联的字段主动去除 null 值,对于 inner join 来说,会对关联的字段主动去除 null 值。
小伙伴们在浏览时留神下,在下面的文章(Hive 千亿级数据歪斜解决方案)中,有一处 sql 呈现了上述问题(举例的时候本来是想应用 left join 的,后果手误写成了 join)。此问题由公众号读者发现,感激这位读者斧正。
10. Hive 小文件过多怎么解决
1. 应用 hive 自带的 concatenate 命令,主动合并小文件
应用办法:
# 对于非分区表
alter table A concatenate;
#对于分区表
alter table B partition(day=20201224) concatenate;
留神:
1、concatenate 命令只反对 RCFILE 和 ORC 文件类型。
2、应用 concatenate 命令合并小文件时不能指定合并后的文件数量,但能够屡次执行该命令。
3、当屡次应用 concatenate 后文件数量不在变动,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置无关,可设定每个文件的最小 size。
2. 调整参数缩小 Map 数量
设置 map 输出合并小文件的相干参数(执行 Map 前进行小文件合并):
在 mapper 中将多个文件合成一个 split 作为输出(CombineHiveInputFormat
底层是 Hadoop 的 CombineFileInputFormat
办法):
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认
每个 Map 最大输出大小(这个值决定了合并后文件的数量):
set mapred.max.split.size=256000000; -- 256M
一个节点上 split 的至多大小(这个值决定了多个 DataNode 上的文件是否须要合并):
set mapred.min.split.size.per.node=100000000; -- 100M
一个交换机下 split 的至多大小(这个值决定了多个交换机上的文件是否须要合并):
set mapred.min.split.size.per.rack=100000000; -- 100M
3. 缩小 Reduce 的数量
reduce 的个数决定了输入的文件的个数,所以能够调整 reduce 的个数管制 hive 表的文件数量。
hive 中的分区函数 distribute by 正好是管制 MR 中 partition 分区的,能够通过设置 reduce 的数量,联合分区函数让数据平衡的进入每个 reduce 即可:
# 设置 reduce 的数量有两种形式,第一种是间接设置 reduce 个数
set mapreduce.job.reduces=10;
#第二种是设置每个 reduce 的大小,Hive 会依据数据总大小猜想确定一个 reduce 个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是 1G,设置为 5G
#执行以下语句,将数据平衡的调配到 reduce 中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();
对于上述语句解释:如设置 reduce 数量为 10,应用 rand(),随机生成一个数 x % 10
,
这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小。
4. 应用 hadoop 的 archive 将小文件归档
Hadoop Archive 简称 HAR,是一个高效地将小文件放入 HDFS 块中的文件存档工具,它可能将多个小文件打包成一个 HAR 文件,这样在缩小 namenode 内存应用的同时,依然容许对文件进行通明的拜访。
# 用来管制归档是否可用
set hive.archive.enabled=true;
#告诉 Hive 在创立归档时是否能够设置父目录
set hive.archive.har.parentdir.settable=true;
#管制须要归档文件的大小
set har.partfile.size=1099511627776;
应用以下命令进行归档:ALTER TABLE A ARCHIVE PARTITION(dt='2021-05-07', hr='12');
对已归档的分区复原为原文件:ALTER TABLE A UNARCHIVE PARTITION(dt='2021-05-07', hr='12');
留神:
归档的分区能够查看不能 insert overwrite,必须先 unarchive
Hive 小文件问题具体可查看:解决 hive 小文件过多问题
11. Hive 优化有哪些
1. 数据存储及压缩:
针对 hive 中表的存储格局通常有 orc 和 parquet,压缩格局个别应用 snappy。相比与 textfile 格局表,orc 占有更少的存储。因为 hive 底层应用 MR 计算架构,数据流是 hdfs 到磁盘再到 hdfs,而且会有很屡次,所以应用 orc 数据格式和 snappy 压缩策略能够升高 IO 读写,还能升高网络传输量,这样在肯定水平上能够节俭存储,还能晋升 hql 工作执行效率;
2. 通过调参优化:
并行执行,调节 parallel 参数;
调节 jvm 参数,重用 jvm;
设置 map、reduce 的参数;开启 strict mode 模式;
敞开揣测执行设置。
3. 无效地减小数据集将大表拆分成子表;联合应用内部表和分区表。
4. SQL 优化
- 大表对大表:尽量减少数据集,能够通过分区表,防止扫描全表或者全字段;
- 大表对小表:设置自动识别小表,将小表放入内存中去执行。
Hive 优化具体分析可查看:Hive 企业级性能优化
Spark
1. Spark 的运行流程?
具体运行流程如下:
- SparkContext 向资源管理器注册并向资源管理器申请运行 Executor
- 资源管理器调配 Executor,而后资源管理器启动 Executor
- Executor 发送心跳至资源管理器
- SparkContext 构建 DAG 有向无环图
- 将 DAG 分解成 Stage(TaskSet)
- 把 Stage 发送给 TaskScheduler
- Executor 向 SparkContext 申请 Task
- TaskScheduler 将 Task 发送给 Executor 运行
- 同时 SparkContext 将利用程序代码发放给 Executor
- Task 在 Executor 上运行,运行结束开释所有资源
2. Spark 有哪些组件?
- master:治理集群和节点,不参加计算。
- worker:计算节点,过程自身不参加计算,和 master 汇报。
- Driver:运行程序的 main 办法,创立 spark context 对象。
- spark context:管制整个 application 的生命周期,包含 dagsheduler 和 task scheduler 等组件。
- client:用户提交程序的入口。
3. Spark 中的 RDD 机制了解吗?
rdd 分布式弹性数据集,简略的了解成一种数据结构,是 spark 框架上的通用货币。所有算子都是基于 rdd 来执行的,不同的场景会有不同的 rdd 实现类,然而都能够进行相互转换。rdd 执行过程中会造成 dag 图,而后造成 lineage 保障容错性等。从物理的角度来看 rdd 存储的是 block 和 node 之间的映射。
RDD 是 spark 提供的外围形象,全称为弹性分布式数据集。
RDD 在逻辑上是一个 hdfs 文件,在形象上是一种元素汇合,蕴含了数据。它是被分区的,分为多个分区,每个分区散布在集群中的不同结点上,从而让 RDD 中的数据能够被并行操作(分布式数据集)
比方有个 RDD 有 90W 数据,3 个 partition,则每个分区上有 30W 数据。RDD 通常通过 Hadoop 上的文件,即 HDFS 或者 HIVE 表来创立,还能够通过应用程序中的汇合来创立;RDD 最重要的个性就是容错性,能够主动从节点失败中恢复过来。即如果某个结点上的 RDD partition 因为节点故障,导致数据失落,那么 RDD 能够通过本人的数据起源从新计算该 partition。这所有对使用者都是通明的。
RDD 的数据默认寄存在内存中,然而当内存资源有余时,spark 会主动将 RDD 数据写入磁盘。比方某结点内存只能解决 20W 数据,那么这 20W 数据就会放入内存中计算,剩下 10W 放到磁盘中。RDD 的弹性体当初于 RDD 上主动进行内存和磁盘之间衡量和切换的机制。
4. RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么?
reduceByKey:reduceByKey 会在后果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点相似于在 MapReduce 中的 combiner。这样做的益处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保障 reduce 端可能更快的进行后果计算。
groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合造成一个序列(Iterator),此操作产生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的节约。同时如果数据量非常大,可能还会造成 OutOfMemoryError。
所以在进行大量数据的 reduce 操作时候倡议应用 reduceByKey。不仅能够进步速度,还能够避免应用 groupByKey 造成的内存溢出问题。
5. 介绍一下 cogroup rdd 实现原理,你在什么场景下用过这个 rdd?
cogroup:对多个(2~4)RDD 中的 KV 元素,每个 RDD 中雷同 key 中的元素别离聚合成一个汇合。
与 reduceByKey 不同的是 :reduceByKey 针对 一个 RDD中雷同的 key 进行合并。而 cogroup 针对 多个 RDD中雷同的 key 的元素进行合并。
cogroup 的函数实现:这个实现依据要进行合并的两个 RDD 操作,生成一个 CoGroupedRDD 的实例,这个 RDD 的返回后果是把雷同的 key 中两个 RDD 别离进行合并操作,最初返回的 RDD 的 value 是一个 Pair 的实例,这个实例蕴含两个 Iterable 的值,第一个值示意的是 RDD1 中雷同 KEY 的值,第二个值示意的是 RDD2 中雷同 key 的值。
因为做 cogroup 的操作,须要通过 partitioner 进行从新分区的操作,因而,执行这个流程时,须要执行一次 shuffle 的操作(如果要进行合并的两个 RDD 的都曾经是 shuffle 后的 rdd,同时他们对应的 partitioner 雷同时,就不须要执行 shuffle)。
场景:表关联查问或者解决反复的 key。
6. 如何辨别 RDD 的宽窄依赖?
窄依赖: 父 RDD 的一个分区只会被子 RDD 的一个分区依赖;
宽依赖: 父 RDD 的一个分区会被子 RDD 的多个分区依赖(波及到 shuffle)。
7. 为什么要设计宽窄依赖?
- _对于窄依赖_:
\
窄依赖的多个分区能够并行计算;
\
窄依赖的一个分区的数据如果失落只须要从新计算对应的分区的数据就能够了。 - _对于宽依赖_:
\
划分 Stage(阶段)的根据: 对于宽依赖, 必须等到上一阶段计算实现能力计算下一阶段。
8. DAG 是什么?
DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程,有方向,无闭环 (其实就是 RDD 执行的流程);
\
原始的 RDD 通过一系列的转换操作就造成了 DAG 有向无环图,工作执行时,能够依照 DAG 的形容,执行真正的计算(数据被操作的一个过程)。
9. DAG 中为什么要划分 Stage?
并行计算。
一个简单的业务逻辑如果有 shuffle,那么就意味着后面阶段产生后果后,能力执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么咱们依照 shuffle 进行划分(也就是依照宽依赖就行划分),就能够将一个 DAG 划分成多个 Stage/ 阶段,在同一个 Stage 中,会有多个算子操作,能够造成一个 pipeline 流水线,流水线内的多个平行的分区能够并行执行。
10. 如何划分 DAG 的 stage?
对于窄依赖,partition 的转换解决在 stage 中实现计算,不划分(将窄依赖尽量放在在同一个 stage 中,能够实现流水线计算)。
对于宽依赖,因为有 shuffle 的存在,只能在父 RDD 解决实现后,能力开始接下来的计算,也就是说须要要划分 stage。
11. DAG 划分为 Stage 的算法理解吗?
外围算法:回溯算法
从后往前回溯 / 反向解析,遇到窄依赖退出本 Stage,遇见宽依赖进行 Stage 切分。
Spark 内核会从触发 Action 操作的那个 RDD 开始 从后往前推 ,首先会为最初一个 RDD 创立一个 Stage,而后持续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创立一个新的 Stage,那个 RDD 就是新的 Stage 的最初一个 RDD。
而后顺次类推,持续倒推,依据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全副遍历实现为止。
具体划分算法请参考:AMP 实验室发表的论文
\
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
\http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se
12. 对于 Spark 中的数据歪斜问题你有什么好的计划?
- 前提是定位数据歪斜,是 OOM 了,还是工作执行迟缓,看日志,看 WebUI
- 解决办法,有多个方面:
- 防止不必要的 shuffle,如应用播送小表的形式,将 reduce-side-join 晋升为 map-side-join
- 分拆产生数据歪斜的记录,分成几个局部进行,而后合并 join 后的后果
- 扭转并行度,可能并行度太少了,导致个别 task 数据压力大
- 两阶段聚合,先部分聚合,再全局聚合
- 自定义 paritioner,扩散 key 的散布,使其更加平均
13. Spark 中的 OOM 问题?
- map 类型的算子执行中内存溢出如 flatMap,mapPatitions
- 起因:map 端过程产生大量对象导致内存溢出:这种溢出的起因是在单个 map 中产生了大量的对象导致的针对这种问题。
- 解决方案:
- 减少堆内内存。
- 在不减少内存的状况下,能够缩小每个 Task 解决数据量,使每个 Task 产生大量的对象时,Executor 的内存也可能装得下。具体做法能够在会产生大量对象的 map 操作之前调用 repartition 办法,分区成更小的块传入 map。
- shuffle 后内存溢出如 join,reduceByKey,repartition。
- shuffle 内存溢出的状况能够说都是 shuffle 后,单个文件过大导致的。在 shuffle 的应用,须要传入一个 partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner,默认值是父 RDD 中最大的分区数.这个参数 spark.default.parallelism 只对 HashPartitioner 无效.如果是别的 partitioner 导致的 shuffle 内存溢出就须要重写 partitioner 代码了.
- driver 内存溢出
- 用户在 Dirver 端口生成大对象,比方创立了一个大的汇合数据结构。解决方案:将大对象转换成 Executor 端加载,比方调用 sc.textfile 或者评估大对象占用的内存,减少 dirver 端的内存
- 从 Executor 端收集数据(collect)回 Dirver 端,倡议将 driver 端对 collect 回来的数据所作的操作,转换成 executor 端 rdd 操作。
14. Spark 中数据的地位是被谁治理的?
每个数据分片都对应具体物理地位,数据的地位是被 blockManager 治理,无论数据是在磁盘,内存还是 tacyan,都是由 blockManager 治理。
15. Spaek 程序执行,有时候默认为什么会产生很多 task,怎么批改默认 task 执行个数?
- 输出数据有很多 task,尤其是有很多小文件的时候,有多少个输出
block 就会有多少个 task 启动; - spark 中有 partition 的概念,每个 partition 都会对应一个 task,task 越多,在解决大规模数据的时候,就会越有效率。不过 task 并不是越多越好,如果平时测试,或者数据量没有那么大,则没有必要 task 数量太多。
- 参数能够通过 spark_home/conf/spark-default.conf 配置文件设置:
针对 spark sql 的 task 数量:spark.sql.shuffle.partitions=50
非 spark sql 程序设置失效:spark.default.parallelism=10
16. 介绍一下 join 操作优化教训?
这道题常考,这里只是给大家一个思路,简略说下!面试之前还需做更多筹备。
join 其实常见的就分为两类:map-side join 和 reduce-side join。
当大表和小表 join 时,用 map-side join 能显著提高效率。
将多份数据进行关联是数据处理过程中十分广泛的用法,不过在分布式计算零碎中,这个问题往往会变的十分麻烦,因为框架提供的 join 操作个别会将所有数据依据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘 IO 耗费,运行效率极其低下,这个过程个别被称为 reduce-side-join。
如果其中有张表较小的话,咱们则能够本人实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行工夫失去大量缩短,依据不同数据可能会有几倍到数十倍的性能晋升。
在大数据量的状况下,join 是一中十分低廉的操作,须要在 join 之前应尽可能的先放大数据量。
对于放大数据量,有以下几条倡议:
- 若两个 RDD 都有反复的 key,join 操作会使得数据量会急剧的扩充。所有,最好先应用 distinct 或者 combineByKey 操作来缩小 key 空间或者用 cogroup 来解决反复的 key,而不是产生所有的穿插后果。在 combine 时,进行机智的分区,能够防止第二次 shuffle。
- 如果只在一个 RDD 呈现,那你将在无心中失落你的数据。所以应用外连贯会更加平安,这样你就能确保右边的 RDD 或者左边的 RDD 的数据完整性,在 join 之后再过滤数据。
- 如果咱们容易失去 RDD 的能够的有用的子集合,那么咱们能够先用 filter 或者 reduce,如何在再用 join。
17. Spark 与 MapReduce 的 Shuffle 的区别?
- 相同点:都是将 mapper(Spark 里是 ShuffleMapTask)的输入进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)
- 不同点:
- MapReduce 默认是排序的,spark 默认不排序,除非应用 sortByKey 算子。
- MapReduce 能够划分成 split,map()、spill、merge、shuffle、sort、reduce()等阶段,spark 没有显著的阶段划分,只有不同的 stage 和算子操作。
- MR 落盘,Spark 不落盘,spark 能够解决 mr 落盘导致效率低下的问题。
18. Spark SQL 执行的流程?
这个问题如果深挖还挺简单的,这里简略介绍下总体流程:
- parser:基于 antlr 框架对 sql 解析,生成形象语法树。
- 变量替换:通过正则表达式找出合乎规定的字符串,替换成零碎缓存环境的变量
SQLConf 中的
spark.sql.variable.substitute
,默认是可用的;参考SparkSqlParser
- parser:将 antlr 的 tree 转成 spark catalyst 的 LogicPlan,也就是 未解析的逻辑打算;具体参考
AstBuild
,ParseDriver
- analyzer:通过分析器,联合 catalog,把 logical plan 和理论的数据绑定起来,将 未解析的逻辑打算 生成 逻辑打算;具体参考
QureyExecution
- 缓存替换:通过 CacheManager,替换有雷同后果的 logical plan(逻辑打算)
- logical plan 优化,基于规定的优化;优化规定参考 Optimizer,优化执行器 RuleExecutor
- 生成 spark plan,也就是物理打算;参考
QueryPlanner
和SparkStrategies
- spark plan 筹备阶段
- 结构 RDD 执行,波及 spark 的 wholeStageCodegenExec 机制,基于 janino 框架生成 java 代码并编译
19. Spark SQL 是如何将数据写到 Hive 表的?
- 形式一:是利用 Spark RDD 的 API 将数据写入 hdfs 造成 hdfs 文件,之后再将 hdfs 文件和 hive 表做加载映射。
- 形式二:利用 Spark SQL 将获取的数据 RDD 转换成 DataFrame,再将 DataFrame 写成缓存表,最初利用 Spark SQL 直接插入 hive 表中。而对于利用 Spark SQL 写 hive 表官网有两种常见的 API,第一种是利用 JavaBean 做映射,第二种是利用 StructType 创立 Schema 做映射。
20. 通常来说,Spark 与 MapReduce 相比,Spark 运行效率更高。请阐明效率更高来源于 Spark 内置的哪些机制?
- 基于内存计算,缩小低效的磁盘交互;
- 高效的调度算法,基于 DAG;
- 容错机制 Linage。
重点局部就是 DAG 和 Lingae
21. Hadoop 和 Spark 的相同点和不同点?
Hadoop 底层应用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比拟欠缺,而且在 MR 过程中会反复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适宜高时延环境下批处理计算的利用;
Spark 是基于内存的分布式计算架构,提供更加丰盛的数据集操作类型,次要分成转化操作和口头操作,包含 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加疾速,所以适宜低时延环境下计算的利用;
spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 次要分为 map 和 reduce 两个阶段,两个阶段完了就完结了,所以在一个 job 外面能做的解决很无限;spark 计算模型是基于内存的迭代式计算模型,能够分为 n 个阶段,依据用户编写的 RDD 算子和程序,在解决完一个阶段后能够持续往下解决很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵便,能够提供更弱小的性能。
然而 spark 也有劣势,因为 spark 基于内存进行计算,尽管开发容易,然而真正面对大数据的时候,在没有进行调优的状况下,可能会呈现各种各样的问题,比方 OOM 内存溢出等状况,导致 spark 程序可能无奈运行起来,而 mapreduce 尽管运行迟缓,然而至多能够缓缓运行完。
22. Hadoop 和 Spark 应用场景?
Hadoop/MapReduce 和 Spark 最适宜的都是做离线型的数据分析,但 Hadoop 特地适宜是单次剖析的数据量“很大”的情景,而 Spark 则实用于数据量不是很大的情景。
- 个别状况下,对于中小互联网和企业级的大数据利用而言,单次剖析的数量都不会“很大”,因而能够优先思考应用 Spark。
- 业务通常认为 Spark 更实用于机器学习之类的“迭代式”利用,80GB 的压缩数据(解压后超过 200GB),10 个节点的集群规模,跑相似“sum+group-by”的利用,MapReduce 花了 5 分钟,而 spark 只须要 2 分钟。
23. Spark 如何保障宕机迅速复原?
- 适当减少 spark standby master
- 编写 shell 脚本,定期检测 master 状态,呈现宕机后对 master 进行重启操作
24. RDD 长久化原理?
spark 十分重要的一个性能个性就是能够将 RDD 长久化在内存中。
调用 cache()和 persist()办法即可。cache()和 persist()的区别在于,cache()是 persist()的一种简化形式,cache()的底层就是调用 persist()的无参版本 persist(MEMORY_ONLY),将数据长久化到内存中。
如果须要从内存中革除缓存,能够应用 unpersist()办法。RDD 长久化是能够手动抉择不同的策略的。在调用 persist()时传入对应的 StorageLevel 即可。
25. Checkpoint 检查点机制?
利用场景:当 spark 应用程序特地简单,从初始的 RDD 开始到最初整个应用程序实现有很多的步骤,而且整个利用运行工夫特地长,这种状况下就比拟适宜应用 checkpoint 性能。
起因:对于特地简单的 Spark 利用,会呈现某个重复应用的 RDD,即便之前长久化过但因为节点的故障导致数据失落了,没有容错机制,所以须要从新计算一次数据。
Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()办法,设置一个容错的文件系统的目录,比如说 HDFS;而后对 RDD 调用 checkpoint()办法。之后在 RDD 所处的 job 运行完结之后,会启动一个独自的 job,来将 checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类长久化操作。
检查点机制是咱们在 spark streaming 中用来保障容错性的次要机制,它能够使 spark streaming 阶段性的把利用数据存储到诸如 HDFS 等牢靠存储系统中,以供复原时应用。具体来说基于以下两个目标服务:
- 管制产生失败时须要重算的状态数。Spark streaming 能够通过转化图的谱系图来重算状态,检查点机制则能够管制须要在转化图中回溯多远。
- 提供驱动器程序容错。如果流计算利用中的驱动器程序解体了,你能够重启驱动器程序并让驱动器程序从检查点复原,这样 spark streaming 就能够读取之前运行的程序处理数据的进度,并从那里持续。
26. Checkpoint 和长久化机制的区别?
最次要的区别在于长久化只是将数据保留在 BlockManager 中,然而 RDD 的 lineage(血缘关系,依赖关系)是不变的。然而 checkpoint 执行完之后,rdd 曾经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就扭转了。
长久化的数据失落的可能性更大,因为节点的故障会导致磁盘、内存的数据失落。然而 checkpoint 的数据通常是保留在高可用的文件系统中,比方 HDFS 中,所以数据失落可能性比拟低
27. Spark Streaming 以及根本工作原理?
Spark streaming 是 spark core API 的一种扩大,能够用于进行大规模、高吞吐量、容错的实时数据流的解决。
它反对从多种数据源读取数据,比方 Kafka、Flume、Twitter 和 TCP Socket,并且可能应用算子比方 map、reduce、join 和 window 等来解决数据,解决后的数据能够保留到文件系统、数据库等存储中。
Spark streaming 外部的根本工作原理是:承受实时输出数据流,而后将数据拆分成 batch,比方每收集一秒的数据封装成一个 batch,而后将每个 batch 交给 spark 的计算引擎进行解决,最初会生产处一个后果数据流,其中的数据也是一个一个的 batch 组成的。
28. DStream 以及根本工作原理?
DStream 是 spark streaming 提供的一种高级形象,代表了一个继续一直的数据流。
DStream 能够通过输出数据源来创立,比方 Kafka、flume 等,也能够通过其余 DStream 的高阶函数来创立,比方 map、reduce、join 和 window 等。
DStream 外部其实一直产生 RDD,每个 RDD 蕴含了一个时间段的数据。
Spark streaming 肯定是有一个输出的 DStream 接收数据,依照工夫划分成一个一个的 batch,并转化为一个 RDD,RDD 的数据是扩散在各个子节点的 partition 中。
29. Spark Streaming 整合 Kafka 的两种模式?
- receiver 形式 :将数据拉取到 executor 中做操作,若数据量大,内存存储不下,能够通过 WAL,设置了本地存储,保证数据不失落,而后应用 Kafka 高级 API 通过 zk 来保护偏移量,保障生产数据。receiver 生产的数据偏移量是在 zk 获取的, 此形式效率低,容易呈现数据失落。
- receiver 形式的容错性:在默认的配置下,这种形式可能会因为底层的失败而失落数据。如果要启用高牢靠机制,让数据零失落,就必须启用 Spark Streaming 的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接管到的 Kafka 数据写入分布式文件系统(比方 HDFS)上的预写日志中。所以,即便底层节点呈现了失败,也能够应用预写日志中的数据进行复原。
- Kafka 中的 topic 的 partition,与 Spark 中的 RDD 的 partition 是没有关系的。在 1、KafkaUtils.createStream()中,进步 partition 的数量,只会减少 Receiver 形式中读取 partition 的线程的数量。不会减少 Spark 解决数据的并行度。能够创立多个 Kafka 输出 DStream,应用不同的 consumer group 和 topic,来通过多个 receiver 并行接收数据。
- 基于 Direct 形式 : 应用 Kafka 底层 Api,其消费者间接连贯 kafka 的分区上,因为 createDirectStream 创立的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,然而须要本人保护偏移量,即用即取,不会给内存造成太大的压力,效率高。
- 长处:简化并行读取:如果要读取多个 partition,不须要创立多个输出 DStream 而后对它们进行 union 操作。Spark 会创立跟 Kafka partition 一样多的 RDD partition,并且会并行从 Kafka 中读取数据。所以在 Kafka partition 和 RDD partition 之间,有一个一对一的映射关系。
- 高性能:如果要保障零数据失落,在基于 receiver 的形式中,须要开启 WAL 机制。这种形式其实效率低下,因为数据实际上被复制了两份,Kafka 本人自身就有高牢靠的机制,会对数据复制一份,而这里又会复制一份到 WAL 中。而基于 direct 的形式,不依赖 Receiver,不须要开启 WAL 机制,只有 Kafka 中作了数据的复制,那么就能够通过 Kafka 的正本进行复原。
- receiver 与和 direct 的比拟:
- 基于 receiver 的形式,是应用 Kafka 的高阶 API 来在 ZooKeeper 中保留生产过的 offset 的。这是生产 Kafka 数据的传统形式。这种形式配合着 WAL 机制能够保证数据零失落的高可靠性,然而却无奈保证数据被解决一次且仅一次,可能会解决两次。因为 Spark 和 ZooKeeper 之间可能是不同步的。
- 基于 direct 的形式,应用 Kafka 的低阶 API,Spark Streaming 本人就负责追踪生产的 offset,并保留在 checkpoint 中。Spark 本人肯定是同步的,因而能够保证数据是生产一次且仅生产一次。
- Receiver 形式是通过 zookeeper 来连贯 kafka 队列,Direct 形式是间接连贯到 kafka 的节点上获取数据。
30. Spark 主备切换机制原理晓得吗?
Master 实际上能够配置两个,Spark 原生的 standalone 模式是反对 Master 主备切换的。当 Active Master 节点挂掉当前,咱们能够将 Standby Master 切换为 Active Master。
Spark Master 主备切换能够基于两种机制,一种是基于文件系统的,一种是基于 ZooKeeper 的。
基于文件系统的主备切换机制,须要在 Active Master 挂掉之后手动切换到 Standby Master 上;
而基于 Zookeeper 的主备切换机制,能够实现主动切换 Master。
31. Spark 解决了 Hadoop 的哪些问题?
-
MR:抽象层次低,须要应用手工代码来实现程序编写,应用上难以上手;
Spark:Spark 采纳 RDD 计算模型,简略容易上手。
-
MR:只提供 map 和 reduce 两个操作,表达能力欠缺;
Spark:Spark 采纳更加丰盛的算子模型,包含 map、flatmap、groupbykey、reducebykey 等;
-
MR:一个 job 只能蕴含 map 和 reduce 两个阶段,简单的工作须要蕴含很多个 job,这些 job 之间的治理以来须要开发者本人进行治理;
Spark:Spark 中一个 job 能够蕴含多个转换操作,在调度时能够生成多个 stage,而且如果多个 map 操作的分区不变,是能够放在同一个 task 外面去执行;
-
MR:两头后果寄存在 hdfs 中;
Spark:Spark 的两头后果个别存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;
-
MR:只有等到所有的 map task 执行结束后能力执行 reduce task;
Spark:Spark 中分区雷同的转换形成流水线在一个 task 中执行,分区不同的须要进行 shuffle 操作,被划分成不同的 stage 须要期待后面的 stage 执行完能力执行。
-
MR:只适宜 batch 批处理,时延高,对于交互式解决和实时处理反对不够;
Spark:Spark streaming 能够将流拆成工夫距离的 batch 进行解决,实时计算。
32. 数据歪斜的产生和解决办法?
数据歪斜认为着某一个或者某几个 partition 的数据特地大,导致这几个 partition 上的计算须要消耗相当长的工夫。
在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而一个 stage 外面的多个 task 是能够并行执行,task 数目由 partition 数目决定,如果一个 partition 的数目特地大,那么导致这个 task 执行工夫很长,导致接下来的 stage 无奈执行,从而导致整个 job 执行变慢。
防止数据歪斜,个别是要选用适合的 key,或者本人定义相干的 partitioner,通过加盐或者哈希值来拆分这些 key,从而将这些数据扩散到不同的 partition 去执行。
如下算子会导致 shuffle 操作,是导致数据歪斜可能产生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;
33. 你用 Spark Sql 解决的时候,处理过程中用的 DataFrame 还是间接写的 Sql?为什么?
这个问题的主旨是问你 spark sql 中 dataframe 和 sql 的区别,从执行原理、操作不便水平和自定义水平来剖析
这个问题。
34. Spark Master HA 主从切换过程不会影响到集群已有作业的运行,为什么?
不会的。
因为程序在运行之前,曾经申请过资源了,driver 和 Executors 通信,不须要和 master 进行通信的。
35. Spark Master 应用 Zookeeper 进行 HA,有哪些源数据保留到 Zookeeper 外面?
spark 通过这个参数 spark.deploy.zookeeper.dir 指定 master 元数据在 zookeeper 中保留的地位,包含 Worker,Driver 和 Application 以及 Executors。standby 节点要从 zk 中,取得元数据信息,复原集群运行状态,能力对外持续提供服务,作业提交资源申请等,在复原前是不能承受申请的。
注:Master 切换须要留神 2 点:
1、在 Master 切换的过程中,所有的曾经在运行的程序皆失常运行!
因为 Spark Application 在运行前就曾经通过 Cluster Manager 取得了计算资源,所以在运行时 Job 自身的
调度和解决和 Master 是没有任何关系。
2、在 Master 的切换过程中惟一的影响是不能提交新的 Job:一方面不可能提交新的应用程序给集群,
因为只有 Active Master 能力承受新的程序的提交申请;另外一方面,曾经运行的程序中也不可能因
Action 操作触发新的 Job 的提交申请。
36. 如何实现 Spark Streaming 读取 Flume 中的数据?
能够这样说:
- 后期通过技术调研,查看官网相干材料,发现 sparkStreaming 整合 flume 有 2 种模式,一种是拉模式,一种是推模式,而后在简略的聊聊这 2 种模式的特点,以及如何部署实现,须要做哪些事件,最初比照两种模式的特点,抉择那种模式更好。
- 推模式:Flume 将数据 Push 推给 Spark Streaming
- 拉模式:Spark Streaming 从 flume 中 Poll 拉取数据
37. 在理论开发的时候是如何保证数据不失落的?
能够这样说:
- flume 那边采纳的 channel 是将数据落地到磁盘中,保障数据源端安全性(能够在补充一下,flume 在这里的 channel 能够设置为 memory 内存中,进步数据接管解决的效率,然而因为数据在内存中,平安机制保障不了,故抉择 channel 为磁盘存储。整个流程运行有一点的提早性)
- sparkStreaming 通过拉模式整合的时候,应用了 FlumeUtils 这样一个类,该类是须要依赖一个额定的 jar 包(spark-streaming-flume_2.10)
-
要想保证数据不失落,数据的准确性,能够在构建 StreamingConext 的时候,利用 StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)来创立一个 StreamingContext, 应用 StreamingContext.getOrCreate 来创立 StreamingContext 对象,传入的第一个参数是 checkpoint 的寄存目录,第二参数是生成 StreamingContext 对象的用户自定义函数。如果 checkpoint 的寄存目录存在,则从这个目录中生成 StreamingContext 对象;如果不存在,才会调用第二个函数来生成新的 StreamingContext 对象。在 creatingFunc 函数中,除了生成一个新的 StreamingContext 操作,还须要实现各种操作,而后调用 ssc.checkpoint(checkpointDirectory)来初始化 checkpoint 性能,最初再返回 StreamingContext 对象。
这样,在 StreamingContext.getOrCreate 之后,就能够间接调用 start()函数来启动(或者是从中断点持续运行)流式利用了。如果有其余在启动或持续运行都要做的工作,能够在 start()调用前执行。
38. RDD 有哪些缺点?
- 不反对细粒度的写和更新操作,Spark 写数据是粗粒度的,所谓粗粒度,就是批量写入数据,目标是为了提高效率。然而 Spark 读数据是细粒度的,也就是说能够一条条的读。
- 不反对增量迭代计算,如果对 Flink 相熟,能够说下 Flink 反对增量迭代计算。
Kafka
1. 为什么要应用 kafka?
- 缓冲和削峰:上游数据时有突发流量,上游可能扛不住,或者上游没有足够多的机器来保障冗余,kafka 在两头能够起到一个缓冲的作用,把音讯暂存在 kafka 中,上游服务就能够依照本人的节奏进行缓缓解决。
- 解耦和扩展性:我的项目开始的时候,并不能确定具体需要。音讯队列能够作为一个接口层,解耦重要的业务流程。只须要恪守约定,针对数据编程即可获取扩大能力。
- 冗余:能够采纳一对多的形式,一个生产者公布音讯,能够被多个订阅 topic 的服务生产到,供多个毫无关联的业务应用。
- 健壮性:音讯队列能够沉积申请,所以生产端业务即便短时间死掉,也不会影响次要业务的失常进行。
- 异步通信:很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。
2. Kafka 生产过的音讯如何再生产?
kafka 生产音讯的 offset 是定义在 zookeeper 中的,如果想反复生产 kafka 的音讯,能够在 redis 中本人记录 offset 的 checkpoint 点(n 个),当想反复生产音讯时,通过读取 redis 中的 checkpoint 点进行 zookeeper 的 offset 重设,这样就能够达到反复生产音讯的目标了
3. kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
kafka 应用的是磁盘存储。
速度快是因为:
- 程序写入:因为硬盘是机械构造,每次读写都会寻址 -> 写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘“厌恶”随机 I /O,喜爱程序 I /O。为了进步读写硬盘的速度,Kafka 就是应用程序 I /O。
- Memory Mapped Files(内存映射文件):64 位操作系统中个别能够示意 20G 的数据文件,它的工作原理是间接利用操作系统的 Page 来实现文件到物理内存的间接映射。实现映射之后你对物理内存的操作会被同步到硬盘上。
- Kafka 高效文件存储设计:Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期革除或删除曾经生产完文件,缩小磁盘占用。通过索引信息能够疾速定位
message 和确定 response 的 大 小。通过 index 元数据全副映射到 memory(内存映射文件),
能够防止 segment file 的 IO 磁盘操作。通过索引文件稠密存储,能够大幅升高 index 文件元数据占用空间大小。
注:
- Kafka 解决查问效率的伎俩之一是将数据文件分段,比方有 100 条 Message,它们的 offset 是从 0 到 99。假如将数据文件分成 5 段,第一段为 0 -19,第二段为 20-39,以此类推,每段放在一个独自的数据文件外面,数据文件以该段中 小的 offset 命名。这样在查找指定 offset 的
Message 的时候,用二分查找就能够定位到该 Message 在哪个段中。 - 为数据文件建 索引数据文件分段 使得能够在一个较小的数据文件中查找对应 offset 的 Message 了,然而这仍然须要程序扫描能力找到对应 offset 的 Message。
为了进一步提高查找的效率,Kafka 为每个分段后的数据文件建设了索引文件,文件名与数据文件的名字是一样的,只是文件扩大名为.index。
4. Kafka 数据怎么保障不失落?
分三个点说,一个是生产者端,一个消费者端,一个 broker 端。
- 生产者数据的不失落
kafka 的 ack 机制:在 kafka 发送数据的时候,每次发送音讯都会有一个确认反馈机制,确保音讯失常的可能被收到,其中状态有 0,1,-1。
如果是同步模式:
ack 设置为 0,危险很大,个别不倡议设置为 0。即便设置为 1,也会随着 leader 宕机失落数据。所以如果要严格保障生产端数据不失落,可设置为 -1。
如果是异步模式:
也会思考 ack 的状态,除此之外,异步模式下的有个 buffer,通过 buffer 来进行控制数据的发送,有两个值来进行管制,工夫阈值与音讯的数量阈值,如果 buffer 满了数据还没有发送进来,有个选项是配置是否立刻清空 buffer。能够设置为 -1,永恒阻塞,也就数据不再生产。异步模式下,即便设置为 -1。也可能因为程序员的不迷信操作,操作数据失落,比方 kill -9,但这是特地的例外情况。
注:
ack=0:producer 不期待 broker 同步实现的确认,持续发送下一条 (批) 信息。
ack=1(默认):producer 要期待 leader 胜利收到数据并失去确认,才发送下一条 message。
ack=-1:producer 失去 follwer 确认,才发送下一条数据。
- 消费者数据的不失落
通过 offset commit 来保证数据的不失落,kafka 本人记录了每次生产的 offset 数值,下次持续生产的时候,会接着上次的 offset 进行生产。
而 offset 的信息在 kafka0.8 版本之前保留在 zookeeper 中,在 0.8 版本之后保留到 topic 中,即便消费者在运行过程中挂掉了,再次启动的时候会找到 offset 的值,找到之前生产音讯的地位,接着生产,因为 offset 的信息写入的时候并不是每条音讯生产实现后都写入的,所以这种状况有可能会造成反复生产,然而不会失落音讯。
惟一例外的状况是,咱们在程序中给本来做不同性能的两个 consumer 组设置
KafkaSpoutConfig.bulider.setGroupid 的时候设置成了一样的 groupid,这种状况会导致这两个组共享同一份数据,就会产生组 A 生产 partition1,partition2 中的音讯,组 B 生产 partition3 的音讯,这样每个组生产的音讯都会失落,都是不残缺的。为了保障每个组都独享一份音讯数据,groupid 肯定不要反复才行。
- kafka 集群中的 broker 的数据不失落
每个 broker 中的 partition 咱们个别都会设置有 replication(正本)的个数,生产者写入的时候首先依据散发策略(有 partition 按 partition,有 key 按 key,都没有轮询)写入到 leader 中,follower(正本)再跟 leader 同步数据,这样有了备份,也能够保障音讯数据的不失落。
5. 采集数据为什么抉择 kafka?
采集层 次要能够应用 Flume, Kafka 等技术。
Flume:Flume 是管道流形式,提供了很多的默认实现,让用户通过参数部署,及扩大 API.
Kafka:Kafka 是一个可长久化的分布式的音讯队列。Kafka 是一个十分通用的零碎。你能够有许多生产者和很多的消费者共享多个主题 Topics。
相比之下,Flume 是一个专用工具被设计为旨在往 HDFS,HBase 发送数据。它对 HDFS 有非凡的优化,并且集成了 Hadoop 的平安个性。
所以,Cloudera 倡议如果数据被多个零碎生产的话,应用 kafka;如果数据被设计给 Hadoop 应用,应用 Flume。
6. kafka 重启是否会导致数据失落?
- kafka 是将数据写到磁盘的,个别数据不会失落。
- 然而在重启 kafka 过程中,如果有消费者生产音讯,那么 kafka 如果来不及提交 offset,可能会造成数据的不精确(失落或者反复生产)。
7. kafka 宕机了如何解决?
- 先思考业务是否受到影响
kafka 宕机了,首先咱们思考的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不必放心了。
- 节点排错与复原
想要复原集群的节点,次要的步骤就是通过日志剖析来查看节点宕机的起因,从而解决,从新复原节点。
8. 为什么 Kafka 不反对读写拆散?
在 Kafka 中,生产者写入音讯、消费者读取音讯的操作都是与 leader 正本进行交互的,从 而实现的是一种 主写主读 的生产生产模型。
Kafka 并不反对 主写从读,因为主写从读有 2 个很显著的毛病:
- 数据一致性问题:数据从主节点转到从节点必然会有一个延时的工夫窗口,这个工夫 窗口会导致主从节点之间的数据不统一。某一时刻,在主节点和从节点中 A 数据的值都为 X,之后将主节点中 A 的值批改为 Y,那么在这个变更告诉到从节点之前,利用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不统一的问题。
- 延时问题:相似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程须要经验 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会消耗肯定的工夫。而在 Kafka 中,主从同步会比 Redis 更加耗时,它须要经验 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的利用而言,主写从读的性能并不太实用。
而 kafka 的 主写主读 的长处就很多了:
- 能够简化代码的实现逻辑,缩小出错的可能;
- 将负载粒度细化均摊,与主写从读相比,不仅负载效力更好,而且对用户可控;
- 没有延时的影响;
- 在正本稳固的状况下,不会呈现数据不统一的状况。
9. kafka 数据分区和消费者的关系?
每个分区只能由同一个生产组内的一个消费者 (consumer) 来生产,能够由不同的生产组的消费者来生产,同组的消费者则起到并发的成果。
10. kafka 的数据 offset 读取流程
- 连贯 ZK 集群,从 ZK 中拿到对应 topic 的 partition 信息和 partition 的 Leader 的相干信息
- 连贯到对应 Leader 对应的 broker
- consumer 将⾃自⼰己保留的 offset 发送给 Leader
- Leader 依据 offset 等信息定位到 segment(索引⽂文件和⽇日志⽂文件)
- 依据索引⽂文件中的内容,定位到⽇日志⽂文件中该偏移量量对应的开始地位读取相应⻓长度的数据并返回给 consumer
11. kafka 外部如何保障程序,联合内部组件如何保障消费者的程序?
kafka 只能保障 partition 内是有序的,然而 partition 间的有序是没方法的。爱奇艺的搜寻架构,是从业务上把须要有序的打到同⼀个 partition。
12. Kafka 音讯数据积压,Kafka 生产能力有余怎么解决?
- 如果是 Kafka 生产能力有余,则能够思考减少 Topic 的分区数,并且同时晋升生产组的消费者数量,消费者数 = 分区数。(两者缺一不可)
- 如果是上游的数据处理不及时:进步每批次拉取的数量。批次拉取数据过少(拉取数据 / 解决工夫 < 生产速度),使解决的数据小于生产的数据,也会造成数据积压。
13. Kafka 单条日志传输大小
kafka 对于音讯体的大小默认为单条最大值是 1M 然而在咱们利用场景中, 经常会呈现一条音讯大于 1M,如果不对 kafka 进行配置。则会呈现生产者无奈将音讯推送到 kafka 或消费者无奈去生产 kafka 外面的数据, 这时咱们就要对 kafka 进行以下配置:server.properties
replica.fetch.max.bytes: 1048576 broker 可复制的音讯的最大字节数, 默认为 1M
message.max.bytes: 1000012 kafka 会接管单个音讯 size 的最大限度,默认为 1M 左右
留神:message.max.bytes 必须小于等于 replica.fetch.max.bytes,否则就会导致 replica 之间数据同步失败。
Hbase
1. Hbase 是怎么写数据的?
Client 写入 -> 存入 MemStore,始终到 MemStore 满 -> Flush 成一个 StoreFile,直至增长到肯定阈值 -> 触发 Compact 合并操作 -> 多个 StoreFile 合并成一个 StoreFile,同时进行版本合并和数据删除 -> 当 StoreFiles Compact 后,逐步形成越来越大的 StoreFile -> 单个 StoreFile 大小超过肯定阈值后(默认 10G),触发 Split 操作,把以后 Region Split 成 2 个 Region,Region 会下线,新 Split 出的 2 个孩子 Region 会被 HMaster 调配到相应的 HRegionServer 上,使得原先 1 个 Region 的压力得以分流到 2 个 Region 上
由此过程可知,HBase 只是减少数据,没有更新和删除操作,用户的更新和删除都是逻辑层面的,在物理层面,更新只是追加操作,删除只是标记操作。
用户写操作只须要进入到内存即可立刻返回,从而保障 I / O 高性能。
2. HDFS 和 HBase 各自应用场景
首先一点须要明确:Hbase 是基于 HDFS 来存储的。
HDFS:
- 一次性写入,屡次读取。
- 保证数据的一致性。
- 次要是能够部署在许多便宜机器中,通过多正本进步可靠性,提供了容错和复原机制。
HBase:
- 霎时写入量很大,数据库不好撑持或须要很高老本撑持的场景。
- 数据须要短暂保留,且量会长久增长到比拟大的场景。
- HBase 不实用与有 join,多级索引,表关系简单的数据模型。
- 大数据量(100s TB 级数据)且有疾速随机拜访的需要。如:淘宝的交易历史记录。数据量微小无容置疑,面向普通用户的申请必然要即时响应。
- 业务场景简略,不须要关系数据库中很多个性(例如穿插列、穿插表,事务,连贯等等)。
3. Hbase 的存储构造
Hbase 中的每张表都通过行键 (rowkey) 依照肯定的范畴被宰割成多个子表(HRegion),默认一个 HRegion 超过 256M 就要被宰割成两个,由 HRegionServer 治理,治理哪些 HRegion 由 Hmaster 调配。HRegion 存取一个子表时,会创立一个 HRegion 对象,而后对表的每个列族(Column Family)创立一个 store 实例,每个 store 都会有 0 个或多个 StoreFile 与之对应,每个 StoreFile 都会对应一个 HFile,HFile 就是理论的存储文件,一个 HRegion 还领有一个 MemStore 实例。
4. 热点景象(数据歪斜)怎么产生的,以及解决办法有哪些
热点景象:
某个小的时段内,对 HBase 的读写申请集中到极少数的 Region 上,导致这些 region 所在的 RegionServer 解决申请量骤增,负载量显著偏大,而其余的 RgionServer 显著闲暇。
热点景象呈现的起因:
HBase 中的行是依照 rowkey 的字典程序排序的,这种设计优化了 scan 操作,能够将相干的行以及会被一起读取的行存取在邻近地位,便于 scan。然而蹩脚的 rowkey 设计是热点的源头。
热点产生在大量的 client 间接拜访集群的一个或极少数个节点(拜访可能是读,写或者其余操作)。大量拜访会使热点 region 所在的单个机器超出本身承受能力,引起性能降落甚至 region 不可用,这也会影响同一个 RegionServer 上的其余 region,因为主机无奈服务其余 region 的申请。
热点景象解决办法:
为了防止写热点,设计 rowkey 使得不同行在同一个 region,然而在更多数据状况下,数据应该被写入集群的多个 region,而不是一个。常见的办法有以下这些:
- 加盐:在 rowkey 的后面减少随机数,使得它和之前的 rowkey 的结尾不同。调配的前缀品种数量应该和你想应用数据扩散到不同的 region 的数量统一。加盐之后的 rowkey 就会依据随机生成的前缀扩散到各个 region 上,以防止热点。
- 哈希:哈希能够使负载扩散到整个集群,然而读却是能够预测的。应用确定的哈希能够让客户端重构残缺的 rowkey,能够应用 get 操作精确获取某一个行数据
- 反转:第三种避免热点的办法时反转固定长度或者数字格局的 rowkey。这样能够使得 rowkey 中常常扭转的局部(最没有意义的局部)放在后面。这样能够无效的随机 rowkey,然而就义了 rowkey 的有序性。反转 rowkey 的例子以手机号为 rowkey,能够将手机号反转后的字符串作为 rowkey,这样的就防止了以手机号那样比拟固定结尾导致热点问题
-
工夫戳反转 :一个常见的数据处理问题是疾速获取数据的最近版本,应用反转的工夫戳作为 rowkey 的一部分对这个问题非常有用,能够用 Long.Max_Value – timestamp 追加到 key 的开端,例如 key,[key] 的最新值能够通过 scan [key]取得 [key] 的第一条记录,因为 HBase 中 rowkey 是有序的,第一条记录是最初录入的数据。
- 比方须要保留一个用户的操作记录,依照操作工夫倒序排序,在设计 rowkey 的时候,能够这样设计[userId 反转]
[Long.Max_Value – timestamp],在查问用户的所有操作记录数据的时候,间接指定反转后的 userId,startRow 是 userId 反转,stopRow 是 userId 反转 - 如果须要查问某段时间的操作记录,startRow 是 user 反转,stopRow 是 userId 反转
- 比方须要保留一个用户的操作记录,依照操作工夫倒序排序,在设计 rowkey 的时候,能够这样设计[userId 反转]
- HBase 建表预分区:创立 HBase 表时,就事后依据可能的 RowKey 划分出多个 region 而不是默认的一个,从而能够将后续的读写操作负载平衡到不同的 region 上,防止热点景象。
5. HBase 的 rowkey 设计准则
长度准则:100 字节以内,8 的倍数最好,可能的状况下越短越好。因为 HFile 是依照 keyvalue 存储的,过长的 rowkey 会影响存储效率;其次,过长的 rowkey 在 memstore 中较大,影响缓冲成果,升高检索效率。最初,操作系统大多为 64 位,8 的倍数,充分利用操作系统的最佳性能。
散列准则:高位散列,低位工夫字段。防止热点问题。
惟一准则:分利用这个排序的特点,将常常读取的数据存储到一块,将最近可能会被拜访 的数据放到一块。
6. HBase 的列簇设计
准则:在正当范畴内能尽量少的缩小列簇就尽量减少列簇,因为列簇是共享 region 的,每个列簇数据相差太大导致查问效率低下。
最优:将所有相关性很强的 key-value 都放在同一个列簇下,这样既能做到查问效率最高,也能放弃尽可能少的拜访不同的磁盘文件。以用户信息为例,能够将必须的根本信息寄存在一个列族,而一些附加的额定信息能够放在另一列族。
7. HBase 中 compact 用处是什么,什么时候触发,分为哪两种,有什么区别
在 hbase 中每当有 memstore 数据 flush 到磁盘之后,就造成一个 storefile,当 storeFile 的数量达到肯定水平后,就须要将 storefile 文件来进行 compaction 操作。
Compact 的作用:
- 合并文件
- 革除过期,多余版本的数据
- 进步读写数据的效率
4
HBase 中实现了两种 compaction 的形式:minor and major. 这两种 compaction 形式的
区别是: - Minor 操作只用来做局部文件的合并操作以及包含 minVersion=0 并且设置 ttl 的过
期版本清理,不做任何删除数据、多版本数据的清理工作。 - Major 操作是对 Region 下的 HStore 下的所有 StoreFile 执行合并操作,最终的后果
是整顿合并出一个文件。
Flink
1. 简略介绍一下 Flink
Flink 是一个面向 流解决 和批处理 的分布式数据计算引擎,可能基于同一个 Flink 运行,能够提供流解决和批处理两种类型的性能。在 Flink 的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界线的流:这就是所谓的有界流和无界流。
2. Flink 的运行必须依赖 Hadoop 组件吗
Flink 能够齐全独立于 Hadoop,在不依赖 Hadoop 组件下运行。然而做为大数据的基础设施,Hadoop 体系是任何大数据框架都绕不过来的。Flink 能够集成泛滥 Hadooop 组件,例如 Yarn、Hbase、HDFS 等等。例如,Flink 能够和 Yarn 集成做资源调度,也能够读写 HDFS,或者利用 HDFS 做检查点。
3. Flink 集群运行时角色
Flink 运行时由两种类型的过程组成:一个 JobManager 和一个或者多个 TaskManager。
Client 不是运行时和程序执行的一部分,而是用于筹备数据流并将其发送给 JobManager。之后,客户端能够断开连接(拆散模式),或放弃连贯来接管过程报告(附加模式)。客户端能够作为触发执行 Java/Scala 程序的一部分运行,也能够在命令行过程 ./bin/flink run ...
中运行。
能够通过多种形式启动 JobManager 和 TaskManager:间接在机器上作为 standalone 集群启动、在容器中启动、或者通过 YARN 等资源框架治理并启动。TaskManager 连贯到 JobManagers,发表本人可用,并被调配工作。
JobManager:
JobManager 具备许多与协调 Flink 应用程序的分布式执行无关的职责:它决定何时调度下一个 task(或一组 task)、对实现的 task 或执行失败做出反馈、协调 checkpoint、并且协调从失败中复原等等。这个过程由三个不同的组件组成:
- ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、调配,治理 task slots。
- Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
- JobMaster
JobMaster 负责管理单个 JobGraph 的执行。Flink 集群中能够同时运行多个作业,每个作业都有本人的 JobMaster。
TaskManagers:
TaskManager(也称为 worker)执行作业流的 task,并且缓存和替换数据流。
必须始终至多有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量示意并发解决 task 的数量。请留神一个 task slot 中能够执行多个算子。
4. Flink 相比 Spark Streaming 有什么区别
1. 架构模型
Spark Streaming 在运行时的次要角色包含:Master、Worker、Driver、Executor,Flink 在运行时次要蕴含:Jobmanager、Taskmanager 和 Slot。
2. 任务调度
Spark Streaming 连续不断的生成渺小的数据批次,构建有向无环图 DAG,Spark Streaming 会顺次创立 DStreamGraph、JobGenerator、JobScheduler。
Flink 依据用户提交的代码生成 StreamGraph,通过优化生成 JobGraph,而后提交给 JobManager 进行解决,JobManager 会依据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最外围的数据结构,JobManager 依据 ExecutionGraph 对 Job 进行调度。
3. 工夫机制
Spark Streaming 反对的工夫机制无限,只反对解决工夫。Flink 反对了流处理程序在工夫上的三个定义:解决工夫、事件工夫、注入工夫。同时也反对 watermark 机制来解决滞后数据。
4. 容错机制
对于 Spark Streaming 工作,咱们能够设置 checkpoint,而后如果产生故障并重启,咱们能够从上次 checkpoint 之处复原,然而这个行为只能使得数据不失落,可能会反复解决,不能做到恰一次解决语义。
Flink 则应用两阶段提交协定来解决这个问题。
5. 介绍下 Flink 的容错机制(checkpoint)
Checkpoint 机制是 Flink 可靠性的基石,能够保障 Flink 集群在某个算子因为某些起因 (如 异样退出) 呈现故障时,可能将整个利用流图的状态复原到故障之前的某一状态,保障利用流图状态的一致性。Flink 的 Checkpoint 机制原理来自“Chandy-Lamport algorithm”算法。
每个须要 Checkpoint 的利用在启动时,Flink 的 JobManager 为其创立一个 CheckpointCoordinator(检查点协调器),CheckpointCoordinator 全权负责本利用的快照制作。
CheckpointCoordinator(检查点协调器),CheckpointCoordinator 全权负责本利用的快照制作。
1) CheckpointCoordinator(检查点协调器) 周期性的向该流利用的所有 source 算子发送 barrier(屏障)。
2) 当某个 source 算子收到一个 barrier 时,便暂停数据处理过程,而后将本人的以后状态制作成快照,并保留到指定的长久化存储中,最初向 CheckpointCoordinator 报告本人快照制作状况,同时向本身所有上游算子播送该 barrier,复原数据处理
3) 上游算子收到 barrier 之后,会暂停本人的数据处理过程,而后将本身的相干状态制作成快照,并保留到指定的长久化存储中,最初向 CheckpointCoordinator 报告本身快照状况,同时向本身所有上游算子播送该 barrier,复原数据处理。
4) 每个算子依照步骤 3 一直制作快照并向上游播送,直到最初 barrier 传递到 sink 算子,快照制作实现。
5) 当 CheckpointCoordinator 收到所有算子的报告之后,认为该周期的快照制作胜利; 否则,如果在规定的工夫内没有收到所有算子的报告,则认为本周期快照制作失败。
文章举荐:
Flink 可靠性的基石 -checkpoint 机制具体解析
6. Flink checkpoint 与 Spark Streaming 的有什么区别或劣势吗
spark streaming 的 checkpoint 仅仅是针对 driver 的故障复原做了数据和元数据的 checkpoint。而 flink 的 checkpoint 机制 要简单了很多,它采纳的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。
7. Flink 是如何保障 Exactly-once 语义的
Flink 通过实现 两阶段提交 和状态保留来实现端到端的一致性语义。分为以下几个步骤:
开始事务(beginTransaction)创立一个长期文件夹,来写把数据写入到这个文件夹外面
预提交(preCommit)将内存中缓存的数据写入文件并敞开
正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些提早
抛弃(abort)抛弃临时文件
若失败产生在预提交胜利后,正式提交前。能够依据状态来提交预提交的数据,也可删除预提交的数据。
两阶段提交协定详解:八张图搞懂 Flink 的 Exactly-once
8. 如果上级存储不反对事务,Flink 怎么保障 exactly-once
端到端的 exactly-once 对 sink 要求比拟高,具体实现次要有幂等写入和事务性写入两种形式。
幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种形式。
如果内部零碎不反对事务,那么能够用预写日志的形式,把后果数据先当成状态保留,而后在收到 checkpoint 实现的告诉时,一次性写入 sink 零碎。
9. Flink 罕用的算子有哪些
分两局部:
- 数据读取,这是 Flink 流计算利用的终点,罕用算子有:
- 从内存读:fromElements
- 从文件读:readTextFile
- Socket 接入:socketTextStream
- 自定义读取:createInput
- 解决数据的算子,罕用的算子包含:Map(单输出单输入)、FlatMap(单输出、多输入)、Filter(过滤)、KeyBy(分组)、Reduce(聚合)、Window(窗口)、Connect(连贯)、Split(宰割)等。
举荐浏览:一文学完 Flink 流计算罕用算子(Flink 算子大全)
10. Flink 工作延时高,如何动手
在 Flink 的后台任务治理中,咱们能够看到 Flink 的哪个算子和 task 呈现了反压。最次要的伎俩是资源调优和算子调优。资源调优即是对作业中的 Operator 的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包含:并行度的设置,State 的设置,checkpoint 的设置。
11. Flink 是如何解决反压的
Flink 外部是基于 producer-consumer 模型来进行消息传递的,Flink 的反压设计也是基于这个模型。Flink 应用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。上游消费者生产变慢,上游就会受到阻塞。
12. 如何排查生产环境中的反压问题
1. 反压呈现的场景
反压经常出现在促销、热门流动等场景。短时间内流量陡增造成数据的沉积或者生产速度变慢。
它们有一个独特的特点:数据的生产速度小于数据的生产速度。
2. 反压监控办法
通过 Flink Web UI 发现反压问题。
Flink 的 TaskManager 会每隔 50 ms 触发一次反压状态监测,共监测 100 次,并将计算结果反馈给 JobManager,最初由 JobManager 进行计算反压的比例,而后进行展现。
这个比例展现逻辑如下:
OK: 0 <= Ratio <= 0.10,示意状态良好正;
LOW: 0.10 < Ratio <= 0.5,示意有待察看;
HIGH: 0.5 < Ratio <= 1,示意要解决了(减少并行度 /subTask/ 查看是否有数据歪斜 / 减少内存)。
0.01,代表 100 次中有一次阻塞在外部调用。
3. flink 反压的实现形式
Flink 工作的组成由根本的“流”和“算子”形成,“流”中的数据在“算子”间进行计算和转换时,会被放入分布式的阻塞队列中。当消费者的阻塞队列满时,则会升高生产者的数据生产速度
4. 反压问题定位和解决
Flink 会因为数据沉积和处理速度变慢导致 checkpoint 超时,而 checkpoint 是 Flink 保证数据一致性的关键所在,最终会导致数据的不统一产生。
数据歪斜:能够在 Flink 的后盾治理页面看到每个 Task 解决数据的大小。当数据歪斜呈现时,通常是简略地应用相似 KeyBy 等分组聚合函数导致的,须要用户将热点 Key 进行预处理,升高或者打消热点 Key 的影。
GC:不合理的设置 TaskManager 的垃圾回收参数会导致重大的 GC 问题,咱们能够通过 -XX:+PrintGCDetails
参数查看 GC 的日志。
代码自身:开发者谬误地应用 Flink 算子,没有深刻理解算子的实现机制导致性能问题。咱们能够通过查看运行机器节点的 CPU 和内存状况定位问题。
13. Flink 中的状态存储
Flink 在做计算的过程中常常须要存储中间状态,来防止数据失落和状态复原。抉择的状态存储策略不同,会影响状态长久化如何和 checkpoint 交互。Flink 提供了三种状态存储形式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
14. Operator Chains(算子链)这个概念你理解吗
为了更高效地分布式执行,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起造成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是十分无效的优化:它能缩小线程之间的切换,缩小音讯的序列化 / 反序列化,缩小数据在缓冲区的替换,缩小了提早的同时进步整体的吞吐量。这就是咱们所说的算子链。
15. Flink 的内存治理是如何做的
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预调配的内存块上。此外,Flink 大量的应用了堆外内存。如果须要解决的数据超出了内存限度,则会将局部数据存储到硬盘上。Flink 为了间接操作二进制数据实现了本人的序列化框架。
16. 如何解决生产环境中的数据歪斜问题
1. flink 数据歪斜的体现:
工作节点频繁呈现反压,减少并行度也不能解决问题;
局部节点呈现 OOM 异样,是因为大量的数据集中在某个节点上,导致该节点内存被爆,工作失败重启。
2. 数据歪斜产生的起因:
业务上有重大的数据热点,比方滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其余地区;
技术上大量应用了 KeyBy、GroupBy 等操作,谬误的应用了分组 Key,人为产生数据热点。
3. 解决问题的思路:
业务上要尽量避免热点 key 的设计,例如咱们能够把北京、上海等热点城市分成不同的区域,并进行独自解决;
技术上呈现热点时,要调整计划打散原来的 key,防止间接聚合;此外 Flink 还提供了大量的性能能够防止数据歪斜。
17. Flink 中的 Time 有哪几种
Flink 中的工夫有三种类型,如下图所示:
- Event Time:是事件创立的工夫。它通常由事件中的工夫戳形容,例如采集的日志数据中,每一条日志都会记录本人的生成工夫,Flink 通过工夫戳分配器拜访事件工夫戳。
- Ingestion Time:是数据进入 Flink 的工夫。
- Processing Time:是每一个执行基于工夫操作的算子的本地零碎工夫,与机器相干,默认的工夫属性就是 Processing Time。
例如,一条日志进入 Flink 的工夫为2021-01-22 10:00:00.123
,达到 Window 的零碎工夫为2021-01-22 10:00:01.234
,日志的内容如下:2021-01-06 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计 1min 内的故障日志个数,哪个工夫是最有意义的?—— eventTime,因为咱们要依据日志的生成工夫进行统计。
18. Flink 对于早退数据是怎么解决的
Flink 中 WaterMark 和 Window 机制解决了流式数据的乱序问题,对于因为提早而程序有误的数据,能够依据 eventTime 进行业务解决,对于提早的数据 Flink 也有本人的解决办法,次要的方法是给定一个容许提早的工夫,在该工夫范畴内仍能够承受解决提早数据
设置容许提早的工夫是通过 allowedLateness(lateness: Time)设置
保留提早数据则是通过 sideOutputLateData(outputTag: OutputTag[T])保留
获取提早数据是通过 DataStream.getSideOutput(tag: OutputTag[X])获取
文章举荐:
Flink 中极其重要的 Time 与 Window 具体解析
19. Flink 中 window 呈现数据歪斜怎么解决
window 产生数据歪斜指的是数据在不同的窗口内沉积的数据量相差过多。实质上产生这种状况的起因是数据源头发送的数据量速度不同导致的。呈现这种状况个别通过两种形式来解决:
- 在数据进入窗口前做预聚合
- 从新设计窗口聚合的 key
20. Flink CEP 编程中当状态没有达到的时候会将数据保留在哪里
在流式解决中,CEP 当然是要反对 EventTime 的,那么绝对应的也要反对数据的早退景象,也就是 watermark 的解决逻辑。CEP 对未匹配胜利的事件序列的解决,和早退数据是相似的。在 Flink CEP 的解决逻辑中,状态没有满足的和早退的数据,都会存储在一个 Map 数据结构中,也就是说,如果咱们限定判断事件序列的时长为 5 分钟,那么内存中就会存储 5 分钟的数据,这在我看来,也是对内存的极大伤害之一。
举荐浏览:一文学会 Flink CEP
21. Flink 设置并行度的形式
们在理论生产环境中能够从四个不同层面设置并行度:
-
操作算子层面(Operator Level)
.map(new RollingAdditionMapper()).setParallelism(10) // 将操作算子设置并行度
-
执行环境层面(Execution Environment Level)
$FLINK_HOME/bin/flink 的 - p 参数批改并行度
-
客户端层面(Client Level)
env.setParallelism(10)
- 零碎层面(System Level)
全局配置在 flink-conf.yaml 文件中,parallelism.default,默认是 1:能够设置默认值大一点
须要留神的优先级:算子层面 > 环境层面 > 客户端层面 > 零碎层面。
22. Flink 中 Task 如何做到数据交换
在一个 Flink Job 中,数据须要在不同的 task 中进行替换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲 buffer 中收集 records,而后再发送。Records 并不是一个一个被发送的,是积攒一个批次再发送,batch 技术能够更加高效的利用网络资源。
23. Flink 的内存治理是如何做的
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预调配的内存块上。此外,Flink 大量的应用了堆外内存。如果须要解决的数据超出了内存限度,则会将局部数据存储到硬盘上。Flink 为了间接操作二进制数据实现了本人的序列化框架。
24. 介绍下 Flink 的序列化
Flink 摒弃了 Java 原生的序列化办法,以独特的形式解决数据类型和序列化,蕴含本人的类型描述符,泛型类型提取和类型序列化框架。
TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些根本属性,并且能够生成序列化器。
TypeInformation 反对以下几种类型:
- BasicTypeInfo: 任意 Java 根本类型或 String 类型
- BasicArrayTypeInfo: 任意 Java 根本类型数组或 String 数组
- WritableTypeInfo: 任意 Hadoop Writable 接口的实现类
- TupleTypeInfo: 任意的 Flink Tuple 类型(反对 Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的 Java Tuple 实现
- CaseClassTypeInfo: 任意的 Scala CaseClass(包含 Scala tuples)
- PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java 对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 办法
- GenericTypeInfo: 任意无奈匹配之前几种类型的类
25. Flink 海量数据高效去重
- 基于状态后端。
- 基于 HyperLogLog:不是精准的去重。
- 基于布隆过滤器(BloomFilter);疾速判断一个 key 是否存在于某容器,不存在就间接返回。
- 基于 BitMap;用一个 bit 位来标记某个元素对应的 Value,而 Key 即是该元素。因为采纳了 Bit 为单位来存储数据,因而能够大大节俭存储空间。
- 基于内部数据库;抉择应用 Redis 或者 HBase 存储数据,咱们只须要设计好存储的 Key 即可,不须要关怀 Flink 工作重启造成的状态失落问题。
26. Flink SQL 的是如何实现的
构建形象语法树的事件交给了 Calcite 去做。SQL query 会通过 Calcite 解析器转变成 SQL 节点树,通过验证后构建成 Calcite 的形象语法树(也就是图中的 Logical Plan)。另一边,Table API 上的调用会构建成 Table API 的形象语法树,并通过 Calcite 提供的 RelBuilder 转变成 Calcite 的形象语法树。而后顺次被转换成逻辑执行打算和物理执行打算。
在提交工作后会散发到各个 TaskManager 中运行,在运行时会应用 Janino 编译器编译代码后运行。
业务方面
1. ODS 层采纳什么压缩形式和存储格局?
压缩采纳Snappy,存储采纳orc,压缩比是 100g 数据压缩完 10g 左右。
2. DWD 层做了哪些事?
1) 数据荡涤
- 空值去除
- 过滤外围字段无意义的数据,比方订单表中订单 id 为 null,领取表中领取 id 为空
- 对手机号、身份证号等敏感数据脱敏
- 对业务数据传过来的表进行维度进化和降维。
- 将用户行为宽表和业务表进行数据一致性解决
2) 荡涤的伎俩
- Sql、mr、rdd、kettle、Python(我的项目中采纳 sql 进行革除)
3. DWS 层做了哪些事?
1) DWS 层有 3 - 5 张宽表(解决 100-200 个指标 70% 以上的需要)
具体宽表名称:用户行为宽表,用户购买商品明细行为宽表,商品宽表,购物车宽表,物流宽表、登录注册、售后等。
2) 哪个宽表最宽?大略有多少个字段?
最宽的是用户行为宽表。大略有 60-100 个字段
1. 在解决大数据过程中,如何保障失去期望值
- 保障在数据采集的时候不失落数据,这个尤为重要,如果在数据采集的时候就曾经不精确,前面很难达到期望值
- 在数据处理的时候不失落数据,例如 sparkstreaming 解决 kafka 数据的时候,要保证数据不失落,这个尤为重要
- 前两步中,如果无奈保证数据的完整性,那么就要通过离线计算进行数据的校对,这样能力保障咱们可能失去期望值
2. 你感觉数仓建设中最重要的是什么
数仓建设中,最重要的是数据准确性,数据的真正价值在于数据驱动决策,通过数据领导经营,在一个不精确的数据驱动下,失去的肯定是谬误的数据分析,影响的是公司的业务倒退决策,最终导致公司的策略调控失败。
3. 数据仓库建模怎么做的
数仓建设中最罕用模型 –Kimball 维度建模详解
4. 数据品质怎么监控
单表数据量监控
一张表的记录数在一个已知的范畴内,或者高低浮动不会超过某个阈值
- SQL 后果:var 数据量 = select count(*)from 表 where 工夫等过滤条件
- 报警触发条件设置:如果数据量不在[数值上限, 数值下限],则触发报警
- 同比增加:如果 ((本周的数据量 - 上周的数据量)/ 上周的数据量 *100) 不在 [比例下线,比例下限],则触发报警
- 环比减少:如果 ((明天的数据量 – 昨天的数据量)/ 昨天的数据量 *100) 不在 [比例下线,比例下限],则触发报警
- 报警触发条件设置肯定要有。如果没有配置的阈值,不能做监控
日活、周活、月活、留存(日周月)、转化率(日、周、月)GMV(日、周、月)
复购率(日周月)
单表空值检测
某个字段为空的记录数在一个范畴内,或者占总量的百分比在某个阈值范畴内
- 指标字段:抉择要监控的字段,不能选“无”
- SQL 后果:var 异样数据量 = select count(*) from 表 where 指标字段 is null
- 单次检测:如果 (异样数据量) 不在[数值上限, 数值下限],则触发报警
单表反复值检测
一个或多个字段是否满足某些规定
- 指标字段:第一步先失常统计条数;select count(*) form 表;
- 第二步,去重统计;select count(*) from 表 group by 某个字段
- 第一步的值和第二步的值做减法,看是否在高低线阀值之内
- 单次检测:如果 (异样数据量) 不在[数值上限, 数值下限],则触发报警
跨表数据量比照
次要针对同步流程,监控两张表的数据量是否统一
- SQL 后果:count(本表) – count(关联表)
- 阈值配置与“空值检测”雷同
5. 数据分析方法论理解过哪些?
数据商业剖析的指标是利用大数据为所有职场人员做出迅捷,高质,高效的决策提供可规模化的解决方案。商业剖析是发明价值的数据迷信。
数据商业剖析中会存在很多判断:
- 察看数据以后产生了什么?
比方想晓得线上渠道 A、B 各自带来了多少流量,新上线的产品有多少用户喜爱,新注册流中注册的人数有多少。这些都须要通过数据来展现后果。
- 了解为什么产生?
咱们须要晓得渠道 A 为什么比渠道 B 好,这些是要通过数据去发现的。兴许某个关键字带来的流量转化率比其余都要低,这时能够通过信息、常识、数据积淀出产生的起因是什么。
- 预测将来会产生什么?
在对渠道 A、B 有了判断之后,依据以往的常识预测将来会产生什么。在投放渠道 C、D 的时候,猜想渠道 C 比渠道 D 好,当上线新的注册流、新的优化,能够晓得哪一个节点比拟容易出问题,这些都是通过数据进行预测的过程。
- 商业决策
所有工作中最有意义的还是商业决策,通过数据来判断应该做什么。这是商业剖析最终的目标。
算法
大数据面试中考查的算法绝对容易一些,常考的有排序算法,查找算法,二叉树等,上面解说一些最容易考的算法。
1. 排序算法
十种常见排序算法能够分为两大类:
- 比拟类排序:通过比拟来决定元素间的绝对秩序,因为其工夫复杂度不能冲破 O(nlogn),因而也称为非线性工夫比拟类排序。
- 非比拟类排序:不通过比拟来决定元素间的绝对秩序,它能够冲破基于比拟排序的工夫下界,以线性工夫运行,因而也称为线性工夫非比拟类排序。
算法复杂度:
相干概念:
- 稳固:如果 a 本来在 b 后面,而 a =b,排序之后 a 依然在 b 的后面。
- 不稳固:如果 a 本来在 b 的后面,而 a =b,排序之后 a 可能会呈现在 b 的前面。
- 工夫复杂度:对排序数据的总的操作次数。反映当 n 变动时,操作次数出现什么法则。
- 空间复杂度:是指算法在计算机内执行时所需存储空间的度量,它也是数据规模 n 的函数。
上面解说大数据中最常考的两种:快排和归并
1) 疾速排序
疾速排序的根本思维:通过一趟排序将待排记录分隔成独立的两局部,其中一部分记录的关键字均比另一部分的关键字小,则可别离对这两局部记录持续进行排序,以达到整个序列有序。
算法形容
疾速排序应用分治法来把一个串(list)分为两个子串(sub-lists)。具体算法形容如下:
- 从数列中挑出一个元素,称为“基准”(pivot);
- 从新排序数列,所有元素比基准值小的摆放在基准后面,所有元素比基准值大的摆在基准的前面(雷同的数能够到任一边)。在这个分区退出之后,该基准就处于数列的两头地位。这个称为分区(partition)操作;
- 递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序。
代码实现:
function quickSort(arr, left, right) {
var len = arr.length,
partitionIndex,
left = typeof left != 'number' ? 0 : left,
right = typeof right != 'number' ? len - 1 : right;
if (left < right) {partitionIndex = partition(arr, left, right);
quickSort(arr, left, partitionIndex-1);
quickSort(arr, partitionIndex+1, right);
}
return arr;
}
function partition(arr, left ,right) { // 分区操作
var pivot = left, // 设定基准值(pivot)index = pivot + 1;
for (var i = index; i <= right; i++) {if (arr[i] < arr[pivot]) {swap(arr, i, index);
index++;
}
}
swap(arr, pivot, index - 1);
return index-1;
}
function swap(arr, i, j) {var temp = arr[i];
arr[i] = arr[j];
arr[j] = temp;
}
2) 归并排序
归并排序是建设在归并操作上的一种无效的排序算法。该算法是采纳分治法(Divide and Conquer)的一个十分典型的利用。将已有序的子序列合并,失去齐全有序的序列;即先使每个子序列有序,再使子序列段间有序。若将两个有序表合并成一个有序表,称为 2 - 路归并。
算法形容
- 把长度为 n 的输出序列分成两个长度为 n / 2 的子序列;
- 对这两个子序列别离采纳归并排序;
- 将两个排序好的子序列合并成一个最终的排序序列。
代码实现:
function mergeSort(arr) {
var len = arr.length;
if (len < 2) {return arr;}
var middle = Math.floor(len / 2),
left = arr.slice(0, middle),
right = arr.slice(middle);
return merge(mergeSort(left), mergeSort(right));
}
function merge(left, right) {var result = [];
while (left.length>0 && right.length>0) {if (left[0] <= right[0]) {result.push(left.shift());
} else {result.push(right.shift());
}
}
while (left.length)
result.push(left.shift());
while (right.length)
result.push(right.shift());
return result;
}
2. 查找算法
七大查找算法:1. 程序查找、2. 二分查找、3. 插值查找、4. 斐波那契查找、5. 树表查找、6. 分块查找、7. 哈希查找
这些查找算法中 二分查找 是最容易考查的,上面解说二分查找算法。
1) 二分查找
二分查找也称折半查找(Binary Search),它是一种效率较高的查找办法。然而,折半查找要求线性表必须采纳顺序存储构造,而且表中元素按关键字有序排列,留神必须要是有序排列。
代码实现:
- 应用递归
/**
* 应用递归的二分查找
*title:recursionBinarySearch
*@param arr 有序数组
*@param key 待查找关键字
*@return 找到的地位
*/
public static int recursionBinarySearch(int[] arr,int key,int low,int high){if(key < arr[low] || key > arr[high] || low > high){return -1;}
int middle = (low + high) / 2; // 初始两头地位
if(arr[middle] > key){
// 比关键字大则关键字在左区域
return recursionBinarySearch(arr, key, low, middle - 1);
}else if(arr[middle] < key){
// 比关键字小则关键字在右区域
return recursionBinarySearch(arr, key, middle + 1, high);
}else {return middle;}
}
- 不应用递归实现(while 循环)
/**
* 不应用递归的二分查找
*title:commonBinarySearch
*@param arr
*@param key
*@return 关键字地位
*/
public static int commonBinarySearch(int[] arr,int key){
int low = 0;
int high = arr.length - 1;
int middle = 0; // 定义 middle
if(key < arr[low] || key > arr[high] || low > high){return -1;}
while(low <= high){middle = (low + high) / 2;
if(arr[middle] > key){
// 比关键字大则关键字在左区域
high = middle - 1;
}else if(arr[middle] < key){
// 比关键字小则关键字在右区域
low = middle + 1;
}else{return middle;}
}
return -1; // 最初依然没有找到,则返回 -1
}
3. 二叉树实现及遍历
定义:二叉树,是一种非凡的树,二叉树的任意一个节点的度都不大于 2,不蕴含度的节点称之为叶子。
遍历形式:二叉树的遍历形式有三种,中序遍历,先序遍历,后序遍历。
将一个数组中的数以二叉树的存储构造存储,并遍历打印:
代码实现:
import java.util.ArrayList;
import java.util.List;
public class bintree {
public bintree left;
public bintree right;
public bintree root;
// 数据域
private Object data;
// 存节点
public List<bintree> datas;
public bintree(bintree left, bintree right, Object data){
this.left=left;
this.right=right;
this.data=data;
}
// 将初始的左右孩子值为空
public bintree(Object data){this(null,null,data);
}
public bintree() {}
public void creat(Object[] objs){datas=new ArrayList<bintree>();
// 将一个数组的值顺次转换为 Node 节点
for(Object o:objs){datas.add(new bintree(o));
}
// 第一个数为根节点
root=datas.get(0);
// 建设二叉树
for (int i = 0; i <objs.length/2; i++) {
// 左孩子
datas.get(i).left=datas.get(i*2+1);
// 右孩子
if(i*2+2<datas.size()){// 防止偶数的时候 下标越界
datas.get(i).right=datas.get(i*2+2);
}
}
}
// 先序遍历
public void preorder(bintree root){if(root!=null){System.out.println(root.data);
preorder(root.left);
preorder(root.right);
}
}
// 中序遍历
public void inorder(bintree root){if(root!=null){inorder(root.left);
System.out.println(root.data);
inorder(root.right);
}
}
// 后序遍历
public void afterorder(bintree root){if(root!=null){System.out.println(root.data);
afterorder(root.left);
afterorder(root.right);
}
}
public static void main(String[] args) {bintree bintree=new bintree();
Object []a={2,4,5,7,1,6,12,32,51,22};
bintree.creat(a);
bintree.preorder(bintree.root);
}
}
参考
2022 年最弱小数据面试宝典 PDF 版