import cv2
import dlib
import numpy as np
from keras.models import load_model
from collections import defaultdict, deque
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from PIL import Image, ImageTk
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import time
import os
import threading
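# External model assets expected alongside this script (locations are assumptions; adjust as needed):
#   shape_predictor_68_face_landmarks.dat     - dlib 68-point facial landmark predictor
#   dlib_face_recognition_resnet_model_v1.dat - dlib ResNet face embedding model (128-D descriptors)
#   emotion_model.hdf5                        - 7-class facial expression classifier; this script
#                                               assumes a FER-2013-style model taking 64x64
#                                               grayscale input (see preprocess_face)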
class EnhancedEmotionAnalysisApp:
def __init__(self, root):
self.root = root
        self.root.title("Enhanced Video Emotion Analysis System")
self.root.geometry("1400x900")
self.initialize_models()
self.create_ui()
self.is_analyzing = False
self.use_camera = False
self.video_path = ""
self.output_path = ""
self.results = []
self.face_tracker = {}
self.next_face_id = 1
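        # Rolling window of each tracked face's last 30 emotion indices (drives the trend chart).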
self.face_emotion_history = defaultdict(lambda: deque(maxlen=30))
    def initialize_models(self):
        """Initialize the face detector, landmark predictor, face embedder, and emotion classifier."""
try:
self.detector = dlib.get_frontal_face_detector()
self.predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
self.face_recognizer = dlib.face_recognition_model_v1("dlib_face_recognition_resnet_model_v1.dat")
self.emotion_model = load_model("emotion_model.hdf5")
            self.emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
            self.emotion_evaluations = {
                'Angry': "Negative emotion; may indicate dissatisfaction or anger",
                'Disgust': "Strongly negative emotion; indicates aversion or dislike",
                'Fear': "Negative emotion; may reflect fright or worry",
                'Happy': "Positive emotion; indicates joy or satisfaction",
                'Sad': "Negative emotion; may reflect sorrow or disappointment",
                'Surprise': "Neutral emotion; could be a pleasant surprise or a shock",
                'Neutral': "Neutral emotion; emotionally stable"
            }
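            # Colors are BGR tuples, matching OpenCV's drawing convention;
            # bgr_to_hex converts them for Tk and matplotlib.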
            self.emotion_colors = {
                'Angry': (0, 0, 255),
                'Disgust': (0, 102, 0),
                'Fear': (255, 255, 0),
                'Happy': (0, 255, 0),
                'Sad': (255, 0, 0),
                'Surprise': (0, 255, 255),
                'Neutral': (255, 255, 255)
            }
except Exception as e:
            messagebox.showerror("Error", f"Model initialization failed: {str(e)}")
self.root.destroy()
    def create_ui(self):
        """Build the user interface."""
control_frame = tk.Frame(self.root, padx=10, pady=10)
control_frame.pack(fill=tk.X)
        self.btn_select = tk.Button(control_frame, text="Select Video", command=self.select_video)
        self.btn_select.pack(side=tk.LEFT, padx=5)
        self.btn_camera = tk.Button(control_frame, text="Use Camera", command=self.use_webcam)
        self.btn_camera.pack(side=tk.LEFT, padx=5)
        self.btn_analyze = tk.Button(control_frame, text="Start Analysis", command=self.start_analysis, state=tk.DISABLED)
        self.btn_analyze.pack(side=tk.LEFT, padx=5)
        self.btn_stop = tk.Button(control_frame, text="Stop Analysis", command=self.stop_analysis, state=tk.DISABLED)
        self.btn_stop.pack(side=tk.LEFT, padx=5)
        self.btn_export = tk.Button(control_frame, text="Export Results", command=self.export_results, state=tk.DISABLED)
        self.btn_export.pack(side=tk.LEFT, padx=5)
        self.lbl_video_path = tk.Label(control_frame, text="No video selected", anchor=tk.W)
self.lbl_video_path.pack(side=tk.LEFT, padx=10, fill=tk.X, expand=True)
content_frame = tk.Frame(self.root)
content_frame.pack(fill=tk.BOTH, expand=True)
video_frame = tk.Frame(content_frame, width=800, height=500, bg='black')
video_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
video_frame.pack_propagate(False)
self.lbl_video = tk.Label(video_frame)
self.lbl_video.pack(fill=tk.BOTH, expand=True)
result_frame = tk.Frame(content_frame, width=600, height=500, bg='white')
result_frame.pack(side=tk.RIGHT, fill=tk.BOTH)
result_frame.pack_propagate(False)
self.result_notebook = ttk.Notebook(result_frame)
self.result_notebook.pack(fill=tk.BOTH, expand=True)
realtime_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(realtime_frame, text="Real-time Analysis")
self.person_frames = []
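        # Pre-build three per-person rows; update_ui shows or hides them, so at most three
        # concurrently detected faces get a live panel (extras still reach the charts and table).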
for i in range(3):
person_frame = ttk.Frame(realtime_frame)
person_frame.pack(fill=tk.X, padx=5, pady=5)
            lbl_person = ttk.Label(person_frame, text=f"Person {i + 1}:", font=('Arial', 12))
            lbl_person.pack(side=tk.LEFT)
            lbl_emotion = ttk.Label(person_frame, text="Waiting for analysis...", font=('Arial', 12))
lbl_emotion.pack(side=tk.LEFT, padx=10)
lbl_eval = ttk.Label(person_frame, text="", wraplength=400)
lbl_eval.pack(side=tk.LEFT)
self.person_frames.append({
'frame': person_frame,
'lbl_person': lbl_person,
'lbl_emotion': lbl_emotion,
'lbl_eval': lbl_eval
})
trend_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(trend_frame, text="Emotion Trends")
self.trend_fig, self.trend_ax = plt.subplots(figsize=(5, 3), dpi=100)
self.trend_canvas = FigureCanvasTkAgg(self.trend_fig, master=trend_frame)
self.trend_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
dist_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(dist_frame, text="Emotion Distribution")
self.dist_fig, self.dist_ax = plt.subplots(figsize=(5, 3), dpi=100)
self.dist_canvas = FigureCanvasTkAgg(self.dist_fig, master=dist_frame)
self.dist_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
table_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(table_frame, text="Detailed Results")
        columns = ("Time", "Person", "Emotion", "Confidence", "Evaluation")
self.result_tree = ttk.Treeview(table_frame, columns=columns, show="headings", height=15)
for col in columns:
self.result_tree.heading(col, text=col)
self.result_tree.column(col, width=100, anchor=tk.CENTER)
        self.result_tree.column("Evaluation", width=200)
self.result_tree.pack(fill=tk.BOTH, expand=True)
self.status_var = tk.StringVar()
        self.status_var.set("Ready")
status_bar = tk.Label(self.root, textvariable=self.status_var, bd=1, relief=tk.SUNKEN, anchor=tk.W)
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
    def select_video(self):
        """Pick a video file to analyze."""
self.use_camera = False
file_path = filedialog.askopenfilename(
            title="Select a video file",
            filetypes=[("Video files", "*.mp4 *.avi *.mov"), ("All files", "*.*")]
)
if file_path:
self.video_path = file_path
self.lbl_video_path.config(text=file_path)
self.btn_analyze.config(state=tk.NORMAL)
            self.status_var.set(f"Video selected: {os.path.basename(file_path)}")
self.preview_video()
    def use_webcam(self):
        """Switch the input source to the default webcam."""
        self.use_camera = True
        self.video_path = "camera"
        self.lbl_video_path.config(text="Using camera")
        self.btn_analyze.config(state=tk.NORMAL)
        self.status_var.set("Ready to analyze the camera feed")
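        # Grab a single frame so the user gets immediate confirmation that the device works.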
cap = cv2.VideoCapture(0)
if cap.isOpened():
ret, frame = cap.read()
cap.release()
if ret:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = Image.fromarray(frame)
img = ImageTk.PhotoImage(image=img)
self.lbl_video.config(image=img)
self.lbl_video.image = img
    def preview_video(self):
        """Preview the first frame of the selected video."""
cap = cv2.VideoCapture(self.video_path)
ret, frame = cap.read()
cap.release()
if ret:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = Image.fromarray(frame)
img = ImageTk.PhotoImage(image=img)
self.lbl_video.config(image=img)
self.lbl_video.image = img
    def start_analysis(self):
        """Start analyzing the selected video or the camera feed."""
self.is_analyzing = True
self.results = []
self.face_tracker.clear()
self.next_face_id = 1
self.face_emotion_history.clear()
for item in self.result_tree.get_children():
self.result_tree.delete(item)
self.btn_select.config(state=tk.DISABLED)
self.btn_camera.config(state=tk.DISABLED)
self.btn_analyze.config(state=tk.DISABLED)
self.btn_stop.config(state=tk.NORMAL)
self.btn_export.config(state=tk.DISABLED)
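        # Run the slow frame-by-frame analysis off the Tk main loop; the worker marshals
        # all widget updates back through root.after, since Tk widgets are not thread-safe.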
analysis_thread = threading.Thread(target=self.analyze_video, daemon=True)
analysis_thread.start()
    def stop_analysis(self):
        """Stop the analysis."""
self.is_analyzing = False
self.btn_stop.config(state=tk.DISABLED)
self.btn_select.config(state=tk.NORMAL)
self.btn_camera.config(state=tk.NORMAL)
self.btn_export.config(state=tk.NORMAL)
        self.status_var.set("Analysis stopped")
    def analyze_video(self):
        """Analyze frames from the video file or camera feed (runs on a worker thread)."""
        if self.use_camera:
            cap = cv2.VideoCapture(0)
            frame_count = 0
            fps = 30.0
        else:
            cap = cv2.VideoCapture(self.video_path)
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # some containers report 0 fps; fall back to 30
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        start_time = time.time()
        frame_num = 0
        while cap.isOpened() and self.is_analyzing:
            ret, frame = cap.read()
            if not ret:
                break
            frame_num += 1  # count only frames that were actually read
            if not self.use_camera:
                progress = frame_num / frame_count
                elapsed = time.time() - start_time
                # Estimate remaining time from the measured processing rate, not the nominal fps.
                remaining = (frame_count - frame_num) * (elapsed / frame_num)
                self.root.after(0, self.status_var.set,
                                f"Analyzing... progress: {progress:.1%}  elapsed: {elapsed:.1f}s  remaining: {remaining:.1f}s")
            else:
                self.root.after(0, self.status_var.set, f"Analyzing camera feed... frames processed: {frame_num}")
            timestamp = frame_num / fps if not self.use_camera else float(frame_num)
            analyzed_frame, face_results = self.analyze_frame(frame, timestamp)
            # Tk widgets are not thread-safe: hand the annotated frame to the main loop for display.
            rgb_frame = cv2.cvtColor(analyzed_frame, cv2.COLOR_BGR2RGB)
            self.root.after(0, self.update_ui, rgb_frame, face_results, timestamp)
            if not self.use_camera:
                time.sleep(1 / fps)  # roughly pace playback at the source frame rate
        cap.release()
        self.is_analyzing = False
        if not self.use_camera and frame_num == frame_count:
            self.root.after(0, self.status_var.set, f"Analysis complete! {frame_num} frames analyzed")
            self.root.after(0, messagebox.showinfo, "Done", "Video analysis complete!")
        else:
            self.root.after(0, self.status_var.set, f"Analysis stopped after {frame_num} frames")

        def _restore_buttons():
            self.btn_select.config(state=tk.NORMAL)
            self.btn_camera.config(state=tk.NORMAL)
            self.btn_analyze.config(state=tk.NORMAL)
            self.btn_export.config(state=tk.NORMAL)
            self.btn_stop.config(state=tk.DISABLED)
        self.root.after(0, _restore_buttons)
    def analyze_frame(self, frame, timestamp):
        """Detect faces in a single frame, re-identify each person, and classify their emotion."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # dlib's embedding network expects RGB input, while OpenCV frames are BGR.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        faces = self.detector(gray)
        analyzed_frame = frame.copy()
        face_results = []
        face_descriptors = []
        shapes = []
        for face in faces:
            shape = self.predictor(gray, face)
            shapes.append(shape)
            face_descriptor = self.face_recognizer.compute_face_descriptor(rgb, shape)
            face_descriptors.append(np.array(face_descriptor))
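        # Each descriptor is a 128-D dlib face embedding; embeddings of the same person
        # cluster tightly, which is what the per-ID matching below relies on.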
        for i, (face, shape) in enumerate(zip(faces, shapes)):
            descriptor = face_descriptors[i]
            # Re-identify the person by embedding distance against faces seen so far.
            # dlib's reference threshold for "same person" is 0.6; 0.5 is used here as a
            # stricter cut to reduce false merges, at the cost of occasional duplicate IDs.
            matched_id = None
            for fid, data in self.face_tracker.items():
                dist = np.linalg.norm(data['descriptor'] - descriptor)
                if dist < 0.5:
                    matched_id = fid
                    break
            if matched_id is None:
                matched_id = self.next_face_id
                self.next_face_id += 1
                self.face_tracker[matched_id] = {
                    'descriptor': descriptor,
                    # One random BGR color per person, reused for boxes, labels, and charts.
                    'color': tuple(int(c) for c in np.random.randint(0, 256, size=3))
                }
            (x, y, w, h) = self.get_face_bounds(face)
            # Clamp to the image: dlib can return boxes that extend past the frame edges,
            # and negative indices would silently slice from the wrong end.
            x, y = max(x, 0), max(y, 0)
            face_roi = frame[y:y + h, x:x + w]
if face_roi.size == 0:
continue
processed_face = self.preprocess_face(face_roi)
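            # The model outputs a (1, 7) probability vector; emotion_labels must match the
            # class order used in training (standard FER-2013 order is assumed here).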
emotion_prediction = self.emotion_model.predict(processed_face)
emotion_index = np.argmax(emotion_prediction)
emotion = self.emotion_labels[emotion_index]
confidence = np.max(emotion_prediction)
result = {
'time': timestamp,
'person_id': matched_id,
'emotion': emotion,
'confidence': confidence,
'evaluation': self.emotion_evaluations[emotion]
}
face_results.append(result)
self.face_emotion_history[matched_id].append(emotion_index)
face_color = self.face_tracker.get(matched_id, {}).get('color', (255, 255, 255))
cv2.rectangle(analyzed_frame, (x, y), (x + w, y + h), face_color, 2)
            # cv2.putText uses Hershey fonts, which render only ASCII, so keep this label in English.
            info_text = f"Person {matched_id}: {emotion} ({confidence:.2f})"
cv2.putText(analyzed_frame, info_text, (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, face_color, 2)
shape_np = self.shape_to_np(shape)
for (x_point, y_point) in shape_np:
cv2.circle(analyzed_frame, (x_point, y_point), 1, (0, 255, 0), -1)
return analyzed_frame, face_results
    def update_ui(self, frame, face_results, timestamp):
        """Refresh the video display, per-person panels, charts, and results table (main thread only)."""
        # Show the annotated frame; the worker thread already converted it to RGB.
        img = ImageTk.PhotoImage(image=Image.fromarray(frame))
        self.lbl_video.config(image=img)
        self.lbl_video.image = img  # keep a reference so Tk does not garbage-collect the image
        for i, person_frame in enumerate(self.person_frames):
if i < len(face_results):
result = face_results[i]
person_id = result['person_id']
emotion = result['emotion']
confidence = result['confidence']
evaluation = result['evaluation']
                color = self.bgr_to_hex(self.face_tracker.get(person_id, {}).get('color', (255, 255, 255)))
                person_frame['lbl_person'].config(
                    text=f"Person {person_id}:",
                    foreground=color
                )
                person_frame['lbl_emotion'].config(
                    text=f"{emotion} ({confidence:.2f})",
                    foreground=self.bgr_to_hex(self.emotion_colors[emotion])
)
person_frame['lbl_eval'].config(text=evaluation)
person_frame['frame'].pack()
else:
person_frame['frame'].pack_forget()
for result in face_results:
self.result_tree.insert("", tk.END, values=(
f"{result['time']:.1f}s",
                f"Person {result['person_id']}",
result['emotion'],
f"{result['confidence']:.2f}",
result['evaluation']
))
self.update_trend_chart()
self.update_distribution_chart()
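        # Charts are redrawn every frame for simplicity; throttle here if this proves slow.
        # Keep the newest table rows visible by pinning the scroll to the bottom.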
if face_results:
self.result_tree.yview_moveto(1)
    def update_trend_chart(self):
        """Redraw the per-person emotion trend (y = emotion index over each rolling window)."""
self.trend_ax.clear()
for person_id, history in self.face_emotion_history.items():
if len(history) > 1:
                # Stored colors are BGR (OpenCV); reverse to RGB for matplotlib.
                color = np.array(self.face_tracker.get(person_id, {}).get('color', (255, 255, 255))[::-1]) / 255.0
self.trend_ax.plot(
range(len(history)),
list(history),
                    label=f"Person {person_id}",
color=color,
marker='o'
)
if self.face_emotion_history:
            self.trend_ax.set_title('Emotion Trend')
            self.trend_ax.set_xlabel('Time (frames)')
            self.trend_ax.set_ylabel('Emotion')
self.trend_ax.set_yticks(range(len(self.emotion_labels)))
self.trend_ax.set_yticklabels(self.emotion_labels)
self.trend_ax.legend()
self.trend_ax.grid(True)
self.trend_canvas.draw()
    def update_distribution_chart(self):
        """Redraw the emotion distribution bar chart."""
self.dist_ax.clear()
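        # Counts cover only each face's rolling 30-frame history (face_emotion_history
        # deques cap at maxlen=30), not the entire session.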
emotion_counts = {emotion: 0 for emotion in self.emotion_labels}
for history in self.face_emotion_history.values():
for idx in history:
emotion_counts[self.emotion_labels[idx]] += 1
emotions = list(emotion_counts.keys())
counts = list(emotion_counts.values())
        colors = [self.bgr_to_hex(self.emotion_colors[e]) for e in emotions]
bars = self.dist_ax.bar(emotions, counts, color=colors)
        self.dist_ax.set_title('Emotion Distribution')
        self.dist_ax.set_ylabel('Count')
for bar in bars:
height = bar.get_height()
self.dist_ax.text(bar.get_x() + bar.get_width() / 2., height,
f'{int(height)}', ha='center', va='bottom')
self.dist_canvas.draw()
    def export_results(self):
        """Export the analysis results to an Excel or CSV file."""
if not self.face_emotion_history:
            messagebox.showwarning("Warning", "No analysis results to export")
return
data = []
for item in self.result_tree.get_children():
values = self.result_tree.item(item, 'values')
if values and len(values) == 5:
data.append({
                    'Time': values[0],
                    'Person': values[1],
                    'Emotion': values[2],
                    'Confidence': float(values[3]),
                    'Evaluation': values[4]
})
if not data:
            messagebox.showwarning("Warning", "No analysis results to export")
return
save_path = filedialog.asksaveasfilename(
            title="Save analysis results",
            defaultextension=".xlsx",
            filetypes=[("Excel files", "*.xlsx"), ("CSV files", "*.csv"), ("All files", "*.*")]
)
if not save_path:
return
try:
df = pd.DataFrame(data)
            if save_path.endswith('.csv'):
                # utf-8-sig adds a BOM so that Excel opens the CSV with the correct encoding.
                df.to_csv(save_path, index=False, encoding='utf-8-sig')
            else:
                df.to_excel(save_path, index=False)  # needs an Excel writer such as openpyxl
            messagebox.showinfo("Success", f"Results exported to: {save_path}")
            self.status_var.set(f"Results exported to: {os.path.basename(save_path)}")
except Exception as e:
            messagebox.showerror("Error", f"Export failed: {str(e)}")
    def preprocess_face(self, face_image):
        """Convert a BGR face crop into the tensor layout the emotion model expects."""
        face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
        face_image = cv2.resize(face_image, (64, 64))  # input size assumed by the bundled model
        face_image = np.expand_dims(face_image, axis=0)   # add batch dimension   -> (1, 64, 64)
        face_image = np.expand_dims(face_image, axis=-1)  # add channel dimension -> (1, 64, 64, 1)
        return face_image / 255.0  # scale pixel values to [0, 1]
    def shape_to_np(self, shape, dtype="int"):
        """Convert a dlib 68-point landmark object to a (68, 2) NumPy array."""
        coords = np.zeros((68, 2), dtype=dtype)
        for i in range(68):
            coords[i] = (shape.part(i).x, shape.part(i).y)
        return coords
    def get_face_bounds(self, face):
        """Return a dlib rectangle as an (x, y, width, height) tuple."""
x = face.left()
y = face.top()
w = face.right() - x
h = face.bottom() - y
return (x, y, w, h)
    def bgr_to_hex(self, bgr):
        """Convert an OpenCV BGR color to a Tk/matplotlib hex string (#rrggbb)."""
        if isinstance(bgr, np.ndarray):
            bgr = tuple(bgr.astype(int))
        b, g, r = bgr[:3]
        return '#%02x%02x%02x' % (r, g, b)
if __name__ == "__main__":
root = tk.Tk()
app = EnhancedEmotionAnalysisApp(root)
root.mainloop()