一、获取文件或目录
1. 获取某个目录下的文件
// 必须的依赖
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
// 获取某个目录下的文件路径
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {
// 获取文件系统
val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意这里用 URI 让 Hadoop 根据 scheme 找对应 FS
// 递归获取该目录下所有文件
val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)
// 获取文件路径
val buffer = scala.collection.mutable.ArrayBuffer[String]()
while (it.hasNext) {
val fileStatus = it.next()
buffer += fileStatus.getPath.toString
}
// 关闭文件系统
fs.close()
// 返回结果
buffer.toArray
}
// 设定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem") // 读取oss的路径
// 需要指定的路径
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)
2. 获取某个目录下的子目录
import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
/**
* 获取某个目录下所有子目录的路径, 以字符串数组的形式返回
*/
def getOnlineFirstDir: Array[String] = {
// 获取路径
val path = s"s3://aa/bb/"
val filePath = new org.apache.hadoop.fs.Path( path )
// 获取文件系统
val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )
// 获取所有子目录的路径
val allFiles = FileUtil.stat2Paths( fileSystem.listStatus( filePath ) )
val res = allFiles.filter( fileSystem.getFileStatus( _ ).isDirectory() ).map( _.toString)
// 返回结果
res
}
二、删除文件或目录
/**
* 删除目录
*/
def deletePath(spark: SparkSession, path: String): Unit = {
// 1 获取文件系统
val file_path = new org.apache.hadoop.fs.Path( path )
val file_system = file_path.getFileSystem( spark.sparkContext.hadoopConfiguration )
// 2 判断路径存在时, 则删除
if (file_system.exists( file_path )) {
file_system.delete( file_path, true )
}
}
三、获取文件或目录大小
/**
* 获取某个目录的大小(单位b字节),注意:只能在driver端使用,可以多线程来提速。
*/
def get_path_size(spark: SparkSession, path: String): Long = {
//取文件系统
val filePath = new org.apache.hadoop.fs.Path( path )
val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )
// 获取该目录的大小,单位是字节
if (fileSystem.exists( filePath )) {
fileSystem.getContentSummary( filePath ).getLength
} else {
0
}
}
四、判读文件或目录是否存在
方式一
/**
* 判断目录是否存在,注意:只能在driver端使用,可以多线程来提速。问题: 对删除过的目录可能会误判
*/
def pathIsExist(spark: SparkSession, path: String): Boolean = {
//取文件系统
val filePath = new org.apache.hadoop.fs.Path( path )
val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )
// 判断路径是否存在
fileSystem.exists( filePath )
}
方式二
/**
* 通过目录是否大于0来判断目录是否存在(消除对删除过的目录的误判),注意:只能在driver端使用,可以多线程来提速。
*/
def def pathIsExist(spark: SparkSession, path: String): Boolean =
//取文件系统
val filePath = new org.apache.hadoop.fs.Path( path )
val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )
// 获取该目录的大小,单位是字节
val size = if (fileSystem.exists( filePath )) {
fileSystem.getContentSummary( filePath ).getLength
} else {
0
}
// 返回结果
size > 0
}
五、parquet文的行组信息
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.parquet.column.statistics.Statistics
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.api.Binary
import java.{lang, util}
// 获取某个目录下的文件路径
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {
// 获取文件系统
val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意这里用 URI 让 Hadoop 根据 scheme 找对应 FS
// 递归获取该目录下所有文件
val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)
// 获取文件路径
val buffer = scala.collection.mutable.ArrayBuffer[String]()
while (it.hasNext) {
val fileStatus = it.next()
buffer += fileStatus.getPath.toString
}
// 关闭文件系统
fs.close()
// 返回结果
buffer.toArray
}
// 某个文件某列的行组信息
def print_row_groupp(conf: Configuration, file_name: String, col_name: String): Unit = {
// 读取元数据
val parquetFilePath = new Path(file_name)
val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)
val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter
// 遍历每个行组,并手动添加索引
val blocks: util.List[BlockMetaData] = footer.getBlocks
for (i <- 0 until blocks.size()) {
val block = blocks.get(i)
println(s"Row Group #${i}:")
println(s" - Total Rows: ${block.getRowCount}")
println(s" - Total Size: ${block.getTotalByteSize} bytes")
// 遍历每个列块
block.getColumns.forEach { columnChunkMetaData =>
val columnPath = columnChunkMetaData.getPath.toDotString
// 过滤目标列
if (columnPath == col_name) {
val statistics: Statistics[_] = columnChunkMetaData.getStatistics
println(s" Column: $columnPath")
if (statistics != null) {
// 获取最小值和最大值并解码
val minValue = statistics.genericGetMin match {
case b: Binary => b.toStringUsingUTF8
case l: lang.Long => l.toString
case i: Integer => i.toString
case other => other.toString
}
val maxValue = statistics.genericGetMax match {
case b: Binary => b.toStringUsingUTF8
case l: lang.Long => l.toString
case i: Integer => i.toString
case other => other.toString
}
println(s" - Min Value: $minValue")
println(s" - Max Value: $maxValue")
println(s" - Null Count: ${statistics.getNumNulls}")
} else {
println(" - No statistics available for this column.")
}
println(" ------")
}
}
println("======================")
}
}
// 某个文件的行组数
def get_row_group_size(conf: Configuration, file_name: String): Int = {
// 读取元数据
val parquetFilePath = new Path(file_name)
val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)
val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter
// 行组数
footer.getBlocks.size()
}
// 设定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
// 需要指定的路径
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)
// 获取第一个文件的行组信息
val first_file = file_paths(0)
print_row_groupp(conf, first_file, "odid")
// 统计行组数
for (file_path <- file_paths) {
val file_index = file_path.split("part-")(1).split("-")(0)
println(file_index + " = " + get_row_group_size(conf, file_path))
}