引入
通过前面的学习,我们对于Spark已经有一个基本的认识,并且搭建了一个本地的练习环境,因为本专栏的主要对象是数仓和数分,所以就不花大篇幅去写环境搭建等内容,当然,如果感兴趣的小伙伴可以留言,如果人多的话,我也可以在后面加餐篇里面补充企业目前常见的架构环境搭建,比如Spark on yarn以及Spark on k8s等。
本文将进入Spark 核心数据结构:RDD(弹性分布式数据集 )的学习,如果未阅读论文篇的小伙伴,推荐先阅读以后,再看本文哈,相关链路如下:
RDD 的核心概念
RDD 是 Spark 最核心的数据结构,RDD(Resilient Distributed Dataset)全称为弹性分布式数据集,是 Spark 对分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体,它实质上是一组分布式的 JVM 不可变对象集合,不可变决定了它是只读的,所以 RDD 在经过变换产生新的 RDD 时,(如下图中 A-B),原有 RDD 不会改变。
在论文总结里,我们有提到这个弹性体现在两个方面:
- 第一个是数据存储上。 数据不再是存放在硬盘上的,而是可以缓存在内存中。只有当内存不足的时候,才会把它们换出到硬盘上。同时,数据的持久化,也支持硬盘、序列化后的内存存储,以及反序列化后Java对象的内存存储三种形式。虽然需要占用更多的内存,但是计算速度会更快。
- 第二个是选择把什么数据输出到硬盘上。 Spark会根据数据计算的DAG Lineage,来判断某一个RDD对于前置数据是宽依赖,还是窄依赖的。如果是宽依赖的,意味着一个节点的故障,可能会导致大量的数据要进行重新计算,乃至数据网络传输的需求。那么,它就会把数据计算的中间结果存储到硬盘上。
如下就是第二点提到的DAG图,通过它可以大概反推出计算逻辑:A 和 C 都是两张表,在分别进行分组聚合和筛选的操后, 做了一次 join 操作。在图中深色的方框就是我们所说的分区(partition),它和计算任务是一一对应的,也就是说,有多少个分区,就有多少个计算任务,显然一个作业会有多个计算任务,这也是分布式计算的意义所在,我们可以通过设置分区数量来控制每个计算任务的计算量。
在 DAG 中,每个计算任务的输入就是一个分区,一些相关的计算任务所构成的任务集合可以被看成一个 Stage。RDD 则是分区的集合(图中 A、B、C、D、E),在面对出错情况(例如任意一台节点宕机)时,Spark 能通过 RDD 之间的依赖关系恢复任意出错的 RDD,例如最终输出节点故障,我们无需重跑所有任务,只用通过 B 和 E 就能算出最后的 RDD,这就是弹性的好处。
下面我们通过源码去进一步了解RDD。
深入RDD
还是老样子,我们先看看RDD的源码注释:
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join; DoubleRDDFunctions contains operations available only on RDDs of Doubles; and SequenceFileRDDFunctions contains operations available on RDDs that can be saved as SequenceFiles. All operations are automatically available on any RDD of the right type (e. g. RDD[(Int, Int)]) through implicit.
Internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e. g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e. g. block locations for an HDFS file)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e. g. for reading data from a new storage system) by overriding these functions. Please refer to the Spark paper for more details on RDD internals.翻译:
弹性分布式数据集(RDD)是 Spark 中的基本抽象。它代表一个不可变的、分区的元素集合,可以并行操作。这个类包含所有 RDD 都具备的基本操作,如映射(map)、过滤(filter)和持久化(persist)。此外,PairRDDFunctions 包含仅适用于键值对 RDD 的操作,如按键分组(groupByKey)和连接(join);DoubleRDDFunctions 包含仅适用于 Double 类型 RDD 的操作;SequenceFileRDDFunctions 包含适用于可保存为 SequenceFiles 的 RDD 的操作。通过隐式转换,所有操作会自动适用于任何合适类型的 RDD(例如 RDD [(Int, Int)])。
在内部,每个 RDD 由五个主要属性来描述:
- 分区的集合;
- 用于计算每个分片的函数(算子);
- 对其他 RDD 的依赖关系集合;
- 可选的,键值型 RDD 的分区器(例如,表明该 RDD 是哈希分区的);
- 可选的,计算每个分片的首选位置集合(例如,HDFS 文件的块位置)。
Spark 中的所有调度和执行都是基于这些方法进行的,这使得每个 RDD 能够实现自己的计算方式。实际上,用户可以通过重写这些函数来实现自定义的 RDD(例如,从新的存储系统读取数据)。有关 RDD 内部机制的更多详细信息,请参考 Spark 论文 。
可以看到里面提到了RDD的5个核心特性,我们对应梳理理解如下:
- 每个RDD有多个分区组成(分区列表/集合)
- 每个rdd都有作用于每个分区的函数/算子(计算函数/算子)
- 每个RDD都有与上下游有依赖的RDD, RDD之间是有依赖关系的(依赖关系)
- 针对key-value的 RDD 默认是Hash分区,也可以指定其它分区器(k-v分区器)
- 每个RDD的分区,其实都有副本的概念,到底选用那个副本来计算,给每个分区都计算出来了一个首选位置,这个选择主要是移动计算而不是存储的思路(副本位置)
下面我们从源码实现里面去看看:
/**
* 存储当前RDD的分区数组。该变量是volatile的,以确保多线程环境下的可见性;
* 同时是transient的,意味着在序列化时会被忽略。
* 初始值为null,在第一次调用`getPartitions`方法时会被赋值。
*/
@volatile @transient private var partitions_ : Array[Partition] = _
/**
* 存储当前RDD依赖的其他RDD的依赖关系序列。
* 每个依赖关系描述了当前RDD如何依赖于其他RDD,
* 这些依赖关系对于Spark的任务调度和数据处理流程至关重要。
* 同样,该变量被标记为`@transient`,因为依赖关系可能包含不可序列化的对象。
*/
@transient private var deps: Seq[Dependency[_]]
/**
* 可选的分区器,用于指定RDD的分区方式。
* 如果此值为 None,则表示该RDD没有指定分区器,可能会使用默认的分区方式。
* transient 关键字表示该字段不会被序列化,这在分布式计算中很常见,
* 因为分区器通常在运行时动态确定,不需要在不同节点之间进行序列化和传输。
*/
@transient val partitioner: Option[Partitioner] = None
/**
* 由子类实现,用于返回此RDD的分区集合。
* 该方法只会被调用一次,因此可以在其中实现耗时的计算逻辑。
* 返回的分区数组必须满足以下属性:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*
* @return 此RDD的分区数组
*/
protected def getPartitions: Array[Partition]
/**
* 由子类实现,用于返回此RDD如何依赖于父RDD。
* 该方法只会被调用一次,因此可以在其中实现耗时的计算逻辑。
*
* @return 此RDD对父RDD的依赖关系序列
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* 可选地由子类重写,用于指定每个分区的首选计算位置。
* 例如,对于从HDFS文件读取的RDD,首选位置可能是HDFS块所在的节点。
* 默认情况下,返回一个空的序列,表示没有首选位置。
*
* @param split 需要计算的分区。
* @return 分区的首选位置列表,如果没有则返回空序列。
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
通过源码我们能更直观的理解前面的特性,尤其是上面代码中的@volatile @transient private var partitions_ : Array[Partition] = _它说明了一个重要的问题,RDD 是分区的集合,本质上还是一个集合,所以在理解时,可以用分区之类的概念去理解,但是在使用时,就可以忘记这些,把其当做是一个普通的集合去使用就好了。
除此之外,在Spark论文中有提到RDD的编程模型中,一共有两类操作,Transformations 类转换操作是定义新 RDD 的惰性操作,而Actions 类行动操作则启动计算,将值返回给程序或将数据写入外部存储。
在这样的编程模型下,Spark 在运行时的计算被划分为两个环节:
- 基于不用数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph);
- 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图。
换句话说,开发者调用的各类 Transformations 算子,并不立即执行计算,当且仅当开发者调用 Actions 算子时,之前调用的转换算子才会付诸执行。在业内,这样的计算模式有个专门的术语,叫作“延迟计算”(Lazy Evaluation)。
总结
我们重点讲解了 RDD的核心概念,RDD 是 Spark 对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体。对于 RDD,你要重点掌握它的 5 大特性/属性:
- 分区列表/集合
- 计算函数/算子
- 依赖关系
- k-v分区器
- 副本位置
把评论区回复的贴上来,方便小伙伴们阅读,有其他疑问也可以私信或者评论告诉我:
- 一组分片(Partition),本质就是RDD数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。每个分配的存储是由BlockManager实现的。每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。
- 一个用于每个分区计算的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
- RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
- RDD的分区函数。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
- 一个存储存取每个Partition的优先位置(preferred location)的列表。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
深入理解 RDD 之后,你需要熟悉 RDD 的编程模型。在 RDD 的编程模型中,开发者需要
使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算
子,将计算结果收集起来、或是物化到磁盘。
而延迟计算指的是,开发者调用的各类 Transformations 算子,并不会立即执行计算,当
且仅当开发者调用 Actions 算子时,之前调用的转换算子才会付诸执行。
如下是Spark3.5.4的RDD源码,并配置了相关注释,给小伙伴们留了个作业,请结合以下源码和官网,梳理归类 Transformations 算子和Actions 算子:
/**
* 抽象类 RDD 表示弹性分布式数据集,是 Spark 中的基本抽象。
* 它代表一个不可变的、分区的元素集合,可以并行操作。
*
* @tparam T 此 RDD 中元素的类型
* @param _sc Spark 上下文,用于与 Spark 集群交互
* @param deps 此 RDD 依赖的其他 RDD 的依赖关系序列
*/
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
// 检查元素类型是否为 RDD,如果是则发出警告,因为 Spark 不支持嵌套 RDD
if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
// 这是一个警告而不是异常,以避免破坏可能定义了嵌套 RDD 但未运行作业的用户程序。
logWarning("Spark does not support nested RDDs (see SPARK-5063)")
}
/**
* 获取 Spark 上下文。
* 如果上下文为空,则抛出异常。
*
* @return Spark 上下文
*/
private def sc: SparkContext = {
if (_sc == null) {
throw SparkCoreErrors.rddLacksSparkContextError()
}
_sc
}
/**
* 构造一个仅依赖于一个父 RDD 的 RDD,使用一对一依赖关系。
*
* @param oneParent 父 RDD
*/
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
/**
* 获取 Spark 配置。
*
* @return Spark 配置
*/
private[spark] def conf = sc.conf
// =======================================================================
// 应由 RDD 子类实现的方法
// =======================================================================
/**
* :: DeveloperApi ::
* 由子类实现,用于计算给定分区的数据。
*
* @param split 要计算的分区
* @param context 任务上下文,包含任务执行的相关信息
* @return 分区元素的迭代器
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* 由子类实现,返回此 RDD 的分区集合。
* 此方法仅会被调用一次,因此可以在其中执行耗时的计算。
*
* 分区数组必须满足以下属性:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*
* @return 分区数组
*/
protected def getPartitions: Array[Partition]
/**
* 由子类实现,返回此 RDD 对父 RDD 的依赖关系。
* 此方法仅会被调用一次,因此可以在其中执行耗时的计算。
*
* @return 依赖关系序列
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* 可选地由子类重写,以指定每个分区的首选计算位置。
*
* @param split 分区
* @return 首选位置序列
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/**
* 可选地由子类重写,以指定 RDD 的分区方式。
*/
@transient val partitioner: Option[Partitioner] = None
// =======================================================================
// 所有 RDD 可用的方法和字段
// =======================================================================
/**
* 获取创建此 RDD 的 Spark 上下文。
*
* @return Spark 上下文
*/
def sparkContext: SparkContext = sc
/**
* 获取此 RDD 的唯一 ID(在其 Spark 上下文中)。
*/
val id: Int = sc.newRddId()
/**
* 此 RDD 的友好名称。
*/
@transient var name: String = _
/**
* 为 RDD 分配一个名称。
*
* @param _name 要分配的名称
* @return 此 RDD 实例
*/
def setName(_name: String): this.type = {
name = _name
this
}
/**
* 使用指定的存储级别标记此 RDD 进行持久化。
*
* @param newLevel 目标存储级别
* @param allowOverride 是否允许用新级别覆盖现有级别
* @return 此 RDD 实例
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: 处理存储级别的更改
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw SparkCoreErrors.cannotChangeStorageLevelError()
}
// 如果这是第一次标记此 RDD 进行持久化,则向 Spark 上下文注册以进行清理和统计。只执行一次。
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
/**
* 设置此 RDD 的存储级别,以便在第一次计算后跨操作持久化其值。
* 仅当 RDD 尚未设置存储级别时,才能使用此方法分配新的存储级别。本地检查点是一个例外。
*
* @param newLevel 目标存储级别
* @return 此 RDD 实例
*/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// 这意味着用户之前调用了 localCheckpoint(),这应该已经标记此 RDD 进行持久化。
// 这里我们应该用用户明确请求的级别覆盖旧的存储级别(在将其调整为使用磁盘后)。
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/**
* 使用默认存储级别(`MEMORY_ONLY`)持久化此 RDD。
*
* @return 此 RDD 实例
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* 使用默认存储级别(`MEMORY_ONLY`)持久化此 RDD。
*
* @return 此 RDD 实例
*/
def cache(): this.type = persist()
/**
* 标记此 RDD 为非持久化,并从内存和磁盘中移除其所有块。
*
* @param blocking 是否阻塞直到所有块被删除(默认:false)
* @return 此 RDD 实例
*/
def unpersist(blocking: Boolean = false): this.type = {
logInfo(s"Removing RDD $id from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
this
}
/**
* 获取此 RDD 的当前存储级别,如果未设置则返回 StorageLevel.NONE。
*
* @return 存储级别
*/
def getStorageLevel: StorageLevel = storageLevel
/**
* 用于锁定此 RDD 的所有可变状态(持久化、分区、依赖关系等)。
* 不使用 `this` 是因为 RDD 是用户可见的,用户可能已经在 RDD 上添加了自己的锁;
* 共享锁可能会导致死锁。
*
* 一个线程可能会在一系列 RDD 依赖关系中持有多个这样的锁。
* 如果在持有 stateLock 的同时尝试锁定另一个资源,并且这些锁的获取顺序不能保证相同,就可能会发生死锁。
* 这可能会导致一个线程先获取 stateLock,然后获取资源,而另一个线程先获取资源,然后获取 stateLock,从而导致死锁。
*
* 执行器可能会引用共享字段(尽管它们永远不应该修改它们,只有驱动程序会进行修改)。
*/
private[spark] val stateLock = new Serializable {}
// 我们的依赖关系和分区将通过调用下面的子类方法获取,并且在进行检查点操作时会被覆盖
@volatile private var dependencies_ : Seq[Dependency[_]] = _
// 当我们覆盖依赖关系时,我们会保留对旧依赖关系的弱引用,以便用户进行清理。
@volatile @transient private var legacyDependencies: WeakReference[Seq[Dependency[_]]] = _
@volatile @transient private var partitions_ : Array[Partition] = _
/**
* 获取此 RDD 的检查点 RDD(如果已进行检查点操作)。
*
* @return 检查点 RDD 的可选实例
*/
private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)
/**
* 获取此 RDD 的依赖关系列表,考虑到 RDD 是否进行了检查点操作。
*
* @return 依赖关系序列
*/
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
stateLock.synchronized {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
}
}
dependencies_
}
}
/**
* 获取此 RDD 的依赖关系列表,忽略检查点操作。
*
* @return 依赖关系序列的可选实例
*/
final private def internalDependencies: Option[Seq[Dependency[_]]] = {
if (legacyDependencies != null) {
legacyDependencies.get
} else if (dependencies_ != null) {
Some(dependencies_)
} else {
// 这种情况应该很少见。
stateLock.synchronized {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
Some(dependencies_)
}
}
}
/**
* 获取此 RDD 的分区数组,考虑到 RDD 是否进行了检查点操作。
*
* @return 分区数组
*/
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
stateLock.synchronized {
if (partitions_ == null) {
partitions_ = getPartitions
partitions_.zipWithIndex.foreach { case (partition, index) =>
require(partition.index == index,
s"partitions($index).partition == ${partition.index}, but it should equal $index")
}
}
}
}
partitions_
}
}
/**
* 返回此 RDD 的分区数。
*
* @return 分区数
*/
@Since("1.6.0")
final def getNumPartitions: Int = partitions.length
/**
* 获取分区的首选位置,考虑到 RDD 是否进行了检查点操作。
*
* @param split 分区
* @return 首选位置序列
*/
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
/**
* 此 RDD 的内部方法;如果适用,将从缓存中读取,否则进行计算。
* 此方法不应该由用户直接调用,而是供自定义 RDD 子类的实现者使用。
*
* @param split 分区
* @param context 任务上下文
* @return 分区元素的迭代器
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
/**
* 返回给定 RDD 的祖先,这些祖先仅通过一系列窄依赖关系与之相关。
* 此方法使用深度优先搜索遍历给定 RDD 的依赖树,但不保证返回的 RDD 有特定顺序。
*
* @return 祖先 RDD 序列
*/
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
val ancestors = new mutable.HashSet[RDD[_]]
def visit(rdd: RDD[_]): Unit = {
val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
val narrowParents = narrowDependencies.map(_.rdd)
val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
}
visit(this)
// 为了避免循环,不包括根 RDD 本身
ancestors.filterNot(_ == this).toSeq
}
/**
* 计算 RDD 分区或从检查点读取(如果 RDD 正在进行检查点操作)。
*
* @param split 分区
* @param context 任务上下文
* @return 分区元素的迭代器
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
/**
* 获取或计算 RDD 分区。当 RDD 被缓存时,由 RDD.iterator() 使用。
*
* @param partition 分区
* @param context 任务上下文
* @return 分区元素的迭代器
*/
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// 此方法在执行器上调用,因此我们需要调用 SparkEnv.get 而不是 sc.env。
SparkEnv.get.blockManager.getOrElseUpdateRDDBlock(
context.taskAttemptId(), blockId, storageLevel, elementClassTag, () => {
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}
) match {
// 块命中。
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
// 需要计算块。
case Right(iter) =>
new InterruptibleIterator(context, iter)
}
}
/**
* 在一个作用域内执行一段代码,使得在该作用域内创建的所有新 RDD 都将属于同一个作用域。
* 有关更多详细信息,请参阅 {{org.apache.spark.rdd.RDDOperationScope}}。
*
* 注意:给定的代码块中不允许使用 return 语句。
*
* @param body 要执行的代码块
* @tparam U 代码块的返回类型
* @return 代码块的返回值
*/
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
// 转换操作(返回一个新的 RDD)
/**
* 通过对该 RDD 的所有元素应用一个函数,返回一个新的 RDD。
*
* @param f 应用于每个元素的函数
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
/**
* 首先对该 RDD 的所有元素应用一个函数,然后将结果展平,返回一个新的 RDD。
*
* @param f 应用于每个元素的函数,返回一个可遍历的集合
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}
/**
* 返回一个新的 RDD,其中只包含满足谓词的元素。
*
* @param f 用于过滤元素的谓词函数
* @return 新的 RDD
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(_, _, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
/**
* 返回一个新的 RDD,其中包含该 RDD 中的不同元素。
*
* @param numPartitions 新 RDD 的分区数
* @param ord 元素的排序规则
* @return 新的 RDD
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
// 创建一个外部追加仅映射的实例,该实例忽略值。
val map = new ExternalAppendOnlyMap[T, Null, Null](
createCombiner = _ => null,
mergeValue = (a, b) => a,
mergeCombiners = (a, b) => a)
map.insertAll(partition.map(_ -> null))
map.iterator.map(_._1)
}
partitioner match {
case Some(_) if numPartitions == partitions.length =>
mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
}
}
/**
* 返回一个新的 RDD,其中包含该 RDD 中的不同元素。
* 使用该 RDD 的分区数作为新 RDD 的分区数。
*
* @return 新的 RDD
*/
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
/**
* 返回一个新的 RDD,该 RDD 正好有 numPartitions 个分区。
*
* 可以增加或减少该 RDD 的并行度。内部使用洗牌操作来重新分配数据。
*
* 如果要减少该 RDD 的分区数,建议使用 `coalesce`,它可以避免执行洗牌操作。
*
* @param numPartitions 新的分区数
* @param ord 元素的排序规则
* @return 新的 RDD
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
/**
* 返回一个新的 RDD,该 RDD 被合并为 `numPartitions` 个分区。
*
* 这会产生一个窄依赖关系,例如,如果从 1000 个分区合并到 100 个分区,不会进行洗牌操作,
* 而是每个新分区将获取当前分区中的 10 个分区。如果请求的分区数更大,则保持当前的分区数。
*
* 但是,如果进行大幅度的合并,例如合并到 numPartitions = 1,这可能会导致计算只在比预期少的节点上进行
* (例如,numPartitions = 1 时只在一个节点上进行)。为了避免这种情况,可以将 shuffle 设置为 true。
* 这将添加一个洗牌步骤,但意味着当前的上游分区将并行执行(根据当前的分区方式)。
*
* @param numPartitions 新的分区数
* @param shuffle 是否进行洗牌操作
* @param partitionCoalescer 可选的分区合并器
* @param ord 元素的排序规则
* @return 新的 RDD
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** 从随机分区开始,将元素均匀分布到输出分区。 */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new XORShiftRandom(index).nextInt(numPartitions)
items.map { t =>
// 注意,键的哈希码将只是键本身。HashPartitioner 将对其进行取模操作。
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// 包括一个洗牌步骤,以便我们的上游任务仍然可以分布执行
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
/**
* 返回此 RDD 的一个采样子集。
*
* @param withReplacement 元素是否可以被多次采样(采样后是否替换)
* @param fraction 作为此 RDD 大小的一部分,样本的预期大小
* 无替换:每个元素被选中的概率;fraction 必须在 [0, 1] 范围内
* 有替换:每个元素被选中的预期次数;fraction 必须大于或等于 0
* @param seed 随机数生成器的种子
*
* @note 这不能保证提供给定 [[RDD]] 的确切分数计数。
*
* @return 采样后的 RDD
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")
withScope {
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
/**
* 随机将此 RDD 按提供的权重分割。
*
* @param weights 分割的权重,若它们的和不为 1,则会进行归一化
* @param seed 随机种子
*
* @return 分割后的 RDD 数组
*/
def randomSplit(
weights: Array[Double],
seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
require(weights.forall(_ >= 0),
s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
require(weights.sum > 0,
s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
withScope {
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
randomSampleWithRange(x(0), x(1), seed)
}.toArray
}
}
/**
* 为 DataFrames 中的随机分割暴露的内部方法。根据给定的概率范围对 RDD 进行采样。
*
* @param lb 用于伯努利采样器的下限
* @param ub 用于伯努利采样器的上限
* @param seed 随机数生成器的种子
*
* @return 此 RDD 的一个无替换随机子样本。
*/
private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
this.mapPartitionsWithIndex( { (index, partition) =>
val sampler = new BernoulliCellSampler[T](lb, ub)
sampler.setSeed(seed + index)
sampler.sample(partition)
}, isOrderSensitive = true, preservesPartitioning = true)
}
/**
* 以数组形式返回此 RDD 的固定大小采样子集。
*
* @param withReplacement 是否进行有放回采样
* @param num 返回样本的大小
* @param seed 随机数生成器的种子
*
* @return 指定大小的样本数组
*
* @note 只有当结果数组预期较小时才应使用此方法,因为所有数据都会加载到驱动程序的内存中。
*/
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] = withScope {
val numStDev = 10.0
require(num >= 0, "Negative number of elements requested")
require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
"Cannot support a sample size > Int.MaxValue - " +
s"$numStDev * math.sqrt(Int.MaxValue)")
if (num == 0) {
new Array[T](0)
} else {
val initialCount = this.count()
if (initialCount == 0) {
new Array[T](0)
} else {
val rand = new Random(seed)
if (!withReplacement && num >= initialCount) {
Utils.randomizeInPlace(this.collect(), rand)
} else {
val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
withReplacement)
var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
// 如果第一个样本不够大,继续尝试采样;这种情况不应该经常发生,因为我们初始大小使用了较大的乘数
var numIters = 0
while (samples.length < num) {
logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
numIters += 1
}
Utils.randomizeInPlace(samples, rand).take(num)
}
}
}
}
/**
* 返回此 RDD 和另一个 RDD 的并集。任何相同的元素都会多次出现(使用 `.distinct()` 消除它们)。
*
* @param other 另一个 RDD
* @return 并集 RDD
*/
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}
/**
* 返回此 RDD 和另一个 RDD 的并集。任何相同的元素都会多次出现(使用 `.distinct()` 消除它们)。
* 这是 `union` 方法的别名。
*
* @param other 另一个 RDD
* @return 并集 RDD
*/
def ++(other: RDD[T]): RDD[T] = withScope {
this.union(other)
}
/**
* 返回按给定键函数排序的此 RDD。
*
* @param f 用于提取键的函数
* @param ascending 是否升序排序
* @param numPartitions 结果 RDD 的分区数
* @param ord 键的排序规则
* @param ctag 键的类型标签
* @return 排序后的 RDD
*/
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
/**
* 返回此 RDD 和另一个 RDD 的交集。输出不会包含任何重复元素,即使输入 RDD 中有重复元素。
*
* @note 此方法内部会执行洗牌操作。
*
* @param other 另一个 RDD
* @return 交集 RDD
*/
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
/**
* 返回此 RDD 和另一个 RDD 的交集。输出不会包含任何重复元素,即使输入 RDD 中有重复元素。
*
* @note 此方法内部会执行洗牌操作。
*
* @param other 另一个 RDD
* @param partitioner 结果 RDD 使用的分区器
* @param ord 元素的排序规则
* @return 交集 RDD
*/
def intersection(
other: RDD[T],
partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
/**
* 返回此 RDD 和另一个 RDD 的交集。输出不会包含任何重复元素,即使输入 RDD 中有重复元素。
* 在集群上执行哈希分区。
*
* @note 此方法内部会执行洗牌操作。
*
* @param other 另一个 RDD
* @param numPartitions 结果 RDD 的分区数
* @return 交集 RDD
*/
def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
intersection(other, new HashPartitioner(numPartitions))
}
/**
* 返回一个新的 RDD,通过将每个分区内的所有元素合并为一个数组。
*
* @return 新的 RDD,元素为数组
*/
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
}
/**
* 返回此 RDD 和另一个 RDD 的笛卡尔积,即所有元素对 (a, b) 的 RDD,其中 a 属于 `this`,b 属于 `other`。
*
* @param other 另一个 RDD
* @tparam U 另一个 RDD 中元素的类型
* @return 笛卡尔积 RDD
*/
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
}
/**
* 返回一个分组元素的 RDD。每个组由一个键和映射到该键的元素序列组成。
* 每个组内元素的顺序不保证,并且每次计算结果 RDD 时可能会不同。
*
* @note 此操作可能非常昂贵。如果是为了对每个键执行聚合操作(如求和或求平均值),
* 使用 `PairRDDFunctions.aggregateByKey` 或 `PairRDDFunctions.reduceByKey` 会提供更好的性能。
*
* @param f 用于提取键的函数
* @param kt 键的类型标签
* @tparam K 键的类型
* @return 分组后的 RDD
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}
/**
* 返回一个分组元素的 RDD。每个组由一个键和映射到该键的元素序列组成。
* 每个组内元素的顺序不保证,并且每次计算结果 RDD 时可能会不同。
*
* @note 此操作可能非常昂贵。如果是为了对每个键执行聚合操作(如求和或求平均值),
* 使用 `PairRDDFunctions.aggregateByKey` 或 `PairRDDFunctions.reduceByKey` 会提供更好的性能。
*
* @param f 用于提取键的函数
* @param numPartitions 结果 RDD 的分区数
* @param kt 键的类型标签
* @tparam K 键的类型
* @return 分组后的 RDD
*/
def groupBy[K](
f: T => K,
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy(f, new HashPartitioner(numPartitions))
}
/**
* 返回一个分组元素的 RDD。每个组由一个键和映射到该键的元素序列组成。
* 每个组内元素的顺序不保证,并且每次计算结果 RDD 时可能会不同。
*
* @note 此操作可能非常昂贵。如果是为了对每个键执行聚合操作(如求和或求平均值),
* 使用 `PairRDDFunctions.aggregateByKey` 或 `PairRDDFunctions.reduceByKey` 会提供更好的性能。
*
* @param f 用于提取键的函数
* @param p 分区器
* @param kt 键的类型标签
* @param ord 键的排序规则
* @tparam K 键的类型
* @return 分组后的 RDD
*/
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])] = withScope {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
/**
* 返回一个通过将元素传递给一个外部进程而创建的 RDD。
*
* @param command 要执行的命令
* @return 新的 RDD,元素为字符串
*/
def pipe(command: String): RDD[String] = withScope {
// 类似于 Runtime.exec(),如果我们得到一个字符串,使用标准的 StringTokenizer(按空格)将其拆分为单词
pipe(PipedRDD.tokenize(command))
}
/**
* 返回一个通过将元素传递给一个外部进程而创建的 RDD。
*
* @param command 要执行的命令
* @param env 环境变量映射
* @return 新的 RDD,元素为字符串
*/
def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
// 类似于 Runtime.exec(),如果我们得到一个字符串,使用标准的 StringTokenizer(按空格)将其拆分为单词
pipe(PipedRDD.tokenize(command), env)
}
/**
* 返回一个通过将元素传递给一个外部进程而创建的 RDD。
* 结果 RDD 通过为每个分区执行一次给定的进程来计算。每个输入分区的所有元素作为由换行符分隔的输入行写入进程的标准输入。
* 结果分区由进程的标准输出组成,标准输出的每一行都会成为输出分区的一个元素。即使对于空分区也会启动一个进程。
*
* 可以通过提供两个函数来定制打印行为。
*
* @param command 在外部进程中运行的命令
* @param env 要设置的环境变量
* @param printPipeContext 在管道元素之前,调用此函数以提供管道上下文数据的机会。
* 打印行函数(如 out.println)将作为 printPipeContext 的参数传递。
* @param printRDDElement 使用此函数来定制如何管道元素。此函数将以每个 RDD 元素作为第一个参数,
* 打印行函数(如 out.println())作为第二个参数调用。
* 例如,以流式方式管道 groupBy() 的 RDD 数据,而不是构造一个巨大的字符串来连接所有元素:
* {{{
* def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* for (e <- record._2) {f(e)}
* }}}
* @param separateWorkingDir 是否为每个任务使用单独的工作目录
* @param bufferSize 管道进程标准输入写入器的缓冲区大小
* @param encoding 与管道进程交互(通过标准输入、标准输出和标准错误)使用的字符编码
* @return 结果 RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false,
bufferSize: Int = 8192,
encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
separateWorkingDir,
bufferSize,
encoding)
}
/**
* 通过对该 RDD 的每个分区应用一个函数,返回一个新的 RDD。
*
* `preservesPartitioning` 指示输入函数是否保留分区器,除非这是一个键值对 RDD 且输入函数不修改键,否则应设置为 `false`。
*
* @param f 应用于每个分区的函数
* @param preservesPartitioning 是否保留分区器
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
/**
* [性能] Spark 内部的 mapPartitionsWithIndex 方法,跳过闭包清理。
* 这是一个性能相关的 API,只有在确定 RDD 元素是可序列化的且不需要闭包清理时才能谨慎使用。
*
* @param f 应用于每个分区的函数,接收分区索引和迭代器作为参数
* @param preservesPartitioning 指示输入函数是否保留分区器,
* 除非这是一个键值对 RDD 且输入函数不修改键,否则应设置为 `false`。
* @param isOrderSensitive 函数是否对顺序敏感。如果对顺序敏感,当输入顺序改变时,可能会返回完全不同的结果。
* 大多数有状态的函数对顺序敏感。
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false,
isOrderSensitive: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning = preservesPartitioning,
isOrderSensitive = isOrderSensitive)
}
/**
* [性能] Spark 内部的 mapPartitions 方法,跳过闭包清理。
*
* @param f 应用于每个分区的函数
* @param preservesPartitioning 是否保留分区器
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
private[spark] def mapPartitionsInternal[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(_: TaskContext, _: Int, iter: Iterator[T]) => f(iter),
preservesPartitioning)
}
/**
* 通过对该 RDD 的每个分区应用一个函数,同时跟踪原始分区的索引,返回一个新的 RDD。
*
* `preservesPartitioning` 指示输入函数是否保留分区器,除非这是一个键值对 RDD 且输入函数不修改键,否则应设置为 `false`。
*
* @param f 应用于每个分区的函数,接收分区索引和迭代器作为参数
* @param preservesPartitioning 是否保留分区器
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
/**
* 通过对该 RDD 的每个分区应用一个评估器,返回一个新的 RDD。
* 给定的评估器工厂将被序列化并发送到执行器,每个任务将使用该工厂创建一个评估器,
* 并使用评估器转换输入分区的数据。
*
* @param evaluatorFactory 分区评估器工厂
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
@DeveloperApi
@Since("3.5.0")
def mapPartitionsWithEvaluator[U: ClassTag](
evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory)
}
/**
* 将此 RDD 的分区与另一个 RDD 的分区进行拉链操作,并通过对拉链后的分区应用一个评估器,返回一个新的 RDD。
* 假设两个 RDD 具有 *相同数量的分区*,但不要求每个分区中的元素数量相同。
*
* @param rdd2 另一个 RDD
* @param evaluatorFactory 分区评估器工厂
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
@DeveloperApi
@Since("3.5.0")
def zipPartitionsWithEvaluator[U: ClassTag](
rdd2: RDD[T],
evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
new ZippedPartitionsWithEvaluatorRDD(this, rdd2, evaluatorFactory)
}
/**
* 通过对该 RDD 的每个分区应用一个函数,同时跟踪原始分区的索引,返回一个新的 RDD。
*
* `preservesPartitioning` 指示输入函数是否保留分区器,除非这是一个键值对 RDD 且输入函数不修改键,否则应设置为 `false`。
*
* `isOrderSensitive` 指示函数是否对顺序敏感。如果对顺序敏感,当输入顺序改变时,可能会返回完全不同的结果。
* 大多数有状态的函数对顺序敏感。
*
* @param f 应用于每个分区的函数,接收分区索引和迭代器作为参数
* @param preservesPartitioning 是否保留分区器
* @param isOrderSensitive 函数是否对顺序敏感
* @tparam U 新 RDD 中元素的类型
* @return 新的 RDD
*/
private[spark] def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean,
isOrderSensitive: Boolean): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning,
isOrderSensitive = isOrderSensitive)
}
/**
* 将此 RDD 与另一个 RDD 进行拉链操作,返回键值对 RDD,其中每个键值对包含两个 RDD 中的第一个元素、第二个元素等。
* 假设两个 RDD 具有 *相同数量的分区* 且 *每个分区中的元素数量相同*(例如,一个 RDD 是通过对另一个 RDD 进行映射操作得到的)。
*
* @param other 另一个 RDD
* @tparam U 另一个 RDD 中元素的类型
* @return 拉链后的 RDD
*/
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
new Iterator[(T, U)] {
def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
case (true, true) => true
case (false, false) => false
case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError()
}
def next(): (T, U) = (thisIter.next(), otherIter.next())
}
}
}
/**
* 将此 RDD 的分区与另一个 RDD 的分区进行拉链操作,并通过对拉链后的分区应用一个函数,返回一个新的 RDD。
* 假设两个 RDD 具有 *相同数量的分区*,但不要求每个分区中的元素数量相同。
*
* @param rdd2 另一个 RDD
* @param preservesPartitioning 是否保留分区器
* @param f 应用于拉链后分区的函数
* @tparam B 另一个 RDD 中元素的类型
* @tparam V 新 RDD 中元素的类型
* @return 新的 RDD
*/
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
}
/**
* 将此 RDD 的分区与另一个 RDD 的分区进行拉链操作,并通过对拉链后的分区应用一个函数,返回一个新的 RDD。
* 假设两个 RDD 具有 *相同数量的分区*,但不要求每个分区中的元素数量相同。
* 不保留分区器。
*
* @param rdd2 另一个 RDD
* @param f 应用于拉链后分区的函数
* @tparam B 另一个 RDD 中元素的类型
* @tparam V 新 RDD 中元素的类型
* @return 新的 RDD
*/
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
zipPartitions(rdd2, preservesPartitioning = false)(f)
}
/**
* 将此 RDD 的分区与另外两个 RDD 的分区进行拉链操作,并通过对拉链后的分区应用一个函数,返回一个新的 RDD。
* 假设三个 RDD 具有 *相同数量的分区*,但不要求每个分区中的元素数量相同。
*
* @param rdd2 第二个 RDD
* @param rdd3 第三个 RDD
* @param preservesPartitioning 是否保留分区器
* @param f 应用于拉链后分区的函数
* @tparam B 第二个 RDD 中元素的类型
* @tparam C 第三个 RDD 中元素的类型
* @tparam V 新 RDD 中元素的类型
* @return 新的 RDD
*/
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
}
/**
* 将此 RDD 的分区与另外两个 RDD 的分区进行拉链操作,并通过对拉链后的分区应用一个函数,返回一个新的 RDD。
* 假设三个 RDD 具有 *相同数量的分区*,但不要求每个分区中的元素数量相同。
* 不保留分区器。
*
* @param rdd2 第二个 RDD
* @param rdd3 第三个 RDD
* @param f 应用于拉链后分区的函数
* @tparam B 第二个 RDD 中元素的类型
* @tparam C 第三个 RDD 中元素的类型
* @tparam V 新 RDD 中元素的类型
* @return 新的 RDD
*/
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
}
/**
* 将此 RDD 的分区与另外三个 RDD 的分区进行拉链操作,并通过对拉链后的分区应用一个函数,返回一个新的 RDD。
* 假设四个 RDD 具有 *相同数量的分区*,但不要求每个分区中的元素数量相同。
*
* @param rdd2 第二个 RDD
* @param rdd3 第三个 RDD
* @param rdd4 第四个 RDD
* @param preservesPartitioning 是否保留分区器
* @param f 应用于拉链后分区的函数
* @tparam B 第二个 RDD 中元素的类型
* @tparam C 第三个 RDD 中元素的类型
* @tparam D 第四个 RDD 中元素的类型
* @tparam V 新 RDD 中元素的类型
* @return 新的 RDD
*/
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
}
/**
* 将此 RDD 的分区与另外三个 RDD 的分区进行拉链操作,并通过对拉链后的分区应用一个函数,返回一个新的 RDD。
* 假设四个 RDD 具有 *相同数量的分区*,但不要求每个分区中的元素数量相同。
* 不保留分区器。
*
* @param rdd2 第二个 RDD
* @param rdd3 第三个 RDD
* @param rdd4 第四个 RDD
* @param f 应用于拉链后分区的函数
* @tparam B 第二个 RDD 中元素的类型
* @tparam C 第三个 RDD 中元素的类型
* @tparam D 第四个 RDD 中元素的类型
* @tparam V 新 RDD 中元素的类型
* @return 新的 RDD
*/
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
}
// 动作操作(启动一个作业以将值返回给用户程序)
/**
* 对该 RDD 的所有元素应用一个函数。
*
* @param f 应用于每个元素的函数
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
/**
* 对该 RDD 的每个分区应用一个函数。
*
* @param f 应用于每个分区的函数
*/
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
/**
* 返回一个包含该 RDD 所有元素的数组。
*
* @note 只有当结果数组预期较小时才应使用此方法,因为所有数据都会加载到驱动程序的内存中。
*
* @return 包含所有元素的数组
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
/**
* 返回一个包含该 RDD 所有元素的迭代器。
*
* 迭代器将消耗与该 RDD 中最大分区相同的内存。
*
* @note 这会导致多个 Spark 作业,如果输入 RDD 是宽转换的结果(例如,使用不同分区器进行连接),
* 为了避免重新计算,输入 RDD 应该先进行缓存。
*
* @return 包含所有元素的迭代器
*/
def toLocalIterator: Iterator[T] = withScope {
def collectPartition(p: Int): Array[T] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
}
partitions.indices.iterator.flatMap(i => collectPartition(i))
}
/**
* 通过应用 `f` 返回一个包含所有匹配值的 RDD。
*
* @param f 部分函数,用于筛选和转换元素
* @tparam U 新 RDD 中元素的类型
* @return 包含匹配值的 RDD
*/
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
filter(cleanF.isDefinedAt).map(cleanF)
}
/**
* 返回一个 RDD,其中包含 `this` 中不在 `other` 中的元素。
*
* 使用 `this` 的分区器/分区大小,因为即使 `other` 很大,结果 RDD 也将 <= 我们。
*
* @param other 另一个 RDD
* @return 包含差集元素的 RDD
*/
def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
/**
* 返回一个 RDD,其中包含 `this` 中不在 `other` 中的元素。
*
* @param other 另一个 RDD
* @param numPartitions 结果 RDD 的分区数
* @return 包含差集元素的 RDD
*/
def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
subtract(other, new HashPartitioner(numPartitions))
}
/**
* 返回一个 RDD,其中包含 `this` 中不在 `other` 中的元素。
*
* @param other 另一个 RDD
* @param p 分区器
* @param ord 元素的排序规则
* @return 包含差集元素的 RDD
*/
def subtract(
other: RDD[T],
p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
if (partitioner == Some(p)) {
// 我们的分区器知道如何处理 T(因为我们有一个分区器,实际上是 (K, V))
// 所以创建一个新的分区器,用于解包我们的假元组
val p2 = new Partitioner() {
override def numPartitions: Int = p.numPartitions
override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
}
// 不幸的是,由于我们创建了一个新的 p2,我们仍然会得到 ShuffleDependencies
// 并且在调用 .keys 时,即使 SubtractedRDD 会因为 p2 的解包分区而已经按正确/真实的键(例如 p)分区,也不会设置分区器。
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
} else {
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
}
}
/**
* 使用指定的可交换和关联的二元运算符对该 RDD 的元素进行归约。
*
* @param f 归约函数
* @return 归约结果
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (_: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// 从 Option 中获取最终结果,或者如果 RDD 为空则抛出异常
jobResult.getOrElse(throw SparkCoreErrors.emptyCollectionError())
}
/**
* 以多级树模式对该 RDD 的元素进行归约。
*
* @param depth 树的建议深度(默认:2)
* @see [[org.apache.spark.rdd.RDD#reduce]]
* @return 归约结果
*/
def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
val cleanF = context.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
if (c.isDefined && x.isDefined) {
Some(cleanF(c.get, x.get))
} else if (c.isDefined) {
c
} else if (x.isDefined) {
x
} else {
None
}
}
partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
.getOrElse(throw SparkCoreErrors.emptyCollectionError())
}
/**
* 对每个分区的元素进行聚合,然后对所有分区的结果进行聚合,使用给定的关联函数和中性 "零值"。
* 函数 op(t1, t2) 允许修改 t1 并将其作为结果返回,以避免对象分配;但是,它不应该修改 t2。
*
* 此操作的行为与函数式语言(如 Scala)中为非分布式集合实现的折叠操作有所不同。
* 此折叠操作可能会分别应用于各个分区,然后将这些结果折叠为最终结果,而不是按某种定义的顺序依次应用于每个元素。
* 对于非交换函数,结果可能与应用于非分布式集合的折叠操作不同。
*
* @param zeroValue 每个分区的累积结果的初始值,以及不同分区的合并结果的初始值 - 这通常是中性元素
* (例如,列表连接的 `Nil` 或求和的 `0`)
* @param op 用于在分区内累积结果和合并不同分区结果的运算符
* @return 聚合结果
*/
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// 克隆零值,因为我们也会将其作为任务的一部分进行序列化
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
/**
* 对每个分区的元素进行聚合,然后对所有分区的结果进行聚合,使用给定的合并函数和中性 "零值"。
* 此函数可以返回与该 RDD 的类型 T 不同的结果类型 U。
* 因此,我们需要一个操作来将 T 合并到 U 中,以及一个操作来合并两个 U,就像在 scala.TraversableOnce 中一样。
* 这两个函数都允许修改并返回其第一个参数,而不是创建一个新的 U,以避免内存分配。
*
* @param zeroValue 每个分区的累积结果的初始值,以及不同分区的合并结果的初始值 - 这通常是中性元素
* (例如,列表连接的 `Nil` 或求和的 `0`)
* @param seqOp 用于在分区内累积结果的运算符
* @param combOp 用于合并不同分区结果的关联运算符
* @tparam U 聚合结果的类型
* @return 聚合结果
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// 克隆零值,因为我们也会将其作为任务的一部分进行序列化
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
/**
* 以多级树模式对该 RDD 的元素进行聚合。
* 此方法在语义上与 [[org.apache.spark.rdd.RDD#aggregate]] 相同。
*
* @param zeroValue 每个分区的累积结果的初始值,以及不同分区的合并结果的初始值 - 这通常是中性元素
* (例如,列表连接的 `Nil` 或求和的 `0`)
* @param seqOp 用于在分区内累积结果的运算符
* @param combOp 用于合并不同分区结果的关联运算符
* @param depth 树的建议深度(默认:2)
* @tparam U 聚合结果的类型
* @return 聚合结果
*/
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U = withScope {
treeAggregate(zeroValue, seqOp, combOp, depth, finalAggregateOnExecutor = false)
}
/**
* [[org.apache.spark.rdd.RDD#treeAggregate]] 带有一个参数,用于在执行器上进行最终聚合。
*
* @param zeroValue 每个分区的累积结果的初始值,以及不同分区的合并结果的初始值 - 这通常是中性元素
* (例如,列表连接的 `Nil` 或求和的 `0`)
* @param seqOp 用于在分区内累积结果的运算符
* @param combOp 用于合并不同分区结果的关联运算符
* @param depth 树的建议深度
* @param finalAggregateOnExecutor 是否在执行器上进行最终聚合
* @tparam U 聚合结果的类型
* @return 聚合结果
*/
def treeAggregate[U: ClassTag](
zeroValue: U,
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int,
finalAggregateOnExecutor: Boolean): U = withScope {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.length == 0) {
Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
} else {
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
(it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// 如果创建一个额外的级别不能帮助减少
// 时钟时间,我们停止树聚合。
// 当不能节省时钟时间时,不触发树聚合
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
numPartitions /= scale
val curNumPartitions = numPartitions
partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
(i, iter) => iter.map((i % curNumPartitions, _))
}.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
}
if (finalAggregateOnExecutor && partiallyAggregated.partitions.length > 1) {
// 将部分聚合的 RDD 映射为键值对 RDD
// 在单个执行器上使用一个分区进行计算
// 获取新的 RDD[U]
partiallyAggregated = partiallyAggregated
.map(v => (0.toByte, v))
.foldByKey(zeroValue, new ConstantPartitioner)(cleanCombOp)
.values
}
val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
}
}
/**
* 返回该 RDD 中的元素数量。
*
* @return 元素数量
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
/**
* count() 的近似版本,在超时时间内返回一个可能不完整的结果,即使并非所有任务都已完成。
*
* 置信度是结果的误差范围包含真实值的概率。也就是说,如果以置信度 0.9 重复调用 countApprox,
* 我们预计 90% 的结果将包含真实计数。置信度必须在 [0,1] 范围内,否则将抛出异常。
*
* @param timeout 作业的最大等待时间,以毫秒为单位
* @param confidence 结果的期望统计置信度
* @return 可能不完整的结果,带有误差范围
*/
def countApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
val countElements: (TaskContext, Iterator[T]) => Long = { (_, iter) =>
var result = 0L
while (iter.hasNext) {
result += 1L
iter.next()
}
result
}
val evaluator = new CountEvaluator(partitions.length, confidence)
sc.runApproximateJob(this, countElements, evaluator, timeout)
}
/**
* 返回该 RDD 中每个唯一值的计数,作为一个本地的 (值, 计数) 对映射。
*
* @note 只有当结果映射预期较小时才应使用此方法,因为整个映射都会加载到驱动程序的内存中。
* 要处理非常大的结果,考虑使用
*
* {{{
* rdd.map(x => (x, 1L)).reduceByKey(_ + _)
* }}}
*
* 它返回一个 RDD[T, Long] 而不是一个映射。
*
* @param ord 元素的排序规则
* @return 唯一值的计数映射
*/
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
map(value => (value, null)).countByKey()
}
/**
* countByValue() 的近似版本。
*
* @param timeout 作业的最大等待时间,以毫秒为单位
* @param confidence 结果的期望统计置信度
* @param ord 元素的排序规则
* @return 可能不完整的结果,带有误差范围
*/
def countByValueApprox(timeout: Long, confidence: Double = 0.95)
(implicit ord: Ordering[T] = null)
: PartialResult[Map[T, BoundedDouble]] = withScope {
require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
if (elementClassTag.runtimeClass.isArray) {
throw SparkCoreErrors.countByValueApproxNotSupportArraysError()
}
val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) =>
val map = new OpenHashMap[T, Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
map
}
val evaluator = new GroupedCountEvaluator[T](partitions.length, confidence)
sc.runApproximateJob(this, countPartition, evaluator, timeout)
}
/**
* 返回该 RDD 中不同元素的近似数量。
*
* 所使用的算法基于 streamlib 对 "HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm" 的实现,
* 可在 <a href="https://doi.org/10.1145/2452376.2452456">此处</a> 找到。
*
* 相对精度约为 `1.054 / sqrt(2^p)`。设置一个非零的 (`sp` 大于 `p`) 将触发寄存器的稀疏表示,
* 这可能会减少内存消耗并在基数较小时提高精度。
*
* @param p 正常集合的精度值。
* `p` 必须是一个介于 4 和 `sp` 之间的值(如果 `sp` 不为零,最大为 32)。
* @param sp 稀疏集合的精度值,介于 0 和 32 之间。
* 如果 `sp` 等于 0,则跳过稀疏表示。
* @return 不同元素的近似数量
*/
def countApproxDistinct(p: Int, sp: Int): Long = withScope {
require(p >= 4, s"p ($p) must be >= 4")
require(sp <= 32, s"sp ($sp) must be <= 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val zeroCounter = new HyperLogLogPlus(p, sp)
aggregate(zeroCounter)(
(hll: HyperLogLogPlus, v: T) => {
hll.offer(v)
hll
},
(h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
h1.addAll(h2)
h1
}).cardinality()
}
/**
* 返回该 RDD 中不同元素的近似数量。
*
* 所使用的算法基于 streamlib 对 "HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm" 的实现,
* 可在 <a href="https://doi.org/10.1145/2452376.2452456">此处</a> 找到。
*
* @param relativeSD 相对精度。较小的值会创建需要更多空间的计数器。
* 它必须大于 0.000017。
* @return 不同元素的近似数量
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
countApproxDistinct(if (p < 4) 4 else p, 0)
}
/**
* 将此 RDD 与其元素的索引进行拉链操作。排序首先基于分区索引,然后是每个分区内元素的顺序。
* 因此,第一个分区中的第一个元素的索引为 0,最后一个分区中的最后一个元素的索引最大。
*
* 这类似于 Scala 的 zipWithIndex,但它使用 Long 而不是 Int 作为索引类型。
* 当此 RDD 包含多个分区时,此方法需要触发一个 Spark 作业。
*
* @note 某些 RDD(例如,由 groupBy() 返回的 RDD)不保证分区内元素的顺序。
* 因此,分配给每个元素的索引不能保证,并且如果重新计算 RDD,索引甚至可能会改变。
* 如果需要固定的顺序来保证相同的索引分配,应该使用 sortByKey() 对 RDD 进行排序或将其保存到文件中。
*
* @return 拉链后的 RDD,元素为 (元素, 索引) 对
*/
def zipWithIndex(): RDD[(T, Long)] = withScope {
new ZippedWithIndexRDD(this)
}
/**
* 将此 RDD 与生成的唯一 Long ID 进行拉链操作。第 k 个分区中的元素将获得 ID k, n+k, 2*n+k, ...,
* 其中 n 是分区数。因此可能会存在间隙,但此方法不会触发 Spark 作业,这与 [[org.apache.spark.rdd.RDD#zipWithIndex]] 不同。
*
* @note 某些 RDD(例如,由 groupBy() 返回的 RDD)不保证分区内元素的顺序。
* 因此,分配给每个元素的唯一 ID 不能保证,并且如果重新计算 RDD,ID 甚至可能会改变。
* 如果需要固定的顺序来保证相同的索引分配,应该使用 sortByKey() 对 RDD 进行排序或将其保存到文件中。
*
* @return 拉链后的 RDD,元素为 (元素, 唯一 ID) 对
*/
def zipWithUniqueId(): RDD[(T, Long)] = withScope {
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
(item, i * n + k)
}
}
}
/**
* 获取该 RDD 的前 num 个元素。它首先扫描一个分区,然后使用该分区的结果来估计满足限制所需的额外分区数。
*
* @note 只有当结果数组预期较小时才应使用此方法,因为所有数据都会加载到驱动程序的内存中。
*
* @note 由于内部实现的复杂性,如果在 `Nothing` 或 `Null` 类型的 RDD 上调用此方法,将抛出异常。
*
* @param num 要获取的元素数量
* @return 包含前 num 个元素的数组
*/
def take(num: Int): Array[T] = withScope {
val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
if (num == 0) {
new Array[T](0)
} else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// 本次迭代要尝试的分区数。这个数字可以大于 totalParts,因为我们实际上在 runJob 中会将其限制为 totalParts。
var numPartsToTry = conf.get(RDD_LIMIT_INITIAL_NUM_PARTITIONS)
val left = num - buf.size
if (partsScanned > 0) {
// 如果上一次迭代后没有找到任何行,则乘以 limitScaleUpFactor 并重试。
// 否则,插值计算我们需要尝试的分区数,但高估 50%。最后我们也会限制这个估计值。
if (buf.isEmpty) {
numPartsToTry = partsScanned * scaleUpFactor
} else {
// 由于 left > 0,numPartsToTry 总是 >= 1
numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
}
}
val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += p.size
}
buf.toArray
}
}
/**
* 返回该 RDD 中的第一个元素。
*
* @return 第一个元素
*/
def first(): T = withScope {
take(1) match {
case Array(t) => t
case _ => throw SparkCoreErrors.emptyCollectionError()
}
}
/**
* 根据指定的隐式 Ordering[T] 返回该 RDD 中的前 k 个(最大)元素,并保持排序顺序。
* 这与 [[takeOrdered]] 相反。例如:
* {{{
* sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
* // 返回 Array(12)
*
* sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
* // 返回 Array(6, 5)
* }}}
*
* @note 只有当结果数组预期较小时才应使用此方法,因为所有数据都会加载到驱动程序的内存中。
*
* @param num k,要返回的前 k 个元素的数量
* @param ord 元素的排序规则
* @return 包含前 k 个元素的数组
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
takeOrdered(num)(ord.reverse)
}
/**
* 根据指定的隐式 Ordering[T] 返回该 RDD 中的前 k 个(最小)元素,并保持排序顺序。
* 这与 [[top]] 相反。例如:
* {{{
* sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
* // 返回 Array(2)
*
* sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
* // 返回 Array(2, 3)
* }}}
*
* @note 只有当结果数组预期较小时才应使用此方法,因为所有数据都会加载到驱动程序的内存中。
*
* @param num k,要返回的前 k 个元素的数量
* @param ord 元素的排序规则
* @return 包含前 k 个元素的数组
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
if (num == 0 || this.getNumPartitions == 0) {
Array.empty
} else {
this.mapPartitionsWithIndex { case (pid, iter) =>
if (iter.nonEmpty) {
// 优先级队列保留最大的元素,所以我们反转排序规则。
Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
} else if (pid == 0) {
// 确保分区 0 总是返回一个数组,以避免在空 RDD 上进行 reduce 操作
Iterator.single(Array.empty[T])
} else {
Iterator.empty
}
}.reduce { (array1, array2) =>
val size = math.min(num, array1.length + array2.length)
val array = Array.ofDim[T](size)
collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).copyToArray(array, 0, size)
array
}
}
}
/**
* 根据隐式 Ordering[T] 返回该 RDD 中的最大值。
*
* @param ord 元素的排序规则
* @return 最大值
*/
def max()(implicit ord: Ordering[T]): T = withScope {
this.reduce(ord.max)
}
/**
* 根据隐式 Ordering[T] 返回该 RDD 中的最小值。
*
* @param ord 元素的排序规则
* @return 最小值
*/
def min()(implicit ord: Ordering[T]): T = withScope {
this.reduce(ord.min)
}
/**
* @note 由于内部实现的复杂性,如果在 `Nothing` 或 `Null` 类型的 RDD 上调用此方法,将抛出异常。
* 这在实践中可能会出现,例如,`parallelize(Seq())` 的类型是 `RDD[Nothing]`。
* (无论如何都应该避免使用 `parallelize(Seq())`,而应使用 `parallelize(Seq[T]())`。)
*
* @return 如果且仅当该 RDD 不包含任何元素时返回 true。注意,即使 RDD 至少有 1 个分区,它也可能为空。
*/
def isEmpty(): Boolean = withScope {
partitions.length == 0 || take(1).length == 0
}
/**
* 将此 RDD 保存为文本文件,使用元素的字符串表示形式。
*
* @param path 文件路径
*/
def saveAsTextFile(path: String): Unit = withScope {
saveAsTextFile(path, null)
}
/**
* 将此 RDD 保存为压缩文本文件,使用元素的字符串表示形式。
*
* @param path 文件路径
* @param codec 压缩编解码器类
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
this.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
require(x != null, "text files do not allow null rows")
text.set(x.toString)
(NullWritable.get(), text)
}
}.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
/**
* 将此 RDD 保存为序列化对象的 SequenceFile。
*
* @param path 文件路径
*/
def saveAsObjectFile(path: String): Unit = withScope {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
.saveAsSequenceFile(path)
}
/**
* 通过应用 `f` 创建此 RDD 中元素的元组。
*
* @param f 用于提取键的函数
* @tparam K 键的类型
* @return 新的 RDD,元素为 (键, 元素) 对
*/
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
val cleanedF = sc.clean(f)
map(x => (cleanedF(x), x))
}
/**
* 一个用于测试的私有方法,用于查看每个分区的内容。
*
* @return 包含每个分区元素的二维数组
*/
private[spark] def collectPartitions(): Array[Array[T]] = withScope {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
/**
* 标记此 RDD 进行检查点操作。它将被保存到 `SparkContext#setCheckpointDir` 设置的检查点目录中的文件中,
* 并且所有对其父 RDD 的引用将被移除。此函数必须在对此 RDD 执行任何作业之前调用。
* 强烈建议将此 RDD 持久化到内存中,否则将其保存到文件中需要重新计算。
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// 注意:由于下游在确保子 RDD 分区指向正确的父分区方面存在复杂性,我们在这里使用全局锁。
// 将来我们应该重新考虑这个问题。
if (context.checkpointDir.isEmpty) {
throw SparkCoreErrors.checkpointDirectoryHasNotBeenSetInSparkContextError()
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
/**
* 使用 Spark 现有的缓存层标记此 RDD 进行本地检查点操作。
*
* 此方法适用于希望截断 RDD 血缘关系,同时跳过在可靠分布式文件系统中复制物化数据这一昂贵步骤的用户。
* 这对于需要定期截断长血缘关系的 RDD(例如 GraphX)很有用。
*
* 本地检查点牺牲了容错性以换取性能。特别是,检查点数据被写入执行器的临时本地存储,而不是可靠的、容错的存储。
* 这意味着如果在计算过程中执行器发生故障,检查点数据可能不再可用,导致不可恢复的作业失败。
*
* 这与动态分配一起使用不安全,因为动态分配会移除执行器及其缓存块。如果必须同时使用这两个功能,建议将
* `spark.dynamicAllocation.cachedExecutorIdleTimeout` 设置为较高的值。
*
* `SparkContext#setCheckpointDir` 设置的检查点目录不被使用。
*
* @return 此 RDD 实例
*/
def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
if (Utils.isDynamicAllocationEnabled(conf) &&
conf.contains(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)) {
logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
"which removes executors along with their cached blocks. If you must use both " +
"features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +
"to a high value. E.g. If you plan to use the RDD for 1 hour, set the timeout to " +
"at least 1 hour.")
}
// 注意:此时我们实际上不知道用户以后是否会调用 persist() 方法,
// 所以我们必须在这里显式调用它,以确保缓存块在 SparkContext 中注册以便后续清理。
//
// 但是,如果用户已经在这个 RDD 上调用了 persist() 方法,那么我们必须将他/她指定的存储级别
// 调整为适合本地检查点的级别(即使用磁盘),以保证正确性。
if (storageLevel == StorageLevel.NONE) {
persist(LocalRDDCheckpointData.DEFAULT_STORAGE_LEVEL)
} else {
persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
}
// 如果这个 RDD 已经被检查点并物化,它的血缘关系已经被截断。
// 在这种情况下,我们不能覆盖我们的 `checkpointData`,因为它是恢复检查点数据所必需的。
// 如果被覆盖,下次在这个 RDD 上进行物化操作会导致错误。
if (isCheckpointedAndMaterialized) {
logWarning("Not marking RDD for local checkpoint because it was already " +
"checkpointed and materialized")
} else {
// 血缘关系还没有被截断,所以只需用我们的检查点数据覆盖任何现有的检查点数据
checkpointData match {
case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
"RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
case _ =>
}
checkpointData = Some(new LocalRDDCheckpointData(this))
}
this
}
/**
* 返回此 RDD 是否已被检查点并物化,无论是可靠地还是本地地。
*
* @return 如果已被检查点并物化则返回 true,否则返回 false
*/
def isCheckpointed: Boolean = isCheckpointedAndMaterialized
/**
* 返回此 RDD 是否已被检查点并物化,无论是可靠地还是本地地。
* 这是 `isCheckpointed` 的别名,用于澄清返回值的语义。供测试使用。
*
* @return 如果已被检查点并物化则返回 true,否则返回 false
*/
private[spark] def isCheckpointedAndMaterialized: Boolean =
checkpointData.exists(_.isCheckpointed)
/**
* 返回此 RDD 是否被标记为本地检查点。
* 供测试使用。
*
* @return 如果被标记为本地检查点则返回 true,否则返回 false
*/
private[rdd] def isLocallyCheckpointed: Boolean = {
checkpointData match {
case Some(_: LocalRDDCheckpointData[T]) => true
case _ => false
}
}
/**
* 返回此 RDD 是否已被可靠地检查点并物化。
*
* @return 如果已被可靠地检查点并物化则返回 true,否则返回 false
*/
private[rdd] def isReliablyCheckpointed: Boolean = {
checkpointData match {
case Some(reliable: ReliableRDDCheckpointData[_]) if reliable.isCheckpointed => true
case _ => false
}
}
/**
* 获取此 RDD 被检查点到的目录的名称。
* 如果 RDD 是本地检查点,则此方法未定义。
*
* @return 检查点目录的名称的可选实例
*/
def getCheckpointFile: Option[String] = {
checkpointData match {
case Some(reliable: ReliableRDDCheckpointData[T]) => reliable.getCheckpointDir
case _ => None
}
}
/**
* 移除 RDD 的洗牌操作及其非持久化的祖先。
* 当不使用洗牌服务运行时,清理洗牌文件可以实现缩容。
* 如果在此调用后使用 RDD,应该先对其进行检查点并物化。
* 如果不确定自己在做什么,请不要使用此功能。
* 缓解孤立洗牌文件的其他技术:
* * 调整驱动程序的垃圾回收以更积极,以便触发常规的上下文清理器
* * 设置适当的洗牌文件 TTL 以自动清理
*
* @param blocking 是否阻塞直到清理完成(默认:false)
*/
@DeveloperApi
@Since("3.1.0")
def cleanShuffleDependencies(blocking: Boolean = false): Unit = {
sc.cleaner.foreach { cleaner =>
/**
* 清理洗牌操作及其所有父操作。
*/
def cleanEagerly(dep: Dependency[_]): Unit = {
dep match {
case dependency: ShuffleDependency[_, _, _] =>
val shuffleId = dependency.shuffleId
cleaner.doCleanupShuffle(shuffleId, blocking)
case _ => // 不做任何操作
}
val rdd = dep.rdd
val rddDepsOpt = rdd.internalDependencies
if (rdd.getStorageLevel == StorageLevel.NONE) {
rddDepsOpt.foreach(deps => deps.foreach(cleanEagerly))
}
}
internalDependencies.foreach(deps => deps.foreach(cleanEagerly))
}
}
/**
* :: 实验性 ::
* 将当前阶段标记为屏障阶段,在该阶段中,Spark 必须同时启动所有任务。
* 若任务失败,Spark 不会仅重启失败的任务,而是会中止整个阶段并重新启动该阶段的所有任务。
* 屏障执行模式功能尚处于实验阶段,仅能处理有限的场景。
* 请阅读相关的 SPIP 和设计文档,以了解其限制和未来规划。
* @return 一个 [[RDDBarrier]] 实例,可在屏障阶段内执行操作
* @see [[org.apache.spark.BarrierTaskContext]]
* @see <a href="https://issues.apache.org/jira/browse/SPARK-24374">
* SPIP: 屏障执行模式</a>
* @see <a href="https://issues.apache.org/jira/browse/SPARK-24582">设计文档</a>
*/
@Experimental
@Since("2.4.0")
def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
/**
* 指定在计算此 RDD 时使用的资源配置文件。此功能仅在某些集群管理器中受支持,
* 并且目前需要启用动态分配。
* 这将导致获取具有指定资源的新执行器来计算该 RDD。
*/
@Experimental
@Since("3.1.0")
def withResources(rp: ResourceProfile): this.type = {
// 设置资源配置文件
resourceProfile = Option(rp)
// 将资源配置文件添加到资源配置文件管理器中
sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
this
}
/**
* 获取为此 RDD 指定的资源配置文件,如果未指定则返回 null。
* @return 用户指定的资源配置文件,若未指定则返回 null(为了 Java 兼容性)
*/
@Experimental
@Since("3.1.0")
def getResourceProfile(): ResourceProfile = resourceProfile.orNull
// =======================================================================
// 其他内部方法和字段
// =======================================================================
// 存储级别,默认为不存储
private var storageLevel: StorageLevel = StorageLevel.NONE
// 资源配置文件,初始为 None
@transient private var resourceProfile: Option[ResourceProfile] = None
/** 创建此 RDD 的用户代码(例如 `textFile`, `parallelize`)。 */
@transient private[spark] val creationSite = sc.getCallSite()
/**
* 与创建此 RDD 的操作关联的作用域。
*
* 这比调用站点更灵活,并且可以进行分层定义。更多详细信息,请参阅 {{RDDOperationScope}} 的文档。
* 如果用户在不使用任何 Spark 操作的情况下自己实例化此 RDD,则此作用域未定义。
*/
@transient private[spark] val scope: Option[RDDOperationScope] = {
// 从本地属性中获取 RDD 作用域的 JSON 字符串,并将其转换为 RDDOperationScope 对象
Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
}
/**
* 获取创建此 RDD 的调用站点的简短形式。
* @return 调用站点的简短形式,如果不存在则返回空字符串
*/
private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
/**
* 获取此 RDD 元素的 ClassTag。
* @return 元素的 ClassTag
*/
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
// 检查点数据,初始为 None
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
// 是否对所有标记为检查点的祖先 RDD 进行检查点操作。默认情况下,
// 我们一旦找到第一个这样的 RDD 就停止,这是一种优化,允许我们写入更少的数据,
// 但并非对所有工作负载都安全。例如,在流式处理中,我们可能会在每个批次中对 RDD 及其父 RDD 进行检查点操作,
// 在这种情况下,父 RDD 可能永远不会被检查点,其谱系也不会被截断,从而导致长期运行时出现 OOM(SPARK-6847)。
private val checkpointAllMarkedAncestors =
Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)
/**
* 返回第一个父 RDD。
* @tparam U 父 RDD 元素的类型
* @return 第一个父 RDD
*/
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
// 获取依赖项列表的第一个元素,并将其 RDD 转换为指定类型
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
/**
* 返回第 j 个父 RDD:例如,rdd.parent[T](0) 等同于 rdd.firstParent[T]。
* @param j 父 RDD 的索引
* @tparam U 父 RDD 元素的类型
* @return 第 j 个父 RDD
*/
protected[spark] def parent[U: ClassTag](j: Int): RDD[U] = {
// 获取指定索引的依赖项,并将其 RDD 转换为指定类型
dependencies(j).rdd.asInstanceOf[RDD[U]]
}
/**
* 获取创建此 RDD 的 [[org.apache.spark.SparkContext]]。
* @return 创建此 RDD 的 SparkContext
*/
def context: SparkContext = sc
/**
* 私有 API,用于更改 RDD 的 ClassTag。
* 用于内部 Java - Scala API 兼容性。
* @param cls 新的 Class 类型
* @return 具有新 ClassTag 的 RDD
*/
private[spark] def retag(cls: Class[T]): RDD[T] = {
// 根据传入的 Class 类型创建新的 ClassTag
val classTag: ClassTag[T] = ClassTag.apply(cls)
// 调用另一个 retag 方法,使用新的 ClassTag
this.retag(classTag)
}
/**
* 私有 API,用于更改 RDD 的 ClassTag。
* 用于内部 Java - Scala API 兼容性。
* @param classTag 新的 ClassTag
* @return 具有新 ClassTag 的 RDD
*/
private[spark] def retag(implicit classTag: ClassTag[T]): RDD[T] = {
// 对每个分区应用 identity 函数,并保留分区器,使用新的 ClassTag
this.mapPartitions(identity, preservesPartitioning = true)(classTag)
}
// 避免多次处理 doCheckpoint 以防止过度递归
@transient private var doCheckpointCalled = false
/**
* 对这个 RDD 进行检查点操作,即保存该 RDD。此方法在使用该 RDD 的作业完成后调用
* (因此该 RDD 已被物化并可能存储在内存中)。
* doCheckpoint() 会递归地调用其父 RDD 的 doCheckpoint() 方法。
*/
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
// 检查是否已经调用过 doCheckpoint
if (!doCheckpointCalled) {
// 标记为已调用
doCheckpointCalled = true
// 检查是否定义了检查点数据
if (checkpointData.isDefined) {
// 如果需要对所有标记为检查点的祖先 RDD 进行检查点操作
if (checkpointAllMarkedAncestors) {
// 先对父 RDD 进行检查点操作,因为在我们对自己进行检查点操作后,谱系将被截断
dependencies.foreach(_.rdd.doCheckpoint())
}
// 对当前 RDD 进行检查点操作
checkpointData.get.checkpoint()
} else {
// 若未定义检查点数据,递归调用父 RDD 的 doCheckpoint 方法
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
/**
* 将此 RDD 的依赖项从其原始父 RDD 更改为从检查点文件创建的新 RDD (`newRDD`),
* 并忘记其旧的依赖项和分区。
*/
private[spark] def markCheckpointed(): Unit = stateLock.synchronized {
// 保存旧的依赖项到弱引用中
legacyDependencies = new WeakReference(dependencies_)
// 清除依赖项
clearDependencies()
// 清空分区信息
partitions_ = null
// 忘记依赖项的构造函数参数
deps = null
}
/**
* 清除此 RDD 的依赖项。此方法必须确保移除对原始父 RDD 的所有引用,
* 以便父 RDD 可以被垃圾回收。RDD 的子类可以重写此方法以实现自己的清理逻辑。
* 请参阅 [[org.apache.spark.rdd.UnionRDD]] 以获取示例。
*/
protected def clearDependencies(): Unit = stateLock.synchronized {
// 将依赖项设置为 null
dependencies_ = null
}
/**
* 返回此 RDD 及其递归依赖项的调试描述。
* @return 调试描述字符串
*/
def toDebugString: String = {
// 获取一个 RDD 及其存储信息的调试描述,不包含其子 RDD
def debugSelf(rdd: RDD[_]): Seq[String] = {
import Utils.bytesToString
// 获取持久化信息,如果存储级别不为 NONE,则获取存储级别的描述,否则为空字符串
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
// 获取 RDD 的存储信息,包括缓存分区数、内存大小和磁盘大小
val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info =>
" CachedPartitions: %d; MemorySize: %s; DiskSize: %s".format(
info.numCachedPartitions, bytesToString(info.memSize), bytesToString(info.diskSize)))
// 返回 RDD 描述和存储信息
s"$rdd [$persistence]" +: storageInfo
}
// 对最后一个子 RDD 应用不同的规则
def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
// 获取依赖项的数量
val len = rdd.dependencies.length
len match {
case 0 => Seq.empty
case 1 =>
// 获取第一个依赖项
val d = rdd.dependencies.head
// 递归调用 debugString 方法,处理第一个依赖项的 RDD
debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]], true)
case _ =>
// 获取除最后一个依赖项之外的所有依赖项
val frontDeps = rdd.dependencies.take(len - 1)
// 递归调用 debugString 方法,处理前面的依赖项的 RDD
val frontDepStrings = frontDeps.flatMap(
d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]]))
// 获取最后一个依赖项
val lastDep = rdd.dependencies.last
// 递归调用 debugString 方法,处理最后一个依赖项的 RDD
val lastDepStrings =
debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_, _, _]], true)
// 合并前面和最后一个依赖项的调试信息
frontDepStrings ++ lastDepStrings
}
}
// 依赖栈中的第一个 RDD 没有父 RDD,因此不需要 +-
def firstDebugString(rdd: RDD[_]): Seq[String] = {
// 构建分区信息字符串
val partitionStr = "(" + rdd.partitions.length + ")"
// 计算左偏移量
val leftOffset = (partitionStr.length - 1) / 2
// 构建下一个前缀字符串
val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
// 处理 RDD 自身的调试信息
debugSelf(rdd).zipWithIndex.map{
case (desc: String, 0) => s"$partitionStr $desc"
case (desc: String, _) => s"$nextPrefix $desc"
} ++ debugChildren(rdd, nextPrefix)
}
// 处理洗牌依赖的 RDD 的调试信息
def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
// 构建分区信息字符串
val partitionStr = "(" + rdd.partitions.length + ")"
// 计算左偏移量
val leftOffset = (partitionStr.length - 1) / 2
// 处理前缀字符串
val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
// 构建下一个前缀字符串
val nextPrefix = (
thisPrefix
+ (if (isLastChild) " " else "| ")
+ (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
// 处理 RDD 自身的调试信息
debugSelf(rdd).zipWithIndex.map{
case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc"
case (desc: String, _) => s"$nextPrefix$desc"
} ++ debugChildren(rdd, nextPrefix)
}
// 处理 RDD 的调试信息
def debugString(
rdd: RDD[_],
prefix: String = "",
isShuffle: Boolean = true,
isLastChild: Boolean = false): Seq[String] = {
if (isShuffle) {
// 处理洗牌依赖的 RDD
shuffleDebugString(rdd, prefix, isLastChild)
} else {
// 处理非洗牌依赖的 RDD
debugSelf(rdd).map(prefix + _) ++ debugChildren(rdd, prefix)
}
}
// 调用 firstDebugString 方法并将结果拼接成字符串
firstDebugString(this).mkString("\n")
}
/**
* 返回此 RDD 的字符串表示形式。
* @return 字符串表示形式
*/
override def toString: String = "%s%s[%d] at %s".format(
Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
/**
* 将此 RDD 转换为 JavaRDD。
* @return JavaRDD 实例
*/
def toJavaRDD() : JavaRDD[T] = {
// 创建一个 JavaRDD 实例,使用当前 RDD 和元素的 ClassTag
new JavaRDD(this)(elementClassTag)
}
/**
* 判断此 RDD 是否处于屏障阶段。在屏障阶段,Spark 必须同时启动所有任务。
*
* 如果此 RDD 的至少一个父 RDD 或其本身是从 [[RDDBarrier]] 映射而来的,则此 RDD 处于屏障阶段。
* 对于 [[ShuffledRDD]],此函数始终返回 false,因为 [[ShuffledRDD]] 表示一个新阶段的开始。
*
* 如果 [[MapPartitionsRDD]] 是从 [[RDDBarrier]] 转换而来的,则该 [[MapPartitionsRDD]] 应被标记为屏障。
* @return 如果处于屏障阶段则返回 true,否则返回 false
*/
private[spark] def isBarrier(): Boolean = isBarrier_
// 出于性能考虑,缓存该值以避免在长 RDD 链上重复计算 `isBarrier()`
@transient protected lazy val isBarrier_ : Boolean =
// 过滤掉洗牌依赖的依赖项,检查是否存在处于屏障阶段的父 RDD
dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
// 缓存输出确定性级别的值
private final lazy val _outputDeterministicLevel: DeterministicLevel.Value =
// 调用 getOutputDeterministicLevel 方法获取输出确定性级别
getOutputDeterministicLevel
/**
* 返回此 RDD 输出的确定性级别。请参考 [[DeterministicLevel]] 以获取定义。
*
* 默认情况下,可靠检查点的 RDD 或没有父 RDD 的根 RDD 是 DETERMINATE。
* 对于有父 RDD 的 RDD,我们将根据依赖关系为每个父 RDD 生成一个确定性级别候选。
* 当前 RDD 的确定性级别是确定性最低的候选级别。请重写 [[getOutputDeterministicLevel]] 方法
* 以提供自定义的输出确定性级别计算逻辑。
* @return 输出的确定性级别
*/
// TODO(SPARK-34612): 使其公开,以便用户可以为自定义 RDD 设置确定性级别。
// TODO: 这可以是每个分区的。例如,UnionRDD 可以为不同的分区具有不同的确定性级别。
private[spark] final def outputDeterministicLevel: DeterministicLevel.Value = {
if (isReliablyCheckpointed) {
// 如果是可靠检查点的 RDD,则返回 DETERMINATE
DeterministicLevel.DETERMINATE
} else {
// 否则返回缓存的确定性级别
_outputDeterministicLevel
}
}
/**
* 开发者 API:获取此 RDD 输出的确定性级别。
* 默认情况下,一个可靠检查点的 RDD 或没有父 RDD 的根 RDD 是 DETERMINATE。
* 对于有父 RDD 的 RDD,会根据依赖关系为每个父 RDD 生成一个确定性级别候选,
* 当前 RDD 的确定性级别是这些候选中确定性最低的级别。
* 子类可以重写此方法以提供自定义的计算逻辑。
* @return 输出的确定性级别
*/
@DeveloperApi
protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
// 根据依赖关系计算每个父 RDD 的确定性级别候选
val deterministicLevelCandidates = dependencies.map {
// 对于洗牌依赖,如果父 RDD 的分区器与当前依赖的分区器相同,则认为洗牌未真正发生,
// 假设当前 RDD 的输出确定性级别与父 RDD 相同
case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) =>
dep.rdd.outputDeterministicLevel
// 对于洗牌依赖,如果父 RDD 的输出是不确定的,则洗牌输出也将是不确定的
case dep: ShuffleDependency[_, _, _] =>
if (dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
DeterministicLevel.INDETERMINATE
} else if (dep.keyOrdering.isDefined && dep.aggregator.isDefined) {
// 如果指定了聚合器(因此键是唯一的)并且指定了键排序,则输出是确定的
DeterministicLevel.DETERMINATE
} else {
// 在 Spark 中,归约器会同时获取多个远程洗牌块,这些洗牌块的到达顺序是完全随机的。
// 即使父映射 RDD 是确定的,归约 RDD 也总是无序的
DeterministicLevel.UNORDERED
}
// 对于窄依赖,假设当前 RDD 的输出确定性级别与父 RDD 相同
case dep => dep.rdd.outputDeterministicLevel
}
if (deterministicLevelCandidates.isEmpty) {
// 如果没有依赖项,默认假设根 RDD 是确定的
DeterministicLevel.DETERMINATE
} else {
// 选择确定性最低的级别
deterministicLevelCandidates.maxBy(_.id)
}
}
}
/**
* 包含与RDD相关的工具方法和隐式转换。
*/
object RDD {
/**
* 用于配置是否对所有标记为检查点的祖先RDD进行检查点操作的属性键。
*/
private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS =
"spark.checkpoint.checkpointAllMarkedAncestors"
/**
* 以下隐式函数在Spark 1.3之前位于SparkContext中,用户需要 `import SparkContext._` 来启用它们。
* 现在我们将它们移到这里,以便编译器自动找到它们。
* 然而,为了向后兼容,我们仍然在SparkContext中保留旧的函数,并直接转发到以下函数。
*/
/**
* 将一个键值对RDD转换为PairRDDFunctions,以便使用键值对RDD的特定操作。
*
* @param rdd 要转换的键值对RDD。
* @param kt 键的ClassTag。
* @param vt 值的ClassTag。
* @param ord 键的排序器,默认为null。
* @tparam K 键的类型。
* @tparam V 值的类型。
* @return 转换后的PairRDDFunctions实例。
*/
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
/**
* 将一个RDD转换为AsyncRDDActions,以便使用RDD的异步操作。
*
* @param rdd 要转换的RDD。
* @tparam T RDD中元素的类型。
* @return 转换后的AsyncRDDActions实例。
*/
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
new AsyncRDDActions(rdd)
}
/**
* 将一个键值对RDD转换为SequenceFileRDDFunctions,以便使用SequenceFile相关的操作。
*
* @param rdd 要转换的键值对RDD。
* @param kt 键的ClassTag。
* @param vt 值的ClassTag。
* @param keyWritableFactory 键的Writable工厂。
* @param valueWritableFactory 值的Writable工厂。
* @tparam K 键的类型。
* @tparam V 值的类型。
* @return 转换后的SequenceFileRDDFunctions实例。
*/
implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V],
keyWritableFactory: WritableFactory[K],
valueWritableFactory: WritableFactory[V])
: SequenceFileRDDFunctions[K, V] = {
// 隐式定义键的转换器
implicit val keyConverter = keyWritableFactory.convert
// 隐式定义值的转换器
implicit val valueConverter = valueWritableFactory.convert
new SequenceFileRDDFunctions(rdd,
keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
}
/**
* 将一个键值对RDD转换为OrderedRDDFunctions,以便使用有序RDD的特定操作。
*
* @param rdd 要转换的键值对RDD。
* @tparam K 键的类型,需要实现Ordering和ClassTag。
* @tparam V 值的类型,需要实现ClassTag。
* @return 转换后的OrderedRDDFunctions实例。
*/
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
: OrderedRDDFunctions[K, V, (K, V)] = {
new OrderedRDDFunctions[K, V, (K, V)](rdd)
}
/**
* 将一个Double类型的RDD转换为DoubleRDDFunctions,以便使用Double类型RDD的特定操作。
*
* @param rdd 要转换的Double类型RDD。
* @return 转换后的DoubleRDDFunctions实例。
*/
implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = {
new DoubleRDDFunctions(rdd)
}
/**
* 将一个数值类型的RDD转换为DoubleRDDFunctions,先将RDD中的元素转换为Double类型,再进行操作。
*
* @param rdd 要转换的数值类型RDD。
* @param num 数值类型的隐式转换。
* @tparam T 数值类型。
* @return 转换后的DoubleRDDFunctions实例。
*/
implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T])
: DoubleRDDFunctions = {
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
}
}
/**
* RDD输出的确定性级别,解释了Spark重新运行RDD任务时输出的差异情况。
* 有三种确定性级别:
* 1. DETERMINATE: 重新运行后,RDD输出始终是相同的数据集,且顺序相同。
* 2. UNORDERED: 重新运行后,RDD输出始终是相同的数据集,但顺序可能不同。
* 3. INDETERMINATE: 重新运行后,RDD输出可能不同。
*
* 注意,RDD的输出通常依赖于其父RDD。当父RDD的输出是INDETERMINATE时,该RDD的输出很可能也是INDETERMINATE。
*/
private[spark] object DeterministicLevel extends Enumeration {
// 确定性级别:输出始终相同且顺序一致
val DETERMINATE,
// 确定性级别:输出相同但顺序可能不同
UNORDERED,
// 确定性级别:输出可能不同
INDETERMINATE = Value
}