下面我将详细讲解如何使用 Spark SQL 分别通过 SQL 模式和 DSL(Domain Specific Language)模式实现 WordCount 功能。
WordCount 是大数据处理中的经典案例,主要功能是统计文本中每个单词出现的次数。
准备工作
首先需要初始化 SparkSession,这是 Spark SQL 的入口点:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col
# 创建 SparkSession
spark = SparkSession.builder \
.appName("WordCountExample") \
.master("local[*]") # 本地模式运行,生产环境可去掉
.getOrCreate()
示例数据
我们使用一段简单的文本作为示例数据:
# 创建包含文本数据的 DataFrame
data = [("Hello Spark SQL",), ("Hello DSL",), ("Spark SQL is powerful",), ("DSL is flexible",)]
df = spark.createDataFrame(data, ["sentence"])
方法一:SQL 模式实现 WordCount
SQL 模式的核心是将数据注册为临时视图,然后通过编写 SQL 语句来实现单词计数。
步骤如下:
- 将 DataFrame 注册为临时视图
df.createOrReplaceTempView("sentences")
- 编写 SQL 语句实现单词计数
# 使用 SQL 进行单词拆分、过滤和计数
word_count_sql = spark.sql("""
SELECT word, COUNT(*) as count
FROM (
-- 拆分句子为单词
SELECT explode(split(sentence, ' ')) as word
FROM sentences
) temp
WHERE word != '' -- 过滤空字符串
GROUP BY word
ORDER BY count DESC
""")
# 显示结果
word_count_sql.show()
- 输出结果
+--------+-----+
| word|count|
+--------+-----+
| Hello| 2|
| Spark| 2|
| SQL| 2|
| DSL| 2|
| is| 2|
|powerful| 1|
|flexible| 1|
+--------+-----+
方法二:DSL 模式实现 WordCount
DSL 模式(DataFrame API)通过调用 DataFrame 的方法链来实现功能,不需要编写 SQL 语句。
步骤如下:
# 使用 DataFrame API (DSL) 实现单词计数
word_count_dsl = df.select(
# 拆分句子并展开为多行
explode(split(col("sentence"), " ")).alias("word")
).filter(
col("word") != "" # 过滤空字符串
).groupBy(
col("word") # 按单词分组
).count(
).orderBy(
col("count").desc() # 按计数降序排列
)
# 显示结果
word_count_dsl.show()
输出结果与 SQL 模式完全相同。
两种模式的对比分析
特点 | SQL 模式 | DSL 模式 |
---|---|---|
语法风格 | 使用标准 SQL 语句 | 使用方法链调用(如 select、filter、groupBy) |
适用人群 | 熟悉 SQL 的数据分析师、数据工程师 | 熟悉编程的开发者 |
灵活性 | 适合复杂查询(如窗口函数、子查询) | 适合程序式数据处理流程 |
可读性 | 对于复杂业务逻辑,SQL 结构更清晰 | 对于数据处理流水线,方法链更直观 |
类型安全 | 运行时检查 | 部分支持编译时检查(Scala/Java) |
完整代码示例
下面是两种模式的完整代码,可以直接运行:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col
# 初始化 SparkSession
spark = SparkSession.builder \
.appName("WordCount with Spark SQL") \
.master("local[*]") \
.getOrCreate()
# 准备示例数据
data = [
("Hello Spark SQL",),
("Hello DSL",),
("Spark SQL is powerful",),
("DSL is flexible",),
("Spark and SQL and DSL",)
]
df = spark.createDataFrame(data, ["sentence"])
print("原始数据:")
df.show(truncate=False)
# 方法1: SQL 模式实现 WordCount
print("\n=== SQL 模式结果 ===")
df.createOrReplaceTempView("sentences")
word_count_sql = spark.sql("""
SELECT word, COUNT(*) as count
FROM (
SELECT explode(split(sentence, ' ')) as word
FROM sentences
) temp
WHERE word != ''
GROUP BY word
ORDER BY count DESC
""")
word_count_sql.show()
# 方法2: DSL 模式实现 WordCount
print("\n=== DSL 模式结果 ===")
word_count_dsl = df.select(
explode(split(col("sentence"), " ")).alias("word")
).filter(
col("word") != ""
).groupBy(
"word"
).count(
).orderBy(
col("count").desc()
)
word_count_dsl.show()
# 停止 SparkSession
spark.stop()
关键函数解释
split():将字符串按指定分隔符拆分,返回数组
split(col("sentence"), " ") # 按空格拆分句子
explode():将数组中的每个元素转换为一行,实现 "行转列"
explode(array_column) # 将数组列展开为多行
groupBy() + count():按指定列分组并计数
groupBy("word").count() # 按单词分组并计算出现次数
通过这两种方式,我们可以灵活地利用 Spark SQL 处理文本数据并实现单词计数,根据实际场景和个人习惯选择合适的方式即可。