文章目录
- Pyspark catalog用法
-
- catalog 介绍
- cache 缓存表
- uncache 清除缓存表
- cleanCache 清理所有缓存表
- createExternalTable 创建外部表
- currentDatabase 返回当前默认库
- tableExists 检查数据表是否存在,包含临时视图
- databaseExists 检查数据库是否存在
- dropGlobalTempView 删除全局临时视图
- dropTempView 删除临时视图
- functionExists 检查函数是否存在
- getDatabase 获取具有指定名称的数据库
- getFunction 获取方法
- getTable 获取数据表
- isCached 检查是否缓存成功
- listCatalogs 列出可用的catalogs
- listColumns 返回数据表的列信息
- listDatabases 获取数据库列表
- listTables 获取数据表,包含临时视图
- setCurrentDatabase 设置当前数据库
- refreshTable 刷新缓存
- refreshByPath 刷新路径
- recoverPartitions 恢复分区
Pyspark catalog用法
catalog 介绍
Catalog
是Spark中用于管理元数据信息的接口,这些元数据可能包括库、内部或外部表、函数、表列及临时视图等。
总的来说,PySpark Catalogs是PySpark框架中用于管理和查询元数据的重要组件,它使得Python用户能够更有效地利用PySpark进行大数据处理和分析。
spark = SparkSession.builder.appName('LDSX_TEST') \
.config('hive.metastore.uris', 'thrift://hadoop01:9083') \
.config('spark.master',"local[2]" ) \
.enableHiveSupport().getOrCreate()
cache 缓存表
可以设置缓存等级,默认缓存等级为MEMORY_AND_DISK,是数据表级别的缓存,跟缓存dataframe存在区别,
设置不存在的表报错
# 缓存数据表
spark.catalog.cacheTable('ldsx_test.ldsx_table_one')
#检查是否缓存成功
ldsx = spark.catalog.isCached('ldsx_test.ldsx_table_one')
>True
uncache 清除缓存表
当表不存在数据库会报错
spark.catalog.uncacheTable("ldsx_test.ldsx_table_one")
cleanCache 清理所有缓存表
spark.catalog.clearCache()
createExternalTable 创建外部表
# spark.catalog.createExternalTable(
# tableName='ldsx_test_table',
# path = './ldsx_one.csv',
# database='ldsx_test',
#
# )
currentDatabase 返回当前默认库
返回当前默认所在数据库spark.catalog.setCurrentDatabase 设置所在数据库
data = spark.catalog.currentDatabase()
tableExists 检查数据表是否存在,包含临时视图
data = spark.catalog.tableExists('ldsx_test.ldsx_table_one')
>True
databaseExists 检查数据库是否存在
data = spark.catalog.databaseExists('ldsx_test')
dropGlobalTempView 删除全局临时视图
全局临时表查找时候需要指向global_temp
要删除的表不存在报错
#创建全局临时表
spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
#注意查询时候需要指向 global_temp
spark.sql('select * from global_temp.my_table').show()
#删除全局临时
ldsx= spark.catalog.dropGlobalTempView("my_table")
dropTempView 删除临时视图
要删除的表不存在报错
#创建临时表
spark.createDataFrame([(1, 1)]).createTempView("my_table")
spark.sql('select * from my_table').show()
#删除临时表
ldsx = spark.catalog.dropTempView("my_table")
functionExists 检查函数是否存在
spark.catalog.functionExists("count")
>True
getDatabase 获取具有指定名称的数据库
data = spark.catalog.getDatabase("ldsx_test")
print(data)
>>Database(name='ldsx_test', catalog='spark_catalog', description='', locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db')
getFunction 获取方法
获取不到方法报错
spark.sql("CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
data = spark.catalog.getFunction("my_func1")
print(data)
>>Function(name='my_func1', catalog='spark_catalog', namespace=['default'], description='N/A.', className='test.org.apache.spark.sql.MyDoubleAvg', isTemporary=False)
getTable 获取数据表
获取不到表报错
data = spark.catalog.getTable("ldsx_table_one")
print(data)
>>Table(name='ldsx_table_one', catalog='spark_catalog', namespace=['ldsx_test'], description=None, tableType='MANAGED', isTemporary=False)
isCached 检查是否缓存成功
# 缓存数据表
spark.catalog.cacheTable('ldsx_test.ldsx_table_one')
data = spark.catalog.isCached('ldsx_test.ldsx_table_one')
>True
listCatalogs 列出可用的catalogs
catalogs = spark.catalog.listCatalogs()
print(catalogs)
listColumns 返回数据表的列信息
# 参数:数据表,数据库
catalogs = spark.catalog.listColumns('ldsx_table_one','ldsx_test')
print(catalogs)
>> [Column(name='age', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='name', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='fraction', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='class', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='gender', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False)]
listDatabases 获取数据库列表
data1 = spark.catalog.listDatabases()
print(data1)
>>[Database(name='default', catalog='spark_catalog', description='Default Hive database',
locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data'),
Database(name='ldsx_test', catalog='spark_catalog', description='',
locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db')]
listTables 获取数据表,包含临时视图
# 展示数据库中数据表以及临时视图
spark.catalog.setCurrentDatabase('ldsx_test')
spark.createDataFrame([(1,1)]).createTempView('TEST')
data = spark.catalog.listTables()
print(data)
>>[Table(name='ldsx_table_one', catalog='spark_catalog', namespace=['ldsx_test'], description=None,
tableType='MANAGED', isTemporary=False),
Table(name='TEST', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
setCurrentDatabase 设置当前数据库
spark.catalog.setCurrentDatabase('ldsx_test')
data = spark.catalog.currentDatabase()
print(data)
>> ldsx_test
refreshTable 刷新缓存
看官网案例是,刷新已经缓存的表
当一个表执行了cacheTable后,元数据有变动使用refreshTable进行元数据刷新
refreshByPath 刷新路径
# 假设有一个 Hive 表,其数据存储在 HDFS 上的某个路径
path = "/user/hive/warehouse/mydb.db/mytable"
# 刷新该路径下的表或分区信息
spark.catalog.refreshByPath(path)
df = spark.sql("SELECT * FROM mydb.mytable")
df.show()
recoverPartitions 恢复分区
recoverPartitions尝试恢复 Hive 表中丢失的分区信息,实际使用后更新