The HQL Execution Process in Hive 3.1.2
Preface
The previous post covered the Beeline execution process of Hive 3.1.2: lizhiyong.blog.csdn.net/article/details/126634843
It can be summarized as follows:
The Main method initializes a Beeline object → invokes its sole entry method → the load method that reads configuration →
the setupHistory method that enables command history → the addBeelineShutdownHook method that registers a graceful-shutdown hook →
the initializeConsoleReader method that initializes the console reader → the initArgs method that parses the input arguments →
the dispatch method that routes commands → the executeFile method that runs a script file [not always executed] →
finally the execute method [if a script file is passed via -f, it in turn calls the dispatch method internally]
Digging through the source code led to the conclusion that Beeline operates Hive through JDBC under the hood.
[root@zhiyong2 ~]# cd /opt/usdp-srv/srv/udp/2.0.0.0/hive/bin
[root@zhiyong2 bin]# ls -ltr
总用量 64
-rwxrwxrwx. 1 hadoop hadoop 884 8月 23 2019 schematool
-rwxrwxrwx. 1 hadoop hadoop 832 8月 23 2019 metatool
-rwxrwxrwx. 1 hadoop hadoop 3064 8月 23 2019 init-hive-dfs.sh
-rwxrwxrwx. 1 hadoop hadoop 880 8月 23 2019 hplsql
-rwxrwxrwx. 1 hadoop hadoop 885 8月 23 2019 hiveserver2
-rwxrwxrwx. 1 hadoop hadoop 881 8月 23 2019 beeline
drwxrwxrwx. 3 hadoop hadoop 4096 12月 24 2020 ext
-rwxrwxrwx. 1 hadoop hadoop 1981 12月 14 2021 hive-config.sh
-rwxrwxrwx. 1 hadoop hadoop 10414 3月 1 2022 hive
-rwxrwxrwx. 1 hadoop hadoop 141 3月 1 2022 init-metastore-db.sh
-rwxrwxrwx. 1 hadoop hadoop 601 3月 1 2022 metastore-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop 588 3月 1 2022 hive-server2-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop 962 3月 1 2022 check-warehouse-dir.sh
-rwxrwxrwx. 1 hadoop hadoop 1077 3月 1 2022 check-tez-dir.sh
Under bin in the Hive installation directory there are two shell scripts, beeline and hive. The content of beeline:
#!/usr/bin/env bash
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/hive --service beeline "$@"
Very concise: it changes into the bin directory and then runs the hive shell script, passing its arguments through. The hive script:
#!/usr/bin/env bash
cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/hive-config.sh
SERVICE=""
HELP=""
SKIP_HBASECP=false
SKIP_HADOOPVERSION=false
SERVICE_ARGS=()
while [ $# -gt 0 ]; do
case "$1" in
--version)
shift
SERVICE=version
;;
--service)
shift
SERVICE=$1
shift
;;
--rcfilecat)
SERVICE=rcfilecat
shift
;;
--orcfiledump)
SERVICE=orcfiledump
shift
;;
--llapdump)
SERVICE=llapdump
shift
;;
--skiphadoopversion)
SKIP_HADOOPVERSION=true
shift
;;
--skiphbasecp)
SKIP_HBASECP=true
shift
;;
--help)
HELP=_help
shift
;;
--debug*)
DEBUG=$1
shift
;;
*)
SERVICE_ARGS=("${SERVICE_ARGS[@]}" "$1")
shift
;;
esac
done
if [ "$SERVICE" = "" ] ; then
if [ "$HELP" = "_help" ] ; then
SERVICE="help"
else
SERVICE="cli"
fi
fi
if [[ "$SERVICE" == "cli" && "$USE_BEELINE_FOR_HIVE_CLI" == "true" ]] ; then
SERVICE="beeline"
fi
if [[ "$SERVICE" =~ ^(help|version|orcfiledump|rcfilecat|schemaTool|cleardanglingscratchdir|metastore|beeline|llapstatus|llap)$ ]] ; then
SKIP_HBASECP=true
fi
if [[ "$SERVICE" =~ ^(help|schemaTool)$ ]] ; then
SKIP_HADOOPVERSION=true
fi
if [ -f "${HIVE_CONF_DIR}/hive-env.sh" ]; then
. "${HIVE_CONF_DIR}/hive-env.sh"
fi
if [[ -z "$SPARK_HOME" ]]
then
bin=`dirname "$0"`
# many hadoop installs are in dir/{spark,hive,hadoop,..}
if test -e $bin/../../spark; then
sparkHome=$(readlink -f $bin/../../spark)
if [[ -d $sparkHome ]]
then
export SPARK_HOME=$sparkHome
fi
fi
fi
CLASSPATH="${TEZ_CONF_DIR:-/etc/tez/conf}:${HIVE_CONF_DIR}"
HIVE_LIB=${HIVE_HOME}/lib
# needed for execution
if [ ! -f ${HIVE_LIB}/hive-exec-*.jar ]; then
echo "Missing Hive Execution Jar: ${HIVE_LIB}/hive-exec-*.jar"
exit 1;
fi
if [ ! -f ${HIVE_LIB}/hive-metastore-*.jar ]; then
echo "Missing Hive MetaStore Jar"
exit 2;
fi
# cli specific code
if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then
echo "Missing Hive CLI Jar"
exit 3;
fi
# Hbase and Hadoop use their own log4j jars. Including hives log4j jars can cause
# log4j warnings. So save hives log4j jars in LOG_JAR_CLASSPATH, and add it to classpath
# after Hbase and Hadoop calls finish
LOG_JAR_CLASSPATH="";
for f in ${HIVE_LIB}/*.jar; do
if [[ $f == *"log4j"* ]]; then
LOG_JAR_CLASSPATH=${LOG_JAR_CLASSPATH}:$f;
else
CLASSPATH=${CLASSPATH}:$f;
fi
done
# add the auxillary jars such as serdes
if [ -d "${HIVE_AUX_JARS_PATH}" ]; then
hive_aux_jars_abspath=`cd ${HIVE_AUX_JARS_PATH} && pwd`
for f in $hive_aux_jars_abspath/*.jar; do
if [[ ! -f $f ]]; then
continue;
fi
if $cygwin; then
f=`cygpath -w "$f"`
fi
AUX_CLASSPATH=${AUX_CLASSPATH}:$f
if [ "${AUX_PARAM}" == "" ]; then
AUX_PARAM=file://$f
else
AUX_PARAM=${AUX_PARAM},file://$f;
fi
done
elif [ "${HIVE_AUX_JARS_PATH}" != "" ]; then
HIVE_AUX_JARS_PATH=`echo $HIVE_AUX_JARS_PATH | sed 's/,/:/g'`
if $cygwin; then
HIVE_AUX_JARS_PATH=`cygpath -p -w "$HIVE_AUX_JARS_PATH"`
HIVE_AUX_JARS_PATH=`echo $HIVE_AUX_JARS_PATH | sed 's/;/,/g'`
fi
AUX_CLASSPATH=${AUX_CLASSPATH}:${HIVE_AUX_JARS_PATH}
AUX_PARAM="file://$(echo ${HIVE_AUX_JARS_PATH} | sed 's/:/,file:\/\//g')"
fi
# adding jars from auxlib directory
for f in ${HIVE_HOME}/auxlib/*.jar; do
if [[ ! -f $f ]]; then
continue;
fi
if $cygwin; then
f=`cygpath -w "$f"`
fi
AUX_CLASSPATH=${AUX_CLASSPATH}:$f
if [ "${AUX_PARAM}" == "" ]; then
AUX_PARAM=file://$f
else
AUX_PARAM=${AUX_PARAM},file://$f;
fi
done
if $cygwin; then
CLASSPATH=`cygpath -p -w "$CLASSPATH"`
CLASSPATH=${CLASSPATH};${AUX_CLASSPATH}
else
CLASSPATH=${CLASSPATH}:${AUX_CLASSPATH}
fi
# supress the HADOOP_HOME warnings in 1.x.x
export HADOOP_HOME_WARN_SUPPRESS=true
# to make sure log4j2.x and jline jars are loaded ahead of the jars pulled by hadoop
export HADOOP_USER_CLASSPATH_FIRST=true
# pass classpath to hadoop
if [ "$HADOOP_CLASSPATH" != "" ]; then
export HADOOP_CLASSPATH="${CLASSPATH}:${HADOOP_CLASSPATH}"
else
export HADOOP_CLASSPATH="$CLASSPATH"
fi
# also pass hive classpath to hadoop
if [ "$HIVE_CLASSPATH" != "" ]; then
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${HIVE_CLASSPATH}";
fi
# check for hadoop in the path
HADOOP_IN_PATH=`which hadoop 2>/dev/null`
if [ -f ${HADOOP_IN_PATH} ]; then
HADOOP_DIR=`dirname "$HADOOP_IN_PATH"`/..
fi
# HADOOP_HOME env variable overrides hadoop in the path
HADOOP_HOME=${HADOOP_HOME:-${HADOOP_PREFIX:-$HADOOP_DIR}}
if [ "$HADOOP_HOME" == "" ]; then
echo "Cannot find hadoop installation: \$HADOOP_HOME or \$HADOOP_PREFIX must be set or hadoop must be in the path";
exit 4;
fi
# add distcp to classpath, hive depends on it
for f in ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-distcp-*.jar; do
export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:$f;
done
HADOOP=$HADOOP_HOME/bin/hadoop
if [ ! -f ${HADOOP} ]; then
echo "Cannot find hadoop installation: \$HADOOP_HOME or \$HADOOP_PREFIX must be set or hadoop must be in the path";
exit 4;
fi
if [ "$SKIP_HADOOPVERSION" = false ]; then
# Make sure we're using a compatible version of Hadoop
if [ "x$HADOOP_VERSION" == "x" ]; then
HADOOP_VERSION=$($HADOOP version 2>&2 | awk -F"\t" '/Hadoop/ {print $0}' | cut -d' ' -f 2);
fi
# Save the regex to a var to workaround quoting incompatabilities
# between Bash 3.1 and 3.2
hadoop_version_re="^([[:digit:]]+)\.([[:digit:]]+)(\.([[:digit:]]+))?.*$"
if [[ "$HADOOP_VERSION" =~ $hadoop_version_re ]]; then
hadoop_major_ver=${BASH_REMATCH[1]}
hadoop_minor_ver=${BASH_REMATCH[2]}
hadoop_patch_ver=${BASH_REMATCH[4]}
else
echo "Unable to determine Hadoop version information."
echo "'hadoop version' returned:"
echo `$HADOOP version`
exit 5
fi
if [ "$hadoop_major_ver" -lt "1" -a "$hadoop_minor_ver$hadoop_patch_ver" -lt "201" ]; then
echo "Hive requires Hadoop 0.20.x (x >= 1)."
echo "'hadoop version' returned:"
echo `$HADOOP version`
exit 6
fi
fi
if [ "$SKIP_HBASECP" = false ]; then
# HBase detection. Need bin/hbase and a conf dir for building classpath entries.
# Start with BigTop defaults for HBASE_HOME and HBASE_CONF_DIR.
HBASE_HOME=${HBASE_HOME:-"/usr/lib/hbase"}
HBASE_CONF_DIR=${HBASE_CONF_DIR:-"/etc/hbase/conf"}
if [[ ! -d $HBASE_CONF_DIR ]] ; then
# not explicitly set, nor in BigTop location. Try looking in HBASE_HOME.
HBASE_CONF_DIR="$HBASE_HOME/conf"
fi
# perhaps we've located the HBase config. if so, include it on classpath.
if [[ -d $HBASE_CONF_DIR ]] ; then
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${HBASE_CONF_DIR}"
fi
# look for the hbase script. First check HBASE_HOME and then ask PATH.
if [[ -e $HBASE_HOME/bin/hbase ]] ; then
HBASE_BIN="$HBASE_HOME/bin/hbase"
fi
HBASE_BIN=${HBASE_BIN:-"$(which hbase)"}
# perhaps we've located HBase. If so, include its details on the classpath
if [[ -n $HBASE_BIN ]] ; then
# exclude ZK, PB, and Guava (See HIVE-2055)
# depends on HBASE-8438 (hbase-0.94.14+, hbase-0.96.1+) for `hbase mapredcp` command
for x in $($HBASE_BIN mapredcp 2>&2 | tr ':' '\n') ; do
if [[ $x == *zookeeper* || $x == *protobuf-java* || $x == *guava* ]] ; then
continue
fi
# TODO: should these should be added to AUX_PARAM as well?
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${x}"
done
fi
fi
if [ "${AUX_PARAM}" != "" ]; then
if [[ "$SERVICE" != beeline ]]; then
HIVE_OPTS="$HIVE_OPTS --hiveconf hive.aux.jars.path=${AUX_PARAM}"
fi
AUX_JARS_CMD_LINE="-libjars ${AUX_PARAM}"
fi
if [ "$SERVICE" = "hiveserver2" ] ; then
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS $HIVE_SERVER2_JMX_OPTS "
fi
if [ "$SERVICE" = "metastore" ] ; then
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS $HIVE_METASTORE_JMX_OPTS "
fi
SERVICE_LIST=""
for i in "$bin"/ext/*.sh ; do
. $i
done
for i in "$bin"/ext/util/*.sh ; do
. $i
done
if [ "$DEBUG" ]; then
if [ "$HELP" ]; then
debug_help
exit 0
else
get_debug_params "$DEBUG"
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS $HIVE_MAIN_CLIENT_DEBUG_OPTS"
fi
fi
TORUN=""
for j in $SERVICE_LIST ; do
if [ "$j" = "$SERVICE" ] ; then
TORUN=${j}$HELP
fi
done
# to initialize logging for all services
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j2.formatMsgNoLookups=true -Dlog4j.configurationFile=hive-log4j2.properties "
if [ -f "${HIVE_CONF_DIR}/parquet-logging.properties" ]; then
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Djava.util.logging.config.file=${HIVE_CONF_DIR}/parquet-logging.properties "
else
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Djava.util.logging.config.file=$bin/../conf/parquet-logging.properties "
fi
if [[ "$SERVICE" =~ ^(hiveserver2|beeline|cli)$ ]] ; then
# If process is backgrounded, don't change terminal settings
if [[ ( ! $(ps -o stat= -p $$) =~ "+" ) && ! ( -p /dev/stdin ) && ( ! $(ps -o tty= -p $$) =~ "?" ) ]]; then
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Djline.terminal=jline.UnsupportedTerminal"
fi
fi
# include the log4j jar that is used for hive into the classpath
CLASSPATH="${CLASSPATH}:${LOG_JAR_CLASSPATH}"
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${LOG_JAR_CLASSPATH}"
if [ "$TORUN" = "" ] ; then
echo "Service $SERVICE not found"
echo "Available Services: $SERVICE_LIST"
exit 7
else
set -- "${SERVICE_ARGS[@]}"
$TORUN "$@"
fi
From the hive script we can see that:
for i in "$bin"/ext/*.sh ; do
. $i
done
for i in "$bin"/ext/util/*.sh ; do
. $i
done
the hive script sources (and thereby executes) every script under ext/ and ext/util/ in the installation directory! In other words, these scripts:
/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin
[root@zhiyong2 bin]# ll
总用量 64
-rwxrwxrwx. 1 hadoop hadoop 881 8月 23 2019 beeline
-rwxrwxrwx. 1 hadoop hadoop 1077 3月 1 2022 check-tez-dir.sh
-rwxrwxrwx. 1 hadoop hadoop 962 3月 1 2022 check-warehouse-dir.sh
drwxrwxrwx. 3 hadoop hadoop 4096 12月 24 2020 ext
-rwxrwxrwx. 1 hadoop hadoop 10414 3月 1 2022 hive
-rwxrwxrwx. 1 hadoop hadoop 1981 12月 14 2021 hive-config.sh
-rwxrwxrwx. 1 hadoop hadoop 885 8月 23 2019 hiveserver2
-rwxrwxrwx. 1 hadoop hadoop 588 3月 1 2022 hive-server2-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop 880 8月 23 2019 hplsql
-rwxrwxrwx. 1 hadoop hadoop 3064 8月 23 2019 init-hive-dfs.sh
-rwxrwxrwx. 1 hadoop hadoop 141 3月 1 2022 init-metastore-db.sh
-rwxrwxrwx. 1 hadoop hadoop 601 3月 1 2022 metastore-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop 832 8月 23 2019 metatool
-rwxrwxrwx. 1 hadoop hadoop 884 8月 23 2019 schematool
[root@zhiyong2 bin]# cd ext/
[root@zhiyong2 ext]# ll
总用量 88
-rwxrwxrwx. 1 hadoop hadoop 1591 8月 23 2019 beeline.sh
-rwxrwxrwx. 1 hadoop hadoop 1113 8月 23 2019 cleardanglingscratchdir.sh
-rwxrwxrwx. 1 hadoop hadoop 1596 8月 23 2019 cli.sh
-rwxrwxrwx. 1 hadoop hadoop 3199 8月 23 2019 debug.sh
-rwxrwxrwx. 1 hadoop hadoop 1531 8月 23 2019 fixacidkeyindex.sh
-rwxrwxrwx. 1 hadoop hadoop 1456 8月 23 2019 help.sh
-rwxrwxrwx. 1 hadoop hadoop 1187 8月 23 2019 hiveburninclient.sh
-rwxrwxrwx. 1 hadoop hadoop 1341 8月 23 2019 hiveserver2.sh
-rwxrwxrwx. 1 hadoop hadoop 1372 8月 23 2019 hplsql.sh
-rwxrwxrwx. 1 hadoop hadoop 1424 8月 23 2019 jar.sh
-rwxrwxrwx. 1 hadoop hadoop 1238 8月 23 2019 lineage.sh
-rwxrwxrwx. 1 hadoop hadoop 1185 8月 23 2019 llapdump.sh
-rwxrwxrwx. 1 hadoop hadoop 1669 8月 23 2019 llap.sh
-rwxrwxrwx. 1 hadoop hadoop 1471 8月 23 2019 llapstatus.sh
-rwxrwxrwx. 1 hadoop hadoop 1393 8月 23 2019 metastore.sh
-rwxrwxrwx. 1 hadoop hadoop 1101 8月 23 2019 metatool.sh
-rwxrwxrwx. 1 hadoop hadoop 1894 8月 23 2019 orcfiledump.sh
-rwxrwxrwx. 1 hadoop hadoop 1059 8月 23 2019 rcfilecat.sh
-rwxrwxrwx. 1 hadoop hadoop 1080 8月 23 2019 schemaTool.sh
-rwxrwxrwx. 1 hadoop hadoop 1078 8月 23 2019 strictmanagedmigration.sh
-rwxrwxrwx. 1 hadoop hadoop 2871 8月 23 2019 tokentool.sh
drwxrwxrwx. 2 hadoop hadoop 28 11月 15 2020 util
-rwxrwxrwx. 1 hadoop hadoop 1271 8月 23 2019 version.sh
[root@zhiyong2 ext]# cd util/
[root@zhiyong2 util]# ll
总用量 4
-rwxrwxrwx. 1 hadoop hadoop 1460 8月 23 2019 execHiveCmd.sh
Apart from a big pile of environment-variable and parameter handling, the key requirement is three dependency jars:
[root@zhiyong2 lib]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/lib
[root@zhiyong2 lib]# ll | grep -e hive-exec -e hive-metastore -e hive-cli
-rwxrwxrwx. 1 hadoop hadoop 46942 11月 15 2020 hive-cli-3.1.2.jar
-rwxrwxrwx. 1 hadoop hadoop 40625273 6月 25 2021 hive-exec-3.1.2.jar
-rwxrwxrwx. 1 hadoop hadoop 36888 11月 15 2020 hive-metastore-3.1.2.jar
These are the three; clearly the hive script cannot run without them.
In fact, Hive Java API development also depends heavily on hive-exec [for example, when writing UDFs: lizhiyong.blog.csdn.net/article/details/126186377]; it is the library that holds Hive's runtime execution classes. hive-metastore is of course Hive's metadata library, and hive-cli is the library behind the hive command line, through which every command enters. JDBC and hive-cli differ only in their entry points; the underlying execution [yes, the SQL → MapReduce/Tez/Spark part] is the same. This time we dig into the source code from those entry points to trace Hive's HQL execution and deepen the understanding of Hive's internals. We absolutely should not stop at a few surface-level SQL statements like shallow SQL Boys; there is still a long way to go before doing real big data development.
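As a quick, hedged aside on why Java API work leans on hive-exec: a legacy-style UDF only needs the UDF base class that ships inside hive-exec. The class below is a made-up illustration (not taken from the referenced article), just enough to show the dependency:
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// A minimal, hypothetical UDF: Hive resolves evaluate() reflectively at query time.
public class UpperCaseUdf extends UDF {
  public Text evaluate(Text input) {
    // return NULL for NULL input, otherwise the upper-cased string
    return input == null ? null : new Text(input.toString().toUpperCase());
  }
}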
Entry Points
Note: although the scripts inside and outside are both called hive and beeline, the ones under bin and bin/ext must differ to some degree; otherwise nobody would bother to keep them apart by path.
The Real Entry Point of Beeline
[root@zhiyong2 ext]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin/ext
[root@zhiyong2 ext]# cat beeline.sh
# Need arguments [host [port [db]]]
THISSERVICE=beeline
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
beeline () {
CLASS=org.apache.hive.beeline.BeeLine;
# include only the beeline client jar and its dependencies
beelineJarPath=`ls ${HIVE_LIB}/hive-beeline-*.jar`
superCsvJarPath=`ls ${HIVE_LIB}/super-csv-*.jar`
jlineJarPath=`ls ${HIVE_LIB}/jline-*.jar`
hadoopClasspath=""
if [[ -n "${HADOOP_CLASSPATH}" ]]
then
hadoopClasspath="${HADOOP_CLASSPATH}:"
fi
export HADOOP_CLASSPATH="${hadoopClasspath}${HIVE_CONF_DIR}:${beelineJarPath}:${superCsvJarPath}:${jlineJarPath}"
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties "
exec $HADOOP jar ${beelineJarPath} $CLASS $HIVE_OPTS "$@"
}
beeline_help () {
beeline "--help"
}
This hard-coded org.apache.hive.beeline.BeeLine class shows that the entry class guessed in the earlier analysis of the Hive 3.1.2 Beeline execution process (lizhiyong.blog.csdn.net/article/details/126634843) was correct; the guess is verified here, so it will not be repeated.
The Real Entry Point of the CLI
[root@zhiyong2 util]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin/ext
[root@zhiyong2 ext]# cat cl
cleardanglingscratchdir.sh cli.sh
[root@zhiyong2 ext]# cat cli.sh
THISSERVICE=cli
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
# Set old CLI as the default client
# if USE_DEPRECATED_CLI is not set or is not equal to false use old CLI
if [ -z "$USE_DEPRECATED_CLI" ] || [ "$USE_DEPRECATED_CLI" != "false" ]; then
USE_DEPRECATED_CLI="true"
fi
updateCli() {
if [ "$USE_DEPRECATED_CLI" == "true" ]; then
export HADOOP_CLIENT_OPTS=" -Dproc_hivecli $HADOOP_CLIENT_OPTS "
CLASS=org.apache.hadoop.hive.cli.CliDriver
JAR=hive-cli-*.jar
else
export HADOOP_CLIENT_OPTS=" -Dproc_beeline $HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties"
CLASS=org.apache.hive.beeline.cli.HiveCli
JAR=hive-beeline-*.jar
fi
}
cli () {
updateCli
execHiveCmd $CLASS $JAR "$@"
}
cli_help () {
updateCli
execHiveCmd $CLASS $JAR "--help"
}
From this we can see that the Hive CLI has two real entry points: org.apache.hive.beeline.cli.HiveCli and org.apache.hadoop.hive.cli.CliDriver.
[root@zhiyong2 util]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin/ext/util
[root@zhiyong2 util]# ll
总用量 4
-rwxrwxrwx. 1 hadoop hadoop 1460 8月 23 2019 execHiveCmd.sh
[root@zhiyong2 util]# cat execHiveCmd.sh
CLI_JAR="hive-cli-*.jar"
BEELINE_JAR="hive-beeline-*.jar"
execHiveCmd () {
CLASS=$1;
shift;
# if jar is not passed as parameter use corresponding cli jar
if [ "$1" == "$CLI_JAR" ] || [ "$1" == "$BEELINE_JAR" ]; then
JAR="$1"
shift;
else
if [ "$USE_DEPRECATED_CLI" == "true" ]; then
JAR="$CLI_JAR"
else
JAR="$BEELINE_JAR"
fi
fi
# cli specific code
if [ ! -f ${HIVE_LIB}/$JAR ]; then
echo "Missing $JAR Jar"
exit 3;
fi
if $cygwin; then
HIVE_LIB=`cygpath -w "$HIVE_LIB"`
fi
# hadoop 20 or newer - skip the aux_jars option. picked up from hiveconf
exec $HADOOP jar ${HIVE_LIB}/$JAR $CLASS $HIVE_OPTS "$@"
}
This is the script that actually launches the jar; it uses USE_DEPRECATED_CLI to decide between the deprecated CLI jar and the newer BeeLine jar.
The Difference Between BeeLine and Hive CLI
A quick primer on the difference: BeeLine is newer and lightweight [a pure client]; SQL commands first go over JDBC to HiveServer2, which then accesses the Metastore, so permission control is easier to enforce. The Hive CLI is the deprecated, heavier path [it compiles HQL locally]; once the SQL is compiled it accesses the Metastore directly. Offering both gives users a choice.
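To make the JDBC path concrete, here is a hedged, minimal sketch that uses the Hive JDBC driver directly, which is essentially what BeeLine wraps; the host, port, user, and query are placeholders for an actual HiveServer2 endpoint, and the hive-jdbc jar is assumed to be on the classpath:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcSketch {
  public static void main(String[] args) throws Exception {
    // 10000 is the default HiveServer2 port; adjust host/db/credentials for your cluster
    String url = "jdbc:hive2://zhiyong2:10000/default";
    try (Connection conn = DriverManager.getConnection(url, "hive", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SHOW DATABASES")) {
      while (rs.next()) {
        System.out.println(rs.getString(1));   // HiveServer2 compiles and runs the statement
      }
    }
  }
}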
Hive CLI's BeeLine Entry Point
package org.apache.hive.beeline.cli;
import org.apache.hive.beeline.BeeLine;
import java.io.IOException;
import java.io.InputStream;
public class HiveCli {
private BeeLine beeLine;
public static void main(String[] args) throws IOException {
int status = new HiveCli().runWithArgs(args, null);
System.exit(status);
}
public int runWithArgs(String[] cmd, InputStream inputStream) throws IOException {
beeLine = new BeeLine(false);
try {
return beeLine.begin(cmd, inputStream);
} finally {
beeLine.close();
}
}
}
Combined with the earlier analysis of the Hive 3.1.2 Beeline execution process (lizhiyong.blog.csdn.net/article/details/126634843), the Hive CLI entry point and the BeeLine entry point both boil down to beeLine.begin(cmd, inputStream); the difference is the argument passed to the BeeLine constructor [isBeeLine=false for the Hive CLI], which makes the underlying execution take a different branch:
if (isBeeLine) {
int code = initArgs(args);
if (code != 0) {
return code;
}
} else {
int code = initArgsFromCliVars(args);
if (code != 0 || exit) {
return code;
}
defaultConnect(false);
}
The corresponding initialization methods also differ:
int initArgsFromCliVars(String[] args) {
List<String> commands = Collections.emptyList();
CliOptionsProcessor optionsProcessor = new CliOptionsProcessor();
if (!optionsProcessor.process(args)) {
return 1;
}
CommandLine commandLine = optionsProcessor.getCommandLine();
Properties confProps = commandLine.getOptionProperties("hiveconf");
for (String propKey : confProps.stringPropertyNames()) {
setHiveConfVar(propKey, confProps.getProperty(propKey));
}
Properties hiveVars = commandLine.getOptionProperties("define");
for (String propKey : hiveVars.stringPropertyNames()) {
getOpts().getHiveConfVariables().put(propKey, hiveVars.getProperty(propKey));
}
Properties hiveVars2 = commandLine.getOptionProperties("hivevar");
for (String propKey : hiveVars2.stringPropertyNames()) {
getOpts().getHiveConfVariables().put(propKey, hiveVars2.getProperty(propKey));
}
getOpts().setScriptFile(commandLine.getOptionValue("f"));
if (commandLine.getOptionValues("i") != null) {
getOpts().setInitFiles(commandLine.getOptionValues("i"));
}
dbName = commandLine.getOptionValue("database");
getOpts().setVerbose(Boolean.parseBoolean(commandLine.getOptionValue("verbose")));
getOpts().setSilent(Boolean.parseBoolean(commandLine.getOptionValue("silent")));
int code = 0;
if (commandLine.getOptionValues("e") != null) {
commands = Arrays.asList(commandLine.getOptionValues("e"));
}
if (!commands.isEmpty() && getOpts().getScriptFile() != null) {
System.err.println("The '-e' and '-f' options cannot be specified simultaneously");
optionsProcessor.printCliUsage();
return 1;
}
if (!commands.isEmpty()) {
embeddedConnect();
connectDBInEmbededMode();
for (Iterator<String> i = commands.iterator(); i.hasNext(); ) {
String command = i.next().toString();
debug(loc("executing-command", command));
if (!dispatch(command)) {
code++;
}
}
exit = true; // execute and exit
}
return code;
}
In the end, commands are still dispatched through the dispatch method:
boolean dispatch(String line) {
if (line == null) {
// exit
exit = true;
return true;
}
if (line.trim().length() == 0) {
return true;
}
if (isComment(line)) {
return true;
}
line = line.trim();
// save it to the current script, if any
if (scriptOutputFile != null) {
scriptOutputFile.addLine(line);
}
if (isHelpRequest(line)) {
line = "!help";
}
if (isBeeLine) {
if (line.startsWith(COMMAND_PREFIX)) {
// handle SQLLine command in beeline which starts with ! and does not end with ;
return execCommandWithPrefix(line);
} else {
return commands.sql(line, getOpts().getEntireLineAsCommand());
}
} else {
return commands.sql(line, getOpts().getEntireLineAsCommand());
}
}
It then calls the commands.sql(line, getOpts().getEntireLineAsCommand()) method:
public boolean sql(String line, boolean entireLineAsCommand) {
return execute(line, false, entireLineAsCommand);
}
After that it is more or less the same as BeeLine: commands go through JDBC, and executeFile and eventually execute are invoked as well. There is no big difference from a plain BeeLine session, so it will not be repeated here.
Hive CLI's Legacy Entry Point
The legacy path is not without merit: because it compiles HQL locally, we can follow the trail to find where HQL is turned into MapReduce, Tez, or Spark tasks.
package org.apache.hadoop.hive.cli;
public class CliDriver {
public static void main(String[] args) throws Exception {
int ret = new CliDriver().run(args);
System.exit(ret);
}
public int run(String[] args) throws Exception {
OptionsProcessor oproc = new OptionsProcessor();
if (!oproc.process_stage1(args)) {
return 1;
}
// NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
boolean logInitFailed = false;
String logInitDetailMessage;
try {
logInitDetailMessage = LogUtils.initHiveLog4j();
} catch (LogInitializationException e) {
logInitFailed = true;
logInitDetailMessage = e.getMessage();
}
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
try {
ss.out = new PrintStream(System.out, true, "UTF-8");
ss.info = new PrintStream(System.err, true, "UTF-8");
ss.err = new CachingPrintStream(System.err, true, "UTF-8");
} catch (UnsupportedEncodingException e) {
return 3;
}
if (!oproc.process_stage2(ss)) {
return 2;
}
if (!ss.getIsSilent()) {
if (logInitFailed) {
System.err.println(logInitDetailMessage);
} else {
SessionState.getConsole().printInfo(logInitDetailMessage);
}
}
// set all properties specified via command line
HiveConf conf = ss.getConf();
for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
conf.set((String) item.getKey(), (String) item.getValue());
ss.getOverriddenConfigurations().put((String) item.getKey(), (String) item.getValue());
}
// read prompt configuration and substitute variables.
prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT);
prompt = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(conf, prompt);
prompt2 = spacesForString(prompt);
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
// Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
// the session are needed, the corresponding getters and other methods will wait as needed.
SessionState.beginStart(ss, console);
} else {
SessionState.start(ss);
}
ss.updateThreadName();
// Create views registry
HiveMaterializedViewsRegistry.get().init();
// execute cli driver work
try {
return executeDriver(ss, conf, oproc);
} finally {
ss.resetThreadName();
ss.close();
}
}
}
Here we have found the real entry point of the legacy Hive CLI. The executeDriver(ss, conf, oproc) method is where SQL is actually converted into execution-engine tasks.
The HiveConf instantiated here is very important; it stores all of Hive's configuration entries [HiveConf.java is more than 5,000 lines long, so look things up there as needed]. For all the Hadoop and Hive configuration items visible from beeline, see: lizhiyong.blog.csdn.net/article/details/126634922
Naturally, everything that follows about converting SQL into engine tasks flows from the executeDriver(ss, conf, oproc) method.
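As a small, hedged illustration of how these configuration entries are read (not Hive's own code), the snippet below pulls the execution-engine setting through HiveConf.ConfVars, the same mechanism executeDriver uses further down:
import org.apache.hadoop.hive.conf.HiveConf;

public class HiveConfSketch {
  public static void main(String[] args) {
    // new HiveConf() loads hive-site.xml plus the defaults compiled into HiveConf.ConfVars
    HiveConf conf = new HiveConf();
    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
    System.out.println("hive.execution.engine = " + engine);   // "mr" unless overridden
  }
}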
The executeDriver Method That Executes the CLI Work
package org.apache.hadoop.hive.cli;
public class CliDriver {}
/**
* Execute the cli work
* @param ss CliSessionState of the CLI driver
* @param conf HiveConf for the driver session
* @param oproc Operation processor of the CLI invocation
* @return status of the CLI command execution
* @throws Exception
*/
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
throws Exception {
CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables());
// use the specified database if specified
cli.processSelectDatabase(ss);
// Execute -i init files (always in silent mode)
cli.processInitFiles(ss);
if (ss.execString != null) {
int cmdProcessStatus = cli.processLine(ss.execString);
return cmdProcessStatus;
}
try {
if (ss.fileName != null) {
return cli.processFile(ss.fileName);
}
} catch (FileNotFoundException e) {
System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
return 3;
}
if ("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE))) {
console.printInfo(HiveConf.generateMrDeprecationWarning());
}
setupConsoleReader();
String line;
int ret = 0;
String prefix = "";
String curDB = getFormattedDb(conf, ss);
String curPrompt = prompt + curDB;
String dbSpaces = spacesForString(curDB);
while ((line = reader.readLine(curPrompt + "> ")) != null) {
if (!prefix.equals("")) {
prefix += '\n';
}
if (line.trim().startsWith("--")) {
continue;
}
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
ret = cli.processLine(line, true);
prefix = "";
curDB = getFormattedDb(conf, ss);
curPrompt = prompt + curDB;
dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
} else {
prefix = prefix + line;
curPrompt = prompt2 + dbSpaces;
continue;
}
}
return ret;
}
Initializing the parameters supplied via -i and the like is not the focus here.
Based on this configuration item:
package org.apache.hadoop.hive.conf;
public class HiveConf extends Configuration {}
HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet(true, "mr", "tez", "spark"),
"Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR\n" +
"remains the default engine for historical reasons, it is itself a historical engine\n" +
"and is deprecated in Hive 2 line. It may be removed without further warning.")
public static String generateMrDeprecationWarning() {
return "Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. "
+ "Consider using a different execution engine (i.e. " + HiveConf.getNonMrEngines()
+ ") or using Hive 1.X releases.";
}
When the execution engine is the default mr, this familiar warning is printed [mr still works to this day, although operations such as insert are prone to errors].
For how to change the execution engine, see: lizhiyong.blog.csdn.net/article/details/123436630
According to Cloudera, CDP 7 keeps only Hive on Tez: https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/hive-introduction/topics/hive-unsupported.html
So this post focuses mainly on Tez rather than MapReduce or Spark. Only the execution engine differs; steps such as AST parsing are the same.
A quick scan locates ret = cli.processLine(line, true); this method is clearly the one that executes the SQL statements after splitting.
The processLine Method That Executes the Split SQL
package org.apache.hadoop.hive.cli;
public class CliDriver {}
/**
* Processes a line of semicolon separated commands
*
* @param line
* The commands to process
* @param allowInterrupting
* When true the function will handle SIG_INT (Ctrl+C) by interrupting the processing and
* returning -1
* @return 0 if ok
*/
public int processLine(String line, boolean allowInterrupting) {
SignalHandler oldSignal = null;
Signal interruptSignal = null;
if (allowInterrupting) {
// Remember all threads that were running at the time we started line processing.
// Hook up the custom Ctrl+C handler while processing this line
interruptSignal = new Signal("INT");
oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
private boolean interruptRequested;
@Override
public void handle(Signal signal) {
boolean initialRequest = !interruptRequested;
interruptRequested = true;
// Kill the VM on second ctrl+c
if (!initialRequest) {
console.printInfo("Exiting the JVM");
System.exit(127);
}
// Interrupt the CLI thread to stop the current statement and return
// to prompt
console.printInfo("Interrupting... Be patient, this might take some time.");
console.printInfo("Press Ctrl+C again to kill JVM");
// First, kill any running MR jobs
HadoopJobExecHelper.killRunningJobs();
TezJobExecHelper.killRunningJobs();
HiveInterruptUtils.interrupt();
}
});
}
try {
int lastRet = 0, ret = 0;
// we can not use "split" function directly as ";" may be quoted
List<String> commands = splitSemiColon(line);
String command = "";
for (String oneCmd : commands) {
if (StringUtils.endsWith(oneCmd, "\\")) {
command += StringUtils.chop(oneCmd) + ";";
continue;
} else {
command += oneCmd;
}
if (StringUtils.isBlank(command)) {
continue;
}
ret = processCmd(command);
command = "";
lastRet = ret;
boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
return ret;
}
}
return lastRet;
} finally {
// Once we are done processing the line, restore the old handler
if (oldSignal != null && interruptSignal != null) {
Signal.handle(interruptSignal, oldSignal);
}
}
}
When this method detects Ctrl+C it forcibly kills running MapReduce and Tez jobs; curiously there is no call to kill Spark jobs, but then hardly anyone runs Hive on Spark in practice, and Spark SQL (Spark on Hive) seems the more sensible choice anyway.
It then calls ret = processCmd(command) to carry out the actual command.
The processCmd Method That Executes Commands
package org.apache.hadoop.hive.cli;
public class CliDriver {}
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionState) SessionState.get();
ss.setLastCommand(cmd);
ss.updateThreadName();
// Flush the print stream, so it doesn't include output from the last command
ss.err.flush();
String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
// if we have come this far - either the previous commands
// are all successful or this is command line. in either case
// this counts as a successful run
ss.close();
System.exit(0);
} else if (tokens[0].equalsIgnoreCase("source")) {
String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
cmd_1 = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), cmd_1);
File sourceFile = new File(cmd_1);
if (! sourceFile.isFile()){
console.printError("File: "+ cmd_1 + " is not a file.");
ret = 1;
} else {
try {
ret = processFile(cmd_1);
} catch (IOException e) {
console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
}
} else if (cmd_trimmed.startsWith("!")) {
// for shell commands, use unstripped command
String shell_cmd = cmd.trim().substring(1);
shell_cmd = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), shell_cmd);
// shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
try {
ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
ret = executor.execute();
if (ret != 0) {
console.printError("Command failed with exit code = " + ret);
}
} catch (Exception e) {
console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
} else { // local mode
try {
try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
if (proc instanceof IDriver) {
// Let Driver strip comments using sql parser
ret = processLocalCmd(cmd, proc, ss);
} else {
ret = processLocalCmd(cmd_trimmed, proc, ss);
}
}
} catch (SQLException e) {
console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
ss.resetThreadName();
return ret;
}
This method closes the session and ends the CLI when it sees the exit or quit keyword. After that, it does some tricks depending on whether the source keyword is parsed out or the command starts with "!"; those are not the focus. The try block in the last else branch is the core: a CommandProcessor is built from the configuration, and processLocalCmd is called with cmd if proc is an instance of IDriver, otherwise with cmd_trimmed. This IDriver is obviously a very important class.
CommandProcessor itself is an interface, and IDriver is a sub-interface that extends it. Apart from IDriver, the rest are plain implementation classes; those are clearly the processors for non-SQL commands such as add jar, while the actual execution of HQL still relies on IDriver:
package org.apache.hadoop.hive.ql;
/**
* Hive query executer driver
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface IDriver extends CommandProcessor {
int compile(String string);
CommandProcessorResponse compileAndRespond(String statement);
QueryPlan getPlan();
QueryDisplay getQueryDisplay();
void setOperationId(String guid64);
CommandProcessorResponse run();
@Override
CommandProcessorResponse run(String command);
// create some "cover" to the result?
boolean getResults(List res) throws IOException;
void setMaxRows(int maxRows);
FetchTask getFetchTask();
Schema getSchema();
boolean isFetchingTable();
void resetFetch() throws IOException;
// close&destroy is used in seq coupling most of the time - the difference is either not clear; or not relevant - remove?
@Override
void close();
void destroy();
HiveConf getConf();
Context getContext();
boolean hasResultSet();
}
This interface has a compile method and a getPlan method for retrieving the query plan, so it is obviously the one to watch.
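Here is a hedged sketch of that dispatch rule, reusing the CommandProcessorFactory.get call seen in processCmd: a command like set comes back as a lightweight processor, while plain SQL comes back as an IDriver. The SessionState.start call and the sample commands are assumptions added to keep the snippet self-contained:
import java.util.Arrays;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.session.SessionState;

public class ProcessorDispatchSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);                    // the factory consults the active session/conf
    for (String cmd : Arrays.asList("set hive.execution.engine", "select 1")) {
      String[] tokens = cmd.split("\\s+");
      try (CommandProcessor proc = CommandProcessorFactory.get(tokens, conf)) {
        // "set ..." maps to a plain processor; "select ..." maps to an IDriver implementation
        System.out.println(cmd + " -> " + proc.getClass().getSimpleName()
            + ", IDriver? " + (proc instanceof IDriver));
      }
    }
  }
}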
The processLocalCmd Method That Executes Local Commands
package org.apache.hadoop.hive.cli;
public class CliDriver {}
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
boolean escapeCRLF = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_ESCAPE_CRLF);
int ret = 0;
if (proc != null) {
if (proc instanceof IDriver) {
IDriver qp = (IDriver) proc;
PrintStream out = ss.out;
long start = System.currentTimeMillis();
if (ss.getIsVerbose()) {
out.println(cmd);
}
ret = qp.run(cmd).getResponseCode();
if (ret != 0) {
qp.close();
return ret;
}
// query has run capture the time
long end = System.currentTimeMillis();
double timeTaken = (end - start) / 1000.0;
ArrayList<String> res = new ArrayList<String>();
printHeader(qp, out);
// print the results
int counter = 0;
try {
if (out instanceof FetchConverter) {
((FetchConverter) out).fetchStarted();
}
while (qp.getResults(res)) {
for (String r : res) {
if (escapeCRLF) {
r = EscapeCRLFHelper.escapeCRLF(r);
}
out.println(r);
}
counter += res.size();
res.clear();
if (out.checkError()) {
break;
}
}
} catch (IOException e) {
console.printError("Failed with exception " + e.getClass().getName() + ":" + e.getMessage(),
"\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
qp.close();
if (out instanceof FetchConverter) {
((FetchConverter) out).fetchFinished();
}
console.printInfo(
"Time taken: " + timeTaken + " seconds" + (counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
} else {
String firstToken = tokenizeCmd(cmd.trim())[0];
String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());
if (ss.getIsVerbose()) {
ss.out.println(firstToken + " " + cmd_1);
}
CommandProcessorResponse res = proc.run(cmd_1);
if (res.getResponseCode() != 0) {
ss.out
.println("Query returned non-zero code: " + res.getResponseCode() + ", cause: " + res.getErrorMessage());
}
if (res.getConsoleMessages() != null) {
for (String consoleMsg : res.getConsoleMessages()) {
console.printInfo(consoleMsg);
}
}
ret = res.getResponseCode();
}
}
return ret;
}
This method again has two branches. If the proc passed in is an instance of IDriver, it is cast and its run method is called to obtain the return code; displaying results and closing objects is not the focus.
If it is not an IDriver instance, the arguments are parsed and run is called all the same, so in the end each implementation class executes its own concrete logic. What we need to pay attention to is, of course, IDriver's run method.
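Below is a hedged, minimal sketch of the IDriver contract that processLocalCmd relies on: run() compiles and executes, then getResults() pages the rows back. DriverFactory.newDriver and the sample query are assumptions, and a reachable metastore plus a started SessionState are required for it to actually work:
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;

public class DriverRunSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);
    IDriver driver = DriverFactory.newDriver(conf);   // Driver, or ReExecDriver if re-execution is on
    CommandProcessorResponse resp = driver.run("show databases");
    if (resp.getResponseCode() == 0) {
      List<String> rows = new ArrayList<>();
      while (driver.getResults(rows)) {               // the same fetch loop processLocalCmd uses
        rows.forEach(System.out::println);
        rows.clear();
      }
    }
    driver.close();
  }
}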
Driver's run Method
Looking at the implementations of this interface, there are two: Driver and ReExecDriver. As the names suggest, Driver should be read first and the re-executing ReExecDriver afterwards; in fact, only Driver's run method needs attention:
package org.apache.hadoop.hive.ql;
public class Driver implements IDriver {}
public CommandProcessorResponse run(String command) {
return run(command, false);
}
public CommandProcessorResponse run(String command, boolean alreadyCompiled) {
try {
runInternal(command, alreadyCompiled);
return createProcessorResponse(0);
} catch (CommandProcessorResponse cpr) {
SessionState ss = SessionState.get();
if(ss == null) {
return cpr;
}
MetaDataFormatter mdf = MetaDataFormatUtils.getFormatter(ss.getConf());
if(!(mdf instanceof JsonMetaDataFormatter)) {
return cpr;
}
/*Here we want to encode the error in machine readable way (e.g. JSON)
* Ideally, errorCode would always be set to a canonical error defined in ErrorMsg.
* In practice that is rarely the case, so the messy logic below tries to tease
* out canonical error code if it can. Exclude stack trace from output when
* the error is a specific/expected one.
* It's written to stdout for backward compatibility (WebHCat consumes it).*/
try {
if(downstreamError == null) {
mdf.error(ss.out, errorMessage, cpr.getResponseCode(), SQLState);
return cpr;
}
ErrorMsg canonicalErr = ErrorMsg.getErrorMsg(cpr.getResponseCode());
if(canonicalErr != null && canonicalErr != ErrorMsg.GENERIC_ERROR) {
/*Some HiveExceptions (e.g. SemanticException) don't set
canonical ErrorMsg explicitly, but there is logic
(e.g. #compile()) to find an appropriate canonical error and
return its code as error code. In this case we want to
preserve it for downstream code to interpret*/
mdf.error(ss.out, errorMessage, cpr.getResponseCode(), SQLState, null);
return cpr;
}
if(downstreamError instanceof HiveException) {
HiveException rc = (HiveException) downstreamError;
mdf.error(ss.out, errorMessage,
rc.getCanonicalErrorMsg().getErrorCode(), SQLState,
rc.getCanonicalErrorMsg() == ErrorMsg.GENERIC_ERROR ?
org.apache.hadoop.util.StringUtils.stringifyException(rc)
: null);
}
else {
ErrorMsg canonicalMsg =
ErrorMsg.getErrorMsg(downstreamError.getMessage());
mdf.error(ss.out, errorMessage, canonicalMsg.getErrorCode(),
SQLState, org.apache.hadoop.util.StringUtils.
stringifyException(downstreamError));
}
}
catch(HiveException ex) {
console.printError("Unable to JSON-encode the error",
org.apache.hadoop.util.StringUtils.stringifyException(ex));
}
return cpr;
}
}
At this point alreadyCompiled=false, i.e. the query is still uncompiled. The part that really matters is the runInternal(command, alreadyCompiled) method.
Driver's runInternal Method
package org.apache.hadoop.hive.ql;
public class Driver implements IDriver {}
private void runInternal(String command, boolean alreadyCompiled) throws CommandProcessorResponse {
errorMessage = null;
SQLState = null;
downstreamError = null;
LockedDriverState.setLockedDriverState(lDrvState);
lDrvState.stateLock.lock();
try {
if (alreadyCompiled) {
if (lDrvState.driverState == DriverState.COMPILED) {
lDrvState.driverState = DriverState.EXECUTING;
} else {
errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
console.printError(errorMessage);
throw createProcessorResponse(12);
}
} else {
lDrvState.driverState = DriverState.COMPILING;
}
} finally {
lDrvState.stateLock.unlock();
}
// a flag that helps to set the correct driver state in finally block by tracking if
// the method has been returned by an error or not.
boolean isFinishedWithError = true;
try {
HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf,
alreadyCompiled ? ctx.getCmd() : command);
// Get all the driver run hooks and pre-execute them.
try {
hookRunner.runPreDriverHooks(hookContext);
} catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
throw createProcessorResponse(12);
}
PerfLogger perfLogger = null;
if (!alreadyCompiled) {
// compile internal will automatically reset the perf logger
compileInternal(command, true);
// then we continue to use this perf logger
perfLogger = SessionState.getPerfLogger();
} else {
// reuse existing perf logger.
perfLogger = SessionState.getPerfLogger();
// Since we're reusing the compiled plan, we need to update its start time for current run
plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
}
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
ctx.setHiveTxnManager(queryTxnMgr);
checkInterrupted("at acquiring the lock.", null, null);
lockAndRespond();
try {
if (!isValidTxnListState()) {
LOG.info("Compiling after acquiring locks");
// Snapshot was outdated when locks were acquired, hence regenerate context,
// txn list and retry
// TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
// Currently, we acquire a snapshot, we compile the query wrt that snapshot,
// and then, we acquire locks. If snapshot is still valid, we continue as usual.
// But if snapshot is not valid, we recompile the query.
retrial = true;
backupContext.addRewrittenStatementContext(ctx);
backupContext.setHiveLocks(ctx.getHiveLocks());
ctx = backupContext;
conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString());
if (plan.hasAcidResourcesInQuery()) {
recordValidWriteIds(queryTxnMgr);
}
if (!alreadyCompiled) {
// compile internal will automatically reset the perf logger
compileInternal(command, true);
} else {
// Since we're reusing the compiled plan, we need to update its start time for current run
plan.setQueryStartTime(queryDisplay.getQueryStartTime());
}
if (!isValidTxnListState()) {
// Throw exception
throw handleHiveException(new HiveException("Operation could not be executed"), 14);
}
//Reset the PerfLogger
perfLogger = SessionState.getPerfLogger(true);
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
ctx.setHiveTxnManager(queryTxnMgr);
}
} catch (LockException e) {
throw handleHiveException(e, 13);
}
try {
execute();
} catch (CommandProcessorResponse cpr) {
rollback(cpr);
throw cpr;
}
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
//since set autocommit starts an implicit txn, close it
if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
releaseLocksAndCommitOrRollback(true);
}
else if(plan.getOperation() == HiveOperation.ROLLBACK) {
releaseLocksAndCommitOrRollback(false);
}
else {
//txn (if there is one started) is not finished
}
} catch (LockException e) {
throw handleHiveException(e, 12);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
queryDisplay.setPerfLogStarts(QueryDisplay.Phase.EXECUTION, perfLogger.getStartTimes());
queryDisplay.setPerfLogEnds(QueryDisplay.Phase.EXECUTION, perfLogger.getEndTimes());
// Take all the driver run hooks and post-execute them.
try {
hookRunner.runPostDriverHooks(hookContext);
} catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
throw createProcessorResponse(12);
}
isFinishedWithError = false;
} finally {
if (lDrvState.isAborted()) {
closeInProcess(true);
} else {
// only release the related resources ctx, driverContext as normal
releaseResources();
}
lDrvState.stateLock.lock();
try {
lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED;
} finally {
lDrvState.stateLock.unlock();
}
}
}
The most important pieces here are the compileInternal method, which compiles the HQL, and the execute method, which actually runs it.
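A hedged sketch of that compile/execute split through the public IDriver methods shown earlier: compileAndRespond() does the parse/analyze/plan work, getPlan() exposes the result, and the no-argument run() executes the already-compiled plan. DriverFactory.newDriver and the table name are assumptions:
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.session.SessionState;

public class CompileThenRunSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);
    IDriver driver = DriverFactory.newDriver(conf);
    // compile only: parsing, semantic analysis and plan generation, no tasks launched yet
    if (driver.compileAndRespond("SELECT COUNT(1) FROM some_table").getResponseCode() == 0) {
      QueryPlan plan = driver.getPlan();
      System.out.println("Root tasks in the plan: " + plan.getRootTasks().size());
      driver.run();                                   // execute the plan that was just compiled
    }
    driver.close();
  }
}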
Driver's compileInternal Method
package org.apache.hadoop.hive.ql;
public class Driver implements IDriver {}
private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
Metrics metrics = MetricsFactory.getInstance();
if (metrics != null) {
metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
}
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.WAIT_COMPILE);
final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
command);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.WAIT_COMPILE);
if (metrics != null) {
metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
}
if (compileLock == null) {
throw createProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode());
}
try {
compile(command, true, deferClose);
} catch (CommandProcessorResponse cpr) {
try {
releaseLocksAndCommitOrRollback(false);
} catch (LockException e) {
LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e));
}
throw cpr;
} finally {
compileLock.unlock();
}
//Save compile-time PerfLogging for WebUI.
//Execution-time Perf logs are done by either another thread's PerfLogger
//or a reset PerfLogger.
queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());
queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());
}
Clearly it still needs to call the compile method.
Driver's compile Method
// deferClose indicates if the close/destroy should be deferred when the process has been
// interrupted, it should be set to true if the compile is called within another method like
// runInternal, which defers the close to the called in that method.
private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
lDrvState.stateLock.lock();
try {
lDrvState.driverState = DriverState.COMPILING;
} finally {
lDrvState.stateLock.unlock();
}
command = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(conf, command);
String queryStr = command;
try {
// command should be redacted to avoid to logging sensitive data
queryStr = HookUtils.redactLogString(conf, command);
} catch (Exception e) {
LOG.warn("WARNING! Query command could not be redacted." + e);
}
checkInterrupted("at beginning of compilation.", null, null);
if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
// close the existing ctx etc before compiling a new query, but does not destroy driver
closeInProcess(false);
}
if (resetTaskIds) {
TaskFactory.resetId();
}
LockedDriverState.setLockedDriverState(lDrvState);
String queryId = queryState.getQueryId();
if (ctx != null) {
setTriggerContext(queryId);
}
//save some info for webUI for use after plan is freed
this.queryDisplay.setQueryStr(queryStr);
this.queryDisplay.setQueryId(queryId);
LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);
conf.setQueryString(queryStr);
// FIXME: sideeffect will leave the last query set at the session level
if (SessionState.get() != null) {
SessionState.get().getConf().setQueryString(queryStr);
SessionState.get().setupQueryCurrentTimestamp();
}
// Whether any error occurred during query compilation. Used for query lifetime hook.
boolean compileError = false;
boolean parseError = false;
try {
// Initialize the transaction manager. This must be done before analyze is called.
if (initTxnMgr != null) {
queryTxnMgr = initTxnMgr;
} else {
queryTxnMgr = SessionState.get().initTxnMgr(conf);
}
if (queryTxnMgr instanceof Configurable) {
((Configurable) queryTxnMgr).setConf(conf);
}
queryState.setTxnManager(queryTxnMgr);
// In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
// if compile is being called multiple times, clear the old shutdownhook
ShutdownHookManager.removeShutdownHook(shutdownRunner);
final HiveTxnManager txnMgr = queryTxnMgr;
shutdownRunner = new Runnable() {
@Override
public void run() {
try {
releaseLocksAndCommitOrRollback(false, txnMgr);
} catch (LockException e) {
LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
e.getMessage());
}
}
};
ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
checkInterrupted("before parsing and analysing the query", null, null);
if (ctx == null) {
ctx = new Context(conf);
setTriggerContext(queryId);
}
ctx.setHiveTxnManager(queryTxnMgr);
ctx.setStatsSource(statsSource);
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
// Trigger query hook before compilation
hookRunner.runBeforeParseHook(command);
ASTNode tree;
try {
tree = ParseUtils.parse(command, ctx);
} catch (ParseException e) {
parseError = true;
throw e;
} finally {
hookRunner.runAfterParseHook(command, parseError);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
hookRunner.runBeforeCompileHook(command);
// clear CurrentFunctionsInUse set, to capture new set of functions
// that SemanticAnalyzer finds are in use
SessionState.get().getCurrentFunctionsInUse().clear();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
// Flush the metastore cache. This assures that we don't pick up objects from a previous
// query running in this same thread. This has to be done after we get our semantic
// analyzer (this is when the connection to the metastore is made) but before we analyze,
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
backupContext = new Context(ctx);
boolean executeHooks = hookRunner.hasPreAnalyzeHooks();
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
if (executeHooks) {
hookCtx.setConf(conf);
hookCtx.setUserName(userName);
hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
hookCtx.setCommand(command);
hookCtx.setHiveOperation(queryState.getHiveOperation());
tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
}
// Do semantic analysis and plan generation
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
if (!retrial) {
openTransaction();
generateValidTxnList();
}
sem.analyze(tree, ctx);
if (executeHooks) {
hookCtx.update(sem);
hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
}
LOG.info("Semantic Analysis Completed (retrial = {})", retrial);
// Retrieve information about cache usage for the query.
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
cacheUsage = sem.getCacheUsage();
}
// validate the plan
sem.validate();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
checkInterrupted("after analyzing query.", null, null);
// get the output schema
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
queryState.getHiveOperation(), schema);
conf.set("mapreduce.workflow.id", "hive_" + queryId);
conf.set("mapreduce.workflow.name", queryStr);
// initialize FetchTask right here
if (plan.getFetchTask() != null) {
plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());
}
//do the authorization check
if (!sem.skipAuthorization() &&
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
try {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
doAuthorization(queryState.getHiveOperation(), sem, command);
} catch (AuthorizationException authExp) {
console.printError("Authorization failed:" + authExp.getMessage()
+ ". Use SHOW GRANT to get more details.");
errorMessage = authExp.getMessage();
SQLState = "42000";
throw createProcessorResponse(403);
} finally {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
}
}
if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
String explainOutput = getExplainOutput(sem, plan, tree);
if (explainOutput != null) {
LOG.info("EXPLAIN output for queryid " + queryId + " : "
+ explainOutput);
if (conf.isWebUiQueryInfoCacheEnabled()) {
queryDisplay.setExplainPlan(explainOutput);
}
}
}
} catch (CommandProcessorResponse cpr) {
throw cpr;
} catch (Exception e) {
checkInterrupted("during query compilation: " + e.getMessage(), null, null);
compileError = true;
ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
errorMessage = "FAILED: " + e.getClass().getSimpleName();
if (error != ErrorMsg.GENERIC_ERROR) {
errorMessage += " [Error " + error.getErrorCode() + "]:";
}
// HIVE-4889
if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {
errorMessage += " " + e.getCause().getMessage();
} else {
errorMessage += " " + e.getMessage();
}
if (error == ErrorMsg.TXNMGR_NOT_ACID) {
errorMessage += ". Failed command: " + queryStr;
}
SQLState = error.getSQLState();
downstreamError = e;
console.printError(errorMessage, "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
throw createProcessorResponse(error.getErrorCode());
} finally {
// Trigger post compilation hook. Note that if the compilation fails here then
// before/after execution hook will never be executed.
if (!parseError) {
try {
hookRunner.runAfterCompilationHook(command, compileError);
} catch (Exception e) {
LOG.warn("Failed when invoking query after-compilation hook.", e);
}
}
double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00;
ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");
queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);
boolean isInterrupted = lDrvState.isAborted();
if (isInterrupted && !deferClose) {
closeInProcess(true);
}
lDrvState.stateLock.lock();
try {
if (isInterrupted) {
lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;
} else {
lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;
}
} finally {
lDrvState.stateLock.unlock();
}
if (isInterrupted) {
LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
} else {
LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
}
}
}
Here tree = ParseUtils.parse(command, ctx) parses the command into an AST; the set of functions in use is then cleared and the metastore client cache is flushed. Next, sem.analyze(tree, ctx) performs semantic analysis and plan generation, and sem.validate() validates the generated plan.
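As a hedged, standalone sketch of just the parsing step: feed an HQL string to ParseUtils.parse and dump the resulting ASTNode. The SessionState.start call and the sample statement are assumptions added so the snippet is self-contained; parsing alone does not touch the metastore:
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.session.SessionState;

public class ParseAstSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);            // Context picks up session/scratch-dir settings
    Context ctx = new Context(conf);
    ASTNode tree = ParseUtils.parse("SELECT id, COUNT(1) FROM t GROUP BY id", ctx);
    System.out.println(tree.dump());     // prints the TOK_QUERY / TOK_SELECT ... tree
  }
}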
This BaseSemanticAnalyzer class is actually abstract:
package org.apache.hadoop.hive.ql.parse;
public abstract class BaseSemanticAnalyzer {}
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
initCtx(ctx);
init(true);
analyzeInternal(ast);
}
public void validate() throws SemanticException {
// Implementations may choose to override this
}
On its own that tells us little; we have to look at how the implementation classes override these methods.
As the names suggest, there is DDLSemanticAnalyzer for DDL, ExplainSemanticAnalyzer for EXPLAIN statements, FunctionSemanticAnalyzer for function (UDF) statements, and, most importantly, SemanticAnalyzer, which handles regular SQL.
At this point the query plan has been obtained.
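Continuing the parsing sketch above (again hedged, not Driver's exact code path): hand the AST to SemanticAnalyzerFactory, then call analyze() and validate() to materialize the plan's root tasks. QueryState.Builder and the table name are assumptions, and a reachable metastore is needed because analysis resolves tables and columns:
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.session.SessionState;

public class AnalyzeSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);
    Context ctx = new Context(conf);
    QueryState queryState = new QueryState.Builder().withHiveConf(conf).build();
    ASTNode tree = ParseUtils.parse("SELECT COUNT(1) FROM some_table", ctx);
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
    sem.analyze(tree, ctx);     // semantic analysis + plan generation (talks to the metastore)
    sem.validate();             // sanity-check the generated plan
    System.out.println("Root tasks: " + sem.getAllRootTasks());
  }
}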
Driver's execute Method
package org.apache.hadoop.hive.ql;
public class Driver implements IDriver {}
private void execute() throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME));
int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
Metrics metrics = MetricsFactory.getInstance();
String queryId = queryState.getQueryId();
// Get the query string from the conf file as the compileInternal() method might
// hide sensitive information during query redaction.
String queryStr = conf.getQueryString();
lDrvState.stateLock.lock();
try {
// if query is not in compiled state, or executing state which is carried over from
// a combined compile/execute in runInternal, throws the error
if (lDrvState.driverState != DriverState.COMPILED &&
lDrvState.driverState != DriverState.EXECUTING) {
SQLState = "HY008";
errorMessage = "FAILED: unexpected driverstate: " + lDrvState + ", for query " + queryStr;
console.printError(errorMessage);
throw createProcessorResponse(1000);
} else {
lDrvState.driverState = DriverState.EXECUTING;
}
} finally {
lDrvState.stateLock.unlock();
}
maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
HookContext hookContext = null;
// Whether there's any error occurred during query execution. Used for query lifetime hook.
boolean executionError = false;
try {
LOG.info("Executing command(queryId=" + queryId + "): " + queryStr);
// compile and execute can get called from different threads in case of HS2
// so clear timing in this thread's Hive object before proceeding.
Hive.get().clearMetaCallTiming();
plan.setStarted();
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startQuery(queryStr, queryId);
SessionState.get().getHiveHistory().logPlanProgress(plan);
}
resStream = null;
SessionState ss = SessionState.get();
// TODO: should this use getUserFromAuthenticator?
hookContext = new PrivateHookContext(plan, queryState, ctx.getPathToCS(), SessionState.get().getUserName(),
ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId,
ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, queryInfo, ctx);
hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
hookRunner.runPreHooks(hookContext);
// Trigger query hooks before query execution.
hookRunner.runBeforeExecutionHook(queryStr, hookContext);
setQueryDisplays(plan.getRootTasks());
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size()
+ Utilities.getSparkTasks(plan.getRootTasks()).size();
if (jobs > 0) {
logMrWarning(mrJobs);
console.printInfo("Query ID = " + queryId);
console.printInfo("Total jobs = " + jobs);
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
String.valueOf(jobs));
SessionState.get().getHiveHistory().setIdToTableMap(plan.getIdToTableNameMap());
}
String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
// A runtime that launches runnable tasks as separate Threads through
// TaskRunners
// As soon as a task isRunnable, it is put in a queue
// At any time, at most maxthreads tasks can be running
// The main thread polls the TaskRunners to check if they have finished.
checkInterrupted("before running tasks.", hookContext, perfLogger);
DriverContext driverCxt = new DriverContext(ctx);
driverCxt.prepare(plan);
ctx.setHDFSCleanup(true);
this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)
SessionState.get().setMapRedStats(new LinkedHashMap<>());
SessionState.get().setStackTraces(new HashMap<>());
SessionState.get().setLocalMapRedErrors(new HashMap<>());
// Add root Tasks to runnable
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
// This should never happen, if it does, it's a bug with the potential to produce
// incorrect results.
assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
driverCxt.addToRunnable(tsk);
if (metrics != null) {
tsk.updateTaskMetrics(metrics);
}
}
preExecutionCacheActions();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
while (driverCxt.isRunning()) {
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
}
}
// poll the Tasks to see which one completed
TaskRunner tskRun = driverCxt.pollFinished();
if (tskRun == null) {
continue;
}
/*
This should be removed eventually. HIVE-17814 gives more detail
explanation of whats happening and HIVE-17815 as to why this is done.
Briefly for replication the graph is huge and so memory pressure is going to be huge if
we keep a lot of references around.
*/
String opName = plan.getOperationName();
boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName())
|| opName.equals(HiveOperation.REPLLOAD.getOperationName());
if (!isReplicationOperation) {
hookContext.addCompleteTask(tskRun);
}
queryDisplay.setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());
Task<? extends Serializable> tsk = tskRun.getTask();
TaskResult result = tskRun.getTaskResult();
int exitVal = result.getExitVal();
checkInterrupted("when checking the execution result.", hookContext, perfLogger);
if (exitVal != 0) {
Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
if (backupTask != null) {
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
console.printError(errorMessage);
errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
console.printError(errorMessage);
// add backup task to runnable
if (DriverContext.isLaunchable(backupTask)) {
driverCxt.addToRunnable(backupTask);
}
continue;
} else {
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
if (driverCxt.isShutdown()) {
errorMessage = "FAILED: Operation cancelled. " + errorMessage;
}
invokeFailureHooks(perfLogger, hookContext,
errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
SQLState = "08S01";
// 08S01 (Communication error) is the default sql state. Override the sqlstate
// based on the ErrorMsg set in HiveException.
if (result.getTaskError() instanceof HiveException) {
ErrorMsg errorMsg = ((HiveException) result.getTaskError()).
getCanonicalErrorMsg();
if (errorMsg != ErrorMsg.GENERIC_ERROR) {
SQLState = errorMsg.getSQLState();
}
}
console.printError(errorMessage);
driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
throw createProcessorResponse(exitVal);
}
}
driverCxt.finished(tskRun);
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
Keys.TASK_RET_CODE, String.valueOf(exitVal));
SessionState.get().getHiveHistory().endTask(queryId, tsk);
}
if (tsk.getChildTasks() != null) {
for (Task<? extends Serializable> child : tsk.getChildTasks()) {
if (DriverContext.isLaunchable(child)) {
driverCxt.addToRunnable(child);
}
}
}
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
postExecutionCacheActions();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
if (driverCxt.isShutdown()) {
SQLState = "HY008";
errorMessage = "FAILED: Operation cancelled";
invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
console.printError(errorMessage);
throw createProcessorResponse(1000);
}
// remove incomplete outputs.
// Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
// remove them
HashSet<WriteEntity> remOutputs = new LinkedHashSet<WriteEntity>();
for (WriteEntity output : plan.getOutputs()) {
if (!output.isComplete()) {
remOutputs.add(output);
}
}
for (WriteEntity output : remOutputs) {
plan.getOutputs().remove(output);
}
hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
hookRunner.runPostExecHooks(hookContext);
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
String.valueOf(0));
SessionState.get().getHiveHistory().printRowCount(queryId);
}
releasePlan(plan);
} catch (CommandProcessorResponse cpr) {
executionError = true;
throw cpr;
} catch (Throwable e) {
executionError = true;
checkInterrupted("during query execution: \n" + e.getMessage(), hookContext, perfLogger);
ctx.restoreOriginalTracker();
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
String.valueOf(12));
}
// TODO: do better with handling types of Exception here
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
if (hookContext != null) {
try {
invokeFailureHooks(perfLogger, hookContext, errorMessage, e);
} catch (Exception t) {
LOG.warn("Failed to invoke failure hook", t);
}
}
SQLState = "08S01";
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
throw createProcessorResponse(12);
} finally {
// Trigger query hooks after query completes its execution.
try {
hookRunner.runAfterExecutionHook(queryStr, hookContext, executionError);
} catch (Exception e) {
LOG.warn("Failed when invoking query after execution hook", e);
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().endQuery(queryId);
}
if (noName) {
conf.set(MRJobConfig.JOB_NAME, "");
}
double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00;
ImmutableMap<String, Long> executionHMSTimings = dumpMetaCallTimingWithoutEx("execution");
queryDisplay.setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);
Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
if (stats != null && !stats.isEmpty()) {
long totalCpu = 0;
console.printInfo("MapReduce Jobs Launched: ");
for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
console.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
totalCpu += entry.getValue().getCpuMSec();
}
console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
}
lDrvState.stateLock.lock();
try {
lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
} finally {
lDrvState.stateLock.unlock();
}
if (lDrvState.isAborted()) {
LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
} else {
LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
}
}
if (console != null) {
console.printInfo("OK");
}
}
The key step in this method is TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);: the enclosing loop keeps launching runnable tasks, with at most maxthreads of them in flight at any time.
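As a self-contained analogue of that scheduling pattern (plain JDK classes, my own example, not Hive code): launch at most maxthreads tasks, block until one finishes, then admit more, which is essentially what the getRunnable/pollFinished loop above does.
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BoundedLaunchDemo {
    public static void main(String[] args) throws Exception {
        int maxthreads = 2; // counterpart of hive.exec.parallel.thread.number
        Queue<Runnable> runnable = new ArrayDeque<>();
        for (int i = 0; i < 5; i++) {
            int id = i;
            runnable.add(() -> System.out.println("task " + id + " done"));
        }
        ExecutorService pool = Executors.newFixedThreadPool(maxthreads);
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        int running = 0;
        while (!runnable.isEmpty() || running > 0) {
            // launch up to maxthreads tasks, like driverCxt.getRunnable(maxthreads)
            while (running < maxthreads && !runnable.isEmpty()) {
                cs.submit(runnable.poll(), null);
                running++;
            }
            cs.take(); // like driverCxt.pollFinished(): wait for whichever task completes
            running--;
        }
        pool.shutdown();
    }
}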
Driver's launchTask method
/**
* Launches a new task
*
* @param tsk
* task being launched
* @param queryId
* Id of the query containing the task
* @param noName
* whether the task has a name set
* @param jobname
* name of the task, if it is a map-reduce job
* @param jobs
* number of map-reduce jobs
* @param cxt
* the driver context
*/
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
String jobname, int jobs, DriverContext cxt) throws HiveException {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
}
if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
if (noName) {
conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
}
conf.set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
tsk.initialize(queryState, plan, cxt, ctx.getOpContext());
TaskRunner tskRun = new TaskRunner(tsk);
cxt.launching(tskRun);
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in serial mode");
}
tskRun.runSequential();
}
return tskRun;
}
When the following parameter in HiveConf.java:
EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel")
is set to true, tasks are launched in parallel; if the default value is kept, they are launched serially.
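A minimal sketch of mine (not from the Hive source) that flips these switches programmatically through HiveConf, equivalent to running set hive.exec.parallel=true; and set hive.exec.parallel.thread.number=16; in a session (16 is just an illustrative value):
import org.apache.hadoop.hive.conf.HiveConf;

public class ParallelExecConfDemo {
    public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // The same ConfVars that Driver.execute()/launchTask() read above.
        conf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
        conf.setIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER, 16);
        System.out.println("hive.exec.parallel = "
            + HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL));
        System.out.println("hive.exec.parallel.thread.number = "
            + HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER));
    }
}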
TaskRunner's runSequential method
package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TaskRunner implementation.
**/
public class TaskRunner extends Thread {
protected Task<? extends Serializable> tsk;
protected TaskResult result;
protected SessionState ss;
private static AtomicLong taskCounter = new AtomicLong(0);
private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() {
@Override
protected Long initialValue() {
return taskCounter.incrementAndGet();
}
};
protected Thread runner;
private static transient final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);
public TaskRunner(Task<? extends Serializable> tsk) {
this.tsk = tsk;
this.result = new TaskResult();
ss = SessionState.get();
}
public Task<? extends Serializable> getTask() {
return tsk;
}
public TaskResult getTaskResult() {
return result;
}
public Thread getRunner() {
return runner;
}
public boolean isRunning() {
return result.isRunning();
}
@Override
public void run() {
runner = Thread.currentThread();
try {
SessionState.start(ss);
runSequential();
} finally {
try {
// Call Hive.closeCurrent() that closes the HMS connection, causes
// HMS connection leaks otherwise.
Hive.closeCurrent();
} catch (Exception e) {
LOG.warn("Exception closing Metastore connection:" + e.getMessage());
}
runner = null;
result.setRunning(false);
}
}
/**
* Launches a task, and sets its exit value in the result variable.
*/
public void runSequential() {
int exitVal = -101;
try {
exitVal = tsk.executeTask(ss == null ? null : ss.getHiveHistory());
} catch (Throwable t) {
if (tsk.getException() == null) {
tsk.setException(t);
}
LOG.error("Error in executeTask", t);
}
result.setExitVal(exitVal);
if (tsk.getException() != null) {
result.setTaskError(tsk.getException());
}
}
public static long getTaskRunnerID () {
return taskRunnerID.get();
}
}
runSequential then calls tsk.executeTask:
package org.apache.hadoop.hive.ql.exec;
/**
* Task implementation.
**/
public abstract class Task<T extends Serializable> implements Serializable, Node {}
/**
* This method is called in the Driver on every task. It updates counters and calls execute(),
* which is overridden in each task
*
* @return return value of execute()
*/
public int executeTask(HiveHistory hiveHistory) {
try {
this.setStarted();
if (hiveHistory != null) {
hiveHistory.logPlanProgress(queryPlan);
}
int retval = execute(driverContext);
this.setDone();
if (hiveHistory != null) {
hiveHistory.logPlanProgress(queryPlan);
}
return retval;
} catch (IOException e) {
throw new RuntimeException("Unexpected error: " + e.getMessage(), e);
}
}
protected abstract int execute(DriverContext driverContext);
execute is then invoked with the driver context object. Of course this one is again an abstract method, so we first have to find its subclasses/implementations:
Among them, three classes, MapredLocalTask, TezTask, and SparkTask, are the entry classes for the MapReduce, Tez, and Spark compute engines.
MapredLocalTask
package org.apache.hadoop.hive.ql.exec.mr;
/**
* MapredLocalTask represents any local work (i.e.: client side work) that hive needs to
* execute. E.g.: This is used for generating Hashtables for Mapjoins on the client
* before the Join is executed on the cluster.
*
* MapRedLocalTask does not actually execute the work in process, but rather generates
* a command using ExecDriver. ExecDriver is what will finally drive processing the records.
*/
public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {}
@Override
public int execute(DriverContext driverContext) {
if (conf.getBoolVar(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD)) {
// send task off to another jvm
return executeInChildVM(driverContext);
} else {
// execute in process
return executeInProcess(driverContext);
}
}
This method also depends on another HiveConf.java setting (a short sketch of reading it follows the snippet):
SUBMITLOCALTASKVIACHILD("hive.exec.submit.local.task.via.child", true,
"Determines whether local tasks (typically mapjoin hashtable generation phase) runs in \n" +
"separate JVM (true recommended) or not. \n" +
"Avoids the overhead of spawning new JVM, but can lead to out-of-memory issues.")
This retired MapReduce execution engine is only worth a quick skim, since it is, after all, retired... Only at the scale of tens or hundreds of billions of rows, where Tez and Spark easily blow up, do you have to settle for MapReduce, and then a single run can drag on for ten days or half a month... For companies with small data volumes of merely tens of millions to a few hundred million rows, an MPP database (Doris, StarRocks and the like) can handle the load just fine...
TezTask
package org.apache.hadoop.hive.ql.exec.tez;
/**
*
* TezTask handles the execution of TezWork. Currently it executes a graph of map and reduce work
* using the Tez APIs directly.
*
*/
@SuppressWarnings({"serial"})
public class TezTask extends Task<TezWork> {}
@Override
public int execute(DriverContext driverContext) {
int rc = 1;
boolean cleanContext = false;
Context ctx = null;
Ref<TezSessionState> sessionRef = Ref.from(null);
try {
// Get or create Context object. If we create it we have to clean it later as well.
ctx = driverContext.getCtx();
if (ctx == null) {
ctx = new Context(conf);
cleanContext = true;
// some DDL task that directly executes a TezTask does not setup Context and hence TriggerContext.
// Setting queryId is messed up. Some DDL tasks have executionId instead of proper queryId.
String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
WmContext wmContext = new WmContext(System.currentTimeMillis(), queryId);
ctx.setWmContext(wmContext);
}
// Need to remove this static hack. But this is the way currently to get a session.
SessionState ss = SessionState.get();
// Note: given that we return pool sessions to the pool in the finally block below, and that
// we need to set the global to null to do that, this "reuse" may be pointless.
TezSessionState session = sessionRef.value = ss.getTezSession();
if (session != null && !session.isOpen()) {
LOG.warn("The session: " + session + " has not been opened");
}
// We only need a username for UGI to use for groups; getGroups will fetch the groups
// based on Hadoop configuration, as documented at
// https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
String userName = getUserNameForGroups(ss);
List<String> groups = null;
if (userName == null) {
userName = "anonymous";
} else {
try {
groups = UserGroupInformation.createRemoteUser(userName).getGroups();
} catch (Exception ex) {
LOG.warn("Cannot obtain groups for " + userName, ex);
}
}
MappingInput mi = new MappingInput(userName, groups,
ss.getHiveVariables().get("wmpool"), ss.getHiveVariables().get("wmapp"));
WmContext wmContext = ctx.getWmContext();
// jobConf will hold all the configuration for hadoop, tez, and hive
JobConf jobConf = utils.createConfiguration(conf);
// Get all user jars from work (e.g. input format stuff).
String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf);
// DAG scratch dir. We get a session from the pool so it may be different from Tez one.
// TODO: we could perhaps reuse the same directory for HiveResources?
Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), conf);
CallerContext callerContext = CallerContext.create(
"HIVE", queryPlan.getQueryId(), "HIVE_QUERY_ID", queryPlan.getQueryStr());
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_GET_SESSION);
session = sessionRef.value = WorkloadManagerFederation.getSession(
sessionRef.value, conf, mi, getWork().getLlapMode(), wmContext);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_GET_SESSION);
try {
ss.setTezSession(session);
LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(),
wmContext.getQueryId());
// Ensure the session is open and has the necessary local resources.
// This would refresh any conf resources and also local resources.
ensureSessionHasResources(session, allNonConfFiles);
// This is a combination of the jar stuff from conf, and not from conf.
List<LocalResource> allNonAppResources = session.getLocalizedResources();
logResources(allNonAppResources);
Map<String, LocalResource> allResources = DagUtils.createTezLrMap(
session.getAppJarLr(), allNonAppResources);
// next we translate the TezWork to a Tez DAG
DAG dag = build(jobConf, work, scratchDir, ctx, allResources);
dag.setCallerContext(callerContext);
// Note: we no longer call addTaskLocalFiles because all the resources are correctly
// updated in the session resource lists now, and thus added to vertices.
// If something breaks, dag.addTaskLocalFiles might need to be called here.
// Check isShutdown opportunistically; it's never unset.
if (this.isShutdown) {
throw new HiveException("Operation cancelled");
}
DAGClient dagClient = submit(jobConf, dag, sessionRef);
session = sessionRef.value;
boolean wasShutdown = false;
synchronized (dagClientLock) {
assert this.dagClient == null;
wasShutdown = this.isShutdown;
if (!wasShutdown) {
this.dagClient = dagClient;
}
}
if (wasShutdown) {
closeDagClientOnCancellation(dagClient);
throw new HiveException("Operation cancelled");
}
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx);
rc = monitor.monitorExecution();
if (rc != 0) {
this.setException(new HiveException(monitor.getDiagnostics()));
}
// fetch the counters
try {
Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
} catch (Exception err) {
// Don't fail execution due to counters - just don't print summary info
LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete. " + err, err);
counters = null;
}
} finally {
// Note: due to TEZ-3846, the session may actually be invalid in case of some errors.
// Currently, reopen on an attempted reuse will take care of that; we cannot tell
// if the session is usable until we try.
// We return this to the pool even if it's unusable; reopen is supposed to handle this.
wmContext = ctx.getWmContext();
try {
if (sessionRef.value != null) {
sessionRef.value.returnToSessionManager();
}
} catch (Exception e) {
LOG.error("Failed to return session: {} to pool", session, e);
throw e;
}
if (!conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("none") &&
wmContext != null) {
if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("json")) {
wmContext.printJson(console);
} else if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("text")) {
wmContext.print(console);
}
}
}
if (LOG.isInfoEnabled() && counters != null
&& (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
Utilities.isPerfOrAboveLogging(conf))) {
for (CounterGroup group: counters) {
LOG.info(group.getDisplayName() +":");
for (TezCounter counter: group) {
LOG.info(" "+counter.getDisplayName()+": "+counter.getValue());
}
}
}
} catch (Exception e) {
LOG.error("Failed to execute tez graph.", e);
// rc will be 1 at this point indicating failure.
} finally {
Utilities.clearWork(conf);
// Clear gWorkMap
for (BaseWork w : work.getAllWork()) {
JobConf workCfg = workToConf.get(w);
if (workCfg != null) {
Utilities.clearWorkMapForConf(workCfg);
}
}
if (cleanContext) {
try {
ctx.clear();
} catch (Exception e) {
/*best effort*/
LOG.warn("Failed to clean up after tez job", e);
}
}
// need to either move tmp files or remove them
DAGClient dagClient = null;
synchronized (dagClientLock) {
dagClient = this.dagClient;
this.dagClient = null;
}
// TODO: not clear why we don't do the rest of the cleanup if dagClient is not created.
// E.g. jobClose will be called if we fail after dagClient creation but no before...
// DagClient as such should have no bearing on jobClose.
if (dagClient != null) {
// rc will only be overwritten if close errors out
rc = close(work, rc, dagClient);
}
}
return rc;
}
As the class comment states, this is implemented directly on top of the Tez Java API, and the method body makes the API calls plain to see. It even fetches the UGI (UserGroupInformation) to resolve the user's groups...
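For reference, driving Tez directly through its Java API looks roughly like the sketch below. This is my own illustration, not code extracted from TezTask: the processor class name is a placeholder, and the resource localization and session handling that TezTask performs are omitted.
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

public class TezApiSketch {
    public static void main(String[] args) throws Exception {
        TezConfiguration tezConf = new TezConfiguration(new Configuration());
        TezClient tezClient = TezClient.create("tez-api-sketch", tezConf);
        tezClient.start();
        try {
            // Hive builds its vertices from MapWork/ReduceWork; here a single placeholder
            // processor ("com.example.MyProcessor") stands in for that.
            DAG dag = DAG.create("sketch-dag");
            dag.addVertex(Vertex.create("v1",
                ProcessorDescriptor.create("com.example.MyProcessor"), 1));
            DAGClient dagClient = tezClient.submitDAG(dag);   // TezTask goes through submit()
            DAGStatus status = dagClient.waitForCompletion(); // TezTask monitors via TezJobMonitor instead
            System.out.println("DAG finished in state: " + status.getState());
        } finally {
            tezClient.stop();
        }
    }
}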
SparkTask
package org.apache.hadoop.hive.ql.exec.spark;
public class SparkTask extends Task<SparkWork> {}
@Override
public int execute(DriverContext driverContext) {
int rc = 0;
perfLogger = SessionState.getPerfLogger();
SparkSession sparkSession = null;
SparkSessionManager sparkSessionManager = null;
try {
printConfigInfo();
sparkSessionManager = SparkSessionManagerImpl.getInstance();
sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
SparkWork sparkWork = getWork();
sparkWork.setRequiredCounterPrefix(getOperatorCounters());
// Submit the Spark job
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
jobRef = sparkSession.submit(driverContext, sparkWork);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
// If the driver context has been shutdown (due to query cancellation) kill the Spark job
if (driverContext.isShutdown()) {
LOG.warn("Killing Spark job");
killJob();
throw new HiveException("Operation is cancelled.");
}
// Get the Job Handle id associated with the Spark job
sparkJobHandleId = jobRef.getJobId();
// Add Spark job handle id to the Hive History
addToHistory(Keys.SPARK_JOB_HANDLE_ID, jobRef.getJobId());
LOG.debug("Starting Spark job with job handle id " + sparkJobHandleId);
// Get the application id of the Spark app
jobID = jobRef.getSparkJobStatus().getAppID();
// Start monitoring the Spark job, returns when the Spark job has completed / failed, or if
// a timeout occurs
rc = jobRef.monitorJob();
// Get the id the Spark job that was launched, returns -1 if no Spark job was launched
sparkJobID = jobRef.getSparkJobStatus().getJobId();
// Add Spark job id to the Hive History
addToHistory(Keys.SPARK_JOB_ID, Integer.toString(sparkJobID));
// Get the final state of the Spark job and parses its job info
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
getSparkJobInfo(sparkJobStatus, rc);
if (rc == 0) {
sparkStatistics = sparkJobStatus.getSparkStatistics();
printExcessiveGCWarning();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID));
}
LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " +
jobID + " and task ID " + getId());
} else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
// TODO: If the timeout is because of lack of resources in the cluster, we should
// ideally also cancel the app request here. But w/o facilities from Spark or YARN,
// it's difficult to do it on hive side alone. See HIVE-12650.
LOG.debug("Failed to submit Spark job with job handle id " + sparkJobHandleId);
LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(jobID)
? "UNKNOWN" : jobID));
killJob();
} else if (rc == 4) {
LOG.info("The spark job or one stage of it has too many tasks" +
". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
killJob();
}
if (this.jobID == null) {
this.jobID = sparkJobStatus.getAppID();
}
sparkJobStatus.cleanup();
} catch (Exception e) {
String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
// Has to use full name to make sure it does not conflict with
// org.apache.commons.lang.StringUtils
console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
LOG.error(msg, e);
setException(e);
if (e instanceof HiveException) {
HiveException he = (HiveException) e;
rc = he.getCanonicalErrorMsg().getErrorCode();
} else {
rc = 1;
}
} finally {
startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING);
// The startTime may not be set if the sparkTask finished too fast,
// because SparkJobMonitor will sleep for 1 second then check the state,
// right after sleep, the spark job may be already completed.
// In this case, set startTime the same as submitTime.
if (startTime < submitTime) {
startTime = submitTime;
}
finishTime = perfLogger.getEndTime(PerfLogger.SPARK_RUN_JOB);
Utilities.clearWork(conf);
if (sparkSession != null && sparkSessionManager != null) {
rc = close(rc);
try {
sparkSessionManager.returnSession(sparkSession);
} catch (HiveException ex) {
LOG.error("Failed to return the session to SessionManager", ex);
}
}
}
return rc;
}
Note that it does not run spark-submit the way you normally would; instead sparkSession.submit hands the work to Spark directly through an API. Hardly anyone uses Hive on Spark these days anyway: with Spark on Hive (plain Spark SQL) you also get the Catalyst optimizer, and Spark 3.3.0 in particular adds many of the join optimizations SQL folks love compared with 2.4.0, so isn't that a sweeter deal than Tez?
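For contrast, the "Spark on Hive" route mentioned above is just ordinary Spark SQL with Hive metastore support. A minimal sketch of mine (database and table names are illustrative; assumes spark-sql and spark-hive on the classpath):
import org.apache.spark.sql.SparkSession;

public class SparkOnHiveSketch {
    public static void main(String[] args) {
        // Queries go through the Catalyst optimizer instead of Hive's Driver/SparkTask path.
        SparkSession spark = SparkSession.builder()
            .appName("spark-on-hive-sketch")
            .enableHiveSupport()
            .getOrCreate();
        spark.sql("SELECT COUNT(*) FROM some_db.some_table").show();
        spark.stop();
    }
}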
The Tez execution engine
As shown above, once Hive has finished AST parsing, semantic analysis, and plan generation, it calls the Tez API to actually run the job. Hive parameters alone are clearly not enough; the Tez parameters also need tuning to get better compute performance. Offline batch jobs are not very latency-sensitive and the performance bar is not extreme, but finishing faster never hurts, and the saved compute is welcome even if it only goes to HBase archive merges.
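For example, a couple of the knobs from the tables below can be overridden programmatically before a DAG is submitted. This is my own sketch with illustrative values; in a Hive session the same keys can simply be set on the HiveConf, since they are passed through to Tez.
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;

public class TezStaticConfDemo {
    public static void main(String[] args) {
        TezConfiguration tezConf = new TezConfiguration(new Configuration());
        // tez.am.resource.memory.mb and tez.am.container.reuse.enabled (see the tables below).
        tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 2048);
        tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
        System.out.println(tezConf.get(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB));
        System.out.println(tezConf.get(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED));
    }
}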
Tez runtime parameters
TezRuntimeConfiguration
Property Name | Default Value | Description | Type | Is Private? | Is Unstable? | Is Evolving? |
---|---|---|---|---|---|---|
tez.runtime.cleanup.files.on.interrupt | false | Used only for internal testing. Strictly not recommended to be used elsewhere. This parameter could be changed/dropped later. | boolean | true | false | true |
tez.runtime.combiner.class | null | Specifies a combiner class (primarily for Shuffle) | string | false | false | false |
tez.runtime.combine.min.spills | 3 | | integer | false | false | false |
tez.runtime.compress | null | | boolean | false | false | false |
tez.runtime.compress.codec | null | | string | false | false | false |
tez.runtime.convert.user-payload.to.history-text | false | Value: Boolean Whether to publish configuration information to History logger. Default false. | string | false | false | false |
tez.runtime.empty.partitions.info-via-events.enabled | true | | boolean | false | false | false |
tez.runtime.enable.final-merge.in.output | true | Expert level setting. Enable final merge in ordered (defaultsorter/pipelinedsorter) outputs. Speculative execution needs to be turned off when disabling this parameter. //TODO: TEZ-2132 | boolean | false | false | false |
tez.runtime.group.comparator.class | null | | string | false | false | false |
tez.runtime.ifile.readahead | true | Configuration key to enable/disable IFile readahead. | boolean | false | false | false |
tez.runtime.ifile.readahead.bytes | 4194304 | Configuration key to set the IFile readahead length in bytes. | integer | false | false | false |
tez.runtime.index.cache.memory.limit.bytes | 1048576 | | integer | false | false | false |
tez.runtime.task.input.post-merge.buffer.percent | null | | float | false | false | false |
tez.runtime.internal.sorter.class | null | | string | false | false | false |
tez.runtime.io.sort.factor | 100 | | integer | false | false | false |
tez.runtime.io.sort.mb | 100 | | integer | false | false | false |
tez.runtime.key.class | null | | string | false | false | false |
tez.runtime.key.comparator.class | null | | string | false | false | false |
tez.runtime.key.secondary.comparator.class | null | | string | false | false | false |
tez.runtime.optimize.local.fetch | true | If the shuffle input is on the local host bypass the http fetch and access the files directly | boolean | false | false | false |
tez.runtime.optimize.shared.fetch | false | Share data fetched between tasks running on the same host if applicable | boolean | false | false | false |
tez.runtime.partitioner.class | null | Specifies a partitioner class, which is used in Tez Runtime components like OnFileSortedOutput | string | false | false | false |
tez.runtime.pipelined-shuffle.enabled | false | Expert level setting. Enable pipelined shuffle in ordered outputs and in unordered partitioned outputs. In ordered cases, it works with PipelinedSorter. set tez.runtime.sort.threads to greater than 1 to enable pipelinedsorter. Ensure to set tez.runtime.enable.final-merge.in.output=false. Speculative execution needs to be turned off when using this parameter. //TODO: TEZ-2132 | boolean | false | false | false |
tez.runtime.pipelined.sorter.lazy-allocate.memory | false | Setting this to true would enable sorter to auto-allocate memory on need basis in progressive fashion. Setting to false would allocate all available memory during initialization of sorter. In such cases,@link{#TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB} would be honored and memory specified in @link{#TEZ_RUNTIME_IO_SORT_MB} would be initialized upfront. | boolean | false | false | false |
tez.runtime.pipelined.sorter.min-block.size.in.mb | 2000 | Tries to allocate @link{#TEZ_RUNTIME_IO_SORT_MB} in chunks specified in this parameter. | integer | false | false | false |
tez.runtime.pipelined.sorter.sort.threads | 2 | | integer | false | false | false |
tez.runtime.merge.progress.records | 10000 | | integer | true | false | true |
tez.runtime.report.partition.stats | null | Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496 This can be enabled/disabled at vertex level. {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats} defines the list of values that can be specified. TODO TEZ-3303 Given ShuffleVertexManager doesn’t consume precise stats yet. So do not set the value to “precise” yet when ShuffleVertexManager is used. | string | false | false | false |
tez.runtime.shuffle.acceptable.host-fetch.failure.fraction | 0.2 | | float | true | false | true |
tez.runtime.shuffle.batch.wait | -1 | Expert level setting. How long should @link{ShuffleManager} wait for batching before sending the events in milliseconds. Set to -1 to not wait. | integer | false | false | false |
tez.runtime.shuffle.buffersize | 8192 | | integer | false | false | false |
tez.runtime.shuffle.connect.timeout | null | | integer | false | false | false |
tez.runtime.shuffle.memory-to-memory.enable | false | | boolean | false | false | false |
tez.runtime.shuffle.ssl.enable | false | | boolean | false | false | false |
tez.runtime.shuffle.failed.check.since-last.completion | true | | boolean | true | false | true |
tez.runtime.shuffle.fetcher.use-shared-pool | false | | boolean | true | false | true |
tez.runtime.shuffle.fetch.buffer.percent | 0.9 | | float | false | false | false |
tez.runtime.shuffle.fetch.failures.limit | 5 | | integer | false | false | false |
tez.runtime.shuffle.fetch.max.task.output.at.once | 20 | | integer | false | false | false |
tez.runtime.shuffle.fetch.verify-disk-checksum | true | Controls verification of data checksums when fetching data directly to disk. Enabling verification allows the fetcher to detect corrupted data and report the failure against the upstream task before the data reaches the Processor and causes the fetching task to fail. | boolean | false | false | false |
tez.runtime.shuffle.host.penalty.time.limit | 600000 | Specifies in milliseconds the maximum delay a penalized host can have before being retried, defaults to 10 minutes. | integer | false | false | false |
tez.runtime.shuffle.keep-alive.enabled | false | | boolean | false | false | false |
tez.runtime.shuffle.keep-alive.max.connections | 20 | | integer | false | false | false |
tez.runtime.shuffle.max.allowed.failed.fetch.fraction | 0.5 | | float | true | false | true |
tez.runtime.shuffle.max.stall.time.fraction | 0.5 | | float | true | false | true |
tez.runtime.shuffle.memory.limit.percent | 0.25 | | float | false | false | false |
tez.runtime.shuffle.memory-to-memory.segments | null | | integer | false | false | false |
tez.runtime.shuffle.merge.percent | 0.9 | | float | false | false | false |
tez.runtime.shuffle.min.failures.per.host | 4 | | integer | true | false | true |
tez.runtime.shuffle.min.required.progress.fraction | 0.5 | | float | true | false | true |
tez.runtime.shuffle.notify.readerror | true | | boolean | false | false | false |
tez.runtime.shuffle.parallel.copies | 20 | | integer | false | false | false |
tez.runtime.shuffle.read.timeout | 180000 | | integer | false | false | false |
tez.runtime.shuffle.src-attempt.abort.limit | -1 | | integer | true | false | true |
tez.runtime.shuffle.use.async.http | false | | boolean | false | false | false |
tez.runtime.sorter.class | null | String value. Which sorter implementation to use. Valid values: - LEGACY - PIPELINED ( default ) {@link org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl} | string | false | false | false |
tez.runtime.sort.spill.percent | 0.8 | | float | false | false | false |
tez.runtime.unordered.output.buffer.size-mb | 100 | Size of the buffer to use if not writing directly to disk. | integer | false | false | false |
tez.runtime.unordered.output.max-per-buffer.size-bytes | null | Maximum size for individual buffers used in the UnsortedPartitionedOutput. This is only meant to be used by unit tests for now. | integer | true | false | false |
tez.runtime.unordered-partitioned-kvwriter.buffer-merge-percent | 0 | Integer value. Percentage of buffer to be filled before we spill to disk. Default value is 0, which will spill for every buffer. | int | false | false | false |
tez.runtime.value.class | null | | string | false | false | false |
There are quite a lot of tunable runtime parameters.
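As a small illustration of mine, a few of them can be overridden through the TezRuntimeConfiguration constants (illustrative values; assumes tez-runtime-library on the classpath):
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

public class TezRuntimeConfDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Sort buffer size and intermediate compression, from the table above.
        conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 512);
        conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
        conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
            "org.apache.hadoop.io.compress.SnappyCodec");
        System.out.println(conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB));
    }
}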
Tez static parameters
See the official documentation: https://tez.apache.org/releases/0.10.1/tez-api-javadocs/configs/TezConfiguration.html
TezConfiguration
Property Name | Default Value | Description | Type | Is Private? | Is Unstable? | Is Evolving? |
---|---|---|---|---|---|---|
tez.dag.recovery.enabled | true | Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the incomplete DAGs from the previous instance of the app master. | boolean | false | false | false |
tez.dag.recovery.io.buffer.size | 8192 | Int value. Size in bytes for the IO buffer size while processing the recovery file. Expert level setting. | integer | false | false | false |
tez.dag.recovery.flush.interval.secs | 30 | Int value. Interval, in seconds, between flushing recovery data to the recovery log. | integer | false | false | false |
tez.dag.recovery.max.unflushed.events | 100 | Int value. Number of recovery events to buffer before flushing them to the recovery log. | integer | false | false | false |
tez.task.heartbeat.timeout.check-ms | 30000 | Int value. Time interval, in milliseconds, between checks for lost tasks. Expert level setting. | integer | false | false | false |
tez.task.timeout-ms | 300000 | Int value. Time interval, in milliseconds, within which a task must heartbeat to the app master before its considered lost. Expert level setting. | integer | false | false | false |
tez.am.acls.enabled | true | Boolean value. Configuration to enable/disable ACL checks. | boolean | false | false | false |
tez.allow.disabled.timeline-domains | false | Boolean value. Allow disabling of Timeline Domains even if Timeline is being used. | boolean | true | false | false |
tez.am.client.am.port-range | null | String value. Range of ports that the AM can use when binding for client connections. Leave blank to use all possible ports. Expert level setting. It’s hadoop standard range configuration. For example 50000-50050,50100-50200 | string | false | false | false |
tez.am.client.heartbeat.timeout.secs | -1 | Int value. Time interval (in seconds). If the Tez AM does not receive a heartbeat from the client within this time interval, it will kill any running DAG and shut down. Required to re-cycle orphaned Tez applications where the client is no longer alive. A negative value can be set to disable this check. For a positive value, the minimum value is 10 seconds. Values between 0 and 10 seconds will be reset to the minimum value. Only relevant in session mode. This is disabled by default i.e. by default, the Tez AM will go on to complete the DAG and only kill itself after hitting the DAG submission timeout defined by {@link #TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS} | integer | false | false | false |
tez.am.client.am.thread-count | 2 | Int value. Number of threads to handle client RPC requests. Expert level setting. | integer | false | false | false |
tez.am.commit-all-outputs-on-dag-success | true | Boolean value. Determines when the final outputs to data sinks are committed. Commit is an output specific operation and typically involves making the output visible for consumption. If the config is true, then the outputs are committed at the end of DAG completion after all constituent vertices have completed. If false, outputs for each vertex are committed after that vertex succeeds. Depending on the desired output visibility and downstream consumer dependencies this value must be appropriately chosen. Defaults to the safe choice of true. | boolean | false | false | false |
tez.am.containerlauncher.thread-count-limit | 500 | Int value. Upper limit on the number of threads user to launch containers in the app master. Expert level setting. | integer | false | false | false |
tez.am.container.idle.release-timeout-max.millis | 10000 | Int value. The maximum amount of time to hold on to a container if no task can be assigned to it immediately. Only active when reuse is enabled. The value must be +ve and >= TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS. Containers will have an expire time set to a random value between TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS && TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This creates a graceful reduction in the amount of idle resources held | long | false | false | false |
tez.am.container.idle.release-timeout-min.millis | 5000 | Int value. The minimum amount of time to hold on to a container that is idle. Only active when reuse is enabled. Set to -1 to never release idle containers (not recommended). | integer | false | false | false |
tez.am.container.reuse.enabled | true | Boolean value. Configuration to specify whether container should be reused across tasks. This improves performance by not incurring recurring launch overheads. | boolean | false | false | false |
tez.am.container.reuse.locality.delay-allocation-millis | 250 | Int value. The amount of time to wait before assigning a container to the next level of locality. NODE -> RACK -> NON_LOCAL. Delay scheduling parameter. Expert level setting. | long | false | false | false |
tez.am.container.reuse.new-containers.enabled | false | Boolean value. Whether to reuse new containers that could not be immediately assigned to pending requests. If enabled then newly assigned containers that cannot be immediately allocated will be held for potential reuse as if it were a container that had just completed a task. If disabled then newly assigned containers that cannot be immediately allocated will be released. Active only if container reuse is enabled. | boolean | false | false | false |
tez.am.container.reuse.non-local-fallback.enabled | false | Boolean value. Whether to reuse containers for non-local tasks. Active only if reuse is enabled. Turning this on can severely affect locality and can be bad for jobs with high data volume being read from the primary data sources. | boolean | false | false | false |
tez.am.container.reuse.rack-fallback.enabled | true | Boolean value. Whether to reuse containers for rack local tasks. Active only if reuse is enabled. | boolean | false | false | false |
tez.am.credentials-merge | null | Boolean value. If true then Tez will add the ApplicationMaster credentials to all task credentials. | boolean | false | false | false |
tez.am.dag.appcontext.thread-count-limit | 10 | Int value. Upper limit on the number of threads used by app context (vertex management and input init events). | integer | false | false | false |
tez.am.dag.cleanup.on.completion | false | Boolean value. Instructs AM to delete Dag directory upon completion | boolean | false | false | false |
tez.am.dag.deletion.thread-count-limit | 10 | Int value. Upper limit on the number of threads used to delete DAG directories on nodes. | integer | false | false | false |
tez.am.dag.scheduler.class | org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder | String value. The class to be used for DAG Scheduling. Expert level setting. | string | false | false | false |
tez.am.deletion.tracker.class | org.apache.tez.dag.app.launcher.DeletionTrackerImpl | String value that is a class name. Specify the class to use for Deletion tracking. | string | false | false | false |
tez.am.disable.client-version-check | false | Boolean value. Disable version check between client and AM/DAG. Default false. | boolean | true | false | false |
tez.am.task.estimator.exponential.lambda.ms | null | Long value. Specifies amount of time (in ms) of the lambda value in the smoothing function of the task estimator | long | false | false | true |
tez.am.task.estimator.exponential.skip.initials | 24 | The number of initial readings that the estimator ignores before giving a prediction. At the beginning the smooth estimator won’t be accurate in prediction | integer | false | false | true |
tez.am.task.estimator.exponential.stagnated.ms | null | The window length in the simple exponential smoothing that considers the task attempt is stagnated. | long | false | false | true |
tez.am.inline.task.execution.enabled | false | Tez AM Inline Mode flag. Not valid till Tez-684 get checked-in | boolean | true | false | false |
tez.am.inline.task.execution.max-tasks | 1 | Int value. The maximium number of tasks running in parallel within the app master process. | integer | false | false | false |
tez.am.launch.cluster-default.cmd-opts | -server -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN | String value. Command line options which will be prepended to {@link #TEZ_AM_LAUNCH_CMD_OPTS} during the launch of the AppMaster process. This property will typically be configured to include default options meant to be used by all jobs in a cluster. If required, the values can be overridden per job. | string | false | false | false |
tez.am.launch.cluster-default.env | null | String value. Env settings will be merged with {@link #TEZ_AM_LAUNCH_ENV} during the launch of the AppMaster process. This property will typically be configured to include default system env meant to be used by all jobs in a cluster. If required, the values can be appended to per job. | string | false | false | false |
tez.am.launch.cmd-opts | null | String value. Command line options provided during the launch of the Tez AppMaster process. Its recommended to not set any Xmx or Xms in these launch opts so that Tez can determine them automatically. | string | false | false | false |
tez.am.launch.env | | String value. Env settings for the Tez AppMaster process. Should be specified as a comma-separated of key-value pairs where each pair is defined as KEY=VAL e.g. “LD_LIBRARY_PATH=.,USERNAME=foo” These take least precedence compared to other methods of setting env. These get added to the app master environment prior to launching it. This setting will prepend existing settings in the cluster default | string | false | false | false |
tez.am.legacy.speculative.single.task.vertex.timeout | -1 | Long value. Specifies the timeout after which tasks on a single task vertex must be speculated. A negative value means not to use timeout for speculation of single task vertices. | long | false | false | true |
tez.am.legacy.speculative.slowtask.threshold | null | Float value. Specifies how many standard deviations away from the mean task execution time should be considered as an outlier/slow task. | float | false | false | true |
tez.am.log.level | INFO | Root Logging level passed to the Tez app master. Simple configuration: Set the log level for all loggers. e.g. INFO This sets the log level to INFO for all loggers. Advanced configuration: Set the log level for all classes, along with a different level for some. e.g. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO This sets the log level for all loggers to DEBUG, expect for the org.apache.hadoop.ipc and org.apache.hadoop.security, which are set to INFO Note: The global log level must always be the first parameter. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid | string | false | false | false |
tez.am.max.allowed.time-sec.for-read-error | 300 | int value. Represents the maximum time in seconds for which a consumer attempt can report a read error against its producer attempt, after which the producer attempt will be re-run to re-generate the output. There are other heuristics which determine the retry and mainly try to guard against a flurry of re-runs due to intermittent read errors (due to network issues). This configuration puts a time limit on those heuristics to ensure jobs dont hang indefinitely due to lack of closure in those heuristics Expert level setting. | integer | false | false | false |
tez.am.max.app.attempts | 2 | Int value. Specifies the number of times the app master can be launched in order to recover from app master failure. Typically app master failures are non-recoverable. This parameter is for cases where the app master is not at fault but is lost due to system errors. Expert level setting. | integer | false | false | false |
tez.am.maxtaskfailures.per.node | 10 | Int value. Specifies the number of task failures on a node before the node is considered faulty. | integer | false | false | false |
tez.am.minimum.allowed.speculative.tasks | 10 | Integer value. The minimum allowed tasks that can be speculatively re-executed at any time. | integer | false | false | true |
tez.am.modify-acls | null | String value. AM modify ACLs. This allows the specified users/groups to run modify operations on the AM such as submitting DAGs, pre-warming the session, killing DAGs or shutting down the session. Comma separated list of users, followed by whitespace, followed by a comma separated list of groups | string | false | false | false |
tez.am.node-blacklisting.enabled | true | Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes will not be used to execute tasks. | boolean | false | false | false |
tez.am.node-blacklisting.ignore-threshold-node-percent | 33 | Int value. Specifies the percentage of nodes in the cluster that may be considered faulty. This limits the number of nodes that are blacklisted in an effort to minimize the effects of temporary surges in failures (e.g. due to network outages). | integer | false | false | false |
tez.am.node-unhealthy-reschedule-tasks | false | Boolean value. Enable task rescheduling for node updates. When enabled the task scheduler will reschedule task attempts that are associated with an unhealthy node to avoid potential data transfer errors from downstream tasks. | boolean | false | false | false |
tez.am.preemption.heartbeats-between-preemptions | 3 | Int value. The number of RM heartbeats to wait after preempting running tasks before preempting more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the RM can act on the released resources and assign new ones to us. Expert level setting. | integer | false | false | false |
tez.am.preemption.max.wait-time-ms | 60000 | Int value. Time (in millisecs) that an unsatisfied request will wait before preempting other resources. In rare cases, the cluster says there are enough free resources but does not end up getting enough on a node to actually assign it to the job. This configuration tries to put a deadline on such wait to prevent indefinite job hangs. | integer | false | false | false |
tez.am.preemption.percentage | 10 | Int value. Specifies the percentage of tasks eligible to be preempted that will actually be preempted in a given round of Tez internal preemption. This slows down preemption and gives more time for free resources to be allocated by the cluster (if any) and gives more time for preemptable tasks to finish. Valid values are 0-100. Higher values will preempt quickly at the cost of losing work. Setting to 0 turns off preemption. Expert level setting. | integer | false | false | false |
tez.am.proportion.running.tasks.speculatable | 0.1 | Double value. The max percent (0-1) of running tasks that can be speculatively re-executed at any time. | double | false | false | true |
tez.am.proportion.total.tasks.speculatable | 0.01 | Double value. The max percent (0-1) of all tasks that can be speculatively re-executed at any time. | double | false | false | true |
tez.am.resource.cpu.vcores | 1 | Int value. The number of virtual cores to be used by the app master | integer | false | false | false |
tez.am.resource.memory.mb | 1024 | Int value. The amount of memory in MB to be used by the AppMaster | integer | false | false | false |
tez.am.am-rm.heartbeat.interval-ms.max | 1000 | Int value. The maximum heartbeat interval between the AM and RM in milliseconds Increasing this reduces the communication between the AM and the RM and can help in scaling up. Expert level setting. | integer | false | false | false |
tez.am.session.min.held-containers | 0 | Int value. The minimum number of containers that will be held in session mode. Not active in non-session mode. Enables an idle session (not running any DAG) to hold on to a minimum number of containers to provide fast response times for the next DAG. | integer | false | false | false |
tez.am.mode.session | false | Boolean value. Execution mode for the Tez application. True implies session mode. If the client code is written according to best practices then the same code can execute in either mode based on this configuration. Session mode is more aggressive in reserving execution resources and is typically used for interactive applications where multiple DAGs are submitted in quick succession by the same user. For long running applications, one-off executions, batch jobs etc non-session mode is recommended. If session mode is enabled then container reuse is recommended. | boolean | false | false | false |
tez.am.shuffle.auxiliary-service.id | mapreduce_shuffle | String value. Specifies the name of the shuffle auxiliary service. | string | false | false | false |
tez.am.soonest.retry.after.no.speculate | 1000 | Long value. Specifies amount of time (in ms) that needs to elapse to do the next round of speculation if there is no task speculated in this round. | long | false | false | true |
tez.am.soonest.retry.after.speculate | 15000 | Long value. Specifies amount of time (in ms) that needs to elapse to do the next round of speculation if there are tasks speculated in this round. | long | false | false | true |
tez.am.speculation.enabled | false | | boolean | false | false | true |
tez.am.speculator.class | null | The class that should be used for speculative execution calculations. | string | false | false | false |
tez.staging-dir | null | String value. Specifies a directory where Tez can create temporary job artifacts. | string | false | false | false |
tez.am.staging.scratch-data.auto-delete | true | Boolean value. If true then Tez will try to automatically delete temporary job artifacts that it creates within the specified staging dir. Does not affect any user data. | boolean | false | false | false |
tez.am.task.estimator.class | null | The class that should be used for task runtime estimation. | string | false | false | false |
tez.am.task.listener.thread-count | 30 | Int value. The number of threads used to listen to task heartbeat requests. Expert level setting. | integer | false | false | false |
tez.am.task.max.attempts | 0 | Int value. The maximum number of attempts that can run for a particular task before the task is failed. This count every attempts, including failed, killed attempts. Task failure results in DAG failure. Default is 0, which disables this feature. | integer | false | false | false |
tez.am.task.max.failed.attempts | 4 | Int value. The maximum number of attempts that can fail for a particular task before the task is failed. This does not count killed attempts. Task failure results in DAG failure. | integer | false | false | false |
tez.am.task.reschedule.higher.priority | true | Boolean value. Specifies whether a re-scheduled attempt of a task, caused by previous failures gets higher priority | boolean | false | false | false |
tez.am.task.reschedule.relaxed.locality | true | Boolean value. Specifies whether a re-scheduled attempt of a task, caused by previous failure get relaxed locality | boolean | false | false | false |
tez.am.tez-ui.history-url.template | HISTORY_URL_BASE/#/tez-app/APPLICATION_ID | String value Tez UI URL template for the application. Expert level setting. The AM will redirect the user to the Tez UI via this url. Template supports the following parameters to be replaced with the actual runtime information: APPLICATION_ID : Replaces this with application ID HISTORY_URL_BASE: replaces this with TEZ_HISTORY_URL_BASE For example, "http://uihost:9001/#/tez-app/APPLICATION_ID/ will be replaced to http://uihost:9001/#/tez-app/application_1421880306565_0001/ | string | false | false | false |
tez.am.vertex.max-task-concurrency | -1 | Int value. The maximum number of attempts that can run concurrently for a given vertex. Setting <=0 implies no limit | integer | false | false | false |
tez.am.view-acls | null | String value. AM view ACLs. This allows the specified users/groups to view the status of the AM and all DAGs that run within this AM. Comma separated list of users, followed by whitespace, followed by a comma separated list of groups | string | false | false | false |
tez.am.tez-ui.webservice.enable | true | String value Allow disabling of the Tez AM webservice. If set to false the Tez-UI wont show progress updates for running application. | boolean | false | false | false |
tez.am.yarn.scheduler.class | org.apache.tez.dag.app.rm.YarnTaskSchedulerService | String value. The class to be used for the YARN task scheduler. Expert level setting. | string | false | false | false |
tez.application.tags | null | String value. Tags for the job that will be passed to YARN at submission time. Queries to YARN for applications can filter on these tags. | string | false | false | false |
tez.aux.uris | null | Auxiliary resources to be localized for the Tez AM and all its containers. Value is comma-separated list of fully-resolved directories or file paths. All resources are made available into the working directory of the AM and/or containers i.e. $CWD. If directories are specified, they are not traversed recursively. Only files directly under the specified directory are localized. All duplicate resources are ignored. | string | false | false | false |
tez.cancel.delegation.tokens.on.completion | true | | boolean | true | false | false |
tez.classpath.add-hadoop-conf | false | Boolean value. If this value is true then tez explicitly adds hadoop conf directory into classpath for AM and task containers. Default is false. | boolean | true | false | true |
tez.client.asynchronous-stop | true | Boolean value. Backwards compatibility setting. Changes TezClient stop to be a synchronous call waiting until AM is in a final state before returning to the user. Expert level setting. | boolean | false | false | false |
tez.client.diagnostics.wait.timeout-ms | 3000 | Long value Time to wait (in milliseconds) for yarn app’s diagnotics is available Workaround for YARN-2560 | long | true | false | false |
tez.client.timeout-ms | 30000 | Long value. Time interval, in milliseconds, for client to wait during client-requested AM shutdown before issuing a hard kill to the RM for this application. Expert level setting. | long | false | false | false |
tez.java.opts.checker.class | null | String value. Ability to provide a different implementation to check/verify java opts defined for vertices/tasks. Class has to be an instance of JavaOptsChecker | string | true | false | false |
tez.java.opts.checker.enabled | true | Boolean value. Default true. Ability to disable the Java Opts Checker | boolean | true | false | false |
tez.task.concurrent.edge.trigger.type | null | String value. In the presence of concurrent input edge to a vertex, this describes the timing of scheduling downstream vertex tasks. It may be closely related to the type of event that will contribute to a scheduling decision. | string | false | false | false |
tez.container.max.java.heap.fraction | 0.8 | Double value. Tez automatically determines the Xmx for the JVMs used to run Tez tasks and app masters. This feature is enabled if the user has not specified Xmx or Xms values in the launch command opts. Doing automatic Xmx calculation is preferred because Tez can determine the best value based on the actual allocation of memory to tasks in the cluster. The value is used as a fraction applied to the allocated container memory, i.e. a factor to size Xmx based on container memory size. Value should be greater than 0 and less than 1. Set this value to -1 to allow Tez to use different default max heap fraction for different container memory size. Current policy is to use 0.7 for containers smaller than 4GB and 0.8 for larger containers. | float | false | false | false |
tez.counters.counter-name.max-length | 64 | Int value. Configuration to limit the length of counter names. This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting. | integer | false | false | true |
tez.counters.group-name.max-length | 256 | Int value. Configuration to limit the counter group names per app master. This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting. | integer | false | false | true |
tez.counters.max | 1200 | Int value. Configuration to limit the counters per dag (AppMaster and Task). This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting. | integer | false | false | true |
tez.counters.max.groups | 500 | Int value. Configuration to limit the number of counter groups for a DAG. This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting. | integer | false | false | true |
tez.credentials.path | null | String value that is a file path. Path to a credentials file (with serialized credentials) located on the local file system. | string | false | false | false |
tez.dag.status.pollinterval-ms | 500 | Long value Status Poll interval in Milliseconds used when getting DAG status with timeout. | long | false | false | false |
tez.generate.debug.artifacts | false | | boolean | false | false | true |
tez.history.logging.log.level | null | Enum value. Config to limit the type of events published to the history logging service. The valid log levels are defined in the enum {@link HistoryLogLevel}. The default value is defined in {@link HistoryLogLevel#DEFAULT}. | string | false | false | false |
tez.history.logging.proto-base-dir | null | String value. The base directory into which history data will be written when proto history logging service is used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}. If this is not set, then logging is disabled for ProtoHistoryLoggingService. | string | false | false | false |
tez.history.logging.proto-doas | false | Boolean value. Whether proto history logging should write events as the user who submitted the DAG (doAs). | boolean | false | false | false |
tez.history.logging.queue.size | 100000 | Int value. Maximum queue size for proto history event logger. | integer | false | false | false |
tez.history.logging.split-dag-start | false | Boolean value. Set this to true, if the underlying file system does not support flush (Ex: s3). The dag submitted, initialized and started events are written into a file and closed. The rest of the events are written into another file. | boolean | false | false | false |
tez.history.logging.proto-sync-window-secs | 60 | Long value. The amount of time in seconds to wait to ensure all events for a day is synced to disk. This should be maximum time variation b/w machines + maximum time to sync file content and metadata. | long | false | false | false |
tez.history.logging.service.class | org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService | String value that is a class name. Specify the class to use for logging history data. To disable, set this to “org.apache.tez.dag.history.logging.impl.DevNullHistoryLoggingService” | string | false | false | false |
tez.history.logging.taskattempt-filters | null | List of comma separated enum values. Specifies the list of task attempt termination causes, which have to be suppressed from being logged to ATS. The valid filters are defined in the enum TaskAttemptTerminationCause. The filters are applied only if tez.history.logging.log.level is set to TASK_ATTEMPT. | string | false | false | false |
tez.history.logging.timeline-cache-plugin.old-num-dags-per-group | null | Comma separated list of Integers. These are the values that were set for the config value for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so that the groupIds generated previously will continue to be generated by the plugin. If an older value is not present then the UI may not show information for DAGs which were created with a different grouping value. Note: Do not add too many values here as it will affect the performance of Yarn Timeline Server/Tez UI due to the need to scan for more log files. | string | true | false | true |
tez.history.logging.timeline.num-dags-per-group | 1 | Integer value. Number of DAGs to be grouped together. This is used by the history logging service to generate groupIds such that numDagsPerGroup will have same groupId in a given session. If the value is set to 1 then we disable grouping. This config is used to control the number of DAGs written into one log file, and hence controls number of files created in the Filesystem used by YARN Timeline. | integer | true | false | true |
tez.tez-ui.history-url.base | null | String value Tez-UI Url base. This gets replaced in the TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE ex http://ui-host:9001 or if its hosted with a prefix http://ui-host:9001/~user if the ui is hosted on the default port (80 for http and 443 for https), the port should not be specified. | string | false | false | false |
tez.ignore.lib.uris | null | Boolean value. Allows to ignore ‘tez.lib.uris’. Useful during development as well as raw Tez application where classpath is propagated with application via {@link LocalResource}s. This is mainly useful for developer/debugger scenarios. | boolean | false | false | true |
tez.ipc.payload.reserved.bytes | 5242880 | Int value. SubmitDAGPlanRequest cannot be larger than Max IPC message size minus this number; otherwise, it will be serialized to HDFS and we transfer the path to server. Server will deserialize the request from HDFS. | int | true | false | false |
tez.job.fs-servers | null | Acquire all FileSystems info. e.g., all namenodes info of HDFS federation cluster. | string | false | false | false |
tez.job.fs-servers.token-renewal.exclude | null | Skip delegation token renewal for specified FileSystems. | string | false | false | false |
tez.tez.jvm.system-properties-to-log | null | String value. Determines what JVM properties will be logged for debugging purposes in the AM and Task runtime logs. | string | false | false | false |
tez.lib.uris | null | String value to a file path. The location of the Tez libraries which will be localized for DAGs. This follows these semantics: to use .tar.gz or .tgz files (generated by the tez or hadoop builds), the full path to this file (including filename) should be specified, and the internal structure of the uncompressed tgz will be defined by 'tez.lib.uris.classpath'. If a single file is specified without the above mentioned extensions, it will be treated as a regular file, i.e. it will not be uncompressed during runtime. If multiple entries exist: regular files will be treated as regular files (not uncompressed during runtime); archive files will be treated as archives and will be uncompressed during runtime; for directories, all files under the directory (non-recursive) will be made available (but not uncompressed during runtime). | string | false | false | false |
tez.lib.uris.classpath | null | Specify additional user classpath information to be used for Tez AM and all containers. This will be appended to the classpath after PWD ‘tez.lib.uris.classpath’ defines the relative classpath into the archives that are set in ‘tez.lib.uris’ | string | false | false | false |
tez.local.cache.root.folder | . | String value. TezLocalCacheManager uses this folder as a root for temp and localized files. | string | false | false | false |
tez.local.mode | false | Boolean value. Enable local mode execution in Tez. Enables tasks to run in the same process as the app master. Primarily used for debugging. | boolean | false | false | false |
tez.local.mode.without.network | false | Boolean value. Enable local mode execution in Tez without using network for communicating with DAGAppMaster. This option only makes sense when {@link #TEZ_LOCAL_MODE} is true. When TEZ_LOCAL_MODE_WITHOUT_NETWORK is turned on, LocalClient will call DAGAppMaster’s methods directly. | boolean | false | false | false |
tez.mrreader.config.update.properties | null | Comma-separated list of properties that MRReaderMapred should return (if present) when calling for config updates. | string | false | false | false |
tez.queue.name | null | String value. The queue name for all jobs being submitted from a given client. | string | false | false | false |
tez.session.am.dag.submit.timeout.secs | 300 | Int value. Time (in seconds) for which the Tez AM should wait for a DAG to be submitted before shutting down. Only relevant in session mode. Any negative value will disable this check and allow the AM to hang around forever in idle mode. | integer | false | false | false |
tez.session.client.timeout.secs | 120 | Int value. Time (in seconds) to wait for AM to come up when trying to submit a DAG from the client. Only relevant in session mode. If the cluster is busy and cannot launch the AM then this timeout may be hit. In those case, using non-session mode is recommended if applicable. Otherwise increase the timeout (set to -1 for infinity. Not recommended) | integer | false | false | false |
tez.simple.history.logging.dir | null | String value. The directory into which history data will be written. This defaults to the container logging directory. This is relevant only when SimpleHistoryLoggingService is being used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS} | string | false | false | false |
tez.simple.history.max.errors | 10 | Int value. Maximum errors allowed while logging history data. After crossing this limit history logging gets disabled. The job continues to run after this. | integer | false | false | false |
tez.task.am.heartbeat.counter.interval-ms.max | 4000 | Int value. Interval, in milliseconds, after which counters are sent to AM in heartbeat from tasks. This reduces the amount of network traffic between AM and tasks to send high-volume counters. Improves AM scalability. Expert level setting. | integer | false | false | false |
tez.task.am.heartbeat.interval-ms.max | 100 | Int value. The maximum heartbeat interval, in milliseconds, between the app master and tasks. Increasing this can help improve app master scalability for a large number of concurrent tasks. Expert level setting. | integer | false | false | false |
tez.task.generate.counters.per.io | false | Whether to generate counters per IO or not. Enabling this will rename CounterGroups / CounterNames to making them unique per Vertex + Src|Destination | boolean | true | false | true |
tez.task.get-task.sleep.interval-ms.max | 200 | Int value. The maximum amount of time, in milliseconds, to wait before a task asks an AM for another task. Increasing this can help improve app master scalability for a large number of concurrent tasks. Expert level setting. | integer | false | false | false |
tez.task.initialize-processor-first | false | Boolean value. Backwards compatibility setting for initializing IO processor before inputs and outputs. Expert level setting. | boolean | false | false | false |
tez.task.initialize-processor-io-serially | false | Boolean value. Backwards compatibility setting for initializing inputs and outputs serially instead of the parallel default. Expert level setting. | boolean | false | false | false |
tez.task.launch.cluster-default.cmd-opts | -server -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN | String value. Command line options which will be prepended to {@link #TEZ_TASK_LAUNCH_CMD_OPTS} during the launch of Tez tasks. This property will typically be configured to include default options meant to be used by all jobs in a cluster. If required, the values can be overridden per job. | string | false | false | false |
tez.task.launch.cluster-default.env | null | String value. Env settings will be merged with {@link #TEZ_TASK_LAUNCH_ENV} during the launch of the task process. This property will typically be configured to include default system env meant to be used by all jobs in a cluster. If required, the values can be appended to per job. | string | false | false | false |
tez.task.launch.cmd-opts | null | String value. Command line options provided during the launch of Tez Task processes. Its recommended to not set any Xmx or Xms in these launch opts so that Tez can determine them automatically. | string | false | false | false |
tez.task.launch.env | | String value. Env settings for the Tez Task processes. Should be specified as a comma-separated list of key-value pairs where each pair is defined as KEY=VAL, e.g. "LD_LIBRARY_PATH=.,USERNAME=foo". These take least precedence compared to other methods of setting env. These get added to the task environment prior to launching it. This setting will prepend existing settings in the cluster default. | string | false | false | false |
tez.task.log.level | INFO | Root Logging level passed to the Tez tasks. Simple configuration: Set the log level for all loggers. e.g. INFO This sets the log level to INFO for all loggers. Advanced configuration: Set the log level for all classes, along with a different level for some. e.g. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO This sets the log level for all loggers to DEBUG, except for org.apache.hadoop.ipc and org.apache.hadoop.security, which are set to INFO. Note: The global log level must always be the first parameter. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid; org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid | string | false | false | false |
tez.task.max-events-per-heartbeat | 500 | Int value. Maximum number of events to fetch from the AM by the tasks in a single heartbeat. Expert level setting. | integer | false | false | false |
tez.task.max-event-backlog | 10000 | Int value. Maximum number of pending task events before a task will stop asking for more events in the task heartbeat. Expert level setting. | integer | false | false | false |
tez.task.progress.stuck.interval-ms | -1 | Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output components need to make successive progress notifications. If the progress is not notified for this interval then the task will be considered hung and terminated. The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS} and larger than 2 times the value of {@link TezConfiguration#TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS}. A config value <=0 disables this. | string | false | false | false |
tez.task.resource.calculator.process-tree.class | null | | string | true | false | true |
tez.task.resource.cpu.vcores | 1 | Int value. The number of virtual cores to be used by tasks. | integer | false | false | false |
tez.task.resource.memory.mb | 1024 | Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across all vertices. Setting it to the same value for all tasks is helpful for container reuse and thus good for performance typically. | integer | false | false | false |
tez.task.scale.memory.additional-reservation.fraction.max | null | | float | true | false | true |
tez.task.scale.memory.additional-reservation.fraction.per-io | null | Fraction of available memory to reserve per input/output. This amount is removed from the total available pool before allocation and is for factoring in overheads. | float | true | false | true |
tez.task.scale.memory.allocator.class | org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor | The allocator to use for initial memory allocation | string | true | false | true |
tez.task.scale.memory.enabled | true | Whether to scale down memory requested by each component if the total exceeds the available JVM memory | boolean | true | false | true |
tez.task.scale.memory.reserve-fraction | 0.3 | The fraction of the JVM memory which will not be considered for allocation. No defaults, since there are pre-existing defaults based on different scenarios. | double | true | false | true |
tez.task.scale.memory.ratios | null | | string | true | false | true |
tez.task-specific.launch.cmd-opts | null | Additional launch command options to be added for specific tasks. VERTEX_NAME and TASK_INDEX can be specified, which would be replaced at runtime by vertex name and task index. e.g tez.task-specific.launch.cmd-opts= “-agentpath:libpagent.so,dir=/tmp/VERTEX_NAME/TASK_INDEX” | string | false | false | true |
tez.task-specific.launch.cmd-opts.list | null | Set of tasks for which specific launch command options need to be added. Format: “vertexName[csv of task ids];vertexName[csv of task ids]…” Valid e.g: v[0,1,2] - Additional launch-cmd options for tasks 0,1,2 of vertex v v[1,2,3];v2[5,6,7] - Additional launch-cmd options specified for tasks of vertices v and v2. v[1:5,20,30];v2[2:5,60,7] - Additional launch-cmd options for 1,2,3,4,5,20,30 of vertex v; 2, 3,4,5,60,7 of vertex v2 Partial ranges like :5, 1: are not supported. v[] - Additional launch-cmd options for all tasks in vertex v | string | false | false | true |
tez.task-specific.log.level | null | Task specific log level. Simple configuration: Set the log level for all loggers. e.g. INFO This sets the log level to INFO for all loggers. Advanced configuration: Set the log level for all classes, along with a different level for some. e.g. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO This sets the log level for all loggers to DEBUG, except for org.apache.hadoop.ipc and org.apache.hadoop.security, which are set to INFO. Note: The global log level must always be the first parameter. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid; org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid | string | false | false | true |
tez.test.minicluster.app.wait.on.shutdown.secs | 30 | Long value. Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown. | long | true | false | false |
tez.user.classpath.first | true | Boolean value. Specify whether the user classpath takes precedence over the Tez framework classpath. | boolean | false | false | false |
tez.use.cluster.hadoop-libs | false | Boolean value. Specify whether hadoop libraries required to run Tez should be the ones deployed on the cluster. This is disabled by default - with the expectation being that tez.lib.uris has a complete tez-deployment which contains the hadoop libraries. | boolean | false | false | false |
tez.yarn.ats.acl.domains.auto-create | true | | boolean | false | false | false |
tez.yarn.ats.event.flush.timeout.millis | -1 | Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown. Expert level setting. | long | false | false | false |
tez.yarn.ats.max.events.per.batch | 5 | Int value. Max no. of events to send in a single batch to ATS. Expert level setting. | integer | false | false | false |
tez.yarn.ats.max.polling.time.per.event.millis | 10 | Int value. Time, in milliseconds, to wait for an event before sending a batch to ATS. Expert level setting. | integer | false | false | false |
No need to doubt it... the static parameters really do outnumber the runtime parameters.
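As a quick illustration of how a few of these static parameters could be overridden programmatically, here is a minimal sketch using TezConfiguration; the parameter names come from the table above, while the values and the surrounding setup are illustrative assumptions rather than anything taken from the Hive or Tez source:
import org.apache.tez.dag.api.TezConfiguration;

public class TezStaticConfSketch {
    public static void main(String[] args) {
        // Create a TezConfiguration (it picks up tez-site.xml if it is on the classpath).
        TezConfiguration tezConf = new TezConfiguration();

        // A few static parameters from the table above (values are examples only).
        tezConf.set("tez.queue.name", "default");
        tezConf.setInt("tez.task.resource.memory.mb", 2048);
        tezConf.setInt("tez.task.resource.cpu.vcores", 1);
        tezConf.setInt("tez.counters.max", 1200);

        // Read one back to confirm the override took effect.
        System.out.println("tez.queue.name = " + tezConf.get("tez.queue.name"));
    }
}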
Summary
By going through the shell scripts and the source code, we can see that Hive's command line has three entry points:
Beeline: the modern path. It goes straight into the main method of the org.apache.hive.beeline.BeeLine main class, with isBeeLine=true set in the constructor.
Hive Cli: the legacy path. It can optionally use the Beeline entry【org.apache.hive.beeline.cli.HiveCli】, with isBeeLine=false set in the constructor,
or the Cli entry【org.apache.hadoop.hive.cli.CliDriver】; a rough sketch of the HiveCli wrapper is given right below.
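A rough paraphrase, written from memory rather than copied from the Hive 3.1.2 source, of what the org.apache.hive.beeline.cli.HiveCli wrapper boils down to (the class name and structure here are illustrative; only BeeLine and its begin() method are real):
import java.io.IOException;
import org.apache.hive.beeline.BeeLine;

// Paraphrased sketch of the legacy CLI entry: it reuses BeeLine,
// but constructs it with isBeeLine = false so it behaves like the old Hive CLI.
public class HiveCliSketch {
    public static void main(String[] args) throws IOException {
        BeeLine beeLine = new BeeLine(false);    // false => old Hive CLI behaviour
        int status = beeLine.begin(args, null);  // same begin() loop BeeLine itself runs
        System.exit(status);
    }
}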
From the legacy Cli entry point, the concrete HQL execution process can be traced:
Enter through org.apache.hadoop.hive.cli.CliDriver → the run() method → the executeDriver method, which actually performs the SQL parsing and kicks off the compute tasks.
The execution of the main executeDriver method further breaks down into the following steps:
processLine executes each SQL statement after splitting → processCmd executes the specific SQL command → some special-case handling is done depending on whether the source keyword is parsed out or the command starts with "!" →
processLocalCmd kicks off the run method to do the computation → processLocalCmd renders the results (a simplified sketch of this dispatch is shown right below).
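A hedged, simplified paraphrase of that dispatch logic (not the verbatim CliDriver code; the class and method bodies here are stand-ins, and only the branching shape mirrors the steps described above):
// Simplified paraphrase of the processLine/processCmd/processLocalCmd flow.
public class CliDispatchSketch {
    public static void main(String[] args) {
        processLine("set hive.execution.engine=tez; select 1; !pwd");
    }

    // processLine: split the input on ';' and hand each statement to processCmd.
    static void processLine(String line) {
        for (String oneCmd : line.split(";")) {
            String cmd = oneCmd.trim();
            if (!cmd.isEmpty()) {
                processCmd(cmd);
            }
        }
    }

    // processCmd: special-case "source <file>" and "!<shell command>";
    // everything else is treated as SQL and passed on to processLocalCmd.
    static void processCmd(String cmd) {
        if (cmd.toLowerCase().startsWith("source ")) {
            System.out.println("would re-dispatch the script file: " + cmd.substring(7).trim());
        } else if (cmd.startsWith("!")) {
            System.out.println("would hand this to the OS shell: " + cmd.substring(1));
        } else {
            processLocalCmd(cmd);
        }
    }

    // processLocalCmd: kick off Driver.run(cmd) and then print the fetched results.
    static void processLocalCmd(String cmd) {
        System.out.println("would call Driver.run(\"" + cmd + "\") and print its results");
    }
}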
What processLocalCmd actually calls is Driver's run method:
if not yet compiled, the runInternal method is executed →
the compileInternal method compiles the SQL
【mainly the internal compile method: parse the SQL into an AST, refresh the functions and the MetaStore, run semantic analysis with sem.analyze, and validate the generated plan with sem.validate】→
execute kicks off the tasks【launchTask for parallel launch, runSequential for serial launch】; a structural sketch of this path follows below.
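A structural sketch of that Driver.run path (again a paraphrase with stubbed bodies; the real Driver in Hive 3.1.2 also carries locking, hooks, transaction handling and error handling that are all omitted here):
// Structural paraphrase of Driver.run -> runInternal -> compileInternal -> execute.
// Every body below is a stub; only the call order mirrors the summary above.
public class DriverFlowSketch {

    public void run(String sql) {
        runInternal(sql, /* alreadyCompiled = */ false);
    }

    void runInternal(String sql, boolean alreadyCompiled) {
        if (!alreadyCompiled) {
            compileInternal(sql);   // compile first when the query is not compiled yet
        }
        execute();                  // then launch the generated tasks
    }

    void compileInternal(String sql) {
        compile(sql);               // the heavy lifting lives in compile()
    }

    void compile(String sql) {
        System.out.println("parse to AST, refresh functions/MetaStore, sem.analyze, sem.validate: " + sql);
    }

    void execute() {
        boolean canRunInParallel = true;            // stand-in for Hive's parallelism check
        if (canRunInParallel) {
            System.out.println("launchTask: submit runnable tasks in parallel");
        } else {
            System.out.println("runSequential: run the tasks one by one");
        }
    }

    public static void main(String[] args) {
        new DriverFlowSketch().run("select 1");
    }
}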
Leaving a placeholder here for now: the most central and most complex part, the compile method and the Calcite-based parsing and optimization, still needs to be studied slowly...
Please credit the source when reposting...