Spark常见的可以优化的点

发布于:2024-06-17 ⋅ 阅读:(16) ⋅ 点赞:(0)

Shuffle 复用

在这里插入图片描述

# 1.以下操作会复用的shuffle结果,只会读一遍数据源
val rdd1 = sc.textFile("hdfs://zjyprc-hadoop/tmp/hive-site.xml")
             .flatMap(_.split(" "))
             .map(x => (x,1))
             .reduceByKey(_ + _)
             .filter(_._2 > 1)
rdd1.count()
rdd1.filter(_._2 > 10).count()

# 2.以下操作每次需要重新执行整个操作,会读两遍数据源
sc.textFile("hdfs://zjyprc-hadoop/tmp/hive-site.xml")
  .flatMap(_.split(" "))
  .map(x => (x,1))
  .reduceByKey(_ + _)
  .filter(_._2 > 1)
  .count()
  
sc.textFile("hdfs://zjyprc-hadoop/tmp/hive-site.xml")
  .flatMap(_.split(" "))
  .map(x => (x,1))
  .reduceByKey(_ + _)
  .filter(_._2 > 1)
  .filter(_._2 > 10).count()

在第一个示例中,rdd1.count() 触发了计算,并且结果被存储,后续的 filter 和 count 操作都是在这个已经计算好的结果上进行的。而在第二个示例中,每次 count() 都是一个新的触发点,导致每次都需要重新计算。

并行度设置

shuffle并行度

如下图所示,作业中可能不止一个Job,shuffle数据只有几十mb,但是并行度设置了2000:
在这里插入图片描述

设置2000可能只是为了让作业中的一个Job的瓶颈更小:
在这里插入图片描述

rdd程序其实控制能力极强的,常见的瓶颈算子都可以直接单独设置并行度的。
例如上述图中的算子可以由 rdd.reduceByKey(keyName) 改为 rdd.reduceByKey(keyName,3000)

scan file并行度

如果从hadoop输入的数据在后续的stage计算可能出现了内存瓶颈(gc时间比较长)时,可以考虑减少数据每个分区的大小,提高并行度:

  • DataSource读法,特指使用SparkSession.read这种,默认128:
    spark.files.maxPartitionBytes=268435456

  • 使用rdd直接读的,例如HDFSIO.thriftSequence、直接使用rdd hadoop api等,默认256(注意这个没有合并小文件功能):
    spark.hadoop.mapred.max.split.size=268435456

REPARTITION 之后JOIN

在这里插入图片描述

  • 首先repartition是用来做什么的?
    改变下游stage的并行度。
  • 什么时候改变下游并行度是有用的?
    • 下游是一个迭代计算,输入数据很少但是可能迭代所需资源很大,那么多并行度可以上每个task的输入数据变少。
    • 输出文件后通过改变并行度,控制小文件。
  • 为什么说上面的repartition基本无效?
    因为repartition之后,没有任何操作,就进行shuffle,将数据shuffle输入给下游的join了。join需要根据join的key进行hash分布到不同的join的task中,上游这个repartition基本没任何意义了。

SQL中可能造成性能瓶颈的算子

Get json object

尤其是连续对一个json解析多个字段,如下图,效率很低:
在这里插入图片描述
优化方式:
批量解析用json_tuple替换get_json_object,通常是更加高效的
问题原因:

--- t1 构建的测试数据
with t1 as (
    SELECT  properties
    FROM VALUES ('{"a":1, "b":2, "c":3}') tab(properties)
)
--- 实际执行properties会被解析3次,效率较低
--- 期望只解析一次,复用解析后的结果,得到a,b,  c
select
properties,
get_json_object(properties, '$.a') as a,
get_json_object(properties, '$.b') as b,
get_json_object(properties, '$.c') as c
from t1

优化方法:

--- t1 构建的测试数据
with t1 as (
    SELECT  properties
    FROM VALUES ('{"a":1, "b":2, "c":3}') tab (properties)
)
--- 写法1(推荐)
--- 1.使用json_tuple指定要解析出来的字段 'a', 'b', 'c'
--- 2.将'a', 'b', 'c'中的'替换为`,作为别名
select  
	properties,  
	json_tuple(properties, 'a', 'b', 'c') as (`a`,`b`,`c`)
from t1

--- 写法2
 --- 1.使用lateral view json_tuple指定要解析出来的字段 'a', 'b', 'c'
 --- 2.将'a', 'b', 'c'中的'替换为`,作为别名
 --- 3.直接指定select tb.*
--- select  properties, tb.*
--- from t1
--- lateral view json_tuple(properties, 'a', 'b', 'c') tb as `a`,`b`,`c`

array_union

这个算子会对array的对象去重,涉及到排序等操作,可能某些array比较大就会非常慢。

count(distinct()) group by 替换 size(collect_set() partition by)

窗口函数中collect_set是一种执行很慢的操作。

重复缓存数据

在算子内部或者分区内部不断重复缓存元素,导致内存占用

rdd.map(ele => {
    val list = ele.getList
    val set = list.toSet
    val result = list.map( *** )
})
rdd.foreachPartition(partition => {
    val result = new HashMap<String, String>
    while(partition.hashNext) {
        val next = partition.next
        result.put(next.key, next.value)
    }
    
    dealWithResult(result)
})

Cache 不合理

stage0:
在这里插入图片描述
stage3:
在这里插入图片描述
多次触发读相同的数据源,并且在同一个pipeline上使用了多次cache,但是并没有用上。

对比stage0和stage3, 在InstancePushMergeFeedProfileLabel2FeedImpressionWithInfoInFeed类第148行之前的逻辑是一致的。缓存应该对InstancePushMergeFeedProfileLabel2FeedImpressionWithInfoInFeed 148行的filter结果做缓存,后续使用缓存结果,避免重复读数据源。