Python实例题:分布式任务调度系统

发布于:2025-06-19 ⋅ 阅读:(14) ⋅ 点赞:(0)

目录

Python实例题

题目

问题描述

调度中心(Server):

工作节点(Worker):

客户端(Client):

系统架构

关键实现思路

核心代码框架(调度中心部分)

难点分析

扩展方向

Python实例题

题目

分布式任务调度系统

问题描述

设计一个简化版分布式任务调度系统,包含以下组件:

  • 调度中心(Server)

    • 接收客户端任务提交
    • 维护任务队列和工作节点状态
    • 分配任务到空闲节点
    • 存储任务执行结果和日志
  • 工作节点(Worker)

    • 连接调度中心获取任务
    • 执行计算密集型任务(如数学建模、数据处理)
    • 返回任务结果
  • 客户端(Client)

    • 提交任务到调度中心
    • 查询任务状态和结果

系统架构

+----------------+     +----------------+     +----------------+
|    客户端      |<--->|    调度中心    |<--->|    工作节点1    |
+----------------+     +----------------+     +----------------+
                                        ^
                                        |
+----------------+     +----------------+     +----------------+
|    客户端      |<--->|    调度中心    |<--->|    工作节点2    |
+----------------+     +----------------+     +----------------+

关键实现思路

  • 使用Socket实现网络通信(或选择ZeroMQ等更专业的消息队列)
  • SQLite/PostgreSQL存储任务信息和结果
  • 采用 multiprocessing 或 concurrent.futures 处理并发任务
  • 设计自定义通信协议(如 JSON 格式消息)
  • 实现心跳机制检测工作节点存活状态
  • 支持任务优先级和超时处理

核心代码框架(调度中心部分)

import socket
import json
import threading
import time
import sqlite3
import queue
from typing import Dict, List, Tuple, Optional

# 通信协议定义
class Protocol:
    """Wire format shared by scheduler, workers and clients.

    Every message is a UTF-8 encoded JSON object with three keys:
    ``type`` (one of the ``TYPE_*`` constants or a reply name), ``data``
    (the payload) and ``timestamp`` (sender's ``time.time()``).
    """

    # Message types
    TYPE_TASK_SUBMIT = "TASK_SUBMIT"    # client submits a task
    TYPE_TASK_ASSIGN = "TASK_ASSIGN"    # scheduler hands a task to a worker
    TYPE_TASK_RESULT = "TASK_RESULT"    # worker returns a result
    TYPE_WORKER_HEARTBEAT = "HEARTBEAT" # worker heartbeat
    TYPE_WORKER_REGISTER = "REGISTER"   # worker registration

    @staticmethod
    def pack_message(msg_type: str, data: dict) -> bytes:
        """Serialize a message to JSON bytes.

        Args:
            msg_type: Message type string stored under ``"type"``.
            data: JSON-serializable payload stored under ``"data"``.

        Returns:
            The encoded message, stamped with the current time.
        """
        message = {"type": msg_type, "data": data, "timestamp": time.time()}
        return json.dumps(message).encode("utf-8")

    @staticmethod
    def unpack_message(msg: bytes) -> dict:
        """Parse JSON bytes into a message dict.

        Never raises for malformed input: undecodable bytes, invalid JSON,
        or JSON whose top level is not an object all yield the same
        ``{"type": "ERROR", ...}`` dict, so callers can always use ``.get``.
        """
        invalid = {"type": "ERROR", "data": "Invalid message format"}
        try:
            parsed = json.loads(msg.decode("utf-8"))
        except (ValueError, AttributeError, TypeError):
            # ValueError covers both UnicodeDecodeError and JSONDecodeError;
            # AttributeError/TypeError cover a non-bytes argument.
            return invalid
        # A bare list/number/string is valid JSON but not a valid message.
        if not isinstance(parsed, dict):
            return invalid
        return parsed

# 任务管理类
class TaskManager:
    """Task queue and worker registry kept in memory, mirrored into SQLite.

    All public methods take an internal re-entrant lock, so one instance can
    be shared by several threads. Task rows are written to the SQLite file at
    ``db_path``; the queue, ``task_status`` and ``worker_nodes`` live only in
    memory and start empty on every construction.
    """

    def __init__(self, db_path="task_system.db", heartbeat_timeout: float = 10.0):
        """Create the manager and make sure the ``tasks`` table exists.

        Args:
            db_path: Path of the SQLite database file.
            heartbeat_timeout: A worker counts as alive only if its last
                heartbeat is less than this many seconds old.
        """
        # Entries are (priority, task_id, name, data); lower number is served first.
        self.task_queue = queue.PriorityQueue()
        self.task_status = {}  # task_id: {"status": "", "worker": "", "result": ""}
        self.worker_nodes = {}  # worker_id: {"address": "", "last_heartbeat": 0, "busy": False}
        self.db_path = db_path
        self.heartbeat_timeout = heartbeat_timeout
        self._lock = threading.RLock()
        self._init_database()

    def _execute(self, sql: str, params: tuple = ()) -> None:
        """Run one statement on a fresh connection, commit, and always close."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    def _init_database(self):
        """Create the ``tasks`` table if it does not exist yet."""
        self._execute('''
        CREATE TABLE IF NOT EXISTS tasks (
            task_id TEXT PRIMARY KEY,
            name TEXT,
            priority INTEGER,
            status TEXT,
            input_data TEXT,
            result TEXT,
            worker_id TEXT,
            create_time REAL,
            update_time REAL
        )
        ''')

    def add_task(self, task_id: str, name: str, priority: int, data: dict) -> bool:
        """Queue a new task and persist it.

        Args:
            task_id: Unique, non-empty string identifier.
            name: Human-readable task name.
            priority: Number; lower values are scheduled first.
            data: JSON-serializable task input.

        Returns:
            True if the task was stored and queued. False if ``task_id`` or
            ``priority`` is unusable, or the id is already known in memory or
            in the database (e.g. left over from an earlier run).
        """
        if not isinstance(task_id, str) or not task_id:
            return False
        # Non-numeric or NaN priorities would break ordering inside the heap.
        if not isinstance(priority, (int, float)) or priority != priority:
            return False

        with self._lock:
            if task_id in self.task_status:
                return False

            now = time.time()
            # Persist first: if the row cannot be inserted, nothing in memory
            # has been touched and the two views stay consistent.
            try:
                self._execute(
                    "INSERT INTO tasks (task_id, name, priority, status, input_data, create_time, update_time) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (task_id, name, priority, "PENDING", json.dumps(data), now, now),
                )
            except sqlite3.IntegrityError:
                return False

            self.task_status[task_id] = {
                "status": "PENDING",
                "worker": "",
                "result": None,
                "create_time": now,
            }
            self.task_queue.put((priority, task_id, name, data))
            return True

    def get_available_worker(self) -> Optional[str]:
        """Return the first registered worker that is idle and alive, or None.

        "Alive" means the last heartbeat is younger than ``heartbeat_timeout``.
        Workers are considered in registration order.
        """
        now = time.time()
        with self._lock:
            for worker_id, info in self.worker_nodes.items():
                if not info["busy"] and now - info["last_heartbeat"] < self.heartbeat_timeout:
                    return worker_id
        return None

    def _pop_pending_task(self) -> Optional[tuple]:
        """Pop queue entries until one is still PENDING; None if the queue runs dry."""
        while True:
            try:
                # Non-blocking: a plain get() would hang if the queue emptied
                # between an emptiness check and the call.
                item = self.task_queue.get_nowait()
            except queue.Empty:
                return None
            entry = self.task_status.get(item[1])
            if entry is not None and entry["status"] == "PENDING":
                return item

    def assign_task(self) -> Optional[Tuple[str, str, dict]]:
        """Give the highest-priority pending task to an available worker.

        Returns:
            ``(worker_id, task_id, task_data)`` on success, or None when there
            is no available worker or no pending task.

        Raises:
            sqlite3.Error: If the status update cannot be written; the task is
                put back on the queue before the error propagates.
        """
        with self._lock:
            worker_id = self.get_available_worker()
            if not worker_id:
                return None

            item = self._pop_pending_task()
            if item is None:
                return None
            task_id, task_data = item[1], item[3]

            now = time.time()
            try:
                self._execute(
                    "UPDATE tasks SET status = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                    ("RUNNING", worker_id, now, task_id),
                )
            except sqlite3.Error:
                self.task_queue.put(item)
                raise

            self.task_status[task_id] = {
                **self.task_status[task_id],
                "status": "RUNNING",
                "worker": worker_id,
                "update_time": now,
            }
            self.worker_nodes[worker_id]["busy"] = True
            return (worker_id, task_id, task_data)

    def process_result(self, task_id: str, result: dict, worker_id: str) -> bool:
        """Record a task result and free the worker that produced it.

        Args:
            task_id: Task the result belongs to.
            result: JSON-serializable result payload.
            worker_id: Reporting worker. If it is not a registered worker, the
                worker recorded at assignment time is freed instead.

        Returns:
            False if ``task_id`` is unknown, True once the result is stored.
        """
        with self._lock:
            entry = self.task_status.get(task_id)
            if entry is None:
                return False

            now = time.time()
            # NOTE(review): the worker_id column is blanked on completion, as
            # in the original; the in-memory status keeps the worker name.
            self._execute(
                "UPDATE tasks SET status = ?, result = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                ("COMPLETED", json.dumps(result), "", now, task_id),
            )

            self.task_status[task_id] = {
                **entry,
                "status": "COMPLETED",
                "result": result,
                "update_time": now,
            }

            release_id = worker_id if worker_id in self.worker_nodes else entry.get("worker")
            if release_id in self.worker_nodes:
                self.worker_nodes[release_id]["busy"] = False
            return True

    def register_worker(self, worker_id: str, address: str) -> bool:
        """Register a worker as idle with a fresh heartbeat.

        Returns:
            False if ``worker_id`` is not a non-empty string or is already
            registered, True otherwise.
        """
        # An empty/None id would be returned by get_available_worker() and
        # then be mistaken for "no worker", blocking every assignment.
        if not isinstance(worker_id, str) or not worker_id:
            return False

        with self._lock:
            if worker_id in self.worker_nodes:
                return False
            self.worker_nodes[worker_id] = {
                "address": address,
                "last_heartbeat": time.time(),
                "busy": False,
            }
            return True

    def update_heartbeat(self, worker_id: str) -> bool:
        """Refresh a worker's heartbeat time; False if the worker is unknown."""
        with self._lock:
            info = self.worker_nodes.get(worker_id)
            if info is None:
                return False
            info["last_heartbeat"] = time.time()
            return True

# 调度中心服务器类
class SchedulerServer:
    """TCP front end of the scheduler.

    Accepts connections from clients and workers, runs one daemon thread per
    connection, and translates protocol messages into ``TaskManager`` calls.
    Each ``recv`` result is parsed as one complete JSON message; there is no
    length framing, so messages are assumed to fit in a single 4096-byte read.
    """

    WORKER_TIMEOUT = 30            # seconds without heartbeat before a worker is marked offline
    HEARTBEAT_CHECK_INTERVAL = 10  # seconds between two offline checks

    def __init__(self, host="0.0.0.0", port=9999):
        """Create the listening socket (not yet bound) and the task manager."""
        self.host = host
        self.port = port
        self.task_manager = TaskManager()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.running = False
        self.clients = {}  # client_id: socket
        # Serializes every scheduling-state change made by the connection
        # threads and the heartbeat thread.
        self._state_lock = threading.RLock()

    def start(self):
        """Bind, listen and serve connections until ``stop()`` is called.

        Blocks the calling thread in the accept loop.
        """
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(10)
        self.running = True
        print(f"调度中心启动成功,监听地址: {self.host}:{self.port}")

        # Background thread that marks silent workers as offline.
        heartbeat_thread = threading.Thread(target=self._heartbeat_check, daemon=True)
        heartbeat_thread.start()

        while self.running:
            try:
                client_socket, address = self.server_socket.accept()
            except OSError as e:
                if not self.running:
                    break  # listening socket was closed by stop()
                print(f"接受连接失败: {e}")
                time.sleep(1)
                continue

            print(f"新连接: {address}")
            try:
                threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address),
                    daemon=True,
                ).start()
            except RuntimeError as e:
                # No thread will ever own this socket, so close it here.
                print(f"接受连接失败: {e}")
                client_socket.close()

    @staticmethod
    def _is_valid_id(value) -> bool:
        """Return True for a non-empty string, the only id shape accepted."""
        return isinstance(value, str) and bool(value)

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Serve one connection: read a message, reply, repeat until it closes.

        Args:
            client_socket: Connected socket returned by ``accept``.
            address: Peer ``(host, port)``; used to build the client id.
        """
        client_id = f"{address[0]}:{address[1]}"
        self.clients[client_id] = client_socket

        try:
            while True:
                data = client_socket.recv(4096)
                if not data:
                    break  # peer closed the connection

                message = Protocol.unpack_message(data)
                if not isinstance(message, dict):
                    message = {}
                msg_type = message.get("type", "UNKNOWN")
                msg_data = message.get("data", {})
                print(f"收到消息 [{client_id}]: {msg_type}")

                response = self._build_response(msg_type, msg_data, client_id)
                # sendall: send() may transmit only part of the buffer.
                client_socket.sendall(response)

        except Exception as e:
            print(f"客户端处理错误 [{client_id}]: {e}")
        finally:
            self.clients.pop(client_id, None)
            client_socket.close()
            print(f"连接关闭: {client_id}")

    def _build_response(self, msg_type, msg_data, client_id: str) -> bytes:
        """Route one parsed message to its handler and return the reply bytes."""
        handlers = {
            Protocol.TYPE_WORKER_REGISTER: self._on_register,
            Protocol.TYPE_WORKER_HEARTBEAT: self._on_heartbeat,
            Protocol.TYPE_TASK_SUBMIT: self._on_submit,
            Protocol.TYPE_TASK_RESULT: self._on_result,
        }
        handler = handlers.get(msg_type) if isinstance(msg_type, str) else None
        if handler is None:
            return Protocol.pack_message(
                "UNKNOWN_MESSAGE",
                {"message": "Unknown message type"}
            )
        # Handlers read fields with .get(); a non-object payload has none.
        if not isinstance(msg_data, dict):
            msg_data = {}
        return handler(msg_data, client_id)

    def _on_register(self, msg_data: dict, client_id: str) -> bytes:
        """Handle REGISTER: add the worker, keyed to this connection's id."""
        worker_id = msg_data.get("worker_id")
        if not self._is_valid_id(worker_id):
            return Protocol.pack_message(
                "REGISTER_FAILED",
                {"message": "Invalid worker ID"}
            )
        with self._state_lock:
            registered = self.task_manager.register_worker(worker_id, client_id)
        if registered:
            return Protocol.pack_message(
                "REGISTER_SUCCESS",
                {"message": "Worker registered successfully"}
            )
        return Protocol.pack_message(
            "REGISTER_FAILED",
            {"message": "Worker ID already exists"}
        )

    def _on_heartbeat(self, msg_data: dict, client_id: str) -> bytes:
        """Handle HEARTBEAT: refresh the worker and offer it a task if it is next.

        The reply goes to the heartbeat sender, so a task is only assigned when
        the sender is the worker the task manager would pick; otherwise the
        task would be booked to one worker and delivered to another.
        """
        worker_id = msg_data.get("worker_id")
        task = None
        with self._state_lock:
            known = self._is_valid_id(worker_id) and self.task_manager.update_heartbeat(worker_id)
            if known and self.task_manager.get_available_worker() == worker_id:
                task = self.task_manager.assign_task()

        if not known:
            return Protocol.pack_message(
                "HEARTBEAT_FAILED",
                {"message": "Worker not registered"}
            )
        if task:
            _assigned_worker, task_id, task_data = task
            return Protocol.pack_message(
                Protocol.TYPE_TASK_ASSIGN,
                {"task_id": task_id, "data": task_data}
            )
        return Protocol.pack_message(
            "NO_TASK_ASSIGN",
            {"message": "No task available"}
        )

    def _on_submit(self, msg_data: dict, client_id: str) -> bytes:
        """Handle TASK_SUBMIT: validate the request and queue the task."""
        task_id = msg_data.get("task_id")
        task_name = msg_data.get("task_name", "Unknown Task")
        priority = msg_data.get("priority", 5)  # medium priority by default
        task_data = msg_data.get("data", {})

        if not self._is_valid_id(task_id):
            return Protocol.pack_message(
                "TASK_SUBMIT_FAILED",
                {"message": "Invalid task ID"}
            )
        # Priorities are compared inside a priority queue; reject values
        # (non-numbers, NaN) that cannot be ordered against numbers.
        if not isinstance(priority, (int, float)) or priority != priority:
            return Protocol.pack_message(
                "TASK_SUBMIT_FAILED",
                {"message": "Invalid priority"}
            )

        with self._state_lock:
            accepted = self.task_manager.add_task(task_id, task_name, priority, task_data)
        if accepted:
            return Protocol.pack_message(
                "TASK_SUBMIT_SUCCESS",
                {"task_id": task_id, "message": "Task submitted"}
            )
        return Protocol.pack_message(
            "TASK_SUBMIT_FAILED",
            {"message": "Task ID already exists"}
        )

    def _on_result(self, msg_data: dict, client_id: str) -> bytes:
        """Handle TASK_RESULT: store the result and free the worker."""
        task_id = msg_data.get("task_id")
        result = msg_data.get("result", {})
        worker_id = msg_data.get("worker_id")
        if not self._is_valid_id(worker_id):
            worker_id = None  # unusable id; never used as a dict key below

        stored = False
        if self._is_valid_id(task_id):
            with self._state_lock:
                stored = self.task_manager.process_result(task_id, result, worker_id)
        if stored:
            return Protocol.pack_message(
                "RESULT_RECEIVED",
                {"task_id": task_id, "message": "Result received"}
            )
        return Protocol.pack_message(
            "RESULT_FAILED",
            {"message": "Invalid task ID"}
        )

    def _expire_stale_workers(self) -> list:
        """Mark workers silent for more than WORKER_TIMEOUT as not busy.

        Returns:
            The ids of the workers found stale in this pass.
        """
        stale = []
        now = time.time()
        with self._state_lock:
            # The registry lives on the task manager, not on the server.
            for worker_id, info in list(self.task_manager.worker_nodes.items()):
                if now - info["last_heartbeat"] > self.WORKER_TIMEOUT:
                    print(f"工作节点 {worker_id} 超时,标记为离线")
                    # Only the busy flag is cleared; the task the worker held
                    # keeps its current status and is not re-queued here.
                    info["busy"] = False
                    stale.append(worker_id)
        return stale

    def _heartbeat_check(self):
        """Periodically look for workers whose heartbeat has gone silent."""
        while self.running:
            self._expire_stale_workers()
            time.sleep(self.HEARTBEAT_CHECK_INTERVAL)

    def stop(self):
        """Stop accepting connections and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
        print("调度中心已停止")

# 使用示例
if __name__ == "__main__":
    # Run a scheduler on port 9999 until the process is interrupted.
    scheduler = SchedulerServer(port=9999)
    try:
        try:
            scheduler.start()  # blocks in the accept loop
        except KeyboardInterrupt:
            print("接收到停止信号")
    finally:
        # Release the listening socket however start() ended.
        scheduler.stop()

难点分析

  • 分布式系统一致性:多个工作节点和客户端同时操作时确保数据一致性
  • 网络通信可靠性:处理网络延迟、断开重连、消息丢失等问题
  • 任务优先级调度:实现公平且高效的任务分配算法
  • 系统监控与容错:设计完善的监控机制和故障恢复策略
  • 性能优化:处理大量任务时的并发性能瓶颈(如数据库连接池、异步 IO)

扩展方向

  • 添加 Web 管理界面(使用 Flask/Django)
  • 集成 Redis 作为分布式消息队列
  • 支持任务依赖关系(DAG 任务流)
  • 实现资源监控(CPU / 内存 / 磁盘)
  • 增加任务重试和失败转移机制

网站公告

今日签到

点亮在社区的每一天
去签到