Apollo 9.0 速度动态规划决策算法 – path time heuristic optimizer

发布于:2025-02-15 ⋅ 阅读:(15) ⋅ 点赞:(0)


这一章将来讲解速度决策算法,也就是SPEED_HEURISTIC_OPTIMIZER task里面的内容。Apollo 9.0使用动态规划算法进行速度决策,从类名来看,PathTimeHeuristicOptimizer 是路径时间启发式优化器,顾名思义,这里的算法在非凸的ST空间进行了纵向超让的决策,同时也为后续速度规划算法提供了一个启发式的粗解。

1. 动态规划

动态规划,英文:Dynamic Programming,简称DP。

关于动态规划,网上其实有很多讲解,都很详细,这里不再赘述。其实如果你刷过动态规划的算法题,相信你对dp问题一定有很深的理解。如果没有,也没有关系。动态规划一句话概括就是:就是把大问题拆成小问题,记住小问题的答案,避免重复计算,最后拼出大问题的答案。

PathTimeHeuristicOptimizer算法,实际上是将ST空间进行离散,在这离散的图上寻找“最短”路径的问题,所以可以使用动态规划分而治之的思想进行求解。注意这里的“最短”,并不是路程上的最短,而是指包含障碍物代价,加速度代价等一系列代价最小的一条路径。

动态规划Task的入口在PathTimeHeuristicOptimizer类里面,下面是整个算法的求解步骤:

  • 变道和非变道加载不同的dp参数
  • 构建ST栅格图
  • ST栅格图搜素

path_time_heuristic_optimizer.cc

bool PathTimeHeuristicOptimizer::SearchPathTimeGraph(SpeedData* speed_data) const {
  // 变道和非变道加载不同的dp参数
  const auto& dp_st_speed_optimizer_config = reference_line_info_->IsChangeLanePath()
                                             ? config_.lane_change_speed_config()
                                             : config_.default_speed_config();

  // 构建ST栅格图
  GriddedPathTimeGraph st_graph(reference_line_info_->st_graph_data(), 
                                dp_st_speed_optimizer_config,
                                reference_line_info_->path_decision()->obstacles().Items(), 
                                init_point_);
  // ST栅格图搜素
  if (!st_graph.Search(speed_data).ok()) {
    AERROR << "failed to search graph with dynamic programming.";
    return false;
  }
  return true;
}

2. 采样

接下来就是在ST的维度进行采样:近端稠密一些,远端稀疏一点,这样可以节省计算量。同时变道和非变道场景,S方向采样的稠密点间隔和范围都不一样,非变道场景更稠密,范围更广一些,这样可以更好应对cut-in和cut-out场景,让纵向速度计算更精细。

在这里插入图片描述

上图中,最左下角的点,T肯定是0,而S是速度规划的起点:
在这里插入图片描述

S e n d {\color{Green} S_{end} } Send:速度规划的终点,是路径规划的path长度;
T e n d T_{end} Tend:很难计算,和道路的限速有关,也和动态障碍物有关。这里需要打破一个思维定式,在路径规划里面,必须要把比如60m路径全部规划完。在速度规划里面,不强求在 T e n d T_{end} Tend里面完全规划完 S e n d {\color{Green} S_{end} } Send。比如说路径规划出了60m,但是速度规划可以只规划20m的距离,因为现实中动态障碍物的轨迹是千奇百怪的。所以,在这里设置 T e n d = 8 s T_{end}=8s Tend=8s,在这8s的时间内,不强求把 S e n d {\color{Green} S_{end} } Send全部规划完。

关于具体采样的步骤和内容可以见下面的代码,可以说加上注释已经非常容易理解了。唯一需要补充的一点是:cost_table_其实是一个dimension_t_行,dimension_s_列的二维表,也就是T是行维度,S是列维度,将上图所示的ST离散图顺时针旋转90°,就可以刚好对应上。但是后面在计算代价和回溯的时候行列S和T又有些混淆,不过没有关系,对照图是很好理解的。

Status GriddedPathTimeGraph::InitCostTable() {
  // Time dimension is homogeneous while Spatial dimension has two resolutions,
  // dense and sparse with dense resolution coming first in the spatial horizon

  // Sanity check for numerical stability
  if (unit_t_ < kDoubleEpsilon) {
    const std::string msg = "unit_t is smaller than the kDoubleEpsilon.";
    AERROR << msg;
    return Status(ErrorCode::PLANNING_ERROR, msg);
  }

  // Sanity check on s dimension setting
  if (dense_dimension_s_ < 1) {
    const std::string msg = "dense_dimension_s is at least 1.";
    AERROR << msg;
    return Status(ErrorCode::PLANNING_ERROR, msg);
  }

  dimension_t_ = static_cast<uint32_t>(std::ceil(total_length_t_ / static_cast<double>(unit_t_))) + 1;

  double sparse_length_s = total_length_s_ -
                           static_cast<double>(dense_dimension_s_ - 1) * dense_unit_s_;
  sparse_dimension_s_ = sparse_length_s > std::numeric_limits<double>::epsilon()
                        ? static_cast<uint32_t>(std::ceil(sparse_length_s / sparse_unit_s_))
                        : 0;
  dense_dimension_s_ = sparse_length_s > std::numeric_limits<double>::epsilon()
                       ? dense_dimension_s_
                       : static_cast<uint32_t>(std::ceil(total_length_s_ / dense_unit_s_)) + 1;
  dimension_s_ = dense_dimension_s_ + sparse_dimension_s_;

  PrintCurves debug;
  // Sanity Check
  if (dimension_t_ < 1 || dimension_s_ < 1) {
    const std::string msg = "Dp st cost table size incorrect.";
    AERROR << msg;
    return Status(ErrorCode::PLANNING_ERROR, msg);
  }

  // 定义dp代价表,注意这个表:常规ST图顺时针旋转90°,一共t行,每行s列 , total_cost_默认无穷大
  cost_table_ = std::vector<std::vector<StGraphPoint>>(dimension_t_, 
                                                       std::vector<StGraphPoint>(dimension_s_, StGraphPoint()));

  double curr_t = 0.0;
  for (uint32_t i = 0; i < cost_table_.size(); ++i, curr_t += unit_t_) {          // T
    auto& cost_table_i = cost_table_[i];
    double curr_s = 0.0;

    // 稠密
    for (uint32_t j = 0; j < dense_dimension_s_; ++j, curr_s += dense_unit_s_) {  // S
      cost_table_i[j].Init(i, j, STPoint(curr_s, curr_t));
      debug.AddPoint("dp_node_points", curr_t, curr_s);
    }

    // 稀疏
    curr_s = static_cast<double>(dense_dimension_s_ - 1) * dense_unit_s_ + sparse_unit_s_;
    for (uint32_t j = dense_dimension_s_; j < cost_table_i.size(); ++j, curr_s += sparse_unit_s_) {
      cost_table_i[j].Init(i, j, STPoint(curr_s, curr_t));
      debug.AddPoint("dp_node_points", curr_t, curr_s);
    }
  }

  // 获取第一行的s
  const auto& cost_table_0 = cost_table_[0];
  spatial_distance_by_index_ = std::vector<double>(cost_table_0.size(), 0.0);
  for (uint32_t i = 0; i < cost_table_0.size(); ++i) {
    spatial_distance_by_index_[i] = cost_table_0[i].point().s();
  }
  return Status::OK();
}

3. 代价函数

接下来就是在离散表上,进行“最短”路径搜索,也就是使用动态规划算法计算速度规划起点到每个点的最小代价,然后回溯(反推)获取最优路径。

注意:速度动态规划可以进行剪枝操作,以减少计算量,依据车辆动力学max_acceleration_和max_deceleration_找到下一列可能到达的范围[next_lowest_row, next_highest_row],只需要计算当前点到下一列该范围内的点的代价。

在这里插入图片描述

一个点的total_cost由两部分构成:

  1. 点代价:
    a. 障碍物代价:obstacle_cost
    b. 距离终点代价:spatial_potential_cost
    c. 前一个点的:total_cost
  2. 边代价:
    a. 速度代价:Speedcost
    b. 加速度代价:AccelCost
    c. 加加速度代价:JerkCost

3.1 障碍物代价

Apollo 9.0动态规划中障碍物代价计算做了简化,只需要计算该点到所有t时刻障碍物的代价,并进行累加。
如下图所示的 a 62 a_{62} a62点,该时刻有两个障碍物,通过公式计算障碍物代价即可:

  • 不在范围内:0
  • 在障碍物st边界框里:∞
  • 其他: c o s t + = c o n f i g _ . o b s t a c l e _ w e i g h t ( ) ∗ c o n f i g _ . d e f a u l t _ o b s t a c l e _ c o s t ( ) ∗ s _ d i f f ∗ s _ d i f f cost += config\_.obstacle\_weight() * config\_.default\_obstacle\_cost() * s\_diff * s\_diff cost+=config_.obstacle_weight()config_.default_obstacle_cost()s_diffs_diff

这里需要注意的是,跟车和超车需要与前后车保持的安全距离大小不一样,超车情况下,需要离后车距离更大。

在这里插入图片描述

double DpStCost::GetObstacleCost(const StGraphPoint& st_graph_point) {
  const double s = st_graph_point.point().s();
  const double t = st_graph_point.point().t();

  double cost = 0.0;

  if (FLAGS_use_st_drivable_boundary) {   // false 
    // TODO(Jiancheng): move to configs
    static constexpr double boundary_resolution = 0.1;
    int index = static_cast<int>(t / boundary_resolution);
    const double lower_bound = st_drivable_boundary_.st_boundary(index).s_lower();
    const double upper_bound = st_drivable_boundary_.st_boundary(index).s_upper();

    if (s > upper_bound || s < lower_bound) {
      return kInf;
    }
  }

  // 遍历每个障碍物,计算t时刻障碍物st边界框的上界和下界,根据无人车的位置(t,s)与边界框是否重合,计算障碍物代价
  for (const auto* obstacle : obstacles_) {
    // Not applying obstacle approaching cost to virtual obstacle like created stop fences
    if (obstacle->IsVirtual()) {
      continue;
    }

    // Stop obstacles are assumed to have a safety margin when mapping them out,
    // so repelling force in dp st is not needed as it is designed to have adc
    // stop right at the stop distance we design in prior mapping process
    if (obstacle->LongitudinalDecision().has_stop()) {
      continue;
    }

    auto boundary = obstacle->path_st_boundary();

    if (boundary.min_s() > FLAGS_speed_lon_decision_horizon) {  // 纵向决策的最远距离 200
      continue;
    }
    if (t < boundary.min_t() || t > boundary.max_t()) {
      continue;
    }
    if (boundary.IsPointInBoundary(st_graph_point.point())) {
      return kInf;
    }

    // 情况4:需要减速避让或加速超过的障碍物。计算障碍物在t时刻的上界和下界位置,即上下界的累积距离s
    double s_upper = 0.0;
    double s_lower = 0.0;

    // 为了避免其他节点(t,s)再一次计算t时刻的障碍物上下界,利用缓存加速计算。GetBoundarySRange函数可以用来计算t时刻障碍物上界和下界累积距离s,并缓存
    int boundary_index = boundary_map_[boundary.id()];
    if (boundary_cost_[boundary_index][st_graph_point.index_t()].first < 0.0) { // 还没有计算过
      boundary.GetBoundarySRange(t, &s_upper, &s_lower);
      boundary_cost_[boundary_index][st_graph_point.index_t()] = std::make_pair(s_upper, s_lower);
    } else {
      s_upper = boundary_cost_[boundary_index][st_graph_point.index_t()].first; // 之前计算过,直接取值
      s_lower = boundary_cost_[boundary_index][st_graph_point.index_t()].second;
    }

    // t时刻, 无人车在障碍物后方
    if (s < s_lower) {
      const double follow_distance_s = config_.safe_distance(); // 0.2
      AINFO << "follow_distance_s : " << follow_distance_s;
      if (s + follow_distance_s < s_lower) {                    // 如果障碍物和无人车在t时刻距离大于安全距离,距离比较远,cost=0
        continue;
      } else {                                                  // 否则距离小于安全距离,计算cost。obstacle_weight:1.0,default_obstacle_cost:1000,
        auto s_diff = follow_distance_s - s_lower + s;
        cost += config_.obstacle_weight() * config_.default_obstacle_cost() * s_diff * s_diff;
      }
    // t时刻, 无人车在障碍物前方
    } else if (s > s_upper) {
      const double overtake_distance_s = StGapEstimator::EstimateSafeOvertakingGap(); // 20
      if (s > s_upper + overtake_distance_s) {  // or calculated from velocity
        continue;                               // 如果障碍物和无人车在t时刻距离大于安全距离,距离比较远,cost=0
      } else {                                  // 否则距离小于安全距离,计算cost。obstacle_weight:1.0,default_obstacle_cost:1000,
        auto s_diff = overtake_distance_s + s_upper - s;
        cost += config_.obstacle_weight() * config_.default_obstacle_cost() * s_diff * s_diff;
      }
    }
  }
  return cost * unit_t_;  // unit_t_ = 1
}

3.2 距离终点代价

距离终点代价其实就是距离路径规划终点的一个惩罚,该点距离终点越近,代价越小;距离终点越远,代价越大。

目的很简单,就是希望速度规划能够就可能将路径规划的s全部路径都覆盖。
在这里插入图片描述

double DpStCost::GetSpatialPotentialCost(const StGraphPoint& point) {
  return (total_s_ - point.point().s()) * config_.spatial_potential_penalty();  // 距离终点惩罚 100 or 100000(变道)
}

3.3 速度代价

速度代价很简单,主要考虑两方面:

  • 不要超速,尽量贴近限速;
  • 尽量贴近巡航速度,也就是驾驶员设定的速度。
double DpStCost::GetSpeedCost(const STPoint& first, const STPoint& second,
                              const double speed_limit,
                              const double cruise_speed) const {
  double cost = 0.0;
  const double speed = (second.s() - first.s()) / unit_t_; //计算时间段[t-1,t]内的平均速度
  if (speed < 0) { // 倒车?速度代价无穷大
    return kInf;
  }

  const double max_adc_stop_speed = common::VehicleConfigHelper::Instance()
                                                                 ->GetConfig().vehicle_param()
                                                                 .max_abs_speed_when_stopped();
  AINFO << "max_adc_stop_speed : "<< max_adc_stop_speed; 
  // 如果速度接近停车,并且在禁停区内 max_stop_speed = 0.2  
  if (speed < max_adc_stop_speed && InKeepClearRange(second.s())) {
    // first.s in range  在KeepClear区域低速惩罚 10 * * 1000
    cost += config_.keep_clear_low_speed_penalty() * unit_t_ * config_.default_speed_cost();
  }

  // 计算当前速度和限速的差值比,大于0,超速;小于0,未超速
  double det_speed = (speed - speed_limit) / speed_limit;
  if (det_speed > 0) {
    cost += config_.exceed_speed_penalty() * config_.default_speed_cost() * (det_speed * det_speed) * unit_t_;
  } else if (det_speed < 0) {
    cost += config_.low_speed_penalty() * config_.default_speed_cost() * -det_speed * unit_t_;
  }

  if (config_.enable_dp_reference_speed()) {
    double diff_speed = speed - cruise_speed;
    cost += config_.reference_speed_penalty() * config_.default_speed_cost() * fabs(diff_speed) * unit_t_;  // 10 * 1000 *  Δv * 
  }

  return cost;
}

3.4 加速度代价

要计算边的加速度代价,首先需要计算边的加速度。
[图片]

以起点为例,计算起点到坐标点的加速度代价,直接用差分来代替速度、加速度、Jerk:
[图片]

double DpStCost::GetAccelCostByThreePoints(const STPoint& first,
                                           const STPoint& second,
                                           const STPoint& third) {
  // 利用3个节点的累积距离s1,s2,s3来计算加速度   
  double accel = (first.s() + third.s() - 2 * second.s()) / (unit_t_ * unit_t_);
  return GetAccelCost(accel);
}

double DpStCost::GetAccelCostByTwoPoints(const double pre_speed,
                                         const STPoint& pre_point,
                                         const STPoint& curr_point) {
  // 利用2个节点的累积距离s1,s2来计算加速度
  double current_speed = (curr_point.s() - pre_point.s()) / unit_t_;
  double accel = (current_speed - pre_speed) / unit_t_;
  return GetAccelCost(accel);
}

边的加速度代价:
( w c o s t _ d e c ∗ s ¨ ) 2 1 + e ( a − m a x _ d e c ) + ( w c o s t _ a c c ∗ s ¨ ) 2 1 + e − ( a − m a x _ a c c ) \frac{\left ( w_{cost\_dec} *\ddot{s} \right ) ^{2} }{1+e^{(a- max\_dec)}} +\frac{\left ( w_{cost\_acc} *\ddot{s} \right ) ^{2} }{1+e^{-(a- max\_acc)}} 1+e(amax_dec)(wcost_decs¨)2+1+e(amax_acc)(wcost_accs¨)2

double DpStCost::GetAccelCost(const double accel) {
  double cost = 0.0;

  // 将给定的加速度 accel 标准化并加上一个偏移量 kShift 来计算得到。这样做可以确保不同的 accel 值映射到 accel_cost_ 中不同的索引位置。
  static constexpr double kEpsilon = 0.1; // 表示对加速度的精度要求
  static constexpr size_t kShift = 100;
  const size_t accel_key = static_cast<size_t>(accel / kEpsilon + 0.5 + kShift);
  
  DCHECK_LT(accel_key, accel_cost_.size());
  if (accel_key >= accel_cost_.size()) {
    return kInf;
  }

  if (accel_cost_.at(accel_key) < 0.0) {
    const double accel_sq = accel * accel;
    double max_acc = config_.max_acceleration();          // 3.0  m/s^2
    double max_dec = config_.max_deceleration();          // -4.0 m/s^2
    double accel_penalty = config_.accel_penalty();       // 1.0
    double decel_penalty = config_.decel_penalty();       // 1.0

    if (accel > 0.0) {          // 计算加速度正情况下cost
      cost = accel_penalty * accel_sq;
    } else {                    // 计算加速度负情况下cost
      cost = decel_penalty * accel_sq;
    }
    
    // 总体cost
    cost += accel_sq * decel_penalty * decel_penalty / (1 + std::exp(1.0 * (accel - max_dec))) +
            accel_sq * accel_penalty * accel_penalty / (1 + std::exp(-1.0 * (accel - max_acc)));
    accel_cost_.at(accel_key) = cost;
  } else {
    cost = accel_cost_.at(accel_key); // 该加速度之前计算过,直接索引
  }
  return cost * unit_t_;
}

3.5 jerk代价

加加速度代价的计算更简单:
[图片]

double DpStCost::JerkCost(const double jerk) {
  double cost = 0.0;
  static constexpr double kEpsilon = 0.1;
  static constexpr size_t kShift = 200;
  const size_t jerk_key = static_cast<size_t>(jerk / kEpsilon + 0.5 + kShift);  // 原理同acc的计算
  if (jerk_key >= jerk_cost_.size()) {
    return kInf;
  }

  if (jerk_cost_.at(jerk_key) < 0.0) {
    double jerk_sq = jerk * jerk;
    if (jerk > 0) {
      cost = config_.positive_jerk_coeff() * jerk_sq * unit_t_;
    } else {
      cost = config_.negative_jerk_coeff() * jerk_sq * unit_t_;
    }
    jerk_cost_.at(jerk_key) = cost;
  } else {
    cost = jerk_cost_.at(jerk_key);
  }

  // TODO(All): normalize to unit_t_
  return cost;
}

4. 回溯

回溯:也就是根据最小代价,从后往前倒推,得到最优的路径。这是动态规划算法中常用的一种手段。

因为 T e n d T_{end} Tend是人为给定的一个值,所以可能在 T e n d T_{end} Tend之前,速度规划就已经到达路径规划的终点。所以需要遍历最上边和最右边区域,找到cost最小的作为速度规划的终点。
[图片]

由于我们不要求一定要规划完所有路径,所以最优的速度规划可以是紫色那种,也有可能是黄色那种,还有可能是橙色那种。

Status GriddedPathTimeGraph::RetrieveSpeedProfile(SpeedData* const speed_data) {
  // Step 1 : 计算规划终点
  double min_cost = std::numeric_limits<double>::infinity();
  const StGraphPoint* best_end_point = nullptr;
  PrintPoints debug("dp_node_edge");
  for (const auto& points_vec : cost_table_) {
    for (const auto& pt : points_vec) {
      debug.AddPoint(pt.point().t(), pt.point().s());
    }
  }
  // for debug plot
  // debug.PrintToLog();

  // 寻找最上一行(s=S)和最右一列(t=T)中最小的cost对应的节点,作为规划终点
  for (const StGraphPoint& cur_point : cost_table_.back()) {  // 最右一列(t=T)
    if (!std::isinf(cur_point.total_cost()) &&
        cur_point.total_cost() < min_cost) {
      best_end_point = &cur_point;
      min_cost = cur_point.total_cost();
    }
  }
  for (const auto& row : cost_table_) {                      // 每一个t,也就是每一列
    const StGraphPoint& cur_point = row.back();              // 每一列的最上一行(s=S)
    if (!std::isinf(cur_point.total_cost()) &&
        cur_point.total_cost() < min_cost) {
      best_end_point = &cur_point;
      min_cost = cur_point.total_cost();
    }
  }

  if (best_end_point == nullptr) {
    const std::string msg = "Fail to find the best feasible trajectory.";
    AERROR << msg;
    return Status(ErrorCode::PLANNING_ERROR, msg);
  }

  // Step 2 : 从规划终点开始回溯,找到最小cost的规划路径
  std::vector<SpeedPoint> speed_profile;
  const StGraphPoint* cur_point = best_end_point;
  PrintPoints debug_res("dp_result");
  while (cur_point != nullptr) {
    ADEBUG << "Time: " << cur_point->point().t();
    ADEBUG << "S: " << cur_point->point().s();
    ADEBUG << "V: " << cur_point->GetOptimalSpeed();

    SpeedPoint speed_point;
    debug_res.AddPoint(cur_point->point().t(), cur_point->point().s());
    speed_point.set_s(cur_point->point().s());
    speed_point.set_t(cur_point->point().t());
    speed_profile.push_back(speed_point);

    cur_point = cur_point->pre_point();
  }
  //  for debug plot
  //   debug_res.PrintToLog();
  std::reverse(speed_profile.begin(), speed_profile.end());  // 颠倒容器中元素的顺序

  static constexpr double kEpsilon = std::numeric_limits<double>::epsilon();  // 返回的是 double 类型能够表示的最小正数
  if (speed_profile.front().t() > kEpsilon ||
      speed_profile.front().s() > kEpsilon) {
    const std::string msg = "Fail to retrieve speed profile.";
    AERROR << msg;
    return Status(ErrorCode::PLANNING_ERROR, msg);
  }
  AINFO << "front: " << speed_profile.front().t() << " " << speed_profile.front().s();

  // Step 3 : 计算速度 v
  for (size_t i = 0; i + 1 < speed_profile.size(); ++i) {
    const double v = (speed_profile[i + 1].s() - speed_profile[i].s()) /
                     (speed_profile[i + 1].t() - speed_profile[i].t() + 1e-3);  // 斜率 
    speed_profile[i].set_v(v);
    AINFO << "v: " << v;
  }

  *speed_data = SpeedData(speed_profile);
  return Status::OK();
}