SparkSQL整合Hive

发布于:2024-10-18 ⋅ 阅读:(82) ⋅ 点赞:(0)

spark-sql可以直接使用hive的元数据

1、环境搭建如下:

## 1、启动hive的元数据服务

```shell
# 1、修改hive的配置文件
cd /usr/local/soft/hive-3.1.3/conf

# 2、增加配置
vim hive-site.xml

<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083</value>
</property>

# 3、启动hive元数据服务
nohup  hive --service metastore >> metastore.log 2>&1 &
```

## 2、将hive的配置文件同步到spark conf目录下

```sql
cp hive-site.xml /usr/local/soft/spark-3.1.3/conf/
```

### 3、在spark sql命令行中使用hive的表

```shell
#命令行启动spark-sql
spark-sql --master local

#指定分区数为1
set spark.sql.shuffle.partitions=1;

create external table if not exists students(
    id bigint comment '学生id'
    ,name string comment '学生姓名'
    ,age bigint comment '学生年龄'
    ,sex string comment '学生性别'
    ,clazz string comment '学生班级'
) comment '学生信息表'
row format delimited fields terminated by ','
stored as textfile 
location 'hdfs://master:9000/data/student';

select clazz,count(1) as num from students
group by clazz;
```

2、在代码中写spark-sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Demo1DataFrame {
  def main(args: Array[String]): Unit = {

    //1、创建spark sql环境
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("df")
      //指定shuffle之后RDD的分区数
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    //2、读取数据
    //DataFrame:在RDD的基础上增加了表结构,为了写sql
    val studentDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,name STRING,age INT,sex STRING,clazz STRING")
      .load("data/students.txt")

    //查看数据
    studentDF.show()

    //创建临时视图
    studentDF.createOrReplaceTempView("students")

    //编写sql处理数据
    val clazzNumDF: DataFrame = spark.sql(
      """
        |select clazz,count(1) as num
        |from students
        |group by clazz
        |""".stripMargin)

    clazzNumDF.show()

    import org.apache.spark.sql.functions._
    //使用DSL处理数据
    val clazzNum: DataFrame = studentDF
      .groupBy("clazz")
      .agg(count("id") as "num")

    //保存结果
    clazzNum
      .write
      .format("csv")
      .option("sep", "\t")
    //.save("data/clazz_num")

    //使用RDD处理数据
    val kvDS: RDD[(String, Int)] = studentDF
      //转换成RDD
      .rdd
      .map {
        //DF中的每一行是一个ROW对象
        case Row(id, name, age, sex, clazz: String) => (clazz, 1)
      }

    kvDS
      .reduceByKey(_ + _)
      .foreach(println)

  }
}


今日签到

点亮在社区的每一天
去签到