028_分布式部署架构

发布于:2025-07-16 ⋅ 阅读:(13) ⋅ 点赞:(0)

028_分布式部署架构

概述

本文档介绍如何设计和实现Claude应用的分布式部署架构，包括负载均衡、缓存策略、服务发现、容错机制等。

微服务架构设计

1. 服务拆分策略

from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import asyncio
import aiohttp
import json
from dataclasses import dataclass
from enum import Enum

class ServiceType(Enum):
    """Closed set of service kinds that make up the deployment.

    Every member's value is its lowercase string identifier, so a member can
    be looked up from text with ``ServiceType("auth")``.
    """

    GATEWAY = "gateway"
    AUTH = "auth"
    CONVERSATION = "conversation"
    TRANSLATION = "translation"
    CONTENT_FILTER = "content_filter"
    CACHE = "cache"
    METRICS = "metrics"

@dataclass
class ServiceConfig:
    """Static settings describing one deployable service.

    Only ``name``, ``host``, ``port`` and ``version`` are read by the service
    classes in this module; the remaining fields are carried as configuration.
    """

    # Identity and placement.
    name: str
    service_type: ServiceType
    host: str
    port: int
    # Optional settings with defaults.
    health_check_path: str = "/health"  # HTTP path of the health endpoint
    version: str = "1.0.0"
    replicas: int = 1
    max_requests_per_second: int = 100  # not enforced by the code shown here

class BaseService(ABC):
    """Abstract base class for every service in the deployment.

    Subclasses implement the lifecycle hooks (``start`` / ``stop``),
    ``health_check`` and the real request handler ``_process_request_impl``.
    ``process_request`` wraps that handler with timing, error capture and
    simple in-process metrics.
    """

    def __init__(self, config: 'ServiceConfig'):
        self.config = config
        self.is_healthy = True
        # In-process counters. ``avg_response_time`` is the running mean, in
        # event-loop clock seconds, over *all* handled requests (successful
        # and failed), so its divisor is always ``requests_processed``.
        self.metrics = {
            'requests_processed': 0,
            'errors_count': 0,
            'avg_response_time': 0
        }

    @abstractmethod
    async def start(self):
        """Start the service."""
        pass

    @abstractmethod
    async def stop(self):
        """Stop the service."""
        pass

    @abstractmethod
    async def health_check(self) -> Dict[str, Any]:
        """Return a health report for the service."""
        pass

    async def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``_process_request_impl`` and wrap the outcome in an envelope.

        Handler failures (``Exception`` subclasses) are not raised; they are
        returned as an error envelope and counted in ``errors_count``.

        Returns:
            A dict with ``status`` (``'success'`` or ``'error'``), ``service``,
            ``response_time`` and either ``data`` or ``error``.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        self.metrics['requests_processed'] += 1

        try:
            result = await self._process_request_impl(request)
        except Exception as e:
            self.metrics['errors_count'] += 1
            response_time = loop.time() - start_time
            # Failed requests are part of requests_processed, so they must
            # also contribute to the mean; otherwise it is skewed low.
            self._update_avg_response_time(response_time)
            return {
                'status': 'error',
                'error': str(e),
                'service': self.config.name,
                'response_time': response_time
            }

        response_time = loop.time() - start_time
        self._update_avg_response_time(response_time)
        return {
            'status': 'success',
            'data': result,
            'service': self.config.name,
            'response_time': response_time
        }

    @abstractmethod
    async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
        """Service-specific request handling; may raise on failure."""
        pass

    def _update_avg_response_time(self, new_time: float):
        """Fold *new_time* into the cumulative mean response time.

        Expects ``requests_processed`` to already include the request being
        recorded.
        """
        current_avg = self.metrics['avg_response_time']
        total_requests = self.metrics['requests_processed']

        if total_requests <= 0:
            # Called before any request was counted: treat as first sample.
            self.metrics['avg_response_time'] = new_time
            return

        # Cumulative mean: avg_n = (avg_{n-1} * (n - 1) + x_n) / n
        self.metrics['avg_response_time'] = (
            (current_avg * (total_requests - 1) + new_time) / total_requests
        )

# Claude服务实现
class ClaudeService(BaseService):
    """Service that forwards conversation requests to the Claude Messages API.

    ``anthropic_client`` may be a synchronous client (its ``messages.create``
    is run in the default executor so the event loop is not blocked) or an
    asynchronous one (its result is awaited).
    """

    # Model used when a request names none, and for health probes.
    DEFAULT_MODEL = 'claude-3-5-sonnet-20241022'

    def __init__(self, config: ServiceConfig, anthropic_client):
        super().__init__(config)
        self.client = anthropic_client
        self.conversation_manager = None  # not set up by the code shown here

    async def start(self):
        """Mark the service healthy and announce its address."""
        self.is_healthy = True
        print(f"Claude service {self.config.name} started on {self.config.host}:{self.config.port}")

    async def stop(self):
        """Mark the service unhealthy."""
        self.is_healthy = False
        print(f"Claude service {self.config.name} stopped")

    async def health_check(self) -> Dict[str, Any]:
        """Probe the Claude API and report health.

        NOTE: the probe is a real ``messages.create`` call (10 max tokens).
        """
        try:
            await self._make_test_request()
        except Exception as e:
            self.is_healthy = False
            return {
                'status': 'unhealthy',
                'service': self.config.name,
                'error': str(e)
            }

        # Recover after a transient failure instead of staying unhealthy.
        self.is_healthy = True
        return {
            'status': 'healthy',
            'service': self.config.name,
            'version': self.config.version,
            'metrics': self.metrics,
            'api_connection': 'ok'
        }

    async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
        """Send ``request['messages']`` to Claude and return text plus usage.

        Optional request keys: ``model`` (default ``DEFAULT_MODEL``) and
        ``max_tokens`` (default 1000).
        """
        messages = request.get('messages', [])
        model = request.get('model', self.DEFAULT_MODEL)
        max_tokens = request.get('max_tokens', 1000)

        response = await self._create_message(
            model=model,
            messages=messages,
            max_tokens=max_tokens
        )

        # Join all text blocks; blocks without ``text`` (e.g. tool use)
        # contribute nothing, and empty content yields "" instead of an
        # IndexError.
        content = ''.join(getattr(block, 'text', '') for block in response.content)

        return {
            'content': content,
            'usage': {
                'input_tokens': response.usage.input_tokens,
                'output_tokens': response.usage.output_tokens
            }
        }

    async def _make_test_request(self):
        """Send a minimal request used as the health probe."""
        return await self._create_message(
            model=self.DEFAULT_MODEL,
            messages=[{"role": "user", "content": "test"}],
            max_tokens=10
        )

    async def _create_message(self, **kwargs):
        """Call ``client.messages.create(**kwargs)`` without blocking the loop."""
        import functools
        import inspect

        create = self.client.messages.create
        if inspect.iscoroutinefunction(create):
            return await create(**kwargs)

        # Synchronous client: run the blocking HTTP call in a worker thread.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, functools.partial(create, **kwargs))
        if inspect.isawaitable(result):
            # Async client whose create() is wrapped by a plain function.
            result = await result
        return result

# 认证服务
class AuthService(BaseService):
    """Validates API keys and enforces a fixed-window request quota per key.

    Args:
        config: Service configuration.
        rate_limit: Requests allowed per key within one window (default 100).
        rate_window_seconds: Window length in seconds (default 3600, one hour).
    """

    def __init__(
        self,
        config: ServiceConfig,
        rate_limit: int = 100,
        rate_window_seconds: float = 3600
    ):
        super().__init__(config)
        self.api_keys = {}  # placeholder; a real deployment would use a database
        # api_key -> (window_start (time.monotonic()), requests in that window)
        self.rate_limits = {}
        self.rate_limit = rate_limit
        self.rate_window_seconds = rate_window_seconds

    async def start(self):
        self.is_healthy = True
        print(f"Auth service started on {self.config.host}:{self.config.port}")

    async def stop(self):
        self.is_healthy = False

    async def health_check(self) -> Dict[str, Any]:
        return {
            'status': 'healthy',
            'service': self.config.name,
            'version': self.config.version,
            'metrics': self.metrics
        }

    async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
        """Dispatch on ``request['action']`` (``validate`` or ``check_rate_limit``).

        Raises:
            ValueError: for any other action.
        """
        api_key = request.get('api_key')
        action = request.get('action', 'validate')

        if action == 'validate':
            return await self._validate_api_key(api_key)
        elif action == 'check_rate_limit':
            return await self._check_rate_limit(api_key)
        else:
            raise ValueError(f"Unknown action: {action}")

    async def _validate_api_key(self, api_key: str) -> Dict[str, Any]:
        """Accept keys starting with ``sk-`` (simplified validation).

        The derived ``user_id`` is stable across processes and replicas.
        """
        import hashlib

        if api_key and api_key.startswith('sk-'):
            # hash() on str is salted per process (PYTHONHASHSEED), so it would
            # map the same key to different user ids on different replicas.
            digest = hashlib.sha256(api_key.encode('utf-8')).hexdigest()
            return {
                'valid': True,
                'user_id': f"user_{int(digest, 16) % 10000}",
                'tier': 'standard'
            }
        return {'valid': False}

    async def _check_rate_limit(self, api_key: str) -> Dict[str, Any]:
        """Count one request against *api_key*'s quota for the current window.

        Returns ``{'allowed': True, 'remaining': n}`` while quota is left,
        otherwise ``{'allowed': False, 'remaining': 0, 'reset_time': s}``
        where ``s`` is the whole number of seconds until the window resets.
        """
        import math
        import time

        now = time.monotonic()
        window_start, count = self.rate_limits.get(api_key, (now, 0))
        elapsed = now - window_start
        if elapsed >= self.rate_window_seconds:
            # Window expired: start a new one so the quota actually resets.
            window_start, count, elapsed = now, 0, 0.0

        if count < self.rate_limit:
            self.rate_limits[api_key] = (window_start, count + 1)
            return {
                'allowed': True,
                'remaining': self.rate_limit - count - 1
            }

        return {
            'allowed': False,
            'remaining': 0,
            'reset_time': math.ceil(self.rate_window_seconds - elapsed)
        }

# 缓存服务
class CacheService(BaseService):
    """In-memory key/value cache service with per-key expiry (TTL).

    A positive ``ttl`` (seconds) sets an expiry deadline; ``None`` or a
    non-positive ``ttl`` keeps the value until it is deleted.
    """

    def __init__(self, config: ServiceConfig):
        super().__init__(config)
        self.cache = {}  # key -> value (a real deployment would use Redis)
        self.ttl = {}  # key -> expiry deadline on the monotonic clock

    async def start(self):
        self.is_healthy = True
        print(f"Cache service started on {self.config.host}:{self.config.port}")

    async def stop(self):
        self.is_healthy = False

    async def health_check(self) -> Dict[str, Any]:
        # Drop expired entries first so cache_size reflects live data.
        self._purge_expired()
        return {
            'status': 'healthy',
            'service': self.config.name,
            'cache_size': len(self.cache),
            'metrics': self.metrics
        }

    async def _process_request_impl(self, request: Dict[str, Any]) -> Any:
        """Dispatch on ``request['action']``: ``get``, ``set`` or ``delete``.

        Raises:
            ValueError: for any other action.
        """
        action = request.get('action')
        key = request.get('key')

        if action == 'get':
            return await self._get(key)
        elif action == 'set':
            value = request.get('value')
            ttl = request.get('ttl', 3600)
            return await self._set(key, value, ttl)
        elif action == 'delete':
            return await self._delete(key)
        else:
            raise ValueError(f"Unknown action: {action}")

    async def _get(self, key: str) -> Dict[str, Any]:
        """Return ``{'found': True, 'value': v}`` for a live key, else ``{'found': False}``."""
        if self._is_expired(key):
            self._evict(key)
        if key in self.cache:
            return {
                'found': True,
                'value': self.cache[key]
            }
        return {'found': False}

    async def _set(self, key: str, value: Any, ttl: int) -> Dict[str, Any]:
        """Store *value* under *key*, expiring after *ttl* seconds if positive."""
        self.cache[key] = value
        if ttl is not None and ttl > 0:
            self.ttl[key] = self._now() + ttl
        else:
            # No positive TTL: forget any earlier deadline for this key.
            self.ttl.pop(key, None)
        return {'success': True}

    async def _delete(self, key: str) -> Dict[str, Any]:
        """Remove *key*; ``deleted`` is True only if a live value was removed."""
        expired = self._is_expired(key)
        self.ttl.pop(key, None)
        if key not in self.cache:
            return {'deleted': False}
        del self.cache[key]
        return {'deleted': not expired}

    @staticmethod
    def _now() -> float:
        """Monotonic clock used for TTL deadlines (immune to wall-clock jumps)."""
        import time
        return time.monotonic()

    def _is_expired(self, key: str) -> bool:
        """True if *key* has a deadline that has already passed."""
        deadline = self.ttl.get(key)
        return deadline is not None and self._now() >= deadline

    def _evict(self, key: str):
        """Remove *key* and its deadline, if present."""
        self.cache.pop(key, None)
        self.ttl.pop(key, None)

    def _purge_expired(self):
        """Evict every entry whose deadline has passed."""
        for key in [k for k in self.ttl if self._is_expired(k)]:
            self._evict(key)

2. 服务发现与注册

import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import List, Dict

class ServiceRegistry:
    """In-memory service registry with periodic HTTP health checks.

    Instances are grouped by service name. After ``start()``, a background
    task probes every registered instance every ``health_check_interval``
    seconds and records the outcome with ``mark_healthy`` /
    ``mark_unhealthy``. ``discover_services`` only returns instances whose
    ``is_healthy`` flag is set.
    """

    def __init__(self):
        self.services = {}  # service_name -> [ServiceInstance]
        self.health_check_interval = 30  # seconds between health-check sweeps
        self.health_check_timeout = 5  # per-probe HTTP timeout, in seconds
        self.health_check_task = None

    async def start(self):
        """Start the background health-check loop (no-op if already running)."""
        if self.health_check_task is not None and not self.health_check_task.done():
            return
        self.health_check_task = asyncio.create_task(
            self._periodic_health_check()
        )
        print("Service registry started")

    async def stop(self):
        """Cancel the health-check loop and wait until it has finished."""
        task, self.health_check_task = self.health_check_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # Expected when the task is cancelled before it handles it.
                pass
        print("Service registry stopped")

    def register_service(
        self,
        service_name: str,
        instance_id: str,
        host: str,
        port: int,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Register *instance_id* of *service_name*, replacing any previous entry.

        ``metadata`` may carry ``health_check_path`` (default ``/health``) and
        ``weight`` (used by weighted load balancing).
        """
        instance = ServiceInstance(
            service_name=service_name,
            instance_id=instance_id,
            host=host,
            port=port,
            metadata=metadata or {}
        )

        # Drop a previous registration with the same id, then append the new one.
        instances = [
            s for s in self.services.get(service_name, [])
            if s.instance_id != instance_id
        ]
        instances.append(instance)
        self.services[service_name] = instances

        print(f"Service registered: {service_name}/{instance_id}")

    def deregister_service(self, service_name: str, instance_id: str):
        """Remove *instance_id*; drop the service entirely when no instances remain."""
        if service_name in self.services:
            self.services[service_name] = [
                s for s in self.services[service_name]
                if s.instance_id != instance_id
            ]

            if not self.services[service_name]:
                del self.services[service_name]

        print(f"Service deregistered: {service_name}/{instance_id}")

    def discover_services(self, service_name: str) -> List['ServiceInstance']:
        """Return the currently healthy instances of *service_name*."""
        return [
            instance for instance in self.services.get(service_name, [])
            if instance.is_healthy
        ]

    def get_all_services(self) -> Dict[str, List['ServiceInstance']]:
        """Return a snapshot of every service and all of its instances.

        The lists are copies, so callers cannot mutate the registry through them.
        """
        return {name: list(instances) for name, instances in self.services.items()}

    async def _periodic_health_check(self):
        """Run health sweeps forever until cancelled; errors back off for 5 s."""
        while True:
            try:
                await self._check_all_services_health()
                await asyncio.sleep(self.health_check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Health check error: {e}")
                await asyncio.sleep(5)

    async def _check_all_services_health(self):
        """Probe every registered instance concurrently over one shared session."""
        instances = [
            instance
            for instance_list in self.services.values()
            for instance in instance_list
        ]
        if not instances:
            return

        # One session (and its connection pool) per sweep instead of one per probe.
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(
                *(self._check_instance_health(i, session) for i in instances),
                return_exceptions=True
            )

    async def _check_instance_health(self, instance: 'ServiceInstance', session=None):
        """Probe one instance's health endpoint; HTTP 200 counts as healthy.

        Args:
            instance: Instance to probe; ``metadata['health_check_path']``
                overrides the default ``/health`` path.
            session: Optional shared ``aiohttp.ClientSession``; a temporary
                one is created when omitted.
        """
        path = (instance.metadata or {}).get('health_check_path', '/health')
        url = f"http://{instance.host}:{instance.port}{path}"

        try:
            if session is None:
                async with aiohttp.ClientSession() as own_session:
                    healthy = await self._probe(own_session, url)
            else:
                healthy = await self._probe(session, url)
        except Exception:
            # Connection errors and timeouts count as a failed check.
            healthy = False

        if healthy:
            instance.mark_healthy()
        else:
            instance.mark_unhealthy()

    async def _probe(self, session, url: str) -> bool:
        """GET *url* with the configured timeout; True iff the status is 200."""
        timeout = aiohttp.ClientTimeout(total=self.health_check_timeout)
        async with session.get(url, timeout=timeout) as response:
            return response.status == 200

@dataclass
class ServiceInstance:
    """One registered, addressable instance of a service plus its health state.

    A single successful check marks the instance healthy. It only becomes
    unhealthy after ``FAILURE_THRESHOLD`` consecutive failed checks.
    """

    # Consecutive failed checks needed before is_healthy flips to False.
    # Unannotated, so the dataclass treats it as a class constant, not a field.
    FAILURE_THRESHOLD = 3

    service_name: str
    instance_id: str
    host: str
    port: int
    metadata: Dict[str, Any]
    is_healthy: bool = True
    last_health_check: Optional[datetime] = None
    consecutive_failures: int = 0

    def __post_init__(self):
        # Default the timestamp to creation time, but keep an explicit value.
        if self.last_health_check is None:
            self.last_health_check = datetime.now()

    def mark_healthy(self):
        """Record a successful check: healthy, failure streak reset."""
        self.is_healthy = True
        self.consecutive_failures = 0
        self.last_health_check = datetime.now()

    def mark_unhealthy(self):
        """Record a failed check; flip to unhealthy once the streak hits the threshold."""
        self.consecutive_failures += 1

        if self.consecutive_failures >= self.FAILURE_THRESHOLD:
            self.is_healthy = False

        self.last_health_check = datetime.now()

    def get_endpoint(self) -> str:
        """Return the instance's base URL, ``http://host:port``."""
        return f"http://{self.host}:{self.port}"

# 负载均衡器
class LoadBalancer:
    """Chooses one healthy instance of a service per call.

    Strategies: ``round_robin`` (the default, also used for unknown names),
    ``random``, ``least_connections`` (currently plain random selection; no
    connection counts are tracked) and ``weighted`` (``metadata['weight']``,
    default 1).
    """

    def __init__(self, service_registry: ServiceRegistry):
        self.registry = service_registry
        # strategy name -> selector(service_name, instances) -> instance
        self.strategies = {
            'round_robin': self._round_robin,
            'random': self._random_selection,
            'least_connections': self._least_connections,
            'weighted': self._weighted_selection
        }
        # service_name -> number of round-robin picks made so far
        self.round_robin_counters = {}

    def select_instance(
        self,
        service_name: str,
        strategy: str = 'round_robin'
    ) -> Optional[ServiceInstance]:
        """Return a healthy instance of *service_name*, or None when there is none."""
        candidates = self.registry.discover_services(service_name)

        if len(candidates) < 2:
            # Zero or one candidate: no strategy needed.
            return candidates[0] if candidates else None

        pick = self.strategies.get(strategy, self._round_robin)
        return pick(service_name, candidates)

    def _round_robin(
        self,
        service_name: str,
        instances: List[ServiceInstance]
    ) -> ServiceInstance:
        """Cycle through *instances* using a per-service counter."""
        turn = self.round_robin_counters.get(service_name, 0)
        self.round_robin_counters[service_name] = turn + 1
        return instances[turn % len(instances)]

    def _random_selection(
        self,
        service_name: str,
        instances: List[ServiceInstance]
    ) -> ServiceInstance:
        """Pick uniformly at random."""
        import random
        return random.choice(instances)

    def _least_connections(
        self,
        service_name: str,
        instances: List[ServiceInstance]
    ) -> ServiceInstance:
        """Placeholder for least-connections: delegates to random selection."""
        return self._random_selection(service_name, instances)

    def _weighted_selection(
        self,
        service_name: str,
        instances: List[ServiceInstance]
    ) -> ServiceInstance:
        """Pick with probability proportional to ``metadata['weight']`` (default 1)."""
        import random

        weights = [candidate.metadata.get('weight', 1) for candidate in instances]
        threshold = random.uniform(0, sum(weights))

        running_total = 0
        for candidate, weight in zip(instances, weights):
            running_total += weight
            if threshold <= running_total:
                return candidate

        return instances[0]

缓存与存储策略

1. 分布式缓存系统

import redis
import json
import hashlib
from typing import Any, Optional, Union
import pickle

class DistributedCacheManager:
    """Client-side sharded cache over several independent Redis servers.

    Keys are namespaced with ``cache_prefix`` and routed to one server by
    ``md5(key) % len(servers)``. Values are pickled, and entries expire after
    ``ttl`` (or ``default_ttl``) seconds. Errors are logged and reported as
    ``None`` / ``False`` rather than raised.

    NOTE: the synchronous redis-py client is used inside ``async`` methods,
    so each call blocks the event loop for its network round trip.
    NOTE(security): values are read back with ``pickle.loads``. Anyone able
    to write to these Redis servers can execute code in this process, so
    only use trusted, access-controlled instances.

    Args:
        redis_hosts: ``"host"``, ``"host:port"`` or ``"[ipv6]:port"`` strings
            (port defaults to 6379). Must not be empty.
        cache_prefix: Namespace prepended to every key.
        default_ttl: Expiry in seconds when ``set`` gets no ``ttl``.

    Raises:
        ValueError: if ``redis_hosts`` is empty.
    """

    def __init__(
        self,
        redis_hosts: List[str],
        cache_prefix: str = "claude_app",
        default_ttl: int = 3600
    ):
        if not redis_hosts:
            # Otherwise _get_client would fail later with ZeroDivisionError.
            raise ValueError("redis_hosts must contain at least one host")

        self.cache_prefix = cache_prefix
        self.default_ttl = default_ttl

        # One client (with its own connection pool) per server.
        self.redis_clients = []
        for address in redis_hosts:
            redis_host, redis_port = self._parse_host(address)

            client = redis.Redis(
                host=redis_host,
                port=redis_port,
                decode_responses=False,  # values are binary pickles
                socket_connect_timeout=5,
                socket_timeout=5
            )
            self.redis_clients.append(client)

    @staticmethod
    def _parse_host(address: str):
        """Split ``host``, ``host:port`` or ``[ipv6]:port`` into ``(host, port)``."""
        if address.startswith('['):
            host, _, rest = address[1:].partition(']')
            port = rest[1:] if rest.startswith(':') else ''
        else:
            host, _, port = address.partition(':')
        return host, int(port) if port else 6379

    def _get_client(self, key: str) -> 'redis.Redis':
        """Choose the Redis client for *key* by ``md5(key) % N``.

        This is plain modulo hashing, not consistent hashing: changing the
        number of servers remaps most keys (they become cache misses).
        """
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        index = hash_value % len(self.redis_clients)
        return self.redis_clients[index]

    def _make_key(self, key: str) -> str:
        """Return *key* namespaced as ``"<cache_prefix>:<key>"``."""
        return f"{self.cache_prefix}:{key}"

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None if missing or on error."""
        cache_key = self._make_key(key)
        client = self._get_client(cache_key)

        try:
            data = client.get(cache_key)
            if data is None:
                return None

            # Trusted-data assumption: see the security note on the class.
            return pickle.loads(data)

        except Exception as e:
            print(f"Cache get error for key {key}: {e}")
            return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ) -> bool:
        """Store *value* for ``ttl`` seconds (falsy ttl -> ``default_ttl``)."""
        cache_key = self._make_key(key)
        client = self._get_client(cache_key)

        try:
            serialized_value = pickle.dumps(value)
            expire_time = ttl or self.default_ttl
            return client.setex(cache_key, expire_time, serialized_value)

        except Exception as e:
            print(f"Cache set error for key {key}: {e}")
            return False

    async def delete(self, key: str) -> bool:
        """Delete *key*; True if an entry was removed."""
        cache_key = self._make_key(key)
        client = self._get_client(cache_key)

        try:
            return bool(client.delete(cache_key))
        except Exception as e:
            print(f"Cache delete error for key {key}: {e}")
            return False

    async def exists(self, key: str) -> bool:
        """Return True if *key* is present (False on error)."""
        cache_key = self._make_key(key)
        client = self._get_client(cache_key)

        try:
            return bool(client.exists(cache_key))
        except Exception as e:
            print(f"Cache exists error for key {key}: {e}")
            return False

# 智能缓存策略
class SmartCacheStrategy:
    """Per-data-type caching policies layered on ``DistributedCacheManager``.

    Each policy has a ``ttl`` in seconds and a ``compress`` flag. Only
    ``cache_conversation`` consults ``compress``: it gzips the message list
    when the JSON payload exceeds 10000 characters.
    """

    def __init__(self, cache_manager: DistributedCacheManager):
        self.cache_manager = cache_manager
        # data type -> {'ttl': seconds, 'compress': bool}
        self.cache_policies = {
            'conversation': {'ttl': 1800, 'compress': True},   # 30 minutes
            'translation': {'ttl': 7200, 'compress': False},   # 2 hours
            'user_profile': {'ttl': 3600, 'compress': False},  # 1 hour
            'api_response': {'ttl': 300, 'compress': True},    # 5 minutes
        }

    async def cache_conversation(
        self,
        conversation_id: str,
        messages: List[Dict],
        user_id: str
    ) -> bool:
        """Cache a conversation's messages; returns the cache manager's result."""
        cache_key = f"conversation:{conversation_id}"
        policy = self.cache_policies['conversation']
        payload = {
            'messages': messages,
            'user_id': user_id,
            'cached_at': datetime.now().isoformat()
        }

        # Large payloads are stored with a gzip-compressed message list.
        oversized = policy['compress'] and len(json.dumps(payload)) > 10000
        if oversized:
            payload = self._compress_conversation(payload)

        return await self.cache_manager.set(cache_key, payload, ttl=policy['ttl'])

    async def get_conversation(
        self,
        conversation_id: str
    ) -> Optional[Dict]:
        """Return the cached conversation (decompressed if needed), or the raw miss value."""
        cached = await self.cache_manager.get(f"conversation:{conversation_id}")

        if not cached or 'compressed' not in cached:
            return cached
        return self._decompress_conversation(cached)

    async def cache_api_response(
        self,
        request_hash: str,
        response: Dict[str, Any]
    ) -> bool:
        """Cache an API response under its request hash."""
        ttl_seconds = self.cache_policies['api_response']['ttl']
        return await self.cache_manager.set(
            f"api_response:{request_hash}",
            response,
            ttl=ttl_seconds
        )

    async def get_cached_api_response(
        self,
        request_hash: str
    ) -> Optional[Dict]:
        """Return the cached API response for *request_hash*, if any."""
        return await self.cache_manager.get(f"api_response:{request_hash}")

    def _compress_conversation(self, data: Dict) -> Dict:
        """Replace the message list with its gzip-compressed JSON bytes."""
        import gzip

        raw_json = json.dumps(data['messages']).encode()
        return {
            'compressed': True,
            'messages': gzip.compress(raw_json),
            'user_id': data['user_id'],
            'cached_at': data['cached_at']
        }

    def _decompress_conversation(self, data: Dict) -> Dict:
        """Inverse of ``_compress_conversation`` (drops the ``compressed`` marker)."""
        import gzip

        restored = json.loads(gzip.decompress(data['messages']).decode())
        return {
            'messages': restored,
            'user_id': data['user_id'],
            'cached_at': data['cached_at']
        }

# 缓存预热和失效策略
class CacheWarmupManager:
    """Pre-populates the conversation cache for a user (cache warm-up).

    The data-source methods ``_get_recent_conversations`` and
    ``_load_conversation_from_db`` are mocks standing in for real queries.
    """

    def __init__(self, cache_strategy: 'SmartCacheStrategy'):
        self.cache_strategy = cache_strategy
        self.warmup_tasks = []  # not used by the methods shown here

    async def warmup_user_data(self, user_id: str, limit: int = 5):
        """Warm the cache with the first *limit* conversations for *user_id*.

        Conversations are warmed concurrently. A failure for one conversation
        is reported and does not abort the others.
        """
        recent_conversations = await self._get_recent_conversations(user_id)
        target_ids = [conv['id'] for conv in recent_conversations[:max(limit, 0)]]
        if not target_ids:
            return

        results = await asyncio.gather(
            *(self._warmup_conversation(cid) for cid in target_ids),
            return_exceptions=True
        )
        # gather(return_exceptions=True) hands errors back as values; report
        # them instead of discarding them silently.
        for conversation_id, outcome in zip(target_ids, results):
            if isinstance(outcome, BaseException):
                print(f"Cache warmup failed for conversation {conversation_id}: {outcome}")

    async def _warmup_conversation(self, conversation_id: str):
        """Load and cache one conversation unless it is already cached."""
        cached = await self.cache_strategy.get_conversation(conversation_id)

        if not cached:
            conversation_data = await self._load_conversation_from_db(conversation_id)
            if conversation_data:
                await self.cache_strategy.cache_conversation(
                    conversation_id,
                    conversation_data['messages'],
                    conversation_data['user_id']
                )

    async def _get_recent_conversations(self, user_id: str) -> List[Dict]:
        """Mock: return 10 placeholder conversations for *user_id*."""
        # A real implementation would query the database.
        return [
            {'id': f'conv_{user_id}_{i}', 'updated_at': datetime.now()}
            for i in range(10)
        ]

    async def _load_conversation_from_db(self, conversation_id: str) -> Optional[Dict]:
        """Mock: return a single placeholder message for *conversation_id*."""
        # A real implementation would query the database.
        return {
            'messages': [
                {'role': 'user', 'content': f'Message in {conversation_id}'}
            ],
            'user_id': 'user123'
        }

2. 数据分片与分区

class DatabaseShardManager:
    """Routes keys to database shards with a consistent-hash ring.

    Each shard contributes ``virtual_nodes`` (default 100) md5 points to the
    ring. A key is served by the first ring point whose hash is greater than
    or equal to the key's hash, wrapping around to the start of the ring.
    """

    def __init__(self, shard_configs: List[Dict]):
        self.shards = {}  # shard_id -> DatabaseShard
        self.shard_ring = []  # sorted [(point_hash, shard_id)]

        for config in shard_configs:
            shard_id = config['shard_id']
            self.shards[shard_id] = DatabaseShard(config)

            # Virtual nodes spread each shard over many ring positions.
            for i in range(config.get('virtual_nodes', 100)):
                hash_value = self._hash(f"{shard_id}:{i}")
                self.shard_ring.append((hash_value, shard_id))

        self.shard_ring.sort()

    def _hash(self, key: str) -> int:
        """Return the md5 digest of *key* as an integer."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_shard(self, key: str) -> 'DatabaseShard':
        """Return the shard responsible for *key* (O(log n) ring lookup).

        Raises:
            IndexError: if no shards are configured.
        """
        import bisect

        if not self.shard_ring:
            raise IndexError("DatabaseShardManager has no shards configured")

        hash_value = self._hash(key)
        # (hash_value,) sorts before every (hash_value, shard_id) entry, so this
        # is the index of the first ring point with ring_hash >= hash_value.
        index = bisect.bisect_left(self.shard_ring, (hash_value,))
        if index == len(self.shard_ring):
            index = 0  # past the last point: wrap around the ring
        return self.shards[self.shard_ring[index][1]]

    async def save_conversation(
        self,
        conversation_id: str,
        conversation_data: Dict
    ):
        """Save a conversation on the shard owning *conversation_id*."""
        shard = self.get_shard(conversation_id)
        await shard.save_conversation(conversation_id, conversation_data)

    async def load_conversation(
        self,
        conversation_id: str
    ) -> Optional[Dict]:
        """Load a conversation from the shard owning *conversation_id*."""
        shard = self.get_shard(conversation_id)
        return await shard.load_conversation(conversation_id)

    async def save_user_data(
        self,
        user_id: str,
        user_data: Dict
    ):
        """Save user data on the shard owning *user_id*."""
        shard = self.get_shard(user_id)
        await shard.save_user_data(user_id, user_data)

    async def get_shard_stats(self) -> Dict[str, Any]:
        """Return ``{shard_id: stats}`` for every shard."""
        stats = {}

        for shard_id, shard in self.shards.items():
            stats[shard_id] = await shard.get_stats()

        return stats

class DatabaseShard:
    """One database shard. Placeholder: no real database I/O is performed.

    Only in-memory bookkeeping happens. ``total_conversations`` and
    ``total_users`` count distinct ids saved through this object.
    """

    def __init__(self, config: Dict):
        self.shard_id = config['shard_id']
        self.host = config['host']
        self.port = config['port']
        self.database = config['database']
        self.connection_pool = None  # not initialised by the code shown here

        # Ids seen so far, so the totals count records rather than save calls.
        self._conversation_ids = set()
        self._user_ids = set()

        self.stats = {
            'total_conversations': 0,
            'total_users': 0,
            'storage_size': 0,  # not updated by the code shown here
            'last_updated': datetime.now()
        }

    async def connect(self):
        """Placeholder for opening the database connection."""
        print(f"Connected to shard {self.shard_id} at {self.host}:{self.port}")

    async def save_conversation(
        self,
        conversation_id: str,
        conversation_data: Dict
    ):
        """Record a save of *conversation_id* (data is not persisted)."""
        self._conversation_ids.add(conversation_id)
        self.stats['total_conversations'] = len(self._conversation_ids)
        self.stats['last_updated'] = datetime.now()

        print(f"Saved conversation {conversation_id} to shard {self.shard_id}")

    async def load_conversation(
        self,
        conversation_id: str
    ) -> Optional[Dict]:
        """Placeholder: return an empty conversation stub for *conversation_id*."""
        return {
            'id': conversation_id,
            'messages': [],
            'created_at': datetime.now().isoformat()
        }

    async def save_user_data(self, user_id: str, user_data: Dict):
        """Record a save of *user_id*'s data (data is not persisted)."""
        self._user_ids.add(user_id)
        self.stats['total_users'] = len(self._user_ids)
        self.stats['last_updated'] = datetime.now()

    async def get_stats(self) -> Dict[str, Any]:
        """Return the shard's address merged with its statistics."""
        return {
            'shard_id': self.shard_id,
            'host': self.host,
            'port': self.port,
            **self.stats
        }

容错与恢复

1. 服务容错机制

import asyncio
from enum import Enum
from typing import Callable, Any

class CircuitBreakerState(Enum):
    """States of a ``CircuitBreaker``."""

    CLOSED = "closed"        # calls pass through; failures are counted
    OPEN = "open"            # calls are rejected until recovery_timeout elapses
    HALF_OPEN = "half_open"  # trial call allowed: success closes, failure re-opens


class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` while the breaker is OPEN."""


class CircuitBreaker:
    """Async circuit breaker that stops calling a failing dependency for a while.

    After ``failure_threshold`` consecutive failures (exceptions matching
    ``expected_exception``) the breaker opens. It then rejects calls with
    ``CircuitBreakerOpenError``. Once more than ``recovery_timeout`` seconds
    have passed since the last failure, the next call is let through
    (HALF_OPEN): a success closes the breaker, and a failure re-opens it.
    Times come from the event-loop clock.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitBreakerState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` under breaker protection.

        Raises:
            CircuitBreakerOpenError: the breaker is open and not yet due for a retry.
            Any exception from ``func`` is re-raised unchanged.
        """
        if self.state == CircuitBreakerState.OPEN:
            if not self._should_attempt_reset():
                # Dedicated subclass so callers can tell "rejected" from "failed".
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")
            self.state = CircuitBreakerState.HALF_OPEN

        try:
            result = await func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise

        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """True once recovery_timeout seconds have passed since the last failure."""
        if self.last_failure_time is None:
            return False

        return (
            asyncio.get_event_loop().time() - self.last_failure_time >
            self.recovery_timeout
        )

    def _on_success(self):
        """Reset the failure streak and close the breaker."""
        self.failure_count = 0
        self.state = CircuitBreakerState.CLOSED

    def _on_failure(self):
        """Count a failure and open the breaker once the threshold is reached.

        In HALF_OPEN the count is still at or above the threshold, so a single
        failed trial re-opens the breaker.
        """
        self.failure_count += 1
        self.last_failure_time = asyncio.get_event_loop().time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitBreakerState.OPEN

# 服务容错包装器
class ServiceResponseError(Exception):
    """A service returned an error envelope (``{'status': 'error', ...}``)."""

    def __init__(self, response: Dict[str, Any]):
        super().__init__(response.get('error', 'service returned an error response'))
        self.response = response


class FaultTolerantService:
    """Calls a primary service through a circuit breaker, with fallbacks.

    ``BaseService.process_request`` reports failures by *returning*
    ``{'status': 'error', ...}`` instead of raising. Such envelopes are
    therefore converted into ``ServiceResponseError``, so the breaker counts
    them and the fallbacks are tried.
    """

    def __init__(
        self,
        primary_service: 'BaseService',
        fallback_services: Optional[List['BaseService']] = None,
        circuit_breaker: Optional['CircuitBreaker'] = None
    ):
        self.primary_service = primary_service
        self.fallback_services = fallback_services or []
        self.circuit_breaker = circuit_breaker or CircuitBreaker()
        self.current_service = primary_service  # not updated by this class

    async def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Return the primary's response, else the first successful fallback's.

        A fallback response is marked with ``fallback_used=True`` and the
        primary's error message. If every service fails, an error envelope
        with ``'All services unavailable'`` is returned.
        """
        try:
            return await self.circuit_breaker.call(
                self._call_checked,
                self.primary_service,
                request
            )
        except Exception as exc:
            primary_error = exc

        print(f"Primary service failed: {primary_error}")

        for fallback_service in self.fallback_services:
            try:
                result = await self._call_checked(fallback_service, request)
            except Exception as fallback_error:
                print(f"Fallback service failed: {fallback_error}")
                continue

            result['fallback_used'] = True
            result['primary_error'] = str(primary_error)
            return result

        return {
            'status': 'error',
            'error': 'All services unavailable',
            'primary_error': str(primary_error)
        }

    @staticmethod
    async def _call_checked(service, request: Dict[str, Any]) -> Dict[str, Any]:
        """Call ``service.process_request`` and raise if it returned an error envelope."""
        result = await service.process_request(request)
        if isinstance(result, dict) and result.get('status') == 'error':
            raise ServiceResponseError(result)
        return result

# Automatic recovery manager
class AutoRecoveryManager:
    """Watch registered services and trigger recovery after sustained outages.

    Each monitored service name gets a background asyncio task that polls
    ``service_registry.discover_services(name)``. When instances exist but
    none reports ``is_healthy`` for ``failure_threshold`` consecutive checks,
    the registered recovery strategy (or the default one) is invoked.
    """

    def __init__(
        self,
        service_registry: ServiceRegistry,
        check_interval: float = 30,
        failure_threshold: int = 3,
        error_retry_delay: float = 10
    ):
        """
        Args:
            service_registry: Registry queried via ``discover_services(name)``.
            check_interval: Seconds between health checks.
            failure_threshold: Consecutive all-unhealthy checks required
                before recovery is triggered.
            error_retry_delay: Seconds to wait after an unexpected error in
                the monitoring loop before checking again.
        """
        self.service_registry = service_registry
        self.check_interval = check_interval
        self.failure_threshold = failure_threshold
        self.error_retry_delay = error_retry_delay
        self.recovery_strategies = {}
        self.monitoring_tasks = {}

    def register_recovery_strategy(
        self,
        service_name: str,
        strategy: Callable
    ):
        """Register a zero-argument recovery callable for ``service_name``.

        The callable may be a plain function or an async function; if calling
        it returns an awaitable, that awaitable is awaited.
        """
        self.recovery_strategies[service_name] = strategy

    async def start_monitoring(self, service_name: str):
        """Start monitoring ``service_name`` unless a monitor is already running.

        A monitor task that has already finished (for example, one cancelled
        from outside) is replaced by a fresh task instead of blocking a restart.
        """
        existing = self.monitoring_tasks.get(service_name)
        if existing is not None and not existing.done():
            return

        self.monitoring_tasks[service_name] = asyncio.create_task(
            self._monitor_service(service_name)
        )

    async def stop_monitoring(self, service_name: str):
        """Cancel the monitor for ``service_name`` and wait for it to exit."""
        task = self.monitoring_tasks.pop(service_name, None)
        if task is None:
            return

        task.cancel()
        # Wait until the task has actually wound down, so no further health
        # check or recovery runs after this returns. asyncio.wait() does not
        # re-raise the task's CancelledError, but cancellation of *this*
        # coroutine still propagates. A task cannot await itself, so skip the
        # wait when called from inside the monitor (e.g. by a strategy).
        if task is not asyncio.current_task():
            await asyncio.wait({task})

    async def _monitor_service(self, service_name: str):
        """Poll ``service_name`` until cancelled, triggering recovery on outages."""
        consecutive_failures = 0

        while True:
            try:
                instances = self.service_registry.discover_services(service_name)
                healthy_instances = [i for i in instances if i.is_healthy]

                # Only an outage with registered-but-unhealthy instances counts;
                # having no instances at all resets the streak.
                if len(healthy_instances) == 0 and len(instances) > 0:
                    consecutive_failures += 1

                    if consecutive_failures >= self.failure_threshold:
                        await self._trigger_recovery(service_name)
                        consecutive_failures = 0
                else:
                    consecutive_failures = 0

                await asyncio.sleep(self.check_interval)

            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Monitoring error for {service_name}: {e}")
                await asyncio.sleep(self.error_retry_delay)

    async def _trigger_recovery(self, service_name: str):
        """Run the registered recovery strategy, or the default one."""
        import inspect

        print(f"Triggering recovery for service: {service_name}")

        if service_name not in self.recovery_strategies:
            # Default recovery strategy: restart the service
            await self._default_recovery(service_name)
            return

        try:
            outcome = self.recovery_strategies[service_name]()
            # Support synchronous strategies too. Previously a sync strategy
            # ran, then `await None` raised TypeError and the (successful)
            # recovery was reported as a failure.
            if inspect.isawaitable(outcome):
                await outcome
        except Exception as e:
            print(f"Recovery failed for {service_name}: {e}")

    async def _default_recovery(self, service_name: str):
        """Placeholder default recovery: logs and simulates a restart delay."""
        # Simplified restart logic
        print(f"Attempting to restart {service_name}")

        # A real implementation would restart the service here,
        # e.g. by calling the container orchestrator's API.

        await asyncio.sleep(5)  # simulated restart time
        print(f"Service {service_name} recovery attempted")

监控与运维

1. 分布式监控系统

import time
from dataclasses import dataclass
from typing import Dict, List, Any
import asyncio

@dataclass
class Metric:
    """A single recorded metric sample.

    MetricsCollector creates one of these for every counter, gauge and
    histogram call.
    """
    name: str  # metric name, e.g. 'system.cpu.usage'
    value: float  # sample value; for counters MetricsCollector stores the running total
    timestamp: float  # time.time() at the moment the sample was recorded
    tags: Dict[str, str]  # labels identifying the series (may include a physical unit, e.g. {'unit': 'percent'})
    unit: str = None  # metric kind as set by MetricsCollector: "count", "gauge" or "histogram"; None if unset

class MetricsCollector:
    """In-memory store for counter, gauge and histogram metrics.

    Every call appends one ``Metric`` sample to ``self.metrics``. Aggregated
    per-series state lives in ``counters`` (running totals), ``gauges``
    (latest value) and ``histograms`` (count/sum/min/max plus a window of
    recent values). A series is identified by its name plus its tag set.
    """

    # Number of most recent raw values kept per histogram series.
    HISTOGRAM_WINDOW = 1000

    def __init__(self):
        self.metrics = []
        self.counters = {}
        self.gauges = {}
        self.histograms = {}

    @staticmethod
    def _series_key(name: str, tags: Dict[str, str] = None) -> str:
        """Build the aggregation key for a (name, tags) series.

        Tags are sorted so that one tag set always maps to one series,
        whatever the caller's dict insertion order. With no tags, or with
        tags already in sorted order, the key matches the previous
        ``f"{name}:{tags or {}}"`` format.
        """
        return f"{name}:{dict(sorted((tags or {}).items()))}"

    def _record(self, name: str, value: float, tags: Dict[str, str], unit: str):
        """Append one raw sample to ``self.metrics``.

        ``tags`` is copied, so mutating the caller's dict afterwards cannot
        rewrite samples that were already recorded.
        """
        self.metrics.append(Metric(
            name=name,
            value=value,
            timestamp=time.time(),
            tags=dict(tags) if tags else {},
            unit=unit
        ))

    def counter(self, name: str, value: float = 1, tags: Dict[str, str] = None):
        """Add ``value`` to a counter series and record its new running total."""
        key = self._series_key(name, tags)
        self.counters[key] = self.counters.get(key, 0) + value
        self._record(name, self.counters[key], tags, "count")

    def gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Set a gauge series to ``value`` and record the sample."""
        key = self._series_key(name, tags)
        self.gauges[key] = value
        self._record(name, value, tags, "gauge")

    def histogram(
        self,
        name: str,
        value: float,
        tags: Dict[str, str] = None
    ):
        """Add ``value`` to a histogram series and record the sample.

        Count, sum, min and max cover every value ever observed; ``values``
        keeps only the most recent ``HISTOGRAM_WINDOW`` observations.
        """
        key = self._series_key(name, tags)

        if key not in self.histograms:
            self.histograms[key] = {
                'count': 0,
                'sum': 0,
                'min': float('inf'),
                'max': float('-inf'),
                'values': []
            }

        hist = self.histograms[key]
        hist['count'] += 1
        hist['sum'] += value
        hist['min'] = min(hist['min'], value)
        hist['max'] = max(hist['max'], value)
        hist['values'].append(value)

        # Trim the raw-value window in place to the most recent entries
        overflow = len(hist['values']) - self.HISTOGRAM_WINDOW
        if overflow > 0:
            del hist['values'][:overflow]

        self._record(name, value, tags, "histogram")

    def get_metrics(self, since: float = None) -> List[Metric]:
        """Return recorded samples, optionally only those with timestamp >= ``since``."""
        if since is None:
            return self.metrics.copy()

        return [m for m in self.metrics if m.timestamp >= since]

    def clear_metrics(self):
        """Drop all raw samples (aggregated series state is left untouched)."""
        self.metrics.clear()

# System monitor
class SystemMonitor:
    """Periodically sample host CPU, memory, disk and network usage.

    A background asyncio task (``start()``/``stop()``) writes the samples
    into the given ``MetricsCollector`` roughly every 10 seconds.
    """

    def __init__(self, metrics_collector: MetricsCollector):
        self.metrics = metrics_collector
        self.monitoring_task = None
        self.is_running = False
        # Last cumulative network byte totals seen per metric name. psutil
        # reports since-boot totals; these let us feed only the increment
        # into the counters instead of re-adding the whole total every cycle.
        self._last_net_bytes = {}

    async def start(self):
        """Start the sampling loop; does nothing if it is already running."""
        if self.monitoring_task is not None and not self.monitoring_task.done():
            return
        self.is_running = True
        self.monitoring_task = asyncio.create_task(self._monitoring_loop())

    async def stop(self):
        """Stop the sampling loop and wait for the background task to exit."""
        self.is_running = False
        task = self.monitoring_task
        if task is None:
            return
        task.cancel()
        # asyncio.wait() does not re-raise the task's CancelledError; a task
        # cannot await itself, so skip the wait if stop() runs inside it.
        if task is not asyncio.current_task():
            await asyncio.wait({task})

    async def _monitoring_loop(self):
        """Collect metrics until stopped, backing off briefly after errors."""
        while self.is_running:
            try:
                await self._collect_system_metrics()
                await asyncio.sleep(10)  # collect every 10 seconds
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Monitoring error: {e}")
                await asyncio.sleep(5)

    async def _collect_system_metrics(self):
        """Take one sample of CPU, memory, disk and network usage."""
        import psutil

        # CPU usage. cpu_percent(interval=1) sleeps for the whole 1 s window,
        # so run it in the default executor rather than blocking the loop.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
        self.metrics.gauge('system.cpu.usage', cpu_percent, {'unit': 'percent'})

        # Memory usage
        memory = psutil.virtual_memory()
        self.metrics.gauge('system.memory.usage', memory.percent, {'unit': 'percent'})
        self.metrics.gauge('system.memory.available', memory.available, {'unit': 'bytes'})

        # Disk usage
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        self.metrics.gauge('system.disk.usage', disk_percent, {'unit': 'percent'})

        # Network I/O: add only the bytes moved since the previous sample
        network = psutil.net_io_counters()
        for metric_name, total in (
            ('system.network.bytes_sent', network.bytes_sent),
            ('system.network.bytes_recv', network.bytes_recv),
        ):
            self.metrics.counter(metric_name, self._net_increment(metric_name, total))

    def _net_increment(self, metric_name: str, total: int) -> int:
        """Return bytes transferred since the last sample of ``metric_name``.

        The first sample returns the full total, so the counter tracks
        psutil's cumulative value. If the OS total went backwards (counter
        reset or wrap), the new total is used as the increment.
        """
        previous = self._last_net_bytes.get(metric_name, 0)
        self._last_net_bytes[metric_name] = total
        return total - previous if total >= previous else total

# Distributed tracing
class DistributedTracer:
    """Minimal in-process tracer that records spans and groups them by trace id."""

    def __init__(self, max_completed_spans: int = 1000):
        """
        Args:
            max_completed_spans: How many finished spans to retain (oldest
                are dropped first).
        """
        # Spans that were started but not yet finished, keyed by span_id
        self.active_traces = {}
        # Finished spans, oldest first
        self.completed_traces = []
        self.max_completed_spans = max_completed_spans

    def start_span(
        self,
        operation_name: str,
        parent_span_id: str = None,
        tags: Dict[str, Any] = None
    ) -> 'Span':
        """Start a new span, optionally as a child of ``parent_span_id``.

        When the parent span is known to this tracer (active or recently
        completed), the new span inherits the parent's ``trace_id``, so
        ``get_trace`` returns the whole tree.
        """
        span = Span(
            operation_name=operation_name,
            parent_span_id=parent_span_id,
            tags=tags or {}
        )

        # Span falls back to using parent_span_id as its trace_id when no
        # trace id is supplied. That is the parent's *span* id, so grandchildren
        # (and children of the root) would not share the root's trace_id.
        # Inherit the real trace id whenever the parent can be found.
        parent = self._find_span(parent_span_id)
        if parent is not None:
            span.trace_id = parent.trace_id

        self.active_traces[span.span_id] = span
        return span

    def _find_span(self, span_id: str) -> 'Span':
        """Return the active or retained completed span with ``span_id``, or None."""
        if span_id is None:
            return None
        span = self.active_traces.get(span_id)
        if span is not None:
            return span
        return next(
            (s for s in reversed(self.completed_traces) if s.span_id == span_id),
            None
        )

    def finish_span(self, span: 'Span'):
        """Finish ``span`` and move it from the active set to the completed list."""
        span.finish()
        self.active_traces.pop(span.span_id, None)
        self.completed_traces.append(span)

        # Keep only the most recent max_completed_spans spans
        overflow = len(self.completed_traces) - self.max_completed_spans
        if overflow > 0:
            del self.completed_traces[:overflow]

    def get_trace(self, trace_id: str) -> List['Span']:
        """Return all retained completed spans belonging to ``trace_id``."""
        return [
            span for span in self.completed_traces
            if span.trace_id == trace_id
        ]

@dataclass
class Span:
    """A single timed operation within a trace.

    ``span_id``, ``trace_id`` and ``start_time`` are filled in automatically
    unless supplied. Pass ``trace_id`` explicitly to attach a child span to
    its parent's trace.
    """
    operation_name: str
    parent_span_id: str = None
    tags: Dict[str, Any] = None
    span_id: str = None
    trace_id: str = None
    start_time: float = None
    end_time: float = None
    duration: float = None

    def __post_init__(self):
        import uuid

        # Honour explicitly supplied identifiers. They used to be overwritten
        # unconditionally, so a child could never carry its parent's trace_id.
        self.span_id = self.span_id or str(uuid.uuid4())
        # Without an explicit trace id, keep the legacy fallback: reuse
        # parent_span_id, or start a brand-new trace for a root span.
        self.trace_id = self.trace_id or self.parent_span_id or str(uuid.uuid4())
        if self.start_time is None:
            self.start_time = time.time()
        self.tags = self.tags or {}

    def finish(self):
        """Mark the span finished and compute its duration in seconds."""
        self.end_time = time.time()
        self.duration = self.end_time - self.start_time

    def add_tag(self, key: str, value: Any):
        """Attach a tag to the span."""
        self.tags[key] = value

    def log(self, message: str):
        """Append a timestamped log entry under the ``'logs'`` tag."""
        self.tags.setdefault('logs', []).append({
            'timestamp': time.time(),
            'message': message
        })

部署自动化

1. 容器化部署

# Example Dockerfile.
# - python:*-slim images do not ship curl, so the health check probes the
#   endpoint with the Python standard library instead (a curl-based check
#   would always fail and mark the container unhealthy).
# - PYTHONUNBUFFERED makes print()-based logs show up in `docker logs` immediately.
# - EXPOSE documents the port the health check and orchestrators use.
DOCKERFILE_TEMPLATE = """
FROM python:3.9-slim

ENV PYTHONUNBUFFERED=1

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

EXPOSE 8080

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health', timeout=5)" || exit 1

# 启动命令
CMD ["python", "app.py"]
"""

# Docker Compose configuration.
# - The Docker Hub `consul` image is deprecated and no longer publishes a
#   `latest` tag; HashiCorp publishes `hashicorp/consul` instead.
# - The gateway health check uses Python rather than curl, assuming the
#   services are built from the python:*-slim Dockerfile above (no curl).
# - claude-service gets REDIS_URL, matching the Kubernetes template.
# - Redis (no auth) is published on localhost only; the Postgres password
#   can be overridden via the POSTGRES_PASSWORD environment variable.
DOCKER_COMPOSE_TEMPLATE = """
version: '3.8'

services:
  api-gateway:
    build: ./gateway
    ports:
      - "8080:8080"
    environment:
      - SERVICE_NAME=api-gateway
      - REGISTRY_URL=http://service-registry:8500
    depends_on:
      - service-registry
      - redis
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8080/health', timeout=5)"]
      interval: 30s
      timeout: 10s
      retries: 3

  claude-service:
    build: ./claude-service
    deploy:
      replicas: 3
    environment:
      - SERVICE_NAME=claude-service
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REGISTRY_URL=http://service-registry:8500
      - REDIS_URL=redis://redis:6379
    depends_on:
      - service-registry
      - redis

  auth-service:
    build: ./auth-service
    deploy:
      replicas: 2
    environment:
      - SERVICE_NAME=auth-service
      - REGISTRY_URL=http://service-registry:8500
    depends_on:
      - service-registry
      - postgres

  redis:
    image: redis:6-alpine
    ports:
      - "127.0.0.1:6379:6379"
    volumes:
      - redis_data:/data

  postgres:
    image: postgres:13
    environment:
      - POSTGRES_DB=claude_app
      - POSTGRES_USER=app_user
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-app_password}
    volumes:
      - postgres_data:/var/lib/postgresql/data

  service-registry:
    image: hashicorp/consul:latest
    ports:
      - "8500:8500"
    command: consul agent -dev -client=0.0.0.0

volumes:
  redis_data:
  postgres_data:
"""

# Kubernetes deployment configuration.
# The Deployment deliberately omits spec.replicas: the HorizontalPodAutoscaler
# below owns the replica count, and a fixed spec.replicas would reset the
# scaled pod count on every `kubectl apply` (Kubernetes recommends removing it
# when an HPA manages the workload).
K8S_DEPLOYMENT_TEMPLATE = """
apiVersion: apps/v1
kind: Deployment
metadata:
  name: claude-service
  labels:
    app: claude-service
spec:
  selector:
    matchLabels:
      app: claude-service
  template:
    metadata:
      labels:
        app: claude-service
    spec:
      containers:
      - name: claude-service
        image: claude-app/claude-service:latest
        ports:
        - containerPort: 8080
        env:
        - name: ANTHROPIC_API_KEY
          valueFrom:
            secretKeyRef:
              name: claude-secrets
              key: api-key
        - name: REDIS_URL
          value: "redis://redis-service:6379"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: claude-service
spec:
  selector:
    app: claude-service
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: claude-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: claude-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
"""

class DeploymentManager:
    """Generate per-environment Kubernetes manifests for a service.

    Each environment maps to a baseline replica count and a resource size.
    The replica count feeds the HPA (min = replicas, max = 3 x replicas);
    resources are used for both requests and limits.
    """

    def __init__(self):
        self.environments = {
            'development': {
                'replicas': 1,
                'resources': {'memory': '256Mi', 'cpu': '100m'}
            },
            'staging': {
                'replicas': 2,
                'resources': {'memory': '512Mi', 'cpu': '250m'}
            },
            'production': {
                'replicas': 3,
                'resources': {'memory': '1Gi', 'cpu': '500m'}
            }
        }

    def generate_k8s_manifests(
        self,
        service_name: str,
        environment: str,
        image_tag: str
    ) -> Dict[str, str]:
        """Generate Deployment, Service and HPA manifests for one environment.

        All three objects are placed in the namespace named after
        ``environment``. Unknown environments fall back to the production
        sizing (but still use their own name as the namespace).

        Returns:
            Dict with keys ``'deployment'``, ``'service'`` and ``'hpa'``
            mapping to YAML strings.
        """
        env_config = self.environments.get(environment, self.environments['production'])

        manifests = {
            'deployment': self._generate_deployment_manifest(
                service_name, environment, image_tag, env_config
            ),
            # Service and HPA must live in the Deployment's namespace: a
            # Service only selects pods in its own namespace, and an HPA's
            # scaleTargetRef is resolved within the HPA's namespace.
            'service': self._generate_service_manifest(service_name, environment),
            'hpa': self._generate_hpa_manifest(service_name, env_config, environment)
        }

        return manifests

    def _generate_deployment_manifest(
        self,
        service_name: str,
        environment: str,
        image_tag: str,
        config: Dict
    ) -> str:
        """Generate the Deployment manifest.

        spec.replicas is intentionally omitted: the generated HPA owns the
        replica count, and a fixed value would reset the scaled pod count on
        every re-apply (as recommended by the Kubernetes HPA documentation).
        """
        return f"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {service_name}
  namespace: {environment}
  labels:
    app: {service_name}
    environment: {environment}
spec:
  selector:
    matchLabels:
      app: {service_name}
  template:
    metadata:
      labels:
        app: {service_name}
        environment: {environment}
    spec:
      containers:
      - name: {service_name}
        image: claude-app/{service_name}:{image_tag}
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: {config['resources']['memory']}
            cpu: {config['resources']['cpu']}
          limits:
            memory: {config['resources']['memory']}
            cpu: {config['resources']['cpu']}
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
"""

    def _generate_service_manifest(self, service_name: str, environment: str = None) -> str:
        """Generate the ClusterIP Service manifest (namespaced when ``environment`` is given)."""
        namespace_line = f"\n  namespace: {environment}" if environment else ""
        return f"""
apiVersion: v1
kind: Service
metadata:
  name: {service_name}{namespace_line}
spec:
  selector:
    app: {service_name}
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
  type: ClusterIP
"""

    def _generate_hpa_manifest(
        self,
        service_name: str,
        config: Dict,
        environment: str = None
    ) -> str:
        """Generate the CPU-based HPA manifest (namespaced when ``environment`` is given).

        Scales between ``config['replicas']`` and three times that value.
        """
        max_replicas = config['replicas'] * 3
        namespace_line = f"\n  namespace: {environment}" if environment else ""

        return f"""
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {service_name}-hpa{namespace_line}
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {service_name}
  minReplicas: {config['replicas']}
  maxReplicas: {max_replicas}
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
"""

这个分布式部署架构提供了一套较为完整的微服务部署参考方案，涵盖服务发现、负载均衡、容错机制、监控系统和自动化部署等关键组件。其中部分实现（例如默认恢复策略中的服务重启）仅为示意，用于生产环境时需要对接实际的容器编排系统。


网站公告

今日签到

点亮在社区的每一天
去签到