目标跟踪之DeepSort算法(4)

发布于:2025-03-15 ⋅ 阅读:(20) ⋅ 点赞:(0)

1 安装

参考:https://github.com/beeduchai/YOLOv8-DeepSORT-Object-Tracking

1.1 代码下载与安装

1 创建一个全新的虚拟环境

conda create -n yolov8_deepsort python=3.9

2 激活虚拟环境

conda activate yolov8_deepsort

3 在当前地址创建一个文件夹存放将要下载的YOLOv8-DeepSORT-Object-Tracking

mkdir my_yolov8_deepsort 

4 跳转到yolov8_deepsort 文件夹

cd my_yolov8_deepsort 

5 下载代码

git clone https://github.com/MuhammadMoinFaisal/YOLOv8-DeepSORT-Object-Tracking.git

6 安装依赖库

pip install -e .

7 在https://drive.google.com/drive/folders/1kna8eWGrSfzaR6DtNJ8_GchGgPMv3VC8下载文件,将文件夹解压放在ultralytics/yolo/v8/detect目录下

8 在ultralytics/yolo/v8/detect放一个视频,执行以下命令.首次执行以下代码,会自动取下载yolov8l.pt模型

python predict.py model=yolov8l.pt source="traffic.mp4" show=True

注释:如果出现错误,可能是库的版本不对,根据报错提示更改版本。

在这里插入图片描述

1. 2 DeepSort检测流程

DeepSort检测流程:

  1. 模型初始化
  2. 模型推理
# ultralytics/yolo/v8/detect/predict.py
def predict(cfg):    # cfg:ultralytics/yolo/configs/default.yaml  
    # 1.模型初始化
    init_tracker()   
    cfg.model = cfg.model or "yolov8n.pt"
    cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2)  # check image size
    cfg.source = cfg.source if cfg.source is not None else ROOT / "assets"
    # 2. 模型推理
    predictor = DetectionPredictor(cfg)
    predictor()

1.3 模型初始化流程

  1. 实例化对象获取"deep_sort_pytorch/configs/deep_sort.yaml"的参数。
  2. 初始化跟踪器
    2.1 实例化特征提取器
    2.2 实例化匹配代价矩阵
    2.3 实例化跟踪器
# ultralytics/yolo/v8/detect/predict.py 
def init_tracker():
    global deepsort
    # 1 实例化对象获取"deep_sort_pytorch/configs/deep_sort.yaml"的参数
    cfg_deep = get_config()  
    cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")
    # 2 初始化跟踪器
    deepsort= DeepSort(cfg_deep.DEEPSORT.REID_CKPT,
                            max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,
                            nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,
                            max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,
                            use_cuda=True)

# ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/deep_sort.py
class DeepSort(object):
    def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
        self.min_confidence = min_confidence   # 检测物体的最小置信度
        self.nms_max_overlap = nms_max_overlap # NMS时iou的最大值
        # 2.1 实例化特征提取器
        self.extractor = Extractor(model_path, use_cuda=use_cuda)   # 提取检测到的物体的特征,用于计算轨迹与检测框的余弦距离

        max_cosine_distance = max_dist         # 计算距离的最大值
        2.2 实例化匹配代价矩阵
        metric = NearestNeighborDistanceMetric(
            "cosine", max_cosine_distance, nn_budget)   # metric计算匹配代价矩阵。在级联匹配中用欧式距离计算特级特征和检测特征的距离
        2.3 实例化跟踪器
        self.tracker = Tracker(
            metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)

2. 模型推理

模型推理流程:

  1. 准备模型和数据,对数据进行预处理
    1.1 数据进行预处理
  2. 数据带入模型得到预测结果
  3. 处理预测结果
    3.1 处理预测结果
  4. 对预测结果跟踪

2.1 模型推理代码解析

# ultralytics/yolo/engine/predictor.py
    @smart_inference_mode()
    def __call__(self, source=None, model=None):
        # 1.准备模型和数据,对数据进行预处理
        self.run_callbacks("on_predict_start")
        model = self.model if self.done_setup else self.setup(source, model)
        model.eval()
        self.seen, self.windows, self.dt = 0, [], (ops.Profile(), ops.Profile(), ops.Profile())
        self.all_outputs = []
        for batch in self.dataset:
            self.run_callbacks("on_predict_batch_start")
            path, im, im0s, vid_cap, s = batch
            visualize = increment_path(self.save_dir / Path(path).stem, mkdir=True) if self.args.visualize else False
            with self.dt[0]:
                im = self.preprocess(im)
                if len(im.shape) == 3:
                    im = im[None]  # expand for batch dim
            # 2. 数据带入模型得到预测结果
            # Inference
            with self.dt[1]:
                preds = model(im, augment=self.args.augment, visualize=visualize)

            # 3. 处理预测结果
            # postprocess
            with self.dt[2]:
                preds = self.postprocess(preds, im, im0s)

            for i in range(len(im)):
                if self.webcam:
                    path, im0s = path[i], im0s[i]
                p = Path(path)
                # 4. 对预测结果跟踪
                s += self.write_results(i, preds, (p, im, im0s))

                if self.args.show:
                    self.show(p)

                if self.args.save:
                    self.save_preds(vid_cap, i, str(self.save_dir / p.name))

            # Print time (inference-only)
            LOGGER.info(f"{s}{'' if len(preds) else '(no detections), '}{self.dt[1].dt * 1E3:.1f}ms")

            self.run_callbacks("on_predict_batch_end")

        # Print results
        t = tuple(x.t / self.seen * 1E3 for x in self.dt)  # speeds per image
        LOGGER.info(
            f'Speed: %.1fms pre-process, %.1fms inference, %.1fms postprocess per image at shape {(1, 3, *self.imgsz)}'
            % t)
        if self.args.save_txt or self.args.save:
            s = f"\n{len(list(self.save_dir.glob('labels/*.txt')))} labels saved to {self.save_dir / 'labels'}" if self.args.save_txt else ''
            LOGGER.info(f"Results saved to {colorstr('bold', self.save_dir)}{s}")

        self.run_callbacks("on_predict_end")
        return self.all_outputs

# 1.1 数据进行预处理 ultralytics/yolo/v8/detect/predict.py
    def preprocess(self, img):
        img = torch.from_numpy(img).to(self.model.device)     # 图片数据转换成tensor格式
        img = img.half() if self.model.fp16 else img.float()  # uint8 to fp16/32  选择半精度还是双精度
        img /= 255  # 0 - 255 to 0.0 - 1.0                    # 图片数据缩放到0~1之间
        return img
        
# 3.1 处理预测结果 ultralytics/yolo/v8/detect/predict.py
    def postprocess(self, preds, img, orig_img):
        # 1 非极大值抑制
        preds = ops.non_max_suppression(preds,
                                        self.args.conf,
                                        self.args.iou,
                                        agnostic=self.args.agnostic_nms,
                                        max_det=self.args.max_det) 

        for i, pred in enumerate(preds):
            shape = orig_img[i].shape if self.webcam else orig_img.shape
            # 2 把检测到的框映射到原图
            pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], shape).round()

        return preds

2.2 对预测结果跟踪代码解析

把检测框的中心点宽高、置信度、类别、原图输入 deepsort.update()中进行跟踪,跟踪代码流程如下:

  1. 提取特征。
  2. 轨迹预测。
  3. 轨迹跟踪。
# ultralytics/yolo/v8/detect/predict.py-->def write_results(self, idx, preds, batch)-->outputs = deepsort.update(xywhs, confss, oids, im0)-->  self.tracker.predict() self.tracker.update(detections)

    def update(self, bbox_xywh, confidences, oids, ori_img):
        self.height, self.width = ori_img.shape[:2]
        # generate detections 
        # 1. 提取特征
        features = self._get_features(bbox_xywh, ori_img)  # 提取检测框对应物体的特征
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh) # 把框由中心点宽高的格式转换为左上角坐标和宽高的格式
        detections = [Detection(bbox_tlwh[i], conf, features[i],oid) for i, (conf,oid) in enumerate(zip(confidences,oids)) if conf > self.min_confidence]   # 删除资信度小于阈值的检测框,并把检测框的特征添加到detections中

        # run on non-maximum supression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])

        # update tracker
        # 2. 轨迹预测
        self.tracker.predict()
        # 3. 轨迹跟踪
        self.tracker.update(detections)

        # output bbox identities
        outputs = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue 
            box = track.to_tlwh()
            x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            track_oid = track.oid
            outputs.append(np.array([x1, y1, x2, y2, track_id, track_oid], dtype=np.int))
        if len(outputs) > 0:
            outputs = np.stack(outputs, axis=0)
        return outputs

2.3 轨迹预测

ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/sort/kalman_filter.py
self.tracker.predict()–>track.predict(self.kf)–>def predict(self, mean, covariance)
轨迹预测预测流程:

  1. 状态转移矩阵。
  2. 根据状态转移矩阵获取下一状态和其协方差。
    def predict(self, mean, covariance):
        """Run Kalman filter prediction step.

        Parameters
        ----------
        mean : ndarray
            The 8 dimensional mean vector of the object state at the previous
            time step.
        covariance : ndarray
            The 8x8 dimensional covariance matrix of the object state at the
            previous time step.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state. Unobserved velocities are initialized to 0 mean.

        """
        std_pos = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            1e-2,
            self._std_weight_position * mean[3]]
        std_vel = [
            self._std_weight_velocity * mean[3],
            self._std_weight_velocity * mean[3],
            1e-5,
            self._std_weight_velocity * mean[3]]
        # 1. 状态转移矩阵
        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
        # 2. 根据状态转移矩阵获取下一状态和其协方差
        mean = np.dot(self._motion_mat, mean)
        covariance = np.linalg.multi_dot((
            self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

        return mean, covariance

2.4 轨迹跟踪

ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/sort/tracker.py
self.tracker.update(detections)
轨迹跟踪步骤:

  1. 轨迹与特征匹配
  2. 更新匹配轨迹、未匹配轨迹和未匹配检测
  3. 更新轨迹的特征

    def update(self, detections):
        """Perform measurement update and track management.

        Parameters
        ----------   bbox_tlwh[i], conf, features[i],oid
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # Run matching cascade.  1. 轨迹与特征 级联匹配和IOU匹配
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set. 2. 更新匹配轨迹、未匹配轨迹和未匹配检测
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])                   # 对匹配轨迹更新得到当前最优轨迹
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()                      # 对未匹配轨迹进行标识
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])           # 对新检测框初始化轨迹
        self.tracks = [t for t in self.tracks if not t.is_deleted()]  # 剔除掉删除的轨迹

        # Update distance metric. 3. 更新轨迹的历史特征
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]   # 存放确认状态轨迹的id
        features, targets = [], []  # 分别存放当前所有轨迹的特征和id
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            track.features = []
        self.metric.partial_fit(   #
            np.asarray(features), np.asarray(targets), active_targets)



2.5 轨迹与特征匹配

轨迹与特征匹配步骤:

  1. 把轨迹分为确认状态和非确认状态。
  2. 级联匹配。通过级联匹配得到确认匹配轨迹和检测的id
    matches_a,未匹配轨迹unmatched_tracks_a, 未匹配检测unmatched_detections
  3. IOU匹配。把级联匹配步骤中只有一次未匹配的轨迹并如非确认轨迹用于IOU匹配;保留级联匹配步骤中未匹配的轨迹中连续2帧或2帧以上没有匹配的轨迹。
  4. 更新级联匹配和IOU匹配的结果。
    def _match(self, detections):

        def gated_metric(tracks, dets, track_indices, detection_indices):
            features = np.array([dets[i].feature for i in detection_indices])   # 检测框的特征
            targets = np.array([tracks[i].track_id for i in track_indices])     # 轨迹的id
            cost_matrix = self.metric.distance(features, targets)               # 计算轨迹与检测的特征余弦距离
            cost_matrix = linear_assignment.gate_cost_matrix(                   # 用轨迹与检测的马氏距离跟新cost_matrix矩阵
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        # 1 把轨迹分为确认状态和非确认状态
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

        # Associate confirmed tracks using appearance features. 
        # 2 级联匹配
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        # 3 IOU匹配
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update == 1]   # 如果级联未匹配轨迹上一帧匹配成功,这一帧匹配失败,则把其添加到不确认轨迹中
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]   # 跟新unmatched_tracks_a,只保留大于等于连续两帧没有被匹配上的
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)  # IOU匹配
        # 4 更新级联匹配和IOU匹配的结果
        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

    def _initiate_track(self, detection):
        mean, covariance = self.kf.initiate(detection.to_xyah()) # 根据新检测到的框初始化轨迹的数值和协方差
        self.tracks.append(Track(
            mean, covariance, self._next_id, self.n_init, self.max_age,detection.oid,
            detection.feature))               # 添加新的轨迹
        self._next_id += 1

# 2 级联匹配
级联匹配步骤:
2.1 获取(1 + level)次没有被匹配上的轨迹
2.2 计算轨迹特征和检测特征的代价矩阵
    2.2.1 计算轨迹与检测的特征余弦距离
    2.2.2 用轨迹与检测的马氏距离跟新cost_matrix矩阵

def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run matching cascade.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    cascade_depth: int
        The cascade depth, should be se to the maximum track age.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all
        detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices              # 初始所有的检测都没有匹配
    matches = []
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:                # No detections left
            break
        # 1 获取(1 + level)次没有被匹配上的轨迹
        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level   
        ]
        if len(track_indices_l) == 0:                     # Nothing to match at this level
            continue
        # 2 计算轨迹特征和检测特征的马氏距离
        matches_l, _, unmatched_detections = \
            min_cost_matching(
                distance_metric, max_distance, tracks, detections,  # max_distance阈值
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections

2.6 计算轨迹与检测的特征余弦距离

对检测特征和历史的轨迹特征先归一化,再1减去特征的矩阵相乘得到余弦距离

def _cosine_distance(a, b, data_is_normalized=False):
    """Compute pair-wise cosine distance between points in `a` and `b`.

    Parameters
    ----------
    a : array_like
        An NxM matrix of N samples of dimensionality M.
    b : array_like
        An LxM matrix of L samples of dimensionality M.
    data_is_normalized : Optional[bool]
        If True, assumes rows in a and b are unit length vectors.
        Otherwise, a and b are explicitly normalized to lenght 1.

    Returns
    -------
    ndarray
        Returns a matrix of size len(a), len(b) such that element (i, j)
        contains the squared distance between `a[i]` and `b[j]`.

    """
    if not data_is_normalized:
        a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)  # a是轨迹特征归一化
        b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)  # b是检测特征归一化
    return 1. - np.dot(a, b.T)

2.7 用轨迹与检测的马氏距离跟新cost_matrix矩阵

根据公式计算马氏距离

def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

    Returns
    -------
    ndarray
        Returns the modified cost matrix.

    """
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]               # gating_distance马氏距离
        gating_distance = kf.gating_distance(   # track.mean, track.covariance是轨迹的均值和协方差。measurements是检测框
            track.mean, track.covariance, measurements, only_position)
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost  # 如果马氏距离大于阈值,则赋值为极大值。
    return cost_matrix
    
    # 马氏距离
    def gating_distance(self, mean, covariance, measurements,
                        only_position=False):
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`.

        """
        mean, covariance = self.project(mean, covariance)  # 轨迹特征
        if only_position:                                  # 计算马氏距离用几个值
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        cholesky_factor = np.linalg.cholesky(covariance)   # 对轨迹的协方差分解成下三角矩阵和,前提covariance正定
        d = measurements - mean                            #(det-track).T *track's covariance.INV *(det-track)
        z = scipy.linalg.solve_triangular(    # 求线性方程组的解    mash = d.T *covariance^{-1} *d = d.T *(L.T*L) *d = (Ld).T*(Ld)
            cholesky_factor, d.T, lower=True, check_finite=False,  # L.T*? = d.T--> ? = ((L.T)^{-1} d.T=(dL^{-1}).T
            overwrite_b=True)
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha