Exp 1:原始策略优化(Vanilla Policy Gradient)
Vanilla adj. 一般的,没有新意的;香草的
训练算法总体思路
次要的训练算法集中在 RL_Trainer.run_training_loop
中。通过观察一个循环(iteration)的调用过程,可总结为:
- 收集多个路线,,取得 train_batch_size 个时刻的数据 存在 replay buffer 中
- 上一步齐全完结之后,从 buffer 中采样最近 batch_size 个时刻数据
用数据训练模型
- 先更新策略
- 再更新 baseline (第一节还不须要)
第一、二步是齐全串行的。且通过观察代码实现可知,尽管有 buffer 作数据直达,但 train_batch_size 与 batch_size 定义为相等,每次训练的数据都是上次模型更新之后的。因而仍然是 On-Policy RL。
第一局部次要须要补全 PGAgent.train
,这一函数又牵扯到计算 总收益 Q,以及 policy update
两种 \( \hat{Q} \) 的计算:Reward-to-go or Not
留神这里是 \( \hat{Q} \) (Q-hat),是 单个蒙特卡洛采样门路的收益值,而不是 Q-learning 中神经网络给出的 状态-动作对 的预测价值。
Reward-to-go 就是思考因果的 Q 值,t 工夫点的的 Q 不思考 t 之前工夫点的收益。
不思考因果的:求和即可,每个地位的值都和时刻 t 无关,都是一样的。
##################################################### ################## HELPER FUNCTIONS ################# ##################################################### def _discounted_return(self, rewards): """ Helper function Input: list of rewards {r_0, r_1, ..., r_t', ... r_T} from a single rollout of length T Output: list where each index t contains sum_{t'=0}^T gamma^t' r_{t'} """ discounted_sum, discount = 0, 1 for rr in rewards: discounted_sum += discount * rr discount *= self.gamma return [discounted_sum for i in range(len(rewards))]
思考 Reward-to-go 的:应用迭代的方法
$$\begin{align}\hat{Q}_{t}&=\sum_{t'=t}^{T} \gamma^{t'-t} * r_{t'}\\ &=\sum_{t'=t+1}^{T} \gamma^{t'-t} * r_{t'}+r_{t}\\ &=\gamma\sum_{t'=t+1}^{T} \gamma^{t'-t-1} * r_{t'}+r_{t}\\ &=\hat{Q}_{t+1}+r_{t}\end{align}$$
而 已知:
$$\hat{Q}_{T}=\sum_{t'=T}^{T} \gamma^{t'-T} * r_{t'}=r_{T}$$
def _discounted_cumsum(self, rewards): """ Helper function which -takes a list of rewards {r_0, r_1, ..., r_t', ... r_T}, -and returns a list where the entry in each index t is sum_{t'=t}^T gamma^(t'-t) * r_{t'} (For Reward-to-go) """ rtg_discounted_q = rewards.copy() for i in range(len(rtg_discounted_q)-2, -1, -1): rtg_discounted_q[i] = self.gamma * (rtg_discounted_q[i+1]) + rewards[i] return rtg_discounted_q
策略更新(Policy Updating)
策略优化的数学实质是:通过调整 策略概率模型 的散布,最大化收益的期望值 。
但通过应用 对数求导 的数学技巧,策略优化指标函数从后果上讲,能够认为是 策略对数概率 的加权均匀 ,而权重是 收益值之和。因而收益越高的决策权重越大。(当然这是感性认识,而不是数学实质)
$$\nabla_{\theta} J(\theta) \approx \frac{1}{N} \sum_{i=1}^{N} \sum_{t=1}^{T} \nabla_{\theta} \log \pi_{\theta}\left(\mathbf{a}_{i, t} \mid \mathbf{s}_{i, t}\right) \hat{Q}_{i, t}^{\pi}$$
class MLPPolicyPG(MLPPolicy): def __init__(self, ac_dim, ob_dim, n_layers, size, **kwargs): super().__init__(ac_dim, ob_dim, n_layers, size, **kwargs) self.baseline_loss = nn.MSELoss() def update(self, observations, actions, advantages, q_values=None): observations = ptu.from_numpy(observations) actions = ptu.from_numpy(actions) advantages = ptu.from_numpy(advantages) self.optimizer.zero_grad() observations, actions, advantages = ptu.from_numpy(observations), ptu.from_numpy(actions), ptu.from_numpy(advantages) action_distribution = self.forward(observations) log_probs = action_distribution.log_prob(actions) loss = -torch.mul(log_probs, advantages).mean() loss.backward() self.optimizer.step() if self.nn_baseline: pass # omitted train_log = { 'Training Loss': ptu.to_numpy(loss), } return train_log
Exp 2:Neural Network Baselines
Critic 模型用作 Baseline 或者 Critic
这届内容理论是在 Actor-Critic 章节的课程才讲的 ()
Critic 模型指的是一个学习器(比方神经网络),输出是状态(或再加上动作),输入是这个状态(或 状态-动作 对)的价值。
这一节中尽管存在 Critic 神经网络模型,但它是作为 Baseline 应用,所以仍然是 Policy Gradient,而不是 Actor-Critic。分辨的准则是,PG 在策略更新中 Advantage 值的 被减数 仍然是蒙特卡洛采样 \( \hat{Q} \)
Baseline 模型训练
如上文思路,尽管有 buffer 作数据直达,但 train_batch_size 与 batch_size 定义为相等,每次训练的数据都是上次更新之后的。
因而,buffer 只是表象,实质上还是 On-policy RL。On-Policy 假如成立。
Baseline 模型的训练思路是 函数预计:神经网络的特色抽取和拟合能力,使得它可能辨认出不同但类似的状态,从而采取类似的决策。
这种能够认为是 蒙特卡洛法的延申解释。传统蒙特卡洛法须要在 完全相同 的输出上屡次采样( \( V^{\pi}\left(\mathbf{s}_{t}\right) \approx \frac{1}{N}\sum_{i=1}^{N}\sum_{t^{\prime}=t}^{T} r\left(\mathbf{s}_{t^{\prime}}, \mathbf{a}_{t^{\prime}}\right) \) ),这在大部分的 强化学习环境都是不可能的。
当然这里还额定思考了 收益的工夫递加。
class MLPPolicyPG(MLPPolicy): def __init__(self, ac_dim, ob_dim, n_layers, size, **kwargs): super().__init__(ac_dim, ob_dim, n_layers, size, **kwargs) self.baseline_loss = nn.MSELoss() def update(self, observations, actions, advantages, q_values=None): # Updating Policy (omitted) if self.nn_baseline: ## TODO: update the neural network baseline using the q_values as ## targets. The q_values should first be normalized to have a mean ## of zero and a standard deviation of one. ## Note: You will need to convert the targets into a tensor using ## ptu.from_numpy before using it in the loss assert q_values is not None self.baseline_optimizer.zero_grad() q_values = ptu.from_numpy(q_values) if isinstance(q_values, np.ndarray) else q_values q_mean, q_std = torch.mean(q_values), torch.std(q_values) q_values = (q_values - q_mean).divide(q_std) values = self.baseline(observations) print(values.shape, q_values.shape) b_loss = self.baseline_loss(values, q_values) b_loss.backward() self.baseline_optimizer.step() train_log = { 'Training Loss': ptu.to_numpy(loss), } return train_log
引入 Baselines
很简略:
def estimate_advantage(self, obs: np.ndarray, rews_list: np.ndarray, q_values: np.ndarray, terminals: np.ndarray): """ Computes advantages by (possibly) using GAE, or subtracting a baseline from the estimated Q values """ # Estimate the advantage when nn_baseline is True, # by querying the neural network that you're using to learn the value function if self.nn_baseline: values_unnormalized = self.actor.run_baseline_prediction(obs) ## ensure that the value predictions and q_values have the same dimensionality ## to prevent silent broadcasting errors assert values_unnormalized.ndim == q_values.ndim ## TODO: values were trained with standardized q_values, so ensure ## that the predictions have the same mean and standard deviation as ## the current batch of q_values values = values_unnormalized * q_values.std() + q_values.mean() batch_size = obs.shape[0] if self.gae_lambda is not None: pass # TODO else: ## TODO: compute advantage estimates using q_values, and values as baselines advantages = np.zeros(batch_size) for i in range(batch_size): advantages[i] = q_values[i] - values[i] # Else, just set the advantage to [Q] else: advantages = q_values.copy() # Normalize the resulting advantages to have a mean of zero # and a standard deviation of one if self.standardize_advantages: ad_mean, ad_std = np.average(advantages), np.std(advantages) advantages = (advantages - ad_mean) / ad_std return advantages
Exp 3:GAE
def estimate_advantage(self, obs: np.ndarray, rews_list: np.ndarray, q_values: np.ndarray, terminals: np.ndarray): """ Computes advantages by (possibly) using GAE, or subtracting a baseline from the estimated Q values """ # Estimate the advantage when nn_baseline is True, # by querying the neural network that you're using to learn the value function if self.nn_baseline: values_unnormalized = self.actor.run_baseline_prediction(obs) assert values_unnormalized.ndim == q_values.ndim values = values_unnormalized * q_values.std() + q_values.mean() batch_size = obs.shape[0] if self.gae_lambda is not None: ## append a dummy T+1 value for simpler recursive calculation values = np.append(values, [0]) ## combine rews_list into a single array rews = np.concatenate(rews_list) ## create empty numpy array to populate with GAE advantage ## estimates, with dummy T+1 value for simpler recursive calculation advantages = np.zeros(batch_size + 1) flatten_rews = np.concatenate(rews_list) for i in reversed(range(batch_size)): ## TODO: recursively compute advantage estimates starting from ## timestep T. ## HINT: use terminals to handle edge cases. terminals[i] ## is 1 if the state is the last in its trajectory, and ## 0 otherwise. if terminals[i]: advantages[i] = flatten_rews[i] - values[i] else: delta = flatten_rews[i] + self.gamma * values[i+1] - values[i] advantages[i] = delta + self.gamma * advantages[i+1] else: advantages = np.zeros(batch_size) for i in range(batch_size): advantages[i] = q_values[i] - values[i] else: advantages = q_values.copy() # Normalize the resulting advantages to have a mean of zero # and a standard deviation of one if self.standardize_advantages: ad_mean, ad_std = np.average(advantages), np.std(advantages) advantages = (advantages - ad_mean) / ad_std return advantages