目录
一、概览
在 DolphinScheduler 中,Worker 负责从 Master 接收调度命令、执行具体的 Task,并将执行结果通过消息回传给 Master。Worker 在 JVM 中启动后,需要依次完成以下关键组件的初始化和启动:
Netty RPC 服务端(
workerRpcServer
)和客户端(workerRpcClient
)任务插件管理器(
taskPluginManager
)注册中心客户端(
workerRegistryClient
)Worker 管理线程(调度任务分发)
消息重试线程(保证消息可靠性)
最终,Worker 进入“消息接收 → 任务封装 → 入队等待 → 线程池执行 → 结果回传”的循环流程,持续响应 Master 的分发请求并执行任务。
二、Worker 启动入口 run()
Worker 的入口方法标记为 @PostConstruct
,即在 Spring 容器完成 Bean 注入后自动调用。其源码如下:
@PostConstruct public void run() { // 1. 启动 rpc 服务 this.workerRpcServer.start(); this.workerRpcClient.start(); // 2. 加载插件 this.taskPluginManager.loadPlugin(); // 3. 启动注册中心客户端 this.workerRegistryClient.setRegistryStoppable(this); this.workerRegistryClient.start(); // 4. 启动管理线程 this.workerManagerThread.start(); // 5. 启动消息重试线程 this.messageRetryRunner.start(); }
– 第1步:启动 RPC 服务(服务端 + 客户端),用于接收 Master 下发的分发消息以及向 Master 发送执行结果。 – 第2步:通过 SPI 加载 User 自定义或内置的 TaskChannelFactory,实现对不同 Task 类型的支持。 – 第3步:启动注册中心客户端,将自己注册到 Zookeeper(或其他注册中心)上,以便 Master 可发现并下发任务。 – 第4步:启动 Worker 管理线程,用于不断从等待队列取出任务并提交给线程池执行。 – 第5步:启动消息重试线程,负责对由于网络抖动或 Master 未及时 ACK 而需要重试发送的消息进行重发。
三、启动 RPC 服务
3.1 Netty RPC Server(接收请求)
public void start() { NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig(); serverConfig.setListenPort(workerConfig.getListenPort()); nettyRemotingServer = new NettyRemotingServer(serverConfig); for (WorkerRpcProcessor processor : workerRpcProcessors) { nettyRemotingServer.registerProcessor(processor); } this.nettyRemotingServer.start(); }
读取配置:从
workerConfig
中获取 RPC 服务端配置,包括端口、线程池大小等。构造
NettyRemotingServer
:底层基于 Netty 封装的服务器类,用于接收网络消息。注册 Processor:通过
nettyRemotingServer.registerProcessor
将各类消息处理器(如WorkerTaskDispatchProcessor
)绑定到不同的消息类型上。启动服务器:调用 Netty 的
bind(port).sync()
,启动 BossGroup 和 WorkerGroup,监听客户端连接。
底层实现重点在 NettyRemotingServer.start()
,核心伪码如下:
if (isStarted.compareAndSet(false, true)) { serverBootstrap .group(bossGroup, workGroup) .channel(getServerSocketChannelClass()) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) { initNettyChannel(ch); } }); serverBootstrap.bind(listenPort).sync(); }
BossGroup 负责接收连接,WorkerGroup 负责处理 I/O 读写。
通过
ChannelInitializer
向 Pipeline 中注入解码器、编码器、心跳检测等 Handler。
3.2 Netty RPC Client(发送结果)
public void start() { this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig()); for (WorkerRpcProcessor processor : workerRpcProcessors) { this.nettyRemotingClient.registerProcessor(processor); } }
NettyRemotingClient
构造时会初始化:
EventLoopGroup(Epoll 或 NIO)
Callback Executor:处理响应回调
Response Future Executor:扫描超时的未回包请求
其启动逻辑:
bootstrap.group(workerGroup) .channel(getSocketChannelClass()) .option(TCP_NODELAY, clientConfig.isTcpNoDelay()) .handler(new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new IdleStateHandler(...)) .addLast(new NettyDecoder(), clientHandler, encoder); } }); responseFutureExecutor.scheduleWithFixedDelay( ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS); isStarted.set(true);
连接池与重连策略:客户端在发送消息时会根据连接状态自动重连或重建 Channel。
心跳机制:通过
IdleStateHandler
定期向 Master 发送心跳,保持连接活跃。超时重试:
ResponseFuture
定时扫描未收到回包的请求,触发超时失败或重试。
四、加载 Task 插件
Worker 支持多种 Task 类型(Shell、Spark、MapReduce……),这些逻辑都通过 SPI 插件化管理:
public void loadPlugin() { PrioritySPIFactory<TaskChannelFactory> factoryLoader = new PrioritySPIFactory<>(TaskChannelFactory.class); for (Map.Entry<String, TaskChannelFactory> entry : factoryLoader.getSPIMap().entrySet()) { String name = entry.getKey(); TaskChannelFactory factory = entry.getValue(); taskChannelFactoryMap.put(name, factory); taskChannelMap.put(name, factory.create()); } }
PrioritySPIFactory
利用ServiceLoader.load(spiClass)
扫描 classpath 下所有META-INF/services/...
配置的实现。冲突处理:若多个插件使用同一标识(
t.getIdentify().getName()
),则通过resolveConflict
按优先级或版本选择。实例化 Factory 并调用
create()
,生成具体 TaskChannel,用于执行阶段构建执行命令、日志采集等。
插件加载完成后,Worker 便可根据 Task 类型动态路由到对应的执行实现。
五、注册中心客户端启动
Worker 需要向统一的注册中心(如 Zookeeper)注册自己的可用性信息,Master 才能发现并调度任务给它。
public void start() { registry(); registryClient.addConnectionStateListener( new WorkerConnectionStateListener(workerConfig, strategy)); } private void registry() { WorkerHeartBeat hb = workerHeartBeatTask.getHeartBeat(); String path = workerConfig.getWorkerRegistryPath(); registryClient.remove(path); registryClient.persistEphemeral(path, JSONUtils.toJsonString(hb)); workerHeartBeatTask.start(); }
persistEphemeral
:将自身地址、可用线程数等写入${registryRoot}/workers/${workerAddress}
,并由 ZK 管理生命周期。心跳定时任务:
workerHeartBeatTask.start()
定时刷新节点数据/TTL,保证长连接下的可用性信息及时更新。连接监听:若与注册中心断开,
WorkerConnectionStateListener
可触发重连或优雅退出。
六、Worker 管理线程启动
Worker 启动后,用一个独立线程不断从“等待提交队列”中取出任务并提交给线程池。
public void run() { while (!ServerLifeCycleManager.isStopped()) { if (!ServerLifeCycleManager.isRunning()) { Thread.sleep(SLEEP_TIME); } if (getThreadPoolQueueSize() <= workerExecThreads) { WorkerDelayTaskExecuteRunnable task = waitSubmitQueue.take(); workerExecService.submit(task); } else { incOverloadCount(); Thread.sleep(SLEEP_TIME); } } }
waitSubmitQueue
:阻塞队列,存放待执行的WorkerDelayTaskExecuteRunnable
实例。背压机制:当线程池队列已满(
> workerExecThreads
)时,先统计过载指标,再稍后重试。workerExecService
:底层是带监听能力的ListeningExecutorService
,执行完毕后可注册回调处理日志和结果汇总。
七、消息重试线程启动
Worker 执行过程中需要向 Master 回传任务执行结果、日志位点、心跳等消息。若网络抖动导致消息发送失败,需重试:
public void run() { while (!ServerLifeCycleManager.isStopped()) { if (needToRetryMessages.isEmpty()) { Thread.sleep(MESSAGE_RETRY_WINDOW); } long now = System.currentTimeMillis(); for (entry in needToRetryMessages) { for (message in entry.getValue()) { if (now - message.getSendTime() > RETRY_WINDOW) { message.setSendTime(now); messageSenderMap.get(type).sendMessage(message); } } } Thread.sleep(SLEEP_TIME); } }
needToRetryMessages
:按 TaskInstanceId 分类的待重试消息列表。定时扫描:以
MESSAGE_RETRY_WINDOW
(如 30 秒)为周期,判断消息是否超时,若超时则重新调用对应的messageSender
去向 Master 派发。幂等与日志:每次重试都会刷新发送时间,并输出重试日志,保证 Master 端能最终收到消息或记录重试失败。
八、RPC 请求到任务执行的完整链路
Master 通过 RPC 调用
WorkerRpcClient.sendMessage
将TaskDispatchRequest
发往指定 Worker。Worker Netty Server 的
NettyServerHandler.channelRead()
接收消息,调用processReceived()
:pair = processors.get(msg.getType()); pair.getRight().submit(() -> processor.process(channel, msg));
WorkerTaskDispatchProcessor.process()
反序列化TaskDispatchRequest
,构建TaskExecutionContext
并缓存。检查是否需要延时执行(
delayTime
);若需延时,则先发送延时执行消息给 Master,跳过入队。通过
WorkerTaskExecuteRunnableFactoryBuilder
构造WorkerDelayTaskExecuteRunnable
,并调用:if (!workerManager.offer(runnable)) { sendDispatchRejectResult(); } else { sendDispatchSuccessResult(); }
workerManager.offer()
根据满载策略将 Runnable 放入waitSubmitQueue
,等待管理线程消费。管理线程取出后提交给线程池执行,进入
WorkerDelayTaskExecuteRunnable.run()
,真正执行 TaskChannel 的execute()
。Task 执行过程中通过
workerRpcClient
向 Master 发送日志、状态更新等消息;若发送失败,加入needToRetryMessages
由重试线程处理。Task 完成后,最终调用
messageSenderMap.get(TASK_EXECUTE_RESULT).sendMessage(resultMessage)
,Master 收到后更新实例状态。
九、总结
DolphinScheduler Worker 的启动流程,实际上是一套“轻量级调度节点”从初始化、服务注册、插件加载,到任务接收、入队执行、结果回传以及消息可靠性保障的完整闭环。
可插拔:SPI 机制加载 TaskChannel 工厂,支持按需扩展。
高并发:Netty + 线程池,实现主从分离、异步解耦。
可靠性:注册中心心跳、RPC 心跳、消息重试,保证节点可用性和消息可靠交互。
可监控:丰富的度量指标(队列长度、过载次数、重试次数),有助于运维监控和自动扩缩容。