共计 17996 个字符,预计需要花费 45 分钟才能阅读完成。
MapReduce
Google File System 提供了大数据存储的计划,这也为起初 HDFS 提供了理论依据,然而在大数据存储之上的大数据计算则不得不提到 MapReduce。
尽管当初通过框架的一直倒退,MapReduce 曾经慢慢的淡出人们的视线,越来越多的框架提供了简略的 SQL 语法来进行大数据计算。然而,MapReduce 所提供的编程模型为这所有奠定了根底,所以 Google 的这篇 MapReduce 论文值得咱们去认真的研读。
摘要
MapReduce 是一个编程模型,也是一个解决和生成超大数据集的算法模型的相干实现。用户首先创立一个 Map 函数解决一个基于 key/value pair 的数据汇合,输入两头的基于 key/value pair 的数据汇合;而后再创立一个 Reduce 函数用来合并所有的具备雷同两头 key 值的两头 value 值。事实世界中有很多满足上述解决模型的例子,本论文将详细描述这个模型。
MapReduce 架构的程序可能在大量的一般配置的计算机上实现并行化解决。这个零碎在运行时只关怀:
如何宰割输出数据,在大量计算机组成的集群上的调度,集群中计算机的错误处理,治理集群中计算机之间必要的通信。采纳 MapReduce 架构能够使那些没有并行计算和分布式解决零碎开发教训的程序员无效利用分布式系统的丰盛资源。咱们的 MapReduce 实现运行在规模能够灵便调整的由一般机器组成的集群上:一个典型的 MapReduce 计算往往由几千台机器组成、解决以 TB 计算的数据。程序员发现这个零碎十分好用:曾经实现了数以百计的 MapReduce 程序,在 Google 的集群上,每天都有 1000 多个 MapReduce 程序在执行。
1 介绍
在过来的 5 年里,包含本文作者在内的 Google 的很多程序员,为了解决海量的原始数据,曾经实现了数以百计的、专用的计算方法。这些计算方法用来解决大量的原始数据,比方,文档抓取(相似网络爬虫的程序)、Web 申请日志等等;也为了计算解决各种类型的衍生数据,比方倒排索引、Web 文档的图构造的各种示意局势、每台主机上网络爬虫抓取的页面数量的汇总、每天被申请的最多的查问的汇合等等。大多数这样的数据处理运算在概念上很容易了解。然而因为输出的数据量微小,因而要想在可承受的工夫内实现运算,只有将这些计算散布在成千盈百的主机上。如何解决并行计算、如何散发数据、如何处理错误?所有这些问题综合在一起,须要大量的代码解决,因而也使得本来简略的运算变得难以解决。
为了解决上述简单的问题,咱们设计一个新的形象模型,应用这个形象模型,咱们只有表述咱们想要执行的简略运算即可,而不用关怀并行计算、容错、数据分布、负载平衡等简单的细节,这些问题都被封装在了一个库外面。设计这个形象模型的灵感来自 Lisp 和许多其余函数式语言的 Map 和 Reduce 的原语。咱们意识到咱们大多数的运算都蕴含这样的操作:在输出数据的“逻辑”记录上利用 Map 操作得出一个两头 key/value pair 汇合,而后在所有具备雷同 key 值的 value 值上利用 Reduce 操作,从而达到合并两头的数据,失去一个想要的后果的目标。应用 MapReduce 模型,再联合用户实现的 Map 和 Reduce 函数,咱们就能够非常容易的实现大规模并行化计算;通过 MapReduce 模型自带的“再次执行”(re-execution)性能,也提供了高级的容灾实现计划。
这个工作 (实现一个 MapReduce 框架模型) 的次要奉献是通过简略的接口来实现主动的并行化和大规模的分布式计算,通过应用 MapReduce 模型接口实现在大量一般的 PC 机上高性能计算。
第二局部形容根本的编程模型和一些应用案例。
第三局部形容了一个通过裁剪的、适宜咱们的基于集群的计算环境 MapReduce 实现。
第四局部形容咱们认为在 MapReduce 编程模型中一些实用的技巧。
第五局部对于各种不同的工作,测量咱们 MapReduce 实现的性能。
第六局部揭示了在 Google 外部如何应用 MapReduce 作为根底重写咱们的索引零碎产品,包含其它一些应用 MapReduce 的教训。
第七局部探讨相干的和将来的工作。
2 编程模型
MapReduce 编程模型的原理是:利用一个输出 key/value pair 汇合来产生一个输入的 key/value pair 汇合。
MapReduce 库的用户用两个函数表白这个计算:Map 和 Reduce。
用户自定义的 Map 函数承受一个输出的 key/value pair 值,而后产生一个两头 key/value pair 值的汇合。
MapReduce 库把所有具备雷同两头 key 值 I 的两头 value 值汇合在一起后传递给 reduce 函数。
用户自定义的 Reduce 函数承受一个两头 key 的值 I 和相干的一个 value 值的汇合。Reduce 函数合并这些 value 值,造成一个较小的 value 值的汇合。个别的,每次 Reduce 函数调用只产生 0 或 1 个输入 value 值。通常咱们通过一个迭代器把两头 value 值提供给 Reduce 函数,这样咱们就能够解决无奈全副放入内存中的大量
的 value 值的汇合。
2.1 例子
例如,计算一个大的文档汇合中每个单词呈现的次数,上面是伪代码段:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w,“1″);
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map 函数输入文档中的每个词、以及这个词的呈现次数 (在这个简略的例子里就是 1)。Reduce 函数把 Map 函数产生的每一个特定的词的计数累加起来。
另外,用户编写代码,应用输出和输入文件的名字、可选的调节参数来实现一个合乎 MapReduce 模型标准的对象,而后调用 MapReduce 函数,并把这个标准对象传递给它。用户的代码和 MapReduce 库链接在一起(用 C++ 实现)。附录 A 蕴含了这个实例的全副程序代码。
2.2 类型
只管在后面例子的伪代码中应用了以字符串示意的输入输出值,然而在概念上,用户定义的 Map 和 Reduce 函数都有相关联的类型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
比方,输出的 key 和 value 值与输入的 key 和 value 值在类型上推导的域不同。此外,两头 key 和 value
值与输入 key 和 value 值在类型上推导的域雷同。2
咱们的 C++ 中应用字符串类型作为用户自定义函数的输入输出,用户在本人的代码中对字符串进行适当
的类型转换。
2.3 更多的例子
这里还有一些乏味的简略例子,能够很容易的应用 MapReduce 模型来示意:
分布式的 Grep:Map 函数输入匹配某个模式的一行,Reduce 函数是一个恒等函数,即把两头数据复制到输入。
计算 URL 拜访频率:Map 函数解决日志中 web 页面申请的记录,而后输入 (URL,1)。Reduce 函数把雷同 URL 的 value 值都累加起来,产生(URL, 记录总数) 后果。
倒转网络链接图:Map 函数在源页面(source)中搜寻所有的链接指标(target)并输入为 (target,source)。
Reduce 函数把给定链接指标(target)的链接组合成一个列表,输入(target,list(source))。
每个主机的检索词向量:检索词向量用一个 (词, 频率) 列表来概述呈现在文档或文档集中的最重要的一些词。Map 函数为每一个输出文档输入 (主机名, 检索词向量),其中主机名来自文档的 URL。Reduce 函数接管给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,抛弃掉低频的检索词,输入一个最终的(主机名, 检索词向量)。
倒排索引:Map 函数剖析每个文档输入一个 (词, 文档号) 的列表,Reduce 函数的输出是一个给定词的所有(词,文档号),排序所有的文档号,输入 (词,list(文档号))。所有的输入汇合造成一个简略的倒排索引,它以一种简略的算法跟踪词在文档中的地位。
分布式排序:Map 函数从每个记录提取 key,输入 (key,record)。Reduce 函数不扭转任何的值。这个运算依赖分区机制(在 4.1 形容) 和排序属性(在 4.2 形容)。
3 实现
MapReduce 模型能够有多种不同的实现形式。如何正确抉择取决于具体的环境。例如,一种实现形式实用于小型的共享内存形式的机器,另外一种实现形式则实用于大型 NUMA 架构的多处理器的主机,而有的实现形式更适宜大型的网络连接集群。
本章节形容一个实用于 Google 外部宽泛应用的运算环境的实现:用以太网交换机连贯、由一般 PC 机组成的大型集群。在咱们的环境里包含:
- x86 架构、运行 Linux 操作系统、双处理器、2-4GB 内存的机器。
- 一般的网络硬件设施,每个机器的带宽为百兆或者千兆,然而远小于网络的均匀带宽的一半。
- 集群中蕴含成千盈百的机器,因而,机器故障是常态。
- 存储为便宜的内置 IDE 硬盘。一个外部分布式文件系统用来治理存储在这些磁盘上的数据。文件系
统通过数据复制来在不牢靠的硬件上保证数据的可靠性和有效性。 - 用户提交工作(job)给调度零碎。每个工作(job)都蕴含一系列的工作(task),调度零碎将这些任
务调度到集群中多台可用的机器上。
3.1 执行概括
通过将 Map 调用的输出数据主动宰割为 M 个数据片段的汇合,Map 调用被散布到多台机器上执行。输出的数据片段可能在不同的机器上并行处理。应用分区函数将 Map 调用产生的两头 key 值分成 R 个不同分区(例如,hash(key) mod R),Reduce 调用也被散布到多台机器上执行。分区数量(R)和分区函数由用户来指定。
图 1 展现了咱们的 MapReduce 实现中操作的全副流程。当用户调用 MapReduce 函数时,将产生上面的一系列动作(上面的序号和图 1 中的序号一一对应):
- 用户程序首先调用的 MapReduce 库将输出文件分成 M 个数据片度,每个数据片段的大小个别从 16MB 到 64MB(能够通过可选的参数来管制每个数据片段的大小)。而后用户程序在机群中创立大量的程序正本。
- 这些程序正本中的有一个非凡的程序–master。正本中其它的程序都是 worker 程序,由 master 分配任务。有 M 个 Map 工作和 R 个 Reduce 工作将被调配,master 将一个 Map 工作或 Reduce 任务分配给一个闲暇的 worker。
- 被调配了 map 工作的 worker 程序读取相干的输出数据片段,从输出的数据片段中解析出 key/valuepair,而后把 key/value pair 传递给用户自定义的 Map 函数,由 Map 函数生成并输入的两头 key/valuepair,并缓存在内存中。
- 缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的 key/value pair 在本地磁盘上的存储地位将被回传给 master,由 master 负责把这些存储地位再传送给 Reduce worker。
- 当 Reduce worker 程序接管到 master 程序发来的数据存储地位信息后,应用 RPC 从 Map worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的两头数据后,通过对 key 进行排序
后使得具备雷同 key 值的数据聚合在一起。因为许多不同的 key 值会映射到雷同的 Reduce 工作上,因而必须进行排序。如果两头数据太大无奈在内存中实现排序,那么就要在内部进行排序。
- Reduce worker 程序遍历排序后的两头数据,对于每一个惟一的两头 key 值,Reduce worker 程序将这个 key 值和它相干的两头 value 值的汇合传递给用户自定义的 Reduce 函数。Reduce 函数的输入被追加到所属分区的输入文件。
- 当所有的 Map 和 Reduce 工作都实现之后,master 唤醒用户程序。在这个时候,在用户程序里的对 MapReduce 调用才返回。
在胜利实现工作之后,MapReduce 的输入寄存在 R 个输入文件中(对应每个 Reduce 工作产生一个输入文件,文件名由用户指定)。个别状况下,用户不须要将这 R 个输入文件合并成一个文件–他们常常把这些文件作为另外一个 MapReduce 的输出,或者在另外一个能够解决多个宰割文件的分布式应用中应用。
3.2 Master 数据结构
Master 持有一些数据结构,它存储每一个 Map 和 Reduce 工作的状态(闲暇、工作中或实现),以及 Worker 机器 (非闲暇工作的机器) 的标识。
Master 就像一个数据管道,两头文件存储区域的地位信息通过这个管道从 Map 传递到 Reduce。因而,对于每个曾经实现的 Map 工作,master 存储了 Map 工作产生的 R 个两头文件存储区域的大小和地位。当 Map 工作实现时,Master 接管到地位和大小的更新信息,这些信息被逐渐递增的推送给那些正在工作的 Reduce 工作。
3.3 容错
因为 MapReduce 库的设计初衷是应用由成千盈百的机器组成的集群来解决超大规模的数据,所以,这个库必须要能很好的解决机器故障。
3.3.1 worker 故障
master 周期性的 ping 每个 worker。如果在一个约定的工夫范畴内没有收到 worker 返回的信息,master 将把这个 worker 标记为生效。所有由这个生效的 worker 实现的 Map 工作被重设为初始的闲暇状态,之后这些工作就能够被安顿给其余的 worker。同样的,worker 生效时正在运行的 Map 或 Reduce 工作也将被从新置为闲暇状态,期待从新调度。
当 worker 故障时,因为曾经实现的 Map 工作的输入存储在这台机器上,Map 工作的输入已不可拜访了,因而必须从新执行。而曾经实现的 Reduce 工作的输入存储在全局文件系统上,因而不须要再次执行。
当一个 Map 工作首先被 worker A 执行,之后因为 worker A 生效了又被调度到 worker B 执行,这个“从新执行”的动作会被告诉给所有执行 Reduce 工作的 worker。任何还没有从 worker A 读取数据的 Reduce 工作将从 worker B 读取数据。
MapReduce 能够解决大规模 worker 生效的状况。比方,在一个 MapReduce 操作执行期间,在正在运行的集群上进行网络保护引起 80 台机器在几分钟内不可拜访了,MapReduce master 只须要简略的再次执行那些不可拜访的 worker 实现的工作,之后继续执行未实现的工作,直到最终实现这个 MapReduce 操作。
3.3.2 master 失败
一个简略的解决办法是让 master 周期性的将下面形容的数据结构的写入磁盘,即检查点(checkpoint)。如果这个 master 工作生效了,能够从最初一个检查点(checkpoint)开始启动另一个 master 过程。然而,因为只有一个 master 过程,master 生效后再复原是比拟麻烦的,因而咱们当初的实现是如果 master 生效,就停止 MapReduce 运算。客户能够查看到这个状态,并且能够依据须要从新执行 MapReduce 操作。
3.3.3 在生效方面的解决机制
当用户提供的 Map 和 Reduce 操作是输出确定性函数(即雷同的输出产生雷同的输入)时,咱们的分布式实现在任何状况下的输入都和所有程序没有呈现任何谬误、程序的执行产生的输入是一样的。
咱们依赖对 Map 和 Reduce 工作的输入是原子提交的来实现这个个性。每个工作中的工作把它的输入写到公有的临时文件中。每个 Reduce 工作生成一个这样的文件,而每个 Map 工作则生成 R 个这样的文件(一个 Reduce 工作对应一个文件)。当一个 Map 工作实现的时,worker 发送一个蕴含 R 个长期文件名的实现音讯给 master。如果 master 从一个曾经实现的 Map 工作再次接管到到一个实现音讯,master 将疏忽这个音讯;否则,master 将这 R 个文件的名字记录在数据结构里。
当 Reduce 工作实现时,Reduce worker 过程以原子的形式把临时文件重命名为最终的输入文件。如果同一个 Reduce 工作在多台机器上执行,针对同一个最终的输入文件将有多个重命名操作执行。咱们依赖底层文件系统提供的重命名操作的原子性来保障最终的文件系统状态仅仅蕴含一个 Reduce 工作产生的数据。
应用 MapReduce 模型的程序员能够很容易的了解他们程序的行为,因为咱们绝大多数的 Map 和 Reduce 操作是确定性的,而且存在这样的一个事实:咱们的生效解决机制等价于一个程序的执行的操作。当 Map 和 Reduce 操作是不确定性的时候,咱们提供尽管较弱然而仍然正当的解决机制。当应用非确定操作的时候,
一个 Reduce 工作 R1 的输入等价于一个非确定性程序程序执行产生时的输入。然而,另一个 Reduce 工作 R2 的输入兴许合乎一个不同的非确定顺序程序执行产生的 R2 的输入。
思考 Map 工作 M 和 Reduce 工作 R1、R2 的状况。咱们设定 e(Ri)是 Ri 曾经提交的执行过程(有且仅有一个这样的执行过程)。当 e(R1)读取了由 M 一次执行产生的输入,而 e(R2)读取了由 M 的另一次执行产生的输入,导致了较弱的生效解决。
3.4 存储地位
在咱们的计算运行环境中,网络带宽是一个相当匮乏的资源。咱们通过尽量把输出数据 (由 GFS 治理) 存储在集群中机器的本地磁盘上来节俭网络带宽。GFS 把每个文件按 64MB 一个 Block 分隔,每个 Block 保留在多台机器上,环境中就寄存了多份拷贝 (个别是 3 个拷贝)。MapReduce 的 master 在调度 Map 工作时会思考输出文件的地位信息,尽量将一个 Map 任务调度在蕴含相干输出数据拷贝的机器上执行;如果上述致力失败了,master 将尝试在保留有输出数据拷贝的机器左近的机器上执行 Map 工作(例如,调配到一个和蕴含输出数
据的机器在一个 switch 里的 worker 机器上执行)。当在一个足够大的 cluster 集群上运行大型 MapReduce 操作的时候,大部分的输出数据都能从本地机器读取,因而耗费非常少的网络带宽。
3.5 工作粒度
如前所述,咱们把 Map 拆分成了 M 个片段、把 Reduce 拆分成 R 个片段执行。现实状况下,M 和 R 该当比集群中 worker 的机器数量要多得多。在每台 worker 机器都执行大量的不同工作可能进步集群的动静的负载平衡能力,并且可能放慢故障复原的速度:生效机器上执行的大量 Map 工作都能够散布到所有其余的 worker 机器下来执行。
然而实际上,在咱们的具体实现中对 M 和 R 的取值都有肯定的主观限度,因为 master 必须执行 O(M+R)次调度,并且在内存中保留 O(MR)个状态(对影响内存应用的因素还是比拟小的:O(MR)块状态,大略每对 Map 工作 /Reduce 工作 1 个字节就能够了)。更进一步,R 值通常是由用户指定的,因为每个 Reduce 工作最终都会生成一个独立的输入文件。理论应用时咱们也偏向于抉择适合的 M 值,以使得每一个独立工作都是解决大概 16M 到 64M 的输出数据(这样,
下面刻画的输出数据本地存储优化策略才最无效),另外,咱们把 R 值设置为咱们想应用的 worker 机器数量的小的倍数。咱们通常会用这样的比例来执行 MapReduce:M=200000,R=5000,应用 2000 台 worker 机器。
3.6 备用工作
影响一个 MapReduce 的总执行工夫最通常的因素是“落伍者”:在运算过程中,如果有一台机器花了很长的工夫才实现最初几个 Map 或 Reduce 工作,导致 MapReduce 操作总的执行工夫超过预期。呈现“落伍者”的起因十分多。比方:如果一个机器的硬盘出了问题,在读取的时候要常常的进行读取纠错操作,导致读取数据的速度从 30M/s 升高到 1M/s。如果 cluster 的调度零碎在这台机器上又调度了其余的工作,因为 CPU、内存、本地硬盘和网络带宽等竞争因素的存在,导致执行 MapReduce 代码的执行效率更加迟缓。咱们最近遇到的一个问题是因为机器的初始化代码有 bug,导致敞开了的处理器的缓存:在这些机器上执行工作的性能和失常状况相差上百倍。
咱们有一个通用的机制来缩小“落伍者”呈现的状况。当一个 MapReduce 操作靠近实现的时候,master 调度备用(backup)工作过程来执行剩下的、处于解决中状态(in-progress)的工作。无论是最后的执行过程、还是备用(backup)工作过程实现了工作,咱们都把这个工作标记成为曾经实现。咱们调优了这个机制,通常只会占用比失常操作多几个百分点的计算资源。咱们发现采纳这样的机制对于缩小超大 MapReduce 操作的总解决工夫效果显著。例如,在 5.3 节形容的排序工作,在敞开掉备用工作的状况下要多花 44% 的工夫实现排序工作。
4 技巧
尽管简略的 Map 和 Reduce 函数提供的基本功能曾经可能满足大部分的计算须要,咱们还是发掘出了一些有价值的扩大性能。本节将形容这些扩大性能。
4.1 分区函数
MapReduce 的使用者通常会指定 Reduce 工作和 Reduce 工作输入文件的数量(R)。咱们在两头 key 上应用分区函数来对数据进行分区,之后再输出到后续工作执行过程。一个缺省的分区函数是应用 hash 办法 (比方,hash(key) mod R) 进行分区。hash 办法能产生十分均衡的分区。然而,有的时候,其它的一些分区函数对 key 值进行的分区将十分有用。比方,输入的 key 值是 URLs,咱们心愿每个主机的所有条目放弃在同一个输入文件中。为了反对相似的状况,MapReduce 库的用户须要提供专门的分区函数。
例如应用“hash(Hostname(urlkey)) mod R”作为分区函数就能够把所有来自同一个主机的 URLs 保留在同一个输入文件中。
4.2 程序保障
咱们确保在给定的分区中,两头 key/value pair 数据的解决程序是依照 key 值增量程序解决的。这样的程序保障对每个分成生成一个有序的输入文件,这对于须要对输入文件按 key 值随机存取的利用十分有意义,对在排序输入的数据集也很有帮忙。
4.3 Combiner 函数
在某些状况下,Map 函数产生的两头 key 值的反复数据会占很大的比重,并且,用户自定义的 Reduce 函数满足结合律和交换律。在 2.1 节的词数统计程序是个很好的例子。因为词频率偏向于一个 zipf 散布(齐夫散布),每个 Map 工作将产生成千上万个这样的记录 <the,1>。所有的这些记录将通过网络被发送到一个独自的 Reduce 工作,而后由这个 Reduce 工作把所有这些记录累加起来产生一个数字。咱们容许用户指定一个可选的 combiner 函数,combiner 函数首先在本地将这些记录进行一次合并,而后将合并的后果再通过网络发送进来。
Combiner 函数在每台执行 Map 工作的机器上都会被执行一次。个别状况下,Combiner 和 Reduce 函数是一样的。Combiner 函数和 Reduce 函数之间惟一的区别是 MapReduce 库怎么管制函数的输入。Reduce 函数的输入被保留在最终的输入文件里,而 Combiner 函数的输入被写到两头文件里,而后被发送给 Reduce 工作。
局部的合并两头后果能够显著的进步一些 MapReduce 操作的速度。附录 A 蕴含一个应用 combiner 函数的例子。
4.4 输出和输入的类型
MapReduce 库反对几种不同的格局的输出数据。比方,文本模式的输出数据的每一行被视为是一个 key/value pair。key 是文件的偏移量,value 是那一行的内容。另外一种常见的格局是以 key 进行排序来存储的 key/value pair 的序列。每种输出类型的实现都必须可能把输出数据宰割成数据片段,该数据片段可能由独自的 Map 工作来进行后续解决 (例如,文本模式的范畴宰割必须确保仅仅在每行的边界进行范畴宰割)。尽管大多数 MapReduce 的使用者仅仅应用很少的预约义输出类型就满足要求了,然而使用者仍然能够通过提供一
个简略的 Reader 接口实现就可能反对一个新的输出类型。
Reader 并非肯定要从文件中读取数据,比方,咱们能够很容易的实现一个从数据库里读记录的 Reader,或者从内存中的数据结构读取数据的 Reader。
相似的,咱们提供了一些预约义的输入数据的类型,通过这些预约义类型可能产生不同格局的数据。用户采纳相似增加新的输出数据类型的形式减少新的输入类型。
4.5 副作用
在某些状况下,MapReduce 的使用者发现,如果在 Map 和 / 或 Reduce 操作过程中减少辅助的输入文件会比拟省事。咱们依附程序 writer 把这种“副作用”变成原子的和幂等的 3。通常应用程序首先把输入后果写到一个临时文件中,在输入全副数据之后,在应用零碎级的原子操作 rename 重新命名这个临时文件。
如果一个工作产生了多个输入文件,咱们没有提供相似两阶段提交的原子操作反对这种状况。因而,对于会产生多个输入文件、并且对于跨文件有一致性要求的工作,都必须是确定性的工作。然而在理论利用过程中,这个限度还没有给咱们带来过麻烦。
4.6 跳过损坏的记录
有时候,用户程序中的 bug 导致 Map 或者 Reduce 函数在解决某些记录的时候 crash 掉,MapReduce 操作无奈顺利完成。惯常的做法是修复 bug 后再次执行 MapReduce 操作,然而,有时候找出这些 bug 并修复它们不是一件容易的事件;这些 bug 兴许是在第三方库里边,而咱们手头没有这些库的源代码。而且在很多时候,疏忽一些有问题的记录也是能够承受的,比方在一个微小的数据集上进行统计分析的时候。咱们提供了一种执行模式,在这种模式下,为了保障保障整个解决能持续进行,MapReduce 会检测哪些记录导致确定性的 crash,并且跳过这些记录不解决。
每个 worker 过程都设置了信号处理函数捕捉内存段异样(segmentation violation)和总线谬误(bus error)。
在执行 Map 或者 Reduce 操作之前,MapReduce 库通过全局变量保留记录序号。如果用户程序触发了一个零碎信号,音讯处理函数将用“最初一口气”通过 UDP 包向 master 发送解决的最初一条记录的序号。当 master 看到在解决某条特定记录不止失败一次时,master 就标记着条记录须要被跳过,并且在下次从新执行相干的 Map 或者 Reduce 工作的时候跳过这条记录。
4.7 本地执行
调试 Map 和 Reduce 函数的 bug 是十分艰难的,因为理论执行操作时不然而散布在零碎中执行的,而且通常是在好几千台计算机上执行,具体的执行地位是由 master 进行动静调度的,这又大大增加了调试的难度。
为了简化调试、profile 和小规模测试,咱们开发了一套 MapReduce 库的本地实现版本,通过应用本地版本的 MapReduce 库,MapReduce 操作在本地计算机上程序的执行。用户能够管制 MapReduce 操作的执行,能够把操作限度到特定的 Map 工作上。用户通过设定特地的标记来在本地执行他们的程序,之后就能够很容易的应用本地调试和测试工具(比方 gdb)。
4.8 状态信息
master 应用嵌入式的 HTTP 服务器(如 Jetty)显示一组状态信息页面,用户能够监控各种执行状态。状态信息页面显示了包含计算执行的进度,比方曾经实现了多少工作、有多少工作正在解决、输出的字节数、两头数据的字节数、输入的字节数、解决百分比等等。页面还蕴含了指向每个工作的 stderr 和 stdout 文件的链接。用户依据这些数据预测计算须要执行大概多长时间、是否须要减少额定的计算资源。这些页面也能够用来剖析什么时候计算执行的比预期的要慢。
另外,处于最顶层的状态页面显示了哪些 worker 生效了,以及他们生效的时候正在运行的 Map 和 Reduce 工作。这些信息对于调试用户代码中的 bug 很有帮忙。
4.9 计数器
MapReduce 库应用计数器统计不同事件产生次数。比方,用户可能想统计曾经解决了多少个单词、曾经索引的多少篇 German 文档等等。
为了应用这个个性,用户在程序中创立一个命名的计数器对象,在 Map 和 Reduce 函数中相应的减少计数器的值。例如:
Counter* uppercase;
uppercase = GetCounter(“uppercase”);
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w,“1″);
这些计数器的值周期性的从各个独自的 worker 机器上传递给 master(附加在 ping 的应答包中传递)。master 把执行胜利的 Map 和 Reduce 工作的计数器值进行累计,当 MapReduce 操作实现之后,返回给用户代码。
计数器以后的值也会显示在 master 的状态页面上,这样用户就能够看到以后计算的进度。当累加计数器的值的时候,master 要查看反复运行的 Map 或者 Reduce 工作,防止反复累加(之前提到的备用工作和生效后从新执行工作这两种状况会导致雷同的工作被屡次执行)。
有些计数器的值是由 MapReduce 库主动维持的,比方曾经解决的输出的 key/value pair 的数量、输入的 key/value pair 的数量等等。
计数器机制对于 MapReduce 操作的完整性检查十分有用。比方,在某些 MapReduce 操作中,用户须要确保输入的 key value pair 准确的等于输出的 key value pair,或者解决的 German 文档数量在解决的整个文档数量中属于正当范畴。
5 性能
本节咱们用在一个大型集群上运行的两个计算来掂量 MapReduce 的性能。一个计算在大概 1TB 的数据中进行特定的模式匹配,另一个计算对大概 1TB 的数据进行排序。
这两个程序在大量的应用 MapReduce 的理论利用中是十分典型的 — 一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式;另一类是从海量数据中抽取少部分的用户感兴趣的数据。
5.1 集群配置
所有这些程序都运行在一个大概由 1800 台机器形成的集群上。每台机器配置 2 个 2G 主频、反对超线程的 Intel Xeon 处理器,4GB 的物理内存,两个 160GB 的 IDE 硬盘和一个千兆以太网卡。这些机器部署在一个两层的树形替换网络中,在 root 节点大略有 100-200GBPS 的传输带宽。所有这些机器都采纳雷同的部署(对等部署),因而任意两点之间的网络来回工夫小于 1 毫秒。
在 4GB 内存里,大略有 1-1.5G 用于运行在集群上的其余工作。测试程序在周末下午开始执行,这时主机的 CPU、磁盘和网络基本上处于闲暇状态。
5.2 GREP
这个分布式的 grep 程序须要扫描大略 10 的 10 次方个由 100 个字节组成的记录,查找呈现概率较小的 3 个字符的模式(这个模式在 92337 个记录中呈现)。输出数据被拆分成大概 64M 的 Block(M=15000),整个输入数据寄存在一个文件中(R=1)。
图 2 显示了这个运算随工夫的处理过程。其中 Y 轴示意输出数据的处理速度。处理速度随着参加 MapReduce 计算的机器数量的减少而减少,当 1764 台 worker 参加计算的时,处理速度达到了 30GB/s。当 Map 工作完结的时候,即在计算开始后 80 秒,输出的处理速度降到 0。整个计算过程从开始到完结一共花了大略 150 秒。这包含了大概一分钟的初始启动阶段。初始启动阶段耗费的工夫包含了是把这个程序传送到各个 worker 机器上的工夫、期待 GFS 文件系统关上 1000 个输出文件汇合的工夫、获取相干的文件本地地位优化信息的工夫。
5.3 排序
排序程序解决 10 的 10 次方个 100 个字节组成的记录(大略 1TB 的数据)。这个程序模拟 TeraSort benchmark[10]。
排序程序由不到 50 行代码组成。只有三行的 Map 函数从文本行中解析出 10 个字节的 key 值作为排序的 key,并且把这个 key 和原始文本行作为两头的 key/value pair 值输入。咱们应用了一个内置的恒等函数作为 Reduce 操作函数。这个函数把两头的 key/value pair 值不作任何扭转输入。最终排序后果输入到两路复制的 GFS 文件系统(也就是说,程序输入 2TB 的数据)。
如前所述,输出数据被分成 64MB 的 Block(M=15000)。咱们把排序后的输入后果分区后存储到 4000 个文件(R=4000)。分区函数应用 key 的原始字节来把数据分区到 R 个片段中。
在这个 benchmark 测试中,咱们应用的分区函数晓得 key 的分区状况。通常对于排序程序来说,咱们会减少一个预处理的 MapReduce 操作用于采样 key 值的散布状况,通过采样的数据来计算对最终排序解决的分区点。
图三(a)显示了这个排序程序的失常执行过程。左上的图显示了输出数据读取的速度。数据读取速度峰值会达到 13GB/s,并且所有 Map 工作实现之后,即大概 200 秒之后迅速滑落到 0。值得注意的是,排序程序输出数据读取速度小于分布式 grep 程序。这是因为排序程序的 Map 工作花了大概一半的解决工夫和 I/O 带宽把两头输入后果写到本地硬盘。相应的分布式 grep 程序的两头后果输入简直能够忽略不计。
右边两头的图显示了两头数据从 Map 工作发送到 Reduce 工作的网络速度。这个过程从第一个 Map 工作实现之后就开始迟缓启动了。图示的第一个顶峰是启动了第一批大略 1700 个 Reduce 工作(整个 MapReduce 散布到大略 1700 台机器上,每台机器 1 次最多执行 1 个 Reduce 工作)。排序程序运行大概 300 秒后,第一批启动的 Reduce 工作有些实现了,咱们开始执行剩下的 Reduce 工作。所有的解决在大概 600 秒后完结。
左下图示意 Reduce 工作把排序后的数据写到最终的输入文件的速度。在第一个排序阶段完结和数据开始写入磁盘之间有一个小的延时,这是因为 worker 机器正在忙于排序两头数据。磁盘写入速度在 2-4GB/s 继续一段时间。输入数据写入磁盘大概继续 850 秒。计入初始启动局部的工夫,整个运算耗费了 891 秒。这个速度和 TeraSort benchmark[18]的最高纪录 1057 秒相差不多。
还有一些值得注意的景象:输出数据的读取速度比排序速度和输入数据写入磁盘速度要高不少,这是因为咱们的输出数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节俭了网络带宽。排序速度比输入数据写入到磁盘的速度快,这是因为输入数据写了两份(咱们应用了 2 路的 GFS 文件系统,写入复制节点的起因是为了保证数据可靠性和可用性)。咱们把输入数据写入到两个复制节点的起因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。如果底层文件系统应用相似容错编码 [14](erasure
coding) 的形式而不是复制的形式保证数据的可靠性和可用性,那么在输入数据写入磁盘的时候,就能够升高网络带宽的应用。
5.4 高效的 backup 工作
图三(b)显示了敞开了备用工作后排序程序执行状况。执行的过程和图 3(a)很类似,除了输入数据写磁盘的动作在工夫上拖了一个很长的尾巴,而且在这段时间里,简直没有什么写入动作。在 960 秒后,只有 5 个 Reduce 工作没有实现。这些拖后腿的工作又执行了 300 秒才实现。整个计算耗费了 1283 秒,多了 44% 的执行工夫。
5.5 生效的机器
在图三(c)中演示的排序程序执行的过程中,咱们在程序开始后几分钟无意的 kill 了 1746 个 worker 中的 200 个。集群底层的调度立即在这些机器上从新开始新的 worker 解决过程(因为只是 worker 机器上的解决过程被 kill 了,机器自身还在工作)。
图三(c)显示出了一个“负”的输出数据读取速度,这是因为一些曾经实现的 Map 工作失落了(因为相应的执行 Map 工作的 worker 过程被 kill 了),须要从新执行这些工作。相干 Map 工作很快就被从新执行了。
整个运算在 933 秒内实现,包含了初始启动工夫(只比失常执行多耗费了 5% 的工夫)。
6 教训
咱们在 2003 年 1 月实现了第一个版本的 MapReduce 库,在 2003 年 8 月的版本有了显著的加强,这包含了输出数据本地优化、worker 机器之间的动静负载平衡等等。从那以后,咱们惊喜的发现,MapReduce 库能广泛应用于咱们日常工作中遇到的各类问题。它当初在 Google 外部各个领域失去广泛应用,包含:
- 大规模机器学习问题
- Google News 和 Froogle 产品的集群问题
- 从公众查问产品(比方 Google 的 Zeitgeist)的报告中抽取数据。
- 从大量的新利用和新产品的网页中提取有用信息(比方,从大量的地位搜寻网页中抽取地理位置信
息)。
- 大规模的图形计算。
图四显示了在咱们的源代码管理系统中,随着时间推移,独立的 MapReduce 程序数量的显著减少。从 2003 年早些时候的 0 个增长到 2004 年 9 月份的差不多 900 个不同的程序。MapReduce 的胜利取决于采纳 MapReduce
库可能在不到半个小时工夫内写出一个简略的程序,这个简略的程序可能在上千台机器的组成的集群上做大规模并发解决,这极大的放慢了开发和原形设计的周期。另外,采纳 MapReduce 库,能够让齐全没有分布式和 / 或并行零碎开发教训的程序员很容易的利用大量的资源,开发出分布式和 / 或并行处理的利用。
在每个工作完结的时候,MapReduce 库统计计算资源的应用情况。在表 1,咱们列出了 2004 年 8 月份
MapReduce 运行的工作所占用的相干资源。
6.1 大规模索引
到目前为止,MapReduce 最胜利的利用就是重写了 Google 网络搜寻服务所应用到的 index 零碎。索引零碎的输出数据是网络爬虫抓取回来的海量的文档,这些文档数据都保留在 GFS 文件系统里。这些文档原始内容 4 的大小超过了 20TB。索引程序是通过一系列的 MapReduce 操作(大概 5 到 10 次)来建设索引。应用 MapReduce(替换上一个特地设计的、分布式解决的索引程序)带来这些益处:
实现索引局部的代码简略、玲珑、容易了解,因为对于容错、分布式以及并行计算的解决都是 MapReduce 库提供的。比方,应用 MapReduce 库,计算的代码行数从原来的 3800 行 C++ 代码缩小到大略 700 行代码。
MapReduce 库的性能曾经足够好了,因而咱们能够把在概念上不相干的计算步骤离开解决,而不是混在一起以期缩小数据传递的额定耗费。概念上不相干的计算步骤的隔离也使得咱们能够很容易扭转索引解决形式。比方,对之前的索引零碎的一个小更改可能要消耗好几个月的工夫,然而在应用 MapReduce 的新零碎上,这样的更改只须要花几天工夫就能够了。
索引零碎的操作治理更容易了。因为由机器生效、机器处理速度迟缓、以及网络的霎时阻塞等引起的绝大部分问题都曾经由 MapReduce 库解决了,不再须要操作人员的染指了。另外,咱们能够通过在索引零碎集群中减少机器的简略办法进步整体解决性能。
7 相干工作
很多零碎都提供了严格的编程模式,并且通过对编程的严格限度来实现并行计算。例如,一个联合函数能够通过把 N 个元素的数组的前缀在 N 个处理器上应用并行前缀算法,在 log N 的工夫内计算完[6,9,13] 5。
MapReduce 能够看作是咱们联合在实在环境下解决海量数据的教训,对这些经典模型进行简化和萃取的成绩。
更加值得自豪的是,咱们还实现了基于上千台处理器的集群的容错解决。相比而言,大部分并发解决零碎都只在小规模的集群上实现,并且把容错解决交给了程序员。
Bulk Synchronous Programming[17]和一些 MPI 原语 [11] 提供了更高级别的并行处理形象,能够更容易写出并行处理的程序。MapReduce 和这些零碎的要害不同之处在于,MapReduce 利用限制性编程模式实现了用户程序的主动并发解决,并且提供了通明的容错解决。
咱们数据本地优化策略的灵感来源于 active disks[12,15]等技术,在 active disks 中,计算工作是尽量推送到数据存储的节点解决 6,这样就缩小了网络和 IO 子系统的吞吐量。咱们在挂载几个硬盘的一般机器上执行咱们的运算,而不是在磁盘处理器上执行咱们的工作,然而达到的目标一样的。
咱们的备用工作机制和 Charlotte System[3]提出的 eager 调度机制比拟相似。Eager 调度机制的一个毛病是如果一个工作重复生效,那么整个计算就不能实现。咱们通过疏忽引起故障的记录的形式在某种程度上解决了这个问题。
MapReduce 的实现依赖于一个外部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上散布和运行用户工作。尽管这个不是本论文的重点,然而有必要提一下,这个集群管理系统在理念上和其它零碎,如 Condor[16]是一样。
MapReduce 库的排序机制和 NOW-Sort[1]的操作上很相似。读取输出源的机器(map workers)把待排序的数据进行分区后,发送到 R 个 Reduce worker 中的一个进行解决。每个 Reduce worker 在本地对数据进行排序(尽可能在内存中排序)。当然,NOW-Sort 没有给用户自定义的 Map 和 Reduce 函数的机会,因而不具备 MapReduce 库宽泛的实用性。
River[2]提供了一个编程模型:解决过程通过分布式队列传送数据的形式进行相互通信。和 MapReduce 相似,River 零碎尝试在不对等的硬件环境下,或者在零碎平稳的状况下也能提供近似均匀的性能。River 是通过精心调度硬盘和网络的通信来均衡工作的实现工夫。MapReduce 库采纳了其它的办法。通过对编程模型进行限度,MapReduce 框架把问题合成成为大量的“小”工作。这些工作在可用的 worker 集群上动静的调度,这样疾速的 worker 就能够执行更多的工作。通过对编程模型进行限度,咱们可用在工作靠近实现的时候调度备用工作,缩短在硬件配置不平衡的状况下放大整个操作实现的工夫(比方有的机器性能差、或者机器被某些操作阻塞了)。
BAD-FS[5]采纳了和 MapReduce 齐全不同的编程模式,它是面向广域网的。
不过,这两个零碎有两个根底性能很相似。
(1)两个零碎采纳从新执行的形式来避免因为生效导致的数据失落。
(2)两个都应用数据本地化调度策略,缩小网络通讯的数据量。
TACC[7]是一个用于简化结构高可用性网络服务的零碎。和 MapReduce 一样,它也依附从新执行机制来实现的容错解决。
8 结束语
MapReduce 编程模型在 Google 外部胜利利用于多个畛域。咱们把这种胜利归结为几个方面:首先,因为 MapReduce 封装了并行处理、容错解决、数据本地化优化、负载平衡等等技术难点的细节,这使得 MapReduce 库易于应用。即使对于齐全没有并行或者分布式系统开发教训的程序员而言;
其次,大量不同类型的问题都能够通过 MapReduce 简略的解决。比方,MapReduce 用于生成 Google 的网络搜寻服务所须要的数据、用来
排序、用来数据挖掘、用于机器学习,以及很多其它的零碎;第三,咱们实现了一个在数千台计算机组成的大型集群上灵便部署运行的 MapReduce。这个实现使得无效利用这些丰盛的计算资源变得非常简单,因而也适宜用来解决 Google 遇到的其余很多须要大量计算的问题。
咱们也从 MapReduce 开发过程中学到了不少货色。首先,束缚编程模式使得并行和分布式计算非常容易,也易于结构容错的计算环境;其次,网络带宽是罕见资源。大量的系统优化是针对缩小网络传输量为目标的:
本地优化策略使大量的数据从本地磁盘读取,两头文件写入本地磁盘、并且只写一份两头文件也节约了网络带宽;
第三,屡次执行雷同的工作能够缩小性能迟缓的机器带来的负面影响同时解决了因为机器生效导致的数据失落问题。
更多 Flink,Kafka,Spark 等相干技术博文,科技资讯,欢送关注实时流式计算 公众号后盾回复“电子书”下载 300 页 Flink 实战电子书