25/6/11 [Algorithm Notes] A Walkthrough of Basic RL Algorithms

Published: 2025-06-13

Today I'll summarize the main content of the AC, A2C, DDPG, HAC, MADDPG, MCTS, PlaNet, PPO, QMIX, and SAC algorithms, along with each algorithm's strengths and the environment types it fits.

AC

First up is the AC (Actor-Critic) algorithm. In short, there is an actor and a critic; the two improve and influence each other. AC is the foundation of many later reinforcement-learning algorithms, combining the strengths of policy gradients (Policy Gradient) and value function approximation (Value Function Approximation).

The overall flow: the current policy interacts with the environment to produce transitions {s_t, a_t, r_{t+1}, s_{t+1}}; the Critic updates the value function (a Q or V value) with the TD error; the Actor then computes the policy gradient from the Critic's Q(s_t, a_t) or the advantage A(s_t, a_t) and updates. It suits scenarios such as robotics and games.
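Concretely, a one-step TD error drives both updates (this is exactly what the code below implements):

δ_t = r_{t+1} + γ·V(s_{t+1}) − V(s_t)

critic loss: δ_t²;  actor gradient: ∇_θ log π_θ(a_t|s_t) · δ_t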

Let's look at a rough code sketch:

import torch
import torch.nn as nn
import torch.optim as optim

class Actor(nn.Module):
    def __init__(self,state_dim,action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim,64),
            nn.ReLU(),
            nn.Linear(64,action_dim),
            nn.Softmax(dim = -1)
        )
    def forward(self,state):
        return self.net(state)
    
class Critic(nn.Module):
    def __init__(self,state_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim,64),
            nn.ReLU(),
            nn.Linear(64,1)
        )
    def forward(self,state):
        return self.net(state)
    
class AC_Agent:
    def __init__(self,state_dim,action_dim,gamma =0.99,lr = 0.001):
        self.actor = Actor(state_dim,action_dim)
        self.critic = Critic(state_dim)
        self.optimizer_actor = optim.Adam(self.actor.parameters(),lr = lr)
        self.optimizer_critic = optim.Adam(self.critic.parameters(),lr = lr)
        self.gamma = gamma 
    
    def select_action(self,state):
        state = torch.FloatTensor(state)
        probs = self.actor(state)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(),log_prob
    def update(self,state,log_prob,reward,next_state,done):
        value = self.critic(torch.FloatTensor(state))
        next_value = self.critic(torch.FloatTensor(next_state)) if not done else 0
        td_target = reward+self.gamma*next_value
        td_error = td_target - value

        critic_loss = td_error.pow(2).mean()
        self.optimizer_critic.zero_grad()
        critic_loss.backward()
        self.optimizer_critic.step()

        actor_loss = -log_prob*td_error.detach()
        self.optimizer_actor.zero_grad()
        actor_loss.backward()
        self.optimizer_actor.step()
        

A2C

A2C builds on AC by introducing the advantage function A(s,a) = Q(s,a) − V(s) ≈ r + γV(s′) − V(s), which measures how much better an action is than average and further reduces variance.

import torch
import torch.nn as nn
import torch.optim as optim

class Actor(nn.Module):
    def __init__(self,state_dim,action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim,64),
            nn.ReLU(),
            nn.Linear(64,action_dim),
            nn.Softmax(dim=-1)
        )
    def forward(self,state):
        return self.net(state)
    

class Critic(nn.Module):
    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim,64),
            nn.ReLU(),
            nn.Linear(64,1)
        )
    def forward(self,state):
        return self.net(state)

class A2C_Agent:
    def __init__(self,state_dim,action_dim,gamma = 0.99,lr = 0.001):
        self.actor = Actor(state_dim,action_dim)
        self.critic = Critic(state_dim)
        self.optimizer = optim.Adam(
            list(self.actor.parameters())+list(self.critic.parameters()),
            lr = lr
        )
        self.gamma = gamma
    def update(self,states,actions,rewards,next_states,dones):
        states = torch.FloatTensor(states)
        rewards = torch.FloatTensor(rewards)
        dones = torch.FloatTensor(dones)

        values = self.critic(states).squeeze()
        next_values = self.critic(torch.FloatTensor(next_states)).squeeze()
        next_values = next_values * (1 - dones)  # zero the bootstrap value at terminal states

        advantages = rewards + self.gamma*next_values - values

        critic_loss = advantages.pow(2).mean()

        probs = self.actor(states)
        dists = torch.distributions.Categorical(probs)  # wrap the action probabilities in a categorical distribution
        log_probs = dists.log_prob(torch.LongTensor(actions))
        actor_loss = -(log_probs * advantages.detach()).mean()


        total_loss = actor_loss + 0.5*critic_loss
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

The advantage is advantages = rewards + self.gamma*next_values - values.

Its square gives the critic loss, and (detached) it weights the actor's log-probabilities in the update.

DDPG

DDPG is a reinforcement-learning algorithm for continuous action spaces that combines the strengths of the Deep Q-Network (DQN) and policy gradient methods.

The Critic's job becomes: sample a mini-batch and compute the target Q value

y = r + γ·(1 − d)·Q′(s′, μ′(s′))

then minimize the mean-squared error between Q(s, a) and y.

The Actor's objective: update the Actor by gradient ascent on the Critic's estimate, i.e. minimize −Q(s, μ(s)).

Its strength is handling continuous actions efficiently.

import random
from collections import deque

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# hyperparameters (values assumed for illustration)
ACTION_BOUND = 2.0
GAMMA = 0.99
TAU = 0.005
BATCH_SIZE = 64
MEMORY_CAPACITY = 10000
EPISODES = 200
EP_STEPS = 200

class Actor(nn.Module):
    def __init__(self,state_dim,action_dim):
        super(Actor,self).__init__()
        self.fc1 = nn.Linear(state_dim,30)
        self.fc2 = nn.Linear(30,action_dim)
        self.fc1.weight.data.normal_(0,0.1)  # weight initialization
        self.fc2.weight.data.normal_(0,0.1)

    def forward(self,state):
        x = torch.relu(self.fc1(state))
        action = torch.tanh(self.fc2(x))*ACTION_BOUND
        return action
    
class Critic(nn.Module):
    def __init__(self,state_dim,action_dim):
        super(Critic,self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim,30)
        self.fc2 = nn.Linear(30,1)
        self.fc1.weight.data.normal_(0,0.1)
        self.fc2.weight.data.normal_(0,0.1)

    def forward(self,state,action):
        x = torch.cat([state,action],dim = 1)
        x = torch.relu(self.fc1(x))
        q_value = self.fc2(x)
        return q_value
    
class ReplayBuffer:
    def __init__(self,capacity):
        self.buffer = deque(maxlen = capacity)
    
    def push(self,state,action,reward,next_state,done):
        self.buffer.append((state,action,reward,next_state,done))

    def sample(self,batch_size):
        batch = random.sample(self.buffer,batch_size)
        states,actions,rewards,next_states,dones = zip(*batch)
        return (
            torch.FloatTensor(states),
            torch.FloatTensor(actions),
            torch.FloatTensor(rewards).unsqueeze(1),
            torch.FloatTensor(next_states),
            torch.FloatTensor(dones).unsqueeze(1)
        )
    def __len__(self):
        return len(self.buffer)
    
class DDPGAgent:
    def __init__(self,state_dim,action_dim):
        self.actor = Actor(state_dim,action_dim)
        self.critic = Critic(state_dim,action_dim)
        self.actor_target = Actor(state_dim,action_dim)
        self.critic_target = Critic(state_dim,action_dim)

        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = optim.Adam(self.actor.parameters(),lr = 1e-4)
        self.critic_optimizer = optim.Adam(self.critic.parameters(),lr = 1e-3)
        self.memory = ReplayBuffer(MEMORY_CAPACITY)
        self.noise = OUNoise(action_dim)

    def select_action(self,state,add_noise = True):
        state = torch.FloatTensor(state).unsqueeze(0)
        action = self.actor(state).detach().numpy()[0]
        if add_noise:
            action += self.noise.sample()
        return np.clip(action,-ACTION_BOUND,ACTION_BOUND)

    def update(self):
        states,actions,rewards,next_states,dones = self.memory.sample(BATCH_SIZE)

        # critic: regress toward the target Q value
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_q = rewards + GAMMA*(1-dones)*self.critic_target(next_states,next_actions)
        current_q = self.critic(states,actions)
        critic_loss = nn.MSELoss()(current_q,target_q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # actor: gradient ascent on the critic's estimate
        actor_loss = -self.critic(states,self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # soft-update both target networks
        for net,target_net in ((self.actor,self.actor_target),(self.critic,self.critic_target)):
            for param,target_param in zip(net.parameters(),target_net.parameters()):
                target_param.data.copy_(TAU*param.data + (1-TAU)*target_param.data)
class OUNoise:
    def __init__(self,action_dim,mu=0.0,theta=0.15,sigma=0.2):
        self.action_dim,self.mu,self.theta,self.sigma = action_dim,mu,theta,sigma
        self.reset()
    def reset(self):
        self.state = np.ones(self.action_dim)*self.mu
    def sample(self):
        dx = self.theta*(self.mu - self.state)+self.sigma*np.random.randn(self.action_dim)
        self.state += dx
        return self.state

env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
agent = DDPGAgent(state_dim,action_dim)

for episode in range(EPISODES):
    state = env.reset()
    total_reward = 0
    agent.noise.reset()

    for step in range(EP_STEPS):
        action = agent.select_action(state,add_noise=True)
        next_state,reward,done,_=env.step(action)
        agent.memory.push(state,action,reward,next_state,done)
        state = next_state
        total_reward += reward
        if len(agent.memory)>BATCH_SIZE:
            agent.update()
        if done:
            break

HAC

What is HAC? In one sentence:

It works like a company: the boss assigns the goals, the employees do the work!

High level (Manager): only sets goals (say, "capture 30% of the market"), never touching the concrete operations.

Low level (Worker): only executes (say, "pitch 100 customers on the ground"), ignoring strategy.

Core idea

HAC tackles the sparse-reward problem of long-horizon tasks with a hierarchical decision structure.

How does HAC run? Three steps:

The boss paints the vision: the high level looks at the whole picture (current state + final goal) and carves out subgoals (e.g., "take the East China market first").

The employees sprint: the low level takes a subgoal and executes actions flat out (handing out flyers, running promotions), aiming to close in on the boss's KPI.

Bonus rules:

Employees: the closer to the subgoal, the bigger the bonus (intrinsic reward: bonus = −distance).

Boss: once the subgoal wraps up, look at how much the whole task earned (the environment reward), then adjust the strategy.

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym

class GoalConditionedNetwork(nn.Module):
    def __init__(self,state_dim,goal_dim,action_dim,hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + goal_dim,hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim,hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim,action_dim)
        )
    def forward(self,state,goal):
        x = torch.cat([state,goal],dim=-1)
        return self.net(x)

class Actor(GoalConditionedNetwork):
    """Goal-conditioned actor: maps (state, goal) to an action (or a subgoal)."""
    pass

class Critic(nn.Module):
    def __init__(self,state_dim,goal_dim,action_dim):
        super().__init__()
        # treat (state, action) as the input of a goal-conditioned value network
        self.value_net = GoalConditionedNetwork(state_dim + action_dim,goal_dim,1)
    def forward(self,state,goal,action):
        x = torch.cat([state,action],dim=-1)
        return self.value_net(x,goal)
class HACAgent:
    def __init__(self,env_name,k_level =2 ,subgoal_horizon = 50):
        self.env = gym.make(env_name)
        self.k_level = k_level
        self.H = subgoal_horizon
        self.layers =[]

        for i in range(k_level):
            obs_dim = self.env.observation_space.shape[0]
            if i == 0:  # bottom level: outputs primitive actions
                actor = Actor(obs_dim, obs_dim, self.env.action_space.shape[0])
                critic = Critic(obs_dim, obs_dim, self.env.action_space.shape[0])
            else:  # higher levels: output subgoals (treated as special actions)
                actor = Actor(obs_dim, obs_dim, obs_dim)
                critic = Critic(obs_dim, obs_dim, obs_dim)
            self.layers.append({'actor': actor, 'critic': critic})

        self.replay_buffers = [ReplayBuffer() for _ in range(k_level)]
        self.optimizers = []
        for layer in self.layers:
            self.optimizers.append({
                'actor': optim.Adam(layer['actor'].parameters(), lr=1e-4),
                'critic': optim.Adam(layer['critic'].parameters(), lr=1e-3)
            })
    def act(self,state,goal,layer_idx):
        action = self.layers[layer_idx]['actor'](state,goal)
        noise = torch.randn_like(action) * 0.1
        return torch.clamp(action + noise,-1,1)
    def update(self,layer_idx,batch_size = 64):
        states, goals, actions, rewards, next_states, dones = self.replay_buffers[layer_idx].sample(batch_size)
        
        # critic update: minimize the temporal-difference error (TD loss)
        with torch.no_grad():
            target_q = rewards + 0.99 * self.layers[layer_idx]['critic'](next_states, goals, self.act(next_states, goals, layer_idx))
        current_q = self.layers[layer_idx]['critic'](states, goals, actions)
        critic_loss = nn.MSELoss()(current_q, target_q)
        
        # actor update: maximize the Q value (policy gradient)
        actor_loss = -self.layers[layer_idx]['critic'](states, goals, self.act(states, goals, layer_idx)).mean()
        
        # backpropagation
        self.optimizers[layer_idx]['critic'].zero_grad()
        critic_loss.backward()
        self.optimizers[layer_idx]['critic'].step()
        
        self.optimizers[layer_idx]['actor'].zero_grad()
        actor_loss.backward()
        self.optimizers[layer_idx]['actor'].step()
    def train_hierarchy(self, state, final_goal, level):
        if level == 0: 
            action = self.act(state, final_goal, level)
            next_state, reward, done, _ = self.env.step(action)
            return next_state, reward, done
        subgoal = self.act(state, final_goal, level)
        for _ in range(self.H):  
            next_state, reward, done = self.train_hierarchy(state, subgoal, level-1)
            if done or self.check_subgoal(state, subgoal): 
                break
            state = next_state
        intrinsic_reward = -np.linalg.norm(state - subgoal)
        return next_state, intrinsic_reward, done
    def check_subgoal(self, state, subgoal, threshold=0.05):
        """判断是否达成子目标 (欧氏距离)"""
        return np.linalg.norm(state - subgoal) < threshold
class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.buffer = []
        self.capacity = capacity

    def add(self, state, goal, action, reward, next_state, done):
        """Store a transition"""
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)
        self.buffer.append((state, goal, action, reward, next_state, done))
        
        # HER core trick: relabel the goal with the state actually reached
        self.buffer.append((state, next_state, action, 0, next_state, True))

    def sample(self, batch_size):
        indices = np.random.randint(len(self.buffer), size=batch_size)
        batch = [self.buffer[i] for i in indices]
        return [torch.tensor(x) for x in zip(*batch)]

if __name__ == "__main__":
    agent = HACAgent("MountainCarContinuous-v0", k_level=2)
    for episode in range(1000):
        state = agent.env.reset()
        final_goal = np.array([0.48, 0.04])  # goal near the hilltop
        total_reward = 0
        done = False
        
        while not done:
            # execute the policy starting from the top level
            next_state, reward, done = agent.train_hierarchy(state, final_goal, agent.k_level-1)
            total_reward += reward
            state = next_state
        
        # every 10 episodes, update all levels
        if episode % 10 == 0:
            for level in range(agent.k_level):
                agent.update(level)
        print(f"Episode {episode}, Reward: {total_reward:.2f}")

HER

HER (Hindsight Experience Replay) is reinforcement learning's "hindsight" trick. The core logic:

"Since we already know the outcome, let's pretend that outcome was the goal all along and learn from it again!"

A robot tries to grab a cup and fails → it actually grabbed the pen next to it.

Traditional approach: mark it a failure and discard the experience.

HER: temporarily relabel the goal as "grab the pen", generating a new sample (state + action + new goal + success reward), so the algorithm learns that "even though it missed the cup, the grabbing motion itself was valuable".

This "redefine the goal from the actual outcome" mindset closely mirrors the human hindsight bias of claiming, after the fact, to have seen it coming.
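A minimal sketch of that relabeling trick on a single transition (the function name relabel_with_her is my own; the ReplayBuffer.add above does the same thing inline):

def relabel_with_her(transition):
    """Take (state, goal, action, reward, next_state, done) and emit an extra
    sample that pretends the reached state was the goal all along."""
    state, goal, action, reward, next_state, done = transition
    new_goal = next_state   # hindsight: the actual outcome becomes the goal
    new_reward = 0          # success reward, since this "goal" was reached
    return (state, new_goal, action, new_reward, next_state, True)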

MADDPG

MADDPG is a landmark algorithm in multi-agent reinforcement learning. It extends DDPG and targets continuous action spaces in cooperative and competitive multi-agent environments.

If DDPG is a lone soldier, MADDPG is a coordinated squad.

DDPG's weakness: when several agents train independently, the environment becomes non-stationary as the other agents' policies change (a teammate suddenly switches tactics and your policy stops working).

MADDPG's breakthroughs (the critic's TD target is written out after this list):

The Actor uses only local observations (like a player who sees only their own surroundings). Advantage: training is stabilized by a global view, while execution needs no communication and suits real-time settings.

The Critic takes global information as input, which removes the environmental drift (the coach uses full-court data to give steady guidance).

Policy diversity: train several policies per agent (a player drilling multiple set plays) to cope with changing opponents.
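For agent i, the centralized critic regresses toward the following target, where x is the global state (all observations) and the a′_j come from the target policies (the update code below computes exactly this as y):

y_i = r_i + γ·(1 − d_i)·Q′_i(x′, a′_1, ..., a′_N)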

from typing import List, Tuple

import torch
import torch.nn.functional as F
import torch.optim as optim

# ActorDiscrete, CentralizedCritic and the Experience batch type are assumed
# to be defined elsewhere.

class MADDPGAgentManager:
    def __init__(self,num_agents:int,obs_dim_list:List[int],action_dim:int,
                 actor_lr:float,critic_lr:float,gamma:float,tau:float,
                 device:torch.device):
        self.num_agents = num_agents
        self.obs_dim_list = obs_dim_list
        self.action_dim = action_dim
        self.gamma = gamma
        self.tau = tau
        self.device = device

        joint_obs_dim = sum(obs_dim_list)
        joint_action_dim_one_hot = num_agents*action_dim
        self.actors:List[ActorDiscrete] = []
        self.critics:List[CentralizedCritic]=[]
        self.target_actors:List[ActorDiscrete] = []
        self.target_critics:List[CentralizedCritic]=[]
        self.actor_optimizers:List[optim.Optimizer]=[]
        self.critic_optimizers:List[optim.Optimizer]=[]
        
        for i in range(num_agents):
            actor = ActorDiscrete(obs_dim_list[i], action_dim).to(device)
            critic = CentralizedCritic(joint_obs_dim, joint_action_dim_one_hot).to(device)
            target_actor = ActorDiscrete(obs_dim_list[i], action_dim).to(device)
            target_critic = CentralizedCritic(joint_obs_dim, joint_action_dim_one_hot).to(device)

            # initialize the target networks
            target_actor.load_state_dict(actor.state_dict())
            target_critic.load_state_dict(critic.state_dict())
            for p in target_actor.parameters(): p.requires_grad = False
            for p in target_critic.parameters(): p.requires_grad = False

            # create the optimizers
            actor_optimizer = optim.Adam(actor.parameters(), lr=actor_lr)
            critic_optimizer = optim.Adam(critic.parameters(), lr=critic_lr)


            self.actors.append(actor)
            self.critics.append(critic)
            self.target_actors.append(target_actor)
            self.target_critics.append(target_critic)
            self.actor_optimizers.append(actor_optimizer)
            self.critic_optimizers.append(critic_optimizer)

    def select_actions(self, obs_list: List[torch.Tensor], use_exploration=True) -> Tuple[List[int], List[torch.Tensor]]:
        """ 基于它们的局部观测为所有智能体选择动作。 """
        actions = []
        log_probs = []
        for i in range(self.num_agents):
            self.actors[i].eval()
            with torch.no_grad():
                act, log_prob = self.actors[i].select_action(obs_list[i].to(self.device), use_exploration)
                # a single observation -> convert to a scalar
                if act.dim() == 0:
                    act = act.item()
            self.actors[i].train()
            actions.append(act)
            log_probs.append(log_prob)
        return actions, log_probs
    
    def update(self,batch:Experience,agent_id:int)->Tuple[float,float]:
        obs_batch,act_batch,rew_batch,next_obs_batch,dones_batch = batch
        batch_size = obs_batch.shape[0]

        joint_obs = obs_batch.view(batch_size,-1).to(self.device)
        joint_next_obs = next_obs_batch.view(batch_size,-1).to(self.device)
        act_one_hot = F.one_hot(act_batch,num_classes = self.action_dim).float()
        joint_actions_one_hot = act_one_hot.view(batch_size,-1).to(self.device)


        rewards_i = rew_batch[:,agent_id].unsqueeze(-1).to(self.device)
        dones_i = dones_batch[:,agent_id].unsqueeze(-1).to(self.device)

        with torch.no_grad():
            target_next_actions_list = []
            for j in range(self.num_agents):
                obs_j_next = next_obs_batch[:,j,:].to(self.device)
                action_j_next,_ = self.target_actors[j].select_action(obs_j_next,use_exploration=False)
                action_j_next_one_hot = F.one_hot(action_j_next,num_classes = self.action_dim).float()
                target_next_actions_list.append(action_j_next_one_hot)

            joint_target_next_actions = torch.cat(target_next_actions_list,dim=1).to(self.device)
            q_target_next = self.target_critics[agent_id](joint_next_obs, joint_target_next_actions)

            # compute the target y = r_i + gamma * Q'_i * (1 - d_i)
            y = rewards_i + self.gamma * (1.0 - dones_i) * q_target_next
        q_current = self.critics[agent_id](joint_obs, joint_actions_one_hot)

        # critic loss
        critic_loss = F.mse_loss(q_current, y)

        # optimize critic i
        self.critic_optimizers[agent_id].zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critics[agent_id].parameters(), 1.0)  # optional gradient clipping
        self.critic_optimizers[agent_id].step()

        # freeze the critic while computing the actor loss
        for p in self.critics[agent_id].parameters():
            p.requires_grad = False

        current_actions_list = []
        log_prob_i = None  # only the updated agent's log_prob is needed
        for j in range(self.num_agents):
            obs_j = obs_batch[:, j, :].to(self.device)
            # the update needs action probabilities/log-odds - Gumbel-Softmax or a REINFORCE-style update
            dist_j = self.actors[j](obs_j)  # get the categorical distribution
            # with a DDPG objective Q(s, mu(s)) we would need deterministic actions;
            # here we use a policy-gradient adaptation: maximize E[log_pi_i * Q_i_detached]
            action_j = dist_j.sample()  # sampled actions feed the critic
            if j == agent_id:
                log_prob_i = dist_j.log_prob(action_j)  # log_prob of the agent being updated

            action_j_one_hot = F.one_hot(action_j, num_classes=self.action_dim).float()
            current_actions_list.append(action_j_one_hot)

        joint_current_actions = torch.cat(current_actions_list, dim=1).to(self.device)
        # actor loss: - E[log_pi_i * Q_i_detached]
        # Q_i is evaluated with the *current* actions of all actors
        q_for_actor_loss = self.critics[agent_id](joint_obs, joint_current_actions)
        actor_loss = -(log_prob_i * q_for_actor_loss.detach()).mean()  # detach the Q value

        # alternative DDPG-style loss (if the actor were deterministic):
        # actor_loss = -self.critics[agent_id](joint_obs, joint_current_actions).mean()

        # optimize actor i
        self.actor_optimizers[agent_id].zero_grad()
        actor_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.actors[agent_id].parameters(), 1.0)  # optional gradient clipping
        self.actor_optimizers[agent_id].step()

        # unfreeze the critic
        for p in self.critics[agent_id].parameters():
            p.requires_grad = True

        return critic_loss.item(), actor_loss.item()

    def update_targets(self) -> None:
        """ 对所有目标网络执行软更新。 """
        for i in range(self.num_agents):
            soft_update(self.target_critics[i], self.critics[i], self.tau)
            soft_update(self.target_actors[i], self.actors[i], self.tau)
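soft_update is called above but never defined; a minimal sketch of the usual Polyak-averaging helper it presumably stands for:

import torch.nn as nn

def soft_update(target: nn.Module, source: nn.Module, tau: float) -> None:
    """Polyak averaging: target <- tau * source + (1 - tau) * target."""
    for target_param, param in zip(target.parameters(), source.parameters()):
        target_param.data.copy_(tau * param.data + (1.0 - tau) * target_param.data)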

MCTS (Monte Carlo Tree Search)

MCTS is a heuristic search algorithm based on random simulation, used to find good policies in complex decision problems (board games, robot planning). It grows a search tree dynamically, combining random rollouts (the Monte Carlo part) with the precision of tree search, and balances exploration (unknown nodes) against exploitation (known high-value nodes).

MCTS builds its decision tree by iterating four steps:

  1. Selection

    • Starting from the root (the current state), recursively pick child nodes until reaching a leaf that is not fully expanded.
    • Selection rule: the UCB (Upper Confidence Bound) formula balances exploration and exploitation:
      UCB = Q/N + C·√(ln N_parent / N)
    • Q: the node's cumulative reward
    • N: the node's visit count
    • C: the exploration constant (typically √2)
  2. Expansion

    For example, in Go: generate all legal moves from the current board position.
  3. Simulation

    Key point: the rollout needs no domain knowledge, only the basic rules.
  4. Backpropagation

    • Propagate the simulation result back and update the statistics of every node on the path:
      • visit count N ← N + 1
      • cumulative reward Q ← Q + reward.

MCTS needs no heuristic knowledge: it relies only on the game rules, which makes it suitable for problems with clear rules but enormous state spaces (like Go).

import math
import numpy as np
import random
from collections import defaultdict

# ========== Game environment ==========
class TicTacToe:
    def __init__(self):
        self.board = np.array([[' ' for _ in range(3)] for _ in range(3)])
        self.current_player = 'X'
        self.winner = None

    def get_available_moves(self):
        """Return the coordinates of all empty squares"""
        return [(i, j) for i in range(3) for j in range(3) if self.board[i][j] == ' ']

    def make_move(self, move):
        """Place a piece and switch players"""
        i, j = move
        if self.board[i][j] != ' ':
            return False
            
        self.board[i][j] = self.current_player
        if self.check_win():
            self.winner = self.current_player
        elif not self.get_available_moves():
            self.winner = 'Draw'  # draw
        self.current_player = 'O' if self.current_player == 'X' else 'X'
        return True

    def check_win(self):
        """Check the win condition"""
        # rows and columns
        for i in range(3):
            if self.board[i][0] == self.board[i][1] == self.board[i][2] != ' ':
                return True
            if self.board[0][i] == self.board[1][i] == self.board[2][i] != ' ':
                return True
        # diagonals
        if self.board[0][0] == self.board[1][1] == self.board[2][2] != ' ':
            return True
        if self.board[0][2] == self.board[1][1] == self.board[2][0] != ' ':
            return True
        return False

    def copy(self):
        """Deep-copy the current game state"""
        new_game = TicTacToe()
        new_game.board = np.copy(self.board)
        new_game.current_player = self.current_player
        new_game.winner = self.winner
        return new_game

# ========== MCTS node ==========
class Node:
    def __init__(self, game_state, parent=None):
        self.game_state = game_state  # a TicTacToe instance
        self.parent = parent
        self.children = []
        self.visits = 0
        self.wins = 0  # cumulative reward
    
    def is_fully_expanded(self):
        """检查是否完全扩展"""
        return len(self.children) == len(self.game_state.get_available_moves())
    
    def best_child(self, exploration=1.4):
        """UCB公式选择最优子节点"""
        return max(self.children, 
                   key=lambda child: child.wins / (child.visits + 1e-6) + 
                   exploration * math.sqrt(math.log(self.visits + 1) / (child.visits + 1e-6)))

# ========== MCTS search ==========
class MCTS:
    def __init__(self, root_state, iterations=1000):
        self.root = Node(root_state)
        self.iterations = iterations

    def search(self):
        """Run the full MCTS search"""
        for _ in range(self.iterations):
            # 1. Selection
            node = self._select(self.root)
            
            # 2. Expansion
            if node.game_state.winner is None:
                node = self._expand(node)
                
            # 3. Simulation
            reward = self._simulate(node.game_state.copy())
            
            # 4. Backpropagation
            self._backpropagate(node, reward)
        
        # return the most-visited child
        return max(self.root.children, key=lambda c: c.visits)

    def _select(self, node):
        """Recursively descend until reaching a leaf"""
        while node.children and not node.game_state.winner:
            if not node.is_fully_expanded():
                return node
            node = node.best_child()
        return node

    def _expand(self, node):
        """Expand new child nodes"""
        available_moves = node.game_state.get_available_moves()
        for move in available_moves:
            if move not in [child.move for child in node.children]:
                new_state = node.game_state.copy()
                new_state.make_move(move)
                child = Node(new_state, parent=node)
                child.move = move  # record the move that led to this state
                node.children.append(child)
        return random.choice(node.children)  # pick one of the newly expanded children at random

    def _simulate(self, game_state):
        """Random rollout until the game ends"""
        while game_state.winner is None:
            move = random.choice(game_state.get_available_moves())
            game_state.make_move(move)
        # reward: win +1, draw +0.5, loss +0
        if game_state.winner == 'X': return 1
        elif game_state.winner == 'Draw': return 0.5
        else: return 0

    def _backpropagate(self, node, reward):
        """Update the statistics of every node along the path"""
        while node:
            node.visits += 1
            node.wins += reward
            node = node.parent

# ========== Main ==========
if __name__ == "__main__":
    game = TicTacToe()
    print("Initial board:")
    print(game.board)
    
    while not game.winner:
        if game.current_player == 'X':  # AI's turn
            mcts = MCTS(game.copy(), iterations=1000)
            best_node = mcts.search()
            move = best_node.move
            print(f"AI move: {move}")
        else:  # human's turn
            moves = game.get_available_moves()
            print(f"Available moves: {moves}")
            move = tuple(map(int, input("Enter your move (row col, e.g. '0 1'): ").split()))
        
        game.make_move(move)
        print(game.board)
    
    print(f"游戏结束! 胜者: {game.winner}")

PlaNet

PlaNet is a model-based reinforcement learning algorithm from Google Research. Its core goal is to learn a latent dynamics model of the environment and plan directly in that compact latent space, which dramatically improves sample efficiency.

It's like playing a game with your eyes closed 🎮:

Encoder: open your eyes and instantly memorize the key information on screen (e.g., enemy positions)

RNN: with eyes closed, use memory to reason "if I press left, how will the enemy move"

Predictor: imagine the next state purely in your head (you might guess wrong)

Decoder: open your eyes and check whether the imagined frame matches reality.

Core principles and architecture

1. Latent dynamics model (Latent Dynamics Model)

PlaNet's core innovation is to avoid predicting directly in high-dimensional pixel space (as traditional video-prediction models do) and instead learn the dynamics of a low-dimensional latent state space:

  • Encoder: compresses the image observation o_t into a latent state s_t (capturing abstract features such as object position and velocity)
  • Recurrent State-Space Model (RSSM), split into a deterministic path and a stochastic path:
    • Deterministic path: in a fully known environment, computed by fixed rules, unique and reproducible; here an RNN carries the history (h_t = RNN(h_{t−1}, s_{t−1}, a_{t−1})).
    • Stochastic path: in a partly unknown or dynamic environment, a probabilistic model handles the uncertainty; here it predicts a distribution over latent states, s_t ∼ p(s_t | h_t).
    • Decoder: reconstructs the observation o_t from s_t and predicts the reward r_t
2. Latent-space planning (Latent Space Planning)

PlaNet plans entirely in the latent state space, cutting the computational cost dramatically:

  1. Encode the history: encode the past image sequence into the current latent state s_t.
  2. Cross-Entropy Method (CEM):
    • Sample many action sequences {a_t, a_{t+1}, ..., a_{t+H}}.
    • Use the RSSM to predict the future latent states s_{t+1}, ..., s_{t+H} and rewards r_t, ..., r_{t+H}.
    • Keep the action sequences with the highest cumulative reward.
  3. Execute and replan: execute only the first action a*_t of the best sequence, then replan once the new observation arrives.

PlaNet's core components in plain words

1. The environment simulator (the RSSM)

Role: compress game frames into an "abstract memory" and predict the next frame and reward.

Workflow:

See the frame → the encoder compresses it into key features ✅

Imagine the next state → combine action and memory to predict it (two kinds):

Deterministic state (e.g., "press left and the character must move left") → carried by the RNN

Stochastic state (e.g., "the enemy may go left or right") → represented by a probability distribution 🌟

Check the imagination → the decoder turns the predicted state back into a frame and compares it with the real one ✅

2. The planner (the CEM algorithm)

Role: trial-run N candidate action plans inside the simulator and pick the highest-scoring one.

Steps:

Randomly generate 100 action plans (e.g., "jump left → dodge right → attack")

Roll each plan out in the simulator → compute the cumulative reward (e.g., kill +10, fall in a pit −5)

Keep the 20 highest-scoring plans → refit the sampling distribution (mean + spread) to their actions

Execute only the first step of the best plan → then replan (to avoid disasters)

Saves time and lives:

Only about 100 episodes of real play versus 100,000 for traditional methods (a ~1000× efficiency gain)

High-dimensional frames are no problem:

Decisions use the abstract memory instead of raw pixels (like a brain remembering key features)

Planning that is steady and precise:

Trial and error inside the simulator is safe and free, and it can even discover obscure killer moves

PlaNet = a save-state simulator + a master tactician.

Like a military strategist, it war-games on the sand table first and commits troops only once the plan checks out, winning at minimal cost.

PlaNet is suited to continuous-control tasks.

import torch
import torch.nn as nn
from torch.distributions import Normal

class RSSM(nn.Module):
    def __init__(self, state_dim, obs_dim, action_dim, hidden_size=200):
        super().__init__()
        # network definitions (bodies elided in this sketch)
        self.encoder = nn.Sequential(...)   # observation -> latent state
        self.rnn = nn.LSTMCell(...)         # deterministic state transition
        self.prior_net = nn.Sequential(...) # prior state distribution
        self.decoder = nn.Sequential(...)   # reconstruct the observation from the state

    def forward(self, obs, action, prev_state):
        # 1. encode the observation -> posterior distribution
        post_params = self.encoder(obs)
        post_mean, post_std = post_params.chunk(2, dim=-1)
        s_t_post = Normal(post_mean, torch.exp(post_std)).rsample()
        
        # 2. RNN updates the deterministic state
        h_t, c_t = self.rnn(torch.cat([action, s_t_post], -1), prev_state)
        
        # 3. predict the prior distribution
        prior_params = self.prior_net(h_t)
        prior_mean, prior_std = prior_params.chunk(2, dim=-1)
        
        # 4. reconstruct the observation
        obs_recon = self.decoder(s_t_post)
        
        return (h_t, c_t), (post_mean, post_std), (prior_mean, prior_std), obs_recon
class CEMPlanner:
    def plan(self,model,initial_state):
        for _ in range(n_iter):
            actions = Normal(self.mean,self.std).sample((n_samples,))
            rewards = []
            for a_seq in actions:
                state = initial_state
                total_reward = 0
                for t in range(horizon):
                    next_state,reward = model.step(state,a_seq[t])
                    total_reward += reward
                    state = next_state
                rewards.append(total_reward)

            # refit the sampling distribution to the elite action sequences
            top_idx = torch.topk(torch.tensor(rewards),top_k).indices
            top_actions = actions[top_idx]
            self.mean = top_actions.mean(dim = 0)
            self.std = top_actions.std(dim = 0)
        return self.mean[0]
    
def train_planet(env, epochs=1000):
    model = RSSM(state_dim=32, ...)
    planner = CEMPlanner(env.action_dim)
    for epoch in range(epochs):
        obs = env.reset()
        state = model.initial_state()  # initialize the latent state
        
        for t in range(100):  # environment interaction steps
            # plan an action
            action = planner.plan(model, state)
            
            # step the environment
            next_obs, reward, done = env.step(action)
            
            # model update
            optimizer.zero_grad()
            state, post, prior, recon = model(next_obs, action, state)
            loss = elbo_loss(next_obs, recon, post, prior)
            loss.backward()
            optimizer.step()
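elbo_loss above is left undefined; here is a minimal sketch of a typical PlaNet-style ELBO, assuming post and prior are (mean, log_std) pairs as returned by the RSSM above:

import torch
import torch.nn.functional as F
from torch.distributions import Normal, kl_divergence

def elbo_loss(obs, obs_recon, post, prior):
    """Reconstruction error plus KL(posterior || prior) over the latent state."""
    post_dist = Normal(post[0], torch.exp(post[1]))
    prior_dist = Normal(prior[0], torch.exp(prior[1]))
    recon_loss = F.mse_loss(obs_recon, obs)                        # decoder likelihood term
    kl_loss = kl_divergence(post_dist, prior_dist).sum(-1).mean()  # keep the posterior near the prior
    return recon_loss + kl_loss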

PPO

PPO combines the AC architecture with policy gradients, an advantage function, policy exploration, and its signature clipping mechanism.

The clipping mechanism exists to keep each policy update from moving too far, preventing policy collapse; the clipped objective is written out after the list below.

PPO's innovative solutions:

1. Trust-region constraint (Trust Region): limit the update size via a KL-divergence bound → training stability.

2. Importance sampling (Importance Sampling): evaluate the new policy with data collected by the old policy → data reuse.
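For reference, the clipped surrogate objective (computed as actor_loss in the code below), with probability ratio r_t(θ) = π_θ(a_t|s_t) / π_θold(a_t|s_t):

L^CLIP(θ) = E_t[ min( r_t(θ)·Â_t, clip(r_t(θ), 1−ε, 1+ε)·Â_t ) ]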

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import gym
import numpy as np

# Hyperparameter configuration
class Config:
    def __init__(self):
        self.env_name = "Pendulum-v1"
        self.hidden_dim = 256
        self.actor_lr = 3e-4
        self.critic_lr = 1e-3
        self.gamma = 0.99        # discount factor
        self.gae_lambda = 0.95   # GAE parameter
        self.eps_clip = 0.2      # clipping range
        self.K_epochs = 10       # policy-update epochs per batch
        self.entropy_coef = 0.01 # entropy regularization coefficient
        self.batch_size = 64
        self.buffer_size = 2048

# Policy network (Actor): outputs the parameters of the action distribution
class PolicyNet(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        self.mu_head = nn.Linear(hidden_dim, action_dim)  # mean head
        self.log_std_head = nn.Linear(hidden_dim, action_dim)  # log-std head

    def forward(self, state):
        x = self.shared(state)
        mu = self.mu_head(x)
        log_std = self.log_std_head(x)
        std = torch.exp(log_std).clamp(min=1e-6)  # keep the std strictly positive
        return mu, std

# Value network (Critic): estimates state values
class ValueNet(nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
    
    def forward(self, state):
        return self.net(state).squeeze(-1)

# Core PPO algorithm
class PPO:
    def __init__(self, cfg):
        self.env = gym.make(cfg.env_name)
        state_dim = self.env.observation_space.shape[0]
        action_dim = self.env.action_space.shape[0]
        
        self.actor = PolicyNet(state_dim, action_dim, cfg.hidden_dim)
        self.critic = ValueNet(state_dim, cfg.hidden_dim)
        self.old_actor = PolicyNet(state_dim, action_dim, cfg.hidden_dim)  # old policy
        self.old_actor.load_state_dict(self.actor.state_dict())
        
        self.optimizer = optim.Adam([
            {'params': self.actor.parameters(), 'lr': cfg.actor_lr},
            {'params': self.critic.parameters(), 'lr': cfg.critic_lr}
        ])
        
        self.cfg = cfg
        self.buffer = []  # experience buffer

    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            mu, std = self.old_actor(state)
        dist = Normal(mu, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(-1)  # sum log-probs over action dimensions
        return action.squeeze(0).numpy(), log_prob.item()

    def compute_gae(self, rewards, values, next_values, dones):
        """广义优势估计(GAE)"""
        deltas = rewards + self.cfg.gamma * next_values * (1 - dones) - values
        advantages = np.zeros_like(rewards)
        advantage = 0
        for t in reversed(range(len(rewards))):
            advantage = deltas[t] + self.cfg.gamma * self.cfg.gae_lambda * (1 - dones[t]) * advantage
            advantages[t] = advantage
        returns = advantages + values
        return advantages, returns

    def update(self):
        # convert the buffered data to tensors
        states = torch.FloatTensor(np.array([t[0] for t in self.buffer]))
        actions = torch.FloatTensor(np.array([t[1] for t in self.buffer]))
        old_log_probs = torch.FloatTensor(np.array([t[2] for t in self.buffer]))
        rewards = torch.FloatTensor(np.array([t[3] for t in self.buffer]))
        next_states = torch.FloatTensor(np.array([t[4] for t in self.buffer]))
        dones = torch.FloatTensor(np.array([t[5] for t in self.buffer]))
        
        # compute GAE advantages
        with torch.no_grad():
            values = self.critic(states).numpy()
            next_values = self.critic(next_states).numpy()
        advantages, returns = self.compute_gae(
            rewards.numpy(), values, next_values, dones.numpy()
        )
        advantages = torch.FloatTensor(advantages)
        returns = torch.FloatTensor(returns)
        
        # several epochs of policy optimization
        for _ in range(self.cfg.K_epochs):
            # action log-probabilities under the new policy
            mu, std = self.actor(states)
            dist = Normal(mu, std)
            log_probs = dist.log_prob(actions).sum(-1)
            
            # key step 1: probability ratio
            ratios = torch.exp(log_probs - old_log_probs.detach())
            
            # key step 2: clipped surrogate loss
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1-self.cfg.eps_clip, 1+self.cfg.eps_clip) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            
            # critic loss (value-function regression)
            values_pred = self.critic(states)
            critic_loss = F.mse_loss(values_pred, returns)
            
            # entropy bonus (encourages exploration)
            entropy = dist.entropy().mean()
            
            # total loss
            loss = actor_loss + 0.5 * critic_loss - self.cfg.entropy_coef * entropy
            
            # gradient step
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)  # gradient clipping
            torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
            self.optimizer.step()
        
        # sync the old policy
        self.old_actor.load_state_dict(self.actor.state_dict())
        self.buffer.clear()

    def train(self, max_episodes=1000):
        for ep in range(max_episodes):
            state, _ = self.env.reset()
            ep_reward = 0
            
            for _ in range(self.cfg.buffer_size):
                action, log_prob = self.select_action(state)
                next_state, reward, done, _, _ = self.env.step(action)
                
                # store the transition
                self.buffer.append((state, action, log_prob, reward, next_state, done))
                state = next_state
                ep_reward += reward
                
                if len(self.buffer) == self.cfg.buffer_size:
                    self.update()
                
                if done:
                    break
            
            print(f"Episode {ep+1}, Reward: {ep_reward:.1f}")

if __name__ == "__main__":
    cfg = Config()
    agent = PPO(cfg)
    agent.train()

QMIX

Centralized training, decentralized execution.

Let's first introduce its three components:

Agent networks: each agent has an independent network whose inputs are the local observation o_t^a and the previous action u_{t−1}^a, and whose output is the current action value Q_a(τ^a, u^a).
Mixing network: fuses the per-agent Q_a values into a global Q_tot.
Hypernetwork: generates the weight parameters of the mixing network.

In plain words, QMIX's goal: players train against the coach's global game plan (centralized training) but decide quickly on their own during the match (decentralized execution).

How QMIX works (three steps):

1️⃣ Every player has a "small brain" (Agent Network)
  • Each agent runs its own RNN (e.g., a GRU) over local observations ("opponent position + my last action") and estimates the value (Q) of each action
  • Like a player judging "pass or shoot" from what's in front of them.
2️⃣ The coach's "tactics board" (Mixing Network)
  • The coach (mixing network) collects all players' Q values and, conditioned on the full-court situation (the global state), fuses them nonlinearly into the team's total value Q_tot
  • Key design:
    • The fusion must be monotonic: if any player's Q value increases, the team's Q_tot must not decrease (avoiding "individually brilliant but bad for the team"); the formal condition is written after this list;
    • The fusion weights are generated dynamically by the hypernetwork and adapt to the game situation
  • Like a coach combining player ability and the score to pick the best tactical mix.
3️⃣ Separating training from execution
  • Training: the coach uses global information (full-match footage) to tune the tactics board and backpropagates into every player's small brain
  • Execution: players decide independently from local observations, with no need to consult the coach in real time
  • Like analyzing film in training while players improvise during the match.
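The monotonicity condition referenced in step 2️⃣, which the mixer enforces in code by taking the absolute value of the hypernetwork-generated weights:

∂Q_tot / ∂Q_a ≥ 0  for every agent a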

Used for multi-UAV cooperative operations, intelligent traffic scheduling, and game AI.

import copy
import random
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class QMixAgent(nn.Module):
    def __init__(self, obs_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.rnn = nn.GRUCell(obs_dim, hidden_dim)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, obs, hidden_state):
        hidden = self.rnn(obs, hidden_state)
        q_value = self.fc(hidden)
        return q_value, hidden
    
class QMixer(nn.Module):
    def __init__(self, num_agents, state_dim, hidden_dim=64):
        super().__init__()
        self.num_agents = num_agents
        # hypernetworks generate the mixing weights from the global state
        self.hyper_w1 = nn.Linear(state_dim, hidden_dim * num_agents)
        self.hyper_b1 = nn.Linear(state_dim, hidden_dim)

        self.hyper_w2 = nn.Linear(state_dim, hidden_dim)
        self.hyper_b2 = nn.Linear(state_dim, 1)

    def forward(self, agent_qs, global_state):
        # agent_qs: [batch, num_agents], global_state: [batch, state_dim]
        bs = agent_qs.size(0)
        # abs() keeps the mixing weights non-negative -> monotonic Q_tot
        w1 = torch.abs(self.hyper_w1(global_state)).view(bs, self.num_agents, -1)
        b1 = self.hyper_b1(global_state).view(bs, 1, -1)
        w2 = torch.abs(self.hyper_w2(global_state)).view(bs, -1, 1)
        b2 = self.hyper_b2(global_state).view(bs, 1, 1)

        hidden = torch.relu(torch.bmm(agent_qs.view(bs, 1, -1), w1) + b1)
        q_tot = torch.bmm(hidden, w2) + b2
        return q_tot.view(bs, 1)
class QMix:
    def __init__(self, num_agents, obs_dim, action_dim, hidden_dim=64):
        # agent networks (one per agent)
        self.agents = nn.ModuleList([QMixAgent(obs_dim, action_dim, hidden_dim) for _ in range(num_agents)])
        # mixing network; the global state is the concatenated observations
        self.mixer = QMixer(num_agents, num_agents * obs_dim, hidden_dim)
        # target networks (stabilize training)
        self.target_agents = copy.deepcopy(self.agents)
        self.target_mixer = copy.deepcopy(self.mixer)
        # one optimizer over all agent networks and the mixer
        self.optimizer = optim.Adam(
            list(self.agents.parameters()) + list(self.mixer.parameters()), lr=0.001
        )
    
    def select_actions(self, obs_n, hidden_states_n, epsilon=0.1):
        """Decentralized execution: choose actions from local observations"""
        actions = []
        for i, agent in enumerate(self.agents):
            obs = torch.FloatTensor(obs_n[i]).view(1, -1)
            q, hidden = agent(obs, hidden_states_n[i])
            hidden_states_n[i] = hidden
            # epsilon-greedy action selection
            if np.random.rand() < epsilon:
                actions.append(np.random.randint(q.size(-1)))
            else:
                actions.append(q.argmax(dim=-1).item())
        return actions, hidden_states_n
# Hyperparameters
num_agents = 2
obs_dim = 4
action_dim = 3
hidden_dim = 64
gamma = 0.99
lr = 0.001
batch_size = 32
buffer_size = 10000

# Initialize the environment and the agents
env = MultiAgentGridEnv_QMIX(num_agents=num_agents)  # environment assumed to be defined elsewhere
agent = QMix(num_agents, obs_dim, action_dim, hidden_dim)

# Replay buffer
buffer = deque(maxlen=buffer_size)

# Training loop
for episode in range(1000):
    obs_n = env.reset()
    hidden_states_n = [torch.zeros(1, hidden_dim) for _ in range(num_agents)]
    total_reward = 0
    
    while True:
        # select actions
        actions, hidden_states_n = agent.select_actions(obs_n, hidden_states_n)
        # step the environment
        next_obs_n, rewards, dones, _ = env.step(actions)
        total_reward += sum(rewards)
        # store the transition
        buffer.append((obs_n, actions, rewards, next_obs_n, dones))
        
        # update the networks
        if len(buffer) >= batch_size:
            batch = random.sample(buffer, batch_size)
            # convert to tensors: obs [batch, n, obs_dim], actions/rewards/dones [batch, n]
            obs_batch = torch.FloatTensor(np.array([b[0] for b in batch]))
            action_batch = torch.LongTensor(np.array([b[1] for b in batch]))
            reward_batch = torch.FloatTensor(np.array([b[2] for b in batch]))
            next_obs_batch = torch.FloatTensor(np.array([b[3] for b in batch]))
            done_batch = torch.FloatTensor(np.array([b[4] for b in batch]))
            
            # compute the target Q value
            with torch.no_grad():
                zero_hidden = torch.zeros(batch_size, hidden_dim)  # fresh hidden state for the sampled batch
                next_q = []
                for i in range(num_agents):
                    q, _ = agent.target_agents[i](next_obs_batch[:, i], zero_hidden)
                    next_q.append(q.max(dim=-1)[0])  # greedy per-agent Q
                next_q = torch.stack(next_q, dim=1)
                # the mixing network produces the global target Q
                target_q = agent.target_mixer(next_q, next_obs_batch.view(batch_size, -1))
                # TD target (team reward; terminal if any agent is done)
                team_reward = reward_batch.sum(dim=1, keepdim=True)
                team_done = done_batch.max(dim=1, keepdim=True)[0]
                target_q = team_reward + gamma * (1 - team_done) * target_q
            
            # current Q values of the actions actually taken
            zero_hidden = torch.zeros(batch_size, hidden_dim)
            current_q = []
            for i in range(num_agents):
                q, _ = agent.agents[i](obs_batch[:, i], zero_hidden)
                current_q.append(q.gather(-1, action_batch[:, i:i+1]).squeeze(-1))
            current_q = torch.stack(current_q, dim=1)
            # the mixing network produces the current global Q
            pred_q = agent.mixer(current_q, obs_batch.view(batch_size, -1))
            
            # loss and backprop
            loss = nn.MSELoss()(pred_q, target_q)
            agent.optimizer.zero_grad()
            loss.backward()
            agent.optimizer.step()
        
        # soft-update the target networks
        for target, source in zip(agent.target_agents.parameters(), agent.agents.parameters()):
            target.data.copy_(0.995 * target.data + 0.005 * source.data)
        for target, source in zip(agent.target_mixer.parameters(), agent.mixer.parameters()):
            target.data.copy_(0.995 * target.data + 0.005 * source.data)
        
        obs_n = next_obs_n
        if all(dones):
            break

SAC

SAC's distinguishing feature is its "maximum entropy" objective.

SAC optimizes two things at once:

cumulative reward + policy entropy (the objective is written out after these bullets):

  • H(π) is the policy entropy, which measures how random the actions are (higher entropy means more exploration)
  • α is the entropy temperature, which automatically tunes the exploration strength
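Written out, the maximum-entropy objective the bullets describe is:

J(π) = Σ_t E_{(s_t, a_t) ∼ ρ_π} [ r(s_t, a_t) + α·H(π(·|s_t)) ]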
Network architecture: twin Q networks + a policy network.

The critic consists of two independent Q networks that separately estimate the action values Q1 and Q2 (their minimum is used to curb overestimation).

Key technique:

Automatic entropy tuning (auto-α):
α is not a fixed value; it is adjusted dynamically from the gap between the policy's actual entropy and a target entropy:

  • if the policy is too deterministic (entropy low) → increase α to encourage exploration;
  • if the policy is too random (entropy high) → decrease α to favor exploitation.

SAC suits robot control and autonomous driving.
 

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from collections import deque
import random

class ReplayBuffer:
    def __init__(self,capacity):
        self.buffer = deque(maxlen = capacity)

    def push(self,state,action,reward,next_state,done):
        self.buffer.append((state,action,reward,next_state,done))
    def sample(self,batch_size):
        state,action,reward,next_state,done = zip(*random.sample(self.buffer,batch_size))
        return np.stack(state),np.stack(action),np.stack(reward),np.stack(next_state),np.stack(done)
    def __len__(self):
        return len(self.buffer)
    
class GaussianPolicy(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        
        self.mean = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Linear(hidden_dim, action_dim)
    
    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        
        mean = self.mean(x)
        log_std = self.log_std(x)
        log_std = torch.clamp(log_std, min=-20, max=2)  # clamp log_std to a sane range
        return mean, log_std
    
    def sample(self, state):
        # reparameterized sampling (Eq. 11 in the SAC paper)
        mean, log_std = self.forward(state)
        std = log_std.exp()
        normal = torch.distributions.Normal(mean, std)
        
        # sample from the normal distribution
        x_t = normal.rsample()
        
        # tanh squashing to respect action bounds (Appendix C)
        action = torch.tanh(x_t)
        
        # log-probability with the tanh correction (Eq. 21)
        log_prob = normal.log_prob(x_t)
        log_prob -= torch.log(1 - action.pow(2) + 1e-6)
        log_prob = log_prob.sum(1, keepdim=True)
        
        return action, log_prob
    

class QNetwork(nn.Module):
    def __init__(self,state_dim,action_dim,hidden_dim=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim+action_dim,hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim,hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim,1)
        )
    def forward(self,state,action):
        x = torch.cat([state,action],dim = 1)
        return self.net(x)
    
class SAC:
    def __init__(self,state_dim,action_dim,gamma = 0.99,tau=0.005,alpha=0.2,lr=3e-4):
        self.gamma = gamma
        self.tau= tau
        self.alpha = alpha

        self.policy = GaussianPolicy(state_dim,action_dim)
        self.policy_optimizer = optim.Adam(self.policy.parameters(),lr=lr)

        self.q_net1 = QNetwork(state_dim,action_dim)
        self.q_net2 = QNetwork(state_dim,action_dim)
        self.q_optimizer1 = optim.Adam(self.q_net1.parameters(),lr=lr)
        self.q_optimizer2 = optim.Adam(self.q_net2.parameters(),lr=lr)

        self.target_q_net1 = QNetwork(state_dim,action_dim)
        self.target_q_net2 = QNetwork(state_dim,action_dim)
        self.target_q_net1.load_state_dict(self.q_net1.state_dict())
        self.target_q_net2.load_state_dict(self.q_net2.state_dict())

        # automatic entropy tuning
        self.target_entropy = -action_dim
        self.log_alpha = torch.tensor(np.log(alpha),requires_grad = True)
        self.alpha_optimizer = optim.Adam([self.log_alpha],lr=lr)
        self.replay_buffer = ReplayBuffer(capacity=1000000)

    def update(self,batch_size):
        state,action,reward,next_state,done = self.replay_buffer.sample(batch_size)
        state = torch.FloatTensor(state)
        action = torch.FloatTensor(action)
        reward = torch.FloatTensor(reward).unsqueeze(1)
        next_state =torch.FloatTensor(next_state)
        done = torch.FloatTensor(done).unsqueeze(1)

        with torch.no_grad():
            next_action,next_log_prob = self.policy.sample(next_state)

            target_q1 = self.target_q_net1(next_state,next_action)
            target_q2 = self.target_q_net2(next_state,next_action)
            target_q = torch.min(target_q1,target_q2) - self.alpha*next_log_prob
            target_value = reward + (1-done)*self.gamma*target_q    
        current_q1 = self.q_net1(state, action)
        current_q2 = self.q_net2(state, action)
        q_loss1 = F.mse_loss(current_q1, target_value)
        q_loss2 = F.mse_loss(current_q2, target_value)
        self.q_optimizer1.zero_grad()
        q_loss1.backward()
        self.q_optimizer1.step()
        
        self.q_optimizer2.zero_grad()
        q_loss2.backward()
        self.q_optimizer2.step()
        # freeze the Q networks while updating the policy
        for param in self.q_net1.parameters():
            param.requires_grad = False
        for param in self.q_net2.parameters():
            param.requires_grad = False
            
        # sample fresh actions from the current policy
        new_action, log_prob = self.policy.sample(state)

        min_q = torch.min(
            self.q_net1(state, new_action),
            self.q_net2(state, new_action)
        )
        policy_loss = (self.alpha * log_prob - min_q).mean()
        
        # optimize the policy network
        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()
        # unfreeze the Q networks
        for param in self.q_net1.parameters():
            param.requires_grad = True
        for param in self.q_net2.parameters():
            param.requires_grad = True
        # temperature loss for automatic entropy tuning
        alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()
        self.alpha_optimizer.zero_grad()
        alpha_loss.backward()
        self.alpha_optimizer.step()
        self.alpha = self.log_alpha.exp()
        self.soft_update(self.target_q_net1, self.q_net1)
        self.soft_update(self.target_q_net2, self.q_net2)
        return q_loss1.item(), q_loss2.item(), policy_loss.item(), alpha_loss.item()
    def soft_update(self, target, source):
        """指数移动平均更新目标网络 (算法1)"""
        for target_param, param in zip(target.parameters(), source.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
    def select_action(self, state, evaluate=False):
        """选择动作 (训练与评估不同)"""
        state = torch.FloatTensor(state).unsqueeze(0)
        if evaluate:
            # evaluation uses the mean action (Fig. 3a)
            mean, _ = self.policy(state)
            action = torch.tanh(mean)
            return action.detach().numpy()[0]
        else:
            # training samples from the policy
            action, _ = self.policy.sample(state)
            return action.detach().numpy()[0]
    def train(self, env, episodes, batch_size=256):
        for episode in range(episodes):
            state = env.reset()
            episode_reward = 0
            
            while True:
                # select and execute an action
                action = self.select_action(state)
                next_state, reward, done, _ = env.step(action)
                
                # store the transition
                self.replay_buffer.push(state, action, reward, next_state, done)
                episode_reward += reward
                
                # advance the state
                state = next_state
                
                # update the networks
                if len(self.replay_buffer) > batch_size:
                    self.update(batch_size)
                
                if done:
                    break