Reinforcement Learning from Basics to Advanced - Cases and Practice [4.2]: Deep Q-Network (DQN) - CartPole Game Demo

  • Reinforcement learning (RL) is a branch of machine learning. Unlike supervised and unsupervised learning, it is concerned with how to act based on the environment so as to maximize the expected cumulative reward.
  • Basic loop: the agent learns in an environment; based on the environment's state (or the observation it receives) it executes an action, and uses the environment's feedback (the reward) to guide it towards better actions.

For example, in this project's CartPole game, the agent is the cart shown in the animation: at every step it chooses one of two actions, push left or push right, trying to keep the pole upright. The interaction loop below makes this concrete.
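As a warm-up, here is a minimal sketch of one CartPole episode driven by a random policy, using the same classic gym API as the rest of this post (env.reset() returns the observation, env.step() returns a 4-tuple):

import gym

env = gym.make('CartPole-v0')
obs = env.reset()                        # initial 4-dimensional state
done, total_reward = False, 0.0
while not done:
    action = env.action_space.sample()   # random action: 0 = push left, 1 = push right
    obs, reward, done, info = env.step(action)  # environment feedback
    total_reward += reward               # +1 for every step the pole stays up
print('episode reward:', total_reward)
env.close()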

## Install dependencies
!pip install pygame
!pip install gym
!pip install atari_py
!pip install parl
import gym
import os
import random
import collections

import paddle
import paddle.nn as nn
import numpy as np
import paddle.nn.functional as F

1. Experience Replay

Experience replay does the following: transitions are stored in a replay buffer (the experience pool), and batches of transitions are then drawn at random from that buffer for training.

This has two benefits:

  1. It reduces the correlation between samples
  2. It improves sample efficiency

Experience replay is introduced because the samples come from consecutive game frames, so they are far more correlated than in simpler reinforcement learning problems. Without experience replay, the algorithm would keep doing gradient descent in roughly the same direction over long stretches of consecutive steps, and with the same step size such directly computed gradients may fail to converge. Experience replay therefore samples random experiences from a memory pool and computes the gradient on them, which avoids this problem.

class ReplayMemory(object):
    def __init__(self, max_size):
        self.buffer = collections.deque(maxlen=max_size)

    # add one experience (transition) to the replay buffer
    def append(self, exp):
        self.buffer.append(exp)

    # randomly sample batch_size experiences from the replay buffer
    def sample(self, batch_size):
        mini_batch = random.sample(self.buffer, batch_size)
        obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []

        for experience in mini_batch:
            s, a, r, s_p, done = experience
            obs_batch.append(s)
            action_batch.append(a)
            reward_batch.append(r)
            next_obs_batch.append(s_p)
            done_batch.append(done)

        return (np.array(obs_batch).astype('float32'),
                np.array(action_batch).astype('float32'),
                np.array(reward_batch).astype('float32'),
                np.array(next_obs_batch).astype('float32'),
                np.array(done_batch).astype('float32'))

    def __len__(self):
        return len(self.buffer)
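
A quick usage sketch of the class above; the transition values here are made up purely for illustration:

rpm = ReplayMemory(max_size=200)
for _ in range(64):
    fake_obs = np.random.rand(4).astype('float32')        # a fake 4-dim CartPole state
    fake_next_obs = np.random.rand(4).astype('float32')
    rpm.append((fake_obs, 0, 1.0, fake_next_obs, False))  # (s, a, r, s', done)

obs_b, act_b, rew_b, next_obs_b, done_b = rpm.sample(batch_size=32)
print(obs_b.shape, act_b.shape)                           # (32, 4) (32,)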

2. DQN

Compared with plain Q-learning, DQN makes two major improvements, experience replay and a fixed Q-target, for the following reasons:

  • Experience replay: it takes full advantage of the fact that Q-learning is off-policy. Transitions produced during training are stored in a replay buffer, and transitions are then drawn at random from that buffer for optimization. This, on the one hand, reduces the correlation between samples and, on the other hand, improves sample efficiency. Note: transitions keep being written into the buffer; once the buffer is full, newly stored data evicts the oldest entries (they are popped out).
  • Fixed Q-target: it addresses the instability of the updates. Compare with supervised learning, where the prediction has to approach a target that is fixed. In DQN the target value is itself produced by a neural network, so it keeps changing and is hard to fit. The DQN authors' solution is to keep the target network unchanged for a period of time so that the target is well defined; the target network is then periodically updated to move closer to the true values, which makes training much more stable. A minimal sketch of this target computation is given right after this list.
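
The sketch below (plain NumPy, the function name is my own) shows what the fixed target network is used for: the TD target reward + (1 - done) * GAMMA * max_a Q_target(s', a), which is exactly the quantity optimize_model builds later in this post:

import numpy as np

GAMMA = 0.99

def td_target(reward, done, next_q_values):
    """next_q_values: Q_target(s', .) for one transition, shape (num_actions,)."""
    best_next_q = np.max(next_q_values)            # max_a Q_target(s', a)
    return reward + (1.0 - float(done)) * GAMMA * best_next_q

# non-terminal transition: bootstrap from the target network -> 1 + 0.99 * 2.0 = 2.98
print(td_target(reward=1.0, done=False, next_q_values=np.array([0.5, 2.0])))
# terminal transition: the target is just the immediate reward -> 1.0
print(td_target(reward=1.0, done=True, next_q_values=np.array([0.5, 2.0])))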

3. Model

Here you can plug in different neural network components according to your own needs.

DQN defines the forward network; you are free to customize your own network structure.

class DQN(nn.Layer):
    def __init__(self, outputs):
        super(DQN, self).__init__()
        self.linear1 = nn.Linear(in_features=4, out_features=128)
        self.linear2 = nn.Linear(in_features=128, out_features=24)
        self.linear3 = nn.Linear(in_features=24, out_features=outputs)

    def forward(self, x):
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)
        x = F.relu(x)
        x = self.linear3(x)
        return x
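
A quick sanity check of the network above (purely illustrative): a batch of CartPole observations with 4 features goes in, and one Q-value per action comes out.

model = DQN(outputs=2)                    # CartPole has 2 actions
dummy_obs = paddle.to_tensor(np.random.rand(8, 4).astype('float32'))  # a batch of 8 fake states
q_values = model(dummy_obs)
print(q_values.shape)                     # [8, 2]: one Q(s, a) per action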

4. The Agent's Learning Functions

This part covers both exploration and model training.

The Agent is responsible for the interaction between the algorithm and the environment. During this interaction it feeds the generated data to the Algorithm to update the Model; data preprocessing is usually also defined here.

def sample(obs, MODEL):
    global E_GREED
    global ACTION_DIM
    global E_GREED_DECREMENT
    sample = np.random.rand()  # draw a random number in [0, 1)
    if sample < E_GREED:
        act = np.random.randint(ACTION_DIM)  # explore: every action has some probability of being chosen
    else:
        obs = np.expand_dims(obs, axis=0)
        obs = paddle.to_tensor(obs, dtype='float32')
        act = MODEL(obs)
        act = np.argmax(act.numpy())  # exploit: choose the action with the highest Q-value
    E_GREED = max(0.01, E_GREED - E_GREED_DECREMENT)  # gradually reduce exploration as training converges
    return act


def learn(obs, act, reward, next_obs, terminal, TARGET_MODEL, MODEL):
    global global_step
    # sync target_model's parameters with model every 50 training steps
    if global_step % 50 == 0:
        TARGET_MODEL.load_dict(MODEL.state_dict())
    global_step += 1

    obs = np.array(obs).astype('float32')
    next_obs = np.array(next_obs).astype('float32')
    # act = np.expand_dims(act, -1)
    cost = optimize_model(obs, act, reward, next_obs,
                          terminal, TARGET_MODEL, MODEL)  # one training step of the network
    return cost


def optimize_model(obs, action, reward, next_obs, terminal, TARGET_MODEL, MODEL):
    """应用 DQN 算法更新 self.model 的 value 网络"""
    # get max Q' from TARGET_MODEL, used to compute target_Q
    global E_GREED
    global ACTION_DIM
    global E_GREED_DECREMENT
    global GAMMA
    global LEARNING_RATE
    global opt

    opt = paddle.optimizer.Adam(learning_rate=LEARNING_RATE,
                                parameters=MODEL.parameters())  # optimizer (dynamic graph mode)

    obs = paddle.to_tensor(obs)
    next_obs = paddle.to_tensor(next_obs)

    next_pred_value = TARGET_MODEL(next_obs).detach()
    best_v = paddle.max(next_pred_value, axis=1)
    target = reward + (1.0 - terminal) * GAMMA * best_v.numpy()
    target = paddle.to_tensor(target)
    pred_value = MODEL(obs)  # predicted Q-values Q(s, .)
    # convert action to a one-hot vector, e.g. 3 => [0, 0, 0, 1, 0]
    action = paddle.to_tensor(action.astype('int32'))
    action_onehot = F.one_hot(action, ACTION_DIM)
    action_onehot = paddle.cast(action_onehot, dtype='float32')
    # the line below is an element-wise multiplication that picks out Q(s, a) for the taken action
    pred_action_value = paddle.sum(paddle.multiply(action_onehot, pred_value), axis=1)
    # mean squared error between Q(s, a) and target_Q gives the loss
    cost = F.square_error_cost(pred_action_value, target)
    cost = paddle.mean(cost)
    avg_cost = cost
    cost.backward()
    opt.step()
    opt.clear_grad()

    return avg_cost.numpy()
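
For intuition on the exploration schedule used in sample(): with the values set later in this post (E_GREED starting at 0.1, E_GREED_DECREMENT = 1e-6, floor 0.01), the exploration probability after n action selections is roughly max(0.01, 0.1 - n * 1e-6). A tiny, purely illustrative helper:

def e_greed_after(n_steps, start=0.1, decrement=1e-6, floor=0.01):
    # epsilon after n_steps calls to sample(), mirroring the decay above
    return max(floor, start - n_steps * decrement)

print(e_greed_after(50_000))   # ~0.05: still fairly exploratory
print(e_greed_after(100_000))  # 0.01: the decay is clamped at the floor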

5. Model Training Loop (Gradient Updates)

def run_train(env, rpm, TARGET_MODEL, MODEL):
    MODEL.train()
    TARGET_MODEL.train()
    total_reward = 0
    obs = env.reset()

    global global_step
    while True:
        global_step += 1
        # choose an action (epsilon-greedy) and step the game
        action = sample(obs, MODEL)

        next_obs, reward, isOver, info = env.step(action)

        # store the transition
        rpm.append((obs, action, reward, next_obs, isOver))

        # after the warm-up is done, train once every LEARN_FREQ steps
        if (len(rpm) > MEMORY_WARMUP_SIZE) and (global_step % LEARN_FREQ == 0):
            (batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver) = rpm.sample(BATCH_SIZE)
            train_loss = learn(batch_obs, batch_action, batch_reward,
                               batch_next_obs, batch_isOver, TARGET_MODEL, MODEL)

        total_reward += reward
        obs = next_obs.astype('float32')

        # episode finished
        if isOver:
            break
    return total_reward


def evaluate(model, env, render=False):
    model.eval()
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        episode_reward = 0
        while True:
            obs = np.expand_dims(obs, axis=0)
            obs = paddle.to_tensor(obs, dtype='float32')
            action = model(obs)
            action = np.argmax(action.numpy())
            obs, reward, done, _ = env.step(action)
            episode_reward += reward
            if render:
                env.render()
            if done:
                break
        eval_reward.append(episode_reward)
    return np.mean(eval_reward)

6. Training and Validation Functions

Set the hyperparameters

LEARN_FREQ = 5  # training frequency; no need to learn at every step, accumulate some new experience first for efficiency
MEMORY_SIZE = 20000  # size of the replay memory; larger means more RAM usage
MEMORY_WARMUP_SIZE = 200  # pre-fill the replay memory with some experience before training starts
BATCH_SIZE = 32  # number of transitions fed to the agent per learn() call, randomly sampled from the replay memory
LEARNING_RATE = 0.001  # learning rate
GAMMA = 0.99  # reward discount factor, typically somewhere between 0.9 and 0.999

E_GREED = 0.1  # initial exploration probability
E_GREED_DECREMENT = 1e-6  # decrease the exploration probability over the course of training
MAX_EPISODE = 20000  # number of training episodes
SAVE_MODEL_PATH = "models/save"  # path for saving the model
OBS_DIM = None
ACTION_DIM = None
global_step = 0
def main():
    global OBS_DIM
    global ACTION_DIM

    train_step_list = []
    train_reward_list = []
    evaluate_step_list = []
    evaluate_reward_list = []

    # initialize the game environment
    env = gym.make('CartPole-v0')
    # observation shape and action dimension
    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape
    OBS_DIM = obs_dim
    ACTION_DIM = action_dim
    max_score = -int(1e4)

    # create the replay memory that stores game transitions
    rpm = ReplayMemory(MEMORY_SIZE)
    MODEL = DQN(ACTION_DIM)
    TARGET_MODEL = DQN(ACTION_DIM)
    # if os.path.exists(os.path.dirname(SAVE_MODEL_PATH)):
    #     MODEL_DICT = paddle.load(SAVE_MODEL_PATH+'.pdparams')
    #     MODEL.load_dict(MODEL_DICT)  # load model parameters
    print("filling memory...")
    while len(rpm) < MEMORY_WARMUP_SIZE:
        run_train(env, rpm, TARGET_MODEL, MODEL)
    print("filling memory done")

    # start training
    episode = 0

    print("start training...")
    # train for MAX_EPISODE episodes; the test part does not count towards the episode count
    while episode < MAX_EPISODE:
        # train part
        for i in range(50):
            # First we need a state
            total_reward = run_train(env, rpm, TARGET_MODEL, MODEL)
            episode += 1
        
        # print("episode:{}    reward:{}".format(episode, str(total_reward)))

        # test part
        # print("start evaluation...")
        eval_reward = evaluate(TARGET_MODEL, env)
        print('episode:{}    e_greed:{}   test_reward:{}'.format(episode, E_GREED, eval_reward))

        evaluate_step_list.append(episode)
        evaluate_reward_list.append(eval_reward)

        # if eval_reward > max_score or not os.path.exists(os.path.dirname(SAVE_MODEL_PATH)):
        #     max_score = eval_reward
        #     paddle.save(TARGET_MODEL.state_dict(), SAVE_MODEL_PATH+'.pdparams')  # save the model


if __name__ == '__main__':
    main()

filling memory...
filling memory done
start training...
episode:50 e_greed:0.0992949999999993 test_reward:9.0
episode:100 e_greed:0.0987909999999988 test_reward:9.8
episode:150 e_greed:0.09827199999999828 test_reward:10.0
episode:200 e_greed:0.09777599999999778 test_reward:8.8
episode:250 e_greed:0.09726999999999728 test_reward:9.0
episode:300 e_greed:0.09676199999999677 test_reward:10.0
episode:350 e_greed:0.0961919999999962 test_reward:14.8

Project link (fork it and you can run it directly):

https://www.heywhale.com/mw/project/649e7d3f70567260f8f11d2b

For more quality content, follow the official WeChat account: 汀丶人工智能
