核心技术选型表
技术组件 | 版本/型号 | 用途 |
---|---|---|
OpenCV DNN | 4.5.5+ | 人脸检测 |
FaceNet (facenet-pytorch) | 0.5.0+ | 人脸特征提取 |
MiniConda | 最新版 | Python环境管理 |
PyTorch | 1.8.0+ | FaceNet运行基础 |
OpenVINO | 2021.4+ | 模型加速(可选) |
SSD Caffe模型 | res10_300x300 | 高精度人脸检测 |
一、环境准备与项目搭建
1.1 MiniConda环境配置
首先我们需要安装MiniConda来管理我们的Python环境:
# 下载MiniConda安装包(Windows版)
# 官网地址:https://docs.conda.io/en/latest/miniconda.html
# 创建专用环境
conda create -n opencv_face python=3.8
conda activate opencv_face
# 安装基础依赖
pip install numpy pandas opencv-python-headless pillow facenet-pytorch openvino
注意:
opencv-python-headless
是不带GUI功能的OpenCV版本,适合服务器部署。如果需要显示图像,可以安装完整版opencv-python
。
1.2 项目结构说明
我们的attendance_system
目录组织如下:
attendance_system/
├── face_detection/
│ └── models/
│ └── opencv/
│ ├── deploy.prototxt.txt
│ └── res10_300x300_ssd_iter_140000.caffemodel
│ └── detector.py
│ └── recognizer.py
├── data/ # 存储人脸数据
├── configs/ # 配置文件
├── attendance_report.py # 考勤报表生成
├── database.py # 数据库操作
├── main.py # 主程序入口
├── register_employee.py # 员工注册
├── reports/ # 出勤记录文件
└── test_face.jpg # 测试图片
二、核心技术原理
2.1 OpenCV DNN人脸检测
我们使用OpenCV的DNN模块加载Caffe模型进行人脸检测:
人脸检测模型:
这个SSD-based模型在300x300输入分辨率下表现优异,准确率高且速度较快。
2.2 FaceNet人脸特征提取
FaceNet是由Google研究团队提出的人脸识别系统,它能将人脸图像映射到128维的特征空间,在这个空间中,相同人的人脸距离近,不同人的人脸距离远。
我们使用facenet-pytorch
库提供的预训练模型,它基于InceptionResnetV1架构。
FaceNet模型:
pip install facenet-pytorch
验证安装
python -c "from facenet_pytorch import InceptionResnetV1; print('安装成功!')"
如果显示
安装成功!
说明一切正常如果报错,请检查网络或尝试镜像源:
pip install facenet-pytorch -i https://pypi.tuna.tsinghua.edu.cn/simple
三、代码实现详解
3.1 人脸检测模块
首先在attendance_system\face_detection
目录下创建detector.py
:
# D:\demo\attendance_system\face_detection\detector.py
import cv2
import numpy as np
from configs.config import MODEL_PATHS, FACE_DETECTION
class FaceDetector:
def __init__(self):
# 加载OpenCV的DNN人脸检测器
self.net = cv2.dnn.readNetFromCaffe(
MODEL_PATHS["opencv_face_detector"],
MODEL_PATHS["opencv_face_weights"]
)
self.confidence_threshold = FACE_DETECTION["confidence_threshold"]
self.padding = FACE_DETECTION["padding"]
def detect_faces(self, image):
(h, w) = image.shape[:2]
# 构建blob并输入网络
blob = cv2.dnn.blobFromImage(
cv2.resize(image, (300, 300)), 1.0, (300, 300),
(104.0, 177.0, 123.0), swapRB=False, crop=False
)
self.net.setInput(blob)
detections = self.net.forward()
faces = []
for i in range(0, detections.shape[2]):
confidence = detections[0, 0, i, 2]
if confidence > self.confidence_threshold:
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
(startX, startY, endX, endY) = box.astype("int")
# 扩展人脸区域
startX = max(0, startX - self.padding)
startY = max(0, startY - self.padding)
endX = min(w, endX + self.padding)
endY = min(h, endY + self.padding)
faces.append((startX, startY, endX, endY))
return faces
3.2 人脸特征提取模块
attendance_system\face_detection
下创建recognizer.py
:
# D:\demo\attendance_system\face_detection\recognizer.py
import torch
import numpy as np
from facenet_pytorch import InceptionResnetV1
from PIL import Image
import cv2
from configs.config import MODEL_PATHS, FACE_RECOGNITION
class FaceRecognizer:
def __init__(self):
# 加载预训练的FaceNet模型
self.resnet = InceptionResnetV1(pretrained='vggface2').eval()
self.threshold = FACE_RECOGNITION["threshold"]
self.image_size = FACE_RECOGNITION["image_size"]
# 尝试使用OpenVINO加速
try:
from openvino.runtime import Core
core = Core()
# 转换为OpenVINO格式
ov_model = torch.export(self.resnet, torch.randn(1, 3, self.image_size, self.image_size))
compiled_model = core.compile_model(ov_model, 'GPU') # 使用GPU加速
self.resnet = compiled_model
print("使用OpenVINO GPU加速")
except Exception as e:
print(f"OpenVINO加速失败,使用原始模型: {e}")
def get_embedding(self, face_image):
# 转换图像为模型输入格式
face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
face = Image.fromarray(face)
face = face.resize((self.image_size, self.image_size))
face = np.array(face).astype(np.float32)
face = (face - 127.5) / 128.0 # FaceNet的标准化
face = torch.from_numpy(face).permute(2, 0, 1).unsqueeze(0)
# 获取特征向量
with torch.no_grad():
embedding = self.resnet(face)
return embedding.numpy().flatten()
def compare_faces(self, embedding1, embedding2):
# 计算余弦相似度
dot = np.dot(embedding1, embedding2)
norm = np.linalg.norm(embedding1) * np.linalg.norm(embedding2)
similarity = dot / norm
return similarity > self.threshold
3.3 员工注册功能
register_employee.py
的主要内容:
import cv2
import numpy as np
import time
from face_detection.detector import FaceDetector
from face_detection.recognizer import FaceRecognizer
from database import EmployeeDatabase
import winsound # Windows平台提示音使用
def register_employee():
detector = FaceDetector()
recognizer = FaceRecognizer()
db = EmployeeDatabase()
# 获取员工信息
employee_id = input("请输入员工ID: ")
name = input("请输入员工姓名: ")
department = input("请输入部门: ")
position = input("请输入职位: ")
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("无法打开摄像头")
return
print("\n========== 员工注册指引 ==========")
print("1. 请保持正对摄像头")
print("2. 确保光线充足,不要背光")
print("3. 保持面部无遮挡(眼镜/口罩需暂时取下)")
print("4. 系统将自动捕捉5张不同角度的人脸图像")
print("=================================\n")
captured_faces = []
last_capture_time = 0
CHECK_INTERVAL = 0.5
CAPTURE_COUNT = 5
guidance = [
"请正对摄像头",
"请稍微向左转头",
"请稍微向右转头",
"请微微抬头",
"请微微低头"
]
while len(captured_faces) < CAPTURE_COUNT:
ret, frame = cap.read()
if not ret:
print("错误: 无法获取视频帧")
break
current_time = time.time()
status_text = [
f"进度: {len(captured_faces)}/{CAPTURE_COUNT}",
"提示: " + guidance[min(len(captured_faces), 4)]
]
faces = detector.detect_faces(frame)
if len(faces) == 1:
(startX, startY, endX, endY) = faces[0]
face = frame[startY:endY, startX:endX]
cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)
if current_time - last_capture_time > CHECK_INTERVAL:
captured_faces.append(face)
last_capture_time = current_time
print(f"✅ 已捕捉人脸 {len(captured_faces)}/{CAPTURE_COUNT} - {status_text[1]}")
winsound.Beep(1000, 200) # 播放提示音
for i, text in enumerate(status_text):
cv2.putText(frame, text, (10, 30 + i * 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.imshow("员工注册", frame)
if cv2.waitKey(1) & 0xFF == 27: # ESC 键退出
break
cap.release()
cv2.destroyAllWindows()
if len(captured_faces) >= 3:
embeddings = []
for face in captured_faces:
embedding = recognizer.get_embedding(face)
embeddings.append(embedding)
avg_embedding = np.mean(embeddings, axis=0)
if db.add_employee(name, employee_id, department, position, avg_embedding):
print(f"🎉 员工 {name} 注册成功!")
else:
print("⚠️ 注册失败,可能员工ID已存在")
else:
print("⚠️ 捕捉的人脸数量不足,注册失败")
db.close()
if __name__ == "__main__":
register_employee()
3.4 出勤记录功能
attendance_report.py
的主要内容:
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
from configs.config import DATABASE
def check_dependencies():
"""检查并安装必要依赖"""
try:
import openpyxl, pandas
except ImportError:
import sys, subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "openpyxl", "pandas"])
def generate_report(days=7, output_dir=None):
"""
生成考勤报表
:param days: 统计最近多少天的数据
:param output_dir: 自定义输出目录
"""
check_dependencies()
# 路径处理
base_dir = Path(__file__).parent
output_dir = Path(output_dir) if output_dir else base_dir / "reports"
output_dir.mkdir(parents=True, exist_ok=True)
# 日期计算
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
filename = f"attendance_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}.xlsx"
report_path = output_dir / filename
# 数据库操作
try:
conn = sqlite3.connect(DATABASE["employees"])
query = '''
SELECT e.name, e.employee_id, e.department,
a.check_time,
CASE WHEN time(a.check_time) > '09:30:00' THEN '迟到'
WHEN time(a.check_time) < '08:30:00' THEN '早到'
ELSE '正常' END as status
FROM attendance a
JOIN employees e ON a.employee_id = e.employee_id
WHERE date(a.check_time) BETWEEN ? AND ?
ORDER BY a.check_time DESC
'''
df = pd.read_sql_query(query, conn, params=(start_date.date(), end_date.date()))
if not df.empty:
# 转换日期时间格式
df['check_time'] = pd.to_datetime(df['check_time'])
df['date'] = df['check_time'].dt.date
df['weekday'] = df['check_time'].dt.day_name()
# 创建透视表
pivot = df.pivot_table(index=['name', 'employee_id'],
columns='date',
values='status',
aggfunc='first')
# 保存文件
with pd.ExcelWriter(report_path, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='原始数据', index=False)
pivot.to_excel(writer, sheet_name='考勤汇总')
print(f"报表已生成: {report_path.resolve()}")
return report_path
else:
print("警告: 没有查询到考勤记录")
return None
except Exception as e:
print(f"生成报表失败: {str(e)}")
raise
finally:
if 'conn' in locals():
conn.close()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='生成考勤报表')
parser.add_argument('--days', type=int, default=7, help='统计天数')
parser.add_argument('--output', help='自定义输出目录')
args = parser.parse_args()
generate_report(days=args.days, output_dir=args.output)
3.5 数据库模块
database.py
的主要内容:
# D:\demo\attendance_system\database.py
import sqlite3
import numpy as np
import os
from configs.config import DATABASE
class EmployeeDatabase:
def __init__(self):
# 确保数据目录存在
os.makedirs(os.path.dirname(DATABASE["employees"]), exist_ok=True)
# 初始化数据库连接
self.conn = sqlite3.connect(DATABASE["employees"])
self.cursor = self.conn.cursor()
# 创建员工表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
employee_id TEXT UNIQUE NOT NULL,
department TEXT,
position TEXT,
register_date TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建打卡记录表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS attendance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id TEXT NOT NULL,
check_time TEXT DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'present',
FOREIGN KEY (employee_id) REFERENCES employees (employee_id)
)
''')
self.conn.commit()
# 加载人脸特征数据
self.load_face_data()
def load_face_data(self):
# 加载保存的人脸特征和姓名
if os.path.exists(DATABASE["embeddings"]):
self.embeddings = np.load(DATABASE["embeddings"])
self.names = np.load(DATABASE["names"])
else:
self.embeddings = np.array([])
self.names = np.array([])
def save_face_data(self):
# 保存人脸特征和姓名
np.save(DATABASE["embeddings"], self.embeddings)
np.save(DATABASE["names"], self.names)
def add_employee(self, name, employee_id, department, position, embedding):
# 添加新员工
try:
self.cursor.execute('''
INSERT INTO employees (name, employee_id, department, position)
VALUES (?, ?, ?, ?)
''', (name, employee_id, department, position))
# 更新人脸特征数据
if len(self.embeddings) == 0:
self.embeddings = np.array([embedding])
else:
self.embeddings = np.vstack((self.embeddings, embedding))
self.names = np.append(self.names, employee_id)
self.save_face_data()
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def record_attendance(self, employee_id, status='present'):
# 记录考勤
self.cursor.execute('''
INSERT INTO attendance (employee_id, status)
VALUES (?, ?)
''', (employee_id, status))
self.conn.commit()
def get_employee_info(self, employee_id):
# 获取员工信息
self.cursor.execute('''
SELECT name, department, position FROM employees WHERE employee_id = ?
''', (employee_id,))
return self.cursor.fetchone()
def close(self):
# 关闭数据库连接
self.conn.close()
3.6 考勤识别主程序
main.py
的主要内容:
# D:\demo\attendance_system\main.py
import cv2
import numpy as np
from face_detection.detector import FaceDetector
from face_detection.recognizer import FaceRecognizer
from database import EmployeeDatabase
from configs.config import DATABASE
import os
import time
import datetime
def main():
# 初始化组件
detector = FaceDetector()
recognizer = FaceRecognizer()
db = EmployeeDatabase()
# 创建数据目录
os.makedirs(os.path.dirname(DATABASE["employees"]), exist_ok=True)
# 初始化摄像头
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("无法打开摄像头")
return
print("人脸识别考勤系统已启动,按'q'键退出")
last_recognition_time = 0
recognized_employees = set()
while True:
ret, frame = cap.read()
if not ret:
print("无法获取视频帧")
break
# 检测人脸
faces = detector.detect_faces(frame)
# 在帧上绘制人脸框
for (startX, startY, endX, endY) in faces:
cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)
# 提取人脸区域
face = frame[startY:endY, startX:endX]
# 每隔1秒尝试识别一次
current_time = time.time()
if current_time - last_recognition_time > 1.0:
try:
# 获取人脸特征
embedding = recognizer.get_embedding(face)
# 与数据库中的特征比较
if len(db.embeddings) > 0:
similarities = np.array([
recognizer.compare_faces(embedding, db_emb)
for db_emb in db.embeddings
])
if np.any(similarities):
matched_idx = np.argmax(similarities)
employee_id = db.names[matched_idx]
# 如果之前未识别过,则记录考勤
if employee_id not in recognized_employees:
name, department, position = db.get_employee_info(employee_id)
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"{current_time_str} - 识别到员工: {name}, 部门: {department}, 职位: {position}")
# 记录考勤
db.record_attendance(employee_id)
recognized_employees.add(employee_id)
# 在图像上显示员工信息
cv2.putText(frame, f"ID: {employee_id}", (startX, startY - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.putText(frame, f"Name: {name}", (startX, startY - 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
last_recognition_time = current_time
except Exception as e:
print(f"人脸识别错误: {e}")
# 显示结果
cv2.imshow("Face Recognition Attendance System", frame)
# 按'q'键退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# 清理资源
cap.release()
cv2.destroyAllWindows()
db.close()
if __name__ == "__main__":
main()
四、性能优化技巧
4.1 使用OpenVINO加速
我们可以将模型转换为OpenVINO格式以获得更好的性能:
from openvino.inference_engine import IECore
class OpenVINOFaceDetector:
def __init__(self, model_xml, model_bin, device='CPU'):
self.ie = IECore()
self.net = self.ie.read_network(model=model_xml, weights=model_bin)
self.exec_net = self.ie.load_network(network=self.net, device_name=device)
self.input_blob = next(iter(self.net.input_info))
self.out_blob = next(iter(self.net.outputs))
def detect_faces(self, image):
# 与之前类似的预处理
blob = cv2.dnn.blobFromImage(...)
# 使用OpenVINO推理
res = self.exec_net.infer(inputs={self.input_blob: blob})
detections = res[self.out_blob]
# 后处理相同
...
4.2 多线程处理
对于实时视频流,可以使用多线程来提高性能:
from threading import Thread
import queue
class VideoStream:
def __init__(self, src=0):
self.stream = cv2.VideoCapture(src)
self.stopped = False
self.frames = queue.Queue(maxsize=128)
def start(self):
Thread(target=self.update, args=()).start()
return self
def update(self):
while not self.stopped:
if not self.frames.full():
ret, frame = self.stream.read()
if not ret:
self.stop()
return
self.frames.put(frame)
def read(self):
return self.frames.get()
def stop(self):
self.stopped = True
五、部署运行
5.1 启动系统
conda activate opencv_face
5.2 初始化数据库
(opencv_face) D:\demo\attendance_system>python database.py init
5.3 注册新员工
(opencv_face) D:\demo\attendance_system>python register_employee.py
Q:退出系统
5.4 调用主程序
(opencv_face) D:\demo\attendance_system>python main.py
5.5 生成考勤报表
(opencv_face) D:\demo\attendance_system>python attendance_report.py
报表已生成: D:\demo\attendance_system\reports\attendance_20250707_to_20250714.xlsx
通过这个项目,我们学习了如何将深度学习模型应用到实际场景中,构建一个完整的人脸识别系统。这个系统可以进一步扩展为门禁系统、会员识别等多种应用。
如果你在实现过程中遇到任何问题,欢迎在评论区留言讨论!