【无标题】

发布于:2025-04-18 ⋅ 阅读:(24) ⋅ 点赞:(0)

以下是今天学习的知识点以及代码测试:

第七节 Spark-SQL核心编程(六)

MySQL

Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对

DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。

IDEA通过JDBC对MySQL进行操作:

  1. 导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

MySQL8  <version>8.0.11</version>

  1. 读取数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

//通用的load方式读取


spark.read.format("jdbc")
  .option("url","jdbc:mysql://localhost:3306/system")
  .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver
  .option("user","root")
  .option("password","123456")
  .option("dbtable","user")
  .load().show()

spark.stop()

//通用的load方法的另一种形式
spark.read.format("jdbc")
  .options(
    Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))
  .load().show()

//通过JDBC
val pros :Properties = new Properties()
pros.setProperty("user","root")
pros.setProperty("password","123456")
val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)
df.show()

  1. 写入数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),
  Stu("zs", 30)))
val ds:Dataset[Stu] = rdd.toDS()

ds.write.format("jdbc")
  .option("url","jdbc:mysql://localhost:3306/system")
  .option("driver","com.mysql.jdbc.Driver")
  .option("user","root")
  .option("password","123456")
  .option("dbtable","user2")
  .mode(SaveMode.Append)
  .save()

spark.stop()

 Spark-SQL核心编程(七)

代码操作Hive

1. 导入依赖。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.3</version>
</dependency>

可能出现下载jar包的问题:

D:\maven\repository\org\pentaho\pentaho-aggdesigner-algorithm\5.1.5-jhyde

2. 将hive-site.xml 文件拷贝到项目的 resources 目录中。

3. 代码实现。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hive")
val spark:SparkSession = SparkSession.builder()
  .enableHiveSupport()
  .config(sparkConf)
  .getOrCreate()

spark.sql("show databases").show()
spark.sql("create database spark_sql")
spark.sql("show databases").show()

可以在代码最前面增加如下代码解决:

System.setProperty("HADOOP_USER_NAME", "node01")

此处的 node01 改为自己的 hadoop 用户名称

  1. 在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址: config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse")


网站公告

今日签到

点亮在社区的每一天
去签到