WebRTC Source Code Analysis: Pacer Code Flow

Published: 2023-01-21

Before walking through the code flow, read up on the theory first:

Pacer theory

Data flow

1. Call flow

1.1 Enqueue flow

RTPSenderVideo::LogAndSendToNetwork
RTPSender::EnqueuePackets
PacedSender::EnqueuePackets
PacingController::EnqueuePacket
PacingController::EnqueuePacketInternal
RoundRobinPacketQueue::Push // push into the queue

1.2 Send flow

PacedSender::Process()
PacingController::ProcessPackets()
PacingController::GetPendingPacket()
PacketRouter::SendPacket()
ModuleRtpRtcpImpl2::TrySendPacket()
RtpSenderEgress::SendPacket()
RtpSenderEgress::SendPacketToNetwork()
LayerFilteringTransport::SendRtp()

2. Analysis

2.1 After RTP packetization, the data is passed to PacedSender


void PacedSender::EnqueuePackets(
    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
  {
    ......
    // A video frame is split into multiple RTP packets.
    for (auto& packet : packets) {
      RTC_DCHECK_GE(packet->capture_time_ms(), 0);
      pacing_controller_.EnqueuePacket(std::move(packet));
    }
  }
  MaybeWakupProcessThread();
}

2.2 Determining the packet priority

void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
      << "SetPacingRate must be called before InsertPacket.";
  RTC_CHECK(packet->packet_type());
  // Get priority first and store in temporary, to avoid chance of object being
  // moved before GetPriorityForType() being called.
  // Look up the priority for this packet type.
  const int priority = GetPriorityForType(*packet->packet_type());
  EnqueuePacketInternal(std::move(packet), priority);
}

The smaller the priority number, the higher the priority.

int GetPriorityForType(RtpPacketMediaType type) {
  // Lower number takes priority over higher.
  switch (type) {
    case RtpPacketMediaType::kAudio:
      // Audio is always prioritized over other packet types.
      return kFirstPriority + 1;
    case RtpPacketMediaType::kRetransmission:
      // Send retransmissions before new media.
      return kFirstPriority + 2;
    case RtpPacketMediaType::kVideo:
    case RtpPacketMediaType::kForwardErrorCorrection:
      // Video has "normal" priority, in the old speak.
      // Send redundancy concurrently to video. If it is delayed it might have a
      // lower chance of being useful.
      return kFirstPriority + 3;
    case RtpPacketMediaType::kPadding:
      // Packets that are in themselves likely useless, only sent to keep the
      // BWE high.
      return kFirstPriority + 4;
  }
  RTC_CHECK_NOTREACHED();
}

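Send order

  • Higher-priority packets sit at the front of the FIFO; lower-priority packets sit behind them.
  • The packet's priority level is compared first: the lower the number, the higher the priority (the level is derived from the packet type).
  • The retransmission flag is compared next: retransmitted packets rank above ordinary packets.
  • The video frame timestamp is compared last: packets from earlier frames rank higher.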
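
The ordering rules above can be sketched as a comparator. This is only a minimal illustration under stated assumptions: `SketchPacket`, `SendsLater`, `SketchQueue`, and `DrainOrder` are hypothetical names made up for this example and are not WebRTC types, and a single `std::priority_queue` stands in for the real queue structure.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>
#include <vector>

// Hypothetical, simplified stand-in for a queued packet (not a WebRTC type).
struct SketchPacket {
  int priority;             // Lower number = higher priority.
  bool is_retransmission;   // Retransmissions rank above ordinary packets.
  int64_t capture_time_ms;  // Earlier frames rank higher.
  int id;                   // Only used to identify packets in this example.
};

// Returns true when `a` should be sent AFTER `b`.
// std::priority_queue pops the "largest" element, so the order is inverted.
struct SendsLater {
  bool operator()(const SketchPacket& a, const SketchPacket& b) const {
    if (a.priority != b.priority) return a.priority > b.priority;
    if (a.is_retransmission != b.is_retransmission) return b.is_retransmission;
    return a.capture_time_ms > b.capture_time_ms;
  }
};

using SketchQueue =
    std::priority_queue<SketchPacket, std::vector<SketchPacket>, SendsLater>;

// Drains a copy of the queue and returns the packet ids in send order.
std::vector<int> DrainOrder(SketchQueue queue) {
  std::vector<int> order;
  while (!queue.empty()) {
    order.push_back(queue.top().id);
    queue.pop();
  }
  return order;
}
```

Pushing an audio packet, a retransmission, and two ordinary video packets into a `SketchQueue` and calling `DrainOrder` should yield the audio packet first, then the retransmission, then the video packets by ascending timestamp.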

2.3 Pushing into the queue

void PacingController::EnqueuePacketInternal(
    std::unique_ptr<RtpPacketToSend> packet,
    int priority) {
  // Report the incoming packet size to the bitrate prober.
  prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));

  Timestamp now = CurrentTime();
  if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() &&
      NextSendTime() <= now) {
    TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
    UpdateBudgetWithElapsedTime(elapsed_time);
  }
  // Push the packet into the queue.
  packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
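
In dynamic mode, when the queue is empty and the next send time has already passed, the function above brings the budget up to date with the elapsed time before the packet is queued. The excerpt does not show how the budget is tracked, so the following is only a rough sketch under one assumption: the budget is modelled as a byte "debt" that sending increases and elapsed time pays down at the pacing rate. `SketchBudget` and all of its members are hypothetical names, not the WebRTC implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical toy model of a pacing debt counter (not a WebRTC class).
class SketchBudget {
 public:
  explicit SketchBudget(int64_t rate_bytes_per_sec)
      : rate_bytes_per_sec_(rate_bytes_per_sec) {}

  // Sending a packet adds its size to the outstanding debt.
  void OnPacketSent(int64_t bytes) { debt_bytes_ += bytes; }

  // Elapsed time pays the debt down at the pacing rate, never below zero.
  void UpdateWithElapsedTime(int64_t elapsed_ms) {
    const int64_t paid = rate_bytes_per_sec_ * elapsed_ms / 1000;
    debt_bytes_ -= std::min(debt_bytes_, paid);
  }

  // In this model a new packet may go out once the debt is fully paid.
  bool CanSend() const { return debt_bytes_ == 0; }
  int64_t debt_bytes() const { return debt_bytes_; }

 private:
  int64_t rate_bytes_per_sec_;
  int64_t debt_bytes_ = 0;
};
```

With a rate of 100000 bytes per second, a 1200-byte packet leaves 700 bytes of debt after 5 ms and none after a further 10 ms, because the payment is clamped to the remaining debt.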

2.4 RTP packets are stored in per-stream queues (one per SSRC)

void RoundRobinPacketQueue::Push(QueuedPacket packet) {
  auto stream_info_it = streams_.find(packet.Ssrc());
  if (stream_info_it == streams_.end()) {
    stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
    stream_info_it->second.priority_it = stream_priorities_.end();
    stream_info_it->second.ssrc = packet.Ssrc();
  }

  Stream* stream = &stream_info_it->second;

  if (stream->priority_it == stream_priorities_.end()) {
    // If the SSRC is not currently scheduled, add it to `stream_priorities_`.
    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
  } else if (packet.Priority() < stream->priority_it->first.priority) {
    // If the priority of this SSRC increased, remove the outdated StreamPrioKey
    // and insert a new one with the new priority. Note that `priority_` uses
    // lower ordinal for higher priority.
    stream_priorities_.erase(stream->priority_it);
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
  }
  RTC_CHECK(stream->priority_it != stream_priorities_.end());

  if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
    // Promotion from single-packet queue. Just add to enqueue times.
    packet.UpdateEnqueueTimeIterator(
        enqueue_times_.insert(packet.EnqueueTime()));
  } else {
    // In order to figure out how much time a packet has spent in the queue
    // while not in a paused state, we subtract the total amount of time the
    // queue has been paused so far, and when the packet is popped we subtract
    // the total amount of time the queue has been paused at that moment. This
    // way we subtract the total amount of time the packet has spent in the
    // queue while in a paused state.
    UpdateQueueTime(packet.EnqueueTime());
    packet.SubtractPauseTime(pause_time_sum_);

    size_packets_ += 1;
    size_ += PacketSize(packet);
  }

  stream->packet_queue.push(packet);
}
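
The per-stream bookkeeping in `Push` can be illustrated with a much smaller toy. This is a sketch under simplifying assumptions, not the WebRTC class: `SketchStreamQueue` and its members are hypothetical, each stream is a plain FIFO of packet ids, streams are scheduled by priority only (the real key also includes the stream size), and enqueue-time and pause-time accounting is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <queue>

// Hypothetical toy per-stream packet store (not the WebRTC class).
class SketchStreamQueue {
 public:
  // Stores `packet_id` in the FIFO for `ssrc` and (re)schedules the stream.
  void Push(uint32_t ssrc, int priority, int packet_id) {
    Stream& stream = streams_[ssrc];  // Creates the stream on first use.
    if (!stream.scheduled) {
      stream.schedule_it = schedule_.emplace(priority, ssrc);
      stream.scheduled = true;
    } else if (priority < stream.schedule_it->first) {
      // A higher priority (lower number) arrived: replace the outdated key.
      schedule_.erase(stream.schedule_it);
      stream.schedule_it = schedule_.emplace(priority, ssrc);
    }
    stream.packets.push(packet_id);
  }

  bool Empty() const { return schedule_.empty(); }

  // Pops the oldest packet of the best-scheduled stream. Requires !Empty().
  int Pop() {
    auto it = schedule_.begin();
    Stream& stream = streams_[it->second];
    const int packet_id = stream.packets.front();
    stream.packets.pop();
    if (stream.packets.empty()) {
      // Nothing left to send: take the stream off the schedule.
      schedule_.erase(it);
      stream.scheduled = false;
    }
    return packet_id;
  }

 private:
  struct Stream {
    std::queue<int> packets;
    bool scheduled = false;
    std::multimap<int, uint32_t>::iterator schedule_it;
  };
  std::map<uint32_t, Stream> streams_;
  std::multimap<int, uint32_t> schedule_;  // priority -> ssrc
};
```

The `std::multimap` plays the role that `stream_priorities_` plays in the excerpt: its first entry names the stream to serve next, and a stream drops out of it once its own queue is empty.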
