DolphinScheduler 3.2.0 Worker启动核心源码解析

发布于:2025-07-10 ⋅ 阅读:(24) ⋅ 点赞:(0)

目录

一、概览

二、Worker 启动入口 run()

三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

3.2 Netty RPC Client(发送结果)

四、加载 Task 插件

五、注册中心客户端启动

六、Worker 管理线程启动

七、消息重试线程启动

八、RPC 请求到任务执行的完整链路

九、总结


一、概览

在 DolphinScheduler 中,Worker 负责从 Master 接收调度命令、执行具体的 Task,并将执行结果通过消息回传给 Master。Worker 在 JVM 中启动后,需要依次完成以下关键组件的初始化和启动:

  • Netty RPC 服务端(workerRpcServer)和客户端(workerRpcClient

  • 任务插件管理器(taskPluginManager

  • 注册中心客户端(workerRegistryClient

  • Worker 管理线程(调度任务分发)

  • 消息重试线程(保证消息可靠性)

最终,Worker 进入“消息接收 → 任务封装 → 入队等待 → 线程池执行 → 结果回传”的循环流程,持续响应 Master 的分发请求并执行任务。


二、Worker 启动入口 run()

Worker 的入口方法标记为 @PostConstruct,即在 Spring 容器完成 Bean 注入后自动调用。其源码如下:

@PostConstruct
public void run() {
    // 1. 启动 rpc 服务
    this.workerRpcServer.start();
    this.workerRpcClient.start();
    // 2. 加载插件
    this.taskPluginManager.loadPlugin();
    // 3. 启动注册中心客户端
    this.workerRegistryClient.setRegistryStoppable(this);
    this.workerRegistryClient.start();
    // 4. 启动管理线程
    this.workerManagerThread.start();
    // 5. 启动消息重试线程
    this.messageRetryRunner.start();
}

第1步:启动 RPC 服务(服务端 + 客户端),用于接收 Master 下发的分发消息以及向 Master 发送执行结果。 – 第2步:通过 SPI 加载 User 自定义或内置的 TaskChannelFactory,实现对不同 Task 类型的支持。 – 第3步:启动注册中心客户端,将自己注册到 Zookeeper(或其他注册中心)上,以便 Master 可发现并下发任务。 – 第4步:启动 Worker 管理线程,用于不断从等待队列取出任务并提交给线程池执行。 – 第5步:启动消息重试线程,负责对由于网络抖动或 Master 未及时 ACK 而需要重试发送的消息进行重发。


三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

public void start() {
    NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig();
    serverConfig.setListenPort(workerConfig.getListenPort());
    nettyRemotingServer = new NettyRemotingServer(serverConfig);
    for (WorkerRpcProcessor processor : workerRpcProcessors) {
        nettyRemotingServer.registerProcessor(processor);
    }
    this.nettyRemotingServer.start();
}
  1. 读取配置:从 workerConfig 中获取 RPC 服务端配置,包括端口、线程池大小等。

  2. 构造 NettyRemotingServer:底层基于 Netty 封装的服务器类,用于接收网络消息。

  3. 注册 Processor:通过 nettyRemotingServer.registerProcessor 将各类消息处理器(如 WorkerTaskDispatchProcessor)绑定到不同的消息类型上。

  4. 启动服务器:调用 Netty 的 bind(port).sync(),启动 BossGroup 和 WorkerGroup,监听客户端连接。

底层实现重点在 NettyRemotingServer.start(),核心伪码如下:

if (isStarted.compareAndSet(false, true)) {
    serverBootstrap
        .group(bossGroup, workGroup)
        .channel(getServerSocketChannelClass())
        .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel ch) {
                initNettyChannel(ch);
            }
        });
    serverBootstrap.bind(listenPort).sync();
}
  • BossGroup 负责接收连接,WorkerGroup 负责处理 I/O 读写。

  • 通过 ChannelInitializer 向 Pipeline 中注入解码器、编码器、心跳检测等 Handler。

3.2 Netty RPC Client(发送结果)

public void start() {
    this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig());
    for (WorkerRpcProcessor processor : workerRpcProcessors) {
        this.nettyRemotingClient.registerProcessor(processor);
    }
}

NettyRemotingClient 构造时会初始化:

  • EventLoopGroup(Epoll 或 NIO)

  • Callback Executor:处理响应回调

  • Response Future Executor:扫描超时的未回包请求

其启动逻辑:

bootstrap.group(workerGroup)
    .channel(getSocketChannelClass())
    .option(TCP_NODELAY, clientConfig.isTcpNoDelay())
    .handler(new ChannelInitializer<SocketChannel>() {
        public void initChannel(SocketChannel ch) {
            ch.pipeline()
              .addLast(new IdleStateHandler(...))
              .addLast(new NettyDecoder(), clientHandler, encoder);
        }
    });
responseFutureExecutor.scheduleWithFixedDelay(
    ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS);
isStarted.set(true);
  • 连接池与重连策略:客户端在发送消息时会根据连接状态自动重连或重建 Channel。

  • 心跳机制:通过 IdleStateHandler 定期向 Master 发送心跳,保持连接活跃。

  • 超时重试ResponseFuture 定时扫描未收到回包的请求,触发超时失败或重试。


四、加载 Task 插件

Worker 支持多种 Task 类型(Shell、Spark、MapReduce……),这些逻辑都通过 SPI 插件化管理:

public void loadPlugin() {
    PrioritySPIFactory<TaskChannelFactory> factoryLoader
        = new PrioritySPIFactory<>(TaskChannelFactory.class);
    for (Map.Entry<String, TaskChannelFactory> entry : factoryLoader.getSPIMap().entrySet()) {
        String name = entry.getKey();
        TaskChannelFactory factory = entry.getValue();
        taskChannelFactoryMap.put(name, factory);
        taskChannelMap.put(name, factory.create());
    }
}
  1. PrioritySPIFactory 利用 ServiceLoader.load(spiClass) 扫描 classpath 下所有 META-INF/services/... 配置的实现。

  2. 冲突处理:若多个插件使用同一标识(t.getIdentify().getName()),则通过 resolveConflict 按优先级或版本选择。

  3. 实例化 Factory 并调用 create(),生成具体 TaskChannel,用于执行阶段构建执行命令、日志采集等。

插件加载完成后,Worker 便可根据 Task 类型动态路由到对应的执行实现。


五、注册中心客户端启动

Worker 需要向统一的注册中心(如 Zookeeper)注册自己的可用性信息,Master 才能发现并调度任务给它。

public void start() {
    registry();
    registryClient.addConnectionStateListener(
        new WorkerConnectionStateListener(workerConfig, strategy));
}
private void registry() {
    WorkerHeartBeat hb = workerHeartBeatTask.getHeartBeat();
    String path = workerConfig.getWorkerRegistryPath();
    registryClient.remove(path);
    registryClient.persistEphemeral(path, JSONUtils.toJsonString(hb));
    workerHeartBeatTask.start();
}
  • persistEphemeral:将自身地址、可用线程数等写入 ${registryRoot}/workers/${workerAddress},并由 ZK 管理生命周期。

  • 心跳定时任务workerHeartBeatTask.start() 定时刷新节点数据/TTL,保证长连接下的可用性信息及时更新。

  • 连接监听:若与注册中心断开,WorkerConnectionStateListener 可触发重连或优雅退出。


六、Worker 管理线程启动

Worker 启动后,用一个独立线程不断从“等待提交队列”中取出任务并提交给线程池。

public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        if (!ServerLifeCycleManager.isRunning()) {
            Thread.sleep(SLEEP_TIME);
        }
        if (getThreadPoolQueueSize() <= workerExecThreads) {
            WorkerDelayTaskExecuteRunnable task = waitSubmitQueue.take();
            workerExecService.submit(task);
        } else {
            incOverloadCount();
            Thread.sleep(SLEEP_TIME);
        }
    }
}
  • waitSubmitQueue:阻塞队列,存放待执行的 WorkerDelayTaskExecuteRunnable 实例。

  • 背压机制:当线程池队列已满(> workerExecThreads)时,先统计过载指标,再稍后重试。

  • workerExecService:底层是带监听能力的 ListeningExecutorService,执行完毕后可注册回调处理日志和结果汇总。


七、消息重试线程启动

Worker 执行过程中需要向 Master 回传任务执行结果、日志位点、心跳等消息。若网络抖动导致消息发送失败,需重试:

public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        if (needToRetryMessages.isEmpty()) {
            Thread.sleep(MESSAGE_RETRY_WINDOW);
        }
        long now = System.currentTimeMillis();
        for (entry in needToRetryMessages) {
            for (message in entry.getValue()) {
                if (now - message.getSendTime() > RETRY_WINDOW) {
                    message.setSendTime(now);
                    messageSenderMap.get(type).sendMessage(message);
                }
            }
        }
        Thread.sleep(SLEEP_TIME);
    }
}
  • needToRetryMessages:按 TaskInstanceId 分类的待重试消息列表。

  • 定时扫描:以 MESSAGE_RETRY_WINDOW(如 30 秒)为周期,判断消息是否超时,若超时则重新调用对应的 messageSender 去向 Master 派发。

  • 幂等与日志:每次重试都会刷新发送时间,并输出重试日志,保证 Master 端能最终收到消息或记录重试失败。


八、RPC 请求到任务执行的完整链路

  1. Master 通过 RPC 调用 WorkerRpcClient.sendMessageTaskDispatchRequest 发往指定 Worker。

  2. Worker Netty Server 的 NettyServerHandler.channelRead() 接收消息,调用 processReceived()

    pair = processors.get(msg.getType());
    pair.getRight().submit(() -> processor.process(channel, msg));
  3. WorkerTaskDispatchProcessor.process() 反序列化 TaskDispatchRequest,构建 TaskExecutionContext 并缓存。

  4. 检查是否需要延时执行(delayTime);若需延时,则先发送延时执行消息给 Master,跳过入队。

  5. 通过 WorkerTaskExecuteRunnableFactoryBuilder 构造 WorkerDelayTaskExecuteRunnable,并调用:

    if (!workerManager.offer(runnable)) {
        sendDispatchRejectResult();
    } else {
        sendDispatchSuccessResult();
    }
  6. workerManager.offer() 根据满载策略将 Runnable 放入 waitSubmitQueue,等待管理线程消费。

  7. 管理线程取出后提交给线程池执行,进入 WorkerDelayTaskExecuteRunnable.run(),真正执行 TaskChannel 的 execute()

  8. Task 执行过程中通过 workerRpcClient 向 Master 发送日志、状态更新等消息;若发送失败,加入 needToRetryMessages 由重试线程处理。

  9. Task 完成后,最终调用 messageSenderMap.get(TASK_EXECUTE_RESULT).sendMessage(resultMessage),Master 收到后更新实例状态。


九、总结

DolphinScheduler Worker 的启动流程,实际上是一套“轻量级调度节点”从初始化、服务注册、插件加载,到任务接收、入队执行、结果回传以及消息可靠性保障的完整闭环。

  • 可插拔:SPI 机制加载 TaskChannel 工厂,支持按需扩展。

  • 高并发:Netty + 线程池,实现主从分离、异步解耦。

  • 可靠性:注册中心心跳、RPC 心跳、消息重试,保证节点可用性和消息可靠交互。

  • 可监控:丰富的度量指标(队列长度、过载次数、重试次数),有助于运维监控和自动扩缩容。


网站公告

今日签到

点亮在社区的每一天
去签到