第五章:自动化脚本开发

发布于:2025-09-14 ⋅ 阅读:(20) ⋅ 点赞:(0)

5.1 文件系统操作

文件和目录管理

import glob
import os
import shutil
import time
from pathlib import Path

class FileManager {
    # Performs file and directory operations relative to a base directory and
    # keeps a timestamped, in-memory log of every operation it attempts.

    func __init__(base_path: str = ".") {
        self.operations_log = []
        self.base_path = Path(base_path).resolve()
    }

    func create_directory(path: str, parents: bool = true) -> bool {
        """Create a directory below the base path.

        An already existing directory counts as success. Returns true on
        success and false (after logging the error) on any failure.
        """
        try {
            target = self.base_path / path
            target.mkdir(parents=parents, exist_ok=true)
            self._log_operation(f"创建目录: {target}")
            return true
        } catch Exception as err {
            self._log_operation(f"创建目录失败: {path}, 错误: {err}")
            return false
        }
    }

    func copy_file(source: str, destination: str) -> bool {
        """Copy a file between two locations below the base path.

        Missing parent directories of the destination are created first and
        the copy is made with shutil.copy2. Returns true on success and false
        (after logging the error) on any failure.
        """
        try {
            origin = self.base_path / source
            target = self.base_path / destination

            # The destination's parent directories must exist before copying.
            target.parent.mkdir(parents=true, exist_ok=true)
            shutil.copy2(origin, target)

            self._log_operation(f"复制文件: {origin} -> {target}")
            return true
        } catch Exception as err {
            self._log_operation(f"复制文件失败: {source} -> {destination}, 错误: {err}")
            return false
        }
    }

    func move_file(source: str, destination: str) -> bool {
        """Move a file between two locations below the base path.

        Missing parent directories of the destination are created first.
        Returns true on success and false (after logging the error) on any
        failure.
        """
        try {
            origin = self.base_path / source
            target = self.base_path / destination

            # The destination's parent directories must exist before moving.
            target.parent.mkdir(parents=true, exist_ok=true)
            shutil.move(str(origin), str(target))

            self._log_operation(f"移动文件: {origin} -> {target}")
            return true
        } catch Exception as err {
            self._log_operation(f"移动文件失败: {source} -> {destination}, 错误: {err}")
            return false
        }
    }

    func delete_file(path: str) -> bool {
        """Delete a regular file below the base path.

        Returns false when the path is not an existing regular file or when
        the deletion fails; both cases are logged.
        """
        try {
            target = self.base_path / path
            if not target.is_file() {
                self._log_operation(f"文件不存在: {target}")
                return false
            }

            target.unlink()
            self._log_operation(f"删除文件: {target}")
            return true
        } catch Exception as err {
            self._log_operation(f"删除文件失败: {path}, 错误: {err}")
            return false
        }
    }

    func delete_directory(path: str, recursive: bool = false) -> bool {
        """Delete a directory below the base path.

        With recursive=true the whole tree is removed (shutil.rmtree);
        otherwise rmdir() is used. Returns false when the path is not a
        directory or the removal fails; both cases are logged.
        """
        try {
            target = self.base_path / path
            if not target.is_dir() {
                self._log_operation(f"目录不存在: {target}")
                return false
            }

            if recursive {
                shutil.rmtree(target)
            } else {
                target.rmdir()
            }
            self._log_operation(f"删除目录: {target}")
            return true
        } catch Exception as err {
            self._log_operation(f"删除目录失败: {path}, 错误: {err}")
            return false
        }
    }

    func find_files(pattern: str, recursive: bool = true) -> list[str] {
        """Return the regular files matching a glob pattern, relative to the base path.

        With recursive=true the pattern is prefixed with "**/" so that
        subdirectories are searched as well. Returns an empty list on error.
        """
        try {
            search_pattern = f"**/{pattern}" if recursive else pattern

            matches = []
            for candidate in self.base_path.glob(search_pattern) {
                # Directories matching the pattern are not reported.
                if candidate.is_file() {
                    matches.append(str(candidate.relative_to(self.base_path)))
                }
            }

            self._log_operation(f"查找文件: {pattern}, 找到 {len(matches)} 个文件")
            return matches
        } catch Exception as err {
            self._log_operation(f"查找文件失败: {pattern}, 错误: {err}")
            return []
        }
    }

    func get_file_info(path: str) -> dict? {
        """Return stat-based details for a path below the base path.

        Returns null when the path does not exist or stat() fails. The
        "permissions" entry is the last three octal digits of st_mode.
        """
        try {
            target = self.base_path / path
            if not target.exists() {
                return null
            }

            details = target.stat()
            info = {
                "path": str(target),
                "size": details.st_size,
                "created": details.st_ctime,
                "modified": details.st_mtime,
                "is_file": target.is_file(),
                "is_directory": target.is_dir(),
                "permissions": oct(details.st_mode)[-3:]
            }
            return info
        } catch Exception as err {
            self._log_operation(f"获取文件信息失败: {path}, 错误: {err}")
            return null
        }
    }

    func _log_operation(message: str) {
        # Prefix each entry with the local time at which it was recorded.
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        self.operations_log.append(f"[{stamp}] {message}")
    }

    func get_operations_log() -> list[str] {
        # Hand out a copy so callers cannot mutate the internal log.
        return list(self.operations_log)
    }
}

# Use the file manager, rooted at the workspace directory
file_manager = FileManager("/path/to/workspace")

# Create the directory layout (paths are relative to the workspace root)
file_manager.create_directory("projects/ai_script")
file_manager.create_directory("data/raw")
file_manager.create_directory("data/processed")

# Find Python files (find_files searches recursively by default)
python_files = file_manager.find_files("*.py")
print(f"找到 {len(python_files)} 个Python文件")

# Batch operation example: back up the larger files
for file in python_files[:5] {  # only the first 5 files are processed
    info = file_manager.get_file_info(file)
    if info and info["size"] > 1024 {  # files larger than 1 KB
        backup_path = f"backup/{file}"
        file_manager.copy_file(file, backup_path)
    }
}

文件内容处理

import re
import json
import csv
import yaml

class FileProcessor {
    # Reads and writes text, JSON and CSV files using one shared encoding and
    # remembers which text files it has read or written.

    func __init__() {
        self.encoding = "utf-8"
        self.processed_files = []
    }

    func read_text_file(filepath: str) -> str? {
        """Read a whole text file.

        Returns the content, or null (after printing the error) when the file
        cannot be read. Successful reads are recorded in processed_files.
        """
        try {
            with open(filepath, "r", encoding=self.encoding) as f {
                content = f.read()
            }
            self.processed_files.append(filepath)
            return content
        } catch Exception as e {
            print(f"读取文件失败: {filepath}, 错误: {e}")
            return null
        }
    }

    func write_text_file(filepath: str, content: str) -> bool {
        """Write text to a file, creating missing parent directories.

        Returns true on success, false (after printing the error) otherwise.
        Successful writes are recorded in processed_files.
        """
        try {
            # Make sure the target directory exists.
            Path(filepath).parent.mkdir(parents=true, exist_ok=true)

            with open(filepath, "w", encoding=self.encoding) as f {
                f.write(content)
            }
            self.processed_files.append(filepath)
            return true
        } catch Exception as e {
            print(f"写入文件失败: {filepath}, 错误: {e}")
            return false
        }
    }

    func read_json_file(filepath: str) -> dict? {
        """Parse a JSON file; returns null (after printing the error) on failure."""
        try {
            with open(filepath, "r", encoding=self.encoding) as f {
                data = json.load(f)
            }
            return data
        } catch Exception as e {
            print(f"读取JSON文件失败: {filepath}, 错误: {e}")
            return null
        }
    }

    func write_json_file(filepath: str, data: dict, indent: int = 2) -> bool {
        """Serialize data as JSON, creating missing parent directories.

        Non-ASCII characters are written as-is (ensure_ascii=false).
        Returns true on success, false (after printing the error) otherwise.
        """
        try {
            Path(filepath).parent.mkdir(parents=true, exist_ok=true)

            with open(filepath, "w", encoding=self.encoding) as f {
                json.dump(data, f, indent=indent, ensure_ascii=false)
            }
            return true
        } catch Exception as e {
            print(f"写入JSON文件失败: {filepath}, 错误: {e}")
            return false
        }
    }

    func read_csv_file(filepath: str, delimiter: str = ",") -> list[dict]? {
        """Read a CSV file with a header row into a list of row dicts.

        Returns null (after printing the error) on failure.
        """
        try {
            data = []
            with open(filepath, "r", encoding=self.encoding) as f {
                reader = csv.DictReader(f, delimiter=delimiter)
                for row in reader {
                    data.append(dict(row))
                }
            }
            return data
        } catch Exception as e {
            print(f"读取CSV文件失败: {filepath}, 错误: {e}")
            return null
        }
    }

    func write_csv_file(
        filepath: str, 
        data: list[dict], 
        fieldnames: list[str]? = null,
        delimiter: str = ","
    ) -> bool {
        """Write a list of row dicts as CSV with a header row.

        When fieldnames is null the keys of the first row are used as columns.
        Returns false for empty data (nothing is written) or when writing
        fails; true otherwise.
        """
        try {
            if not data {
                return false
            }

            if fieldnames is null {
                fieldnames = list(data[0].keys())
            }

            Path(filepath).parent.mkdir(parents=true, exist_ok=true)

            # newline="" lets the csv module control line endings itself.
            with open(filepath, "w", newline="", encoding=self.encoding) as f {
                writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=delimiter)
                writer.writeheader()
                writer.writerows(data)
            }
            return true
        } catch Exception as e {
            print(f"写入CSV文件失败: {filepath}, 错误: {e}")
            return false
        }
    }

    func process_text_batch(
        input_pattern: str,
        output_dir: str,
        processor_func: callable,
        file_extension: str = ".txt"
    ) -> int {
        """Apply processor_func to every text file matching a glob pattern.

        Each result is written to output_dir as "<stem>_processed<extension>".
        Files that cannot be read, processed or written are skipped.
        Returns the number of files written successfully.
        """
        processed_count = 0

        for filepath in glob.glob(input_pattern) {
            try {
                # Read the input file; unreadable files are skipped.
                content = self.read_text_file(filepath)
                if content is null {
                    continue
                }

                # Transform the content.
                processed_content = processor_func(content)

                # Derive the output path from the input file's stem.
                filename = Path(filepath).stem
                output_path = Path(output_dir) / f"{filename}_processed{file_extension}"

                # Write the transformed content.
                if self.write_text_file(str(output_path), processed_content) {
                    processed_count += 1
                    print(f"处理完成: {filepath} -> {output_path}")
                }

            } catch Exception as e {
                print(f"处理文件失败: {filepath}, 错误: {e}")
            }
        }

        return processed_count
    }

    func search_and_replace(
        filepath: str,
        search_pattern: str,
        replacement: str,
        use_regex: bool = false
    ) -> bool {
        """Replace text in a file in place.

        With use_regex=true search_pattern is a regular expression passed to
        re.sub; otherwise it is replaced literally. Returns false when the
        file cannot be read or written, true otherwise.
        """
        try {
            content = self.read_text_file(filepath)
            if content is null {
                return false
            }

            if use_regex {
                modified_content = re.sub(search_pattern, replacement, content)
            } else {
                modified_content = content.replace(search_pattern, replacement)
            }

            return self.write_text_file(filepath, modified_content)
        } catch Exception as e {
            print(f"搜索替换失败: {filepath}, 错误: {e}")
            return false
        }
    }
}

# Use the file processor
processor = FileProcessor()

# 文本处理函数示例
func clean_text(text: str) -> str {
    # Collapse every whitespace run into a single space, then drop every
    # character that is not a word character, whitespace or one of . , ! ?
    # and finally trim the ends.
    collapsed = re.sub(r'\s+', ' ', text)
    filtered = re.sub(r'[^\w\s.,!?]', '', collapsed)
    return filtered.strip()
}

func extract_emails(text: str) -> str {
    # Return every e-mail-like token found in text, one per line, in order
    # of appearance (an empty string when there is none).
    # The top-level-domain class is [A-Za-z]: inside a character class "|"
    # is a literal pipe, not an alternation, so it must not appear there.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(email_pattern, text)
    return '\n'.join(emails)
}

# Batch processing example: clean every .txt file under data/raw and write
# the results to data/processed
processed_count = processor.process_text_batch(
    "data/raw/*.txt",
    "data/processed",
    clean_text
)
print(f"处理了 {processed_count} 个文件")

# Configuration file example
config_data = {
    "database": {
        "host": "localhost",
        "port": 5432,
        "name": "ai_script_db"
    },
    "ai_models": {
        "text_model": "gpt-4",
        "embedding_model": "text-embedding-ada-002"
    }
}

processor.write_json_file("config/settings.json", config_data)

5.2 系统管理与监控

进程管理

import subprocess
import psutil
import signal
import threading

class ProcessManager {
    # Starts, tracks, inspects and stops child processes by name.
    # running_processes maps a name to its bookkeeping dict; process_logs
    # keeps the captured output of processes that have been stopped.

    func __init__() {
        self.running_processes = {}
        self.process_logs = {}
    }

    func start_process(
        command: list[str],
        name: str? = null,
        working_dir: str? = null,
        env_vars: dict? = null,
        capture_output: bool = true
    ) -> str? {
        """Start a child process and register it under a name.

        command is the argument list passed to subprocess.Popen. When name is
        null, "process_<n>" is generated from the number of tracked processes.
        env_vars are added on top of the current environment. With
        capture_output=true stdout and stderr are piped in text mode.
        Returns the name, or null (after printing the error) on failure.
        """
        try {
            if name is null {
                name = f"process_{len(self.running_processes)}"
            }

            # The child inherits the current environment plus any overrides.
            env = os.environ.copy()
            if env_vars {
                env.update(env_vars)
            }

            if capture_output {
                process = subprocess.Popen(
                    command,
                    cwd=working_dir,
                    env=env,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=true
                )
            } else {
                process = subprocess.Popen(
                    command,
                    cwd=working_dir,
                    env=env
                )
            }

            self.running_processes[name] = {
                "process": process,
                "command": command,
                "start_time": time.time(),
                "capture_output": capture_output
            }

            print(f"进程 '{name}' 已启动,PID: {process.pid}")
            return name

        } catch Exception as e {
            print(f"启动进程失败: {e}")
            return null
        }
    }

    func stop_process(name: str, timeout: int = 10) -> bool {
        """Stop a tracked process, terminating gracefully and killing on timeout.

        Captured output and the return code are stored in process_logs when
        the process was started with capture_output=true. Returns false when
        the name is unknown or stopping fails.
        """
        if name not in self.running_processes {
            print(f"进程 '{name}' 不存在")
            return false
        }

        try {
            process_info = self.running_processes[name]
            process = process_info["process"]

            # Ask the process to exit gracefully first.
            process.terminate()

            # communicate() drains the pipes while waiting. A plain wait()
            # can deadlock when the child has filled a pipe buffer.
            try {
                stdout, stderr = process.communicate(timeout=timeout)
            } catch subprocess.TimeoutExpired {
                # Graceful shutdown took too long: force it.
                process.kill()
                stdout, stderr = process.communicate()
            }

            if process_info["capture_output"] {
                self.process_logs[name] = {
                    "stdout": stdout,
                    "stderr": stderr,
                    "return_code": process.returncode
                }
            }

            del self.running_processes[name]
            print(f"进程 '{name}' 已停止")
            return true

        } catch Exception as e {
            print(f"停止进程失败: {e}")
            return false
        }
    }

    func get_process_status(name: str) -> dict? {
        """Return psutil-based status details for a tracked process.

        Returns null when the name is unknown, the process no longer exists
        (it is then dropped from the tracked set) or the lookup fails.
        """
        if name not in self.running_processes {
            return null
        }

        process_info = self.running_processes[name]
        process = process_info["process"]

        try {
            # Reuse one psutil handle per process: cpu_percent() measures
            # usage since the previous call on the same handle, so a fresh
            # handle on every call would always report 0.0.
            ps_process = process_info.get("ps_process")
            if ps_process is null {
                ps_process = psutil.Process(process.pid)
                process_info["ps_process"] = ps_process
            }

            return {
                "name": name,
                "pid": process.pid,
                "status": ps_process.status(),
                "cpu_percent": ps_process.cpu_percent(),
                "memory_percent": ps_process.memory_percent(),
                "memory_info": ps_process.memory_info()._asdict(),
                "create_time": ps_process.create_time(),
                "running_time": time.time() - process_info["start_time"],
                "command": process_info["command"]
            }
        } catch psutil.NoSuchProcess {
            # The process has already ended; stop tracking it.
            del self.running_processes[name]
            return null
        } catch Exception as e {
            print(f"获取进程状态失败: {e}")
            return null
        }
    }

    func list_processes() -> list[dict] {
        """Return the status of every tracked process that is still available."""
        processes = []
        # Iterate over a snapshot of the names: get_process_status may drop
        # entries of processes that have ended.
        for name in list(self.running_processes.keys()) {
            status = self.get_process_status(name)
            if status {
                processes.append(status)
            }
        }
        return processes
    }

    func get_process_output(name: str) -> dict? {
        """Return the output captured when the process was stopped, or null."""
        return self.process_logs.get(name)
    }

    func monitor_process(name: str, callback: callable? = null) {
        """Poll a tracked process every 5 seconds on a daemon thread.

        callback, when given, receives each status dict. Polling ends once
        the process is no longer tracked or its status is unavailable.
        """
        func monitor_loop() {
            while name in self.running_processes {
                status = self.get_process_status(name)
                if status {
                    if callback {
                        callback(status)
                    }
                    time.sleep(5)  # check every 5 seconds
                } else {
                    break
                }
            }
        }

        monitor_thread = threading.Thread(target=monitor_loop)
        monitor_thread.daemon = true
        monitor_thread.start()
    }
}

# Use the process manager
process_manager = ProcessManager()

# Start a long-running process
process_name = process_manager.start_process(
    ["python", "-m", "http.server", "8000"],
    name="web_server",
    working_dir="/path/to/web/root"
)

if process_name {
    # Monitor the process: print its CPU and memory usage on every poll
    func log_process_status(status: dict) {
        print(f"进程 {status['name']}: CPU {status['cpu_percent']:.1f}%, 内存 {status['memory_percent']:.1f}%")
    }
    
    process_manager.monitor_process(process_name, log_process_status)
    
    # Wait for a while, then stop the process
    time.sleep(30)
    process_manager.stop_process(process_name)
}

系统监控

import psutil
import platform
import socket

class SystemMonitor {
    # Collects CPU, memory, disk, network and process metrics through psutil,
    # compares them against configurable thresholds and can poll them on a
    # background thread.

    func __init__() {
        self.monitoring = false
        self.alerts = []
        # Alert thresholds, in percent.
        # NOTE(review): "temperature" is not read by any method in this class.
        self.thresholds = {
            "cpu_percent": 80.0,
            "memory_percent": 85.0,
            "disk_percent": 90.0,
            "temperature": 70.0
        }
    }

    func get_system_info() -> dict {
        """Return static information about the host and the runtime."""
        return {
            "platform": platform.platform(),
            "processor": platform.processor(),
            "architecture": platform.architecture(),
            "hostname": socket.gethostname(),
            "python_version": platform.python_version(),
            "boot_time": psutil.boot_time()
        }
    }

    func get_cpu_info() -> dict {
        """Return core counts, frequencies and current CPU utilisation.

        Blocks for about two seconds: the overall and the per-CPU utilisation
        are each sampled over a one-second interval. Frequencies are null
        when psutil cannot report them.
        """
        # Query the frequency once so both fields come from the same reading.
        frequency = psutil.cpu_freq()

        return {
            "physical_cores": psutil.cpu_count(logical=false),
            "total_cores": psutil.cpu_count(logical=true),
            "max_frequency": frequency.max if frequency else null,
            "current_frequency": frequency.current if frequency else null,
            "cpu_percent": psutil.cpu_percent(interval=1),
            "per_cpu_percent": psutil.cpu_percent(interval=1, percpu=true)
        }
    }

    func get_memory_info() -> dict {
        """Return virtual and swap memory usage."""
        virtual_memory = psutil.virtual_memory()
        swap_memory = psutil.swap_memory()

        return {
            "total": virtual_memory.total,
            "available": virtual_memory.available,
            "used": virtual_memory.used,
            "percentage": virtual_memory.percent,
            "swap_total": swap_memory.total,
            "swap_used": swap_memory.used,
            "swap_percentage": swap_memory.percent
        }
    }

    func get_disk_info() -> list[dict] {
        """Return usage figures for every partition that can be inspected.

        Partitions that raise PermissionError are skipped. A partition that
        reports a total size of 0 gets a percentage of 0.0.
        """
        disk_info = []

        for partition in psutil.disk_partitions() {
            try {
                usage = psutil.disk_usage(partition.mountpoint)
                # Guard against a reported total of 0, which would otherwise
                # raise a division error and abort the whole listing.
                percentage = (usage.used / usage.total) * 100 if usage.total else 0.0
                disk_info.append({
                    "device": partition.device,
                    "mountpoint": partition.mountpoint,
                    "file_system": partition.fstype,
                    "total": usage.total,
                    "used": usage.used,
                    "free": usage.free,
                    "percentage": percentage
                })
            } catch PermissionError {
                continue
            }
        }

        return disk_info
    }

    func get_network_info() -> dict {
        """Return cumulative network I/O counters and the connection count."""
        network_io = psutil.net_io_counters()
        network_connections = len(psutil.net_connections())

        return {
            "bytes_sent": network_io.bytes_sent,
            "bytes_received": network_io.bytes_recv,
            "packets_sent": network_io.packets_sent,
            "packets_received": network_io.packets_recv,
            "connections": network_connections
        }
    }

    func get_process_list(limit: int = 10) -> list[dict] {
        """Return up to limit processes, highest CPU usage first."""
        processes = []

        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']) {
            try {
                processes.append(proc.info)
            } catch (psutil.NoSuchProcess, psutil.AccessDenied) {
                continue
            }
        }

        # Sort by CPU usage; a missing value counts as 0.
        processes.sort(key=lambda x: x['cpu_percent'] or 0, reverse=true)
        return processes[:limit]
    }

    func check_alerts() -> list[str] {
        """Return a message for every metric that exceeds its threshold."""
        alerts = []

        # CPU
        cpu_info = self.get_cpu_info()
        if cpu_info["cpu_percent"] > self.thresholds["cpu_percent"] {
            alerts.append(f"CPU使用率过高: {cpu_info['cpu_percent']:.1f}%")
        }

        # Memory
        memory_info = self.get_memory_info()
        if memory_info["percentage"] > self.thresholds["memory_percent"] {
            alerts.append(f"内存使用率过高: {memory_info['percentage']:.1f}%")
        }

        # Disks
        disk_info = self.get_disk_info()
        for disk in disk_info {
            if disk["percentage"] > self.thresholds["disk_percent"] {
                alerts.append(f"磁盘 {disk['device']} 使用率过高: {disk['percentage']:.1f}%")
            }
        }

        return alerts
    }

    func start_monitoring(interval: int = 60, callback: callable? = null) {
        """Collect metrics every interval seconds on a daemon thread.

        Each sample is passed to callback when one is given. Alerts found are
        attached to the sample under "alerts" and appended to self.alerts.
        Errors inside one cycle are printed and the loop continues. The loop
        ends after stop_monitoring() has been called.
        """
        self.monitoring = true

        func monitoring_loop() {
            while self.monitoring {
                try {
                    # Collect one sample of system metrics.
                    system_data = {
                        "timestamp": time.time(),
                        "cpu": self.get_cpu_info(),
                        "memory": self.get_memory_info(),
                        "disk": self.get_disk_info(),
                        "network": self.get_network_info(),
                        "top_processes": self.get_process_list(5)
                    }

                    # Attach and remember any alerts.
                    alerts = self.check_alerts()
                    if alerts {
                        system_data["alerts"] = alerts
                        self.alerts.extend(alerts)
                    }

                    # Hand the sample to the caller.
                    if callback {
                        callback(system_data)
                    }

                } catch Exception as e {
                    print(f"监控错误: {e}")
                }

                # Wait for the next cycle, whether or not this one failed.
                time.sleep(interval)
            }
        }

        monitor_thread = threading.Thread(target=monitoring_loop)
        monitor_thread.daemon = true
        monitor_thread.start()

        print(f"系统监控已启动,间隔: {interval}秒")
    }

    func stop_monitoring() {
        """Ask the monitoring loop to stop after its current cycle."""
        self.monitoring = false
        print("系统监控已停止")
    }

    func get_system_report() -> dict {
        """Return a one-off report combining all metrics and current alerts."""
        return {
            "system_info": self.get_system_info(),
            "cpu_info": self.get_cpu_info(),
            "memory_info": self.get_memory_info(),
            "disk_info": self.get_disk_info(),
            "network_info": self.get_network_info(),
            "top_processes": self.get_process_list(10),
            "alerts": self.check_alerts(),
            "report_time": time.time()
        }
    }
}

# Use the system monitor
monitor = SystemMonitor()

# Generate a system report
report = monitor.get_system_report()
print("系统报告:")
print(f"CPU使用率: {report['cpu_info']['cpu_percent']:.1f}%")
print(f"内存使用率: {report['memory_info']['percentage']:.1f}%")
print(f"活跃连接数: {report['network_info']['connections']}")

# Print any alerts contained in the report
alerts = report["alerts"]
if alerts {
    print("\n系统警告:")
    for alert in alerts {
        print(f"  - {alert}")
    }
}

# 启动持续监控
func log_system_data(data: dict) {
    # Monitoring callback: print one line with the sample's local time, CPU
    # and memory usage, followed by one line per alert when the sample
    # carries an "alerts" entry.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["timestamp"]))
    print(f"[{timestamp}] CPU: {data['cpu']['cpu_percent']:.1f}%, 内存: {data['memory']['percentage']:.1f}%")

    if "alerts" in data {
        for alert in data["alerts"] {
            print(f"  警告: {alert}")
        }
    }
}

# monitor.start_monitoring(interval=30, callback=log_system_data)

5.3 网络自动化

HTTP客户端

import requests
import json
import time
from urllib.parse import urljoin, urlparse

class HTTPClient {
    # Thin wrapper around a requests.Session that adds an optional base URL,
    # a default timeout, retries with exponential backoff and a history of
    # completed requests.

    func __init__(base_url: str? = null, timeout: int = 30) {
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.request_history = []
        # Responses with a status in status_forcelist are retried up to
        # max_retries times, waiting backoff_factor * 2**attempt seconds.
        self.retry_config = {
            "max_retries": 3,
            "backoff_factor": 1,
            "status_forcelist": [500, 502, 503, 504]
        }
    }

    func set_headers(headers: dict) {
        """Add or override default headers sent with every request."""
        self.session.headers.update(headers)
    }

    func set_auth(username: str, password: str) {
        """Use HTTP basic authentication for every request."""
        self.session.auth = (username, password)
    }

    func set_bearer_token(token: str) {
        """Send the token as a Bearer Authorization header with every request."""
        self.session.headers["Authorization"] = f"Bearer {token}"
    }

    func _build_url(endpoint: str) -> str {
        """Resolve endpoint against base_url with urljoin.

        Without a base URL the endpoint is returned unchanged.
        """
        if self.base_url {
            return urljoin(self.base_url, endpoint)
        }
        return endpoint
    }

    func _log_request(method: str, url: str, response: any, duration: float) {
        """Append one completed request to request_history."""
        self.request_history.append({
            "timestamp": time.time(),
            "method": method,
            "url": url,
            "status_code": response.status_code,
            "duration": duration,
            "success": response.ok
        })
    }

    func _rewind_files(files: any) {
        """Move every seekable upload object in files back to its start."""
        if not isinstance(files, dict) {
            return
        }
        for file_obj in files.values() {
            if hasattr(file_obj, "seek") {
                file_obj.seek(0)
            }
        }
    }

    func _retry_request(method: str, url: str, **kwargs) -> any {
        """Send a request, retrying on listed status codes and request errors.

        Returns the first response that is ok or whose status is not in
        status_forcelist; after the last attempt the last response is
        returned as-is. A request exception on the last attempt is re-raised.
        A "timeout" passed by the caller takes precedence over self.timeout.
        """
        max_retries = self.retry_config["max_retries"]
        backoff_factor = self.retry_config["backoff_factor"]
        status_forcelist = self.retry_config["status_forcelist"]

        # Only apply the default timeout when the caller did not pass one;
        # passing it twice would raise a duplicate-keyword error.
        kwargs.setdefault("timeout", self.timeout)

        response = null
        for attempt in range(max_retries + 1) {
            if attempt > 0 {
                # A previous attempt may have consumed the upload streams.
                self._rewind_files(kwargs.get("files"))
            }

            try {
                start_time = time.time()
                response = self.session.request(method, url, **kwargs)
                duration = time.time() - start_time

                self._log_request(method, url, response, duration)

                if response.ok or response.status_code not in status_forcelist {
                    return response
                }

                if attempt < max_retries {
                    wait_time = backoff_factor * (2 ** attempt)
                    print(f"请求失败 (状态码: {response.status_code}),{wait_time}秒后重试...")
                    time.sleep(wait_time)
                }

            } catch requests.exceptions.RequestException as e {
                if attempt < max_retries {
                    wait_time = backoff_factor * (2 ** attempt)
                    print(f"请求异常: {e},{wait_time}秒后重试...")
                    time.sleep(wait_time)
                } else {
                    raise e
                }
            }
        }

        return response
    }

    func get(endpoint: str, params: dict? = null, **kwargs) -> any {
        """Send a GET request."""
        url = self._build_url(endpoint)
        return self._retry_request("GET", url, params=params, **kwargs)
    }

    func post(endpoint: str, data: any? = null, json_data: dict? = null, **kwargs) -> any {
        """Send a POST request with a raw/form body (data) or a JSON body (json_data)."""
        url = self._build_url(endpoint)
        return self._retry_request("POST", url, data=data, json=json_data, **kwargs)
    }

    func put(endpoint: str, data: any? = null, json_data: dict? = null, **kwargs) -> any {
        """Send a PUT request with a raw/form body (data) or a JSON body (json_data)."""
        url = self._build_url(endpoint)
        return self._retry_request("PUT", url, data=data, json=json_data, **kwargs)
    }

    func delete(endpoint: str, **kwargs) -> any {
        """Send a DELETE request."""
        url = self._build_url(endpoint)
        return self._retry_request("DELETE", url, **kwargs)
    }

    func download_file(url: str, filepath: str, chunk_size: int = 8192) -> bool {
        """Stream a URL to a local file, creating missing parent directories.

        Returns true on success, false (after printing the error) otherwise.
        """
        try {
            # Using the response as a context manager releases the streamed
            # connection even when the status check or the write fails.
            with self.get(url, stream=true) as response {
                response.raise_for_status()

                # Make sure the target directory exists.
                Path(filepath).parent.mkdir(parents=true, exist_ok=true)

                with open(filepath, "wb") as f {
                    for chunk in response.iter_content(chunk_size=chunk_size) {
                        if chunk {
                            f.write(chunk)
                        }
                    }
                }
            }

            print(f"文件下载完成: {filepath}")
            return true

        } catch Exception as e {
            print(f"文件下载失败: {e}")
            return false
        }
    }

    func upload_file(endpoint: str, filepath: str, field_name: str = "file") -> any {
        """Upload a local file as a multipart form field and return the response.

        Errors opening the file and request errors propagate to the caller.
        """
        url = self._build_url(endpoint)

        with open(filepath, "rb") as f {
            files = {field_name: f}
            return self._retry_request("POST", url, files=files)
        }
    }

    func get_request_stats() -> dict {
        """Summarize request_history: counts, success rate and durations.

        With no recorded requests only {"total_requests": 0} is returned.
        """
        if not self.request_history {
            return {"total_requests": 0}
        }

        total_requests = len(self.request_history)
        successful_requests = sum(1 for req in self.request_history if req["success"])
        failed_requests = total_requests - successful_requests

        durations = [req["duration"] for req in self.request_history]
        avg_duration = sum(durations) / len(durations)

        return {
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "failed_requests": failed_requests,
            "success_rate": successful_requests / total_requests,
            "average_duration": avg_duration,
            "min_duration": min(durations),
            "max_duration": max(durations)
        }
    }
}

# API客户端示例
class APIClient(HTTPClient) {
    # Example JSON API client for a "/users" resource built on HTTPClient.

    func __init__(base_url: str, api_key: str? = null) {
        super().__init__(base_url)

        if api_key {
            self.set_headers({"X-API-Key": api_key})
        }

        # No session-wide Content-Type: requests derives it per request
        # (JSON for json=, multipart with boundary for files=). A fixed
        # default would override that and break the inherited upload_file
        # and form posts.
        self.set_headers({
            "Accept": "application/json",
            "User-Agent": "AIScript-Client/1.0"
        })
    }

    func get_user(user_id: int) -> dict? {
        """Fetch one user; returns null (after printing the error) on failure."""
        try {
            response = self.get(f"/users/{user_id}")
            response.raise_for_status()
            return response.json()
        } catch Exception as e {
            print(f"获取用户失败: {e}")
            return null
        }
    }

    func create_user(user_data: dict) -> dict? {
        """Create a user from user_data; returns the decoded response or null on failure."""
        try {
            response = self.post("/users", json_data=user_data)
            response.raise_for_status()
            return response.json()
        } catch Exception as e {
            print(f"创建用户失败: {e}")
            return null
        }
    }

    func update_user(user_id: int, user_data: dict) -> dict? {
        """Update a user with user_data; returns the decoded response or null on failure."""
        try {
            response = self.put(f"/users/{user_id}", json_data=user_data)
            response.raise_for_status()
            return response.json()
        } catch Exception as e {
            print(f"更新用户失败: {e}")
            return null
        }
    }

    func delete_user(user_id: int) -> bool {
        """Delete a user; returns true on success, false (after printing the error) otherwise."""
        try {
            response = self.delete(f"/users/{user_id}")
            response.raise_for_status()
            return true
        } catch Exception as e {
            print(f"删除用户失败: {e}")
            return false
        }
    }

    func search_users(query: str, limit: int = 10) -> list[dict] {
        """Search users; returns the "users" list of the response, or [] on failure."""
        try {
            params = {"q": query, "limit": limit}
            response = self.get("/users/search", params=params)
            response.raise_for_status()
            return response.json().get("users", [])
        } catch Exception as e {
            print(f"搜索用户失败: {e}")
            return []
        }
    }
}

# Use the HTTP client
client = HTTPClient("https://api.example.com")
client.set_bearer_token("your-api-token")

# Send a request
response = client.get("/status")
if response.ok {
    print(f"API状态: {response.json()}")
}

# Download a file
client.download_file(
    "https://example.com/data.csv",
    "downloads/data.csv"
)

# Print request statistics
stats = client.get_request_stats()
print(f"请求统计: {stats}")

网络工具

import socket
import ping3
import subprocess
import re

class NetworkTools {
    # Network diagnostics helpers: ping, TCP port checks, DNS lookups,
    # traceroute and a simple download-based speed test.

    func __init__() {
        # No method of this class writes to this list.
        self.results = []
    }
    
    func ping_host(host: str, count: int = 4, timeout: int = 3) -> dict {
        """Ping `host` `count` times and summarise the round-trip times.

        Args:
            host: Hostname or IP address handed to `ping3.ping`.
            count: Number of pings to send.
            timeout: Per-ping timeout passed to `ping3.ping`.

        Returns:
            A dict with the keys "host", "packets_sent", "packets_received",
            "packet_loss" (percentage), "min_time", "max_time", "avg_time"
            (milliseconds; null when no reply was received) and "times".
        """
        results = {
            "host": host,
            "packets_sent": count,
            "packets_received": 0,
            "packet_loss": 0.0,
            "min_time": null,
            "max_time": null,
            "avg_time": null,
            "times": []
        }
        
        times = []
        for _ in range(count) {
            try {
                response_time = ping3.ping(host, timeout=timeout)
                # ping3 reports a timeout as null and an error (such as an
                # unresolvable host) as false; neither is a received reply.
                if response_time is not null and response_time is not false {
                    times.append(response_time * 1000)  # seconds -> milliseconds
                    results["packets_received"] += 1
                }
            } catch Exception as e {
                print(f"Ping失败: {e}")
            }
        }
        
        if times {
            results["times"] = times
            results["min_time"] = min(times)
            results["max_time"] = max(times)
            results["avg_time"] = sum(times) / len(times)
        }
        
        # Guard against division by zero when no ping was requested.
        if count > 0 {
            results["packet_loss"] = (count - results["packets_received"]) / count * 100
        }
        
        return results
    }
    
    func check_port(host: str, port: int, timeout: int = 3) -> bool {
        """Return true when a TCP connection to `host`:`port` succeeds.

        Any exception (for example a name-resolution failure) is treated as
        "not open" and yields false. The socket is always closed.
        """
        sock = null
        try {
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            # connect_ex returns 0 on success instead of raising.
            return sock.connect_ex((host, port)) == 0
        } catch Exception {
            return false
        } finally {
            if sock is not null {
                sock.close()
            }
        }
    }
    
    func scan_ports(host: str, ports: list[int], timeout: int = 1) -> dict {
        """Check every port in `ports` sequentially with `check_port`.

        Returns:
            A dict with "host", "open_ports", "closed_ports" and
            "total_scanned" (the number of ports given).
        """
        open_ports = []
        closed_ports = []
        
        for port in ports {
            if self.check_port(host, port, timeout) {
                open_ports.append(port)
            } else {
                closed_ports.append(port)
            }
        }
        
        return {
            "host": host,
            "open_ports": open_ports,
            "closed_ports": closed_ports,
            "total_scanned": len(ports)
        }
    }
    
    func get_local_ip() -> str? {
        """Return the local IP address used for outbound traffic, or null on failure.

        A UDP socket is connected to 8.8.8.8:80 and the socket's own address
        is read back. The socket is always closed.
        """
        sock = null
        try {
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.connect(("8.8.8.8", 80))
            return sock.getsockname()[0]
        } catch Exception {
            return null
        } finally {
            if sock is not null {
                sock.close()
            }
        }
    }
    
    func resolve_hostname(hostname: str) -> list[str] {
        """Resolve `hostname` to its list of IP addresses; empty list on failure."""
        try {
            # Index 2 of the gethostbyname_ex result is the address list.
            ip_addresses = socket.gethostbyname_ex(hostname)[2]
            return ip_addresses
        } catch Exception {
            return []
        }
    }
    
    func reverse_dns(ip: str) -> str? {
        """Return the host name registered for `ip`, or null on failure."""
        try {
            # Index 0 of the gethostbyaddr result is the primary host name.
            hostname = socket.gethostbyaddr(ip)[0]
            return hostname
        } catch Exception {
            return null
        }
    }
    
    func traceroute(host: str, max_hops: int = 30) -> list[dict] {
        """Run the system traceroute command and return the hops it printed.

        Uses `tracert -h` on Windows and `traceroute -m` elsewhere, with a
        60 second limit on the subprocess.

        Returns:
            One dict per output line that starts with a hop number, with the
            keys "hop" (int) and "raw_line" (the stripped line). An empty
            list is returned when the command fails or times out.
        """
        try {
            # Imported locally because this snippet's import list does not
            # include `platform`.
            import platform
            
            if platform.system().lower() == "windows" {
                cmd = ["tracert", "-h", str(max_hops), host]
            } else {
                cmd = ["traceroute", "-m", str(max_hops), host]
            }
            
            result = subprocess.run(cmd, capture_output=true, text=true, timeout=60)
            
            # Simplified parsing: keep lines whose first token is a number.
            hops = []
            for line in result.stdout.split('\n') {
                if re.search(r'\d+\s+', line) {
                    parts = line.strip().split()
                    if len(parts) >= 2 and parts[0].isdigit() {
                        hops.append({
                            "hop": int(parts[0]),
                            "raw_line": line.strip()
                        })
                    }
                }
            }
            
            return hops
            
        } catch Exception as e {
            print(f"路由跟踪失败: {e}")
            return []
        }
    }
    
    func network_speed_test(
        test_url: str = "http://speedtest.ftp.otenet.gr/files/test1Mb.db",
        timeout: int = 30
    ) -> dict {
        """Download `test_url` and report the achieved throughput.

        Args:
            test_url: URL of the file to download.
            timeout: Timeout in seconds passed to `requests.get`, so a
                stalled server cannot block the call forever.

        Returns:
            A dict with "total_size" (bytes), "duration" (seconds),
            "speed_bps" (bytes per second), "speed_mbps" (megabytes per
            second) and "test_url". An empty dict is returned on failure.
        """
        try {
            import requests
            import time
            
            start_time = time.time()
            response = requests.get(test_url, stream=true, timeout=timeout)
            # Do not measure the body of an HTTP error page.
            response.raise_for_status()
            
            total_size = 0
            for chunk in response.iter_content(chunk_size=1024) {
                total_size += len(chunk)
            }
            
            duration = time.time() - start_time
            
            # A zero duration is possible with a coarse clock; avoid dividing by it.
            speed_bps = total_size / duration if duration > 0 else 0.0
            speed_mbps = speed_bps / (1024 * 1024)
            
            return {
                "total_size": total_size,
                "duration": duration,
                "speed_bps": speed_bps,
                "speed_mbps": speed_mbps,
                "test_url": test_url
            }
            
        } catch Exception as e {
            print(f"网络速度测试失败: {e}")
            return {}
        }
    }
    
    func network_diagnostics(host: str) -> dict {
        """Run DNS resolution, ping, a common-port scan and traceroute for `host`.

        Returns:
            A dict with "host", "timestamp" and "dns_resolution". When the
            name resolves, "ping", "port_scan" and "traceroute" are added,
            each measured against the first resolved address.
        """
        import time
        
        diagnostics = {
            "host": host,
            "timestamp": time.time()
        }
        
        # DNS resolution
        print(f"正在解析 {host}...")
        ip_addresses = self.resolve_hostname(host)
        diagnostics["dns_resolution"] = {
            "success": len(ip_addresses) > 0,
            "ip_addresses": ip_addresses
        }
        
        if ip_addresses {
            target_ip = ip_addresses[0]
            
            # Ping test
            print(f"正在Ping {target_ip}...")
            diagnostics["ping"] = self.ping_host(target_ip)
            
            # Scan a fixed list of common service ports
            print(f"正在扫描常用端口...")
            common_ports = [22, 23, 25, 53, 80, 110, 143, 443, 993, 995]
            diagnostics["port_scan"] = self.scan_ports(target_ip, common_ports)
            
            # Traceroute
            print(f"正在进行路由跟踪...")
            diagnostics["traceroute"] = self.traceroute(target_ip)
        }
        
        return diagnostics
    }
}

# Use the network tools
net_tools = NetworkTools()

# Network diagnostics example
diagnostics = net_tools.network_diagnostics("google.com")
print("网络诊断结果:")
print(f"DNS解析: {'成功' if diagnostics['dns_resolution']['success'] else '失败'}")
if "ping" in diagnostics {
    ping_result = diagnostics["ping"]
    # avg_time stays null when every packet was lost, and null cannot be
    # formatted with ":.1f".
    if ping_result["avg_time"] is not null {
        print(f"Ping结果: 丢包率 {ping_result['packet_loss']:.1f}%, 平均延迟 {ping_result['avg_time']:.1f}ms")
    } else {
        print(f"Ping结果: 丢包率 {ping_result['packet_loss']:.1f}%, 平均延迟 N/A")
    }
}
if "port_scan" in diagnostics {
    port_result = diagnostics["port_scan"]
    print(f"开放端口: {port_result['open_ports']}")
}

# Local network information
local_ip = net_tools.get_local_ip()
print(f"本地IP地址: {local_ip}")

# Network speed test (disabled by default because it downloads a file)
# speed_result = net_tools.network_speed_test()
# if speed_result {
#     print(f"网络速度: {speed_result['speed_mbps']:.2f} MB/s")
# }

5.4 数据库自动化

数据库连接管理

import sqlite3
import mysql.connector
import psycopg2
import redis
from contextlib import contextmanager

class DatabaseManager {
    # Registry of named database connections. Configurations are registered
    # with add_connection(); connections are opened on first use by
    # get_connection() and cached in self.connections until closed.

    func __init__() {
        # name -> open connection object
        self.connections = {}
        # name -> {"type": db_type, "config": driver arguments}
        self.connection_configs = {}
    }
    
    func add_connection(
        name: str,
        db_type: str,
        config: dict
    ) {
        """Register a connection configuration under `name`.

        Nothing is opened here; the connection is created by the first
        get_connection(name) call.

        Args:
            name: Key used to look the connection up later.
            db_type: "sqlite", "mysql", "postgresql" or "redis".
            config: Driver arguments. For "sqlite" only the "database" key
                is read; for the other types the dict is passed as keyword
                arguments to the driver.
        """
        self.connection_configs[name] = {
            "type": db_type,
            "config": config
        }
    }
    
    func get_connection(name: str) -> any? {
        """Return the connection registered as `name`, opening it if needed.

        Returns:
            The cached or newly opened connection, or null when opening it
            fails. An unsupported db_type also yields null, because the
            ValueError raised for it is caught together with driver errors.

        Raises:
            ValueError: No configuration was registered under `name`.
        """
        if name in self.connections {
            return self.connections[name]
        }
        
        if name not in self.connection_configs {
            raise ValueError(f"未找到连接配置: {name}")
        }
        
        config_info = self.connection_configs[name]
        db_type = config_info["type"]
        config = config_info["config"]
        
        try {
            match db_type {
                "sqlite" => {
                    conn = sqlite3.connect(config["database"])
                    conn.row_factory = sqlite3.Row  # rows become addressable by column name
                }
                "mysql" => {
                    conn = mysql.connector.connect(**config)
                }
                "postgresql" => {
                    conn = psycopg2.connect(**config)
                }
                "redis" => {
                    conn = redis.Redis(**config)
                }
                _ => {
                    raise ValueError(f"不支持的数据库类型: {db_type}")
                }
            }
            
            self.connections[name] = conn
            print(f"数据库连接 '{name}' 已建立")
            return conn
            
        } catch Exception as e {
            print(f"连接数据库失败: {e}")
            return null
        }
    }
    
    @contextmanager
    func get_cursor(connection_name: str) {
        """Context manager yielding a cursor on the named connection.

        Commits when the body finishes normally, rolls back and re-raises
        when it raises, and always closes the cursor.

        NOTE(review): this relies on the connection offering cursor(),
        commit() and rollback(); presumably that excludes the "redis"
        connection type - confirm before using it with Redis.

        Raises:
            ValueError: The connection could not be obtained.
        """
        conn = self.get_connection(connection_name)
        if conn is null {
            raise ValueError(f"无法获取连接: {connection_name}")
        }
        
        cursor = conn.cursor()
        try {
            yield cursor
            conn.commit()
        } catch Exception as e {
            conn.rollback()
            raise e
        } finally {
            cursor.close()
        }
    }
    
    func execute_query(
        connection_name: str,
        query: str,
        params: tuple? = null
    ) -> list[dict] {
        """Run `query` and return its rows as a list of column-name dicts.

        Args:
            connection_name: Name of a registered connection.
            query: SQL text, using the driver's own placeholder style.
            params: Optional values for the placeholders.

        Returns:
            One dict per row. Statements that produce no result set return
            an empty list.
        """
        with self.get_cursor(connection_name) as cursor {
            if params {
                cursor.execute(query, params)
            } else {
                cursor.execute(query)
            }
            
            # cursor.description is empty for statements without a result
            # set; some drivers raise if fetchall() is called in that case.
            if not cursor.description {
                return []
            }
            
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            
            result = []
            for row in rows {
                if isinstance(row, sqlite3.Row) {
                    result.append(dict(row))
                } else {
                    result.append(dict(zip(columns, row)))
                }
            }
            
            return result
        }
    }
    
    func execute_update(
        connection_name: str,
        query: str,
        params: tuple? = null
    ) -> int {
        """Run a data-modifying statement and return the cursor's rowcount."""
        with self.get_cursor(connection_name) as cursor {
            if params {
                cursor.execute(query, params)
            } else {
                cursor.execute(query)
            }
            return cursor.rowcount
        }
    }
    
    func execute_batch(
        connection_name: str,
        query: str,
        params_list: list[tuple]
    ) -> int {
        """Run `query` once per entry of `params_list` via executemany.

        Returns the cursor's rowcount after the batch.
        """
        with self.get_cursor(connection_name) as cursor {
            cursor.executemany(query, params_list)
            return cursor.rowcount
        }
    }
    
    func close_connection(name: str) {
        """Close and forget the connection `name`; no-op when it is not open."""
        if name in self.connections {
            self.connections[name].close()
            del self.connections[name]
            print(f"数据库连接 '{name}' 已关闭")
        }
    }
    
    func close_all_connections() {
        """Close every open connection."""
        # Iterate over a copy of the keys: close_connection deletes entries.
        for name in list(self.connections.keys()) {
            self.close_connection(name)
        }
    }
}

# 数据库操作类
class DatabaseOperations {
    # Table-level helpers (create, insert, update, delete, backup, restore)
    # built on a DatabaseManager.
    #
    # Table and column names are interpolated into the SQL text, so they
    # must come from trusted code, never from user input. Row values are
    # always passed as bound parameters.

    func __init__(db_manager: DatabaseManager) {
        self.db_manager = db_manager
    }
    
    func _placeholder(connection_name: str) -> str {
        """Return the parameter marker for the connection's driver.

        The MySQL and PostgreSQL drivers use "%s"; SQLite uses "?".
        Connections without a registered configuration fall back to "?".
        """
        config_info = self.db_manager.connection_configs.get(connection_name)
        db_type = config_info["type"] if config_info else "sqlite"
        return "%s" if db_type in ("mysql", "postgresql") else "?"
    }
    
    func create_table(
        connection_name: str,
        table_name: str,
        columns: dict,
        primary_key: str? = null
    ) -> bool {
        """Create `table_name` if it does not exist yet.

        Args:
            connection_name: Name of a registered connection.
            table_name: Table to create.
            columns: Mapping of column name to SQL type/constraint text.
            primary_key: Optional column name(s) for a PRIMARY KEY clause.

        Returns:
            true on success; false (after printing the error) on failure.
        """
        try {
            column_defs = [f"{col_name} {col_type}" for col_name, col_type in columns.items()]
            
            if primary_key {
                column_defs.append(f"PRIMARY KEY ({primary_key})")
            }
            
            query = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(column_defs)})"
            self.db_manager.execute_update(connection_name, query)
            
            print(f"表 '{table_name}' 创建成功")
            return true
            
        } catch Exception as e {
            print(f"创建表失败: {e}")
            return false
        }
    }
    
    func insert_data(
        connection_name: str,
        table_name: str,
        data: dict
    ) -> bool {
        """Insert one row given as a column-name -> value dict.

        Returns true on success; false (after printing the error) on failure.
        """
        try {
            columns = list(data.keys())
            marker = self._placeholder(connection_name)
            placeholders = [marker for _ in columns]
            
            query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
            values = tuple(data.values())
            
            self.db_manager.execute_update(connection_name, query, values)
            print(f"数据插入成功到表 '{table_name}'")
            return true
            
        } catch Exception as e {
            print(f"插入数据失败: {e}")
            return false
        }
    }
    
    func bulk_insert(
        connection_name: str,
        table_name: str,
        data_list: list[dict]
    ) -> bool {
        """Insert many rows in one batch.

        The column list is taken from the first row. Every row must contain
        those columns; a row missing one makes the whole call fail.

        Returns:
            true on success; false for an empty `data_list` or on failure.
        """
        if not data_list {
            return false
        }
        
        try {
            columns = list(data_list[0].keys())
            marker = self._placeholder(connection_name)
            placeholders = [marker for _ in columns]
            
            query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
            
            # Read values by column name so rows whose keys are in a
            # different order still line up with the INSERT column list.
            params_list = [tuple(data[col] for col in columns) for data in data_list]
            
            rows_affected = self.db_manager.execute_batch(connection_name, query, params_list)
            print(f"批量插入完成,影响 {rows_affected} 行")
            return true
            
        } catch Exception as e {
            print(f"批量插入失败: {e}")
            return false
        }
    }
    
    func update_data(
        connection_name: str,
        table_name: str,
        data: dict,
        where_clause: str,
        where_params: tuple? = null
    ) -> int {
        """Update the rows matching `where_clause` with the values in `data`.

        Args:
            connection_name: Name of a registered connection.
            table_name: Table to update.
            data: Mapping of column name to new value.
            where_clause: SQL condition text; its placeholders must use the
                driver's own marker style.
            where_params: Values for the placeholders in `where_clause`.

        Returns:
            The number of affected rows, or 0 on failure.
        """
        try {
            marker = self._placeholder(connection_name)
            set_clauses = [f"{col} = {marker}" for col in data.keys()]
            query = f"UPDATE {table_name} SET {', '.join(set_clauses)} WHERE {where_clause}"
            
            # SET values come first, followed by the WHERE values.
            params = list(data.values())
            if where_params {
                params.extend(where_params)
            }
            
            rows_affected = self.db_manager.execute_update(connection_name, query, tuple(params))
            print(f"更新完成,影响 {rows_affected} 行")
            return rows_affected
            
        } catch Exception as e {
            print(f"更新数据失败: {e}")
            return 0
        }
    }
    
    func delete_data(
        connection_name: str,
        table_name: str,
        where_clause: str,
        where_params: tuple? = null
    ) -> int {
        """Delete the rows matching `where_clause`.

        Returns the number of affected rows, or 0 on failure.
        """
        try {
            query = f"DELETE FROM {table_name} WHERE {where_clause}"
            rows_affected = self.db_manager.execute_update(connection_name, query, where_params)
            print(f"删除完成,影响 {rows_affected} 行")
            return rows_affected
            
        } catch Exception as e {
            print(f"删除数据失败: {e}")
            return 0
        }
    }
    
    func backup_table(
        connection_name: str,
        table_name: str,
        backup_file: str
    ) -> bool {
        """Dump every row of `table_name` to `backup_file` as JSON.

        The parent directory of `backup_file` is created when missing.
        Values that JSON cannot encode are written as their string form.

        Returns true on success; false (after printing the error) on failure.
        """
        try {
            import json
            import os
            
            data = self.db_manager.execute_query(connection_name, f"SELECT * FROM {table_name}")
            
            backup_dir = os.path.dirname(backup_file)
            if backup_dir {
                os.makedirs(backup_dir, exist_ok=true)
            }
            
            with open(backup_file, "w", encoding="utf-8") as f {
                json.dump(data, f, indent=2, ensure_ascii=false, default=str)
            }
            
            print(f"表 '{table_name}' 备份完成: {backup_file}")
            return true
            
        } catch Exception as e {
            print(f"备份表失败: {e}")
            return false
        }
    }
    
    func restore_table(
        connection_name: str,
        table_name: str,
        backup_file: str,
        clear_existing: bool = false
    ) -> bool {
        """Load rows from a JSON backup file into `table_name`.

        Args:
            connection_name: Name of a registered connection.
            table_name: Table to restore into.
            backup_file: Path of a file written by backup_table.
            clear_existing: When true, delete all existing rows first.

        Returns:
            The result of the bulk insert; false when the backup is empty
            or an error occurs.
        """
        try {
            import json
            with open(backup_file, "r", encoding="utf-8") as f {
                data = json.load(f)
            }
            
            if not data {
                print("备份文件为空")
                return false
            }
            
            if clear_existing {
                # "1=1" matches every row.
                self.delete_data(connection_name, table_name, "1=1")
            }
            
            return self.bulk_insert(connection_name, table_name, data)
            
        } catch Exception as e {
            print(f"恢复表失败: {e}")
            return false
        }
    }
}

# Use the database manager
import os

# The SQLite driver does not create missing directories, and neither does
# open(); make sure the demo's target directories exist first.
os.makedirs("data", exist_ok=true)
os.makedirs("backup", exist_ok=true)

db_manager = DatabaseManager()

# Register a SQLite connection
db_manager.add_connection("local_db", "sqlite", {
    "database": "data/app.db"
})

# Register a MySQL connection (only registered; it is not opened below)
db_manager.add_connection("mysql_db", "mysql", {
    "host": "localhost",
    "user": "username",
    "password": "password",
    "database": "myapp"
})

# Database operations
db_ops = DatabaseOperations(db_manager)

# Create the users table
user_table_columns = {
    "id": "INTEGER",
    "username": "VARCHAR(50) NOT NULL",
    "email": "VARCHAR(100) NOT NULL",
    "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
}

db_ops.create_table("local_db", "users", user_table_columns, "id")

# Insert a user row
user_data = {
    "username": "john_doe",
    "email": "john@example.com"
}

db_ops.insert_data("local_db", "users", user_data)

# Query the user
users = db_manager.execute_query("local_db", "SELECT * FROM users WHERE username = ?", ("john_doe",))
print(f"查询结果: {users}")

# Back up the table
db_ops.backup_table("local_db", "users", "backup/users_backup.json")

# Release every connection opened above
db_manager.close_all_connections()

5.5 任务调度与自动化

定时任务调度

import schedule
import threading
import time
from datetime import datetime, timedelta

class TaskScheduler {
    # Named-job wrapper around the `schedule` library. Jobs are registered
    # with add_job() and run by a background thread started with start().

    func __init__() {
        # name -> job record (function, arguments, retry state, error log)
        self.jobs = {}
        self.running = false
        self.scheduler_thread = null
    }
    
    func add_job(
        name: str,
        func: callable,
        schedule_type: str,
        schedule_value: any,
        args: tuple = (),
        kwargs: dict = {},
        max_retries: int = 3
    ) -> bool {
        """Register a job under `name`.

        Args:
            name: Unique job name. Re-using a name replaces the old job.
            func: Callable to run.
            schedule_type: "interval", "daily", "weekly", "monthly" or "cron".
            schedule_value: Depends on `schedule_type`:
                "interval" - number of seconds between runs;
                "daily" - time of day such as "02:00";
                "weekly" - (day name, time) pair such as ("sunday", "01:00");
                "monthly" - time of day; simplified, the job is scheduled daily;
                "cron" - ignored; simplified, the job is scheduled every minute.
            args: Positional arguments for `func`.
            kwargs: Keyword arguments for `func`.
            max_retries: Number of failed runs after which the job is
                marked as failed.

        Returns:
            true when the job was scheduled; false (after printing the
            error) otherwise.
        """
        try {
            job_info = {
                "function": func,
                "args": args,
                # Copy so the shared default dict (or a dict the caller
                # changes later) is never stored.
                "kwargs": dict(kwargs),
                "max_retries": max_retries,
                "retry_count": 0,
                "last_run": null,
                "next_run": null,
                "status": "scheduled",
                "error_log": []
            }
            
            match schedule_type {
                "interval" => {
                    job = schedule.every(schedule_value).seconds.do(self._run_job, name)
                }
                "daily" => {
                    job = schedule.every().day.at(schedule_value).do(self._run_job, name)
                }
                "weekly" => {
                    day, time_str = schedule_value
                    job = getattr(schedule.every(), day.lower()).at(time_str).do(self._run_job, name)
                }
                "monthly" => {
                    # Simplified monthly scheduling: runs daily at the given time.
                    job = schedule.every().day.at(schedule_value).do(self._run_job, name)
                }
                "cron" => {
                    # Simplified cron support: runs every minute.
                    job = schedule.every().minute.do(self._run_job, name)
                }
                _ => {
                    raise ValueError(f"不支持的调度类型: {schedule_type}")
                }
            }
            
            # Cancel a previous job of the same name; otherwise its schedule
            # entry would keep firing and run the new function twice.
            if name in self.jobs {
                schedule.cancel_job(self.jobs[name]["schedule_job"])
            }
            
            job_info["schedule_job"] = job
            self.jobs[name] = job_info
            
            print(f"任务 '{name}' 已添加到调度器")
            return true
            
        } catch Exception as e {
            print(f"添加任务失败: {e}")
            return false
        }
    }
    
    func _run_job(name: str) {
        """Run the job `name` once and record the outcome.

        On success the retry counter is reset and the status becomes
        "completed". On failure the error is logged; while the retry
        counter is below max_retries another attempt is started after
        60 seconds, otherwise the status becomes "failed". Unknown names
        are ignored.
        """
        if name not in self.jobs {
            return
        }
        
        job_info = self.jobs[name]
        
        try {
            print(f"开始执行任务: {name}")
            start_time = time.time()
            
            job_info["function"](*job_info["args"], **job_info["kwargs"])
            
            duration = time.time() - start_time
            
            job_info["last_run"] = datetime.now()
            job_info["status"] = "completed"
            job_info["retry_count"] = 0
            
            print(f"任务 '{name}' 执行完成,耗时: {duration:.2f}秒")
            
        } catch Exception as e {
            job_info["retry_count"] += 1
            job_info["error_log"].append({
                "timestamp": datetime.now(),
                "error": str(e)
            })
            
            print(f"任务 '{name}' 执行失败: {e}")
            
            if job_info["retry_count"] < job_info["max_retries"] {
                print(f"将在1分钟后重试 (第{job_info['retry_count']}次重试)")
                # Simple retry: run again on a timer thread after 60 seconds.
                threading.Timer(60, self._run_job, args=[name]).start()
            } else {
                job_info["status"] = "failed"
                print(f"任务 '{name}' 重试次数已达上限,标记为失败")
            }
        }
    }
    
    func start() {
        """Start the background thread that runs pending jobs once per second."""
        if self.running {
            print("调度器已在运行")
            return
        }
        
        self.running = true
        
        func scheduler_loop() {
            while self.running {
                schedule.run_pending()
                time.sleep(1)
            }
        }
        
        self.scheduler_thread = threading.Thread(target=scheduler_loop)
        # Daemon thread: it does not keep the process alive on exit.
        self.scheduler_thread.daemon = true
        self.scheduler_thread.start()
        
        print("任务调度器已启动")
    }
    
    func stop() {
        """Stop the scheduler loop and wait up to 5 seconds for its thread."""
        self.running = false
        if self.scheduler_thread {
            self.scheduler_thread.join(timeout=5)
        }
        print("任务调度器已停止")
    }
    
    func remove_job(name: str) -> bool {
        """Cancel and forget the job `name`; returns false when it is unknown."""
        if name not in self.jobs {
            print(f"任务 '{name}' 不存在")
            return false
        }
        
        schedule.cancel_job(self.jobs[name]["schedule_job"])
        del self.jobs[name]
        
        print(f"任务 '{name}' 已移除")
        return true
    }
    
    func list_jobs() -> list[dict] {
        """Return a summary dict for every registered job.

        Each entry has "name", "status", "last_run", "retry_count",
        "max_retries" and "error_count".
        """
        return [
            {
                "name": name,
                "status": job_info["status"],
                "last_run": job_info["last_run"],
                "retry_count": job_info["retry_count"],
                "max_retries": job_info["max_retries"],
                "error_count": len(job_info["error_log"])
            }
            for name, job_info in self.jobs.items()
        ]
    }
    
    func get_job_status(name: str) -> dict? {
        """Return the status of job `name`, or null when it is unknown.

        The result includes the five most recent error log entries.
        """
        if name not in self.jobs {
            return null
        }
        
        job_info = self.jobs[name]
        return {
            "name": name,
            "status": job_info["status"],
            "last_run": job_info["last_run"],
            "retry_count": job_info["retry_count"],
            "max_retries": job_info["max_retries"],
            "error_log": job_info["error_log"][-5:]
        }
    }
}

# 任务函数示例
func backup_database() {
    """Demo task: pretend to back up the database (sleeps for 2 seconds)."""
    print("执行数据库备份...")
    # Simulated backup work
    time.sleep(2)
    print("数据库备份完成")
}

func send_daily_report() {
    """Demo task: pretend to build and send the daily report (sleeps for 1 second)."""
    print("生成并发送日报...")
    # Simulated report generation
    time.sleep(1)
    print("日报发送完成")
}

func cleanup_temp_files(pattern: str = "/tmp/*.tmp") -> int {
    """Delete every file matching the glob `pattern`.

    Args:
        pattern: Glob pattern of the files to remove. The default keeps the
            original behaviour of cleaning "/tmp/*.tmp".

    Returns:
        The number of files that were deleted. Files that cannot be removed
        are reported and skipped.
    """
    print("清理临时文件...")
    import os
    import glob
    
    deleted = 0
    for file in glob.glob(pattern) {
        try {
            os.remove(file)
            deleted += 1
            print(f"删除临时文件: {file}")
        } catch OSError as e {
            # One failing file must not stop the rest of the cleanup.
            print(f"删除文件失败: {e}")
        }
    }
    print("临时文件清理完成")
    return deleted
}

# Use the task scheduler
scheduler = TaskScheduler()

# Register jobs with different schedule types
scheduler.add_job(
    "database_backup",
    backup_database,
    "daily",
    "02:00"  # every day at 02:00
)

scheduler.add_job(
    "daily_report",
    send_daily_report,
    "daily",
    "09:00"  # every day at 09:00
)

scheduler.add_job(
    "cleanup_temp",
    cleanup_temp_files,
    "interval",
    3600  # once per hour (interval is given in seconds)
)

scheduler.add_job(
    "weekly_maintenance",
    lambda: print("执行周维护任务"),
    "weekly",
    ("sunday", "01:00")  # every Sunday at 01:00
)

# Start the scheduler
scheduler.start()

# Print the registered jobs
jobs = scheduler.list_jobs()
print("当前任务列表:")
for job in jobs {
    print(f"  {job['name']}: {job['status']}")
}

# Let it run for a while, then stop
# time.sleep(60)
# scheduler.stop()

工作流自动化

import json
import time
from datetime import datetime
from enum import Enum

class WorkflowStatus(Enum) {
    # Lifecycle states of a whole workflow; the values are the strings
    # written to status reports.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
}

class TaskStatus(Enum) {
    # Lifecycle states of a single workflow task; the values are the
    # strings written to status reports.
    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
}

class WorkflowTask {
    # One unit of work in a workflow: a callable with its arguments, the
    # names of the tasks it depends on, and its execution state.

    func __init__(
        name: str,
        function: callable,
        args: tuple = (),
        kwargs: dict = {},
        depends_on: list[str] = [],
        timeout: int = 300,
        retry_count: int = 0
    ) {
        """Create a task in the WAITING state.

        Args:
            name: Task name, used as the dependency key.
            function: Callable to run.
            args: Positional arguments for `function`.
            kwargs: Keyword arguments for `function`.
            depends_on: Names of tasks that must complete first.
            timeout: Stored on the task; execute() does not enforce it.
            retry_count: Number of retries allowed after a failure.
        """
        self.name = name
        self.function = function
        self.args = args
        # Copy the containers so the shared default objects (or objects the
        # caller changes later) are never stored on the task.
        self.kwargs = dict(kwargs)
        self.depends_on = list(depends_on)
        self.timeout = timeout
        self.retry_count = retry_count   # remaining retries, counted down
        self.max_retries = retry_count   # configured number of retries
        self.status = TaskStatus.WAITING
        self.result = null
        self.error = null
        self.start_time = null
        self.end_time = null
        self.duration = 0
    }
    
    func can_run(completed_tasks: set[str]) -> bool {
        """Return true when the task is WAITING and all dependencies are completed."""
        if self.status != TaskStatus.WAITING {
            return false
        }
        
        return all(dep in completed_tasks for dep in self.depends_on)
    }
    
    func execute() -> bool {
        """Run the task function once and record the outcome.

        On success the result is stored and the status becomes COMPLETED.
        On failure the error text is stored; the status returns to WAITING
        while retries remain, otherwise it becomes FAILED. Start time, end
        time and duration are always recorded.

        Returns:
            true when the function completed without raising, else false.
        """
        self.status = TaskStatus.RUNNING
        self.start_time = datetime.now()
        
        try {
            print(f"开始执行任务: {self.name}")
            self.result = self.function(*self.args, **self.kwargs)
            self.status = TaskStatus.COMPLETED
            # Clear the error left behind by an earlier failed attempt.
            self.error = null
            print(f"任务 '{self.name}' 执行完成")
            return true
            
        } catch Exception as e {
            self.error = str(e)
            print(f"任务 '{self.name}' 执行失败: {e}")
            
            if self.retry_count > 0 {
                self.retry_count -= 1
                self.status = TaskStatus.WAITING
                print(f"任务 '{self.name}' 将重试,剩余重试次数: {self.retry_count}")
            } else {
                self.status = TaskStatus.FAILED
            }
            return false
        } finally {
            self.end_time = datetime.now()
            if self.start_time {
                self.duration = (self.end_time - self.start_time).total_seconds()
            }
        }
    }
}

class Workflow {
    # A named set of WorkflowTask objects executed in dependency order.

    func __init__(name: str) {
        self.name = name
        self.tasks = {}                 # task name -> WorkflowTask
        self.status = WorkflowStatus.PENDING
        self.start_time = null
        self.end_time = null
        self.duration = 0
        self.completed_tasks = set()    # names of tasks that succeeded
        self.failed_tasks = set()       # names of tasks that failed for good
    }
    
    func add_task(
        name: str,
        function: callable,
        args: tuple = (),
        kwargs: dict = {},
        depends_on: list[str] = [],
        timeout: int = 300,
        retry_count: int = 0
    ) -> WorkflowTask {
        """Create a WorkflowTask and register it under `name`.

        A task added under an existing name replaces the previous one.
        The arguments are forwarded to WorkflowTask.

        Returns:
            The newly created task.
        """
        # Pass copies so tasks never share the default dict/list objects.
        task = WorkflowTask(
            name, function, args, dict(kwargs),
            list(depends_on), timeout, retry_count
        )
        self.tasks[name] = task
        return task
    }
    
    func validate() -> bool {
        """Check the dependency graph.

        Returns false (after printing the reason) when the graph contains a
        cycle or a task depends on a task that was never added.
        """
        visited = set()
        rec_stack = set()   # names on the current depth-first search path
        
        func has_cycle(task_name: str) -> bool {
            # Meeting a name that is already on the current path is a cycle.
            if task_name in rec_stack {
                return true
            }
            if task_name in visited {
                return false
            }
            
            visited.add(task_name)
            rec_stack.add(task_name)
            
            if task_name in self.tasks {
                for dep in self.tasks[task_name].depends_on {
                    if has_cycle(dep) {
                        return true
                    }
                }
            }
            
            rec_stack.remove(task_name)
            return false
        }
        
        for task_name in self.tasks.keys() {
            if has_cycle(task_name) {
                print(f"检测到循环依赖,涉及任务: {task_name}")
                return false
            }
        }
        
        for task_name, task in self.tasks.items() {
            for dep in task.depends_on {
                if dep not in self.tasks {
                    print(f"任务 '{task_name}' 依赖的任务 '{dep}' 不存在")
                    return false
                }
            }
        }
        
        return true
    }
    
    func execute() -> bool {
        """Run all tasks in dependency order.

        Each round runs every task whose dependencies are completed. The
        workflow stops with FAILED as soon as a task fails for good, and
        with COMPLETED once every task has completed. A KeyboardInterrupt
        sets the status to CANCELLED. Start time, end time and duration
        are always recorded once execution has begun.

        Returns:
            true when every task completed, else false.
        """
        if not self.validate() {
            print(f"工作流 '{self.name}' 验证失败")
            self.status = WorkflowStatus.FAILED
            return false
        }
        
        print(f"开始执行工作流: {self.name}")
        self.status = WorkflowStatus.RUNNING
        self.start_time = datetime.now()
        
        try {
            while true {
                runnable_tasks = [
                    task for task in self.tasks.values()
                    if task.can_run(self.completed_tasks)
                ]
                
                if not runnable_tasks {
                    if len(self.completed_tasks) == len(self.tasks) {
                        print(f"工作流 '{self.name}' 执行完成")
                        self.status = WorkflowStatus.COMPLETED
                        return true
                    } else {
                        # Some task failed or can never become runnable.
                        print(f"工作流 '{self.name}' 执行失败")
                        self.status = WorkflowStatus.FAILED
                        return false
                    }
                }
                
                for task in runnable_tasks {
                    if task.execute() {
                        self.completed_tasks.add(task.name)
                    } else {
                        # A task back in WAITING still has retries left and
                        # is picked up again in the next round.
                        if task.status == TaskStatus.FAILED {
                            self.failed_tasks.add(task.name)
                        }
                    }
                }
                
                if self.failed_tasks {
                    print(f"工作流 '{self.name}' 中有任务失败: {list(self.failed_tasks)}")
                    self.status = WorkflowStatus.FAILED
                    return false
                }
                
                # Short pause between rounds
                time.sleep(0.1)
            }
            
        } catch KeyboardInterrupt {
            print(f"工作流 '{self.name}' 被用户取消")
            self.status = WorkflowStatus.CANCELLED
            return false
        } catch Exception as e {
            print(f"工作流 '{self.name}' 执行异常: {e}")
            self.status = WorkflowStatus.FAILED
            return false
        } finally {
            self.end_time = datetime.now()
            if self.start_time {
                self.duration = (self.end_time - self.start_time).total_seconds()
            }
        }
    }
    
    func get_status() -> dict {
        """Return a snapshot of the workflow and every task.

        The result has "name", "status", "duration", "completed_tasks",
        "failed_tasks" and "task_details" (per task: "status", "duration"
        and "error").
        """
        task_statuses = {
            name: {
                "status": task.status.value,
                "duration": task.duration,
                "error": task.error
            }
            for name, task in self.tasks.items()
        }
        
        return {
            "name": self.name,
            "status": self.status.value,
            "duration": self.duration,
            "completed_tasks": list(self.completed_tasks),
            "failed_tasks": list(self.failed_tasks),
            "task_details": task_statuses
        }
    }
    
    func save_report(file_path: str) {
        """Write get_status() plus ISO-formatted start/end times to `file_path` as JSON."""
        report = self.get_status()
        report["start_time"] = self.start_time.isoformat() if self.start_time else null
        report["end_time"] = self.end_time.isoformat() if self.end_time else null
        
        with open(file_path, "w", encoding="utf-8") as f {
            json.dump(report, f, indent=2, ensure_ascii=false)
        }
        
        print(f"工作流报告已保存到: {file_path}")
    }
}

# 工作流示例任务函数
func download_data(url: str, output_file: str) {
    """Simulate downloading data from url into output_file.

    Prints a progress line, sleeps for one second in place of a real
    transfer, and returns a message naming output_file. Nothing is
    fetched or written by this function itself.
    """
    print(f"下载数据从 {url} 到 {output_file}")

    simulated_seconds = 1  # stand-in for the real download time
    time.sleep(simulated_seconds)

    outcome = f"数据已下载到 {output_file}"
    return outcome
}

func process_data(input_file: str, output_file: str) {
    """Simulate processing input_file into output_file.

    Prints a progress line, sleeps for two seconds in place of real
    processing, and returns a message naming output_file. Neither file
    is touched by this function itself.
    """
    print(f"处理数据从 {input_file} 到 {output_file}")

    simulated_seconds = 2  # stand-in for the real processing time
    time.sleep(simulated_seconds)

    outcome = f"数据已处理并保存到 {output_file}"
    return outcome
}

func analyze_data(data_file: str) {
    """Simulate analyzing data_file and return a fixed result message.

    Prints a progress line, sleeps for one second in place of a real
    analysis, and returns a constant string; data_file is only used in
    the printed progress line.
    """
    print(f"分析数据文件 {data_file}")
    time.sleep(1)  # simulated analysis time
    # Plain string literal: the message has no placeholders, so the
    # f-prefix the original carried was unnecessary.
    return "分析结果: 数据质量良好"
}

func generate_report(analysis_result: str, report_file: str) {
    """Simulate generating a report at report_file.

    Prints a progress line, sleeps for one second in place of real report
    generation, and returns a message naming report_file.

    Note: analysis_result is accepted but not read by this simulated
    implementation.
    """
    print(f"生成报告到 {report_file}")

    simulated_seconds = 1  # stand-in for the real generation time
    time.sleep(simulated_seconds)

    outcome = f"报告已生成: {report_file}"
    return outcome
}

func send_notification(message: str) {
    """Simulate sending a notification containing message.

    Prints the message, sleeps for half a second in place of a real
    delivery, and returns a fixed confirmation string.
    """
    print(f"发送通知: {message}")

    simulated_seconds = 0.5  # stand-in for the real delivery time
    time.sleep(simulated_seconds)

    confirmation = "通知已发送"
    return confirmation
}

# Build the data-processing workflow demo.
data_workflow = Workflow("数据处理工作流")

# Register the tasks. Each depends_on entry names a previously added task,
# forming the chain: download -> process -> analyze -> report -> notify.
data_workflow.add_task(
    "download",
    download_data,
    args=("https://api.example.com/data", "raw_data.json")
)

data_workflow.add_task(
    "process",
    process_data,
    args=("raw_data.json", "processed_data.json"),
    depends_on=["download"]
)

data_workflow.add_task(
    "analyze",
    analyze_data,
    args=("processed_data.json",),
    depends_on=["process"]
)

# The first argument is a fixed placeholder string, not the value returned
# by the "analyze" task.
data_workflow.add_task(
    "report",
    generate_report,
    args=("分析完成", "analysis_report.pdf"),
    depends_on=["analyze"]
)

data_workflow.add_task(
    "notify",
    send_notification,
    args=("数据处理工作流已完成",),
    depends_on=["report"]
)

# Run the workflow and print the outcome based on execute()'s result.
if data_workflow.execute() {
    print("工作流执行成功")
} else {
    print("工作流执行失败")
}

# Save the execution report as JSON (written whether or not the run succeeded).
data_workflow.save_report("workflow_report.json")

# Inspect the final status.
# NOTE(review): the :.2f format below assumes status['duration'] is a number —
# confirm execute() always sets the duration before this point.
status = data_workflow.get_status()
print(f"工作流状态: {status['status']}")
print(f"执行时长: {status['duration']:.2f}秒")
print(f"完成任务: {status['completed_tasks']}")

5.6 本章总结

本章详细介绍了AI Script在自动化脚本开发方面的强大功能,涵盖了以下核心内容:

核心要点

  1. 文件系统操作

    • 文件和目录的创建、删除、移动、复制
    • 文件内容的读取、写入、搜索和替换
    • 批量文件处理和模式匹配
  2. 系统管理与监控

    • 进程管理:启动、停止、监控进程
    • 系统监控:CPU、内存、磁盘、网络使用情况
    • 系统信息获取和性能分析
  3. 网络自动化

    • HTTP客户端:GET、POST、PUT、DELETE请求
    • 网络诊断:ping、端口扫描、连接测试
    • API集成和数据交换
  4. 数据库自动化

    • 多数据库连接管理
    • CRUD操作的自动化
    • 数据备份和恢复
    • 批量数据处理
  5. 任务调度与自动化

    • 定时任务调度器
    • 多种调度模式:间隔、每日、每周、每月
    • 任务重试和错误处理
    • 任务状态监控
  6. 工作流自动化

    • 任务依赖关系管理
    • 并行和串行任务执行
    • 工作流状态跟踪
    • 执行报告生成

最佳实践

  1. 错误处理

    • 使用try-catch块处理异常
    • 实现重试机制
    • 记录详细的错误日志
  2. 资源管理

    • 及时关闭文件句柄和网络连接
    • 使用连接池管理数据库连接
    • 监控系统资源使用情况
  3. 安全考虑

    • 验证输入参数
    • 使用安全的文件路径操作
    • 保护敏感信息(密码、API密钥)
  4. 性能优化

    • 使用批量操作减少I/O次数
    • 实现并行处理提高效率
    • 合理设置超时和重试参数
  5. 可维护性

    • 模块化设计,职责分离
    • 详细的文档和注释
    • 统一的错误处理和日志格式

练习题

  1. 基础练习

    • 编写一个文件备份脚本,支持增量备份
    • 创建一个系统监控脚本,定期检查系统状态
    • 实现一个日志分析工具,统计错误日志
  2. 进阶练习

    • 开发一个数据同步工具,在不同数据库间同步数据
    • 创建一个网站监控系统,检查网站可用性
    • 实现一个自动化部署脚本,支持多环境部署
  3. 综合项目

    • 构建一个完整的数据处理管道
    • 开发一个多任务调度系统
    • 创建一个自动化运维平台

通过本章的学习,你应该能够熟练使用AI Script开发各种自动化脚本,提高工作效率,减少重复性工作。下一章我们将学习AI Script的高级特性和扩展开发。