PolicyGradient算法玩CartPole和MountainCar代码Pytorch版本

参考链接

代码参考了莫烦python的policy gradient算法,特别感谢!!!

库版本号

gym 0.26.2
gym-notices 0.0.8
pytorch 1.11.0 + cu115
python 3.8.12

gym 0.26.2 版本的用法与旧版本略有不同,主要体现在 step 函数的返回值(返回 5 个值)以及渲染方式(需要在 gym.make 时通过 render_mode 指定)上,我会另开一篇帖子介绍。

PolicyGradient决策代码

玩CartPole倒立摆和玩MountainCar的PolicyGradient算法决策代码是一样的,也就是下面的RL_brain.py文件

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

# Seed both random number generators so runs are repeatable: PyTorch's
# generator and NumPy's global generator each get the same fixed seed.
torch.manual_seed(1)
np.random.seed(1)


class NetWork(nn.Module):
    """Two-layer fully connected policy network.

    Layer order: Linear(n_features -> n_neuron), ReLU,
    Linear(n_neuron -> n_actions).

    The non-linearity sits between the two linear layers. Placing it after
    the output layer would collapse both linear layers into a single linear
    map and clamp every action score to be non-negative, which blocks the
    gradient for any clamped score. The output is therefore returned as raw,
    unbounded scores (logits).

    Args:
        n_actions: Size of the output vector (one score per action).
        n_features: Size of the input observation vector.
        n_neuron: Width of the hidden layer.
    """

    def __init__(self,
                 n_actions,
                 n_features,
                 n_neuron=10):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_features=n_features,
                      out_features=n_neuron,
                      bias=True),
            nn.ReLU(),
            nn.Linear(in_features=n_neuron,
                      out_features=n_actions,
                      bias=True),
        )

    def forward(self, x):
        """Return the raw action scores for input ``x``."""
        return self.net(x)


class PolicyGradient:
    """Monte-Carlo policy-gradient agent with a softmax policy.

    The agent stores one full episode of (observation, action, reward)
    transitions via ``store_transition`` and then performs a single gradient
    step on the whole episode in ``learn``. There is no mini-batching: every
    stored transition of the episode is used at once.

    Args:
        n_actions: Number of discrete actions.
        n_features: Length of one observation vector.
        n_neuron: Width of the hidden layer of the policy network.
        learning_rate: Learning rate passed to the Adam optimizer.
        reward_decay: Discount factor (gamma) applied to future rewards.
    """

    def __init__(self,
                 n_actions,
                 n_features,
                 n_neuron=10,
                 learning_rate=0.01,
                 reward_decay=0.95):
        self.n_actions = n_actions
        self.n_features = n_features
        self.n_neuron = n_neuron
        self.lr = learning_rate
        self.gamma = reward_decay

        # Three parallel per-episode buffers (observations, actions, rewards);
        # they are filled by store_transition() and emptied by learn().
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []

        self.net = NetWork(n_actions=self.n_actions,
                           n_features=self.n_features,
                           n_neuron=self.n_neuron)
        self.optimizer = torch.optim.Adam(params=self.net.parameters(),
                                          lr=self.lr)

    def choose_action(self, observation):
        """Sample an action from the current policy.

        Args:
            observation: A single 1-D observation vector.

        Returns:
            The index of the sampled action (a NumPy integer).
        """
        state = torch.as_tensor(np.asarray(observation), dtype=torch.float32)
        # Acting needs no gradients, so skip building the autograd graph.
        with torch.no_grad():
            probs = F.softmax(self.net(state), dim=-1)
        # Renormalize in float64: float32 softmax output can miss summing to
        # exactly 1 by more than np.random.choice tolerates on some versions.
        probs = probs.numpy().astype(np.float64)
        probs /= probs.sum()
        return np.random.choice(probs.shape[0], p=probs)

    def store_transition(self, s, a, r):
        """Append one step (observation, action, reward) to the episode buffers."""
        self.ep_obs.append(s)
        self.ep_as.append(a)
        self.ep_rs.append(r)

    def learn(self):
        """Run one policy-gradient update on the stored episode.

        The loss is the mean over steps of the negative log-probability of
        the action taken, weighted by that step's normalized discounted
        return. The episode buffers are cleared afterwards.

        Returns:
            float32 NumPy array with the normalized discounted return of
            every step of the episode.

        Raises:
            ValueError: If no transition has been stored.
        """
        if not self.ep_rs:
            raise ValueError("learn() called with no stored transitions")

        returns = self._discount_and_norm_rewards()

        states = torch.as_tensor(np.vstack(self.ep_obs), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(self.ep_as), dtype=torch.int64)
        weights = torch.as_tensor(returns, dtype=torch.float32)

        logits = self.net(states)

        # Cross-entropy against the taken action is -log pi(a|s) per step.
        neg_log_prob = F.cross_entropy(logits, actions, reduction='none')
        loss = torch.mean(neg_log_prob * weights)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # The episode has been consumed; start the next one with empty buffers.
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []
        return weights.numpy()

    def _discount_and_norm_rewards(self):
        """Return the normalized discounted returns of the stored rewards.

        Each entry is the reward at that step plus gamma times the following
        entry; the result is then shifted to zero mean and scaled to unit
        standard deviation.

        Returns:
            float64 NumPy array with one value per stored reward.
        """
        # Force float64 so integer rewards are neither truncated nor rejected
        # by the in-place normalization below.
        rewards = np.asarray(self.ep_rs, dtype=np.float64)
        returns = np.zeros_like(rewards)
        running_add = 0.0
        for t in reversed(range(len(rewards))):
            running_add = running_add * self.gamma + rewards[t]
            returns[t] = running_add

        returns -= returns.mean()
        std = returns.std()
        # A zero std (e.g. a one-step episode) would turn every value into
        # NaN; in that case keep the centered values, which are all zero.
        if std > 0:
            returns /= std
        return returns

玩CartPole倒立摆控制代码

run_CartPole.py

"""
gym: 0.26.2
gym-notices 0.0.8
"""

import gym
from RL_brain import PolicyGradient
import matplotlib.pyplot as plt

ENV_ID = 'CartPole-v1'
N_EPISODES = 424  # number of training episodes

RENDER = False  # showing the simulation window slows training, so it starts off
DISPLAY_REWARD_THRESHOLD = 10000  # switch rendering on once the running reward exceeds this

env = gym.make(ENV_ID)
env.reset(seed=1)  # fixed seed: plain policy gradient has high variance between episodes
env = env.unwrapped  # drop the wrappers (removes the episode step limit)


print(env.action_space)  # available actions
print(env.observation_space)  # observation space
print(env.observation_space.high)  # upper bound of each observation component
print(env.observation_space.low)  # lower bound of each observation component


# Build the agent.
RL = PolicyGradient(
    n_actions=env.action_space.n,
    n_features=env.observation_space.shape[0],
    n_neuron=20,
    learning_rate=0.02,
    reward_decay=0.99,  # gamma
)

running_reward = None  # exponential moving average of the episode reward
render_env_active = False  # True once the human-rendering env is in use

for i_episode in range(N_EPISODES):

    if RENDER and not render_env_active:
        # The render mode is chosen when the environment is created, so swap
        # in ONE human-rendering environment between episodes instead of
        # creating a new environment on every step.
        env.close()
        env = gym.make(ENV_ID, render_mode='human').unwrapped
        render_env_active = True

    observation, info = env.reset()

    while True:
        action = RL.choose_action(observation)

        observation_, reward, terminated, truncated, info = env.step(action)

        RL.store_transition(observation, action, reward)  # record this step

        if terminated or truncated:
            ep_rs_sum = sum(RL.ep_rs)

            if running_reward is None:
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.99 + ep_rs_sum * 0.01

            if running_reward > DISPLAY_REWARD_THRESHOLD:
                RENDER = True  # takes effect at the start of the next episode

            print('episode:', i_episode, ' reward:', int(running_reward))

            # Train on the finished episode; vt holds the normalized
            # discounted return of every step.
            vt = RL.learn()

            if i_episode == N_EPISODES - 1:
                plt.plot(vt)  # plot vt of the final episode
                plt.xlabel('episode steps')
                plt.ylabel('normalized state-action value')
                plt.show()
            break

        observation = observation_

玩MountainCar小车控制代码

run_MountainCar.py

import gym
from RL_brain import PolicyGradient
import matplotlib.pyplot as plt


ENV_ID = 'MountainCar-v0'
N_EPISODES = 31  # number of training episodes

# Rendering starts once the running reward exceeds this threshold.
# NOTE(review): every step yields reward -1 here, so a positive threshold is
# never reached and rendering never starts; a negative value is probably
# intended - confirm.
DISPLAY_REWARD_THRESHOLD = 200

# Sample running rewards noted by the author (presumably from a longer run):
# episode: 154  reward: -10667
# episode: 387  reward: -2009
# episode: 489  reward: -1006
# episode: 628  reward: -502

RENDER = False  # rendering wastes time

env = gym.make(ENV_ID)
env.reset(seed=1)  # reproducible, general Policy gradient has high variance
env = env.unwrapped  # drop the wrappers (removes the episode step limit)

print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

RL = PolicyGradient(
    n_actions=env.action_space.n,
    n_features=env.observation_space.shape[0],
    n_neuron=20,
    learning_rate=0.02,
    reward_decay=0.995,
)

running_reward = None  # exponential moving average of the episode reward
render_env_active = False  # True once the human-rendering env is in use

for i_episode in range(N_EPISODES):

    if RENDER and not render_env_active:
        # The render mode is chosen when the environment is created, so swap
        # in one human-rendering environment between episodes.
        env.close()
        env = gym.make(ENV_ID, render_mode='human').unwrapped
        render_env_active = True

    observation, info = env.reset()

    while True:
        action = RL.choose_action(observation)

        observation_, reward, terminated, truncated, info = env.step(action)  # reward = -1 in all cases

        RL.store_transition(observation, action, reward)

        if terminated or truncated:
            # Update the network once per finished episode.
            ep_rs_sum = sum(RL.ep_rs)
            if running_reward is None:
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.99 + ep_rs_sum * 0.01

            if running_reward > DISPLAY_REWARD_THRESHOLD:
                RENDER = True  # takes effect at the start of the next episode

            print('episode:', i_episode, ' reward:', int(running_reward))

            vt = RL.learn()  # train

            if i_episode == N_EPISODES - 1:
                plt.plot(vt)  # plot vt of the final episode
                plt.xlabel('episode steps')
                plt.ylabel('normalized state-action value')
                plt.show()

            break

        observation = observation_