基于OpenCV的深度学习人脸识别系统开发全攻略(DNN+FaceNet核心技术选型)

发布于:2025-07-15 ⋅ 阅读:(19) ⋅ 点赞:(0)

核心技术选型表

技术组件 版本/型号 用途
OpenCV DNN 4.5.5+ 人脸检测
FaceNet (facenet-pytorch) 0.5.0+ 人脸特征提取
MiniConda 最新版 Python环境管理
PyTorch 1.8.0+ FaceNet运行基础
OpenVINO 2021.4+ 模型加速(可选)
SSD Caffe模型 res10_300x300 高精度人脸检测

一、环境准备与项目搭建

1.1 MiniConda环境配置

首先我们需要安装MiniConda来管理我们的Python环境:

# 下载MiniConda安装包(Windows版)
# 官网地址:https://docs.conda.io/en/latest/miniconda.html

# 创建专用环境
conda create -n opencv_face python=3.8
conda activate opencv_face

# 安装基础依赖
pip install numpy pandas opencv-python-headless pillow facenet-pytorch openvino

注意opencv-python-headless是不带GUI功能的OpenCV版本,适合服务器部署。如果需要显示图像,可以安装完整版opencv-python

1.2 项目结构说明

我们的attendance_system目录组织如下:

attendance_system/
├── face_detection/
│   └── models/
│       └── opencv/
│           ├── deploy.prototxt.txt
│           └── res10_300x300_ssd_iter_140000.caffemodel
│   └── detector.py
│   └── recognizer.py
├── data/                  # 存储人脸数据
├── configs/               # 配置文件
├── attendance_report.py   # 考勤报表生成
├── database.py            # 数据库操作
├── main.py                # 主程序入口
├── register_employee.py   # 员工注册
├── reports/               # 出勤记录文件
└── test_face.jpg          # 测试图片

二、核心技术原理

2.1 OpenCV DNN人脸检测

我们使用OpenCV的DNN模块加载Caffe模型进行人脸检测:

人脸检测模型

这个SSD-based模型在300x300输入分辨率下表现优异,准确率高且速度较快。

2.2 FaceNet人脸特征提取

FaceNet是由Google研究团队提出的人脸识别系统,它能将人脸图像映射到128维的特征空间,在这个空间中,相同人的人脸距离近,不同人的人脸距离远。

我们使用facenet-pytorch库提供的预训练模型,它基于InceptionResnetV1架构。

FaceNet模型:

pip install facenet-pytorch

验证安装

python -c "from facenet_pytorch import InceptionResnetV1; print('安装成功!')"
  • 如果显示 安装成功! 说明一切正常

  • 如果报错,请检查网络或尝试镜像源:

    pip install facenet-pytorch -i https://pypi.tuna.tsinghua.edu.cn/simple
    

三、代码实现详解

3.1 人脸检测模块

首先在attendance_system\face_detection目录下创建detector.py

# D:\demo\attendance_system\face_detection\detector.py

import cv2
import numpy as np
from configs.config import MODEL_PATHS, FACE_DETECTION


class FaceDetector:
    def __init__(self):
        # 加载OpenCV的DNN人脸检测器
        self.net = cv2.dnn.readNetFromCaffe(
            MODEL_PATHS["opencv_face_detector"],
            MODEL_PATHS["opencv_face_weights"]
        )
        self.confidence_threshold = FACE_DETECTION["confidence_threshold"]
        self.padding = FACE_DETECTION["padding"]

    def detect_faces(self, image):
        (h, w) = image.shape[:2]
        # 构建blob并输入网络
        blob = cv2.dnn.blobFromImage(
            cv2.resize(image, (300, 300)), 1.0, (300, 300),
            (104.0, 177.0, 123.0), swapRB=False, crop=False
        )
        self.net.setInput(blob)
        detections = self.net.forward()

        faces = []
        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]

            if confidence > self.confidence_threshold:
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # 扩展人脸区域
                startX = max(0, startX - self.padding)
                startY = max(0, startY - self.padding)
                endX = min(w, endX + self.padding)
                endY = min(h, endY + self.padding)

                faces.append((startX, startY, endX, endY))

        return faces

3.2 人脸特征提取模块

attendance_system\face_detection下创建recognizer.py

# D:\demo\attendance_system\face_detection\recognizer.py

import torch
import numpy as np
from facenet_pytorch import InceptionResnetV1
from PIL import Image
import cv2
from configs.config import MODEL_PATHS, FACE_RECOGNITION


class FaceRecognizer:
    def __init__(self):
        # 加载预训练的FaceNet模型
        self.resnet = InceptionResnetV1(pretrained='vggface2').eval()
        self.threshold = FACE_RECOGNITION["threshold"]
        self.image_size = FACE_RECOGNITION["image_size"]

        # 尝试使用OpenVINO加速
        try:
            from openvino.runtime import Core
            core = Core()
            # 转换为OpenVINO格式
            ov_model = torch.export(self.resnet, torch.randn(1, 3, self.image_size, self.image_size))
            compiled_model = core.compile_model(ov_model, 'GPU')  # 使用GPU加速
            self.resnet = compiled_model
            print("使用OpenVINO GPU加速")
        except Exception as e:
            print(f"OpenVINO加速失败,使用原始模型: {e}")

    def get_embedding(self, face_image):
        # 转换图像为模型输入格式
        face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
        face = Image.fromarray(face)
        face = face.resize((self.image_size, self.image_size))
        face = np.array(face).astype(np.float32)
        face = (face - 127.5) / 128.0  # FaceNet的标准化
        face = torch.from_numpy(face).permute(2, 0, 1).unsqueeze(0)

        # 获取特征向量
        with torch.no_grad():
            embedding = self.resnet(face)

        return embedding.numpy().flatten()

    def compare_faces(self, embedding1, embedding2):
        # 计算余弦相似度
        dot = np.dot(embedding1, embedding2)
        norm = np.linalg.norm(embedding1) * np.linalg.norm(embedding2)
        similarity = dot / norm
        return similarity > self.threshold

3.3 员工注册功能

register_employee.py的主要内容:

import cv2
import numpy as np
import time
from face_detection.detector import FaceDetector
from face_detection.recognizer import FaceRecognizer
from database import EmployeeDatabase
import winsound  # Windows平台提示音使用

def register_employee():
    detector = FaceDetector()
    recognizer = FaceRecognizer()
    db = EmployeeDatabase()

    # 获取员工信息
    employee_id = input("请输入员工ID: ")
    name = input("请输入员工姓名: ")
    department = input("请输入部门: ")
    position = input("请输入职位: ")

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("无法打开摄像头")
        return

    print("\n========== 员工注册指引 ==========")
    print("1. 请保持正对摄像头")
    print("2. 确保光线充足,不要背光")
    print("3. 保持面部无遮挡(眼镜/口罩需暂时取下)")
    print("4. 系统将自动捕捉5张不同角度的人脸图像")
    print("=================================\n")

    captured_faces = []
    last_capture_time = 0
    CHECK_INTERVAL = 0.5
    CAPTURE_COUNT = 5

    guidance = [
        "请正对摄像头",
        "请稍微向左转头",
        "请稍微向右转头",
        "请微微抬头",
        "请微微低头"
    ]

    while len(captured_faces) < CAPTURE_COUNT:
        ret, frame = cap.read()
        if not ret:
            print("错误: 无法获取视频帧")
            break

        current_time = time.time()
        status_text = [
            f"进度: {len(captured_faces)}/{CAPTURE_COUNT}",
            "提示: " + guidance[min(len(captured_faces), 4)]
        ]

        faces = detector.detect_faces(frame)

        if len(faces) == 1:
            (startX, startY, endX, endY) = faces[0]
            face = frame[startY:endY, startX:endX]

            cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)

            if current_time - last_capture_time > CHECK_INTERVAL:
                captured_faces.append(face)
                last_capture_time = current_time
                print(f"✅ 已捕捉人脸 {len(captured_faces)}/{CAPTURE_COUNT} - {status_text[1]}")
                winsound.Beep(1000, 200)  # 播放提示音

        for i, text in enumerate(status_text):
            cv2.putText(frame, text, (10, 30 + i * 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        cv2.imshow("员工注册", frame)
        if cv2.waitKey(1) & 0xFF == 27:  # ESC 键退出
            break

    cap.release()
    cv2.destroyAllWindows()

    if len(captured_faces) >= 3:
        embeddings = []
        for face in captured_faces:
            embedding = recognizer.get_embedding(face)
            embeddings.append(embedding)

        avg_embedding = np.mean(embeddings, axis=0)

        if db.add_employee(name, employee_id, department, position, avg_embedding):
            print(f"🎉 员工 {name} 注册成功!")
        else:
            print("⚠️ 注册失败,可能员工ID已存在")
    else:
        print("⚠️ 捕捉的人脸数量不足,注册失败")

    db.close()


if __name__ == "__main__":
    register_employee()

3.4 出勤记录功能

attendance_report.py的主要内容:

import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
from configs.config import DATABASE


def check_dependencies():
    """检查并安装必要依赖"""
    try:
        import openpyxl, pandas
    except ImportError:
        import sys, subprocess
        subprocess.check_call([sys.executable, "-m", "pip", "install", "openpyxl", "pandas"])


def generate_report(days=7, output_dir=None):
    """
    生成考勤报表
    :param days: 统计最近多少天的数据
    :param output_dir: 自定义输出目录
    """
    check_dependencies()

    # 路径处理
    base_dir = Path(__file__).parent
    output_dir = Path(output_dir) if output_dir else base_dir / "reports"
    output_dir.mkdir(parents=True, exist_ok=True)

    # 日期计算
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)
    filename = f"attendance_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}.xlsx"
    report_path = output_dir / filename

    # 数据库操作
    try:
        conn = sqlite3.connect(DATABASE["employees"])
        query = '''
        SELECT e.name, e.employee_id, e.department, 
               a.check_time,
               CASE WHEN time(a.check_time) > '09:30:00' THEN '迟到'
                    WHEN time(a.check_time) < '08:30:00' THEN '早到'
                    ELSE '正常' END as status
        FROM attendance a
        JOIN employees e ON a.employee_id = e.employee_id
        WHERE date(a.check_time) BETWEEN ? AND ?
        ORDER BY a.check_time DESC
        '''
        df = pd.read_sql_query(query, conn, params=(start_date.date(), end_date.date()))

        if not df.empty:
            # 转换日期时间格式
            df['check_time'] = pd.to_datetime(df['check_time'])
            df['date'] = df['check_time'].dt.date
            df['weekday'] = df['check_time'].dt.day_name()

            # 创建透视表
            pivot = df.pivot_table(index=['name', 'employee_id'],
                                   columns='date',
                                   values='status',
                                   aggfunc='first')

            # 保存文件
            with pd.ExcelWriter(report_path, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='原始数据', index=False)
                pivot.to_excel(writer, sheet_name='考勤汇总')

            print(f"报表已生成: {report_path.resolve()}")
            return report_path
        else:
            print("警告: 没有查询到考勤记录")
            return None

    except Exception as e:
        print(f"生成报表失败: {str(e)}")
        raise
    finally:
        if 'conn' in locals():
            conn.close()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='生成考勤报表')
    parser.add_argument('--days', type=int, default=7, help='统计天数')
    parser.add_argument('--output', help='自定义输出目录')
    args = parser.parse_args()

    generate_report(days=args.days, output_dir=args.output)

3.5 数据库模块

database.py的主要内容:

# D:\demo\attendance_system\database.py

import sqlite3
import numpy as np
import os
from configs.config import DATABASE


class EmployeeDatabase:
    def __init__(self):
        # 确保数据目录存在
        os.makedirs(os.path.dirname(DATABASE["employees"]), exist_ok=True)

        # 初始化数据库连接
        self.conn = sqlite3.connect(DATABASE["employees"])
        self.cursor = self.conn.cursor()

        # 创建员工表
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS employees (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            employee_id TEXT UNIQUE NOT NULL,
            department TEXT,
            position TEXT,
            register_date TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''')

        # 创建打卡记录表
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS attendance (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            employee_id TEXT NOT NULL,
            check_time TEXT DEFAULT CURRENT_TIMESTAMP,
            status TEXT DEFAULT 'present',
            FOREIGN KEY (employee_id) REFERENCES employees (employee_id)
        )
        ''')

        self.conn.commit()

        # 加载人脸特征数据
        self.load_face_data()

    def load_face_data(self):
        # 加载保存的人脸特征和姓名
        if os.path.exists(DATABASE["embeddings"]):
            self.embeddings = np.load(DATABASE["embeddings"])
            self.names = np.load(DATABASE["names"])
        else:
            self.embeddings = np.array([])
            self.names = np.array([])

    def save_face_data(self):
        # 保存人脸特征和姓名
        np.save(DATABASE["embeddings"], self.embeddings)
        np.save(DATABASE["names"], self.names)

    def add_employee(self, name, employee_id, department, position, embedding):
        # 添加新员工
        try:
            self.cursor.execute('''
            INSERT INTO employees (name, employee_id, department, position)
            VALUES (?, ?, ?, ?)
            ''', (name, employee_id, department, position))

            # 更新人脸特征数据
            if len(self.embeddings) == 0:
                self.embeddings = np.array([embedding])
            else:
                self.embeddings = np.vstack((self.embeddings, embedding))

            self.names = np.append(self.names, employee_id)
            self.save_face_data()
            self.conn.commit()
            return True
        except sqlite3.IntegrityError:
            return False

    def record_attendance(self, employee_id, status='present'):
        # 记录考勤
        self.cursor.execute('''
        INSERT INTO attendance (employee_id, status)
        VALUES (?, ?)
        ''', (employee_id, status))
        self.conn.commit()

    def get_employee_info(self, employee_id):
        # 获取员工信息
        self.cursor.execute('''
        SELECT name, department, position FROM employees WHERE employee_id = ?
        ''', (employee_id,))
        return self.cursor.fetchone()

    def close(self):
        # 关闭数据库连接
        self.conn.close()

3.6 考勤识别主程序

main.py的主要内容:

# D:\demo\attendance_system\main.py

import cv2
import numpy as np
from face_detection.detector import FaceDetector
from face_detection.recognizer import FaceRecognizer
from database import EmployeeDatabase
from configs.config import DATABASE
import os
import time
import datetime


def main():
    # 初始化组件
    detector = FaceDetector()
    recognizer = FaceRecognizer()
    db = EmployeeDatabase()

    # 创建数据目录
    os.makedirs(os.path.dirname(DATABASE["employees"]), exist_ok=True)

    # 初始化摄像头
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("无法打开摄像头")
        return

    print("人脸识别考勤系统已启动,按'q'键退出")

    last_recognition_time = 0
    recognized_employees = set()

    while True:
        ret, frame = cap.read()
        if not ret:
            print("无法获取视频帧")
            break

        # 检测人脸
        faces = detector.detect_faces(frame)

        # 在帧上绘制人脸框
        for (startX, startY, endX, endY) in faces:
            cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)

            # 提取人脸区域
            face = frame[startY:endY, startX:endX]

            # 每隔1秒尝试识别一次
            current_time = time.time()
            if current_time - last_recognition_time > 1.0:
                try:
                    # 获取人脸特征
                    embedding = recognizer.get_embedding(face)

                    # 与数据库中的特征比较
                    if len(db.embeddings) > 0:
                        similarities = np.array([
                            recognizer.compare_faces(embedding, db_emb)
                            for db_emb in db.embeddings
                        ])

                        if np.any(similarities):
                            matched_idx = np.argmax(similarities)
                            employee_id = db.names[matched_idx]

                            # 如果之前未识别过,则记录考勤
                            if employee_id not in recognized_employees:
                                name, department, position = db.get_employee_info(employee_id)
                                current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                print(f"{current_time_str} - 识别到员工: {name}, 部门: {department}, 职位: {position}")

                                # 记录考勤
                                db.record_attendance(employee_id)
                                recognized_employees.add(employee_id)

                                # 在图像上显示员工信息
                                cv2.putText(frame, f"ID: {employee_id}", (startX, startY - 10),
                                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                                cv2.putText(frame, f"Name: {name}", (startX, startY - 30),
                                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                    last_recognition_time = current_time
                except Exception as e:
                    print(f"人脸识别错误: {e}")

        # 显示结果
        cv2.imshow("Face Recognition Attendance System", frame)

        # 按'q'键退出
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # 清理资源
    cap.release()
    cv2.destroyAllWindows()
    db.close()


if __name__ == "__main__":
    main()

四、性能优化技巧

4.1 使用OpenVINO加速

我们可以将模型转换为OpenVINO格式以获得更好的性能:

from openvino.inference_engine import IECore

class OpenVINOFaceDetector:
    def __init__(self, model_xml, model_bin, device='CPU'):
        self.ie = IECore()
        self.net = self.ie.read_network(model=model_xml, weights=model_bin)
        self.exec_net = self.ie.load_network(network=self.net, device_name=device)
        self.input_blob = next(iter(self.net.input_info))
        self.out_blob = next(iter(self.net.outputs))
        
    def detect_faces(self, image):
        # 与之前类似的预处理
        blob = cv2.dnn.blobFromImage(...)
        
        # 使用OpenVINO推理
        res = self.exec_net.infer(inputs={self.input_blob: blob})
        detections = res[self.out_blob]
        
        # 后处理相同
        ...

4.2 多线程处理

对于实时视频流,可以使用多线程来提高性能:

from threading import Thread
import queue

class VideoStream:
    def __init__(self, src=0):
        self.stream = cv2.VideoCapture(src)
        self.stopped = False
        self.frames = queue.Queue(maxsize=128)
        
    def start(self):
        Thread(target=self.update, args=()).start()
        return self
        
    def update(self):
        while not self.stopped:
            if not self.frames.full():
                ret, frame = self.stream.read()
                if not ret:
                    self.stop()
                    return
                self.frames.put(frame)
                
    def read(self):
        return self.frames.get()
        
    def stop(self):
        self.stopped = True

五、部署运行

5.1 启动系统

conda activate opencv_face

在这里插入图片描述

5.2 初始化数据库

(opencv_face) D:\demo\attendance_system>python database.py init

5.3 注册新员工

(opencv_face) D:\demo\attendance_system>python register_employee.py
Q:退出系统

在这里插入图片描述

5.4 调用主程序

(opencv_face) D:\demo\attendance_system>python main.py

在这里插入图片描述

5.5 生成考勤报表

(opencv_face) D:\demo\attendance_system>python attendance_report.py
报表已生成: D:\demo\attendance_system\reports\attendance_20250707_to_20250714.xlsx

在这里插入图片描述

在这里插入图片描述

通过这个项目,我们学习了如何将深度学习模型应用到实际场景中,构建一个完整的人脸识别系统。这个系统可以进一步扩展为门禁系统、会员识别等多种应用。

如果你在实现过程中遇到任何问题,欢迎在评论区留言讨论!


网站公告

今日签到

点亮在社区的每一天
去签到