存储级别指定
persist:可以通过传入 StorageLevel 参数来指定不同的持久化级别。常见的持久化级别有:
MEMORY_ONLY:将 RDD 以 Java 对象的形式存储在 JVM 的内存中。若内存不足,部分分区将不会被缓存,需要时会重新计算。
MEMORY_AND_DISK:优先把 RDD 以 Java 对象的形式存储在 JVM 的内存中。若内存不足,会把多余的分区存储到磁盘上。
DISK_ONLY:将 RDD 的数据存储在磁盘上。
MEMORY_ONLY_SER:将 RDD 以序列化的 Java 对象形式存储在内存中,相较于 MEMORY_ONLY,序列化后占用的内存空间更小,但读取时需要进行反序列化操作,会带来一定的性能开销。
MEMORY_AND_DISK_SER:优先将 RDD 以序列化的 Java 对象形式存储在内存中,内存不足时存储到磁盘上。
cache:不能指定存储级别,它固定使用 MEMORY_ONLY 存储级别。
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Cache {
//Spark的缓存
//1.cache()
//2.persist()
//cache()是persist()的一种特殊情况
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Cache").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//创建一个包含大量随机数的RDD
val rdd = sc.parallelize(1 to 1000000).map( _=> scala.util.Random.nextInt(100))
//定义一个复杂的转换函数
def complexTransformation(x:Int): Int = {
var result=x
for(i<-1 to 1000){
result=result*2%100
}
result
}
//val rdd1=rdd.map(complexTransformation)
//缓存rdd
//val rdd1=rdd.map(complexTransformation).cache()
//persist
val rdd1=rdd.map(complexTransformation).persist(StorageLevel.DISK_ONLY)
//第一次触发行动算子,计算并统计消耗时间
val startTime=System.currentTimeMillis()
val rs1=rdd1.collect()
val endTime=System.currentTimeMillis()
println("第一次计算消耗时间:"+(endTime - startTime)+"毫秒")
//第二次触发行动算子,计算并统计消耗时间
val startTime1=System.currentTimeMillis()
val rs2=rdd1.collect()
val endTime1=System.currentTimeMillis()
println("第二次计算消耗时间:"+(endTime1 - startTime1)+"毫秒")
}
}