Today let's walk through AC, A2C, DDPG, HAC, MADDPG, MCTS, PlaNet, PPO, QMIX, and SAC: the main idea behind each algorithm, its strengths, and the environments and scenarios it suits.
AC
First up is AC (Actor-Critic). In a nutshell, there is an actor and a critic that keep improving and influencing each other. AC is the foundation that many later reinforcement-learning algorithms build on; it combines the strengths of policy gradients (Policy Gradient) and value-function approximation (Value Function Approximation).
The rough flow: the current policy interacts with the environment and produces transitions {st, at, rt+1, st+1}; the Critic updates the value function (a Q or V estimate) with the TD error; the Actor then uses the Critic's Q(st, at) or the advantage A(st, at) to compute the policy gradient and update itself. It suits scenarios such as robotics and games.
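Written out, the updates described above are the standard actor-critic equations (a generic formulation, not tied to this post's code):
δt = rt+1 + γ·V(st+1) − V(st)          (TD error; the Critic minimizes δt²)
∇θ J ≈ E[ δt · ∇θ log πθ(at|st) ]       (Actor update, with δt acting as the advantage estimate)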
Here is a rough sketch of the code:
import torch
import torch.nn as nn
import torch.optim as optim
class Actor(nn.Module):
def __init__(self,state_dim,action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim,64),
nn.ReLU(),
            nn.Linear(64, action_dim),
nn.Softmax(dim = -1)
)
def forward(self,state):
return self.net(state)
class Critic(nn.Module):
def __init__(self,state_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim,64),
            nn.ReLU(),
nn.Linear(64,1)
)
def forward(self,state):
return self.net(state)
class AC_Agent:
def __init__(self,state_dim,action_dim,gamma =0.99,lr = 0.001):
self.actor = Actor(state_dim,action_dim)
self.critic = Critic(state_dim)
self.optimizer_actor = optim.Adam(self.actor.parameters(),lr = lr)
self.optimizer_critic = optim.Adam(self.critic.parameters(),lr = lr)
self.gamma = gamma
def select_action(self,state):
state = torch.FloatTensor(state)
probs = self.actor(state)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
log_prob = dist.log_prob(action)
return action.item(),log_prob
def update(self,state,log_prob,reward,next_state,done):
        value = self.critic(torch.FloatTensor(state))
next_value = self.critic(torch.FloatTensor(next_state)) if not done else 0
td_target = reward+self.gamma*next_value
td_error = td_target - value
critic_loss = td_error.pow(2).mean()
self.optimizer_critic.zero_grad()
critic_loss.backward()
self.optimizer_critic.step()
actor_loss = -log_prob*td_error.detach()
self.optimizer_actor.zero_grad()
actor_loss.backward()
self.optimizer_actor.step()
A2C
A2C builds on AC by introducing the advantage function A(s,a) = Q(s,a) − V(s) ≈ r + γV(s′) − V(s), which measures how much better an action is than the average and further reduces the variance of the gradient estimate.
import torch
import torch.nn as nn
import torch.optim as optim
class Actor(nn.Module):
def __init__(self,state_dim,action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim,64),
            nn.ReLU(),
nn.Linear(64,action_dim),
nn.Softmax(dim=-1)
)
def forward(self,state):
return self.net(state)
class Critic(nn.Module):
def __init__(self, state_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim,64),
nn.ReLU(),
nn.Linear(64,1)
)
def forward(self,state):
return self.net(state)
class A2C_Agent:
def __init__(self,state_dim,action_dim,gamma = 0.99,lr = 0.001):
self.actor = Actor(state_dim,action_dim)
self.critic = Critic(state_dim)
self.optimizer = optim.Adam(
list(self.actor.parameters())+list(self.critic.parameters()),
lr = lr
)
self.gamma = gamma
def update(self,states,actions,rewards,next_states,dones):
states = torch.FloatTensor(states)
rewards = torch.FloatTensor(rewards)
dones = torch.FloatTensor(dones)
values = self.critic(states).squeeze()
next_values = self.critic(torch.FloatTensor(next_states)).squeeze()
        next_values = next_values * (1 - dones)  # zero the bootstrap value at terminal states
        advantages = rewards + self.gamma * next_values - values  # one-step advantage estimate
critic_loss = advantages.pow(2).mean()
probs = self.actor(states)
        dists = torch.distributions.Categorical(probs)  # wrap the probabilities in a categorical distribution
log_probs = dists.log_prob(torch.LongTensor(actions))
actor_loss = -(log_probs * advantages.detach()).mean()
total_loss = actor_loss + 0.5*critic_loss
self.optimizer.zero_grad()
total_loss.backward()
self.optimizer.step()
The advantage is the line advantages = rewards + self.gamma*next_values - values;
it then drives both losses that are combined and used to update the shared optimizer.
DDPG
DDPG is a reinforcement-learning algorithm for continuous action spaces that combines the strengths of deep Q-networks (DQN) and policy-gradient methods.
The Critic's job becomes: sample a mini-batch from the replay buffer, compute a target Q value,
and minimize the mean-squared error toward that target.
The Actor's job: update itself by gradient ascent on the Critic's evaluation of its actions.
Its main advantage is handling continuous actions efficiently.
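The targets referred to above (their formulas were lost in formatting) are the standard DDPG ones, reconstructed here for reference:
y = r + γ·(1 − d)·Q′(s′, μ′(s′))                       (target Q value from the target networks)
L_critic = E[ (Q(s,a) − y)² ]                          (Critic loss: mean-squared error toward the target)
∇θ J ≈ E[ ∇a Q(s,a)|a=μθ(s) · ∇θ μθ(s) ]               (Actor: gradient ascent on the Critic's value)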
import random
from collections import deque

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# hyperparameters (assumed values for this sketch)
ACTION_BOUND = 2.0
GAMMA = 0.99
TAU = 0.005
BATCH_SIZE = 64
MEMORY_CAPACITY = 10000
EPISODES = 200
EP_STEPS = 200

class Actor(nn.Module):
def __init__(self,state_dim,action_dim):
super(Actor,self).__init__()
self.fc1 = nn.Linear(state_dim,30)
self.fc2 = nn.Linear(30,action_dim)
        self.fc1.weight.data.normal_(0, 0.1)  # weight initialization
self.fc2.weight.data.normal_(0,0.1)
def forward(self,state):
        x = torch.relu(self.fc1(state))
action = torch.tanh(self.fc2(x))*ACTION_BOUND
return action
class Critic(nn.Module):
def __init__(self,state_dim,action_dim):
super(Critic,self).__init__()
self.fc1 = nn.Linear(state_dim + action_dim,30)
self.fc2 = nn.Linear(30,1)
self.fc1.weight.data.normal_(0,0.1)
self.fc2.weight.data.normal_(0,0.1)
def forward(self,state,action):
x = torch.cat([state,action],dim = 1)
        x = torch.relu(self.fc1(x))
q_value = self.fc2(x)
return q_value
class ReplayBuffer:
def __init__(self,capacity):
self.buffer = deque(maxlen = capacity)
def push(self,state,action,reward,next_state,done):
self.buffer.append((state,action,reward,next_state,done))
def sample(self,batch_size):
batch = random.sample(self.buffer,batch_size)
states,actions,rewards,next_states,dones = zip(*batch)
return (
torch.FloatTensor(states),
torch.FloatTensor(actions),
            torch.FloatTensor(rewards).unsqueeze(1),
torch.FloatTensor(next_states),
torch.FloatTensor(dones).unsqueeze(1)
)
def __len__(self):
return len(self.buffer)
class DDPGAgent:
    def __init__(self, state_dim, action_dim):
        self.actor = Actor(state_dim, action_dim)
        self.critic = Critic(state_dim, action_dim)
        self.actor_target = Actor(state_dim, action_dim)
        self.critic_target = Critic(state_dim, action_dim)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-3)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        self.memory = ReplayBuffer(MEMORY_CAPACITY)
        self.noise = OUNoise(action_dim)
    def select_action(self, state, add_noise=True):
        state = torch.FloatTensor(state).unsqueeze(0)
        action = self.actor(state).detach().numpy()[0]
        if add_noise:
            action += self.noise.sample()
        return np.clip(action, -ACTION_BOUND, ACTION_BOUND)
    def update(self):
        states, actions, rewards, next_states, dones = self.memory.sample(BATCH_SIZE)
        # Critic: regress Q(s, a) toward y = r + gamma * (1 - d) * Q'(s', mu'(s'))
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_q = rewards + GAMMA * (1 - dones) * self.critic_target(next_states, next_actions)
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # Actor: gradient ascent on Q(s, mu(s))
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # soft-update the target networks
        for net, target_net in ((self.actor, self.actor_target), (self.critic, self.critic_target)):
            for param, target_param in zip(net.parameters(), target_net.parameters()):
                target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
class OUNoise:
    def __init__(self, action_dim, mu=0.0, theta=0.15, sigma=0.2):
        self.action_dim, self.mu, self.theta, self.sigma = action_dim, mu, theta, sigma
        self.reset()
    def reset(self):
        self.state = np.ones(self.action_dim) * self.mu
    def sample(self):
        # Ornstein-Uhlenbeck process: temporally correlated exploration noise
        dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(self.action_dim)
        self.state += dx
        return self.state
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
agent = DDPGAgent(state_dim,action_dim)
for episode in range(EPISODES):
state = env.reset()
total_reward = 0
agent.noise.reset()
for step in range(EP_STEPS):
action = agent.select_action(state,add_noise=True)
next_state,reward,done,_=env.step(action)
agent.memory.push(state,action,reward,next_state,done)
state = next_state
total_reward += reward
        if len(agent.memory) > BATCH_SIZE:
agent.update()
if done:
break
HAC
What is HAC? One sentence:
It's like a company: the boss hands out goals and the employees do the work!
High level (Manager): only sets goals (e.g. "capture 30% market share") and never touches the concrete operations.
Low level (Worker): only executes (e.g. "canvass 100 customers") and doesn't worry about strategy.
Core idea
HAC tackles the sparse-reward problem of long-horizon tasks with a hierarchical decision structure.
How does HAC run? Three steps:
The boss paints the vision: the high level looks at the big picture (current state + final goal) and breaks it into subgoals (e.g. "take the East China market first").
The employees sprint: the low level takes a subgoal and executes primitive actions (handing out flyers, running promotions), trying to get as close to the boss's KPI as possible.
Bonus rules (a sketch of the rewards follows this list):
Worker: the closer to the subgoal, the bigger the bonus (intrinsic reward = −distance).
Manager: once the subgoal is done, look at how much the whole task earned (environment reward), then adjust the strategy.
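In symbols, a simplified version of this reward scheme (g is the subgoal proposed by the layer above; the exact form varies by implementation):
r_low_t = −‖st − g‖          (intrinsic reward for the low level: negative distance to the subgoal)
r_high  = Σ environment rewards collected while the lower level pursued g   (reward for the high level)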
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
class GoalConditionedNetwork(nn.Module):
    def __init__(self, state_dim, goal_dim, action_dim, hidden_dim=128):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + goal_dim,hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim,hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim,action_dim)
)
def forward(self,state,goal):
x =torch.cat([state,goal],dim=-1)
return self.net(x)
# the actor is simply a goal-conditioned network mapping (state, goal) -> action/subgoal
Actor = GoalConditionedNetwork

class Critic(nn.Module):
    def __init__(self, state_dim, goal_dim, action_dim):
        super().__init__()
        # Q(s, g, a): feed the concatenated (state, action) as the "state" input
        self.value_net = GoalConditionedNetwork(state_dim + action_dim, goal_dim, 1)
    def forward(self, state, goal, action):
        x = torch.cat([state, action], dim=-1)
        return self.value_net(x, goal)
class HACAgent:
def __init__(self,env_name,k_level =2 ,subgoal_horizon = 50):
self.env = gym.make(env_name)
self.k_level = k_level
self.H = subgoal_horizon
self.layers =[]
for i in range(k_level):
            if i == 0:  # bottom level: outputs primitive actions
                actor = Actor(self.env.observation_space.shape[0], self.env.observation_space.shape[0], self.env.action_space.shape[0])
critic = Critic(self.env.observation_space.shape[0], self.env.observation_space.shape[0], self.env.action_space.shape[0])
            else:  # higher levels: output subgoals (treated as special actions)
actor = Actor(self.env.observation_space.shape[0], self.env.observation_space.shape[0], self.env.observation_space.shape[0])
critic = Critic(self.env.observation_space.shape[0], self.env.observation_space.shape[0], self.env.observation_space.shape[0])
self.layers.append({'actor': actor, 'critic': critic})
self.replay_buffers = [ReplayBuffer() for _ in range(k_level)]
self.optimizers = []
for layer in self.layers:
self.optimizers.append({
'actor': optim.Adam(layer['actor'].parameters(), lr=1e-4),
'critic': optim.Adam(layer['critic'].parameters(), lr=1e-3)
})
    def act(self, state, goal, layer_idx):
        state = torch.as_tensor(state, dtype=torch.float32)
        goal = torch.as_tensor(goal, dtype=torch.float32)
        action = self.layers[layer_idx]['actor'](state, goal)
        noise = torch.randn_like(action) * 0.1  # exploration noise
        return torch.clamp(action + noise, -1, 1)
def update(self,layer_idx,batch_size = 64):
states, goals, actions, rewards, next_states, dones = self.replay_buffers[layer_idx].sample(batch_size)
        # Critic update: minimize the temporal-difference (TD) loss
with torch.no_grad():
target_q = rewards + 0.99 * self.layers[layer_idx]['critic'](next_states, goals, self.act(next_states, goals, layer_idx))
current_q = self.layers[layer_idx]['critic'](states, goals, actions)
critic_loss = nn.MSELoss()(current_q, target_q)
        # Actor update: maximize the Q value (policy gradient)
actor_loss = -self.layers[layer_idx]['critic'](states, goals, self.act(states, goals, layer_idx)).mean()
        # backpropagation
self.optimizers[layer_idx]['critic'].zero_grad()
critic_loss.backward()
self.optimizers[layer_idx]['critic'].step()
self.optimizers[layer_idx]['actor'].zero_grad()
actor_loss.backward()
self.optimizers[layer_idx]['actor'].step()
    def train_hierarchy(self, state, final_goal, level):
        if level == 0:
            # bottom level: act directly in the environment
            action = self.act(state, final_goal, level).detach().numpy()
            next_state, reward, done, _ = self.env.step(action)
            return next_state, reward, done
        # higher level: propose a subgoal and let the level below chase it for at most H steps
        subgoal = self.act(state, final_goal, level).detach().numpy()
        for _ in range(self.H):
            next_state, reward, done = self.train_hierarchy(state, subgoal, level - 1)
            if done or self.check_subgoal(next_state, subgoal):
                break
            state = next_state
        intrinsic_reward = -np.linalg.norm(next_state - subgoal)
        return next_state, intrinsic_reward, done
def check_subgoal(self, state, subgoal, threshold=0.05):
"""判断是否达成子目标 (欧氏距离)"""
return np.linalg.norm(state - subgoal) < threshold
class ReplayBuffer:
def __init__(self, capacity=100000):
self.buffer = []
self.capacity = capacity
def add(self, state, goal, action, reward, next_state, done):
"""存储转换样本"""
if len(self.buffer) >= self.capacity:
self.buffer.pop(0)
self.buffer.append((state, goal, action, reward, next_state, done))
        # HER core idea: also store a copy whose goal is the state that was actually reached
        self.buffer.append((state, next_state, action, 0, next_state, True))
def sample(self, batch_size):
indices = np.random.randint(len(self.buffer), size=batch_size)
batch = [self.buffer[i] for i in indices]
return [torch.tensor(x) for x in zip(*batch)]
if __name__ == "__main__":
agent = HACAgent("MountainCarContinuous-v0", k_level=2)
for episode in range(1000):
state = agent.env.reset()
        final_goal = np.array([0.48, 0.04])  # goal near the hilltop
total_reward = 0
done = False
while not done:
            # run the policy starting from the top level of the hierarchy
next_state, reward, done = agent.train_hierarchy(state, final_goal, agent.k_level-1)
total_reward += reward
state = next_state
        # update every level once every 10 episodes
        if episode % 10 == 0:
for level in range(agent.k_level):
agent.update(level)
print(f"Episode {episode}, Reward: {total_reward:.2f}")
HER
HER is reinforcement learning's "hindsight" trick. The core logic:
"Since we already know how things turned out, pretend that outcome was the goal all along and learn from it again!"
A robot tries to grab a cup and fails → it actually grabbed the pen next to it.
Traditional approach: mark the episode a failure and throw the experience away.
HER: temporarily relabel the goal as "grab the pen" and generate a new sample (state + action + new goal + success reward), so the algorithm learns that "even though I missed the cup, the grabbing motion toward the pen was valuable".
This "redefine the goal from the actual outcome" mindset mirrors the human habit of being wise after the event.
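Here is a minimal sketch of the relabeling step HER performs, assuming dictionary-style transitions with hypothetical field names (obs, action, goal, achieved_goal, ...) and a sparse goal-distance reward; none of these names come from a specific library.
import numpy as np

def sparse_reward(achieved, goal, tol=0.05):
    # 0 if the achieved outcome is within tolerance of the goal, else -1 (sparse reward)
    return 0.0 if np.linalg.norm(np.asarray(achieved) - np.asarray(goal)) < tol else -1.0

def her_relabel(episode, reward_fn=sparse_reward, k=4):
    """Augment one episode with hindsight goals (the 'future' relabeling strategy)."""
    augmented = list(episode)  # keep the original transitions
    for t, tr in enumerate(episode):
        # pick k outcomes achieved later in the same episode and pretend they were the goal
        for idx in np.random.randint(t, len(episode), size=k):
            new_goal = episode[idx]["achieved_goal"]
            augmented.append({
                "obs": tr["obs"],
                "action": tr["action"],
                "next_obs": tr["next_obs"],
                "goal": new_goal,                                    # relabeled goal
                "reward": reward_fn(tr["achieved_goal"], new_goal),  # reward recomputed under the new goal
                "done": tr["done"],
            })
    return augmented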
MADDPG
A landmark algorithm in multi-agent reinforcement learning. It extends DDPG and targets continuous-action problems in cooperative and competitive multi-agent environments.
If DDPG is a lone soldier, MADDPG is a coordinated squad.
DDPG's weakness: when several agents train independently, the environment becomes non-stationary because the other agents' policies keep changing (a teammate suddenly switches tactics and your policy stops working).
MADDPG's fixes (the update targets are written out below):
Each Actor uses only its local observation (a player only sees their own surroundings). Advantage: training is stabilized by a global view, while execution needs no communication and copes with real-time environments.
Each Critic takes global information as input, which removes the effect of the environment's fluctuations (the coach uses full-court data to give stable guidance).
Policy ensembles: train several policies per agent (a player drills multiple tactics) to cope with changing opponents.
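The centralized-critic idea in symbols (x is the global state or the concatenation of all observations; primes denote target networks):
y_i = r_i + γ·Q_i^μ′(x′, a_1′, …, a_N′),  where a_j′ = μ_j′(o_j′)
∇θi J ≈ E[ ∇θi μ_i(o_i) · ∇ai Q_i^μ(x, a_1, …, a_N)|ai=μi(oi) ]
Each agent i keeps its own critic Q_i, so cooperative and competitive reward structures are both covered.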
from typing import List, Tuple

import torch
import torch.nn.functional as F
import torch.optim as optim

# ActorDiscrete, CentralizedCritic, Experience and soft_update are assumed to be defined elsewhere.
class MADDPGAgentManager:
    def __init__(self, num_agents: int, obs_dim_list: List[int], action_dim: int,
                 actor_lr: float, critic_lr: float, gamma: float, tau: float,
                 device: torch.device):
        self.num_agents = num_agents
        self.obs_dim_list = obs_dim_list
        self.action_dim = action_dim
        self.gamma = gamma
        self.tau = tau
        self.device = device
        joint_obs_dim = sum(obs_dim_list)                    # the centralized critic sees all observations
        joint_action_dim_one_hot = num_agents * action_dim   # ...and all one-hot actions
self.actors:List[ActorDiscrete] = []
self.critics:List[CentralizedCritic]=[]
self.target_actors:List[ActorDiscrete] = []
self.target_critics:List[CentralizedCritic]=[]
self.actor_optimizers:List[optim.Optimizer]=[]
self.critic_optimizers:List[optim.Optimizer]=[]
for i in range(num_agents):
actor = ActorDiscrete(obs_dim_list[i], action_dim).to(device)
critic = CentralizedCritic(joint_obs_dim, joint_action_dim_one_hot).to(device)
target_actor = ActorDiscrete(obs_dim_list[i], action_dim).to(device)
target_critic = CentralizedCritic(joint_obs_dim, joint_action_dim_one_hot).to(device)
            # initialize the target networks
target_actor.load_state_dict(actor.state_dict())
target_critic.load_state_dict(critic.state_dict())
for p in target_actor.parameters(): p.requires_grad = False
for p in target_critic.parameters(): p.requires_grad = False
            # create the optimizers
actor_optimizer = optim.Adam(actor.parameters(), lr=actor_lr)
critic_optimizer = optim.Adam(critic.parameters(), lr=critic_lr)
self.actors.append(actor)
self.critics.append(critic)
self.target_actors.append(target_actor)
self.target_critics.append(target_critic)
self.actor_optimizers.append(actor_optimizer)
self.critic_optimizers.append(critic_optimizer)
def select_actions(self, obs_list: List[torch.Tensor], use_exploration=True) -> Tuple[List[int], List[torch.Tensor]]:
""" 基于它们的局部观测为所有智能体选择动作。 """
actions = []
log_probs = []
for i in range(self.num_agents):
self.actors[i].eval()
with torch.no_grad():
act, log_prob = self.actors[i].select_action(obs_list[i].to(self.device), use_exploration)
            # for a single observation, convert the action to a scalar
if act.dim() == 0:
act = act.item()
self.actors[i].train()
actions.append(act)
log_probs.append(log_prob)
return actions, log_probs
def update(self,batch:Experience,agent_id:int)->Tuple[float,float]:
obs_batch,act_batch,rew_batch,next_obs_batch,dones_batch = batch
batch_size = obs_batch.shape[0]
joint_obs = obs_batch.view(batch_size,-1).to(self.device)
joint_next_obs = next_obs_batch.view(batch_size,-1).to(self.device)
        act_one_hot = F.one_hot(act_batch, num_classes=self.action_dim).float()
        joint_actions_one_hot = act_one_hot.view(batch_size, -1).to(self.device)
        rewards_i = rew_batch[:, agent_id].unsqueeze(-1).to(self.device)
        dones_i = dones_batch[:, agent_id].unsqueeze(-1).to(self.device)
        with torch.no_grad():
            target_next_actions_list = []
            for j in range(self.num_agents):
                obs_j_next = next_obs_batch[:, j, :].to(self.device)
                action_j_next, _ = self.target_actors[j].select_action(obs_j_next, use_exploration=False)
                action_j_next_one_hot = F.one_hot(action_j_next, num_classes=self.action_dim).float()
                target_next_actions_list.append(action_j_next_one_hot)
            joint_target_next_actions = torch.cat(target_next_actions_list, dim=1).to(self.device)
            q_target_next = self.target_critics[agent_id](joint_next_obs, joint_target_next_actions)
            # TD target: y = r_i + gamma * (1 - d_i) * Q'_i
            y = rewards_i + self.gamma * (1.0 - dones_i) * q_target_next
q_current = self.critics[agent_id](joint_obs, joint_actions_one_hot)
        # critic loss
        critic_loss = F.mse_loss(q_current, y)
        # optimize critic i
        self.critic_optimizers[agent_id].zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critics[agent_id].parameters(), 1.0)  # optional gradient clipping
        self.critic_optimizers[agent_id].step()
        # freeze the critic while the actor is updated
        for p in self.critics[agent_id].parameters():
            p.requires_grad = False
        current_actions_list = []
        log_prob_i = None  # log-prob of the agent being updated
        for j in range(self.num_agents):
            obs_j = obs_batch[:, j, :].to(self.device)
            # the update needs action probabilities / log-probs - Gumbel-Softmax or a REINFORCE-style update
            dist_j = self.actors[j](obs_j)  # categorical distribution
            # with a DDPG-style objective Q(s, mu(s)) we would need deterministic actions;
            # here we use a policy-gradient adaptation: maximize E[log_pi_i * Q_i_detached]
            action_j = dist_j.sample()  # sampled actions feed the critic
            if j == agent_id:
                log_prob_i = dist_j.log_prob(action_j)  # only the updated agent's log_prob is kept
            action_j_one_hot = F.one_hot(action_j, num_classes=self.action_dim).float()
            current_actions_list.append(action_j_one_hot)
        joint_current_actions = torch.cat(current_actions_list, dim=1).to(self.device)
        # actor loss: -E[log_pi_i * Q_i_detached], where Q_i is evaluated with all actors' *current* actions
        q_for_actor_loss = self.critics[agent_id](joint_obs, joint_current_actions)
        actor_loss = -(log_prob_i * q_for_actor_loss.detach()).mean()  # detach the Q value
        # alternative DDPG-style loss (for a deterministic actor):
        # actor_loss = -self.critics[agent_id](joint_obs, joint_current_actions).mean()
        # optimize actor i
        self.actor_optimizers[agent_id].zero_grad()
        actor_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.actors[agent_id].parameters(), 1.0)  # optional gradient clipping
        self.actor_optimizers[agent_id].step()
        # unfreeze the critic
        for p in self.critics[agent_id].parameters():
            p.requires_grad = True
return critic_loss.item(), actor_loss.item()
def update_targets(self) -> None:
""" 对所有目标网络执行软更新。 """
for i in range(self.num_agents):
soft_update(self.target_critics[i], self.critics[i], self.tau)
soft_update(self.target_actors[i], self.actors[i], self.tau)
MCTS (Monte Carlo Tree Search)
MCTS is a heuristic search algorithm based on random simulation, used to find good decisions in complex problems such as board games and robot planning. It grows a search tree dynamically, combining random rollouts (the Monte Carlo part) with the precision of tree search, and balances exploration (unknown nodes) against exploitation (known high-value nodes).
MCTS builds its decision tree by iterating four steps:
Selection
- Starting from the root (the current state), recursively pick child nodes until reaching a node that is not yet fully expanded.
- Selection rule: the UCB (Upper Confidence Bound) formula balances exploration and exploitation:
  UCB = Q/N + C·sqrt(ln(N_parent)/N)
- Q: the node's cumulative reward
- N: the node's visit count
- C: exploration constant (commonly √2)
Expansion
- Add one or more children for untried actions. For example, in Go, generate the candidate moves from the current board position.
Simulation
- Play the game out from the new node with a random rollout. Key point: the rollout needs no domain knowledge, only the basic rules.
Backpropagation
- Propagate the simulation result back along the path and update every node's statistics:
  - visit count N ← N + 1
  - cumulative reward Q ← Q + reward
MCTS needs no heuristic knowledge: it relies only on the game rules, which makes it a good fit for problems with clear rules but enormous state spaces (such as Go).
import math
import numpy as np
import random
from collections import defaultdict
# ========== Game environment ==========
class TicTacToe:
def __init__(self):
self.board = np.array([[' ' for _ in range(3)] for _ in range(3)])
self.current_player = 'X'
self.winner = None
def get_available_moves(self):
"""返回所有空位坐标"""
return [(i, j) for i in range(3) for j in range(3) if self.board[i][j] == ' ']
def make_move(self, move):
"""执行落子并切换玩家"""
i, j = move
if self.board[i][j] != ' ':
return False
self.board[i][j] = self.current_player
if self.check_win():
self.winner = self.current_player
elif not self.get_available_moves():
            self.winner = 'Draw'  # draw
self.current_player = 'O' if self.current_player == 'X' else 'X'
return True
def check_win(self):
"""检查胜利条件"""
# 检查行和列
for i in range(3):
if self.board[i][0] == self.board[i][1] == self.board[i][2] != ' ':
return True
if self.board[0][i] == self.board[1][i] == self.board[2][i] != ' ':
return True
        # check diagonals
if self.board[0][0] == self.board[1][1] == self.board[2][2] != ' ':
return True
if self.board[0][2] == self.board[1][1] == self.board[2][0] != ' ':
return True
return False
def copy(self):
"""深拷贝当前游戏状态"""
new_game = TicTacToe()
new_game.board = np.copy(self.board)
new_game.current_player = self.current_player
new_game.winner = self.winner
return new_game
# ========== MCTS node ==========
class Node:
def __init__(self, game_state, parent=None):
        self.game_state = game_state  # a TicTacToe instance
self.parent = parent
self.children = []
self.visits = 0
        self.wins = 0  # cumulative reward
def is_fully_expanded(self):
"""检查是否完全扩展"""
return len(self.children) == len(self.game_state.get_available_moves())
def best_child(self, exploration=1.4):
"""UCB公式选择最优子节点"""
return max(self.children,
key=lambda child: child.wins / (child.visits + 1e-6) +
exploration * math.sqrt(math.log(self.visits + 1) / (child.visits + 1e-6)))
# ========== MCTS search ==========
class MCTS:
def __init__(self, root_state, iterations=1000):
self.root = Node(root_state)
self.iterations = iterations
def search(self):
"""执行完整MCTS搜索"""
for _ in range(self.iterations):
            # 1. Selection
node = self._select(self.root)
            # 2. Expansion
if node.game_state.winner is None:
node = self._expand(node)
            # 3. Simulation
reward = self._simulate(node.game_state.copy())
            # 4. Backpropagation
self._backpropagate(node, reward)
        # return the most-visited child (its move is the chosen action)
return max(self.root.children, key=lambda c: c.visits)
def _select(self, node):
"""递归选择子节点直至叶节点"""
while node.children and not node.game_state.winner:
if not node.is_fully_expanded():
return node
node = node.best_child()
return node
def _expand(self, node):
"""扩展新子节点"""
available_moves = node.game_state.get_available_moves()
for move in available_moves:
if move not in [child.move for child in node.children]:
new_state = node.game_state.copy()
new_state.make_move(move)
child = Node(new_state, parent=node)
                child.move = move  # remember the move that produced this state
node.children.append(child)
        return random.choice(node.children)  # pick one of the expanded children at random
def _simulate(self, game_state):
"""随机模拟至游戏结束"""
while game_state.winner is None:
move = random.choice(game_state.get_available_moves())
game_state.make_move(move)
        # reward from X's perspective: win +1, draw +0.5, loss +0
if game_state.winner == 'X': return 1
elif game_state.winner == 'Draw': return 0.5
else: return 0
def _backpropagate(self, node, reward):
"""更新路径上所有节点统计"""
while node:
node.visits += 1
node.wins += reward
node = node.parent
# ========== Main program ==========
if __name__ == "__main__":
game = TicTacToe()
print("初始棋盘:")
print(game.board)
while not game.winner:
        if game.current_player == 'X':  # AI's turn
mcts = MCTS(game.copy(), iterations=1000)
best_node = mcts.search()
move = best_node.move
print(f"AI选择落子位置: {move}")
        else:  # human's turn
            moves = game.get_available_moves()
            print(f"Available positions: {moves}")
            move = tuple(map(int, input("Enter your move (row col, e.g. '0 1'): ").split()))
game.make_move(move)
print(game.board)
print(f"游戏结束! 胜者: {game.winner}")
PlaNet
PlaNet is a model-based reinforcement-learning algorithm from Google Research. Its core goal is to learn a latent dynamics model of the environment and plan directly in that compact latent space, which greatly improves sample efficiency.
It's like playing a game with your eyes closed 🎮:
Encoder: with eyes open, you glance at the screen and memorize the key information (e.g. enemy positions).
RNN: with eyes closed, you use that memory to work out "if I press left, how will the enemy move".
Predictor: you imagine the next state purely in your head (you might guess wrong).
Decoder: you open your eyes and check how well the imagined frame matches reality.
Core principles and architecture
1. Latent Dynamics Model
PlaNet's key innovation is to avoid predicting directly in high-dimensional pixel space (as traditional video-prediction models do) and instead learn the dynamics in a low-dimensional latent state space:
- Encoder: compresses the image observation ot into a latent state st (capturing abstract features such as object positions and velocities).
- Recurrent State-Space Model (RSSM), which has a deterministic path and a stochastic path:
  - Deterministic path: like planning a route in a fully known environment with fixed rules, the result is unique and reproducible. Here an RNN carries the history (ht = RNN(ht−1, st−1, at−1)).
  - Stochastic path: like planning in a partially unknown, changing environment, uncertainty is handled with a probabilistic model. Here a distribution over latent states is predicted, st ∼ p(st|ht), expressing the environment's uncertainty.
- Decoder: reconstructs the observation ot from st and predicts the reward rt.
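Putting the pieces together, the RSSM can be summarized as (the standard formulation from the PlaNet paper, paraphrased):
Deterministic path:  ht = f(ht−1, st−1, at−1)                          (RNN/GRU memory)
Stochastic path:     prior st ∼ p(st|ht),  posterior st ∼ q(st|ht, ot)   (the posterior is used when an observation is available)
Decoders:            ot ∼ p(ot|ht, st),  rt ∼ p(rt|ht, st)
Training maximizes a variational bound (ELBO): reconstruction + reward likelihood − KL(posterior ‖ prior).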
2. Latent-space planning
PlaNet plans entirely in the latent state space, which keeps the computational cost low:
- Encode the history: the past image sequence is encoded into the current latent state st.
- Cross-Entropy Method (CEM):
  - Sample many candidate action sequences {at, at+1, ..., at+H}.
  - Use the RSSM to predict the future latent states st+1, ..., st+H and rewards rt, ..., rt+H.
  - Keep the action sequences with the highest cumulative reward.
- Execute and replan: only the first action at* of the best sequence is executed; once the new observation arrives, planning starts over.
PlaNet's core components in plain language
1. The environment simulator (the RSSM model)
Role: compress game frames into an "abstract memory" and predict the next frame and the reward.
Workflow:
See the frame → compress it into key features with the encoder ✅
Imagine the next state → combine the action with the memory and predict what comes next (two flavors):
Deterministic state (e.g. "press left and the character definitely moves left") → remembered by the RNN
Stochastic state (e.g. "the enemy may go left or right") → represented by a probability distribution 🌟
Check the imagination → the decoder turns the predicted state back into a frame and compares it with the real one ✅
2. The planner (the CEM algorithm)
Role: try N action plans inside the simulator and pick the one with the best score.
Steps:
Randomly generate, say, 100 action sequences (e.g. "jump left → dodge right → attack")
Roll each sequence out in the simulator → compute its cumulative reward (e.g. kill +10, fall into a pit −5)
Keep the 20 best sequences → refit the sampling distribution (mean + spread) to them
Execute only the first action of the best plan → then replan (to avoid compounding mistakes)
Saves time (and lives):
only about 100 real episodes are needed where traditional methods need on the order of 100,000 (roughly a 1000× gain in sample efficiency)
High-dimensional frames are no problem:
decisions are made on the abstract memory rather than on raw pixels (much like the brain remembering key features)
Planning is safe and precise:
trial-and-error inside the simulator costs nothing, and it can even discover unusual, clever moves
PlaNet = a save-state simulator + a master tactician:
like a military adviser, it war-games on the sand table first and only commits troops once the plan checks out, winning at minimal cost.
PlaNet is suited to continuous-control tasks.
class RSSM(nn.Module):
    def __init__(self, state_dim, obs_dim, action_dim, hidden_size=200):
        super().__init__()
        # network definitions (bodies elided in the original sketch)
        self.encoder = nn.Sequential(...)    # observation → latent state
        self.rnn = nn.LSTMCell(...)          # deterministic state transition
        self.prior_net = nn.Sequential(...)  # prior state distribution
        self.decoder = nn.Sequential(...)    # reconstruct the observation from the state
def forward(self, obs, action, prev_state):
        # 1. encode the observation → posterior distribution
post_params = self.encoder(obs)
post_mean, post_std = post_params.chunk(2, dim=-1)
s_t_post = Normal(post_mean, torch.exp(post_std)).rsample()
        # 2. the RNN updates the deterministic state
h_t, c_t = self.rnn(torch.cat([action, s_t_post], -1), prev_state)
        # 3. predict the prior distribution
prior_params = self.prior_net(h_t)
prior_mean, prior_std = prior_params.chunk(2, dim=-1)
        # 4. reconstruct the observation
obs_recon = self.decoder(s_t_post)
return (h_t, c_t), (post_mean, post_std), (prior_mean, prior_std), obs_recon
class CEMPlanner:
def plan(self,model,initial_state):
for _ in range(n_iter):
actions= Normal(self.mean,self.std ).sample((n_samples,))
rewards= []
for a_seq in actions:
state = initial_state
total_reward = 0
for t in range(horizon):
next_state,reward = model.step(state,a_seq[t])
total_reward += reward
state = next_state
rewards.append(total_reward)
            rewards = torch.tensor(rewards)
            top_idx = torch.topk(rewards, top_k).indices
            top_actions = actions[top_idx]
            # refit the sampling distribution to the elite action sequences
            self.mean = top_actions.mean(dim=0)
            self.std = top_actions.std(dim=0)
return self.mean[0]
def train_planet(env,epochs = 1000):
model = RSSM(state_dim= 32,...)
planner = CEMPlanner(env.action_dim)
for epoch in range(epochs):
obs = env.reset()
        state = model.initial_state()  # initialize the latent state
        for t in range(100):  # environment interaction steps
            # plan an action with CEM in latent space
action = planner.plan(model, state)
            # step the environment
next_obs, reward, done = env.step(action)
            # model update (ELBO: reconstruction term + KL between posterior and prior)
optimizer.zero_grad()
state, post, prior, recon = model(next_obs, action, state)
loss = elbo_loss(next_obs, recon, post, prior)
loss.backward()
optimizer.step()
PPO
PPO combines the Actor-Critic architecture with policy gradients, an advantage function, policy exploration, and its signature clipping mechanism.
The clipping mechanism (written out below) keeps each policy update small, preventing the policy from collapsing.
PPO's key ideas:
1. Trust-region-style constraint: bound the size of each update (via the ratio clip, or a KL penalty in the other PPO variant) → stable training
2. Importance sampling: evaluate the new policy with data collected by the old policy → data reuse
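The clipped surrogate objective in its standard form:
r_t(θ) = πθ(at|st) / πθ_old(at|st)                                   (importance-sampling ratio)
L^CLIP(θ) = E_t[ min( r_t(θ)·A_t , clip(r_t(θ), 1−ε, 1+ε)·A_t ) ]
With ε = 0.2 (the eps_clip below), the ratio cannot push the objective beyond roughly a ±20% change, which is exactly what keeps updates small.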
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import gym
import numpy as np
# hyperparameter configuration
class Config:
def __init__(self):
self.env_name = "Pendulum-v1"
self.hidden_dim = 256
self.actor_lr = 3e-4
self.critic_lr = 1e-3
        self.gamma = 0.99          # discount factor
        self.gae_lambda = 0.95     # GAE parameter
        self.eps_clip = 0.2        # clip range
        self.K_epochs = 10         # policy update epochs per batch
        self.entropy_coef = 0.01   # entropy regularization coefficient
self.batch_size = 64
self.buffer_size = 2048
# Policy network (Actor): outputs the parameters of the action distribution
class PolicyNet(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim):
super().__init__()
self.shared = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
        self.mu_head = nn.Linear(hidden_dim, action_dim)        # mean head
        self.log_std_head = nn.Linear(hidden_dim, action_dim)   # log-std head
def forward(self, state):
x = self.shared(state)
mu = self.mu_head(x)
log_std = self.log_std_head(x)
        std = torch.exp(log_std).clamp(min=1e-6)  # keep the standard deviation strictly positive
return mu, std
# Value network (Critic): estimates state values
class ValueNet(nn.Module):
def __init__(self, state_dim, hidden_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, state):
return self.net(state).squeeze(-1)
# Core PPO algorithm
class PPO:
def __init__(self, cfg):
self.env = gym.make(cfg.env_name)
state_dim = self.env.observation_space.shape[0]
action_dim = self.env.action_space.shape[0]
self.actor = PolicyNet(state_dim, action_dim, cfg.hidden_dim)
self.critic = ValueNet(state_dim, cfg.hidden_dim)
        self.old_actor = PolicyNet(state_dim, action_dim, cfg.hidden_dim)  # old policy
self.old_actor.load_state_dict(self.actor.state_dict())
self.optimizer = optim.Adam([
{'params': self.actor.parameters(), 'lr': cfg.actor_lr},
{'params': self.critic.parameters(), 'lr': cfg.critic_lr}
])
self.cfg = cfg
        self.buffer = []  # experience buffer
def select_action(self, state):
state = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
mu, std = self.old_actor(state)
dist = Normal(mu, std)
action = dist.sample()
            log_prob = dist.log_prob(action).sum(-1)  # sum log-probs over action dimensions
return action.squeeze(0).numpy(), log_prob.item()
def compute_gae(self, rewards, values, next_values, dones):
"""广义优势估计(GAE)"""
deltas = rewards + self.cfg.gamma * next_values * (1 - dones) - values
advantages = np.zeros_like(rewards)
advantage = 0
for t in reversed(range(len(rewards))):
advantage = deltas[t] + self.cfg.gamma * self.cfg.gae_lambda * (1 - dones[t]) * advantage
advantages[t] = advantage
returns = advantages + values
return advantages, returns
def update(self):
        # convert the buffer to tensors
states = torch.FloatTensor(np.array([t[0] for t in self.buffer]))
actions = torch.FloatTensor(np.array([t[1] for t in self.buffer]))
old_log_probs = torch.FloatTensor(np.array([t[2] for t in self.buffer]))
rewards = torch.FloatTensor(np.array([t[3] for t in self.buffer]))
next_states = torch.FloatTensor(np.array([t[4] for t in self.buffer]))
dones = torch.FloatTensor(np.array([t[5] for t in self.buffer]))
        # compute GAE advantages
with torch.no_grad():
values = self.critic(states).numpy()
next_values = self.critic(next_states).numpy()
advantages, returns = self.compute_gae(
rewards.numpy(), values, next_values, dones.numpy()
)
advantages = torch.FloatTensor(advantages)
returns = torch.FloatTensor(returns)
        # several epochs of policy optimization on the same batch
for _ in range(self.cfg.K_epochs):
            # action log-probabilities under the current policy
mu, std = self.actor(states)
dist = Normal(mu, std)
log_probs = dist.log_prob(actions).sum(-1)
            # key step 1: probability ratio
ratios = torch.exp(log_probs - old_log_probs.detach())
            # key step 2: clipped surrogate loss
surr1 = ratios * advantages
surr2 = torch.clamp(ratios, 1-self.cfg.eps_clip, 1+self.cfg.eps_clip) * advantages
actor_loss = -torch.min(surr1, surr2).mean()
            # critic loss (value-function regression)
values_pred = self.critic(states)
critic_loss = F.mse_loss(values_pred, returns)
            # entropy bonus (encourages exploration)
entropy = dist.entropy().mean()
            # total loss
loss = actor_loss + 0.5 * critic_loss - self.cfg.entropy_coef * entropy
            # gradient step
self.optimizer.zero_grad()
loss.backward()
            torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)  # gradient clipping
            torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
self.optimizer.step()
        # sync the old policy
self.old_actor.load_state_dict(self.actor.state_dict())
self.buffer.clear()
def train(self, max_episodes=1000):
for ep in range(max_episodes):
state, _ = self.env.reset()
ep_reward = 0
for _ in range(self.cfg.buffer_size):
action, log_prob = self.select_action(state)
next_state, reward, done, _, _ = self.env.step(action)
                # store the transition
self.buffer.append((state, action, log_prob, reward, next_state, done))
state = next_state
ep_reward += reward
if len(self.buffer) == self.cfg.buffer_size:
self.update()
if done:
break
print(f"Episode {ep+1}, Reward: {ep_reward:.1f}")
if __name__ == "__main__":
cfg = Config()
agent = PPO(cfg)
agent.train()
QMIX
Centralized training, decentralized execution.
Let's start with its three components:
Agent networks: each agent has its own network; the input is the local observation ota and the previous action ut−1a, and the output is the action value Qa(τa, ua).
Mixing network: fuses the per-agent Qa values into a global Qtot.
Hypernetworks: generate the weights of the mixing network (conditioned on the global state).
In plain language, QMIX's goal: let the players train against the coach's global playbook (centralized training), but make their own quick decisions during the game (decentralized execution).
How QMIX works (three steps):
1️⃣ Everyone has their own "little brain" (agent network)
- Each agent runs its own RNN (e.g. a GRU) over its local observation (such as "opponent position + my last action") and estimates the value (Q value) of each action.
- Like a player judging from what's in front of them whether passing or shooting is better.
2️⃣ The coach's "tactics board" (mixing network)
- The coach (the mixing network) collects every player's Q value, combines it with the overall situation (the global state), and fuses them non-linearly into a team value (Q_tot).
- Key design points (the constraint is written out after this list):
  - The fusion must be monotonic: if any player's Q value increases, the team's Q_tot must not decrease (so an individually good choice can never drag the team value down);
  - The fusion weights are generated on the fly by a hypernetwork and adapt to the game situation.
- Like a coach composing the best tactic from the players' abilities plus the score.
3️⃣ Separating training from execution
- Training: the coach uses global information (like full-match video) to optimize the tactics board and backpropagates into every player's little brain.
- Execution: each player decides independently from its local observation, with no need to ask the coach in real time.
- Like analyzing video during training while players improvise during the match.
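The monotonicity constraint mentioned above, written out:
∂Q_tot / ∂Q_a ≥ 0   for every agent a
QMIX enforces this structurally: the hypernetworks produce the mixing weights, and taking their absolute value (the torch.abs calls in the code below) keeps every weight non-negative, so Q_tot can never decrease when an individual Q_a increases. A useful consequence is that the greedy joint action is obtained by letting each agent act greedily on its own Q_a.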
Typical uses: multi-UAV cooperation, intelligent traffic scheduling, and game AI.
import copy
import random
from collections import deque, namedtuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class QMixAgent(nn.Module):
def __init__(self,obs_dim,action_dim,hidden_dim = 64):
super().__init__()
self.rnn = nn.GRUCell(obs_dim,hidden_dim)
self.fc = nn.Sequential(
nn.Linear(hidden_dim,hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim,action_dim)
)
def forward(self,obs,hidden_state):
hidden =self.rnn(obs,hidden_state)
q_value= self.fc(hidden)
return q_value,hidden
class QMixer(nn.Module):
    def __init__(self, num_agents, state_dim, hidden_dim=64):
        super().__init__()
        self.num_agents = num_agents
        self.hidden_dim = hidden_dim
        self.hyper_w1 = nn.Linear(state_dim, hidden_dim * num_agents)
        self.hyper_b1 = nn.Linear(state_dim, hidden_dim)
        self.hyper_w2 = nn.Linear(state_dim, hidden_dim)
        self.hyper_b2 = nn.Linear(state_dim, 1)
    def forward(self, agent_qs, global_state):
        # agent_qs: [batch, num_agents]; global_state: [batch, state_dim]
        batch_size = agent_qs.size(0)
        agent_qs = agent_qs.view(batch_size, 1, self.num_agents)
        # absolute values keep the mixing weights non-negative -> Q_tot is monotonic in each Q_a
        w1 = torch.abs(self.hyper_w1(global_state)).view(batch_size, self.num_agents, self.hidden_dim)
        b1 = self.hyper_b1(global_state).view(batch_size, 1, self.hidden_dim)
        hidden = torch.relu(torch.bmm(agent_qs, w1) + b1)
        w2 = torch.abs(self.hyper_w2(global_state)).view(batch_size, self.hidden_dim, 1)
        b2 = self.hyper_b2(global_state).view(batch_size, 1, 1)
        q_tot = torch.bmm(hidden, w2) + b2
        return q_tot.view(batch_size, 1)
class QMix:
    def __init__(self, num_agents, obs_dim, action_dim, hidden_dim=64, lr=0.001):
        # agent networks (one per agent)
        self.agents = nn.ModuleList([QMixAgent(obs_dim, action_dim, hidden_dim) for _ in range(num_agents)])
        # mixing network (here the "global state" is simply the concatenation of all observations)
        self.mixer = QMixer(num_agents, num_agents * obs_dim, hidden_dim)
        # target networks (stabilize training)
        self.target_agents = copy.deepcopy(self.agents)
        self.target_mixer = copy.deepcopy(self.mixer)
        self.optimizer = optim.Adam(list(self.agents.parameters()) + list(self.mixer.parameters()), lr=lr)
    def select_actions(self, obs_n, hidden_states_n, epsilon=0.1):
        """Decentralized execution: each agent picks an action from its local observation."""
        actions, new_hidden = [], []
        for i, agent in enumerate(self.agents):
            obs_i = torch.FloatTensor(obs_n[i]).unsqueeze(0)
            q, hidden = agent(obs_i, hidden_states_n[i])
            new_hidden.append(hidden)
            # epsilon-greedy action selection
            if np.random.rand() < epsilon:
                actions.append(np.random.randint(q.shape[-1]))
            else:
                actions.append(q.argmax(dim=-1).item())
        return actions, new_hidden
# hyperparameters
num_agents = 2
obs_dim = 4
action_dim = 3
hidden_dim = 64
gamma = 0.99
lr = 0.001
batch_size = 32
buffer_size = 10000
# initialize the environment and the agents
env = MultiAgentGridEnv_QMIX(num_agents=num_agents)  # assumes this environment class is defined elsewhere
agent = QMix(num_agents, obs_dim, action_dim, hidden_dim)
# experience replay buffer
Buffer = namedtuple('Buffer', ['obs_n', 'actions', 'rewards', 'next_obs_n', 'dones'])
buffer = deque(maxlen=buffer_size)
# training loop
for episode in range(1000):
obs_n = env.reset()
hidden_states_n = [torch.zeros(1, hidden_dim) for _ in range(num_agents)]
total_reward = 0
while True:
        # select actions
actions, hidden_states_n = agent.select_actions(obs_n, hidden_states_n)
        # step the environment
next_obs_n, rewards, dones, _ = env.step(actions)
total_reward += sum(rewards)
        # store the transition
buffer.append((obs_n, actions, rewards, next_obs_n, dones))
        # update the networks
        if len(buffer) >= batch_size:
            batch = random.sample(buffer, batch_size)
            # convert to tensors
            obs_batch = torch.FloatTensor(np.array([b[0] for b in batch]))       # [B, N, obs_dim]
            action_batch = torch.LongTensor(np.array([b[1] for b in batch]))     # [B, N]
            reward_batch = torch.FloatTensor(np.array([b[2] for b in batch]))    # [B, N]
            next_obs_batch = torch.FloatTensor(np.array([b[3] for b in batch]))  # [B, N, obs_dim]
            done_batch = torch.FloatTensor(np.array([b[4] for b in batch]))      # [B, N]
            team_reward = reward_batch.sum(dim=1, keepdim=True)
            team_done = done_batch.max(dim=1, keepdim=True).values
            zero_hidden = torch.zeros(batch_size, hidden_dim)
            # compute the target Q value
            with torch.no_grad():
                next_q = []
                for i in range(num_agents):
                    q, _ = agent.target_agents[i](next_obs_batch[:, i], zero_hidden)
                    next_q.append(q.max(dim=-1).values)  # greedy Q per agent
                next_q = torch.stack(next_q, dim=1)      # [B, N]
                # the mixing network produces the global target Q value
                target_q = agent.target_mixer(next_q, next_obs_batch.view(batch_size, -1))
                # TD target
                target_q = team_reward + gamma * (1 - team_done) * target_q
            # current Q values for the actions actually taken
            current_q = []
            for i in range(num_agents):
                q, _ = agent.agents[i](obs_batch[:, i], zero_hidden)
                current_q.append(q.gather(1, action_batch[:, i:i+1]).squeeze(1))
            current_q = torch.stack(current_q, dim=1)    # [B, N]
            pred_q = agent.mixer(current_q, obs_batch.view(batch_size, -1))
            # loss and backward pass
            loss = nn.MSELoss()(pred_q, target_q)
            agent.optimizer.zero_grad()
            loss.backward()
            agent.optimizer.step()
            # soft-update the target networks
            for target, source in zip(agent.target_agents.parameters(), agent.agents.parameters()):
                target.data.copy_(0.995 * target.data + 0.005 * source.data)
            for target, source in zip(agent.target_mixer.parameters(), agent.mixer.parameters()):
                target.data.copy_(0.995 * target.data + 0.005 * source.data)
obs_n = next_obs_n
if all(dones):
break
SAC
SAC's distinguishing feature is its "maximum entropy" objective.
SAC's objective has two parts:
cumulative reward + policy entropy (the formula follows below):
- H(π) is the policy entropy, which measures how random the actions are (higher entropy = more exploration)
- α is the entropy temperature, which automatically tunes how strongly exploration is encouraged
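Written out, the maximum-entropy objective is:
J(π) = Σ_t E_(st,at)∼ρπ[ r(st, at) + α·H(π(·|st)) ],   where H(π(·|s)) = −E_a∼π[ log π(a|s) ]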
Architecture: twin Q networks + a policy network.
The critic consists of two independent Q networks estimating Q1 and Q2 (their minimum is used to curb overestimation).
Key technique:
Automatic entropy tuning (auto-α):
α is not fixed; it is adjusted according to the gap between the policy's actual entropy and a target entropy (the loss is given below):
- If the policy is too deterministic (entropy too low) → increase α to encourage exploration;
- If the policy is too random (entropy too high) → decrease α to favor exploitation.
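Concretely, α is trained by minimizing the following loss (this matches the alpha_loss line in the code; the target entropy H_target is usually set to −dim(action space)):
J(α) = E_at∼π[ −α·( log π(at|st) + H_target ) ]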
Typical applications: robot control and autonomous driving.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from collections import deque
import random
class ReplayBuffer:
def __init__(self,capacity):
self.buffer = deque(maxlen = capacity)
def push(self,state,action,reward,next_state,done):
self.buffer.append((state,action,reward,next_state,done))
def sample(self,batch_size):
state,action,reward,next_state,done = zip(*random.sample(self.buffer,batch_size))
return np.stack(state),np.stack(action),np.stack(reward),np.stack(next_state),np.stack(done)
def __len__(self):
return len(self.buffer)
class GaussianPolicy(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim=256):
super().__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.mean = nn.Linear(hidden_dim, action_dim)
self.log_std = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
mean = self.mean(x)
log_std = self.log_std(x)
        log_std = torch.clamp(log_std, min=-20, max=2)  # keep log_std in a numerically safe range
return mean, log_std
def sample(self, state):
        # reparameterization-trick sampling (Eq. 11 of the SAC paper)
mean, log_std = self.forward(state)
std = log_std.exp()
normal = torch.distributions.Normal(mean, std)
        # sample from the Gaussian
x_t = normal.rsample()
        # tanh squashing to respect the action bounds (Appendix C)
action = torch.tanh(x_t)
        # log-probability with the tanh correction (Eq. 21)
log_prob = normal.log_prob(x_t)
log_prob -= torch.log(1 - action.pow(2) + 1e-6)
log_prob = log_prob.sum(1, keepdim=True)
return action, log_prob
class QNetwork(nn.Module):
def __init__(self,state_dim,action_dim,hidden_dim=256):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim+action_dim,hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim,hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim,1)
)
def forward(self,state,action):
x = torch.cat([state,action],dim = 1)
return self.net(x)
class SAC:
def __init__(self,state_dim,action_dim,gamma = 0.99,tau=0.005,alpha=0.2,lr=3e-4):
self.gamma = gamma
self.tau= tau
self.alpha = alpha
self.policy = GaussianPolicy(state_dim,action_dim)
self.policy_optimizer = optim.Adam(self.policy.parameters(),lr=lr)
        self.q_net1 = QNetwork(state_dim, action_dim)
        self.q_net2 = QNetwork(state_dim, action_dim)
        self.target_q_net1 = QNetwork(state_dim, action_dim)
        self.target_q_net2 = QNetwork(state_dim, action_dim)
        self.target_q_net1.load_state_dict(self.q_net1.state_dict())
        self.target_q_net2.load_state_dict(self.q_net2.state_dict())
        self.q_optimizer1 = optim.Adam(self.q_net1.parameters(), lr=lr)
        self.q_optimizer2 = optim.Adam(self.q_net2.parameters(), lr=lr)
        # automatic entropy tuning
        self.target_entropy = -action_dim
        self.log_alpha = torch.tensor(np.log(alpha), requires_grad=True)
        self.alpha_optimizer = optim.Adam([self.log_alpha], lr=lr)
        self.replay_buffer = ReplayBuffer(capacity=1000000)
def update(self,batch_size):
state,action,reward,next_state,done = self.replay_buffer.sample(batch_size)
state = torch.FloatTensor(state)
action = torch.FloatTensor(action)
reward = torch.FloatTensor(reward).unsqueeze(1)
next_state =torch.FloatTensor(next_state)
done = torch.FloatTensor(done).unsqueeze(1)
with torch.no_grad():
            next_action, next_log_prob = self.policy.sample(next_state)
target_q1 = self.target_q_net1(next_state,next_action)
target_q2 = self.target_q_net2(next_state,next_action)
target_q = torch.min(target_q1,target_q2) - self.alpha*next_log_prob
target_value = reward + (1-done)*self.gamma*target_q
current_q1 = self.q_net1(state, action)
current_q2 = self.q_net2(state, action)
q_loss1 = F.mse_loss(current_q1, target_value)
q_loss2 = F.mse_loss(current_q2, target_value)
self.q_optimizer1.zero_grad()
q_loss1.backward()
self.q_optimizer1.step()
self.q_optimizer2.zero_grad()
q_loss2.backward()
self.q_optimizer2.step()
for param in self.q_net1.parameters():
param.requires_grad = False
for param in self.q_net2.parameters():
param.requires_grad = False
        # sample fresh actions from the current policy
new_action, log_prob = self.policy.sample(state)
min_q = torch.min(
self.q_net1(state, new_action),
self.q_net2(state, new_action)
)
policy_loss = (self.alpha * log_prob - min_q).mean()
        # optimize the policy network
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
for param in self.q_net1.parameters():
param.requires_grad = True
for param in self.q_net2.parameters():
param.requires_grad = True
alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()
self.alpha_optimizer.zero_grad()
alpha_loss.backward()
self.alpha_optimizer.step()
        self.alpha = self.log_alpha.exp().item()  # detached scalar so the other losses don't backprop into alpha
self.soft_update(self.target_q_net1, self.q_net1)
self.soft_update(self.target_q_net2, self.q_net2)
return q_loss1.item(), q_loss2.item(), policy_loss.item(), alpha_loss.item()
def soft_update(self, target, source):
"""指数移动平均更新目标网络 (算法1)"""
for target_param, param in zip(target.parameters(), source.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
def select_action(self, state, evaluate=False):
"""选择动作 (训练与评估不同)"""
state = torch.FloatTensor(state).unsqueeze(0)
if evaluate:
            # use the mean action at evaluation time
mean, _ = self.policy(state)
action = torch.tanh(mean)
return action.detach().numpy()[0]
else:
            # sample during training
action, _ = self.policy.sample(state)
return action.detach().numpy()[0]
def train(self, env, episodes, batch_size=256):
for episode in range(episodes):
state = env.reset()
episode_reward = 0
while True:
                # select and execute an action
action = self.select_action(state)
next_state, reward, done, _ = env.step(action)
                # store the transition
self.replay_buffer.push(state, action, reward, next_state, done)
episode_reward += reward
                # move to the next state
state = next_state
                # update the networks
if len(self.replay_buffer) > batch_size:
self.update(batch_size)
if done:
break