1、Job启动流程
1、Client触发 SparkContext
初始化
2、SparkContext
向 Master
注册应用
3、Master
调度 Worker
启动 Executor
4、Worker
进程启动 Executor
5、DAGScheduler
将作业分解为 Stage
:
6、TaskScheduler
分配 Task
到 Executor
2、核心组件
组件 | 职责 |
---|---|
SparkContext | 应用入口,协调各组件,管理应用生命周期。 |
DAGScheduler | 将 Job 拆分为 Stage,构建 DAG,提交 TaskSet 给 TaskScheduler。 |
TaskScheduler | 调度 Task 到 Executor,处理故障重试。 |
CoarseGrainedSchedulerBackend | 与集群管理器交互,申请资源,管理 Executor。 |
ExternalClusterManager | 抽象层,适配不同集群(Standalone/YARN/Mesos)。 |
Master & Worker | Standalone 模式下管理集群资源(Master 分配资源,Worker 启动 Executor)。 |
Executor | 在 Worker 上运行,执行 Task,管理内存/磁盘。 |
CoarseGrainedExecutorBackend | Executor 的通信代理,接收 Task,返回状态/结果。 |
Task | 计算单元(ShuffleMapTask / ResultTask)。 |
ShuffleManager | 管理 Shuffle 数据读写(如 SortShuffleManager)。 |
3、工作流程
1、SparkContext
负责资源申请、任务提交、与集群管理器通信。
调用runJob
方法,将RDD操作传递给DAGScheduler
2、DAGScheduler
将Job拆分为Stage(DAG),处理Shuffle依赖,提交TaskSet给TaskScheduler。
1、DAGSchedulerEvent
/* 作业生命周期事件 */
JobSubmitted //新作业提交时触发
JobCancelled //单个作业被取消
JobGroupCancelled //作业组整体取消
JobTagCancelled //按标签批量取消作业
AllJobsCancelled //取消所有运行中的作业
/* 阶段执行事件 */
MapStageSubmitted //Shuffle Map阶段提交
StageCancelled //单个阶段取消
StageFailed //阶段执行失败
ResubmitFailedStages //自动重试失败阶段 ,默认4次
/* 任务调度事件 */
TaskSetFailed //整个任务集失败,默认4次
SpeculativeTaskSubmitted //启动推测执行任务
UnschedulableTaskSetAdded //任务集进入待调度队列
UnschedulableTaskSetRemoved //任务集离开待调度队列
/* Shuffle 优化事件 */
RegisterMergeStatuses //注册Shuffle合并状态
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle数据推送完成
/* 资源管理事件 */
ExecutorAdded //新Executor注册成功
ExecutorLost //Executor异常丢失
WorkerRemoved //工作节点移除
/* 执行过程事件 */
BeginEvent //任务集开始执行
GettingResultEvent //驱动程序主动获取任务结果
CompletionEvent //作业/阶段完成
2、stage拆分流程
*ResultStage
(执行作的最后一个阶段)、ShuffleMapStage
(shuffle映射输出文件)*
- 用户行动操作触发
submitJob
,发送JobSubmitted
事件。 handleJobSubmitted
处理事件,调用createResultStage
创建ResultStage。createResultStage
调用getOrCreateParentStages
获取父Stage,父Stage的创建会递归进行。- 在创建父Stage的过程中,遇到宽依赖则创建ShuffleMapStage,并递归创建其父Stage。
- 当所有父Stage都创建完成后,回到
handleJobSubmitted
,调用submitStage
提交ResultStage。 submitStage
检查父Stage是否完成,如果有未完成的父Stage,则递归提交父Stage;否则,提交当前Stage(调用submitMissingTasks
)。submitMissingTasks
为Stage创建任务(ShuffleMapTask或ResultTask),并提交给TaskScheduler执行。
3、宽窄依赖切分
private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
if (stage == target) {
return true
}
// DFS遍历RDD依赖树
val visitedRdds = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += stage.rdd
def visit(rdd: RDD[_]): Unit = {
if (!visitedRdds(rdd)) {
visitedRdds += rdd
for (dep <- rdd.dependencies) {
dep match {
// 宽依赖:创建新的ShuffleMapStage
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
waitingForVisit.prepend(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
// 窄依赖:继续回溯
case narrowDep: NarrowDependency[_] =>
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.remove(0))
}
visitedRdds.contains(target.rdd)
}
3、TaskScheduler
接收TaskSet,按调度策略(FIFO/FAIR)将Task分配给Executor。
1、执行流程
1、DAGScheduler 调用 taskScheduler.submitTasks()
后,任务进入 TaskScheduler 调度阶段
2、任务提交submitTasks
// TaskSetManager管理任务集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到调度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 触发资源分配
backend.reviveOffers()
3、资源分配 (Driver)
// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {
driverEndpoint.send(ReviveOffers) // 向DriverEndpoint发送消息
}
// DriverEndpoint处理
case ReviveOffers =>
makeOffers() // 触发资源分配
4、资源分配核心
private def makeOffers(): Unit = {
// Make sure no executor is killed while some task is launching on it
val taskDescs = withLock {
// 1. 获取所有可用Executor资源
val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }
val workOffers = activeExecutors.map {
case (id, executorData) => buildWorkerOffer(id, executorData)
}.toIndexedSeq
// 2. 调用任务调度器分配任务
scheduler.resourceOffers(workOffers, true)
}
// 3. 启动分配的任务
if (taskDescs.nonEmpty) {
launchTasks(taskDescs)
}
}
5、任务启动
// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
for (task <- tasks.flatten) {
// 1. 序列化任务
val serializedTask = TaskDescription.encode(task)
// 2. 检查任务大小
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
val executorData = executorDataMap(task.executorId)
// Do resources allocation here. The allocated resources will get released after the task
// finishes.
executorData.freeCores -= task.cpus
task.resources.foreach { case (rName, addressAmounts) =>
executorData.resourcesInfo(rName).acquire(addressAmounts)
}
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
// 发送任务到Executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}