DolphinScheduler 3.1.9 Source Code Analysis: Startup and Task Execution

Published: 2024-07-02

Preface

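I originally wanted to start by introducing what DolphinScheduler is and how it compares with other task-scheduling frameworks. But my writing skills aside, there are already plenty of blog posts on that topic online, and another one would amount to little more than copy-and-paste. So here I only briefly explain why we chose DolphinScheduler. If you are interested in technology selection or the architecture, a quick search online will turn up plenty.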

Why we chose DolphinScheduler

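1. Decentralized: the service does not become unavailable just because a master goes down.

2. Distributed: horizontal scaling is easy; just add worker nodes.

3. Rich task types: commonly used types such as shell, http, sqoop, datax, and seatunnel.

4. Powerful visualization: tasks are orchestrated by drag and drop.

5. Monitoring: comprehensive monitoring of task status, system status, and more.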

Source code

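In my experience, before reading source code you should know why you are reading it and what you want to get out of it, and read with concrete questions in mind. Otherwise there is simply too much code; you jump back and forth and soon get lost following the calls. So I will first pose a few questions. If you have similar doubts, let's look at how the source code answers them.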

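1. DolphinScheduler is a distributed system, so how do its nodes communicate with each other?

2. How does DolphinScheduler start up?

3. How does DolphinScheduler execute a task?

Let's look for the answers in the source code with these questions in mind.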

master

BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}

source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"

JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

if [[ "$DOCKER" == "true" ]]; then
  JAVA_OPTS="${JAVA_OPTS} -XX:-UseContainerSupport"
fi
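# The actual launch command: plain java -cp, with the conf directory and every jar under libs on the classpath. This is where we find the key class, MasterServer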
$JAVA_HOME/bin/java $JAVA_OPTS \
  -cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \
  org.apache.dolphinscheduler.server.master.MasterServer

Entering org.apache.dolphinscheduler.server.master.MasterServer:

/**
 * run master server
 */
@PostConstruct
public void run() throws SchedulerException {
    // Start the RPC server (implemented with Netty)
    this.masterRPCServer.start();

    // Load the task plugins
    this.taskPluginManager.loadPlugin();

    // Registry-related setup
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);

    // Initialize and start the scheduler bootstrap
    this.masterSchedulerBootstrap.init();
    this.masterSchedulerBootstrap.start();
    // Event execution thread
    this.eventExecuteService.start();
    // Failover thread
    this.failoverExecuteThread.start();
    // Scheduling-related thread
    this.schedulerApi.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (!ServerLifeCycleManager.isStopped()) {
            close("MasterServer shutdownHook");
        }
    }));
}

Entering this.masterRPCServer.start():

public void start() {
    logger.info("Starting Master RPC Server...");
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    // Bind a different processor to each command type
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
            workflowExecutingDataRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor);

    // logger server
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
    // Start the core RPC server
    this.nettyRemotingServer.start();
    logger.info("Started Master RPC Server...");
}


// Entering this.nettyRemotingServer.start();

/**
 * server start
 */
public void start() {
    if (isStarted.compareAndSet(false, true)) {
        this.serverBootstrap
                .group(this.bossGroup, this.workGroup)
                .channel(NettyUtils.getServerSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) {
                        initNettyChannel(ch);
                    }
                });

        ChannelFuture future;
        try {
            future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
        } catch (Exception e) {
            logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
            throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
        }
        if (future.isSuccess()) {
            logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
        } else if (future.cause() != null) {
            throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
        } else {
            throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
        }
    }
}

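Clearly, this turns the master into a Netty server, so we can reasonably guess that the worker is likewise both a Netty server and a Netty client, and that nodes pass messages to each other over Netty. As for what is passed, the command types registered above tell us: task execution status and results, kill responses, state events, cache expiry notices, task rejections, and log requests. (Node registration and heartbeats, by contrast, appear to go through the registry started by masterRegistryClient.start() rather than through these RPC commands.) That answers question 1: DolphinScheduler is a distributed system, so how do its nodes communicate?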
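On the master side, all of this comes down to a lookup table from command type to processor. The following is a minimal sketch in plain Java with no Netty involved. RpcCommandType, RpcCommand, RpcProcessor and RpcProcessorRegistry are hypothetical names that only mimic the registerProcessor(...) calls above; in DolphinScheduler the lookup presumably happens inside the channel handler installed by initNettyChannel.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical subset of command types; the real CommandType enum has many more.
enum RpcCommandType { TASK_EXECUTE_RUNNING, TASK_EXECUTE_RESULT, GET_LOG_BYTES_REQUEST }

// A command as it might arrive off the wire: a type tag plus a payload.
final class RpcCommand {
    final RpcCommandType type;
    final String body;

    RpcCommand(RpcCommandType type, String body) {
        this.type = type;
        this.body = body;
    }
}

// Hypothetical processor interface; one implementation per command type.
interface RpcProcessor {
    String process(RpcCommand command);
}

final class RpcProcessorRegistry {
    private final Map<RpcCommandType, RpcProcessor> processors = new EnumMap<>(RpcCommandType.class);

    // Same idea as nettyRemotingServer.registerProcessor(type, processor).
    void registerProcessor(RpcCommandType type, RpcProcessor processor) {
        processors.put(type, processor);
    }

    // Route an incoming command to the processor bound to its type.
    String dispatch(RpcCommand command) {
        RpcProcessor processor = processors.get(command.type);
        if (processor == null) {
            throw new IllegalArgumentException("No processor registered for " + command.type);
        }
        return processor.process(command);
    }
}
```

Because the table is keyed by type, adding a new kind of message means adding one enum value and one registration, without touching the networking code.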

Back to the main program. The other calls, this.taskPluginManager.loadPlugin(), this.masterRegistryClient.start() and this.masterRegistryClient.setRegistryStoppable(this), are not the focus here, so we won't step into them.

Next, the masterSchedulerBootstrap operations:

this.masterSchedulerBootstrap.init();
this.masterSchedulerBootstrap.start();

/**
 * constructor of MasterSchedulerService
 * Creates the daemon thread pool (MasterPreExecThread)
 * and resolves the address of the server the master runs on
 */
public void init() {
    this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils
            .newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
    this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}

// The important code: this.masterSchedulerBootstrap.start()
@Override
public synchronized void start() {
    logger.info("Master schedule bootstrap starting..");
    // The core of the startup: start the thread itself
    super.start();
    workflowEventLooper.start();
    logger.info("Master schedule bootstrap started...");
}

// super.start() tells us this class is a thread, so go straight to its run method
/**
 * run of MasterSchedulerService
 */
@Override
public void run() {
    // Straight into an endless while loop. No surprise that it contains Thread.sleep calls; otherwise it would max out the CPU
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            boolean isOverload =
                    OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
            if (isOverload) {
                logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
                MasterServerMetrics.incMasterOverload();
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // Fetch all pending commands; these are created when a workflow is run through the api module.
            List<Command> commands = findCommands();
            if (CollectionUtils.isEmpty(commands)) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // Convert the commands into a list of process instances
            List<ProcessInstance> processInstances = command2ProcessInstance(commands);
            if (CollectionUtils.isEmpty(processInstances)) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            MasterServerMetrics.incMasterConsumeCommand(commands.size());

            processInstances.forEach(processInstance -> {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        logger.error(
                                "The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    // Wrap it in a WorkflowExecuteRunnable (a workflow execution task); as the name suggests, it holds the logic that runs the workflow.
                    WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
                            processService,
                            processInstanceDao,
                            nettyExecutorManager,
                            processAlertManager,
                            masterConfig,
                            stateWheelExecuteThread,
                            curingGlobalParamsService);
                    // First bind each WorkflowExecuteRunnable to its process instance ID
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                    // Add a START_WORKFLOW event to the workflow event queue
                    workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                            processInstance.getId()));
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            });
        } catch (InterruptedException interruptedException) {
            logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master schedule workflow error", e);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
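The skeleton of this loop is a common polling pattern: fetch a batch, back off with a sleep when there is nothing to do, and hand the work off otherwise. Below is a minimal sketch assuming commands are plain strings; PollingLoop, findCommands, handOff and stop() are hypothetical stand-ins for MasterSchedulerBootstrap, findCommands(), the cache-and-enqueue step and ServerLifeCycleManager, not DolphinScheduler APIs.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical polling loop modelled on MasterSchedulerBootstrap.run().
final class PollingLoop implements Runnable {
    private final Supplier<List<String>> findCommands; // stands in for findCommands()
    private final Consumer<String> handOff;            // stands in for cache + START_WORKFLOW event
    private final long idleSleepMillis;                // stands in for Constants.SLEEP_TIME_MILLIS
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    int idleRounds = 0; // how many times the loop backed off

    PollingLoop(Supplier<List<String>> findCommands, Consumer<String> handOff, long idleSleepMillis) {
        this.findCommands = findCommands;
        this.handOff = handOff;
        this.idleSleepMillis = idleSleepMillis;
    }

    // Stands in for ServerLifeCycleManager.isStopped() turning true.
    void stop() {
        stopped.set(true);
    }

    @Override
    public void run() {
        while (!stopped.get()) {
            try {
                List<String> commands = findCommands.get();
                if (commands == null || commands.isEmpty()) {
                    idleRounds++;
                    Thread.sleep(idleSleepMillis); // back off instead of spinning the CPU
                    continue;
                }
                commands.forEach(handOff);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag, then leave the loop
                break;
            }
        }
    }
}
```

The sleep on the empty path is what keeps an idle master from burning a CPU core; the interrupt handling mirrors the original, which restores the flag and breaks out.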

// We now have all the workflow instances that need to run. Entering workflowEventLooper.start();
@Override
public synchronized void start() {
    logger.info("WorkflowEventLooper thread starting");
    // Same trick as before: find run()
    super.start();
    logger.info("WorkflowEventLooper thread started");
}

public void run() {
    WorkflowEvent workflowEvent = null;
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // Take the event put into the queue by the code above; it has only one type, START_WORKFLOW.
            workflowEvent = workflowEventQueue.poolEvent();
            LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
            logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
            // Where was this map populated? Search for an init method; it is shown below.
            WorkflowEventHandler workflowEventHandler =
                    workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
            // Step into this method
            workflowEventHandler.handleWorkflowEvent(workflowEvent);
        } catch (InterruptedException e) {
            logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
            Thread.currentThread().interrupt();
            break;
        } catch (WorkflowEventHandleException workflowEventHandleException) {
            logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
                    workflowEvent, workflowEventHandleException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (WorkflowEventHandleError workflowEventHandleError) {
            logger.error("Handle workflow event error, will drop this event, event: {}",
                    workflowEvent,
                    workflowEventHandleError);
        } catch (Exception unknownException) {
            logger.error(
                    "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
                    workflowEvent, unknownException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }
}

@PostConstruct
public void init() {
    workflowEventHandlerList.forEach(
            workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
                    workflowEventHandler));
}
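Taken together, run() and init() form an event looper: handlers are indexed by the event type they declare, and a failed event is either put back into the queue (WorkflowEventHandleException) or dropped (WorkflowEventHandleError). The sketch below reproduces that retry-or-drop policy with hypothetical types (EventLooper, LoopEvent, RetryableHandleException, FatalHandleError). To keep it deterministic it handles one event per processOne() call with a non-blocking poll, rather than looping on a blocking queue.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

enum LoopEventType { START_WORKFLOW }

final class LoopEvent {
    final LoopEventType type;
    final int workflowInstanceId;

    LoopEvent(LoopEventType type, int workflowInstanceId) {
        this.type = type;
        this.workflowInstanceId = workflowInstanceId;
    }
}

// Recoverable failure: the event goes back into the queue (like WorkflowEventHandleException).
class RetryableHandleException extends Exception {
    RetryableHandleException(String message) { super(message); }
}

// Unrecoverable failure: the event is dropped (like WorkflowEventHandleError).
class FatalHandleError extends Exception {
    FatalHandleError(String message) { super(message); }
}

interface LoopEventHandler {
    LoopEventType handledType();

    void handle(LoopEvent event) throws RetryableHandleException, FatalHandleError;
}

final class EventLooper {
    private final Queue<LoopEvent> queue = new LinkedBlockingQueue<>();
    private final Map<LoopEventType, LoopEventHandler> handlers = new EnumMap<>(LoopEventType.class);

    // Mirrors init(): index every handler by the event type it declares.
    EventLooper(List<LoopEventHandler> handlerList) {
        handlerList.forEach(h -> handlers.put(h.handledType(), h));
    }

    void addEvent(LoopEvent event) {
        queue.add(event);
    }

    // One iteration of the loop; returns false when there was nothing to process.
    boolean processOne() {
        LoopEvent event = queue.poll();
        if (event == null) {
            return false;
        }
        try {
            handlers.get(event.type).handle(event);
        } catch (RetryableHandleException e) {
            queue.add(event); // retry later, as the original re-adds the event
        } catch (FatalHandleError e) {
            // drop the event, as the original does for WorkflowEventHandleError
        }
        return true;
    }

    int pending() {
        return queue.size();
    }
}
```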

@Override
public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
    logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
    // Remember the mapping stored earlier by processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable)? Here the WorkflowExecuteRunnable is taken back out of that map
    WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
            workflowEvent.getWorkflowInstanceId());
    if (workflowExecuteRunnable == null) {
        throw new WorkflowEventHandleError(
                "The workflow start event is invalid, cannot find the workflow instance from cache");
    }
    ProcessInstanceMetrics.incProcessInstanceByState("submit");
    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
    // Step into the call method
    CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
            .thenAccept(workflowSubmitStatue -> {
                if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
                    logger.info("Success submit the workflow instance");
                    if (processInstance.getTimeout() > 0) {
                        stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
                    }
                } else if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
                    logger.error(
                            "Failed to submit the workflow instance, will send fail state event: {}",
                            workflowEvent);
                    WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
                            .processInstanceId(processInstance.getId())
                            .type(StateEventType.PROCESS_SUBMIT_FAILED)
                            .status(WorkflowExecutionStatus.FAILURE)
                            .build();
                    workflowExecuteRunnable.addStateEvent(stateEvent);
                }
            });
}
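handleWorkflowEvent does not wait for the workflow to start: supplyAsync runs call() on workflowExecuteThreadPool, and thenAccept reacts to the returned status once it is available. A small sketch of the same hand-off, assuming a hypothetical SubmitStatus enum that mirrors WorkflowSubmitStatue, and plain Runnable callbacks in place of the timeout check and the PROCESS_SUBMIT_FAILED event.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

// Hypothetical mirror of WorkflowSubmitStatue.
enum SubmitStatus { SUCCESS, FAILED, DUPLICATED_SUBMITTED }

final class AsyncSubmitter {
    // Run the submission on the pool and react to its status without blocking the caller,
    // the same shape as supplyAsync(workflowExecuteRunnable::call, pool).thenAccept(...).
    static CompletableFuture<Void> submit(Supplier<SubmitStatus> call, ExecutorService pool,
                                          Runnable onSuccess, Runnable onFailure) {
        return CompletableFuture.supplyAsync(call, pool).thenAccept(status -> {
            if (status == SubmitStatus.SUCCESS) {
                onSuccess.run(); // e.g. register the workflow for timeout checking
            } else if (status == SubmitStatus.FAILED) {
                onFailure.run(); // e.g. add a "submit failed" state event
            }
            // DUPLICATED_SUBMITTED: nothing to do
        });
    }
}
```

The returned future is only there so a caller can wait in tests; like the original handler, normal callers would simply return and let the pool finish the work.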

@Override
public WorkflowSubmitStatue call() {
    if (isStart()) {
        logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
        return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
    }

    try {
        // After each step the status advances (CREATED -> INITIALIZE_DAG -> INITIALIZE_QUEUE -> STARTED), so every process instance goes through this logic once.
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
            // As the DolphinScheduler docs describe, tasks are organized into a directed acyclic graph (DAG); this is where it is built
            buildFlowDag();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
            // Initialize the task queue
            initTaskQueue();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
            // Step into this method
            submitPostNode(null);
            workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        return WorkflowSubmitStatue.SUCCESS;
    } catch (Exception e) {
        logger.error("Start workflow error", e);
        return WorkflowSubmitStatue.FAILED;
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}
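call() is written as a small state machine: each stage runs only when workflowRunnableStatus says it has not run yet, and the status advances right after the stage succeeds. One consequence, shown in this sketch with hypothetical StagedStarter and RunnableStatus types, is that if a stage throws, the status records the last stage that completed, so calling again skips the finished stages. Whether DolphinScheduler ever re-invokes call() this way is not shown in the code above.

```java
// Hypothetical mirror of WorkflowRunnableStatus.
enum RunnableStatus { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

// Hypothetical staged starter shaped like WorkflowExecuteRunnable.call().
final class StagedStarter {
    RunnableStatus status = RunnableStatus.CREATED;
    private final Runnable buildDag;
    private final Runnable initQueue;
    private final Runnable submitStartNodes;

    StagedStarter(Runnable buildDag, Runnable initQueue, Runnable submitStartNodes) {
        this.buildDag = buildDag;
        this.initQueue = initQueue;
        this.submitStartNodes = submitStartNodes;
    }

    // Returns true on success; on failure the status still records the last completed stage.
    boolean call() {
        try {
            if (status == RunnableStatus.CREATED) {
                buildDag.run();
                status = RunnableStatus.INITIALIZE_DAG;
            }
            if (status == RunnableStatus.INITIALIZE_DAG) {
                initQueue.run();
                status = RunnableStatus.INITIALIZE_QUEUE;
            }
            if (status == RunnableStatus.INITIALIZE_QUEUE) {
                submitStartNodes.run(); // submitPostNode(null) in the original
                status = RunnableStatus.STARTED;
            }
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```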

private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
    Set<String> submitTaskNodeList =
            DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
    List<TaskInstance> taskInstances = new ArrayList<>();
    for (String taskNode : submitTaskNodeList) {
        TaskNode taskNodeObject = dag.getNode(taskNode);
        Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode());
        if (existTaskInstanceOptional.isPresent()) {
            taskInstances.add(existTaskInstanceOptional.get());
            continue;
        }
        TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
        taskInstances.add(task);
    }
    if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)) {
        TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
        String taskInstanceVarPool = endTaskInstance.getVarPool();
        if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
            Set<Property> taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
            String processInstanceVarPool = processInstance.getVarPool();
            if (StringUtils.isNotEmpty(processInstanceVarPool)) {
                Set<Property> properties = new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
                properties.addAll(taskProperties);
                processInstance.setVarPool(JSONUtils.toJsonString(properties));
            } else {
                processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
            }
        }
    }

    for (TaskInstance task : taskInstances) {

        if (readyToSubmitTaskQueue.contains(task)) {
            logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId());
            continue;
        }

        if (task.getId() != null && completeTaskMap.containsKey(task.getTaskCode())) {
            logger.info("Task has already run success, taskName: {}", task.getName());
            continue;
        }
        if (task.getState().isKill()) {
            logger.info("Task is be stopped, the state is {}, taskInstanceId: {}", task.getState(), task.getId());
            continue;
        }
        // This method essentially puts the task into readyToSubmitTaskQueue, ready to be submitted.
        addTaskToStandByList(task);
    }
    // Step into this method
    submitStandByTask();
    updateProcessInstanceState();
}
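DagHelper.parsePostNodes is what decides which nodes come next. Ignoring skipped nodes, condition and switch tasks and the other special cases the real helper handles, the core idea can be sketched as: with no parent, return the start nodes; otherwise return the successors of the parent whose predecessors have all completed. SimpleDag and postNodes are hypothetical and much simpler than the real helper.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for DagHelper.parsePostNodes.
final class SimpleDag {
    private final Map<String, Set<String>> successors = new LinkedHashMap<>();
    private final Map<String, Set<String>> predecessors = new LinkedHashMap<>();

    void addNode(String code) {
        successors.computeIfAbsent(code, k -> new LinkedHashSet<>());
        predecessors.computeIfAbsent(code, k -> new LinkedHashSet<>());
    }

    void addEdge(String from, String to) {
        addNode(from);
        addNode(to);
        successors.get(from).add(to);
        predecessors.get(to).add(from);
    }

    // parent == null: the start nodes (no predecessors), like submitPostNode(null).
    // Otherwise: successors of parent that are not done yet and whose predecessors are all complete.
    Set<String> postNodes(String parent, Set<String> completed) {
        Set<String> result = new LinkedHashSet<>();
        if (parent == null) {
            predecessors.forEach((node, preds) -> {
                if (preds.isEmpty()) {
                    result.add(node);
                }
            });
            return result;
        }
        for (String next : successors.getOrDefault(parent, Set.of())) {
            if (!completed.contains(next) && completed.containsAll(predecessors.get(next))) {
                result.add(next);
            }
        }
        return result;
    }
}
```

In a diamond A -> B, A -> C, B -> D, C -> D, finishing B alone releases nothing; D only becomes ready when C also completes.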

/**
 * handling the list of tasks to be submitted
 * Overall logic: walk readyToSubmitTaskQueue, always looking at the task at its head, and submit it only once the tasks it depends on have succeeded
 */
public void submitStandByTask() throws StateEventHandleException {
    int length = readyToSubmitTaskQueue.size();
    for (int i = 0; i < length; i++) {
        TaskInstance task = readyToSubmitTaskQueue.peek();
        if (task == null) {
            continue;
        }
        if (task.taskCanRetry()) {
            TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
            if (retryTask != null && retryTask.getState().isForceSuccess()) {
                task.setState(retryTask.getState());
                logger.info(
                        "Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
                        task.getName(), task.getId());
                removeTaskFromStandbyList(task);
                completeTaskMap.put(task.getTaskCode(), task.getId());
                taskInstanceMap.put(task.getId(), task);
                submitPostNode(Long.toString(task.getTaskCode()));
                continue;
            }
        }
        if (task.isFirstRun()) {
            Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
            getPreVarPool(task, preTask);
        }
        DependResult dependResult = getDependResultForTask(task);
        if (DependResult.SUCCESS == dependResult) {
            logger.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
            // Submit the task; step into this method
            Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
            if (!taskInstanceOptional.isPresent()) {
                this.taskFailedSubmit = true;
                if (!removeTaskFromStandbyList(task)) {
                    logger.error(
                            "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
                            processInstance.getId(),
                            task.getTaskCode());
                }
                completeTaskMap.put(task.getTaskCode(), task.getId());
                taskInstanceMap.put(task.getId(), task);
                errorTaskMap.put(task.getTaskCode(), task.getId());
                activeTaskProcessorMaps.remove(task.getTaskCode());
                logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}",
                        task.getProcessInstanceId(),
                        task.getId(),
                        task.getTaskCode());
            } else {
                removeTaskFromStandbyList(task);
            }
        } else if (DependResult.FAILED == dependResult) {
            dependFailedTaskSet.add(task.getTaskCode());
            removeTaskFromStandbyList(task);
            logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
                    dependResult);
        } else if (DependResult.NON_EXEC == dependResult) {
            removeTaskFromStandbyList(task);
            logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
                    task.getId(), dependResult);
        }
    }
}

/**
 * submit task to execute
 * The code is long, so we only look at the important parts
 *
 * @param taskInstance task instance
 * @return TaskInstance
 */
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
    try {
        processService.packageTaskInstance(taskInstance, processInstance);
        // Get the task processor matching the task type via SPI (search for AutoService and Java SPI if unfamiliar)
        // We won't step into it here; have a look if you're interested
        ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
        taskProcessor.init(taskInstance, processInstance);

        if (taskInstance.getState().isRunning()
                && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
            notifyProcessHostUpdate(taskInstance);
        }
        // The actual execution logic. TaskAction has many values; step into this method
        boolean submit = taskProcessor.action(TaskAction.SUBMIT);
        if (!submit) {
            logger.error("Submit standby task failed!, taskCode: {}, taskName: {}",
                    taskInstance.getTaskCode(),
                    taskInstance.getName());
            return Optional.empty();
        }
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
        if (validTaskMap.containsKey(taskInstance.getTaskCode())) {
            int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode());
            if (taskInstance.getId() != oldTaskInstanceId) {
                TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
                oldTaskInstance.setFlag(Flag.NO);
                processService.updateTaskInstance(oldTaskInstance);
                validTaskMap.remove(taskInstance.getTaskCode());
                activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
            }
        }

        validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
        taskInstanceMap.put(taskInstance.getId(), taskInstance);
        activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
        int taskGroupId = taskInstance.getTaskGroupId();
        if (taskGroupId > 0) {
            boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
                    taskInstance.getName(),
                    taskGroupId,
                    taskInstance.getProcessInstanceId(),
                    taskInstance.getTaskGroupPriority());
            if (!acquireTaskGroup) {
                logger.info(
                        "Submitted task will not be dispatch right now because the first time to try to acquire" +
                                " task group failed, taskInstanceName: {}, taskGroupId: {}",
                        taskInstance.getName(), taskGroupId);
                return Optional.of(taskInstance);
            }
        }

        boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
        if (!dispatchSuccess) {
            logger.error("Dispatch standby process {} task {} failed", processInstance.getName(),
                    taskInstance.getName());
            return Optional.empty();
        }
        taskProcessor.action(TaskAction.RUN);

        stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
        stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);

        if (taskProcessor.taskInstance().getState().isFinished()) {
            if (processInstance.isBlocked()) {
                TaskStateEvent processBlockEvent = TaskStateEvent.builder()
                        .processInstanceId(processInstance.getId())
                        .taskInstanceId(taskInstance.getId())
                        .status(taskProcessor.taskInstance().getState())
                        .type(StateEventType.PROCESS_BLOCKED)
                        .build();
                this.stateEvents.add(processBlockEvent);
            }
            TaskStateEvent taskStateChangeEvent = TaskStateEvent.builder()
                    .processInstanceId(processInstance.getId())
                    .taskInstanceId(taskInstance.getId())
                    .status(taskProcessor.taskInstance().getState())
                    .type(StateEventType.TASK_STATE_CHANGE)
                    .build();
            this.stateEvents.add(taskStateChangeEvent);
        }
        return Optional.of(taskInstance);
    } catch (Exception e) {
        logger.error("Submit standby task {} error, taskCode: {}", taskInstance.getName(),
                taskInstance.getTaskCode(), e);
        return Optional.empty();
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}

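/**
 * TaskAction has many values. In the code above we can find how several of them are handled:
 * SUBMIT, DISPATCH (forwarding; this is the important one, so this is the code we read) and RUN.
 * My understanding is that they mainly move the task through its different stages.
 * The split is rather fine-grained, and I don't know what the original design rationale was.
 */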
@Override
public boolean action(TaskAction taskAction) {
    String threadName = Thread.currentThread().getName();
    if (StringUtils.isNotEmpty(threadLoggerInfoName)) {
        Thread.currentThread().setName(threadLoggerInfoName);
    }
    boolean result = false;
    try {
        switch (taskAction) {
            case STOP:
                result = stop();
                break;
            case PAUSE:
                result = pause();
                break;
            case TIMEOUT:
                result = timeout();
                break;
            case SUBMIT:
                result = submit();
                break;
            case RUN:
                result = run();
                break;
            case DISPATCH:
                // Step into this method
                result = dispatch();
                break;
            case RESUBMIT:
                result = resubmit();
                break;
            default:
                logger.error("unknown task action: {}", taskAction);
        }
        return result;
    } finally {
        // reset thread name
        Thread.currentThread().setName(threadName);

    }
}

// Entering the implementing method in CommonTaskProcessor

// Note this task queue; it comes up again later. I was stuck here for a long time, unable to work out how the tasks actually get consumed.
private TaskPriorityQueue<TaskPriority> taskUpdateQueue;

public void initQueue() {
    this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
}

@Override
public boolean dispatchTask() {
    try {
        if (taskUpdateQueue == null) {
            this.initQueue();
        }
        if (taskInstance.getState().isFinished()) {
            logger.info("Task {} has already finished, no need to submit to task queue, taskState: {}",
                    taskInstance.getName(), taskInstance.getState());
            return true;
        }
        if (taskInstance.getState() == TaskExecutionStatus.RUNNING_EXECUTION
                || taskInstance.getState() == TaskExecutionStatus.DELAY_EXECUTION) {
            logger.info("Task {} is already running or delayed, no need to submit to task queue, taskState: {}",
                    taskInstance.getName(), taskInstance.getState());
            return true;
        }
        logger.info("Task {} is ready to dispatch to worker", taskInstance.getName());

        TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
                processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
                taskInstance.getId(), taskInstance.getTaskGroupPriority(),
                Constants.DEFAULT_WORKER_GROUP);

        TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
        if (taskExecutionContext == null) {
            logger.error("Get taskExecutionContext fail, task: {}", taskInstance);
            return false;
        }

        taskPriority.setTaskExecutionContext(taskExecutionContext);
        // The main point: put the task into this queue for later consumption
        taskUpdateQueue.put(taskPriority);
        logger.info("Task {} is submitted to priority queue success by master", taskInstance.getName());
        return true;
    } catch (Exception e) {
        logger.error("Task {} is submitted to priority queue error", taskInstance.getName(), e);
        return false;
    }
}

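The call chain under masterSchedulerBootstrap goes quite deep, but its main job is to wrap each task, update its state, and put it into the consumption queue, where it waits to be consumed and sent to a worker node.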
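The queue connecting the two halves is a priority queue of TaskPriority objects. The sketch below is a guess at the ordering, not the real TaskPriority.compareTo: it assumes smaller priority codes come first and compares the fields in the order the constructor receives them (workflow priority, workflow instance id, task priority, task instance id), ignoring task group priority and worker group. QueuedTask is a hypothetical name.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical, simplified echo of TaskPriority; only four of its fields are modelled.
final class QueuedTask {
    final int processInstancePriority; // assumed: smaller code = more urgent
    final int processInstanceId;
    final int taskInstancePriority;    // assumed: smaller code = more urgent
    final int taskInstanceId;

    QueuedTask(int processInstancePriority, int processInstanceId,
               int taskInstancePriority, int taskInstanceId) {
        this.processInstancePriority = processInstancePriority;
        this.processInstanceId = processInstanceId;
        this.taskInstancePriority = taskInstancePriority;
        this.taskInstanceId = taskInstanceId;
    }

    // Assumed ordering: compare fields in constructor order, smallest first.
    static final Comparator<QueuedTask> ORDER = Comparator
            .comparingInt((QueuedTask t) -> t.processInstancePriority)
            .thenComparingInt(t -> t.processInstanceId)
            .thenComparingInt(t -> t.taskInstancePriority)
            .thenComparingInt(t -> t.taskInstanceId);

    // Thread-safe queue: producers put() into it, a consumer thread polls from it.
    static PriorityBlockingQueue<QueuedTask> newQueue() {
        return new PriorityBlockingQueue<>(16, ORDER);
    }
}
```

Under this assumption a workflow-level priority outranks anything at the task level; check TaskPriority.compareTo in the source before relying on the exact order.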

Next comes this.eventExecuteService.start():

@Override
public synchronized void start() {
    logger.info("Master Event execute service starting");
    super.start();
    logger.info("Master Event execute service started");
}

@Override
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            workflowEventHandler();
            streamTaskEventHandler();
            TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
        } catch (InterruptedException interruptedException) {
            logger.warn("Master event service interrupted, will exit this loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master event execute service error", e);
        }
    }
}

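Processing the task instances produced many state events. My understanding is that this works like a state machine: the main job of eventExecuteService is to process those state events, update the corresponding states in the database, and clean up the data produced earlier.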
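If, as described above, each workflow accumulates state events that a single service thread later applies, one tick of that thread can be sketched as draining every workflow's pending queue in arrival order and then discarding the empty queues. This is an illustration of the idea only: StateEventDrainer is hypothetical, states are plain strings, and an in-memory map stands in for the database.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical: each workflow owns a queue of pending state events, and one service
// thread periodically drains all of them, applying each event in arrival order.
final class StateEventDrainer {
    private final Map<Integer, Queue<String>> pendingByWorkflow = new LinkedHashMap<>();
    private final Map<Integer, String> currentState = new LinkedHashMap<>(); // stands in for the database

    synchronized void addStateEvent(int workflowId, String newState) {
        pendingByWorkflow.computeIfAbsent(workflowId, k -> new ArrayDeque<>()).add(newState);
    }

    // One tick of the loop; returns how many events were applied.
    synchronized int drainOnce() {
        int applied = 0;
        for (Map.Entry<Integer, Queue<String>> entry : pendingByWorkflow.entrySet()) {
            String state;
            while ((state = entry.getValue().poll()) != null) {
                currentState.put(entry.getKey(), state); // "update the state in the database"
                applied++;
            }
        }
        pendingByWorkflow.values().removeIf(Queue::isEmpty); // clean up drained queues
        return applied;
    }

    synchronized String stateOf(int workflowId) {
        return currentState.get(workflowId);
    }
}
```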

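At this point the DolphinScheduler master has finished starting. But we are not quite done: remember that the tasks to be executed by workers were put into a queue? We still need to find where that queue is consumed (it took me quite a while to find it!).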

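There are two ways to find it. 1. The source tree has a consumer package containing the consumer code (honestly, you would never know unless someone told you). 2. Start from the code that puts tasks into the queue, this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);, and look at where TaskPriorityQueueImpl is used. The name suggests an implementation class, and indeed there is an interface, TaskPriorityQueue, so look for usages of TaskPriorityQueue instead. This is fairly easy in IDEA.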

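At this point you have to guess from the class names. The first one contains the word "consumer", so open TaskPriorityQueueConsumer. It turns out to be a thread as well, so, same trick as before, go straight to run().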

@PostConstruct
public void init() {
    this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils
            .newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
    logger.info("Task priority queue consume thread staring");
    super.start();
    logger.info("Task priority queue consume thread started");
}

@Override
public void run() {
    // The number of tasks dispatched per round is configurable.
    int fetchTaskNum = masterConfig.getDispatchTaskNumber();
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // step into this method
            List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);

            if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
                logger.info("{} tasks dispatch failed, will retry to dispatch", failedDispatchTasks.size());
                TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                if (fetchTaskNum == failedDispatchTasks.size()) {
                    logger.info("All tasks dispatch failed, will sleep a while to avoid the master cpu higher");
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        } catch (Exception e) {
            TaskMetrics.incTaskDispatchError();
            logger.error("dispatcher task error", e);
        }
    }
}

/**
 * batch dispatch with thread pool
 */
public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
    List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch latch = new CountDownLatch(fetchTaskNum);

    for (int i = 0; i < fetchTaskNum; i++) {
        TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
        if (Objects.isNull(taskPriority)) {
            latch.countDown();
            continue;
        }

        consumerThreadPoolExecutor.submit(() -> {
            try {
                // The code is simple; step into this method
                boolean dispatchResult = this.dispatchTask(taskPriority);
                if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                }
            } finally {
                latch.countDown();
            }
        });
    }

    latch.await();

    return failedDispatchTasks;
}

/**
 * Dispatch task to worker.
 *
 * @param taskPriority taskPriority
 * @return dispatch result, return true if dispatch success, return false if dispatch failed.
 */
protected boolean dispatchTask(TaskPriority taskPriority) {
    TaskMetrics.incTaskDispatch();
    boolean result = false;
    try {
        WorkflowExecuteRunnable workflowExecuteRunnable =
                processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
        if (workflowExecuteRunnable == null) {
            logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
            return true;
        }
        Optional<TaskInstance> taskInstanceOptional =
                workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
        if (!taskInstanceOptional.isPresent()) {
            logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
                    taskPriority);
            return true;
        }
        TaskInstance taskInstance = taskInstanceOptional.get();
        TaskExecutionContext context = taskPriority.getTaskExecutionContext();
        ExecutionContext executionContext =
                new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
                        taskInstance);

        if (isTaskNeedToCheck(taskPriority)) {
            if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
                logger.info("Task {} is already finished, no need to dispatch, task instance id: {}",
                        taskInstance.getName(), taskInstance.getId());
                return true;
            }
        }
        // step into this method
        result = dispatcher.dispatch(executionContext);

        if (result) {
            logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
                    taskPriority.getTaskId(),
                    executionContext.getHost());
            addDispatchEvent(context, executionContext);
        } else {
            logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
                    taskPriority.getTaskId(),
                    executionContext.getHost());
        }
    } catch (RuntimeException | ExecuteException e) {
        logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
    }
    return result;
}

/**
 * task dispatch
 *
 * @param context context
 * @return result
 * @throws ExecuteException if error throws ExecuteException
 */
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
    ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
    if (executorManager == null) {
        throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
    }
    // select a worker host
    Host host = hostManager.select(context);
    if (StringUtils.isEmpty(host.getAddress())) {
        logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
            context.getCommand(), context.getWorkerGroup());
        return false;
    }
    context.setHost(host);
    // Hooks before and after execution, similar to the before/after enhancements in Spring Boot startup
    executorManager.beforeExecute(context);
    try {
        // step into this method
        return executorManager.execute(context);
    } finally {
        executorManager.afterExecute(context);
    }
}

/**
 * execute logic
 *
 * @param context context
 * @return result
 * @throws ExecuteException if error throws ExecuteException
 */
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
    Set<String> allNodes = getAllNodes(context);
    Set<String> failNodeSet = new HashSet<>();
    Command command = context.getCommand();
    Host host = context.getHost();
    boolean success = false;
    while (!success) {
        try {
            // step into this method
            doExecute(host, command);
            success = true;
            context.setHost(host);
            context.getTaskInstance().setHost(host.getAddress());
        } catch (ExecuteException ex) {
            logger.error("Execute command {} error", command, ex);
            try {
                failNodeSet.add(host.getAddress());
                Set<String> tmpAllIps = new HashSet<>(allNodes);
                Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                if (remained != null && remained.size() > 0) {
                    host = Host.of(remained.iterator().next());
                    logger.error("retry execute command : {} host : {}", command, host);
                } else {
                    throw new ExecuteException("fail after try all nodes");
                }
            } catch (Throwable t) {
                throw new ExecuteException("fail after try all nodes");
            }
        }
    }

    return success;
}

/**
 * execute logic
 *
 * @param host host
 * @param command command
 * @throws ExecuteException if error throws ExecuteException
 */
public void doExecute(final Host host, final Command command) throws ExecuteException {
    // Retry count. No retry framework is used: one initial attempt plus up to 3 simple retries, then an exception is thrown.
    int retryCount = 3;
    boolean success = false;
    do {
        try {
            // This is where the command is finally sent. No need to go deeper: Netty sends the command to the worker, which then executes it.
            nettyRemotingClient.send(host, command);
            success = true;
        } catch (Exception ex) {
            logger.error("Send command to {} error, command: {}", host, command, ex);
            retryCount--;
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    } while (retryCount >= 0 && !success);

    if (!success) {
        throw new ExecuteException(String.format("send command : %s to %s error", command, host));
    }
}
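The two retry layers above (doExecute retries the same host, execute falls back to the other worker nodes) can be modeled in plain Java as follows. This is a hypothetical sketch: the Sender interface stands in for nettyRemotingClient.send, the sleep between attempts is omitted, and the next host is picked from a sorted set only to make the choice deterministic, whereas the original simply takes the first remaining element.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of "retry one host, then fail over to the remaining nodes".
public class DispatchFailoverSketch {

    // Stands in for the RPC client: throws when the host cannot accept the command.
    interface Sender {
        void send(String host, String command) throws Exception;
    }

    // Same budget as doExecute: one initial attempt plus up to 3 retries.
    static final int MAX_RETRIES = 3;

    static boolean sendWithRetry(Sender sender, String host, String command) {
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            try {
                sender.send(host, command);
                return true;
            } catch (Exception e) {
                // The original logs the error and sleeps before the next attempt; omitted here.
            }
        }
        return false;
    }

    // Returns the host that finally accepted the command, or throws once every node has failed.
    static String executeWithFailover(Sender sender, String selectedHost, Set<String> allNodes, String command) {
        Set<String> failedNodes = new HashSet<>();
        String host = selectedHost;
        while (true) {
            if (sendWithRetry(sender, host, command)) {
                return host;
            }
            failedNodes.add(host);
            Set<String> remaining = new TreeSet<>(allNodes); // sorted only for a deterministic choice
            remaining.removeAll(failedNodes);
            if (remaining.isEmpty()) {
                throw new IllegalStateException("fail after try all nodes");
            }
            host = remaining.iterator().next();
        }
    }
}
```

With retryCount = 3 and the loop condition retryCount >= 0, the original do/while makes at most four send attempts per host, which is why the sketch loops while attempt <= MAX_RETRIES.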

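With the master's source code behind us, the worker can be analyzed in a similar way. The api module needs even less explanation: it is just like an ordinary web project, so we will not trace its source code. Next, let's analyze the worker.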

worker

As with the master, find the startup class from the startup script.

BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}

source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"

export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME}

JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

if [[ "$DOCKER" == "true" ]]; then
  JAVA_OPTS="${JAVA_OPTS} -XX:-UseContainerSupport"
fi
# The startup class is org.apache.dolphinscheduler.server.worker.WorkerServer
$JAVA_HOME/bin/java $JAVA_OPTS \
  -cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \
  org.apache.dolphinscheduler.server.worker.WorkerServer

Open org.apache.dolphinscheduler.server.worker.WorkerServer

@PostConstruct
public void run() {
    // Netty communication server
    this.workerRpcServer.start();
    // Netty communication client
    this.workerRpcClient.start();
    // load task plugins
    this.taskPluginManager.loadPlugin();
    // service registration, heartbeats, etc.
    this.workerRegistryClient.setRegistryStoppable(this);
    this.workerRegistryClient.start();
    // The name alone says it deals with task execution and management, so this is the core. Step into it.
    this.workerManagerThread.start();
    // sends task execution messages
    this.messageRetryRunner.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (!ServerLifeCycleManager.isStopped()) {
            close("WorkerServer shutdown hook");
        }
    }));
}

public void start() {
    logger.info("Worker manager thread starting");
    Thread thread = new Thread(this, this.getClass().getName());
    thread.setDaemon(true);
    // find the run method
    thread.start();
    logger.info("Worker manager thread started");
}

@Override
public void run() {
    Thread.currentThread().setName("Worker-Execute-Manager-Thread");
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            if (this.getThreadPoolQueueSize() <= workerExecThreads) {
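                // waitSubmitQueue is a delay queue holding the tasks the master sent over for execution. (Open question: when and how do tasks get into this queue? We won't follow that code yet, since jumping back and forth gets confusing; we'll come back to it after walking through task execution.)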
                final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
                // step into the submit method
                workerExecService.submit(workerDelayTaskExecuteRunnable);
            } else {
                WorkerServerMetrics.incWorkerOverloadCount();
                logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
                        this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize());
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            }
        } catch (Exception e) {
            logger.error("An unexpected interrupt is happened, "
                    + "the exception will be ignored and this thread will continue to run", e);
        }
    }
}

public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
    taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
    // The task runs asynchronously via Guava's ListenableFuture; the callback fires automatically once there is a result. So next we follow the run method of WorkerTaskExecuteRunnable.
    ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
    FutureCallback futureCallback = new FutureCallback() {

        @Override
        public void onSuccess(Object o) {
            taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
        }

        @Override
        public void onFailure(Throwable throwable) {
            logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
                    taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
                    taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
                    throwable);
            taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
        }
    };
    Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
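The same "submit, then clean up in a callback" idea can be expressed with the JDK's CompletableFuture instead of Guava's ListenableFuture and Futures.addCallback. This is a swapped-in technique for illustration only; TaskCallbackSketch and its map are hypothetical stand-ins for workerExecService and taskExecuteThreadMap.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: run a task asynchronously and remove it from a registry when it completes.
public class TaskCallbackSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    // taskInstanceId -> task, playing the role of taskExecuteThreadMap
    private final Map<Integer, Runnable> runningTasks = new ConcurrentHashMap<>();

    public CompletableFuture<Void> submit(int taskInstanceId, Runnable task) {
        runningTasks.put(taskInstanceId, task);
        return CompletableFuture.runAsync(task, executor)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        // corresponds to onFailure: log, then fall through to the cleanup
                        System.err.println("task " + taskInstanceId + " failed: " + error);
                    }
                    // shared by onSuccess and onFailure: forget the task either way
                    runningTasks.remove(taskInstanceId);
                });
    }

    public int runningCount() {
        return runningTasks.size();
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```

The future returned by whenComplete completes only after the callback has run, so a caller that joins on it can rely on the cleanup having happened.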


@Override
public void run() {
    try {
        Thread.currentThread().setName(taskExecutionContext.getTaskLogName());

        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        logger.info("Begin to pulling task");
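        // Initialize the task information in taskExecutionContext. A context object like this is useful in everyday development too: I have seen business code that passes task information around as separate parameters, which bloats the code and often leads to wrongly assigned arguments.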
        initializeTask();

        if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
            taskExecutionContext.setEndTime(new Date());
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress,
                    CommandType.TASK_EXECUTE_RESULT);
            logger.info(
                    "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
            return;
        }
        // hooks before and after execution
        beforeExecute();

        TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender)
                .masterAddress(masterAddress).build();
        // execute the task; step into this method
        executeTask(taskCallBack);

        afterExecute();

    } catch (Throwable ex) {
        logger.error("Task execute failed, due to meet an exception", ex);
        afterThrowing(ex);
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}


@Override
public void executeTask(TaskCallBack taskCallBack) throws TaskException {
    if (task == null) {
        throw new TaskException("The task plugin instance is not initialized");
    }
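    // Handle the task with the matching task plugin. The plugins are loaded in this.taskPluginManager.loadPlugin(); step in there if you are interested.
    // Let's take the shell task implementation as an example.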
    task.handle(taskCallBack);
}

/**
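Before looking at the code, think about how you would run a shell command from Java.
Here is my approach:
1. Assemble the command.
2. Run it with Runtime or ProcessBuilder (look these up if you're not familiar with them).
3. The shell runs in a separate process, so its execution log has to be collected across processes.
4. Obtain the process ID so the process can be controlled (run, pause, terminate).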
*/

// Executing a shell-type task

@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
    try {
        // build the command
        String command = buildCommand();
        // execute; step into run
        TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
        setExitStatusCode(commandExecuteResult.getExitStatusCode());
        setProcessId(commandExecuteResult.getProcessId());
        shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.error("The current Shell task has been interrupted", e);
        setExitStatusCode(EXIT_CODE_FAILURE);
        throw new TaskException("The current Shell task has been interrupted", e);
    } catch (Exception e) {
        logger.error("shell task error", e);
        setExitStatusCode(EXIT_CODE_FAILURE);
        throw new TaskException("Execute shell task error", e);
    }
}

public TaskResponse run(String execCommand) throws IOException, InterruptedException {
    TaskResponse result = new TaskResponse();
    int taskInstanceId = taskRequest.getTaskInstanceId();
    if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
        result.setExitStatusCode(EXIT_CODE_KILL);
        return result;
    }
    if (StringUtils.isEmpty(execCommand)) {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
        return result;
    }

    String commandFilePath = buildCommandFilePath();

    createCommandFileIfNotExists(execCommand, commandFilePath);
    
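    // build the ProcessBuilder that Java uses to run the shell script
    buildProcess(commandFilePath);
    // parse the log output
    parseProcessOutput(process);
    // get the process ID
    int processId = getProcessId(process);
    // set task attributes so the master can update the task information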
    result.setProcessId(processId);
    taskRequest.setProcessId(processId);
    boolean updateTaskExecutionContextStatus =
            TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
    if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
        ProcessUtils.kill(taskRequest);
        result.setExitStatusCode(EXIT_CODE_KILL);
        return result;
    }
    logger.info("process start, process id is: {}", processId);
    long remainTime = getRemainTime();
    // wait for the shell to finish, up to the remaining timeout
    boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
    if (status) {
        result.setExitStatusCode(process.exitValue());

    } else {
        logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
                taskRequest.getTaskTimeout());
        ProcessUtils.kill(taskRequest);
        result.setExitStatusCode(EXIT_CODE_FAILURE);
    }
    // exitValue() throws IllegalThreadStateException if the process has not actually terminated yet
    int exitCode = process.exitValue();
    String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
    logger.info(exitLogMessage
            + " execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
            taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
    return result;

}
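The approach outlined earlier (assemble the command, start a process, collect its log, obtain the process ID), plus the timed wait in run(), can be sketched with nothing but the JDK. This is a simplified, hypothetical sketch rather than ShellCommandExecutor itself. It runs the command with sh -c instead of writing a command file and reads merged stdout/stderr on a separate thread. It uses Process.pid() (Java 9+), and on timeout it kills the process and its descendants via ProcessHandle. The -1 exit code on timeout is an assumption standing in for EXIT_CODE_FAILURE.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of running a shell command from Java with log capture and a timeout.
public class ShellRunSketch {

    static final class Result {
        final int exitCode;
        final long pid;
        final List<String> output;

        Result(int exitCode, long pid, List<String> output) {
            this.exitCode = exitCode;
            this.pid = pid;
            this.output = output;
        }
    }

    static Result run(String command, long timeoutSeconds) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder("sh", "-c", command);
        builder.redirectErrorStream(true); // merge stderr into stdout so one reader sees the whole log
        Process process = builder.start();
        long pid = process.pid(); // Java 9+; needed later to kill or inspect the process

        // Drain the output on another thread; a full pipe buffer could otherwise block the child.
        List<String> output = Collections.synchronizedList(new ArrayList<>());
        Thread reader = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    output.add(line);
                }
            } catch (IOException ignored) {
                // stream closed because the process was killed
            }
        });
        reader.start();

        boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
        if (!finished) {
            // Kill children first so nothing keeps the output pipe open, then the shell itself.
            process.descendants().forEach(ProcessHandle::destroyForcibly);
            process.destroyForcibly();
            process.waitFor(); // wait for exit so the process is fully reaped
        }
        reader.join();
        int exitCode = finished ? process.exitValue() : -1; // -1: assumed failure code for timeouts
        return new Result(exitCode, pid, new ArrayList<>(output));
    }
}
```

Reading the log on its own thread matters because the parent and the shell communicate through a bounded pipe; if nobody consumes it, a chatty script can stall before it ever exits.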

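At this point a shell-type task has finished executing on the worker. What remains is sending the execution result back to the master, which maintains the workflow and task information.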

We are not done yet. Remember the question we left open earlier: when are tasks put into waitSubmitQueue?

Again, let's start with the approach: since waitSubmitQueue has a take method, there must be an add or offer method somewhere. Searching turns up a call to offer.

public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
    if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
        return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
    }

    if (waitSubmitQueue.size() > workerExecThreads) {
        logger.warn("Wait submit queue is full, will retry submit task later");
        WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
        // if waitSubmitQueue is full, it will wait 1s, then try add
        ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        if (waitSubmitQueue.size() > workerExecThreads) {
            return false;
        }
    }
    return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
}
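Because waitSubmitQueue is a delay queue, a task only becomes takeable once its delay has expired. The sketch below models that with the JDK's DelayQueue and the same size check as offer() above. DelayedTask, its millisecond delay, and the continueWhenFull flag (standing in for TaskExecuteThreadsFullPolicy.CONTINUE) are hypothetical. The original's one-second sleep before re-checking a full queue is left out.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a soft-bounded delay queue in front of a worker's execution pool.
public class DelayedSubmitQueueSketch {

    static final class DelayedTask implements Delayed {
        final int taskInstanceId;
        final long readyAtNanos; // the moment the task may be taken

        DelayedTask(int taskInstanceId, long delayMillis) {
            this.taskInstanceId = taskInstanceId;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof DelayedTask) {
                return Long.compare(readyAtNanos, ((DelayedTask) other).readyAtNanos);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedTask> waitSubmitQueue = new DelayQueue<>();
    private final int workerExecThreads;    // used as the soft capacity, as in offer() above
    private final boolean continueWhenFull; // true: never reject (like the CONTINUE policy)

    public DelayedSubmitQueueSketch(int workerExecThreads, boolean continueWhenFull) {
        this.workerExecThreads = workerExecThreads;
        this.continueWhenFull = continueWhenFull;
    }

    public boolean offer(DelayedTask task) {
        if (!continueWhenFull && waitSubmitQueue.size() > workerExecThreads) {
            return false; // the caller then reports TASK_REJECT to the master
        }
        return waitSubmitQueue.offer(task);
    }

    // Non-blocking: returns a task only if its delay has expired, otherwise null.
    public DelayedTask pollReady() {
        return waitSubmitQueue.poll();
    }

    public int size() {
        return waitSubmitQueue.size();
    }
}
```

Like the original, the check uses >, so the queue can hold workerExecThreads + 1 tasks before it starts rejecting.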

// Until now we read the source top-down. This time we want to find the callers, so we search bottom-up.

TaskDispatchProcessor.process

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
            String.format("invalid command type : %s", command.getType()));

    TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);

    if (taskDispatchCommand == null) {
        logger.error("task execute request command content is null");
        return;
    }
    final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
    logger.info("Receive task dispatch request, command: {}", taskDispatchCommand);

    TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();

    if (taskExecutionContext == null) {
        logger.error("task execution context is null");
        return;
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
        taskExecutionContext.setHost(workerConfig.getWorkerAddress());
        taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));

        long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getDelayTime() * 60L);
        if (remainTime > 0) {
            logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
            workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress,
                    CommandType.TASK_EXECUTE_RESULT);
        }

        WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder
                .createWorkerDelayTaskExecuteRunnableFactory(
                        taskExecutionContext,
                        workerConfig,
                        workflowMasterAddress,
                        workerMessageSender,
                        alertClientService,
                        taskPluginManager,
                        storageOperate)
                .createWorkerTaskExecuteRunnable();
        // submit task to manager
        boolean offer = workerManager.offer(workerTaskExecuteRunnable);
        if (!offer) {
            logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
            workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
        } else {
            logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize());
        }
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}

// Keep going up:
NettyServerHandler

private void processReceived(final Channel channel, final Command msg) {
    final CommandType commandType = msg.getType();
    if (CommandType.HEART_BEAT.equals(commandType)) {
        if (logger.isDebugEnabled()) {
            logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
        }
        return;
    }
    final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
    if (pair != null) {
        Runnable r = () -> {
            try {
                pair.getLeft().process(channel, msg);
            } catch (Exception ex) {
                logger.error("process msg {} error", msg, ex);
            }
        };
        try {
            pair.getRight().submit(r);
        } catch (RejectedExecutionException e) {
            logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
        }
    } else {
        logger.warn("commandType {} not support", commandType);
    }
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    processReceived(ctx.channel(), (Command) msg);
}
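Stripped of Netty, the dispatch in processReceived is a lookup from command type to a (processor, executor) pair, with heartbeats short-circuited and rejected submissions dropped. Below is a hypothetical plain-Java sketch. The three-value CommandType enum, the String message bodies, and the use of Map.Entry as the pair type are simplifying assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Hypothetical sketch of a command-type -> (processor, executor) registry.
public class CommandRegistrySketch {

    enum CommandType { HEART_BEAT, TASK_DISPATCH_REQUEST, TASK_KILL_REQUEST }

    private final Map<CommandType, Map.Entry<Consumer<String>, Executor>> processors = new ConcurrentHashMap<>();

    // Mirrors registerProcessor: each command type gets its own handler and executor.
    public void registerProcessor(CommandType type, Consumer<String> processor, Executor executor) {
        processors.put(type, Map.entry(processor, executor));
    }

    // Returns true if the message was handed to a processor, false if it was ignored or dropped.
    public boolean processReceived(CommandType type, String body) {
        if (type == CommandType.HEART_BEAT) {
            return false; // heartbeats only keep the connection alive
        }
        Map.Entry<Consumer<String>, Executor> pair = processors.get(type);
        if (pair == null) {
            System.err.println("commandType " + type + " not support");
            return false;
        }
        try {
            pair.getValue().execute(() -> {
                try {
                    pair.getKey().accept(body);
                } catch (RuntimeException ex) {
                    System.err.println("process msg " + body + " error: " + ex);
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            System.err.println("thread pool is full, discard msg " + body);
            return false;
        }
    }
}
```

Giving each command type its own executor keeps slow handlers (for example log reads) from starving task dispatch; the cost is that a saturated pool silently drops messages, which is why the original logs a warning.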

// Does channelRead look familiar? Yes, it is the one from Netty. So let's go back to the worker entry point and step into this.workerRpcServer.start();

public void start() {
    LOGGER.info("Worker rpc server starting");
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(workerConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
            taskExecuteRunningAckProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_SAVEPOINT_REQUEST, taskSavePointProcessor);
   
    this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
    // step into start
    this.nettyRemotingServer.start();
    LOGGER.info("Worker rpc server started");
}

/**
 * server start
 */
public void start() {
    if (isStarted.compareAndSet(false, true)) {
        this.serverBootstrap
                .group(this.bossGroup, this.workGroup)
                .channel(NettyUtils.getServerSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // Nothing more to say; step into initNettyChannel
                        initNettyChannel(ch);
                    }
                });

        ChannelFuture future;
        try {
            future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
        } catch (Exception e) {
            logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
            throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
        }
        if (future.isSuccess()) {
            logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
        } else if (future.cause() != null) {
            throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
        } else {
            throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
        }
    }
}

/**
 * init netty channel
 *
 * @param ch socket channel
 */
private void initNettyChannel(SocketChannel ch) {
    ch.pipeline()
            .addLast("encoder", new NettyEncoder())
            .addLast("decoder", new NettyDecoder())
            .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
            // serverHandler is the NettyServerHandler
            .addLast("handler", serverHandler);
}

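Does it all connect now? The Netty server receives the message sent by the client (the master) and puts the task into waitSubmitQueue; other threads then take tasks from that queue and process them.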

Looking back at the three questions raised at the start of the article, we should now have answers to all of them.

Conclusion

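That completes the analysis of DolphinScheduler's important source code. There is a lot of code and the logic chains are long, so while writing I inevitably skipped some code, and jumps in my train of thought may have left readers a little dizzy. That may be down to my limited writing skills, or simply to how the framework is structured (lots of threads, asynchrony, decoupling, and distribution).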

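A blog post is only one aid, meant to help you get into the source code "faster". If you are interested, you really should step through the code yourself. Some parts require jumping back and forth and cannot be shown well in an article; following the source yourself will give you a deeper understanding.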

As an Apache top-level project, DolphinScheduler has a lot worth learning from. Here are a few of my own impressions.

1. The code makes heavy use of threads, which keeps the system performant.

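2. The modules are decoupled, and each one does its own job: the api module provides the external web service; the master module handles task orchestration and splitting, state maintenance, and worker heartbeat maintenance; the worker module focuses on task processing. It is this architecture that gives DolphinScheduler its scalability and reliability in big-data scenarios.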

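3. Inside a single module, many maps and queues are used, and each stage has a clear job. Seen as a whole it works like a machine in which every part has its own role, and the logic is clear. It reminds me of my own code: simple logic is easy to write clearly, but once the business logic becomes complex, the code often turns into a tangled mess that is hard for others to read and hard to maintain. DolphinScheduler's approach is worth borrowing: 1) load the data; 2) assemble the data; 3) classify the data; 4) execute the processing.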

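4. State management. When we maintain business state in our own code, it tends to get messy, with the state changed after almost every operation. DolphinScheduler instead abstracts state changes into events: each change adds an event, and dedicated threads handle these events in one place.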

My knowledge is limited, so please feel free to raise questions so we can improve together.