DolphinScheduler应用实战笔记
一、前言
DolphinScheduler(后文简称DS)在项目中的应用实战笔记,包含DS执行SQL或存储过程、DS调用DataX同步数据、DS调用HTTP接口。
首先,在DS网页上创建项目:
点击项目名称进入项目
这两个工具栏:工作流定义和任务实例会经常使用
二、DS执行SQL或存储过程
点击操作栏的数据源中心,并创建数据源
选好并提交要执行sql的数据源即可,例如配置Hive。
点击操作栏的项目管理,点击项目进入,选择工作流定义,然后创建工作流:
拖拽出SQL标签,并填写数据源配置
填好节点名称、数据源配置和SQL:
SQL语句也可以是调用存储过程:(部分版本的hive不支持存储过程)
表结构:
CREATE TABLE `dev.test`(
`id` bigint DEFAULT NULL COMMENT 'ID',
`tradedate` timestamp DEFAULT NULL COMMENT '日期',
`infosource` string DEFAULT NULL COMMENT '信息来源'
)
COMMENT '测试表'
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'serialization.format'='1')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://nameservice1/quark1/user/hive/warehouse/dev.db/test'
TBLPROPERTIES (
'timelyre.replaced.count.col'='ID',
'timelyre.timestamp.col'='tradeDate',
'timelyre.tag.cols'='id,InfoSource')
存储过程语句:
CREATE
OR REPLACE PROCEDURE dev.sp_test(IN_DATA_DATE IN INT)
AS V_EXE_BEGIN_TIME TIMESTAMP;
--程序开始时间
V_EXE_END_TIME TIMESTAMP;
--程序结束时间
BEGIN --记录程序开始执行时间
V_EXE_BEGIN_TIME:=SYSTIMESTAMP;
EXECUTE IMMEDIATE 'truncate table dev.ai_exce_rate';
INSERT INTO TABLE dev.test (id, tradedate, infosource) SELECT client_id, last_modify, key FROM dev.ads_acc_stock_trade_detail LIMIT 10;
--记录程序结束时间
V_EXE_END_TIME:=SYSTIMESTAMP;
--打印执行日志
DBMS_OUTPUT.PUT_LINE(
'过程: sp_test ,业务日期:'||IN_DATA_DATE||',执行完成。开始时间:'||V_EXE_BEGIN_TIME||',结束时间:'||V_EXE_END_TIME
);
END
调用存储过程
CALL dev.sp_test(${IN_TRADE_DATE})
SQL不是select语句,就选择非查询,然后点击保存。
在页面再次点击保存:
租户即是刚刚创建的项目,全局变量就是整个任务都有效。
保存后,点击上线按钮,上线任务并执行:
运行起来后可以去任务实例中,查询任务执行情况:
二、DS调用DataX同步数据
在操作栏点击资源中心,并新建datax的json
json样例:
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/inceptor1/user/hive/warehouse/zx91.db/swscdc/ods_zx91_qt_tradingdaynew_all_day/dc_trade_date=${inTradeDate}",
"defaultFS": "hdfs://nameservice1",
"hadoopConfig": {
"dfs.nameservices": "nameservice1",
"dfs.ha.namenodes.nameservice1": "nn1,nn2",
"dfs.namenode.rpc-address.nameservice1.nn1": "${srcNameService1}",
"dfs.namenode.rpc-address.nameservice1.nn2": "${srcNameService2}",
"dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"fileType": "orc",
"fieldDelimiter": "\t",
"column": [
{
"index": "0",
"type": "decimal"
},
{
"index": "1",
"type": "timestamp"
},
{
"index": "2",
"type": "decimal"
},
{
"index": "3",
"type": "decimal"
},
{
"index": "4",
"type": "decimal"
},
{
"index": "5",
"type": "decimal"
},
{
"index": "6",
"type": "decimal"
},
{
"index": "7",
"type": "decimal"
},
{
"index": "8",
"type": "timestamp"
},
{
"index": "9",
"type": "decimal"
},
{
"index": "10",
"type": "string"
},
{
"index": "11",
"type": "int"
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://nameservice1",
"hadoopConfig": {
"dfs.nameservices": "nameservice1",
"dfs.ha.namenodes.nameservice1": "nn1,nn2",
"dfs.namenode.rpc-address.nameservice1.nn1": "timelyre02:8020",
"dfs.namenode.rpc-address.nameservice1.nn2": "timelyre03:8020",
"dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"fileType": "orc",
"path": "/quark1/user/hive/warehouse/dev.db/ylf/qt_tradingdaynew/dc_trade_date=${inTradeDate}",
"fileName": "qt_tradingdaynew",
"writeMode": "nonConflict",
"fieldDelimiter": "\t",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "tradingdate",
"type": "timestamp"
},
{
"name": "iftradingday",
"type": "int"
},
{
"name": "secumarket",
"type": "int"
},
{
"name": "ifweekend",
"type": "int"
},
{
"name": "ifmonthend",
"type": "int"
},
{
"name": "ifquarterend",
"type": "int"
},
{
"name": "ifyearend",
"type": "int"
},
{
"name": "xgrq",
"type": "timestamp"
},
{
"name": "jsid",
"type": "bigint"
},
{
"name": "dc_etl_time",
"type": "string"
},
{
"name": "dc_trade_date",
"type": "bigint"
}
]
}
}
}
]
}
}
注意读和写的字段顺序要一样。
再编写shell文件,来加载datax的json文件,datax_push_tdh.sh:
#!/bin/bash
# ********************************************************************************
# * Filename : datax_push_tdh.sh
# * Author : dcx
# * Version : 1.0.0
# * Created Date : 20240827
# * Description : DataX 推送TDH集群任务执行器
# * History :
# * <author> <version> <modified Date> <description>
# * dcx 1.0.0 2024-08-27 基于datax_push.sh修改,用于推送时序数据库(timelyre)
# *
# ********************************************************************************
# Module 1: 设置环境变量
export HADOOP_USER_NAME=swscdc
## dolphinscheduler 生产集群 TDH 客户端
source /usr/local/bigdata_client/TDH-oc-Client/init.sh
## dolphinscheduler 开发集群 TDH 客户端
# source /usr/local/bigdata_client/TDH-Client/init.sh
# Module 2: 公共函数
## 定义帮助文档
function ShowUsage() {
cat <<EOF
USAGE: $0 [OPTIONS]
OPTIONS:
-h --help 查看帮助文档.
-d --dc-trade-date [*]业务日期,格式为YYYYMMDD.
-j --json-file-name [*]Datax Json 配置文件名.
--src-name-service 源 HDFS HA NameService,格式为"TDHHOST1:PORT;TDHHOST2:PORT".
--src-jdbc 源数据库连接信息,格式为jdbc连接串.
--src-username 源数据库用户名.
--src-password 源数据库用户密码.
--tar-name-service 目标 HDFS HA NameService,格式为"TDHHOST1:PORT;TDHHOST2:PORT".
--tar-jdbc 目标数据库连接信息,格式为jdbc连接串.
--tar-username 目标数据库用户名.
--tar-password 目标数据库用户密码.
--tar-tablename 目标数据库表名(推送TDH集群进行日分区处理).
--push-days 数据推送周期,格式为"days,0/1",days:天数(正整数>=1与负整数<=-1),0:自然日,1:交易日,即推送n个自然日或交易日的数据.
--jvm-buffer JVM堆参数,即为防止jvm报内存溢出,根据实际情况调整jvm的堆参数,建议将内存设置为4G或者8G.
--inc-value 非时间戳增量标识字段值.
--dependence 当前依赖项,格式为"table_name1;table_name2;table_name3".
--tar-es-url 推送ES的URL信息.
--custom-partition 自定义分区值,即非日分区传入.
--nonpath-err HDFS路径不存在时强制成功,该参数表示中止执行并不提示错误,否则默认将置为报错状态.
EOF
}
## 定义日志记录函数
function Logger() {
echo -e "$(date +%Y/%m/%d\ %H:%M:%S)" : "${0##.*/}" "[${1}]"- "${2}"
}
# Module 3: 获取传入参数与参数值校验
## 使用 getopt 获取参数
OPTINOS=$(getopt --option hd:j: --long help,dc-trade-date:,json-file-name:,src-name-service:,src-jdbc:,src-username:,src-password:,tar-name-service:,tar-jdbc:,tar-username:,tar-password:,tar-tablename:,push-days:,jvm-buffer:,inc-value:,dependence:,tar-es-url:,custom-partition:,nonpath-err -n "$0" -- "$@")
VALID_OPTINOS=$?
[[ "${VALID_OPTINOS}" -ne "0" ]] && {
echo "Try '$0 -h or --help' for more information."
exit 10
}
## 使用 set 将 ${OPTINOS} 设置为位置参数
eval set -- "${OPTINOS}"
## 使用 shift 和 while 循环解析位置参数
while true; do
case "$1" in
-h | --help)
ShowUsage
exit 0
;;
-d | --dc-trade-date)
Logger "INFO" "业务日期: $2"
DC_TRADE_DATE=$2
shift 2
;;
-j | --json-file-name)
Logger "INFO" "Datax Json 配置文件名: $2"
JSON_FILE_NAME=$2
shift 2
;;
--src-name-service)
Logger "INFO" "源 HDFS HA NameService: $2"
SRC_NAME_SERVICE=$2
shift 2
;;
--src-jdbc)
Logger "INFO" "源数据库连接信息: $2"
SRC_JDBC=$2
shift 2
;;
--src-username)
Logger "INFO" "源数据库用户名: $2"
SRC_USERNAME=$2
shift 2
;;
--src-password)
Logger "INFO" "源数据库用户密码: $2"
SRC_PASSWORD=$2
shift 2
;;
--tar-name-service)
Logger "INFO" "目标 HDFS HA NameService: $2"
TAR_NAME_SERVICE=$2
shift 2
;;
--tar-jdbc)
Logger "INFO" "目标数据库连接信息: $2"
TAR_JDBC=$2
shift 2
;;
--tar-username)
Logger "INFO" "目标数据库用户名: $2"
TAR_USERNAME=$2
shift 2
;;
--tar-password)
Logger "INFO" "目标数据库用户密码: $2"
TAR_PASSWORD=$2
shift 2
;;
--tar-tablename)
Logger "INFO" "目标数据库表名(推送TDH集群进行日分区处理): $2"
TAR_TABLE_NAME=$2
shift 2
;;
--push-days)
Logger "INFO" "数据推送周期: $2"
PUSH_DAYS=$2
shift 2
;;
--jvm-buffer)
Logger "INFO" "JVM堆参数: $2"
JVM_BUFFER=$2
shift 2
;;
--inc-value)
Logger "INFO" "非时间戳增量标识字段值: $2"
INC_VALUE=$2
shift 2
;;
--dependence)
Logger "INFO" "当前依赖项: $2"
DEPENDENCE=$2
shift 2
;;
--tar-es-url)
Logger "INFO" "推送ES的URL信息: $2"
TAR_ES_URL=$2
shift 2
;;
--custom-partition)
Logger "INFO" "自定义分区值: $2"
CUSTOM_PARTITION=$2
shift 2
;;
--nonpath-err)
Logger "INFO" "HDFS路径不存在时强制成功: True"
NONPATH_ERR="True"
shift 1
;;
--)
shift
break
;;
*)
Logger "WARNING" "无效选项: $1"
ShowUsage
exit 10
;;
esac
done
## 校验参数值
if [[ -z "${DC_TRADE_DATE}" ]]; then
Logger "ERROR" "[-d | --dc-trade-date] : 业务日期不能为空!"
ShowUsage
exit 10
elif ! [[ "$(expr "${DC_TRADE_DATE}" : "[1-3][0-9][0-9][0-9][0-1][0-9][0-3][0-9]")" = "8" ]]; then
Logger "ERROR" "[-d | --dc-trade-date] : 业务日期格式错误!"
ShowUsage
exit 10
elif [[ -z "${JSON_FILE_NAME}" ]]; then
Logger "ERROR" "[-j | --json-file-name] : DATAX JSON 配置文件名不能为空!"
ShowUsage
exit 10
elif [[ -n "${PUSH_DAYS}" && ! "${PUSH_DAYS}" =~ ^(-)?[1-9]+,(0|1)$ ]]; then
Logger "ERROR" "[--push-days] : 数据推送周期格式错误!"
ShowUsage
exit 10
fi
# Module 4: 定义全局变量
# 日志表
LOG_TABLE_NAME="dwd:dwd_dc_etl_exec_logs"
## 记录开始时间
ETL_START_TIME="$(date "+%Y-%m-%d %H:%M:%S")"
## 日志表 任务名称
PROC_NAME=${JSON_FILE_NAME#*.}
## 日志表 任务类型,1:存储过程,2:数据推送,3:数据推送
PROC_TYPE=3
## 日志表 执行表名或者工作流名
TABLE_NAME=${JSON_FILE_NAME#*.}
## 日志表 任务执行结果标志位.1:成功,2:失败
EXEC_FLAG=1
# 是否写日志标志,0-不写入,非0-写入
IS_WRITE_LOG=1
# Inceptor 生产集群连接信息
INCEPTOR_JDBC="jdbc:inceptor2://10.151.6.28:10000/dev"
INCEPTOR_USERNAME="swscdc"
INCEPTOR_PASSWORD=$(echo "U3dzYzYwMDM2OQo=" | base64 -d)
# 设置全局DataX执行参数
PARAMS="-DparaReaderJdbc=${SRC_JDBC} -DsrcUsername=${SRC_USERNAME} -DsrcPassword=${SRC_PASSWORD} -DcustomPartition=${CUSTOM_PARTITION} -DtarEsUrl=${TAR_ES_URL} -DparaWriterJdbc=${TAR_JDBC} -DtarUsername=${TAR_USERNAME} -DtarPassword=${TAR_PASSWORD} -DincValue='${INC_VALUE}'"
# Module 5: 业务处理函数
## 获取 json 文件引入资源函数
function GetJsonFile() {
for file in "$1"/*; do
if test -f "$file"; then
if [[ "${file##*/}" == "${JSON_FILE_NAME#*.}.json" ]]; then
JSON_FILE="${file}"
fi
else
GetJsonFile "${file}"
fi
done
}
## 分区处理函数
function PartitionDel() {
Logger "INFO" "[PartitionDel] - 开始增删分区 ..."
Logger "INFO" "[PartitionDel] - TABLE : [${TAR_TABLE_NAME}], PARTITION : [${PARAM_TRADE_DATE}]"
local alter_sql="ALTER TABLE ${TAR_TABLE_NAME} DROP PARTITION (dc_trade_date=${PARAM_TRADE_DATE});ALTER TABLE ${TAR_TABLE_NAME} ADD PARTITION (dc_trade_date=${PARAM_TRADE_DATE});"
Logger "INFO" "[PartitionDel] - SQL : [${alter_sql}]."
local part_del_times=1
while true; do
beeline -u "${TAR_JDBC}" -n ${TAR_USERNAME} -p ${TAR_PASSWORD} -e "${alter_sql}"
local beeline_result=$?
if [[ "${beeline_result}" = "0" ]]; then
Logger "INFO" "[PartitionDel] - 增删分区完成."
break
else
Logger "INFO" "[PartitionDel] - 增删分区失败,将进行失败重做."
if [[ ${part_del_times} -ge 10 ]]; then
Logger "ERROR" "[PartitionDel] - 失败重做次数已超过限制[>=10],退出执行."
exit 10
else
Logger "INFO" "[PartitionDel] - 失败重做次数: [${part_del_times}], 失败重做间隔: [30s]"
sleep 30s
part_del_times=$((part_del_times + 1))
fi
fi
done
}
## 调用 datax 执行函数
function PushData() {
Logger "INFO" "[PushData] - 开始数据推送 ..."
# 设置DataX执行参数
local now_params
# 判断源数据库类型,inceptor或者关系型数据库
if [[ -n "${SRC_NAME_SERVICE}" ]]; then
## 推送前检查HDFS路径是否存在
PathIsExist
if [[ ${NOW_STATUS} == "False" ]]; then
return
fi
## HDFS HA NameService 参数值分割处理
local src_name_service1 src_name_service2
src_name_service1=$(echo "${SRC_NAME_SERVICE}" | awk -F ";" '{print $1}')
src_name_service2=$(echo "${SRC_NAME_SERVICE}" | awk -F ";" '{print $2}')
now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS} -DsrcNameService1=${src_name_service1} -DsrcNameService2=${src_name_service2}"
else
now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS}"
fi
## 判断目标是否为TDH集群
if [[ -n "${TAR_NAME_SERVICE}" ]]; then
## HDFS HA NameService 参数值分割处理
local tar_name_service1 tar_name_service2
tar_name_service1=$(echo "${TAR_NAME_SERVICE}" | awk -F ";" '{print $1}')
tar_name_service2=$(echo "${TAR_NAME_SERVICE}" | awk -F ";" '{print $2}')
now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS} -DtarNameService1=${tar_name_service1} -DtarNameService2=${tar_name_service2}"
fi
# 根据是否设置了JVM_BUFFER参数来决定是否需要修改JVM堆参数
if [[ -n "${JVM_BUFFER}" ]]; then
Logger "INFO" "[PushData] - JVM堆参数 : ${JVM_BUFFER}G"
JVM="-Xms${JVM_BUFFER}G -Xmx${JVM_BUFFER}G"
else
# JVM堆参数默认为2G
JVM="-Xms2G -Xmx2G"
fi
# 执行DataX任务
Logger "INFO" "[PushData] - DataX执行参数: ${now_params}"
python /usr/local/datax/bin/datax.py --jvm="${JVM}" "${JSON_FILE}" -p"${now_params}"
local datax_result=$?
if [[ "${datax_result}" == 0 ]]; then
Logger "INFO" "[PushData] - 数据推送完成."
else
Logger "ERROR" "[PushData] - 数据推送失败."
exit 10
fi
}
## 写入 Hyperbase 日志函数
function WriteEtlLogToHyperbase() {
local key=${JSON_FILE_NAME#*.}-${PARAM_TRADE_DATE}
local data_date=${PARAM_TRADE_DATE}
local etl_end_time
etl_end_time="$(date "+%Y-%m-%d %H:%M:%S")"
local write_log_times=1
Logger "INFO" "[WriteEtlLogToHyperbase] - 写入推送完成日志信息 ..."
Logger "INFO" "[WriteEtlLogToHyperbase] - TABLE : [dwd.dwd_dc_etl_exec_logs]"
Logger "INFO" "[WriteEtlLogToHyperbase] - Hyperbase写入值: {key : ${key}, proc_name : ${PROC_NAME}, proc_type : ${PROC_TYPE}, table_name : ${TABLE_NAME}, data_date : ${data_date}, etl_start_time : ${data_date}, etl_end_time : ${etl_end_time}, exec_flag : ${EXEC_FLAG}}"
while true; do
echo "hput '${LOG_TABLE_NAME}','${key}','f:q1','${PROC_NAME}'
hput '${LOG_TABLE_NAME}','${key}','f:q2','${PROC_TYPE}'
hput '${LOG_TABLE_NAME}','${key}','f:q3','${TABLE_NAME}'
hput '${LOG_TABLE_NAME}','${key}','f:q4','${data_date}'
hput '${LOG_TABLE_NAME}','${key}','f:q5','${ETL_START_TIME}'
hput '${LOG_TABLE_NAME}','${key}','f:q6','${etl_end_time}'
hput '${LOG_TABLE_NAME}','${key}','f:q7','${EXEC_FLAG}'" | hbase shell -n >/dev/null 2>&1
local write_result=$?
if [[ "${write_result}" == "0" ]]; then
Logger "INFO" "[WriteEtlLogToHyperbase] - 日志信息写入完成."
break
else
Logger "INFO" "[WriteEtlLogToHyperbase] - 日志信息写入失败,将进行失败重做."
if [[ ${write_log_times} -ge 3 ]]; then
Logger "ERROR" "[WriteEtlLogToHyperbase] - 失败重做次数已超过限制[>=3],退出执行."
exit 10
else
Logger "INFO" "[WriteEtlLogToHyperbase] - 失败重做次数: [${write_log_times}], 失败重做间隔: [10s]"
sleep 10s
write_log_times=$((write_log_times + 1))
fi
fi
done
}
## 检查当前依赖项是否完成函数
function CheckSourceByHbaseShell() {
Logger "INFO" "[CheckSourceByHbaseShell] - 检查当前依赖项是否完成 ..."
# 设置分隔符为分号
IFS=';'
# 将字符串转换为数组
read -ra arr <<< "${DEPENDENCE}"
# 将IFS变量重置为默认值
IFS=$' \t\n'
# 循环读取数组
for elem in "${arr[@]}"; do
local key="${elem}-${PARAM_TRADE_DATE}"
Logger "INFO" "[CheckSourceByHbaseShell] - 依赖项: ${elem}, key: ${key}"
while true; do
local hbase_result check_result
hbase_result=$(echo "hget '${LOG_TABLE_NAME}','${key}'" | hbase shell -n 2>&1)
check_result=$(echo "${hbase_result}"| awk '/row\(s\)/{print $0}' | awk -F " " '{print $1}' | sed ':a;N;$!ba;s/\n//g' | sed s/[[:space:]]//g)
if [[ -n "${check_result}" && "${check_result}" != "0" ]]; then
Logger "INFO" "[CheckSourceByHbaseShell] - 检查到依赖项已完成."
break
else
Logger "INFO" "[CheckSourceByHbaseShell] - 检查到依赖项未完成,将等待1m后再次检查."
sleep 1m
fi
done
done
}
## 获取自然日或交易日函数
function GetNatureOrTradeDays() {
local is_trade_date
DAYS=$(echo "${PUSH_DAYS}" | awk -F "," '{print $1}')
is_trade_date=$(echo "${PUSH_DAYS}" | awk -F "," '{print $2}')
local sub_query_sql
local days_abs
if [[ "${is_trade_date}" == "0" ]]; then
DAYS_FLAG="自然日"
if [[ "${DAYS}" -gt 0 ]]; then
sub_query_sql="AND T.DATE >= ${DC_TRADE_DATE} ORDER BY T.DATE LIMIT ${DAYS};"
else
days_abs=$((0 - "${DAYS}"))
sub_query_sql="AND T.DATE <= ${DC_TRADE_DATE} ORDER BY T.DATE DESC LIMIT ${days_abs};"
fi
else
DAYS_FLAG="交易日"
if [[ "${DAYS}" -gt 0 ]]; then
sub_query_sql="AND T.IS_TRADE_DATE = 1 AND T.DATE >= ${DC_TRADE_DATE} ORDER BY T.DATE LIMIT ${DAYS};"
else
days_abs=$((0 - "${DAYS}"))
sub_query_sql="AND T.IS_TRADE_DATE = 1 AND T.DATE <= ${DC_TRADE_DATE} ORDER BY T.DATE DESC LIMIT ${days_abs};"
fi
fi
# 初始化空数组
DAYS_ARR=()
local query_sql="SELECT T.DATE FROM DIM.DIM_TRADE_DATE_ADD_YEAR T WHERE T.MKT_CODE = '1' ${sub_query_sql}"
local query_result trade_days
query_result=$(beeline -u "${INCEPTOR_JDBC}" -n "${INCEPTOR_USERNAME}" -p "${INCEPTOR_PASSWORD}" -e "${query_sql}" --showHeader=false --silent=true)
trade_days=$(echo "${query_result}" | awk -F "|" '{print $2}' | sed ':a;N;$!ba;s/\n/;/g' | sed s/[[:space:]]//g | grep -oP '\d{8}' | paste -sd ' ' -)
# 设置分隔符为分号
IFS=' '
# 将字符串转换为数组
read -ra DAYS_ARR <<< "${trade_days}"
# 将IFS变量重置为默认值
IFS=$' \t\n'
}
## HDFS路径检查函数
function PathIsExist() {
Logger "INFO" "[PathIsExist] - 开始检查HDFS路径是否存在 ..."
local hdfs_path
hdfs_path=$(sed -n '/.*"path": "\(.*\)",/{s//\1/p;q}' "${JSON_FILE}" | sed "s/\${inTradeDate}/${PARAM_TRADE_DATE}/g" | sed "s/\${customPartition}/${CUSTOM_PARTITION}/g")
hdfs_path=$(echo -e "${hdfs_path}" | sed -e 's/\n//g' -e 's/\r//g')
Logger "INFO" "[PathIsExist] - fs.defaultFS path : ${hdfs_path}"
Logger "INFO" "[PathIsExist] - 路径检查命令 : hadoop fs -test -e ${hdfs_path} >/dev/null 2>&1"
hadoop fs -test -e "${hdfs_path}" >/dev/null 2>&1
local test_result=$?
if [[ "${test_result}" == 0 ]]; then
Logger "INFO" "[PathIsExist] - 已检查到配置项fs.defaultFS,path存在."
Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
NOW_STATUS="True"
else
if [[ "${NONPATH_ERR}" == "True" ]]; then
Logger "INFO" "[PathIsExist] - 已检查到配置项fs.defaultFS,path不存在,则默认本次无任何数据进行推送,中止执行,任务状态将置为成功!"
Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
Logger "INFO" "[PushData] - 数据推送完成."
NOW_STATUS="False"
else
Logger "ERROR" "[PathIsExist] - 无法读取路径${hdfs_path}下的所有文件,请确认您的配置项fs.defaultFS,path的值是否正确,是否有读写权限,网络是否已断开!"
Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
Logger "ERROR" "[PushData] - 数据推送失败."
exit 10
fi
fi
}
## 构建 main 函数
function Main() {
GetJsonFile .
## 判断 json 文件是否是否存在且是否为空文件
if [[ ! -s ${JSON_FILE} ]]; then
Logger "INFO" "[Main] - 配置文件 : ${JSON_FILE_NAME#*.}.json"
Logger "ERROR" "[Main] - 配置文件不存或者为空文件,请检查是否引入资源或者引入错误资源."
exit 10
fi
## 判断是否需要推送某个时间段内的数据
if [[ -z "${PUSH_DAYS}" ]]; then
## 设置后续函数的中PARAM_TRADE_DATE值为DC_TRADE_DATE
PARAM_TRADE_DATE="${DC_TRADE_DATE}"
# 判断是否需要进行当前依赖项检查
if [[ -n "${DEPENDENCE}" ]]; then
CheckSourceByHbaseShell
fi
## 判断是否需要推送TDH集群进行日分区处理
if [[ -n "${TAR_TABLE_NAME}" ]]; then
## 增删分区
PartitionDel
fi
## 推送数据
PushData
# 写入推送成功日志
if [[ ${IS_WRITE_LOG} -gt 0 ]]; then
## 向生产环境日志表写入日志,因为旧集群未启用HBase,所以暂时不写日志.
WriteEtlLogToHyperbase
fi
else
## 进行循环推数
GetNatureOrTradeDays
Logger "INFO" "[Main] - 推送近${DAYS}个${DAYS_FLAG}内的数据 ..."
Logger "INFO" "[Main] - ${DAYS_FLAG} : ${DAYS_ARR[*]}"
for idx in "${!DAYS_ARR[@]}"; do
PARAM_TRADE_DATE="${DAYS_ARR[idx]}"
Logger "INFO" "[Main] - [${idx}] - 业务日期 : ${DC_TRADE_DATE}, 数据日期 : ${PARAM_TRADE_DATE}"
# 判断是否需要进行当前依赖项检查
if [[ -n "${DEPENDENCE}" ]]; then
CheckSourceByHbaseShell
fi
## 判断是否需要推送TDH集群进行日分区处理
if [[ -n "${TAR_TABLE_NAME}" ]]; then
## 增删分区
PartitionDel
fi
## 推送数据
PushData
## 写入推送成功日志
if [[ ${IS_WRITE_LOG} -gt 0 ]]; then
## 向生产环境日志表写入日志,因为旧集群未启用HBase,所以暂时不写日志.
WriteEtlLogToHyperbase
fi
done
fi
}
# Module 6: 执行 main 函数
Main
这个就仅供参考~
然后就新建工作流定义,选择shell任务节点,需要填好名称、执行脚本和引入json和shell脚本。
执行脚本:
sh tag_platform/shell/datax_push_tdh.sh -d ${IN_TRADE_DATE} -j ${JSON_FILE_NAME} --src-name-service ${SRC_NAME_SERVICE} --tar-jdbc ${TAR_JDBC} --tar-username ${TAR_USERNAME} --tar-password ${TAR_PASSWORD} --tar-tablename ${TAR_TABLENAME}
然后上线任务并执行即可。
三、DS调用HTTP接口
选择HTTP任务节点
参数说明:
节点名称:一个工作流定义中的节点名称是唯一的。
运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
描述信息:描述该节点的功能。
任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
失败重试次数:任务失败重新提交的次数,支持下拉和手填。
失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
请求地址:http 请求 URL。
请求类型:支持 GET、POSt、HEAD、PUT、DELETE。
请求参数:支持 Parameter、Body、Headers。
校验条件:支持默认响应码、自定义响应码、内容包含、内容不包含。
校验内容:当校验条件选择自定义响应码、内容包含、内容不包含时,需填写校验内容。
自定义参数:是 http 局部的用户自定义参数,会替换脚本中以${变量}的内容。
四、DS依赖(DEPENDENT)节点
运行说明:依赖节点,就是依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功,依赖节点会去检查 B 流程在昨天是否有执行成功的实例。
例如,A 流程为周报任务,B、C 流程为天任务,A 任务需要 B、C 任务在上周的每一天都执行成功,如图示:
假如,周报 A 同时还需要自身在上周二执行成功:
五、DS SPARK 节点
执行说明:通过 SPARK 节点,可以直接直接执行 SPARK 程序,对于 spark 节点,worker 会使用 spark-submit 方式提交任务 参数说明:
程序类型:支持 JAVA、Scala 和 Python 三种语言
主函数的 class:是 Spark 程序的入口 Main Class 的全路径
主 jar 包:是 Spark 的 jar 包
部署方式:支持 yarn-cluster、yarn-client、和 local 三种模式
Driver:设置 Driver 内核数 及 内存数
Executor:设置 Executor 数量、Executor 内存数、Executor 内核数
命令行参数:是设置 Spark 程序的输入参数,支持自定义参数变量的替换。
其他参数:支持 --jars、--files、--archives、--conf 格式
资源:如果其他参数中引用了资源文件,需要在资源中选择指定
自定义参数:是 MR 局部的用户自定义参数,会替换脚本中以${变量}的内容
注意:JAVA 和 Scala 只是用来标识,没有区别,如果是 Python 开发的 Spark 则没有主函数的 class ,其他都是一样
六、DS Flink 节点
参数说明:
程序类型:支持 JAVA、Scala和 Python 三种语言
主函数的 class:是 Flink 程序的入口 Main Class 的全路径
主 jar 包:是 Flink 的 jar 包
部署方式:支持 cluster、local 三种模式
slot 数量:可以设置slot数
taskManage 数量:可以设置 taskManage 数
jobManager 内存数:可以设置 jobManager 内存数
taskManager 内存数:可以设置 taskManager 内存数
命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。
其他参数:支持 --jars、--files、--archives、--conf 格式
资源:如果其他参数中引用了资源文件,需要在资源中选择指定
自定义参数:是 Flink 局部的用户自定义参数,会替换脚本中以${变量}的内容
注意:JAVA 和 Scala 只是用来标识,没有区别,如果是 Python 开发的 Flink 则没有主函数的class,其他都是一样
七、DS Flink 节点
例如:
项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
Task 1:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_hive_01
... ...
数据源:Hive test_hiveserver2
sql类型:查询 表格:√ 附件:√
主题:Test Hive
收件人:tourist@sohh.cn
sql语句(结尾不要加分号):
select * from test_table where score=${i}
自定义参数:
i -> IN -> INTEGER -> 97
前置sql:
INSERT INTO test_table values(null, 'Dog',97)
后置sql:
-> 确认添加
Task 2:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_hive_02
... ...
数据源:Hive test_hiveserver2_ha
sql类型:非查询
sql语句(结尾不要加分号):
create table test_table2 as select * from test_table
自定义参数:
前置sql:
后置sql:
-> 确认添加
------------------------------------------------------
串联任务节点 Test_sql_hive_01、 Test_sql_hive_02
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_sql_hive
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行
八、DS SQL 节点
参数说明:
数据源:选择对应的数据源
sql 类型:支持查询和非查询两种,查询是 select 类型的查询,是有结果集返回的,可以指定邮件通知为 表格、附件 或 表格与附件 三种模板。非查询是没有结果集返回的,是针对 update、delete、insert 三种类型的操作
主题、收件人、抄送人:邮件相关配置
sql 参数:输入参数格式为 key1=value1;key2=value2…
sql 语句:SQL 语句
UDF 函数:对于 HIVE 类型的数据源,可以引用资源中心中创建的 UDF 函数,其他类型的数据源暂不支持 UDF 函数
自定义参数:SQL 任务类型自定义参数会替换 sql 语句中 ${变量}。而存储过程是通过自定义参数给方法参数设置值,自定义参数类型和数据类型同存储过程任务类型一样。
前置 sql:执行 “sql语句” 前的操作
后置 sql:执行 “sql语句” 后的操作
例如MySQL:
项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
Task 1:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_mysql_01
... ...
数据源:MYSQL test01_mysql
sql类型:查询 表格:√ 附件:√
主题:Test MySQL
收件人:tourist@sohh.cn
sql语句:
select * from test_table where score=${i};
自定义参数:
i -> IN -> INTEGER -> 97
前置sql:
INSERT INTO test_table values(null, 'Dog',97)
后置sql:
-> 确认添加
Task 2:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_mysql_02
... ...
数据源:MYSQL test01_mysql
sql类型:非查询
sql语句:
create table test_table2 as select * from test_table;
自定义参数:
前置 sql:
后置 sql:
-> 确认添加
------------------------------------------------------
串联任务节点 Test_sql_mysql_01、Test_sql_mysql_02
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_sql_mysql
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行
九、DS Java程序
Java 程序参数说明:
程序类型:JAVA
主函数的 class:是 MR 程序的入口 Main Class 的全路径
主jar包:是 MR 的 jar 包
命令行参数:是设置 MR 程序的输入参数,支持自定义参数变量的替换
其他参数:支持 –D、-files、-libjars、-archives格式
资源:如果其他参数中引用了资源文件,需要在资源中选择指定
自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容
例如:
# 将 MR 的示例 jar 包上传到 资源中心;并创建测试文本上传到 HDFS 目录
# CDH 版本 Jar 包位置:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
拖拽 MR 节点到画布,新增一个 MR 任务
节点名称:Test_mr_java_01
... ...
程序类型:JAVA
主函数的class:wordcount
主jar包:hadoop-mapreduce-examples.jar
命令行参数:/tmp/test.txt /tmp/output
其他参数:
资源:
自定义参数:
-> 确认添加
------------------------------------------------------
保存 ->
设置DAG图名称:Test_mr_java
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行(运行MR的权限问题此处不再描述)
------------------------------------------------------
查看结果:
sudo -u hdfs hadoop fs -cat /tmp/output/*
十、DS Python节点
运行说明:使用 python 节点,可以直接执行 python 脚本,对于 python 节点,worker会使用 python ** 方式提交任务。参数说明:脚本:用户开发的 Python 程序 资源:是指脚本中需要调用的资源文件列表 自定义参数:是 Python 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容。
例如:
项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
拖拽 Python 节点到画布,新增一个 Python 任务
节点名称:Test_python_01
... ...
脚本:
#!/user/bin/python
# -*- coding: UTF-8 -*-
for num in range(0, 10): print 'Round %d ...' % num
资源:
自定义参数:
-> 确认添加
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_python
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行