5.1 文件系统操作
文件和目录管理
import os
import shutil
import glob
import time
from pathlib import Path
class FileManager {
func __init__(base_path: str = ".") {
self.base_path = Path(base_path).resolve()
self.operations_log = []
}
func create_directory(path: str, parents: bool = true) -> bool {
"""创建目录"""
try {
dir_path = self.base_path / path
dir_path.mkdir(parents=parents, exist_ok=true)
self._log_operation(f"创建目录: {dir_path}")
return true
} catch Exception as e {
self._log_operation(f"创建目录失败: {path}, 错误: {e}")
return false
}
}
func copy_file(source: str, destination: str) -> bool {
"""复制文件"""
try {
src_path = self.base_path / source
dst_path = self.base_path / destination
# 确保目标目录存在
dst_path.parent.mkdir(parents=true, exist_ok=true)
shutil.copy2(src_path, dst_path)
self._log_operation(f"复制文件: {src_path} -> {dst_path}")
return true
} catch Exception as e {
self._log_operation(f"复制文件失败: {source} -> {destination}, 错误: {e}")
return false
}
}
func move_file(source: str, destination: str) -> bool {
"""移动文件"""
try {
src_path = self.base_path / source
dst_path = self.base_path / destination
# 确保目标目录存在
dst_path.parent.mkdir(parents=true, exist_ok=true)
shutil.move(str(src_path), str(dst_path))
self._log_operation(f"移动文件: {src_path} -> {dst_path}")
return true
} catch Exception as e {
self._log_operation(f"移动文件失败: {source} -> {destination}, 错误: {e}")
return false
}
}
func delete_file(path: str) -> bool {
"""删除文件"""
try {
file_path = self.base_path / path
if file_path.is_file() {
file_path.unlink()
self._log_operation(f"删除文件: {file_path}")
return true
} else {
self._log_operation(f"文件不存在: {file_path}")
return false
}
} catch Exception as e {
self._log_operation(f"删除文件失败: {path}, 错误: {e}")
return false
}
}
func delete_directory(path: str, recursive: bool = false) -> bool {
"""删除目录"""
try {
dir_path = self.base_path / path
if dir_path.is_dir() {
if recursive {
shutil.rmtree(dir_path)
} else {
dir_path.rmdir()
}
self._log_operation(f"删除目录: {dir_path}")
return true
} else {
self._log_operation(f"目录不存在: {dir_path}")
return false
}
} catch Exception as e {
self._log_operation(f"删除目录失败: {path}, 错误: {e}")
return false
}
}
func find_files(pattern: str, recursive: bool = true) -> list[str] {
"""查找文件"""
try {
if recursive {
search_pattern = f"**/{pattern}"
} else {
search_pattern = pattern
}
files = list(self.base_path.glob(search_pattern))
file_paths = [str(f.relative_to(self.base_path)) for f in files if f.is_file()]
self._log_operation(f"查找文件: {pattern}, 找到 {len(file_paths)} 个文件")
return file_paths
} catch Exception as e {
self._log_operation(f"查找文件失败: {pattern}, 错误: {e}")
return []
}
}
func get_file_info(path: str) -> dict? {
"""获取文件信息"""
try {
file_path = self.base_path / path
if not file_path.exists() {
return null
}
stat = file_path.stat()
return {
"path": str(file_path),
"size": stat.st_size,
"created": stat.st_ctime,
"modified": stat.st_mtime,
"is_file": file_path.is_file(),
"is_directory": file_path.is_dir(),
"permissions": oct(stat.st_mode)[-3:]
}
} catch Exception as e {
self._log_operation(f"获取文件信息失败: {path}, 错误: {e}")
return null
}
}
func _log_operation(message: str) {
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
self.operations_log.append(f"[{timestamp}] {message}")
}
func get_operations_log() -> list[str] {
return self.operations_log.copy()
}
}
# 使用文件管理器
file_manager = FileManager("/path/to/workspace")
# 创建目录结构
file_manager.create_directory("projects/ai_script")
file_manager.create_directory("data/raw")
file_manager.create_directory("data/processed")
# 查找Python文件
python_files = file_manager.find_files("*.py")
print(f"找到 {len(python_files)} 个Python文件")
# 批量操作示例
for file in python_files[:5] { # 只处理前5个文件
info = file_manager.get_file_info(file)
if info and info["size"] > 1024 { # 大于1KB的文件
backup_path = f"backup/{file}"
file_manager.copy_file(file, backup_path)
}
}
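FileManager 会把每次操作写入内部日志,便于事后排查。下面的简短示例基于上面已创建的 file_manager 实例,演示如何读取最近的操作记录:
# 查看最近的操作日志
for entry in file_manager.get_operations_log()[-10:] { # 只看最近10条记录
print(entry)
}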
文件内容处理
import re
import json
import csv
import yaml
import glob
from pathlib import Path
class FileProcessor {
func __init__() {
self.encoding = "utf-8"
self.processed_files = []
}
func read_text_file(filepath: str) -> str? {
"""读取文本文件"""
try {
with open(filepath, "r", encoding=self.encoding) as f {
content = f.read()
self.processed_files.append(filepath)
return content
}
} catch Exception as e {
print(f"读取文件失败: {filepath}, 错误: {e}")
return null
}
}
func write_text_file(filepath: str, content: str) -> bool {
"""写入文本文件"""
try {
# 确保目录存在
Path(filepath).parent.mkdir(parents=true, exist_ok=true)
with open(filepath, "w", encoding=self.encoding) as f {
f.write(content)
self.processed_files.append(filepath)
return true
}
} catch Exception as e {
print(f"写入文件失败: {filepath}, 错误: {e}")
return false
}
}
func read_json_file(filepath: str) -> dict? {
"""读取JSON文件"""
try {
with open(filepath, "r", encoding=self.encoding) as f {
data = json.load(f)
return data
}
} catch Exception as e {
print(f"读取JSON文件失败: {filepath}, 错误: {e}")
return null
}
}
func write_json_file(filepath: str, data: dict, indent: int = 2) -> bool {
"""写入JSON文件"""
try {
Path(filepath).parent.mkdir(parents=true, exist_ok=true)
with open(filepath, "w", encoding=self.encoding) as f {
json.dump(data, f, indent=indent, ensure_ascii=false)
return true
}
} catch Exception as e {
print(f"写入JSON文件失败: {filepath}, 错误: {e}")
return false
}
}
func read_csv_file(filepath: str, delimiter: str = ",") -> list[dict]? {
"""读取CSV文件"""
try {
data = []
with open(filepath, "r", encoding=self.encoding) as f {
reader = csv.DictReader(f, delimiter=delimiter)
for row in reader {
data.append(dict(row))
}
return data
}
} catch Exception as e {
print(f"读取CSV文件失败: {filepath}, 错误: {e}")
return null
}
}
func write_csv_file(
filepath: str,
data: list[dict],
fieldnames: list[str]? = null,
delimiter: str = ","
) -> bool {
"""写入CSV文件"""
try {
if not data {
return false
}
if fieldnames is null {
fieldnames = list(data[0].keys())
}
Path(filepath).parent.mkdir(parents=true, exist_ok=true)
with open(filepath, "w", newline="", encoding=self.encoding) as f {
writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=delimiter)
writer.writeheader()
writer.writerows(data)
return true
}
} catch Exception as e {
print(f"写入CSV文件失败: {filepath}, 错误: {e}")
return false
}
}
func process_text_batch(
input_pattern: str,
output_dir: str,
processor_func: callable,
file_extension: str = ".txt"
) -> int {
"""批量处理文本文件"""
processed_count = 0
for filepath in glob.glob(input_pattern) {
try {
# 读取文件
content = self.read_text_file(filepath)
if content is null {
continue
}
# 处理内容
processed_content = processor_func(content)
# 生成输出文件路径
filename = Path(filepath).stem
output_path = Path(output_dir) / f"{filename}_processed{file_extension}"
# 写入处理后的内容
if self.write_text_file(str(output_path), processed_content) {
processed_count += 1
print(f"处理完成: {filepath} -> {output_path}")
}
} catch Exception as e {
print(f"处理文件失败: {filepath}, 错误: {e}")
}
}
return processed_count
}
func search_and_replace(
filepath: str,
search_pattern: str,
replacement: str,
use_regex: bool = false
) -> bool {
"""搜索和替换文件内容"""
try {
content = self.read_text_file(filepath)
if content is null {
return false
}
if use_regex {
modified_content = re.sub(search_pattern, replacement, content)
} else {
modified_content = content.replace(search_pattern, replacement)
}
return self.write_text_file(filepath, modified_content)
} catch Exception as e {
print(f"搜索替换失败: {filepath}, 错误: {e}")
return false
}
}
}
# 使用文件处理器
processor = FileProcessor()
# 文本处理函数示例
func clean_text(text: str) -> str {
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 移除特殊字符
text = re.sub(r'[^\w\s.,!?]', '', text)
return text.strip()
}
func extract_emails(text: str) -> str {
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
emails = re.findall(email_pattern, text)
return '\n'.join(emails)
}
# 批量处理示例
processed_count = processor.process_text_batch(
"data/raw/*.txt",
"data/processed",
clean_text
)
print(f"处理了 {processed_count} 个文件")
# 配置文件处理示例
config_data = {
"database": {
"host": "localhost",
"port": 5432,
"name": "ai_script_db"
},
"ai_models": {
"text_model": "gpt-4",
"embedding_model": "text-embedding-ada-002"
}
}
processor.write_json_file("config/settings.json", config_data)
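写入配置后,也可以用 read_json_file 把它读回来再修改,下面是一个简单示意(沿用上面的 processor 实例和配置文件路径):
# 读取刚写入的配置并修改数据库端口
settings = processor.read_json_file("config/settings.json")
if settings {
settings["database"]["port"] = 5433
processor.write_json_file("config/settings.json", settings)
}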
5.2 系统管理与监控
进程管理
import os
import time
import subprocess
import psutil
import signal
import threading
class ProcessManager {
func __init__() {
self.running_processes = {}
self.process_logs = {}
}
func start_process(
command: list[str],
name: str? = null,
working_dir: str? = null,
env_vars: dict? = null,
capture_output: bool = true
) -> str? {
"""启动进程"""
try {
if name is null {
name = f"process_{len(self.running_processes)}"
}
# 准备环境变量
env = os.environ.copy()
if env_vars {
env.update(env_vars)
}
# 启动进程
if capture_output {
process = subprocess.Popen(
command,
cwd=working_dir,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=true
)
} else {
process = subprocess.Popen(
command,
cwd=working_dir,
env=env
)
}
self.running_processes[name] = {
"process": process,
"command": command,
"start_time": time.time(),
"capture_output": capture_output
}
print(f"进程 '{name}' 已启动,PID: {process.pid}")
return name
} catch Exception as e {
print(f"启动进程失败: {e}")
return null
}
}
func stop_process(name: str, timeout: int = 10) -> bool {
"""停止进程"""
if name not in self.running_processes {
print(f"进程 '{name}' 不存在")
return false
}
try {
process_info = self.running_processes[name]
process = process_info["process"]
# 尝试优雅关闭
process.terminate()
try {
process.wait(timeout=timeout)
} catch subprocess.TimeoutExpired {
# 强制关闭
process.kill()
process.wait()
}
# 获取输出
if process_info["capture_output"] {
stdout, stderr = process.communicate()
self.process_logs[name] = {
"stdout": stdout,
"stderr": stderr,
"return_code": process.returncode
}
}
del self.running_processes[name]
print(f"进程 '{name}' 已停止")
return true
} catch Exception as e {
print(f"停止进程失败: {e}")
return false
}
}
func get_process_status(name: str) -> dict? {
"""获取进程状态"""
if name not in self.running_processes {
return null
}
process_info = self.running_processes[name]
process = process_info["process"]
try {
# 使用psutil获取详细信息
ps_process = psutil.Process(process.pid)
return {
"name": name,
"pid": process.pid,
"status": ps_process.status(),
"cpu_percent": ps_process.cpu_percent(),
"memory_percent": ps_process.memory_percent(),
"memory_info": ps_process.memory_info()._asdict(),
"create_time": ps_process.create_time(),
"running_time": time.time() - process_info["start_time"],
"command": process_info["command"]
}
} catch psutil.NoSuchProcess {
# 进程已结束
del self.running_processes[name]
return null
} catch Exception as e {
print(f"获取进程状态失败: {e}")
return null
}
}
func list_processes() -> list[dict] {
"""列出所有管理的进程"""
processes = []
for name in list(self.running_processes.keys()) {
status = self.get_process_status(name)
if status {
processes.append(status)
}
}
return processes
}
func get_process_output(name: str) -> dict? {
"""获取进程输出"""
return self.process_logs.get(name)
}
func monitor_process(name: str, callback: callable? = null) {
"""监控进程"""
func monitor_loop() {
while name in self.running_processes {
status = self.get_process_status(name)
if status {
if callback {
callback(status)
}
time.sleep(5) # 每5秒检查一次
} else {
break
}
}
}
monitor_thread = threading.Thread(target=monitor_loop)
monitor_thread.daemon = true
monitor_thread.start()
}
}
# 使用进程管理器
process_manager = ProcessManager()
# 启动一个长期运行的进程
process_name = process_manager.start_process(
["python", "-m", "http.server", "8000"],
name="web_server",
working_dir="/path/to/web/root"
)
if process_name {
# 监控进程
func log_process_status(status: dict) {
print(f"进程 {status['name']}: CPU {status['cpu_percent']:.1f}%, 内存 {status['memory_percent']:.1f}%")
}
process_manager.monitor_process(process_name, log_process_status)
# 等待一段时间后停止
time.sleep(30)
process_manager.stop_process(process_name)
}
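进程停止后,捕获的标准输出和错误输出会保存在 process_logs 中,可以通过 get_process_output 取回(进程名沿用上面示例中的 web_server):
# 查看已停止进程的输出
output = process_manager.get_process_output("web_server")
if output {
print(f"返回码: {output['return_code']}")
print(f"标准输出: {output['stdout'][:200]}") # 只打印前200个字符
}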
系统监控
import psutil
import platform
import socket
import time
import threading
class SystemMonitor {
func __init__() {
self.monitoring = false
self.alerts = []
self.thresholds = {
"cpu_percent": 80.0,
"memory_percent": 85.0,
"disk_percent": 90.0,
"temperature": 70.0
}
}
func get_system_info() -> dict {
"""获取系统基本信息"""
return {
"platform": platform.platform(),
"processor": platform.processor(),
"architecture": platform.architecture(),
"hostname": socket.gethostname(),
"python_version": platform.python_version(),
"boot_time": psutil.boot_time()
}
}
func get_cpu_info() -> dict {
"""获取CPU信息"""
return {
"physical_cores": psutil.cpu_count(logical=false),
"total_cores": psutil.cpu_count(logical=true),
"max_frequency": psutil.cpu_freq().max if psutil.cpu_freq() else null,
"current_frequency": psutil.cpu_freq().current if psutil.cpu_freq() else null,
"cpu_percent": psutil.cpu_percent(interval=1),
"per_cpu_percent": psutil.cpu_percent(interval=1, percpu=true)
}
}
func get_memory_info() -> dict {
"""获取内存信息"""
virtual_memory = psutil.virtual_memory()
swap_memory = psutil.swap_memory()
return {
"total": virtual_memory.total,
"available": virtual_memory.available,
"used": virtual_memory.used,
"percentage": virtual_memory.percent,
"swap_total": swap_memory.total,
"swap_used": swap_memory.used,
"swap_percentage": swap_memory.percent
}
}
func get_disk_info() -> list[dict] {
"""获取磁盘信息"""
disk_info = []
for partition in psutil.disk_partitions() {
try {
usage = psutil.disk_usage(partition.mountpoint)
disk_info.append({
"device": partition.device,
"mountpoint": partition.mountpoint,
"file_system": partition.fstype,
"total": usage.total,
"used": usage.used,
"free": usage.free,
"percentage": (usage.used / usage.total) * 100
})
} catch PermissionError {
continue
}
}
return disk_info
}
func get_network_info() -> dict {
"""获取网络信息"""
network_io = psutil.net_io_counters()
network_connections = len(psutil.net_connections())
return {
"bytes_sent": network_io.bytes_sent,
"bytes_received": network_io.bytes_recv,
"packets_sent": network_io.packets_sent,
"packets_received": network_io.packets_recv,
"connections": network_connections
}
}
func get_process_list(limit: int = 10) -> list[dict] {
"""获取进程列表"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']) {
try {
processes.append(proc.info)
} catch (psutil.NoSuchProcess, psutil.AccessDenied) {
continue
}
}
# 按CPU使用率排序
processes.sort(key=lambda x: x['cpu_percent'] or 0, reverse=true)
return processes[:limit]
}
func check_alerts() -> list[str] {
"""检查系统警告"""
alerts = []
# CPU检查
cpu_info = self.get_cpu_info()
if cpu_info["cpu_percent"] > self.thresholds["cpu_percent"] {
alerts.append(f"CPU使用率过高: {cpu_info['cpu_percent']:.1f}%")
}
# 内存检查
memory_info = self.get_memory_info()
if memory_info["percentage"] > self.thresholds["memory_percent"] {
alerts.append(f"内存使用率过高: {memory_info['percentage']:.1f}%")
}
# 磁盘检查
disk_info = self.get_disk_info()
for disk in disk_info {
if disk["percentage"] > self.thresholds["disk_percent"] {
alerts.append(f"磁盘 {disk['device']} 使用率过高: {disk['percentage']:.1f}%")
}
}
return alerts
}
func start_monitoring(interval: int = 60, callback: callable? = null) {
"""开始系统监控"""
self.monitoring = true
func monitoring_loop() {
while self.monitoring {
try {
# 收集系统信息
system_data = {
"timestamp": time.time(),
"cpu": self.get_cpu_info(),
"memory": self.get_memory_info(),
"disk": self.get_disk_info(),
"network": self.get_network_info(),
"top_processes": self.get_process_list(5)
}
# 检查警告
alerts = self.check_alerts()
if alerts {
system_data["alerts"] = alerts
self.alerts.extend(alerts)
}
# 调用回调函数
if callback {
callback(system_data)
}
time.sleep(interval)
} catch Exception as e {
print(f"监控错误: {e}")
time.sleep(interval)
}
}
}
monitor_thread = threading.Thread(target=monitoring_loop)
monitor_thread.daemon = true
monitor_thread.start()
print(f"系统监控已启动,间隔: {interval}秒")
}
func stop_monitoring() {
"""停止系统监控"""
self.monitoring = false
print("系统监控已停止")
}
func get_system_report() -> dict {
"""生成系统报告"""
return {
"system_info": self.get_system_info(),
"cpu_info": self.get_cpu_info(),
"memory_info": self.get_memory_info(),
"disk_info": self.get_disk_info(),
"network_info": self.get_network_info(),
"top_processes": self.get_process_list(10),
"alerts": self.check_alerts(),
"report_time": time.time()
}
}
}
# 使用系统监控器
monitor = SystemMonitor()
# 生成系统报告
report = monitor.get_system_report()
print("系统报告:")
print(f"CPU使用率: {report['cpu_info']['cpu_percent']:.1f}%")
print(f"内存使用率: {report['memory_info']['percentage']:.1f}%")
print(f"活跃连接数: {report['network_info']['connections']}")
# 检查警告
alerts = report["alerts"]
if alerts {
print("\n系统警告:")
for alert in alerts {
print(f" - {alert}")
}
}
# 启动持续监控
func log_system_data(data: dict) {
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["timestamp"]))
print(f"[{timestamp}] CPU: {data['cpu']['cpu_percent']:.1f}%, 内存: {data['memory']['percentage']:.1f}%")
if "alerts" in data {
for alert in data["alerts"] {
print(f" 警告: {alert}")
}
}
}
# monitor.start_monitoring(interval=30, callback=log_system_data)
5.3 网络自动化
HTTP客户端
import requests
import json
import time
from pathlib import Path
from urllib.parse import urljoin, urlparse
class HTTPClient {
func __init__(base_url: str? = null, timeout: int = 30) {
self.base_url = base_url
self.timeout = timeout
self.session = requests.Session()
self.request_history = []
self.retry_config = {
"max_retries": 3,
"backoff_factor": 1,
"status_forcelist": [500, 502, 503, 504]
}
}
func set_headers(headers: dict) {
"""设置默认请求头"""
self.session.headers.update(headers)
}
func set_auth(username: str, password: str) {
"""设置基本认证"""
self.session.auth = (username, password)
}
func set_bearer_token(token: str) {
"""设置Bearer Token"""
self.session.headers["Authorization"] = f"Bearer {token}"
}
func _build_url(endpoint: str) -> str {
"""构建完整URL"""
if self.base_url {
return urljoin(self.base_url, endpoint)
}
return endpoint
}
func _log_request(method: str, url: str, response: any, duration: float) {
"""记录请求日志"""
self.request_history.append({
"timestamp": time.time(),
"method": method,
"url": url,
"status_code": response.status_code,
"duration": duration,
"success": response.ok
})
}
func _retry_request(method: str, url: str, **kwargs) -> any {
"""重试请求"""
max_retries = self.retry_config["max_retries"]
backoff_factor = self.retry_config["backoff_factor"]
status_forcelist = self.retry_config["status_forcelist"]
for attempt in range(max_retries + 1) {
try {
start_time = time.time()
response = self.session.request(method, url, timeout=self.timeout, **kwargs)
duration = time.time() - start_time
self._log_request(method, url, response, duration)
if response.ok or response.status_code not in status_forcelist {
return response
}
if attempt < max_retries {
wait_time = backoff_factor * (2 ** attempt)
print(f"请求失败 (状态码: {response.status_code}),{wait_time}秒后重试...")
time.sleep(wait_time)
}
} catch requests.exceptions.RequestException as e {
if attempt < max_retries {
wait_time = backoff_factor * (2 ** attempt)
print(f"请求异常: {e},{wait_time}秒后重试...")
time.sleep(wait_time)
} else {
raise e
}
}
}
return response
}
func get(endpoint: str, params: dict? = null, **kwargs) -> any {
"""GET请求"""
url = self._build_url(endpoint)
return self._retry_request("GET", url, params=params, **kwargs)
}
func post(endpoint: str, data: any? = null, json_data: dict? = null, **kwargs) -> any {
"""POST请求"""
url = self._build_url(endpoint)
return self._retry_request("POST", url, data=data, json=json_data, **kwargs)
}
func put(endpoint: str, data: any? = null, json_data: dict? = null, **kwargs) -> any {
"""PUT请求"""
url = self._build_url(endpoint)
return self._retry_request("PUT", url, data=data, json=json_data, **kwargs)
}
func delete(endpoint: str, **kwargs) -> any {
"""DELETE请求"""
url = self._build_url(endpoint)
return self._retry_request("DELETE", url, **kwargs)
}
func download_file(url: str, filepath: str, chunk_size: int = 8192) -> bool {
"""下载文件"""
try {
response = self.get(url, stream=true)
response.raise_for_status()
# 确保目录存在
Path(filepath).parent.mkdir(parents=true, exist_ok=true)
with open(filepath, "wb") as f {
for chunk in response.iter_content(chunk_size=chunk_size) {
if chunk {
f.write(chunk)
}
}
}
print(f"文件下载完成: {filepath}")
return true
} catch Exception as e {
print(f"文件下载失败: {e}")
return false
}
}
func upload_file(endpoint: str, filepath: str, field_name: str = "file") -> any {
"""上传文件"""
url = self._build_url(endpoint)
with open(filepath, "rb") as f {
files = {field_name: f}
return self._retry_request("POST", url, files=files)
}
}
func get_request_stats() -> dict {
"""获取请求统计"""
if not self.request_history {
return {"total_requests": 0}
}
total_requests = len(self.request_history)
successful_requests = sum(1 for req in self.request_history if req["success"])
failed_requests = total_requests - successful_requests
durations = [req["duration"] for req in self.request_history]
avg_duration = sum(durations) / len(durations)
return {
"total_requests": total_requests,
"successful_requests": successful_requests,
"failed_requests": failed_requests,
"success_rate": successful_requests / total_requests,
"average_duration": avg_duration,
"min_duration": min(durations),
"max_duration": max(durations)
}
}
}
# API客户端示例
class APIClient(HTTPClient) {
func __init__(base_url: str, api_key: str? = null) {
super().__init__(base_url)
if api_key {
self.set_headers({"X-API-Key": api_key})
}
self.set_headers({
"Content-Type": "application/json",
"User-Agent": "AIScript-Client/1.0"
})
}
func get_user(user_id: int) -> dict? {
"""获取用户信息"""
try {
response = self.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
} catch Exception as e {
print(f"获取用户失败: {e}")
return null
}
}
func create_user(user_data: dict) -> dict? {
"""创建用户"""
try {
response = self.post("/users", json_data=user_data)
response.raise_for_status()
return response.json()
} catch Exception as e {
print(f"创建用户失败: {e}")
return null
}
}
func update_user(user_id: int, user_data: dict) -> dict? {
"""更新用户"""
try {
response = self.put(f"/users/{user_id}", json_data=user_data)
response.raise_for_status()
return response.json()
} catch Exception as e {
print(f"更新用户失败: {e}")
return null
}
}
func delete_user(user_id: int) -> bool {
"""删除用户"""
try {
response = self.delete(f"/users/{user_id}")
response.raise_for_status()
return true
} catch Exception as e {
print(f"删除用户失败: {e}")
return false
}
}
func search_users(query: str, limit: int = 10) -> list[dict] {
"""搜索用户"""
try {
params = {"q": query, "limit": limit}
response = self.get("/users/search", params=params)
response.raise_for_status()
return response.json().get("users", [])
} catch Exception as e {
print(f"搜索用户失败: {e}")
return []
}
}
}
# 使用HTTP客户端
client = HTTPClient("https://api.example.com")
client.set_bearer_token("your-api-token")
# 发送请求
response = client.get("/status")
if response.ok {
print(f"API状态: {response.json()}")
}
# 下载文件
client.download_file(
"https://example.com/data.csv",
"downloads/data.csv"
)
# 获取请求统计
stats = client.get_request_stats()
print(f"请求统计: {stats}")
网络工具
import socket
import subprocess
import platform
import re
import time
import ping3
class NetworkTools {
func __init__() {
self.results = []
}
func ping_host(host: str, count: int = 4, timeout: int = 3) -> dict {
"""Ping主机"""
results = {
"host": host,
"packets_sent": count,
"packets_received": 0,
"packet_loss": 0.0,
"min_time": null,
"max_time": null,
"avg_time": null,
"times": []
}
times = []
for i in range(count) {
try {
response_time = ping3.ping(host, timeout=timeout)
if response_time is not null {
times.append(response_time * 1000) # 转换为毫秒
results["packets_received"] += 1
}
} catch Exception as e {
print(f"Ping失败: {e}")
}
}
if times {
results["times"] = times
results["min_time"] = min(times)
results["max_time"] = max(times)
results["avg_time"] = sum(times) / len(times)
}
results["packet_loss"] = (count - results["packets_received"]) / count * 100
return results
}
func check_port(host: str, port: int, timeout: int = 3) -> bool {
"""检查端口是否开放"""
try {
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
} catch Exception {
return false
}
}
func scan_ports(host: str, ports: list[int], timeout: int = 1) -> dict {
"""扫描端口"""
open_ports = []
closed_ports = []
for port in ports {
if self.check_port(host, port, timeout) {
open_ports.append(port)
} else {
closed_ports.append(port)
}
}
return {
"host": host,
"open_ports": open_ports,
"closed_ports": closed_ports,
"total_scanned": len(ports)
}
}
func get_local_ip() -> str? {
"""获取本地IP地址"""
try {
# 连接到外部地址来获取本地IP
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(("8.8.8.8", 80))
local_ip = sock.getsockname()[0]
sock.close()
return local_ip
} catch Exception {
return null
}
}
func resolve_hostname(hostname: str) -> list[str] {
"""解析主机名"""
try {
ip_addresses = socket.gethostbyname_ex(hostname)[2]
return ip_addresses
} catch Exception {
return []
}
}
func reverse_dns(ip: str) -> str? {
"""反向DNS查询"""
try {
hostname = socket.gethostbyaddr(ip)[0]
return hostname
} catch Exception {
return null
}
}
func traceroute(host: str, max_hops: int = 30) -> list[dict] {
"""路由跟踪"""
try {
if platform.system().lower() == "windows" {
cmd = ["tracert", "-h", str(max_hops), host]
} else {
cmd = ["traceroute", "-m", str(max_hops), host]
}
result = subprocess.run(cmd, capture_output=true, text=true, timeout=60)
# 解析输出
hops = []
lines = result.stdout.split('\n')
for line in lines {
# 简化的解析逻辑
if re.search(r'\d+\s+', line) {
parts = line.strip().split()
if len(parts) >= 2 {
hop_num = parts[0]
if hop_num.isdigit() {
hops.append({
"hop": int(hop_num),
"raw_line": line.strip()
})
}
}
}
}
return hops
} catch Exception as e {
print(f"路由跟踪失败: {e}")
return []
}
}
func network_speed_test(test_url: str = "http://speedtest.ftp.otenet.gr/files/test1Mb.db") -> dict {
"""网络速度测试"""
try {
import requests
start_time = time.time()
response = requests.get(test_url, stream=true)
total_size = 0
for chunk in response.iter_content(chunk_size=1024) {
total_size += len(chunk)
}
end_time = time.time()
duration = end_time - start_time
speed_bps = total_size / duration # 字节每秒
speed_mbps = speed_bps / (1024 * 1024) # MB每秒
return {
"total_size": total_size,
"duration": duration,
"speed_bps": speed_bps,
"speed_mbps": speed_mbps,
"test_url": test_url
}
} catch Exception as e {
print(f"网络速度测试失败: {e}")
return {}
}
}
func network_diagnostics(host: str) -> dict {
"""网络诊断"""
diagnostics = {
"host": host,
"timestamp": time.time()
}
# DNS解析
print(f"正在解析 {host}...")
ip_addresses = self.resolve_hostname(host)
diagnostics["dns_resolution"] = {
"success": len(ip_addresses) > 0,
"ip_addresses": ip_addresses
}
if ip_addresses {
target_ip = ip_addresses[0]
# Ping测试
print(f"正在Ping {target_ip}...")
ping_result = self.ping_host(target_ip)
diagnostics["ping"] = ping_result
# 端口扫描
print(f"正在扫描常用端口...")
common_ports = [22, 23, 25, 53, 80, 110, 143, 443, 993, 995]
port_scan = self.scan_ports(target_ip, common_ports)
diagnostics["port_scan"] = port_scan
# 路由跟踪
print(f"正在进行路由跟踪...")
traceroute_result = self.traceroute(target_ip)
diagnostics["traceroute"] = traceroute_result
}
return diagnostics
}
}
# 使用网络工具
net_tools = NetworkTools()
# 网络诊断示例
diagnostics = net_tools.network_diagnostics("google.com")
print("网络诊断结果:")
print(f"DNS解析: {'成功' if diagnostics['dns_resolution']['success'] else '失败'}")
if "ping" in diagnostics {
ping_result = diagnostics["ping"]
print(f"Ping结果: 丢包率 {ping_result['packet_loss']:.1f}%, 平均延迟 {ping_result['avg_time']:.1f}ms")
}
if "port_scan" in diagnostics {
port_result = diagnostics["port_scan"]
print(f"开放端口: {port_result['open_ports']}")
}
# 获取本地网络信息
local_ip = net_tools.get_local_ip()
print(f"本地IP地址: {local_ip}")
# 网络速度测试
# speed_result = net_tools.network_speed_test()
# if speed_result {
# print(f"网络速度: {speed_result['speed_mbps']:.2f} MB/s")
# }
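也可以单独调用 check_port 来确认某个服务是否可达,例如检查本机的 8000 端口(对应 5.2 节示例中启动的HTTP服务,端口仅为示例):
# 检查本机8000端口是否开放
if net_tools.check_port("127.0.0.1", 8000) {
print("端口 8000 已开放")
} else {
print("端口 8000 未开放")
}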
5.4 数据库自动化
数据库连接管理
import sqlite3
import mysql.connector
import psycopg2
import redis
from contextlib import contextmanager
class DatabaseManager {
func __init__() {
self.connections = {}
self.connection_configs = {}
}
func add_connection(
name: str,
db_type: str,
config: dict
) {
"""添加数据库连接配置"""
self.connection_configs[name] = {
"type": db_type,
"config": config
}
}
func get_connection(name: str) -> any? {
"""获取数据库连接"""
if name in self.connections {
return self.connections[name]
}
if name not in self.connection_configs {
raise ValueError(f"未找到连接配置: {name}")
}
config_info = self.connection_configs[name]
db_type = config_info["type"]
config = config_info["config"]
try {
match db_type {
"sqlite" => {
conn = sqlite3.connect(config["database"])
conn.row_factory = sqlite3.Row # 使结果可以按列名访问
}
"mysql" => {
conn = mysql.connector.connect(**config)
}
"postgresql" => {
conn = psycopg2.connect(**config)
}
"redis" => {
conn = redis.Redis(**config)
}
_ => {
raise ValueError(f"不支持的数据库类型: {db_type}")
}
}
self.connections[name] = conn
print(f"数据库连接 '{name}' 已建立")
return conn
} catch Exception as e {
print(f"连接数据库失败: {e}")
return null
}
}
@contextmanager
func get_cursor(connection_name: str) {
"""获取数据库游标(上下文管理器)"""
conn = self.get_connection(connection_name)
if conn is null {
raise ValueError(f"无法获取连接: {connection_name}")
}
cursor = conn.cursor()
try {
yield cursor
conn.commit()
} catch Exception as e {
conn.rollback()
raise e
} finally {
cursor.close()
}
}
func execute_query(
connection_name: str,
query: str,
params: tuple? = null
) -> list[dict] {
"""执行查询"""
with self.get_cursor(connection_name) as cursor {
if params {
cursor.execute(query, params)
} else {
cursor.execute(query)
}
# 获取列名
columns = [desc[0] for desc in cursor.description] if cursor.description else []
# 获取结果
rows = cursor.fetchall()
# 转换为字典列表
result = []
for row in rows {
if isinstance(row, sqlite3.Row) {
result.append(dict(row))
} else {
result.append(dict(zip(columns, row)))
}
}
return result
}
}
func execute_update(
connection_name: str,
query: str,
params: tuple? = null
) -> int {
"""执行更新操作"""
with self.get_cursor(connection_name) as cursor {
if params {
cursor.execute(query, params)
} else {
cursor.execute(query)
}
return cursor.rowcount
}
}
func execute_batch(
connection_name: str,
query: str,
params_list: list[tuple]
) -> int {
"""批量执行"""
with self.get_cursor(connection_name) as cursor {
cursor.executemany(query, params_list)
return cursor.rowcount
}
}
func close_connection(name: str) {
"""关闭连接"""
if name in self.connections {
self.connections[name].close()
del self.connections[name]
print(f"数据库连接 '{name}' 已关闭")
}
}
func close_all_connections() {
"""关闭所有连接"""
for name in list(self.connections.keys()) {
self.close_connection(name)
}
}
}
# 数据库操作类
class DatabaseOperations {
func __init__(db_manager: DatabaseManager) {
self.db_manager = db_manager
}
func create_table(
connection_name: str,
table_name: str,
columns: dict,
primary_key: str? = null
) -> bool {
"""创建表"""
try {
# 构建列定义
column_defs = []
for col_name, col_type in columns.items() {
column_defs.append(f"{col_name} {col_type}")
}
if primary_key {
column_defs.append(f"PRIMARY KEY ({primary_key})")
}
query = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(column_defs)})"
self.db_manager.execute_update(connection_name, query)
print(f"表 '{table_name}' 创建成功")
return true
} catch Exception as e {
print(f"创建表失败: {e}")
return false
}
}
func insert_data(
connection_name: str,
table_name: str,
data: dict
) -> bool {
"""插入数据"""
try {
columns = list(data.keys())
placeholders = ["?" for _ in columns] # SQLite风格
query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
values = tuple(data.values())
self.db_manager.execute_update(connection_name, query, values)
print(f"数据插入成功到表 '{table_name}'")
return true
} catch Exception as e {
print(f"插入数据失败: {e}")
return false
}
}
func bulk_insert(
connection_name: str,
table_name: str,
data_list: list[dict]
) -> bool {
"""批量插入数据"""
if not data_list {
return false
}
try {
columns = list(data_list[0].keys())
placeholders = ["?" for _ in columns]
query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
# 准备参数列表
params_list = [tuple(data.values()) for data in data_list]
rows_affected = self.db_manager.execute_batch(connection_name, query, params_list)
print(f"批量插入完成,影响 {rows_affected} 行")
return true
} catch Exception as e {
print(f"批量插入失败: {e}")
return false
}
}
func update_data(
connection_name: str,
table_name: str,
data: dict,
where_clause: str,
where_params: tuple? = null
) -> int {
"""更新数据"""
try {
set_clauses = [f"{col} = ?" for col in data.keys()]
query = f"UPDATE {table_name} SET {', '.join(set_clauses)} WHERE {where_clause}"
params = list(data.values())
if where_params {
params.extend(where_params)
}
rows_affected = self.db_manager.execute_update(connection_name, query, tuple(params))
print(f"更新完成,影响 {rows_affected} 行")
return rows_affected
} catch Exception as e {
print(f"更新数据失败: {e}")
return 0
}
}
func delete_data(
connection_name: str,
table_name: str,
where_clause: str,
where_params: tuple? = null
) -> int {
"""删除数据"""
try {
query = f"DELETE FROM {table_name} WHERE {where_clause}"
rows_affected = self.db_manager.execute_update(connection_name, query, where_params)
print(f"删除完成,影响 {rows_affected} 行")
return rows_affected
} catch Exception as e {
print(f"删除数据失败: {e}")
return 0
}
}
func backup_table(
connection_name: str,
table_name: str,
backup_file: str
) -> bool {
"""备份表数据"""
try {
# 查询所有数据
data = self.db_manager.execute_query(connection_name, f"SELECT * FROM {table_name}")
# 写入JSON文件
import json
with open(backup_file, "w", encoding="utf-8") as f {
json.dump(data, f, indent=2, ensure_ascii=false, default=str)
}
print(f"表 '{table_name}' 备份完成: {backup_file}")
return true
} catch Exception as e {
print(f"备份表失败: {e}")
return false
}
}
func restore_table(
connection_name: str,
table_name: str,
backup_file: str,
clear_existing: bool = false
) -> bool {
"""恢复表数据"""
try {
# 读取备份文件
import json
with open(backup_file, "r", encoding="utf-8") as f {
data = json.load(f)
}
if not data {
print("备份文件为空")
return false
}
# 清空现有数据
if clear_existing {
self.delete_data(connection_name, table_name, "1=1")
}
# 批量插入数据
return self.bulk_insert(connection_name, table_name, data)
} catch Exception as e {
print(f"恢复表失败: {e}")
return false
}
}
}
# 使用数据库管理器
db_manager = DatabaseManager()
# 添加SQLite连接
db_manager.add_connection("local_db", "sqlite", {
"database": "data/app.db"
})
# 添加MySQL连接
db_manager.add_connection("mysql_db", "mysql", {
"host": "localhost",
"user": "username",
"password": "password",
"database": "myapp"
})
# 数据库操作
db_ops = DatabaseOperations(db_manager)
# 创建用户表
user_table_columns = {
"id": "INTEGER",
"username": "VARCHAR(50) NOT NULL",
"email": "VARCHAR(100) NOT NULL",
"created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
}
db_ops.create_table("local_db", "users", user_table_columns, "id")
# 插入用户数据
user_data = {
"username": "john_doe",
"email": "john@example.com"
}
db_ops.insert_data("local_db", "users", user_data)
# 查询用户
users = db_manager.execute_query("local_db", "SELECT * FROM users WHERE username = ?", ("john_doe",))
print(f"查询结果: {users}")
# 备份表
db_ops.backup_table("local_db", "users", "backup/users_backup.json")
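当需要写入大量数据时,优先使用 bulk_insert 以减少数据库往返;更新操作同样可以通过封装好的方法完成。下面是一个简单示意(数据内容仅为示例):
# 批量插入与条件更新示例
more_users = [
{"username": "alice", "email": "alice@example.com"},
{"username": "bob", "email": "bob@example.com"}
]
db_ops.bulk_insert("local_db", "users", more_users)
# 将指定用户的邮箱更新为新地址
db_ops.update_data("local_db", "users", {"email": "alice@new-domain.com"}, "username = ?", ("alice",))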
5.5 任务调度与自动化
定时任务调度
import schedule
import threading
import time
from datetime import datetime, timedelta
class TaskScheduler {
func __init__() {
self.jobs = {}
self.running = false
self.scheduler_thread = null
}
func add_job(
name: str,
func: callable,
schedule_type: str,
schedule_value: any,
args: tuple = (),
kwargs: dict = {},
max_retries: int = 3
) -> bool {
"""添加定时任务"""
try {
job_info = {
"function": func,
"args": args,
"kwargs": kwargs,
"max_retries": max_retries,
"retry_count": 0,
"last_run": null,
"next_run": null,
"status": "scheduled",
"error_log": []
}
# 根据调度类型设置任务
match schedule_type {
"interval" => {
job = schedule.every(schedule_value).seconds.do(self._run_job, name)
}
"daily" => {
job = schedule.every().day.at(schedule_value).do(self._run_job, name)
}
"weekly" => {
day, time_str = schedule_value
job = getattr(schedule.every(), day.lower()).at(time_str).do(self._run_job, name)
}
"monthly" => {
# 简化的月度调度
job = schedule.every().day.at(schedule_value).do(self._run_job, name)
}
"cron" => {
# 简化的cron表达式支持
job = schedule.every().minute.do(self._run_job, name)
}
_ => {
raise ValueError(f"不支持的调度类型: {schedule_type}")
}
}
job_info["schedule_job"] = job
self.jobs[name] = job_info
print(f"任务 '{name}' 已添加到调度器")
return true
} catch Exception as e {
print(f"添加任务失败: {e}")
return false
}
}
func _run_job(name: str) {
"""执行任务"""
if name not in self.jobs {
return
}
job_info = self.jobs[name]
try {
print(f"开始执行任务: {name}")
start_time = time.time()
# 执行任务函数
result = job_info["function"](*job_info["args"], **job_info["kwargs"])
end_time = time.time()
duration = end_time - start_time
# 更新任务信息
job_info["last_run"] = datetime.now()
job_info["status"] = "completed"
job_info["retry_count"] = 0
print(f"任务 '{name}' 执行完成,耗时: {duration:.2f}秒")
} catch Exception as e {
job_info["retry_count"] += 1
job_info["error_log"].append({
"timestamp": datetime.now(),
"error": str(e)
})
print(f"任务 '{name}' 执行失败: {e}")
if job_info["retry_count"] < job_info["max_retries"] {
print(f"将在1分钟后重试 (第{job_info['retry_count']}次重试)")
# 简单的重试机制
threading.Timer(60, self._run_job, args=[name]).start()
} else {
job_info["status"] = "failed"
print(f"任务 '{name}' 重试次数已达上限,标记为失败")
}
}
}
func start() {
"""启动调度器"""
if self.running {
print("调度器已在运行")
return
}
self.running = true
func scheduler_loop() {
while self.running {
schedule.run_pending()
time.sleep(1)
}
}
self.scheduler_thread = threading.Thread(target=scheduler_loop)
self.scheduler_thread.daemon = true
self.scheduler_thread.start()
print("任务调度器已启动")
}
func stop() {
"""停止调度器"""
self.running = false
if self.scheduler_thread {
self.scheduler_thread.join(timeout=5)
}
print("任务调度器已停止")
}
func remove_job(name: str) -> bool {
"""移除任务"""
if name not in self.jobs {
print(f"任务 '{name}' 不存在")
return false
}
# 取消调度
schedule.cancel_job(self.jobs[name]["schedule_job"])
del self.jobs[name]
print(f"任务 '{name}' 已移除")
return true
}
func list_jobs() -> list[dict] {
"""列出所有任务"""
job_list = []
for name, job_info in self.jobs.items() {
job_list.append({
"name": name,
"status": job_info["status"],
"last_run": job_info["last_run"],
"retry_count": job_info["retry_count"],
"max_retries": job_info["max_retries"],
"error_count": len(job_info["error_log"])
})
}
return job_list
}
func get_job_status(name: str) -> dict? {
"""获取任务状态"""
if name not in self.jobs {
return null
}
job_info = self.jobs[name]
return {
"name": name,
"status": job_info["status"],
"last_run": job_info["last_run"],
"retry_count": job_info["retry_count"],
"max_retries": job_info["max_retries"],
"error_log": job_info["error_log"][-5:] # 最近5个错误
}
}
}
# 任务函数示例
func backup_database() {
print("执行数据库备份...")
# 模拟备份操作
time.sleep(2)
print("数据库备份完成")
}
func send_daily_report() {
print("生成并发送日报...")
# 模拟报告生成
time.sleep(1)
print("日报发送完成")
}
func cleanup_temp_files() {
print("清理临时文件...")
# 模拟文件清理
import os
import glob
temp_files = glob.glob("/tmp/*.tmp")
for file in temp_files {
try {
os.remove(file)
print(f"删除临时文件: {file}")
} catch Exception as e {
print(f"删除文件失败: {e}")
}
}
print("临时文件清理完成")
}
# 使用任务调度器
scheduler = TaskScheduler()
# 添加不同类型的任务
scheduler.add_job(
"database_backup",
backup_database,
"daily",
"02:00" # 每天凌晨2点
)
scheduler.add_job(
"daily_report",
send_daily_report,
"daily",
"09:00" # 每天上午9点
)
scheduler.add_job(
"cleanup_temp",
cleanup_temp_files,
"interval",
3600 # 每小时执行一次
)
scheduler.add_job(
"weekly_maintenance",
lambda: print("执行周维护任务"),
"weekly",
("sunday", "01:00") # 每周日凌晨1点
)
# 启动调度器
scheduler.start()
# 查看任务列表
jobs = scheduler.list_jobs()
print("当前任务列表:")
for job in jobs {
print(f" {job['name']}: {job['status']}")
}
# 运行一段时间后停止
# time.sleep(60)
# scheduler.stop()
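调度器运行期间,可以随时查询单个任务的执行情况,或移除不再需要的任务:
# 查询任务状态
status = scheduler.get_job_status("database_backup")
if status {
print(f"上次执行: {status['last_run']}, 重试次数: {status['retry_count']}")
}
# scheduler.remove_job("cleanup_temp") # 不再需要时移除任务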
工作流自动化
import json
import time
from datetime import datetime
from enum import Enum
class WorkflowStatus(Enum) {
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
}
class TaskStatus(Enum) {
WAITING = "waiting"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
}
class WorkflowTask {
func __init__(
name: str,
function: callable,
args: tuple = (),
kwargs: dict = {},
depends_on: list[str] = [],
timeout: int = 300,
retry_count: int = 0
) {
self.name = name
self.function = function
self.args = args
self.kwargs = kwargs
self.depends_on = depends_on
self.timeout = timeout
self.retry_count = retry_count
self.max_retries = retry_count
self.status = TaskStatus.WAITING
self.result = null
self.error = null
self.start_time = null
self.end_time = null
self.duration = 0
}
func can_run(completed_tasks: set[str]) -> bool {
"""检查任务是否可以运行"""
if self.status != TaskStatus.WAITING {
return false
}
# 检查依赖任务是否都已完成
for dep in self.depends_on {
if dep not in completed_tasks {
return false
}
}
return true
}
func execute() -> bool {
"""执行任务"""
self.status = TaskStatus.RUNNING
self.start_time = datetime.now()
try {
print(f"开始执行任务: {self.name}")
self.result = self.function(*self.args, **self.kwargs)
self.status = TaskStatus.COMPLETED
print(f"任务 '{self.name}' 执行完成")
return true
} catch Exception as e {
self.error = str(e)
print(f"任务 '{self.name}' 执行失败: {e}")
if self.retry_count > 0 {
self.retry_count -= 1
self.status = TaskStatus.WAITING
print(f"任务 '{self.name}' 将重试,剩余重试次数: {self.retry_count}")
return false
} else {
self.status = TaskStatus.FAILED
return false
}
} finally {
self.end_time = datetime.now()
if self.start_time {
self.duration = (self.end_time - self.start_time).total_seconds()
}
}
}
}
class Workflow {
func __init__(name: str) {
self.name = name
self.tasks = {}
self.status = WorkflowStatus.PENDING
self.start_time = null
self.end_time = null
self.duration = 0
self.completed_tasks = set()
self.failed_tasks = set()
}
func add_task(
name: str,
function: callable,
args: tuple = (),
kwargs: dict = {},
depends_on: list[str] = [],
timeout: int = 300,
retry_count: int = 0
) -> WorkflowTask {
"""添加任务到工作流"""
task = WorkflowTask(
name, function, args, kwargs,
depends_on, timeout, retry_count
)
self.tasks[name] = task
return task
}
func validate() -> bool {
"""验证工作流的依赖关系"""
# 检查循环依赖
visited = set()
rec_stack = set()
func has_cycle(task_name: str) -> bool {
if task_name in rec_stack {
return true
}
if task_name in visited {
return false
}
visited.add(task_name)
rec_stack.add(task_name)
if task_name in self.tasks {
for dep in self.tasks[task_name].depends_on {
if has_cycle(dep) {
return true
}
}
}
rec_stack.remove(task_name)
return false
}
for task_name in self.tasks.keys() {
if has_cycle(task_name) {
print(f"检测到循环依赖,涉及任务: {task_name}")
return false
}
}
# 检查依赖任务是否存在
for task_name, task in self.tasks.items() {
for dep in task.depends_on {
if dep not in self.tasks {
print(f"任务 '{task_name}' 依赖的任务 '{dep}' 不存在")
return false
}
}
}
return true
}
func execute() -> bool {
"""执行工作流"""
if not self.validate() {
print(f"工作流 '{self.name}' 验证失败")
self.status = WorkflowStatus.FAILED
return false
}
print(f"开始执行工作流: {self.name}")
self.status = WorkflowStatus.RUNNING
self.start_time = datetime.now()
try {
while true {
# 查找可以执行的任务
runnable_tasks = []
for task_name, task in self.tasks.items() {
if task.can_run(self.completed_tasks) {
runnable_tasks.append(task)
}
}
if not runnable_tasks {
# 检查是否所有任务都已完成
if len(self.completed_tasks) == len(self.tasks) {
print(f"工作流 '{self.name}' 执行完成")
self.status = WorkflowStatus.COMPLETED
return true
} else {
# 有任务失败或无法执行
print(f"工作流 '{self.name}' 执行失败")
self.status = WorkflowStatus.FAILED
return false
}
}
# 执行可运行的任务
for task in runnable_tasks {
if task.execute() {
self.completed_tasks.add(task.name)
} else {
if task.status == TaskStatus.FAILED {
self.failed_tasks.add(task.name)
}
}
}
# 检查是否有任务失败
if self.failed_tasks {
print(f"工作流 '{self.name}' 中有任务失败: {list(self.failed_tasks)}")
self.status = WorkflowStatus.FAILED
return false
}
# 短暂等待
time.sleep(0.1)
}
} catch KeyboardInterrupt {
print(f"工作流 '{self.name}' 被用户取消")
self.status = WorkflowStatus.CANCELLED
return false
} catch Exception as e {
print(f"工作流 '{self.name}' 执行异常: {e}")
self.status = WorkflowStatus.FAILED
return false
} finally {
self.end_time = datetime.now()
if self.start_time {
self.duration = (self.end_time - self.start_time).total_seconds()
}
}
}
func get_status() -> dict {
"""获取工作流状态"""
task_statuses = {}
for name, task in self.tasks.items() {
task_statuses[name] = {
"status": task.status.value,
"duration": task.duration,
"error": task.error
}
}
return {
"name": self.name,
"status": self.status.value,
"duration": self.duration,
"completed_tasks": list(self.completed_tasks),
"failed_tasks": list(self.failed_tasks),
"task_details": task_statuses
}
}
func save_report(file_path: str) {
"""保存执行报告"""
report = self.get_status()
report["start_time"] = self.start_time.isoformat() if self.start_time else null
report["end_time"] = self.end_time.isoformat() if self.end_time else null
with open(file_path, "w", encoding="utf-8") as f {
json.dump(report, f, indent=2, ensure_ascii=false)
}
print(f"工作流报告已保存到: {file_path}")
}
}
# 工作流示例任务函数
func download_data(url: str, output_file: str) {
print(f"下载数据从 {url} 到 {output_file}")
time.sleep(1) # 模拟下载
return f"数据已下载到 {output_file}"
}
func process_data(input_file: str, output_file: str) {
print(f"处理数据从 {input_file} 到 {output_file}")
time.sleep(2) # 模拟处理
return f"数据已处理并保存到 {output_file}"
}
func analyze_data(data_file: str) {
print(f"分析数据文件 {data_file}")
time.sleep(1) # 模拟分析
return f"分析结果: 数据质量良好"
}
func generate_report(analysis_result: str, report_file: str) {
print(f"生成报告到 {report_file}")
time.sleep(1) # 模拟报告生成
return f"报告已生成: {report_file}"
}
func send_notification(message: str) {
print(f"发送通知: {message}")
time.sleep(0.5) # 模拟发送
return "通知已发送"
}
# 创建数据处理工作流
data_workflow = Workflow("数据处理工作流")
# 添加任务
data_workflow.add_task(
"download",
download_data,
args=("https://api.example.com/data", "raw_data.json")
)
data_workflow.add_task(
"process",
process_data,
args=("raw_data.json", "processed_data.json"),
depends_on=["download"]
)
data_workflow.add_task(
"analyze",
analyze_data,
args=("processed_data.json",),
depends_on=["process"]
)
data_workflow.add_task(
"report",
generate_report,
args=("分析完成", "analysis_report.pdf"),
depends_on=["analyze"]
)
data_workflow.add_task(
"notify",
send_notification,
args=("数据处理工作流已完成",),
depends_on=["report"]
)
# 执行工作流
if data_workflow.execute() {
print("工作流执行成功")
} else {
print("工作流执行失败")
}
# 保存执行报告
data_workflow.save_report("workflow_report.json")
# 查看状态
status = data_workflow.get_status()
print(f"工作流状态: {status['status']}")
print(f"执行时长: {status['duration']:.2f}秒")
print(f"完成任务: {status['completed_tasks']}")
5.6 本章总结
本章详细介绍了AI Script在自动化脚本开发方面的强大功能,涵盖了以下核心内容:
核心要点
文件系统操作
- 文件和目录的创建、删除、移动、复制
- 文件内容的读取、写入、搜索和替换
- 批量文件处理和模式匹配
系统管理与监控
- 进程管理:启动、停止、监控进程
- 系统监控:CPU、内存、磁盘、网络使用情况
- 系统信息获取和性能分析
网络自动化
- HTTP客户端:GET、POST、PUT、DELETE请求
- 网络诊断:ping、端口扫描、连接测试
- API集成和数据交换
数据库自动化
- 多数据库连接管理
- CRUD操作的自动化
- 数据备份和恢复
- 批量数据处理
任务调度与自动化
- 定时任务调度器
- 多种调度模式:间隔、每日、每周、月度
- 任务重试和错误处理
- 任务状态监控
工作流自动化
- 任务依赖关系管理
- 并行和串行任务执行
- 工作流状态跟踪
- 执行报告生成
最佳实践
错误处理
- 使用try-catch块处理异常
- 实现重试机制
- 记录详细的错误日志
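下面用一个简化的重试包装函数示意这几点的组合用法(函数名、延迟参数等均为示例,并非固定API):
import time
# 简化的重试包装:失败时记录日志并延迟重试
func with_retry(task: callable, max_retries: int = 3, delay: int = 5) -> any? {
for attempt in range(max_retries) {
try {
return task()
} catch Exception as e {
print(f"[重试 {attempt + 1}/{max_retries}] 执行失败: {e}")
time.sleep(delay)
}
}
return null # 多次失败后返回空值,由调用方决定后续处理
}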
资源管理
- 及时关闭文件句柄和网络连接
- 使用连接池管理数据库连接
- 监控系统资源使用情况
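以数据库连接为例,可以用上下文管理器保证连接在使用后被关闭,下面是一个基于本章 DatabaseManager 的示意写法:
from contextlib import contextmanager
# 确保连接在离开代码块时被关闭
@contextmanager
func managed_connection(db_manager: DatabaseManager, name: str) {
conn = db_manager.get_connection(name)
try {
yield conn
} finally {
db_manager.close_connection(name)
}
}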
安全考虑
- 验证输入参数
- 使用安全的文件路径操作
- 保护敏感信息(密码、API密钥)
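例如在拼接文件路径后,检查结果仍位于基准目录之内,以防止路径穿越(以下校验函数仅为示意):
from pathlib import Path
# 校验目标路径是否仍在 base_path 之下
func is_safe_path(base_path: str, user_path: str) -> bool {
base = Path(base_path).resolve()
target = (base / user_path).resolve()
return str(target).startswith(str(base))
}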
性能优化
- 使用批量操作减少I/O次数
- 实现并行处理提高效率
- 合理设置超时和重试参数
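以数据库写入为例,把逐条插入改为一次批量插入即可明显减少I/O次数(假设 data_rows 是待写入的字典列表):
# 逐条插入:每行一次数据库往返,较慢
for row in data_rows {
db_ops.insert_data("local_db", "users", row)
}
# 批量插入:一次提交全部数据,推荐
db_ops.bulk_insert("local_db", "users", data_rows)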
可维护性
- 模块化设计,职责分离
- 详细的文档和注释
- 统一的错误处理和日志格式
练习题
基础练习
- 编写一个文件备份脚本,支持增量备份
- 创建一个系统监控脚本,定期检查系统状态
- 实现一个日志分析工具,统计错误日志
进阶练习
- 开发一个数据同步工具,在不同数据库间同步数据
- 创建一个网站监控系统,检查网站可用性
- 实现一个自动化部署脚本,支持多环境部署
综合项目
- 构建一个完整的数据处理管道
- 开发一个多任务调度系统
- 创建一个自动化运维平台
通过本章的学习,你应该能够熟练使用AI Script开发各种自动化脚本,提高工作效率,减少重复性工作。下一章我们将学习AI Script的高级特性和扩展开发。