[Hands-on RL] Extra 8: Learning and Reproducing an IPPO Application Framework



1. Problem to Solve

1.1 Problem Description

Having already applied the MAPPO algorithm in the Combat environment, we now learn and reproduce the IPPO algorithm in the same environment.

1.2 Approach

(1) Set up the base environment.
(2) Reproduce an IPPO algorithm example.
(3) Understand the code framework.

2. Method Details

2.1 Preliminary Notes

(1) What is the difference between MAPPO and IPPO?

Source paper: The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games
In the paper's terminology:

PPO with centralized value function inputs is referred to as MAPPO (Multi-Agent PPO),
while PPO with local inputs to both the policy and the value function is referred to as IPPO (Independent PPO).

In short, the critic network is no longer centralized.
As a result, IPPO may consume more compute and memory than MAPPO, since every agent maintains its own critic network (see the sketch below).
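To make the distinction concrete, the following minimal sketch (illustrative only, not taken from the reproduced code; the dimensions and names are mine) shows what each kind of critic consumes as input:

import torch

obs_dim, n_agents, hidden_dim = 10, 2, 64

# MAPPO: a single centralized critic that evaluates a global input,
# e.g. the concatenation of all agents' observations
mappo_critic = torch.nn.Sequential(
    torch.nn.Linear(obs_dim * n_agents, hidden_dim),
    torch.nn.ReLU(),
    torch.nn.Linear(hidden_dim, 1),
)

# IPPO: one critic per agent, each seeing only that agent's local observation
ippo_critics = [
    torch.nn.Sequential(
        torch.nn.Linear(obs_dim, hidden_dim),
        torch.nn.ReLU(),
        torch.nn.Linear(hidden_dim, 1),
    )
    for _ in range(n_agents)
]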

(2) Main references for this IPPO application framework

First, Hands-on Reinforcement Learning (《动手学强化学习》), Chapter 20.
Second, Deep Reinforcement Learning (7): Multi-Agent Reinforcement Learning with IPPO and MADDPG.

✅ Many thanks to these authors for sharing!

2.2 Application Steps

2.2.1 Setting Up the Base Environment

This step follows the previous post directly: [Hands-on RL] Extra 7: Learning and Reproducing MAPPO Application Framework 2.
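As a quick sanity check that the environment is available after that setup, the following minimal sketch (it only uses the same Combat constructor and space attributes as the code in the next section) can be run:

from ma_gym.envs.combat.combat import Combat

env = Combat(grid_shape=(15, 15), n_agents=2, n_opponents=2)
obs = env.reset()                 # list of per-agent observations
print(len(obs), len(obs[0]))      # number of agents, observation dimension
print(env.action_space[0].n)      # number of discrete actions per agent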

2.2.2 Reproducing an IPPO Algorithm Example

(1) Source code

The agents can use the policy in one of two modes: a separated policy, where each agent has its own policy net, or a shared policy, where all agents share a single policy net. Both modes are provided in the source code below; uncomment the corresponding block to switch between them.
The difference from MAPPO is that each agent has its own value net.

import torch
import torch.nn.functional as F
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import sys
from ma_gym.envs.combat.combat import Combat

# PPO algorithm


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc2(F.relu(self.fc1(x))))
        return F.softmax(self.fc3(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc2(F.relu(self.fc1(x))))
        return self.fc3(x)


def compute_advantage(gamma, lmbda, td_delta):
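    # Generalized Advantage Estimation (GAE): scan the TD errors backwards in time,
    # accumulating A_t = delta_t + gamma * lmbda * A_{t+1}, then restore time order.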
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)


# PPO,采用截断方式
class PPO:
    def __init__(self, state_dim, hidden_dim, action_dim,
                 actor_lr, critic_lr, lmbda, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(
            self.actor.parameters(), actor_lr)
        self.critic_optimizer = torch.optim.Adam(
            self.critic.parameters(), critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.eps = eps  # clipping range parameter in PPO
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(
            transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(
            transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(
            transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(
            transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(
            transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * \
            self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(
            self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
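        # Note: update() is called once per collected trajectory, so old_log_probs
        # (detached) equals log_probs numerically and the ratio starts at 1; the
        # clipped surrogate below still yields the standard policy-gradient update.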
        old_log_probs = torch.log(self.actor(
            states).gather(1, actions)).detach()
        log_probs = torch.log(self.actor(states).gather(1, actions))
        ratio = torch.exp(log_probs - old_log_probs)
        surr1 = ratio * advantage
        surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # clipped term
        action_loss = torch.mean(-torch.min(surr1, surr2))  # PPO clipped surrogate loss
        critic_loss = torch.mean(F.mse_loss(
            self.critic(states), td_target.detach()))
        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        action_loss.backward()
        critic_loss.backward()
        self.actor_optimizer.step()
        self.critic_optimizer.step()


def show_lineplot(data, name):
    # x-axis indices matching the length of the data
    x = list(range(len(data)))

    # create the figure
    plt.figure(figsize=(20, 6))

    # draw the line plot
    plt.plot(x, data, label=name,
             marker='o', linestyle='-', linewidth=2)

    # add title and axis labels
    plt.title(name)
    plt.xlabel('Index')
    plt.ylabel('Value')
    plt.legend()

    # show the figure
    plt.grid(True)
    plt.show()


actor_lr = 3e-4
critic_lr = 1e-3
epochs = 10
episode_per_epoch = 1000
hidden_dim = 64
gamma = 0.99
lmbda = 0.97
eps = 0.2
team_size = 2  # number of agents per team
grid_size = (15, 15)  # size of the 2D grid
device = torch.device(
    "cuda") if torch.cuda.is_available() else torch.device("cpu")

# create the environment
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)
state_dim = env.observation_space[0].shape[0]
action_dim = env.action_space[0].n

# =============================================================================
# # Create the agents (no parameter sharing: separated policy)
# agent1 = PPO(
#     state_dim, hidden_dim, action_dim,
#     actor_lr, critic_lr, lmbda, eps, gamma, device
# )
# agent2 = PPO(
#     state_dim, hidden_dim, action_dim,
#     actor_lr, critic_lr, lmbda, eps, gamma, device
# )
# =============================================================================


# Create the agent (parameter sharing: shared policy)
agent = PPO(
    state_dim, hidden_dim, action_dim,
    actor_lr, critic_lr, lmbda, eps, gamma, device
)

win_list = []

for e in range(epochs):
    with tqdm(total=episode_per_epoch, desc='Epoch %d' % e) as pbar:
        for episode in range(episode_per_epoch):
            # On-policy rollout buffer for agent1
            buffer_agent1 = {
                'states': [],
                'actions': [],
                'next_states': [],
                'rewards': [],
                'dones': []
            }
            # On-policy rollout buffer for agent2
            buffer_agent2 = {
                'states': [],
                'actions': [],
                'next_states': [],
                'rewards': [],
                'dones': []
            }
            # reset the environment
            s = env.reset()
            terminal = False
            while not terminal:
                # take actions (no parameter sharing)
                # a1 = agent1.take_action(s[0])
                # a2 = agent2.take_action(s[1])
                # take actions (parameter sharing)
                a1 = agent.take_action(s[0])
                a2 = agent.take_action(s[1])
                next_s, r, done, info = env.step([a1, a2])

                buffer_agent1['states'].append(s[0])
                buffer_agent1['actions'].append(a1)
                buffer_agent1['next_states'].append(next_s[0])
                # on a win add a reward of 100; otherwise apply a 0.1 penalty
                buffer_agent1['rewards'].append(
                    r[0] + 100 if info['win'] else r[0] - 0.1)
                buffer_agent1['dones'].append(False)

                buffer_agent2['states'].append(s[1])
                buffer_agent2['actions'].append(a2)
                buffer_agent2['next_states'].append(next_s[1])
                buffer_agent2['rewards'].append(
                    r[1] + 100 if info['win'] else r[1] - 0.1)
                buffer_agent2['dones'].append(False)

                s = next_s  # move to the next state
                terminal = all(done)

            # update the policies (no parameter sharing)
            # agent1.update(buffer_agent1)
            # agent2.update(buffer_agent2)
            # update the shared policy (parameter sharing)
            agent.update(buffer_agent1)
            agent.update(buffer_agent2)
            win_list.append(1 if info['win'] else 0)
            if (episode + 1) % 100 == 0:
                pbar.set_postfix({
                    'episode': '%d' % (episode_per_epoch * e + episode + 1),
                    'winner prob': '%.3f' % np.mean(win_list[-100:]),
                    'win count': '%d' % win_list[-100:].count(1)
                })
            pbar.update(1)


win_array = np.array(win_list)
# average the win flags over every 100 episodes
win_array = np.mean(win_array.reshape(-1, 100), axis=1)

# episode_list: cumulative number of episodes for each group of 100
episode_list = np.arange(1, len(win_array) + 1) * 100
plt.plot(episode_list, win_array)
plt.xlabel('Episodes')
plt.ylabel('win rate')
plt.title('IPPO on Combat (shared policy)')
plt.show()
(2) Combat environment modification

One more thing needs to be pointed out: the reward shaping and win statistics above rely on a win flag, but the step() function of the Combat environment does not return this value by default:

buffer_agent1['rewards'].append(
    r[0] + 100 if info['win'] else r[0] - 0.1)
...
win_list.append(1 if info['win'] else 0)
...

Therefore, a win check has to be added inside step(), and the flag included in the returned info:

        # determine whether our team won
        win = False
        if all(self._agent_dones):
            if sum([v for k, v in self.opp_health.items()]) == 0:
                win = True
            elif sum([v for k, v in self.agent_health.items()]) == 0:
                win = False
            else:
                win = None  # draw

        # add the win flag to info
        info = {'health': self.agent_health, 'win': win, 'opp_health': self.opp_health, 'step_count': self._step_count}

        return self.get_agent_obs(), rewards, self._agent_dones, info
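Alternatively, if you prefer not to edit the installed ma_gym source in place, the same flag can be injected by subclassing Combat. The sketch below is illustrative (the class name CombatWithWin is mine) and assumes that step() already returns (obs, rewards, dones, info) and that the agent_health / opp_health attributes used in the patch above are available:

from ma_gym.envs.combat.combat import Combat


class CombatWithWin(Combat):
    """Adds a 'win' entry to info without modifying the installed package."""

    def step(self, agents_action):
        obs, rewards, dones, info = super().step(agents_action)
        win = False
        if all(self._agent_dones):
            if sum(self.opp_health.values()) == 0:
                win = True   # all opponents eliminated
            elif sum(self.agent_health.values()) == 0:
                win = False  # our team eliminated
            else:
                win = None   # draw (e.g. step limit reached)
        info = dict(info, win=win)
        return obs, rewards, dones, info


# The training script can then use:
# env = CombatWithWin(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)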
(3) Results

The left figure shows the result with the separated policy; the right figure shows the result with the shared policy.
The separated policy clearly performs better, at the cost of consuming more resources during training.
[Figures: win-rate curves for the separated policy (left) and the shared policy (right)]

2.2.3 Understanding the Code Framework

As the source code in Section 2.2.2 shows, IPPO is essentially the PPO algorithm applied in a multi-agent environment: the update rules for the policy net and the value net are unchanged, so understanding the code framework comes down to understanding PPO itself.
Reference: [Hands-on RL] part8 - PPO (Proximal Policy Optimization)
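For reference, the clipped surrogate objective implemented by the surr1 / surr2 lines in the code above is, for each agent,

$$L^{\mathrm{CLIP}}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\big(r_t(\theta),\,1-\epsilon,\,1+\epsilon\big)\,\hat{A}_t\right)\right], \qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)}$$

which the code maximizes by minimizing its negative (action_loss), with the advantage estimate $\hat{A}_t$ computed by GAE in compute_advantage().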

3. Questions

1. None for now.

4. Summary

• Compared with MAPPO, IPPO tends to consume more resources. For relatively simple environments it is a viable choice; for more complex environments, it is advisable to start training with MAPPO.
