本文旨在实现无人机城市交通智慧巡检中的一个模块——无人机视频实时推拉流以及识别流并在前端展示,同时,统计目标数量以及违停数量,生成结果评估,一并发送到前端展示。对于本文任何技术上的空缺,可在博主主页前面博客寻找,有任何问题欢迎私信或评论区讨论!!!
目录
涉及技术栈:
Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+异步+多进程+flv.js+node.js
基本效果:
web端推拉流测试、抽帧识别计数,一键式生成巡检报告
项目结构(Django):
├── DJI_yolo
│ ├── __init__.py
│ ├── __pycache__
│ ├── asgi.py
│ ├── settings.py
│ ├── templates
│ ├── urls.py
│ └── wsgi.py
├── app01
│ ├── __init__.py
│ ├── __pycache__
│ ├── admin.py
│ ├── apps.py
│ ├── car_best.pt
│ ├── consumers
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ ├── detection_consumer.py
│ │ └── report_consumer.py
│ ├── migrations
│ │ ├── __init__.py
│ │ └── __pycache__
│ ├── models.py
│ ├── pictures
│ ├── routings.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ └── detection_engine.py
│ ├── stream_status.py
│ ├── tests.py
│ ├── utils.py
│ └── views.py
├── manage.py
├── media
│ ├── 2025-06-02
│ │ ├── 20时14分54秒-20时17分03秒
│ │ │ ├── 关键帧图片
│ │ │ ├── 原视频
│ │ │ └── 识别视频
│ │ ├── 20时26分11秒-20时29分00秒
│ │ │ ├── 关键帧图片
│ │ │ ├── 原视频
│ │ │ └── 识别视频
前端代码因保密不方便展示,可自行根据后端端口续写,后端Django完整代码在文章末尾,请查收!!!
存在的问题,亟需解决:
1、后端逐帧识别并返回前端,前端显示卡顿,考虑跳帧识别,一秒识别一帧,剩余原始帧直接返回。
2、曾尝试过使用 yolo 中的 model 类中的 track 方法进行跨帧追踪,为每帧每个目标进行编号(id),提取帧中目标特征(速度、目标面积、唯位移等等),进行特征化工程,从而判断相邻两识别帧中的某两目标是否为同一目标。由于添加追踪算法,单帧识别时间过长,单帧识别时间大于拉流帧传输时间,导致进程堵塞,程序崩溃。后续采取措施改进。
3、模型并未做违停检测,考虑重新训练模型,添加违停训练集,重新训练,顺便搭配 yolo12 环境,用 yolo12 进行训练。
4、Django 后端接受到流,并不能直接开始识别,而是先用四五秒时间加载模型,需要优化。
5、执行任务、完成任务到数据存储、前端显示巡检报告,延迟较高,需要优化。
6、为后端添加车辆运动状态算法,若连续识别的两个帧同一目标静止,并且识别为违停,则判断为违停;若非静止但识别违停,不做处理。相对静止判断?无人机在动,无法建立参考系。
7、进行车牌识别。
代码及粗略解释:
detection_consumer.py:
此处代码使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的类进行帧识别和计数,然后判断计数,调用讯飞星火 API 生成 AI 答复,获取关键帧,将所有信息封装成一个字典,并传给前端,以用于违停报告生成。同时,将实时识别帧通过 websocket 的消费者传给前端显示。
import asyncio
import base64
import os
import subprocess
import time
import traceback
from datetime import datetime
from pathlib import Path
from .report_consumer import ReportConsumer
import numpy as np
import cv2
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from app01.services.detection_engine import DetectionEngine, logger
from concurrent.futures import ThreadPoolExecutor
from app01.utils import rename_time_folder # 导入重命名函数
from app01.stream_status import stream_status, status_lock # 从新模块导入
import requests
RTMP_URL = "rtmp://127.0.0.1:1935/live/stream"
channel_layer = get_channel_layer()
class DetectionStreamConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.channel_layer.group_add("detection_group", self.channel_name)
await self.accept()
print("WebSocket 连接建立成功")
async def disconnect(self, code):
await self.channel_layer.group_discard("detection_group", self.channel_name)
print("WebSocket 连接关闭")
async def start_detection(self):
print("开始检测帧流...")
await self.monitor_stream_status()
async def monitor_stream_status(self):
while True:
if stream_status.is_streaming:
logger.info("检测到推流,开始处理...")
await self.start_detection_stream()
else:
logger.info("当前无推流,等待中...")
await asyncio.sleep(1)
async def start_detection_stream(self, all_dict=None):
ffmpeg_pull_command = [
"ffmpeg",
"-c:v", "h264",
"-i", RTMP_URL,
"-f", "rawvideo",
"-pix_fmt", "bgr24",
"-s", "640x360",
"-r", "5",
"-vcodec", "rawvideo",
"-an",
"-flags", "+low_delay",
"-tune", "zerolatency",
"-"
]
try:
process = subprocess.Popen(
ffmpeg_pull_command,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
bufsize=10 * 8
)
except Exception as e:
logger.error(f"FFmpeg 子进程启动失败: {e}")
return
if process.stdout is None:
logger.error("FFmpeg stdout 为空")
return
frame_width, frame_height = 640, 360
frame_size = frame_width * frame_height * 3
executor = ThreadPoolExecutor(max_workers=4) # 最多可以同时运行 4 个线程
engine = DetectionEngine()
frame_count = 0
skip_interval = 1
try:
while stream_status.is_streaming:
try:
loop = asyncio.get_event_loop()
raw_frame = await asyncio.wait_for(
loop.run_in_executor(executor, process.stdout.read, frame_size),
timeout=8 # 超时检测断流
)
except asyncio.TimeoutError:
logger.warning("读取帧超时,触发流结束")
with status_lock:
stream_status.is_streaming = False
break
if len(raw_frame) != frame_size:
continue
try:
frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))
except Exception as e:
logger.warning(f"帧解析失败: {e}")
continue
frame_count += 1
if frame_count % skip_interval != 0:
continue
result = await loop.run_in_executor(executor, engine.process_frame, frame)
# 增加空值检查
if result is None:
logger.warning("检测结果为空,跳过处理")
continue
processed_frame, detected_classes = result
# 更新车辆统计(仅按类别)
with DetectionEngine.counter_lock:
for class_id in detected_classes:
if class_id == 0:
DetectionEngine.total_count['car'] += 1
DetectionEngine.total_count['total'] += 1
elif class_id == 1:
DetectionEngine.total_count['bus'] += 1
DetectionEngine.total_count['total'] += 1
elif class_id == 2:
DetectionEngine.total_count['truck'] += 1
DetectionEngine.total_count['total'] += 1
elif class_id == 3:
DetectionEngine.total_count['van'] += 1
DetectionEngine.total_count['total'] += 1
_, jpeg = cv2.imencode('.jpg', processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
if channel_layer:
await channel_layer.group_send(
"detection_group",
{"type": "send_frame", "frame": jpeg.tobytes()}
)
except Exception as e:
logger.error(f"检测处理错误: {e}")
logger.error(traceback.format_exc())
finally:
logger.warning("流结束,进入处理...")
if process.returncode is None:
process.kill()
logger.warning("FFmpeg 进程已终止")
# 延迟一小段时间,确保文件操作完全释放
time.sleep(1)
stream_status.is_streaming = False
stream_status.end_datetime = datetime.now() # 记录结束时间
logger.warning("正在获取三个关键帧...")
# 找3个关键帧,必须在重命名前执行
folder = Path(f"{stream_status.image_path}")
files = [f for f in folder.iterdir() if f.is_file()]
if not files:
return None, None, None
# 按修改时间排序
files.sort(key=lambda x: x.stat().st_mtime)
first = files[0]
last = files[-1]
middle = files[len(files) // 2]
# 转换为Base64
stream_status.image_li = [
self.image_to_base64(str(f)) # 传入字符串路径,避免Path对象序列化问题
for f in [first, middle, last]
]
logger.warning("正在重命名文件夹...")
# 先复制 time_path,因为后续可能被修改
time_path = stream_status.time_path
start_datetime = stream_status.start_datetime
end_datetime = stream_status.end_datetime
save_path = ""
# 调用重命名函数(使用完整 datetime)
if time_path and start_datetime:
path = rename_time_folder(
time_path,
start_datetime, # 传入开始时间
end_datetime # 传入结束时间
)
save_path = path
logger.warning(
f"文件夹已重命名为 {start_datetime.strftime('%H时%M分%S秒')}-{end_datetime.strftime('%H时%M分%S秒')}")
try:
logger.warning("正在更新数据...")
# 更新识别结果
stats = self.correct_overcounted_stats(DetectionEngine.total_count)
print("stats:", stats)
stream_status.total_stats = stats
print("stream_status.total_stats:", stream_status.total_stats)
self.save_vehicle_stats(stats, save_path)
except Exception as e:
logger.error(f"保存统计信息失败: {e}")
print("正在获取AI答复...")
self.AI_response()
# 定义总数据,并序列化
all_dict = {
'datetime': stream_status.start_time.strftime("%Y-%m-%d %H:%M:%S.%f"), # 序列化,转为字符串
'image_li': [bs for bs in stream_status.image_li], # 关键帧转二进制
'detect_car': [_ for _ in stream_status.total_stats.values()],
'Al_response': stream_status.AI_talk,
}
print("all_dict:", all_dict)
print("正在发送信息...")
await ReportConsumer.send_report_data(all_dict)
executor.shutdown(wait=False)
DetectionEngine.reset_stats()
stream_status.reset_all_dict()
def save_vehicle_stats(self, stats, save_path=None):
# 如果未提供路径,使用默认的 time_path
if not save_path:
save_path = stream_status.time_path
if not save_path:
logger.warning("无法保存统计信息:路径为空")
return
stats_path = os.path.join(save_path, 'vehicle_stats.txt')
try:
with open(stats_path, 'w', encoding='utf-8-sig') as f:
f.write("车辆统计结果\n")
f.write("-------------------------\n")
for k, v in stats.items():
f.write(f"{k}: {v}\n")
f.write("-------------------------\n")
logger.info(f"统计信息已保存至 {stats_path}")
except Exception as e:
logger.error(f"写入统计信息失败: {e}")
def AI_response(self):
"""获取AI回复"""
url = "https://spark-api-open.xf-yun.com/v1/chat/completions"
headers = {
"Authorization": "Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv",
"Content-Type": "application/json",
}
content = (
f'我现在要做一个无人机城市交通智慧巡检报告,现在巡检结果是:巡检时间:{stream_status.start_time}'
f'小汽车有{stream_status.total_stats["car"]}辆,面包车有{stream_status.total_stats["van"]}辆,'
f'公交车有{stream_status.total_stats["bus"]}辆,卡车有{stream_status.total_stats["truck"]}辆,'
f'识别到的违停车辆共有{stream_status.total_stats["illegally_car"]}辆。'
f'帮我生成一段结论,要求不要有废话,也不要写具体那四个类别识别到的车辆数,字数200字,语言精炼严谨'
)
question = {
"role": "user",
"content": content
}
data = {
"max_tokens": 4096,
"top_k": 4,
"messages": [question],
"temperature": 0.5,
"model": "4.0Ultra",
"stream": False,
}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
result = response.json()
# 提取模型输出的内容
reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()
print(reply)
stream_status.AI_talk = reply
# 读取图片文件并转换为 Base64
@staticmethod # 静态方法不应接收self参数
def image_to_base64(image_path):
with open(image_path, "rb") as img_file:
base64_str = base64.b64encode(img_file.read()).decode("utf-8")
return f"data:image/jpeg;base64,{base64_str}"
@staticmethod # 去掉self第一个参数
def correct_overcounted_stats(stats, avg_appearance=6):
"""
简单修正严重重复统计的结果
"""
corrected = {}
for cls in ['car', 'van', 'bus', 'truck']:
if cls in stats:
corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))
# 可选:重新计算 total
corrected['total'] = sum(corrected.values())
corrected['illegally_car'] = int(corrected['total'] / 100)
print(corrected)
return corrected
async def send_frame(self, event):
frame_bytes = event["frame"]
await self.send(bytes_data=frame_bytes)
stream_status.py:
本文件用于 Django 项目全局状态管理,相当于 vue3 中的 pinia , is_streaming 是前端推本地视频、后端上传 SRS 后,变为 True ,后端因为 is_streaming = True 而开始拉流,当超过 6 秒未拉到流,判断流结束,is_streaming = False。还定义了全局状态锁、状态重置方法、时间获取方法、文件夹名获取方法、状态实例。全局状态锁是为了防止多线程修改同一个共享变量,只有with status_lock:在 with
块内,只有当前线程持有锁时才能运行代码,其他线程必须等待,相当于在此处把锁”锁“起来了。
import threading
# 全局状态锁和状态对象
status_lock = threading.Lock()
class StreamStatus:
def __init__(self):
self.is_streaming = False # 是否正在推流/检测
self.start_time = None # 推流开始时间(datetime对象)
self.end_time = None # 推流结束时间(datetime对象)
self.time_path = None # 当前时间文件夹路径(如 Media/2025-06-01/12点)
self.total_stats = { # 车辆统计
"car": 0,
"van": 0,
"bus": 0,
"truck": 0,
"total": 0,
"illegally_car": 0,
}
self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
self.image_path = None
self.image_li = []
self.AI_talk = None
self.all_dict = {}
@property
def date_folder(self):
"""获取日期文件夹名称(如 2025-06-01)"""
if self.start_time:
return self.start_time.strftime("%Y-%m-%d")
return None
@property
def start_hour(self):
"""获取开始小时(如 12)"""
if self.start_time:
return self.start_time.hour
return None
def reset(self):
"""重置状态(用于新任务开始)"""
self.is_streaming = False
self.start_time = None
self.end_time = None
self.time_path = None
self.total_stats = {k: 0 for k in self.total_stats}
self.last_frame_time = None
def reset_all_dict(self):
self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
self.image_path = None
self.image_li = []
self.AI_talk = None
self.all_dict = {}
self.all_dict = {}
# 全局唯一的状态实例
stream_status = StreamStatus()
detection_engine.py:
此处是 detection_consumer.py 使用的服务代码块,用于为其提供帧识别服务,consumer 拉到的每一个流都会调用该类中的 process_frame 类方法,并同时进行计数和关键帧保存。
import threading
import time
import cv2
from ultralytics import YOLO
import logging
import os
from app01.stream_status import stream_status, status_lock # 从新模块导入
logger = logging.getLogger(__name__)
MODEL_PATH = "F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt"
model = YOLO(MODEL_PATH)
# 关键帧存储位置
pictures_dir = "pictures"
if not os.path.exists(pictures_dir):
os.makedirs(pictures_dir)
class DetectionEngine:
frame_counter = 1
counter_lock = threading.Lock() # 线程锁,保证计数器安全
# 新增:全局计数器和已统计 ID
total_count = {
'car': 0,
'van': 0,
'bus': 0,
'truck': 0,
"total": 0,
"illegally_car": 0
}
# 下次任务重置
@classmethod
def reset_stats(cls):
with cls.counter_lock:
for k in cls.total_count:
cls.total_count[k] = 0
# 下次任务重置
@classmethod
def update_stats(cls, vehicle_type):
with cls.counter_lock:
if vehicle_type in cls.total_count:
cls.total_count[vehicle_type] += 1
cls.total_count["total"] += 1
def __init__(self):
self.model = model
# 调整检测参数,减少计算量
self.detect_params = {
'conf': 0.3, # 提高置信度阈值,减少无效检测
'iou': 0.5, # 调整 NMS 阈值
'imgsz': [384, 640], # 输入尺寸,降低分辨率
'classes': [0, 1, 2, 3], # 仅检测车辆类别(2: car, 5: bus, 7: truck)
'device': 0, # 使用 GPU
'half': True, # 使用 FP16 精度
'show': False # 关闭实时显示,减少开销
}
def process_frame(self, frame):
"""处理单帧图像:YOLO推理 + 保存结果到关键帧图片文件夹"""
results = model(frame)
detected_objects = []
annotated_frame = frame.copy() # 复制原始帧以防止修改原图
# 防御性检查:确保 results 非空且包含有效数据
if not results or len(results) == 0:
logger.warning("未获取到检测结果,跳过处理")
return frame, []
boxes = results[0].boxes
if boxes is None or len(boxes) == 0:
logger.warning("检测结果为空,跳过处理")
return frame, []
try:
# 假设类别信息存储在 cls 属性中
classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, 'cls') and boxes.cls is not None else []
for class_id in classes:
detected_objects.append(class_id)
annotated_frame = results[0].plot() # 绘制检测框
except Exception as e:
logger.error(f"解析检测数据失败: {e}")
return annotated_frame, []
with status_lock:
car_count = sum(1 for cls in detected_objects if cls == 0)
if car_count > 18:
try:
timestamp = int(time.time())
save_path = os.path.join(stream_status.image_path, f"keyframe_{timestamp}.jpg")
cv2.imwrite(save_path, annotated_frame)
logger.info(f"关键帧保存成功: {save_path}")
except Exception as e:
logger.error(f"保存关键帧失败: {e}")
return annotated_frame, detected_objects
repoet_consumer.py:
用于将前面 detection_consumer.py 中的字典数据发送给前端,首先会将数据缓存到后端,等到前端消费者建立成功,立马发送出去。
import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
import json
# 缓存队列:用于保存尚未发送的数据
queue = []
# 记录是否有消费者连接
has_consumer = False
class ReportConsumer(AsyncWebsocketConsumer):
async def connect(self):
global has_consumer
# 加入指定组
await self.channel_layer.group_add("report_group", self.channel_name)
await self.accept()
print("巡检报告WebSocket 已连接")
# 设置有消费者连接的标志
has_consumer = True
# 尝试发送缓存数据
await flush_queue(self.channel_layer)
async def disconnect(self, close_code):
global has_consumer
# 移除组成员
await self.channel_layer.group_discard("report_group", self.channel_name)
print("巡检报告WebSocket 断开连接")
# 重置消费者连接标志
has_consumer = False
async def receive(self, text_data=None, bytes_data=None):
pass
async def send_report(self, event):
"""处理 send_report 类型的消息并发送给客户端"""
data = event['data']
print("【发送到客户端】", data)
# 转换为JSON字符串发送
try:
# 处理可能包含的非JSON可序列化对象
# 这里假设 image_li 包含 Path 对象,需要转换为字符串
if 'image_li' in data:
data['image_li'] = [str(path) for path in data['image_li']]
await self.send(text_data=json.dumps(data))
except Exception as e:
print(f"【发送到客户端失败】{e}")
@classmethod
async def send_report_data(cls, data):
"""供其他模块调用的方法"""
global queue, has_consumer
# 不再检查是否有人连接,直接发送或缓存,生产模式用Redis
channel_layer = get_channel_layer()
# 如果没有消费者连接,将数据加入队列
if not has_consumer:
queue.append(data)
print(f"【数据已缓存】等待消费者连接: {data}")
else:
# 有消费者连接,直接发送
await send_report(channel_layer, data)
# ========== 全局函数 ==========
# 定期检查并发送缓存数据的任务
async def queue_check_task():
channel_layer = get_channel_layer()
while True:
await asyncio.sleep(1) # 每秒检查一次
if queue and has_consumer:
await flush_queue(channel_layer)
# 在应用启动时启动队列检查任务
async def start_queue_check():
asyncio.create_task(queue_check_task())
async def send_report(channel_layer, data):
try:
await channel_layer.group_send(
"report_group",
{
"type": "send_report",
"data": data,
},
)
print("【发送成功】", data)
except Exception as e:
print(f"【发送失败】{e}")
async def flush_queue(channel_layer):
global queue
if not queue:
return
items = list(queue)
queue.clear()
for data in items:
await send_report(channel_layer, data)
routings.py:
用于定义 websocket 的后端接收路径,并连接消费者
from django.urls import re_path
from app01.consumers import detection_consumer, report_consumer
websocket_urlpatterns = [
re_path(r'^ws/detection/$', detection_consumer.DetectionStreamConsumer.as_asgi()),
re_path(r'^ws/report/$', report_consumer.ReportConsumer.as_asgi()),
]
urls.py:
用于接收前端发送的 http 请求,并与 view.py 中的视图函数建立联系
"""
URL configuration for DJI_yolo project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/5.1/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatqterns: path('blog/', include('blog.urls'))
"""
from django.conf.urls.static import static
from django.contrib import admin
from django.urls import path
from DJI_yolo import settings
from app01 import views
urlpatterns = [
path("admin/", admin.site.urls),
path('csrf/', views.get_csrf_token, name='get_csrf_token'),
# 推流测试
path('push_stream/', views.push_stream, name='push_stream'),
# 媒体库管理
path('api4/date-folders/', views.get_date_folders, name='date-folders'),
path('api4/time-folders/<str:date>/', views.get_time_folders, name='time-folders'),
path('api4/files/<str:date>/<str:time_range>/<str:category>/count/', views.get_file_count, name='file-count'),
path('api4/files/<str:date>/<str:time_range>/<str:category>/', views.get_files, name='files'),
path('api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/', views.delete_file,
name='delete-file'),
path('delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>', views.delete_file,
name='delete-file'), # 添加一个用于删除文件的 URL 路径
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
views.py:
用于处理前端请求,这里报告推流、媒体管理系统、原始视频存储逻辑等等。
# views.py
from datetime import datetime
from pathlib import Path
import urllib.parse
from django.middleware.csrf import get_token
import os
import subprocess
import threading
from django.http import JsonResponse
from django.conf import settings
import asyncio
from .consumers.detection_consumer import DetectionStreamConsumer
from .utils import create_date_folder, create_time_folder
from .stream_status import stream_status
MEDIA_ROOT = Path(settings.MEDIA_ROOT)
def get_csrf_token(request):
token = get_token(request)
print(token)
return JsonResponse({'csrfToken': token})
def push_stream(request):
print("前端返回本地视频,准备推流")
video_file = request.FILES.get('video')
if not video_file:
return JsonResponse({'error': '未上传视频文件'}, status=400)
# ------------------- 创建精确时间文件夹 -------------------
start_datetime = datetime.now() # 记录精确到秒的开始时间
date_str = start_datetime.strftime('%Y-%m-%d')
date_path = create_date_folder(date_str)
stream_status.start_time = datetime.now()
# 创建带时分秒的文件夹(如 15:30:45)
time_path, time_folder = create_time_folder(date_path, start_datetime)
# 创建三类子文件夹
original_path = os.path.join(time_path, '原视频')
recognized_path = os.path.join(time_path, '识别视频')
image_path = os.path.join(time_path, '关键帧图片')
stream_status.image_path = image_path # 保存到全局状态
for folder in [original_path, recognized_path, image_path]:
os.makedirs(folder, exist_ok=True)
# ------------------- 保存上传视频 -------------------
original_video_name = f"original_{start_datetime.strftime('%H%M%S')}.mp4"
original_video_path = os.path.join(original_path, original_video_name)
with open(original_video_path, 'wb') as f:
for chunk in video_file.chunks():
f.write(chunk)
# ------------------- 推流配置 -------------------
push_url = "rtmp://127.0.0.1:1935/live/stream"
recognized_video_name = f"recognized_{start_datetime.strftime('%H%M%S')}.mp4"
recognized_video_path = os.path.join(recognized_path, recognized_video_name)
command = [
"ffmpeg",
"-re",
"-i", original_video_path,
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-g", "50",
"-r", "30",
"-an",
"-f", "flv",
push_url,
"-f", "mp4",
recognized_video_path
]
try:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 保存完整时间状态
stream_status.is_streaming = True
stream_status.start_datetime = start_datetime # 保存完整 datetime
stream_status.time_path = time_path
stream_status.recognized_path = recognized_path
stream_status.image_path = image_path
detection_thread = threading.Thread(
target=run_detection_in_background,
daemon=True
)
detection_thread.start()
return JsonResponse({
'status': 'success',
'date_folder': date_str,
'time_folder': time_folder # 格式如 15:30:45
})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
def run_detection_in_background():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(DetectionStreamConsumer().start_detection())
except Exception as e:
print(f"后台检测线程异常: {e}")
def get_date_folders(request):
page = int(request.GET.get('page', 1))
page_size = int(request.GET.get('page_size', 10))
MEDIA_ROOT_PATH = Path(r'F:\FullStack\Django\DJI_yolo\media')
date_folders = []
for name in os.listdir(MEDIA_ROOT_PATH):
folder_path = MEDIA_ROOT_PATH / name
if folder_path.is_dir() and len(name.split('-')) == 3:
date_folders.append({'date': name})
date_folders.sort(key=lambda x: x['date'], reverse=True)
start = (page - 1) * page_size
end = start + page_size
return JsonResponse({
'data': date_folders[start:end],
'total': len(date_folders)
})
def get_time_folders(request, date):
date_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date
if not date_path.is_dir():
return JsonResponse({'error': '日期文件夹不存在'}, status=404)
page = int(request.GET.get('page', 1))
page_size = int(request.GET.get('page_size', 10))
time_folders = []
for name in os.listdir(date_path):
time_path = date_path / name
if time_path.is_dir():
original_count = len(os.listdir(time_path / '原视频')) if (time_path / '原视频').is_dir() else 0
recognized_count = len(os.listdir(time_path / '识别视频')) if (time_path / '识别视频').is_dir() else 0
image_count = len(os.listdir(time_path / '关键帧图片')) if (time_path / '关键帧图片').is_dir() else 0
time_folders.append({
'time_range': name, # 直接使用时间区间名称
'original_count': original_count,
'recognized_count': recognized_count,
'image_count': image_count
})
time_folders.sort(key=lambda x: x['time_range'], reverse=True)
start = (page - 1) * page_size
end = start + page_size
return JsonResponse({
'data': time_folders[start:end],
'total': len(time_folders)
})
def get_file_count(request, date, time_range, category):
category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / time_range / category
if not category_path.is_dir():
return JsonResponse({'count': 0})
count = len(os.listdir(category_path))
return JsonResponse({'count': count})
def get_files(request, date, time_range, category):
try:
decoded_time_range = urllib.parse.unquote(time_range)
category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / category
if not category_path.is_dir():
return JsonResponse({'error': '文件类别不存在'}, status=404)
page = int(request.GET.get('page', 1))
page_size = int(request.GET.get('page_size', 10))
files = []
for filename in os.listdir(category_path):
file_path = category_path / filename
if file_path.is_file():
encoded_time_range = urllib.parse.quote(decoded_time_range)
files.append({
'name': filename,
'url': f'http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}'
})
start = (page - 1) * page_size
end = start + page_size
return JsonResponse({
'data': files[start:end],
'total': len(files),
'status': 'success'
})
except Exception as e:
print(f"文件列表错误: {str(e)}")
return JsonResponse({
'error': '服务器内部错误',
'detail': str(e)
}, status=500)
def delete_file(request, date, time_range, category, filename):
try:
decoded_time_range = urllib.parse.unquote(time_range)
file_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / category / filename
if not file_path.exists():
return JsonResponse({'error': '文件不存在'}, status=404)
os.remove(file_path)
return JsonResponse({'status': 'success'})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
感谢您的观看!