spark-sql可以直接使用hive的元数据
1、环境搭建如下:
## 1、启动hive的元数据服务
```shell
# 1、修改hive的配置文件
cd /usr/local/soft/hive-3.1.3/conf
# 2、增加配置
vim hive-site.xml
<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083</value>
</property>
# 3、启动hive元数据服务
nohup hive --service metastore >> metastore.log 2>&1 &
```
## 2、将hive的配置文件同步到spark conf目录下
```sql
cp hive-site.xml /usr/local/soft/spark-3.1.3/conf/
```
### 3、在spark sql命令行中使用hive的表
```shell
#命令行启动spark-sql
spark-sql --master local
#指定分区数为1
set spark.sql.shuffle.partitions=1;
create external table if not exists students(
id bigint comment '学生id'
,name string comment '学生姓名'
,age bigint comment '学生年龄'
,sex string comment '学生性别'
,clazz string comment '学生班级'
) comment '学生信息表'
row format delimited fields terminated by ','
stored as textfile
location 'hdfs://master:9000/data/student';
select clazz,count(1) as num from students
group by clazz;
```
2、在代码中写spark-sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object Demo1DataFrame {
def main(args: Array[String]): Unit = {
//1、创建spark sql环境
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("df")
//指定shuffle之后RDD的分区数
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
import spark.implicits._
//2、读取数据
//DataFrame:在RDD的基础上增加了表结构,为了写sql
val studentDF: DataFrame = spark
.read
.format("csv")
.option("sep", ",")
.schema("id STRING,name STRING,age INT,sex STRING,clazz STRING")
.load("data/students.txt")
//查看数据
studentDF.show()
//创建临时视图
studentDF.createOrReplaceTempView("students")
//编写sql处理数据
val clazzNumDF: DataFrame = spark.sql(
"""
|select clazz,count(1) as num
|from students
|group by clazz
|""".stripMargin)
clazzNumDF.show()
import org.apache.spark.sql.functions._
//使用DSL处理数据
val clazzNum: DataFrame = studentDF
.groupBy("clazz")
.agg(count("id") as "num")
//保存结果
clazzNum
.write
.format("csv")
.option("sep", "\t")
//.save("data/clazz_num")
//使用RDD处理数据
val kvDS: RDD[(String, Int)] = studentDF
//转换成RDD
.rdd
.map {
//DF中的每一行是一个ROW对象
case Row(id, name, age, sex, clazz: String) => (clazz, 1)
}
kvDS
.reduceByKey(_ + _)
.foreach(println)
}
}