sparksql

发布于:2024-10-18 ⋅ 阅读:(5) ⋅ 点赞:(0)

1.1什么是sparksql

 Spark SQL 是 Spark 用于处理结构化数据的模块。

一、主要特点

  1. 统一的数据处理

    • Spark SQL 提供了一个统一的编程接口,可以处理多种数据源,包括 Hive 表、Parquet 文件、JSON 文件、关系型数据库等。这使得用户可以在一个统一的环境中进行数据处理,无需切换不同的工具和技术。
    • 例如,可以使用相同的代码读取 Hive 表和 Parquet 文件,并进行联合查询和分析。
  2. 支持 SQL 查询

    • 用户可以使用标准的 SQL 语言进行数据查询和分析。Spark SQL 支持大多数 SQL 语法和功能,包括 SELECT、JOIN、GROUP BY、WHERE 等语句,以及窗口函数、子查询等高级功能。
    • 这使得熟悉 SQL 的用户可以轻松地使用 Spark SQL 进行数据分析,无需学习新的编程语言。
  3. 与 Spark 生态系统集成

    • Spark SQL 与 Spark 的其他模块(如 Spark Core、Spark Streaming、MLlib、GraphX)紧密集成,可以在大规模数据处理和分析中发挥重要作用。
    • 例如,可以将 Spark Streaming 接收的实时数据流存储到 Spark SQL 的表中,然后使用 SQL 进行实时分析;或者将 Spark SQL 的查询结果作为输入,用于机器学习算法的训练和预测。
  4. 性能优化

    • Spark SQL 采用了多种性能优化技术,以提高数据处理的效率和速度。例如,它使用了基于成本的优化器(CBO)来自动优化 SQL 查询计划,选择最优的执行方式;同时,它还支持内存列式存储和向量化执行,以提高数据的读取和处理速度。
    • 此外,Spark SQL 还可以利用 Spark 的分布式计算能力,将数据处理任务并行化到多个节点上,提高处理大规模数据的能力。

二、使用场景

  1. 数据仓库和数据分析

    • Spark SQL 可以作为一个数据仓库解决方案,用于存储和分析大规模的结构化数据。它支持与 Hive 兼容的 metastore,可以直接读取和写入 Hive 表,方便与现有的数据仓库集成。
    • 同时,Spark SQL 提供了丰富的数据分析功能,如 SQL 查询、数据透视表、聚合函数等,可以满足各种数据分析需求。
  2. 实时数据分析

    • 结合 Spark Streaming,Spark SQL 可以实现实时数据分析。可以将实时数据流存储到 Spark SQL 的表中,然后使用 SQL 进行实时查询和分析,以满足对实时数据的监控和决策需求。
    • 例如,在金融交易、网络监控、社交媒体分析等领域,可以使用 Spark SQL 进行实时风险监控、异常检测和趋势分析。
  3. 机器学习和数据挖掘

    • Spark SQL 可以与 Spark MLlib 集成,用于机器学习和数据挖掘任务。可以使用 SQL 查询从大规模数据集中提取特征,然后将这些特征输入到机器学习算法中进行训练和预测。
    • 例如,在推荐系统、客户细分、欺诈检测等领域,可以使用 Spark SQL 和 MLlib 进行数据预处理、特征工程和模型训练。
  4. 数据可视化

    • Spark SQL 的查询结果可以与各种数据可视化工具集成,如 Tableau、PowerBI 等,以实现数据的可视化展示。通过 SQL 查询,可以从大规模数据集中提取关键信息,并以直观的图表和图形展示出来,帮助用户更好地理解和分析数据。

 1.2创建datafram

# 导入行类Row
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *

# 创建行数据
r1 = Row(id=1, name='张三', age=20)
r2 = Row(id=2, name='李四', age=22)
# 创建元数据
schema = (StructType().add('id', IntegerType(), nullable=False)
                      .add('name', StringType(), nullable=False)
                      .add('age',IntegerType(),nullable=False)
                      .add('gender', StringType(), nullable=False))
print(schema)

# 创建dataframe
# 生成sparksession对象  按照固定写法创建
ss = SparkSession.builder.getOrCreate()
# 使用sparksession对象方法创建df
# createDataFrame 第一参数是一个列表数据,将每行数据放入列表
# 第二个参数指定表元数据信息
# df是一个dataframe类型的对象
df = ss.createDataFrame([r1, r2], schema=schema)

# dataframe数据的操作
# 查看df数据
df.show()  # 查看所有数据,超过20行时,默认只显示20行
# 查看元信息
df.printSchema()

1.3rdd与df之间的转化 

# RDD和DF之间的转换
# 导入SparkSession
from pyspark.sql import SparkSession

# 创建对象
ss = SparkSession.builder.getOrCreate()

# 使用sparksession获取sparkcontext
sc = ss.sparkContext # 不要括号,可以直接获取到sparkcontext对象

# 生成rdd数据
# rdd转df时,要求数据是二维嵌套列表
data = [[1,'张三',20,'男'],[2,'小红',19,'女']]
rdd = sc.parallelize(data)

# rdd转df
df = rdd.toDF(schema='id int,name string,age int,gender string')

# 查看df数据
df.show()

# 查看表结构
df.printSchema()


# 将df转为rdd
rdd2 = df.rdd
# 查看rdd中数据
res = rdd2.collect() # [Row(),Row()]
# 转化后的rdd中每个元素是有个Row类对象
print(res)
print(res[0])
print(res[0]['name'])

1.4pandas和spark之间转化

pandas的df转为spark的df  

# Pandas和spark之间的转化
import pandas as pd
from pyspark.sql import SparkSession
# 创建pd的df
pd_df = pd.DataFrame(
    {
        'id':[1,2,3,4],
        'name':['a','b','c','d'],
        'age':[20,21,22,24],
        'gender':['男','女','男','男']
    }
)
# 查看数据
print(pd_df)

# 将pd_df 转为spark的df
ss = SparkSession.builder.getOrCreate()
spark_df = ss.createDataFrame(pd_df)

# 查看数据
spark_df.show()

spark的df转为pandas的df

# 将spark_df转为pd_df
pd_df2 = spark_df.toPandas()
print(pd_df2)

1.5读取文件数据转为df

# 读取数据转化为df
from pyspark.sql import SparkSession

# 创建sparksession
ss = SparkSession.builder.getOrCreate()

# 读取不同数据源
# header=True 是否需要获取表头
# sep 指定数据字段按照什么字符分割
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
# schema当没有表头时,可以自己指定字段
df2 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')

df3 = ss.read.json('hdfs://node1:8020/data/employees.json')
df4 = ss.read.orc('hdfs://node1:8020/data/users.orc')
df5 = ss.read.parquet('hdfs://node1:8020/data/users.parquet')


# 查看
# show中可以指定显示多少行,默认是20行
df.show(100)

df2.show()

 2.DataFrame基本使用

2.1SQL语句

# 使用sql方式开发
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取数据得到df数据
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')

# 对df数据进行sql操作
# 需要给df指定一个表名
df.createTempView('tb_user')

# 编写sql语句执行
# sql执行后的结果被保存新的df中
new_df =  ss.sql('select gender,avg(age) as avg_data from tb_user group by gender')
new_df.show()

 2.2DSL方法

# 使用DSL方式开发
from pyspark.sql import  SparkSession

ss = SparkSession.builder.getOrCreate()

# 生成df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')
# 查看df数据
df.show()

print('---------------select方法----------------------')
# 使用select方法指定输出展示的数据字段
# 方式一指定字段
df_select = df.select('id','name')
# 方式二
df_select2 = df.select(df.age,df.gender)
# 方式三
df_select3 = df.select(df['id'],df['cls'])

df_select.show()
df_select2.show()
df_select3.show()

print('---------------alias方法----------------------')
# 字段名称修改,需要配合select中使用
df_alias = df.select(df.id.alias('user_id'),df.name.alias('username'))
df_alias.show()

print('---------------cast方法----------------------')
# 修改字段的数据类型
df.printSchema()
df_cast = df.select(df.id.cast('int'),df.name,df.age)
df_cast.printSchema()

print('---------------where方法----------------------')
# 数据过滤,where方法内部是调用了filter方法
# 方式1
df_where = df.where('age > 20')
df_where.show()
#方式2
df_where2 = df.where(df.age > 20)
df_where2.show()

# 与或非多条件 只能使用方式1  条件的书写和在sql中的where书写内容一样
df_where3 = df.where('age > 20 and gender = "男" ')
df_where3.show()

print('---------------groupby方法----------------------')
# 分组计算,可以配和聚合方法一起使用  使用该方式聚合一次只能计算一个聚合数据 ,可以使用内置函数配合agg方法
# groupby指定分组字段,可以指定多个
# avg 聚合方法  指定聚合字段  sum  count  avg  max  min
df_groupby = df.groupby('gender').avg('age')
df_groupby.show()

# groupby指定分组字段,可以指定多个
df_groupby2 = df.groupby('gender','cls').avg('age')
df_groupby2.show()

# 分组后的数据过滤
df_groupby3 = df.groupby('gender','cls').avg('age').where(' avg(age) > 19')
df_groupby3.show()

print('---------------orderby方法----------------------')
# 数据排序 内部调用sort方法
df_orderby = df.orderBy('age')
df_orderby.show()
# ascending=False 降序
df_orderby2 = df.orderBy('age',ascending=False)
df_orderby2.show()

print('---------------limit方法----------------------')
# 指定获取多条数据
df_limit = df.orderBy('age',ascending=False).limit(5)

df_limit.show()