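Preface
I originally meant to start by explaining what DolphinScheduler is and how it compares with other task-scheduling frameworks. But writing is not my strong point, and there are already plenty of blog posts on those topics. Repeating them would amount to copy-and-paste and add little value. So here I only briefly explain why we use DolphinScheduler. If you are interested in technology selection or architecture, search online.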
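Why DolphinScheduler
1. Decentralized: the failure of a master does not make the service unavailable.
2. Distributed: horizontal scaling is easy. Just add worker nodes.
3. Rich task types: common types such as shell, http, sqoop, datax and seatunnel are supported.
4. Powerful visualization: tasks are orchestrated by drag and drop.
5. Monitoring: task status, system status and more are well monitored.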
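Source code
From personal experience, before reading source code you should know why you are reading it and what you want to learn from it. Read with questions in mind. Otherwise there is a lot of code, you jump back and forth, and you quickly get lost. So I will first pose a few questions. If you have similar doubts, let's look together at how the source code handles them.
1. DolphinScheduler is a distributed system. How do its nodes communicate?
2. How does DolphinScheduler start up?
3. How does DolphinScheduler execute a task?
Let's look for the answers in the source code with these questions in mind.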
master
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
if [[ "$DOCKER" == "true" ]]; then
JAVA_OPTS="${JAVA_OPTS} -XX:-UseContainerSupport"
fi
# The actual launch command. It uses java -cp with conf/ and libs/* on the classpath, and the last argument is the key class: MasterServer
$JAVA_HOME/bin/java $JAVA_OPTS \
-cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \
org.apache.dolphinscheduler.server.master.MasterServer
Open org.apache.dolphinscheduler.server.master.MasterServer:
/**
* run master server
*/
@PostConstruct
public void run() throws SchedulerException {
// Start the remote-call (RPC) server, implemented with Netty
this.masterRPCServer.start();
// Load the task plugins
this.taskPluginManager.loadPlugin();
// Registry-related: register this master
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
// Initialize and start the scheduler bootstrap
this.masterSchedulerBootstrap.init();
this.masterSchedulerBootstrap.start();
// Event execution thread
this.eventExecuteService.start();
// Failover (disaster recovery) thread
this.failoverExecuteThread.start();
// Scheduling-related thread
this.schedulerApi.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!ServerLifeCycleManager.isStopped()) {
close("MasterServer shutdownHook");
}
}));
}
Next, step into this.masterRPCServer.start():
public void start() {
logger.info("Starting Master RPC Server...");
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
// Bind a different processor to each type of command
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
workflowExecutingDataRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
// Start the Netty server itself (the core of the RPC startup)
this.nettyRemotingServer.start();
logger.info("Started Master RPC Server...");
}
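registerProcessor above binds each CommandType to one processor, so an incoming message is routed purely by its type. The JDK-only sketch below illustrates that routing idea. The CommandType values are a small subset of the real enum. The Processor interface and ProcessorRegistry class are hypothetical simplifications, not DolphinScheduler's actual request-processor or Netty handler code.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical subset of command types; the real CommandType enum has many more values.
enum CommandType { TASK_EXECUTE_RESULT, TASK_KILL_RESPONSE, GET_LOG_BYTES_REQUEST, ROLL_VIEW_LOG_REQUEST }

// Hypothetical processor contract: handle one decoded message body and return a reply.
interface Processor {
    String process(String body);
}

class ProcessorRegistry {
    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    // Bind a command type to the processor that handles it; re-registering replaces the old one.
    void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    // Route an incoming message by its type; unregistered types are rejected.
    String dispatch(CommandType type, String body) {
        Processor processor = processors.get(type);
        if (processor == null) {
            throw new IllegalStateException("no processor registered for " + type);
        }
        return processor.process(body);
    }

    static ProcessorRegistry demo() {
        ProcessorRegistry registry = new ProcessorRegistry();
        registry.registerProcessor(CommandType.TASK_EXECUTE_RESULT, body -> "result:" + body);
        // One processor can serve several types, as loggerRequestProcessor does above.
        Processor logProcessor = body -> "log:" + body;
        registry.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, logProcessor);
        registry.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, logProcessor);
        return registry;
    }
}
```

An EnumMap fits here because the keys are a fixed enum, and lookup is a simple array index.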
// Step into this.nettyRemotingServer.start():
/**
* server start
*/
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
initNettyChannel(ch);
}
});
ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
} else {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
}
}
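The isStarted.compareAndSet(false, true) guard in start() makes the method idempotent: only the first caller performs the bind. The following is a minimal sketch of the same guard. StartOnceServer is hypothetical, and its "bind" step is just a counter standing in for serverBootstrap.bind(...).sync().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class StartOnceServer {
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    // Counts how often the expensive bind step actually ran (for illustration only).
    private final AtomicInteger bindCount = new AtomicInteger();

    // Only the caller that flips false -> true performs the bind; every later call is a no-op.
    void start() {
        if (isStarted.compareAndSet(false, true)) {
            bindCount.incrementAndGet(); // stands in for serverBootstrap.bind(port).sync()
        }
    }

    int bindCount() {
        return bindCount.get();
    }

    // Start one server from several threads at once and report how many binds happened.
    static int startConcurrently(int threadCount) {
        StartOnceServer server = new StartOnceServer();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(server::start);
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return server.bindCount();
    }
}
```

A plain boolean check followed by a separate set would not be enough. Two threads could both read false before either writes true.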
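Clearly, the master has been turned into a Netty server. It is a fair guess that the worker is likewise both a Netty server and a Netty client, and that messages between nodes are passed over Netty. The processors registered above show what travels this way: task running and result reports, kill responses, state events, cache expiry and log requests. (Node registration and heartbeats are handled separately by masterRegistryClient through the registry center.) So question 1, how the nodes of this distributed system communicate, now has an answer: over Netty-based RPC.
Back in the main method, this.taskPluginManager.loadPlugin(), this.masterRegistryClient.start() and this.masterRegistryClient.setRegistryStoppable(this) are not the focus here, so I will not step into them.
Next, look at the masterSchedulerBootstrap calls: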
this.masterSchedulerBootstrap.init();
this.masterSchedulerBootstrap.start();
/**
* constructor of MasterSchedulerService
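* Creates a fixed pool of daemon threads (MasterPreExecThread)
* and resolves the address of the server this master runs on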
*/
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils
.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
// The important code: this.masterSchedulerBootstrap.start()
@Override
public synchronized void start() {
logger.info("Master schedule bootstrap starting..");
// The core of the startup
super.start();
workflowEventLooper.start();
logger.info("Master schedule bootstrap started...");
}
// super.start() tells us this class is a thread, so look for its run method directly
/**
* run of MasterSchedulerService
*/
@Override
public void run() {
// The body is an endless while loop, so there must be a Thread.sleep in it somewhere; otherwise it would spin and max out the CPU
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// Fetch all pending commands; they are generated when a workflow is run (e.g. by clicking Run) through the api module
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// Convert the commands into a list of process (workflow) instances
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
MasterServerMetrics.incMasterConsumeCommand(commands.size());
processInstances.forEach(processInstance -> {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
logger.error(
"The workflow instance is already been cached, this case shouldn't be happened");
}
// Wrap the instance in a WorkflowExecuteRunnable, the workflow execution task; its entry point is the call() method shown further down
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
processService,
processInstanceDao,
nettyExecutorManager,
processAlertManager,
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService);
// First bind each WorkflowExecuteRunnable to its process instance (cached by instance id)
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
// Add an event to the workflow event queue
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
});
} catch (InterruptedException interruptedException) {
logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Master schedule workflow error", e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}
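The loop above follows a common poll-then-sleep pattern. It fetches work, sleeps briefly when there is none so the thread does not spin, and exits cleanly on interruption. Below is a self-contained sketch of that pattern. The in-memory queue stands in for findCommands(), and the class name, sleep interval and stop flag are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingLoop implements Runnable {
    // Hypothetical back-off interval; DolphinScheduler uses Constants.SLEEP_TIME_MILLIS.
    private static final long SLEEP_TIME_MILLIS = 10;

    // Stands in for findCommands(), which reads pending commands in the real code.
    private final ConcurrentLinkedQueue<String> commands = new ConcurrentLinkedQueue<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    void submit(String command) {
        commands.add(command);
    }

    void stop() {
        stopped.set(true);
    }

    @Override
    public void run() {
        while (!stopped.get()) {
            try {
                String command = commands.poll();
                if (command == null) {
                    // Nothing to do: sleep instead of spinning and burning CPU.
                    Thread.sleep(SLEEP_TIME_MILLIS);
                    continue;
                }
                handled.add(command);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop, as the master loop does.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    // Demo: run the loop on a background thread until every submitted command is handled.
    static List<String> runUntilDrained(String... input) {
        PollingLoop loop = new PollingLoop();
        for (String command : input) {
            loop.submit(command);
        }
        Thread worker = new Thread(loop, "polling-loop");
        worker.start();
        try {
            long deadline = System.currentTimeMillis() + 5_000;
            while (loop.handled.size() < input.length && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            loop.stop();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(loop.handled);
    }
}
```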
// At this point every workflow instance to run is cached and its START_WORKFLOW event is queued. Next, step into workflowEventLooper.start();
@Override
public synchronized void start() {
logger.info("WorkflowEventLooper thread starting");
// Same trick as before: look for run()
super.start();
logger.info("WorkflowEventLooper thread started");
}
public void run() {
WorkflowEvent workflowEvent = null;
while (!ServerLifeCycleManager.isStopped()) {
try {
// Take the events put into the queue by the code above; in the code shown here they are all START_WORKFLOW events
workflowEvent = workflowEventQueue.poolEvent();
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
// Where does this map get its contents? Search for an init method; it is shown below
WorkflowEventHandler workflowEventHandler =
workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
// Step into this method
workflowEventHandler.handleWorkflowEvent(workflowEvent);
} catch (InterruptedException e) {
logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
Thread.currentThread().interrupt();
break;
} catch (WorkflowEventHandleException workflowEventHandleException) {
logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
workflowEvent, workflowEventHandleException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (WorkflowEventHandleError workflowEventHandleError) {
logger.error("Handle workflow event error, will drop this event, event: {}",
workflowEvent,
workflowEventHandleError);
} catch (Exception unknownException) {
logger.error(
"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
workflowEvent, unknownException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
@PostConstruct
public void init() {
workflowEventHandlerList.forEach(
workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
workflowEventHandler));
}
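init() turns the list of handler beans (injected by Spring as workflowEventHandlerList) into a map keyed by event type. The looper can then find the right handler with a single lookup. The sketch below shows that indexing without Spring. START_WORKFLOW comes from the source, while OTHER_EVENT, StartWorkflowHandler and HandlerLookup are hypothetical. Unlike the original, the sketch throws an explicit error for an unregistered type.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// START_WORKFLOW appears in the source; OTHER_EVENT is a hypothetical second type.
enum WorkflowEventType { START_WORKFLOW, OTHER_EVENT }

interface WorkflowEventHandler {
    WorkflowEventType getHandleWorkflowEventType();

    String handleWorkflowEvent(int workflowInstanceId);
}

// Hypothetical handler; the real one starts the cached WorkflowExecuteRunnable.
class StartWorkflowHandler implements WorkflowEventHandler {
    @Override
    public WorkflowEventType getHandleWorkflowEventType() {
        return WorkflowEventType.START_WORKFLOW;
    }

    @Override
    public String handleWorkflowEvent(int workflowInstanceId) {
        return "started " + workflowInstanceId;
    }
}

class HandlerLookup {
    private final Map<WorkflowEventType, WorkflowEventHandler> handlerMap =
            new EnumMap<>(WorkflowEventType.class);

    // Same idea as init(): index every handler by the event type it declares.
    HandlerLookup(List<WorkflowEventHandler> handlers) {
        handlers.forEach(handler -> handlerMap.put(handler.getHandleWorkflowEventType(), handler));
    }

    String handle(WorkflowEventType type, int workflowInstanceId) {
        WorkflowEventHandler handler = handlerMap.get(type);
        if (handler == null) {
            throw new IllegalArgumentException("no handler registered for " + type);
        }
        return handler.handleWorkflowEvent(workflowInstanceId);
    }

    static HandlerLookup demo() {
        return new HandlerLookup(Arrays.<WorkflowEventHandler>asList(new StartWorkflowHandler()));
    }
}
```

Adding a new event type then only requires a new handler bean. The looper itself does not change.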
@Override
public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
// Remember the mapping set up earlier by processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable)? The WorkflowExecuteRunnable is taken back out of that map here
WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
workflowEvent.getWorkflowInstanceId());
if (workflowExecuteRunnable == null) {
throw new WorkflowEventHandleError(
"The workflow start event is invalid, cannot find the workflow instance from cache");
}
ProcessInstanceMetrics.incProcessInstanceByState("submit");
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
// Step into the call method
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
.thenAccept(workflowSubmitStatue -> {
if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
logger.info("Success submit the workflow instance");
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
} else if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
logger.error(
"Failed to submit the workflow instance, will send fail state event: {}",
workflowEvent);
WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
.processInstanceId(processInstance.getId())
.type(StateEventType.PROCESS_SUBMIT_FAILED)
.status(WorkflowExecutionStatus.FAILURE)
.build();
workflowExecuteRunnable.addStateEvent(stateEvent);
}
});
}
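handleWorkflowEvent runs call() on workflowExecuteThreadPool with CompletableFuture.supplyAsync and reacts to the returned status in a callback, so the looper thread is not blocked. Below is a simplified sketch of the same pattern. SubmitStatus mirrors WorkflowSubmitStatue, and the returned strings are placeholders for the real follow-up actions (the real timeout check is only added when the timeout is greater than 0). The sketch uses thenApply instead of the original thenAccept only so the outcome can be checked.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Mirrors the three WorkflowSubmitStatue values returned by call().
enum SubmitStatus { SUCCESS, FAILED, DUPLICATED_SUBMITTED }

class AsyncSubmit {
    // Run the submit step on the pool, then map its status to a follow-up action.
    // The returned strings are placeholders for the real follow-up work.
    static CompletableFuture<String> submit(Supplier<SubmitStatus> call, ExecutorService pool) {
        return CompletableFuture.supplyAsync(call, pool).thenApply(status -> {
            switch (status) {
                case SUCCESS:
                    return "add timeout check";          // cf. addProcess4TimeoutCheck
                case FAILED:
                    return "send PROCESS_SUBMIT_FAILED"; // cf. addStateEvent(stateEvent)
                default:
                    return "ignore";                     // DUPLICATED_SUBMITTED
            }
        });
    }

    static String run(SubmitStatus outcome) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return submit(() -> outcome, pool).join();
        } finally {
            pool.shutdown();
        }
    }
}
```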
@Override
public WorkflowSubmitStatue call() {
if (isStart()) {
logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
}
try {
// Every workflow instance goes through the stages below once: workflowRunnableStatus advances CREATED -> INITIALIZE_DAG -> INITIALIZE_QUEUE -> STARTED
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
// If you have read the DolphinScheduler docs, you know it builds tasks into a directed acyclic graph (DAG); this is where that happens
buildFlowDag();
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
// Initialize the task queue
initTaskQueue();
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
// Step into this method
submitPostNode(null);
workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
return WorkflowSubmitStatue.SUCCESS;
} catch (Exception e) {
logger.error("Start workflow error", e);
return WorkflowSubmitStatue.FAILED;
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
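call() is a small staged state machine. Each stage runs only when the status says the previous stage has finished, and the status advances only after a stage succeeds. The sketch below reproduces that shape. The step names are taken from the method calls above, and failOnce is a hypothetical fault-injection hook. In this sketch, calling call() again after a failure resumes from the last completed stage. Whether DolphinScheduler itself ever re-invokes call() after a failure is not shown in this code.

```java
import java.util.ArrayList;
import java.util.List;

enum WorkflowRunnableStatus { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

class StagedWorkflow {
    private WorkflowRunnableStatus status = WorkflowRunnableStatus.CREATED;
    private final List<String> steps = new ArrayList<>();
    // Hypothetical fault injection: the named step fails the first time it runs.
    private String failOnce;

    StagedWorkflow(String failOnce) {
        this.failOnce = failOnce;
    }

    private void step(String name) {
        if (name.equals(failOnce)) {
            failOnce = null;
            throw new IllegalStateException(name + " failed");
        }
        steps.add(name);
    }

    // Each stage runs only if the status says the previous stage finished,
    // and the status advances only after the stage succeeds.
    boolean call() {
        try {
            if (status == WorkflowRunnableStatus.CREATED) {
                step("buildFlowDag");
                status = WorkflowRunnableStatus.INITIALIZE_DAG;
            }
            if (status == WorkflowRunnableStatus.INITIALIZE_DAG) {
                step("initTaskQueue");
                status = WorkflowRunnableStatus.INITIALIZE_QUEUE;
            }
            if (status == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
                step("submitPostNode");
                status = WorkflowRunnableStatus.STARTED;
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    WorkflowRunnableStatus status() {
        return status;
    }

    List<String> steps() {
        return steps;
    }
}
```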
private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
Set<String> submitTaskNodeList =
DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode());
if (existTaskInstanceOptional.isPresent()) {
taskInstances.add(existTaskInstanceOptional.get());
continue;
}
TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
taskInstances.add(task);
}
if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)) {
TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
String taskInstanceVarPool = endTaskInstance.getVarPool();
if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
Set<Property> taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
String processInstanceVarPool = processInstance.getVarPool();
if (StringUtils.isNotEmpty(processInstanceVarPool)) {
Set<Property> properties = new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
properties.addAll(taskProperties);
processInstance.setVarPool(JSONUtils.toJsonString(properties));
} else {
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
}
}
}
for (TaskInstance task : taskInstances) {
if (readyToSubmitTaskQueue.contains(task)) {
logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId());
continue;
}
if (task.getId() != null && completeTaskMap.containsKey(task.getTaskCode())) {
logger.info("Task has already run success, taskName: {}", task.getName());
continue;
}
if (task.getState().isKill()) {
logger.info("Task is be stopped, the state is {}, taskInstanceId: {}", task.getState(), task.getId());
continue;
}
// All this method really does is put the task into readyToSubmitTaskQueue, the queue of tasks ready to be submitted
addTaskToStandByList(task);
}
// Step into this method
submitStandByTask();
updateProcessInstanceState();
}
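DagHelper.parsePostNodes decides which nodes can be submitted next. The real method also accounts for skipped nodes and condition branches. The sketch below keeps only the core idea under that simplification. With no parent (workflow start), it returns the nodes that have no predecessors. Otherwise it returns the parent's successors whose predecessors have all completed. SimpleDag and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class SimpleDag {
    private final Map<String, Set<String>> successors = new HashMap<>();
    private final Map<String, Set<String>> predecessors = new HashMap<>();

    void addEdge(String from, String to) {
        successors.computeIfAbsent(from, k -> new LinkedHashSet<>()).add(to);
        successors.computeIfAbsent(to, k -> new LinkedHashSet<>());
        predecessors.computeIfAbsent(to, k -> new LinkedHashSet<>()).add(from);
        predecessors.computeIfAbsent(from, k -> new LinkedHashSet<>());
    }

    // parentNode == null means "workflow start": return nodes without predecessors.
    // Otherwise return successors of parentNode whose predecessors have all completed.
    Set<String> readyPostNodes(String parentNode, Set<String> completed) {
        Collection<String> candidates = parentNode == null
                ? successors.keySet()
                : successors.getOrDefault(parentNode, Collections.emptySet());
        Set<String> ready = new TreeSet<>(); // sorted only to make the result deterministic
        for (String node : candidates) {
            Set<String> preds = predecessors.get(node);
            boolean depsMet = parentNode == null ? preds.isEmpty() : completed.containsAll(preds);
            if (depsMet && !completed.contains(node)) {
                ready.add(node);
            }
        }
        return ready;
    }

    Set<String> ready(String parentNode, String... completed) {
        return readyPostNodes(parentNode, new HashSet<>(Arrays.asList(completed)));
    }

    // Diamond: A -> B, A -> C, B -> D, C -> D
    static SimpleDag diamond() {
        SimpleDag dag = new SimpleDag();
        dag.addEdge("A", "B");
        dag.addEdge("A", "C");
        dag.addEdge("B", "D");
        dag.addEdge("C", "D");
        return dag;
    }
}
```

In the diamond, finishing B alone releases nothing. D becomes ready only after both B and C have completed.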
/**
* handling the list of tasks to be submitted
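* Overall logic: take the tasks that are ready from readyToSubmitTaskQueue, check the dependencies of the task at the head, and submit it only when its dependency result is SUCCESS; tasks whose result is FAILED or NON_EXEC are removed from the queue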
*/
public void submitStandByTask() throws StateEventHandleException {
int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
if (task == null) {
continue;
}
if (task.taskCanRetry()) {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
logger.info(
"Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
}
if (task.isFirstRun()) {
Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
getPreVarPool(task, preTask);
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
logger.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
// Submit the task; step into this method
Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
if (!removeTaskFromStandbyList(task)) {
logger.error(
"Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
processInstance.getId(),
task.getTaskCode());
}
completeTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
errorTaskMap.put(task.getTaskCode(), task.getId());
activeTaskProcessorMaps.remove(task.getTaskCode());
logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}",
task.getProcessInstanceId(),
task.getId(),
task.getTaskCode());
} else {
removeTaskFromStandbyList(task);
}
} else if (DependResult.FAILED == dependResult) {
dependFailedTaskSet.add(task.getTaskCode());
removeTaskFromStandbyList(task);
logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
removeTaskFromStandbyList(task);
logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
task.getId(), dependResult);
}
}
}
/**
* submit task to execute
* The code is long; only the important parts are annotated
*
* @param taskInstance task instance
* @return TaskInstance
*/
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
processService.packageTaskInstance(taskInstance, processInstance);
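// Use SPI to obtain the task processor that matches the task type (search online for AutoService and Java SPI)
// I will not step into it here; have a look if you are interested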
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
taskProcessor.init(taskInstance, processInstance);
if (taskInstance.getState().isRunning()
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
// The actual execution logic. TaskAction has many values; step into this method
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("Submit standby task failed!, taskCode: {}, taskName: {}",
taskInstance.getTaskCode(),
taskInstance.getName());
return Optional.empty();
}
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
if (validTaskMap.containsKey(taskInstance.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode());
if (taskInstance.getId() != oldTaskInstanceId) {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
oldTaskInstance.setFlag(Flag.NO);
processService.updateTaskInstance(oldTaskInstance);
validTaskMap.remove(taskInstance.getTaskCode());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
}
}
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
taskInstance.getName(),
taskGroupId,
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info(
"Submitted task will not be dispatch right now because the first time to try to acquire" +
" task group failed, taskInstanceName: {}, taskGroupId: {}",
taskInstance.getName(), taskGroupId);
return Optional.of(taskInstance);
}
}
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(),
taskInstance.getName());
return Optional.empty();
}
taskProcessor.action(TaskAction.RUN);
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
if (taskProcessor.taskInstance().getState().isFinished()) {
if (processInstance.isBlocked()) {
TaskStateEvent processBlockEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())
.taskInstanceId(taskInstance.getId())
.status(taskProcessor.taskInstance().getState())
.type(StateEventType.PROCESS_BLOCKED)
.build();
this.stateEvents.add(processBlockEvent);
}
TaskStateEvent taskStateChangeEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())
.taskInstanceId(taskInstance.getId())
.status(taskProcessor.taskInstance().getState())
.type(StateEventType.TASK_STATE_CHANGE)
.build();
this.stateEvents.add(taskStateChangeEvent);
}
return Optional.of(taskInstance);
} catch (Exception e) {
logger.error("Submit standby task {} error, taskCode: {}", taskInstance.getName(),
taskInstance.getTaskCode(), e);
return Optional.empty();
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
/**
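* TaskAction has many values; the code above uses three of them:
* SUBMIT, DISPATCH (the important one here; this is the code to read) and RUN.
* As I understand it, these actions mainly move the task through its stages and update its state. The split is rather fine-grained, and I do not know the design rationale behind it.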
*/
@Override
public boolean action(TaskAction taskAction) {
String threadName = Thread.currentThread().getName();
if (StringUtils.isNotEmpty(threadLoggerInfoName)) {
Thread.currentThread().setName(threadLoggerInfoName);
}
boolean result = false;
try {
switch (taskAction) {
case STOP:
result = stop();
break;
case PAUSE:
result = pause();
break;
case TIMEOUT:
result = timeout();
break;
case SUBMIT:
result = submit();
break;
case RUN:
result = run();
break;
case DISPATCH:
// Step into this method
result = dispatch();
break;
case RESUBMIT:
result = resubmit();
break;
default:
logger.error("unknown task action: {}", taskAction);
}
return result;
} finally {
// reset thread name
Thread.currentThread().setName(threadName);
}
}
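// Now look at the implementation in CommonTaskProcessor
// Note the task queue here, because it is used later. I was stuck at this point for a long time, unable to work out how tasks actually get consumed.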
private TaskPriorityQueue<TaskPriority> taskUpdateQueue;
public void initQueue() {
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
}
@Override
public boolean dispatchTask() {
try {
if (taskUpdateQueue == null) {
this.initQueue();
}
if (taskInstance.getState().isFinished()) {
logger.info("Task {} has already finished, no need to submit to task queue, taskState: {}",
taskInstance.getName(), taskInstance.getState());
return true;
}
if (taskInstance.getState() == TaskExecutionStatus.RUNNING_EXECUTION
|| taskInstance.getState() == TaskExecutionStatus.DELAY_EXECUTION) {
logger.info("Task {} is already running or delayed, no need to submit to task queue, taskState: {}",
taskInstance.getName(), taskInstance.getState());
return true;
}
logger.info("Task {} is ready to dispatch to worker", taskInstance.getName());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(), taskInstance.getTaskGroupPriority(),
Constants.DEFAULT_WORKER_GROUP);
TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
if (taskExecutionContext == null) {
logger.error("Get taskExecutionContext fail, task: {}", taskInstance);
return false;
}
taskPriority.setTaskExecutionContext(taskExecutionContext);
// The main point: put the task into this queue for later consumption
taskUpdateQueue.put(taskPriority);
logger.info("Task {} is submitted to priority queue success by master", taskInstance.getName());
return true;
} catch (Exception e) {
logger.error("Task {} is submitted to priority queue error", taskInstance.getName(), e);
return false;
}
}
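The masterSchedulerBootstrap call chain goes quite deep, but its main job is to wrap the tasks, update their state, and put them into the consumer queue, where they wait to be consumed and sent to worker nodes.
Next comes this.eventExecuteService.start();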
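The tasks wait in a priority queue, so the order in which they reach workers depends on how TaskPriority instances compare. The sketch below assumes a field-by-field comparison over the first constructor arguments seen in dispatchTask (process instance priority, process instance id, task priority, task id), with lower values polled first, as when priority codes start at 0 for the highest priority. The exact comparison in TaskPriority, and its remaining fields (task group priority, worker group), are not shown in this article. Item and TaskPrioritySketch are hypothetical.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class TaskPrioritySketch {
    // Hypothetical, trimmed-down TaskPriority; lower values are polled first.
    static final class Item {
        final int processInstancePriority; // assumed encoding: 0 = highest
        final int processInstanceId;
        final int taskInstancePriority;    // assumed encoding: 0 = highest
        final int taskId;

        Item(int processInstancePriority, int processInstanceId, int taskInstancePriority, int taskId) {
            this.processInstancePriority = processInstancePriority;
            this.processInstanceId = processInstanceId;
            this.taskInstancePriority = taskInstancePriority;
            this.taskId = taskId;
        }
    }

    // Assumed order: compare field by field, moving to the next field only on ties.
    static final Comparator<Item> ORDER = Comparator
            .comparingInt((Item item) -> item.processInstancePriority)
            .thenComparingInt(item -> item.processInstanceId)
            .thenComparingInt(item -> item.taskInstancePriority)
            .thenComparingInt(item -> item.taskId);

    // Put all items into a priority queue and return the task ids in poll order.
    static String pollOrder(Item... items) {
        PriorityBlockingQueue<Item> queue = new PriorityBlockingQueue<>(11, ORDER);
        for (Item item : items) {
            queue.put(item);
        }
        StringBuilder order = new StringBuilder();
        Item next;
        while ((next = queue.poll()) != null) {
            if (order.length() > 0) {
                order.append(',');
            }
            order.append(next.taskId);
        }
        return order.toString();
    }
}
```

Under this ordering, a task from a higher-priority workflow is always polled before any task from a lower-priority one, whatever their task-level priorities.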
@Override
public synchronized void start() {
logger.info("Master Event execute service starting");
super.start();
logger.info("Master Event execute service started");
}
@Override
public void run() {
while (!ServerLifeCycleManager.isStopped()) {
try {
workflowEventHandler();
streamTaskEventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (InterruptedException interruptedException) {
logger.warn("Master event service interrupted, will exit this loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Master event execute service error", e);
}
}
}
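While processing the task instances we generated many state events. I read this as a state machine: the main job of eventExecuteService is to act on these state events, updating the state in the database and cleaning up the data produced earlier.
At this point the DolphinScheduler master has finished starting up. But we are not quite done. Remember that the tasks to be sent to workers were put into a queue? We still need to find where that queue is consumed (this took me a long time to find!).
There are two ways to find it. 1. The source tree has a consumer package that contains the consumption code (honestly, you would never know unless someone told you). 2. Start from the code that puts tasks into the queue, this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);, and look for where TaskPriorityQueueImpl is used. The name suggests an implementation class, and indeed there is an interface, TaskPriorityQueue, so look for the usages of TaskPriorityQueue. This is fairly easy to do in IDEA.
At this point you have to guess from the class names. The first usage contains the word "consumer", so open TaskPriorityQueueConsumer. It is also a thread, so, as usual, go straight to run().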
@PostConstruct
public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils
.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
logger.info("Task priority queue consume thread staring");
super.start();
logger.info("Task priority queue consume thread started");
}
@Override
public void run() {
// The number of tasks dispatched per batch is configurable (masterConfig.getDispatchTaskNumber())
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
while (!ServerLifeCycleManager.isStopped()) {
try {
// Step into this method
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
logger.info("{} tasks dispatch failed, will retry to dispatch", failedDispatchTasks.size());
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
if (fetchTaskNum == failedDispatchTasks.size()) {
logger.info("All tasks dispatch failed, will sleep a while to avoid the master cpu higher");
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
} catch (Exception e) {
TaskMetrics.incTaskDispatchError();
logger.error("dispatcher task error", e);
}
}
}
/**
* batch dispatch with thread pool
*/
public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(fetchTaskNum);
for (int i = 0; i < fetchTaskNum; i++) {
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(taskPriority)) {
latch.countDown();
continue;
}
consumerThreadPoolExecutor.submit(() -> {
try {
// Simple code; step into this method
boolean dispatchResult = this.dispatchTask(taskPriority);
if (!dispatchResult) {
failedDispatchTasks.add(taskPriority);
}
} finally {
latch.countDown();
}
});
}
latch.await();
return failedDispatchTasks;
}
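batchDispatch polls up to fetchTaskNum tasks, dispatches them in parallel, and waits on a CountDownLatch sized to fetchTaskNum. Every slot counts down exactly once: immediately when the poll returns nothing, or in the finally block of the submitted job. The following is a generic sketch of that pattern, using Integer task ids and a hypothetical dispatch predicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class BatchDispatcher {
    // Poll up to fetchTaskNum tasks, dispatch them in parallel, and wait for the whole batch.
    // Returns the tasks whose dispatch failed so the caller can put them back into the queue.
    static List<Integer> batchDispatch(BlockingQueue<Integer> queue, int fetchTaskNum,
                                       ExecutorService pool, Predicate<Integer> dispatch)
            throws InterruptedException {
        List<Integer> failed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(fetchTaskNum);
        for (int i = 0; i < fetchTaskNum; i++) {
            Integer task = queue.poll(10, TimeUnit.MILLISECONDS);
            if (task == null) {
                latch.countDown(); // empty slot: count it down right away
                continue;
            }
            pool.submit(() -> {
                try {
                    if (!dispatch.test(task)) {
                        failed.add(task);
                    }
                } finally {
                    latch.countDown(); // always count down, even if dispatch throws
                }
            });
        }
        latch.await(); // returns once every slot has counted down
        return failed;
    }

    // Demo: 5 queued tasks, batch size 8; odd ids dispatch successfully, even ids fail.
    static List<Integer> demo() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(Arrays.asList(1, 2, 3, 4, 5));
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Integer> failed = new ArrayList<>(batchDispatch(queue, 8, pool, id -> id % 2 == 1));
            Collections.sort(failed);
            return failed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        } finally {
            pool.shutdown();
        }
    }
}
```

The latch is sized to fetchTaskNum rather than to the number of tasks actually polled. That is why the empty-slot countDown is necessary: without it, await() would block forever whenever the queue runs dry mid-batch.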
/**
* Dispatch task to worker.
*
* @param taskPriority taskPriority
* @return dispatch result, return true if dispatch success, return false if dispatch failed.
*/
protected boolean dispatchTask(TaskPriority taskPriority) {
TaskMetrics.incTaskDispatch();
boolean result = false;
try {
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
if (workflowExecuteRunnable == null) {
logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
return true;
}
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
if (!taskInstanceOptional.isPresent()) {
logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
taskPriority);
return true;
}
TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext =
new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
taskInstance);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
logger.info("Task {} is already finished, no need to dispatch, task instance id: {}",
taskInstance.getName(), taskInstance.getId());
return true;
}
}
// Step into this method
result = dispatcher.dispatch(executionContext);
if (result) {
logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
addDispatchEvent(context, executionContext);
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
}
} catch (RuntimeException | ExecuteException e) {
logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
}
return result;
}
/**
* task dispatch
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
// Select a worker host
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
context.getCommand(), context.getWorkerGroup());
return false;
}
context.setHost(host);
// Hooks that run before and after execution, similar to the before/after enhancements familiar from Spring
executorManager.beforeExecute(context);
try {
// Step into this method
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
Set<String> allNodes = getAllNodes(context);
Set<String> failNodeSet = new HashSet<>();
Command command = context.getCommand();
Host host = context.getHost();
boolean success = false;
while (!success) {
try {
// Step into this method
doExecute(host, command);
success = true;
context.setHost(host);
context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
logger.error("Execute command {} error", command, ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
logger.error("retry execute command : {} host : {}", command, host);
} else {
throw new ExecuteException("fail after try all nodes");
}
} catch (Throwable t) {
throw new ExecuteException("fail after try all nodes");
}
}
}
return success;
}
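execute() tries the selected host. On an ExecuteException it switches to a host that has not failed yet, and it gives up only when every node has been tried. The sketch below shows that failover loop. It assumes a Predicate<String> as a hypothetical stand-in for doExecute (true means the send succeeded) and uses a sorted set so the next host is chosen deterministically.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

class FailoverSender {
    // Try firstHost; on failure switch to a host that has not failed yet.
    // `send` is a stand-in for doExecute: true means the command was delivered.
    // Returns the host that finally accepted the command.
    static String sendWithFailover(String firstHost, Set<String> allHosts, Predicate<String> send) {
        Set<String> failedHosts = new HashSet<>();
        String host = firstHost;
        while (true) {
            if (send.test(host)) {
                return host;
            }
            failedHosts.add(host);
            Set<String> remained = new TreeSet<>(allHosts); // sorted for a deterministic choice
            remained.removeAll(failedHosts);
            if (remained.isEmpty()) {
                throw new IllegalStateException("fail after try all nodes");
            }
            host = remained.iterator().next();
        }
    }
}
```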
/**
* execute logic
*
* @param host host
* @param command command
* @throws ExecuteException if error throws ExecuteException
*/
public void doExecute(final Host host, final Command command) throws ExecuteException {
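// Retry count: no retry framework is used, just a simple loop (the first attempt plus up to 3 retries); an exception is thrown if every attempt fails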
int retryCount = 3;
boolean success = false;
do {
try {
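// This is where the command is finally sent. No need to go further, since it is clear by now: Netty sends the command to the worker, and the worker executes it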
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error("Send command to {} error, command: {}", host, command, ex);
retryCount--;
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
} while (retryCount >= 0 && !success);
if (!success) {
throw new ExecuteException(String.format("send command : %s to %s error", command, host));
}
}
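Note the loop condition `retryCount >= 0`. Starting from 3, doExecute makes one initial attempt plus up to three retries, which is four sends in total. The sketch below reproduces that do/while shape so the count can be checked. RetryingSender and its Sender interface are hypothetical names, and the IllegalStateException thrown at the end stands in for ExecuteException.

```java
// Hypothetical sketch of doExecute()'s retry loop; Sender is an illustrative interface.
class RetryingSender {

    interface Sender {
        void send() throws Exception;
    }

    /**
     * Same do/while shape as doExecute(): with retryCount = 3 the loop makes
     * 1 initial attempt + 3 retries. Returns the number of attempts used.
     */
    static int sendWithRetry(Sender sender, int retryCount, long sleepMillis) throws InterruptedException {
        int attempts = 0;
        boolean success = false;
        Exception lastError = null;
        do {
            attempts++;
            try {
                sender.send();
                success = true;
            } catch (Exception ex) {
                lastError = ex;
                retryCount--;
                Thread.sleep(sleepMillis); // back off before the next attempt
            }
        } while (retryCount >= 0 && !success);
        if (!success) {
            throw new IllegalStateException("send failed after " + attempts + " attempts", lastError);
        }
        return attempts;
    }
}
```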
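With the master's source code behind us, the worker can be analyzed in much the same way. The api module needs even less explanation: it is an ordinary web project, so we won't trace its source further. Next, let's analyze the worker's source code.
worker
As with the master, we find the startup class from the startup script: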
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME}
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
if [[ "$DOCKER" == "true" ]]; then
JAVA_OPTS="${JAVA_OPTS} -XX:-UseContainerSupport"
fi
# The startup class is org.apache.dolphinscheduler.server.worker.WorkerServer
$JAVA_HOME/bin/java $JAVA_OPTS \
-cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \
org.apache.dolphinscheduler.server.worker.WorkerServer
Step into org.apache.dolphinscheduler.server.worker.WorkerServer:
@PostConstruct
public void run() {
// Netty communication server
this.workerRpcServer.start();
// Netty communication client
this.workerRpcClient.start();
// Load the task plugins
this.taskPluginManager.loadPlugin();
// Service registration, heartbeats, etc.
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();
// The name already tells us this is about task execution and management, so this is the core. Step into the method
this.workerManagerThread.start();
// Sending of task execution information (with retry)
this.messageRetryRunner.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!ServerLifeCycleManager.isStopped()) {
close("WorkerServer shutdown hook");
}
}));
}
public void start() {
logger.info("Worker manager thread starting");
Thread thread = new Thread(this, this.getClass().getName());
thread.setDaemon(true);
// Starting the thread invokes run(), so look at the run method next
thread.start();
logger.info("Worker manager thread started");
}
@Override
public void run() {
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
if (this.getThreadPoolQueueSize() <= workerExecThreads) {
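// waitSubmitQueue is a delay queue holding the tasks sent over from the master for execution. (Open question: when and how do tasks get stored in this queue? We won't follow that code yet, since jumping back and forth is confusing; it is explained once the execution path has been analyzed.)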
final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
// Step into the submit method
workerExecService.submit(workerDelayTaskExecuteRunnable);
} else {
WorkerServerMetrics.incWorkerOverloadCount();
logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize());
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("An unexpected interrupt is happened, "
+ "the exception will be ignored and this thread will continue to run", e);
}
}
}
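The run() loop above is a producer/consumer hand-off. A daemon thread moves tasks from a waiting queue into an execution pool, but only while the pool's own backlog is no larger than workerExecThreads; otherwise it backs off and checks again. This minimal sketch rests on several assumptions. A plain LinkedBlockingQueue stands in for the delay queue. ThreadPoolExecutor.getQueue().size() stands in for getThreadPoolQueueSize(). poll with a timeout replaces take() so the loop can notice a stop flag. ManagerLoop is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the worker manager loop; ManagerLoop is not a DolphinScheduler class.
class ManagerLoop implements Runnable {
    private final BlockingQueue<Runnable> waitQueue = new LinkedBlockingQueue<>();
    private final ThreadPoolExecutor pool;
    private final int execThreads;
    private volatile boolean stopped = false;

    ManagerLoop(int execThreads) {
        this.execThreads = execThreads;
        this.pool = new ThreadPoolExecutor(execThreads, execThreads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    boolean offer(Runnable task) {
        return waitQueue.offer(task);
    }

    void start() {
        Thread thread = new Thread(this, "Manager-Loop-Thread");
        thread.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        thread.start();         // start() makes the new thread call run()
    }

    void stop() {
        stopped = true;
        pool.shutdown();
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                if (pool.getQueue().size() <= execThreads) {
                    // poll with a timeout (instead of take()) so a stop request is noticed
                    Runnable task = waitQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        pool.execute(task);
                    }
                } else {
                    Thread.sleep(100); // pool backlog too large: back off and re-check
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RejectedExecutionException e) {
                return; // the pool was shut down by stop()
            }
        }
    }
}
```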
public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
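// The task is executed asynchronously and without blocking through a (Guava) ListenableFuture; the callback is notified automatically once there is a result. So next we follow the run method of WorkerTaskExecuteRunnable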
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
@Override
public void onFailure(Throwable throwable) {
logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
throwable);
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
};
Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
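submit() registers the task in taskExecuteThreadMap and relies on a callback to remove it whether the task succeeds or fails. The same pattern can be written with the JDK's CompletableFuture instead of Guava's ListenableFuture. This is a swapped-in technique for illustration, not what DolphinScheduler uses, and TaskRegistry and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch with JDK CompletableFuture standing in for Guava's ListenableFuture + FutureCallback.
// TaskRegistry is a hypothetical class, not part of DolphinScheduler.
class TaskRegistry {
    // taskInstanceId -> running task, like taskExecuteThreadMap
    final Map<Integer, Runnable> running = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    CompletableFuture<Void> submit(int taskInstanceId, Runnable task) {
        running.put(taskInstanceId, task);
        return CompletableFuture.runAsync(task, pool)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        System.err.println("task execute failed, taskInstanceId: " + taskInstanceId + ", " + error);
                    }
                    // like onSuccess/onFailure: the entry is removed either way
                    running.remove(taskInstanceId);
                });
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

The future returned by whenComplete only completes after the cleanup action has run, so a caller that joins on it sees the map already updated.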
@Override
public void run() {
try {
Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
logger.info("Begin to pulling task");
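// Initialize the task information in taskExecutionContext. We can use a context object like this in everyday development too: I have seen business code that keeps passing task information around as individual parameters, which makes the code very bloated and often leads to arguments being assigned incorrectly.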
initializeTask();
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress,
CommandType.TASK_EXECUTE_RESULT);
logger.info(
"The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
return;
}
// Hooks before and after execution
beforeExecute();
TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender)
.masterAddress(masterAddress).build();
// Execute the task; step into the method
executeTask(taskCallBack);
afterExecute();
} catch (Throwable ex) {
logger.error("Task execute failed, due to meet an exception", ex);
afterThrowing(ex);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@Override
public void executeTask(TaskCallBack taskCallBack) throws TaskException {
if (task == null) {
throw new TaskException("The task plugin instance is not initialized");
}
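// Handle the task with the matching task plugin. The plugins for the different task types are loaded in this.taskPluginManager.loadPlugin(); step into it if you are interested
// Let's take the shell task implementation as a concrete example.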
task.handle(taskCallBack);
}
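/**
Before looking at the code, think about how you would make Java execute a shell command yourself.
Here is my approach:
1. Assemble the command.
2. Execute it with Runtime or ProcessBuilder (look these up if they are unfamiliar).
3. Since the shell runs in a separate process, solve the problem of getting its execution log across processes.
4. Obtain the process ID so that the process can be controlled (executed, paused, terminated).
*/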
// Execute a shell-type task
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// Build the command
String command = buildCommand();
// Execute it; step into the run method
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Shell task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current Shell task has been interrupted", e);
} catch (Exception e) {
logger.error("shell task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("Execute shell task error", e);
}
}
public TaskResponse run(String execCommand) throws IOException, InterruptedException {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(execCommand)) {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}
String commandFilePath = buildCommandFilePath();
createCommandFileIfNotExists(execCommand, commandFilePath);
// Build the ProcessBuilder that Java uses to run the shell script
buildProcess(commandFilePath);
// Parse the log output
parseProcessOutput(process);
// Get the process ID
int processId = getProcessId(process);
// Set task properties so that the master can update the task information
result.setProcessId(processId);
taskRequest.setProcessId(processId);
boolean updateTaskExecutionContextStatus =
TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
ProcessUtils.kill(taskRequest);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
logger.info("process start, process id is: {}", processId);
long remainTime = getRemainTime();
// Wait (up to the remaining timeout) for the shell to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
if (status) {
result.setExitStatusCode(process.exitValue());
} else {
logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
ProcessUtils.kill(taskRequest);
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
// Guard against abnormal execution: exitValue() throws an IllegalThreadStateException if the process has not stopped yet
int exitCode = process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
logger.info(exitLogMessage
+ " execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
return result;
}
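The four steps listed before handle() (assemble the command, start a process, collect its output, get the process ID) can be sketched with nothing but the JDK. This is a minimal sketch, assuming a POSIX system with /bin/sh and Java 9+ (for Process.pid()). ShellRunner, Result and EXIT_CODE_TIMEOUT are hypothetical names, and output is collected into a list rather than logged the way DolphinScheduler does it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the four steps; assumes a POSIX /bin/sh and Java 9+.
// ShellRunner, Result and EXIT_CODE_TIMEOUT are hypothetical, not DolphinScheduler code.
class ShellRunner {
    static final int EXIT_CODE_TIMEOUT = -1;

    static final class Result {
        final long pid;
        final int exitCode;
        final List<String> output;

        Result(long pid, int exitCode, List<String> output) {
            this.pid = pid;
            this.exitCode = exitCode;
            this.output = output;
        }
    }

    static Result run(String command, long timeoutSeconds) throws IOException, InterruptedException {
        // 1. Assemble: write the command into a temporary script file
        Path script = Files.createTempFile("task-", ".sh");
        Files.write(script, command.getBytes(StandardCharsets.UTF_8));
        try {
            // 2. Start a child process; stderr is merged into stdout
            Process process = new ProcessBuilder("/bin/sh", script.toString())
                    .redirectErrorStream(true)
                    .start();
            // 3. Drain the child's output on another thread so a full pipe cannot block it
            List<String> output = new CopyOnWriteArrayList<>();
            Thread reader = new Thread(() -> {
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        output.add(line);
                    }
                } catch (IOException ignored) {
                    // the stream is closed when the process is destroyed
                }
            });
            reader.setDaemon(true);
            reader.start();
            // 4. The PID is what later lets the task be inspected or killed
            long pid = process.pid();
            boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
            if (!finished) {
                process.destroyForcibly().waitFor(); // timed out: kill it and reap it
            }
            reader.join(2000);
            int exitCode = finished ? process.exitValue() : EXIT_CODE_TIMEOUT;
            return new Result(pid, exitCode, output);
        } finally {
            Files.deleteIfExists(script);
        }
    }
}
```

Reading the output on its own thread matters: if the parent only called waitFor, a child that writes more than the pipe buffer holds could block forever.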
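At this point a shell-type task has finished executing on the worker. All that remains is to send the execution result back to the master, which maintains the workflow and task information.
We're not done yet, though. Recall the question we left open earlier: when are tasks put into waitSubmitQueue?
Again, start with the reasoning: since waitSubmitQueue has a take method, something must call add or offer on it. Searching turns up the place that calls offer: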
public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
}
if (waitSubmitQueue.size() > workerExecThreads) {
logger.warn("Wait submit queue is full, will retry submit task later");
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
// if waitSubmitQueue is full, it will wait 1s, then try add
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
if (waitSubmitQueue.size() > workerExecThreads) {
return false;
}
}
return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
}
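offer() implements two policies for a full wait queue. With CONTINUE the task is always enqueued. Otherwise, if the queue already holds more than workerExecThreads tasks, it waits once and gives up if the queue is still full, which makes the caller send TASK_REJECT back to the master. The sketch below reproduces that decision logic with hypothetical names (SubmitQueue, FullPolicy, REJECT) and a configurable wait in place of SLEEP_TIME_MILLIS. Because the check is `size() > limit`, the queue actually accepts limit + 1 tasks before it starts rejecting.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of offer()'s full-queue policies; SubmitQueue/FullPolicy are illustrative names.
class SubmitQueue<T> {
    enum FullPolicy { CONTINUE, REJECT }

    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int limit;       // plays the role of workerExecThreads
    private final FullPolicy policy;
    private final long waitMillis; // plays the role of SLEEP_TIME_MILLIS

    SubmitQueue(int limit, FullPolicy policy, long waitMillis) {
        this.limit = limit;
        this.policy = policy;
        this.waitMillis = waitMillis;
    }

    boolean offer(T task) {
        if (policy == FullPolicy.CONTINUE) {
            return queue.offer(task); // never refuse; the queue is unbounded
        }
        if (queue.size() > limit) {
            sleepQuietly(waitMillis); // give the consumer one chance to drain the queue
            if (queue.size() > limit) {
                return false;         // still full: the caller reports TASK_REJECT
            }
        }
        return queue.offer(task);
    }

    int size() {
        return queue.size();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```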
// So far we have read the source top-down. This time we want to find where offer is called, so we search bottom-up
TaskDispatchProcessor.process
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);
if (taskDispatchCommand == null) {
logger.error("task execute request command content is null");
return;
}
final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("Receive task dispatch request, command: {}", taskDispatchCommand);
TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
if (taskExecutionContext == null) {
logger.error("task execution context is null");
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress,
CommandType.TASK_EXECUTE_RESULT);
}
WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder
.createWorkerDelayTaskExecuteRunnableFactory(
taskExecutionContext,
workerConfig,
workflowMasterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate)
.createWorkerTaskExecuteRunnable();
// submit task to manager
boolean offer = workerManager.offer(workerTaskExecuteRunnable);
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
} else {
logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize());
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
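When a task is configured with a delay, process() computes remainTime from firstSubmitTime and delayTime. delayTime is in minutes, so `* 60L` converts it to seconds. The task is then put into waitSubmitQueue, which the worker manager code above calls a delay queue. In the JDK, java.util.concurrent.DelayQueue only releases an element once its Delayed.getDelay() is no longer positive. The sketch below shows such an element. DelayedTask is a hypothetical class, and whether WorkerDelayTaskExecuteRunnable computes its delay exactly this way is an assumption.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a queue element that becomes available only after
// firstSubmitTime + delayTime minutes, the same quantity process() calls remainTime.
class DelayedTask implements Delayed {
    final int taskInstanceId;
    private final long triggerAtMillis;

    DelayedTask(int taskInstanceId, long firstSubmitMillis, int delayMinutes) {
        this.taskInstanceId = taskInstanceId;
        this.triggerAtMillis = firstSubmitMillis + TimeUnit.MINUTES.toMillis(delayMinutes);
    }

    /** Time left before the task may run; a value <= 0 means it is ready. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /** DelayQueue orders elements so the one that expires first is at the head. */
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}
```

With such elements, a consumer calling take() simply blocks until the earliest delay has expired, so no separate timer is needed.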
// Continue up the call chain
NettyServerHandler
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
if (CommandType.HEART_BEAT.equals(commandType)) {
if (logger.isDebugEnabled()) {
logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
}
return;
}
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) {
Runnable r = () -> {
try {
pair.getLeft().process(channel, msg);
} catch (Exception ex) {
logger.error("process msg {} error", msg, ex);
}
};
try {
pair.getRight().submit(r);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
}
} else {
logger.warn("commandType {} not support", commandType);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (Command) msg);
}
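processReceived() is a table-driven dispatcher. Each CommandType maps to a (processor, executor) pair; heartbeats are short-circuited, unknown types are logged, and a rejected submission means the message is dropped. Here is a stripped-down sketch without Netty. CommandDispatcher, its three-value CommandType enum and the String message body are all hypothetical simplifications.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of the command-type -> (processor, executor) table used by processReceived().
class CommandDispatcher {
    enum CommandType { HEART_BEAT, TASK_DISPATCH_REQUEST, TASK_KILL_REQUEST }

    interface Processor {
        void process(String body);
    }

    private static final class Entry {
        final Processor processor;
        final ExecutorService executor;

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<CommandType, Entry> processors = new EnumMap<>(CommandType.class);

    void register(CommandType type, Processor processor, ExecutorService executor) {
        processors.put(type, new Entry(processor, executor));
    }

    /** Returns true if the message was handed to a processor's executor. */
    boolean dispatch(CommandType type, String body) {
        if (type == CommandType.HEART_BEAT) {
            return false; // heartbeats are only logged, never processed
        }
        Entry entry = processors.get(type);
        if (entry == null) {
            return false; // "commandType not support"
        }
        try {
            entry.executor.submit(() -> {
                try {
                    entry.processor.process(body);
                } catch (Exception ex) {
                    System.err.println("process msg error: " + ex);
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            return false; // thread pool is full: the message is discarded
        }
    }
}
```

Giving each processor its own executor means a slow command type cannot starve the Netty I/O thread or the other command types.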
// Does channelRead look familiar? Yes, it is the one from Netty. So we can go back to the worker's entry point and step into this.workerRpcServer.start();
public void start() {
LOGGER.info("Worker rpc server starting");
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_SAVEPOINT_REQUEST, taskSavePointProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
// Step into the start method
this.nettyRemotingServer.start();
LOGGER.info("Worker rpc server started");
}
/**
* server start
*/
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// Self-explanatory: step into initNettyChannel
initNettyChannel(ch);
}
});
ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
} else {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
}
}
/**
* init netty channel
*
* @param ch socket channel
*/
private void initNettyChannel(SocketChannel ch) {
ch.pipeline()
.addLast("encoder", new NettyEncoder())
.addLast("decoder", new NettyDecoder())
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
// serverHandler is the NettyServerHandler
.addLast("handler", serverHandler);
}
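Now everything connects: the Netty server receives the message sent by the client (here, the master) and puts the task into the waitSubmitQueue. Another thread later takes tasks from that queue and hands them to the executor for processing.
Looking back at the three questions posed at the start of the article, we should now have answers to all of them.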
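Conclusion
This completes our walk through the key parts of DolphinScheduler's source code. There is a lot of code and the logic runs long, so while writing I inevitably skipped some code, and jumps in my reasoning may leave readers a little dizzy. That may be down to my writing, or to the structure of the framework itself (many threads, asynchrony, decoupling, distribution).
A blog post can only help you get into the source code "faster". If you are interested, you still need to trace the code yourself: some parts require jumping back and forth and cannot be shown well in an article, and following them yourself gives a deeper understanding.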
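As an Apache top-level project, DolphinScheduler has a lot worth learning from. Here are a few of my own takeaways:
1. The code makes heavy use of threads, which helps keep the system performant.
2. The modules are decoupled, and each does its own job: the api module provides the external web service; the master module handles task orchestration, splitting, state maintenance and worker heartbeat tracking; the worker module focuses on processing tasks. This architecture is what gives DolphinScheduler its scalability and reliability in big-data scenarios.
3. Inside each module, many maps and queues are used, and every stage has a clearly defined job. Seen as a whole, it works like a machine in which each part has its own role, so the logic is clear. This reminds me of my own code: simple logic is easy to write clearly, but once the business gets complex the code often turns into "a big lump" that is hard for others to read and hard to maintain. DolphinScheduler's approach is worth borrowing: (1) load the data; (2) assemble it; (3) classify it; (4) execute the processing.
4. State management. When we maintain business state in our own code, it tends to get messy: we are tempted to change the state after every single operation. DolphinScheduler instead abstracts state changes as events: each change is added as an event, and dedicated threads process these events in a unified way.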
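Point 4 can be made concrete with a small sketch. Business code only posts state-change events to a queue, and a single dedicated thread applies them, so state is written in exactly one place and in posting order. StateEventLoop, State and StateEvent are hypothetical names; the master's real event handling is considerably more involved.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: callers only post state-change events; one dedicated thread applies them.
class StateEventLoop {
    enum State { SUBMITTED, RUNNING, SUCCESS, FAILURE }

    static final class StateEvent {
        final int taskId;
        final State newState;

        StateEvent(int taskId, State newState) {
            this.taskId = taskId;
            this.newState = newState;
        }
    }

    private final BlockingQueue<StateEvent> events = new LinkedBlockingQueue<>();
    final Map<Integer, State> states = new ConcurrentHashMap<>();
    private final Thread handler = new Thread(this::loop, "state-event-handler");

    void start() {
        handler.setDaemon(true);
        handler.start();
    }

    /** Business code calls this instead of mutating state directly. */
    void post(int taskId, State newState) {
        events.offer(new StateEvent(taskId, newState));
    }

    void stop() {
        handler.interrupt();
    }

    private void loop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                StateEvent event = events.take();         // FIFO, so events apply in posting order
                states.put(event.taskId, event.newState); // the only place state is written
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();           // stop() was called
        }
    }
}
```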
My abilities are limited, so please raise any questions you have, and let's improve together.