PyTorch Deep Learning in Practice (22): Multi-Agent Reinforcement Learning (MARL)

Published: 2025-03-24

I. Principles of Multi-Agent Reinforcement Learning

1. Core Idea of Multi-Agent Systems

Multi-Agent Reinforcement Learning (MARL) addresses settings in which multiple agents learn to cooperate or compete in a shared environment. It differs from single-agent reinforcement learning as follows:

| Dimension   | Single-agent RL                     | Multi-agent RL                                                    |
|-------------|-------------------------------------|-------------------------------------------------------------------|
| Objective   | A single agent optimizes its policy | Multiple agents optimize policies cooperatively or competitively   |
| Environment | Static environment                  | Dynamic environment (influenced by the other agents' behavior)     |
| Challenges  | Exploration vs. exploitation        | Non-stationarity, communication, cooperation and competition       |

2. Categories of Multi-Agent Reinforcement Learning
  1. Cooperative: agents work together toward a common goal (e.g., team coordination)

  2. Competitive: agents act against one another (e.g., adversarial games)

  3. Mixed: cooperation and competition coexist (e.g., market trading); see the environment sketch below
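
Each of these categories has a counterpart in the PettingZoo MPE suite used later in this article. The snippet below is a minimal illustrative sketch (the environment names are real PettingZoo environments; the mapping to categories is approximate, and none of this code is required for the MADDPG implementation that follows):

from pettingzoo.mpe import simple_spread_v3, simple_tag_v3

# Cooperative: agents must spread out to cover landmarks together
coop_env = simple_spread_v3.parallel_env(N=3, continuous_actions=True)

# Mixed: predators cooperate with each other while competing against the prey they chase
mixed_env = simple_tag_v3.parallel_env(continuous_actions=True)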


II. The MADDPG Algorithm Framework

Multi-Agent Deep Deterministic Policy Gradient (MADDPG) is a classic MARL algorithm that extends DDPG:

  1. Centralized training, decentralized execution

    • During training: each agent may access global information

    • During execution: each agent relies only on its local observation

  2. Critic network: evaluates action values using global information

  3. Actor network: selects actions based on local observations

Mathematical formulation:
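
The equations below follow the standard MADDPG formulation (Lowe et al., 2017) and correspond directly to the update code in Section IV. Let $x = (o_1, \dots, o_N)$ denote the joint observation, $\mu_i$ agent $i$'s deterministic policy with parameters $\theta_i$, and $\mathcal{D}$ the replay buffer. The centralized critic of agent $i$ is trained by minimizing

$$\mathcal{L}(\theta_i) = \mathbb{E}_{x, a, r, x' \sim \mathcal{D}}\Big[\big(Q_i^{\mu}(x, a_1, \dots, a_N) - y_i\big)^2\Big], \qquad y_i = r_i + \gamma\, Q_i^{\mu'}\big(x', a_1', \dots, a_N'\big)\Big|_{a_j' = \mu_j'(o_j')}$$

and the decentralized actor follows the policy gradient

$$\nabla_{\theta_i} J(\mu_i) = \mathbb{E}_{x, a \sim \mathcal{D}}\Big[\nabla_{\theta_i}\mu_i(o_i)\, \nabla_{a_i} Q_i^{\mu}(x, a_1, \dots, a_N)\big|_{a_i = \mu_i(o_i)}\Big]$$

Target networks are updated softly, $\theta_i' \leftarrow \tau\, \theta_i + (1 - \tau)\, \theta_i'$, with the coefficient $\tau$ taken from the configuration below.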


III. MADDPG Implementation Steps (with PettingZoo and Gymnasium)

Using a Multi-Agent Particle Environment (MPE) task as the example, we implement MADDPG in four steps:

  1. Define the multi-agent environment: create an environment shared by several agents

  2. Build the Actor-Critic networks: each agent owns an independent Actor and Critic

  3. Implement centralized training: the Critic uses global information, the Actor uses local observations

  4. Test decentralized execution: verify how the agents perform with local observations only


IV. Code Implementation

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
from pettingzoo.mpe import simple_spread_v3
from gymnasium.spaces import Box
import time

# ================== Configuration ==================
class MADDPGConfig:
    env_name = "simple_spread_v3"  # PettingZoo MPE environment
    num_agents = 3                 # number of agents
    hidden_dim = 256               # hidden layer size
    actor_lr = 1e-4                # Actor learning rate
    critic_lr = 1e-3               # Critic learning rate
    gamma = 0.95                   # discount factor
    tau = 0.05                     # soft-update coefficient
    buffer_size = 100000           # replay buffer capacity
    batch_size = 1024              # batch size
    max_episodes = 1000            # maximum number of training episodes
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    noise_scale_init = 0.2         # initial exploration noise scale
    noise_scale_decay = 0.995      # noise decay factor per episode

# ================== Actor network ==================
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, action_dim),
            nn.Sigmoid()  # squash outputs to [0, 1] to match the env action space
        )
    
    def forward(self, state):
        return self.net(state)

# ================== Critic network ==================
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, num_agents):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim * num_agents + action_dim * num_agents, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, 1)
        )
    
    def forward(self, state, action):
        # flatten state and action dims: [batch, agents, dim] -> [batch, agents * dim]
        state_flat = state.view(state.size(0), -1)
        action_flat = action.view(action.size(0), -1)
        x = torch.cat([state_flat, action_flat], dim=-1)
        return self.net(x)

# ================== MADDPG agent ==================
class MADDPGAgent:
    def __init__(self, state_dim, action_dim, num_agents, agent_id, env_action_space):
        self.id = agent_id
        self.actor = Actor(state_dim, action_dim).to(MADDPGConfig.device)
        self.target_actor = Actor(state_dim, action_dim).to(MADDPGConfig.device)
        self.critic = Critic(state_dim, action_dim, num_agents).to(MADDPGConfig.device)
        self.target_critic = Critic(state_dim, action_dim, num_agents).to(MADDPGConfig.device)
        self.env_action_space = env_action_space  # keep a reference to the environment's action space

        # initialize the target networks with the online networks' weights
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())
        
        # optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=MADDPGConfig.actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=MADDPGConfig.critic_lr)
    
    def act(self, state, noise_scale=None):
        if noise_scale is None:
            noise_scale = 0.0  # default: act deterministically (no exploration noise)
        noise_scale = float(noise_scale)
        
        # ==== action generation ====
        state = torch.FloatTensor(state).to(MADDPGConfig.device)
        action = self.actor(state).detach().cpu().numpy()
        
        # truncated Gaussian exploration noise
        noise = np.clip(np.random.randn(*action.shape), -3.0, 3.0) * noise_scale
        
        # clip the noisy action back into the valid range [0, 1]
        action = np.clip(action + noise, 0.0, 1.0)
        return action
    
    def update(self, replay_buffer, agents):
        if len(replay_buffer) < MADDPGConfig.batch_size:
            return
        
        # sample a batch from the replay buffer
        states, actions, rewards, next_states, dones = replay_buffer.sample(MADDPGConfig.batch_size)
        
        # extract this agent's rewards and done flags
        agent_rewards = rewards[:, self.id, :]  # Shape: (batch_size, 1)
        agent_dones = dones[:, self.id, :]       # Shape: (batch_size, 1)
        
        # compute the target Q value
        with torch.no_grad():
            next_actions = torch.cat(
                [agent.target_actor(next_states[:, i, :]) for i, agent in enumerate(agents)], 
                dim=1  # concatenate along dim 1: (batch_size, num_agents * action_dim)
            )
            target_q_values = self.target_critic(next_states, next_actions)
            target_q = agent_rewards + MADDPGConfig.gamma * (1 - agent_dones) * target_q_values
        
        # update the Critic
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q.detach())
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), max_norm=0.5)
        self.critic_optimizer.step()
        
        # update the Actor
        actor_actions = torch.cat(
            [self.actor(states[:, i, :]) if i == self.id else agents[i].actor(states[:, i, :]).detach() 
             for i in range(len(agents))],
            dim=1  # concatenate along dim 1: (batch_size, num_agents * action_dim)
        )
        actor_loss = -self.critic(states, actor_actions).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.actor.parameters(), max_norm=0.5)
        self.actor_optimizer.step()
        
        # soft-update the target networks
        for param, target_param in zip(self.actor.parameters(), self.target_actor.parameters()):
            target_param.data.copy_(MADDPGConfig.tau * param.data + (1 - MADDPGConfig.tau) * target_param.data)
        for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
            target_param.data.copy_(MADDPGConfig.tau * param.data + (1 - MADDPGConfig.tau) * target_param.data)

# ================== Replay buffer ==================
class ReplayBuffer:
    def __init__(self, num_agents, state_dim, action_dim):
        self.buffer = deque(maxlen=MADDPGConfig.buffer_size)
        self.num_agents = num_agents
        self.state_dim = state_dim
        self.action_dim = action_dim
    
    def add(self, state, action, reward, next_state, done):
        # convert to numpy arrays and validate shapes
        state = np.array(state, dtype=np.float32)
        assert state.shape == (self.num_agents, self.state_dim), \
            f"State shape mismatch: {state.shape} vs {(self.num_agents, self.state_dim)}"
        
        action = np.array(action, dtype=np.float32)
        assert action.shape == (self.num_agents, self.action_dim), \
            f"Action shape mismatch: {action.shape} vs {(self.num_agents, self.action_dim)}"
        
        next_state = np.array(next_state, dtype=np.float32)
        assert next_state.shape == (self.num_agents, self.state_dim), \
            f"Next state shape mismatch: {next_state.shape} vs {(self.num_agents, self.state_dim)}"
        
        self.buffer.append((
            state,
            action,
            np.array(reward, dtype=np.float32),
            next_state,
            np.array(done, dtype=np.float32)
        ))
    
    def sample(self, batch_size):
        samples = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        
        # convert to numpy arrays
        states = np.array(states, dtype=np.float32)          # Shape: (batch, num_agents, state_dim)
        actions = np.array(actions, dtype=np.float32)        # Shape: (batch, num_agents, action_dim)
        rewards = np.array(rewards, dtype=np.float32)        # Shape: (batch, num_agents)
        next_states = np.array(next_states, dtype=np.float32)
        dones = np.array(dones, dtype=np.float32)            # Shape: (batch, num_agents)
        
        # convert to tensors and add a trailing dimension to rewards/dones
        return (
            torch.FloatTensor(states).to(MADDPGConfig.device),
            torch.FloatTensor(actions).to(MADDPGConfig.device),
            torch.FloatTensor(rewards).unsqueeze(-1).to(MADDPGConfig.device),  # Shape: (batch, num_agents, 1)
            torch.FloatTensor(next_states).to(MADDPGConfig.device),
            torch.FloatTensor(dones).unsqueeze(-1).to(MADDPGConfig.device)     # Shape: (batch, num_agents, 1)
        )
    
    def __len__(self):
        return len(self.buffer)

# ================== Training system ==================
class MADDPGTrainer:
    def __init__(self):
        self.env = simple_spread_v3.parallel_env(
            N=3,
            continuous_actions=True,
            max_cycles=100,
            render_mode="rgb_array"
        )
        self.env_action_space = self.env.action_space("agent_0")
        print("Action space low:", self.env_action_space.low)    # expected: [0.0, 0.0, ...]
        print("Action space high:", self.env_action_space.high)  # expected: [1.0, 1.0, ...]
        # sanity-check the action space bounds
        assert np.allclose(self.env_action_space.low, 0.0), "action space lower bound should be 0.0"
        assert np.allclose(self.env_action_space.high, 1.0), "action space upper bound should be 1.0"
        self.state_dim = self.env.observation_space("agent_0").shape[0]
        self.action_space = self.env.action_space("agent_0")
        
        if isinstance(self.action_space, Box):
            self.action_dim = self.action_space.shape[0]
        else:
            raise ValueError("Continuous action space required")
        
        self.num_agents = MADDPGConfig.num_agents
        self.agents = [
            MADDPGAgent(self.state_dim, self.action_dim, self.num_agents, agent_id=i, env_action_space=self.env_action_space) 
            for i in range(self.num_agents)
        ]
        self.replay_buffer = ReplayBuffer(
            self.num_agents, 
            self.state_dim, 
            self.action_dim
        )
        self.noise_scale = MADDPGConfig.noise_scale_init  # current exploration noise scale

    def train(self):
        for episode in range(MADDPGConfig.max_episodes):
            states, _ = self.env.reset()
            episode_reward = 0
            current_noise_scale = self.noise_scale
            self.noise_scale *= MADDPGConfig.noise_scale_decay  # decay exploration noise each episode
            
            while True:
                # snapshot the environment's current agent list
                current_agents = list(self.env.agents)
                
                # generate actions
                actions = {}
                for i, agent_id in enumerate(current_agents):
                    agent_state = states[agent_id]
                    actions[agent_id] = self.agents[i].act(agent_state, noise_scale=current_noise_scale)
                
                # step the environment
                next_states, rewards, terminations, truncations, _ = self.env.step(actions)
                
                # convert to arrays (shapes are validated in the buffer)
                state_matrix = np.stack([states[agent_id] for agent_id in current_agents])
                next_state_matrix = np.stack([next_states[agent_id] for agent_id in current_agents])
                action_matrix = np.stack([actions[agent_id] for agent_id in current_agents])
                reward_array = np.array([rewards[agent_id] for agent_id in current_agents], dtype=np.float32)
                done_array = np.array([terminations[agent_id] or truncations[agent_id] for agent_id in current_agents], dtype=np.float32)
                
                # store the transition (the buffer validates shapes)
                self.replay_buffer.add(
                    state_matrix,
                    action_matrix,
                    reward_array,
                    next_state_matrix,
                    done_array
                )
                
                # advance the state
                states = next_states
                episode_reward += np.sum(reward_array)
                
                # update every agent
                for agent in self.agents:
                    agent.update(self.replay_buffer, self.agents)
                
                # stop once all agents have terminated or been truncated
                if all(terminations[agent_id] or truncations[agent_id] for agent_id in current_agents):
                    break
            
            # report training progress
            if (episode + 1) % 100 == 0:
                print(f"Episode {episode+1} | Total Reward: {episode_reward:.2f}")

if __name__ == "__main__":
    # ==== suppress noisy logging ====
    import logging
    import warnings
    logging.basicConfig(level=logging.CRITICAL)
    logging.getLogger('gymnasium').setLevel(logging.CRITICAL)
    warnings.filterwarnings("ignore")

    start_time = time.time()
    print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")
    print("Initializing training environment...")
    trainer = MADDPGTrainer()
    print("Starting training...")
    trainer.train()
    end_time = time.time()
    print(f"Training finished at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}")
    print(f"Total training time: {end_time - start_time:.2f} s")

V. Key Code Walkthrough

  1. Actor-Critic networks

    • Actor: outputs an action from the agent's local observation

    • Critic: outputs a Q value from the global state and all agents' actions

  2. Centralized training, decentralized execution

    • During training: the Critic uses global information while the Actor uses local observations

    • During execution: each agent selects actions from its local observation only (see the evaluation sketch after this list)

  3. Replay buffer

    • Stores the experience of all agents (state, action, reward, next state, done flag)

    • Provides random batches for training
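
As a concrete illustration of decentralized execution, here is a minimal evaluation sketch. The helper function evaluate is not part of the implementation above; it assumes a trained MADDPGTrainer instance, and each agent acts greedily from its own observation with exploration noise switched off:

def evaluate(trainer, episodes=5):
    # fresh environment for evaluation
    env = simple_spread_v3.parallel_env(N=3, continuous_actions=True, max_cycles=100)
    for ep in range(episodes):
        obs, _ = env.reset()
        total_reward = 0.0
        # env.agents becomes empty once every agent has terminated or been truncated
        while env.agents:
            actions = {
                agent_id: trainer.agents[i].act(obs[agent_id], noise_scale=0.0)
                for i, agent_id in enumerate(env.agents)
            }
            obs, rewards, terminations, truncations, _ = env.step(actions)
            total_reward += sum(rewards.values())
        print(f"Eval episode {ep + 1} | Total reward: {total_reward:.2f}")
    env.close()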


VI. Sample Training Output

Start time: 2025-03-23 08:27:49
Initializing training environment...
Action space low: [0. 0. 0. 0. 0.]
Action space high: [1. 1. 1. 1. 1.]
Starting training...
Episode 100 | Total Reward: -322.41
Episode 200 | Total Reward: -402.97
Episode 300 | Total Reward: -388.02
Episode 400 | Total Reward: -384.98
Episode 500 | Total Reward: -261.25
Episode 600 | Total Reward: -215.34
Episode 700 | Total Reward: -623.22
Episode 800 | Total Reward: -255.72
Episode 900 | Total Reward: -506.97
Episode 1000 | Total Reward: -285.47
Training finished at: 2025-03-23 09:35:24
Total training time: 4055.36 s

VII. Summary and Extensions

This article implemented MADDPG, a core multi-agent reinforcement learning algorithm, and demonstrated agents learning cooperatively in a shared environment. Readers may explore the following extensions:

  1. More complex environments

    • Test the algorithm on harder benchmarks (e.g., StarCraft II)

  2. Communication mechanisms

    • Add a communication channel between agents to improve coordination (see the sketch after this list)

  3. Mixed tasks

    • Design tasks that combine cooperation and competition to test the algorithm's generality
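
As a starting point for the communication extension, the sketch below shows a hypothetical CommActor that conditions on a message vector received from the other agents in addition to the local observation (the class name, the msg_dim parameter, and the averaging scheme mentioned in the comment are illustrative assumptions, not part of the implementation above):

class CommActor(nn.Module):
    """Actor that takes the local observation plus a received message vector."""
    def __init__(self, state_dim, action_dim, msg_dim=8):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + msg_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, action_dim),
            nn.Sigmoid()  # keep actions in [0, 1], matching the env action space
        )

    def forward(self, state, message):
        # message could be, e.g., the mean of vectors broadcast by the other agents
        return self.net(torch.cat([state, message], dim=-1))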

In the next article, we will explore Multi-Task Reinforcement Learning (Multi-Task RL) and implement a policy optimization algorithm based on shared representations!


Notes

  1. Install the dependencies (PyTorch and the MPE extra of PettingZoo are required for the code above):

    pip install torch gymnasium "pettingzoo[mpe]"

