SparkStreaming之04:调优

发布于:2025-03-05 ⋅ 阅读:(23) ⋅ 点赞:(0)

SparkStreaming调优

一 、要点

4.1 SparkStreaming运行原理

在这里插入图片描述

深入理解

在这里插入图片描述

4.2 调优策略

4.2.1 调整BlockReceiver的数量

在这里插入图片描述

案例演示:

object MultiReceiverNetworkWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    //创建多个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理
    val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)

    val lines = lines1.union(lines2)

    lines.repartition(100)

    //处理的逻辑,就是简单的进行word count
    val words = lines.repartition(100).flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))

    //将结果输出到控制台
    wordCounts.print()

    //启动Streaming处理流
    ssc.start()

    //等待Streaming程序终止
    ssc.awaitTermination()

    ssc.stop(false)
  }
}
⭐️4.2.2 调整Block的数量

batchInterval : 触发批处理的时间间隔
blockInterval :将接收到的数据生成Block的时间间隔,spark.streaming.blockInterval(默认是200ms),那么,BlockRDD的分区数 = batchInterval / blockInterval,即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10,如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval,blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了

4.2.3 调整Receiver的接受速率

pps:permits per second 每秒允许接受的数据量(QPS -> queries per second)
Spark Streaming默认的PPS是没有限制的,可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue

⭐️4.2.3 调整数据处理的并行度

BlockRDD的分区数

a. 通过Receiver接受数据的特点决定

b. 也可以自己通过repartition设置

ShuffleRDD的分区数

a. 默认的分区数为spark.default.parallelism(core的大小)

b. 通过我们自己设置决定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 数据的序列化

SparkStreaming两种需要序列化的数据:
a. 输入的数据:默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中
b. 缓存的数据:默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
使用Kryo序列化机制,比Java序列化机制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 内存调优
(1)需要内存大小

和transformation的类型有关,如果使用的是updateStateByKey,Window这样的算子,那么内存就要设置得偏大

(2)数据存储级别

如果把接收到的数据设置的存储级别是MEMORY_DISK这种级别,也就是说如果内存不够可以把数据存储到磁盘上,其实性能还是不好的,性能最好的就是所有的数据都在内存里面,所以如果在资源允许的情况下,把内存调大一点,让所有的数据都存在内存里面。

4.2.6 Outout性能
(1)MySQL,HBase

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(2)Kafka(0.8版本)

虽然现在的Kafka的版本已经到2.x版本了,但是很多公司因为历史遗留的原因,公司里面还是会有0.8x的Kafka。比如本人公司里面有两个Kafka集群,一个是0.8x的kafka,一个是1.x的Kafka。开发的时候有时候需要我们使用SparkStreaming做实时的ETL,然后再把数据打回Kafka,0.8版本的kafka默认是没有批量提交的功能的。本人公司里面一个真实的案例,一位同学写的SparkStreaming程序将数据处理完了以后通过ForeachRDD把数据写回到0.8Kafka。但是数据处理得很慢,经常会收到延时告警。最终发现他把数据写到Kafka的时候是一条数据一条数据提交的性能很差。最终手动实现了批量提交的功能。从此再也没有收到过告警。

4.2.7 Backpressure(压力反馈)

在这里插入图片描述
在这里插入图片描述

Feedback Loop : 动态使得Streaming app从unstable状态回到stable状态

在这里插入图片描述

从Spark1.5版本开始:spark.streaming.backpressure.enabled = true

4.2.8 Elastic Scaling(资源动态分配)

动态分配资源:

批处理动态的决定这个application中需要多少个Executors:

  1. 当一个Executor空闲的时候,将这个Executor杀掉
  2. 当task太多的时候,动态的启动Executors

Streaming分配Executor的原则是比对 process time / batchInterval 的比率

在这里插入图片描述

如果延迟了,那么就自动增加资源

在这里插入图片描述

在这里插入图片描述

从Spark2.0有这个功能:: spark.streaming.dynamicAllocation.enabled = true

⭐️4.2.8 数据倾斜调优(重要)

因为SparkStreaming的底层就是RDD,之前SparkCore的所有的数据倾斜的调优策略(见Spark之数据倾斜调优)都适合于SparkStreaming,需要灵活掌握,在实际开发的工作当中用得频率较高。

二 、总结

面试问题:你在工作当中有SparkStreaming调优过项目吗?怎么调优的?效果怎么样?

  1. 比如举foreachRDD的例子
  2. 比如举个数据倾斜的例子
  3. 用Xmind整理调优的策略

网站公告

今日签到

点亮在社区的每一天
去签到