Flink Credit-based机制解析

发布于:2025-03-29 ⋅ 阅读:(28) ⋅ 点赞:(0)

TCP-based机制

tcp-based和credit-based介绍可以看:

https://flink-learning.org.cn/article/detail/138316d1556f8f9d34e517d04d670626

flink1.5之前是基本tcp实现的反压。同一个TaskManager的任务共享TCP缓冲区。一个任务产生背压,会影响TaskManager内的其他任务接受消息

  • 在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。
  • 依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。

Credit-based机制

Credit Based流量控制(since V1.5)的合适是确保发送端已经发送的任何数据,接收端都具有足够的能力(缓冲区)来接收。每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息。

client端下游的buffer已经耗尽,credit=0,此时并没有发送credit给server端,server因为没有收到credit,不会向client发送消息。

源码解析

实时数据流中的credit控制

client初始请求分区数据的时候会携带初始credit,默认是2

server收到这个PartitionRequest消息后会创建CreditBasedSequenceNumberingViewReader,它来负责server端的credit管理。CreditBasedSequenceNumberingViewReader初始credit是来自request中的credit。

numCreditsAvailable是client可用的credit,首次建立连接就是initialCredit。

CreditBasedSequenceNumberingViewReader创建完成后,会调用requestSubpartitionView

requestSubpartitionView中会创建读取数据的ResultSubpartitionView,再notifyDataAvailable通知读取数据。

notifyDataAvailable是将发送reader用户事件消息。

reader消息触发userEventTriggered方法。调用enqueueAvailableReader。

enqueueAvailableReader方法中有两个地方都会发送backlog。这里先说writeAndFlushNextMessageIfPossible部分,在消息中携带backlog。

writeAndFlushNextMessageIfPossible不断循环从reader读取数据放入buffer,包装成BufferResponse消息,BufferResponse消息也包含backlog,将BufferResponse消息发送给client。

client收到BufferResponse消息后,decodeBufferOrEvent解析buffer。在decodeBufferOrEvent中分两种情况,buffer为空和不为空,分别调用onEmptyBuffer和onBuffer方法。

在onEmptyBuffer和onBuffer方法中,都有backlog>0调用onSenderBacklog方法。

如果backlog=0表示没有消息剩余,不用增加credit。

onSenderBacklog方法中 bufferManager调用requestFloatingBuffers申请buffer,notifyBufferAvailable通知申请到的bufffer。

申请的数量是backlog + initialCredit。会立即返回可以申请到buffer,要是不足会注册监听器直到buffer数量满足。

notifyBufferAvailable最后调用的是partitionRequestClient的notifyCreditAvailable

partitionRequestClient发送用户事件AddCreditMessage

因为是userEvent,所以触发userEventTriggered方法。

userEventTriggered中将AddCreditMessage消息加入clientOutboundMessages待发送消息队列中,writeAndFlushNextMessageIfPossible将消息发送。

writeAndFlushNextMessageIfPossible获取到AddCreditMessage消息,调用buildMessage生成AddCredit消息,向server发送AddCredit消息。AddCreditMessage本质就是发送AddCredit消息。

server端收到AddCredit消息,调用addCreditOrResumeConsumption增加credit。

addCreditOrResumeConsumption传入的第二个参数是,reader -> reader.addCredit(request.credit) 这是一个函数。

addCreditOrResumeConsumption中首先获取对应的reader(CreditBasedSequenceNumberingViewReader),

operation.accept(reader) 这里是调用reader的addCredit方法。reader的addCredit就是在numCreditsAvailable上记录。

最后将reader加入队列中。

enqueueAvailableReader这个方法上面讲过,还是会注册reader,reader读取数据包装成携带backlog的BufferResponse发送client,这样可以动态调整credit。

独立的credit控制

上面提到enqueueAvailableReader中会单独发送backlog,既然在BufferResponse已经有了backlog,为什么还要单独发送backlog呢。因为单独发送backlog调度优先级更高,可以快速触发反压,同时如果当生产者有积压但无数据可发时(如只产生事件流),需要单独通知积压情况。

enqueueAvailableReader中获取reader的backlog(当前reader中还有多少数据没有被消费)调用announceBacklog通知client。

包装成BacklogAnnouncement消息发送给client

client收到BacklogAnnouncement消息,调用RemoteInputChannel的onSenderBacklog,client处理backlog的逻辑与上面相同。

反压处理

server数据产生速度一直大于client端消费速度,backlog不断传递到client,client申请buffer,当buffer不能再申请后,触发反压。

onSenderBacklog这里会使用requestFloatingBuffers申请buffer

requestFloatingBuffers调用tryRequestBuffers申请buffer

tryRequestBuffers是一个buffer一个buffer申请的,申请不到buffer的时候会调用addBufferListener添加监听器。添加监听器,也就是buffer不够,发生了反压。

上面监听器已经注册了,等待buffer释放。 notifyBufferAvailable处理申请到的buffer。反压申请不到buffer了,numAvailableBuffers=0,不会执行notifyCreditAvailable方法通知server来添加credit。

server没有收到credit,不会向client发送消息的。

当有buffer可用的时候会触发监听器bufferManager的notifyBufferAvailable方法。

notifyBufferAvailable会调用RemoteInputChannel的方法notifyBufferAvailable。

notifyBufferAvailable去申请新的buffer。


网站公告

今日签到

点亮在社区的每一天
去签到