大数据比对,shell脚本与hive技术结合

发布于:2024-10-18 ⋅ 阅读:(14) ⋅ 点赞:(0)

需求描述

从主机中获取加密数据内容,解密数据内容(可能会存在json解析)插入到另一个库中,比对原始库和新库的相同表数据的数据一致性内容。

数据一致性比对实现

上亿条数据,如何比对并发现两个表数据差异

相关流程

从其他主机获取大批量数据内容文件(zip格式)–>针对大批量数据文件进行解密、解压输出–>在新库里创建对应的比对表–>将解压、解密的文件内容直接入到hdfs路径上,并刷新分区–>写出比对脚本–>参考多进程跑多个脚本内容输出相关多个日志

相关脚本内容

获取批量数据文件内容

#!/bin/bash
# https://blog.csdn.net/axing2015/article/details/89313460
# SFTP:10.230.105.47/48/49,用户密码线下提供
# 存量数据:	 /data1/etl/csv/省拼音/批次/表名.csv
# 增量数据:	 /data1/etl/stream/省拼音/批次/表名
# 加载工具:	 /data1/etl/tool

# 已经有人帮我下载下来了,所有没必要去下载了

data_home=/data1/etl/csv/省拼音/批次/
sftp_path=/data1/etl/csv/heilongjiang/0/

sftp_ip=xxxx
sftp_user=xxxx
sftp_passwd=xxx
sftp_port=22
file_name=*.csv

mkdir -p ${data_home}

lftp -u ${sftp_user},${sftp_passwd} sftp://${sftp_ip}:${sftp_port} <<!
#关于ftp地址切换的命令 是在本地主机目录操作的命令 把东西下载到指定的本地目录
lcd ${data_home}
cd ${sftp_path}
# 下载多个文件
mget ${file_name}
bye
!

cd ${data_home}
# 创建hive表结构

# 将文件入到hive中去
sh putCsvLoadHive.sh

解密脚本(传输的大文件数据是加密的,需要解密)

#!/bin/bash
# 将传过来的增量文件进行解密
beginTime=$(date +%s)
if [ $# -eq 0 ]; then
    echo "没有传参数进来,请输入时间参数"
    exit
fi
source_path="/data0/e3base/wangsw_a/js_shell/sftp_file/stock_data"
decrypt_path="/data0/e3base/wangsw_a/js_shell/sftp_file/decrypt_data"
password="e3base1"
do_tran_path="/data0/e3base/do_trans/"

cd "$do_tran_path"

for file in "$source_path"/*.des3; do
    if [ -f "$file" ]; then
        filename=$(basename "$file")
        filename_without_ext="${filename%.*}"
        decrypted_file="$decrypt_path/$filename_without_ext"

        ./dzip -pwd "$password" -unzip "$file" "$decrypted_file"

        if [ -f "$decrypted_file" ]; then
            echo "解密成功: $decrypted_file"
        else
            echo "解密失败: $file"
        fi
    fi
done
endTime=$(date +%s)
executionTime=$((endTime - beginTime))
echo "脚本执行时间:$executionTime秒"

创建hive表

#!/bin/bash
beginTime=$(date +%s)
sql="
drop database if exists radmcsdb_restore_ah cascade;
create database radmcsdb_restore_ah;
-- ac_contract_info
create  table radmcsdb_restore_ah.oracle_ac_contract_info_ah(
\`ACCOUNT_LIMIT\` string,
\`ACCOUNT_TYPE\` string,
\`CONTRACTATT_TYPE\` string,
\`CONTRACT_NAME\` string,
\`CONTRACT_NAME_ENCRYPT\` string,
\`CONTRACT_NO\` string,
\`CONTRACT_PASSWD\` string,
\`CUST_ID\` string,
\`FINISH_FLAG\` string,
\`GROUP_ID\` string,
\`LOGIN_ACCEPT\` string,
\`LOGIN_NO\` string,
\`OP_CODE\` string,
\`OP_TIME\` string,
\`PAY_CODE\` string,
\`REPRESENT_PHONE\` string,
\`STATUS_CODE\` string,
\`STATUS_TIME\` string
)
PARTITIONED BY ( \`pt_day_time\` string)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
    'separatorChar' = ',',   
    'quoteChar' = '\\\"',
    'escapeChar' = '\\\\'
) 
stored as textfile tblproperties("skip.header.line.count"="1");
-- location 'hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/oracle_ac_contract_info_ah';

--- 差异表
create table radmcsdb_restore_ah.ac_contract_info_ah_diff(
\`oracle_ah_ACCOUNT_LIMIT\` string,
\`oracle_ah_ACCOUNT_TYPE\` string,
\`oracle_ah_CONTRACTATT_TYPE\` string,
\`oracle_ah_CONTRACT_NAME\` string,
\`oracle_ah_CONTRACT_NAME_ENCRYPT\` string,
\`oracle_ah_CONTRACT_NO\` string,
\`oracle_ah_CONTRACT_PASSWD\` string,
\`oracle_ah_CUST_ID\` string,
\`oracle_ah_FINISH_FLAG\` string,
\`oracle_ah_GROUP_ID\` string,
\`oracle_ah_LOGIN_ACCEPT\` string,
\`oracle_ah_LOGIN_NO\` string,
\`oracle_ah_OP_CODE\` string,
\`oracle_ah_OP_TIME\` string,
\`oracle_ah_PAY_CODE\` string,
\`oracle_ah_REPRESENT_PHONE\` string,
\`oracle_ah_STATUS_CODE\` string,
\`oracle_ah_STATUS_TIME\` string,
\`oracle_ah_pt_day_time\` string,
\`restore_ah_ACCOUNT_LIMIT\` string,
\`restore_ah_ACCOUNT_TYPE\` string,
\`restore_ah_CONTRACTATT_TYPE\` string,
\`restore_ah_CONTRACT_NAME\` string,
\`restore_ah_CONTRACT_NAME_ENCRYPT\` string,
\`restore_ah_CONTRACT_NO\` string,
\`restore_ah_CONTRACT_PASSWD\` string,
\`restore_ah_CUST_ID\` string,
\`restore_ah_FINISH_FLAG\` string,
\`restore_ah_GROUP_ID\` string,
\`restore_ah_LOGIN_ACCEPT\` string,
\`restore_ah_LOGIN_NO\` string,
\`restore_ah_OP_CODE\` string,
\`restore_ah_OP_TIME\` string,
\`restore_ah_PAY_CODE\` string,
\`restore_ah_REPRESENT_PHONE\` string,
\`restore_ah_STATUS_CODE\` string,
\`restore_ah_STATUS_TIME\` string
)
PARTITIONED BY ( \`pt_day_time\` string)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
    'separatorChar' = ',',   
    'quoteChar' = '\\\"',
    'escapeChar' = '\\\\'
) 
stored as textfile tblproperties("skip.header.line.count"="1");
-- location 'hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/ac_contract_info_ah_diff';

-- 主键文件表
create table radmcsdb_restore_ah.ac_contract_info_ah_primary(
\`primary_key_contract_no\` string
)
partitioned by ( \`pt_day_time\` string)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
    'separatorChar' = ',',   
    'quoteChar' = '\\\"',
    'escapeChar' = '\\\\'
) 
stored as textfile tblproperties("skip.header.line.count"="1");
-- location 'hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/ac_contract_info_ah_primary';

..............
"

echo "${sql}!quit" | beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base

echo "结束时间 $endTimeYMD"
endTime=$(date +%s)
endTimeYMD=$(date +%Y%m%d%H%M%S)
echo "结束时间 $endTimeYMD"
echo "还原层表已创建完,请进行总共21个表核对,总共耗时:'$(($endTime - $beginTime))'秒"

解密文件入hive存储地址,并刷新分区

#!/bin/bash
# 将省端数据文件传入hive 并刷新分区。
beginTime=$(date +%s)
if [ $# -eq 0 ]; then
    echo "没有传参数进来,请输入省份参数"
    exit
fi

if [ $# -eq 1 ]; then
    echo "请确认是否输入省份和时间参数"
    exit
fi

# hive_url="beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base"
# hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"
data_home=/data1/etl/csv/$1

for ((i = 0; i < 21; i++)); do
    {
        cd ${data_home}/$i
        sql=""
        ls *.csv | {
            while read t1; do
                # 删除第一行
                # sed -i '1d' $t1
                # 判断是否是数字
                if [[ $t1 == *[0-9]* ]]; then
                    # PD_USERPRC_INFO_00.csv
                    name2=oracle_${t1%_*}_ah
                    # 大写边小写
                    name3=${name2,,}
                else
                    name2=oracle_${t1%.*}_ah
                    name3=${name2,,}
                fi
                # partition_primary=$($hive_url_e "show partitions radmcsdb_restore_ah.$name3;" | sort | tail -n 1)
                # partition_primary=$($hive_url_e "show partitions radmcsdb_restore_ah.oracle_ac_contract_info_ah;" | sort | tail -n 1)
                # hdfs dfs -rm -f hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/$partition_primary
                if [ $i == 0 ]; then
                    hdfs dfs -mkdir hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2
                fi
                hdfs dfs -put $t1 hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2/ && hdfs dfs -mv hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2/$t1 hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2/${i}_${t1}
                # 判断其是否是最后一个文件夹
                if [ $i == 20 ]; then
                    sql="alter table radmcsdb_restore_ah.$name3 add if not exists partition (pt_day_time=$2);$sql"
                fi
            done

            if [ $i == 20 ]; then
                echo "${sql}!quit" | beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base
            fi
        }
    }
done
# wait关键字确保每一个子进程都执行完成
# wait

endTime=$(date +%s)
echo "将数据上传hdfs,总共耗时:'$(($endTime - $beginTime))'秒"

比较数据一致性

#!/bin/bash
# 逻辑:插入并生成差异文件,提取差异文件,并将差异主键进行输出到指定目录。
# diff_table_sh
# 字符串切割 https://blog.csdn.net/bandaoyu/article/details/120659630
# 获取最该目录下最新的文件的数据
# https://blog.csdn.net/sh13661847134/article/details/113757792
# 第一部分获取分区
# 预支前提
beginTime=$(date +%s)
beginTimeYMD=$(date +%Y%m%d%H%M%S)
hive_url="beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base"
hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"

# 第一部分获取分区
join="full outer join"
partition_orcle=$($hive_url_e "show partitions radmcsdb_restore_ah.oracle_ac_account_rel_ah;" | sort | tail -n 1)
count_num=$($hive_url_e "select count(distinct pt_day_time) as count_num from radmcsdb_restore_ah.oracle_ac_account_rel_ah;")
if [ $count_num -gt 1 ]; then
  join="inner join"
fi

# 第二部分执行sql
sql="

insert overwrite table radmcsdb_restore_ah.ac_account_rel_ah_diff partition(pt_day_time)
 select left_table.*,
       right_table.*,
       from_unixtime(unix_timestamp(),'yyyyMMddHHmmss') as pt_day_time
  from (
        select *
          from radmcsdb_restore_ah.oracle_ac_account_rel_ah where $partition_orcle
       )left_table
  $join (
        select *
          from restore_ah.ac_account_rel
       )right_table
    on (left_table.CONTRACT_NO = right_table.CONTRACT_NO
    and left_table.ACCT_REL_TYPE = right_table.ACCT_REL_TYPE
    and left_table.REL_CONTRACT_NO = right_table.REL_CONTRACT_NO
    and left_table.ACCT_ITEMS = right_table.ACCT_ITEMS
    )
  where
COALESCE(left_table.login_accept, '0')  <> COALESCE(right_table.login_accept, '0') or
COALESCE(left_table.contract_no, '0')  <> COALESCE(right_table.contract_no, '0') or
COALESCE(left_table.rel_contract_no, '0')  <> COALESCE(right_table.rel_contract_no, '0') or
COALESCE(left_table.acct_rel_type, '0')  <> COALESCE(right_table.acct_rel_type, '0') or
COALESCE(left_table.acct_items, '0')  <> COALESCE(right_table.acct_items, '0') or
COALESCE(left_table.pay_value, '0')  <> COALESCE(right_table.pay_value, '0') or
COALESCE(left_table.pay_pri, '0')  <> COALESCE(right_table.pay_pri, '0') or
-- COALESCE(left_table.eff_date, '0')  <> COALESCE(right_table.eff_date, '0') or
-- COALESCE(left_table.exp_date, '0')  <> COALESCE(right_table.exp_date, '0') or
COALESCE(left_table.login_no, '0')  <> COALESCE(right_table.login_no, '0') or
-- COALESCE(left_table.op_time, '0')  <> COALESCE(right_table.op_time, '0') or
COALESCE(left_table.remark, '0')  <> COALESCE(right_table.remark, '0') or
COALESCE(left_table.busi_type, '0')  <> COALESCE(right_table.busi_type, '0');


insert overwrite table radmcsdb_restore_ah.ac_account_rel_ah_primary partition(pt_day_time)
select
oracle_ah_CONTRACT_NO,
oracle_ah_ACCT_REL_TYPE,
oracle_ah_REL_CONTRACT_NO,
oracle_ah_ACCT_ITEMS,
from_unixtime(unix_timestamp(),'yyyyMMddHHmmss') as pt_day_time
from (
select 
oracle_ah_CONTRACT_NO,
oracle_ah_ACCT_REL_TYPE,
oracle_ah_REL_CONTRACT_NO,
oracle_ah_ACCT_ITEMS
from radmcsdb_restore_ah.ac_account_rel_ah_diff  
where 
oracle_ah_CONTRACT_NO is not null 
and oracle_ah_ACCT_REL_TYPE is not null 
and oracle_ah_REL_CONTRACT_NO is not null 
and oracle_ah_ACCT_ITEMS is not null 
and pt_day_time=(select max(pt_day_time) from radmcsdb_restore_ah.ac_account_rel_ah_diff) 

union

select 
restore_ah_CONTRACT_NO,
restore_ah_ACCT_REL_TYPE,
restore_ah_REL_CONTRACT_NO,
restore_ah_ACCT_ITEMS
from radmcsdb_restore_ah.ac_account_rel_ah_diff 
where 
restore_ah_SERV_ACCT_ID is not null 
and restore_ah_ACCT_REL_TYPE is not null 
and restore_ah_REL_CONTRACT_NO is not null 
and restore_ah_ACCT_ITEMS is not null 
and pt_day_time=(select max(pt_day_time) from radmcsdb_restore_ah.ac_account_rel_ah_diff)) a;
"

echo "${sql}!quit" | $hive_url

# 第三部分获取主键文件
partition_primary=$($hive_url_e "show partitions radmcsdb_restore_ah.ac_account_rel_ah_primary;" | sort | tail -n 1)
pt_day_time=${partition_primary:12}
$hive_url_e "set hive.cli.print.header=true; select * from radmcsdb_restore_ah.ac_account_rel_ah_primary where $partition_primary;" | grep -v "WARN" >ac_account_rel_ah_primary_$pt_day_time.csv
# 针对主键文件进行传输到指定位置

# 第四部分输出主键数量
count_primary=$($hive_url_e "select count(1) from radmcsdb_restore_ah.ac_account_rel_ah_primary where $partition_primary;")
echo "差异主键数量还剩:$count_primary"

# 第五部分输出比对数据
sql="select left_table.pv,right_table.pv as pv_diff
  from (
        select count(1) as pv
          from radmcsdb_restore_ah.oracle_ac_account_rel_ah
       )left_table
  left outer join (
        select count(1) as pv
          from restore_ah.ac_account_rel
       )right_table
    on 1=1;"
# 将换行转换层空格,方面sql美观度
sql_text="$(echo $sql | tr '\n' ' ')"
count_compare=$($hive_url_e "$sql_text")

# 第六部分输出主键数量
count_primary=$($hive_url_e "select count(1) from radmcsdb_restore_ah.ac_account_rel_ah_primary where $partition_primary;")

# 结束标语
endTime=$(date +%s)
endTimeYMD=$(date +%Y%m%d%H%M%S)
echo "打印开始时间:$beginTimeYMD"
echo "ac_account_rel比较量级自己省端和还原层:$count_compare"
echo "差异主键数量还剩:$count_primary"
echo "打印结束时间:$endTimeYMD"
echo "ac_account_rel_ah_diff 异常表执行完成 ,开始时间:$beginTimeYMD,结束时间:$endTimeYMD,总共耗时:'$(($endTime - $beginTime))'秒"
echo "ac_account_rel_ah_diff 异常表执行完成"

多进程跑脚本输出日志

运行就nohup 这个主脚本即可

#!/bin/bash
if [ $# -eq 0 ]; then
    echo "没有传参数进来,请输入时间参数"
    exit
fi

hive_url="beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base"
hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"

beginTime=$(date +%s)
beginTimeYMD=$(date +%Y%m%d%H%M%S)

mkdir ./finalLog

# 第一张表
partition_orcle=$($hive_url_e "show partitions radmcsdb.oracle_cs_conuserrel_info_hlj;" | sort | tail -n 1)
count_orcle=$($hive_url_e "select count(1) from radmcsdb.oracle_cs_conuserrel_info where $partition_orcle;")

partition_primary=$($hive_url_e "show partitions radmcsdb.cs_conuserrel_info_primary;" | sort | tail -n 1)
count_primary=$($hive_url_e "select count(1) from radmcsdb.cs_conuserrel_info_primary where $partition_primary;")
echo "cs_conuserrel_info 表数量:$count_orcle,差异主键数量还剩:$count_primary"

# 第二十一张表
partition_orcle=$($hive_url_e "show partitions radmcsdb.oracle_ep_organization_hlj;" | sort | tail -n 1)
count_orcle=$($hive_url_e "select count(1) from radmcsdb.oracle_ct_custcontact_info_hlj where $partition_orcle;")

partition_primary=$($hive_url_e "show partitions radmcsdb.ep_organization_hlj_primary;" | sort | tail -n 1)
count_primary=$($hive_url_e "select count(1) from radmcsdb.ep_organization_hlj_primary where $partition_primary;")
echo "ep_organization 数量:$count_orcle,差异主键数量还剩:$count_primary"

endTime=$(date +%s)
endTimeYMD=$(date +%Y%m%d%H%M%S)
echo "结束时间 $endTimeYMD"
echo "多个表执行完成,总共耗时:'$(($endTime - $beginTime))'秒"

可能存在将json 数据解析 重新输出成 csv文件

#!/bin/bash
# 将json 数据 解析成 csv文件
# 遍历该下面的所有文件

# https://blog.csdn.net/qq_36836950/article/details/131063485
# https://blog.csdn.net/weixin_45842494/article/details/123943756
# https://blog.csdn.net/qq_38250124/article/details/86554834
# https://www.cnblogs.com/bymo/p/7601519.html
# https://blog.csdn.net/weixin_44056331/article/details/102411008

# 预支前提
beginTime=$(date +%s)
beginTimeYMD=$(date +%Y%m%d%H%M%S)
hive_url="beeline  -u 'jdbc:hive2://iot-e3base06:11001,iot-e3base07:11001,iot-e3base08:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk'  -n dwdapp"
hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"

# 从文件里面获取所有需要的表名 赋值一个数组
# 根据数组进行循环遍历

# 第一步根据表名获取列名
partition_stock=$($hive_url_e "desc dwdb.ods_soa_mft_bill_d;")
# $0 是指文本中的第一列 print arr[1] 输出第一列中的所有值    for(i in arr) print arr[i] 输出每一列的分割的值
partition_stock1=$(echo "$partition_stock" | awk '{split($0, arr, "|"); print arr[1]}')
# 这里是将列转行 并输出为数组的形式
arr=(${partition_stock1//\\n/})
# 变量拼接值
data_txt=''

# 读取文件的每一行
while read -r line; do
    # jq -r 是英文字符串输出出来会有双引号,-r 可以消除
    # 用jq 插件的对象的方式去获取
    for s in ${arr[@]}; do
        if [[ $s != 'pt_day' ]]; then
            # 字符拼接
            if [[ $data_txt == '' ]]; then
                data_txt="$(echo $line | jq -r ".COLUMNINFO.$s")"
            else
                data_txt="$data_txt,$(echo $line | jq -r ".COLUMNINFO.$s")"
            fi
        else
            # 输出内容到指定路径
            echo $data_txt >>'AC_CONTRACTADD_INFO_JSON.csv'
            break
        fi
    done
    # 需要解析的文件
done <ods_soa_mft_bill_d.csv

其他-scp脚本解密

#/user/bin/expect
# Expect是一个免费的编程工具语言,用来实现自动和交互式任务进行通信,而无需人的干预。
# 首行/usr/bin/expect,声明使用except组件,类似/bin/sh用法
# spawn: spawn + 需要执行的shell命令
# expect: 只有spawn执行的命令结果才会被expect捕捉到,因为spawn会启动一个进程,只有这个进程的相关信息才会被捕捉到,主要包括:标准输入的提示信息,eof和timeout。
# send和send_user:send会将expect脚本中需要的信息发送给spawn启动的那个进程,而send_user只是回显用户发出的信息,类似于shell中的echo而已。
spawn scp -r /data0/e3base/wangsw_a/sftp_file/hlj/ e3base@G030:/data1/hlj/stock_data/
expect "*password:"
send "E3base_12#34\n"
expect eof