Spark 深入解析

发布于:2025-03-07 ⋅ 阅读:(23) ⋅ 点赞:(0)

Spark 是目前最流行的分布式计算框架之一,特别适用于大规模数据处理和机器学习任务。我们将从 入门、架构、与其他组件的关系、底层实现 等角度深入分析,并探讨其优缺点。


1️⃣ Spark 入门

🌟 什么是 Spark?

Spark 是一个 分布式数据处理框架,主要用于:

  • 大规模数据处理(比 Hadoop 更快)
  • 流计算(支持实时数据处理)
  • 机器学习(MLlib)
  • 图计算(GraphX)

核心思想:提供内存计算数据重用,避免 Hadoop 需要反复读写 HDFS 导致的性能瓶颈。


🚀 Spark 基本概念

概念 说明
RDD(弹性分布式数据集) Spark 的核心数据结构,分布式计算的基础
DataFrame 类似于 SQL 表,基于 RDD 但更优化
Dataset DataFrame 的升级版,支持更强的类型安全
SparkSession Spark 运行的入口(取代了 SparkContext)
DAG(有向无环图) Spark 内部执行计划,优化任务调度
Executor 负责运行任务的进程
Driver 负责控制整个 Spark 任务的进程
Task 具体的计算单元
Partition 数据划分的基本单位

🔥 代码示例

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()

# 读取数据
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# 执行查询
df.groupBy("category").count().show()

# 关闭 Spark
spark.stop()

2️⃣ Spark 架构

🌟 Spark 组件

Spark 由多个核心组件构成:

  1. Spark Core:基础计算引擎(RDD、任务调度、DAG 执行)
  2. Spark SQL:支持 SQL 查询和 DataFrame 操作
  3. Spark Streaming:流式计算(基于微批次)
  4. MLlib:机器学习库
  5. GraphX:图计算库

🔥 Spark 运行架构

1️⃣ 典型的 Spark 运行流程
  1. Driver(驱动程序)
    • 负责解析用户提交的代码,生成 DAG(有向无环图)
    • 将任务划分成 Stage
    • 提交给 Cluster Manager
  2. Cluster Manager(集群管理器)
    • Standalone:Spark 自带的管理器
    • YARN:Hadoop 生态集群管理器
    • Kubernetes:现代容器化集群管理
  3. Executors(执行器)
    • 负责真正执行计算任务,每个 Executor 内有多个 Task
    • 计算完成后,Driver 负责收集结果

🔥 Spark 任务调度

Spark 采用 DAG(有向无环图)延迟计算(Lazy Evaluation)

  1. 解析代码,构建 DAG(依赖关系图)
  2. 拆分 DAG 为多个 Stage(按 Shuffle 边界)
  3. 将 Stage 拆成多个 Task 并提交到 Executor 执行
  4. 最终汇总结果返回给 Driver

3️⃣ Spark 与其他组件的关系

对比项 Spark Hadoop MapReduce Flink
计算模式 内存计算,支持批处理和流计算 批处理,基于磁盘 IO 流计算为主
执行方式 DAG(有向无环图)调度 线性任务调度 基于流的事件驱动
适用场景 机器学习、大数据分析、ETL 离线任务 实时计算、低延迟分析
依赖 可以独立运行,也可以跑在 YARN 上 必须依赖 Hadoop YARN 可以独立运行

👉 总结

  • Spark 比 MapReduce 快,因为它是 内存计算,减少了磁盘 IO。
  • 比 Flink 更通用,但 Flink 在实时计算上更强

4️⃣ Spark 底层实现

🔥 RDD(弹性分布式数据集)

RDD 是 Spark 的核心抽象,具有以下特性:

  1. 分布式:数据切分成多个 Partition
  2. 弹性:支持容错机制(基于 Lineage)
  3. 不可变:RDD 只能转换,不能修改
  4. 惰性计算:不会立即执行,只有触发 Action 才执行
  5. 缓存优化:可以将 RDD 结果缓存,提高后续计算速度

🔥 DAG 执行

Spark 任务的执行流程:

  1. 解析代码,构建 DAG(有向无环图)
  2. 将 DAG 拆分成多个 Stage,按照宽窄依赖划分
  3. Task 由 Executor 执行,完成计算
  4. 最终结果返回给 Driver

👉 优化点

  • Pipeline 执行:多个 Map 操作可以合并
  • BroadCast 变量:避免重复传输相同数据
  • Data Locality:尽量让计算靠近数据

5️⃣ Spark 优缺点

✅ 优势

  1. 🚀 速度快
    • 基于内存计算,比 Hadoop 快 10~100 倍
    • DAG 执行,比传统 MapReduce 更高效
  2. 🔥 易用性
    • 支持 SQL、Python、Scala、Java
    • 封装了 DataFrame、Dataset API
  3. 📡 生态完整
    • 支持批处理、流计算、机器学习
    • 可以集成 Hadoop、Kafka、Flink
  4. 💪 高容错
    • 任务失败可基于 RDD Lineage 重新计算
  5. ⚡ 动态调度
    • Spark 可以根据任务动态调整资源

❌ 缺点

  1. 💾 内存占用高
    • Spark 主要依赖内存,数据量大时可能 OOM(可通过 spill 到磁盘)
  2. 🌐 网络开销
    • 大量的 Shuffle 操作会导致网络 IO 增加
  3. 🛠 调优复杂
    • 参数调优涉及 Executor 内存、并行度、数据倾斜等
  4. 🚫 不适合低延迟流计算
    • Flink 在实时计算上比 Spark Streaming 更强

🔥 RDD(弹性分布式数据集)深度解析

RDD(Resilient Distributed Dataset)是 Spark 最核心的数据抽象,它解决了分布式计算中的数据分布、容错、并行计算等问题,使得 Spark 能够高效地执行大规模数据处理任务。


1️⃣ RDD 是什么?为什么叫 "弹性分布式数据集"?

RDD 的全称是 Resilient Distributed Dataset,即 弹性分布式数据集,它具备以下几个核心特点:

特性 含义
分布式(Distributed) 数据集被划分为多个 Partition,分布在不同节点上并行计算
弹性(Resilient) 通过 Lineage(血统) 记录计算历史,支持容错和重算
不可变(Immutable) RDD 创建后不能修改,所有计算会生成新的 RDD
惰性计算(Lazy Evaluation) 只有执行 Action 操作时,RDD 才会真正计算
并行计算(Parallel) 基于 Partition,可以在多个 Executor 上并行计算
容错(Fault-Tolerant) 依赖 DAG 血统 机制恢复丢失数据,避免 Checkpoint 开销

2️⃣ RDD 底层是怎么做的?

(1)RDD 的核心组成

RDD 在底层由以下 5 个核心元素组成:

RDD 组成部分 作用
分区(Partition) RDD 内部数据划分的基本单位,每个分区可在不同节点并行计算
依赖(Dependency) 记录 RDD 之间的计算关系(血统)
计算函数(Compute) 在每个分区上如何计算数据
分区器(Partitioner) 用于决定 RDD 数据如何分布(如 Hash 分区或 Range 分区)
存储位置(Preferred Locations) 指定 RDD 数据的最佳存放位置(数据本地性优化)

(2)RDD 分区(Partition)

Spark 会自动根据数据量划分 RDD 为多个 Partition,并行计算时,每个分区的 Task 会在不同的 Executor 里运行,避免单点瓶颈。

分区示例

rdd = spark.sparkContext.parallelize(range(10), numSlices=4)
print(rdd.getNumPartitions())  # 输出: 4

💡 numSlices=4 指定了 4 个分区,每个分区的数据如下:

Partition 0: [0, 1]
Partition 1: [2, 3]
Partition 2: [4, 5]
Partition 3: [6, 7, 8, 9]

如何查看分区数据?

print(rdd.glom().collect())  
# [[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

(3)RDD 的计算模型

RDD 的计算分为两种操作:

  1. 转换(Transformation)

    • 生成新的 RDD,不会立即执行(懒执行)
    • 例如 map(), filter(), flatMap(), groupBy()
    • 形成 DAG(有向无环图),记录计算过程
  2. 行动(Action)

    • 触发计算,返回最终结果
    • 例如 count(), collect(), reduce(), saveAsTextFile()
    • 只有执行 Action 时,RDD 才会真正计算

示例

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
mapped_rdd = rdd.map(lambda x: x * 2)  # 转换操作(Transformation)
result = mapped_rdd.collect()  # 行动操作(Action)
print(result)  # [2, 4, 6, 8, 10]

📌 注意:只有 collect() 执行时,Spark 才真正计算 RDD!


(4)RDD 依赖关系(血统 DAG)

RDD 之间的转换形成了 DAG(有向无环图),Spark 通过 DAG 追踪数据计算过程,以支持 容错机制

RDD 依赖分为两种:

  1. 窄依赖(Narrow Dependency)
    • 父 RDD 和子 RDD 之间是 1 对 1 关系
    • 不需要 Shuffle,计算更快
    • 例如:map(), filter(), flatMap()
  2. 宽依赖(Wide Dependency)
    • 子 RDD 依赖多个父 RDD
    • 需要 Shuffle,性能较低
    • 例如:groupByKey(), reduceByKey(), join()

示例

rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
rdd2 = rdd1.reduceByKey(lambda x, y: x + y)  # 需要 Shuffle

📌 优化策略:尽量减少 Shuffle 操作,使用 mapPartitionsreduceByKey 替代 groupByKey


(5)RDD 容错机制

RDD 不存储中间数据,而是依赖 血统(Lineage) 记录转换过程。当节点故障时,Spark 可以通过 重算丢失的分区 来恢复数据,而不需要重新计算整个 RDD。

💡 示例:Executor 挂掉时,RDD 如何恢复?

rdd = spark.sparkContext.textFile("data.txt").map(lambda x: (x, 1))
rdd.persist()  # 允许缓存
print(rdd.count())  # 触发计算

如果 Executor 挂掉

  • 未缓存:Spark 通过 DAG 血统 重新计算丢失分区
  • 已缓存(persist()):Spark 直接从缓存中恢复数据,避免重新计算

3️⃣ 为什么 RDD 重要?

  • 高容错:血统机制,支持分区级别恢复
  • 高并行:自动分区,支持分布式计算
  • 灵活性:支持 SQL、流计算、机器学习等多种场景
  • 性能优化:减少 Shuffle,使用缓存(persist)

4️⃣ RDD vs DataFrame vs Dataset

特性 RDD DataFrame Dataset
API 级别 低级 高级 高级
类型安全
性能 较慢 快(优化执行计划) 快(优化执行计划)
适用场景 复杂数据结构 结构化数据(SQL) 类型安全的大数据处理

💡 什么时候用 RDD?

  • 需要更灵活的 API 控制
  • 需要处理 非结构化数据(比如日志解析)
  • 需要自定义复杂的计算逻辑

💡 什么时候用 DataFrame?

  • 处理 结构化数据
  • 性能要求更高,因为 Catalyst 优化器会自动优化查询计划

🔥 总结

  • RDD 是 Spark 的核心数据结构,是一个 分布式、弹性、不可变 的数据集
  • 计算是 惰性执行 的,只有 Action 操作 才会触发真正的计算
  • 窄依赖更快,宽依赖需要 Shuffle
  • 容错机制基于 DAG 血统
  • 在大部分情况下,DataFrame 比 RDD 更快,但 RDD 更灵活

👉 结论:RDD 是 Spark 的基础,但在实际开发中,DataFrame/Dataset 更高效! 🚀

为什么 Spark 需要 DataFrame 和 Dataset,而不仅仅是 RDD?

RDD 是 Spark 最基础的数据抽象,它提供了分布式计算的灵活性,但 在性能、优化、类型安全等方面存在一些缺陷。因此,Spark 引入了 DataFrame 和 Dataset 来弥补 RDD 的不足,使得大数据计算更高效、更易用。


1️⃣ RDD、DataFrame 和 Dataset 的核心区别

特性 RDD(弹性分布式数据集) DataFrame(数据表) Dataset(数据集)
数据结构 松散的数据(对象) 结构化数据(行列) 结构化数据(行列+类型安全)
API 风格 低级 API 高级 SQL API 高级 + 类型安全
性能 低(无优化) 高(优化执行计划) 高(优化执行计划 + 类型安全)
优化 无优化,每个操作都会触发计算 Catalyst 优化器 生成高效查询计划 Catalyst + Tungsten 优化
存储方式 直接存储 Java/Python 对象 以列存格式存储,更紧凑高效 以列存格式存储,支持对象操作
类型安全 ❌ 否 ❌ 否 ✅ 是(仅限 Scala/Java)
适用场景 自定义逻辑、非结构化数据 结构化数据、SQL 查询 结构化数据、强类型检查

2️⃣ 为什么 RDD 不够好?DataFrame / Dataset 的改进点

❌ 问题 1:RDD 没有优化,性能较低

RDD 不会优化执行计划,每个操作都会触发计算,导致 大量的中间数据 生成,影响性能。

示例(RDD 计算):

rdd = spark.sparkContext.textFile("data.txt")
rdd = rdd.map(lambda x: (x.split(",")[0], int(x.split(",")[1])))
rdd = rdd.reduceByKey(lambda x, y: x + y)
rdd.collect()

⚠️ 问题:

  • 每次 split() 操作都会重复执行,浪费 CPU 计算资源
  • reduceByKey() 需要手动优化

DataFrame 解决方案

df = spark.read.csv("data.txt", header=True, inferSchema=True)
df.groupBy("name").sum("value").show()

DataFrame 让 Spark 直接优化执行计划避免重复计算,性能更高。


❌ 问题 2:RDD 不是列存格式,内存效率低

RDD 存储的是 Java/Python 对象,这些对象占用 大量内存,并且 GC(垃圾回收)压力大,导致执行慢。

DataFrame / Dataset 采用 列存格式(Columnar Storage)

  • 避免 Java/Python 对象的开销
  • 提高 CPU 缓存命中率
  • 减少序列化/反序列化成本

📌 示例

df = spark.read.parquet("data.parquet")  # 读取列存格式数据,速度更快

DataFrame / Dataset 直接读取列存格式,避免不必要的内存开销。


❌ 问题 3:RDD 不支持 SQL 查询

RDD 不能像数据库一样查询,需要写复杂的代码进行数据处理。

RDD

rdd = spark.sparkContext.parallelize([("Alice", 20), ("Bob", 25)])
rdd.filter(lambda x: x[1] > 21).collect()

DataFrame

df = spark.createDataFrame([("Alice", 20), ("Bob", 25)], ["name", "age"])
df.filter(df.age > 21).show()

SQL

df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age > 21").show()

DataFrame / SQL 提供了更简洁、更易维护的查询方式!


❌ 问题 4:RDD 没有类型安全

RDD 在 Python / Scala 中都是弱类型的,运行时可能出现 类型错误无法在编译期检查

RDD

rdd = spark.sparkContext.parallelize([("Alice", 20), ("Bob", "25")])  # 这里"25"是字符串
rdd.map(lambda x: x[1] + 10).collect()  # ❌ 运行时报错

Dataset(强类型)

case class Person(name: String, age: Int)
val ds = spark.createDataset(Seq(Person("Alice", 20), Person("Bob", 25)))
ds.map(_.age + 10).show()  // ✅ 编译时检查,保证 age 是 Int 类型

Dataset(仅支持 Scala/Java)提供了强类型检查,防止运行时错误!


3️⃣ 什么时候用 RDD、DataFrame 和 Dataset?

使用场景 推荐使用
处理 非结构化数据(如日志、文本) RDD
需要 SQL 查询,处理结构化数据 DataFrame / Dataset
需要 高性能计算,避免重复计算 DataFrame / Dataset
需要 强类型检查,避免运行时错误 Dataset(仅限 Scala/Java)
需要 低级 API 进行复杂计算 RDD

🔥 4️⃣ 总结

RDD DataFrame Dataset
灵活,但性能低 优化查询计划,适合结构化数据 同时优化性能 + 提供类型安全
不支持 SQL 支持 SQL 查询 支持 SQL 查询
无优化,计算慢 Catalyst 优化器自动优化 Catalyst + 类型安全
需要手动管理分区 自动优化分区 自动优化分区 + 类型安全

👉 结论

  • 如果数据是结构化的(表格式),用 DataFrame / Dataset
  • 如果数据是非结构化的(自由格式),用 RDD
  • 如果需要 SQL 查询,用 DataFrame / Dataset
  • 如果需要强类型检查(Scala/Java),用 Dataset
  • 实际开发中,DataFrame 是首选,RDD 仅用于特殊情况! 🚀


网站公告

今日签到

点亮在社区的每一天
去签到