Clearing Historical Partition Data with a UDF on an Older Hive Version (1.2.1)


Goal: clear a table's historical partition data through a UDF. For example, del_dt('dwd_abc_df', 7) removes every dt partition older than 7 days.

Input parameters: table name, number of days to retain (N)

1. The POM file

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>hive-udf-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>hive-udf-example</name>
  <description>Hive UDF for deleting partitions by date</description>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Hive Exec (Hive 1.2.1) -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
    </dependency>

    <!-- Hive Metastore (Hive 1.2.1) -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>1.2.1</version>
    </dependency>

    <!-- Hadoop Client (Hadoop 2.7.3, as commonly paired with Hive 1.2.1) -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>


    <!-- SLF4J logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
    </dependency>

    <!-- JUnit (tests only) -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>


  </dependencies>

  <build>
    <plugins>
      <!-- Compiler plugin -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
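
One optional tweak: when the UDF runs inside the Hive CLI or HiveServer2, the Hive and Hadoop jars are already on the cluster classpath, so these dependencies are often given provided scope to keep them out of any fat jar built later and to avoid version clashes. A minimal sketch, assuming your cluster supplies hive-exec (and likewise hive-metastore / hadoop-client) at runtime:

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
      <!-- assumption: this jar is provided by the Hive/Hadoop runtime on the cluster -->
      <scope>provided</scope>
    </dependency>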

2. Java code

package org.udf;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

@Description(
        name = "del_dt",
        value = "Drops a table's partitions older than N days by deleting the HDFS files and syncing the metastore - arguments: table name, N (days)"
)
public class del_dt extends UDF {
    /*
    To reuse this UDF, adjust dbName (the warehouse schema) and sdf (the partition value format) below.
     */

    private static final Logger LOG = LoggerFactory.getLogger(del_dt.class);

    public String evaluate(String tableName, int days) {
        if (tableName == null || days < 0) {
            return "错误:表名不能为空且天数不能为负数";
        }

        Configuration conf = new Configuration();
        FileSystem fs = null;
        HiveMetaStoreClient client = null;
        int deletedCount = 0;

        try {
            // Get the HDFS file system
            fs = FileSystem.get(conf);

            // Get the Hive metastore client
            HiveConf hiveConf = new HiveConf(conf, this.getClass());
            client = new HiveMetaStoreClient(hiveConf);

            // Parse the table name (handles the db.table form)                              -- variable to adjust when reusing the UDF
            String dbName = "bjsythzczcpt";
            String tableOnlyName = tableName;
            if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");
                dbName = parts[0];
                tableOnlyName = parts[1];
            }

            // Check that the table exists
            if (!client.tableExists(dbName, tableOnlyName)) {
                return "Error: table " + tableName + " does not exist";
            }

            // Fetch the table metadata
            Table table = client.getTable(dbName, tableOnlyName);

            // Check that the table is partitioned
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            if (partitionKeys == null || partitionKeys.isEmpty()) {
                return "Error: table " + tableName + " is not partitioned";
            }

            // Check that it has a date partition column (assumed to be named dt)
            boolean hasDatePartition = false;
            for (FieldSchema key : partitionKeys) {
                if (key.getName().equalsIgnoreCase("dt")) {
                    hasDatePartition = true;
                    break;
                }
            }

            if (!hasDatePartition) {
                return "Error: table " + tableName + " has no date partition column (dt)";
            }

            // Compute the date N days ago
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.DAY_OF_YEAR, -days);
            Date cutoffDate = cal.getTime();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");               // partition value format
            String cutoffDateStr = sdf.format(cutoffDate);

            // List all partitions of the table
            List<Partition> partitions = client.listPartitions(dbName, tableOnlyName, (short) -1);

            // Table location from the storage descriptor (used to build partition paths)
            String tableLocation = table.getSd().getLocation();

            // Walk the partitions and delete those older than N days
            for (Partition partition : partitions) {
                Map<String, String> partitionValues = getPartitionValues(client, partition);
                String dtValue = partitionValues.get("dt");

                if (dtValue != null) {
                    try {
                        Date partitionDate = sdf.parse(dtValue);
                        if (partitionDate.before(cutoffDate)) {
                            // Build the partition path
                            String partitionPath = buildPartitionPath(tableLocation, partition.getValues(), partitionKeys);
                            Path hdfsPath = new Path(partitionPath);

                            // Delete the partition data on HDFS
                            if (fs.exists(hdfsPath)) {
                                fs.delete(hdfsPath, true);
                                LOG.info("Deleted HDFS partition path: {}", partitionPath);

                                // Drop the partition from the metastore
                                client.dropPartition(dbName, tableOnlyName, partition.getValues(), true);

                                deletedCount++;
                                LOG.info("Dropped partition: {}", partition.getValues());
                            }
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to process partition ({}): {}", partition.getValues(), e.getMessage());
                    }
                }
            }

            return "Operation complete: deleted " + deletedCount + " partition(s)";

        } catch (IOException | TException e) {
            LOG.error("Execution failed: {}", e.getMessage());
            return "Error: execution failed - " + e.getMessage();
        } finally {
            // Release resources (note: FileSystem.get() returns a cached, shared instance,
            // so closing it here also closes it for other code using the same instance)
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    LOG.error("Failed to close the HDFS connection: {}", e.getMessage());
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    // Resolve partition values into a key -> value map (uses the shared client instance rather than a static helper)
    private Map<String, String> getPartitionValues(HiveMetaStoreClient client, Partition partition) {
        Map<String, String> values = new HashMap<>();
        List<String> partitionVals = partition.getValues();

        try {
            // Reuse the already-created client to fetch the table's partition keys
            Table table = client.getTable(partition.getDbName(), partition.getTableName());
            List<FieldSchema> partitionKeys = table.getPartitionKeys();

            for (int i = 0; i < Math.min(partitionKeys.size(), partitionVals.size()); i++) {
                values.put(partitionKeys.get(i).getName(), partitionVals.get(i));
            }
        } catch (TException e) {
            LOG.error("Failed to fetch partition keys: {}", e.getMessage());
        }

        return values;
    }

    // Build the HDFS path of a partition from the table location and the partition key/value pairs
    private String buildPartitionPath(String tableLocation, List<String> partitionValues, List<FieldSchema> partitionKeys) {
        StringBuilder pathBuilder = new StringBuilder(tableLocation);

        for (int i = 0; i < partitionValues.size(); i++) {
            if (i < partitionKeys.size()) {
                pathBuilder.append("/")
                        .append(partitionKeys.get(i).getName())
                        .append("=")
                        .append(partitionValues.get(i));
            }
        }

        return pathBuilder.toString();
    }
}
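
The retention decision hinges on the cutoff computation above: today minus N days, rendered with the same yyyyMMdd pattern as the dt partition values, then compared with Date.before(). A standalone sketch of just that logic (CutoffDemo and the sample dt value are made-up names for illustration):

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class CutoffDemo {
    public static void main(String[] args) throws Exception {
        int days = 7;                                            // retention window N
        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.DAY_OF_YEAR, -days);                    // go back N days from today
        Date cutoffDate = cal.getTime();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); // same pattern as the dt values
        System.out.println("cutoff = " + sdf.format(cutoffDate));

        String sampleDt = "20240101";                            // hypothetical partition value
        Date partitionDate = sdf.parse(sampleDt);
        // Mirrors the UDF: a partition strictly older than the cutoff is eligible for deletion
        System.out.println(sampleDt + " deletable? " + partitionDate.before(cutoffDate));
    }
}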

3. Creating and updating the function

-- Create the function
ADD JAR hdfs:///<hdfs_path>/<jar_name>.jar;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';

-- Update the function (after replacing the jar)
DELETE JAR hdfs:///<hdfs_path>/<jar_name>.jar;
ADD JAR hdfs:///<hdfs_path>/<jar_name>.jar;
DROP FUNCTION del_dt;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';
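
As an alternative, Hive 1.2.1 also supports CREATE FUNCTION ... USING JAR, which records the HDFS jar in the metastore so the function keeps working in new sessions without an explicit ADD JAR. A possible variant, using the same placeholder path as above:

-- Register a permanent function whose jar is pulled from HDFS automatically
CREATE FUNCTION del_dt AS 'org.udf.del_dt'
  USING JAR 'hdfs:///<hdfs_path>/<jar_name>.jar';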

4. Example invocation

-- Delete historical partitions from dwd_abc_df, keeping the last 7 days
hive> SELECT del_dt('dwd_abc_df', 7);
-- Output
OK
Operation complete: deleted 0 partition(s)
Time taken: 0.192 seconds
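
A quick sanity check is to compare the partition list before and after the call; something along these lines, using the same table as the example:

-- Only dt partitions from the last 7 days should remain after the cleanup
SHOW PARTITIONS dwd_abc_df;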
