文章目录
一、待解决问题
1.1 问题描述
在Combat环境中应用了MAPPO算法,在同样环境中学习并复现IPPO算法。
1.2 解决方法
(1)搭建基础环境。
(2)IPPO 算法实例复现。
(3)代码框架理解
二、方法详述
2.1 必要说明
(1)MAPPO 与 IPPO 算法的区别在于什么地方?
源文献链接:The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games
源文献原文如下:
为清楚起见,我们将具有 集中价值函数 输入的 PPO 称为 MAPPO (Multi-Agent PPO)。
将 策略和价值函数均具有本地输入 的 PPO 称为 IPPO (Independent PPO)。
总结而言,就是critic网络
不再是集中式的了。
因此,IPPO 相对于 MAPPO 可能会更加占用计算、存储资源,毕竟每个agent都会拥有各自的critic网络
。
(2)IPPO 算法应用框架主要参考来源
其一,《动手学强化学习》-chapter 20
其二,深度强化学习(7)多智能体强化学习IPPO、MADDPG
✅非常感谢大佬的分享!!!
2.2 应用步骤
2.2.1 搭建基础环境
这一步骤直接参考上一篇博客,【动手学强化学习】番外7-MAPPO应用框架2学习与复现
2.2.2 IPPO 算法实例复现
(1)源码
智能体对于policy的使用分为separated policy
与shared policy
,即每个agent拥有单独的policy net,所有agent共用一个policy net,二者在源码中都能够使用,对应位置取消注释即可。
与MAPPO
的不同就在于,每个agent拥有单独的value net。
import torch
import torch.nn.functional as F
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import sys
from ma_gym.envs.combat.combat import Combat
# PPO算法
class PolicyNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
def forward(self, x):
x = F.relu(self.fc2(F.relu(self.fc1(x))))
return F.softmax(self.fc3(x), dim=1)
class ValueNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim):
super(ValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, 1)
def forward(self, x):
x = F.relu(self.fc2(F.relu(self.fc1(x))))
return self.fc3(x)
def compute_advantage(gamma, lmbda, td_delta):
td_delta = td_delta.detach().numpy()
advantage_list = []
advantage = 0.0
for delta in td_delta[::-1]:
advantage = gamma * lmbda * advantage + delta
advantage_list.append(advantage)
advantage_list.reverse()
return torch.tensor(advantage_list, dtype=torch.float)
# PPO,采用截断方式
class PPO:
def __init__(self, state_dim, hidden_dim, action_dim,
actor_lr, critic_lr, lmbda, eps, gamma, device):
self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
self.critic = ValueNet(state_dim, hidden_dim).to(device)
self.actor_optimizer = torch.optim.Adam(
self.actor.parameters(), actor_lr)
self.critic_optimizer = torch.optim.Adam(
self.critic.parameters(), critic_lr)
self.gamma = gamma
self.lmbda = lmbda
self.eps = eps # PPO中截断范围的参数
self.device = device
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
probs = self.actor(state)
action_dict = torch.distributions.Categorical(probs)
action = action_dict.sample()
return action.item()
def update(self, transition_dict):
states = torch.tensor(
transition_dict['states'], dtype=torch.float).to(self.device)
actions = torch.tensor(
transition_dict['actions']).view(-1, 1).to(self.device)
rewards = torch.tensor(
transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(
transition_dict['next_states'], dtype=torch.float).to(self.device)
dones = torch.tensor(
transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
td_target = rewards + self.gamma * \
self.critic(next_states) * (1 - dones)
td_delta = td_target - self.critic(states)
advantage = compute_advantage(
self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
old_log_probs = torch.log(self.actor(
states).gather(1, actions)).detach()
log_probs = torch.log(self.actor(states).gather(1, actions))
ratio = torch.exp(log_probs - old_log_probs)
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - self.eps, 1 +
self.eps) * advantage # 截断
action_loss = torch.mean(-torch.min(surr1, surr2)) # PPO损失函数
critic_loss = torch.mean(F.mse_loss(
self.critic(states), td_target.detach()))
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
action_loss.backward()
critic_loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()
def show_lineplot(data, name):
# 生成 x 轴的索引
x = list(range(100))
# 创建图形和坐标轴
plt.figure(figsize=(20, 6))
# 绘制折线图
plt.plot(x, data, label=name,
marker='o', linestyle='-', linewidth=2)
# 添加标题和标签
plt.title(name)
plt.xlabel('Index')
plt.ylabel('Value')
plt.legend()
# 显示图形
plt.grid(True)
plt.show()
actor_lr = 3e-4
critic_lr = 1e-3
epochs = 10
episode_per_epoch = 1000
hidden_dim = 64
gamma = 0.99
lmbda = 0.97
eps = 0.2
team_size = 2 # 每个team里agent的数量
grid_size = (15, 15) # 二维空间的大小
device = torch.device(
"cuda") if torch.cuda.is_available() else torch.device("cpu")
# 创建环境
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)
state_dim = env.observation_space[0].shape[0]
action_dim = env.action_space[0].n
# =============================================================================
# # 创建智能体(不参数共享:separated policy)
# agent1 = PPO(
# state_dim, hidden_dim, action_dim,
# actor_lr, critic_lr, lmbda, eps, gamma, device
# )
# agent2 = PPO(
# state_dim, hidden_dim, action_dim,
# actor_lr, critic_lr, lmbda, eps, gamma, device
# )
# =============================================================================
# 创建智能体(参数共享:shared policy)
agent = PPO(
state_dim, hidden_dim, action_dim,
actor_lr, critic_lr, lmbda, eps, gamma, device
)
win_list = []
for e in range(epochs):
with tqdm(total=episode_per_epoch, desc='Epoch %d' % e) as pbar:
for episode in range(episode_per_epoch):
# Replay buffer for agent1
buffer_agent1 = {
'states': [],
'actions': [],
'next_states': [],
'rewards': [],
'dones': []
}
# Replay buffer for agent2
buffer_agent2 = {
'states': [],
'actions': [],
'next_states': [],
'rewards': [],
'dones': []
}
# 重置环境
s = env.reset()
terminal = False
while not terminal:
# 采取动作(不进行参数共享)
# a1 = agent1.take_action(s[0])
# a2 = agent2.take_action(s[1])
# 采取动作(进行参数共享)
a1 = agent.take_action(s[0])
a2 = agent.take_action(s[1])
next_s, r, done, info = env.step([a1, a2])
buffer_agent1['states'].append(s[0])
buffer_agent1['actions'].append(a1)
buffer_agent1['next_states'].append(next_s[0])
# 如果获胜,获得100的奖励,否则获得0.1惩罚
buffer_agent1['rewards'].append(
r[0] + 100 if info['win'] else r[0] - 0.1)
buffer_agent1['dones'].append(False)
buffer_agent2['states'].append(s[1])
buffer_agent2['actions'].append(a2)
buffer_agent2['next_states'].append(next_s[1])
buffer_agent2['rewards'].append(
r[1] + 100 if info['win'] else r[1] - 0.1)
buffer_agent2['dones'].append(False)
s = next_s # 转移到下一个状态
terminal = all(done)
# 更新策略(不进行参数共享)
# agent1.update(buffer_agent1)
# agent2.update(buffer_agent2)
# 更新策略(进行参数共享)
agent.update(buffer_agent1)
agent.update(buffer_agent2)
win_list.append(1 if info['win'] else 0)
if (episode + 1) % 100 == 0:
pbar.set_postfix({
'episode': '%d' % (episode_per_epoch * e + episode + 1),
'winner prob': '%.3f' % np.mean(win_list[-100:]),
'win count': '%d' % win_list[-100:].count(1)
})
pbar.update(1)
win_array = np.array(win_list)
# 每100条轨迹取一次平均
win_array = np.mean(win_array.reshape(-1, 100), axis=1)
# 创建 episode_list,每组 100 个回合的累计回合数
episode_list = np.arange(1, len(win_array) + 1) * 100
plt.plot(episode_list, win_array)
plt.xlabel('Episodes')
plt.ylabel('win rate')
plt.title('IPPO on Combat(shared policy)')
plt.show()
(2)Combat环境补充
这里还需要说明的是,由于在奖励设置过程中采用了win
(获胜),但是Combat
环境中step函数并没有返还该值
buffer_agent1['rewards'].append(
r[0] + 100 if info['win'] else r[0] - 0.1)
...
win_array = np.array(win_list)
...
因此需要在step()
函数中加入对win
(获胜)的判断,与return返回值
# 判断是否获胜
win = False
if all(self._agent_dones):
if sum([v for k, v in self.opp_health.items()]) == 0:
win = True
elif sum([v for k, v in self.agent_health.items()]) == 0:
win = False
else:
win = None # 平局
# 将获胜信息添加到 info 中
info = {'health': self.agent_health, 'win': win, 'opp_health': self.opp_health, 'step_count': self._step_count}
return self.get_agent_obs(), rewards, self._agent_dones, info
(3)代码结果
左图为separated policy
下运行结果,右图为shared policy
下运行结果。
明显可以看出,separated policy
有着更好的效果,但是代价就是在训练过程中会占用更多的资源。
2.2.3 代码框架理解
从2.2.2节源码
不难看出,IPPO
算法其实就是在PPO
算法上改为了多agent的环境,其中policy net,value net的更新原理并没有改变,因此代码框架的理解查看PPO
算法原理即可。
参考链接:【动手学强化学习】part8-PPO(Proximal Policy Optimization)近端策略优化算法
三、疑问
- 暂无
四、总结
IPPO
算法相对于MAPPO
算法会占用更多的资源,如果环境较为简单,可以采用该算法。如果环境比较复杂,建议先采用MAPPO
算法进行训练。