#!/usr/bin/env python
import sys
class Mapper(object):
    """Map-side handler for the Hadoop streaming job.

    Currently a placeholder: ``mapper`` accepts the parsed
    command-line options and performs no work yet.
    """

    def __init__(self):
        # No per-instance state is needed yet.
        pass

    def mapper(self, arg_map):
        """Run the map phase.

        Args:
            arg_map: dict of ``key=value`` options parsed from argv.
        """
        return None
class Reducer(object):
    """Reduce-side handler for the Hadoop streaming job.

    Currently a placeholder: ``reducer`` accepts the parsed
    command-line options and performs no work yet.
    """

    def __init__(self):
        # No per-instance state is needed yet.
        pass

    def reducer(self, arg_map):
        """Run the reduce phase.

        Args:
            arg_map: dict of ``key=value`` options parsed from argv.
        """
        return None
if __name__ == '__main__':
action = sys.argv[1]
arg_map = {}
if len(sys.argv) > 2:
for i in range(2, len(sys.argv)):
arg_sps = sys.argv[i].split('=')
if len(arg_sps) == 2:
arg_map[arg_sps[0]] = arg_sps[1]
if action == 'mapper':
mapper = Mapper()
mapper.mapper(arg_map)
if action == 'reducer':
reducer = Reducer()
reducer.reducer(arg_map)
任务提交 (Job submission script)
#!/bin/bash
source /home/work/.bashrc

# Date to process: first argument, or yesterday (YYYYMMDD) when omitted.
if [ -z "$1" ]; then
    date=$(date +%Y%m%d -d '-1 days')
else
    date=$1
fi

INPUT_PATH="afs://xxx.afs.xxx.com:xxx"
OUTPUT_PATH="afs://xxx.afs.xxx.com:xxx"
hadoop=~/.hmpclient/hadoop-client/hadoop/bin/hadoop
HADOOP_CONF=./conf/hadoop-site.xml

# Block until the upstream job has published its _SUCCESS marker.
while true
do
    ${hadoop} fs -conf ${HADOOP_CONF} -test -e ${INPUT_PATH}/_SUCCESS
    if [ $? -ne 0 ]; then
        echo "event_day=${INPUT_PATH} not ready"
        sleep 3m
    else
        echo "event_day=${INPUT_PATH}/_SUCCESS exists"
        break
    fi
done

echo "INPUT_PATH: $INPUT_PATH"
echo "OUTPUT_PATH: $OUTPUT_PATH"

# Remove any stale output so the streaming job can write fresh results.
${hadoop} fs -conf ${HADOOP_CONF} -rmr ${OUTPUT_PATH}

# NOTE(review): INPUT_PROFILE_PATH, ACTIVE_USER_PATH, yes_date and JOB_NAME
# are never assigned in this script -- they expand to empty strings unless
# exported by /home/work/.bashrc. TODO: confirm where they are set.
${hadoop} streaming -D mapred.job.priority=VERY_HIGH \
    -conf ${HADOOP_CONF} \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -jobconf mapred.combine.input.format.local.only=false \
    -jobconf mapred.combine.input.format.dir.only=true \
    -jobconf abaci.split.optimize.enable=false \
    -jobconf mapred.max.map.failures.percent=10 \
    -jobconf dfs.use.native.api=0 \
    -jobconf mapred.job.queue.name=feed_qa_gzhl \
    -jobconf mapred.max.split.size=30000000 \
    -jobconf mapred.job.tracker=gzns-kunpeng-job.dmop.baidu.com:54311 \
    -input "${INPUT_PROFILE_PATH}/part-*","${ACTIVE_USER_PATH}/part-*" \
    -output ${OUTPUT_PATH} \
    -mapper "./python/python/bin/python task.py mapper" \
    -reducer "./python/python/bin/python task.py reducer log_date=${date}" \
    -file ./scripts/*.py \
    -file ./conf/*.conf \
    -file ./infer_data/${yes_date}/* \
    -jobconf mapred.reduce.tasks=1000 \
    -jobconf mapred.job.reduce.capacity=1000 \
    -jobconf mapred.job.map.capacity=4000 \
    -jobconf mapred.job.name="${JOB_NAME}" \
    -jobconf abaci.split.remote=false \
    -jobconf mapred.output.compress=true \
    -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
    -cacheArchive afs://xingtian.afs.baidu.com:9902/user/feed/mlarch/feasign/sb_feed_live_small_video_v3/python_pb.tar.gz#python

if [[ $? -ne 0 ]];then
    echo "[HADOOP ERROR]:job ${JOB_NAME} failed!"
    kill -TERM $PPID  # terminate the parent process
    exit 1
else
    # Touch a done-marker so downstream consumers know the output is complete.
    ${hadoop} fs -conf ${HADOOP_CONF} -touchz "${OUTPUT_PATH}/to.hadoop.done"
    echo "[HADOOP INFO]:job ${JOB_NAME} succeed!"
fi
写在最后:若本文章对您有帮助,请点个赞啦 ٩(๑•̀ω•́๑)۶