我正在的github给大家开发一个用于做实验的项目 —— github.com/qw225967/Bifrost
1.1 模块介绍
WebRTC 的 InterArrival 类是用于计算包之间的到达时间差(Inter-Arrival Time)的类。 如果观察WebRTC的提交记录你会发现,这个类随着卡尔曼滤波器、趋势线等等算法的变更也一直在调整。那么为什么要存在这个接收间隔的计算类呢?
1.2 代码
void DelayBasedBwe::IncomingPacketFeedback(const PacketResult& packet_feedback,
Timestamp at_time) {
uint32_t ts_delta = 0;
int64_t t_delta = 0;
int size_delta = 0;
// 校准接收间隔
bool calculated_deltas = inter_arrival_->ComputeDeltas(
timestamp, packet_feedback.receive_time.ms(), at_time.ms(),
packet_feedback.sent_packet.size.bytes(), &ts_delta, &t_delta,
double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
// 把间隔放入检测器进行拥塞检测
delay_detector_->Update(t_delta, ts_delta_ms,
packet_feedback.receive_time.ms(), calculated_deltas);
bool InterArrival::ComputeDeltas(uint32_t timestamp, int64_t arrival_time_ms,
int64_t system_time_ms, size_t packet_size,
uint32_t* timestamp_delta,
int64_t* arrival_time_delta_ms,
int* packet_size_delta) {
// 传入参数:
// timestamp 数据发送的时间戳
// arrival_time_ms 对端接收到数据的时间戳
// system_time_ms 当前系统时间,其实是feedback接到的时间
// packet_size 当前数据包的大小
// 输出内容 —— timestamp_delta 发送间隔
// 输出内容 —— arrival_time_delta_ms 接收间隔
// 输出内容 —— packet_size_delta 两个包组直接包数量的差值
bool calculated_deltas = false;
// 第一个包组,记录信息
if (current_timestamp_group_.IsFirstPacket()) {
// We don't have enough data to update the filter, so we store it until we
// have two frames of data to process.
current_timestamp_group_.timestamp = timestamp;
current_timestamp_group_.first_timestamp = timestamp;
current_timestamp_group_.first_arrival_ms = arrival_time_ms;
// 连续包组返回
} else if (!PacketInOrder(timestamp, arrival_time_ms)) {
return false;
// 新包组开始计算间隔
} else if (NewTimestampGroup(arrival_time_ms, timestamp)) {
// First packet of a later frame, the previous frame sample is ready.
// 第一个包组不计算,后续包组开始计算
if (prev_timestamp_group_.complete_time_ms >= 0) {
// 记录包组之间的发送间隔
*timestamp_delta =
current_timestamp_group_.timestamp - prev_timestamp_group_.timestamp;
// 记录包组之间的接收间隔
*arrival_time_delta_ms = current_timestamp_group_.complete_time_ms -
// Check system time differences to see if we have an unproportional jump
// in arrival time. In that case reset the inter-arrival computations.
// 计算系统时间变化,防止系统时间跳变影响计算
int64_t system_time_delta_ms =
current_timestamp_group_.last_system_time_ms -
if (*arrival_time_delta_ms - system_time_delta_ms >=
kArrivalTimeOffsetThresholdMs) {
return false;
// 对端的接收时间戳可能已经生变化,影响计算
if (*arrival_time_delta_ms < 0) {
// The group of packets has been reordered since receiving its local
// arrival timestamp.
if (num_consecutive_reordered_packets_ >= kReorderedResetThreshold) {
return false;
} else {
num_consecutive_reordered_packets_ = 0;
// 计算两个包组的数据量差值
*packet_size_delta = static_cast<int>(current_timestamp_group_.size) -
calculated_deltas = true;
// 更新数据
prev_timestamp_group_ = current_timestamp_group_;
// The new timestamp is now the current frame.
current_timestamp_group_.first_timestamp = timestamp;
current_timestamp_group_.timestamp = timestamp;
current_timestamp_group_.first_arrival_ms = arrival_time_ms;
current_timestamp_group_.size = 0;
} else {
current_timestamp_group_.timestamp =
LatestTimestamp(current_timestamp_group_.timestamp, timestamp);
// Accumulate the frame size.
current_timestamp_group_.size += packet_size;
current_timestamp_group_.complete_time_ms = arrival_time_ms;
current_timestamp_group_.last_system_time_ms = system_time_ms;
return calculated_deltas;
bool InterArrival::PacketInOrder(uint32_t timestamp, int64_t arrival_time_ms) {
if (current_timestamp_group_.IsFirstPacket()) {
return true;
} else if (arrival_time_ms < 0) {
// NOTE: Change related to
// https://github.com/versatica/mediaproxy/issues/357
// Sometimes we do get negative arrival time, which causes BelongsToBurst()
// to fail, which may cause anything that uses InterArrival to crash.
// Credits to @sspanak and @Ivaka.
return false;
} else {
// Assume that a diff which is bigger than half the timestamp interval
// (32 bits) must be due to reordering. This code is almost identical to
// that in IsNewerTimestamp() in module_common_types.h.
uint32_t timestamp_diff =
timestamp - current_timestamp_group_.first_timestamp;
const static uint32_t int_middle = 0x80000000;
// 处理跳变
if (timestamp_diff == int_middle) {
return timestamp > current_timestamp_group_.first_timestamp;
return timestamp_diff < int_middle;
// Assumes that |timestamp| is not reordered compared to
// |current_timestamp_group_|.
bool InterArrival::NewTimestampGroup(int64_t arrival_time_ms,
uint32_t timestamp) const {
if (current_timestamp_group_.IsFirstPacket()) {
return false;
// 计算突发数据,确认突发数据直接返回
} else if (BelongsToBurst(arrival_time_ms, timestamp)) {
return false;
} else {
// 差值大于5ms就认为是下一个发送周期
uint32_t timestamp_diff =
timestamp - current_timestamp_group_.first_timestamp;
return timestamp_diff > kTimestampGroupLengthTicks;
bool InterArrival::BelongsToBurst(int64_t arrival_time_ms,
uint32_t timestamp) const {
if (!burst_grouping_) {
return false;
// 计算于上一个发送时间、接收时间的差值
int64_t arrival_time_delta_ms =
arrival_time_ms - current_timestamp_group_.complete_time_ms;
uint32_t timestamp_diff = timestamp - current_timestamp_group_.timestamp;
// 当发送间隔转为ms后差值在0.5浮动范围内
int64_t ts_delta_ms = timestamp_to_ms_coeff_ * timestamp_diff + 0.5;
if (ts_delta_ms == 0) return true;
// 一旦接收间隔比发送间隔加上浮动值0.5还小,证明这些包连续发送
int propagation_delta_ms = arrival_time_delta_ms - ts_delta_ms;
if (propagation_delta_ms < 0 && /* 连续发送 */
arrival_time_delta_ms <= kBurstDeltaThresholdMs && /* 处于同一发送周期 */
arrival_time_ms - current_timestamp_group_.first_arrival_ms <
kMaxBurstDurationMs) /* 最大异常值限制 */
return true;
return false;
2.1 背景
状态机切换大家都很熟悉,就是下面这张图(来自于谷歌公开的Performance Analysis of Google Congestion Control Algorithm for WebRTC文章):
enum class BandwidthUsage {
// 网络正常
kBwNormal = 0,
// 网络过度使用
kBwUnderusing = 1,
// 网络正在排空
kBwOverusing = 2,
2.2 代码
void AimdRateControl::ChangeState(const RateControlInput& input,
Timestamp at_time) {
switch (input.bw_state) {
case BandwidthUsage::kBwNormal:
// 只有hold状态能进入码率上涨
if (rate_control_state_ == kRcHold) {
time_last_bitrate_change_ = at_time;
rate_control_state_ = kRcIncrease;
case BandwidthUsage::kBwOverusing:
// 出现网络过度使用时下调码率
if (rate_control_state_ != kRcDecrease) {
rate_control_state_ = kRcDecrease;
case BandwidthUsage::kBwUnderusing:
// 当网络开始排空时先转成hold状态
rate_control_state_ = kRcHold;
DataRate AimdRateControl::ChangeBitrate(DataRate new_bitrate,
const RateControlInput& input,
Timestamp at_time) {
// 取出吞吐量估计值
DataRate estimated_throughput =
if (input.estimated_throughput)
latest_estimated_throughput_ = *input.estimated_throughput;
// An over-use should always trigger us to reduce the bitrate, even though
// we have not yet established our first estimate. By acting on the over-use,
// we will end up with a valid estimate.
// 初始阶段,只要不是网络拥塞就不进行以下逻辑计算
if (!bitrate_is_initialized_ &&
input.bw_state != BandwidthUsage::kBwOverusing)
return current_bitrate_;
// 状态切换
ChangeState(input, at_time);
switch (rate_control_state_) {
// hold状态直接返回
case kRcHold:
case kRcIncrease:
// 吞吐量估计大于了链路容量统计,则重置容量统计
if (estimated_throughput > link_capacity_.UpperBound())
// Do not increase the delay based estimate in alr since the estimator
// will not be able to get transport feedback necessary to detect if
// the new estimate is correct.
// alr状态下可以根据no_bitrate_increase_in_alr_决定是否继续进行码率增长
// 当alr状态下持续码率增长,一旦出现码率暴增发送码率就会爆发式增大
if (!(send_side_ && in_alr_ && no_bitrate_increase_in_alr_)) {
// 计算出链路容量则进入加性增,因为当前瓶颈已知
if (link_capacity_.has_estimate()) {
// The link_capacity estimate is reset if the measured throughput
// is too far from the estimate. We can therefore assume that our
// target rate is reasonably close to link capacity and use additive
// increase.
DataRate additive_increase =
AdditiveRateIncrease(at_time, time_last_bitrate_change_);
new_bitrate += additive_increase;
} else {
// If we don't have an estimate of the link capacity, use faster ramp
// up to discover the capacity.
// 未存在里哪路容量则需要乘性增去做探测
DataRate multiplicative_increase = MultiplicativeRateIncrease(
at_time, time_last_bitrate_change_, new_bitrate);
new_bitrate += multiplicative_increase;
time_last_bitrate_change_ = at_time;
case kRcDecrease:
// TODO(srte): Remove when |estimate_bounded_backoff_| has been validated.
// 取当前链路容量的小值与吞吐量对比取大值,用于激进地下调码率
if (network_estimate_ && capacity_deviation_ratio_threshold_ &&
!estimate_bounded_backoff_) {
estimated_throughput = std::max(estimated_throughput,
if (estimated_throughput > low_throughput_threshold_) {
// Set bit rate to something slightly lower than the measured throughput
// to get rid of any self-induced delay.
// 新的码率需要略低于吞吐量,避免引入新的排队导致延迟
new_bitrate = estimated_throughput * beta_;
if (new_bitrate > current_bitrate_) {
// Avoid increasing the rate when over-using.
// 当此时新的码率仍然高于当前码率,则根据链路容量重新设置新码率
if (link_capacity_.has_estimate()) {
new_bitrate = beta_ * link_capacity_.estimate();
// estimate_bounded_backoff_ 称为边界避退,目的是标记在链路容量下限高于当前容量时,使用链路容量下限
if (estimate_bounded_backoff_ && network_estimate_) {
new_bitrate = std::max(
new_bitrate, network_estimate_->link_capacity_lower * beta_);
} else {
// 吞吐量小于低吞吐的阈值,则直接使用吞吐量
new_bitrate = estimated_throughput;
// 已经估计出带宽则取吞吐量、带宽的最大值
if (link_capacity_.has_estimate()) {
new_bitrate = std::max(new_bitrate, link_capacity_.estimate());
// 超过吞吐阈值都使用阈值
new_bitrate = std::min(new_bitrate, low_throughput_threshold_.Get());
// 如果当前码率已经很小,则继续使用当前码率
new_bitrate = std::min(new_bitrate, current_bitrate_);
if (bitrate_is_initialized_ && estimated_throughput < current_bitrate_) {
// 有可能存在过度下降码率的情况,一旦超过下降的90%,则不使用该码率
constexpr double kDegradationFactor = 0.9;
if (smoothing_experiment_ &&
new_bitrate < kDegradationFactor * beta_ * current_bitrate_) {
// If bitrate decreases more than a normal back off after overuse, it
// indicates a real network degradation. We do not let such a decrease
// to determine the bandwidth estimation period.
last_decrease_ = absl::nullopt;
} else {
// 记录该下降的码率
last_decrease_ = current_bitrate_ - new_bitrate;
if (estimated_throughput < link_capacity_.LowerBound()) {
// The current throughput is far from the estimated link capacity. Clear
// the estimate to allow an immediate update in OnOveruseDetected.
// 更新状态记录
bitrate_is_initialized_ = true;
// Stay on hold until the pipes are cleared.
rate_control_state_ = kRcHold;
time_last_bitrate_change_ = at_time;
time_last_bitrate_decrease_ = at_time;
// 选择码率
return ClampBitrate(new_bitrate, estimated_throughput);
DataRate AimdRateControl::ClampBitrate(DataRate new_bitrate,
DataRate estimated_throughput) const {
// Allow the estimate to increase as long as alr is not detected to ensure
// that there is no BWE values that can make the estimate stuck at a too
// low bitrate. If an encoder can not produce the bitrate necessary to
// fully use the capacity, alr will sooner or later trigger.
if (!(send_side_ && no_bitrate_increase_in_alr_)) {
// Don't change the bit rate if the send side is too far off.
// We allow a bit more lag at very low rates to not too easily get stuck if
// the encoder produces uneven outputs.
// 每次上涨有一个最大的上涨限度,1.5 * 吞吐量 + 10kbps,避免超出吞吐量上涨过多
const DataRate max_bitrate =
1.5 * estimated_throughput + DataRate::kbps(10);
if (new_bitrate > current_bitrate_ && new_bitrate > max_bitrate) {
new_bitrate = std::max(current_bitrate_, max_bitrate);
if (network_estimate_ &&
(estimate_bounded_increase_ || capacity_limit_deviation_factor_)) {
DataRate upper_bound = network_estimate_->link_capacity_upper;
new_bitrate = std::min(new_bitrate, upper_bound);
new_bitrate = std::max(new_bitrate, min_configured_bitrate_);
return new_bitrate;
DataRate AimdRateControl::MultiplicativeRateIncrease(
Timestamp at_time, Timestamp last_time, DataRate current_bitrate) const {
// 1.08这个参数与丢包估计的上涨参数一样
double alpha = 1.08;
if (last_time.IsFinite()) {
auto time_since_last_update = at_time - last_time;
alpha = pow(alpha, std::min(time_since_last_update.seconds<double>(), 1.0));
DataRate multiplicative_increase =
std::max(current_bitrate * (alpha - 1.0), DataRate::bps(1000));
return multiplicative_increase;
DataRate AimdRateControl::AdditiveRateIncrease(Timestamp at_time,
Timestamp last_time) const {
double time_period_seconds = (at_time - last_time).seconds<double>();
double data_rate_increase_bps =
GetNearMaxIncreaseRateBpsPerSecond() * time_period_seconds;
return DataRate::bps(data_rate_increase_bps);
double AimdRateControl::GetNearMaxIncreaseRateBpsPerSecond() const {
// RTC_DCHECK(!current_bitrate_.IsZero());
// 加性增以固定的15帧换算帧间隔,最终根据帧间隔计算出一个大致的平均包大小
const TimeDelta kFrameInterval = TimeDelta::seconds(1) / 15;
DataSize frame_size = current_bitrate_ * kFrameInterval;
const DataSize kPacketSize = DataSize::bytes(1200);
double packets_per_frame = std::ceil(frame_size / kPacketSize);
DataSize avg_packet_size = frame_size / packets_per_frame;
// Approximate the over-use estimator delay to 100 ms.
// 使用平均包大小换算出每个计算周期的增长值,最大为4kbps
TimeDelta response_time = rtt_ + TimeDelta::ms(100);
if (in_experiment_) response_time = response_time * 2;
double increase_rate_bps_per_second =
(avg_packet_size / response_time).bps<double>();
double kMinIncreaseRateBpsPerSecond = 4000;
return std::max(kMinIncreaseRateBpsPerSecond, increase_rate_bps_per_second);