I. Principles of Multi-Agent Reinforcement Learning
1. Core Ideas of Multi-Agent Systems
Multi-Agent Reinforcement Learning (MARL) addresses the problem of multiple agents learning, cooperatively or competitively, in a shared environment. It differs from single-agent reinforcement learning as follows:

| Dimension | Single-Agent RL | Multi-Agent RL |
| --- | --- | --- |
| Objective | A single agent optimizes its own policy | Multiple agents optimize policies cooperatively or competitively |
| Environment | Stationary from the agent's perspective | Non-stationary (other agents' behavior changes the dynamics) |
| Key challenges | Exploration vs. exploitation | Non-stationarity, communication, cooperation and competition |
2. Categories of Multi-Agent Reinforcement Learning
- Cooperative: agents work toward a shared goal (e.g., team coordination)
- Competitive: agents play against each other (e.g., adversarial games)
- Mixed: cooperation and competition coexist (e.g., market trading)
II. The MADDPG Algorithm Framework
Multi-Agent Deep Deterministic Policy Gradient (MADDPG) is a classic MARL algorithm that extends DDPG:
- Centralized training, decentralized execution:
  - During training, each agent may access global information
  - During execution, each agent relies only on its local observation
- Critic network: evaluates action values using global information
- Actor network: selects actions based on local observations
Mathematical formulation:
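For agent $i$ with deterministic policy $\mu_i$ (parameters $\theta_i$) and centralized critic $Q_i^{\mu}$, the policy gradient and critic loss take the standard form from the MADDPG paper (Lowe et al., 2017), where $\mathbf{x}$ is the joint observation, $\mathcal{D}$ the replay buffer, and $\mu'_j$ the target policies:

$$
\nabla_{\theta_i} J(\mu_i) = \mathbb{E}_{\mathbf{x}, a \sim \mathcal{D}}\!\left[\nabla_{\theta_i}\mu_i(o_i)\,\nabla_{a_i} Q_i^{\mu}(\mathbf{x}, a_1, \dots, a_N)\big|_{a_i=\mu_i(o_i)}\right]
$$

$$
\mathcal{L}(\theta_i) = \mathbb{E}\!\left[\left(Q_i^{\mu}(\mathbf{x}, a_1, \dots, a_N) - y\right)^2\right],\qquad
y = r_i + \gamma\, Q_i^{\mu'}(\mathbf{x}', a'_1, \dots, a'_N)\big|_{a'_j=\mu'_j(o'_j)}
$$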
III. MADDPG Implementation Steps (Based on PettingZoo and Gymnasium)
We use the Multi-Agent Particle Environment (MPE) as the running example and implement MADDPG in four steps (an environment sketch follows this list):
1. Define the multi-agent environment: create an environment shared by several agents
2. Build Actor-Critic networks: each agent has its own Actor and Critic
3. Implement centralized training: the Critic uses global information, the Actor uses local observations
4. Test decentralized execution: verify each agent's behavior when given only local observations
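Before building the full trainer, it helps to inspect the environment interactively. Below is a minimal sketch assuming PettingZoo's MPE `simple_spread_v3` (the same environment used in the full implementation in the next section); the `seed` and `max_cycles` values are illustrative.

```python
# Minimal sketch: inspect the MPE simple_spread_v3 parallel environment
# (assumes pettingzoo[mpe] is installed; hyperparameters are illustrative).
from pettingzoo.mpe import simple_spread_v3

env = simple_spread_v3.parallel_env(N=3, continuous_actions=True, max_cycles=25)
observations, infos = env.reset(seed=42)

print(env.agents)                         # e.g. ['agent_0', 'agent_1', 'agent_2']
print(env.observation_space("agent_0"))   # per-agent Box observation
print(env.action_space("agent_0"))        # Box(0.0, 1.0, (5,)) with continuous actions

# One random step through the parallel API
actions = {agent: env.action_space(agent).sample() for agent in env.agents}
observations, rewards, terminations, truncations, infos = env.step(actions)
env.close()
```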
IV. Code Implementation
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
from pettingzoo.mpe import simple_spread_v3
from gymnasium.spaces import Discrete, Box
import time

# ================== Configuration ==================
class MADDPGConfig:
    env_name = "simple_spread_v3"   # PettingZoo environment name
    num_agents = 3                  # number of agents
    hidden_dim = 256                # hidden-layer width
    actor_lr = 1e-4                 # Actor learning rate
    critic_lr = 1e-3                # Critic learning rate
    gamma = 0.95                    # discount factor
    tau = 0.05                      # soft-update coefficient
    buffer_size = 100000            # replay buffer capacity
    batch_size = 1024               # batch size
    max_episodes = 1000             # maximum number of training episodes
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    noise_scale_init = 0.2          # initial exploration-noise scale
    noise_scale_decay = 0.995       # per-episode noise decay factor
# ================== Actor network ==================
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, action_dim),
            nn.Sigmoid()  # outputs in [0, 1], matching the environment's action range
        )

    def forward(self, state):
        return self.net(state)
# ================== Critic network ==================
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, num_agents):
        super().__init__()
        # The centralized critic sees every agent's observation and action
        self.net = nn.Sequential(
            nn.Linear(state_dim * num_agents + action_dim * num_agents, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, 1)
        )

    def forward(self, state, action):
        # Flatten per-agent dimensions: [batch, agents, dim] -> [batch, agents * dim]
        state_flat = state.view(state.size(0), -1)
        action_flat = action.view(action.size(0), -1)
        x = torch.cat([state_flat, action_flat], dim=-1)
        return self.net(x)
# ================== MADDPG agent ==================
class MADDPGAgent:
    def __init__(self, state_dim, action_dim, num_agents, agent_id, env_action_space):
        self.id = agent_id
        self.actor = Actor(state_dim, action_dim).to(MADDPGConfig.device)
        self.target_actor = Actor(state_dim, action_dim).to(MADDPGConfig.device)
        self.critic = Critic(state_dim, action_dim, num_agents).to(MADDPGConfig.device)
        self.target_critic = Critic(state_dim, action_dim, num_agents).to(MADDPGConfig.device)
        self.env_action_space = env_action_space  # keep the environment's action space for reference

        # Initialize target networks with the online networks' weights
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=MADDPGConfig.actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=MADDPGConfig.critic_lr)

    def act(self, state, noise_scale=None):
        # noise_scale=None means act greedily (no exploration noise)
        if noise_scale is None:
            noise_scale = 0.0
        noise_scale = float(noise_scale)

        # ==== Action generation ====
        state = torch.FloatTensor(state).to(MADDPGConfig.device)
        action = self.actor(state).cpu().data.numpy()
        # Truncated Gaussian exploration noise
        noise = np.clip(np.random.randn(*action.shape), -3.0, 3.0) * noise_scale
        # Clip the noisy action back into the valid range [0, 1]
        action = np.clip(action + noise, 0.0, 1.0)
        return action
    def update(self, replay_buffer, agents):
        if len(replay_buffer) < MADDPGConfig.batch_size:
            return

        # Sample a batch from the replay buffer
        states, actions, rewards, next_states, dones = replay_buffer.sample(MADDPGConfig.batch_size)

        # Extract this agent's rewards and done flags
        agent_rewards = rewards[:, self.id, :]  # shape: (batch_size, 1)
        agent_dones = dones[:, self.id, :]      # shape: (batch_size, 1)

        # Compute the target Q-value with the target networks
        with torch.no_grad():
            next_actions = torch.cat(
                [agent.target_actor(next_states[:, i, :]) for i, agent in enumerate(agents)],
                dim=1  # concatenate along the feature dim: (batch_size, num_agents * action_dim)
            )
            target_q_values = self.target_critic(next_states, next_actions)
            target_q = agent_rewards + MADDPGConfig.gamma * (1 - agent_dones) * target_q_values

        # Update the Critic
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q.detach())
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), max_norm=0.5)
        self.critic_optimizer.step()

        # Update the Actor: only this agent's action keeps gradients
        actor_actions = torch.cat(
            [self.actor(states[:, i, :]) if i == self.id else agents[i].actor(states[:, i, :]).detach()
             for i in range(len(agents))],
            dim=1  # (batch_size, num_agents * action_dim)
        )
        actor_loss = -self.critic(states, actor_actions).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.actor.parameters(), max_norm=0.5)
        self.actor_optimizer.step()

        # Soft update (Polyak averaging) of the target networks
        for param, target_param in zip(self.actor.parameters(), self.target_actor.parameters()):
            target_param.data.copy_(MADDPGConfig.tau * param.data + (1 - MADDPGConfig.tau) * target_param.data)
        for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
            target_param.data.copy_(MADDPGConfig.tau * param.data + (1 - MADDPGConfig.tau) * target_param.data)
# ================== Experience replay buffer ==================
class ReplayBuffer:
    def __init__(self, num_agents, state_dim, action_dim):
        self.buffer = deque(maxlen=MADDPGConfig.buffer_size)
        self.num_agents = num_agents
        self.state_dim = state_dim
        self.action_dim = action_dim

    def add(self, state, action, reward, next_state, done):
        # Convert to numpy arrays and validate shapes
        state = np.array(state, dtype=np.float32)
        assert state.shape == (self.num_agents, self.state_dim), \
            f"State shape mismatch: {state.shape} vs {(self.num_agents, self.state_dim)}"
        action = np.array(action, dtype=np.float32)
        assert action.shape == (self.num_agents, self.action_dim), \
            f"Action shape mismatch: {action.shape} vs {(self.num_agents, self.action_dim)}"
        next_state = np.array(next_state, dtype=np.float32)
        assert next_state.shape == (self.num_agents, self.state_dim), \
            f"Next state shape mismatch: {next_state.shape} vs {(self.num_agents, self.state_dim)}"
        self.buffer.append((
            state,
            action,
            np.array(reward, dtype=np.float32),
            next_state,
            np.array(done, dtype=np.float32)
        ))

    def sample(self, batch_size):
        samples = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        # Stack into numpy arrays
        states = np.array(states, dtype=np.float32)        # (batch, num_agents, state_dim)
        actions = np.array(actions, dtype=np.float32)      # (batch, num_agents, action_dim)
        rewards = np.array(rewards, dtype=np.float32)      # (batch, num_agents)
        next_states = np.array(next_states, dtype=np.float32)
        dones = np.array(dones, dtype=np.float32)          # (batch, num_agents)
        # Convert to tensors and add a trailing dimension where needed
        return (
            torch.FloatTensor(states).to(MADDPGConfig.device),
            torch.FloatTensor(actions).to(MADDPGConfig.device),
            torch.FloatTensor(rewards).unsqueeze(-1).to(MADDPGConfig.device),  # (batch, num_agents, 1)
            torch.FloatTensor(next_states).to(MADDPGConfig.device),
            torch.FloatTensor(dones).unsqueeze(-1).to(MADDPGConfig.device)     # (batch, num_agents, 1)
        )

    def __len__(self):
        return len(self.buffer)
# ================== Training system ==================
class MADDPGTrainer:
    def __init__(self):
        self.env = simple_spread_v3.parallel_env(
            N=3,
            continuous_actions=True,
            max_cycles=100,
            render_mode="rgb_array"
        )
        self.env_action_space = self.env.action_space("agent_0")
        print("Action space low:", self.env_action_space.low)    # expected [0.0, 0.0, ...]
        print("Action space high:", self.env_action_space.high)  # expected [1.0, 1.0, ...]
        # Validate the action range (must match the actor's Sigmoid output)
        assert np.allclose(self.env_action_space.low, 0.0), "Action space lower bound should be 0.0"
        assert np.allclose(self.env_action_space.high, 1.0), "Action space upper bound should be 1.0"

        self.state_dim = self.env.observation_space("agent_0").shape[0]
        self.action_space = self.env.action_space("agent_0")
        if isinstance(self.action_space, Box):
            self.action_dim = self.action_space.shape[0]
        else:
            raise ValueError("Continuous action space required")

        self.num_agents = MADDPGConfig.num_agents
        self.agents = [
            MADDPGAgent(self.state_dim, self.action_dim, self.num_agents, agent_id=i, env_action_space=self.env_action_space)
            for i in range(self.num_agents)
        ]
        self.replay_buffer = ReplayBuffer(
            self.num_agents,
            self.state_dim,
            self.action_dim
        )
        self.noise_scale = MADDPGConfig.noise_scale_init  # current exploration-noise scale
    def train(self):
        for episode in range(MADDPGConfig.max_episodes):
            states, _ = self.env.reset()
            episode_reward = 0
            current_noise_scale = self.noise_scale
            self.noise_scale *= MADDPGConfig.noise_scale_decay  # decay exploration noise per episode

            while True:
                # Snapshot the environment's current agent list
                current_agents = list(self.env.agents)

                # Generate actions from local observations
                actions = {}
                for i, agent_id in enumerate(current_agents):
                    agent_state = states[agent_id]
                    actions[agent_id] = self.agents[i].act(agent_state, noise_scale=current_noise_scale)

                # Step the environment
                next_states, rewards, terminations, truncations, _ = self.env.step(actions)

                # Stack per-agent data into arrays (shapes are validated inside the buffer)
                state_matrix = np.stack([states[agent_id] for agent_id in current_agents])
                next_state_matrix = np.stack([next_states[agent_id] for agent_id in current_agents])
                action_matrix = np.stack([actions[agent_id] for agent_id in current_agents])
                reward_array = np.array([rewards[agent_id] for agent_id in current_agents], dtype=np.float32)
                done_array = np.array([terminations[agent_id] or truncations[agent_id] for agent_id in current_agents], dtype=np.float32)

                # Store the transition
                self.replay_buffer.add(
                    state_matrix,
                    action_matrix,
                    reward_array,
                    next_state_matrix,
                    done_array
                )

                # Advance the state and accumulate the reward
                states = next_states
                episode_reward += np.sum(reward_array)

                # Update every agent
                for agent in self.agents:
                    agent.update(self.replay_buffer, self.agents)

                # Episode ends when all agents are terminated or truncated
                if all(terminations[agent_id] or truncations[agent_id] for agent_id in current_agents):
                    break

            # Progress logging
            if (episode + 1) % 100 == 0:
                print(f"Episode {episode+1} | Total Reward: {episode_reward:.2f}")
if __name__ == "__main__":
    # ==== Suppress noisy logging ====
    import logging
    import warnings
    logging.basicConfig(level=logging.CRITICAL)
    logging.getLogger('gymnasium').setLevel(logging.CRITICAL)
    warnings.filterwarnings("ignore")

    start_time = time.time()
    print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")

    print("Initializing training environment...")
    trainer = MADDPGTrainer()

    print("Starting training...")
    trainer.train()

    end_time = time.time()
    print(f"Training finished at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}")
    print(f"Total training time: {end_time - start_time:.2f} s")
```
V. Key Code Walkthrough
Actor-Critic networks
- Actor: outputs an action from the agent's local observation
- Critic: outputs a Q-value from the global state and all agents' actions
Centralized training, decentralized execution
- During training: the Critic uses global information, while the Actor uses only local observations
- During execution: each agent selects actions from its local observation alone (see the evaluation sketch after this list)
Experience replay buffer
- Stores every agent's transitions (state, action, reward, next state, done flag)
- Samples random batches for training
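To make the decentralized-execution point concrete, here is a minimal evaluation sketch. It assumes a `MADDPGTrainer` instance (`trainer`) already trained with the code above; the `evaluate` helper and the `episodes=5` default are illustrative, and exploration noise is disabled by passing `noise_scale=0.0`.

```python
# Decentralized execution: each agent acts only on its own observation,
# with exploration noise switched off. Assumes `trainer` is a trained
# MADDPGTrainer from the code above (hypothetical helper for illustration).
def evaluate(trainer, episodes=5):
    for ep in range(episodes):
        states, _ = trainer.env.reset()
        total_reward = 0.0
        while trainer.env.agents:
            actions = {
                agent_id: trainer.agents[i].act(states[agent_id], noise_scale=0.0)
                for i, agent_id in enumerate(trainer.env.agents)
            }
            states, rewards, terminations, truncations, _ = trainer.env.step(actions)
            total_reward += sum(rewards.values())
            if all(terminations[a] or truncations[a] for a in terminations):
                break
        print(f"Eval episode {ep + 1} | Total Reward: {total_reward:.2f}")
```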
VI. Sample Training Output
Start time: 2025-03-23 08:27:49
Initializing training environment...
Action space low: [0. 0. 0. 0. 0.]
Action space high: [1. 1. 1. 1. 1.]
Starting training...
Episode 100 | Total Reward: -322.41
Episode 200 | Total Reward: -402.97
Episode 300 | Total Reward: -388.02
Episode 400 | Total Reward: -384.98
Episode 500 | Total Reward: -261.25
Episode 600 | Total Reward: -215.34
Episode 700 | Total Reward: -623.22
Episode 800 | Total Reward: -255.72
Episode 900 | Total Reward: -506.97
Episode 1000 | Total Reward: -285.47
Training finished at: 2025-03-23 09:35:24
Total training time: 4055.36 s
VII. Summary and Extensions
This article implemented MADDPG, a core multi-agent reinforcement learning algorithm, and demonstrated agents learning cooperatively in a shared environment. Readers may explore the following extensions:
- Complex environments: evaluate the algorithm in harder benchmarks (e.g., StarCraft II)
- Communication mechanisms: add inter-agent communication to improve cooperation
- Mixed tasks: design tasks that combine cooperation and competition to test the algorithm's generality
In the next article, we will explore Multi-Task Reinforcement Learning (Multi-Task RL) and implement a policy-optimization algorithm based on shared representations!
Notes
Install dependencies (the code also uses PyTorch and NumPy; the `[mpe]` extra pulls in the MPE environment dependencies):
pip install torch numpy gymnasium "pettingzoo[mpe]"