Apache Spark 的结构化流

发布于:2025-08-05 ⋅ 阅读:(18) ⋅ 点赞:(0)

Apache Spark 的结构化流(Structured Streaming)是 Spark 专为伪实时(近实时,Near Real-Time)流数据处理设计的高级 API,它基于 DataFrame/Dataset API,提供了简单、高效、容错的流处理能力。

一、核心思想:流即无限表(Unbounded Table)

结构化流的核心设计理念是将流数据视为一张不断增长的 “无限表”(与静态批处理的 “有限表” 对应)。用户可以用编写静态批处理逻辑的方式(如 SQL、DataFrame 转换)来处理流数据,框架会自动将逻辑转换为持续运行的流处理任务,无需手动管理流的连续性、状态或容错。

二、处理模式:微批处理(Micro-Batch)与连续处理(Continuous)

结构化流支持两种处理模式,以平衡延迟吞吐量

1. 微批处理(默认模式)
  • 原理:将流数据切分成一系列 “微批次”(Micro-Batches),每个批次处理一小段时间内的数据(默认最小批次间隔约 100ms)。
  • 特点:延迟通常在几百毫秒到几秒(伪实时的典型范围),吞吐量高,支持所有结构化流的功能(如状态操作、精确一次语义)。
  • 适用场景:大多数业务场景(如实时监控、日志分析、实时 ETL)。
2. 连续处理(实验性,Spark 2.3 + 引入)
  • 原理:通过长期运行的连续读取器和写入器,以 “逐条处理” 的方式接近真正的实时(延迟可低至毫秒级)。
  • 特点:延迟极低,但功能受限(仅支持部分转换操作,如 map、filter,不支持复杂状态操作),且容错保证较弱。
  • 适用场景:对延迟要求极高的场景(如高频交易实时风控)。

三、核心组件与流程

结构化流的处理流程可分为三部分:输入源(Sources)→ 处理逻辑(Operations)→ 输出接收器(Sinks)

1. 输入源(Sources)

负责读取流数据,支持的主流源包括:

  • Kafka:最常用的流数据源(支持精确一次语义)。
  • 文件系统(如 HDFS、S3):监控目录下新增的文件(支持文本、Parquet 等格式)。
  • Socket:用于测试(从 TCP socket 读取文本数据)。
  • 自定义源:通过实现Source接口扩展。
2. 处理逻辑(Operations)

用户通过 DataFrame/Dataset API 或 SQL 定义处理逻辑,支持与静态批处理完全一致的操作,例如:

  • 基础转换:selectfiltermapflatMap等。
  • 聚合操作:groupBy(含状态聚合)。
  • 关联操作:流与静态表的join、流与流的join(需配合 Watermark)。
  • 窗口操作:基于事件时间的滚动 / 滑动 / 会话窗口(核心功能)。
3. 输出接收器(Sinks)

负责将处理结果写入外部系统,支持的主流接收器包括:

  • Kafka:写入流数据到 Kafka。
  • 文件系统:以批处理方式写入文件(如 Parquet)。
  • 控制台 / 内存:用于调试(console输出或memory表)。
  • Foreach/ForeachBatch:自定义输出逻辑(如写入数据库)。
  • 更新模式:根据输出模式(Output Mode)决定如何更新结果。

四、关键概念:输出模式(Output Mode)

输出模式定义了流处理结果如何写入接收器,结构化流支持 3 种模式:

模式 适用场景 说明
Append 无聚合的转换(如 filter) 仅将新处理的行追加到输出(类似日志追加),不修改历史结果。
Complete 全局聚合(如group by count 输出所有聚合结果的完整快照(每次更新都会重写全量结果)。
Update 部分聚合或非聚合操作 仅输出被更新的行(新增或修改的结果),不输出未变化的历史行。

五、核心能力:状态管理与容错

流处理的核心挑战是状态管理(如聚合、窗口、关联)和容错(保证数据不丢不重),结构化流通过以下机制解决:

1. 状态管理(State Management)

结构化流自动维护处理过程中的状态(如聚合的中间结果、窗口的缓存数据),并支持:

  • 状态持久化:状态数据默认存储在 Executor 的内存中,大状态可 spill 到磁盘。
  • 状态清理:通过Watermark(水印)自动清理过期状态(如超过窗口时间的旧数据),避免状态无限增长。
2. Watermark(水印):处理延迟数据

实际场景中,数据可能因网络延迟等原因 “迟到”(事件时间 < 处理时间)。Watermark 用于定义 “可接受的最大延迟时间”:

  • 原理:动态计算当前事件时间的最大值(max_event_time),超过 max_event_time - watermark 的数据会被视为 “过期”,不再参与计算。
  • 示例:若 Watermark 设为 10 分钟,当前最大事件时间是 10:00,则 9:50 之前的迟到数据会被丢弃。

Watermark 是流处理中平衡 “准确性” 和 “性能” 的关键,常用于窗口聚合、流流关联等场景。

3. 容错与精确一次语义(Exactly-Once)

结构化流通过检查点(Checkpointing) 和预写日志(WAL) 保证 “精确一次” 语义(数据被且仅被处理一次):

  • 检查点:定期将流处理的元数据(如已处理的偏移量、状态快照、水印位置)写入可靠存储(如 HDFS)。
  • 故障恢复:当应用崩溃重启时,从最近的检查点恢复状态和进度,避免重复处理或丢失数据。

六、典型场景示例

以 “实时统计网站 5 分钟滑动窗口内的 PV(页面访问量)” 为例,展示结构化流的使用流程:

1. 定义输入源(假设从 Kafka 读取,事件时间字段为event_time
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// 定义Kafka源配置
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "page_visits") // 订阅的Kafka主题
  .load()

// 解析Kafka消息(假设value是JSON格式,包含event_time和page_id)
val schema = new StructType()
  .add("event_time", TimestampType)
  .add("page_id", StringType)

val visitsDF = kafkaDF
  .select(from_json(col("value").cast(StringType), schema).as("data"))
  .select("data.event_time", "data.page_id")
2. 定义处理逻辑(5 分钟滑动窗口,每 1 分钟更新一次)
// 定义Watermark(允许数据延迟30秒)
val withWatermarkDF = visitsDF
  .withWatermark("event_time", "30 seconds") // 水印:最大延迟30秒

// 5分钟滑动窗口(每1分钟滑动一次),按page_id统计PV
val windowPVDF = withWatermarkDF
  .groupBy(
    window(col("event_time"), "5 minutes", "1 minute"), // 窗口:5分钟大小,1分钟滑动
    col("page_id")
  )
  .count()
  .withColumnRenamed("count", "pv")
3. 定义输出接收器(写入控制台,Update 模式)
val query = windowPVDF.writeStream
  .format("console") // 输出到控制台
  .outputMode("update") // 仅输出更新的行
  .option("truncate", "false") // 不截断输出
  .start() // 启动流查询

query.awaitTermination() // 等待查询结束

七、结构化流的优势

  1. 批流统一:用相同的 API 处理静态数据和流数据,降低学习成本。
  2. 简单易用:无需手动管理流的连续性、状态或容错,框架自动处理。
  3. 强容错保证:支持精确一次语义,适合生产环境。
  4. 丰富的状态操作:内置窗口、聚合、关联等状态处理能力,支持 Watermark 清理过期状态。
  5. 高性能:基于 Spark 的分布式计算引擎,可水平扩展,支持高吞吐量。

总结

Spark 结构化流是伪实时流处理的理想选择,它通过 “流即无限表” 的抽象、微批处理模式、精确一次语义和自动状态管理,简化了流数据处理的复杂度,适用于实时监控、日志分析、实时 ETL 等多种场景。


网站公告

今日签到

点亮在社区的每一天
去签到