增强版视频表情分析系统

发布于:2025-04-20 ⋅ 阅读:(18) ⋅ 点赞:(0)
import cv2
import dlib
import numpy as np
from keras.models import load_model
from collections import defaultdict, deque
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from PIL import Image, ImageTk
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import time
import os
import threading
from sklearn.cluster import KMeans


class EnhancedEmotionAnalysisApp:
    def __init__(self, root):
        self.root = root
        self.root.title("增强版视频表情分析系统")
        self.root.geometry("1400x900")

        # 初始化模型
        self.initialize_models()

        # 创建UI
        self.create_ui()

        # 分析状态变量
        self.is_analyzing = False
        self.use_camera = False
        self.video_path = ""
        self.output_path = ""
        self.results = []
        self.face_tracker = {}  # 用于跟踪不同人物
        self.next_face_id = 1
        self.face_emotion_history = defaultdict(lambda: deque(maxlen=30))

    def initialize_models(self):
        """初始化面部检测和情绪识别模型"""
        try:
            # 面部检测器
            self.detector = dlib.get_frontal_face_detector()

            # 面部特征点预测器
            self.predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")

            # 面部识别模型 (用于区分不同人物)
            self.face_recognizer = dlib.face_recognition_model_v1("dlib_face_recognition_resnet_model_v1.dat")

            # 情绪识别模型
            self.emotion_model = load_model("emotion_model.hdf5")

            # 情绪标签
            self.emotion_labels = ['愤怒', '厌恶', '恐惧', '开心', '悲伤', '惊讶', '平静']

            # 情绪评价映射
            self.emotion_evaluations = {
                '愤怒': "负面情绪,可能表示不满或生气",
                '厌恶': "强烈负面情绪,表示反感或不喜欢",
                '恐惧': "负面情绪,可能感到害怕或担忧",
                '开心': "积极情绪,表示快乐或满意",
                '悲伤': "负面情绪,可能感到难过或失落",
                '惊讶': "中性情绪,可能是惊喜或惊吓",
                '平静': "中性情绪,情绪稳定"
            }

            # 情绪颜色映射
            self.emotion_colors = {
                '愤怒': (0, 0, 255),  # 红色
                '厌恶': (0, 102, 0),  # 深绿色
                '恐惧': (255, 255, 0),  # 青色
                '开心': (0, 255, 0),  # 绿色
                '悲伤': (255, 0, 0),  # 蓝色
                '惊讶': (0, 255, 255),  # 黄色
                '平静': (255, 255, 255)  # 白色
            }

        except Exception as e:
            messagebox.showerror("错误", f"模型初始化失败: {str(e)}")
            self.root.destroy()

    def create_ui(self):
        """创建用户界面"""
        # 顶部控制面板
        control_frame = tk.Frame(self.root, padx=10, pady=10)
        control_frame.pack(fill=tk.X)

        # 视频选择按钮
        self.btn_select = tk.Button(control_frame, text="选择视频", command=self.select_video)
        self.btn_select.pack(side=tk.LEFT, padx=5)

        # 摄像头按钮
        self.btn_camera = tk.Button(control_frame, text="使用摄像头", command=self.use_webcam)
        self.btn_camera.pack(side=tk.LEFT, padx=5)

        # 开始分析按钮
        self.btn_analyze = tk.Button(control_frame, text="开始分析", command=self.start_analysis, state=tk.DISABLED)
        self.btn_analyze.pack(side=tk.LEFT, padx=5)

        # 停止分析按钮
        self.btn_stop = tk.Button(control_frame, text="停止分析", command=self.stop_analysis, state=tk.DISABLED)
        self.btn_stop.pack(side=tk.LEFT, padx=5)

        # 导出结果按钮
        self.btn_export = tk.Button(control_frame, text="导出结果", command=self.export_results, state=tk.DISABLED)
        self.btn_export.pack(side=tk.LEFT, padx=5)

        # 视频路径显示
        self.lbl_video_path = tk.Label(control_frame, text="未选择视频", anchor=tk.W)
        self.lbl_video_path.pack(side=tk.LEFT, padx=10, fill=tk.X, expand=True)

        # 主内容区域
        content_frame = tk.Frame(self.root)
        content_frame.pack(fill=tk.BOTH, expand=True)

        # 视频显示区域
        video_frame = tk.Frame(content_frame, width=800, height=500, bg='black')
        video_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        video_frame.pack_propagate(False)

        self.lbl_video = tk.Label(video_frame)
        self.lbl_video.pack(fill=tk.BOTH, expand=True)

        # 分析结果区域
        result_frame = tk.Frame(content_frame, width=600, height=500, bg='white')
        result_frame.pack(side=tk.RIGHT, fill=tk.BOTH)
        result_frame.pack_propagate(False)

        # 创建结果笔记本(多标签页)
        self.result_notebook = ttk.Notebook(result_frame)
        self.result_notebook.pack(fill=tk.BOTH, expand=True)

        # 实时分析标签页
        realtime_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(realtime_frame, text="实时分析")

        # 人物情绪显示区域
        self.person_frames = []
        for i in range(3):  # 最多显示3个人的实时情绪
            person_frame = ttk.Frame(realtime_frame)
            person_frame.pack(fill=tk.X, padx=5, pady=5)

            lbl_person = ttk.Label(person_frame, text=f"人物 {i + 1}:", font=('Arial', 12))
            lbl_person.pack(side=tk.LEFT)

            lbl_emotion = ttk.Label(person_frame, text="等待分析...", font=('Arial', 12))
            lbl_emotion.pack(side=tk.LEFT, padx=10)

            lbl_eval = ttk.Label(person_frame, text="", wraplength=400)
            lbl_eval.pack(side=tk.LEFT)

            self.person_frames.append({
                'frame': person_frame,
                'lbl_person': lbl_person,
                'lbl_emotion': lbl_emotion,
                'lbl_eval': lbl_eval
            })

        # 情绪趋势标签页
        trend_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(trend_frame, text="情绪趋势")

        # 情绪趋势图表
        self.trend_fig, self.trend_ax = plt.subplots(figsize=(5, 3), dpi=100)
        self.trend_canvas = FigureCanvasTkAgg(self.trend_fig, master=trend_frame)
        self.trend_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

        # 情绪分布标签页
        dist_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(dist_frame, text="情绪分布")

        # 情绪分布图表
        self.dist_fig, self.dist_ax = plt.subplots(figsize=(5, 3), dpi=100)
        self.dist_canvas = FigureCanvasTkAgg(self.dist_fig, master=dist_frame)
        self.dist_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

        # 结果表格标签页
        table_frame = ttk.Frame(self.result_notebook)
        self.result_notebook.add(table_frame, text="详细结果")

        # 结果表格
        columns = ("时间", "人物", "情绪", "置信度", "评价")
        self.result_tree = ttk.Treeview(table_frame, columns=columns, show="headings", height=15)

        for col in columns:
            self.result_tree.heading(col, text=col)
            self.result_tree.column(col, width=100, anchor=tk.CENTER)

        self.result_tree.column("评价", width=200)
        self.result_tree.pack(fill=tk.BOTH, expand=True)

        # 底部状态栏
        self.status_var = tk.StringVar()
        self.status_var.set("准备就绪")
        status_bar = tk.Label(self.root, textvariable=self.status_var, bd=1, relief=tk.SUNKEN, anchor=tk.W)
        status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def select_video(self):
        """选择视频文件"""
        self.use_camera = False
        file_path = filedialog.askopenfilename(
            title="选择视频文件",
            filetypes=[("视频文件", "*.mp4 *.avi *.mov"), ("所有文件", "*.*")]
        )

        if file_path:
            self.video_path = file_path
            self.lbl_video_path.config(text=file_path)
            self.btn_analyze.config(state=tk.NORMAL)
            self.status_var.set(f"已选择视频: {os.path.basename(file_path)}")

            # 显示视频第一帧
            self.preview_video()

    def use_webcam(self):
        """使用摄像头"""
        self.use_camera = True
        self.video_path = "摄像头"
        self.lbl_video_path.config(text="使用摄像头")
        self.btn_analyze.config(state=tk.NORMAL)
        self.status_var.set("准备使用摄像头进行分析")

        # 测试摄像头
        cap = cv2.VideoCapture(0)
        if cap.isOpened():
            ret, frame = cap.read()
            cap.release()

            if ret:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                img = Image.fromarray(frame)
                img = ImageTk.PhotoImage(image=img)

                self.lbl_video.config(image=img)
                self.lbl_video.image = img

    def preview_video(self):
        """预览视频第一帧"""
        cap = cv2.VideoCapture(self.video_path)
        ret, frame = cap.read()
        cap.release()

        if ret:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(frame)
            img = ImageTk.PhotoImage(image=img)

            self.lbl_video.config(image=img)
            self.lbl_video.image = img

    def start_analysis(self):
        """开始分析视频或摄像头"""
        self.is_analyzing = True
        self.results = []
        self.face_tracker.clear()
        self.next_face_id = 1
        self.face_emotion_history.clear()

        # 清空结果表格
        for item in self.result_tree.get_children():
            self.result_tree.delete(item)

        self.btn_select.config(state=tk.DISABLED)
        self.btn_camera.config(state=tk.DISABLED)
        self.btn_analyze.config(state=tk.DISABLED)
        self.btn_stop.config(state=tk.NORMAL)
        self.btn_export.config(state=tk.DISABLED)

        # 在后台线程中运行分析
        analysis_thread = threading.Thread(target=self.analyze_video, daemon=True)
        analysis_thread.start()

    def stop_analysis(self):
        """停止分析"""
        self.is_analyzing = False
        self.btn_stop.config(state=tk.DISABLED)
        self.btn_select.config(state=tk.NORMAL)
        self.btn_camera.config(state=tk.NORMAL)
        self.btn_export.config(state=tk.NORMAL)
        self.status_var.set("分析已停止")

    def analyze_video(self):
        """分析视频帧或摄像头"""
        if self.use_camera:
            cap = cv2.VideoCapture(0)
            frame_count = 0
            fps = 30  # 估计的摄像头FPS
        else:
            cap = cv2.VideoCapture(self.video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        start_time = time.time()
        frame_num = 0

        while cap.isOpened() and self.is_analyzing:
            ret, frame = cap.read()
            frame_num += 1

            if not ret:
                break

            # 计算进度
            if not self.use_camera:
                progress = frame_num / frame_count
                elapsed = time.time() - start_time
                remaining = (frame_count - frame_num) / fps if frame_num < frame_count else 0
                self.status_var.set(
                    f"分析中... 进度: {progress:.1%} 已用时间: {elapsed:.1f}s 剩余时间: {remaining:.1f}s"
                )
            else:
                self.status_var.set(f"摄像头分析中... 已处理帧数: {frame_num}")

            # 分析情绪
            analyzed_frame, face_results = self.analyze_frame(frame,
                                                              frame_num / fps if not self.use_camera else frame_num)

            # 更新UI
            self.update_ui(analyzed_frame, face_results, frame_num / fps if not self.use_camera else frame_num)

            # 显示处理后的帧
            analyzed_frame = cv2.cvtColor(analyzed_frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(analyzed_frame)
            img = ImageTk.PhotoImage(image=img)

            self.lbl_video.config(image=img)
            self.lbl_video.image = img

            # 控制处理速度
            if not self.use_camera:
                time.sleep(1 / fps)  # 保持视频原始速度

        cap.release()
        self.is_analyzing = False

        if not self.use_camera and frame_num == frame_count:
            self.status_var.set(f"分析完成! 共分析 {frame_num} 帧")
            messagebox.showinfo("完成", "视频分析完成!")
        else:
            self.status_var.set(f"分析中断! 已分析 {frame_num} 帧")

        self.btn_select.config(state=tk.NORMAL)
        self.btn_camera.config(state=tk.NORMAL)
        self.btn_analyze.config(state=tk.NORMAL)
        self.btn_export.config(state=tk.NORMAL)

    def analyze_frame(self, frame, timestamp):
        """分析单帧图像"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.detector(gray)
        analyzed_frame = frame.copy()
        face_results = []

        # 获取所有面部的特征向量
        face_descriptors = []
        shapes = []
        for face in faces:
            shape = self.predictor(gray, face)
            shapes.append(shape)
            face_descriptor = self.face_recognizer.compute_face_descriptor(frame, shape)
            face_descriptors.append(np.array(face_descriptor))

        # 如果有多个面部,进行聚类区分不同人物
        face_ids = []
        if len(faces) > 1 and len(face_descriptors) == len(faces):
            # 使用KMeans聚类区分不同人物
            kmeans = KMeans(n_clusters=min(len(faces), 3), random_state=0).fit(face_descriptors)
            face_ids = kmeans.labels_
        elif len(faces) == 1:
            face_ids = [0]

        # 处理每个面部
        for i, (face, shape, face_id) in enumerate(zip(faces, shapes, face_ids)):
            # 获取或分配面部ID
            if len(face_descriptors) == len(faces):
                descriptor = face_descriptors[i]

                # 检查是否已经跟踪这个面部
                matched_id = None
                for fid, data in self.face_tracker.items():
                    dist = np.linalg.norm(data['descriptor'] - descriptor)
                    if dist < 0.5:  # 阈值,小于这个值认为是同一个人
                        matched_id = fid
                        break

                if matched_id is None:
                    matched_id = self.next_face_id
                    self.next_face_id += 1
                    self.face_tracker[matched_id] = {
                        'descriptor': descriptor,
                        'color': (np.random.randint(0, 255),
                                  np.random.randint(0, 255),
                                  np.random.randint(0, 255))
                    }
            else:
                matched_id = i + 1

            # 提取面部区域
            (x, y, w, h) = self.get_face_bounds(face)
            face_roi = frame[y:y + h, x:x + w]

            if face_roi.size == 0:
                continue

            # 预处理面部图像
            processed_face = self.preprocess_face(face_roi)

            # 预测情绪
            emotion_prediction = self.emotion_model.predict(processed_face)
            emotion_index = np.argmax(emotion_prediction)
            emotion = self.emotion_labels[emotion_index]
            confidence = np.max(emotion_prediction)

            # 记录结果
            result = {
                'time': timestamp,
                'person_id': matched_id,
                'emotion': emotion,
                'confidence': confidence,
                'evaluation': self.emotion_evaluations[emotion]
            }
            face_results.append(result)

            # 添加到历史记录
            self.face_emotion_history[matched_id].append(emotion_index)

            # 获取面部颜色
            face_color = self.face_tracker.get(matched_id, {}).get('color', (255, 255, 255))

            # 绘制面部边界框
            cv2.rectangle(analyzed_frame, (x, y), (x + w, y + h), face_color, 2)

            # 显示人物ID和情绪
            info_text = f"人物 {matched_id}: {emotion} ({confidence:.2f})"
            cv2.putText(analyzed_frame, info_text, (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, face_color, 2)

            # 绘制面部特征点
            shape_np = self.shape_to_np(shape)
            for (x_point, y_point) in shape_np:
                cv2.circle(analyzed_frame, (x_point, y_point), 1, (0, 255, 0), -1)

        return analyzed_frame, face_results

    def update_ui(self, frame, face_results, timestamp):
        """更新UI显示"""
        # 更新实时分析标签页
        for i, person_frame in enumerate(self.person_frames):
            if i < len(face_results):
                result = face_results[i]
                person_id = result['person_id']
                emotion = result['emotion']
                confidence = result['confidence']
                evaluation = result['evaluation']

                color = self.rgb_to_hex(self.face_tracker.get(person_id, {}).get('color', (255, 255, 255)))

                person_frame['lbl_person'].config(
                    text=f"人物 {person_id}:",
                    foreground=color
                )
                person_frame['lbl_emotion'].config(
                    text=f"{emotion} ({confidence:.2f})",
                    foreground=self.rgb_to_hex(self.emotion_colors[emotion])
                )
                person_frame['lbl_eval'].config(text=evaluation)
                person_frame['frame'].pack()
            else:
                person_frame['frame'].pack_forget()

        # 更新结果表格
        for result in face_results:
            self.result_tree.insert("", tk.END, values=(
                f"{result['time']:.1f}s",
                f"人物 {result['person_id']}",
                result['emotion'],
                f"{result['confidence']:.2f}",
                result['evaluation']
            ))

        # 更新情绪趋势图表
        self.update_trend_chart()

        # 更新情绪分布图表
        self.update_distribution_chart()

        # 自动滚动到最新结果
        if face_results:
            self.result_tree.yview_moveto(1)

    def update_trend_chart(self):
        """更新情绪趋势图表"""
        self.trend_ax.clear()

        # 为每个跟踪的人物绘制情绪趋势
        for person_id, history in self.face_emotion_history.items():
            if len(history) > 1:
                color = np.array(self.face_tracker.get(person_id, {}).get('color', (255, 255, 255))) / 255.0
                self.trend_ax.plot(
                    range(len(history)),
                    list(history),
                    label=f"人物 {person_id}",
                    color=color,
                    marker='o'
                )

        if self.face_emotion_history:
            self.trend_ax.set_title('情绪变化趋势')
            self.trend_ax.set_xlabel('时间(帧)')
            self.trend_ax.set_ylabel('情绪')
            self.trend_ax.set_yticks(range(len(self.emotion_labels)))
            self.trend_ax.set_yticklabels(self.emotion_labels)
            self.trend_ax.legend()
            self.trend_ax.grid(True)

        self.trend_canvas.draw()

    def update_distribution_chart(self):
        """更新情绪分布图表"""
        self.dist_ax.clear()

        # 统计所有情绪
        emotion_counts = {emotion: 0 for emotion in self.emotion_labels}
        for history in self.face_emotion_history.values():
            for idx in history:
                emotion_counts[self.emotion_labels[idx]] += 1

        # 绘制柱状图
        emotions = list(emotion_counts.keys())
        counts = list(emotion_counts.values())
        colors = [self.rgb_to_hex(self.emotion_colors[e]) for e in emotions]

        bars = self.dist_ax.bar(emotions, counts, color=colors)
        self.dist_ax.set_title('情绪分布统计')
        self.dist_ax.set_ylabel('出现次数')

        # 在柱子上方显示数值
        for bar in bars:
            height = bar.get_height()
            self.dist_ax.text(bar.get_x() + bar.get_width() / 2., height,
                              f'{int(height)}', ha='center', va='bottom')

        self.dist_canvas.draw()

    def export_results(self):
        """导出分析结果"""
        if not self.face_emotion_history:
            messagebox.showwarning("警告", "没有可导出的分析结果")
            return

        # 准备数据
        data = []
        for item in self.result_tree.get_children():
            values = self.result_tree.item(item, 'values')
            if values and len(values) == 5:
                data.append({
                    '时间': values[0],
                    '人物': values[1],
                    '情绪': values[2],
                    '置信度': float(values[3]),
                    '评价': values[4]
                })

        if not data:
            messagebox.showwarning("警告", "没有可导出的分析结果")
            return

        # 选择保存路径
        save_path = filedialog.asksaveasfilename(
            title="保存分析结果",
            defaultextension=".xlsx",
            filetypes=[("Excel文件", "*.xlsx"), ("CSV文件", "*.csv"), ("所有文件", "*.*")]
        )

        if not save_path:
            return

        try:
            # 创建DataFrame
            df = pd.DataFrame(data)

            # 根据文件类型保存
            if save_path.endswith('.csv'):
                df.to_csv(save_path, index=False, encoding='utf-8-sig')
            else:
                df.to_excel(save_path, index=False)

            messagebox.showinfo("成功", f"结果已成功导出到: {save_path}")
            self.status_var.set(f"结果已导出到: {os.path.basename(save_path)}")
        except Exception as e:
            messagebox.showerror("错误", f"导出失败: {str(e)}")

    def preprocess_face(self, face_image):
        """预处理面部图像以用于情绪识别"""
        face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
        face_image = cv2.resize(face_image, (64, 64))
        face_image = np.expand_dims(face_image, axis=0)
        face_image = np.expand_dims(face_image, axis=-1)
        return face_image / 255.0

    def shape_to_np(self, shape, dtype="int"):
        """将dlib形状对象转换为numpy数组"""
        coords = np.zeros((68, 2), dtype=dtype)
        for i in range(0, 68):
            coords[i] = (shape.part(i).x, shape.part(i).y)
        return coords

    def get_face_bounds(self, face):
        """获取面部边界框"""
        x = face.left()
        y = face.top()
        w = face.right() - x
        h = face.bottom() - y
        return (x, y, w, h)

    def rgb_to_hex(self, rgb):
        """将RGB颜色转换为十六进制"""
        if isinstance(rgb, np.ndarray):
            rgb = tuple(rgb.astype(int))
        return '#%02x%02x%02x' % rgb[:3]


if __name__ == "__main__":
    root = tk.Tk()
    app = EnhancedEmotionAnalysisApp(root)
    root.mainloop()

网站公告

今日签到

点亮在社区的每一天
去签到