spark的数据源

发布于:2025-03-24 ⋅ 阅读:(81) ⋅ 点赞:(0)

通用操作load和save操作

load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中。如果不指定format,那么默认的就是parquet文件。

// 读取加载 users.parquet, 不指定format ,默认  parquet格式
 val df = spark.read.load("src/main/resources/data/users.parquet")

// 先查询,再写入保存,mode是当遇到文件已存在时重写处理
df.select("name", "favorite_color").write.mode(SaveMode.Overwrite).save("src/main/resources/data/users1.parquet")

// 再查看写入的文件读取并展示
spark.read.load("src/main/resources/data/users1.parquet").show()

Spark SQL对于save操作,提供了不同的save mode,主要用来处理当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。

Save Mode

意义

SaveMode.ErrorIfExists (默认)

如果目标位置已经存在数据,那么抛出一个异常

SaveMode.Append

如果目标位置已经存在数据,那么将数据追加进去

SaveMode.Overwrite

如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖

SaveMode.Ignore

如果目标位置已经存在数据,那么就忽略,不做任何操作。

常见的数据格式

SparkSQL内建支持多种数据格式比如:txt、csv、excel(非内建)、json、Parquet、ORC等

txt格式

最常见最一般的文本格式,从文本中读取的话,就只有一个字段,字段名为value,字段值为每一行。可以使用text或者textFile算子读取。其中textFile算子底层调用的就是text算子,但是两者的区别是text算子返回的是DataFrame,而textFile算子返回的是DataSet,从源码可以看出,text算子支持多文件的形式

    // 单文件读取
    spark.read.format("text")
              .load("src/main/resources/data/aaaa.txt")
              .show(100)

    // 多文件读取
    val path = "src/main/resources/data/abc.txt"
    val path2 = "src/main/resources/data/aaaa.txt"
    val arrPath = Array(path, path2)
     spark.read.textFile(arrPath: _*)
                .show(100)

案例:文件读取并拆分

    // 从文本中读取的话,就只有一个字段,字段名为value,字段值为每一行
    spark.read.format("text").load("src/main/resources/data/aaaa.txt")
    //第一种 纯裸写map的形式
        spark.read.textFile("src/main/resources/data/aaaa.txt")
          .map{
            line =>{
              val split = line.split(",")
              (split(0), split(1))
            }
          }
          .toDF("stock","price")
          .show()

    //第二种 DSL的方式
        val df = spark.read.format("text").load("src/main/resources/data/aaaa.txt")
        import org.apache.spark.sql.functions._
        df.withColumn("split", split(df("value"),","))
          .selectExpr("split[0]","split[1]")
          .toDF("stock","price")
          .show()

    //第三种 SQL
    val df1 = spark.read.format("text").load("src/main/resources/data/aaaa.txt")
    df1.createTempView("my_table")
    spark.sql(
      """
        |select
        | split[0] as stock,split[1] as price
        |from
        | (
        |   select
        |     split(value,',') as split
        |   from
        |     my_table
        | )
        |""".stripMargin).show
csv格式

csv(Comma-Separated Values,逗号分隔值)文件和普通的文本文件一样被广泛使用,它是一种将所有的数据字段用逗号隔开的文本文件格式。在这些用逗号隔开的字段中,每行表示一条记录。虽然默认使用逗号作为字段分隔符,但也可以在数据中包含逗号时,使用其他分隔符代替逗号来分隔不同字段。常见的电子表格可以生成 csv 文件。

// 读 
val df = spark.read
            .option("header",true)
            .csv("src/main/resources/data/people.csv")
// 写
df.write.csv("src/main/resources/data/people2.csv")

option 参数详解

参数

默认值

含义

范围

header

false

设置为true时候 ,读取csv的时候,使用第一行作为列名

read/write

inferSchema

false

设置为true时,将自动推断schema类型

read

nullvalue

设置空值的字符串表示形式

read/write

mode

PERMISSIVE

允许在解析期间处理损坏记录的模式。它支持以下不区分大小写的模式。请注意,Spark 在列修剪下尝试仅解析 CSV 中所需的列。因此,损坏的记录可能会根据所需的字段集而有所不同。此行为可以通过spark.sql.csv.parser.columnPruning.enabled(默认启用)来控制。

PERMISSIVE: 当遇到损坏的记录时,将格式错误的字符串放入由 配置的字段中columnNameOfCorruptRecord,并将格式错误的字段设置为null。为了保留损坏的记录,用户可以设置以columnNameOfCorruptRecord用户定义的模式命名的字符串类型字段。如果模式没有该字段,它会在解析过程中删除损坏的记录。令牌少于/多于模式的记录不是 CSV 的损坏记录。当它遇到标记少于模式长度的记录时,设置null为额外字段。当记录的标记多于模式的长度时,它会丢弃额外的标记。

DROPMALFORMED: 忽略整个损坏的记录。

FAILFAST: 遇到损坏的记录时抛出异常。

read

sep

,

为每个字段和值设置分隔符。此分隔符可以是一个或多个字符。

read/write

compression

保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none, bzip2, gzip, lz4, snappy and deflate).

write

excel格式

Excel是Microsoft为使用Windows操作系统下最常用的一款表格软件,对于数据分析包括原始数据的导入,在很多场景下都是使用excel的格式,那么在这种情况就需要spark sql能够处理这种格式文件的能力。

默认情况下,spark sql是不支持读取excel文件的,好在github上有开源人员贡献了相关的操作excel格式的工具包,大大的提高的开发效率,具体地址:https://github.com/crealytics/spark-excel

在maven的pom文件内加入以下依赖

<dependency>
    <groupId>com.crealytics</groupId>
    <artifactId>spark-excel_${scala.version}</artifactId>
    <version>${spark.version}_0.16.0</version>
</dependency>
    // 读取Excel文件  format("com.crealytics.spark.excel") 路径是固定写法,第三方开源库
    spark.read.format("com.crealytics.spark.excel")
      .option("header", "true")    // 第一行是否作为表头,如果为true则作为表头,该参数必须要设置
      .option("dataAddress", "B:D")    // 指定读取数据的起始位置,默认从A1开始
      .load("src/main/resources/data/设备信息.xlsx")
      .show()


      // 也可以使用这种隐式转换
      import com.crealytics.spark.excel._
       spark.read.excel()

以下为拓展

/*
    val excelDF = spark.read
      // 指定excel的format
      .format("com.crealytics.spark.excel")
      // 第一行是否作为表头,如果为true则作为表头,该参数必须要设置
      .option("header", "true")
      //       .option("inferSchema", "true") // 是否推断schema
      // .option("workbookPassword", "None") // excel文件的打开密码
      // 默认读取整个Excel
      //      .option("dataAddress", "A:E")
      .option("treatEmptyValuesAsNulls", "true") // 空值是否为作为null
      .load("src/main/resources/data/设备信息.xlsx") // 如果是本地文件需要标注'file:///实际路径'因为spark会默认将HDFS作为文件系统
    //    val excelHeader = Seq("id", "mac", "deviceId", "siteId", "siteName", "product", "hard", "soft", "traffic") // 自定义表头名称
    //    val frameDF = excelDF.toDF(excelHeader: _*)
    excelDF.show() */
JSON 格式

JSON(JavaScript Object Notation)它是一种常见的数据格式,相比XML,JSON 的可读性更强,更容易解析,因此声名鹊起。JSON 有两种表示格式:单行模式和多行模式。这两种模式 Spark 都支持。在单行模式中,每行表示一个 JSON 对象;在多行模式中,整个多行对象组成一个 JSON 对象。要想用多行模式读取数据,只需在 option 方法中将multiLine 设置为 true 即可。

    // 读取json文件,使用默认的schema
    spark.read.json("src/main/resources/data/people.json")
      .printSchema()

    // 读取json文件,指定schema
    val schema =
      """
        |address string,
        |age int,
        |name string
        |""".stripMargin

    spark.read
      .schema(schema)
      .json("src/main/resources/data/people.json")
      .printSchema()

执行结果 第一个默认age是long类型,第二个指点是int类型

JSON option参数详解

参数

默认值

含义

范围

multiline

false

是否支持json的多行模式

ignoreNullFields

(spark.sql.jsonGenerator.ignoreNullFields配置值)

生成 JSON 对象时是否忽略空字段。

write

compression

同csv

mode

同csv

columnNameOfCorruptRecord

(spark.sql.columnNameOfCorruptRecord配置值)

允许重命名具有由PERMISSIVE模式创建的格式错误的字符串的新字段。这会覆盖 spark.sql.columnNameOfCorruptRecord。

read

allowSingleQuotes

true

除了双引号外,还允许单引号。

read

allowNumericLeadingZero

false

允许在数字中使用前导零(例如 00012)。

read

场景:在企业中,解析JSON的时候,是使用强Schema方案,还是弱Schema方案,这两种方案分别有哪些优势和劣势?

什么是强Schema方案?什么是弱Schema方案?

强Schema指的是我们在解析JSON的时候使用业务方提供的JSON Schema,指定Schema入库

弱Schema指的是我们在解析JSON的时候不指定Schema

强Schema方案其实更规范一点,但实际推行过程中会比较难,企业需要使用统一的埋点平台与完善的埋点流程才可以保证schema的正确性。

弱Schema其实不用业务方维护Schema,但更难管理,数据接入时,我们可能需要写很多硬编码来去解决Schema的冲突和新增。

建议:如果是初创企业,业务这时候变化很快,这时候推荐用弱Schema方案,但如果是中大型企业,业务比较稳定时,推荐使用强Schema方案。

 parquet格式

Parquet是面向分析型业务的列式存储格式

列式存储和行式存储相比有哪些优势呢?

1、可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

2、压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。

3、只读取需要的列,支持向量运算,能够获取更好的扫描性能。 

Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。

  // 标准读写模式:
  // 读:spark.read.format("数据源的格式").load("数据源的位置")
  // 写:df.write.format("数据源的格式").save("数据源的位置")

Parquet自动分区推断

表分区是一种常见的优化方式,比如Hive中就提供了表分区的特性。在一个分区表中,不同分区的数据通常存储在不同的目录中,分区列的值通常就包含在了分区目录的目录名中。Spark SQL中的Parquet数据源,支持自动根据目录名推断出分区信息。

// 分区1
spark.read
  .option("multiline",true)
  .json("src/main/resources/data/game/raw/user_login/00.json")
  .write
  .parquet("src/main/resources/data/eg/parquet/1/dt=20220401")

// 分区2
spark.read
  .option("multiline",true)
  .json("src/main/resources/data/game/raw/user_login/00.json")
  .write
  .parquet("src/main/resources/data/eg/parquet/1/dt=20220402")

// 查询所有
spark.read.parquet("src/main/resources/data/eg/parquet/1/").printSchema()

Spark Parquet 相关参数讲解

参数

Default

含义

spark.sql.optimizer.nestedSchemaPruning.enabled

false

默认false,设置为true时可开启嵌套结构列裁剪

spark.sql.parquet.filterPushdown

true

开启paruqet结构谓词下推

spark.sql.parquet.compression.codec

snappy

设置paruqet的压缩格式,支持none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.

ORC格式

和Parquet类似,它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。目前被Spark SQL、Presto等查询引擎支持,但是Impala对于ORC目前没有支持,Impala仍然使用Parquet作为主要的列式存储格式。

ORC具有以下一些优势:

ORC是列式存储,有多种文件压缩方式,并且有着很高的压缩比。

文件是可切分(Split)的。因此,在Hive中使用ORC作为表的文件存储格式,不仅节省HDFS存储资源,查询任务的输入数据量减少,使用的MapTask也就减少了。

提供了多种索引,row group index、bloom filter index。

ORC可以支持复杂的数据结构(比如Map等)

    // orc的写
    spark.read
      .option("multiline",true)
      .json("src/main/resources/data/game/raw/user_login/00.json")
      .write
      .orc("src/main/resources/data/orc/raw/1")

    // orc的读
    spark.read
      .orc("src/main/resources/data/orc/raw/1")
      .show()

    // orc也可以像parquet一样,自动识别分区
    spark.read
      .orc("src/main/resources/data/orc/raw/1")
      .write
      .orc("src/main/resources/data/orc/raw/2/dt=20230715")

    spark.read
      .orc("src/main/resources/data/orc/raw/1")
      .write
      .orc("src/main/resources/data/orc/raw/2/dt=20230716")

    spark.read
      .orc("src/main/resources/data/orc/raw/2")
      .printSchema()
企业中Parquet存储格式和ORC存储格式该如何选择

ORC主要还是针对Hive的存储格式,如果企业中是对Hive过度依赖的话,推荐使用ORC,但如果在企业中刚开始只是用Hive管理元数据,主要的计算引擎是Spark的话,还是推荐选择Parquet.

Spark新出的湖仓一体的组件Delta Lake底层存储的是Parquet,如果考虑企业中未来会使用Delta Lake,存储格式还是选择Parquet会更好

如果企业中的查询引擎主要是Presto的话,Presto对ORC文件读取做了特定的优化,这中情况下,推荐使用ORC会更好。


网站公告

今日签到

点亮在社区的每一天
去签到