引入
在2010年,来自Berkeley的博士生 Matei Zaharia 发表了一篇论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》。伴随着这篇论文的,是一个开源系统,也就是 Spark。在之后的几年里,Spark不仅逐步侵蚀了Hadoop的市场份额,成为大数据分析的第一选择,也在实时数据处理,以及大规模机器学习里占据了重要的地位。
Spark的计算模型,其实可以看作一个是支持多轮迭代的MapReduce模型。不过它在实现层面,又和稍显笨拙的MapReduce完全不同。通过引入RDD这样一个函数式对象的数据集的概念,Spark在多轮的数据迭代里,不需要像MapReduce一样反反复复地读写硬盘,大大提升了处理数据的性能。
和之前我们聊过的HDFS、MapReduce等类似的是,它没有用什么黑科技,也没有太多复杂难懂的学术概念,Spark论文发表的时候,整个系统也就1万多行代码,它的核心就是针对MapReduce明显的不足之处做了改造。
本文我们就从Spark论文开始学习。
弹性分布式数据集:一种用于内存集群计算的容错抽象
摘要
我们提出了弹性分布式数据集(RDDs),这是一种分布式内存抽象,它使程序员能够以容错的方式在大型集群上进行内存计算。RDDs 的产生源于当前计算框架处理效率低下的两类应用:迭代算法和交互式数据挖掘工具。在这两种情况下,将数据保留在内存中可使性能提升一个数量级。
为了高效实现容错,RDDs 基于粗粒度转换而非对共享状态的细粒度更新,提供了一种受限形式的共享内存。然而,我们证明了 RDDs 具有足够的表达能力,能够涵盖广泛的计算类型,包括近期用于迭代作业的专门编程模型(如 Pregel),以及这些模型尚未涉及的新应用。我们已在一个名为 Spark 的系统中实现了 RDDs,并通过各种用户应用程序和基准测试对其进行了评估。
1. 引言
像 MapReduce [10] 和 Dryad [19] 这样的集群计算框架已被广泛应用于大规模数据分析。这些系统使用户能够通过一组高级操作符编写并行计算程序,而无需担心任务分配和容错问题。
尽管当前的框架为访问集群计算资源提供了众多抽象,但它们缺乏利用分布式内存的抽象机制。这使得它们在一类重要的新兴应用中效率低下:这类应用需要在多个计算过程中复用中间结果。数据复用在许多迭代机器学习和图算法中很常见,包括 PageRank、K 均值聚类和逻辑回归。另一个引人注目的用例是交互式数据挖掘,用户在数据的同一子集上运行多个即席查询。不幸的是,在大多数当前框架中,在计算之间(例如,在两个 MapReduce 作业之间)复用数据的唯一方法是将其写入外部稳定存储系统,如分布式文件系统。这会由于数据复制、磁盘 I/O 和序列化而产生大量开销,这些开销可能会主导应用程序的执行时间。
认识到这个问题后,研究人员针对一些需要数据复用的应用开发了专门的框架。例如,Pregel [22] 是一个用于迭代图计算的系统,它将中间数据保存在内存中,而 HaLoop [7] 提供了一个迭代 MapReduce 接口。然而,这些框架仅支持特定的计算模式(例如,循环一系列 MapReduce 步骤),并且仅针对这些模式隐式地进行数据共享。它们没有为更通用的复用提供抽象,例如,让用户将多个数据集加载到内存中并对其运行即席查询。
在本文中,我们提出了一种名为弹性分布式数据集(RDDs)的新抽象,它能够在广泛的应用中实现高效的数据复用。RDDs 是容错的并行数据结构,它允许用户显式地将中间结果持久化到内存中,控制其分区以优化数据放置,并使用丰富的操作符对其进行操作。
设计 RDDs 的主要挑战在于定义一个能够高效提供容错能力的编程接口。现有的集群内存存储抽象,如分布式共享内存 [24]、键值存储 [25]、数据库和 Piccolo [27],提供了一个基于对可变状态(例如表中的单元格)进行细粒度更新的接口。使用这个接口,提供容错的唯一方法是在机器间复制数据或在机器间记录更新。对于数据密集型工作负载,这两种方法成本都很高,因为它们需要通过集群网络复制大量数据,而集群网络的带宽远低于内存带宽,并且会产生大量的存储开销。
与这些系统不同,RDDs 提供了一个基于粗粒度转换(例如 map、filter 和 join)的接口,该接口对许多数据项应用相同的操作。这使得它们能够通过记录用于构建数据集的转换(其血统)而不是实际数据来高效地提供容错能力。如果 RDD 的一个分区丢失,RDD 拥有足够的关于它如何从其他 RDD 派生而来的信息,以便仅重新计算该分区。因此,丢失的数据通常可以相当快速地恢复,而无需进行代价高昂的复制。
不过,当血统链变得很长时,对某些弹性分布式数据集(RDD)中的数据进行检查点操作可能会很有用,我们将在 5.4 节讨论如何进行该操作。
虽然基于粗粒度转换的接口乍一看可能有限,但 RDDs 很适合许多并行应用程序,因为这些应用程序自然会对多个数据项应用相同的操作。事实上,我们表明 RDDs 可以有效地表达许多迄今为止作为独立系统提出的集群编程模型,包括 MapReduce、DryadLINQ、SQL、Pregel 和 HaLoop,以及这些系统未涵盖的新应用,如交互式数据挖掘。我们相信,RDDs 能够满足以前仅通过引入新框架才能满足的计算需求,这是 RDD 抽象强大功能的最可靠证据。
我们已经在一个名为 Spark 的系统中实现了 RDDs,该系统正在加州大学伯克利分校和几家公司用于研究和生产应用。Spark 以 Scala 编程语言 [2] 提供了类似于 DryadLINQ [31] 的便捷的语言集成编程接口。此外,Spark 可以与 Scala 解释器交互使用,以查询大数据集。我们认为 Spark 是第一个允许以交互式速度在集群上使用通用编程语言进行内存数据挖掘的系统。
我们通过微基准测试和用户应用程序测量来评估 RDDs 和 Spark。我们表明,对于迭代应用程序,Spark 比 Hadoop 快 20 倍,将实际数据分析报告的速度提高了 40 倍,并且可以交互式地用于扫描 1TB 数据集,延迟为 5 - 7 秒。更根本的是,为了说明 RDDs 的通用性,我们在 Spark 之上实现了 Pregel 和 HaLoop 编程模型,包括它们采用的放置优化,作为相对较小的库(每个 200 行代码)。
本文首先概述 RDDs(§2)和 Spark(§3)。然后,我们讨论 RDDs 的内部表示(§4)、我们的实现(§5)和实验结果(§6)。最后,我们讨论 RDDs 如何涵盖几种现有的集群编程模型(§7),调查相关工作(§8),并得出结论。
2. 弹性分布式数据集(RDDs)
本节对 RDDs 进行概述。我们首先定义 RDDs(2.1 节),并介绍其在 Spark 中的编程接口(2.2 节)。然后,我们将 RDDs 与更细粒度的共享内存抽象进行比较(2.3 节)。最后,我们讨论 RDD 模型的局限性(2.4 节)。
2.1 RDD 抽象
从形式上看,RDD 是一个只读的、分区的记录集合。RDD 只能通过对以下两种数据进行确定性操作来创建:(1)稳定存储中的数据;(2)其他 RDD。为了与 RDD 上的其他操作区分开来,我们将这些操作称为转换操作。转换操作的示例包括 map、filter 和 join 。
RDD 并不需要一直物化。相反,RDD 拥有足够的关于它如何从其他数据集派生而来的信息(即它的血统),以便从稳定存储中的数据计算其分区。这是一个强大的特性:本质上,程序无法引用在发生故障后无法重构的 RDD。
最后,用户可以控制 RDD 的另外两个方面:持久化和分区。用户可以指明他们将复用哪些 RDD,并为它们选择一种存储策略(例如,内存存储)。用户还可以要求根据每条记录中的某个键,将 RDD 的元素在多台机器间进行分区。这对于布局优化很有用,例如确保将要进行连接的两个数据集以相同方式进行哈希分区。
2.2 Spark 编程接口
Spark 通过一种与 DryadLINQ [31] 和 FlumeJava [8] 类似的语言集成式 API 来公开 RDD。在这种 API 中,每个数据集都表示为一个对象,并且通过调用这些对象上的方法来执行转换操作。
程序员首先通过对稳定存储中的数据执行转换操作(例如 map 和 filter)来定义一个或多个 RDD。然后,他们可以在行动操作中使用这些 RDD,行动操作是指将值返回给应用程序或将数据导出到存储系统的操作。行动操作的示例包括 count(返回数据集中元素的数量)、collect(返回元素本身)以及 save(将数据集输出到存储系统)。与 DryadLINQ 类似,Spark 在 RDD 首次在行动操作中被使用时进行惰性计算,这样它就可以将转换操作串接起来。
此外,程序员可以调用 persist 方法来指明他们希望在未来操作中复用哪些 RDD。默认情况下,Spark 将持久化的 RDD 保存在内存中,但如果内存不足,它可以将这些 RDD 溢出到磁盘。用户还可以通过 persist 的标志位请求其他持久化策略,例如仅将 RDD 存储在磁盘上或在多台机器间复制它。最后,用户可以在每个 RDD 上设置持久化优先级,以指定哪些内存中的数据应首先溢出到磁盘。
2.2.1 示例:控制台日志挖掘
假设一个 Web 服务出现错误,运维人员想要在 Hadoop 文件系统(HDFS)中搜索数 TB 的日志以找出原因。使用 Spark,该运维人员可以仅将日志中的错误消息加载到一组节点的内存中,并以交互方式查询它们。
尽管单个弹性分布式数据集(RDD)是不可变的,但通过使用多个RDD来表示数据集的多个版本,就有可能实现可变状态。我们将RDD设计为不可变的,是为了更便于描述血统图,但如果我们的抽象是版本化数据集,并在血统图中追踪版本,其效果也是等效的。
图 1:我们示例中第三个查询的血统图。方框代表弹性分布式数据集(RDDs),箭头代表转换操作。
她首先会输入以下 Scala 代码:
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
第 1 行定义了一个由 HDFS 文件支持的 RDD(作为文本行的集合),而第 2 行从它派生出一个经过筛选的 RDD。第 3 行接着要求将 “errors” RDD 持久化到内存中,以便在多个查询中共享。请注意,“filter” 的参数是 Scala 中闭包的语法形式。
此时,集群上还未执行任何操作。然而,用户现在可以在行动操作中使用这个 RDD,例如,统计消息的数量:
errors.count()
用户还可以对该 RDD 执行进一步的转换操作,并使用其结果,如下几行代码所示:
// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split(’\t’)(3))
.collect()
在涉及errors
的第一个行动操作运行之后,Spark 会将errors
的分区存储在内存中,极大地加快后续对其进行的计算。请注意,基础 RDD,即lines
,并不会加载到内存中。这是很理想的情况,因为错误消息可能只占数据的一小部分(小到足以装入内存)。
最后,为了说明我们的模型是如何实现容错的,我们在图 1 中展示了第三个查询中 RDD 的血统图。在这个查询中,我们从errors
开始,它是对lines
进行筛选的结果,然后在运行collect
之前,又进行了一次筛选和映射操作。Spark 调度器会将后两个转换操作串接起来,并将一组任务发送到持有errors
缓存分区的节点上进行计算。此外,如果errors
的某个分区丢失,Spark 会通过仅对lines
中相应的分区应用筛选操作来重建该分区。
表 1:弹性分布式数据集(RDDs)与分布式共享内存的比较
2.3 RDD 模型的优势
为了理解 RDD 作为分布式内存抽象的优势,我们在表 1 中将其与分布式共享内存(DSM)进行比较。在 DSM 系统中,应用程序可以对全局地址空间中的任意位置进行读写操作。请注意,按照这个定义,我们不仅涵盖传统的共享内存系统 [24],还包括其他应用程序对共享状态进行细粒度写入的系统,其中包括提供共享分布式哈希表(DHT)的 Piccolo [27] 以及分布式数据库。
DSM 是一种非常通用的抽象,但这种通用性使得在普通集群上以高效且容错的方式实现它变得更加困难。
RDD 和 DSM 之间的主要区别在于,RDD 只能通过粗粒度转换来创建(“写入”),而 DSM 允许对每个内存位置进行读写操作 。这将 RDD 限制于执行批量写入的应用程序,但却实现了更高效的容错。特别是,RDD 无需承担检查点的开销,因为它们可以使用血统信息进行恢复 。此外,在发生故障时,只需重新计算 RDD 丢失的分区,并且可以在不同节点上并行重新计算这些分区,而无需回滚整个程序。
请注意,对 RDD 的读取操作仍然可以是细粒度的。例如,应用程序可以将 RDD 视为一个大型只读查找表。
在某些应用中,对于具有长血统链的 RDD 进行检查点操作仍然是有帮助的,正如我们将在 5.4 节中所讨论的。然而,由于 RDD 是不可变的,这一操作可以在后台进行,无需像在分布式共享内存(DSM)中那样对整个应用程序进行快照。
RDD 的第二个好处是其不可变的特性,这使得系统能够像 MapReduce [10] 那样,通过运行缓慢任务的备份副本来缓解慢节点(拖后腿节点)的问题。使用 DSM 很难实现备份任务,因为一个任务的两个副本会访问相同的内存位置,并相互干扰对方的更新操作。
图 2:Spark 运行时。用户的驱动程序启动多个工作节点,这些工作节点从分布式文件系统读取数据块,并能将计算出的 RDD 分区持久化存储在内存中。
最后,与分布式共享内存(DSM)相比,弹性分布式数据集(RDDs)还有另外两个优势。
- 第一,在对 RDDs 进行批量操作时,运行时可以根据数据的本地性来调度任务,从而提高性能。
- 第二,当内存不足以存储 RDDs 时,只要它们仅用于基于扫描的操作,RDDs 能平稳应对。无法存入内存的分区可以存储在磁盘上,并且性能与当前的数据并行系统相近。
2.4 不适合 RDD 的应用场景
正如引言中所讨论的,RDD 最适合对数据集中所有元素应用相同操作的批处理应用。在这些情况下,RDD 可以高效地将每个转换记录为血统图中的一个步骤,并且能够在无需记录大量数据的情况下恢复丢失的分区。
RDD 不太适合对共享状态进行异步细粒度更新的应用,比如 Web 应用的存储系统或增量式网络爬虫。对于这些应用,使用执行传统更新日志记录和数据检查点的系统会更高效,例如数据库、RAMCloud [25]、Percolator [26] 和 Piccolo [27]。我们的目标是为批处理分析提供一个高效的编程模型,而将这些异步应用留给专门的系统。
3 Spark 编程接口
Spark 通过一种与 DryadLINQ [31] 类似的语言集成式 API 在 Scala [2] 中提供 RDD 抽象,Scala 是一种用于 Java 虚拟机的静态类型函数式编程语言。我们选择 Scala 是因为它兼具简洁性(便于交互式使用)和高效性(得益于静态类型)。然而,RDD 抽象本身并不依赖于函数式语言。
如图 2 所示,开发者使用 Spark 时,需编写一个连接到一组工作节点集群的驱动程序。驱动程序定义一个或多个 RDD,并对它们调用行动操作。驱动程序上的 Spark 代码还会追踪 RDD 的血统。工作节点是长期运行的进程,能够在不同操作间将 RDD 分区存储在内存中。
正如我们在 2.2.1 节的日志挖掘示例中所示,用户通过传递闭包(函数字面量)为 RDD 的操作(如 map)提供参数。Scala 将每个闭包表示为一个 Java 对象,这些对象可以被序列化并加载到另一个节点上,以便通过网络传递闭包。Scala 还会将闭包中绑定的任何变量作为 Java 对象中的字段保存。例如,用户可以编写类似 var x = 5; rdd.map(_ + x)
的代码,将 RDD 的每个元素加上 5 。
我们在闭包创建时就保存它,所以在这个例子中,即使
x
的值发生变化,map
操作始终会给每个元素加 5 。
RDD 本身是由元素类型参数化的静态类型对象。例如,RDD[Int]
是一个整数类型的 RDD。不过,由于 Scala 支持类型推断,我们的大多数示例都省略了类型。
虽然我们在 Scala 中公开 RDD 的方法在概念上很简单,但我们不得不使用反射 [33] 来解决 Scala 闭包对象相关的问题。正如我们将在 5.2 节讨论的,为了让 Spark 能在 Scala 解释器中可用,我们还需要做更多工作。尽管如此,我们无需修改 Scala 编译器。
3.1 Spark 中的 RDD 操作
表 2 列出了 Spark 中可用的主要 RDD 转换操作和行动操作。我们给出了每个操作的签名,方括号中显示类型参数。请记住,转换操作是定义新 RDD 的惰性操作,而行动操作则启动计算,将值返回给程序或将数据写入外部存储。
请注意,某些操作(如 join)仅适用于键值对形式的 RDD。此外,我们选择的函数名与 Scala 和其他函数式语言中的其他 API 相匹配;例如,map 是一对一映射,而 flatMap 将每个输入值映射到一个或多个输出值(类似于 MapReduce 中的 map)。
除了这些操作符,用户可以要求持久化一个 RDD。此外,用户可以获取 RDD 的分区顺序,它由一个 Partitioner 类表示,并根据它对另一个数据集进行分区。诸如 groupByKey、reduceByKey 和 sort 等操作会自动生成一个哈希分区或范围分区的 RDD。
表 2:Spark 中 RDD 可用的转换操作和行动操作。Seq [T] 表示类型为 T 的元素序列。
3.2 示例应用
我们通过两个迭代应用来补充 2.2.1 节中的数据挖掘示例:逻辑回归和 PageRank。后者还展示了对 RDD 分区的控制如何提高性能。
3.2.1 逻辑回归
许多机器学习算法本质上具有迭代性,因为它们会运行诸如梯度下降之类的迭代优化程序,以最大化某个函数。因此,通过将数据保留在内存中,这些算法能够显著提升运行速度。
例如,以下程序实现了逻辑回归算法 [14],这是一种常见的分类算法,旨在寻找一个超平面 w,以最佳方式分隔两组数据点(例如,垃圾邮件和非垃圾邮件)。该算法采用梯度下降法:首先将 w 初始化为一个随机值,然后在每次迭代中,对数据上关于 w 的函数进行求和,从而朝着改进 w 的方向移动 w 的值。
val points = spark.textFile(...)
.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}
我们首先定义一个名为points
的持久化 RDD,它是对一个文本文件进行map
转换的结果,该转换将文本的每一行解析为一个Point
对象。然后,我们在points
上反复运行map
和reduce
操作,通过对当前w
的函数求和来计算每一步的梯度。正如我们将在 6.1 节展示的,在各次迭代中把points
保留在内存中可以使速度提升 20 倍。
3.2.2 PageRank 算法
在 PageRank 算法 [6] 中,会出现一种更为复杂的数据共享模式。该算法通过累加指向每个文档的其他文档的贡献值,来迭代更新每个文档的排名。在每次迭代中,每个文档向其邻居发送 r/n 的贡献值,其中 r 是该文档的排名,n 是其邻居数量。然后,它将自身排名更新为 α/N + (1 - α)∑ci ,这里的求和是对其收到的所有贡献值进行的,N 是文档总数。我们可以在 Spark 中按如下方式编写 PageRank 算法:
// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}
图 3:PageRank 中数据集的血统图。
该程序会生成如图 3 所示的 RDD 血统图。在每次迭代中,我们会基于上一次迭代的contribs和ranks数据集以及静态的links数据集,创建一个新的ranks数据集 。此图的一个有趣特性是,它会随着迭代次数的增加而变长。因此,在具有多次迭代的作业中,可能有必要可靠地复制ranks的某些版本,以减少故障恢复时间 [20]。用户可以使用RELIABLE标志调用persist方法来实现这一点。不过,请注意,links数据集无需复制,因为通过对输入文件的数据块重新运行映射操作,可以高效地重建其分区。通常,这个数据集会比ranks大得多,因为每个文档有许多链接,但作为其排名的数字只有一个,所以与对程序整个内存状态进行检查点的系统相比,使用血统信息恢复该数据集能节省时间。
请注意,尽管 RDD 是不可变的,但程序中的变量
ranks
和contribs
在每次迭代时指向不同的 RDD。
最后,我们可以通过控制 RDD 的分区来优化 PageRank 中的通信。如果我们为links指定一种分区方式(例如,按 URL 在节点间进行哈希分区链接列表),我们可以以相同的方式对ranks进行分区,并确保links和ranks之间的连接操作无需通信(因为每个 URL 的排名将与它的链接列表位于同一台机器上)。我们还可以编写一个自定义的Partitioner类,将相互链接的页面分组在一起(例如,按域名对 URL 进行分区)。当我们定义links时,可以通过调用partitionBy来实现这两种优化:
links = spark.textFile(...).map(...)
.partitionBy(myPartFunc).persist()
在这个初始调用之后,links
和ranks
之间的连接操作会自动将每个 URL 的贡献值聚合到其链接列表所在的机器上,在那里计算其新的排名,然后将其与链接进行连接。这种跨迭代的一致性分区是像 Pregel 这样的专门框架中的主要优化方法之一。RDD 让用户能够直接表达这一目标。
4 RDD 的表示
将 RDD 作为一种抽象来实现的挑战之一,是为其选择一种能够在各种不同转换操作中追踪血统关系的表示方式。理想情况下,实现 RDD 的系统应提供尽可能丰富的转换操作符集合(例如,表 2 中的那些操作符),并允许用户以任意方式组合使用它们。我们提出一种基于图的简单 RDD 表示方法,以助力实现这些目标。我们已在 Spark 中采用这种表示方法,支持了广泛的转换操作,且无需为每个操作在调度器中添加特殊逻辑,这极大地简化了系统设计。
简而言之,我们提议通过一个通用接口来表示每个 RDD,该接口公开五条信息:一组分区,它们是数据集的原子组成部分;一组对父 RDD 的依赖关系;一个基于其父 RDD 计算数据集的函数;以及关于其分区方案和数据存放位置的元数据。例如,一个表示 HDFS 文件的 RDD,对于文件的每个数据块都有一个分区,并且知道每个数据块位于哪些机器上。同时,对这个 RDD 进行 map 操作后的结果具有相同的分区,但在计算其元素时会将 map 函数应用于父 RDD 的数据。我们在表 3 中总结了这个接口。
表 3:Spark 中用于表示 RDD 的接口。
设计这个接口时,最有趣的问题是如何表示 RDD 之间的依赖关系。我们发现将依赖关系分为两种类型既足够又实用:窄依赖,即父 RDD 的每个分区最多被子 RDD 的一个分区使用;宽依赖,即多个子分区可能依赖于同一个父分区。例如,map 操作会产生窄依赖,而 join 操作会产生宽依赖(除非父 RDD 已按哈希分区)。图 4 展示了其他示例。
图 4:窄依赖和宽依赖的示例。每个方框代表一个 RDD,分区显示为阴影矩形。
这种区分之所以有用,有两个原因。首先,窄依赖允许在单个集群节点上进行流水线式执行,该节点可以计算所有父分区。例如,可以逐个元素地先应用 map 操作,再应用 filter 操作。相比之下,宽依赖要求所有父分区的数据都可用,并且需要使用类似 MapReduce 的操作在节点间进行数据洗牌。其次,在节点发生故障后,窄依赖的恢复效率更高,因为只需重新计算丢失的父分区,并且可以在不同节点上并行重新计算。相比之下,在具有宽依赖的血统图中,单个节点故障可能导致 RDD 所有祖先的某些分区丢失,从而需要完全重新执行。
RDD 的这个通用接口使得在 Spark 中用不到 20 行代码就能实现大多数转换操作。实际上,即使是 Spark 的新用户,在不了解调度器细节的情况下,也实现了新的转换操作(例如,采样和各种类型的连接操作)。下面我们简要介绍一些 RDD 的实现。
- HDFS 文件:我们示例中的输入 RDD 通常是 HDFS 中的文件。对于这些 RDD,
partitions
方法为文件的每个数据块返回一个分区(每个Partition
对象中存储数据块的偏移量),preferredLocations
方法给出数据块所在的节点,iterator
方法则读取数据块。 - map:对任何 RDD 调用
map
操作会返回一个MappedRDD
对象。这个对象与它的父 RDD 具有相同的分区和首选位置,但在其iterator
方法中,会将传递给map
的函数应用于父 RDD 的记录。 - union:对两个 RDD 调用
union
操作会返回一个 RDD,其分区是两个父 RDD 分区的并集。每个子分区通过对相应父分区的窄依赖来计算 。 - sample:采样操作与映射操作类似,不同之处在于 RDD 为每个分区存储一个随机数生成器种子,以便确定性地对父记录进行采样。
- join:连接两个 RDD 可能会导致两种窄依赖(如果它们都使用相同的分区器按哈希 / 范围分区)、两种宽依赖,或者两者混合(如果一个父 RDD 有分区器,而另一个没有)。无论哪种情况,输出的 RDD 都有一个分区器(要么从父 RDD 继承,要么是默认的哈希分区器)。
5 实现
我们用大约 14000 行 Scala 代码实现了 Spark。该系统运行在 Mesos 集群管理器 [17] 之上,使其能够与 Hadoop、MPI 及其他应用程序共享资源。每个 Spark 程序作为一个独立的 Mesos 应用程序运行,拥有自己的驱动程序(主节点)和工作节点,这些应用程序之间的资源共享由 Mesos 负责处理。
Spark 可以使用 Hadoop 现有的输入插件 API,从任何 Hadoop 输入源(如 HDFS 或 HBase)读取数据,并且可以在未经修改的 Scala 版本上运行。
下面我们简要介绍该系统中几个技术上较为有趣的部分:作业调度器(§5.1)、支持交互式使用的 Spark 解释器(§5.2)、内存管理(§5.3)以及对检查点的支持(§5.4)。
5.1 作业调度
Spark 的调度器采用了我们在第 4 节中描述的 RDD 表示方法。
总体而言,我们的调度器与 Dryad 的调度器类似 [19],但它还额外考虑了持久化 RDD 的哪些分区在内存中可用。每当用户对一个 RDD 执行一个行动操作(例如,count 或 save)时,调度器会检查该 RDD 的血统图,以构建一个要执行的阶段有向无环图(DAG),如图 5 所示。每个阶段尽可能多地包含具有窄依赖关系的流水线式转换操作。阶段的边界是宽依赖关系所需的洗牌操作,或者是任何已计算的分区,这些分区可以使父 RDD 的计算短路。然后,调度器会启动任务来计算每个阶段中缺失的分区,直到计算出目标 RDD。
图 5:Spark 计算作业阶段的示例。轮廓为实线的方框代表 RDD。分区用阴影矩形表示,若已在内存中则为黑色。要对 RDD G 执行一个行动操作,我们在宽依赖处构建阶段,并在每个阶段内将窄依赖转换操作组成流水线。在这个例子中,阶段 1 的输出 RDD 已在随机存取存储器(RAM)中,所以我们先运行阶段 2,然后运行阶段 3。
我们的调度器使用延迟调度 [32],根据数据局部性将任务分配到各个机器上。如果一个任务需要处理某个节点内存中已有的分区,我们就将该任务发送到该节点。否则,如果一个任务要处理某个分区,而包含该分区的 RDD 提供了首选位置(例如,一个 HDFS 文件),我们就将任务发送到这些位置。
对于宽依赖关系(即洗牌依赖),我们目前会在持有父分区的节点上具体化中间记录,以简化故障恢复,这与 MapReduce 具体化映射输出的方式很相似。
如果一个任务失败,只要其所在阶段的父阶段仍然可用,我们就会在另一个节点上重新运行该任务。如果某些阶段变得不可用(例如,因为洗牌 “映射端” 的输出丢失),我们会重新提交任务,并行计算缺失的分区。不过,我们目前还不能容忍调度器故障,尽管复制 RDD 血统图相对简单。
最后,虽然 Spark 目前的所有计算都是响应驱动程序中调用的行动操作而运行的,但我们也在试验让集群上的任务(例如,映射操作)调用查找操作,该操作通过键对哈希分区的 RDD 元素提供随机访问。在这种情况下,如果所需的分区缺失,任务需要告知调度器进行计算。
图 6:示例展示了 Spark 解释器如何将用户输入的两行内容转换为 Java 对象。
5.2 解释器集成
Scala 包含一个类似于 Ruby 和 Python 的交互式 shell。鉴于内存数据处理的低延迟特性,我们希望让用户能够从解释器以交互方式运行 Spark 来查询大型数据集。
Scala 解释器通常的工作方式是,为用户输入的每一行代码编译一个类,将其加载到 JVM 中,并在其上调用一个函数。这个类包含一个单例对象,该对象包含该行的变量或函数,并在 initialize
方法中运行该行代码。例如,如果用户先输入 var x = 5
,接着输入 println(x)
,解释器会定义一个名为 Line1
的类,其中包含 x
,并使第二行代码编译为 println(Line1.getInstance().x)
。
在 Spark 中,我们对解释器做了两处修改:
- 类传输:为了让工作节点能够获取为每一行代码创建的类的字节码,我们让解释器通过 HTTP 提供这些类。
- 修改代码生成:通常,为每一行代码创建的单例对象是通过其对应类上的静态方法来访问的。这意味着,当我们序列化一个引用前一行定义的变量的闭包时,比如上面例子中的
Line1.x
,Java 不会沿着对象图追踪来传输包装x
的Line1
实例。因此,工作节点将无法获取x
。我们修改了代码生成逻辑,以直接引用每个行对象的实例。
图 6 展示了在我们修改之后,解释器是如何将用户输入的一组代码行转换为 Java 对象的。
我们发现 Spark 解释器在处理作为研究一部分获取的大型跟踪数据以及探索存储在 HDFS 中的数据集时很有用。我们还计划使用它来交互式运行更高级别的查询语言,例如 SQL。
5.3 内存管理
Spark 为持久化 RDD 的存储提供了三种选择:以反序列化的 Java 对象形式存储在内存中、以序列化数据形式存储在内存中,以及存储在磁盘上。第一种选择性能最快,因为 Java 虚拟机可以原生访问每个 RDD 元素。第二种选择在空间有限时,允许用户选择一种比 Java 对象图更节省内存的表示形式,但代价是性能较低 。第三种选择适用于那些太大而无法全部保留在内存中,但每次使用时重新计算成本又很高的 RDD。
成本取决于应用程序每字节数据所执行的计算量,对于轻量级处理,成本可能高达两倍。
为了管理有限的可用内存,我们在 RDD 层面采用最近最少使用(LRU)的逐出策略。当计算出一个新的 RDD 分区,但没有足够空间存储它时,我们会从最近最少访问的 RDD 中逐出一个分区,除非这个 RDD 与包含新分区的 RDD 是同一个。在这种情况下,我们将旧分区保留在内存中,以防止同一 RDD 的分区不断进出内存。这一点很重要,因为大多数操作会对整个 RDD 运行任务,所以内存中已有的分区很可能在未来还会被用到。到目前为止,我们发现这个默认策略在所有应用中都运行良好,但我们也通过为每个 RDD 设置 “持久化优先级”,让用户有更多控制权。
最后,目前集群上的每个 Spark 实例都有自己独立的内存空间。在未来的工作中,我们计划研究如何通过统一的内存管理器在不同 Spark 实例间共享 RDD。
5.4 对检查点的支持
尽管血统信息始终可用于在故障后恢复 RDD,但对于具有长血统链的 RDD 而言,这种恢复可能很耗时。因此,将某些 RDD 检查点到稳定存储中会很有帮助。
一般来说,对于具有包含宽依赖关系的长血统图的 RDD,检查点很有用,例如我们 PageRank 示例(§3.2.2)中的排名数据集。在这些情况下,集群中的节点故障可能会导致每个父 RDD 中某些数据片段丢失,从而需要完全重新计算 [20]。相比之下,对于那些对稳定存储中的数据具有窄依赖关系的 RDD,例如我们逻辑回归示例(§3.2.1)中的点以及 PageRank 中的链接列表,检查点可能永远都不值得。如果节点发生故障,这些 RDD 中丢失的分区可以在其他节点上并行重新计算,成本仅为复制整个 RDD 的一小部分。
Spark 目前提供了一个用于检查点的 API(persist
的REPLICATE
标志),但将哪些数据进行检查点的决策留给了用户。不过,我们也在研究如何执行自动检查点。由于我们的调度器知道每个数据集的大小以及首次计算它所花费的时间,它应该能够选择一组最优的 RDD 进行检查点,以最小化系统恢复时间 [30]。
最后,请注意,RDD 的只读性质使其比一般的共享内存更容易进行检查点操作。因为无需担心一致性问题,RDD 可以在后台写出,而无需程序暂停或使用分布式快照方案。
6 评估
我们通过在亚马逊 EC2 上进行的一系列实验以及对用户应用程序的基准测试,对 Spark 和 RDD 进行了评估。总体而言,我们的结果表明:
- 在迭代式机器学习和图应用程序中,Spark 的性能比 Hadoop 高出 20 倍之多。这种加速源于将数据以 Java 对象的形式存储在内存中,从而避免了 I/O 和反序列化成本。
- 我们用户编写的应用程序性能良好且可扩展性强。特别是,我们使用 Spark 将一个原本在 Hadoop 上运行的分析报告的速度提高了 40 倍。
- 当节点发生故障时,Spark 只需重建丢失的 RDD 分区,就能快速恢复。
- Spark 可用于交互式查询 1TB 的数据集,延迟在 5 - 7 秒之间。
我们首先展示针对 Hadoop 的迭代式机器学习应用程序(§6.1)和 PageRank(§6.2)的基准测试结果。然后,我们评估 Spark 中的故障恢复(§6.3)以及数据集无法完全装入内存时的表现(§6.4)。最后,我们讨论用户应用程序(§6.5)和交互式数据挖掘(§6.6)的结果。
除非另有说明,我们的测试使用的是配备 4 个核心和 15GB 内存的 m1.xlarge EC2 节点。我们使用 HDFS 进行存储,数据块大小为 256MB。在每次测试前,我们会清除操作系统的缓冲区缓存,以准确测量 I/O 成本。
6.1 迭代式机器学习应用程序
我们实现了两个迭代式机器学习应用程序:逻辑回归和 K 均值聚类,以比较以下系统的性能:
- Hadoop:Hadoop 0.20.2 稳定版本。
- HadoopBinMem:一种 Hadoop 部署方式,它在第一次迭代中将输入数据转换为低开销的二进制格式,以在后续迭代中消除文本解析,并将其存储在内存中的 HDFS 实例中。
- Spark:我们基于 RDD 的实现。
我们在 100GB 数据集上使用 25 - 100 台机器对这两种算法运行了 10 次迭代。这两个应用程序的关键区别在于它们每字节数据执行的计算量。K 均值聚类的迭代时间主要由计算主导,而逻辑回归的计算强度较低,因此对反序列化和 I/O 所花费的时间更为敏感。
由于典型的学习算法通常需要数十次迭代才能收敛,我们分别报告第一次迭代和后续迭代的时间。我们发现通过 RDD 共享数据大大加快了后续迭代的速度。
第一次迭代:所有三个系统在第一次迭代中都从 HDFS 读取文本输入。如图 7 中的浅色条所示,在各项实验中,Spark 的速度略快于 Hadoop。这种差异是由于 Hadoop 主节点和工作节点之间的心跳协议存在信号开销。HadoopBinMem 是最慢的,因为它运行了一个额外的 MapReduce 作业将数据转换为二进制格式,并且必须通过网络将这些数据写入到一个复制的内存 HDFS 实例中。
后续迭代:图 7 还展示了后续迭代的平均运行时间,而图 8 展示了这些时间如何随集群规模缩放。对于逻辑回归,在 100 台机器上,Spark 分别比 Hadoop 和 HadoopBinMem 快 25.3 倍和 20.7 倍。对于计算强度更高的 K 均值聚类应用程序,Spark 仍实现了 1.9 倍到 3.2 倍的加速。
图 7:在一个由 100 个节点组成的集群上,使用 100GB 数据运行逻辑回归和 K 均值算法时,Hadoop、HadoopBinMem 和 Spark 中首次迭代及后续迭代的时长。
图 8:Hadoop、HadoopBinMem 和 Spark 中首次迭代之后各次迭代的运行时间。所有作业均处理 100GB 数据
理解加速原因:我们惊讶地发现,Spark 甚至比将二进制数据存储在内存中的 Hadoop(HadoopBinMem)性能高出 20 倍。在 HadoopBinMem 中,我们使用了 Hadoop 的标准二进制格式(SequenceFile)和 256MB 的大数据块大小,并且我们强制将 HDFS 的数据目录设置在内存文件系统上。然而,由于以下几个因素,Hadoop 仍然运行较慢:
- Hadoop 软件栈的最小开销。
- HDFS 提供数据时的开销。
- 将二进制记录转换为可用的内存 Java 对象的反序列化成本。
我们依次研究了这些因素中的每一个。为了测量(1),我们运行了无操作的 Hadoop 作业,发现这些作业至少需要 25 秒的开销来完成作业设置、启动任务和清理的最低要求。关于(2),我们发现 HDFS 为提供每个数据块执行了多次内存复制和一次校验和操作。最后,为了测量(3),我们在单台机器上运行微基准测试,以各种格式对 256MB 输入数据运行逻辑回归计算。特别是,我们比较了处理来自 HDFS(HDFS 栈中的开销会体现出来)的文本和二进制输入以及来自内存本地文件(内核可以非常高效地将数据传递给程序)的时间。我们在图 9 中展示了这些测试的结果。内存 HDFS 和本地文件之间的差异表明,即使数据在本地机器的内存中,通过 HDFS 读取也会引入 2 秒的开销。文本和二进制输入之间的差异表明解析开销为 7 秒。最后,即使从内存文件读取,将预先解析的二进制数据转换为 Java 对象也需要 3 秒,这几乎与逻辑回归本身的成本一样高。通过将 RDD 元素直接作为 Java 对象存储在内存中,Spark 避免了所有这些开销。
图 9:在单台机器上使用 256MB 数据进行逻辑回归时,不同输入源的迭代时间。
6.2 PageRank
我们使用一个 54GB 的维基百科转储数据,对比了 Spark 和 Hadoop 在 PageRank 算法上的性能。我们运行了 10 次 PageRank 算法迭代,来处理一个包含约 400 万篇文章的链接图。图 10 表明,仅靠内存存储,在 30 个节点上,Spark 相较于 Hadoop 就实现了 2.4 倍的加速。此外,如 3.2.2 节所讨论的,通过控制 RDD 的分区,使其在各次迭代中保持一致,加速比提升到了 7.4 倍。该结果在扩展到 60 个节点时,几乎呈线性提升。
我们还评估了一个基于我们在 Spark 上实现的 Pregel 编写的 PageRank 版本,具体内容在 7.1 节介绍。其迭代时间与图 10 中的相近,但每次迭代大约多耗时 4 秒,因为 Pregel 在每次迭代中会运行一个额外操作,让顶点 “投票” 决定是否结束任务。
图 10:Hadoop 和 Spark 上 PageRank 算法的性能表现。
6.3 故障恢复
我们评估了 K 均值聚类应用程序中节点发生故障后,使用血统信息重建 RDD 分区的成本。图 11 对比了在 75 个节点的集群上,K 均值聚类算法正常运行 10 次迭代的运行时间,以及在第 6 次迭代开始时一个节点发生故障情况下的运行时间。在无故障情况下,每次迭代由处理 100GB 数据的 400 个任务组成。
图 11:存在故障时 K 均值算法的迭代时间。在第 6 次迭代开始时,一台机器停止运行,导致使用血统关系对一个 RDD 进行部分重建。
在第 5 次迭代结束前,每次迭代时间约为 58 秒。在第 6 次迭代中,其中一台机器停止运行,导致在该机器上运行的任务以及存储在那里的 RDD 分区丢失。Spark 在其他机器上并行重新运行这些任务,这些任务重新读取相应的输入数据,并通过血统信息重建 RDD,这使得迭代时间增加到 80 秒。一旦丢失的 RDD 分区重建完成,迭代时间又回到了 58 秒。
请注意,使用基于检查点的故障恢复机制时,根据检查点的频率,恢复可能至少需要重新运行几次迭代。此外,系统需要通过网络复制应用程序 100GB 的工作数据集(从文本输入数据转换为二进制后的数据),并且要么消耗 Spark 两倍的内存将其复制到内存中,要么就必须等待将 100GB 数据写入磁盘。相比之下,我们示例中 RDD 的血统图大小均小于 10KB。
6.4 内存不足时的表现
到目前为止,我们确保了集群中的每台机器都有足够的内存来在迭代过程中存储所有的 RDD。一个很自然的问题是,如果没有足够的内存来存储作业数据,Spark 会如何运行。在这个实验中,我们将 Spark 配置为在每台机器上使用不超过一定百分比的内存来存储 RDD。图 12 展示了逻辑回归在不同存储空间下的结果。我们可以看到,随着存储空间的减少,性能会逐渐下降。
图 12:在 25 台机器上使用 100GB 数据进行逻辑回归时,内存中数据量不同情况下的性能表现
6.5 使用 Spark 构建的用户应用程序
内存分析:视频分发公司 Conviva Inc 使用 Spark 加速了一些之前在 Hadoop 上运行的数据统计报告。例如,有一份报告以一系列 Hive [1] 查询的形式运行,为一位客户计算各种统计数据。这些查询都作用于相同的数据子集(与客户提供的过滤器匹配的记录),但会针对不同的分组字段执行聚合操作(平均值、百分位数和去重计数),这需要单独的 MapReduce 作业。通过在 Spark 中实现这些查询,并将这些查询间共享的数据子集一次性加载到一个 RDD 中,该公司将报告速度提高了 40 倍。一份处理 200GB 压缩数据的报告,在 Hadoop 集群上需要 20 小时,现在仅用两台 Spark 机器,30 分钟就能完成。此外,Spark 程序仅需 96GB 内存,因为它只在 RDD 中存储与客户过滤器匹配的行和列,而非整个解压后的文件。
交通建模:伯克利分校移动千禧年项目的研究人员对一种从零星的汽车 GPS 测量数据推断道路交通拥堵情况的学习算法进行了并行化处理。源数据包括一个大城市区域的 10,000 条道路网络,以及 600,000 个配备 GPS 的汽车的点对点行程时间样本(每个路径的行程时间可能涉及多个路段)。利用一个交通模型,该系统可以估算通过各个路段所需的时间。研究人员使用期望最大化(EM)算法训练这个模型,该算法迭代重复两个 map 和 reduceByKey 步骤。如图 13(a)所示,该应用程序从 20 个节点扩展到 80 个节点(每个节点 4 个核心)时,几乎呈线性扩展。
Twitter 垃圾邮件分类:伯克利分校的 Monarch 项目 [29] 使用 Spark 识别 Twitter 消息中的链接垃圾邮件。他们在 Spark 之上实现了一个类似于 6.1 节示例的逻辑回归分类器,但他们使用分布式 reduceByKey 并行求和梯度向量。在图 13(b)中,我们展示了在 50GB 数据子集上训练分类器的扩展结果:250,000 个 URL 以及与每个 URL 页面的网络和内容属性相关的 107 个特征 / 维度。由于每次迭代的固定通信成本较高,扩展并非严格线性。
图 13:使用 Spark 实现的两个用户应用程序每次迭代的运行时间。误差线表示标准差。
6.6 交互式数据挖掘
为了展示 Spark 交互式查询大型数据集的能力,我们使用它来分析 1TB 的维基百科页面浏览日志(两年的数据)。在这个实验中,我们使用了 100 个 m2.4xlarge 的 EC2 实例,每个实例有 8 个核心和 68GB 内存。我们运行查询以找出以下页面的总浏览量:(1)所有页面;(2)标题与给定单词完全匹配的页面;(3)标题与某个单词部分匹配的页面。每个查询都会扫描整个输入数据。
图 14 展示了在完整数据集、半量数据集以及十分之一量数据集上查询的响应时间。即便处理 1TB 的数据,Spark 上的查询也只需 5 - 7 秒。这比处理磁盘上的数据快了一个数量级以上;例如,从磁盘查询 1TB 的文件需要 170 秒。这表明 RDD 使 Spark 成为交互式数据挖掘的强大工具。
图 14:在 100 台机器上,随着扫描的输入数据集规模不断增大,Spark 上交互式查询的响应时间。
7 讨论
尽管由于 RDD 的不可变特性和粗粒度转换,其编程接口看似有限,但我们发现它们适用于广泛的应用场景。特别是,RDD 能够表示数量惊人的集群编程模型,而这些模型此前通常作为独立的框架被提出。这使得用户可以在一个程序中组合使用这些模型(例如,先运行 MapReduce 操作构建图,然后在图上运行 Pregel),并在它们之间共享数据。在本节中,我们将讨论 RDD 能够表示哪些编程模型以及它们为何具有如此广泛的适用性(§7.1)。此外,我们还将探讨 RDD 中血统信息的另一个优势,即有助于跨这些模型进行调试(§7.2)。
7.1 表达现有编程模型
RDD 能够高效地表达多种到目前为止被独立提出的集群编程模型。这里所说的 “高效”,不仅意味着 RDD 可用于产生与用这些模型编写的程序相同的输出,还意味着 RDD 能够体现这些框架所执行的优化,比如将特定数据保留在内存中、对数据进行分区以最小化通信开销,以及有效地从故障中恢复。可用 RDD 表达的模型包括:
- MapReduce:此模型可以通过 Spark 中的
flatMap
和groupByKey
操作来表达,如果存在合并器(combiner),也可以使用reduceByKey
。 - DryadLINQ:DryadLINQ 系统在更通用的 Dryad 运行时之上,提供了比 MapReduce 更广泛的操作符,但这些都是批量操作符,直接对应于 Spark 中可用的 RDD 转换操作(如
map
、groupByKey
、join
等)。 - SQL:与 DryadLINQ 表达式类似,SQL 查询对记录集执行数据并行操作。
- Pregel:谷歌的 Pregel [22] 是一种专门用于迭代图应用的模型,乍一看与其他系统中面向集合的编程模型大不相同。在 Pregel 中,程序作为一系列协调的 “超级步” 运行。在每个超级步中,图中的每个顶点运行一个用户函数,该函数可以更新与顶点相关联的状态、更改图拓扑结构,并向其他顶点发送消息以供下一个超级步使用。这个模型可以表达许多图算法,包括最短路径、二分匹配和 PageRank。
我们能够用 RDD 实现这个模型的关键在于,Pregel 在每次迭代中对所有顶点应用相同的用户函数。因此,我们可以将每次迭代的顶点状态存储在一个 RDD 中,并执行批量转换(flatMap
)来应用此函数并生成一个消息 RDD。然后,我们可以将这个 RDD 与顶点状态进行连接,以执行消息交换。同样重要的是,RDD 允许我们像 Pregel 那样将顶点状态保留在内存中,通过控制分区来最小化通信,并支持故障时的部分恢复。我们已经在 Spark 之上将 Pregel 实现为一个 200 行代码的库,更多细节请读者参考 [33]。 - 迭代 MapReduce:包括 HaLoop [7] 和 Twister [11] 在内的几个最近提出的系统,提供了一种迭代 MapReduce 模型,用户可以让系统循环执行一系列 MapReduce 作业。这些系统在迭代过程中保持数据分区一致,Twister 还可以将数据保留在内存中。这两种优化都很容易用 RDD 来表达,我们能够使用 Spark 将 HaLoop 实现为一个 200 行代码的库。
- 批量流处理:研究人员最近提出了几种用于定期使用新数据更新结果的应用程序的增量处理系统 [21, 15, 4]。例如,一个每 15 分钟更新一次广告点击统计信息的应用程序,应该能够将前一个 15 分钟窗口的中间状态与新日志中的数据结合起来。这些系统执行类似于 Dryad 的批量操作,但将应用程序状态存储在分布式文件系统中。将中间状态置于 RDD 中会加快它们的处理速度。
解释 RDD 的表达能力
为什么 RDD 能够表达这些不同的编程模型呢?原因是对 RDD 的限制在许多并行应用中影响不大。特别是,尽管 RDD 只能通过批量转换来创建,但许多并行程序自然会对许多记录应用相同的操作,这使得它们很容易表达。同样,RDD 的不可变性也不是障碍,因为可以创建多个 RDD 来表示同一数据集的不同版本。实际上,如今许多 MapReduce 应用都是在不允许更新文件的文件系统(如 HDFS)上运行的。
最后一个问题是,为什么以前的框架没有提供同样程度的通用性。我们认为这是因为这些系统探索的是 MapReduce 和 Dryad 处理得不好的特定问题,比如迭代,却没有注意到这些问题的共同原因是缺乏数据共享抽象。
7.2 利用 RDD 进行调试
虽然我们最初设计 RDD 是为了实现故障容错的确定性重新计算,但这一特性也有助于调试。具体来说,通过记录作业执行过程中创建的 RDD 的血统信息,我们能够:(1)在之后重建这些 RDD,并让用户以交互方式查询它们;(2)通过重新计算作业中某个任务所依赖的 RDD 分区,在单进程调试器中重新运行该任务。与传统的针对通用分布式系统的重放调试器 [13] 不同,后者必须捕获或推断多个节点间的事件顺序,而这种方法几乎不增加记录开销,因为只需要记录 RDD 的血统图。我们目前正在基于这些思路开发一个 Spark 调试器 [33]。
与这些系统不同,基于 RDD 的调试器不会重放用户函数中的非确定性行为(例如,非确定性的 map 操作),但它至少可以通过对数据进行校验和来报告这种行为。
8 相关工作
集群编程模型:集群编程模型的相关工作可分为几类。首先,诸如 MapReduce [10]、Dryad [19] 和 Ciel [23] 等数据流模型支持丰富的操作符来处理数据,但通过稳定存储系统来共享数据。RDD 代表了一种比稳定存储更高效的数据共享抽象,因为它们避免了数据复制、I/O 和序列化的成本。
请注意,像在 RAMCloud [25] 这样的内存数据存储上运行 MapReduce/Dryad,仍然需要数据复制和序列化,正如 6.1 节所示,这对某些应用程序来说成本可能很高。
其次,一些针对数据流系统的高级编程接口,包括 DryadLINQ [31] 和 FlumeJava [8],提供了语言集成的 API,用户可通过诸如 map 和 join 等操作符来操作 “并行集合”。然而,在这些系统中,并行集合要么代表磁盘上的文件,要么代表用于表达查询计划的临时数据集。尽管这些系统会在同一查询的不同操作符之间(例如,一个 map 操作后接另一个 map 操作)对数据进行流水线处理,但它们无法在不同查询之间高效地共享数据。我们基于并行集合模型设计 Spark 的 API,是因为其便利性,并且并不宣称语言集成接口具有新颖性,但通过提供 RDD 作为该接口背后的存储抽象,我们使其能够支持更广泛的应用类别。
第三类系统为需要数据共享的特定应用类别提供高级接口。例如,Pregel [22] 支持迭代图应用,而 Twister [11] 和 HaLoop [7] 是迭代 MapReduce 运行时。然而,这些框架针对它们所支持的计算模式隐式地进行数据共享,并未提供一个通用的抽象,使用户能够按照自己的选择在不同操作之间共享自己选定的数据。例如,用户无法使用 Pregel 或 Twister 将数据集加载到内存中,然后决定在其上运行什么查询。RDD 明确地提供了一种分布式存储抽象,因此能够支持这些专用系统无法涵盖的应用,比如交互式数据挖掘。
最后,一些系统公开共享的可变状态,以使用户能够进行内存计算。例如,Piccolo [27] 允许用户运行并行函数,对分布式哈希表中的单元格进行读取和更新。分布式共享内存(DSM)系统 [24] 以及像 RAMCloud [25] 这样的键值存储提供了类似的模型。RDD 在两个方面与这些系统不同。首先,RDD 基于 map、sort 和 join 等操作符提供了更高级的编程接口,而 Piccolo 和 DSM 中的接口仅仅是对表单元格的读取和更新。其次,Piccolo 和 DSM 系统通过检查点和回滚来实现恢复,在许多应用中,这比 RDD 基于血统的策略成本更高。最后,如 2.3 节所讨论的,RDD 相较于 DSM 还有其他优势,比如减轻拖后腿任务的影响。
缓存系统:Nectar [12] 可以通过程序分析识别公共子表达式,从而在 DryadLINQ 作业之间重用中间结果 [16]。将这种能力添加到基于 RDD 的系统中会很有吸引力。然而,Nectar 不提供内存缓存(它将数据存储在分布式文件系统中),也不允许用户明确控制哪些数据集要持久化以及如何对它们进行分区。Ciel [23] 和 FlumeJava [8] 同样可以缓存任务结果,但不提供内存缓存或对缓存哪些数据的明确控制。Ananthanarayanan 等人提出在分布式文件系统中添加内存缓存,以利用数据访问的时间和空间局部性 [3]。虽然这种解决方案能更快地访问已在文件系统中的数据,但在应用程序内共享中间结果方面,它不如 RDD 高效,因为它仍要求应用程序在不同阶段之间将这些结果写入文件系统。
血统信息:捕获数据的血统或来源信息长期以来一直是科学计算和数据库领域的研究课题,用于诸如解释结果、让他人能够重现结果,以及在工作流中发现错误或数据集丢失时重新计算数据等应用场景。关于这方面工作的综述,读者可参考 [5] 和 [9]。RDD 提供了一种并行编程模型,在其中捕获细粒度的血统信息成本较低,因此可用于故障恢复。我们基于血统的恢复机制也与 MapReduce 和 Dryad 中计算(作业)内部使用的恢复机制类似,后者跟踪任务有向无环图(DAG)中的依赖关系。然而,在这些系统中,作业结束后血统信息就丢失了,需要使用复制存储系统在不同计算之间共享数据。相比之下,RDD 将血统信息应用于在不同计算之间高效地持久化内存数据,而无需复制和磁盘 I/O 成本。
关系数据库:RDD 在概念上类似于数据库中的视图,而持久化的 RDD 类似于物化视图 [28]。然而,与 DSM 系统一样,数据库通常允许对所有记录进行细粒度的读写访问,这就需要记录操作和数据以实现容错,并为维护一致性增加额外开销。RDD 的粗粒度转换模型则不需要这些开销。
9 结论
我们介绍了弹性分布式数据集(RDD),这是一种高效、通用且具备容错能力的抽象,用于在集群应用中共享数据。RDD 能够表达广泛的并行应用,包括许多为迭代计算而提出的专用编程模型,以及这些模型未能涵盖的新应用。与现有的集群存储抽象不同,后者为实现容错需要进行数据复制,而 RDD 提供了基于粗粒度转换的 API,使其能够利用血统信息高效地恢复数据。我们已在名为 Spark 的系统中实现了 RDD,在迭代应用中,Spark 的性能比 Hadoop 高出 20 倍之多,并且可用于交互式查询数百 GB 的数据。