This post takes a deep dive into Monte Carlo (MC) methods. Their defining characteristic is that they learn purely from experience and require no model of the environment, in contrast to dynamic programming (DP) methods.
Monte Carlo Prediction
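Monte Carlo prediction estimates the state-value function by averaging the returns observed after visits to each state, using only sampled episodes rather than a model. As a rough illustration only (this sketch is not part of the original code and simply reuses the generate_episode helper, the ParametrizedEnv interface, and the imports introduced with the code further below), first-visit MC prediction could look like this:

def mc_prediction(
    env: ParametrizedEnv, pi: np.ndarray, num_steps: int = 1000
) -> np.ndarray:
    # Illustrative sketch: first-visit Monte Carlo prediction of v_pi.
    observation_space, _ = get_observation_action_space(env)
    V = np.zeros(observation_space.n)
    returns = defaultdict(list)
    for _ in range(num_steps):
        episode = generate_episode(env, pi, False)
        G = 0.0
        for t in range(len(episode) - 1, -1, -1):
            s, _, r = episode[t]
            G = env.gamma * G + r
            # First-visit check: only use the return following the first
            # occurrence of s in the episode.
            if s not in [s_ for (s_, _, _) in episode[:t]]:
                returns[s].append(G)
                V[s] = statistics.fmean(returns[s])
    return V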
Monte Carlo Control
Monte Carlo Control with Exploring Starts
import copy
import random
import statistics
from collections import defaultdict
from typing import Any

import numpy as np

# ParametrizedEnv and get_observation_action_space come from the helper code
# accompanying this post and are assumed to be in scope here.

def call_once(func):
    """Custom caching decorator that ignores the first positional argument
    when building the cache key."""
    cache = {}

    def wrapper(*args, **kwargs):
        key = (func.__name__, args[1:])
        assert not kwargs, "We don't support kwargs atm"
        if key not in cache:
            cache[key] = func(*args)
        return cache[key]

    return wrapper
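To see what this buys us: the cache key skips the first positional argument, so calls that differ only in that argument reuse the cached result. A hypothetical toy example (not from the original code):

@call_once
def expensive(tag: str, n: int = 3) -> int:
    # Hypothetical function used only to demonstrate call_once.
    print(f"computing for {tag}")
    return n * n

expensive("a")  # prints "computing for a" and returns 9
expensive("b")  # cache hit: the differing first argument is ignored, returns 9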
@call_once
def generate_possible_states(
    env: ParametrizedEnv, num_runs: int = 100
) -> list[tuple[int, ParametrizedEnv]]:
    """Generate the possible states of the environment.

    To do so, iteratively grow the set of known states by picking a random
    known state, following a random policy from it, and recording any newly
    reached states.

    Args:
        env: environment to use
        num_runs: number of discovery runs

    Returns:
        List of the discovered states - tuples of the state (observation)
        and a gym environment representing that state.
    """
    _, action_space = get_observation_action_space(env)
    observation, _ = env.env.reset()
    possible_states = [(observation, copy.deepcopy(env))]
    for _ in range(num_runs):
        observation, env = random.choice(possible_states)
        env = copy.deepcopy(env)
        terminated = truncated = False
        while not terminated and not truncated:
            action = np.random.choice(action_space.n)
            observation, _, terminated, truncated, _ = env.env.step(action)
            if observation in {state for state, _ in possible_states}:
                break
            if not terminated and not truncated:
                # Store a snapshot of the environment so the saved copy
                # actually represents the recorded observation.
                possible_states.append((observation, copy.deepcopy(env)))
    return possible_states
def generate_random_start(env: ParametrizedEnv) -> tuple[int, ParametrizedEnv]:
    """Pick a random start state.

    To do so, first generate all possible states (cached),
    then pick one of them at random.
    """
    possible_states = generate_possible_states(env)
    observation, env = random.choice(possible_states)
    return copy.deepcopy(observation), copy.deepcopy(env)
def generate_episode(
    env: ParametrizedEnv,
    pi: np.ndarray,
    exploring_starts: bool,
    max_episode_length: int = 20,
) -> list[tuple[Any, Any, Any]]:
    """Generate an episode following the given policy.

    Args:
        env: environment to use
        pi: policy to follow
        exploring_starts: when True, follow the exploring starts (ES) assumption
        max_episode_length: cut-off used to terminate stuck episodes

    Returns:
        The generated episode as a list of (state, action, reward) tuples.
    """
    _, action_space = get_observation_action_space(env)
    episode = []
    observation, _ = env.env.reset()
    if exploring_starts:
        # With ES, pick a random start state.
        observation, env = generate_random_start(env)
    terminated = truncated = False
    initial_step = True
    while not terminated and not truncated:
        if initial_step and exploring_starts:
            # With ES, pick a random initial action.
            action = np.random.choice(action_space.n)
        else:
            action = np.random.choice(action_space.n, p=pi[observation])
        initial_step = False
        observation_new, reward, terminated, truncated, _ = env.env.step(action)
        episode.append((observation, action, reward))
        # Terminate episodes in which the agent is stuck in a loop.
        if len(episode) > max_episode_length:
            break
        observation = observation_new
    return episode
def mc_es(env: ParametrizedEnv) -> np.ndarray:
    """Solve the passed Gymnasium environment via
    Monte Carlo with exploring starts.

    Args:
        env: environment containing the problem

    Returns:
        The found policy.
    """
    observation_space, action_space = get_observation_action_space(env)
    pi = np.ones((observation_space.n, action_space.n)) / action_space.n
    Q = np.zeros((observation_space.n, action_space.n))
    returns = defaultdict(list)
    num_steps = 1000

    for _ in range(num_steps):
        episode = generate_episode(env, pi, True)
        G = 0.0
        for t in range(len(episode) - 1, -1, -1):
            s, a, r = episode[t]
            G = env.gamma * G + r
            prev_s = [(s_, a_) for (s_, a_, _) in episode[:t]]
            if (s, a) not in prev_s:
                returns[s, a].append(G)
                Q[s, a] = statistics.fmean(returns[s, a])
                # Only make the policy greedy once the Q-values for this
                # state actually differ, to keep exploring otherwise.
                if not all(Q[s, a] == Q[s, 0] for a in range(action_space.n)):
                    for a in range(action_space.n):
                        pi[s, a] = 1 if a == np.argmax(Q[s]) else 0
    return np.argmax(pi, 1)
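The backward loop over the episode builds the return recursively, as in [1]: starting from the final step, each earlier return is

$$G_t = R_{t+1} + \gamma\, G_{t+1},$$

which is exactly what G = env.gamma * G + r computes while iterating from the last transition back to the first.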
Monte Carlo Control without Exploring Starts
def on_policy_mc(env: ParametrizedEnv) -> np.ndarray:
    """Solve the passed Gymnasium environment via
    on-policy Monte Carlo control.

    Args:
        env: environment containing the problem

    Returns:
        The found policy.
    """
    observation_space, action_space = get_observation_action_space(env)
    pi = np.ones((observation_space.n, action_space.n)) / action_space.n
    Q = np.zeros((observation_space.n, action_space.n))
    returns = defaultdict(list)
    num_steps = 1000

    for _ in range(num_steps):
        episode = generate_episode(env, pi, False)
        G = 0.0
        for t in range(len(episode) - 1, -1, -1):
            s, a, r = episode[t]
            G = env.gamma * G + r
            prev_s = [(s_, a_) for (s_, a_, _) in episode[:t]]
            if (s, a) not in prev_s:
                returns[s, a].append(G)
                Q[s, a] = statistics.fmean(returns[s, a])
                if not all(Q[s, a] == Q[s, 0] for a in range(action_space.n)):
                    # Epsilon-greedy policy improvement.
                    A_star = np.argmax(Q[s, :])
                    for a in range(action_space.n):
                        pi[s, a] = (
                            1 - env.eps + env.eps / action_space.n
                            if a == A_star
                            else env.eps / action_space.n
                        )
    return np.argmax(pi, 1)
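For reference, the update above is the usual epsilon-soft policy improvement of on-policy MC control [1]: with $A^{*} = \arg\max_a Q(s, a)$,

$$\pi(a \mid s) = \begin{cases} 1 - \varepsilon + \dfrac{\varepsilon}{|\mathcal{A}(s)|} & \text{if } a = A^{*}, \\[4pt] \dfrac{\varepsilon}{|\mathcal{A}(s)|} & \text{otherwise,} \end{cases}$$

where $\varepsilon$ corresponds to env.eps in the code.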
Off-Policy Monte Carlo Control
def off_policy_mc(env: ParametrizedEnv) -> np.ndarray:
    """Solve the passed Gymnasium environment via
    off-policy Monte Carlo control.

    Args:
        env: environment containing the problem

    Returns:
        The found policy.
    """
    observation_space, action_space = get_observation_action_space(env)
    Q = np.zeros((observation_space.n, action_space.n))
    C = np.zeros((observation_space.n, action_space.n))
    pi = np.argmax(Q, 1)
    num_steps = 1000

    for _ in range(num_steps):
        # Sample a random (soft) behavior policy b.
        b = np.random.rand(int(observation_space.n), int(action_space.n))
        b = b / np.expand_dims(np.sum(b, axis=1), -1)
        episode = generate_episode(env, b, False)
        G = 0.0
        W = 1.0
        for t in range(len(episode) - 1, -1, -1):
            s, a, r = episode[t]
            G = env.gamma * G + r
            C[s, a] += W
            Q[s, a] += W / C[s, a] * (G - Q[s, a])
            pi = np.argmax(Q, 1)
            # Once the behavior action deviates from the greedy target
            # policy, the importance weight for all earlier steps is 0.
            if a != np.argmax(Q[s]):
                break
            W *= 1 / b[s, a]
    return pi
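The inner loop is the incremental weighted importance sampling update from [1]. Processing the episode backwards with cumulative weight $W$, each visited pair $(S_t, A_t)$ is updated as

$$C(S_t, A_t) \leftarrow C(S_t, A_t) + W, \qquad Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \frac{W}{C(S_t, A_t)}\bigl(G - Q(S_t, A_t)\bigr), \qquad W \leftarrow \frac{W}{b(A_t \mid S_t)}.$$

Because the target policy is greedy and therefore deterministic, $\pi(A_t \mid S_t) = 1$ on the updated prefix; that is why the weight update divides only by $b(A_t \mid S_t)$ and the loop stops as soon as the behavior action deviates from the greedy one.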
def off_policy_mc_non_inc(env: ParametrizedEnv) -> np.ndarray:
    """Solve the passed Gymnasium environment via off-policy
    Monte Carlo control - but without using Sutton's incremental
    algorithm for updating the importance sampling weights.

    Args:
        env: environment containing the problem

    Returns:
        The found policy.
    """
    observation_space, action_space = get_observation_action_space(env)
    Q = np.zeros((observation_space.n, action_space.n))
    num_steps = 10000
    returns = defaultdict(list)
    ratios = defaultdict(list)

    for _ in range(num_steps):
        # Sample a random (soft) behavior policy b.
        b = np.random.rand(int(observation_space.n), int(action_space.n))
        b = b / np.expand_dims(np.sum(b, axis=1), -1)
        episode = generate_episode(env, b, False)
        G = 0.0
        ratio = 1.0
        for t in range(len(episode) - 1, -1, -1):
            s, a, r = episode[t]
            G = env.gamma * G + r
            # Create the current target policy:
            # the argmax of the Q-function,
            # but with equal weights where all Q-values are equal.
            pi = np.zeros_like(Q)
            pi[np.arange(Q.shape[0]), np.argmax(Q, 1)] = 1
            uniform_rows = np.all(Q == Q[:, [0]], axis=1)
            pi[uniform_rows] = 1 / action_space.n
            ratio *= pi[s, a] / b[s, a]
            if ratio == 0:
                break
            returns[s, a].append(G)
            ratios[s, a].append(ratio)
            # Weighted importance sampling: ratio-weighted average of returns.
            Q[s, a] = sum(
                ret * rho for ret, rho in zip(returns[s, a], ratios[s, a])
            ) / sum(ratios[s, a])

    Q = np.nan_to_num(Q, nan=0.0)
    return np.argmax(Q, 1)
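In this non-incremental variant, each pair $(s, a)$ keeps the full lists of observed returns $G_i$ and of the cumulative importance ratios $\rho_i$ accumulated from that step to the end of the episode, and the weighted importance sampling estimate is recomputed from scratch:

$$Q(s, a) = \frac{\sum_i \rho_i\, G_i}{\sum_i \rho_i}.$$

Episodes whose ratio drops to zero are cut short, since such terms would contribute nothing to either sum.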
Summary
References
[1] Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction. MIT Press. http://incompleteideas.net/book/RLbook2020.pdf (Note: the formulas in this post are screenshots from this book.)
[2] Farama Foundation. (2022). Gymnasium: A Python Library for Reinforcement Learning Environments. https://github.com/Farama-Foundation/Gymnasium