【大数据学习 | Spark-SQL】SparkSQL读写数据

发布于:2024-11-28 ⋅ 阅读:(16) ⋅ 点赞:(0)

我们使用sparksql进行编程,编程的过程我们需要创建dataframe对象,这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。

但是sparksql给大家提供了多种便捷读取数据的方式。

//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json

write写出存储数据的时候也是文件夹的,而且文件夹不能存在。

  • csv是一个介于文本和excel之间的一种格式,如果是文本打开用逗号分隔的。
  • text文本普通文本,但是这个文本必须只能保存一列内容。

以上两个文本都是只有内容的,没有列的。

  • json是一种字符串结构,本质就是字符串,但是存在kv,例子 {"name":"zhangsan","age":20}

多平台解析方便,带有格式信息。

  • orc格式一个列式存储格式,hive专有的。
  • parquet列式存储,顶级项目

以上都是列式存储问题,优点(1.列式存储,检索效率高,防止冗余查询 2.带有汇总信息,查询特别快 3.带有轻量级索引,可以跳过大部分数据进行检索),他们都是二进制文件,带有格式信息。

jdbc 方式,它是一种协议,只要符合jdbc规范的服务都可以连接,mysql,oracle,hive,sparksql

整体代码:

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Properties

object TestMovieWithSql {
  def main(args: Array[String]): Unit = {
    //??movie???
    //1.id  middle=name  last=type
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions","20")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)

    import sqlSc.implicits._
    //deal data
    val df = sc.textFile("data/movies.txt")
      .flatMap(t => {
        val strs = t.split(",")
        val mid = strs(0)
        val types = strs.reverse.head
        val name = strs.tail.reverse.tail.reverse.mkString(" ")
        types.split("\\|").map((mid, name, _))
      }).toDF("mid", "mname", "type")

    df.limit(1).show()

    val df1 = sc.textFile("data/ratings.txt")
      .map(t=>{
        val strs = t.split(",")
        (strs(0),strs(1),strs(2).toDouble)
      }).toDF("userid","mid","score")
    df1.limit(1).show()

    import org.apache.spark.sql.functions._
    val df11 = df.join(df1, "mid").groupBy("userid", "type")
      .agg(count("userid").as("cnt"))
      .withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc)))
      .where("rn = 1")
      .select("userid", "type")

    val df22 = df.join(df1, "mid").groupBy("type", "mname")
      .agg(avg("score").as("avg"))
      .withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc)))
      .where("rn<4")
      .select("type", "mname")

    val df33 = df11.join(df22, "type")

    //spark3.1.2?? spark2.x

//    df33.write.csv()
    df33.write
      .format("csv")
      .save("data/csv")

//    df33.write.
//      csv("data/csv")
//    df33.write.json("data/json")

//    df33.write.parquet("data/parquet")
//    df33.write.orc("data/orc")
//    val pro = new Properties()
//    pro.put("user","root")
//    pro.put("password","hainiu")
//    df33.write.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro)
  }
}

为了简化存储的计算方式:

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object TestSink {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sink")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    import sqlSc.implicits._
    import org.apache.spark.sql.functions._
    val df = sc.textFile("data/a.txt")
      .map(t=>{
        val strs = t.split(" ")
        (strs(0),strs(1),strs(2),strs(3))
      }).toDF("id","name","age","gender")
      .withColumn("all",concat_ws(" ",$"id",$"name",$"age",$"gender"))
      .select("all")
//    df.write.csv("data/csv")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
//      .save("data/csv")
//    df.write.parquet("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
//      .save("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2")
//      .save("data/json")
    df.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
      .save("data/text")
  }
}

读取数据代码:

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import java.util.Properties

object TestReadData {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions", "20")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
//    sqlSc.read.text("data/text").show()
//    sqlSc.read.csv("data/csv").show()
//  
//    sqlSc.read.parquet("data/parquet").show()
//    sqlSc.read.json("data/json").show()

    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").load("data/text").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2").load("data/csv").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2").load("data/json").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2").load("data/parquet").show()

    sqlSc.read.orc("data/orc").show()
    val pro = new Properties()
    pro.put("user","root")
    pro.put("password","hainiu")
    sqlSc.read.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro).show()
  }
}