import argparse
import os
import time
from distutils.util import strtobool
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from schedulers.rl_scheduler import RLScheduler
from sim.simple_sim import run as run_test_env
from tasks.workflow import Workflow
from os import path as op
def get_data_dir():
return op.join(op.dirname(__file__), '../data/train_agents')
def get_model_dir():
return op.join(op.dirname(__file__), '../data/pretrained_models')
def get_results_folder():
return op.join(op.dirname(__file__), '../results/train_agents')
def get_runs_folder():
return op.join(get_results_folder(), 'runs')
def ppo_training(agent, envs, suffix='', seed=9527, device=torch.device('cuda'), run_test_env=None,
**hyper_params):
writer = SummaryWriter(op.join(get_runs_folder(), f"{agent.agent_name}{suffix}_{seed}_{int(time.time())}"))
writer.add_text(
"hyperparameters",
"|param|value|\n|-|-|\n%s" % ("\n".join([f"|{key}|{value}|" for key, value in hyper_params.items()])),
)
torch.backends.cudnn.deterministic = hyper_params.get('torch_deterministic', True)
gamma = hyper_params.get('gamma', 0.99)
gae_lambda = hyper_params.get('gae_lambda', 0.95)
learning_rate = hyper_params.get('learning_rate', 2.5e-4)
anneal_lr = hyper_params.get('anneal_lr', True)
num_steps = hyper_params.get('num_steps', 128)
num_envs = hyper_params.get('num_envs', 1)
total_timesteps = hyper_params.get('total_timesteps', 12800)
num_minibatches = hyper_params.get('num_minibatches', 4)
enable_gae = hyper_params.get('enable_gae', True)
update_epochs = hyper_params.get('update_epochs', 4)
clip_coef = hyper_params.get('clip_coef', 0.1)
    norm_adv = hyper_params.get('norm_adv', True)
clip_vloss = hyper_params.get('clip_vloss', True)
ent_coef = hyper_params.get('ent_coef', 0.001)
vf_coef = hyper_params.get('vf_coef', 0.5)
max_grad_norm = hyper_params.get('max_grad_norm', 0.5)
target_kl = hyper_params.get('target_kl', None)
batch_size = int(num_envs * num_steps)
minibatch_size = int(batch_size // num_minibatches)
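    # With the defaults above: batch_size = 1 env * 128 steps = 128 transitions per update,
    # split into 4 minibatches of 32 transitions each.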
    # Make sure the model parameters require gradients
    agent.train()  # switch to training mode
for param in agent.parameters():
param.requires_grad = True
optimizer = optim.Adam(agent.parameters(), lr=learning_rate, eps=1e-5)
    # Mixed-precision training via torch.amp
    scaler = torch.amp.GradScaler(device=device.type)
    # Pre-allocate rollout storage to avoid repeated allocations
obs = torch.zeros((num_steps, num_envs) + envs.single_observation_space.shape, device=device)
actions = torch.zeros((num_steps, num_envs) + envs.single_action_space.shape, device=device)
logprobs = torch.zeros((num_steps, num_envs), device=device)
rewards = torch.zeros((num_steps, num_envs), device=device)
dones = torch.zeros((num_steps, num_envs), device=device)
values = torch.zeros((num_steps, num_envs), device=device)
global_step = 0
start_time = time.time()
next_done = torch.zeros(num_envs).to(device)
next_obs = torch.Tensor(envs.reset()[0]).to(device)
num_updates = total_timesteps // batch_size
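    # e.g. with the defaults, num_updates = 12800 // 128 = 100 policy updates in total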
best_cost = float('inf')
best_agent = None
test_costs = []
train_losses = []
train_returns = []
termed_entropies = []
termed_agents = []
    # Use @torch.jit.script to speed up the GAE computation
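    # GAE recursion implemented below:
    #   delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_{t+1}) - V(s_t)
    #   A_t     = delta_t + gamma * lambda * (1 - done_{t+1}) * A_{t+1}
    # and returns_t = A_t + V(s_t), which serve as the value-function targets.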
@torch.jit.script
def compute_gae(rewards, values, dones, next_value, next_done, gamma: float, gae_lambda: float, num_steps: int):
advantages = torch.zeros_like(rewards)
        last_gae = torch.zeros_like(rewards[0])  # keep this as a tensor rather than a Python scalar
for t in range(num_steps - 1, -1, -1):
if t == num_steps - 1:
nextnonterminal = 1.0 - next_done.float()
nextvalues = next_value.view(-1)
else:
nextnonterminal = 1.0 - dones[t + 1].float()
nextvalues = values[t + 1]
nextnonterminal = nextnonterminal.view(-1)
delta = rewards[t] + gamma * nextvalues * nextnonterminal - values[t]
advantages[t] = last_gae = delta + gamma * gae_lambda * nextnonterminal * last_gae
returns = advantages + values
return advantages, returns
for update in range(1, num_updates + 1):
# Annealing the rate if instructed to do so.
if anneal_lr:
frac = 1.0 - (update - 1.0) / num_updates
lrnow = frac * learning_rate
optimizer.param_groups[0]["lr"] = lrnow
        # Collect the rollout under torch.amp.autocast for mixed precision
with torch.amp.autocast(device.type):
for step in range(0, num_steps):
global_step += num_envs
obs[step] = next_obs
dones[step] = next_done
with torch.no_grad():
action, logprob, _, value = agent.get_action_and_value(next_obs)
values[step] = value.flatten()
actions[step] = action
logprobs[step] = logprob
next_obs_np, reward, done, truncated, info = envs.step(action.cpu().numpy())
rewards[step] = torch.tensor(reward, device=device)
next_obs = torch.tensor(next_obs_np, device=device)
next_done = torch.tensor(done, device=device)
        # Compute advantages with the scripted GAE routine
with torch.no_grad():
next_value = agent.get_value(next_obs).reshape(1, -1)
if enable_gae:
advantages, returns = compute_gae(
rewards, values, dones, next_value, next_done,
gamma, gae_lambda, num_steps
)
else:
returns = torch.zeros_like(rewards)
for t in reversed(range(num_steps)):
if t == num_steps - 1:
nextnonterminal = 1.0 - next_done
next_return = next_value
else:
nextnonterminal = 1.0 - dones[t + 1]
next_return = returns[t + 1]
returns[t] = rewards[t] + gamma * nextnonterminal * next_return
advantages = returns - values
        # Flatten the rollout into one batch for minibatch updates
b_obs = obs.reshape((-1,) + envs.single_observation_space.shape)
b_logprobs = logprobs.reshape(-1)
b_actions = actions.reshape((-1,) + envs.single_action_space.shape)
b_advantages = advantages.reshape(-1)
b_returns = returns.reshape(-1)
b_values = values.reshape(-1)
        # Policy/value optimization loop
        clipfracs = []  # reset at the start of each update
for epoch in range(update_epochs):
for start in range(0, batch_size, minibatch_size):
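                # Note: each inner iteration draws a fresh random subset of the batch rather than
                # iterating over a shuffled partition, so some transitions may be reused within an
                # epoch while others are skipped.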
mb_inds = torch.randperm(batch_size)[:minibatch_size]
                # Make sure the minibatch tensors are on the right device
mb_obs = b_obs[mb_inds].to(device)
mb_actions = b_actions[mb_inds].to(device)
                # Re-evaluate the policy and value function on the minibatch
_, newlogprob, entropy, newvalue = agent.get_action_and_value(
mb_obs, mb_actions.long().T
)
                # Compute the losses; keep every operation in the computation graph
logratio = newlogprob - b_logprobs[mb_inds].detach()
ratio = logratio.exp()
with torch.no_grad():
approx_kl = ((ratio - 1) - logratio).mean()
clipfrac = ((ratio - 1.0).abs() > clip_coef).float().mean().item()
clipfracs.append(clipfrac)
                # Advantages for this minibatch
mb_advantages = b_advantages[mb_inds].detach()
if norm_adv:
mb_advantages = (mb_advantages - mb_advantages.mean()) / (mb_advantages.std() + 1e-8)
                # Policy loss (clipped surrogate objective)
pg_loss1 = -mb_advantages * ratio
pg_loss2 = -mb_advantages * torch.clamp(ratio, 1 - clip_coef, 1 + clip_coef)
pg_loss = torch.max(pg_loss1, pg_loss2).mean()
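                # This is the standard PPO pessimistic bound: take the worse (larger) of the
                # unclipped and clipped surrogate losses, so the update gains nothing from
                # pushing the ratio outside [1 - clip_coef, 1 + clip_coef].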
                # Value loss
newvalue = newvalue.view(-1)
if clip_vloss:
v_loss_unclipped = (newvalue - b_returns[mb_inds].detach()) ** 2
v_clipped = b_values[mb_inds].detach() + torch.clamp(
newvalue - b_values[mb_inds].detach(),
-clip_coef,
clip_coef,
)
v_loss_clipped = (v_clipped - b_returns[mb_inds].detach()) ** 2
v_loss = 0.5 * torch.max(v_loss_unclipped, v_loss_clipped).mean()
else:
v_loss = 0.5 * ((newvalue - b_returns[mb_inds].detach()) ** 2).mean()
                # Entropy bonus
entropy_loss = entropy.mean()
                # Total loss
loss = pg_loss - ent_coef * entropy_loss + v_loss * vf_coef
                # Optimizer step with gradient scaling
                optimizer.zero_grad(set_to_none=True)  # setting grads to None is cheaper than zeroing them
scaler.scale(loss).backward()
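                # Unscale before clipping so clip_grad_norm_ sees the true gradient magnitudes
                # rather than the loss-scaled ones.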
scaler.unscale_(optimizer)
nn.utils.clip_grad_norm_(agent.parameters(), max_grad_norm)
scaler.step(optimizer)
scaler.update()
if target_kl is not None and approx_kl > target_kl:
break
entropy_item = entropy_loss.item()
if run_test_env is not None:
termed_entropies.append(entropy_item)
termed_agents.append(agent.state_dict())
if update % 125 == 0:
termed_best_entropy, termed_best_state = min(zip(termed_entropies, termed_agents), key=lambda x: x[0])
cur_agent = agent.state_dict()
agent.load_state_dict(termed_best_state)
cost, _ = run_test_env(agent=agent)
test_costs.append((update, cost))
if cost < best_cost:
best_cost = cost
best_agent = termed_best_state
agent.save(f'_{update}')
termed_agents.clear()
termed_entropies.clear()
agent.load_state_dict(cur_agent)
y_pred, y_true = b_values.cpu().numpy(), b_returns.cpu().numpy()
var_y = np.var(y_true)
explained_var = np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y
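        # explained_var close to 1 means the value function predicts returns well;
        # values <= 0 mean it does no better than predicting the mean return.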
# TRY NOT TO MODIFY: record rewards for plotting purposes
writer.add_scalar("charts/learning_rate", optimizer.param_groups[0]["lr"], global_step)
writer.add_scalar("losses/value_loss", v_loss.item(), global_step)
writer.add_scalar("losses/policy_loss", pg_loss.item(), global_step)
writer.add_scalar("losses/entropy", entropy_item, global_step)
writer.add_scalar("losses/old_approx_kl", approx_kl.item(), global_step)
writer.add_scalar("losses/approx_kl", approx_kl.item(), global_step)
writer.add_scalar("losses/clipfrac", np.mean(clipfracs), global_step)
writer.add_scalar("losses/explained_variance", explained_var, global_step)
writer.add_scalar("charts/SPS", int(global_step / (time.time() - start_time)), global_step)
        # Log key training metrics to TensorBoard
writer.add_scalar("training/policy_gradient_loss", pg_loss.item(), global_step)
writer.add_scalar("training/value_loss", v_loss.item(), global_step)
writer.add_scalar("training/total_loss", loss.item(), global_step)
writer.add_scalar("training/entropy", entropy_item, global_step)
writer.add_scalar("training/approx_kl_divergence", approx_kl.item(), global_step)
        # Gradient information
total_grad_norm = 0
for param in agent.parameters():
if param.grad is not None:
param_norm = param.grad.data.norm(2)
total_grad_norm += param_norm.item() ** 2
total_grad_norm = total_grad_norm ** 0.5
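        # This is the global L2 norm of the (unscaled, clipped) gradients left over from the
        # last minibatch of this update.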
writer.add_scalar("training/gradient_norm", total_grad_norm, global_step)
        # Value-function statistics
writer.add_scalar("values/mean", b_values.mean().item(), global_step)
writer.add_scalar("values/std", b_values.std().item(), global_step)
writer.add_scalar("values/max", b_values.max().item(), global_step)
writer.add_scalar("values/min", b_values.min().item(), global_step)
        # Advantage statistics
writer.add_scalar("advantages/mean", b_advantages.mean().item(), global_step)
writer.add_scalar("advantages/std", b_advantages.std().item(), global_step)
writer.add_scalar("advantages/max", b_advantages.max().item(), global_step)
writer.add_scalar("advantages/min", b_advantages.min().item(), global_step)
        # Return statistics
writer.add_scalar("returns/mean", b_returns.mean().item(), global_step)
writer.add_scalar("returns/std", b_returns.std().item(), global_step)
writer.add_scalar("returns/max", b_returns.max().item(), global_step)
writer.add_scalar("returns/min", b_returns.min().item(), global_step)
if run_test_env is not None and update % 125 == 0:
            # Test-environment performance
writer.add_scalar("test/cost", cost, global_step)
writer.add_scalar("test/best_cost", best_cost, global_step)
        # Action statistics for the collected batch
actions_numpy = b_actions.cpu().numpy()
writer.add_scalar("actions/mean", np.mean(actions_numpy), global_step)
writer.add_scalar("actions/std", np.std(actions_numpy), global_step)
writer.add_scalar("actions/max", np.max(actions_numpy), global_step)
writer.add_scalar("actions/min", np.min(actions_numpy), global_step)
        # Training throughput
sps = int(global_step / (time.time() - start_time))
writer.add_scalar("performance/steps_per_second", sps, global_step)
writer.add_scalar("performance/epoch", update, global_step)
        if update % 10 == 0:  # print a progress line every 10 updates
print(f"[Update {update}/{num_updates}] "
f"Loss: {loss.item():.4f} "
f"Entropy: {entropy_item:.4f} "
f"KL: {approx_kl.item():.4f} "
f"Steps: {global_step} "
f"SPS: {sps}")
    # Restore the best-performing weights (if any) before the final save; the '_best' suffix is illustrative,
    # matching how save() is called with a string suffix elsewhere in this function
    if best_agent is not None:
        agent.load_state_dict(best_agent)
    agent.save('_best')
envs.close()
writer.close()
return train_losses, train_returns, test_costs, best_agent