这一章将来讲解速度决策算法,也就是SPEED_HEURISTIC_OPTIMIZER task里面的内容。Apollo 9.0使用动态规划算法进行速度决策,从类名来看,PathTimeHeuristicOptimizer 是路径时间启发式优化器,顾名思义,这里的算法在非凸的ST空间进行了纵向超让的决策,同时也为后续速度规划算法提供了一个启发式的粗解。
1. 动态规划
动态规划,英文:Dynamic Programming,简称DP。
关于动态规划,网上其实有很多讲解,都很详细,这里不再赘述。其实如果你刷过动态规划的算法题,相信你对dp问题一定有很深的理解。如果没有,也没有关系。动态规划一句话概括就是:就是把大问题拆成小问题,记住小问题的答案,避免重复计算,最后拼出大问题的答案。
PathTimeHeuristicOptimizer算法,实际上是将ST空间进行离散,在这离散的图上寻找“最短”路径的问题,所以可以使用动态规划分而治之的思想进行求解。注意这里的“最短”,并不是路程上的最短,而是指包含障碍物代价,加速度代价等一系列代价最小的一条路径。
动态规划Task的入口在PathTimeHeuristicOptimizer类里面,下面是整个算法的求解步骤:
- 变道和非变道加载不同的dp参数
- 构建ST栅格图
- ST栅格图搜素
path_time_heuristic_optimizer.cc
bool PathTimeHeuristicOptimizer::SearchPathTimeGraph(SpeedData* speed_data) const {
// 变道和非变道加载不同的dp参数
const auto& dp_st_speed_optimizer_config = reference_line_info_->IsChangeLanePath()
? config_.lane_change_speed_config()
: config_.default_speed_config();
// 构建ST栅格图
GriddedPathTimeGraph st_graph(reference_line_info_->st_graph_data(),
dp_st_speed_optimizer_config,
reference_line_info_->path_decision()->obstacles().Items(),
init_point_);
// ST栅格图搜素
if (!st_graph.Search(speed_data).ok()) {
AERROR << "failed to search graph with dynamic programming.";
return false;
}
return true;
}
2. 采样
接下来就是在ST的维度进行采样:近端稠密一些,远端稀疏一点,这样可以节省计算量。同时变道和非变道场景,S方向采样的稠密点间隔和范围都不一样,非变道场景更稠密,范围更广一些,这样可以更好应对cut-in和cut-out场景,让纵向速度计算更精细。
上图中,最左下角的点,T肯定是0,而S是速度规划的起点:
S e n d {\color{Green} S_{end} } Send:速度规划的终点,是路径规划的path长度;
T e n d T_{end} Tend:很难计算,和道路的限速有关,也和动态障碍物有关。这里需要打破一个思维定式,在路径规划里面,必须要把比如60m路径全部规划完。在速度规划里面,不强求在 T e n d T_{end} Tend里面完全规划完 S e n d {\color{Green} S_{end} } Send。比如说路径规划出了60m,但是速度规划可以只规划20m的距离,因为现实中动态障碍物的轨迹是千奇百怪的。所以,在这里设置 T e n d = 8 s T_{end}=8s Tend=8s,在这8s的时间内,不强求把 S e n d {\color{Green} S_{end} } Send全部规划完。
关于具体采样的步骤和内容可以见下面的代码,可以说加上注释已经非常容易理解了。唯一需要补充的一点是:cost_table_其实是一个dimension_t_行,dimension_s_列的二维表,也就是T是行维度,S是列维度,将上图所示的ST离散图顺时针旋转90°,就可以刚好对应上。但是后面在计算代价和回溯的时候行列S和T又有些混淆,不过没有关系,对照图是很好理解的。
Status GriddedPathTimeGraph::InitCostTable() {
// Time dimension is homogeneous while Spatial dimension has two resolutions,
// dense and sparse with dense resolution coming first in the spatial horizon
// Sanity check for numerical stability
if (unit_t_ < kDoubleEpsilon) {
const std::string msg = "unit_t is smaller than the kDoubleEpsilon.";
AERROR << msg;
return Status(ErrorCode::PLANNING_ERROR, msg);
}
// Sanity check on s dimension setting
if (dense_dimension_s_ < 1) {
const std::string msg = "dense_dimension_s is at least 1.";
AERROR << msg;
return Status(ErrorCode::PLANNING_ERROR, msg);
}
dimension_t_ = static_cast<uint32_t>(std::ceil(total_length_t_ / static_cast<double>(unit_t_))) + 1;
double sparse_length_s = total_length_s_ -
static_cast<double>(dense_dimension_s_ - 1) * dense_unit_s_;
sparse_dimension_s_ = sparse_length_s > std::numeric_limits<double>::epsilon()
? static_cast<uint32_t>(std::ceil(sparse_length_s / sparse_unit_s_))
: 0;
dense_dimension_s_ = sparse_length_s > std::numeric_limits<double>::epsilon()
? dense_dimension_s_
: static_cast<uint32_t>(std::ceil(total_length_s_ / dense_unit_s_)) + 1;
dimension_s_ = dense_dimension_s_ + sparse_dimension_s_;
PrintCurves debug;
// Sanity Check
if (dimension_t_ < 1 || dimension_s_ < 1) {
const std::string msg = "Dp st cost table size incorrect.";
AERROR << msg;
return Status(ErrorCode::PLANNING_ERROR, msg);
}
// 定义dp代价表,注意这个表:常规ST图顺时针旋转90°,一共t行,每行s列 , total_cost_默认无穷大
cost_table_ = std::vector<std::vector<StGraphPoint>>(dimension_t_,
std::vector<StGraphPoint>(dimension_s_, StGraphPoint()));
double curr_t = 0.0;
for (uint32_t i = 0; i < cost_table_.size(); ++i, curr_t += unit_t_) { // T
auto& cost_table_i = cost_table_[i];
double curr_s = 0.0;
// 稠密
for (uint32_t j = 0; j < dense_dimension_s_; ++j, curr_s += dense_unit_s_) { // S
cost_table_i[j].Init(i, j, STPoint(curr_s, curr_t));
debug.AddPoint("dp_node_points", curr_t, curr_s);
}
// 稀疏
curr_s = static_cast<double>(dense_dimension_s_ - 1) * dense_unit_s_ + sparse_unit_s_;
for (uint32_t j = dense_dimension_s_; j < cost_table_i.size(); ++j, curr_s += sparse_unit_s_) {
cost_table_i[j].Init(i, j, STPoint(curr_s, curr_t));
debug.AddPoint("dp_node_points", curr_t, curr_s);
}
}
// 获取第一行的s
const auto& cost_table_0 = cost_table_[0];
spatial_distance_by_index_ = std::vector<double>(cost_table_0.size(), 0.0);
for (uint32_t i = 0; i < cost_table_0.size(); ++i) {
spatial_distance_by_index_[i] = cost_table_0[i].point().s();
}
return Status::OK();
}
3. 代价函数
接下来就是在离散表上,进行“最短”路径搜索,也就是使用动态规划算法计算速度规划起点到每个点的最小代价,然后回溯(反推)获取最优路径。
注意:速度动态规划可以进行剪枝操作,以减少计算量,依据车辆动力学max_acceleration_和max_deceleration_找到下一列可能到达的范围[next_lowest_row, next_highest_row],只需要计算当前点到下一列该范围内的点的代价。
一个点的total_cost由两部分构成:
- 点代价:
a. 障碍物代价:obstacle_cost
b. 距离终点代价:spatial_potential_cost
c. 前一个点的:total_cost - 边代价:
a. 速度代价:Speedcost
b. 加速度代价:AccelCost
c. 加加速度代价:JerkCost
3.1 障碍物代价
Apollo 9.0动态规划中障碍物代价计算做了简化,只需要计算该点到所有t时刻障碍物的代价,并进行累加。
如下图所示的 a 62 a_{62} a62点,该时刻有两个障碍物,通过公式计算障碍物代价即可:
- 不在范围内:0
- 在障碍物st边界框里:∞
- 其他: c o s t + = c o n f i g _ . o b s t a c l e _ w e i g h t ( ) ∗ c o n f i g _ . d e f a u l t _ o b s t a c l e _ c o s t ( ) ∗ s _ d i f f ∗ s _ d i f f cost += config\_.obstacle\_weight() * config\_.default\_obstacle\_cost() * s\_diff * s\_diff cost+=config_.obstacle_weight()∗config_.default_obstacle_cost()∗s_diff∗s_diff
这里需要注意的是,跟车和超车需要与前后车保持的安全距离大小不一样,超车情况下,需要离后车距离更大。
double DpStCost::GetObstacleCost(const StGraphPoint& st_graph_point) {
const double s = st_graph_point.point().s();
const double t = st_graph_point.point().t();
double cost = 0.0;
if (FLAGS_use_st_drivable_boundary) { // false
// TODO(Jiancheng): move to configs
static constexpr double boundary_resolution = 0.1;
int index = static_cast<int>(t / boundary_resolution);
const double lower_bound = st_drivable_boundary_.st_boundary(index).s_lower();
const double upper_bound = st_drivable_boundary_.st_boundary(index).s_upper();
if (s > upper_bound || s < lower_bound) {
return kInf;
}
}
// 遍历每个障碍物,计算t时刻障碍物st边界框的上界和下界,根据无人车的位置(t,s)与边界框是否重合,计算障碍物代价
for (const auto* obstacle : obstacles_) {
// Not applying obstacle approaching cost to virtual obstacle like created stop fences
if (obstacle->IsVirtual()) {
continue;
}
// Stop obstacles are assumed to have a safety margin when mapping them out,
// so repelling force in dp st is not needed as it is designed to have adc
// stop right at the stop distance we design in prior mapping process
if (obstacle->LongitudinalDecision().has_stop()) {
continue;
}
auto boundary = obstacle->path_st_boundary();
if (boundary.min_s() > FLAGS_speed_lon_decision_horizon) { // 纵向决策的最远距离 200
continue;
}
if (t < boundary.min_t() || t > boundary.max_t()) {
continue;
}
if (boundary.IsPointInBoundary(st_graph_point.point())) {
return kInf;
}
// 情况4:需要减速避让或加速超过的障碍物。计算障碍物在t时刻的上界和下界位置,即上下界的累积距离s
double s_upper = 0.0;
double s_lower = 0.0;
// 为了避免其他节点(t,s)再一次计算t时刻的障碍物上下界,利用缓存加速计算。GetBoundarySRange函数可以用来计算t时刻障碍物上界和下界累积距离s,并缓存
int boundary_index = boundary_map_[boundary.id()];
if (boundary_cost_[boundary_index][st_graph_point.index_t()].first < 0.0) { // 还没有计算过
boundary.GetBoundarySRange(t, &s_upper, &s_lower);
boundary_cost_[boundary_index][st_graph_point.index_t()] = std::make_pair(s_upper, s_lower);
} else {
s_upper = boundary_cost_[boundary_index][st_graph_point.index_t()].first; // 之前计算过,直接取值
s_lower = boundary_cost_[boundary_index][st_graph_point.index_t()].second;
}
// t时刻, 无人车在障碍物后方
if (s < s_lower) {
const double follow_distance_s = config_.safe_distance(); // 0.2
AINFO << "follow_distance_s : " << follow_distance_s;
if (s + follow_distance_s < s_lower) { // 如果障碍物和无人车在t时刻距离大于安全距离,距离比较远,cost=0
continue;
} else { // 否则距离小于安全距离,计算cost。obstacle_weight:1.0,default_obstacle_cost:1000,
auto s_diff = follow_distance_s - s_lower + s;
cost += config_.obstacle_weight() * config_.default_obstacle_cost() * s_diff * s_diff;
}
// t时刻, 无人车在障碍物前方
} else if (s > s_upper) {
const double overtake_distance_s = StGapEstimator::EstimateSafeOvertakingGap(); // 20
if (s > s_upper + overtake_distance_s) { // or calculated from velocity
continue; // 如果障碍物和无人车在t时刻距离大于安全距离,距离比较远,cost=0
} else { // 否则距离小于安全距离,计算cost。obstacle_weight:1.0,default_obstacle_cost:1000,
auto s_diff = overtake_distance_s + s_upper - s;
cost += config_.obstacle_weight() * config_.default_obstacle_cost() * s_diff * s_diff;
}
}
}
return cost * unit_t_; // unit_t_ = 1
}
3.2 距离终点代价
距离终点代价其实就是距离路径规划终点的一个惩罚,该点距离终点越近,代价越小;距离终点越远,代价越大。
目的很简单,就是希望速度规划能够就可能将路径规划的s全部路径都覆盖。
double DpStCost::GetSpatialPotentialCost(const StGraphPoint& point) {
return (total_s_ - point.point().s()) * config_.spatial_potential_penalty(); // 距离终点惩罚 100 or 100000(变道)
}
3.3 速度代价
速度代价很简单,主要考虑两方面:
- 不要超速,尽量贴近限速;
- 尽量贴近巡航速度,也就是驾驶员设定的速度。
double DpStCost::GetSpeedCost(const STPoint& first, const STPoint& second,
const double speed_limit,
const double cruise_speed) const {
double cost = 0.0;
const double speed = (second.s() - first.s()) / unit_t_; //计算时间段[t-1,t]内的平均速度
if (speed < 0) { // 倒车?速度代价无穷大
return kInf;
}
const double max_adc_stop_speed = common::VehicleConfigHelper::Instance()
->GetConfig().vehicle_param()
.max_abs_speed_when_stopped();
AINFO << "max_adc_stop_speed : "<< max_adc_stop_speed;
// 如果速度接近停车,并且在禁停区内 max_stop_speed = 0.2
if (speed < max_adc_stop_speed && InKeepClearRange(second.s())) {
// first.s in range 在KeepClear区域低速惩罚 10 * * 1000
cost += config_.keep_clear_low_speed_penalty() * unit_t_ * config_.default_speed_cost();
}
// 计算当前速度和限速的差值比,大于0,超速;小于0,未超速
double det_speed = (speed - speed_limit) / speed_limit;
if (det_speed > 0) {
cost += config_.exceed_speed_penalty() * config_.default_speed_cost() * (det_speed * det_speed) * unit_t_;
} else if (det_speed < 0) {
cost += config_.low_speed_penalty() * config_.default_speed_cost() * -det_speed * unit_t_;
}
if (config_.enable_dp_reference_speed()) {
double diff_speed = speed - cruise_speed;
cost += config_.reference_speed_penalty() * config_.default_speed_cost() * fabs(diff_speed) * unit_t_; // 10 * 1000 * Δv *
}
return cost;
}
3.4 加速度代价
要计算边的加速度代价,首先需要计算边的加速度。
以起点为例,计算起点到坐标点的加速度代价,直接用差分来代替速度、加速度、Jerk:
double DpStCost::GetAccelCostByThreePoints(const STPoint& first,
const STPoint& second,
const STPoint& third) {
// 利用3个节点的累积距离s1,s2,s3来计算加速度
double accel = (first.s() + third.s() - 2 * second.s()) / (unit_t_ * unit_t_);
return GetAccelCost(accel);
}
double DpStCost::GetAccelCostByTwoPoints(const double pre_speed,
const STPoint& pre_point,
const STPoint& curr_point) {
// 利用2个节点的累积距离s1,s2来计算加速度
double current_speed = (curr_point.s() - pre_point.s()) / unit_t_;
double accel = (current_speed - pre_speed) / unit_t_;
return GetAccelCost(accel);
}
边的加速度代价:
( w c o s t _ d e c ∗ s ¨ ) 2 1 + e ( a − m a x _ d e c ) + ( w c o s t _ a c c ∗ s ¨ ) 2 1 + e − ( a − m a x _ a c c ) \frac{\left ( w_{cost\_dec} *\ddot{s} \right ) ^{2} }{1+e^{(a- max\_dec)}} +\frac{\left ( w_{cost\_acc} *\ddot{s} \right ) ^{2} }{1+e^{-(a- max\_acc)}} 1+e(a−max_dec)(wcost_dec∗s¨)2+1+e−(a−max_acc)(wcost_acc∗s¨)2
double DpStCost::GetAccelCost(const double accel) {
double cost = 0.0;
// 将给定的加速度 accel 标准化并加上一个偏移量 kShift 来计算得到。这样做可以确保不同的 accel 值映射到 accel_cost_ 中不同的索引位置。
static constexpr double kEpsilon = 0.1; // 表示对加速度的精度要求
static constexpr size_t kShift = 100;
const size_t accel_key = static_cast<size_t>(accel / kEpsilon + 0.5 + kShift);
DCHECK_LT(accel_key, accel_cost_.size());
if (accel_key >= accel_cost_.size()) {
return kInf;
}
if (accel_cost_.at(accel_key) < 0.0) {
const double accel_sq = accel * accel;
double max_acc = config_.max_acceleration(); // 3.0 m/s^2
double max_dec = config_.max_deceleration(); // -4.0 m/s^2
double accel_penalty = config_.accel_penalty(); // 1.0
double decel_penalty = config_.decel_penalty(); // 1.0
if (accel > 0.0) { // 计算加速度正情况下cost
cost = accel_penalty * accel_sq;
} else { // 计算加速度负情况下cost
cost = decel_penalty * accel_sq;
}
// 总体cost
cost += accel_sq * decel_penalty * decel_penalty / (1 + std::exp(1.0 * (accel - max_dec))) +
accel_sq * accel_penalty * accel_penalty / (1 + std::exp(-1.0 * (accel - max_acc)));
accel_cost_.at(accel_key) = cost;
} else {
cost = accel_cost_.at(accel_key); // 该加速度之前计算过,直接索引
}
return cost * unit_t_;
}
3.5 jerk代价
加加速度代价的计算更简单:
double DpStCost::JerkCost(const double jerk) {
double cost = 0.0;
static constexpr double kEpsilon = 0.1;
static constexpr size_t kShift = 200;
const size_t jerk_key = static_cast<size_t>(jerk / kEpsilon + 0.5 + kShift); // 原理同acc的计算
if (jerk_key >= jerk_cost_.size()) {
return kInf;
}
if (jerk_cost_.at(jerk_key) < 0.0) {
double jerk_sq = jerk * jerk;
if (jerk > 0) {
cost = config_.positive_jerk_coeff() * jerk_sq * unit_t_;
} else {
cost = config_.negative_jerk_coeff() * jerk_sq * unit_t_;
}
jerk_cost_.at(jerk_key) = cost;
} else {
cost = jerk_cost_.at(jerk_key);
}
// TODO(All): normalize to unit_t_
return cost;
}
4. 回溯
回溯:也就是根据最小代价,从后往前倒推,得到最优的路径。这是动态规划算法中常用的一种手段。
因为 T e n d T_{end} Tend是人为给定的一个值,所以可能在 T e n d T_{end} Tend之前,速度规划就已经到达路径规划的终点。所以需要遍历最上边和最右边区域,找到cost最小的作为速度规划的终点。
由于我们不要求一定要规划完所有路径,所以最优的速度规划可以是紫色那种,也有可能是黄色那种,还有可能是橙色那种。
Status GriddedPathTimeGraph::RetrieveSpeedProfile(SpeedData* const speed_data) {
// Step 1 : 计算规划终点
double min_cost = std::numeric_limits<double>::infinity();
const StGraphPoint* best_end_point = nullptr;
PrintPoints debug("dp_node_edge");
for (const auto& points_vec : cost_table_) {
for (const auto& pt : points_vec) {
debug.AddPoint(pt.point().t(), pt.point().s());
}
}
// for debug plot
// debug.PrintToLog();
// 寻找最上一行(s=S)和最右一列(t=T)中最小的cost对应的节点,作为规划终点
for (const StGraphPoint& cur_point : cost_table_.back()) { // 最右一列(t=T)
if (!std::isinf(cur_point.total_cost()) &&
cur_point.total_cost() < min_cost) {
best_end_point = &cur_point;
min_cost = cur_point.total_cost();
}
}
for (const auto& row : cost_table_) { // 每一个t,也就是每一列
const StGraphPoint& cur_point = row.back(); // 每一列的最上一行(s=S)
if (!std::isinf(cur_point.total_cost()) &&
cur_point.total_cost() < min_cost) {
best_end_point = &cur_point;
min_cost = cur_point.total_cost();
}
}
if (best_end_point == nullptr) {
const std::string msg = "Fail to find the best feasible trajectory.";
AERROR << msg;
return Status(ErrorCode::PLANNING_ERROR, msg);
}
// Step 2 : 从规划终点开始回溯,找到最小cost的规划路径
std::vector<SpeedPoint> speed_profile;
const StGraphPoint* cur_point = best_end_point;
PrintPoints debug_res("dp_result");
while (cur_point != nullptr) {
ADEBUG << "Time: " << cur_point->point().t();
ADEBUG << "S: " << cur_point->point().s();
ADEBUG << "V: " << cur_point->GetOptimalSpeed();
SpeedPoint speed_point;
debug_res.AddPoint(cur_point->point().t(), cur_point->point().s());
speed_point.set_s(cur_point->point().s());
speed_point.set_t(cur_point->point().t());
speed_profile.push_back(speed_point);
cur_point = cur_point->pre_point();
}
// for debug plot
// debug_res.PrintToLog();
std::reverse(speed_profile.begin(), speed_profile.end()); // 颠倒容器中元素的顺序
static constexpr double kEpsilon = std::numeric_limits<double>::epsilon(); // 返回的是 double 类型能够表示的最小正数
if (speed_profile.front().t() > kEpsilon ||
speed_profile.front().s() > kEpsilon) {
const std::string msg = "Fail to retrieve speed profile.";
AERROR << msg;
return Status(ErrorCode::PLANNING_ERROR, msg);
}
AINFO << "front: " << speed_profile.front().t() << " " << speed_profile.front().s();
// Step 3 : 计算速度 v
for (size_t i = 0; i + 1 < speed_profile.size(); ++i) {
const double v = (speed_profile[i + 1].s() - speed_profile[i].s()) /
(speed_profile[i + 1].t() - speed_profile[i].t() + 1e-3); // 斜率
speed_profile[i].set_v(v);
AINFO << "v: " << v;
}
*speed_data = SpeedData(speed_profile);
return Status::OK();
}