web端rtmp推拉流测试、抽帧识别计数,一键式生成巡检报告

发布于:2025-06-09 ⋅ 阅读:(13) ⋅ 点赞:(0)

        本文旨在实现无人机城市交通智慧巡检中的一个模块——无人机视频实时推拉流以及识别流并在前端展示,同时,统计目标数量以及违停数量,生成结果评估,一并发送到前端展示。对于本文任何技术上的空缺,可在博主主页前面博客寻找,有任何问题欢迎私信或评论区讨论!!!

目录

涉及技术栈

基本效果

存在的问题,亟需解决

代码及粗略解释

资源


涉及技术栈:

Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+异步+多进程+flv.js+node.js

基本效果:

web端推拉流测试、抽帧识别计数,一键式生成巡检报告

项目结构(Django):
├── DJI_yolo
│   ├── __init__.py
│   ├── __pycache__
│   ├── asgi.py
│   ├── settings.py
│   ├── templates
│   ├── urls.py
│   └── wsgi.py
├── app01
│   ├── __init__.py
│   ├── __pycache__
│   ├── admin.py
│   ├── apps.py
│   ├── car_best.pt
│   ├── consumers
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   ├── detection_consumer.py
│   │   └── report_consumer.py
│   ├── migrations
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── models.py
│   ├── pictures
│   ├── routings.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   └── detection_engine.py
│   ├── stream_status.py
│   ├── tests.py
│   ├── utils.py
│   └── views.py
├── manage.py
├── media
│   ├── 2025-06-02
│   │   ├── 20时14分54秒-20时17分03秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频
│   │   ├── 20时26分11秒-20时29分00秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频

前端代码因保密不方便展示,可自行根据后端端口续写,后端Django完整代码在文章末尾,请查收!!!

存在的问题,亟需解决:

1、后端逐帧识别并返回前端,前端显示卡顿,考虑跳帧识别,一秒识别一帧,剩余原始帧直接返回。

2、曾尝试过使用 yolo 中的 model 类中的 track 方法进行跨帧追踪,为每帧每个目标进行编号(id),提取帧中目标特征(速度、目标面积、唯位移等等),进行特征化工程,从而判断相邻两识别帧中的某两目标是否为同一目标。由于添加追踪算法,单帧识别时间过长,单帧识别时间大于拉流帧传输时间,导致进程堵塞,程序崩溃。后续采取措施改进。

3、模型并未做违停检测,考虑重新训练模型,添加违停训练集,重新训练,顺便搭配 yolo12 环境,用 yolo12 进行训练。

4、Django 后端接受到流,并不能直接开始识别,而是先用四五秒时间加载模型,需要优化。

5、执行任务、完成任务到数据存储、前端显示巡检报告,延迟较高,需要优化。

6、为后端添加车辆运动状态算法,若连续识别的两个帧同一目标静止,并且识别为违停,则判断为违停;若非静止但识别违停,不做处理。相对静止判断?无人机在动,无法建立参考系。

7、进行车牌识别。

代码及粗略解释:

detection_consumer.py:

此处代码使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的类进行帧识别和计数,然后判断计数,调用讯飞星火 API 生成 AI 答复,获取关键帧,将所有信息封装成一个字典,并传给前端,以用于违停报告生成。同时,将实时识别帧通过 websocket 的消费者传给前端显示。

import asyncio
import base64
import os
import subprocess
import time
import traceback
from datetime import datetime
from pathlib import Path
from .report_consumer import ReportConsumer
import numpy as np
import cv2
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from app01.services.detection_engine import DetectionEngine, logger
from concurrent.futures import ThreadPoolExecutor
from app01.utils import rename_time_folder  # 导入重命名函数
from app01.stream_status import stream_status, status_lock  # 从新模块导入
import requests

RTMP_URL = "rtmp://127.0.0.1:1935/live/stream"
channel_layer = get_channel_layer()

class DetectionStreamConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add("detection_group", self.channel_name)
        await self.accept()
        print("WebSocket 连接建立成功")

    async def disconnect(self, code):
        await self.channel_layer.group_discard("detection_group", self.channel_name)
        print("WebSocket 连接关闭")

    async def start_detection(self):
        print("开始检测帧流...")
        await self.monitor_stream_status()

    async def monitor_stream_status(self):
        while True:
            if stream_status.is_streaming:
                logger.info("检测到推流,开始处理...")
                await self.start_detection_stream()
            else:
                logger.info("当前无推流,等待中...")
                await asyncio.sleep(1)

    async def start_detection_stream(self, all_dict=None):
        ffmpeg_pull_command = [
            "ffmpeg",
            "-c:v", "h264",
            "-i", RTMP_URL,
            "-f", "rawvideo",
            "-pix_fmt", "bgr24",
            "-s", "640x360",
            "-r", "5",
            "-vcodec", "rawvideo",
            "-an",
            "-flags", "+low_delay",
            "-tune", "zerolatency",
            "-"
        ]

        try:
            process = subprocess.Popen(
                ffmpeg_pull_command,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=10 * 8
            )
        except Exception as e:
            logger.error(f"FFmpeg 子进程启动失败: {e}")
            return

        if process.stdout is None:
            logger.error("FFmpeg stdout 为空")
            return

        frame_width, frame_height = 640, 360
        frame_size = frame_width * frame_height * 3
        executor = ThreadPoolExecutor(max_workers=4)  # 最多可以同时运行 4 个线程
        engine = DetectionEngine()
        frame_count = 0
        skip_interval = 1

        try:
            while stream_status.is_streaming:
                try:
                    loop = asyncio.get_event_loop()
                    raw_frame = await asyncio.wait_for(
                        loop.run_in_executor(executor, process.stdout.read, frame_size),
                        timeout=8  # 超时检测断流
                    )
                except asyncio.TimeoutError:
                    logger.warning("读取帧超时,触发流结束")
                    with status_lock:
                        stream_status.is_streaming = False
                    break

                if len(raw_frame) != frame_size:
                    continue

                try:
                    frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))
                except Exception as e:
                    logger.warning(f"帧解析失败: {e}")
                    continue

                frame_count += 1
                if frame_count % skip_interval != 0:
                    continue

                result = await loop.run_in_executor(executor, engine.process_frame, frame)
                # 增加空值检查
                if result is None:
                    logger.warning("检测结果为空,跳过处理")
                    continue
                processed_frame, detected_classes = result

                # 更新车辆统计(仅按类别)
                with DetectionEngine.counter_lock:
                    for class_id in detected_classes:
                        if class_id == 0:
                            DetectionEngine.total_count['car'] += 1
                            DetectionEngine.total_count['total'] += 1
                        elif class_id == 1:
                            DetectionEngine.total_count['bus'] += 1
                            DetectionEngine.total_count['total'] += 1
                        elif class_id == 2:
                            DetectionEngine.total_count['truck'] += 1
                            DetectionEngine.total_count['total'] += 1
                        elif class_id == 3:
                            DetectionEngine.total_count['van'] += 1
                            DetectionEngine.total_count['total'] += 1

                _, jpeg = cv2.imencode('.jpg', processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
                if channel_layer:
                    await channel_layer.group_send(
                        "detection_group",
                        {"type": "send_frame", "frame": jpeg.tobytes()}
                    )
        except Exception as e:
            logger.error(f"检测处理错误: {e}")
            logger.error(traceback.format_exc())
        finally:
            logger.warning("流结束,进入处理...")
            if process.returncode is None:
                process.kill()
                logger.warning("FFmpeg 进程已终止")

            # 延迟一小段时间,确保文件操作完全释放
            time.sleep(1)

            stream_status.is_streaming = False
            stream_status.end_datetime = datetime.now()  # 记录结束时间

            logger.warning("正在获取三个关键帧...")
            # 找3个关键帧,必须在重命名前执行
            folder = Path(f"{stream_status.image_path}")
            files = [f for f in folder.iterdir() if f.is_file()]
            if not files:
                return None, None, None
            # 按修改时间排序
            files.sort(key=lambda x: x.stat().st_mtime)
            first = files[0]
            last = files[-1]
            middle = files[len(files) // 2]

            # 转换为Base64
            stream_status.image_li = [
                self.image_to_base64(str(f))  # 传入字符串路径,避免Path对象序列化问题
                for f in [first, middle, last]
            ]

            logger.warning("正在重命名文件夹...")
            # 先复制 time_path,因为后续可能被修改
            time_path = stream_status.time_path
            start_datetime = stream_status.start_datetime
            end_datetime = stream_status.end_datetime
            save_path = ""
            # 调用重命名函数(使用完整 datetime)
            if time_path and start_datetime:

                path = rename_time_folder(
                        time_path,
                        start_datetime,  # 传入开始时间
                        end_datetime  # 传入结束时间
                )
                save_path = path
                logger.warning(
                        f"文件夹已重命名为 {start_datetime.strftime('%H时%M分%S秒')}-{end_datetime.strftime('%H时%M分%S秒')}")

            try:
                logger.warning("正在更新数据...")
                # 更新识别结果
                stats = self.correct_overcounted_stats(DetectionEngine.total_count)
                print("stats:", stats)
                stream_status.total_stats = stats
                print("stream_status.total_stats:", stream_status.total_stats)
                self.save_vehicle_stats(stats, save_path)

            except Exception as e:
                logger.error(f"保存统计信息失败: {e}")
            print("正在获取AI答复...")
            self.AI_response()

            # 定义总数据,并序列化
            all_dict = {
                'datetime': stream_status.start_time.strftime("%Y-%m-%d %H:%M:%S.%f"),  # 序列化,转为字符串
                'image_li': [bs for bs in stream_status.image_li],  # 关键帧转二进制
                'detect_car': [_ for _ in stream_status.total_stats.values()],
                'Al_response': stream_status.AI_talk,
            }

            print("all_dict:", all_dict)
            print("正在发送信息...")
            await ReportConsumer.send_report_data(all_dict)

            executor.shutdown(wait=False)
            DetectionEngine.reset_stats()
            stream_status.reset_all_dict()

    def save_vehicle_stats(self, stats, save_path=None):
        # 如果未提供路径,使用默认的 time_path
        if not save_path:
            save_path = stream_status.time_path

        if not save_path:
            logger.warning("无法保存统计信息:路径为空")
            return

        stats_path = os.path.join(save_path, 'vehicle_stats.txt')
        try:
            with open(stats_path, 'w', encoding='utf-8-sig') as f:
                f.write("车辆统计结果\n")
                f.write("-------------------------\n")
                for k, v in stats.items():
                    f.write(f"{k}: {v}\n")
                f.write("-------------------------\n")
            logger.info(f"统计信息已保存至 {stats_path}")
        except Exception as e:
            logger.error(f"写入统计信息失败: {e}")


    def AI_response(self):
        """获取AI回复"""
        url = "https://spark-api-open.xf-yun.com/v1/chat/completions"
        headers = {
            "Authorization": "Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv",
            "Content-Type": "application/json",
        }
        content = (
            f'我现在要做一个无人机城市交通智慧巡检报告,现在巡检结果是:巡检时间:{stream_status.start_time}'
            f'小汽车有{stream_status.total_stats["car"]}辆,面包车有{stream_status.total_stats["van"]}辆,'
            f'公交车有{stream_status.total_stats["bus"]}辆,卡车有{stream_status.total_stats["truck"]}辆,'
            f'识别到的违停车辆共有{stream_status.total_stats["illegally_car"]}辆。'
            f'帮我生成一段结论,要求不要有废话,也不要写具体那四个类别识别到的车辆数,字数200字,语言精炼严谨'
        )

        question = {
            "role": "user",
            "content": content
        }

        data = {
            "max_tokens": 4096,
            "top_k": 4,
            "messages": [question],
            "temperature": 0.5,
            "model": "4.0Ultra",
            "stream": False,
        }
        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()
        result = response.json()

        # 提取模型输出的内容
        reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()
        print(reply)
        stream_status.AI_talk = reply

    # 读取图片文件并转换为 Base64
    @staticmethod  # 静态方法不应接收self参数
    def image_to_base64(image_path):
        with open(image_path, "rb") as img_file:
            base64_str = base64.b64encode(img_file.read()).decode("utf-8")
            return f"data:image/jpeg;base64,{base64_str}"

    @staticmethod  # 去掉self第一个参数
    def correct_overcounted_stats(stats, avg_appearance=6):
        """
        简单修正严重重复统计的结果
        """
        corrected = {}
        for cls in ['car', 'van', 'bus', 'truck']:
            if cls in stats:
                corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))

        # 可选:重新计算 total
        corrected['total'] = sum(corrected.values())
        corrected['illegally_car'] = int(corrected['total'] / 100)
        print(corrected)

        return corrected

    async def send_frame(self, event):
        frame_bytes = event["frame"]
        await self.send(bytes_data=frame_bytes)

stream_status.py:

本文件用于 Django 项目全局状态管理,相当于 vue3 中的 pinia , is_streaming  是前端推本地视频、后端上传 SRS 后,变为 True ,后端因为 is_streaming  = True 而开始拉流,当超过 6 秒未拉到流,判断流结束,is_streaming  = False。还定义了全局状态锁、状态重置方法、时间获取方法、文件夹名获取方法、状态实例。全局状态锁是为了防止多线程修改同一个共享变量,只有with status_lock:with 块内,只有当前线程持有锁时才能运行代码,其他线程必须等待,相当于在此处把锁”锁“起来了。

import threading

# 全局状态锁和状态对象
status_lock = threading.Lock()
class StreamStatus:

    def __init__(self):
        self.is_streaming = False  # 是否正在推流/检测
        self.start_time = None      # 推流开始时间(datetime对象)
        self.end_time = None        # 推流结束时间(datetime对象)
        self.time_path = None       # 当前时间文件夹路径(如 Media/2025-06-01/12点)
        self.total_stats = {        # 车辆统计
            "car": 0,
            "van": 0,
            "bus": 0,
            "truck": 0,
            "total": 0,
            "illegally_car": 0,
        }
        self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
        self.image_path = None
        self.image_li = []
        self.AI_talk = None
        self.all_dict = {}

    @property
    def date_folder(self):
        """获取日期文件夹名称(如 2025-06-01)"""
        if self.start_time:
            return self.start_time.strftime("%Y-%m-%d")
        return None

    @property
    def start_hour(self):
        """获取开始小时(如 12)"""
        if self.start_time:
            return self.start_time.hour
        return None

    def reset(self):
        """重置状态(用于新任务开始)"""
        self.is_streaming = False
        self.start_time = None
        self.end_time = None
        self.time_path = None
        self.total_stats = {k: 0 for k in self.total_stats}
        self.last_frame_time = None

    def reset_all_dict(self):
        self.last_frame_time = None  # 最后收到有效帧的时间(用于断流检测)
        self.image_path = None
        self.image_li = []
        self.AI_talk = None
        self.all_dict = {}
        self.all_dict = {}
# 全局唯一的状态实例
stream_status = StreamStatus()

detection_engine.py:

此处是 detection_consumer.py 使用的服务代码块,用于为其提供帧识别服务,consumer 拉到的每一个流都会调用该类中的 process_frame 类方法,并同时进行计数和关键帧保存。

import threading
import time
import cv2
from ultralytics import YOLO
import logging
import os
from app01.stream_status import stream_status, status_lock  # 从新模块导入

logger = logging.getLogger(__name__)

MODEL_PATH = "F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt"
model = YOLO(MODEL_PATH)

# 关键帧存储位置
pictures_dir = "pictures"
if not os.path.exists(pictures_dir):
    os.makedirs(pictures_dir)


class DetectionEngine:
    frame_counter = 1
    counter_lock = threading.Lock()  # 线程锁,保证计数器安全

    # 新增:全局计数器和已统计 ID
    total_count = {
        'car': 0,
        'van': 0,
        'bus': 0,
        'truck': 0,
        "total": 0,
        "illegally_car": 0
    }
    # 下次任务重置
    @classmethod
    def reset_stats(cls):
        with cls.counter_lock:
            for k in cls.total_count:
                cls.total_count[k] = 0

    # 下次任务重置
    @classmethod
    def update_stats(cls, vehicle_type):
        with cls.counter_lock:
            if vehicle_type in cls.total_count:
                cls.total_count[vehicle_type] += 1
                cls.total_count["total"] += 1

    def __init__(self):
        self.model = model
        # 调整检测参数,减少计算量
        self.detect_params = {
            'conf': 0.3,       # 提高置信度阈值,减少无效检测
            'iou': 0.5,        # 调整 NMS 阈值
            'imgsz': [384, 640],  # 输入尺寸,降低分辨率
            'classes': [0, 1, 2, 3],  # 仅检测车辆类别(2: car, 5: bus, 7: truck)
            'device': 0,  # 使用 GPU
            'half': True,  # 使用 FP16 精度
            'show': False  # 关闭实时显示,减少开销
        }

    def process_frame(self, frame):
        """处理单帧图像:YOLO推理 + 保存结果到关键帧图片文件夹"""
        results = model(frame)
        detected_objects = []
        annotated_frame = frame.copy()  # 复制原始帧以防止修改原图
        # 防御性检查:确保 results 非空且包含有效数据
        if not results or len(results) == 0:
            logger.warning("未获取到检测结果,跳过处理")
            return frame, []

        boxes = results[0].boxes
        if boxes is None or len(boxes) == 0:
            logger.warning("检测结果为空,跳过处理")
            return frame, []
        try:
            # 假设类别信息存储在 cls 属性中
            classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, 'cls') and boxes.cls is not None else []

            for class_id in classes:
                detected_objects.append(class_id)

            annotated_frame = results[0].plot()  # 绘制检测框
        except Exception as e:
            logger.error(f"解析检测数据失败: {e}")
            return annotated_frame, []
        with status_lock:
            car_count = sum(1 for cls in detected_objects if cls == 0)
            if car_count > 18:
                try:
                    timestamp = int(time.time())
                    save_path = os.path.join(stream_status.image_path, f"keyframe_{timestamp}.jpg")
                    cv2.imwrite(save_path, annotated_frame)
                    logger.info(f"关键帧保存成功: {save_path}")
                except Exception as e:
                    logger.error(f"保存关键帧失败: {e}")

        return annotated_frame, detected_objects

repoet_consumer.py:

用于将前面 detection_consumer.py 中的字典数据发送给前端,首先会将数据缓存到后端,等到前端消费者建立成功,立马发送出去。

import asyncio

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
import json

# 缓存队列:用于保存尚未发送的数据
queue = []
# 记录是否有消费者连接
has_consumer = False

class ReportConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        global has_consumer
        # 加入指定组
        await self.channel_layer.group_add("report_group", self.channel_name)
        await self.accept()
        print("巡检报告WebSocket 已连接")
        # 设置有消费者连接的标志
        has_consumer = True
        # 尝试发送缓存数据
        await flush_queue(self.channel_layer)

    async def disconnect(self, close_code):
        global has_consumer
        # 移除组成员
        await self.channel_layer.group_discard("report_group", self.channel_name)
        print("巡检报告WebSocket 断开连接")
        # 重置消费者连接标志
        has_consumer = False

    async def receive(self, text_data=None, bytes_data=None):
        pass

    async def send_report(self, event):
        """处理 send_report 类型的消息并发送给客户端"""
        data = event['data']
        print("【发送到客户端】", data)

        # 转换为JSON字符串发送
        try:
            # 处理可能包含的非JSON可序列化对象
            # 这里假设 image_li 包含 Path 对象,需要转换为字符串
            if 'image_li' in data:
                data['image_li'] = [str(path) for path in data['image_li']]

            await self.send(text_data=json.dumps(data))
        except Exception as e:
            print(f"【发送到客户端失败】{e}")

    @classmethod
    async def send_report_data(cls, data):
        """供其他模块调用的方法"""
        global queue, has_consumer
        # 不再检查是否有人连接,直接发送或缓存,生产模式用Redis
        channel_layer = get_channel_layer()

        # 如果没有消费者连接,将数据加入队列
        if not has_consumer:
            queue.append(data)
            print(f"【数据已缓存】等待消费者连接: {data}")
        else:
            # 有消费者连接,直接发送
            await send_report(channel_layer, data)


# ========== 全局函数 ==========
# 定期检查并发送缓存数据的任务
async def queue_check_task():
    channel_layer = get_channel_layer()
    while True:
        await asyncio.sleep(1)  # 每秒检查一次
        if queue and has_consumer:
            await flush_queue(channel_layer)
# 在应用启动时启动队列检查任务
async def start_queue_check():
    asyncio.create_task(queue_check_task())
async def send_report(channel_layer, data):
    try:
        await channel_layer.group_send(
            "report_group",
            {
                "type": "send_report",
                "data": data,
            },
        )
        print("【发送成功】", data)
    except Exception as e:
        print(f"【发送失败】{e}")


async def flush_queue(channel_layer):
    global queue
    if not queue:
        return

    items = list(queue)
    queue.clear()
    for data in items:
        await send_report(channel_layer, data)

routings.py:

用于定义 websocket 的后端接收路径,并连接消费者

from django.urls import re_path
from app01.consumers import detection_consumer, report_consumer

websocket_urlpatterns = [
    re_path(r'^ws/detection/$', detection_consumer.DetectionStreamConsumer.as_asgi()),
    re_path(r'^ws/report/$', report_consumer.ReportConsumer.as_asgi()),
]

urls.py:

用于接收前端发送的 http 请求,并与 view.py 中的视图函数建立联系

"""
URL configuration for DJI_yolo project.

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/5.1/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.urls import include, path
    2. Add a URL to urlpatqterns:  path('blog/', include('blog.urls'))
"""
from django.conf.urls.static import static
from django.contrib import admin
from django.urls import path

from DJI_yolo import settings
from app01 import views
urlpatterns = [
    path("admin/", admin.site.urls),
    path('csrf/', views.get_csrf_token, name='get_csrf_token'),
    # 推流测试
    path('push_stream/', views.push_stream, name='push_stream'),

    # 媒体库管理
    path('api4/date-folders/', views.get_date_folders, name='date-folders'),
    path('api4/time-folders/<str:date>/', views.get_time_folders, name='time-folders'),
    path('api4/files/<str:date>/<str:time_range>/<str:category>/count/', views.get_file_count, name='file-count'),
    path('api4/files/<str:date>/<str:time_range>/<str:category>/', views.get_files, name='files'),
    path('api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/', views.delete_file,
         name='delete-file'),
    path('delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>', views.delete_file,
         name='delete-file'),  # 添加一个用于删除文件的 URL 路径

] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

views.py:

用于处理前端请求,这里报告推流、媒体管理系统、原始视频存储逻辑等等。

# views.py
from datetime import datetime
from pathlib import Path
import urllib.parse
from django.middleware.csrf import get_token
import os
import subprocess
import threading
from django.http import JsonResponse
from django.conf import settings
import asyncio
from .consumers.detection_consumer import DetectionStreamConsumer
from .utils import create_date_folder, create_time_folder
from .stream_status import stream_status

MEDIA_ROOT = Path(settings.MEDIA_ROOT)

def get_csrf_token(request):
    token = get_token(request)
    print(token)
    return JsonResponse({'csrfToken': token})

def push_stream(request):
    print("前端返回本地视频,准备推流")
    video_file = request.FILES.get('video')
    if not video_file:
        return JsonResponse({'error': '未上传视频文件'}, status=400)

    # ------------------- 创建精确时间文件夹 -------------------
    start_datetime = datetime.now()  # 记录精确到秒的开始时间

    date_str = start_datetime.strftime('%Y-%m-%d')
    date_path = create_date_folder(date_str)
    stream_status.start_time = datetime.now()
    # 创建带时分秒的文件夹(如 15:30:45)
    time_path, time_folder = create_time_folder(date_path, start_datetime)

    # 创建三类子文件夹
    original_path = os.path.join(time_path, '原视频')
    recognized_path = os.path.join(time_path, '识别视频')
    image_path = os.path.join(time_path, '关键帧图片')
    stream_status.image_path = image_path  # 保存到全局状态
    for folder in [original_path, recognized_path, image_path]:
        os.makedirs(folder, exist_ok=True)

    # ------------------- 保存上传视频 -------------------
    original_video_name = f"original_{start_datetime.strftime('%H%M%S')}.mp4"
    original_video_path = os.path.join(original_path, original_video_name)
    with open(original_video_path, 'wb') as f:
        for chunk in video_file.chunks():
            f.write(chunk)

    # ------------------- 推流配置 -------------------
    push_url = "rtmp://127.0.0.1:1935/live/stream"
    recognized_video_name = f"recognized_{start_datetime.strftime('%H%M%S')}.mp4"
    recognized_video_path = os.path.join(recognized_path, recognized_video_name)


    command = [
        "ffmpeg",
        "-re",
        "-i", original_video_path,
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-tune", "zerolatency",
        "-g", "50",
        "-r", "30",
        "-an",
        "-f", "flv",
        push_url,
        "-f", "mp4",
        recognized_video_path
    ]

    try:
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # 保存完整时间状态
        stream_status.is_streaming = True
        stream_status.start_datetime = start_datetime  # 保存完整 datetime
        stream_status.time_path = time_path
        stream_status.recognized_path = recognized_path
        stream_status.image_path = image_path

        detection_thread = threading.Thread(
            target=run_detection_in_background,
            daemon=True
        )
        detection_thread.start()

        return JsonResponse({
            'status': 'success',
            'date_folder': date_str,
            'time_folder': time_folder  # 格式如 15:30:45
        })
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

def run_detection_in_background():
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(DetectionStreamConsumer().start_detection())
    except Exception as e:
        print(f"后台检测线程异常: {e}")

def get_date_folders(request):
    page = int(request.GET.get('page', 1))
    page_size = int(request.GET.get('page_size', 10))
    MEDIA_ROOT_PATH = Path(r'F:\FullStack\Django\DJI_yolo\media')

    date_folders = []
    for name in os.listdir(MEDIA_ROOT_PATH):
        folder_path = MEDIA_ROOT_PATH / name
        if folder_path.is_dir() and len(name.split('-')) == 3:
            date_folders.append({'date': name})

    date_folders.sort(key=lambda x: x['date'], reverse=True)
    start = (page - 1) * page_size
    end = start + page_size
    return JsonResponse({
        'data': date_folders[start:end],
        'total': len(date_folders)
    })

def get_time_folders(request, date):
    date_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date
    if not date_path.is_dir():
        return JsonResponse({'error': '日期文件夹不存在'}, status=404)

    page = int(request.GET.get('page', 1))
    page_size = int(request.GET.get('page_size', 10))
    time_folders = []

    for name in os.listdir(date_path):
        time_path = date_path / name
        if time_path.is_dir():
            original_count = len(os.listdir(time_path / '原视频')) if (time_path / '原视频').is_dir() else 0
            recognized_count = len(os.listdir(time_path / '识别视频')) if (time_path / '识别视频').is_dir() else 0
            image_count = len(os.listdir(time_path / '关键帧图片')) if (time_path / '关键帧图片').is_dir() else 0

            time_folders.append({
                'time_range': name,  # 直接使用时间区间名称
                'original_count': original_count,
                'recognized_count': recognized_count,
                'image_count': image_count
            })

    time_folders.sort(key=lambda x: x['time_range'], reverse=True)
    start = (page - 1) * page_size
    end = start + page_size
    return JsonResponse({
        'data': time_folders[start:end],
        'total': len(time_folders)
    })

def get_file_count(request, date, time_range, category):
    category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / time_range / category
    if not category_path.is_dir():
        return JsonResponse({'count': 0})

    count = len(os.listdir(category_path))
    return JsonResponse({'count': count})

def get_files(request, date, time_range, category):
    try:
        decoded_time_range = urllib.parse.unquote(time_range)
        category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / category

        if not category_path.is_dir():
            return JsonResponse({'error': '文件类别不存在'}, status=404)

        page = int(request.GET.get('page', 1))
        page_size = int(request.GET.get('page_size', 10))
        files = []

        for filename in os.listdir(category_path):
            file_path = category_path / filename
            if file_path.is_file():
                encoded_time_range = urllib.parse.quote(decoded_time_range)
                files.append({
                    'name': filename,
                    'url': f'http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}'
                })

        start = (page - 1) * page_size
        end = start + page_size
        return JsonResponse({
            'data': files[start:end],
            'total': len(files),
            'status': 'success'
        })
    except Exception as e:
        print(f"文件列表错误: {str(e)}")
        return JsonResponse({
            'error': '服务器内部错误',
            'detail': str(e)
        }, status=500)

def delete_file(request, date, time_range, category, filename):
    try:
        decoded_time_range = urllib.parse.unquote(time_range)
        file_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / category / filename
        if not file_path.exists():
            return JsonResponse({'error': '文件不存在'}, status=404)

        os.remove(file_path)
        return JsonResponse({'status': 'success'})
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

感谢您的观看!

资源如下:

https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501