Syncing Hive Audit Logs from the Ranger Plugin into Doris for Analysis

Published: 2025-03-21

The following are the detailed steps of an Apache Doris-based solution for syncing Ranger Hive audit logs, combining the audit-log plugin with a data-load strategy:


I. Doris Environment Preparation

1. Create the audit log database and table

Based on a reference table design, adjust the CREATE TABLE statement to match the Ranger audit log fields:

CREATE DATABASE IF NOT EXISTS ranger_audit;

CREATE TABLE IF NOT EXISTS ranger_audit_hive_log (
  repoType INT COMMENT 'repository type id',
  repo VARCHAR(50) COMMENT 'target repository name',
  reqUser VARCHAR(50) COMMENT 'requesting user',
  evtTime DATETIMEV2(3) COMMENT 'event time (millisecond precision)',
  access VARCHAR(20) COMMENT 'operation type',
  resource VARCHAR(255) COMMENT 'resource path',
  resType VARCHAR(20) COMMENT 'resource type',
  action VARCHAR(20) COMMENT 'specific action',
  result TINYINT COMMENT 'execution result (0 = failure / 1 = success)',
  agent VARCHAR(50) COMMENT 'agent service type',
  policy INT COMMENT 'policy ID',
  enforcer VARCHAR(50) COMMENT 'policy enforcement component',
  sess VARCHAR(36) COMMENT 'session ID (UUID)',
  cliType VARCHAR(20) COMMENT 'client type',
  cliIP IPV4 COMMENT 'client IP',
  reqData STRING COMMENT 'raw request data (STRING, since Doris has no TEXT type)',
  agentHost VARCHAR(50) COMMENT 'agent host name',
  logType VARCHAR(20) DEFAULT 'RangerAudit' COMMENT 'log type',
  id VARCHAR(50) COMMENT 'unique event ID',
  seq_num INT COMMENT 'sequence number',
  event_count INT COMMENT 'event count',
  event_dur_ms INT COMMENT 'event duration (ms)',
  tags ARRAY<VARCHAR(50)> COMMENT 'tag array',
  additional_info VARCHAR(500) COMMENT 'extended info (structured data)',
  cluster_name VARCHAR(50) COMMENT 'cluster name',
  policy_version INT COMMENT 'policy version'
)
ENGINE=OLAP
DUPLICATE KEY(repoType,repo,reqUser,evtTime)
COMMENT 'Ranger Hive audit log table'
PARTITION BY RANGE(evtTime)()
DISTRIBUTED BY HASH(id) BUCKETS AUTO
PROPERTIES (
  "replication_num" = "3",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.create_history_partition" = "true",
  "dynamic_partition.time_unit" = "DAY",
  "dynamic_partition.start" = "-180",
  "dynamic_partition.end" = "7",
  "dynamic_partition.prefix" = "p",
  "dynamic_partition.buckets" = "8"
);
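
With the table in place, a typical analysis query is sketched below (illustrative only: it counts denied requests per user over the last seven days, relying on result = 0 marking a failed/denied request as noted in the column comment):

-- Top users by denied Hive requests in the last 7 days
SELECT reqUser, access, COUNT(*) AS denied_cnt
FROM ranger_audit.ranger_audit_hive_log
WHERE result = 0
  AND evtTime >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY reqUser, access
ORDER BY denied_cnt DESC
LIMIT 20;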

2. Enable the audit log plugin

Deploy the audit log plugin on the Doris FE node:

# Unzip the plugin into the FE plugin directory
unzip auditloader.zip -d /opt/doris/fe/plugins/audit/
# Edit plugin.conf so the plugin writes into the audit database
frontend_host_port = 192.168.1.101:8030  # Doris FE address
database = ranger_audit
audit_log_table = log_main
user = sync_user  # an account with write privileges
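
After editing plugin.conf and repacking the zip, the plugin can also be installed and verified through SQL from any MySQL client connected to the FE (a sketch; the path is wherever the repacked archive was staged):

INSTALL PLUGIN FROM "/opt/doris/fe/plugins/audit/auditloader.zip";
SHOW PLUGINS;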

II. Log Transfer and Loading

1. Scheduled import with Broker Load

Create a daily scheduled job that loads the day's audit log file from HDFS:

LOAD LABEL ranger_audit.ranger_hive_audit_20250318
(
    DATA INFILE("hdfs://ns01/ranger/audit/hiveServer2/20250318/hiveServer2_ranger_audit.log")
    INTO TABLE ranger_audit_hive_log
    FORMAT AS "json"
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://ns01",
    "hadoop.security.authentication" = "kerberos",
    "hadoop.kerberos.principal" = "user01@HADOOP.COM",
    "hadoop.kerberos.keytab" = "/etc/security/keytabs/user01.keytab",
    "dfs.nameservices" = "ns01",
    "dfs.ha.namenodes.ns01" = "nn1,nn2",
    "dfs.namenode.rpc-address.ns01.nn1" = "192.168.1.101:8020",
    "dfs.namenode.rpc-address.ns01.nn2" = "192.168.1.102:8020",
    "dfs.client.failover.proxy.provider.ns01" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "hadoop.security.auth_to_local" = "RULE:[1:$1@$0](^.*@.*$)s/^(.*)@.*$/$1/g
                                   RULE:[2:$1@$0](^.*@.*$)s/^(.*)@.*$/$1/g
                                   DEFAULT"
)
PROPERTIES
(
    "timeout" = "14400",
    "max_filter_ratio" = "0.1",
    "exec_mem_limit" = "2147483648",
    "strict_mode" = "false",
    "timezone" = "Asia/Shanghai",
    "load_parallelism" = "1",
    "send_batch_parallelism" = "1",
    "load_to_single_tablet" = "false",
    "priority" = "NORMAL"
)
COMMENT "备注、注释";

2. Handling dirty data

Note: in my environment the additional_info field does not contain a valid JSON string, which is why the DDL declares it as VARCHAR.
Error 1: Reason: no partition for this tuple. tuple=
The row's evtTime falls outside every existing partition, so there is no partition to write it into (see the remediation sketch below).

Error 2: Reason: column(additional_info) value is incorrect while strict mode is true, src value is
Resolved by declaring additional_info as VARCHAR (and keeping strict_mode = false in the load properties).
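
If error 1 comes from log days older than the dynamic-partition history window, a minimal remediation sketch (table and properties as defined in part I):

-- See which daily partitions currently exist
SHOW PARTITIONS FROM ranger_audit.ranger_audit_hive_log;
-- Widen the historical window, then re-run the load
ALTER TABLE ranger_audit.ranger_audit_hive_log SET ("dynamic_partition.start" = "-365");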

III. Operations Monitoring and Optimization

  1. Data quality check
    -- Completeness check; adjust the 1000 threshold to your actual daily audit log volume
    SELECT DATE(evtTime) AS day, COUNT(*) FROM ranger_audit_hive_log
    GROUP BY day HAVING COUNT(*) < 1000;  -- alert threshold
    
  2. Query acceleration
    Add an index on the core filter columns:
    ALTER TABLE ranger_audit_hive_log ADD INDEX idx_action (action) USING BITMAP;
    

IV. Security Hardening

  1. Permission isolation
    Control access by role (a user-binding sketch follows the statements below):

    -- Create a read-only role
    CREATE ROLE audit_viewer;
    GRANT SELECT_PRIV ON ranger_audit.* TO ROLE 'audit_viewer';
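
    A minimal sketch of binding an account to the role (the user name and password below are placeholders):

    -- Analyst account that only carries the read-only role
    CREATE USER 'audit_analyst'@'%' IDENTIFIED BY 'ChangeMe_123' DEFAULT ROLE 'audit_viewer';
    SHOW GRANTS FOR 'audit_analyst'@'%';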
    
  2. Sensitive-field masking
    Apply dynamic masking to the reqData field through a Ranger policy:

    -- Policy configured in the Ranger admin UI
    Mask type: Partial mask: show last 4
    Applies to: phone numbers inside reqData (e.g. `1*******5678`)
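
    If Ranger does not govern Doris itself, a similar effect can be approximated with a masked view inside Doris; a minimal sketch (the masking expression is illustrative, not a complete PII solution):

    -- Expose only a masked form of reqData to read-only users
    CREATE VIEW ranger_audit.ranger_audit_hive_log_masked AS
    SELECT repoType, repo, reqUser, evtTime, access, resource, action, result,
           CONCAT(LEFT(reqData, 2), '****', RIGHT(reqData, 4)) AS reqData_masked
    FROM ranger_audit.ranger_audit_hive_log;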

V. Troubleshooting Guide

  1. Common issues
    Data lag: check the Broker Load job status with SHOW LOAD WHERE LABEL LIKE 'ranger%';
    Format errors: fetch the error detail from the ErrorURL reported by SHOW LOAD, e.g. curl -X GET http://fe_host:8030/api/_load_error_log?file=__ranger_audit_log
    Permission denied: verify the load user's privileges with SHOW GRANTS FOR sync_user;

  2. Log retention policy

    -- Shorten the retention window to 90 days
    ALTER TABLE ranger_audit_hive_log SET ("dynamic_partition.start" = "-90");
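
    To confirm the new window took effect (output columns vary slightly across Doris versions):

    -- Dynamic-partition scheduling status for the whole database
    SHOW DYNAMIC PARTITION TABLES FROM ranger_audit;
    -- Partitions currently held by the table
    SHOW PARTITIONS FROM ranger_audit.ranger_audit_hive_log;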
    

The steps above provide end-to-end management of Ranger audit logs, from collection through transfer to analysis. In a real deployment, adjust the bucket counts and concurrency parameters to the cluster size, and periodically verify that the storage and retention policies are still appropriate.