以下转换算子只适用于键值对中。
object Demo {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("kvTrans").setMaster("local")
val sc: SparkContext = new SparkContext(sparkConf)
groupByKeyOper(sc)
aggregateByKeyOper(sc)
sortByKeyOper(sc)
joinOper(sc)
cogroupOper(sc)
subtractByKeyOper(sc)
keysOper(sc)
valuesOper(sc)
mapValuesOper(sc)
combineByKeyOper(sc)
sc.stop()
}
/**
* groupByKey——转换算子
* groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端
* def groupByKey(): RDD[(K, Iterable[V])] = {}
*/
def groupByKeyOper(sc:SparkContext):Unit = {
// rdd中存放的是一行又一行的数据
val rdd:RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
// rdd1中存放的是一个又一个的单词
val rdd1:RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
// rdd2中存放的是键值对类型的数据
val rdd2:RDD[(String,Int)] = rdd1.map((word: String) => {
(word, 1)
})
// 按相同的key进行聚合
val rdd3:RDD[(String,Iterable[Int])] = rdd2.groupByKey()
println(rdd3.collect().mkString("; "))
}
/**
* aggregateByKey——转换算子,类似于aggregate行动算子,但这个只对键值对生效
*/
def aggregateByKeyOper(sc:SparkContext):Unit = {
// rdd中存放的是一行又一行的数据
val rdd:RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
// rdd1中存放的是一个又一个的单词
val rdd1:RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
// rdd2中存放的是键值对类型的数据
val rdd2:RDD[(String,Int)] = rdd1.map((word: String) => {
(word, 1)
})
println(rdd2.collect().mkString("; "))
// 按相同的key进行聚合
val rdd3:RDD[(String,Int)] = rdd2.aggregateByKey(0)(
(a:Int,b:Int)=>{a+b},
(a:Int,b:Int)=>{a+b}
)
rdd3.foreach(println(_))
}
/**
* sortByKey——转换算子,按照key值对数据进排序
*/
def sortByKeyOper(sc:SparkContext):Unit = {
val rdd:RDD[(String,Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("mm", 46)))
val rdd1:RDD[(String,Int)] = rdd.sortByKey(false)
rdd1.foreach(println(_))
}
/**
* join——转换算子,只保留匹配的数据,不匹配的不会输出
* 出现重复数据会进行笛卡尔乘积
* 可以使用cogroup解决出现笛卡尔乘积的问题
*/
def joinOper(sc:SparkContext):Unit = {
val rdd:RDD[(String,Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("mm", 46), ("bb", 51)))
val rdd1:RDD[(String,String)] = sc.makeRDD(Array(("mm", "女"), ("hr", "女"), ("cy", "男"),("sq", "男")))
val rdd2:RDD[(String,(Int,String))] = rdd.join(rdd1)
rdd2.foreach(println(_))
// leftOuterJoin保留左边的所有数据,Option可以避免报空指针异常
val rdd3:RDD[(String,(Int,Option[String]))] = rdd.leftOuterJoin(rdd1)
rdd3.foreach(tuple=>{
println(s"key=${tuple._1} value=${tuple._2._1},${tuple._2._2}")
})
val rdd4:RDD[(String,(Option[Int],String))] = rdd.rightOuterJoin(rdd1)
rdd4.foreach(tuple=>{
println(s"key=${tuple._1} value=${tuple._2._1},${tuple._2._2}")
})
}
/**
* cogroup——删除前面RDD中与括号传入的RDD Key值相同的元素,返回的是前者RDD删除完成的数据
*/
def cogroupOper(sc:SparkContext):Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("mm", 46), ("bb", 51),("cy",33)))
val rdd1: RDD[(String, String)] = sc.makeRDD(Array(("mm", "女"), ("hr", "女"), ("cy", "男"), ("sq", "男"),("cy","女")))
val rdd2: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd.cogroup(rdd1)
rdd2.foreach(println(_))
// leftOuterJoin保留左边的所有数据,Option可以避免报空指针异常
val rdd3: RDD[(String, (Int, Option[String]))] = rdd.leftOuterJoin(rdd1)
rdd3.foreach(tuple => {
println(s"key=${tuple._1} value=${tuple._2._1},${tuple._2._2}")
})
val rdd4: RDD[(String, (Option[Int], String))] = rdd.rightOuterJoin(rdd1)
rdd4.foreach(tuple => {
println(s"key=${tuple._1} value=${tuple._2._1},${tuple._2._2}")
})
}
/**
* subtractByKey——按照key值进行聚合value(迭代器)
*/
def subtractByKeyOper(sc:SparkContext):Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("mm", 46), ("bb", 51),("cy",33)))
val rdd1: RDD[(String, String)] = sc.makeRDD(Array(("mm", "女"), ("hr", "女"), ("cy", "男"), ("sq", "男"),("cy","女")))
val rdd2: RDD[(String, Int)] = rdd.subtractByKey(rdd1)
rdd2.foreach(println(_)) // (bb,51)
// leftOuterJoin保留左边的所有数据,Option可以避免报空指针异常
val rdd3: RDD[(String, (Int, Option[String]))] = rdd.leftOuterJoin(rdd1)
rdd3.foreach(tuple => {
println(s"key=${tuple._1} value=${tuple._2._1},${tuple._2._2}")
})
val rdd4: RDD[(String, (Option[Int], String))] = rdd.rightOuterJoin(rdd1)
rdd4.foreach(tuple => {
println(s"key=${tuple._1} value=${tuple._2._1},${tuple._2._2}")
})
}
/**
* keys——将数据集中所有的key值取出来形成一个新的RDD
*/
def keysOper(sc:SparkContext):Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("mm", 46), ("bb", 51),("cy",33)))
val rdd1: RDD[String] = rdd.keys
rdd1.foreach(println(_))
}
/**
* values——将数据集中所有的value值取出来形成一个新的RDD
*/
def valuesOper(sc:SparkContext):Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("mm", 46), ("bb", 51),("cy",33)))
val rdd1: RDD[Int] = rdd.values
rdd1.foreach(println(_))
}
/**
* values——将数据集中所有的value值取出来形成一个新的RDD
*/
def mapValuesOper(sc:SparkContext):Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("mm", 46), ("bb", 51),("cy",33)))
val rdd1: RDD[(String, Int)] = rdd.mapValues((a:Int)=>{a+1})
rdd1.foreach(println(_))
}
/**
* combineByKey
*/
def combineByKeyOper(sc:SparkContext):Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("cy", 32), ("hr", 23), ("hr", 24), ("mm", 46), ("bb", 51),("cy",33)))
// 计算总和
val rdd1: RDD[(String, Int)] =rdd.combineByKey(
// 32 1
(a:Int)=>{a},
// 23 32 23+32
(a:Int,b:Int)=>{a+b},
// 将每个分区中的数据相加
(a:Int,b:Int)=>{a+b}
)
rdd1.foreach(println(_))
// 计算平均值
val rdd2: RDD[(String, (Int, Int))] =rdd.combineByKey(
// 32 1
(a:Int)=>{(a,1)},
// 假设为如下分区:
// (32 1) 23
// (23 1) 24
(tuplea:(Int,Int),num:Int)=>{(tuplea._1+num, tuplea._2)},
// 每个分区都是(Int,Int)类型的数据,将所有分区的数据相加
(tuplea:(Int,Int),tupleb:(Int,Int))=>{(tuplea._1+tupleb._1,tuplea._2+tupleb._2)}
)
val rdd3:RDD[(String,Double)] = rdd2.mapValues((tuple2: (Int, Int)) => {
tuple2._1 / tuple2._2.toDouble
})
rdd3.foreach(println(_))
}
}