使用 YOLOv8 的 results.xyxy
结构,下面是一个完整的 MQTT 推送脚本,用于把识别到的目标(比如突涌水、水渍、障碍物等)发送到 Frigate 的 MQTT 接口。
✅ 前提假设
- YOLOv8 推理代码已经运行并生成
results.xyxy
。 - 每一行是
[x1, y1, x2, y2, conf, class_id]
。 - 类别标签由你自己定义,如
model.names[class_id]
。 - MQTT 服务运行在本地(或局域网),默认端口 1883。
- Frigate 配置正确,监听
frigate/events
主题。
📜 完整 Python 脚本(YOLOv8 + MQTT 发事件)
import paho.mqtt.publish as publish
import uuid
import json
from ultralytics import YOLO
import cv2
# ---------- 你可以自定义的变量 ----------
MQTT_HOST = '127.0.0.1' # Mosquitto服务器地址
MQTT_PORT = 1883 # MQTT端口
FRIGATE_TOPIC = 'frigate/events'
CAMERA_NAME = 'usb_cam' # Frigate中配置的摄像头名
DETECTION_CONFIDENCE_THRESHOLD = 0.5
# ----------------------------------------
# 加载你训练好的YOLOv8模型
model = YOLO("best.pt") # 改成你的模型路径
# 读取图片/视频帧(你可以用cv2.VideoCapture循环处理流)
frame = cv2.imread("test.jpg") # 或者循环处理帧
# 推理
results = model(frame)
# 遍历检测结果(默认只有一张图)
for r in results:
boxes = r.boxes.xyxy.cpu().numpy() # shape: (n, 4)
scores = r.boxes.conf.cpu().numpy() # shape: (n,)
classes = r.boxes.cls.cpu().numpy().astype(int)
for i in range(len(boxes)):
conf = scores[i]
if conf < DETECTION_CONFIDENCE_THRESHOLD:
continue # 忽略低置信度结果
box = boxes[i]
class_id = classes[i]
label = model.names[class_id] # 类别名
payload = {
"type": "new",
"after": {
"id": str(uuid.uuid4()),
"camera": CAMERA_NAME,
"label": label,
"top_score": float(conf),
"box": {
"x_min": int(box[0]),
"y_min": int(box[1]),
"x_max": int(box[2]),
"y_max": int(box[3])
}
}
}
# 发布到 MQTT
publish.single(
topic=FRIGATE_TOPIC,
payload=json.dumps(payload),
hostname=MQTT_HOST,
port=MQTT_PORT
)
print(f"已发送检测事件: {label} ({conf:.2f})")
🧪 示例运行流程
- 替换你的模型路径和摄像头名称(与你 Frigate 中的一致);
- 放一张测试图片
test.jpg
; - 运行脚本,Frigate 界面就能看到事件了。
📌 可选增强
目标 | 实现方式 |
---|---|
处理实时摄像头流 | cv2.VideoCapture() 读取帧循环处理 |
支持多帧批量推送 | 放在循环中,设置时间间隔 |
显示检测图像 | 用 cv2.rectangle() 可视化框 |
🚨 注意事项
- MQTT 消息中
camera
字段必须是 Frigate 配置文件中的某个摄像头名。 box
坐标建议使用整数。- YOLO 推理较慢时注意控制帧率,不要每帧都发(可 1~2 秒一次)。
- 如果你在 Windows 下网络受限,确保 Frigate、MQTT、这个脚本三者之间是能连通的(用
127.0.0.1
通常没问题)。