DolphinScheduler应用实战笔记

发布于:2024-09-17 ⋅ 阅读:(57) ⋅ 点赞:(0)

一、前言

DolphinScheduler(后文简称DS)在项目中的应用实战笔记,包含DS执行SQL或存储过程、DS调用DataX同步数据、DS调用HTTP接口。
首先,在DS网页上创建项目:
在这里插入图片描述
点击项目名称进入项目
在这里插入图片描述
这两个工具栏:工作流定义和任务实例会经常使用

二、DS执行SQL或存储过程

点击操作栏的数据源中心,并创建数据源
在这里插入图片描述
在这里插入图片描述

选好并提交要执行sql的数据源即可,例如配置Hive。

点击操作栏的项目管理,点击项目进入,选择工作流定义,然后创建工作流:
在这里插入图片描述
拖拽出SQL标签,并填写数据源配置
在这里插入图片描述
填好节点名称、数据源配置和SQL:
在这里插入图片描述
在这里插入图片描述
SQL语句也可以是调用存储过程:(部分版本的hive不支持存储过程)

表结构:

CREATE TABLE `dev.test`(
  `id` bigint DEFAULT NULL COMMENT 'ID', 
  `tradedate` timestamp DEFAULT NULL COMMENT '日期', 
  `infosource` string DEFAULT NULL COMMENT '信息来源'
)
COMMENT '测试表'
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde' 
WITH SERDEPROPERTIES ( 
  'serialization.format'='1') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://nameservice1/quark1/user/hive/warehouse/dev.db/test'
TBLPROPERTIES (
  'timelyre.replaced.count.col'='ID', 
  'timelyre.timestamp.col'='tradeDate', 
  'timelyre.tag.cols'='id,InfoSource')

存储过程语句:

CREATE
OR        REPLACE PROCEDURE dev.sp_test(IN_DATA_DATE IN INT)

          AS V_EXE_BEGIN_TIME TIMESTAMP;

--程序开始时间
V_EXE_END_TIME TIMESTAMP;

--程序结束时间
BEGIN --记录程序开始执行时间
V_EXE_BEGIN_TIME:=SYSTIMESTAMP;
EXECUTE IMMEDIATE 'truncate  table dev.ai_exce_rate';

INSERT INTO TABLE dev.test (id, tradedate, infosource) SELECT client_id, last_modify, key FROM dev.ads_acc_stock_trade_detail LIMIT 10;

--记录程序结束时间
V_EXE_END_TIME:=SYSTIMESTAMP;

--打印执行日志
DBMS_OUTPUT.PUT_LINE(
'过程: sp_test ,业务日期:'||IN_DATA_DATE||',执行完成。开始时间:'||V_EXE_BEGIN_TIME||',结束时间:'||V_EXE_END_TIME
);
END

调用存储过程

CALL dev.sp_test(${IN_TRADE_DATE})

SQL不是select语句,就选择非查询,然后点击保存。
在页面再次点击保存:
在这里插入图片描述
在这里插入图片描述
租户即是刚刚创建的项目,全局变量就是整个任务都有效。
在这里插入图片描述
保存后,点击上线按钮,上线任务并执行:
在这里插入图片描述
在这里插入图片描述
运行起来后可以去任务实例中,查询任务执行情况:
在这里插入图片描述

二、DS调用DataX同步数据

在操作栏点击资源中心,并新建datax的json
在这里插入图片描述
json样例:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/inceptor1/user/hive/warehouse/zx91.db/swscdc/ods_zx91_qt_tradingdaynew_all_day/dc_trade_date=${inTradeDate}",
            "defaultFS": "hdfs://nameservice1",
            "hadoopConfig": {
              "dfs.nameservices": "nameservice1",
              "dfs.ha.namenodes.nameservice1": "nn1,nn2",
              "dfs.namenode.rpc-address.nameservice1.nn1": "${srcNameService1}",
              "dfs.namenode.rpc-address.nameservice1.nn2": "${srcNameService2}",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "fileType": "orc",
            "fieldDelimiter": "\t",
            "column": [
              {
                "index": "0",
                "type": "decimal"
              },
              {
                "index": "1",
                "type": "timestamp"
              },
              {
                "index": "2",
                "type": "decimal"
              },
              {
                "index": "3",
                "type": "decimal"
              },
              {
                "index": "4",
                "type": "decimal"
              },
              {
                "index": "5",
                "type": "decimal"
              },
              {
                "index": "6",
                "type": "decimal"
              },
              {
                "index": "7",
                "type": "decimal"
              },
              {
                "index": "8",
                "type": "timestamp"
              },
              {
                "index": "9",
                "type": "decimal"
              },
              {
                "index": "10",
                "type": "string"
              },
              {
                "index": "11",
                "type": "int"
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://nameservice1",
            "hadoopConfig": {
              "dfs.nameservices": "nameservice1",
              "dfs.ha.namenodes.nameservice1": "nn1,nn2",
              "dfs.namenode.rpc-address.nameservice1.nn1": "timelyre02:8020",
              "dfs.namenode.rpc-address.nameservice1.nn2": "timelyre03:8020",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "fileType": "orc",
            "path": "/quark1/user/hive/warehouse/dev.db/ylf/qt_tradingdaynew/dc_trade_date=${inTradeDate}",
            "fileName": "qt_tradingdaynew",
            "writeMode": "nonConflict",
            "fieldDelimiter": "\t",
            "column": [
              {
                "name": "id",
                "type": "bigint"
              },
              {
                "name": "tradingdate",
                "type": "timestamp"
              },
              {
                "name": "iftradingday",
                "type": "int"
              },
              {
                "name": "secumarket",
                "type": "int"
              },
              {
                "name": "ifweekend",
                "type": "int"
              },
              {
                "name": "ifmonthend",
                "type": "int"
              },
              {
                "name": "ifquarterend",
                "type": "int"
              },
              {
                "name": "ifyearend",
                "type": "int"
              },
              {
                "name": "xgrq",
                "type": "timestamp"
              },
              {
                "name": "jsid",
                "type": "bigint"
              },
              {
                "name": "dc_etl_time",
                "type": "string"
              },
              {
                "name": "dc_trade_date",
                "type": "bigint"
              }
            ]
          }
        }
      }
    ]
  }
}

注意读和写的字段顺序要一样。
再编写shell文件,来加载datax的json文件,datax_push_tdh.sh:

#!/bin/bash
# ********************************************************************************
# * Filename     : datax_push_tdh.sh
# * Author       : dcx
# * Version      : 1.0.0
# * Created Date : 20240827
# * Description  : DataX 推送TDH集群任务执行器
# * History      :
# * <author> <version> <modified Date> <description>
# * dcx 1.0.0 2024-08-27 基于datax_push.sh修改,用于推送时序数据库(timelyre)
# *
# ********************************************************************************

# Module 1: 设置环境变量
export HADOOP_USER_NAME=swscdc
## dolphinscheduler 生产集群 TDH 客户端
source /usr/local/bigdata_client/TDH-oc-Client/init.sh
## dolphinscheduler 开发集群 TDH 客户端
# source /usr/local/bigdata_client/TDH-Client/init.sh

# Module 2: 公共函数
## 定义帮助文档
function ShowUsage() {
    cat <<EOF
USAGE: $0 [OPTIONS]

OPTIONS:
    -h --help           查看帮助文档.
    -d --dc-trade-date  [*]业务日期,格式为YYYYMMDD.
    -j --json-file-name [*]Datax Json 配置文件名.
    --src-name-service  源 HDFS HA NameService,格式为"TDHHOST1:PORT;TDHHOST2:PORT".
    --src-jdbc          源数据库连接信息,格式为jdbc连接串.
    --src-username      源数据库用户名.
    --src-password      源数据库用户密码.
    --tar-name-service  目标 HDFS HA NameService,格式为"TDHHOST1:PORT;TDHHOST2:PORT".
    --tar-jdbc          目标数据库连接信息,格式为jdbc连接串.
    --tar-username      目标数据库用户名.
    --tar-password      目标数据库用户密码.
    --tar-tablename     目标数据库表名(推送TDH集群进行日分区处理).
    --push-days         数据推送周期,格式为"days,0/1",days:天数(正整数>=1与负整数<=-1),0:自然日,1:交易日,即推送n个自然日或交易日的数据. 
    --jvm-buffer        JVM堆参数,即为防止jvm报内存溢出,根据实际情况调整jvm的堆参数,建议将内存设置为4G或者8G.
    --inc-value         非时间戳增量标识字段值.
    --dependence        当前依赖项,格式为"table_name1;table_name2;table_name3".
    --tar-es-url        推送ESURL信息.
    --custom-partition  自定义分区值,即非日分区传入.
    --nonpath-err       HDFS路径不存在时强制成功,该参数表示中止执行并不提示错误,否则默认将置为报错状态.

EOF
}
## 定义日志记录函数
function Logger() {
    echo -e "$(date +%Y/%m/%d\ %H:%M:%S)" : "${0##.*/}" "[${1}]"- "${2}"
}

# Module 3: 获取传入参数与参数值校验
## 使用 getopt 获取参数
OPTINOS=$(getopt --option hd:j: --long help,dc-trade-date:,json-file-name:,src-name-service:,src-jdbc:,src-username:,src-password:,tar-name-service:,tar-jdbc:,tar-username:,tar-password:,tar-tablename:,push-days:,jvm-buffer:,inc-value:,dependence:,tar-es-url:,custom-partition:,nonpath-err -n "$0" -- "$@")
VALID_OPTINOS=$?
[[ "${VALID_OPTINOS}" -ne "0" ]] && {
    echo "Try '$0 -h or --help' for more information."
    exit 10
}
## 使用 set 将 ${OPTINOS} 设置为位置参数
eval set -- "${OPTINOS}"
## 使用 shift 和 while 循环解析位置参数
while true; do
    case "$1" in
    -h | --help)
        ShowUsage
        exit 0
        ;;
    -d | --dc-trade-date)
        Logger "INFO" "业务日期: $2"
        DC_TRADE_DATE=$2
        shift 2
        ;;
    -j | --json-file-name)
        Logger "INFO" "Datax Json 配置文件名: $2"
        JSON_FILE_NAME=$2
        shift 2
        ;;
    --src-name-service)
        Logger "INFO" "源 HDFS HA NameService: $2"
        SRC_NAME_SERVICE=$2
        shift 2
        ;;
    --src-jdbc)
        Logger "INFO" "源数据库连接信息: $2"
        SRC_JDBC=$2
        shift 2
        ;;
    --src-username)
        Logger "INFO" "源数据库用户名: $2"
        SRC_USERNAME=$2
        shift 2
        ;;
    --src-password)
        Logger "INFO" "源数据库用户密码: $2"
        SRC_PASSWORD=$2
        shift 2
        ;;
    --tar-name-service)
        Logger "INFO" "目标 HDFS HA NameService: $2"
        TAR_NAME_SERVICE=$2
        shift 2
        ;;
    --tar-jdbc)
        Logger "INFO" "目标数据库连接信息: $2"
        TAR_JDBC=$2
        shift 2
        ;;
    --tar-username)
        Logger "INFO" "目标数据库用户名: $2"
        TAR_USERNAME=$2
        shift 2
        ;;
    --tar-password)
        Logger "INFO" "目标数据库用户密码: $2"
        TAR_PASSWORD=$2
        shift 2
        ;;
    --tar-tablename)
        Logger "INFO" "目标数据库表名(推送TDH集群进行日分区处理): $2"
        TAR_TABLE_NAME=$2
        shift 2
        ;;
    --push-days)
        Logger "INFO" "数据推送周期: $2"
        PUSH_DAYS=$2
        shift 2
        ;;
    --jvm-buffer)
        Logger "INFO" "JVM堆参数: $2"
        JVM_BUFFER=$2
        shift 2
        ;;
    --inc-value)
        Logger "INFO" "非时间戳增量标识字段值: $2"
        INC_VALUE=$2
        shift 2
        ;;
    --dependence)
        Logger "INFO" "当前依赖项: $2"
        DEPENDENCE=$2
        shift 2
        ;;
    --tar-es-url)
        Logger "INFO" "推送ES的URL信息: $2"
        TAR_ES_URL=$2
        shift 2
        ;;
    --custom-partition)
        Logger "INFO" "自定义分区值: $2"
        CUSTOM_PARTITION=$2
        shift 2
        ;;
    --nonpath-err)
        Logger "INFO" "HDFS路径不存在时强制成功: True"
        NONPATH_ERR="True"
        shift 1
        ;;
    --)
        shift
        break
        ;;
    *)
        Logger "WARNING" "无效选项: $1"
        ShowUsage
        exit 10
        ;;
    esac
done
## 校验参数值
if [[ -z "${DC_TRADE_DATE}" ]]; then
    Logger "ERROR" "[-d | --dc-trade-date] : 业务日期不能为空!"
    ShowUsage
    exit 10
elif ! [[ "$(expr "${DC_TRADE_DATE}" : "[1-3][0-9][0-9][0-9][0-1][0-9][0-3][0-9]")" = "8" ]]; then
    Logger "ERROR" "[-d | --dc-trade-date] : 业务日期格式错误!"
    ShowUsage
    exit 10
elif [[ -z "${JSON_FILE_NAME}" ]]; then
    Logger "ERROR" "[-j | --json-file-name] : DATAX JSON 配置文件名不能为空!"
    ShowUsage
    exit 10
elif [[ -n "${PUSH_DAYS}" && ! "${PUSH_DAYS}" =~ ^(-)?[1-9]+,(0|1)$ ]]; then
    Logger "ERROR" "[--push-days] : 数据推送周期格式错误!"
    ShowUsage
    exit 10
fi

# Module 4: 定义全局变量
# 日志表
LOG_TABLE_NAME="dwd:dwd_dc_etl_exec_logs"
## 记录开始时间
ETL_START_TIME="$(date "+%Y-%m-%d %H:%M:%S")"
## 日志表 任务名称
PROC_NAME=${JSON_FILE_NAME#*.}
## 日志表 任务类型,1:存储过程,2:数据推送,3:数据推送
PROC_TYPE=3
## 日志表 执行表名或者工作流名
TABLE_NAME=${JSON_FILE_NAME#*.}
## 日志表 任务执行结果标志位.1:成功,2:失败
EXEC_FLAG=1
# 是否写日志标志,0-不写入,0-写入
IS_WRITE_LOG=1
# Inceptor 生产集群连接信息
INCEPTOR_JDBC="jdbc:inceptor2://10.151.6.28:10000/dev"
INCEPTOR_USERNAME="swscdc"
INCEPTOR_PASSWORD=$(echo "U3dzYzYwMDM2OQo=" | base64 -d)
# 设置全局DataX执行参数
PARAMS="-DparaReaderJdbc=${SRC_JDBC} -DsrcUsername=${SRC_USERNAME} -DsrcPassword=${SRC_PASSWORD} -DcustomPartition=${CUSTOM_PARTITION} -DtarEsUrl=${TAR_ES_URL} -DparaWriterJdbc=${TAR_JDBC} -DtarUsername=${TAR_USERNAME} -DtarPassword=${TAR_PASSWORD} -DincValue='${INC_VALUE}'"

# Module 5: 业务处理函数
## 获取 json 文件引入资源函数
function GetJsonFile() {
    for file in "$1"/*; do
        if test -f "$file"; then
            if [[ "${file##*/}" == "${JSON_FILE_NAME#*.}.json" ]]; then
                JSON_FILE="${file}"
            fi
        else
            GetJsonFile "${file}"
        fi
    done
}
## 分区处理函数
function PartitionDel() {
    Logger "INFO" "[PartitionDel] - 开始增删分区 ..."
    Logger "INFO" "[PartitionDel] - TABLE : [${TAR_TABLE_NAME}], PARTITION : [${PARAM_TRADE_DATE}]"
    local alter_sql="ALTER TABLE ${TAR_TABLE_NAME} DROP PARTITION (dc_trade_date=${PARAM_TRADE_DATE});ALTER TABLE ${TAR_TABLE_NAME} ADD PARTITION (dc_trade_date=${PARAM_TRADE_DATE});"
    Logger "INFO" "[PartitionDel] - SQL : [${alter_sql}]."
    local part_del_times=1
    while true; do
        beeline -u "${TAR_JDBC}" -n ${TAR_USERNAME} -p ${TAR_PASSWORD} -e "${alter_sql}"
        local beeline_result=$?
        if [[ "${beeline_result}" = "0" ]]; then
            Logger "INFO" "[PartitionDel] - 增删分区完成."
            break
        else
            Logger "INFO" "[PartitionDel] - 增删分区失败,将进行失败重做."
            if [[ ${part_del_times} -ge 10 ]]; then
                Logger "ERROR" "[PartitionDel] - 失败重做次数已超过限制[>=10],退出执行."
                exit 10
            else
                Logger "INFO" "[PartitionDel] - 失败重做次数: [${part_del_times}], 失败重做间隔: [30s]"
                sleep 30s
                part_del_times=$((part_del_times + 1))
            fi
        fi
    done
}
## 调用 datax 执行函数
function PushData() {
    Logger "INFO" "[PushData] - 开始数据推送 ..."
    # 设置DataX执行参数
    local now_params
    # 判断源数据库类型,inceptor或者关系型数据库
    if [[ -n "${SRC_NAME_SERVICE}" ]]; then
        ## 推送前检查HDFS路径是否存在
        PathIsExist
        if [[ ${NOW_STATUS} == "False" ]]; then
            return
        fi
        ## HDFS HA NameService 参数值分割处理
        local src_name_service1 src_name_service2
        src_name_service1=$(echo "${SRC_NAME_SERVICE}" | awk -F ";" '{print $1}')
        src_name_service2=$(echo "${SRC_NAME_SERVICE}" | awk -F ";" '{print $2}')
        now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS} -DsrcNameService1=${src_name_service1} -DsrcNameService2=${src_name_service2}"
    else
        now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS}"
    fi
    ## 判断目标是否为TDH集群
    if [[ -n "${TAR_NAME_SERVICE}" ]]; then
        ## HDFS HA NameService 参数值分割处理
        local tar_name_service1 tar_name_service2
        tar_name_service1=$(echo "${TAR_NAME_SERVICE}" | awk -F ";" '{print $1}')
        tar_name_service2=$(echo "${TAR_NAME_SERVICE}" | awk -F ";" '{print $2}')
        now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS} -DtarNameService1=${tar_name_service1} -DtarNameService2=${tar_name_service2}"
    fi
    # 根据是否设置了JVM_BUFFER参数来决定是否需要修改JVM堆参数
    if [[ -n "${JVM_BUFFER}" ]]; then
        Logger "INFO" "[PushData] - JVM堆参数 : ${JVM_BUFFER}G"
        JVM="-Xms${JVM_BUFFER}G -Xmx${JVM_BUFFER}G"
    else
        # JVM堆参数默认为2G
        JVM="-Xms2G -Xmx2G"
    fi
    # 执行DataX任务
    Logger "INFO" "[PushData] - DataX执行参数: ${now_params}"
    python /usr/local/datax/bin/datax.py --jvm="${JVM}" "${JSON_FILE}" -p"${now_params}"
    local datax_result=$?
    if [[ "${datax_result}" == 0 ]]; then
        Logger "INFO" "[PushData] - 数据推送完成."
    else
        Logger "ERROR" "[PushData] - 数据推送失败."
        exit 10
    fi
}
## 写入 Hyperbase 日志函数
function WriteEtlLogToHyperbase() {
    local key=${JSON_FILE_NAME#*.}-${PARAM_TRADE_DATE}
    local data_date=${PARAM_TRADE_DATE}
    local etl_end_time
    etl_end_time="$(date "+%Y-%m-%d %H:%M:%S")"
    local write_log_times=1
    Logger "INFO" "[WriteEtlLogToHyperbase] - 写入推送完成日志信息 ..."
    Logger "INFO" "[WriteEtlLogToHyperbase] - TABLE : [dwd.dwd_dc_etl_exec_logs]"
    Logger "INFO" "[WriteEtlLogToHyperbase] - Hyperbase写入值: {key : ${key}, proc_name : ${PROC_NAME}, proc_type : ${PROC_TYPE}, table_name : ${TABLE_NAME}, data_date : ${data_date}, etl_start_time : ${data_date}, etl_end_time : ${etl_end_time}, exec_flag : ${EXEC_FLAG}}"
    while true; do
        echo "hput '${LOG_TABLE_NAME}','${key}','f:q1','${PROC_NAME}'
              hput '${LOG_TABLE_NAME}','${key}','f:q2','${PROC_TYPE}'
              hput '${LOG_TABLE_NAME}','${key}','f:q3','${TABLE_NAME}'
              hput '${LOG_TABLE_NAME}','${key}','f:q4','${data_date}'
              hput '${LOG_TABLE_NAME}','${key}','f:q5','${ETL_START_TIME}'
              hput '${LOG_TABLE_NAME}','${key}','f:q6','${etl_end_time}'
              hput '${LOG_TABLE_NAME}','${key}','f:q7','${EXEC_FLAG}'" | hbase shell -n >/dev/null 2>&1
        local write_result=$?
        if [[ "${write_result}" == "0" ]]; then
            Logger "INFO" "[WriteEtlLogToHyperbase] - 日志信息写入完成."
            break
        else
            Logger "INFO" "[WriteEtlLogToHyperbase] - 日志信息写入失败,将进行失败重做."
            if [[ ${write_log_times} -ge 3 ]]; then
                Logger "ERROR" "[WriteEtlLogToHyperbase] - 失败重做次数已超过限制[>=3],退出执行."
                exit 10
            else
                Logger "INFO" "[WriteEtlLogToHyperbase] - 失败重做次数: [${write_log_times}], 失败重做间隔: [10s]"
                sleep 10s
                write_log_times=$((write_log_times + 1))
            fi
        fi
    done
}
## 检查当前依赖项是否完成函数
function CheckSourceByHbaseShell() {
    Logger "INFO" "[CheckSourceByHbaseShell] - 检查当前依赖项是否完成 ..."
    # 设置分隔符为分号
    IFS=';'
    # 将字符串转换为数组
    read -ra arr <<< "${DEPENDENCE}"
    # 将IFS变量重置为默认值
    IFS=$' \t\n'
    # 循环读取数组
    for elem in "${arr[@]}"; do
        local key="${elem}-${PARAM_TRADE_DATE}"
        Logger "INFO" "[CheckSourceByHbaseShell] - 依赖项: ${elem}, key: ${key}"
        while true; do
            local hbase_result check_result
            hbase_result=$(echo "hget '${LOG_TABLE_NAME}','${key}'" | hbase shell -n 2>&1)
            check_result=$(echo "${hbase_result}"| awk '/row\(s\)/{print $0}' | awk -F " " '{print $1}' | sed ':a;N;$!ba;s/\n//g' | sed s/[[:space:]]//g)
           if [[ -n "${check_result}" && "${check_result}" != "0" ]]; then
                Logger "INFO" "[CheckSourceByHbaseShell] - 检查到依赖项已完成."
                break
           else
                Logger "INFO" "[CheckSourceByHbaseShell] - 检查到依赖项未完成,将等待1m后再次检查."
                sleep 1m
           fi
        done
    done
}
## 获取自然日或交易日函数
function GetNatureOrTradeDays() {
    local is_trade_date
    DAYS=$(echo "${PUSH_DAYS}" | awk -F "," '{print $1}')
    is_trade_date=$(echo "${PUSH_DAYS}" | awk -F "," '{print $2}')
    local sub_query_sql
    local days_abs
    if [[ "${is_trade_date}" == "0" ]]; then
        DAYS_FLAG="自然日"
        if [[ "${DAYS}" -gt 0 ]]; then
            sub_query_sql="AND T.DATE >= ${DC_TRADE_DATE} ORDER BY T.DATE LIMIT ${DAYS};"
        else
            days_abs=$((0 - "${DAYS}"))
            sub_query_sql="AND T.DATE <= ${DC_TRADE_DATE} ORDER BY T.DATE DESC LIMIT ${days_abs};"
        fi
    else
        DAYS_FLAG="交易日"
        if [[ "${DAYS}" -gt 0 ]]; then
            sub_query_sql="AND T.IS_TRADE_DATE = 1 AND T.DATE >= ${DC_TRADE_DATE} ORDER BY T.DATE LIMIT ${DAYS};"
        else
            days_abs=$((0 - "${DAYS}"))
            sub_query_sql="AND T.IS_TRADE_DATE = 1 AND T.DATE <= ${DC_TRADE_DATE} ORDER BY T.DATE DESC LIMIT ${days_abs};"
        fi
    fi
    # 初始化空数组
    DAYS_ARR=()
    local query_sql="SELECT T.DATE FROM DIM.DIM_TRADE_DATE_ADD_YEAR T WHERE T.MKT_CODE = '1' ${sub_query_sql}"
    local query_result trade_days
    query_result=$(beeline -u "${INCEPTOR_JDBC}" -n "${INCEPTOR_USERNAME}" -p "${INCEPTOR_PASSWORD}" -e "${query_sql}" --showHeader=false --silent=true)
    trade_days=$(echo "${query_result}" | awk -F "|" '{print $2}' | sed ':a;N;$!ba;s/\n/;/g' | sed s/[[:space:]]//g | grep -oP '\d{8}' | paste -sd ' ' -)
    # 设置分隔符为分号
    IFS=' '
    # 将字符串转换为数组
    read -ra DAYS_ARR <<< "${trade_days}"
    # 将IFS变量重置为默认值
    IFS=$' \t\n'
}
## HDFS路径检查函数
function PathIsExist() {
    Logger "INFO" "[PathIsExist] - 开始检查HDFS路径是否存在 ..."
    local hdfs_path
    hdfs_path=$(sed -n '/.*"path": "\(.*\)",/{s//\1/p;q}' "${JSON_FILE}" | sed "s/\${inTradeDate}/${PARAM_TRADE_DATE}/g" | sed "s/\${customPartition}/${CUSTOM_PARTITION}/g")
    hdfs_path=$(echo -e "${hdfs_path}" | sed -e 's/\n//g' -e 's/\r//g')
    Logger "INFO" "[PathIsExist] - fs.defaultFS path : ${hdfs_path}"
    Logger "INFO" "[PathIsExist] - 路径检查命令 : hadoop fs -test -e ${hdfs_path} >/dev/null 2>&1"
    hadoop fs -test -e "${hdfs_path}" >/dev/null 2>&1
    local test_result=$?
    if [[ "${test_result}" == 0 ]]; then
        Logger "INFO" "[PathIsExist] - 已检查到配置项fs.defaultFS,path存在."
        Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
        NOW_STATUS="True"
    else
        if [[ "${NONPATH_ERR}" == "True" ]]; then
            Logger "INFO" "[PathIsExist] - 已检查到配置项fs.defaultFS,path不存在,则默认本次无任何数据进行推送,中止执行,任务状态将置为成功!"
            Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
            Logger "INFO" "[PushData] - 数据推送完成."
            NOW_STATUS="False"
        else
            Logger "ERROR" "[PathIsExist] - 无法读取路径${hdfs_path}下的所有文件,请确认您的配置项fs.defaultFS,path的值是否正确,是否有读写权限,网络是否已断开!"
            Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
            Logger "ERROR" "[PushData] - 数据推送失败."
            exit 10
        fi
    fi
}
## 构建 main 函数
function Main() {
    GetJsonFile .
    ## 判断 json 文件是否是否存在且是否为空文件
    if [[ ! -s ${JSON_FILE} ]]; then
        Logger "INFO" "[Main] - 配置文件 : ${JSON_FILE_NAME#*.}.json"
        Logger "ERROR" "[Main] - 配置文件不存或者为空文件,请检查是否引入资源或者引入错误资源."
        exit 10
    fi
    ## 判断是否需要推送某个时间段内的数据
    if [[ -z "${PUSH_DAYS}" ]]; then
        ## 设置后续函数的中PARAM_TRADE_DATE值为DC_TRADE_DATE
        PARAM_TRADE_DATE="${DC_TRADE_DATE}"
        # 判断是否需要进行当前依赖项检查
        if [[ -n "${DEPENDENCE}" ]]; then
            CheckSourceByHbaseShell
        fi
        ## 判断是否需要推送TDH集群进行日分区处理
        if [[ -n "${TAR_TABLE_NAME}" ]]; then
            ## 增删分区
            PartitionDel
        fi
        ## 推送数据
        PushData
        # 写入推送成功日志
        if [[ ${IS_WRITE_LOG} -gt 0 ]]; then
            ## 向生产环境日志表写入日志,因为旧集群未启用HBase,所以暂时不写日志.
            WriteEtlLogToHyperbase
        fi
    else
        ## 进行循环推数
        GetNatureOrTradeDays
        Logger "INFO" "[Main] - 推送近${DAYS}个${DAYS_FLAG}内的数据 ..."
        Logger "INFO" "[Main] - ${DAYS_FLAG} : ${DAYS_ARR[*]}"
        for idx in "${!DAYS_ARR[@]}"; do
            PARAM_TRADE_DATE="${DAYS_ARR[idx]}"
            Logger "INFO" "[Main] - [${idx}] - 业务日期 : ${DC_TRADE_DATE}, 数据日期 : ${PARAM_TRADE_DATE}"
            # 判断是否需要进行当前依赖项检查
            if [[ -n "${DEPENDENCE}" ]]; then
                CheckSourceByHbaseShell
            fi
            ## 判断是否需要推送TDH集群进行日分区处理
            if [[ -n "${TAR_TABLE_NAME}" ]]; then
                ## 增删分区
                PartitionDel
            fi
            ## 推送数据
            PushData
            ## 写入推送成功日志
            if [[ ${IS_WRITE_LOG} -gt 0 ]]; then
                ## 向生产环境日志表写入日志,因为旧集群未启用HBase,所以暂时不写日志.
                WriteEtlLogToHyperbase
            fi
        done
    fi
}

# Module 6: 执行 main 函数
Main

这个就仅供参考~
然后就新建工作流定义,选择shell任务节点,需要填好名称、执行脚本和引入json和shell脚本。
在这里插入图片描述
执行脚本:

sh tag_platform/shell/datax_push_tdh.sh -d ${IN_TRADE_DATE} -j ${JSON_FILE_NAME} --src-name-service ${SRC_NAME_SERVICE} --tar-jdbc ${TAR_JDBC} --tar-username ${TAR_USERNAME} --tar-password ${TAR_PASSWORD} --tar-tablename ${TAR_TABLENAME} 

然后上线任务并执行即可。

三、DS调用HTTP接口

选择HTTP任务节点
在这里插入图片描述

参数说明:

节点名称:一个工作流定义中的节点名称是唯一的。

运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。

描述信息:描述该节点的功能。

任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。

Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。

失败重试次数:任务失败重新提交的次数,支持下拉和手填。

失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。

超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.

请求地址:http 请求 URL。

请求类型:支持 GETPOStHEADPUTDELETE。

请求参数:支持 ParameterBodyHeaders。

校验条件:支持默认响应码、自定义响应码、内容包含、内容不包含。

校验内容:当校验条件选择自定义响应码、内容包含、内容不包含时,需填写校验内容。

自定义参数:是 http 局部的用户自定义参数,会替换脚本中以${变量}的内容。

四、DS依赖(DEPENDENT)节点

运行说明:依赖节点,就是依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功,依赖节点会去检查 B 流程在昨天是否有执行成功的实例。

例如,A 流程为周报任务,B、C 流程为天任务,A 任务需要 B、C 任务在上周的每一天都执行成功,如图示:
在这里插入图片描述
假如,周报 A 同时还需要自身在上周二执行成功:
在这里插入图片描述

五、DS SPARK 节点

执行说明:通过 SPARK 节点,可以直接直接执行 SPARK 程序,对于 spark 节点,worker 会使用 spark-submit 方式提交任务 参数说明:

程序类型:支持 JAVAScalaPython 三种语言

主函数的 class:是 Spark 程序的入口 Main Class 的全路径

主 jar 包:是 Spark 的 jar 包

部署方式:支持 yarn-cluster、yarn-client、和 local 三种模式

Driver:设置 Driver 内核数 及 内存数

Executor:设置 Executor 数量、Executor 内存数、Executor 内核数

命令行参数:是设置 Spark 程序的输入参数,支持自定义参数变量的替换。

其他参数:支持 --jars、--files、--archives、--conf 格式

资源:如果其他参数中引用了资源文件,需要在资源中选择指定

自定义参数:是 MR 局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVAScala 只是用来标识,没有区别,如果是 Python 开发的 Spark 则没有主函数的 class ,其他都是一样

六、DS Flink 节点

参数说明:

程序类型:支持 JAVAScalaPython 三种语言

主函数的 class:是 Flink 程序的入口 Main Class 的全路径

主 jar 包:是 Flink 的 jar 包

部署方式:支持 cluster、local 三种模式

slot 数量:可以设置slot数

taskManage 数量:可以设置 taskManage 数

jobManager 内存数:可以设置 jobManager 内存数

taskManager 内存数:可以设置 taskManager 内存数

命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。

其他参数:支持 --jars、--files、--archives、--conf 格式

资源:如果其他参数中引用了资源文件,需要在资源中选择指定

自定义参数:是 Flink 局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVAScala 只是用来标识,没有区别,如果是 Python 开发的 Flink 则没有主函数的class,其他都是一样

七、DS Flink 节点

例如:

项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
Task 1:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_hive_01
... ...
数据源:Hive  test_hiveserver2
sql类型:查询   表格:√ 附件:√
主题:Test Hive
收件人:tourist@sohh.cn
sql语句(结尾不要加分号):
    select * from test_table where score=${i}
自定义参数:
    i -> IN -> INTEGER -> 97
前置sql:
    INSERT INTO test_table values(null, 'Dog',97)
后置sql-> 确认添加
Task 2:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_hive_02
... ...
数据源:Hive  test_hiveserver2_ha
sql类型:非查询
sql语句(结尾不要加分号):
    create table test_table2 as select * from test_table
自定义参数:
前置sql:
后置sql-> 确认添加
------------------------------------------------------
串联任务节点 Test_sql_hive_01、 Test_sql_hive_02
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_sql_hive
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行

八、DS SQL 节点

参数说明:

数据源:选择对应的数据源

sql 类型:支持查询和非查询两种,查询是 select  类型的查询,是有结果集返回的,可以指定邮件通知为 表格、附件 或 表格与附件 三种模板。非查询是没有结果集返回的,是针对 update、delete、insert 三种类型的操作

主题、收件人、抄送人:邮件相关配置

sql 参数:输入参数格式为 key1=value1;key2=value2…

sql 语句:SQL 语句

UDF 函数:对于 HIVE 类型的数据源,可以引用资源中心中创建的 UDF 函数,其他类型的数据源暂不支持 UDF 函数

自定义参数:SQL 任务类型自定义参数会替换 sql 语句中 ${变量}。而存储过程是通过自定义参数给方法参数设置值,自定义参数类型和数据类型同存储过程任务类型一样。

前置 sql:执行 “sql语句” 前的操作

后置 sql:执行 “sql语句” 后的操作

例如MySQL:

项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
Task 1:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_mysql_01
... ...
数据源:MYSQL   test01_mysql
sql类型:查询   表格:√ 附件:√
主题:Test MySQL
收件人:tourist@sohh.cn
sql语句:
    select * from test_table where score=${i};
自定义参数:
    i -> IN -> INTEGER -> 97
前置sql:
    INSERT INTO test_table values(null, 'Dog',97)
后置sql-> 确认添加
Task 2:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_mysql_02
... ...
数据源:MYSQL   test01_mysql
sql类型:非查询
sql语句:
    create table test_table2 as select * from test_table;
自定义参数:
前置 sql:
后置 sql-> 确认添加
------------------------------------------------------
串联任务节点 Test_sql_mysql_01、Test_sql_mysql_02
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_sql_mysql
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行

九、DS Java程序

Java 程序参数说明:

程序类型:JAVA

主函数的 class:是 MR 程序的入口 Main Class 的全路径

主jar包:是 MR 的 jar 包

命令行参数:是设置 MR 程序的输入参数,支持自定义参数变量的替换

其他参数:支持 –D-files、-libjars、-archives格式

资源:如果其他参数中引用了资源文件,需要在资源中选择指定

自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

例如:

# 将 MR 的示例 jar 包上传到 资源中心;并创建测试文本上传到 HDFS 目录
# CDH 版本 Jar 包位置:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
拖拽 MR 节点到画布,新增一个 MR 任务
节点名称:Test_mr_java_01
... ...
程序类型:JAVA
主函数的class:wordcount
主jar包:hadoop-mapreduce-examples.jar
命令行参数:/tmp/test.txt /tmp/output
其他参数:
资源:
自定义参数:
-> 确认添加
------------------------------------------------------
保存 ->
设置DAG图名称:Test_mr_java
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行(运行MR的权限问题此处不再描述)
------------------------------------------------------
查看结果:
sudo -u hdfs hadoop fs -cat /tmp/output/*

十、DS Python节点

运行说明:使用 python 节点,可以直接执行 python 脚本,对于 python 节点,worker会使用 python ** 方式提交任务。参数说明:脚本:用户开发的 Python 程序 资源:是指脚本中需要调用的资源文件列表 自定义参数:是 Python 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容。
例如:

项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
拖拽 Python 节点到画布,新增一个 Python 任务
节点名称:Test_python_01
... ...
脚本:
    #!/user/bin/python
    # -*- coding: UTF-8 -*-
    for num in range(0, 10): print 'Round %d ...' % num
资源:
自定义参数:
-> 确认添加
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_python
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行