PolicyGradient algorithm for playing CartPole and MountainCar, PyTorch version
Reference
The code is based on the policy gradient implementation from Morvan (莫烦Python). Many thanks!
Library versions
gym 0.26.2
gym-notices 0.0.8
PyTorch 1.11.0 + cu115
Python 3.8.12
The gym 0.26.2 API differs slightly from older versions: the return values of the step function and the way render displays frames have changed. I will cover these differences in a separate post; a short sketch follows.
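For orientation, here is a minimal sketch of the gym 0.26.2 calls the scripts below rely on (the environment name matches the scripts; the sampled action is purely illustrative):

import gym

# gym 0.26.x: render_mode is chosen when the environment is created, not when render() is called
env = gym.make('CartPole-v1', render_mode='human')

# reset() takes the seed as an argument and returns (observation, info)
observation, info = env.reset(seed=1)

# step() returns five values: the old `done` flag is split into `terminated` and `truncated`
observation_, reward, terminated, truncated, info = env.step(env.action_space.sample())
done = terminated or truncated

env.close()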
PolicyGradient decision code
The PolicyGradient decision code is the same for CartPole (inverted pendulum) and MountainCar; both control scripts import the RL_brain.py file below.
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
np.random.seed(1)
torch.manual_seed(1)
class NetWork(nn.Module):
    """
    Network structure:
    # fully connected layer 1
    # fully connected layer 2
    # ReLU
    """
    def __init__(self,
                 n_actions,
                 n_features,
                 n_neuron=10):
        super(NetWork, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(in_features=n_features,
                      out_features=n_neuron,
                      bias=True),
            nn.Linear(in_features=n_neuron,
                      out_features=n_actions,
                      bias=True),
            nn.ReLU()
        )

    def forward(self, x):
        return self.net(x)


class PolicyGradient:
    """
    PolicyGradient algorithm
    """
    # initialization
    def __init__(self,
                 n_actions,
                 n_features,
                 n_neuron=10,
                 learning_rate=0.01,
                 reward_decay=0.95):
        self.n_actions = n_actions
        self.n_features = n_features
        self.n_neuron = n_neuron
        self.lr = learning_rate
        self.gamma = reward_decay
        # The earlier Q-learning code used one memory that stored observation, action and reward together.
        # Here three separate memories are used:
        #   self.ep_obs stores observations
        #   self.ep_as stores actions
        #   self.ep_rs stores rewards
        # learn() uses everything stored for the episode; there is no batch_size sampling, which keeps things simple.
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []
        self.net = NetWork(n_actions=self.n_actions,
                           n_features=self.n_features,
                           n_neuron=self.n_neuron)
        self.optimizer = torch.optim.Adam(params=self.net.parameters(),
                                          lr=self.lr)

    # choose an action (changed compared with the Q-learning version)
    def choose_action(self, observation):
        s = torch.FloatTensor(observation)
        out = self.net(s)  # forward pass through the network
        prob_weights = F.softmax(out, dim=0)
        prob_weights = prob_weights.detach().numpy()
        # sample an action according to the softmax probabilities
        action = np.random.choice(range(prob_weights.shape[0]), p=prob_weights)
        return action

    # store one transition of the episode (changed compared with the Q-learning version)
    def store_transition(self, s, a, r):
        self.ep_obs.append(s)
        self.ep_as.append(a)  # append the index of the chosen action
        self.ep_rs.append(r)

    # learn and update the parameters (changed compared with the Q-learning version)
    def learn(self):
        # discount and normalize episode reward
        discounted_ep_rs_norm = self._discount_and_norm_rewards()
        # convert to torch tensors
        s = torch.FloatTensor(np.vstack(self.ep_obs))
        action = torch.LongTensor(np.stack(self.ep_as))
        discounted_ep_rs_norm = torch.FloatTensor(discounted_ep_rs_norm)
        # network output (logits)
        out = self.net(s)
        # train on episode
        # loss = nn.CrossEntropyLoss(reduction='none')(out, action)  # without the return weighting
        neg_log_prob = nn.CrossEntropyLoss(reduction='none')(out, action)
        loss = torch.mean(neg_log_prob * discounted_ep_rs_norm)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # empty episode data
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []
        return discounted_ep_rs_norm.detach().numpy()

    # discount the episode rewards (new compared with the Q-learning version)
    def _discount_and_norm_rewards(self):
        # discount episode rewards
        discounted_ep_rs = np.zeros_like(self.ep_rs)
        running_add = 0
        for t in reversed(range(0, len(self.ep_rs))):
            running_add = running_add * self.gamma + self.ep_rs[t]
            discounted_ep_rs[t] = running_add
        # normalize episode rewards
        discounted_ep_rs -= np.mean(discounted_ep_rs)
        discounted_ep_rs /= np.std(discounted_ep_rs)
        return discounted_ep_rs
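As a quick illustration of what _discount_and_norm_rewards computes, here is a toy episode with made-up rewards and gamma = 0.95. The standardized returns are what learn() uses to weight the per-step loss terms; CrossEntropyLoss(reduction='none') on the logits is exactly the negative log-probability of each chosen action, so neg_log_prob * discounted_ep_rs_norm is the usual REINFORCE objective.

import numpy as np

# purely illustrative rewards, not from either environment
gamma = 0.95
ep_rs = [1.0, 1.0, 1.0]

discounted = np.zeros(len(ep_rs))
running_add = 0.0
for t in reversed(range(len(ep_rs))):
    running_add = running_add * gamma + ep_rs[t]
    discounted[t] = running_add
print(discounted)  # roughly [2.8525, 1.95, 1.0]: the return G_t from each step onward

# standardize to zero mean and unit variance before weighting the loss
normalized = (discounted - discounted.mean()) / discounted.std()
print(normalized)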
CartPole (inverted pendulum) control code
run_CartPole.py
"""
gym: 0.26.2
gym-notices 0.0.8
"""
import gym
from RL_brain import PolicyGradient
import matplotlib.pyplot as plt
RENDER = False  # showing the simulation window slows training down; only render after the agent has learned enough
DISPLAY_REWARD_THRESHOLD = 10000  # show the simulation window once the running episode reward exceeds this threshold

env = gym.make('CartPole-v1')
# env = gym.make('CartPole-v1', render_mode='human')  # CartPole with on-screen rendering
env.reset(seed=1)  # reproducible; vanilla Policy Gradient has high variance, so the random seed is fixed
env = env.unwrapped  # remove the wrappers (e.g. the time limit)
print(env.action_space)  # show the available actions
print(env.observation_space)  # show the observation space
print(env.observation_space.high)  # upper bounds of the observations
print(env.observation_space.low)  # lower bounds of the observations

# build the agent
RL = PolicyGradient(
    n_actions=env.action_space.n,
    n_features=env.observation_space.shape[0],
    n_neuron=20,
    learning_rate=0.02,
    reward_decay=0.99,  # gamma
)

for i_episode in range(424):
    observation, info = env.reset()
    while True:
        if RENDER:
            # in gym 0.26.2 rendering requires render_mode='human' at gym.make time; see the separate post on rendering
            # env.render()
            pass
        action = RL.choose_action(observation)
        observation_, reward, done, truncated, info = env.step(action)
        RL.store_transition(observation, action, reward)  # store this step's transition
        if done:
            ep_rs_sum = sum(RL.ep_rs)
            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.99 + ep_rs_sum * 0.01
            if running_reward > DISPLAY_REWARD_THRESHOLD:
                RENDER = True  # decide whether to render
            print('episode:', i_episode, ' reward:', int(running_reward))
            vt = RL.learn()  # learn; returns vt, the normalized discounted returns
            if i_episode == 423:
                plt.plot(vt)  # plot this episode's vt
                plt.xlabel('episode steps')
                plt.ylabel('normalized state-action value')
                plt.show()
            break
        observation = observation_
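If you do want the window to appear once running_reward crosses the threshold, one option under gym 0.26.2 is to re-create the environment a single time with render_mode='human' when RENDER flips to True. A rough sketch, not part of the original script:

import gym

RENDER = True  # imagine this was just flipped because running_reward exceeded DISPLAY_REWARD_THRESHOLD

if RENDER:
    # re-create the environment once with on-screen rendering;
    # with render_mode='human', gym 0.26.2 draws a frame automatically on every step()
    env = gym.make('CartPole-v1', render_mode='human').unwrapped
    observation, info = env.reset()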
MountainCar control code
run_MountainCar.py
import gym
from RL_brain import PolicyGradient
import matplotlib.pyplot as plt
DISPLAY_REWARD_THRESHOLD = 200 # renders environment if total episode reward is greater than this threshold
# episode: 154 reward: -10667
# episode: 387 reward: -2009
# episode: 489 reward: -1006
# episode: 628 reward: -502
RENDER = False # rendering wastes time
env = gym.make('MountainCar-v0')
env.reset(seed=1) # reproducible, general Policy gradient has high variance
env = env.unwrapped
print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)
RL = PolicyGradient(
    n_actions=env.action_space.n,
    n_features=env.observation_space.shape[0],
    n_neuron=20,
    learning_rate=0.02,
    reward_decay=0.995,
)

for i_episode in range(31):
    observation, info = env.reset()
    while True:
        if RENDER:
            # env.render()
            pass
        action = RL.choose_action(observation)
        observation_, reward, done, truncated, info = env.step(action)  # reward = -1 in all cases
        RL.store_transition(observation, action, reward)
        if done:
            # update the network parameters at the end of each episode
            ep_rs_sum = sum(RL.ep_rs)
            if 'running_reward' not in globals():
                # first episode: running_reward is not defined yet
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.99 + ep_rs_sum * 0.01
            if running_reward > DISPLAY_REWARD_THRESHOLD:
                RENDER = True  # rendering
            print('episode:', i_episode, ' reward:', int(running_reward))
            vt = RL.learn()  # train
            if i_episode == 30:
                plt.plot(vt)  # plot the episode vt
                plt.xlabel('episode steps')
                plt.ylabel('normalized state-action value')
                plt.show()
            break
        observation = observation_