Exp 1:原始策略优化(Vanilla Policy Gradient)

Vanilla adj. 一般的,没有新意的;香草的

训练算法总体思路

次要的训练算法集中在 RL_Trainer.run_training_loop 中。通过观察一个循环(iteration)的调用过程,可总结为:

  1. 收集多个路线,,取得 train_batch_size 个时刻的数据 存在 replay buffer 中
  2. 上一步齐全完结之后,从 buffer 中采样最近 batch_size 个时刻数据
  3. 用数据训练模型

    1. 先更新策略
    2. 再更新 baseline (第一节还不须要)

第一、二步是齐全串行的。且通过观察代码实现可知,尽管有 buffer 作数据直达,但 train_batch_size 与 batch_size 定义为相等,每次训练的数据都是上次模型更新之后的。因而仍然是 On-Policy RL。

第一局部次要须要补全 PGAgent.train,这一函数又牵扯到计算 总收益 Q,以及 policy update

两种 \( \hat{Q} \) 的计算:Reward-to-go or Not

留神这里是 \( \hat{Q} \) (Q-hat),是 单个蒙特卡洛采样门路的收益值,而不是 Q-learning 中神经网络给出的 状态-动作对 的预测价值。

Reward-to-go 就是思考因果的 Q 值,t 工夫点的的 Q 不思考 t 之前工夫点的收益。

不思考因果的:求和即可,每个地位的值都和时刻 t 无关,都是一样的。

    #####################################################    ################## HELPER FUNCTIONS #################    #####################################################    def _discounted_return(self, rewards):        """            Helper function            Input: list of rewards {r_0, r_1, ..., r_t', ... r_T} from a single rollout of length T            Output: list where each index t contains sum_{t'=0}^T gamma^t' r_{t'}        """        discounted_sum, discount = 0, 1        for rr in rewards:            discounted_sum += discount * rr            discount *= self.gamma        return [discounted_sum for i in range(len(rewards))]

思考 Reward-to-go 的:应用迭代的方法

$$\begin{align}\hat{Q}_{t}&=\sum_{t'=t}^{T} \gamma^{t'-t} * r_{t'}\\ &=\sum_{t'=t+1}^{T} \gamma^{t'-t} * r_{t'}+r_{t}\\ &=\gamma\sum_{t'=t+1}^{T} \gamma^{t'-t-1} * r_{t'}+r_{t}\\ &=\hat{Q}_{t+1}+r_{t}\end{align}$$

而 已知:

$$\hat{Q}_{T}=\sum_{t'=T}^{T} \gamma^{t'-T} * r_{t'}=r_{T}$$

    def _discounted_cumsum(self, rewards):        """            Helper function which            -takes a list of rewards {r_0, r_1, ..., r_t', ... r_T},            -and returns a list where the entry in each index t is sum_{t'=t}^T gamma^(t'-t) * r_{t'}            (For Reward-to-go)        """        rtg_discounted_q = rewards.copy()        for i in range(len(rtg_discounted_q)-2, -1, -1):            rtg_discounted_q[i] = self.gamma * (rtg_discounted_q[i+1]) + rewards[i]        return rtg_discounted_q

策略更新(Policy Updating)

策略优化的数学实质是:通过调整 策略概率模型 的散布,最大化收益的期望值

但通过应用 对数求导 的数学技巧,策略优化指标函数从后果上讲,能够认为是 策略对数概率 的加权均匀 ,而权重是 收益值之和。因而收益越高的决策权重越大。(当然这是感性认识,而不是数学实质)

$$\nabla_{\theta} J(\theta) \approx \frac{1}{N} \sum_{i=1}^{N} \sum_{t=1}^{T} \nabla_{\theta} \log \pi_{\theta}\left(\mathbf{a}_{i, t} \mid \mathbf{s}_{i, t}\right) \hat{Q}_{i, t}^{\pi}$$

class MLPPolicyPG(MLPPolicy):    def __init__(self, ac_dim, ob_dim, n_layers, size, **kwargs):        super().__init__(ac_dim, ob_dim, n_layers, size, **kwargs)        self.baseline_loss = nn.MSELoss()    def update(self, observations, actions, advantages, q_values=None):        observations = ptu.from_numpy(observations)        actions = ptu.from_numpy(actions)        advantages = ptu.from_numpy(advantages)        self.optimizer.zero_grad()        observations, actions, advantages = ptu.from_numpy(observations), ptu.from_numpy(actions), ptu.from_numpy(advantages)        action_distribution = self.forward(observations)        log_probs = action_distribution.log_prob(actions)        loss = -torch.mul(log_probs, advantages).mean()        loss.backward()        self.optimizer.step()        if self.nn_baseline:            pass # omitted        train_log = {            'Training Loss': ptu.to_numpy(loss),        }        return train_log

Exp 2:Neural Network Baselines

Critic 模型用作 Baseline 或者 Critic

这届内容理论是在 Actor-Critic 章节的课程才讲的 ()

Critic 模型指的是一个学习器(比方神经网络),输出是状态(或再加上动作),输入是这个状态(或 状态-动作 对)的价值。

这一节中尽管存在 Critic 神经网络模型,但它是作为 Baseline 应用,所以仍然是 Policy Gradient,而不是 Actor-Critic。分辨的准则是,PG 在策略更新中 Advantage 值的 被减数 仍然是蒙特卡洛采样 \( \hat{Q} \)

Baseline 模型训练

如上文思路,尽管有 buffer 作数据直达,但 train_batch_size 与 batch_size 定义为相等,每次训练的数据都是上次更新之后的。

因而,buffer 只是表象,实质上还是 On-policy RL。On-Policy 假如成立。

Baseline 模型的训练思路是 函数预计:神经网络的特色抽取和拟合能力,使得它可能辨认出不同但类似的状态,从而采取类似的决策。

这种能够认为是 蒙特卡洛法的延申解释。传统蒙特卡洛法须要在 完全相同 的输出上屡次采样( \( V^{\pi}\left(\mathbf{s}_{t}\right) \approx \frac{1}{N}\sum_{i=1}^{N}\sum_{t^{\prime}=t}^{T} r\left(\mathbf{s}_{t^{\prime}}, \mathbf{a}_{t^{\prime}}\right) \) ),这在大部分的 强化学习环境都是不可能的。

当然这里还额定思考了 收益的工夫递加。

class MLPPolicyPG(MLPPolicy):    def __init__(self, ac_dim, ob_dim, n_layers, size, **kwargs):        super().__init__(ac_dim, ob_dim, n_layers, size, **kwargs)        self.baseline_loss = nn.MSELoss()    def update(self, observations, actions, advantages, q_values=None):        # Updating Policy (omitted)        if self.nn_baseline:            ## TODO: update the neural network baseline using the q_values as            ## targets. The q_values should first be normalized to have a mean            ## of zero and a standard deviation of one.            ## Note: You will need to convert the targets into a tensor using                ## ptu.from_numpy before using it in the loss            assert q_values is not None            self.baseline_optimizer.zero_grad()            q_values = ptu.from_numpy(q_values) if isinstance(q_values, np.ndarray) else q_values            q_mean, q_std = torch.mean(q_values), torch.std(q_values)            q_values = (q_values - q_mean).divide(q_std)            values = self.baseline(observations)            print(values.shape, q_values.shape)            b_loss = self.baseline_loss(values, q_values)            b_loss.backward()            self.baseline_optimizer.step()        train_log = {            'Training Loss': ptu.to_numpy(loss),        }        return train_log

引入 Baselines

很简略:

def estimate_advantage(self, obs: np.ndarray, rews_list: np.ndarray, q_values: np.ndarray, terminals: np.ndarray):    """        Computes advantages by (possibly) using GAE, or subtracting a baseline from the estimated Q values    """    # Estimate the advantage when nn_baseline is True,    # by querying the neural network that you're using to learn the value function    if self.nn_baseline:        values_unnormalized = self.actor.run_baseline_prediction(obs)        ## ensure that the value predictions and q_values have the same dimensionality        ## to prevent silent broadcasting errors        assert values_unnormalized.ndim == q_values.ndim        ## TODO: values were trained with standardized q_values, so ensure            ## that the predictions have the same mean and standard deviation as            ## the current batch of q_values        values = values_unnormalized * q_values.std() + q_values.mean()        batch_size = obs.shape[0]        if self.gae_lambda is not None:            pass # TODO        else:            ## TODO: compute advantage estimates using q_values, and values as baselines            advantages = np.zeros(batch_size)            for i in range(batch_size):                advantages[i] = q_values[i] - values[i]    # Else, just set the advantage to [Q]    else:        advantages = q_values.copy()    # Normalize the resulting advantages to have a mean of zero    # and a standard deviation of one    if self.standardize_advantages:        ad_mean, ad_std = np.average(advantages), np.std(advantages)        advantages = (advantages - ad_mean) / ad_std    return advantages

Exp 3:GAE

def estimate_advantage(self, obs: np.ndarray, rews_list: np.ndarray, q_values: np.ndarray, terminals: np.ndarray):    """        Computes advantages by (possibly) using GAE, or subtracting a baseline from the estimated Q values    """    # Estimate the advantage when nn_baseline is True,    # by querying the neural network that you're using to learn the value function    if self.nn_baseline:        values_unnormalized = self.actor.run_baseline_prediction(obs)        assert values_unnormalized.ndim == q_values.ndim        values = values_unnormalized * q_values.std() + q_values.mean()        batch_size = obs.shape[0]        if self.gae_lambda is not None:            ## append a dummy T+1 value for simpler recursive calculation            values = np.append(values, [0])            ## combine rews_list into a single array            rews = np.concatenate(rews_list)            ## create empty numpy array to populate with GAE advantage            ## estimates, with dummy T+1 value for simpler recursive calculation            advantages = np.zeros(batch_size + 1)            flatten_rews = np.concatenate(rews_list)            for i in reversed(range(batch_size)):                ## TODO: recursively compute advantage estimates starting from                    ## timestep T.                ## HINT: use terminals to handle edge cases. terminals[i]                    ## is 1 if the state is the last in its trajectory, and                    ## 0 otherwise.                if terminals[i]:                    advantages[i] = flatten_rews[i] - values[i]                else:                    delta = flatten_rews[i] + self.gamma * values[i+1] - values[i]                    advantages[i] = delta + self.gamma * advantages[i+1]        else:            advantages = np.zeros(batch_size)            for i in range(batch_size):                advantages[i] = q_values[i] - values[i]    else:        advantages = q_values.copy()    # Normalize the resulting advantages to have a mean of zero    # and a standard deviation of one    if self.standardize_advantages:        ad_mean, ad_std = np.average(advantages), np.std(advantages)        advantages = (advantages - ad_mean) / ad_std    return advantages