【PYTHON】Mapper Reducer

发布于:2024-12-18 ⋅ 阅读:(34) ⋅ 点赞:(0)
#!/usr/bin/env python
import sys


class Mapper(object):
    """Map-side task stub for a Hadoop streaming job.

    The command-line driver instantiates this class and calls
    :meth:`mapper` with a dict of parsed ``key=value`` arguments.
    """

    def __init__(self):
        # No state to initialize.
        pass

    def mapper(self, arg_map):
        """Run the map step; *arg_map* holds the parsed CLI arguments."""
        pass


class Reducer(object):
    """Reduce-side task stub for a Hadoop streaming job.

    The command-line driver instantiates this class and calls
    :meth:`reducer` with a dict of parsed ``key=value`` arguments.
    """

    def __init__(self):
        return

    def reducer(self, arg_map):
        """Run the reduce step; *arg_map* holds the parsed CLI arguments."""
        # FIX: the original left a stray class-level ``pass`` after this
        # method (indentation slip); the no-op now sits inside the method
        # body, matching Mapper.mapper.
        pass


if __name__ == '__main__':
    # Usage: task.py {mapper|reducer} [key=value ...]
    if len(sys.argv) < 2:
        # FIX: the original raised a bare IndexError when run without an
        # action argument; fail with an explicit usage message instead.
        sys.stderr.write('usage: task.py {mapper|reducer} [key=value ...]\n')
        sys.exit(1)
    action = sys.argv[1]
    # Parse trailing "key=value" arguments into a dict. Splitting only on
    # the first '=' keeps values that themselves contain '=' (the original
    # split('=') silently dropped such arguments).
    arg_map = {}
    for arg in sys.argv[2:]:
        key, sep, value = arg.partition('=')
        if sep:
            arg_map[key] = value
    if action == 'mapper':
        Mapper().mapper(arg_map)
    elif action == 'reducer':
        Reducer().reducer(arg_map)

任务提交（job submission script）

#!/bin/bash
# Job-submission driver for the streaming task.
source /home/work/.bashrc
# Default to yesterday's date (YYYYMMDD) when no argument is supplied.
# Idiom fixes: [ -z ] instead of [ ! -n ], $(...) instead of backticks.
if [ -z "$1" ]; then
    date=$(date +%Y%m%d -d '-1 days')
else
    date=$1
fi

# Input/output locations on AFS. NOTE(review): these are placeholder
# values ("xxx") — fill in the real cluster addresses before running.
INPUT_PATH="afs://xxx.afs.xxx.com:xxx"
OUTPUT_PATH="afs://xxx.afs.xxx.com:xxx"


# Hadoop client binary shipped with the hmpclient toolchain.
hadoop=~/.hmpclient/hadoop-client/hadoop/bin/hadoop
# NOTE(review): HADOOP_CONF is defined here but the commands in this script
# pass ./conf/hadoop-site.xml literally — confirm which is intended and
# use one consistently.
HADOOP_CONF=./conf/hadoop-site.xml


# Block until the upstream job has published its _SUCCESS marker,
# polling every 3 minutes. NOTE(review): there is no timeout — the
# script waits forever if the marker never appears.
while true
do
    # FIX: test the exit status directly instead of via $?, and correct
    # the "exits" -> "exists" typo in the success message.
    if ${hadoop} fs -conf ./conf/hadoop-site.xml -test -e ${INPUT_PATH}/_SUCCESS; then
        echo "event_day=${INPUT_PATH}/_SUCCESS exists"
        break
    fi
    echo "event_day=${INPUT_PATH} not ready"
    sleep 3m
done

echo "INPUT_PATH: $INPUT_PATH"
echo "OUTPUT_PATH: $OUTPUT_PATH"

# Remove any previous output so the streaming job can write afresh.
# (-rmr is the legacy recursive remove; newer clients spell it -rm -r.)
${hadoop} fs -conf ./conf/hadoop-site.xml -rmr ${OUTPUT_PATH}

# Submit the streaming job. task.py runs as both mapper and reducer using
# the Python runtime shipped via -cacheArchive (unpacked locally as
# ./python). Output is gzip-compressed; up to 10% of map tasks may fail.
# NOTE(review): INPUT_PROFILE_PATH, ACTIVE_USER_PATH, yes_date and
# JOB_NAME are never assigned in this script — presumably exported by the
# environment or a wrapper; confirm, otherwise -input/-file/-jobconf
# expand to empty strings.
${hadoop} streaming -D mapred.job.priority=VERY_HIGH \
-conf ./conf/hadoop-site.xml \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-jobconf mapred.combine.input.format.local.only=false \
-jobconf mapred.combine.input.format.dir.only=true \
-jobconf abaci.split.optimize.enable=false \
-jobconf mapred.max.map.failures.percent=10 \
-jobconf dfs.use.native.api=0 \
-jobconf mapred.job.queue.name=feed_qa_gzhl \
-jobconf mapred.max.split.size=30000000 \
-jobconf mapred.job.tracker=gzns-kunpeng-job.dmop.baidu.com:54311 \
-input "${INPUT_PROFILE_PATH}/part-*","${ACTIVE_USER_PATH}/part-*" \
-output ${OUTPUT_PATH} \
-mapper "./python/python/bin/python task.py mapper" \
-reducer "./python/python/bin/python task.py reducer log_date=${date}" \
-file ./scripts/*.py \
-file ./conf/*.conf \
-file ./infer_data/${yes_date}/* \
-jobconf mapred.reduce.tasks=1000 \
-jobconf mapred.job.reduce.capacity=1000 \
-jobconf mapred.job.map.capacity=4000 \
-jobconf mapred.job.name="${JOB_NAME}" \
-jobconf abaci.split.remote=false \
-jobconf mapred.output.compress=true \
-jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-cacheArchive afs://xingtian.afs.baidu.com:9902/user/feed/mlarch/feasign/sb_feed_live_small_video_v3/python_pb.tar.gz#python

# Check the streaming job's exit status: abort the pipeline on failure,
# publish a done-marker on success.
if [[ $? -ne 0 ]]; then
    echo "[HADOOP ERROR]:job ${JOB_NAME} failed!"
    kill -TERM $PPID   # terminate the parent process so the pipeline stops
    exit 1
else
    # Done-marker tells downstream consumers the output is complete.
    ${hadoop} fs -conf ./conf/hadoop-site.xml -touchz "${OUTPUT_PATH}/to.hadoop.done"
    # FIX: the success message was mislabelled "[HADOOP ERROR]" and used
    # the wrong verb form ("succeed").
    echo "[HADOOP INFO]:job ${JOB_NAME} succeeded!"
fi



写在最后:若本文章对您有帮助,请点个赞啦 ٩(๑•̀ω•́๑)۶


网站公告

今日签到

点亮在社区的每一天
去签到