强化学习从根底到进阶-案例与实际[4.2]:深度Q网络DQN-Cart pole游戏展现

  • 强化学习(Reinforcement learning,简称RL)是机器学习中的一个畛域,区别与监督学习和无监督学习,强调如何基于环境而口头,以获得最大化的预期利益。
  • 根本操作步骤:智能体agent在环境environment中学习,依据环境的状态state(或观测到的observation),执行动作action,并依据环境的反馈reward(处分)来领导更好的动作。

比方本我的项目的Cart pole小游戏中,agent就是动图中的杆子,杆子有向左向右两种action

## 装置依赖!pip install pygame!pip install gym!pip install atari_py!pip install parl
import gymimport osimport randomimport collectionsimport paddleimport paddle.nn as nnimport numpy as npimport paddle.nn.functional as F

1.教训回放局部

教训回放次要做的事件是:把后果存入教训池,而后教训池中随机取出一条后果进行训练。

这样做有两个益处:

  1. 缩小样本之间的关联性
  2. 进步样本的利用率
之所以退出experience replay是因为样本是从游戏中的间断帧取得的,这与简略的reinforcement learning问题相比,样本的关联性大了很多,如果没有experience replay,算法在间断一段时间内根本朝着同一个方向做gradient descent,那么同样的步长下这样间接计算gradient就有可能不收敛。因而experience replay是从一个memory pool中随机选取了一些expeirence,而后再求梯度,从而防止了这个问题。
class ReplayMemory(object):    def __init__(self, max_size):        self.buffer = collections.deque(maxlen=max_size)    # 减少一条教训到教训池中    def append(self, exp):        self.buffer.append(exp)    # 从教训池中选取N条教训进去    def sample(self, batch_size):        mini_batch = random.sample(self.buffer, batch_size)        obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []        for experience in mini_batch:            s, a, r, s_p, done = experience            obs_batch.append(s)            action_batch.append(a)            reward_batch.append(r)            next_obs_batch.append(s_p)            done_batch.append(done)        return np.array(obs_batch).astype('float32'), np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'), np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32')    def __len__(self):        return len(self.buffer)

2.DQN

DQN算法较一般算法在教训回放和固定Q指标有了较大的改良,次要起因:

  • 教训回放:他充分利用了off-colicp的劣势,通过训练把后果(问题)存入Q表格,而后随机从表格中取出一条后果进行优化。这样子一方面能够:缩小样本之间的关联性另一方面:进步样本的利用率 注:训练后果会存进Q表格,当Q表格满了当前,存进来的数据会把最早存进去的数据“挤出去”(弹出)
  • 固定Q指标他解决了算法更新不安稳的问题。 和监督学习做比拟,监督学习的最终值要迫近理论后果,这个后果是固定的,然而咱们的DQN却不是,他的目标值是通过神经网络当前的一个值,那么这个值是变动的不好拟合,怎么办,DQN团队想到了一个很好的方法,让这个值在肯定工夫外面放弃不变,这样子这个指标就能够确定了,而后目标值更新当前更加靠近理论后果,能够更好的进行训练。

3.模型Model

这里的模型能够依据本人的需要抉择不同的神经网络组建。

DQN用来定义前向(Forward)网络,能够自在的定制本人的网络结构。

class DQN(nn.Layer):    def __init__(self, outputs):        super(DQN, self).__init__()        self.linear1 = nn.Linear(in_features=4, out_features=128)        self.linear2 = nn.Linear(in_features=128, out_features=24)        self.linear3 = nn.Linear(in_features=24, out_features=outputs)    def forward(self, x):        x = self.linear1(x)        x = F.relu(x)        x = self.linear2(x)        x = F.relu(x)        x = self.linear3(x)        return x

4.智能体Agent的学习函数

这里包含模型摸索与模型训练两个局部

Agent负责算法与环境的交互,在交互过程中把生成的数据提供给Algorithm来更新模型(Model),数据的预处理流程也个别定义在这里。

def sample(obs, MODEL):    global E_GREED    global ACTION_DIM    global E_GREED_DECREMENT    sample = np.random.rand()  # 产生0~1之间的小数    if sample < E_GREED:        act = np.random.randint(ACTION_DIM)  # 摸索:每个动作都有概率被抉择    else:        obs = np.expand_dims(obs, axis=0)        obs = paddle.to_tensor(obs, dtype='float32')        act = MODEL(obs)        act = np.argmax(act.numpy())  # 抉择最优动作    E_GREED = max(0.01, E_GREED - E_GREED_DECREMENT)  # 随着训练逐渐收敛,摸索的水平缓缓升高    return actdef learn(obs, act, reward, next_obs, terminal, TARGET_MODEL, MODEL):    global global_step    # 每隔200个training steps同步一次model和target_model的参数    if global_step % 50 == 0:        TARGET_MODEL.load_dict(MODEL.state_dict())    global_step += 1    obs = np.array(obs).astype('float32')    next_obs = np.array(next_obs).astype('float32')    # act = np.expand_dims(act, -1)    cost = optimize_model(obs, act, reward, next_obs,                          terminal, TARGET_MODEL, MODEL)  # 训练一次网络    return costdef optimize_model(obs, action, reward, next_obs, terminal, TARGET_MODEL, MODEL):    """    应用DQN算法更新self.model的value网络    """    # 从target_model中获取 max Q' 的值,用于计算target_Q    global E_GREED    global ACTION_DIM    global E_GREED_DECREMENT    global GAMMA    global LEARNING_RATE    global opt    opt = paddle.optimizer.Adam(learning_rate=LEARNING_RATE,                                parameters=MODEL.parameters())  # 优化器(动态图)    obs = paddle.to_tensor(obs)    next_obs = paddle.to_tensor(next_obs)    next_pred_value = TARGET_MODEL(next_obs).detach()    best_v = paddle.max(next_pred_value, axis=1)    target = reward + (1.0 - terminal) * GAMMA * best_v.numpy()    target = paddle.to_tensor(target)    pred_value = MODEL(obs)  # 获取Q预测值    # 将action转onehot向量,比方:3 => [0,0,0,1,0]    action = paddle.to_tensor(action.astype('int32'))    action_onehot = F.one_hot(action, ACTION_DIM)    action_onehot = paddle.cast(action_onehot, dtype='float32')    # 上面一行是逐元素相乘,拿到action对应的 Q(s,a)    pred_action_value = paddle.sum(paddle.multiply(action_onehot, pred_value), axis=1)    # 计算 Q(s,a) 与 target_Q的均方差,失去loss    cost = F.square_error_cost(pred_action_value, target)    cost = paddle.mean(cost)    avg_cost = cost    cost.backward()    opt.step()    opt.clear_grad()    return avg_cost.numpy()

5.模型梯度更新算法

def run_train(env, rpm, TARGET_MODEL, MODEL):    MODEL.train()    TARGET_MODEL.train()    total_reward = 0    obs = env.reset()    global global_step    while True:        global_step += 1        # 获取随机动作和执行游戏        action = sample(obs, MODEL)        next_obs, reward, isOver, info = env.step(action)        # 记录数据        rpm.append((obs, action, reward, next_obs, isOver))        # 在预热实现之后,每隔LEARN_FREQ步数就训练一次        if (len(rpm) > MEMORY_WARMUP_SIZE) and (global_step % LEARN_FREQ == 0):            (batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver) = rpm.sample(BATCH_SIZE)            train_loss = learn(batch_obs, batch_action, batch_reward,                               batch_next_obs, batch_isOver, TARGET_MODEL, MODEL)        total_reward += reward        obs = next_obs.astype('float32')        # 完结游戏        if isOver:            break    return total_rewarddef evaluate(model, env, render=False):    model.eval()    eval_reward = []    for i in range(5):        obs = env.reset()        episode_reward = 0        while True:            obs = np.expand_dims(obs, axis=0)            obs = paddle.to_tensor(obs, dtype='float32')            action = model(obs)            action = np.argmax(action.numpy())            obs, reward, done, _ = env.step(action)            episode_reward += reward            if render:                env.render()            if done:                break        eval_reward.append(episode_reward)    return np.mean(eval_reward)

6.训练函数与验证函数

设置超参数

LEARN_FREQ = 5  # 训练频率,不须要每一个step都learn,攒一些新增教训后再learn,提高效率MEMORY_SIZE = 20000  # replay memory的大小,越大越占用内存MEMORY_WARMUP_SIZE = 200  # replay_memory 里须要预存一些教训数据,再开启训练BATCH_SIZE = 32  # 每次给agent learn的数据数量,从replay memory随机里sample一批数据进去LEARNING_RATE = 0.001  # 学习率大小GAMMA = 0.99  # reward 的衰减因子,个别取 0.9 到 0.999 不等E_GREED = 0.1  # 摸索初始概率E_GREED_DECREMENT = 1e-6  # 在训练过程中,升高摸索的概率MAX_EPISODE = 20000  # 训练次数SAVE_MODEL_PATH = "models/save"  # 保留模型门路OBS_DIM = NoneACTION_DIM = Noneglobal_step = 0
def main():    global OBS_DIM    global ACTION_DIM    train_step_list = []    train_reward_list = []    evaluate_step_list = []    evaluate_reward_list = []    # 初始化游戏    env = gym.make('CartPole-v0')    # 图像输出形态和动作维度    action_dim = env.action_space.n    obs_dim = env.observation_space.shape    OBS_DIM = obs_dim    ACTION_DIM = action_dim    max_score = -int(1e4)    # 创立存储执行游戏的内存    rpm = ReplayMemory(MEMORY_SIZE)    MODEL = DQN(ACTION_DIM)    TARGET_MODEL = DQN(ACTION_DIM)    # if os.path.exists(os.path.dirname(SAVE_MODEL_PATH)):    #     MODEL_DICT = paddle.load(SAVE_MODEL_PATH+'.pdparams')    #     MODEL.load_dict(MODEL_DICT)  # 加载模型参数    print("filling memory...")    while len(rpm) < MEMORY_WARMUP_SIZE:        run_train(env, rpm, TARGET_MODEL, MODEL)    print("filling memory done")    # 开始训练    episode = 0    print("start training...")    # 训练max_episode个回合,test局部不计算入episode数量    while episode < MAX_EPISODE:        # train part        for i in range(0, int(50)):            # First we need a state            total_reward = run_train(env, rpm, TARGET_MODEL, MODEL)            episode += 1                # print("episode:{}    reward:{}".format(episode, str(total_reward)))        # test part        # print("start evaluation...")        eval_reward = evaluate(TARGET_MODEL, env)        print('episode:{}    e_greed:{}   test_reward:{}'.format(episode, E_GREED, eval_reward))        evaluate_step_list.append(episode)        evaluate_reward_list.append(eval_reward)        # if eval_reward > max_score or not os.path.exists(os.path.dirname(SAVE_MODEL_PATH)):        #     max_score = eval_reward        #     paddle.save(TARGET_MODEL.state_dict(), SAVE_MODEL_PATH+'.pdparams')  # 保留模型if __name__ == '__main__':    main()

filling memory...
filling memory done
start training...
episode:50 e_greed:0.0992949999999993 test_reward:9.0
episode:100 e_greed:0.0987909999999988 test_reward:9.8
episode:150 e_greed:0.09827199999999828 test_reward:10.0
episode:200 e_greed:0.09777599999999778 test_reward:8.8
episode:250 e_greed:0.09726999999999728 test_reward:9.0
episode:300 e_greed:0.09676199999999677 test_reward:10.0
episode:350 e_greed:0.0961919999999962 test_reward:14.8

我的项目链接fork一下即可运行

https://www.heywhale.com/mw/project/649e7d3f70567260f8f11d2b

更多优质内容请关注公号:汀丶人工智能