策略梯度和REINFORCE算法
前言
策略梯度目标函数:
策略梯度目标是学习一种策略,使得从任何给定时间开始,累积未来收益最大化。是在状态执行动作所获得的奖励;,其中R是奖励函数。由于这是一个最大化问题,我们通过计算目标函数对于策略参数的偏导数,并计算梯度来优化策略。
期望公式:
策略梯度方法推导
假设采用的策略为,策略可参数化为,目标函数设为,表示为:
设状态,动作按照策略的行动轨迹为,并用概率的形式表示每次的行动,那么改写为:
对目标函数求梯度:
“求导公式:.
”
根据定义可知:
(由于右边是连乘的关系)两边同时求对数
等式右边展开
两边同时对策略的参数求梯度
由于已知得到的过程与策略无关,那么与无关,那么
整理后可得:
将该结论代入中:
展开:
综上所述:
注意到该式最后一项
实际上就是使用蒙特卡洛算法中的,因此进行替换:
在的计算过程中考虑折扣因子,目标函数重写为:
上述结论因此变为:
上述推导即为Reinforce算法的推导过程,总结得到Reinforce的伪代码为:
图源:https://i.stack.imgur.com
Pytorch实现
"""
@Author: w
@Email: xxx@xxx.com
@FileName: reinforce_web.py
@DateTime: 2023/7/30 18:33
@SoftWare: PyCharm
"""
import sys
import torch
import gym
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt
# Constants
GAMMA = 0.9
class PolicyNetwork(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-4):
super(PolicyNetwork, self).__init__()
self.num_actions = num_actions
self.linear1 = nn.Linear(num_inputs, hidden_size)
self.linear2 = nn.Linear(hidden_size, num_actions)
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
def forward(self, state):
x = F.relu(self.linear1(state))
x = F.softmax(self.linear2(x), dim=1)
return x
def get_action(self, state):
#state = torch.from_numpy(state).float().unsqueeze(0)
state = torch.FloatTensor(state).reshape(1, 4)
probs = self.forward(Variable(state))
highest_prob_action = np.random.choice(self.num_actions, p=np.squeeze(probs.detach().numpy()))
log_prob = torch.log(probs.squeeze(0)[highest_prob_action])
return highest_prob_action, log_prob
def update_policy(policy_network, rewards, log_probs):
discounted_rewards = []
for t in range(len(rewards)):
Gt = 0
pw = 0
for r in rewards[t:]:
Gt = Gt + GAMMA ** pw * r
pw = pw + 1
discounted_rewards.append(Gt)
discounted_rewards = torch.tensor(discounted_rewards)
discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (
discounted_rewards.std() + 1e-9) # normalize discounted rewards
policy_gradient = []
for log_prob, Gt in zip(log_probs, discounted_rewards):
policy_gradient.append(-log_prob * Gt)
policy_network.optimizer.zero_grad()
policy_gradient = torch.stack(policy_gradient).sum()
policy_gradient.backward()
policy_network.optimizer.step()
def main():
env = gym.make('CartPole-v1', render_mode='rgb_array')
policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, 128)
max_episode_num = 5000
max_steps = 10000
numsteps = []
avg_numsteps = []
all_rewards = []
for episode in range(max_episode_num):
state,_ = env.reset()
log_probs = []
rewards = []
for steps in range(max_steps):
env.render()
action, log_prob = policy_net.get_action(state)
new_state, reward, done, truncated, info = env.step(action)
log_probs.append(log_prob)
rewards.append(reward)
if done:
update_policy(policy_net, rewards, log_probs)
numsteps.append(steps)
avg_numsteps.append(np.mean(numsteps[-10:]))
all_rewards.append(np.sum(rewards))
if episode % 1 == 0:
sys.stdout.write("episode: {}, total reward: {}, average_reward: {}, length: {}\n".format(episode,np.round(np.sum(rewards),decimals=3),np.round(np.mean(all_rewards[-10:]),decimals=3),steps))
break
state = new_state
plt.plot(numsteps)
plt.plot(avg_numsteps)
plt.xlabel('Episode')
plt.show()
if __name__ == "__main__":
main()
Actor-Critic and A2C A3C
直接使用策略梯度方法的缺点也很明显,具体包括:
策略梯度算法的收敛速度慢 高方差
在轨迹的每个时间步中,实际中都会观察到一些随机事件。每个事件都有带来一些噪声,并且即使在很多时间步骤中累积一小部分噪声,也会导致高方差的结果。也正是由于高方差的原因,使得策略梯度算法收敛速度进一步变慢。
Actor Critic
策略梯度算法中策略的梯度最终计算为:
可以将期望值分解为:
最后一项实际上就是Q值
如同DQN算法,Q值可以通过用神经网络参数化Q函数并来进行学习。
这将引导我们进入Actor Critic方法,其中:
“评论者”估计值函数。这可以是动作值的Q值或状态值的V值。
“演员”根据评论者的建议更新策略分布(如使用策略梯度)。
评论者和演员函数都可以使用神经网络进行参数化。
图源:https://i.stack.imgur.com
Advantage Actor Critic
刚刚提到REINFORCE算法存在高方差问题,通过减去一个基准值,可以降低方差并增加稳定性:
为什么减去一个基准值就可以降低高方差带来的影响?
策略梯度方法中,我们选择的奖励函数会对优化步骤产生很大影响。对于一个轨迹 ,如果它的回报 是负数,我们会选择在相反的梯度方向上进行优化步骤,这会减小该轨迹的概率密度。相反,对于回报为正数的轨迹,它们的概率密度会增加。
然而,如果我们简单地将 ,其中 是一个足够大的常数,使得 变为正数,那么我们实际上会增加轨迹 的概率权重,尽管它的回报仍然比之前的正数回报轨迹要差。由于模型对选择的奖励函数的平移和缩放非常敏感,我们自然会想到是否能找到一个最优的 ,使得以下表达式的方差最小:
将 称为基线(baseline)。同时,也希望通过这种方式减去 不会对梯度估计产生偏差。首先,策略梯度推导中有恒等式:
为了证明估计仍然是无偏的,还需要证明:
可以等价地证明,证明过程为:
其中我们使用了,因为 是概率分布。因此,我们增加了基线后的策略梯度仍然是无偏的。
然后问题就变成了如何选择最优的 b。最直接的选择是使用所有轨迹的平均奖励作为基线,即 。在这种情况下,我们的回报是“居中”的,优于平均水平的回报将被正向加权,而劣于平均水平的回报将被负向加权。实际上,这种方法效果很好,但它并不是最优的。
为了计算最优设置,计算策略梯度的方差。根据概率论的基本公式:
表达式最右边的项是策略梯度的平方,而在优化 的过程中,可以忽略这个项(但注意此时变为有偏)。
对其进行求导,得到
将 关于上述方程等于0,可以得到
即最优的 是预期回报与预期梯度大小的加权平均。
回到Advantage Actor Critic的推导
状态的V函数被认为是最佳的基准函数,将Q值与V值相减,得到A,A表示相对于给定状态下的平均一般行动,采取特定行动有多好。称A此值为优势值:
而根据贝尔曼方程
所以
结合Actor Critic的结论,代入可得Advantage Actor Critic算法:
图源:DOI:10.1038/s41593-018-0147-8
Pytorch实现
"""
@Author: w
@Email: xxx@xxx.com
@FileName: A2C_web.py
@DateTime: 2023/7/30 21:40
@SoftWare: PyCharm
"""
import sys
import torch
import gym
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt
import pandas as pd
# hyperparameters
hidden_size = 256
learning_rate = 3e-4
# Constants
GAMMA = 0.99
num_steps = 300
max_episodes = 3000
class ActorCritic(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-4):
super(ActorCritic, self).__init__()
self.num_actions = num_actions
self.critic_linear1 = nn.Linear(num_inputs, hidden_size)
self.critic_linear2 = nn.Linear(hidden_size, 1)
self.actor_linear1 = nn.Linear(num_inputs, hidden_size)
self.actor_linear2 = nn.Linear(hidden_size, num_actions)
def forward(self, state):
state = Variable(torch.from_numpy(state).float().unsqueeze(0))
value = F.relu(self.critic_linear1(state))
value = self.critic_linear2(value)
policy_dist = F.relu(self.actor_linear1(state))
policy_dist = F.softmax(self.actor_linear2(policy_dist), dim=1)
return value, policy_dist
def a2c(env):
num_inputs = env.observation_space.shape[0]
num_outputs = env.action_space.n
actor_critic = ActorCritic(num_inputs, num_outputs, hidden_size)
ac_optimizer = optim.Adam(actor_critic.parameters(), lr=learning_rate)
all_lengths = []
average_lengths = []
all_rewards = []
entropy_term = 0
for episode in range(max_episodes):
log_probs = []
values = []
rewards = []
state,_ = env.reset()
for steps in range(num_steps):
value, policy_dist = actor_critic.forward(state)
value = value.detach().numpy()[0, 0]
dist = policy_dist.detach().numpy()
action = np.random.choice(num_outputs, p=np.squeeze(dist))
log_prob = torch.log(policy_dist.squeeze(0)[action])
entropy = -np.sum(np.mean(dist) * np.log(dist))
new_state, reward, done, truncated, info = env.step(action)
rewards.append(reward)
values.append(value)
log_probs.append(log_prob)
entropy_term += entropy
state = new_state
if done or steps == num_steps - 1:
Qval, _ = actor_critic.forward(new_state)
Qval = Qval.detach().numpy()[0, 0]
all_rewards.append(np.sum(rewards))
all_lengths.append(steps)
average_lengths.append(np.mean(all_lengths[-10:]))
if episode % 10 == 0:
sys.stdout.write("episode: {}, reward: {}, total length: {}, average length: {} \n".format(episode,np.sum(rewards),steps,average_lengths[-1]))
break
# compute Q values
Qvals = np.zeros_like(values)
for t in reversed(range(len(rewards))):
Qval = rewards[t] + GAMMA * Qval
Qvals[t] = Qval
# update actor critic
values = torch.FloatTensor(values)
Qvals = torch.FloatTensor(Qvals)
log_probs = torch.stack(log_probs)
advantage = Qvals - values
actor_loss = (-log_probs * advantage).mean()
critic_loss = 0.5 * advantage.pow(2).mean()
ac_loss = actor_loss + critic_loss + 0.001 * entropy_term
ac_optimizer.zero_grad()
ac_loss.backward()
ac_optimizer.step()
# Plot results
smoothed_rewards = pd.Series.rolling(pd.Series(all_rewards), 10).mean()
smoothed_rewards = [elem for elem in smoothed_rewards]
plt.plot(all_rewards)
plt.plot(smoothed_rewards)
plt.plot()
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.show()
plt.plot(all_lengths)
plt.plot(average_lengths)
plt.xlabel('Episode')
plt.ylabel('Episode length')
plt.show()
if __name__ == "__main__":
env = gym.make("CartPole-v1")
a2c(env)
Asynchronous Advantage Actor Critic
从本质上讲,A3C实现了并行训练,其中多个并行环境中的Actor独立更新全局值函数,因此是“异步”的。异步的优点是对状态空间进行了有效和高效的探索。
A3C使用N个智能体并行工作,收集样本并计算策略和价值函数的梯度 一段时间后,将梯度传递给主网络,使用所有代理的梯度来更新Actor和Critic 一段时间后,智能体复制全局网络的权重 这种并行性使代理的数据无相关性,因此不需要经验重放缓冲区 甚至可以明确在每个Actor和Critic 中使用不同的探索策略以最大限度地提高多样性 异步化可以扩展到其他更新机制(SARSA、Q-learning等),但在Advantage Actor critic模式下效果更好
图源:https://pylessons.com/A3C-reinforcement-learning
图源:https://cse.buffalo.edu/~avereshc/rl_fall19/lecture_20_Actor_Critic_Methods.pdf
如需要上述代码的requirements.txt,请关注公众号后回复“策略梯度requirements.txt”