【实战ES】实战 Elasticsearch:快速上手与深度实践-7.1.1Spark Streaming实时写入ES

发布于:2025-03-11 ⋅ 阅读:(53) ⋅ 点赞:(0)

👉 点击关注不迷路
👉 点击关注不迷路
👉 点击关注不迷路


7.1.1 Spark Streaming实时写入Elasticsearch深度实践指南

  • Spark Streaming与Elasticsearch实时数据管道架构
开始
数据源接入
创建Spark Streaming上下文
数据接收与转换
数据是否符合格式?
数据预处理
数据清洗与转换
数据分区与批量处理
配置Elasticsearch连接
写入Elasticsearch
写入是否成功?
监控与日志记录
错误处理与重试
是否继续接收数据?
结束

1. 核心集成原理

1.1 数据流转架构

Kafka/Flume/S3
ETL
聚合计算
数据源
Spark Streaming
数据处理
结构化数据
指标数据
Elasticsearch Writer
Elasticsearch Cluster
Kibana可视化

1.2 技术选型对比

集成方式 吞吐量 延迟 可靠性 开发复杂度
原生ES-Hadoop连接器 5万条/秒 2-5秒
自定义Bulk API 8万条/秒 1-3秒
Logstash管道 3万条/秒 5-10秒
Kafka Connect 6万条/秒 3-8秒

2. 全链路配置实战

2.1 环境准备清单

// 此为 build.sbt 文件,用于配置 Scala 项目的依赖项和构建信息
// 通过定义 libraryDependencies 来指定项目所需的外部库

// 使用 Seq 函数来创建一个依赖项的序列,将多个依赖项组合在一起
libraryDependencies ++= Seq(
  // 添加 Apache Spark Core 库依赖
  // "org.apache.spark" 是库的组织名称,代表该库来自 Apache Spark 项目
  // "spark-core" 是库的名称,它包含了 Spark 的核心功能,如 RDD(弹性分布式数据集)等
  // "3.3.1" 是库的版本号,指定使用该版本的 Spark Core
  "org.apache.spark" %% "spark-core" % "3.3.1",
  
  // 添加 Apache Spark Streaming 库依赖
  // "spark-streaming" 提供了 Spark 进行实时流处理的功能,可用于处理连续的数据流
  // 同样使用版本号 "3.3.1",确保与 Spark Core 版本兼容
  "org.apache.spark" %% "spark-streaming" % "3.3.1",
  
  // 添加 Elasticsearch 与 Spark 集成的库依赖
  // "org.elasticsearch" 是库的组织名称,表明该库来自 Elasticsearch 项目
  // "elasticsearch-spark-30" 是用于将 Spark 与 Elasticsearch 集成的库,允许将 Spark 处理的数据写入 Elasticsearch 或从 Elasticsearch 读取数据
  // "8.6.1" 是该集成库的版本号
  "org.elasticsearch" %% "elasticsearch-spark-30" % "8.6.1",
  
  // 添加 Apache Spark SQL 与 Kafka 集成的库依赖
  // "spark-sql-kafka-0-10" 用于在 Spark SQL 中与 Kafka 进行交互,可从 Kafka 主题读取数据或向 Kafka 主题写入数据
  // 版本号 "3.3.1" 保证与其他 Spark 组件版本一致
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.1"
)

// 版本兼容矩阵
Spark版本 ES-Hadoop版本 ES Server版本 特性支持
3.0.x 7.17.x 7.x-8.x 基础写入
3.3.x 8.6.x 8.x 向量化写入/安全连接
3.5.x 8.9.x 8.x 原生向量搜索支持

2.2 基础写入模板

// 导入 Spark 配置类,用于设置 Spark 应用的各种参数
import org.apache.spark.SparkConf
// 导入 Spark Streaming 上下文类,用于创建和管理流式计算任务
import org.apache.spark.streaming._
// 导入 Kafka 相关工具类,用于从 Kafka 主题接收数据流
import org.apache.spark.streaming.kafka010._
// 导入 Kafka 消费策略枚举
import org.apache.kafka.common.serialization.StringDeserializer
// 导入 Elasticsearch 与 Spark SQL 集成的工具类,用于将数据写入 Elasticsearch
import org.elasticsearch.spark.sql._
// 导入 SparkSession 类,用于创建和管理 Spark 会话
import org.apache.spark.sql.SparkSession

// 创建一个 SparkConf 对象,用于配置 Spark 应用的属性
val conf = new SparkConf()
  // 设置 Spark 应用的名称为 "ES-Streaming",方便在 Spark 集群管理界面识别
  .setAppName("ES-Streaming")
  // 设置 Elasticsearch 节点的地址,这里指定了两个节点,多个节点地址用逗号分隔
  .set("es.nodes", "es-node1:9200,es-node2:9200")
  // 设置当写入 Elasticsearch 时,如果索引不存在则自动创建
  .set("es.index.auto.create", "true")
  // 指定 Elasticsearch 文档的 ID 字段为 "id",在写入数据时会根据该字段进行操作
  .set("es.mapping.id", "id")

// 创建一个 StreamingContext 对象,用于初始化 Spark Streaming 环境
// 第一个参数是之前创建的 SparkConf 对象,第二个参数是批处理间隔为 5 秒
val ssc = new StreamingContext(conf, Seconds(5))

// 定义 Kafka 参数,指定 Kafka 服务器地址、键和值的反序列化器等
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-broker1:9092,kafka-broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// 使用 KafkaUtils 创建一个直接从 Kafka 主题接收数据的 DStream
// 第一个参数是 StreamingContext 对象,第二个参数是消费策略,这里选择 PreferConsistent 表示均匀分配分区
// 第三个参数是订阅的 Kafka 主题和 Kafka 参数,这里订阅了 "log-topic" 主题
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String]("log-topic", kafkaParams)
)

// 对从 Kafka 接收到的每个 RDD 进行处理
kafkaStream.foreachRDD { rdd =>
  // 导入 Elasticsearch 与 Spark SQL 集成的隐式转换和方法
  import org.elasticsearch.spark.sql._
  
  // 创建或获取一个 SparkSession 对象,使用当前 RDD 的 SparkContext 配置
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  
  // 将 RDD 中的每条消息的值(JSON 字符串)解析为 DataFrame
  // rdd.map(_.value) 提取出消息的值部分,spark.read.json 方法将其解析为 DataFrame
  val df = spark.read.json(rdd.map(_.value))
  
  // 将 DataFrame 中的数据保存到 Elasticsearch
  // 第一个参数指定 Elasticsearch 的索引名称,这里使用了动态索引名,根据 @timestamp 字段的日期生成索引
  // 第二个参数是一个 Map,包含了一些写入 Elasticsearch 的配置参数
  df.saveToEs("logs-{@timestamp:YYYY.MM.dd}",
    Map(
      // 启用日期字段的自动检测,让 Elasticsearch 自动识别日期字段
      "es.mapping.date.detection" -> "true",
      // 设置写入操作类型为 upsert,如果文档已存在则更新,不存在则插入
      "es.write.operation" -> "upsert",
      // 告诉 Elasticsearch 输入的数据是 JSON 格式
      "es.input.json" -> "true"
    )
  )
}

// 启动 Spark Streaming 上下文,开始接收和处理数据流
ssc.start()
// 等待 StreamingContext 终止,使程序保持运行状态,直到手动停止或发生异常
ssc.awaitTermination()

3. 高级调优策略

3.1 性能调优参数表

参数 默认值 推荐值 调优效果
es.batch.size.entries 1000 5000 提升35%写入吞吐量
es.batch.write.refresh true false 减少60%索引刷新开销
es.batch.write.retry.count 3 5 提升网络波动下的可靠性
es.http.timeout 1m 30s 降低慢节点影响
es.nodes.discovery false true 自动发现节点提升负载均衡
es.mapping.id - “business_id” 避免文档重复写入

3.2 写入模式对比

写入策略 配置参数 适用场景 性能基准(万条/秒)
批量提交 es.batch.size.entries=5000 高吞吐离线数据 8.2
实时逐条 es.batch.size.entries=1 低延迟关键事务 1.5
分区并行 es.spark.partition.multiplier=4 大数据量场景 12.4
异步回压 es.spark.streaming.backpressure.enabled=true 流量波动场景 9.8

4. 企业级应用案例

4.1 电商实时日志分析

// 定义一个样例类 LogEvent,用于表示结构化的日志事件
// 样例类会自动为其属性生成 getter、setter 方法,还会生成 equals、hashCode、toString 等方法,方便数据处理和比较
case class LogEvent(userId: String, 
                   action: String, 
                   timestamp: Long,
                   geo: String)

// 对从 Kafka 接收到的数据流 kafkaStream 进行处理
val logStream = kafkaStream
  // 从 Kafka 消息中提取消息的值部分,因为 Kafka 消息是键值对形式,这里只关注值
  .map(_.value)
  // 使用 transform 方法对 RDD 进行转换操作
  .transform(rdd => {
    // 使用 SparkSession 的 read.json 方法将 RDD 中的 JSON 字符串解析为 DataFrame
    // 然后使用 as[LogEvent] 方法将 DataFrame 转换为 Dataset[LogEvent],即按照 LogEvent 样例类的结构进行映射
    spark.read.json(rdd).as[LogEvent]
      // 过滤掉 userId 为 null 的日志事件,确保处理的数据中 userId 是有效的
      .filter(_.userId != null)
      // 使用自定义的 geohashUDF 函数对 geo 字段进行处理,生成一个新的列 geo_hash
      // 这里假设 geohashUDF 是一个已经定义好的用户自定义函数,用于将地理位置信息转换为地理哈希值
      .withColumn("geo_hash", geohashUDF(col("geo")))
  })

// 对处理后的日志流 logStream 中的每个 RDD 进行操作
logStream.foreachRDD { rdd =>
  // 将 RDD 中的数据保存到 Elasticsearch
  // 第一个参数指定 Elasticsearch 的索引名称,使用动态索引名,根据 timestamp 字段的日期生成索引
  rdd.saveToEs("logs-{timestamp:yyyy-MM-dd}",
    // 第二个参数是一个 Map,包含了一些写入 Elasticsearch 的配置参数
    Map(
      // 指定 Elasticsearch 文档的 ID 生成规则,使用 userId 和 timestamp 拼接而成
      // 这样可以确保每个文档的 ID 是唯一的,避免数据重复
      "es.mapping.id" -> "concat(userId,'_',timestamp)",
      // 设置写入操作类型为 create,表示只有当文档不存在时才进行写入操作
      // 如果文档已存在,写入操作会失败
      "es.write.operation" -> "create",
      // 启用 Spark 写入 Elasticsearch 后的清理操作,释放相关资源
      "es.spark.cleanup" -> "true"
    )
  )
}
  • 性能指标
数据规模 批次间隔 处理延迟 吞吐量 资源消耗
1亿条/天 5秒 3.2秒 6.8万条/秒 32核/64GB
5亿条/天 10秒 6.5秒 9.4万条/秒 64核/128GB
10亿条/天 15秒 9.1秒 12.1万条/秒 128核/256GB

4.2 物联网时序数据

  • 从 Kafka 接收 IoT 数据,对数据进行解析和转换,并将处理后的数据连同元数据一起写入 Elasticsearch 的功能。
// 定义一个包含 Elasticsearch 配置信息的 Map,键值对用于设置与 Elasticsearch 交互时的各种参数
val iotConf = Map(
  // 设置是否仅通过广域网(WAN)连接到 Elasticsearch 节点
  // 值为 "true" 表示只使用广域网连接,通常在集群部署环境中使用,避免本地网络干扰
  "es.nodes.wan.only" -> "true",
  
  // 指定 Elasticsearch 中的资源(索引和文档类型),这里使用动态索引名
  // 索引名会根据数据中的 device_type 字段和 @timestamp 字段的年月部分动态生成
  // 例如,不同的设备类型和时间会对应不同的索引,便于数据的组织和管理
  "es.resource" -> "iot-{device_type}-{@timestamp:YYYY-MM}",
  
  // 配置 Elasticsearch 文档的路由规则
  // 使用 device_id 作为路由键,确保相同 device_id 的文档会被路由到同一分片,提高查询性能
  "es.mapping.routing" -> "device_id",
  
  // 设置是否自动创建 Elasticsearch 索引
  // 值为 "true" 表示如果写入数据时指定的索引不存在,会自动创建该索引
  "es.index.auto.create" -> "true"
)

// 对从 Kafka 接收到的数据流 kafkaStream 进行处理
kafkaStream
  // 使用 map 函数对 kafkaStream 中的每条消息应用 parseIoTData 函数
  // parseIoTData 函数用于将 Kafka 消息解析为特定的 IoT 数据格式
  .map(parseIoTData)
  // 对处理后的 DStream 中的每个 RDD 进行操作
  .foreachRDD { rdd =>
    // 对 RDD 中的每个元素应用 transformData 函数
    // transformData 函数用于对解析后的 IoT 数据进行进一步的转换和处理
    rdd.map(transformData)
      // 将处理后的数据连同元数据一起保存到 Elasticsearch
      // iotConf 是之前定义的 Elasticsearch 配置信息,用于指定写入的索引、路由规则等
      .saveToEsWithMeta(iotConf)
  }
  • 优化效果
优化项 优化前 优化后 提升比例
写入吞吐量 4.2万条/秒 7.8万条/秒 85%↑
索引延迟 8.5秒 2.3秒 73%↓
存储压缩率 1:3 1:5 66%↑
查询响应时间 420ms 120ms 71%↓

5. 容错与监控

5.1 故障处理机制

// 配置 Elasticsearch 批量写入的重试策略

// 设置批量写入操作失败时的重试次数
// 这里将重试次数设置为 5 次,意味着当批量写入操作失败时,Spark 会尝试重新执行该操作,最多重试 5 次
conf.set("es.batch.write.retry.count", "5")

// 设置每次重试之间的等待时间
// 这里将等待时间设置为 10 秒,即每次重试操作之间会间隔 10 秒,避免频繁重试给 Elasticsearch 带来过大压力
conf.set("es.batch.write.retry.wait", "10s")

// 配置死信队列处理,当向 Elasticsearch 写入数据时出现错误,将无法写入的数据存储到死信队列

// 指定 DataFrame 写入的数据格式为 Elasticsearch
df.write
  .format("es")
  // 设置处理 Elasticsearch 写入错误的处理类
  // 这里指定使用 com.example.ESErrorHandler 类来处理写入过程中出现的错误
  // 该类需要开发者自定义实现,用于处理各种 Elasticsearch 写入错误情况
  .option("es.write.rest.error.handlers", "com.example.ESErrorHandler")
  // 设置死信队列的存储路径
  // 当 com.example.ESErrorHandler 类处理错误时,将无法写入 Elasticsearch 的数据存储到指定的路径 /dlq 下
  // 可以后续对死信队列中的数据进行分析和处理,以找出写入失败的原因并尝试重新写入
  .option("es.write.rest.error.handler.com.example.ESErrorHandler.deadLetterPath", "/dlq")
  // 执行写入操作,将 DataFrame 中的数据写入到 Elasticsearch 中
  .save()

5.2 监控指标体系

指标类别 监控项 告警阈值 采集方式
资源使用 Executor CPU使用率 >85%持续5分钟 Prometheus
数据流量 批次处理速率 <1000条/秒 Spark Metrics
写入健康 ES Bulk拒绝率 >1% ES Cluster Stats
时效性 端到端延迟 >10秒 打点日志分析

6. 安全加固方案

6.1 安全连接配置

// 以下配置用于设置 Spark 与 Elasticsearch 之间的安全连接及相关参数

// 开启 Elasticsearch 连接的 SSL 加密
// 当设置为 "true" 时,Spark 与 Elasticsearch 之间的通信将使用 SSL 协议进行加密,确保数据在传输过程中的安全性
conf.set("es.net.ssl", "true")

// 指定 SSL 密钥库的存储位置
// 密钥库(keystore)通常包含客户端的私钥和证书,用于 SSL 握手过程中的身份验证
// 这里设置密钥库的路径为 /path/to/keystore.jks,确保 Spark 能够找到并使用该密钥库进行安全连接
conf.set("es.net.ssl.keystore.location", "/path/to/keystore.jks")

// 设置密钥库的访问密码
// 密钥库是加密存储的,需要使用密码来解锁
// 这里设置的密码 "password" 用于访问 /path/to/keystore.jks 密钥库
conf.set("es.net.ssl.keystore.pass", "password")

// 指定 SSL 信任库的存储位置
// 信任库(truststore)包含了客户端信任的证书颁发机构(CA)的证书
// 通过设置信任库的路径为 /path/to/truststore.jks,Spark 可以验证 Elasticsearch 服务器提供的证书是否由受信任的 CA 颁发
conf.set("es.net.ssl.truststore.location", "/path/to/truststore.jks")

// 禁用 Elasticsearch 节点的自动发现功能
// 默认情况下,Elasticsearch 客户端会尝试自动发现集群中的其他节点
// 当设置为 "false" 时,Spark 将不会自动发现 Elasticsearch 集群中的其他节点,而是使用预先配置的节点信息进行连接
// 这种设置通常用于固定节点配置的场景,或者在网络环境复杂的情况下避免自动发现带来的问题
conf.set("es.nodes.discovery", "false")

6.2 权限控制模板

// 向 Elasticsearch 发送 PUT 请求,用于创建一个名为 spark_writer 的安全角色
PUT _security/role/spark_writer
{
    // 定义该角色在集群层面拥有的权限
    "cluster": [
        // 赋予该角色对集群进行监控的权限,拥有此权限可以查看集群的状态信息、节点信息等监控数据
        "monitor"
    ],
    // 定义该角色在索引层面拥有的权限,可针对不同的索引进行不同的权限设置
    "indices": [
        {
            // 指定该权限所作用的索引名称模式,这里表示对所有以 "logs-" 开头的索引生效
            "names": ["logs-*"],
            // 定义该角色对匹配上述模式的索引所拥有的权限
            "privileges": [
                // 允许该角色创建新的索引,当需要向符合 "logs-*" 模式的索引写入数据但该索引不存在时,可以自动创建
                "create_index",
                // 允许该角色向索引中写入新的文档,即可以执行插入操作
                "index"
            ],
            // 配置字段级别的安全权限,用于控制对索引中特定字段的访问
            "field_security": { 
                // 授予该角色对索引中除了特定字段外的所有字段的访问权限
                "grant": ["*"],
                // 明确列出不授予访问权限的字段,这里是 "password" 和 "credit_card" 字段
                // 这意味着即使该角色有对索引的操作权限,但无法访问和操作这些敏感字段
                "except": ["password", "credit_card"]
            }
        }
    ]
}

附录:运维工具箱

工具类别 推荐方案 核心功能
性能诊断 Spark History Server 作业执行分析
日志分析 ELK Stack 管道异常追踪
资源监控 Grafana+Prometheus 实时资源监控
数据校验 Great Expectations 数据质量检查

最佳实践

  1. 生产环境必须启用SSL加密
  2. 建议采用结构化数据格式(Parquet/JSON)
  3. 定期执行索引模板优化
  4. 建立数据Schema版本控制