Spark Memory Design: Core Components, with a Comparison to Flink's Memory Configuration

Published: 2025-08-11

1. Memory Region Layout

1. Driver

+---------------------------------------------------------------------+
|                     Driver process total memory                     |
+---------------------------------------------------------------------+
|  +-------------------------+  +-----------------------------------+ |
|  |    JVM on-heap memory   |  |       Off-heap memory region      | |
|  |  (spark.driver.memory)  |  |                                   | |
|  |                         |  | 1. Spark-managed off-heap pool    | |
|  | +---------------------+ |  |    (optional, usually disabled)   | |
|  | | Unified memory pool | |  |                                   | |
|  | | (60% by default)    | |  | 2. JVM / system overhead          | |
|  | | +-----------------+ | |  |    (spark.driver.memoryOverhead)  | |
|  | | | Storage memory  | | |  |    - JVM overhead                 | |
|  | | | (main region)   | | |  |    - network buffers              | |
|  | | +-----------------+ | |  |    - broadcast transfer buffers   | |
|  | | [Execution memory]  | |  |    - collect() result buffers     | |
|  | | (very small)        | |  +-----------------------------------+ |
|  | +---------------------+ |                                        |
|  | +---------------------+ |                                        |
|  | | User memory         | |                                        |
|  | | (40% by default)    | |                                        |
|  | |  - collected data   | |                                        |
|  | |  - application state| |                                        |
|  | +---------------------+ |                                        |
|  +-------------------------+                                        |
+---------------------------------------------------------------------+

2. Executor

+-------------------------------------------------------------------------+
|                      Executor process total memory                      |
+-------------------------------------------------------------------------+
|  +---------------------------+  +-------------------------------------+ |
|  |     JVM on-heap memory    |  |        Off-heap memory region       | |
|  |  (spark.executor.memory)  |  |                                     | |
|  |                           |  | 1. Spark-managed off-heap pool      | |
|  | +-----------------------+ |  |    (spark.memory.offHeap.size)      | |
|  | | Unified memory pool   | |  |    - Storage memory                 | |
|  | | (60% by default)      | |  |    - Execution memory               | |
|  | | +-------------------+ | |  |                                     | |
|  | | | Execution memory  | | |  | 2. JVM / system overhead            | |
|  | | | (50% by default)  | | |  |    (spark.executor.memoryOverhead)  | |
|  | | +-------------------+ | |  |    - JVM overhead                   | |
|  | | +-------------------+ | |  |    - thread stacks                  | |
|  | | | Storage memory    | | |  |    - native libraries (NIO/Native)  | |
|  | | | (50% by default)  | | |  |    - memory-mapped files (mmap)     | |
|  | | +-------------------+ | |  |    - PySpark / Python processes     | |
|  | +-----------------------+ |  |    - off-heap memory allocated      | |
|  | +-----------------------+ |  |      directly by user code          | |
|  | | User memory           | |  +-------------------------------------+ |
|  | | (40% by default)      | |                                          |
|  | |  - UDF objects        | |                                          |
|  | |  - user data          | |                                          |
|  | +-----------------------+ |                                          |
|  +---------------------------+                                          |
+-------------------------------------------------------------------------+

| Parameter | Problem it targets | Key settings |
| --- | --- | --- |
| `spark.memory.offHeap.enabled` | GC pressure, very large heaps, heavy caching/shuffle | `spark.memory.offHeap.enabled`, `spark.memory.offHeap.size` |
| `spark.executor.memoryOverhead` | PySpark, heavy shuffle, Kryo serialization buffers, use of PySpark / native libraries | `spark.executor.memoryOverhead` |
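
For example, a cache- and shuffle-heavy job suffering from long GC pauses could move the Spark-managed pool off-heap. The values below are illustrative starting points, not recommendations:

# spark-defaults.conf (illustrative values)
spark.memory.offHeap.enabled     true
spark.memory.offHeap.size        4g
# extra headroom for Python workers / native libraries
spark.executor.memoryOverhead    2g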

2. Determining Memory Consumption

1. The best way to gauge how much memory a dataset needs is to create an RDD, put it in the cache, and look at the "Storage" page of the Web UI. That page shows how much memory the RDD occupies.

2. To estimate the memory footprint of a particular object, use SizeEstimator's `estimate` method. This is useful when experimenting with different data layouts to trim memory usage, and when sizing how much space a broadcast variable will take in each executor's heap.
Reference: https://www.infoworld.com/article/2077408/sizeof-for-java.html
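
A minimal sketch of both approaches (the dataset and the lookup map are made up for illustration):

// Cache an RDD to read its size off the Web UI, and estimate a single object with SizeEstimator.
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SizeEstimator

val spark = SparkSession.builder().appName("size-demo").getOrCreate()

// 1. Cache an RDD, then check the "Storage" tab of the Web UI for its in-memory size.
val rdd = spark.sparkContext.parallelize(1 to 1000000).map(i => (i, i.toString))
rdd.cache()
rdd.count()   // materialize the cache

// 2. Estimate one object graph, e.g. a map you intend to broadcast.
val lookup = (1 to 100000).map(i => i -> s"value_$i").toMap
println(s"Estimated size: ${SizeEstimator.estimate(lookup)} bytes")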

3. Driver and Executor Memory

| Role | Responsibility | Memory management components |
| --- | --- | --- |
| Driver | Holds metadata (RDD lineage, task scheduling state) | MemoryManager (Storage only) |
| Executor | Runs tasks, handles shuffle and cached data | TaskMemoryManager + MemoryConsumer |

Each task owns its own TaskMemoryManager, which manages every memory allocation made during that task's lifetime, as the Executor code below shows.

// Executor.scala (TaskRunner.run)
override def run(): Unit = {
      // ...
      // One TaskMemoryManager is created per task attempt.
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      val deserializeStartTimeNs = System.nanoTime()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(log"Running ${LogMDC(TASK_NAME, taskName)}")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStartTimeNs: Long = 0
      var taskStartCpu: Long = 0
      startGCTime = computeTotalGcTime()
      var taskStarted: Boolean = false

      try {
        // Must be set before updateDependencies() is called, in case fetching dependencies
        // requires access to properties contained within (e.g. for access control).
        Executor.taskDeserializationProps.set(taskDescription.properties)

        updateDependencies(
          taskDescription.artifacts.files,
          taskDescription.artifacts.jars,
          taskDescription.artifacts.archives,
          isolatedSession)
        // Always reset the thread class loader to ensure if any updates, all threads (not only
        // the thread that updated the dependencies) can update to the new class loader.
        Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
        task = ser.deserialize[Task[Any]](
          taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
        task.localProperties = taskDescription.properties
        // Attach the per-task memory manager to the deserialized task.
        task.setTaskMemoryManager(taskMemoryManager)
        // ...
      } finally {
        cleanMDCForTask(taskName, mdcProperties)
        runningTasks.remove(taskId)
        if (taskStarted) {
          // This means the task was successfully deserialized, its stageId and stageAttemptId
          // are known, and metricsPoller.onTaskStart was called.
          metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
        }
      }
    }

4. Core Components

1. MemoryManager → UnifiedMemoryManager

**A unified memory pool with dynamic borrowing.** It enforces a soft boundary between execution and storage so that either side can borrow memory from the other.

| Parameter | Default | Purpose | Formula |
| --- | --- | --- | --- |
| `spark.memory.fraction` | 0.6 | Share of the usable heap (total heap minus 300 MB reserved) given to the unified pool | Spark Memory = (total heap − 300 MB) × 0.6 |
| `spark.memory.storageFraction` | 0.5 | Initial share of the unified pool reserved for Storage | Initial Storage memory = Spark Memory × 0.5 |
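
As a back-of-the-envelope example (assuming a 4 GiB executor heap): usable memory = 4096 − 300 = 3796 MiB, the unified pool = 3796 × 0.6 ≈ 2278 MiB, of which Storage initially gets 2278 × 0.5 ≈ 1139 MiB; the remaining 3796 × 0.4 ≈ 1518 MiB is user memory.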

1. How the Pools Interact

  • When Execution memory is short

Execution can forcibly "borrow" from Storage (if Storage holds cached blocks, they are evicted or spilled to disk).

  • When Storage memory is short

Storage can borrow idle Execution memory, but Execution can reclaim that space whenever it needs it.

  • Fair scheduling across tasks

ExecutionMemoryPool guarantees each task at least (total execution memory) / (2 × number of active tasks) and caps it at (total execution memory) / (number of active tasks).
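
For example, if Execution currently has the whole 2278 MiB pool at its disposal and 4 tasks are running, each task is guaranteed at least 2278 / (2 × 4) ≈ 285 MiB before it blocks, and can hold at most 2278 / 4 ≈ 570 MiB. The enforcement lives in ExecutionMemoryPool.acquireMemory, shown below.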

// ExecutionMemoryPool.scala
private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // TODO: clean up this clunky method signature

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(log"TID ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} waiting for at least 1/2N of" +
          log" ${MDC(POOL_NAME, poolName)} pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }

2. Tuning Recommendations

  1. Cache-heavy jobs (e.g. iterative algorithms)
    • Raise spark.memory.storageFraction (e.g. to 0.6) so that more cached data stays in memory.
  2. Shuffle-heavy jobs (e.g. large joins)
    • Lower spark.memory.storageFraction (e.g. to 0.4) to prioritize Execution memory and reduce shuffle spills to disk; see the example below.
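
A rough illustration of the two presets via spark-submit (the numbers are starting points, not universal recommendations):

# Cache-heavy job: keep more of the unified pool for Storage
spark-submit \
  --conf spark.memory.fraction=0.6 \
  --conf spark.memory.storageFraction=0.6 \
  ...

# Shuffle-heavy job: leave more room for Execution
spark-submit \
  --conf spark.memory.storageFraction=0.4 \
  ...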

2. MemoryPool

  • StorageMemoryPool: tracks Storage memory usage.
  • ExecutionMemoryPool: tracks Execution memory usage.

3. TaskMemoryManager

  • Role: manages a single task's memory (on-heap and off-heap) through a page-based memory abstraction.
  • Key mechanisms (a sketch of the address encoding follows the table below)
    • Memory is divided into pages (represented by MemoryBlock).
    • A 64-bit address encodes a 13-bit page number plus a 51-bit in-page offset, so on-heap and off-heap memory share one addressing scheme.
    • This allows addressing 8192 × (2^31 − 1) × 8 bytes, roughly 140 TB of memory.
  • Relies on a MemoryAllocator (HeapMemoryAllocator / UnsafeMemoryAllocator) for the physical allocation.

| Characteristic | On-heap memory | Off-heap memory |
| --- | --- | --- |
| Allocation | `long[]` arrays on the JVM heap | `Unsafe.allocateMemory()` |
| Addressing | object reference + offset | raw memory address |
| GC impact | subject to GC pauses | unaffected by GC |
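
A minimal sketch of that 13-bit / 51-bit encoding (the object and names are illustrative, not Spark's TaskMemoryManager API):

// Illustrative 64-bit page address: high 13 bits = page number, low 51 bits = in-page offset.
object PageAddressSketch {
  val OffsetBits = 51
  val OffsetMask = (1L << OffsetBits) - 1          // lower 51 bits

  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & OffsetMask)

  def decodePage(address: Long): Int = (address >>> OffsetBits).toInt
  def decodeOffset(address: Long): Long = address & OffsetMask

  def main(args: Array[String]): Unit = {
    val addr = encode(pageNumber = 42, offsetInPage = 1024L)
    println(s"page=${decodePage(addr)}, offset=${decodeOffset(addr)}")   // page=42, offset=1024
  }
}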

4. MemoryAllocator

The physical memory allocators (contrasted in the sketch below):

  • HeapMemoryAllocator: allocates on-heap memory backed by long[] arrays.
  • UnsafeMemoryAllocator: allocates off-heap memory directly through sun.misc.Unsafe, keeping it out of the GC's reach.
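
A self-contained sketch of the contrast (illustrative code, not Spark's allocator classes):

// On-heap allocation is just a long[]; off-heap goes through sun.misc.Unsafe
// and must be freed explicitly because the GC never sees it.
import sun.misc.Unsafe

object AllocatorSketch {
  private val unsafe: Unsafe = {
    val f = classOf[Unsafe].getDeclaredField("theUnsafe")
    f.setAccessible(true)
    f.get(null).asInstanceOf[Unsafe]
  }

  def allocateOnHeap(sizeInBytes: Long): Array[Long] =
    new Array[Long]((sizeInBytes / 8).toInt)         // addressed as (array, offset)

  def allocateOffHeap(sizeInBytes: Long): Long =
    unsafe.allocateMemory(sizeInBytes)               // raw address, no GC involvement

  def main(args: Array[String]): Unit = {
    val heapBlock = allocateOnHeap(1024)
    val address = allocateOffHeap(1024)
    try println(s"on-heap words=${heapBlock.length}, off-heap address=$address")
    finally unsafe.freeMemory(address)               // caller must free off-heap memory
  }
}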

5. MemoryConsumer

An abstract class representing a component that needs memory (e.g. shuffle, sorting); the toy model below shows how consumers and the task memory manager interact. Key methods:

  • spill(): writes buffered data to disk when memory is insufficient.
  • acquireMemory(): requests memory from the TaskMemoryManager.
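
A toy model of that contract (deliberately simplified; the real org.apache.spark.memory.MemoryConsumer API has different signatures):

// When a request cannot be satisfied, the manager asks other consumers to
// spill until enough memory is free, then grants what it can.
trait ToyMemoryConsumer {
  def used: Long
  /** Free up to `size` bytes by spilling buffered data to disk; return bytes actually freed. */
  def spill(size: Long): Long
}

class ToyTaskMemoryManager(private var free: Long) {
  private var consumers = List.empty[ToyMemoryConsumer]
  def register(c: ToyMemoryConsumer): Unit = consumers ::= c

  def acquire(needed: Long, requester: ToyMemoryConsumer): Long = {
    if (free < needed) {
      consumers.filterNot(_ eq requester).foreach { other =>
        if (free < needed) free += other.spill(needed - free)
      }
    }
    val granted = math.min(needed, free)
    free -= granted
    granted
  }
}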

5. Spark vs. Flink

| Dimension | Flink | Spark |
| --- | --- | --- |
| Core innovation | Off-heap managed memory (Managed Memory) | Unified memory (Unified Memory) |
| Streaming/batch consistency | ✅ Same memory model for streaming and batch | ⚠️ Optimized primarily for batch |
| GC impact | Off-heap memory reduces GC pressure | Relies on on-heap memory; GC tuning can be complex |
| State handling | Managed memory directly backs the RocksDB state backend | No dedicated state memory; relies on on-heap/user memory |
| Shuffle behaviour | Fixed network buffers give stable throughput | Dynamic pool; cache pressure can cause jitter |
| Configuration complexity | High (many regions to understand) | Medium (mostly on-heap/off-heap ratios) |

1. Flink Memory Configuration

# flink-conf.yaml: total process (container) memory of 4 GB
taskmanager.memory.process.size: 4096m

# Subdivide the regions (framework heap, task heap, managed memory, network buffers, ...)
taskmanager.memory.framework.heap.size: 256m
taskmanager.memory.task.heap.size: 1024m
taskmanager.memory.managed.size: 1024m
taskmanager.memory.network.min: 512m

2. Spark Memory Configuration

# spark-defaults.conf: Executor memory settings
# Executor on-heap memory
spark.executor.memory            2g
# Off-heap overhead
spark.executor.memoryOverhead    1g
# Unified pool share of (heap - 300 MB)
spark.memory.fraction            0.6
# Initial Storage share of the unified pool
spark.memory.storageFraction     0.5
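
With these illustrative values, the container requested from the resource manager is roughly 2 g + 1 g = 3 g, and the unified pool inside the 2 g heap comes to about (2048 − 300) × 0.6 ≈ 1049 MiB.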
