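PyTorch in Practice: Training an Inverted Pendulum with A2C to Tackle Continuous Action Spaces

One of the core challenges reinforcement learning faces in continuous control tasks is handling a continuous action space effectively. This article walks through a PyTorch implementation of the Advantage Actor-Critic (A2C) algorithm for the inverted pendulum (Pendulum-v0), a classic continuous control problem. Unlike discrete action spaces (as in Atari games), a continuous action space requires the algorithm to output real-valued actions, which places special demands on how the policy network is built.

1. Understanding A2C and Continuous Action Spaces

A2C (Advantage Actor-Critic) is the synchronous version of the Actor-Critic framework. It combines the strengths of policy gradients and value function approximation. In the continuous-action setting, the key ideas of A2C are:

- Two-network structure: the Actor network handles the policy (action selection), while the Critic network evaluates state values.
- Advantage function: the TD error is used as the advantage estimate, which reduces the variance of policy updates.
- Synchronous updates: compared with the asynchronous updates of A3C, A2C is easier to implement and debug.

In the Pendulum-v0 environment the action is a continuous torque in the range [-2.0, 2.0], so the policy network must output real-valued actions that respect this physical constraint. The argmax operation used for discrete action spaces no longer applies; instead, actions are sampled from a Gaussian policy:

```python
# Gaussian policy example
mean = actor_network(state)   # the network outputs the mean
std = torch.exp(log_std)      # log_std is a trainable log standard deviation
dist = Normal(mean, std)      # build the Gaussian distribution
action = dist.sample()        # sample an action
```

2. Environment Setup and Network Architecture

2.1 Environment Initialization

First, configure the Gym environment and the key parameters:

```python
import gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal

# Pendulum-v0 and the reset()/step() calls used in this article follow the older Gym API
env = gym.make('Pendulum-v0')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

# Hyperparameters
gamma = 0.99         # discount factor
lr_actor = 0.0001    # Actor learning rate
lr_critic = 0.001    # Critic learning rate
update_interval = 5  # update interval
```

2.2 Network Design

The network uses a shared base with separate output heads:

```python
class SharedBase(nn.Module):
    def __init__(self, state_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return x


class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        self.shared = SharedBase(state_dim)
        # Actor head: state-dependent mean, state-independent log std
        self.actor_mean = nn.Linear(128, action_dim)
        self.actor_logstd = nn.Parameter(torch.zeros(1, action_dim))
        # Critic head: scalar state value
        self.critic = nn.Linear(128, 1)
        self.max_action = max_action

    def forward(self, state):
        shared_out = self.shared(state)
        # Actor output
        mean = self.max_action * torch.tanh(self.actor_mean(shared_out))
        log_std = self.actor_logstd.expand_as(mean)
        std = torch.exp(log_std)
        # Critic output
        value = self.critic(shared_out)
        return mean, std, value
```

Tip: the tanh activation keeps the mean within [-max_action, max_action], which matches the environment's action space.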
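To make the Gaussian policy above more concrete, the sketch below reproduces in plain Python the two quantities the Actor relies on: a tanh-bounded mean and the log-density of an action under a normal distribution. The helper names `bounded_mean` and `gaussian_log_prob` are illustrative assumptions and are not part of the article's code; the formula is the standard normal log-density, shown here only to clarify what a Gaussian log-probability measures.

```python
import math


def bounded_mean(raw_output, max_action=2.0):
    """Squash an unbounded network output into [-max_action, max_action]."""
    return max_action * math.tanh(raw_output)


def gaussian_log_prob(action, mean, std):
    """Log-density of `action` under Normal(mean, std)."""
    var = std ** 2
    return (-((action - mean) ** 2) / (2 * var)
            - math.log(std)
            - 0.5 * math.log(2 * math.pi))


# A large raw output saturates close to the +2.0 torque limit
mean = bounded_mean(10.0)
# A log-std of zero corresponds to std == 1 (hypothetical starting point)
std = math.exp(0.0)
# Actions far from the mean receive a lower log-probability
print(gaussian_log_prob(mean, mean, std), gaussian_log_prob(mean - 1.0, mean, std))
```

Because the log-density falls off quadratically with the distance from the mean, a policy-gradient update that raises the log-probability of a well-performing action pulls the mean toward that action.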
3. Core Training Loop

3.1 Data Collection and Storage

A2C is an on-policy algorithm, so data must be collected with the current policy:

```python
def collect_episode(env, model, max_steps=200):
    states, actions, rewards, next_states, dones = [], [], [], [], []
    state = env.reset()
    for _ in range(max_steps):
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            mean, std, _ = model(state_tensor)
            dist = Normal(mean, std)
            # Clamp the sample to the valid torque range
            action = dist.sample().clamp(-model.max_action, model.max_action)
        next_state, reward, done, _ = env.step(action.numpy()[0])
        # Store the transition
        states.append(state)
        actions.append(action.numpy()[0])
        rewards.append(reward)
        next_states.append(next_state)
        dones.append(done)
        state = next_state
        if done:
            break
    return states, actions, rewards, next_states, dones
```

3.2 Advantage Computation and Normalization

The key step is computing the advantages and normalizing the returns:

```python
def compute_returns_and_advantages(rewards, values, next_values, dones, gamma=0.99):
    returns = []
    advantages = []
    # Bootstrap from the last next-state value unless the episode has ended.
    # Values are converted to plain floats so the lists can be turned into tensors below.
    R = float(next_values[-1]) * (1.0 - float(dones[-1]))
    for t in reversed(range(len(rewards))):
        R = float(rewards[t]) + gamma * R * (1.0 - float(dones[t]))
        returns.insert(0, R)
        advantages.insert(0, R - float(values[t]))
    # Normalization
    returns = torch.FloatTensor(returns)
    returns = (returns - returns.mean()) / (returns.std() + 1e-8)
    advantages = torch.FloatTensor(advantages)
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    return returns, advantages
```

3.3 Policy and Value Function Update

The Actor and Critic are optimized jointly:

```python
def update_model(model, optimizer, states, actions, returns, advantages):
    states = torch.FloatTensor(states)
    actions = torch.FloatTensor(actions)
    # Forward pass
    means, stds, values = model(states)
    dists = Normal(means, stds)
    # Compute the losses
    log_probs = dists.log_prob(actions).sum(-1)
    actor_loss = -(log_probs * advantages).mean()
    critic_loss = 0.5 * (returns - values.squeeze()).pow(2).mean()
    # Entropy regularization term
    entropy = dists.entropy().mean()
    total_loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy
    # Backward pass with gradient clipping
    optimizer.zero_grad()
    total_loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
    optimizer.step()
    return actor_loss.item(), critic_loss.item(), entropy.item()
```
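The return recursion from section 3.2 is easier to verify on a tiny episode that can be computed by hand. The sketch below, in plain Python, mirrors that backward pass without the normalization step. The helper name `discounted_returns` and all rewards, values, and the bootstrap value are made-up assumptions chosen so the arithmetic stays exact.

```python
def discounted_returns(rewards, bootstrap_value, dones, gamma):
    """Backward recursion R_t = r_t + gamma * R_{t+1} * (1 - done_t)."""
    returns = []
    R = bootstrap_value
    for r, d in zip(reversed(rewards), reversed(dones)):
        # A finished episode masks out everything that would come after it
        R = r + gamma * R * (1.0 - float(d))
        returns.insert(0, R)
    return returns


rewards = [-1.0, -1.0, -1.0]   # illustrative rewards
dones = [False, False, True]   # the episode ends on the last step
values = [-1.5, -1.5, -1.0]    # illustrative Critic estimates

returns = discounted_returns(rewards, bootstrap_value=5.0, dones=dones, gamma=0.5)
advantages = [R - v for R, v in zip(returns, values)]

print(returns)     # [-1.75, -1.5, -1.0]
print(advantages)  # [-0.25, 0.0, 0.0]
```

The bootstrap value of 5.0 has no effect here because the final `done` flag masks it out. A negative advantage at the first step means the action taken there turned out worse than the Critic expected.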
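4. Training Tips and Performance Tuning

4.1 Tuning the Gaussian Policy

- Standard deviation parameterization: use the log standard deviation (logstd) so that the standard deviation is always positive.
- Exploration control: start with a relatively large standard deviation and reduce it as training progresses.
- Action clipping: keep sampled values from leaving the range the environment allows.

```python
# Adaptive standard deviation example
self.log_std = nn.Parameter(torch.zeros(1, action_dim))
std = torch.exp(self.log_std)  # guarantees std > 0
```

4.2 Reward Engineering

The raw reward of the pendulum lies in [-16.27, 0], so normalizing it is recommended:

```python
import numpy as np

# Reward normalization
rewards = np.array(rewards)
rewards = (rewards + 16.27) / 16.27  # map to [0, 1]
```

4.3 Monitoring the Training Curves

A typical training run should monitor the following metrics:

| Metric | Normal behavior | What to do when it looks wrong |
| --- | --- | --- |
| Average return | Gradually approaches 0 | Check the network architecture and learning rate |
| Critic loss | Decreases steadily | Lower the learning rate if NaN appears |
| Policy entropy | High at first, then gradually decreases | Increase exploration if the entropy gets too low |

5. Full Training Loop and Result Analysis

5.1 Main Training Loop

```python
def train(env, model, optimizer, episodes=1000):
    episode_rewards = []
    for episode in range(episodes):
        # Collect data
        states, actions, rewards, next_states, dones = collect_episode(env, model)
        # Compute value estimates
        with torch.no_grad():
            states_tensor = torch.FloatTensor(states)
            next_states_tensor = torch.FloatTensor(next_states)
            _, _, values = model(states_tensor)
            _, _, next_values = model(next_states_tensor)
        # Compute returns and advantages
        returns, advantages = compute_returns_and_advantages(
            rewards, values.squeeze(), next_values.squeeze(), dones)
        # Update the model
        actor_loss, critic_loss, entropy = update_model(
            model, optimizer, states, actions, returns, advantages)
        # Record results
        total_reward = sum(rewards)
        episode_rewards.append(total_reward)
        if episode % 50 == 0:
            print(f"Episode {episode}, Reward: {total_reward:.1f}, "
                  f"Actor Loss: {actor_loss:.3f}, Critic Loss: {critic_loss:.3f}")
    return episode_rewards
```

5.2 Typical Training Results

After roughly 500-1000 training episodes, you should observe the following:

- The average return improves from around -1000 at the start to better than -200.
- The policy entropy gradually decreases from the high-entropy state of random exploration.
- The Critic loss decreases steadily as the value estimates become more accurate.

During testing, the pendulum can be seen going from swinging randomly to holding the upright position:

```python
def test(env, model, episodes=5):
    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0
        while not done:
            env.render()
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                mean, std, _ = model(state_tensor)
            action = mean.numpy()[0]  # use the mean directly at test time
            state, reward, done, _ = env.step(action)
            total_reward += reward
        print(f"Test Episode {episode}, Reward: {total_reward:.1f}")
    env.close()
```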
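Single-episode rewards are noisy, so the "average return" discussed in section 5.2 is usually read from a smoothed curve rather than from individual episodes. The following is a minimal sketch of a trailing moving average that could be applied to the `episode_rewards` list returned by `train`. The helper name `moving_average`, the window size, and the demo numbers are assumptions and do not come from the article.

```python
def moving_average(values, window=50):
    """Trailing mean over the last `window` entries (fewer at the start)."""
    averaged = []
    running_sum = 0.0
    for i, v in enumerate(values):
        running_sum += v
        if i >= window:
            # Drop the entry that just left the window
            running_sum -= values[i - window]
        averaged.append(running_sum / min(i + 1, window))
    return averaged


demo_rewards = [-1000.0, -800.0, -600.0, -400.0]  # made-up episode rewards
print(moving_average(demo_rewards, window=2))     # [-1000.0, -900.0, -700.0, -500.0]
```

Plotting the smoothed list next to the raw rewards makes it easier to tell a genuine trend from episode-to-episode variance.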
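6. Common Problems and Solutions

6.1 Unstable Training

Symptom: returns fluctuate heavily, and the policy suddenly degrades.

Solutions:

- Lower the learning rate, especially for the Actor network.
- Increase the batch size by collecting more samples before each update.
- Apply stronger entropy regularization.

6.2 Policy Converges to a Local Optimum

Symptom: the pendulum holds a fixed angle and stops improving.

Solutions:

- Increase exploration early on by raising the initial standard deviation.
- Try different network initializations.
- Use curriculum learning to increase the task difficulty gradually.

6.3 Optimizing Compute Resources

For more complex environments, consider:

- Using GPU acceleration.
- Sampling from parallel environments.
- Switching to a more stable algorithm variant such as PPO.

7. Extensions and Further Directions

Once the basic A2C implementation is in place, the following improvements are worth exploring:

- Distributed training: implement the asynchronous version, A3C.
- Hybrid exploration strategies: combine the policy with OU noise or parameter-space noise.
- Hierarchical reinforcement learning: split the task into high-level goal setting and low-level control.
- Imitation learning: use expert demonstrations to speed up training.

In a real project, a robotic-arm control task, I found that combining A2C with the following techniques worked noticeably well:

- A layered action space (coarse adjustment plus fine adjustment).
- State representation learning (an autoencoder).
- Model-based pretraining.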
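Of the directions listed in section 7, OU (Ornstein-Uhlenbeck) noise is the simplest to sketch in isolation. The class below is a plain-Python illustration of the process using an Euler-Maruyama step. The class name `OUNoise`, its parameters, and their default values are assumptions made for this example and do not come from the article; how the noise would be combined with the A2C policy is likewise only hinted at with a stand-in mean action.

```python
import random


class OUNoise:
    """Ornstein-Uhlenbeck process: temporally correlated noise that reverts to `mu`."""

    def __init__(self, mu=0.0, theta=0.15, sigma=0.2, dt=1.0, seed=None):
        self.mu, self.theta, self.sigma, self.dt = mu, theta, sigma, dt
        self.rng = random.Random(seed)
        self.state = mu

    def reset(self):
        """Restart the process at its long-run mean."""
        self.state = self.mu

    def sample(self):
        # Euler-Maruyama step: dx = theta * (mu - x) * dt + sigma * sqrt(dt) * N(0, 1)
        drift = self.theta * (self.mu - self.state) * self.dt
        diffusion = self.sigma * (self.dt ** 0.5) * self.rng.gauss(0.0, 1.0)
        self.state += drift + diffusion
        return self.state


noise = OUNoise(seed=0)
max_action = 2.0
mean_action = 0.5  # stand-in for the policy mean of one step
# Add the correlated noise, then clip to the valid torque range
noisy_action = max(-max_action, min(max_action, mean_action + noise.sample()))
print(noisy_action)
```

Unlike independent Gaussian samples, consecutive OU samples are correlated, so the perturbation pushes the torque in a similar direction for several steps before drifting back toward `mu`.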