apache hive
源码中有 org.apache.hadoop.hive.ql.hooks.LineageLogger
类可以获取 insert hql
的字段之间的关系。但由于 org.apache.hadoop.hive.ql.optimizer.Optimizer
中的限制,仅靠重写 hook
类无法实现字段级血缘。
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_LINEAGE_INFO) // 版本 4.0+加入
|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
// 版本 2.3 加入
|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
transformations.add(new Generator(postExecHooks));
}
现在考虑通过LineageLogger
搭配日志监测服务来实现字段级血缘
- 加入插件
conf/hive-site.xml
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value>
</property>
- 打开日志
conf/log4j.properties
log4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO
hive
任务日志目录
>set system:hive.log.dir; # 服务日志
>set hive.querylog.location; #查询日志
/tmp/hive-{用户名}/
- 写脚本监测
# -*- coding: utf-8 -*-
import hashlib
import json
import os.path
from json import JSONDecodeError
import requests
# Hive log files to scan for LineageLogger output.
# NOTE(review): path presumably follows hive.querylog.location / system:hive.log.dir
# as configured above — confirm against the target cluster.
log_path_list = [
    "/tmp/root/hive.log"
]
def read_hive_log(file_path):
    """Read a Hive log file and collect the lines containing the lineage keyword.

    Progress is tracked per file in ./hash_index.log (written by __main__), so
    subsequent runs only pick up lines appended since the previous run.  A file
    that is unseen, has shrunk (log rotation), or has no recorded size is
    re-read from the beginning.

    Args:
        file_path (str): path of the Hive log file to read.

    Returns:
        tuple: (content, new_size, line_index, new_file) where
            content (list): (line_number, payload) pairs, payload being the
                text after the keyword on each matching line;
            new_size (int): current size of the log file in bytes (0 on error);
            line_index (int): line number of the last matched line;
            new_file (str): base name of the log file (index key).
    """
    save_dict = {}
    if os.path.exists('./hash_index.log'):
        try:
            with open("./hash_index.log", 'r') as f:
                file_content = f.read()
                if file_content != '':
                    save_dict = json.loads(file_content)
        except json.JSONDecodeError as e:
            print(f"无法将文件内容转换为JSON:{e}")
    # BUG FIX: key off the function argument, not the module-level ``log_path``
    # loop variable the original code accidentally relied on.
    new_file = file_path.split("/")[-1]
    if new_file in save_dict:
        entry = save_dict[new_file]
        old_size = entry.get('size', 0)
        # BUG FIX: the saved line index lives inside the per-file entry
        # (see how __main__ writes hash_index.log), not at the top level.
        line_index = entry.get('index', 0)
    else:
        old_size = 0
        line_index = 0
    try:
        new_size: int = os.path.getsize(file_path)
    except Exception as e:
        print("读取文件大小失败:", e)
        new_size = 0
    # New when unseen, shrunk (rotated), or when no size was recorded before.
    is_new_file = (new_file not in save_dict) or new_size < old_size or old_size == 0
    if is_new_file:
        # BUG FIX: reset the index so a rotated file with no matches does not
        # return a stale line_index from the previous incarnation.
        line_index = 0
    content = []
    announced = False  # print the mode banner only once per file
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as log_file:
            for line_number, line in enumerate(log_file, 1):
                # NOTE(review): ``search_keyword`` must be defined at module
                # level (not visible in this chunk) — confirm before running.
                if search_keyword not in line:
                    continue
                if is_new_file:
                    if not announced:
                        print("是新文件,从头开始读取")
                        announced = True
                else:
                    if line_number < line_index:
                        continue  # already processed in a previous run
                    if not announced:
                        print("是旧文件,从上次读取位置继续读取: {}".format(line_index))
                        announced = True
                # Keep only the JSON payload after the keyword.
                content.append((line_number, line.split(search_keyword)[-1]))
                line_index = line_number
    except Exception as e:
        print(f"读取Hive日志文件失败:{e}")
    return content, new_size, line_index, new_file
def parse_vertice(vertices):
    """Build a lookup of lineage vertices keyed by vertex id.

    Args:
        vertices (list): raw vertex dicts from a LineageLogger JSON record.

    Returns:
        dict: vertex id -> {"db": ..., "tb": ..., "col": ...}.  "col" is empty
        unless the vertex type is COLUMN.  Vertices whose vertexId has fewer
        than three dot-separated parts are skipped, as are columns listed in
        the module-level ``partition_field``.
    """
    lookup = {}
    for item in vertices:
        name_parts = item.get("vertexId", "").split(".")
        if len(name_parts) < 3:
            continue
        column = name_parts[-1] if item.get("vertexType", "") == "COLUMN" else ""
        if column in partition_field:
            continue
        lookup.setdefault(
            item.get("id", ""),
            {"db": name_parts[0], "tb": name_parts[1], "col": column},
        )
    return lookup
def parse_edge(edges):
    """Extract the relevant fields from each lineage edge.

    Args:
        edges (list): raw edge dicts from a LineageLogger JSON record.

    Returns:
        list: one dict per edge with keys "source" (source vertex ids),
        "target" (target vertex ids), "exp" (expression text) and
        "type" (edge type string).
    """
    return [
        {
            "source": item.get("sources", []),
            "target": item.get("targets", []),
            "exp": item.get("expression", ""),
            "type": item.get("edgeType", ""),
        }
        for item in edges
    ]
def parse_lineage_log(content: list):
    """Turn matched log lines into a per-table column-dependency mapping.

    Args:
        content (list): (line_number, json_text) pairs from read_hive_log.

    Returns:
        dict: table name -> column dependency info from get_column_depend;
        later records for the same table overwrite earlier ones, which
        de-duplicates repeated lineage entries.
    """
    table_columns = {}
    for line_number, raw in content:
        try:
            record = json.loads(raw)
            vertex_map = parse_vertice(record.get('vertices', []))
            edge_items = parse_edge(record.get('edges', []))
            table, columns = get_column_depend(vertex_map, edge_items)
            table_columns[table] = columns
        except JSONDecodeError:
            print("json解析错误: {}".format(raw))
            print("该行错误位置: {}".format(line_number))
    return table_columns
if __name__ == '__main__':
    print("开始启动....")
    # Per-file progress records to persist for the next run.
    log_dict = {}
    for log_path in log_path_list:
        # Read new keyword lines plus bookkeeping (size / last line read).
        contents, file_size, index, new_file_name = read_hive_log(log_path)
        # NOTE(review): the parsed lineage is never used — presumably it should
        # be pushed to a service (requests is imported but unused); confirm.
        column_info_dicts = parse_lineage_log(contents)
        print("{} 文件执行完".format(log_path))
        log_dict.setdefault(log_path.split('/')[-1], dict(size=file_size, index=index, file=new_file_name))
    # Persist progress so the next run resumes instead of re-reading.
    with open("./hash_index.log", 'w') as f:
        f.write(json.dumps(log_dict))
    print("执行结束...")