TCP-based机制
tcp-based和credit-based介绍可以看:
https://flink-learning.org.cn/article/detail/138316d1556f8f9d34e517d04d670626
flink1.5之前是基本tcp实现的反压。同一个TaskManager的任务共享TCP缓冲区。一个任务产生背压,会影响TaskManager内的其他任务接受消息
- 在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。
- 依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。
Credit-based机制
Credit Based流量控制(since V1.5)的合适是确保发送端已经发送的任何数据,接收端都具有足够的能力(缓冲区)来接收。每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息。
client端下游的buffer已经耗尽,credit=0,此时并没有发送credit给server端,server因为没有收到credit,不会向client发送消息。
源码解析
实时数据流中的credit控制
client初始请求分区数据的时候会携带初始credit,默认是2
server收到这个PartitionRequest消息后会创建CreditBasedSequenceNumberingViewReader,它来负责server端的credit管理。CreditBasedSequenceNumberingViewReader初始credit是来自request中的credit。
numCreditsAvailable是client可用的credit,首次建立连接就是initialCredit。
CreditBasedSequenceNumberingViewReader创建完成后,会调用requestSubpartitionView
requestSubpartitionView中会创建读取数据的ResultSubpartitionView,再notifyDataAvailable通知读取数据。
notifyDataAvailable是将发送reader用户事件消息。
reader消息触发userEventTriggered方法。调用enqueueAvailableReader。
enqueueAvailableReader方法中有两个地方都会发送backlog。这里先说writeAndFlushNextMessageIfPossible部分,在消息中携带backlog。
writeAndFlushNextMessageIfPossible不断循环从reader读取数据放入buffer,包装成BufferResponse消息,BufferResponse消息也包含backlog,将BufferResponse消息发送给client。
client收到BufferResponse消息后,decodeBufferOrEvent解析buffer。在decodeBufferOrEvent中分两种情况,buffer为空和不为空,分别调用onEmptyBuffer和onBuffer方法。
在onEmptyBuffer和onBuffer方法中,都有backlog>0调用onSenderBacklog方法。
如果backlog=0表示没有消息剩余,不用增加credit。
onSenderBacklog方法中 bufferManager调用requestFloatingBuffers申请buffer,notifyBufferAvailable通知申请到的bufffer。
申请的数量是backlog + initialCredit。会立即返回可以申请到buffer,要是不足会注册监听器直到buffer数量满足。
notifyBufferAvailable最后调用的是partitionRequestClient的notifyCreditAvailable
partitionRequestClient发送用户事件AddCreditMessage
因为是userEvent,所以触发userEventTriggered方法。
userEventTriggered中将AddCreditMessage消息加入clientOutboundMessages待发送消息队列中,writeAndFlushNextMessageIfPossible将消息发送。
writeAndFlushNextMessageIfPossible获取到AddCreditMessage消息,调用buildMessage生成AddCredit消息,向server发送AddCredit消息。AddCreditMessage本质就是发送AddCredit消息。
server端收到AddCredit消息,调用addCreditOrResumeConsumption增加credit。
addCreditOrResumeConsumption传入的第二个参数是,reader -> reader.addCredit(request.credit) 这是一个函数。
addCreditOrResumeConsumption中首先获取对应的reader(CreditBasedSequenceNumberingViewReader),
operation.accept(reader) 这里是调用reader的addCredit方法。reader的addCredit就是在numCreditsAvailable上记录。
最后将reader加入队列中。
enqueueAvailableReader这个方法上面讲过,还是会注册reader,reader读取数据包装成携带backlog的BufferResponse发送client,这样可以动态调整credit。
独立的credit控制
上面提到enqueueAvailableReader中会单独发送backlog,既然在BufferResponse已经有了backlog,为什么还要单独发送backlog呢。因为单独发送backlog调度优先级更高,可以快速触发反压,同时如果当生产者有积压但无数据可发时(如只产生事件流),需要单独通知积压情况。
enqueueAvailableReader中获取reader的backlog(当前reader中还有多少数据没有被消费)调用announceBacklog通知client。
包装成BacklogAnnouncement消息发送给client
client收到BacklogAnnouncement消息,调用RemoteInputChannel的onSenderBacklog,client处理backlog的逻辑与上面相同。
反压处理
server数据产生速度一直大于client端消费速度,backlog不断传递到client,client申请buffer,当buffer不能再申请后,触发反压。
onSenderBacklog这里会使用requestFloatingBuffers申请buffer
requestFloatingBuffers调用tryRequestBuffers申请buffer
tryRequestBuffers是一个buffer一个buffer申请的,申请不到buffer的时候会调用addBufferListener添加监听器。添加监听器,也就是buffer不够,发生了反压。
上面监听器已经注册了,等待buffer释放。 notifyBufferAvailable处理申请到的buffer。反压申请不到buffer了,numAvailableBuffers=0,不会执行notifyCreditAvailable方法通知server来添加credit。
server没有收到credit,不会向client发送消息的。
当有buffer可用的时候会触发监听器bufferManager的notifyBufferAvailable方法。
notifyBufferAvailable会调用RemoteInputChannel的方法notifyBufferAvailable。
notifyBufferAvailable去申请新的buffer。