关于python:Python入门Python做游戏跳跃小鸟

4次阅读

共计 12926 个字符,预计需要花费 33 分钟才能阅读完成。

如何运行

  • 装置依赖
pip install keras
pip install pygame
pip install scikit-image
pip install h5py

作者应用的是 theano 训练的,训练好的模型文件要应用 theano 作为 Keras 的后端能力调用,在配置文件 ~/.keras/keras.json 中(没有可创立)确认 / 批改 backendtheano(如果没有装置 tensorflow[Keras 的另一可选后端]如同就不必管了),配置文件款式下文中卷积神经网络大节的补充里有。
要应用 theano,咱们还须要装置 OpenBLAS。间接下载源码并解压,cd 进目录,而后

sudo apt-get install gfortran
make FC=gfortran  
sudo make PREFIX=/usr/local install  
  • 下载源码并运行
git clone https://github.com/yanpanlau/Keras-FlappyBird.git
cd Keras-FlappyBird
python qlearn.py -m "Run"

我下载时目录中 game/wrapped\_flappy\_bird.py 文件的第 144 行,FPSCLOCK.tick(FPS)语句处缩进有点问题,删去现有缩进,打 8 个空格就好了,没问题就不必管了。

  • vmware 虚拟机 Ubuntu16.04+python3+ 只应用 CPU+theano 运行:

  • 如果想从新训练神经网络,删除 model.h5 文件而后运行命令qlearn.py -m "Train"

源码剖析

  • 游戏输出及返回图像
import wrapped_flappy_bird as game
x_t1_colored, r_t, terminal = game_state.frame_step(a_t)

间接应用 flappybird python 版本的接口。
输出为 a\_t(1, 0) 代表不跳,(0,1)代表跳)。
返回值为下一帧图像 x\_t1\_colored 和处分 reward+0.1 示意存活,+1示意通过管道,-1示意死亡),处分被管制在 [-1,+1] 来进步稳定性。terminal 是一个布尔值示意游戏是否完结。
处分函数在 game/wrapped_flappy_bird.py 中的
def frame_step(self, input_actions)办法中批改。

为什么间接将游戏图像输出解决呢?我一开始没转过弯,其实图像中蕴含了全副的信息(声音信息在少数游戏里只是辅助,不影响游戏),而人在玩游戏时也是承受输出的图像信息,而后决策输入相应的操作指令。这里其实就是在模仿人的反馈过程,将这一过程形容为一个非线性函数,而该非线性函数咱们将应用卷积神经网络来表白,大体上卷积实现了对图像特色的提取,神经网络实现了从特色到操作指令的转换。

  • 图像预处理


因素:1.  将图片转换为灰阶
2.  裁剪图片尺寸到 80x80 像素
3.  每次沉积 4 帧,一起馈入神经网络。相当于一次输出一张 '四通道' 的图像。(为什么要将 4 帧堆在一起?这是一种办法,为了让模型能推断出小鸟的速度信息。)
x_t1 = skimage.color.rgb2gray(x_t1_colored)
x_t1 = skimage.transform.resize(x_t1,(80,80))
x_t1 = skimage.exposure.rescale_intensity(x_t1, out_range=(0, 255)) # 调整亮度
#
x_t1 = x_t1.reshape(1, 1, x_t1.shape[0], x_t1.shape[1])
s_t1 = np.append(x_t1, s_t[:, :3, :, :], axis=1)
# axis=1 意味着在第二维上增加

x\_t1是一个 (1x1x80x80) 的单帧,s\_t1 是 4 帧的叠加,形态为 (1x4x80x80)。输出设计为 (1x4x80x80) 而不是 (4x80x80) 是为了 Keras 思考。

补充

rescale\_intensity

  • 卷积神经网络
    当初,将预处理后的图像输出神经网络。
def buildmodel():
    print("开始建模")
    model = Sequential()
    model.add(Convolution2D(32, 8, 8, subsample=(4,4),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th', input_shape=(img_channels,img_rows,img_cols)))
    model.add(Activation('relu'))
    model.add(Convolution2D(64, 4, 4, subsample=(2,2),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th'))
    model.add(Activation('relu'))
    model.add(Convolution2D(64, 3, 3, subsample=(1,1),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th'))
    model.add(Activation('relu'))
    model.add(Flatten())
    model.add(Dense(512, init=lambda shape, name: normal(shape, scale=0.01, name=name)))
    model.add(Activation('relu'))
    model.add(Dense(2,init=lambda shape, name: normal(shape, scale=0.01, name=name)))
   
    adam = Adam(lr=1e-6)
    model.compile(loss='mse',optimizer=adam)
    print("建模实现")
    return model

Convolution2D(
nb\_filter, # 过滤器个数
nb\_row, # 过滤器的行数
nb\_col, # 过滤器的列数
init=’glorot\_uniform’, # 层权重 weights 的初始化函数
activation=’linear’, # 默认激活函数为线性,即 a(x) = x
weights=None,
border\_mode=’valid’,
\# 默认 ’valid'(不补零,个别状况)
\# 或者 ’same'(主动补零,使得输入尺寸在过滤窗口步幅为 1 的状况下与输出尺寸雷同,
\# 即输入尺寸 = 输出尺寸 / 步幅)
subsample=(1, 1), # 代表向左和向下的过滤窗口挪动步幅
dim\_ordering=’default’, # ‘default’ 或 ’tf’ 或 ’th’
W\_regularizer=None,
b\_regularizer=None,
activity\_regularizer=None,
W\_constraint=None,
b\_constraint=None,
bias=True
\# 未正文的个别用默认值
)

该函数是二维输出的滤波窗口的卷积函数。当应用它作为模型的第一层时,须要提供 `input_shape` 关键字,如输出为 128x128 RGB 3 通道图像,则 `input_shape=(3, 128, 128)`。`dim_ordering` 的默认值在 `~/.keras/keras.json` 文件中,若没有能够创立(个别运行过一次 keras 就有),格局为

{
“image\_dim\_ordering”: “tf”,
“epsilon”: 1e-07,
“floatx”: “float32”,
“backend”: “tensorflow”
}

依据 `image_dim_ordering` 和 `backend` 抉择应用 `theano` 即 `th` 或者 `tensorflow` 即 `tf`。- 确切构造如下:输出为 4x80x80 的图像矩阵。第一层卷积层,有 32 个卷积核(过滤器),每个卷积核的尺寸是 8x8,x 轴和 y 轴的步幅都是 4,补零,并应用了一个 ReLU 激活函数。第二层卷积层,有 64 个卷积核(过滤器),每个卷积核的尺寸是 4x4,x 轴和 y 轴的步幅都是 2,补零,并应用了一个 ReLU 激活函数。第三层卷积层,有 64 个卷积核(过滤器),每个卷积核的尺寸是 3x3,x 轴和 y 轴的步幅都是 1,补零,并应用了一个 ReLU 激活函数。而后将它们展平为一维输出暗藏层。该暗藏层有 512 个神经单元,全连贯到第三层卷积层的输入,并应用 ReLU 激活函数。最初的输入层是一个全连贯线性层,输入动作对应的 Q 值列表。一般来说,索引 0 代表什么也不做;在这个游戏里索引 1 代表跳一下。比拟两者的 Q 值大小,抉择大的作为下一步操作。> 相干知识点请看:
[卷积神经网络 CNN 基本概念笔记](http://www.jianshu.com/p/606a33ba04ff)
[CS231n Convolutional Neural Networks](http://cs231n.github.io/convolutional-networks/)
每层输入计算公式:(W-F+2P)/S+1。W: 输出尺寸大小; S: 步幅; F: 卷积核尺寸; P: 补零数。在这个利用的设置中,输入计算能够简化为 W /S。![结构图](http://upload-images.jianshu.io/upload_images/2422746-ca7a2f32e0f2e692.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
> 补充:参看[画出卷积神经网络结构图](http://www.jianshu.com/p/56a05b5e4f20)

- 小贴士
keras 使得构建卷积神经网络异样简略。然而有一些须要留神的中央。A. 抉择一个良好的初始化办法是重要的,这里抉择了 σ =0.01 的正态分布。(`lambad x,y : f(x,y)` 是一个匿名函数)
`init=lambda shape, name: normal(shape, scale=0.01, name=name)`
B. 维度的程序很重要。应用 Theano 的话就是 4x80x80,Tensorflow 的话输出就是 80x80x4。通过 Convolution2D 函数的 **dim_ordering** 参数设置,这里应用的是 theano。C. 此次应用了一个叫亚当的自适应优化算法。学习速率为 **1-e6**。D. 对于梯度降落优化算法[An overview of gradient descent optimization algorithms](http://sebastianruder.com/optimizing-gradient-descent/)。E. `Convolution2D` 函数中的参数 `border_mode` 的模式也须要留神,这里抉择了补零操作,使得图像边缘的像素点也受到过滤操作,转化了所有的图像信息。- DQN
上面就是使用 Q -learning 算法来训练神经网络了。在 Q -learning 中最重要的就是 Q 函数了:**Q(s, a)** 代表了当咱们在状态 s 执行 a 动作的最大贴现处分。**Q(s, a)** 给你一个对于在 s 状态抉择 a 动作是有多好的考量。Q 函数就像玩游戏的秘籍,在你须要决定在状态 s 下该抉择动作 a 还是 b 时,只须要挑高 Q 值的动作就行了。** 最大贴现处分 ** 既反映了在 s 状态下做出动作 a 失去状态 s'的即时反馈处分(存活 +0.1,通过管道 +1),也反映了在状态 s' 下持续游戏可能失去的最佳处分(不论什么输出)——其实就是 s'下所有动作的 ** 最大贴现处分中 ** 的最大的一个(即 `max[Q(s', a) | 所有可能的动作 a]`),但第二个处分要乘以一个折扣系数,因为它是将来的处分,要贴现,得打点折扣。实际上 Q 函数是一个实践假如存在的函数,从下面的表述中咱们能够看出 Q 函数能够表白为一种递归的模式,咱们能够通过迭代来获取它,这与神经网络的训练过程不约而同。而咱们就是要用卷积神经网络来实现 Q 函数,实际上是 (Q,a) = f(s) 函数,由一个状态返回在该状态下的所有可能输出与相应 Q 值形成的二值对列表,只是输出动作以不同的索引示意。这样,咱们省去了对巨多的状态 s 的分析判断和简单程序编写,只有以某种形式初始化 Q 值表,而后依据 Q 值表训练调整神经网络的权重,再依据训练后的神经网络预测来更新 Q 值表,如此重复迭代来迫近 Q 函数。
        # 抽取小批量样本进行训练
        minibatch = random.sample(D, BATCH)
        # inputs 和 targets 一起形成了 Q 值表
        inputs = np.zeros((BATCH, s_t.shape[1], s_t.shape[2], s_t.shape[3]))   #32, 80, 80, 4
        targets = np.zeros((inputs.shape[0], ACTIONS))                         #32, 2

        # 开始教训回放
        for i in range(0, len(minibatch)):
            # 以下序号对应 D 的存储程序将信息全副取出,# D.append((s_t, action_index, r_t, s_t1, terminal))
            state_t = minibatch[i][0]    # 以后状态
            action_t = minibatch[i][1]   # 输出动作
            reward_t = minibatch[i][2]   # 返回处分
            state_t1 = minibatch[i][3]   # 返回的下一状态
            terminal = minibatch[i][4]   # 返回的是否终止的标记

            inputs[i:i + 1] = state_t    # 保留以后状态,即 Q(s,a)中的 s

            # 失去预测的以输出动作 x 为索引的 Q 值列表
            targets[i] = model.predict(state_t)  
            # 失去下一状态下预测的以输出动作 x 为索引的 Q 值列表
            Q_sa = model.predict(state_t1)

            if terminal:  # 如果动作执行后游戏终止了,该状态下 (s) 该动作 (a) 的 Q 值就相当于处分
                targets[i, action_t] = reward_t
            else:         # 否则,该状态 (s) 下该动作 (a) 的 Q 值就相当于动作执行后的即时处分和下一状态下的最佳预期处分乘以一个折扣率
                targets[i, action_t] = reward_t + GAMMA * np.max(Q_sa)

        # 用生成的 Q 值表训练神经网络,同时返回以后的误差
        loss += model.train_on_batch(inputs, targets)
- 经验重播
  在下面的代码中,有一个教训重播(Experience Replay)。实际上,应用像神经网络这样的非线性函数来对 Q 值近似是十分不稳固的。解决这个问题最重要的窍门就是教训重播了。在游戏中的所有阶段性情景都被放在回放存储器 **D** 中。(这里应用了 python 的 deque 构造来存储)。当训练神经网络时,从 **D** 中随机小批量抽取情景,而不是应用最近的,这将大大提高零碎的稳定性。- 勘探与开发
这是加强学习的另一个问题:是持续勘探新的资源,还是专一开发现有的资源。也就是说在 利用开发现有的已知低劣的决策 和 持续摸索新的可能更好的行为 之间如何抉择和调配工夫。在现实生活中咱们也常常遇到这类问题:抉择一家尝试过的合口的饭店还是试试新的——有可能更好,也可能基本吃不下。在加强学习中,为了最大化将来收益,它必须在两者之间获得均衡。一个风行的解决方案叫做 ϵ(epsilon)贪婪办法。ϵ 是一个 0 到 1 之间的变量,依据它能够失去哪些工夫该去摸索一个随机的新动作。
        if random.random() <= epsilon:
            print("----------Random Action----------")
            action_index = random.randrange(ACTIONS)
            a_t[action_index] = 1
        else:
            q = model.predict(s_t)       #输出四幅图像的组合,预测后果
            max_Q = np.argmax(q)
            action_index = max_Q
            a_t[max_Q] = 1

### 能够改良的工作
1.  当初的 DQN 基于大量的教训回放,是否有可能代替它或者删去它。2. 如何决定最佳的卷积神经网络。3. 训练很慢,如何减速它或者使它收敛更快。4. 卷积神经网络到底学到了什么,它的学习成绩是否是可迁徙的。### 其它资源
- [Demystifying Deep Reinforcement Learning](https://www.nervanasys.com/demystifying-deep-reinforcement-learning/)
- [深度强化学习揭秘(下面英文文章的国人翻译)](http://chuansong.me/n/335655551424)
- [Keras plays catch, a single file Reinforcement Learning example](http://edersantana.github.io/articles/keras_rl/)
- [Human-level control through deep reinforcement learning](http://www.nature.com/nature/journal/v518/n7540/full/nature14236.html)
- [上文 (Human-level) 的 ppt 讲稿](http://ir.hit.edu.cn/~jguo/docs/notes/dqn-atari.pdf)
- [Configuring Theano For High Performance Deep Learning](http://www.johnwittenauer.net/configuring-theano-for-high-performance-deep-learning/)

### 代码正文

!/usr/bin/env python

from future import print\_function

import argparse
import skimage as skimage
from skimage import transform, color, exposure
from skimage.transform import rotate
from skimage.viewer import ImageViewer
import sys
sys.path.append(“game/”)
import wrapped\_flappy\_bird as game
import random
import numpy as np
from collections import deque

import json
from keras import initializations
from keras.initializations import normal, identity
from keras.models import model\_from\_json
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Convolution2D, MaxPooling2D
from keras.optimizers import SGD , Adam

GAME = ‘bird’ # 游戏名
CONFIG = ‘nothreshold’
ACTIONS = 2 # 无效动作数:不动 + 跳 = 2 个
GAMMA = 0.99 # 折扣系数,将来的处分转化为当初的要乘的一个系数
OBSERVATION = 3200. # 训练之前察看多少步
EXPLORE = 3000000. # epsilon 衰减的总步数
FINAL\_EPSILON = 0.0001 # epsilon 的最小值
INITIAL\_EPSILON = 0.1 # epsilon 的初始值,epsilon 逐步减小
REPLAY\_MEMORY = 50000 # 记住的情景 (状态 s 到状态 s ’ 的所有信息) 数
BATCH = 32 # 选取的小批量训练样本数

一帧一个输出动作

FRAME\_PER\_ACTION = 1

预处理后的图片尺寸

img\_rows , img\_cols = 80, 80

每次重叠 4 帧灰阶图像,相当于 4 通道

img\_channels = 4

构建神经网络模型

def buildmodel():
print(“Now we build the model”)
\# 以下正文见文中
model = Sequential()
model.add(Convolution2D(32, 8, 8, subsample=(4,4),init=lambda shape, name: normal(shape, scale=0.01, name=name), border\_mode=’same’,input\_shape=(img\_channels,img\_rows,img\_cols)))
model.add(Activation(‘relu’))
model.add(Convolution2D(64, 4, 4, subsample=(2,2),init=lambda shape, name: normal(shape, scale=0.01, name=name), border\_mode=’same’))
model.add(Activation(‘relu’))
model.add(Convolution2D(64, 3, 3, subsample=(1,1),init=lambda shape, name: normal(shape, scale=0.01, name=name), border\_mode=’same’))
model.add(Activation(‘relu’))
model.add(Flatten())
model.add(Dense(512, init=lambda shape, name: normal(shape, scale=0.01, name=name)))
model.add(Activation(‘relu’))
model.add(Dense(2,init=lambda shape, name: normal(shape, scale=0.01, name=name)))

adam = Adam(lr=1e-6)
model.compile(loss='mse',optimizer=adam) # 应用损失函数为均方误差,优化器为 Adam。print("We finish building the model")
return model

def trainNetwork(model,args):
\# 失去一个游戏模拟器
game\_state = game.GameState()

# 保留之前的察看到回放存储器 D
D = deque()

# 什么也不做来失去第一个状态而后预处理图片为 80x80x4 格局
do_nothing = np.zeros(ACTIONS)
do_nothing[0] = 1  # do_nothing 为 array([1,0])
x_t, r_0, terminal = game_state.frame_step(do_nothing)

x_t = skimage.color.rgb2gray(x_t)
x_t = skimage.transform.resize(x_t,(80,80))
x_t = skimage.exposure.rescale_intensity(x_t,out_range=(0,255))
# 初始化时,重叠 4 张图都为初始的同 1 张
s_t = np.stack((x_t, x_t, x_t, x_t), axis=0)  # s_t 为四张图的重叠

# 为了在 Keras 中应用,咱们须要调整数组形态,在头部减少一个维度
s_t = s_t.reshape(1, s_t.shape[0], s_t.shape[1], s_t.shape[2])

if args['mode'] == 'Run':
    OBSERVE = 999999999    # 咱们始终察看,而不训练
    epsilon = FINAL_EPSILON
    print ("Now we load weight")
    model.load_weights("model.h5")
    adam = Adam(lr=1e-6)
    model.compile(loss='mse',optimizer=adam)
    print ("Weight load successfully")    
else:                      # 否则咱们在察看一段时间之后开始训练
    OBSERVE = OBSERVATION
    epsilon = INITIAL_EPSILON

t = 0 # t 为总帧数
while (True):
    # 每次循环从新初始化的值
    loss = 0
    Q_sa = 0
    action_index = 0
    r_t = 0
    a_t = np.zeros([ACTIONS])
    # 通过 epsilon 贪婪算法抉择行为
    if t % FRAME_PER_ACTION == 0:
        if random.random() <= epsilon:
            print("----------Random Action----------")
            action_index = random.randrange(ACTIONS) # 随机选取一个动作
            a_t[action_index] = 1        # 生成相应的规范化动作输出参数
        else:
            q = model.predict(s_t)       # 输出以后状态失去预测的 Q 值
            max_Q = np.argmax(q)         # 返回数组中最大值的索引
            # numpy.argmax(a, axis=None, out=None)
            # Returns the indices of the maximum values along an axis.
            action_index = max_Q         # 索引 0 代表啥也不做,索引 1 代表跳一下
            a_t[max_Q] = 1               # 生成相应的规范化动作输出参数

    # 在开始训练之后并且 epsilon 小于肯定值之前,咱们逐渐减小 epsilon
    if epsilon > FINAL_EPSILON and t > OBSERVE:
        epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

    # 执行选定的动作,并察看返回的下一状态和处分
    x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
    # 将图像处理为灰阶,调整尺寸、亮度
    x_t1 = skimage.color.rgb2gray(x_t1_colored)
    x_t1 = skimage.transform.resize(x_t1,(80,80))
    x_t1 = skimage.exposure.rescale_intensity(x_t1, out_range=(0, 255))
    # 调整图像数组形态,减少头两维到 4 维
    x_t1 = x_t1.reshape(1, 1, x_t1.shape[0], x_t1.shape[1])
    # 将 s_t 的前三帧增加在新帧的前面,新帧的索引为 0,造成最初的 4 帧图像
    s_t1 = np.append(x_t1, s_t[:, :3, :, :], axis=1)


    # 存储状态转移到回放存储器
    D.append((s_t, action_index, r_t, s_t1, terminal))
    if len(D) > REPLAY_MEMORY:
        D.popleft()

    # 如果察看实现,则
    if t > OBSERVE:
        # 抽取小批量样本进行训练
        minibatch = random.sample(D, BATCH)
        # inputs 和 targets 一起形成了 Q 值表
        inputs = np.zeros((BATCH, s_t.shape[1], s_t.shape[2], s_t.shape[3]))   #32, 80, 80, 4
        targets = np.zeros((inputs.shape[0], ACTIONS))                         #32, 2

        # 开始教训回放
        for i in range(0, len(minibatch)):
            # 以下序号对应 D 的存储程序将信息全副取出,# D.append((s_t, action_index, r_t, s_t1, terminal))
            state_t = minibatch[i][0]    # 以后状态
            action_t = minibatch[i][1]   # 输出动作
            reward_t = minibatch[i][2]   # 返回处分
            state_t1 = minibatch[i][3]   # 返回的下一状态
            terminal = minibatch[i][4]   # 返回的是否终止的标记

            inputs[i:i + 1] = state_t    # 保留以后状态,即 Q(s,a)中的 s

            # 失去预测的以输出动作 x 为索引的 Q 值列表
            targets[i] = model.predict(state_t)  
            # 失去下一状态下预测的以输出动作 x 为索引的 Q 值列表
            Q_sa = model.predict(state_t1)

            if terminal:  # 如果动作执行后游戏终止了,该状态下 (s) 该动作 (a) 的 Q 值就相当于处分
                targets[i, action_t] = reward_t
            else:         # 否则,该状态 (s) 下该动作 (a) 的 Q 值就相当于动作执行后的即时处分和下一状态下的最佳预期处分乘以一个折扣率
                targets[i, action_t] = reward_t + GAMMA * np.max(Q_sa)

        # 用生成的 Q 值表训练神经网络,同时返回以后的误差
        loss += model.train_on_batch(inputs, targets)

    s_t = s_t1 # 下一状态变为以后状态
    t = t + 1  # 总帧数 +1

    # 每 100 次迭代存储下以后的训练模型
    if t % 100 == 0:
        print("Now we save model")
        model.save_weights("model.h5", overwrite=True)
        with open("model.json", "w") as outfile:
            json.dump(model.to_json(), outfile)

    # 输入信息
    state = ""
    if t <= OBSERVE:
        state = "observe"
    elif t > OBSERVE and t <= OBSERVE + EXPLORE:
        state = "explore"
    else:
        state = "train"

    print("TIMESTEP", t, "/ STATE", state, \
        "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t, \
        "/ Q_MAX" , np.max(Q_sa), "/ Loss", loss)

print("Episode finished!")
print("************************")

def playGame(args):
model = buildmodel() # 先建设模型
trainNetwork(model,args) # 开始训练

def main():
parser = argparse.ArgumentParser(description=’Description of your program’)
parser.add\_argument(‘-m’,’–mode’, help=’Train / Run’, required=True) #承受参数 mode
args = vars(parser.parse\_args()) # args 是字典,’mode’ 是键
playGame(args) # 开始游戏

if name == “main“:
main() #执行本脚本时以 main 函数开始

正文完
 0