AM中是如何启动Driver的?调用了什么方法?

发布于:2025-04-12 ⋅ 阅读:(39) ⋅ 点赞:(0)

在 Apache Spark 的 YARN 集群模式 下,Application Master (AM) 启动 Driver 的过程涉及以下关键步骤和方法调用:


1. 整体流程

  1. 应用提交:用户通过 spark-submit 提交作业到 YARN 集群。
  2. ResourceManager 分配容器:YARN 的 ResourceManager 分配一个容器用于启动 AM。
  3. AM 初始化:AM 在容器中启动,并负责管理整个 Spark 应用的生命周期。
  4. Driver 启动:AM 调用用户定义的 Driver 主类(即 --class 指定的类),触发 SparkContext 初始化。

2. 核心代码路径(Spark 源码解析)

(1) AM 入口类
  • org.apache.spark.deploy.yarn.ApplicationMaster
    这是 Spark 在 YARN 模式下 AM 的核心实现类,负责与 YARN ResourceManager 交互,并启动 Driver。
(2) 启动 Driver 的关键方法
  • run() 方法
    AM 的 run() 方法根据部署模式(Client/Cluster)选择执行逻辑。在 Cluster 模式 下调用 runDriver()

    // ApplicationMaster.scala
    private def run(): Unit = {
      if (isClusterMode) {
        runDriver()
      } else {
        runExecutorLauncher()
      }
    }
    
  • runDriver() 方法
    负责启动用户定义的 Driver 主类:

    1. 设置环境:配置类路径、JVM 参数、安全上下文等。
    2. 反射调用用户主类:通过 Java 反射机制调用用户指定的 Driver 类的 main 方法。
    private def runDriver(): Unit = {
      // 1. 创建用户线程启动 Driver
      val userClassThread = new Thread {
        override def run(): Unit = {
          try {
            // 2. 反射加载用户主类
            val mainMethod = userClassLoader.loadClass(args.userClass)
              .getMethod("main", classOf[Array[String]])
            // 3. 调用 main 方法(触发 SparkContext 初始化)
            mainMethod.invoke(null, args.userArgs.toArray)
          } catch {
            case e: Throwable => // 异常处理
          }
        }
      }
      userClassThread.setName("Driver")
      userClassThread.start()
      userClassThread.join()
    }
    

3. 关键调用链

ApplicationMaster.run() 
  → runDriver() 
    → 创建用户线程(userClassThread) 
      → userClassLoader.loadClass(args.userClass) 
        → mainMethod.invoke() 
          → 用户主类的 main() 
            → new SparkContext() 
              → 初始化 Driver 并提交任务

4. 核心配置与参数

  • 用户主类:通过 --class 参数指定(如 com.example.MySparkJob)。
  • Driver 内存/CPU:通过 --driver-memory--driver-cores 配置。
  • 类加载器:AM 使用 MutableURLClassLoader 加载用户 JAR 和依赖。

5. 注意事项

  1. 资源分配
    • AM 所在的容器资源由 spark.yarn.am.memoryspark.yarn.am.cores 配置。
    • Driver 运行在 AM 的容器内(Cluster 模式),共享其资源。
  2. 日志管理
    • Driver 的日志通过 YARN 的日志聚合功能收集,可通过 yarn logs -applicationId <appId> 查看。
  3. 故障恢复
    • 若 Driver 进程崩溃,AM 会标记应用为失败,YARN 会尝试重启(需配置 spark.yarn.maxAppAttempts)。

总结

在 YARN 集群模式下,AM 通过反射调用用户主类的 main 方法启动 Driver,具体代码实现在 ApplicationMaster.runDriver() 中。Driver 运行在 AM 的容器内,负责初始化 SparkContext 并调度任务。此机制确保了 Spark 应用与 YARN 资源管理的无缝集成。


网站公告

今日签到

点亮在社区的每一天
去签到