zhou_leo/PPO-Pyorch

This repository does not declare an open-source license file (LICENSE); before using it, check the project description and its upstream code dependencies.
PPO.py 9.74 KB
Leo committed on 2023-07-04 22:02: modified to show the execution time
import torch
import torch.nn as nn
from torch.distributions import Categorical
import gym  # pip install box2d box2d-kengz --user
import time

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class Memory:
    def __init__(self):
        self.actions = []
        self.states = []
        self.logprobs = []  # log probabilities of the sampled actions
        self.rewards = []
        self.is_terminals = []

    def clear_memory(self):
        del self.actions[:]
        del self.states[:]
        del self.logprobs[:]
        del self.rewards[:]
        del self.is_terminals[:]

class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, n_latent_var):
        super(ActorCritic, self).__init__()  # call the parent-class constructor
        # actor
        self.action_layer = nn.Sequential(
            nn.Linear(state_dim, n_latent_var),     # linear layer: state -> hidden
            nn.Tanh(),                              # activation
            nn.Linear(n_latent_var, n_latent_var),  # hidden layer of width n_latent_var
            nn.Tanh(),
            nn.Linear(n_latent_var, action_dim),
            nn.Softmax(dim=-1)  # softmax over the last dimension, yielding action probabilities
        )
        # critic
        self.value_layer = nn.Sequential(
            nn.Linear(state_dim, n_latent_var),
            nn.Tanh(),
            nn.Linear(n_latent_var, n_latent_var),
            nn.Tanh(),
            nn.Linear(n_latent_var, 1)
        )

    def forward(self):
        raise NotImplementedError

    def act(self, state, memory):
        state = torch.from_numpy(state).float().to(device)  # NumPy array -> float tensor on the target device
        action_probs = self.action_layer(state)  # forward pass: state -> actor network -> action probabilities
        dist = Categorical(action_probs)         # discrete (categorical) distribution over actions
        action = dist.sample()                   # sample an action from that distribution
        # record the transition in memory (tensors are stored by reference)
        memory.states.append(state)
        memory.actions.append(action)
        memory.logprobs.append(dist.log_prob(action))  # log probability of the sampled action
        return action.item()
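
    # Note: act() runs during rollout on a single state, while evaluate() below runs
    # during the PPO update on batched states and actions.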
    def evaluate(self, state, action):
        """
        Evaluates the given state-action pair using the policy network.

        Args:
            state (torch.Tensor): Input state tensor.
            action (torch.Tensor): Input action tensor.

        Returns:
            action_logprobs (torch.Tensor): Log probabilities of the given actions under the policy.
            state_value (torch.Tensor): Value estimate of the given state.
            dist_entropy (torch.Tensor): Entropy of the action distribution.
        """
        action_probs = self.action_layer(state)  # forward pass: state -> actor network -> action probabilities
        dist = Categorical(action_probs)         # action distribution from those probabilities
        action_logprobs = dist.log_prob(action)  # log probabilities of the given actions
        dist_entropy = dist.entropy()            # entropy of the action distribution
        state_value = self.value_layer(state)    # critic's value estimate of the given states
        return action_logprobs, torch.squeeze(state_value), dist_entropy

class PPO:
    def __init__(self, state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip):
        self.lr = lr
        self.betas = betas
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs
        self.policy = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
        # Adam optimizer for updating the parameters of the policy network
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)
        # old policy network: collects rollouts and provides pi_theta_old for the probability ratio
        self.policy_old = ActorCritic(state_dim, action_dim, n_latent_var).to(device)
        self.policy_old.load_state_dict(self.policy.state_dict())
        self.MseLoss = nn.MSELoss()  # mean squared error loss for the value function

    def update(self, memory):
        # Monte Carlo estimate of state rewards:
        rewards = []
        discounted_reward = 0  # discounted return accumulator
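        # The loop below computes the discounted return backwards over the rollout:
        #   R_t = r_t + gamma * R_{t+1},  with R reset to 0 at episode boundaries.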
        for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
            # iterate in reverse, from the last timestep back to the first
            if is_terminal:
                discounted_reward = 0  # episode ended: reset the discounted return
            discounted_reward = reward + (self.gamma * discounted_reward)  # gamma weighs how much future rewards matter
            rewards.insert(0, discounted_reward)
        # Normalizing the rewards:
        rewards = torch.tensor(rewards, dtype=torch.float32).to(device)  # convert the returns to a tensor
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)    # normalize to zero mean, unit variance
        # convert list to tensor
        old_states = torch.stack(memory.states).to(device).detach()  # stack the list of tensors into a single tensor
        old_actions = torch.stack(memory.actions).to(device).detach()
        old_logprobs = torch.stack(memory.logprobs).to(device).detach()
        # Optimize policy for K epochs:
        for _ in range(self.K_epochs):
            # Evaluating old actions and values:
            logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)
            # log probabilities of the old actions, value estimates of the old states, entropy of the action distribution
            # Finding the ratio (pi_theta / pi_theta__old):
            ratios = torch.exp(logprobs - old_logprobs.detach())
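            # Clipped surrogate objective (PPO-Clip):
            #   L_CLIP = E[ min(r_t * A_t, clip(r_t, 1 - eps, 1 + eps) * A_t) ]
            # The total loss below also adds a 0.5-weighted value-function MSE term and
            # subtracts a 0.01-weighted entropy bonus to encourage exploration.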
            # Finding Surrogate Loss: the core of PPO (the clipped "PPO2" objective)
            advantages = rewards - state_values.detach()
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy
            # take gradient step
            self.optimizer.zero_grad()  # clear accumulated gradients
            loss.mean().backward()      # backpropagate the mean loss
            self.optimizer.step()       # apply the gradient update
        # Copy new weights into old policy, so policy_old is used for the next round of rollouts:
        self.policy_old.load_state_dict(self.policy.state_dict())

def main():
    ############## Hyperparameters ##############
    env_name = "LunarLander-v2"
    # creating environment
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]
    action_dim = 4            # LunarLander-v2 has 4 discrete actions
    render = False
    solved_reward = 230       # stop training if avg_reward > solved_reward
    log_interval = 20         # print avg reward in the interval
    max_episodes = 50000      # max training episodes
    max_timesteps = 300       # max timesteps in one episode
    n_latent_var = 64         # number of units in each hidden layer
    update_timestep = 2000    # update policy every n timesteps
    lr = 0.002                # learning rate
    betas = (0.9, 0.999)
    gamma = 0.99              # discount factor
    K_epochs = 4              # update policy for K epochs
    eps_clip = 0.2            # clip parameter for PPO
    random_seed = 48          # fix the random seed for reproducibility
    #############################################
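    # With max_timesteps = 300, a policy update is triggered roughly every 7 episodes
    # when episodes run their full length (update_timestep / max_timesteps = 2000 / 300 ≈ 6.7).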
print("训练开始")
start_time = time.time()
if random_seed:
torch.manual_seed(random_seed)
env.seed(random_seed)
memory = Memory()
ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip)
#print(lr,betas)
# logging variables
running_reward = 0
avg_length = 0
timestep = 0
    # training loop
    for i_episode in range(1, max_episodes + 1):
        state = env.reset()  # reset the environment (start a new episode)
        for t in range(max_timesteps):
            timestep += 1
            # Running policy_old:
            action = ppo.policy_old.act(state, memory)
            state, reward, done, _ = env.step(action)  # one step -> (next state, reward, done flag, extra info)
            # Saving reward and is_terminal:
            memory.rewards.append(reward)
            memory.is_terminals.append(done)
            # update if its time
            if timestep % update_timestep == 0:
                ppo.update(memory)     # core of PPO: update the policy network every update_timestep timesteps
                memory.clear_memory()  # clear the rollout buffer
                timestep = 0
            running_reward += reward
            if render:
                env.render()
            if done:
                break
        avg_length += t
        # stop training if avg_reward > solved_reward
        if running_reward > (log_interval * solved_reward):
            print("########## Solved! ##########")
            torch.save(ppo.policy.state_dict(), './PPO_{}.pth'.format(env_name))
            break
        # logging
        if i_episode % log_interval == 0:
            avg_length = int(avg_length / log_interval)
            running_reward = int((running_reward / log_interval))
            print('Episode {} \t avg length: {} \t reward: {}'.format(i_episode, avg_length, running_reward))
            running_reward = 0
            avg_length = 0
    print("Training finished")
    end_time = time.time()
    train_time = end_time - start_time
    print("Training time:", train_time, "seconds")


if __name__ == '__main__':
    main()
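
A minimal evaluation sketch, not part of PPO.py: it assumes the checkpoint './PPO_LunarLander-v2.pth' written by main() above, the same pre-0.26 Gym step/reset API, and that this file is importable as the module PPO.

# Evaluation sketch (assumes the checkpoint saved by main() above exists).
import gym
import torch
from PPO import ActorCritic, Memory, device  # classes and device defined in PPO.py

env = gym.make("LunarLander-v2")
policy = ActorCritic(state_dim=env.observation_space.shape[0], action_dim=4, n_latent_var=64).to(device)
policy.load_state_dict(torch.load("./PPO_LunarLander-v2.pth", map_location=device))
policy.eval()

state = env.reset()
done, total_reward = False, 0.0
while not done:
    with torch.no_grad():
        action = policy.act(state, Memory())  # throwaway Memory; only the sampled action is needed
    state, reward, done, _ = env.step(action)
    total_reward += reward
print("evaluation return:", total_reward)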