Python pyqt+flask做一个简单实用的自动排班系统

发布于:2025-03-20 ⋅ 阅读:(18) ⋅ 点赞:(0)

这是一个基于Flask和PyQt的排班系统,可以将Web界面嵌入到桌面应用程序中。

系统界面:

功能特点:

- 读取员工信息和现有排班表

- 自动生成排班表

- 美观的Web界面

- 独立的桌面应用程序

整体架构:

系统采用前后端分离的架构设计,通过 PyQt5 的 WebEngine 组件将 Web 界面嵌入到桌面应用中。

├── 桌面应用层 (PyQt5)

│   └── WebEngine 视图

├── Web 层 (Flask)

│   ├── 路由控制

│   └── 业务逻辑

└── 数据层

    ├── CSV 数据文件

    └── Excel 导出

核心模块:

主程序模块 (main.py)

  • 负责初始化 PyQt5 应用
  • 集成 Flask 服务器
  • 管理主窗口和 Web 视图

后端服务模块 (app.py)

  • 提供 RESTful API
  • 处理排班算法
  • 管理数据导入导出

前端界面模块 (templates/index.html)

  • 员工列表管理
  • 排班表显示
  • 用户交互处理

核心代码:main.py

import sys
import time
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout
from PyQt5.QtWebEngineWidgets import QWebEngineView
from PyQt5.QtCore import QUrl
from flask import Flask
import threading
import os

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("排班系统")
        self.setGeometry(100, 100, 1200, 800)
        
        # 创建中心部件
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)
        
        # 创建Web视图
        self.web_view = QWebEngineView()
        layout.addWidget(self.web_view)
        
        # 启动Flask服务器
        self.start_flask_server()
        
        # 等待服务器启动后加载页面
        time.sleep(1)  # 给服务器一点启动时间
        self.web_view.setUrl(QUrl("http://127.0.0.1:3863"))
        
    def start_flask_server(self):
        # 在新线程中启动Flask服务器
        threading.Thread(target=self.run_flask, daemon=True).start()
        
    def run_flask(self):
        from app import app
        app.run(host='127.0.0.1', port=3863)

def main():
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())

if __name__ == '__main__':
    main() 

核心代码:app.py

from flask import Flask, render_template, request, jsonify, send_file
import pandas as pd
from datetime import datetime, timedelta
import calendar
import json
import numpy as np
import os

app = Flask(__name__)

# 班次定义
SHIFTS = {
    '白班': 'D',
    '晚班': 'N',
    '休息': 'R'
}

# 读取员工数据
def load_employee_data():
    try:
        df = pd.read_csv('Employee.csv', encoding='utf-8')
        # 只返回员工姓名列
        return pd.DataFrame({'name': df["Employee'sName"]})
    except Exception as e:
        print(f"Error loading employee data: {e}")
        return pd.DataFrame({'name': []})

# 读取排班表
def load_schedule():
    try:
        df = pd.read_excel('客户服务部排班表20250301-20250331.xls')
        return df
    except Exception as e:
        print(f"Error loading schedule: {e}")
        return pd.DataFrame()

def get_month_calendar(year, month):
    cal = calendar.monthcalendar(year, month)
    return cal

def generate_monthly_schedule(employees, year, month):
    num_days = calendar.monthrange(year, month)[1]
    num_employees = len(employees)
    
    # 将employees列表转换为numpy数组
    employees_array = np.array(employees)
    
    # 创建排班表
    schedule = pd.DataFrame(index=employees, columns=range(1, num_days + 1))
    schedule.fillna('R', inplace=True)  # 默认全部休息
    
    # 为每一天分配班次
    for day in range(1, num_days + 1):
        # 确保每天有足够的白班和晚班
        day_employees = employees_array.copy()
        np.random.shuffle(day_employees)
        
        # 分配白班(约40%的员工)
        day_shifts = int(num_employees * 0.4)
        schedule.loc[day_employees[:day_shifts], day] = 'D'
        
        # 分配晚班(约30%的员工)
        night_shifts = int(num_employees * 0.3)
        schedule.loc[day_employees[day_shifts:day_shifts+night_shifts], day] = 'N'
    
    # 确保每周至少休息两天
    for employee in employees:
        for week in range(0, num_days, 7):
            week_schedule = schedule.loc[employee, week+1:min(week+7, num_days)]
            rest_days = (week_schedule == 'R').sum()
            if rest_days < 2:
                work_days = list(week_schedule[week_schedule != 'R'].index)
                if work_days:  # 确保有工作日可以调整
                    np.random.shuffle(work_days)
                    for i in range(min(2-rest_days, len(work_days))):
                        schedule.loc[employee, work_days[i]] = 'R'
    
    return schedule

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/api/employees')
def get_employees():
    df = load_employee_data()
    return jsonify(df.to_dict('records'))

@app.route('/api/calendar/<int:year>/<int:month>')
def get_calendar(year, month):
    cal = get_month_calendar(year, month)
    return jsonify(cal)

@app.route('/api/generate_schedule', methods=['POST'])
def generate_schedule():
    try:
        data = request.get_json()
        year = data.get('year', 2025)
        month = data.get('month', 1)
        selected_employees = data.get('employees', [])
        
        if not selected_employees:
            return jsonify({"status": "error", "message": "请选择员工"})
            
        schedule = generate_monthly_schedule(selected_employees, year, month)
        
        # 将DataFrame转换为字典格式
        schedule_dict = {}
        for employee in selected_employees:
            schedule_dict[employee] = schedule.loc[employee].to_dict()
            
        return jsonify({
            "status": "success",
            "schedule": schedule_dict,
            "message": "排班表生成成功"
        })
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)})

@app.route('/api/export_schedule', methods=['POST'])
def export_schedule():
    try:
        data = request.get_json()
        year = data.get('year', 2025)
        month = data.get('month', 1)
        schedule_data = data.get('schedule', {})
        
        # 创建新的排班表
        df = pd.DataFrame.from_dict(schedule_data, orient='index')
        
        # 设置列名为日期
        df.columns = [str(i) for i in range(1, len(df.columns) + 1)]
        
        # 重置索引,将员工名称作为一列
        df.reset_index(inplace=True)
        df.rename(columns={'index': '姓名'}, inplace=True)
        
        # 保存文件
        output_file = f'客户服务部排班表{year}{month:02d}01-{year}{month:02d}{calendar.monthrange(year, month)[1]}.xlsx'
        
        # 使用 openpyxl 引擎保存为 xlsx 格式
        df.to_excel(output_file, index=False, engine='openpyxl')
        
        # 返回文件下载路径
        return send_file(
            output_file,
            as_attachment=True,
            download_name=output_file,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        print(f"Export error: {str(e)}")  # 添加错误日志
        return jsonify({"status": "error", "message": f"导出失败: {str(e)}"})

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=3863, debug=True)