Flink Task线程处理模型:Mailbox

发布于:2025-09-09 ⋅ 阅读:(22) ⋅ 点赞:(0)

Task的线程 和 MailboxProcessor 的绑定

executingThread 是 Task 类(StreamTask 的父类)在构造时创建的物理线程。MailboxProcessor 是 StreamTask 用来处理异步事件和驱动其主要处理逻辑(processInput)的核心组件。它们之间的绑定关系如下:

  • Task 作为 Runnable:

    • Task 类实现了 Runnable 接口,其 run() 方法是 executingThread 的入口点。
      // ...
      public class Task implements Runnable, TaskSlotPayload {
          // ...
          private final Thread executingThread; // 在构造函数中创建
      
          public Task(/*...*/) {
              // ...
              this.executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); // this 指向 Task 实例
          }
      
          @Override
          public void run() { // 这是 executingThread 执行的入口
              try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
                  doRun(); // 调用实际的工作方法
              } finally {
                  terminationFuture.complete(executionState);
              }
          }
      
          private void doRun() {
              // ...
              // 对于 StreamTask,这里会调用到 StreamTask 的 invoke() 方法
              invokable.invoke(); // invokable 就是 StreamTask 实例
              // ...
          }
          // ...
      }
      

StreamTask创建了TaskMailboxImpl,传递给MailboxProcessor,因此是MailboxProcessor的执行线程。

 protected StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
            StreamTaskActionExecutor actionExecutor)
            throws Exception {
        this(
                environment,
                timerService,
                uncaughtExceptionHandler,
                actionExecutor,
                new TaskMailboxImpl(Thread.currentThread()));
    }


this.mailboxProcessor =
                    new MailboxProcessor(
                            this::processInput, mailbox, actionExecutor, mailboxMetricsControl);

StreamTask.invoke() 和 MailboxProcessor:

  • 当 executingThread 启动并执行到 StreamTask.invoke() 时,StreamTask 会使用其内部的 MailboxProcessor 来驱动其核心事件循环。

    StreamTask.java

    // ...
    @Override
    public final void invoke() throws Exception {
        // ... (初始化,如 restoreInternal()) ...
    
        // let the task do its work
        getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();
        runMailboxLoop(); // <--- 关键调用
    
        // ... (清理工作,如 afterInvoke()) ...
    }
    
    public void runMailboxLoop() throws Exception {
        mailboxProcessor.runMailboxLoop(); // 将控制权交给 MailboxProcessor
    }
    // ...
    
  • mailboxProcessor.runMailboxLoop() 是一个阻塞调用(从 executingThread 的视角看)。这个方法会在 executingThread 上运行一个循环,不断地从邮箱 (Mailbox) 中取出邮件 (Mail) 并执行它们,或者在没有邮件时执行默认操作 (通常是 StreamTask.processInput(),用于处理输入数据和调用算子)。

Mailbox 的线程模型:

  • MailboxProcessor 被设计为在其“拥有者”线程(即 executingThread)上执行其核心循环和邮件处理。
  • TaskMailbox (被 MailboxProcessor 使用) 内部有检查,确保其关键方法(如 takeput 的某些变体,以及邮件的执行)是在预期的邮箱线程(即 executingThread)上调用的。

    MailboxProcessor.java

    public void runMailboxLoop() throws Exception {
        // ...
        final TaskMailbox localMailbox = mailbox;
    
        checkState(
                localMailbox.isMailboxThread(), // 确保当前线程是邮箱线程
                "Method must be executed by declared mailbox thread!");
        // ...
        while (isNextLoopPossible()) {
            processMail(localMailbox, false); // 处理邮件,在 executingThread 上执行
            if (isNextLoopPossible()) {
                mailboxDefaultAction.runDefaultAction(mailboxController); // 执行默认动作,在 executingThread 上执行
            }
        }
    }
    
  • MailboxDefaultAction 通常包装了 StreamTask.processInput(),所以数据处理和算子调用也是在 executingThread 上发生的。
  • 其他线程(例如网络线程接收到数据后,或者定时器线程触发定时器)想要与 StreamTask 交互时,它们不会直接调用 StreamTask 的方法,而是向其 Mailbox 中放入一个“邮件”(一个 Runnable 或 Callable)。MailboxProcessor 会在 executingThread 上从邮箱中取出这个邮件并执行它。

总结线程与 Mailbox 的绑定:

  1. Task 构造时创建 executingThread,并将 Task 自身作为 Runnable 传递给该线程。
  2. executingThread 启动后,执行 Task.run() -> Task.doRun() -> StreamTask.invoke()
  3. 在 StreamTask.invoke() 中,调用 mailboxProcessor.runMailboxLoop()
  4. mailboxProcessor.runMailboxLoop() 在 executingThread 上运行,它负责从邮箱中拉取任务并执行,或者执行默认的数据处理逻辑 (processInput)。
  5. 所有提交到该 StreamTask 邮箱的异步操作最终都会在 executingThread 上被 MailboxProcessor 串行化执行。

因此,executingThread 成为了 MailboxProcessor 的“工作线程”。MailboxProcessor 确保了 StreamTask 的核心逻辑(包括状态访问、算子调用等)都在这个单一的 executingThread 上顺序执行,从而简化了并发控制。

MailboxProcessor的功能

MailboxProcessor 是 Flink 中任务(Task)执行模型的核心组件,它实现了基于邮箱(Mailbox)的单线程执行模式。其主要能力包括:

管理邮箱 (TaskMailbox):

  • 持有一个 TaskMailbox 实例,用于存储需要串行执行的各种动作(Mail)。这些动作可以是来自外部的请求(如 Checkpoint 触发、Timer 回调)或内部控制命令。
// ... existing code ...
public class MailboxProcessor implements Closeable {

    // ... existing code ...
    /**
     * The mailbox data-structure that manages request for special actions, like timers,
     * checkpoints, ...
     */
    protected final TaskMailbox mailbox;
// ... existing code ...

执行默认动作 (MailboxDefaultAction):

  • 在邮箱为空时,会循环执行一个预定义的“默认动作”。在 StreamTask 的上下文中,这个默认动作通常是处理输入数据(processInput)。
this.mailboxProcessor =
                    new MailboxProcessor(
                            this::processInput, mailbox, actionExecutor, mailboxMetricsControl);

// ... existing code ...
    /**
     * Action that is repeatedly executed if no action request is in the mailbox. Typically record
     * processing.
     */
    protected final MailboxDefaultAction mailboxDefaultAction;
// ... existing code ...

单线程执行循环 (runMailboxLoop):

  • 核心方法 runMailboxLoop() 驱动整个执行逻辑。它会不断检查邮箱中是否有新的 Mail,如果有则执行它们;如果没有,则执行默认动作。
  • 这种机制保证了默认动作(如数据处理)和邮箱中的其他动作(如 Checkpoint、Timer 事件)之间是单线程顺序执行的,避免了并发冲突。
// ... existing code ...
    public void runMailboxLoop() throws Exception {
        suspended = !mailboxLoopRunning;

        final TaskMailbox localMailbox = mailbox;

        checkState(
                localMailbox.isMailboxThread(),
                "Method must be executed by declared mailbox thread!");

        assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

        final MailboxController mailboxController = new MailboxController(this);

        while (isNextLoopPossible()) {
            // The blocking `processMail` call will not return until default action is available.
            processMail(localMailbox, false);
            if (isNextLoopPossible()) {
                mailboxDefaultAction.runDefaultAction(
                        mailboxController); // lock is acquired inside default action as needed
            }
        }
    }
// ... existing code ...

提供邮箱执行器 (MailboxExecutor):

  • 通过 getMainMailboxExecutor() 和 getMailboxExecutor(int priority) 方法,向外部提供 MailboxExecutor。这使得其他组件(如 TimerService、CheckpointCoordinator)可以将它们的动作提交到邮箱中,由 MailboxProcessor 在其单线程循环中统一调度执行。
// ... existing code ...
    public MailboxExecutor getMainMailboxExecutor() {
        return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);
    }

    /**
     * Returns an executor service facade to submit actions to the mailbox.
     *
     * @param priority the priority of the {@link MailboxExecutor}.
     */
    public MailboxExecutor getMailboxExecutor(int priority) {
        return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
    }
// ... existing code ...

生命周期管理:

  • 实现了 Closeable 接口,并有 prepareClose() 和 close() 方法,对应 TaskMailbox 的 quiesce() 和 close()。这确保了在任务结束时,邮箱能被正确关闭,并处理(如取消)剩余的 Mail
// ... existing code ...
    /** Lifecycle method to close the mailbox for action submission. */
    public void prepareClose() {
        mailbox.quiesce();
    }

    /**
     * Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all
     * instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the
     * mailbox.
     */
    @Override
    public void close() {
        List<Mail> droppedMails = mailbox.close();
// ... existing code ...
    }

挂起与恢复:

  • MailboxProcessor 的执行循环可以被挂起 (suspend()) 和恢复(通过再次调用 runMailboxLoop() 或相关控制逻辑)。默认动作也可以通过 MailboxController 暂时挂起。
// ... existing code ...
    /** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */
    public void suspend() {
        sendPoisonMail(() -> suspended = true);
    }
// ... existing code ...

以及 MailboxController 中的 suspendDefaultAction()

异常处理:

  • reportThrowable(Throwable throwable) 方法允许将其他线程中发生的异常报告给邮箱线程,并在邮箱线程中重新抛出,从而中断任务执行。
// ... existing code ...
    public void reportThrowable(Throwable throwable) {
        sendControlMail(
                () -> {
                    if (throwable instanceof Exception) {
                        throw (Exception) throwable;
                    } else if (throwable instanceof Error) {
                        throw (Error) throwable;
                    } else {
                        throw WrappingRuntimeException.wrapIfNecessary(throwable);
                    }
                },
                "Report throwable %s",
                throwable);
    }
// ... existing code ...

度量指标控制 (MailboxMetricsController):

  • 包含一个 MailboxMetricsController 用于控制和访问邮箱相关的度量指标,如邮箱延迟、处理的邮件数量等。
// ... existing code ...
    private final MailboxMetricsController mailboxMetricsControl;

// ... existing code ...
    @VisibleForTesting
    public MailboxMetricsController getMailboxMetricsControl() {
        return this.mailboxMetricsControl;
    }
// ... existing code ...

MailboxProcessor 与 StreamTask 的互动

MailboxProcessor 为 StreamTask 提供了一个强大的、基于邮箱的单线程执行引擎。StreamTask 委托 MailboxProcessor 来驱动其核心的数据处理循环,并通过 MailboxExecutor 将所有需要与任务主线程同步的异步操作(如 Timer、Checkpoint 事件)统一提交到邮箱中进行调度。这种设计确保了任务内部操作的串行化,简化了并发控制,并提高了系统的稳定性和可维护性。

StreamTask 是 Flink 流处理任务的基类,它使用 MailboxProcessor 来管理其核心执行逻辑。

  1. 创建和持有 MailboxProcessor:

    • StreamTask 在其构造函数中创建并持有一个 MailboxProcessor 实例。
    • MailboxDefaultAction 通常被设置为 StreamTask::processInput,这意味着当邮箱为空时,StreamTask 会执行其数据处理逻辑。
    • StreamTaskActionExecutor 也被传递给 MailboxProcessor
  2. 驱动执行循环:

    • StreamTask 的 invoke() 方法是任务的执行入口。在其核心逻辑中,它会调用 mailboxProcessor.runMailboxLoop() 来启动邮箱处理循环。这个循环会一直运行,直到任务完成或被取消。
    • 代码见 StreamTask.invoke():

      StreamTask.java

      // ... existing code ...
      public final void invoke() throws Exception {
          // ... initialization ...
          try {
              // ...
              // Run mailbox until all gates will be recovered.
              mailboxProcessor.runMailboxLoop(); // 启动邮箱循环
              // ...
          } finally {
              // ... cleanup ...
              // let mailbox execution reject all new letters from this point
              mailboxProcessor.prepareClose();
              // ...
              mailboxProcessor.close();
          }
      }
      // ... existing code ...
      
  3. 提交异步动作:

    • StreamTask 及其相关的组件(如 TimerServiceSubtaskCheckpointCoordinator)需要执行一些异步操作,例如触发 Timer、执行 Checkpoint、响应外部事件等。这些操作需要确保在任务的主线程中执行,以避免并发问题。
    • StreamTask 通过从 mailboxProcessor 获取的 MailboxExecutor 来提交这些异步操作。这些操作会被封装成 Mail 放入邮箱,由 MailboxProcessor 在其循环中按顺序执行。
    • 例如,ProcessingTimeService 的实现会使用 MailboxExecutor 来调度 Timer 的触发:

      StreamOperatorFactoryUtil.java

      // ... existing code ...
          public static <OUT, OP extends StreamOperator<OUT>> OP createOperator(
                  // ...
                  MailboxExecutor mailboxExecutor = // Obtained via containingTask.getMailboxExecutorFactory()
                          containingTask
                                  .getMailboxExecutorFactory()
                                  .createExecutor(configuration.getChainIndex());
      
          // ...
          final ProcessingTimeService processingTimeService;
          if (operatorFactory instanceof ProcessingTimeServiceAware) {
              processingTimeService =
                      ((ProcessingTimeServiceAware) operatorFactory)
                              .createProcessingTimeService(mailboxExecutor);
          } else {
              processingTimeService = processingTimeServiceFactory.get();
          }
      // ... existing code ...
      
      ProcessingTimeServiceImpl 内部会使用这个 mailboxExecutor 来 execute 或 schedule 定时任务。
  4. 控制流程与状态:

    • StreamTask 的 processInput 方法(作为 MailboxDefaultAction)可以通过 MailboxDefaultAction.Controller 与 MailboxProcessor 交互。例如,当输入数据处理完毕或遇到反压时,它可以调用 controller.suspendDefaultAction() 来暂时挂起默认动作的执行,让 MailboxProcessor 优先处理邮箱中的其他 Mail
    • 代码见 StreamTask.processInput():
      // ... existing code ...
      protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
          DataInputStatus status = inputProcessor.processInput();
          switch (status) {
              // ...
              case END_OF_INPUT:
                  // Suspend the mailbox processor, it would be resumed in afterInvoke and finished
                  // after all records processed by the downstream tasks. We also suspend the default
                  // actions to avoid repeat executing the empty default operation (namely process
                  // records).
                  controller.suspendDefaultAction(); // 通过 Controller 控制 MailboxProcessor
                  mailboxProcessor.suspend();
                  return;
          }
          // ...
      }
      // ... existing code ...
      
  5. 生命周期同步:

    • StreamTask 在其生命周期的不同阶段(如 cancelTaskafterInvoke)会调用 MailboxProcessor 的相应方法(如 prepareClosecloseallActionsCompleted)来同步状态和清理资源。
    • 例如,在任务正常结束或需要最终 Checkpoint 完成后,会调用 mailboxProcessor.allActionsCompleted():

      StreamTask.java

      // ... existing code ...
          FutureUtils.waitForAll(terminationConditions)
                  .thenRun(mailboxProcessor::allActionsCompleted);
      
          // Resumes the mailbox processor. The mailbox processor would be completed
          // after all records are processed by the downstream tasks.
          mailboxProcessor.runMailboxLoop();
      // ... existing code ...
      

TaskMailboxImpl 

虽然这个类的核心结构是“一个锁(ReentrantLock)加一个队列(Deque<Mail>)”,但它的实现中包含了一些针对 Flink Task 执行模型的特定优化和设计,使其不仅仅是一个简单的线程安全队列。

@ThreadSafe
public class TaskMailboxImpl implements TaskMailbox {
    /** Lock for all concurrent ops. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Internal queue of mails. */
    @GuardedBy("lock")
    private final Deque<Mail> queue = new ArrayDeque<>();

    /** Condition that is triggered when the mailbox is no longer empty. */
    @GuardedBy("lock")
    private final Condition notEmpty = lock.newCondition();

    /** The state of the mailbox in the lifecycle of open, quiesced, and closed. */
    @GuardedBy("lock")
    private State state = OPEN;

    /** Reference to the thread that executes the mailbox mails. */
    @Nonnull private final Thread taskMailboxThread;

    /**
     * The current batch of mails. A new batch can be created with {@link #tryBuildBatch()} and
     * consumed with {@link #tryTakeFromBatch()}.
     */
    private final Deque<Mail> batch = new ArrayDeque<>();

    /**
     * Performance optimization where hasNewMail == !queue.isEmpty(). Will not reflect the state of
     * {@link #batch}.
     */
    private volatile boolean hasNewMail = false;

    /**
     * Performance optimization where there is new urgent mail. When there is no urgent mail in the
     * batch, it should be checked every time mail is taken, including taking mail from batch queue.
     */
    private volatile boolean hasNewUrgentMail = false;

    public TaskMailboxImpl(@Nonnull final Thread taskMailboxThread) {
        this.taskMailboxThread = taskMailboxThread;
    }

    @VisibleForTesting
    public TaskMailboxImpl() {
        this(Thread.currentThread());
    }

核心结构:

  1. lock: ReentrantLock: 这是实现线程安全的核心。所有对内部队列 queue 和状态 state 的并发访问都由此锁保护。

  2. queue: Deque<Mail>:

    • 实际存储待处理 Mail 对象的双端队列。Mail 对象封装了需要执行的动作(通常是一个 Runnable)及其优先级。
    • 使用 ArrayDeque 作为底层实现。
  3. notEmpty: Condition:

    • 与 lock 关联的条件变量。当邮箱从空变为非空时(即有新的 Mail 被放入),会通过 notEmpty.signal() 或 notEmpty.signalAll() 来唤醒可能正在等待获取 Mail 的线程(主要是邮箱处理线程)。
    • 在 take() 方法中,如果队列为空,线程会调用 notEmpty.await() 等待。
  4. state: State (enum: OPENQUIESCEDCLOSED):

    • 表示邮箱的生命周期状态:
      • OPEN: 邮箱正常工作,可以接收和发送邮件。
      • QUIESCED: 邮箱处于静默状态,不再接受新的邮件(put 操作会失败),但仍然可以取出已有的邮件。通常在任务准备关闭时进入此状态。
      • CLOSED: 邮箱已关闭,不能进行任何操作。所有未处理的邮件会被清空。
    • 状态转换由 quiesce() 和 close() 方法控制,并且这些操作也受 lock 保护。
  5. taskMailboxThread: Thread:

    • 一个非常重要的字段,它存储了被指定为“邮箱线程”的线程引用。
    • 很多操作(如 taketryTakehasMailcreateBatchtryTakeFromBatchquiesceclose)都强制要求调用者必须是这个 taskMailboxThread,通过 checkIsMailboxThread() 进行检查。这是因为 Flink 的 Task 执行模型是单线程的,MailboxProcessor 会在其专用的线程中处理邮箱中的邮件和默认动作。
  6. batch: Deque<Mail>:

    • 这是一个性能优化的设计。MailboxProcessor 在其主循环中,会先调用 createBatch() 将主队列 queue 中的所有邮件一次性转移到这个 batch 队列中。然后,MailboxProcessor 会优先从 batch 中通过 tryTakeFromBatch() 获取邮件进行处理。
    • 目的: 减少锁的竞争。createBatch() 在持有锁的情况下将一批邮件转移出来,之后 MailboxProcessor 处理 batch 中的邮件时就不再需要频繁获取锁去访问主队列 queue。这对于高吞吐量的场景非常重要。
    • batch 的操作也仅限于 taskMailboxThread
  7. hasNewMail: volatile boolean:

    • 这是另一个性能优化。它大致反映了主队列 queue 是否为空 (!queue.isEmpty())。
    • volatile 关键字确保了不同线程对它的可见性。
    • 目的: 允许邮箱线程在不获取锁的情况下快速检查是否有新邮件。例如,在 hasMail() 和 tryTake() 方法中,会先检查 batch,然后检查 hasNewMail,只有当 hasNewMail 为 true 时,才尝试获取锁并检查主队列 queue
    • 当有新邮件通过 put() 或 putFirst()(从非邮箱线程调用时)添加到 queue 时,hasNewMail 会被设置为 true。当邮件从 queue 中被取出或通过 createBatch() 转移到 batch 时,hasNewMail 会被更新。

特别需要注意的点:

  1. 单消费者(邮箱线程)设计:

    • 尽管 put() 和 putFirst() 方法允许从任何线程添加邮件(是线程安全的),但所有取邮件的操作(taketryTakecreateBatchtryTakeFromBatch)以及生命周期管理方法(quiesceclose)都必须由 taskMailboxThread 调用。这是 Flink Mailbox 模型的核心设计,确保了任务逻辑的单线程执行。
  2. 批处理优化 (batch 队列):

    • 理解 batch 队列的作用对于分析性能至关重要。它不是一个独立的邮箱,而是主队列 queue 的一个临时缓存,用于减少锁争用。MailboxProcessor 会周期性地将 queue 中的内容“批发”到 batch 中。
  3. hasNewMail 优化hasNewMail 变量提供了一种轻量级的检查机制,避免了邮箱线程在主队列可能为空时仍频繁获取锁。

  4. 优先级处理 (takeOrNull 方法):

    • takeOrNull(Deque<Mail> queue, int priority) 方法实现了从队列中根据优先级取出邮件的逻辑。它会遍历队列,找到第一个优先级大于或等于指定 priority 的邮件并返回。这意味着高优先级的邮件(如控制命令、Checkpoint barrier)可以被优先处理。
  5. putFirst() 的特殊行为:

    • putFirst(@Nonnull Mail mail) 方法很有意思:
      • 如果调用者是 taskMailboxThread,邮件会直接被添加到 batch 队列的头部。这是因为邮箱线程是当前批次邮件的消费者,将邮件直接放入批处理队列的头部可以使其被更快处理,而无需等待下一轮 createBatch
      • 如果调用者不是 taskMailboxThread,邮件会被添加到主队列 queue 的头部,并通过 notEmpty.signal() 唤醒邮箱线程。
  6. 生命周期管理 (statequiesce()close()):

    • 邮箱的生命周期状态转换是严格控制的,并且与任务的生命周期紧密相关。
    • quiesce(): 使邮箱不再接受新邮件,但允许处理完已有的邮件。
    • close(): 彻底关闭邮箱,清空所有邮件,并唤醒所有可能在等待的线程(通过 notEmpty.signalAll()),通常是为了让它们感知到关闭状态并退出。
  7. 锁的粒度和使用:

    • ReentrantLock 用于保护对共享数据(queuestate)的访问。
    • Condition (notEmpty) 用于实现生产者-消费者模式中的等待和通知机制。
    • lock.lockInterruptibly() 在 take() 方法中使用,允许等待的邮箱线程响应中断。
  8. runExclusively(Runnable runnable):

    • 提供了一种机制,允许以独占方式在邮箱的锁保护下执行一段代码。这对于需要原子地执行多个邮箱操作(例如,检查状态然后根据状态放入邮件)的场景非常有用,可以避免竞态条件。

总而言之,TaskMailboxImpl 虽然基于简单的锁和队列,但通过引入批处理、hasNewMail 标志、严格的线程模型以及精细的生命周期管理,为 Flink 的 MailboxProcessor 提供了一个高效且功能完备的邮件调度机制。这些设计都是为了在保证单线程执行模型的前提下,最大化吞吐量并减少不必要的同步开销。

Mail 类分析

Mail 类是 Apache Flink 流处理运行时任务邮箱机制中的一个核心组件。它代表一个可执行的任务单元,绑定到特定的操作符链中,可以被下游邮箱处理器选择执行。

主要属性

  • mailOptions: 邮件选项,用于配置邮件的行为,如是否可延迟执行。
  • runnable: 要执行的操作,是一个 ThrowingRunnable 类型的实例,可以抛出异常。
  • priority: 邮件的优先级。优先级并不直接决定执行顺序,而是用于避免上下游操作符之间的活锁或死锁问题。
  • descriptionFormat 和 descriptionArgs: 用于调试和错误报告的邮件描述信息。
  • actionExecutor: 用于执行 runnable 的执行器。

Mail 类提供了三个构造函数,允许灵活地创建邮件对象:

  1. 最简单的构造函数只需要 runnablepriority 和描述信息。
  2. 可以指定 MailboxExecutor.MailOptions 来配置邮件选项。
  3. 可以指定 StreamTaskActionExecutor 来控制操作的执行方式。

核心方法

  • getMailOptions(): 获取邮件选项。
  • getPriority(): 获取邮件的优先级。如果邮件是可延迟的,则返回最小优先级。
  • tryCancel(): 尝试取消邮件的执行。
  • toString(): 返回邮件的描述信息。
  • run(): 执行邮件中的操作。

Mail 类在 Flink 的流处理任务中扮演着重要角色。它允许将任务分解为小的、可执行的单元,并通过邮箱机制进行调度和执行。这种设计有助于提高任务的并发性和响应性,同时避免复杂的同步问题。

在实际使用中,可以通过创建 Mail 对象来封装需要执行的操作,并将其提交到邮箱中等待执行。通过设置不同的优先级和选项,可以控制操作的执行顺序和行为。

MailboxExecutorImpl

MailboxExecutorImpl 实现了 flink.api.common.operators.MailboxExecutor 接口,它充当了向 Flink Task 的邮箱(TaskMailbox)提交执行单元(Runnable 或 Callable)的一个入口或门面。它的核心目标是允许其他组件将代码片段(封装为 Mail 对象)放入邮箱,这些代码片段最终会由 MailboxProcessor 在其专用的单线程中执行。

核心成员变量:

  • mailbox: TaskMailbox: 这是实际存储待执行邮件的邮箱实例。MailboxExecutorImpl 将通过它来提交新的邮件。

    MailboxExecutorImpl.java

    // ... existing code ...
        /** The mailbox that manages the submitted runnable objects. */
        @Nonnull private final TaskMailbox mailbox;
    // ... existing code ...
    
  • priority: int: 与此 MailboxExecutorImpl 实例关联的邮件的默认优先级。当通过这个执行器提交任务时,任务会带上这个优先级。
    // ... existing code ...
        private final int priority;
    // ... existing code ...
    
  • actionExecutor: StreamTaskActionExecutor: 这是一个执行器,用于实际运行封装在 Mail 对象中的命令。Mail 对象在被 MailboxProcessor 取出后,其 run() 方法会使用这个 actionExecutor 来执行具体的逻辑。
    // ... existing code ...
        private final StreamTaskActionExecutor actionExecutor;
    // ... existing code ...
    
  • mailboxProcessor: MailboxProcessor (可能为 null): 指向驱动邮箱循环的 MailboxProcessor。主要用于 isIdle() 方法的判断。
    // ... existing code ...
        private final MailboxProcessor mailboxProcessor;
    // ... existing code ...
    

构造函数:

  • 提供了两个构造函数,主要的区别在于是否传入 MailboxProcessor
    // ... existing code ...
        public MailboxExecutorImpl(
                @Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {
            this(mailbox, priority, actionExecutor, null);
        }
    
        public MailboxExecutorImpl(
                @Nonnull TaskMailbox mailbox,
                int priority,
                StreamTaskActionExecutor actionExecutor,
                MailboxProcessor mailboxProcessor) {
            this.mailbox = mailbox;
            this.priority = priority;
            this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
            this.mailboxProcessor = mailboxProcessor;
        }
    // ... existing code ...
    
    它们初始化了执行器的核心组件。

主要方法分析:

execute

// ... existing code ...
    @Override
    public void execute(
            MailOptions mailOptions,
            final ThrowingRunnable<? extends Exception> command,
            final String descriptionFormat,
            final Object... descriptionArgs) {
        try {
            mailbox.put(
                    new Mail(
                            mailOptions,
                            command,
                            priority,
                            actionExecutor,
                            descriptionFormat,
                            descriptionArgs));
        } catch (MailboxClosedException mbex) {
            throw new RejectedExecutionException(mbex);
        }
    }
// ... existing code ...
  • 这是向邮箱提交任务的核心方法。
  • 它接收一个 ThrowingRunnable 作为要执行的命令,以及 MailOptions(用于配置邮件行为,例如是否可延迟)、描述信息等。
  • 内部会创建一个新的 Mail 对象,该对象封装了传入的 command、此执行器实例的 priorityactionExecutor 以及描述信息。
  • 然后调用 mailbox.put(new Mail(...)) 将这个新创建的 Mail 对象放入 TaskMailbox 中。
  • 如果邮箱已经关闭(MailboxClosedException),则会抛出 RejectedExecutionException,这是 Executor 服务在无法接受新任务时的标准行为。

yield 

此方法设计为由邮箱线程自身调用

  • 它尝试从 mailbox 中获取一个至少具有 此执行器 priority 的邮件 (mailbox.take(priority))。这是一个阻塞操作,如果当前没有符合条件的邮件,它会等待。
  • 一旦获取到 Mail 对象,它会立即在当前线程(即邮箱线程)中执行 mail.run()
  • 目的: 允许当前正在邮箱线程中执行的某个可能耗时较长的操作(例如用户函数)主动暂停,让邮箱中其他待处理的邮件(特别是具有相同或更高优先级的邮件,如 Checkpoint Barrier)有机会执行。这是一种协作式多任务处理机制,对于保证邮箱系统的响应性至关重要。
    @Override
    public void yield() throws InterruptedException {
        Mail mail = mailbox.take(priority);
        try {
            mail.run();
        } catch (Exception ex) {
            throw WrappingRuntimeException.wrapIfNecessary(ex);
        }
    }

  1. tryYield():

    • 与 yield() 类似,但是一个非阻塞版本。
    • 它调用 mailbox.tryTake(priority) 尝试获取邮件。
    • 如果成功获取到邮件,则执行它并返回 true
    • 如果没有符合条件的邮件,则立即返回 false,不会阻塞。
    • 需要注意的是,根据 MailboxExecutor 接口的约定和 MailOptions.deferrable() 的设计,yield() 和 tryYield() 通常不会执行被标记为 "deferrable"(可延迟)的邮件。这是为了在需要快速让出执行权(例如为了处理 Checkpoint)时,避免执行那些可以稍后处理的低优先级或非紧急任务。
  2. shouldInterrupt():

    • 此方法用于指示当前正在邮箱线程中执行的操作是否应该被中断(例如,一个长时间运行的用户函数)。
    • 目前的实现是简单地检查 mailbox.hasMail(),即只要邮箱中还有任何待处理的邮件,就建议中断。
    • 代码中的 TODO: FLINK-35051 注释表明,这是一个待优化的点。理想情况下,只有当邮箱中有时间敏感的邮件(例如与 Checkpoint 相关的邮件)时,才应该建议中断,以避免不必要的性能开销。
  3. isIdle():

    // ... existing code ...
        public boolean isIdle() {
            return !mailboxProcessor.isDefaultActionAvailable()
                    && !mailbox.hasMail()
                    && mailbox.getState().isAcceptingMails();
        }
    // ... existing code ...
    
    • 检查关联的 MailboxProcessor 是否处于空闲状态。
    • 判断条件为:
      • MailboxProcessor 的默认操作(通常是 processInput)当前不可用(即被挂起)。
      • TaskMailbox 中没有待处理的邮件。
      • TaskMailbox 的状态仍然是接受邮件的状态(即不是 QUIESCED 或 CLOSED)。
    • 这个方法需要 mailboxProcessor 成员不为 null

总结与作用

MailboxExecutorImpl 为 Flink 的异步操作和事件驱动模型提供了一个关键的接口。它使得系统中的不同部分(例如 Timer Service、Checkpoint Coordinator,甚至是算子自身)能够安全地将需要在 Task 主执行线程(即邮箱线程)中执行的逻辑提交到邮箱队列。

  • 封装提交逻辑: 它将创建 Mail 对象并将其放入 TaskMailbox 的细节封装起来,提供了一个更简洁的 Executor 风格的 API。
  • 支持优先级: 允许为通过特定执行器实例提交的任务指定一个默认优先级。
  • 协作式调度 (yield/tryYield): 这是 Mailbox 模型单线程执行模式下实现并发感和响应性的核心机制。它允许长时间运行的任务主动让出控制权,确保高优先级任务(如系统事件)能够及时处理。
  • 中断提示 (shouldInterrupt): 为长时间运行的用户代码提供了一个检查点,以便在需要时(例如为了执行 Checkpoint)能够优雅地中断。

通过 MailboxExecutorImpl,Flink 能够确保所有关键的 Task 级别操作(数据处理、状态访问、Checkpoint、Timer 回调等)都在同一个线程中有序执行,从而避免了复杂的并发控制问题,简化了状态管理和一致性保证。

MailboxProcessor 细节分析

MailboxProcessor 封装了基于 Mailbox 的执行模型的完整逻辑。它的核心是一个事件循环 (runMailboxLoop),该循环持续执行两个主要任务:

  1. 处理邮箱中的邮件 (Mail): 检查 TaskMailbox 中是否有待处理的邮件(例如 Checkpoint 触发、Timer 事件、用户通过 MailboxExecutor 提交的自定义逻辑等),并按优先级顺序执行它们。
  2. 执行默认动作 (MailboxDefaultAction): 如果邮箱中没有邮件,或者邮件处理完毕后,它会执行一个“默认动作”。在 StreamTask 的上下文中,这个默认动作通常是 processInput(),即处理来自上游的数据。

这种设计确保了 Task 内部所有操作(数据处理、Checkpoint、Timer 等)的单线程执行,从而极大地简化了并发控制和状态管理。

主要结构组件:

  1. mailbox: TaskMailbox: 这是实际存储和管理 Mail 对象的组件。MailboxProcessor 从它那里获取邮件。

  2. mailboxDefaultAction: MailboxDefaultAction: 代表在邮箱空闲时重复执行的默认操作。它通过 MailboxDefaultAction.Controller 与 MailboxProcessor 交互,例如在没有输入数据时通知 MailboxProcessor 暂停调用默认动作。

  3. actionExecutor: StreamTaskActionExecutor: 用于实际执行 Mail 中封装的 RunnableMail 对象本身不直接执行逻辑,而是委托给这个执行器。

  4. 控制标志 (Control Flags) - 这些标志必须只能从邮箱线程访问,以避免竞态条件:

    • mailboxLoopRunning: boolean: 控制主事件循环是否应该继续运行。当设置为 false 时,循环会在当前迭代完成后终止。
    • suspended: boolean: 控制邮箱处理器是否被临时挂起。如果为 truerunMailboxLoop 会退出,但之后可以被重新调用以恢复。
    • suspendedDefaultAction: DefaultActionSuspension: 记录当前默认动作是否被挂起。如果非 null,表示默认动作已挂起,MailboxProcessor 不会调用它。
    // ... existing code ...
        /**
         * Control flag to terminate the mailbox processor. Once it was terminated could not be
         * restarted again. Must only be accessed from mailbox thread.
         */
        private boolean mailboxLoopRunning;
    
        /**
         * Control flag to temporary suspend the mailbox loop/processor. After suspending the mailbox
         * processor can be still later resumed. Must only be accessed from mailbox thread.
         */
        private boolean suspended;
    
        /**
         * Remembers a currently active suspension of the default action. Serves as flag to indicate a
         * suspended default action (suspended if not-null) and to reuse the object as return value in
         * consecutive suspend attempts. Must only be accessed from mailbox thread.
         */
        private DefaultActionSuspension suspendedDefaultAction;
    // ... existing code ...
    
  5. mailboxMetricsControl: MailboxMetricsController: 用于管理和暴露与邮箱相关的度量指标。

MailboxProcessor 提供了多个构造函数,允许不同程度的定制。核心的构造函数接收 MailboxDefaultActionTaskMailboxStreamTaskActionExecutor 和 MailboxMetricsController

一个常见的用法是传入一个 MailboxDefaultAction,然后 MailboxProcessor 会使用默认的 TaskMailboxImpl(与当前线程绑定)和 StreamTaskActionExecutor.IMMEDIATE

// ... existing code ...
    public MailboxProcessor(
            MailboxDefaultAction mailboxDefaultAction,
            TaskMailbox mailbox,
            StreamTaskActionExecutor actionExecutor,
            MailboxMetricsController mailboxMetricsControl) {
        this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
        this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
        this.mailbox = Preconditions.checkNotNull(mailbox);
        this.mailboxLoopRunning = true;
        this.suspendedDefaultAction = null;
        this.mailboxMetricsControl = mailboxMetricsControl;
    }
// ... existing code ...

runMailboxLoop()

// ... existing code ...
    public void runMailboxLoop() throws Exception {
        suspended = !mailboxLoopRunning;

        final TaskMailbox localMailbox = mailbox;

        checkState(
                localMailbox.isMailboxThread(),
                "Method must be executed by declared mailbox thread!");

        assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

        final MailboxController mailboxController = new MailboxController(this);

        while (isNextLoopPossible()) {
            // The blocking `processMail` call will not return until default action is available.
            processMail(localMailbox, false);
            if (isNextLoopPossible()) {
                mailboxDefaultAction.runDefaultAction(
                        mailboxController); // lock is acquired inside default action as needed
            }
        }
    }

    private boolean isNextLoopPossible() {
        // 'Suspended' can be false only when 'mailboxLoopRunning' is true.
        return !suspended;
    }
// ... existing code ...
  • 这是 MailboxProcessor 的心脏。它在一个 while (isNextLoopPossible()) 循环中运行。
  • 前置检查: 确保该方法由指定的邮箱线程执行,并且邮箱处于 OPEN 状态。
  • 创建 MailboxControllerMailboxController 是 MailboxDefaultAction 与 MailboxProcessor 交互的桥梁。
  • 循环体:
    • processMail(localMailbox, false): 调用此方法处理邮箱中的邮件。这是一个关键步骤,它会尝试非阻塞地处理一批邮件。如果默认操作被挂起,它可能会阻塞地等待邮件或默认操作变为可用。false 表示不是单步执行。
    • if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); }: 如果循环仍然可以继续(例如,没有被挂起或关闭),并且默认动作是可用的,则执行默认动作。
  • 设计理念: 注释中提到,runMailboxLoop 的设计目标是保持热路径(默认动作,邮箱中没有邮件)尽可能快。因此,对控制标志(如 mailboxLoopRunningsuspendedDefaultAction)的检查通常与 mailbox.hasMail() 为 true 相关联。这意味着,如果要在邮箱线程内部更改这些标志,必须确保邮箱中至少有一个邮件,以便更改能被及时感知。

processMail

// ... existing code ...
    private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
        // Doing this check is an optimization to only have a volatile read in the expected hot
        // path, locks are only
        // acquired after this point.
        boolean isBatchAvailable = mailbox.createBatch();

        // Take mails in a non-blockingly and execute them.
        boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);
        if (singleStep) {
            return processed;
        }

        // If the default action is currently not available, we can run a blocking mailbox execution
        // until the default action becomes available again.
        processed |= processMailsWhenDefaultActionUnavailable();

        return processed;
    }


    private boolean processMailsNonBlocking(boolean singleStep) throws Exception {
        long processedMails = 0;
        Optional<Mail> maybeMail;

        while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
            if (processedMails++ == 0) {
                maybePauseIdleTimer();
            }
            runMail(maybeMail.get());
            if (singleStep) {
                break;
            }
        }
        if (processedMails > 0) {
            maybeRestartIdleTimer();
            return true;
        } else {
            return false;
        }
    }
// ... existing code ...

其中 processMailsNonBlocking 和 processMailsWhenDefaultActionUnavailable 内部会调用 runMail(Mail mail) 来实际执行邮件:

// ... existing code ...
    private void runMail(Mail mail) throws Exception {
        mailboxMetricsControl.getMailCounter().inc();
        mail.run();
// ... existing code ...
  • 此方法负责处理邮箱中的邮件。
  • mailbox.createBatch(): 首先尝试从主队列创建一批邮件到 TaskMailbox 的内部批处理队列。这是一个优化,减少锁竞争。
  • processMailsNonBlocking(singleStep): 非阻塞地处理批处理队列中的邮件。如果 singleStep 为 true,则只处理一个邮件(用于测试或调试)。
  • processMailsWhenDefaultActionUnavailable(): 如果默认动作当前不可用(例如,由于反压或没有输入),此方法会尝试从邮箱中获取并处理邮件。它可能会阻塞地等待新邮件的到来,直到默认动作再次可用或循环终止。
  • 返回 true 如果至少处理了一封邮件。

suspend()

// ... existing code ...
    /** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */
    public void suspend() {
        sendPoisonMail(() -> suspended = true);
    }

    /** Send mail in first priority for internal needs. */
    private void sendPoisonMail(RunnableWithException mail) {
        mailbox.runExclusively(
                () -> {
                    // keep state check and poison mail enqueuing atomic, such that no intermediate
                    // #close may cause a
                    // MailboxStateException in #sendPriorityMail.
                    if (mailbox.getState() == TaskMailbox.State.OPEN) {
                        sendControlMail(mail, "poison mail");
                    }
                });

    public void runExclusively(Runnable runnable) {
        lock.lock();
        try {
            runnable.run();
        } finally {
            lock.unlock();
        }
    }
// ... existing code ...
  • 用于从外部(非邮箱线程)请求挂起邮箱循环。
  • 它通过 sendPoisonMail() 向邮箱头部插入一个高优先级的“毒丸”邮件。当这个邮件被处理时,它会将 suspended 标志设置为 true,从而导致 runMailboxLoop 在下一次检查 isNextLoopPossible() 时退出。
  • Poison Mail: 是一种特殊控制邮件,用于改变 MailboxProcessor 的内部状态。

allActionsCompleted()

// ... existing code ...
    /**
     * This method must be called to end the stream task when all actions for the tasks have been
     * performed.
     */
    public void allActionsCompleted() {
        sendPoisonMail(
                () -> {
                    mailboxLoopRunning = false;
                    suspended = true;
                });
    }
// ... existing code ...
  • 当 Task 的所有动作都已完成,需要终止邮箱循环时调用此方法。
  • 与 suspend() 类似,它也通过 sendPoisonMail() 发送一个毒丸邮件。该邮件会将 mailboxLoopRunning 设置为 false 并将 suspended 设置为 true,从而彻底停止事件循环。

sendPoisonMail 和 sendControlMail(...):

// ... existing code ...
    /** Send mail in first priority for internal needs. */
    private void sendPoisonMail(RunnableWithException mail) {
        mailbox.runExclusively(
                () -> {
                    // keep state check and poison mail enqueuing atomic, such that no intermediate
                    // #close may cause a
                    // MailboxStateException in #sendPriorityMail.
                    if (mailbox.getState() == TaskMailbox.State.OPEN) {
                        sendControlMail(mail, "poison mail");
                    }
                });
    }

    /**
     * Sends the given <code>mail</code> using {@link TaskMailbox#putFirst(Mail)} . Intended use is
     * to control this <code>MailboxProcessor</code>; no interaction with tasks should be performed;
     */
    private void sendControlMail(
            RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {
        mailbox.putFirst(
                new Mail(
                        mail,
                        Integer.MAX_VALUE /*not used with putFirst*/,
                        descriptionFormat,
                        descriptionArgs));
    }
// ... existing code ...
  • sendPoisonMail: 确保在邮箱 OPEN 状态下,通过 sendControlMail 发送一个控制邮件。它使用 mailbox.runExclusively 来原子地检查状态和入队。
  • sendControlMail: 将一个具有最高优先级的 Mail 对象(通过 mailbox.putFirst())放入邮箱。这些邮件用于内部控制,如挂起、终止、报告错误等。

生命周期方法 (prepareClose()close()):

// ... existing code ...
    /** Lifecycle method to close the mailbox for action submission. */
    public void prepareClose() {
        mailbox.quiesce();
    }

    /**
     * Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all
     * instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the
     * mailbox.
     */
    @Override
    public void close() {
        List<Mail> droppedMails = mailbox.close();
// ... existing code ...
  • prepareClose(): 调用 mailbox.quiesce()。这会使邮箱进入静默状态,不再接受新的邮件,但允许处理已有的邮件。这是关闭过程的第一步。
  • close(): 调用 mailbox.close()。这会彻底关闭邮箱,清空所有未处理的邮件,并尝试取消仍在邮箱中的 RunnableFuture 实例。

与 MailboxDefaultAction 的交互 (通过 MailboxController):

// ... existing code ...
protected static final class MailboxController implements MailboxDefaultAction.Controller {

        private final MailboxProcessor mailboxProcessor;

        protected MailboxController(MailboxProcessor mailboxProcessor) {
            this.mailboxProcessor = mailboxProcessor;
        }

        @Override
        public void allActionsCompleted() {
            mailboxProcessor.allActionsCompleted();
        }

        @Override
        public MailboxDefaultAction.Suspension suspendDefaultAction(
                PeriodTimer suspensionPeriodTimer) {
            return mailboxProcessor.suspendDefaultAction(suspensionPeriodTimer);
        }
// ... existing code ...
}

// ... existing code ...
private final class DefaultActionSuspension implements MailboxDefaultAction.Suspension {
        @Nullable private final PeriodTimer suspensionTimer;

        public DefaultActionSuspension(@Nullable PeriodTimer suspensionTimer) {
            this.suspensionTimer = suspensionTimer;
        }

        @Override
        public void resume() {
            if (mailbox.isMailboxThread()) {
                resumeInternal();
            } else {
                try {
                    sendControlMail(this::resumeInternal, "resume default action");
                } catch (MailboxClosedException ex) {
                    // Ignored
                }
            }
        }

        private void resumeInternal() {
            // This method must be called from the mailbox thread.
            if (mailboxProcessor.suspendedDefaultAction == this) {
                mailboxProcessor.suspendedDefaultAction = null;
                if (suspensionTimer != null) {
                    suspensionTimer.markEnd();
                }
            }
        }
    }
// ... existing code ...
  • MailboxController 是一个内部类,实现了 MailboxDefaultAction.Controller 接口。
  • MailboxDefaultAction 通过这个 Controller 来与 MailboxProcessor 通信。
  • suspendDefaultAction(): 当默认动作(如 processInput)发现当前没有工作可做时(例如,没有输入数据或下游反压),它会调用 controller.suspendDefaultAction()
  • MailboxProcessor.suspendDefaultAction(@Nullable PeriodTimer suspensionTimer):
    • 此方法(只能由邮箱线程调用)将 suspendedDefaultAction 设置为一个新的 DefaultActionSuspension 实例。
    • DefaultActionSuspension 实现了 MailboxDefaultAction.Suspension 接口,其 resume() 方法用于恢复默认动作的执行。resume() 可以从任何线程调用,如果不是邮箱线程,它会发送一个控制邮件来确保恢复逻辑在邮箱线程中执行。

获取 MailboxExecutor:

// ... existing code ...
    public MailboxExecutor getMainMailboxExecutor() {
        return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);
    }

    /**
     * Returns an executor service facade to submit actions to the mailbox.
     *
     * @param priority the priority of the {@link MailboxExecutor}.
     */
    public MailboxExecutor getMailboxExecutor(int priority) {
        return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
    }
// ... existing code ...
  • getMainMailboxExecutor(): 返回一个具有最低优先级的 MailboxExecutor
  • getMailboxExecutor(int priority): 返回一个指定优先级的 MailboxExecutor。这些 MailboxExecutor 实例允许其他组件向此 MailboxProcessor 的邮箱提交任务。

总结

MailboxProcessor 是 Flink Task 单线程执行模型的核心。它通过一个事件循环来协调处理高优先级的控制/事件邮件和低优先级的默认数据处理动作。这种机制确保了:

  • 单线程执行: 所有关键逻辑都在同一个线程中执行,避免了复杂的并发同步。
  • 响应性: 高优先级邮件(如 Checkpoint barriers)可以抢占默认动作,保证系统事件的及时处理。
  • 可控性: 提供了挂起、恢复、终止事件循环的机制。
  • 可扩展性: 通过 MailboxExecutor 允许外部组件向邮箱提交自定义任务。

processInput

processInput 方法是 StreamTask 执行其核心数据处理逻辑的地方。它是作为 MailboxProcessor默认动作 (MailboxDefaultAction) 来执行的。这意味着,当 MailboxProcessor 的邮箱中没有更高优先级的“邮件”(如 Checkpoint 触发、Timer 事件等)需要处理时,它就会循环调用这个 processInput 方法。

下面是对 processInput 方法的详细分析:

  1. 方法职责与设计理念:

    1. 处理输入事件: 其核心职责是从输入源(由 inputProcessor 代表)获取一个事件(通常是一条记录或一组记录),并将其传递给后续的算子链进行处理。

    2. 非阻塞性: 注释中强调“Implementations should (in general) be non-blocking”。这是非常关键的一点。因为 MailboxProcessor 是单线程执行其邮箱中的邮件和默认动作的,如果 processInput 长时间阻塞,将会导致 Checkpoint barriers、Timer 等重要事件无法及时处理,影响任务的正确性和性能。

    3. 与 MailboxProcessor 协作: 通过 MailboxDefaultAction.Controller controller 参数,processInput 可以与 MailboxProcessor 进行交互,例如在没有数据或遇到反压时,通知 MailboxProcessor 暂停调用默认动作。

  2. 处理输入 (inputProcessor.processInput()):

  • 方法首先调用 inputProcessor.processInput()InputProcessor 负责从上游读取数据、反序列化,并将数据喂给当前 Task 的第一个 Operator。
  • processInput() 的返回值 DataInputStatus 描述了本次输入处理的结果。

根据 DataInputStatus 进行分支处理:

DataInputStatus status = inputProcessor.processInput();
        switch (status) {
            case MORE_AVAILABLE:
                if (taskIsAvailable()) {
                    return;
                }
                break;
            case NOTHING_AVAILABLE:
                break;
            case END_OF_RECOVERY:
                throw new IllegalStateException("We should not receive this event here.");
            case STOPPED:
                endData(StopMode.NO_DRAIN);
                return;
            case END_OF_DATA:
                endData(StopMode.DRAIN);
                notifyEndOfData();
                return;
            case END_OF_INPUT:
                // Suspend the mailbox processor, it would be resumed in afterInvoke and finished
                // after all records processed by the downstream tasks. We also suspend the default
                // actions to avoid repeat executing the empty default operation (namely process
                // records).
                controller.suspendDefaultAction();
                mailboxProcessor.suspend();
                return;
        }

MORE_AVAILABLE: 表示 inputProcessor 中还有更多数据可以立即处理。

  1. if (taskIsAvailable()) { return; }: 如果当前任务本身也是可用的(例如,下游没有反压),则直接返回。MailboxProcessor 会很快再次调用 processInput 来处理更多数据。

  2. NOTHING_AVAILABLE: 表示 inputProcessor 当前没有可用的数据。此时,方法不会立即返回,而是会继续检查是否存在反压等情况,可能需要暂停默认动作的执行。

  3. END_OF_RECOVERY: 这是一个不期望在此处出现的状态,表示任务恢复逻辑可能存在问题,因此抛出 IllegalStateException

  4. STOPPED: 表示输入流被强制停止(例如任务被取消,且不需要流干数据)。

    • endData(StopMode.NO_DRAIN): 通知算子链以非排空模式结束处理。

    • return;: 结束当前 processInput 调用。

  5. END_OF_DATA: 表示当前输入流的所有数据都已到达(例如,有限流Source结束)。

    • endData(StopMode.DRAIN): 通知算子链以排空模式结束处理(处理完所有已缓冲的数据)。

    • notifyEndOfData(): 通知 TaskManager 当前任务的数据已结束。

    • return;: 结束当前 processInput 调用。

  6. END_OF_INPUT: 表示该 Task 的所有输入都已经结束。这是一个更强的结束信号。

    • controller.suspendDefaultAction(): 通知 MailboxProcessor 暂停调用 processInput。因为已经没有新的输入了,再继续调用也没有意义。

    • mailboxProcessor.suspend(): 暂停整个 MailboxProcessor 的事件循环。任务此时会等待下游处理完所有数据,并完成最终的 Checkpoint 等操作。

    • return;: 结束当前 processInput 调用。

处理反压和等待逻辑 (当 NOTHING_AVAILABLE 或其他需要等待的情况): 如果 inputProcessor.processInput() 返回 NOTHING_AVAILABLE,或者虽然有数据但任务本身不可用(例如下游反压),代码会进入等待逻辑:

// 如果前面没有return
        TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
        PeriodTimer timer;
        CompletableFuture<?> resumeFuture;
        if (!recordWriter.isAvailable()) {
            timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
            resumeFuture = recordWriter.getAvailableFuture();
        } else if (!inputProcessor.isAvailable()) {
            timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
            resumeFuture = inputProcessor.getAvailableFuture();
        } else if (changelogWriterAvailabilityProvider != null
                && !changelogWriterAvailabilityProvider.isAvailable()) {
            // waiting for changelog availability is reported as busy
            timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
            resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
        } else {
            // data availability has changed in the meantime; retry immediately
            return;
        }
        assertNoException(
                resumeFuture.thenRun(
                        new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
    
  1. TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();: 获取IO相关的度量指标组。

  2. PeriodTimer timer; CompletableFuture<?> resumeFuture;: 声明计时器和用于恢复的 Future。

  3. 检查输出是否可用 (!recordWriter.isAvailable()):

    • 如果 recordWriter(负责将处理结果写到下游)不可用,说明下游存在反压。

    • timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());: 启动一个计时器,用于度量由于下游反压导致的等待时间。

    • resumeFuture = recordWriter.getAvailableFuture();: 获取一个 Future,当 recordWriter 再次可用时,该 Future 会完成。

  4. 检查输入处理器是否可用 (!inputProcessor.isAvailable()):

    • 如果 inputProcessor 本身不可用(例如,等待网络缓冲区的到来)。

    • timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());: 启动一个计时器,用于度量由于上游输入不可用导致的空闲时间。

    • resumeFuture = inputProcessor.getAvailableFuture();: 获取一个 Future,当 inputProcessor 再次可用时,该 Future 会完成。

  5. 检查 Changelog Writer 是否可用:

    • 如果使用了 Changelog State Backend,并且其 changelogWriterAvailabilityProvider 表示不可用。

    • timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());: 启动计时器,度量等待 Changelog Writer 的繁忙时间。

    • resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();: 获取 Future,等待其可用。

  6. 数据可用性已改变 (else { return; }):

    • 如果以上等待条件都不满足,说明在 inputProcessor.processInput() 调用之后,数据的可用性可能已经发生了变化(例如,新的数据刚刚到达)。此时直接 return,让 MailboxProcessor 立即重试 processInput

  7. 挂起默认动作并等待恢复:

    • controller.suspendDefaultAction(timer): 调用 controllersuspendDefaultAction 方法,并传入之前启动的 timer。这会通知 MailboxProcessor 暂时停止调用 processInputMailboxProcessor 会使用这个 timer 来记录挂起的时间(用于监控和度量)。该方法返回一个 MailboxDefaultAction.Suspension 对象。

    • resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)): 当 resumeFuture 完成时(即等待的条件解除,例如下游不再反压或上游数据到达),会执行 ResumeWrapper 中的逻辑。ResumeWrapper 会调用 Suspension 对象的 resume() 方法,这会通知 MailboxProcessor 可以重新开始调用 processInput 了。同时,timer 也会被停止。

    • assertNoException(...): 确保 thenRun 中的操作不会抛出未捕获的异常。

Checkpoint

StreamTask 通过其内部的 MailboxProcessor 和相关的 MailboxExecutor 来发送和处理与 Checkpoint 相关的邮件(即需要在 Task 主线程中执行的 Checkpoint 操作)。

以下是 StreamTask 如何发送和处理与 Checkpoint 相关邮件的关键机制分析:

  1. MailboxProcessor 和 MailboxExecutor:

    • 每个 StreamTask 都有一个 MailboxProcessor 实例 (mailboxProcessor),它负责驱动 Task 的事件循环。
    • StreamTask 可以通过 mailboxProcessor.getMailboxExecutor(priority) 获取一个 MailboxExecutor。这个 MailboxExecutor 提供了 execute(...) 方法,可以将一个 Runnable(封装了 Checkpoint 相关逻辑)作为 Mail 提交到邮箱中。
    • 这些邮件会被 MailboxProcessor 在其主循环中按优先级取出并执行。
  2. SubtaskCheckpointCoordinator:

    • StreamTask 包含一个 SubtaskCheckpointCoordinator 实例 (subtaskCheckpointCoordinator)。这个协调器负责处理 Task 级别的 Checkpoint 逻辑,例如触发操作符的快照、处理 Barrier 对齐、通知 Checkpoint 完成或中止等。
    • 很多 Checkpoint 相关的操作会首先由 SubtaskCheckpointCoordinator 发起或处理,然后它可能会通过 StreamTask 的 MailboxExecutor 将具体的执行步骤提交到邮箱。
  3. actionExecutor:

    • StreamTask 还有一个 StreamTaskActionExecutor 实例 (actionExecutor)。虽然 MailboxExecutor 用于将任务 放入 邮箱,但当 Mail 从邮箱中被取出后,其内部的 Runnable 通常会通过这个 actionExecutor 来实际执行。对于 Checkpoint 相关的操作,这确保了它们在正确的 Task 主线程上下文中运行。

发送 Checkpoint 相关邮件的典型场景和方法:

  • 触发 Checkpoint (triggerCheckpointAsync):

    • 当 JobManager 向 TaskManager 发送触发 Checkpoint 的 RPC 时,Task(通常是 StreamTask 的父类或其本身)会调用 triggerCheckpointAsync 方法。
    • 这个方法会将实际的 Checkpoint 执行逻辑封装成一个 Runnable,并通过 mainMailboxExecutor(一个具有默认优先级的 MailboxExecutor)提交到邮箱。
    • 这样做是为了确保 Checkpoint 的所有阶段(例如调用操作符的 snapshotState)都在 Task 的主线程中执行,从而避免与正常的数据处理流程发生并发冲突。

    StreamTask.java

    // ... existing code ...
    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        checkForcedFullSnapshotSupport(checkpointOptions);
    
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mainMailboxExecutor.execute(
                () -> {
                    try {
                        // Lock the mailbox to ensure that the checkpoint is not concurrent with other
                        // actions
                        synchronized (mailboxProcessor) {
                            result.complete(
                                    triggerUnfinishedChannelsCheckpoint(
                                            checkpointMetaData, checkpointOptions));
                        }
                    } catch (Exception ex) {
                        // Report the failure both via the Future result but also to the mailbox
                        result.completeExceptionally(ex);
                        throw ex;
                    }
                },
                "checkpoint %s with %s",
                checkpointMetaData,
                checkpointOptions);
        return result;
    }
    // ... existing code ...
    

    在上面的代码片段中,mainMailboxExecutor.execute(...) 就是将 Checkpoint 触发逻辑(triggerUnfinishedChannelsCheckpoint)作为邮件发送到邮箱的关键步骤。

  • 通知 Checkpoint 完成 (notifyCheckpointCompleteAsync):

    • 当 Task 完成一个 Checkpoint 并收到 JobManager 的确认后,会调用此方法。
    • 同样,通知操作符 Checkpoint 完成的逻辑也会被封装并通过 MailboxExecutor 提交到邮箱。

    StreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
        return notifyCheckpointOperation(
                () -> notifyCheckpointComplete(checkpointId),
                String.format("checkpoint %d completed", checkpointId));
    }
    // ... existing code ...
    

    而 notifyCheckpointOperation 内部会使用 MailboxExecutor

    StreamTask.java

    // ... existing code ...
    private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        mailboxProcessor
                .getMailboxExecutor(TaskMailbox.MAX_PRIORITY)
                .execute(
                        () -> {
                            try {
                                runnable.run();
                            } catch (Exception ex) {
                                result.completeExceptionally(ex);
                                throw ex;
                            }
                            result.complete(null);
                        },
                        description);
        return result;
    }
    // ... existing code ...
    

    这里使用了 TaskMailbox.MAX_PRIORITY,表明这是一个高优先级的操作。

  • 通知 Checkpoint 中止 (notifyCheckpointAbortAsync):

    • 当一个 Checkpoint 因为各种原因(超时、错误、被新的 Checkpoint 取代)需要中止时,会调用此方法。
    • 中止逻辑,包括清理操作符可能产生的临时状态,也会通过邮件发送到邮箱执行。

    StreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointAbortAsync(
            long checkpointId, long latestCompletedCheckpointId) {
        return notifyCheckpointOperation(
                () -> {
                    if (latestCompletedCheckpointId > 0) {
                        notifyCheckpointComplete(latestCompletedCheckpointId);
                    }
    
                    if (isCurrentSyncSavepoint(checkpointId)) {
                        throw new FlinkRuntimeException("Stop-with-savepoint failed.");
                    }
                    subtaskCheckpointCoordinator.notifyCheckpointAborted(
                            checkpointId, operatorChain, this::isRunning);
                },
                String.format("checkpoint %d aborted", checkpointId));
    }
    // ... existing code ...
    

    同样,它也使用了 notifyCheckpointOperation 方法,将中止逻辑放入邮箱。

  • 处理 Barrier 对齐时的 Timer 回调:

    • 当使用 Barrier 对齐策略时,如果一个 InputGate 等待某个 Channel 的 Barrier 超时,SubtaskCheckpointCoordinator 会注册一个 Timer。当这个 Timer 触发时,其回调逻辑(例如取消 Checkpoint 或强制触发 Checkpoint)也会被封装成邮件并通过 MailboxExecutor 提交到邮箱执行。
    • 在 BarrierAlignmentUtil.createRegisterTimerCallback 中可以看到相关的逻辑,它会返回一个 BiConsumer<Long, Long>,这个 Consumer 内部会使用 mainMailboxExecutor 来执行超时处理。
  • Source Task 的特定行为:

    • 例如在 SourceOperatorStreamTask 中,notifyCheckpointAbortAsync 和 notifyCheckpointSubsumedAsync 方法会直接使用 mainMailboxExecutor 来执行清理 Checkpoint 的逻辑。

    SourceOperatorStreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointAbortAsync(
            long checkpointId, long latestCompletedCheckpointId) {
        mainMailboxExecutor.execute(
                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
    }
    
    @Override
    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
        mainMailboxExecutor.execute(
                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
        return super.notifyCheckpointSubsumedAsync(checkpointId);
    // ... existing code ...
    

总结:

StreamTask 依赖其 MailboxProcessor 和通过它获取的 MailboxExecutor 来确保所有与 Checkpoint 相关的关键操作(触发、通知完成/中止、Barrier 处理等)都在 Task 的主事件循环线程中串行执行。这避免了复杂的并发控制,保证了 Checkpoint 过程与正常数据处理流程的一致性和正确性。当需要执行一个 Checkpoint 相关操作时,通常会将其封装为一个 Runnable,然后通过 MailboxExecutor.execute() 方法将其作为一封邮件提交到邮箱队列中,等待 MailboxProcessor 的调度执行。


网站公告

今日签到

点亮在社区的每一天
去签到