一文读懂策略梯度算法:REINFORCE、Actor-Critic、A2C

学术   科技   2023-07-31 18:58   河北  

策略梯度和REINFORCE算法

前言

策略梯度目标函数:

策略梯度目标是学习一种策略,使得从任何给定时间开始,累积未来收益最大化。是在状态执行动作所获得的奖励;,其中R是奖励函数。由于这是一个最大化问题,我们通过计算目标函数对于策略参数的偏导数,并计算梯度来优化策略。

期望公式:

策略梯度方法推导

假设采用的策略为,策略可参数化为,目标函数设为,表示为:

设状态,动作按照策略的行动轨迹为,并用概率的形式表示每次的行动,那么改写为:

对目标函数求梯度:

求导公式:.

根据定义可知:

(由于右边是连乘的关系)两边同时求对数

等式右边展开

两边同时对策略的参数求梯度

由于已知得到的过程与策略无关,那么与无关,那么

整理后可得:

将该结论代入中:

展开:

综上所述:

注意到该式最后一项

实际上就是使用蒙特卡洛算法中的,因此进行替换:

的计算过程中考虑折扣因子,目标函数重写为:

上述结论因此变为:

上述推导即为Reinforce算法的推导过程,总结得到Reinforce的伪代码为:

图源:https://i.stack.imgur.com

Pytorch实现

"""
 @Author: w
 @Email: xxx@xxx.com
 @FileName: reinforce_web.py
 @DateTime: 2023/7/30 18:33
 @SoftWare: PyCharm
"""


import sys
import torch
import gym
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt

# Constants
GAMMA = 0.9

class PolicyNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-4):
        super(PolicyNetwork, self).__init__()

        self.num_actions = num_actions
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, num_actions)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.softmax(self.linear2(x), dim=1)
        return x

    def get_action(self, state):
        #state = torch.from_numpy(state).float().unsqueeze(0)
        state = torch.FloatTensor(state).reshape(14)

        probs = self.forward(Variable(state))
        highest_prob_action = np.random.choice(self.num_actions, p=np.squeeze(probs.detach().numpy()))
        log_prob = torch.log(probs.squeeze(0)[highest_prob_action])
        return highest_prob_action, log_prob

def update_policy(policy_network, rewards, log_probs):
    discounted_rewards = []

    for t in range(len(rewards)):
        Gt = 0
        pw = 0
        for r in rewards[t:]:
            Gt = Gt + GAMMA ** pw * r
            pw = pw + 1
        discounted_rewards.append(Gt)

    discounted_rewards = torch.tensor(discounted_rewards)
    discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (
            discounted_rewards.std() + 1e-9)  # normalize discounted rewards

    policy_gradient = []
    for log_prob, Gt in zip(log_probs, discounted_rewards):
        policy_gradient.append(-log_prob * Gt)

    policy_network.optimizer.zero_grad()
    policy_gradient = torch.stack(policy_gradient).sum()
    policy_gradient.backward()
    policy_network.optimizer.step()

def main():
    env = gym.make('CartPole-v1', render_mode='rgb_array')
    policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, 128)

    max_episode_num = 5000
    max_steps = 10000
    numsteps = []
    avg_numsteps = []
    all_rewards = []

    for episode in range(max_episode_num):
        state,_ = env.reset()
        log_probs = []
        rewards = []

        for steps in range(max_steps):
            env.render()
            action, log_prob = policy_net.get_action(state)
            new_state, reward, done, truncated, info  = env.step(action)
            log_probs.append(log_prob)
            rewards.append(reward)

            if done:
                update_policy(policy_net, rewards, log_probs)
                numsteps.append(steps)
                avg_numsteps.append(np.mean(numsteps[-10:]))
                all_rewards.append(np.sum(rewards))
                if episode % 1 == 0:
                    sys.stdout.write("episode: {}, total reward: {}, average_reward: {}, length: {}\n".format(episode,np.round(np.sum(rewards),decimals=3),np.round(np.mean(all_rewards[-10:]),decimals=3),steps))
                break

            state = new_state

    plt.plot(numsteps)
    plt.plot(avg_numsteps)
    plt.xlabel('Episode')
    plt.show()
if __name__ == "__main__":
    main()

Actor-Critic and A2C A3C

直接使用策略梯度方法的缺点也很明显,具体包括:

  • 策略梯度算法的收敛速度慢
  • 高方差

在轨迹的每个时间步中,实际中都会观察到一些随机事件。每个事件都有带来一些噪声,并且即使在很多时间步骤中累积一小部分噪声,也会导致高方差的结果。也正是由于高方差的原因,使得策略梯度算法收敛速度进一步变慢。

Actor Critic

策略梯度算法中策略的梯度最终计算为:

可以将期望值分解为:

最后一项实际上就是Q值

如同DQN算法,Q值可以通过用神经网络参数化Q函数并来进行学习。

这将引导我们进入Actor Critic方法,其中:

“评论者”估计值函数。这可以是动作值的Q值或状态值的V值。

“演员”根据评论者的建议更新策略分布(如使用策略梯度)。

评论者和演员函数都可以使用神经网络进行参数化。

图源:https://i.stack.imgur.com

Advantage Actor Critic

刚刚提到REINFORCE算法存在高方差问题,通过减去一个基准值,可以降低方差并增加稳定性:


为什么减去一个基准值就可以降低高方差带来的影响?

策略梯度方法中,我们选择的奖励函数会对优化步骤产生很大影响。对于一个轨迹 ,如果它的回报 是负数,我们会选择在相反的梯度方向上进行优化步骤,这会减小该轨迹的概率密度。相反,对于回报为正数的轨迹,它们的概率密度会增加。

然而,如果我们简单地将 ,其中 是一个足够大的常数,使得 变为正数,那么我们实际上会增加轨迹 的概率权重,尽管它的回报仍然比之前的正数回报轨迹要差。由于模型对选择的奖励函数的平移和缩放非常敏感,我们自然会想到是否能找到一个最优的 ,使得以下表达式的方差最小:

称为基线(baseline)。同时,也希望通过这种方式减去 不会对梯度估计产生偏差。首先,策略梯度推导中有恒等式:

为了证明估计仍然是无偏的,还需要证明:

可以等价地证明,证明过程为:

其中我们使用了,因为  是概率分布。因此,我们增加了基线后的策略梯度仍然是无偏的。

然后问题就变成了如何选择最优的 b。最直接的选择是使用所有轨迹的平均奖励作为基线,即 。在这种情况下,我们的回报是“居中”的,优于平均水平的回报将被正向加权,而劣于平均水平的回报将被负向加权。实际上,这种方法效果很好,但它并不是最优的。

为了计算最优设置,计算策略梯度的方差。根据概率论的基本公式:

表达式最右边的项是策略梯度的平方,而在优化 的过程中,可以忽略这个项(但注意此时变为有偏)。

对其进行求导,得到

关于上述方程等于0,可以得到

即最优的 是预期回报与预期梯度大小的加权平均。


回到Advantage Actor Critic的推导

状态的V函数被认为是最佳的基准函数,将Q值与V值相减,得到A,A表示相对于给定状态下的平均一般行动,采取特定行动有多好。称A此值为优势值:

而根据贝尔曼方程

所以

结合Actor Critic的结论,代入可得Advantage Actor Critic算法:

图源:DOI:10.1038/s41593-018-0147-8

Pytorch实现

"""
 @Author: w
 @Email: xxx@xxx.com
 @FileName: A2C_web.py
 @DateTime: 2023/7/30 21:40
 @SoftWare: PyCharm
"""

import sys
import torch
import gym
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt
import pandas as pd

# hyperparameters
hidden_size = 256
learning_rate = 3e-4

# Constants
GAMMA = 0.99
num_steps = 300
max_episodes = 3000

class ActorCritic(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-4):
        super(ActorCritic, self).__init__()

        self.num_actions = num_actions
        self.critic_linear1 = nn.Linear(num_inputs, hidden_size)
        self.critic_linear2 = nn.Linear(hidden_size, 1)

        self.actor_linear1 = nn.Linear(num_inputs, hidden_size)
        self.actor_linear2 = nn.Linear(hidden_size, num_actions)

    def forward(self, state):
        state = Variable(torch.from_numpy(state).float().unsqueeze(0))
        value = F.relu(self.critic_linear1(state))
        value = self.critic_linear2(value)

        policy_dist = F.relu(self.actor_linear1(state))
        policy_dist = F.softmax(self.actor_linear2(policy_dist), dim=1)

        return value, policy_dist

def a2c(env):
    num_inputs = env.observation_space.shape[0]
    num_outputs = env.action_space.n

    actor_critic = ActorCritic(num_inputs, num_outputs, hidden_size)
    ac_optimizer = optim.Adam(actor_critic.parameters(), lr=learning_rate)

    all_lengths = []
    average_lengths = []
    all_rewards = []
    entropy_term = 0

    for episode in range(max_episodes):
        log_probs = []
        values = []
        rewards = []

        state,_ = env.reset()
        for steps in range(num_steps):
            value, policy_dist = actor_critic.forward(state)
            value = value.detach().numpy()[00]
            dist = policy_dist.detach().numpy()

            action = np.random.choice(num_outputs, p=np.squeeze(dist))
            log_prob = torch.log(policy_dist.squeeze(0)[action])
            entropy = -np.sum(np.mean(dist) * np.log(dist))
            new_state, reward, done, truncated, info   = env.step(action)

            rewards.append(reward)
            values.append(value)
            log_probs.append(log_prob)
            entropy_term += entropy
            state = new_state

            if done or steps == num_steps - 1:
                Qval, _ = actor_critic.forward(new_state)
                Qval = Qval.detach().numpy()[00]
                all_rewards.append(np.sum(rewards))
                all_lengths.append(steps)
                average_lengths.append(np.mean(all_lengths[-10:]))
                if episode % 10 == 0:
                    sys.stdout.write("episode: {}, reward: {}, total length: {}, average length: {} \n".format(episode,np.sum(rewards),steps,average_lengths[-1]))
                break

        # compute Q values
        Qvals = np.zeros_like(values)
        for t in reversed(range(len(rewards))):
            Qval = rewards[t] + GAMMA * Qval
            Qvals[t] = Qval

        # update actor critic
        values = torch.FloatTensor(values)
        Qvals = torch.FloatTensor(Qvals)
        log_probs = torch.stack(log_probs)

        advantage = Qvals - values
        actor_loss = (-log_probs * advantage).mean()
        critic_loss = 0.5 * advantage.pow(2).mean()
        ac_loss = actor_loss + critic_loss + 0.001 * entropy_term

        ac_optimizer.zero_grad()
        ac_loss.backward()
        ac_optimizer.step()

    # Plot results
    smoothed_rewards = pd.Series.rolling(pd.Series(all_rewards), 10).mean()
    smoothed_rewards = [elem for elem in smoothed_rewards]
    plt.plot(all_rewards)
    plt.plot(smoothed_rewards)
    plt.plot()
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.show()

    plt.plot(all_lengths)
    plt.plot(average_lengths)
    plt.xlabel('Episode')
    plt.ylabel('Episode length')
    plt.show()

if __name__ == "__main__":
    env = gym.make("CartPole-v1")
    a2c(env)

Asynchronous Advantage Actor Critic

从本质上讲,A3C实现了并行训练,其中多个并行环境中的Actor独立更新全局值函数,因此是“异步”的。异步的优点是对状态空间进行了有效和高效的探索。

  • A3C使用N个智能体并行工作,收集样本并计算策略和价值函数的梯度
  • 一段时间后,将梯度传递给主网络,使用所有代理的梯度来更新Actor和Critic
  • 一段时间后,智能体复制全局网络的权重
  • 这种并行性使代理的数据无相关性,因此不需要经验重放缓冲区
  • 甚至可以明确在每个Actor和Critic 中使用不同的探索策略以最大限度地提高多样性
  • 异步化可以扩展到其他更新机制(SARSA、Q-learning等),但在Advantage Actor critic模式下效果更好

图源:https://pylessons.com/A3C-reinforcement-learning

图源:https://cse.buffalo.edu/~avereshc/rl_fall19/lecture_20_Actor_Critic_Methods.pdf

如需要上述代码的requirements.txt,请关注公众号后回复“策略梯度requirements.txt”

控我所思VS制之以衡
专注于控制理论、控制工程、数学、运筹、算法等方面的经验积累与分享
 最新文章