PySpark基础知识(python)

发布于:2025-09-14 ⋅ 阅读:(21) ⋅ 点赞:(0)

PySpark 是 Apache Spark 的 Python API,它允许开发者使用 Python 语言编写 Spark 应用程序,结合了 Python 的易用性和 Spark 的分布式计算能力,是处理大规模数据的强大工具。

一、安装与环境配置

  1. 安装方式
    通过 pip 安装:pip install pyspark(速度慢可以使用清华镜像Links for pyspark
    (需注意:PySpark 依赖 Java 环境,需提前安装 JDK 8 或以上版本)

  2. 运行模式

    • 本地模式(local[*]):用于开发测试,利用本地 CPU 核心模拟集群。
    • 集群模式:需部署 Spark 集群(Standalone/YARN/Kubernetes),适合生产环境。

二、PySpark 的核心定位

  • 桥梁作用:PySpark 将 Spark 的分布式计算能力与 Python 的生态系统(如 NumPy、Pandas、Scikit-learn)无缝结合,让熟悉 Python 的开发者无需学习 Scala(Spark 原生语言)即可使用 Spark。
  • 适用场景:大规模数据处理、机器学习、数据挖掘、实时流处理等,尤其适合数据科学家和 Python 开发者。

三、核心组件与功能

PySpark 基于 Spark 的核心引擎,主要包含以下组件:

  1. Spark Core(核心组件)

    • 提供 RDD(弹性分布式数据集)作为基本数据抽象,支持分布式计算、任务调度、内存管理等底层功能。
    • 你的代码中SparkContext就是 Core 组件的入口,用于创建 RDD 和配置 Spark 应用。
  2. Spark SQL

    • 用于处理结构化数据,支持 SQL 查询和 DataFrame/DataSet API。
    • 相比 RDD,DataFrame 提供了更高效的计算和更简洁的 API(类似 Pandas DataFrame,但支持分布式处理)。
  3. Spark Streaming

    • 处理实时流数据(如日志、消息队列),支持从 Kafka、Flume 等数据源读取数据,并进行低延迟处理。
  4. MLlib

    • 分布式机器学习库,提供分类、回归、聚类等算法(如逻辑回归、随机森林),支持大规模数据集上的模型训练。
  5. GraphX

    • 分布式图计算库,用于处理图结构数据(如社交网络关系),提供图算法(如 PageRank)。

四、PySpark 的优势

  1. 分布式计算能力
    突破单机内存和算力限制,可在集群中并行处理 TB/PB 级数据,比传统单机 Python 工具(如 Pandas)更适合大数据场景。

  2. 惰性计算优化
    操作不会立即执行,而是等到 “行动操作”(如collectcount)时才触发,Spark 会自动优化执行计划,减少冗余计算。

  3. 丰富的 API
    提供 RDD、DataFrame、SQL 等多种 API,满足不同场景需求:

    • RDD:灵活,适合复杂数据处理逻辑。
    • DataFrame:结构化数据处理,性能优于 RDD。
    • SQL:直接使用 SQL 语句查询数据,降低使用门槛。
  4. 兼容 Python 生态
    可直接调用 Python 库(如 NumPy 处理数值、Matplotlib 可视化),同时支持将 Spark 结果转换为 Pandas DataFrame 进行后续分析。

  5. 容错机制
    通过 RDD 的 “血统”(Lineage)记录依赖关系,当数据丢失时可自动重建,确保计算可靠性。

五、基本使用流程

以你的代码为基础,PySpark 的典型使用步骤如下:

1.初始化 Spark 环境
通过SparkConf配置应用(如设置 Master 节点、应用名称),再创建SparkContext(核心入口):

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("my_app")  # local[*]表示本地模式,使用所有CPU核心
sc = SparkContext(conf=conf)

2.创建 RDD/DataFrame

  • 从内存数据创建(如列表、元组):sc.parallelize([1,2,3])
  • 从外部文件创建:sc.textFile("path/to/file")(文本文件)、spark.read.csv("file.csv")(CSV 文件,需用 SparkSession)
# 通过parallelize方法将python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1,2,3,4,5])  # 列表
rdd2 = sc.parallelize((1,2,3,4,5))  # 元组
rdd3 = sc.parallelize("abcdefg")    # 字符串
rdd4 = sc.parallelize({1,2,3,4,5})  # 集合
rdd5 = sc.parallelize({"key1": "value1","key2":"value2"})  # 字典

# # 查看RDD里面的内容,需要用collect()方法
print("列表RDD:", rdd1.collect())
print("元组RDD:", rdd2.collect())
print("字符串RDD:", rdd3.collect())
print("集合RDD:", rdd4.collect())
print("字典RDD:", rdd5.collect())  # 注意:字典会只保留键

rdd = sc.textFile("D:/test.txt")
print(rdd.collect())

3.数据处理
使用转换操作(如mapfiltergroupBy)处理数据,例如:

rdd = sc.parallelize([1,2,3,4,5])
filtered_rdd = rdd.filter(lambda x: x > 2)  # 过滤出大于2的元素

4.执行计算并获取结果
通过行动操作触发计算并返回结果,例如:

print(filtered_rdd.collect())  # 输出:[3,4,5],collect()将分布式数据拉取到本地

5.关闭 Spark 环境
任务结束后关闭SparkContext释放资源:

sc.stop()

六、与传统 Python 工具的对比

工具 特点 适用场景
PySpark 分布式计算,支持大数据,惰性计算 TB/PB 级数据处理、集群环境
Pandas 单机内存计算,API 简洁,适合小数据 单机小数据(GB 级以内)
Dask 并行化 Pandas/NumPy,支持中等规模数据 单机多核或小规模集群

PySpark 的核心优势在于分布式架构,能处理远超单机内存的数据,而 Pandas 等工具更适合单机环境的小规模数据处理。