Python实例题:基于 Flask 的任务管理系统

发布于:2025-07-03 ⋅ 阅读:(28) ⋅ 点赞:(0)

目录

Python实例题

题目

要求:

解题思路:

代码实现:

Python实例题

题目

基于 Flask 的任务管理系统

要求

  • 使用 Flask 框架构建一个任务管理系统,支持以下功能:
    • 用户认证(注册、登录、注销)
    • 任务的创建、编辑、删除和标记完成
    • 任务分类和优先级管理
    • 任务截止日期和提醒功能
    • 任务统计和可视化分析
  • 使用 SQLite 数据库存储用户、任务等信息。
  • 添加美观的前端界面,支持响应式设计。

解题思路

  • 使用 Flask 蓝图组织代码结构。
  • 通过 Flask-Login 处理用户认证。
  • 使用 Flask-SQLAlchemy 管理数据库操作。
  • 结合 Bootstrap 和 Chart.js 实现数据可视化。

代码实现

from flask import Flask, render_template, request, redirect, url_for, flash, Blueprint
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
import os

# 初始化 Flask 应用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(app.root_path, 'task_manager.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# 初始化数据库
db = SQLAlchemy(app)

# 初始化 Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'auth.login'

# 数据模型
class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(120), nullable=False)
    tasks = db.relationship('Task', backref='user', lazy=True)
    
    def set_password(self, password):
        self.password_hash = generate_password_hash(password)
    
    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

class Category(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    tasks = db.relationship('Task', backref='category', lazy=True)
    
    def __repr__(self):
        return self.name

class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    due_date = db.Column(db.DateTime)
    completed = db.Column(db.Boolean, default=False)
    priority = db.Column(db.Integer, default=1)  # 1: 低, 2: 中, 3: 高
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('category.id'))
    
    def is_overdue(self):
        return self.due_date and not self.completed and datetime.now() > self.due_date
    
    def is_due_soon(self):
        return self.due_date and not self.completed and datetime.now() < self.due_date and \
               self.due_date - datetime.now() <= timedelta(days=1)

# 用户加载回调
@login_manager.user_loader
def load_user(user_id):
    return User.query.get(int(user_id))

# 认证蓝图
auth = Blueprint('auth', __name__)

@auth.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        
        if User.query.filter_by(username=username).first():
            flash('用户名已存在', 'error')
            return redirect(url_for('auth.register'))
        
        user = User(username=username)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        
        # 为新用户创建默认分类
        default_category = Category(name='默认', user_id=user.id)
        db.session.add(default_category)
        db.session.commit()
        
        flash('注册成功,请登录', 'success')
        return redirect(url_for('auth.login'))
    
    return render_template('register.html')

@auth.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        
        user = User.query.filter_by(username=username).first()
        
        if user and user.check_password(password):
            login_user(user)
            flash('登录成功', 'success')
            return redirect(url_for('main.dashboard'))
        else:
            flash('用户名或密码错误', 'error')
    
    return render_template('login.html')

@auth.route('/logout')
@login_required
def logout():
    logout_user()
    flash('已注销', 'success')
    return redirect(url_for('auth.login'))

# 主蓝图
main = Blueprint('main', __name__)

@main.route('/')
@login_required
def dashboard():
    # 获取用户的任务统计数据
    total_tasks = Task.query.filter_by(user_id=current_user.id).count()
    completed_tasks = Task.query.filter_by(user_id=current_user.id, completed=True).count()
    pending_tasks = total_tasks - completed_tasks
    
    # 获取即将到期的任务
    due_soon_tasks = Task.query.filter_by(user_id=current_user.id, completed=False) \
                         .filter(Task.due_date <= datetime.now() + timedelta(days=1)) \
                         .order_by(Task.due_date).all()
    
    # 获取逾期任务
    overdue_tasks = Task.query.filter_by(user_id=current_user.id, completed=False) \
                       .filter(Task.due_date < datetime.now()) \
                       .order_by(Task.due_date).all()
    
    # 获取最近添加的任务
    recent_tasks = Task.query.filter_by(user_id=current_user.id) \
                      .order_by(Task.created_at.desc()).limit(5).all()
    
    # 获取分类统计
    categories = Category.query.filter_by(user_id=current_user.id).all()
    category_stats = []
    
    for category in categories:
        total = Task.query.filter_by(user_id=current_user.id, category_id=category.id).count()
        completed = Task.query.filter_by(user_id=current_user.id, category_id=category.id, completed=True).count()
        
        if total > 0:
            completion_rate = (completed / total) * 100
        else:
            completion_rate = 0
            
        category_stats.append({
            'category': category,
            'total': total,
            'completed': completed,
            'completion_rate': completion_rate
        })
    
    return render_template('dashboard.html', 
                           total_tasks=total_tasks,
                           completed_tasks=completed_tasks,
                           pending_tasks=pending_tasks,
                           due_soon_tasks=due_soon_tasks,
                           overdue_tasks=overdue_tasks,
                           recent_tasks=recent_tasks,
                           category_stats=category_stats)

@main.route('/tasks')
@login_required
def tasks():
    # 获取过滤参数
    status = request.args.get('status', '')
    category = request.args.get('category', '')
    priority = request.args.get('priority', '')
    sort = request.args.get('sort', 'due_date')
    order = request.args.get('order', 'asc')
    
    # 构建查询
    query = Task.query.filter_by(user_id=current_user.id)
    
    if status == 'completed':
        query = query.filter_by(completed=True)
    elif status == 'pending':
        query = query.filter_by(completed=False)
    
    if category and category != 'all':
        query = query.filter_by(category_id=category)
    
    if priority and priority != 'all':
        query = query.filter_by(priority=int(priority))
    
    # 排序
    if sort == 'due_date':
        if order == 'asc':
            query = query.order_by(Task.due_date.asc())
        else:
            query = query.order_by(Task.due_date.desc())
    elif sort == 'priority':
        if order == 'asc':
            query = query.order_by(Task.priority.asc())
        else:
            query = query.order_by(Task.priority.desc())
    elif sort == 'created_at':
        if order == 'asc':
            query = query.order_by(Task.created_at.asc())
        else:
            query = query.order_by(Task.created_at.desc())
    
    tasks = query.all()
    
    # 获取用户的分类
    categories = Category.query.filter_by(user_id=current_user.id).all()
    
    return render_template('tasks.html', tasks=tasks, categories=categories,
                           status=status, category=category, priority=priority,
                           sort=sort, order=order)

@main.route('/task/add', methods=['GET', 'POST'])
@login_required
def add_task():
    categories = Category.query.filter_by(user_id=current_user.id).all()
    
    if request.method == 'POST':
        title = request.form['title']
        description = request.form.get('description', '')
        due_date_str = request.form.get('due_date', '')
        category_id = request.form.get('category', '')
        priority = request.form.get('priority', 1)
        
        if not title:
            flash('任务标题不能为空', 'error')
            return redirect(url_for('main.add_task'))
        
        due_date = None
        if due_date_str:
            try:
                due_date = datetime.strptime(due_date_str, '%Y-%m-%d')
            except ValueError:
                flash('日期格式不正确,请使用 YYYY-MM-DD 格式', 'error')
                return redirect(url_for('main.add_task'))
        
        task = Task(
            title=title,
            description=description,
            due_date=due_date,
            priority=priority,
            user_id=current_user.id,
            category_id=category_id if category_id else None
        )
        
        db.session.add(task)
        db.session.commit()
        
        flash('任务已添加', 'success')
        return redirect(url_for('main.tasks'))
    
    return render_template('task_form.html', categories=categories, is_edit=False)

@main.route('/task/<int:task_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_task(task_id):
    task = Task.query.get_or_404(task_id)
    
    # 确保用户只能编辑自己的任务
    if task.user_id != current_user.id:
        flash('权限不足', 'error')
        return redirect(url_for('main.tasks'))
    
    categories = Category.query.filter_by(user_id=current_user.id).all()
    
    if request.method == 'POST':
        task.title = request.form['title']
        task.description = request.form.get('description', '')
        due_date_str = request.form.get('due_date', '')
        task.category_id = request.form.get('category', '')
        task.priority = request.form.get('priority', 1)
        
        if not task.title:
            flash('任务标题不能为空', 'error')
            return redirect(url_for('main.edit_task', task_id=task_id))
        
        task.due_date = None
        if due_date_str:
            try:
                task.due_date = datetime.strptime(due_date_str, '%Y-%m-%d')
            except ValueError:
                flash('日期格式不正确,请使用 YYYY-MM-DD 格式', 'error')
                return redirect(url_for('main.edit_task', task_id=task_id))
        
        db.session.commit()
        
        flash('任务已更新', 'success')
        return redirect(url_for('main.tasks'))
    
    return render_template('task_form.html', task=task, categories=categories, is_edit=True)

@main.route('/task/<int:task_id>/toggle', methods=['POST'])
@login_required
def toggle_task(task_id):
    task = Task.query.get_or_404(task_id)
    
    # 确保用户只能操作自己的任务
    if task.user_id != current_user.id:
        flash('权限不足', 'error')
        return redirect(url_for('main.tasks'))
    
    task.completed = not task.completed
    db.session.commit()
    
    flash('任务状态已更新', 'success')
    return redirect(url_for('main.tasks'))

@main.route('/task/<int:task_id>/delete', methods=['POST'])
@login_required
def delete_task(task_id):
    task = Task.query.get_or_404(task_id)
    
    # 确保用户只能删除自己的任务
    if task.user_id != current_user.id:
        flash('权限不足', 'error')
        return redirect(url_for('main.tasks'))
    
    db.session.delete(task)
    db.session.commit()
    
    flash('任务已删除', 'success')
    return redirect(url_for('main.tasks'))

@main.route('/categories')
@login_required
def categories():
    categories = Category.query.filter_by(user_id=current_user.id).all()
    return render_template('categories.html', categories=categories)

@main.route('/category/add', methods=['POST'])
@login_required
def add_category():
    name = request.form['name']
    
    if not name:
        flash('分类名称不能为空', 'error')
        return redirect(url_for('main.categories'))
    
    if Category.query.filter_by(user_id=current_user.id, name=name).first():
        flash('分类名称已存在', 'error')
        return redirect(url_for('main.categories'))
    
    category = Category(name=name, user_id=current_user.id)
    db.session.add(category)
    db.session.commit()
    
    flash('分类已添加', 'success')
    return redirect(url_for('main.categories'))

@main.route('/category/<int:category_id>/edit', methods=['POST'])
@login_required
def edit_category(category_id):
    category = Category.query.get_or_404(category_id)
    
    # 确保用户只能编辑自己的分类
    if category.user_id != current_user.id:
        flash('权限不足', 'error')
        return redirect(url_for('main.categories'))
    
    name = request.form['name']
    
    if not name:
        flash('分类名称不能为空', 'error')
        return redirect(url_for('main.categories'))
    
    if Category.query.filter_by(user_id=current_user.id, name=name).first() and name != category.name:
        flash('分类名称已存在', 'error')
        return redirect(url_for('main.categories'))
    
    category.name = name
    db.session.commit()
    
    flash('分类已更新', 'success')
    return redirect(url_for('main.categories'))

@main.route('/category/<int:category_id>/delete', methods=['POST'])
@login_required
def delete_category(category_id):
    category = Category.query.get_or_404(category_id)
    
    # 确保用户只能删除自己的分类
    if category.user_id != current_user.id:
        flash('权限不足', 'error')
        return redirect(url_for('main.categories'))
    
    # 检查是否有任务属于该分类
    if Task.query.filter_by(user_id=current_user.id, category_id=category_id).count() > 0:
        flash('该分类下有任务,不能删除', 'error')
        return redirect(url_for('main.categories'))
    
    db.session.delete(category)
    db.session.commit()
    
    flash('分类已删除', 'success')
    return redirect(url_for('main.categories'))

@main.route('/analytics')
@login_required
def analytics():
    # 获取任务统计数据
    total_tasks = Task.query.filter_by(user_id=current_user.id).count()
    completed_tasks = Task.query.filter_by(user_id=current_user.id, completed=True).count()
    pending_tasks = total_tasks - completed_tasks
    
    # 按优先级统计
    priority_stats = {}
    for priority in [1, 2, 3]:
        total = Task.query.filter_by(user_id=current_user.id, priority=priority).count()
        completed = Task.query.filter_by(user_id=current_user.id, priority=priority, completed=True).count()
        pending_tasks = total - completed
        
        if total > 0:
            completion_rate = (completed / total) * 100
        else:
            completion_rate = 0
            
        priority_stats[priority] = {
            'total': total,
            'completed': completed,
            'pending': pending_tasks,
            'completion_rate': completion_rate
        }
    
    # 按分类统计
    categories = Category.query.filter_by(user_id=current_user.id).all()
    category_stats = []
    
    for category in categories:
        total = Task.query.filter_by(user_id=current_user.id, category_id=category.id).count()
        completed = Task.query.filter_by(user_id=current_user.id, category_id=category.id, completed=True).count()
        
        if total > 0:
            completion_rate = (completed / total) * 100
        else:
            completion_rate = 0
            
        category_stats.append({
            'category': category,
            'total': total,
            'completed': completed,
            'completion_rate': completion_rate
        })
    
    # 最近7天的任务完成情况
    today = datetime.now().date()
    last_7_days = [(today - timedelta(days=i)) for i in range(6, -1, -1)]
    daily_stats = []
    
    for day in last_7_days:
        start_time = datetime.combine(day, datetime.min.time())
        end_time = datetime.combine(day, datetime.max.time())
        
        created = Task.query.filter_by(user_id=current_user.id) \
                     .filter(Task.created_at.between(start_time, end_time)).count()
        
        completed = Task.query.filter_by(user_id=current_user.id, completed=True) \
                       .filter(Task.due_date.between(start_time, end_time)).count()
        
        daily_stats.append({
            'date': day,
            'created': created,
            'completed': completed
        })
    
    return render_template('analytics.html', 
                           total_tasks=total_tasks,
                           completed_tasks=completed_tasks,
                           pending_tasks=pending_tasks,
                           priority_stats=priority_stats,
                           category_stats=category_stats,
                           daily_stats=daily_stats)

# 注册蓝图
app.register_blueprint(main)
app.register_blueprint(auth)

# 创建数据库表
with app.app_context():
    db.create_all()

if __name__ == '__main__':
    app.run(debug=True)

网站公告

今日签到

点亮在社区的每一天
去签到