1 安装


1.1 代码下载与安装

1 创建一个全新的虚拟环境

conda create -n yolov8_deepsort python=3.9

2 激活虚拟环境

conda activate yolov8_deepsort

3 在当前地址创建一个文件夹存放将要下载的YOLOv8-DeepSORT-Object-Tracking

mkdir my_yolov8_deepsort 

4 跳转到yolov8_deepsort 文件夹

cd my_yolov8_deepsort 

5 下载代码

git clone https://github.com/MuhammadMoinFaisal/YOLOv8-DeepSORT-Object-Tracking.git

6 安装依赖库

pip install -e .

7 在https://drive.google.com/drive/folders/1kna8eWGrSfzaR6DtNJ8_GchGgPMv3VC8下载文件,将文件夹解压放在ultralytics/yolo/v8/detect目录下

8 在ultralytics/yolo/v8/detect放一个视频,执行以下命令.首次执行以下代码,会自动取下载yolov8l.pt模型

python predict.py model=yolov8l.pt source="traffic.mp4" show=True



1. 2 DeepSort检测流程


  1. 模型初始化
  2. 模型推理
# ultralytics/yolo/v8/detect/predict.py
def predict(cfg):    # cfg:ultralytics/yolo/configs/default.yaml  
    # 1.模型初始化
    cfg.model = cfg.model or "yolov8n.pt"
    cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2)  # check image size
    cfg.source = cfg.source if cfg.source is not None else ROOT / "assets"
    # 2. 模型推理
    predictor = DetectionPredictor(cfg)

1.3 模型初始化流程

  1. 实例化对象获取"deep_sort_pytorch/configs/deep_sort.yaml"的参数。
  2. 初始化跟踪器
    2.1 实例化特征提取器
    2.2 实例化匹配代价矩阵
    2.3 实例化跟踪器
# ultralytics/yolo/v8/detect/predict.py 
def init_tracker():
    global deepsort
    # 1 实例化对象获取"deep_sort_pytorch/configs/deep_sort.yaml"的参数
    cfg_deep = get_config()  
    # 2 初始化跟踪器
    deepsort= DeepSort(cfg_deep.DEEPSORT.REID_CKPT,
                            max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,
                            nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,
                            max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,

# ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/deep_sort.py
class DeepSort(object):
    def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
        self.min_confidence = min_confidence   # 检测物体的最小置信度
        self.nms_max_overlap = nms_max_overlap # NMS时iou的最大值
        # 2.1 实例化特征提取器
        self.extractor = Extractor(model_path, use_cuda=use_cuda)   # 提取检测到的物体的特征,用于计算轨迹与检测框的余弦距离

        max_cosine_distance = max_dist         # 计算距离的最大值
        2.2 实例化匹配代价矩阵
        metric = NearestNeighborDistanceMetric(
            "cosine", max_cosine_distance, nn_budget)   # metric计算匹配代价矩阵。在级联匹配中用欧式距离计算特级特征和检测特征的距离
        2.3 实例化跟踪器
        self.tracker = Tracker(
            metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)

2. 模型推理


  1. 准备模型和数据,对数据进行预处理
    1.1 数据进行预处理
  2. 数据带入模型得到预测结果
  3. 处理预测结果
    3.1 处理预测结果
  4. 对预测结果跟踪

2.1 模型推理代码解析

# ultralytics/yolo/engine/predictor.py
    def __call__(self, source=None, model=None):
        # 1.准备模型和数据,对数据进行预处理
        model = self.model if self.done_setup else self.setup(source, model)
        self.seen, self.windows, self.dt = 0, [], (ops.Profile(), ops.Profile(), ops.Profile())
        self.all_outputs = []
        for batch in self.dataset:
            path, im, im0s, vid_cap, s = batch
            visualize = increment_path(self.save_dir / Path(path).stem, mkdir=True) if self.args.visualize else False
            with self.dt[0]:
                im = self.preprocess(im)
                if len(im.shape) == 3:
                    im = im[None]  # expand for batch dim
            # 2. 数据带入模型得到预测结果
            # Inference
            with self.dt[1]:
                preds = model(im, augment=self.args.augment, visualize=visualize)

            # 3. 处理预测结果
            # postprocess
            with self.dt[2]:
                preds = self.postprocess(preds, im, im0s)

            for i in range(len(im)):
                if self.webcam:
                    path, im0s = path[i], im0s[i]
                p = Path(path)
                # 4. 对预测结果跟踪
                s += self.write_results(i, preds, (p, im, im0s))

                if self.args.show:

                if self.args.save:
                    self.save_preds(vid_cap, i, str(self.save_dir / p.name))

            # Print time (inference-only)
            LOGGER.info(f"{s}{'' if len(preds) else '(no detections), '}{self.dt[1].dt * 1E3:.1f}ms")


        # Print results
        t = tuple(x.t / self.seen * 1E3 for x in self.dt)  # speeds per image
            f'Speed: %.1fms pre-process, %.1fms inference, %.1fms postprocess per image at shape {(1, 3, *self.imgsz)}'
            % t)
        if self.args.save_txt or self.args.save:
            s = f"\n{len(list(self.save_dir.glob('labels/*.txt')))} labels saved to {self.save_dir / 'labels'}" if self.args.save_txt else ''
            LOGGER.info(f"Results saved to {colorstr('bold', self.save_dir)}{s}")

        return self.all_outputs

# 1.1 数据进行预处理 ultralytics/yolo/v8/detect/predict.py
    def preprocess(self, img):
        img = torch.from_numpy(img).to(self.model.device)     # 图片数据转换成tensor格式
        img = img.half() if self.model.fp16 else img.float()  # uint8 to fp16/32  选择半精度还是双精度
        img /= 255  # 0 - 255 to 0.0 - 1.0                    # 图片数据缩放到0~1之间
        return img
# 3.1 处理预测结果 ultralytics/yolo/v8/detect/predict.py
    def postprocess(self, preds, img, orig_img):
        # 1 非极大值抑制
        preds = ops.non_max_suppression(preds,

        for i, pred in enumerate(preds):
            shape = orig_img[i].shape if self.webcam else orig_img.shape
            # 2 把检测到的框映射到原图
            pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], shape).round()

        return preds

2.2 对预测结果跟踪代码解析

把检测框的中心点宽高、置信度、类别、原图输入 deepsort.update()中进行跟踪,跟踪代码流程如下:

  1. 提取特征。
  2. 轨迹预测。
  3. 轨迹跟踪。
# ultralytics/yolo/v8/detect/predict.py-->def write_results(self, idx, preds, batch)-->outputs = deepsort.update(xywhs, confss, oids, im0)-->  self.tracker.predict() self.tracker.update(detections)

    def update(self, bbox_xywh, confidences, oids, ori_img):
        self.height, self.width = ori_img.shape[:2]
        # generate detections 
        # 1. 提取特征
        features = self._get_features(bbox_xywh, ori_img)  # 提取检测框对应物体的特征
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh) # 把框由中心点宽高的格式转换为左上角坐标和宽高的格式
        detections = [Detection(bbox_tlwh[i], conf, features[i],oid) for i, (conf,oid) in enumerate(zip(confidences,oids)) if conf > self.min_confidence]   # 删除资信度小于阈值的检测框,并把检测框的特征添加到detections中

        # run on non-maximum supression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])

        # update tracker
        # 2. 轨迹预测
        # 3. 轨迹跟踪

        # output bbox identities
        outputs = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
            box = track.to_tlwh()
            x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            track_oid = track.oid
            outputs.append(np.array([x1, y1, x2, y2, track_id, track_oid], dtype=np.int))
        if len(outputs) > 0:
            outputs = np.stack(outputs, axis=0)
        return outputs

2.3 轨迹预测

self.tracker.predict()–>track.predict(self.kf)–>def predict(self, mean, covariance)

  1. 状态转移矩阵。
  2. 根据状态转移矩阵获取下一状态和其协方差。
    def predict(self, mean, covariance):
        """Run Kalman filter prediction step.

        mean : ndarray
            The 8 dimensional mean vector of the object state at the previous
            time step.
        covariance : ndarray
            The 8x8 dimensional covariance matrix of the object state at the
            previous time step.

        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state. Unobserved velocities are initialized to 0 mean.

        std_pos = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3]]
        std_vel = [
            self._std_weight_velocity * mean[3],
            self._std_weight_velocity * mean[3],
            self._std_weight_velocity * mean[3]]
        # 1. 状态转移矩阵
        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
        # 2. 根据状态转移矩阵获取下一状态和其协方差
        mean = np.dot(self._motion_mat, mean)
        covariance = np.linalg.multi_dot((
            self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

        return mean, covariance

2.4 轨迹跟踪


  1. 轨迹与特征匹配
  2. 更新匹配轨迹、未匹配轨迹和未匹配检测
  3. 更新轨迹的特征

    def update(self, detections):
        """Perform measurement update and track management.

        ----------   bbox_tlwh[i], conf, features[i],oid
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        # Run matching cascade.  1. 轨迹与特征 级联匹配和IOU匹配
        matches, unmatched_tracks, unmatched_detections = \

        # Update track set. 2. 更新匹配轨迹、未匹配轨迹和未匹配检测
        for track_idx, detection_idx in matches:
                self.kf, detections[detection_idx])                   # 对匹配轨迹更新得到当前最优轨迹
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()                      # 对未匹配轨迹进行标识
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])           # 对新检测框初始化轨迹
        self.tracks = [t for t in self.tracks if not t.is_deleted()]  # 剔除掉删除的轨迹

        # Update distance metric. 3. 更新轨迹的历史特征
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]   # 存放确认状态轨迹的id
        features, targets = [], []  # 分别存放当前所有轨迹的特征和id
        for track in self.tracks:
            if not track.is_confirmed():
            features += track.features
            targets += [track.track_id for _ in track.features]
            track.features = []
        self.metric.partial_fit(   #
            np.asarray(features), np.asarray(targets), active_targets)

2.5 轨迹与特征匹配


  1. 把轨迹分为确认状态和非确认状态。
  2. 级联匹配。通过级联匹配得到确认匹配轨迹和检测的id
    matches_a,未匹配轨迹unmatched_tracks_a, 未匹配检测unmatched_detections
  3. IOU匹配。把级联匹配步骤中只有一次未匹配的轨迹并如非确认轨迹用于IOU匹配;保留级联匹配步骤中未匹配的轨迹中连续2帧或2帧以上没有匹配的轨迹。
  4. 更新级联匹配和IOU匹配的结果。
    def _match(self, detections):

        def gated_metric(tracks, dets, track_indices, detection_indices):
            features = np.array([dets[i].feature for i in detection_indices])   # 检测框的特征
            targets = np.array([tracks[i].track_id for i in track_indices])     # 轨迹的id
            cost_matrix = self.metric.distance(features, targets)               # 计算轨迹与检测的特征余弦距离
            cost_matrix = linear_assignment.gate_cost_matrix(                   # 用轨迹与检测的马氏距离跟新cost_matrix矩阵
                self.kf, cost_matrix, tracks, dets, track_indices,

            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        # 1 把轨迹分为确认状态和非确认状态
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

        # Associate confirmed tracks using appearance features. 
        # 2 级联匹配
        matches_a, unmatched_tracks_a, unmatched_detections = \
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        # 3 IOU匹配
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update == 1]   # 如果级联未匹配轨迹上一帧匹配成功,这一帧匹配失败,则把其添加到不确认轨迹中
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]   # 跟新unmatched_tracks_a,只保留大于等于连续两帧没有被匹配上的
        matches_b, unmatched_tracks_b, unmatched_detections = \
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)  # IOU匹配
        # 4 更新级联匹配和IOU匹配的结果
        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

    def _initiate_track(self, detection):
        mean, covariance = self.kf.initiate(detection.to_xyah()) # 根据新检测到的框初始化轨迹的数值和协方差
            mean, covariance, self._next_id, self.n_init, self.max_age,detection.oid,
            detection.feature))               # 添加新的轨迹
        self._next_id += 1

# 2 级联匹配
2.1 获取(1 + level)次没有被匹配上的轨迹
2.2 计算轨迹特征和检测特征的代价矩阵
    2.2.1 计算轨迹与检测的特征余弦距离
    2.2.2 用轨迹与检测的马氏距离跟新cost_matrix矩阵

def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run matching cascade.

    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
    cascade_depth: int
        The cascade depth, should be se to the maximum track age.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all

    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices              # 初始所有的检测都没有匹配
    matches = []
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:                # No detections left
        # 1 获取(1 + level)次没有被匹配上的轨迹
        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level   
        if len(track_indices_l) == 0:                     # Nothing to match at this level
        # 2 计算轨迹特征和检测特征的马氏距离
        matches_l, _, unmatched_detections = \
                distance_metric, max_distance, tracks, detections,  # max_distance阈值
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections

2.6 计算轨迹与检测的特征余弦距离


def _cosine_distance(a, b, data_is_normalized=False):
    """Compute pair-wise cosine distance between points in `a` and `b`.

    a : array_like
        An NxM matrix of N samples of dimensionality M.
    b : array_like
        An LxM matrix of L samples of dimensionality M.
    data_is_normalized : Optional[bool]
        If True, assumes rows in a and b are unit length vectors.
        Otherwise, a and b are explicitly normalized to lenght 1.

        Returns a matrix of size len(a), len(b) such that element (i, j)
        contains the squared distance between `a[i]` and `b[j]`.

    if not data_is_normalized:
        a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)  # a是轨迹特征归一化
        b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)  # b是检测特征归一化
    return 1. - np.dot(a, b.T)

2.7 用轨迹与检测的马氏距离跟新cost_matrix矩阵


def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

        Returns the modified cost matrix.

    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]               # gating_distance马氏距离
        gating_distance = kf.gating_distance(   # track.mean, track.covariance是轨迹的均值和协方差。measurements是检测框
            track.mean, track.covariance, measurements, only_position)
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost  # 如果马氏距离大于阈值,则赋值为极大值。
    return cost_matrix
    # 马氏距离
    def gating_distance(self, mean, covariance, measurements,
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and

        mean, covariance = self.project(mean, covariance)  # 轨迹特征
        if only_position:                                  # 计算马氏距离用几个值
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        cholesky_factor = np.linalg.cholesky(covariance)   # 对轨迹的协方差分解成下三角矩阵和,前提covariance正定
        d = measurements - mean                            #(det-track).T *track's covariance.INV *(det-track)
        z = scipy.linalg.solve_triangular(    # 求线性方程组的解    mash = d.T *covariance^{-1} *d = d.T *(L.T*L) *d = (Ld).T*(Ld)
            cholesky_factor, d.T, lower=True, check_finite=False,  # L.T*? = d.T--> ? = ((L.T)^{-1} d.T=(dL^{-1}).T
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha