关于数据挖掘:python中使用马尔可夫决策过程MDP动态编程来解决最短路径强化学习问题

39次阅读

共计 8835 个字符,预计需要花费 23 分钟才能阅读完成。

原文链接:http://tecdat.cn/?p=11105

原文出处:拓端数据部落公众号

在强化学习中,咱们有趣味确定一种最大化获取处分的策略。假如环境是马尔可夫决策过程(MDP)的现实模型,咱们能够利用动静编程办法来解决强化学习问题。

在这篇文章中,我介绍了能够在 MDP 上下文中应用的三种动静编程算法。为了使这些概念更容易了解,我在网格世界的上下文中实现了算法,这是演示强化学习的风行示例。

在开始应用该应用程序之前,我想疾速提供网格世界上后续工作所需的实践背景。

MDP 的要害强化学习术语

以下各节解释了强化学习的要害术语,即:

  • 策略:  代理应在哪种状态下执行哪些操作
  • 状态值函数:  每个州对于将来处分的期望值
  • 口头价值函数:  在特定状态下针对将来处分执行特定口头的预期价值
  • 过渡概率:  从一种状态过渡到另一种状态的概率
  • 处分性能:  代理在状态之间转换时取得的处分

状态值函数

给定策略 ππ,状态值函数 Vπ(s)Vπ(s)将每个状态 ss 映射到代理在此状态下可取得的预期收益:

式中,stst 示意时刻 tt 的状态。参数 γ∈[0,1]γ∈[0,1]称为  折扣因子。它决定了将来处分的影响。

动作值函数

给定策略 ππ,动作值函数 Qπ(s,a)Qπ(s,a)确定在状态 ss 中执行动作 aa 时的预期处分:

转移概率

在状态 ss 中执行动作 aa 能够将代理转换为状态 s ’s’。通过 Pass’Pss’a 形容产生此过渡的可能性。

处分函数

处分函数 Rass’Rss’a 指定当代理通过动作 aa 从状态 ss 过渡到状态 s ’s’ 时取得的处分。

Gridworld 中的三种根本 MDP 算法的演示

在本文中,您将学习如何在网格世界中为 MDP 利用三种算法:

  1. 策略评估:  给定策略 ππ,与 ππ 相干的价值函数是什么?
  2. 策略迭代:  给定策略 ππ,咱们如何找到最佳策略 π∗π∗?
  3. 值迭代:  如何从头开始找到最佳策略 π∗π∗?

在 gridworld 中,代理的指标是达到网格中的指定地位。该代理能够向北,向东,向南或向西挪动。这些动作由汇合 {N,E,S,W} {N,E,S,W} 示意。请留神,代理始终晓得状态(即其在网格中的地位)。

 网格中存在一些壁,代理无奈通过这些壁。

根本的 Gridworld 施行

我曾经以面向对象的形式实现了 gridworld。以下各节形容了我如何设计地图和策略实体的代码。

 Gridworld 地图

为了实现 gridworld,我首先要做的是代表地图的类。我定义了以下格局来示意各个网格单元:

  • # 批示墙壁
  • X 表明指标
  • 空白示意空块

依附这些符号,结构  了上面的 map:

###################
#                X#
#   ###########   #
#   #         #   #
#   # ####### #   #
#   # #     # #   #
#     #  #  #     #
#        #        #
#                 #
###             ###
#                 #
###################

我实现了  MapParser,它生成一个  Map 对象。地图对象管制   对 gridworld 单元的拜访。单个单元格子类定义特定单元格的行为,例如空单元格,墙和指标单元格。能够应用其行和列索引来标识每个单元格。

通过此设置,能够不便地加载地图:

parser = MapParser()
gridMap = parser.parseMap("../data/map01.grid")

载入 

对于强化学习,咱们须要可能解决一个策略 π(s,a)π(s,a)。在 gridworld 中,每个状态 ss 代表代理的地位。这些动作将代理挪动到四个天文方向之一。咱们将应用以下符号将策略映射到地图上:

  • N 为动作 GO_NORTH
  • E 为口头 GO_EAST
  • S 为动作 GO_SOUTH
  • W 为口头 GO_WEST

未知符号被映射到  NONE 操作,以取得残缺的策略。

应用这些定义,我定义  了以下策略:

###################
#EESWWWWWWWWWWWWWX#
#EES###########SWN#
#EES#EEEEEEEES#SWN#
#EES#N#######S#SWN#
#EES#N#SSWWW#S#SWN#
#EEEEN#SS#NN#SWSWN#
#EENSSSSS#NNWWWWWN#
#EENWWWWEEEEEEEEEN#
###NNNNNNNNNNNNN###
#EENWWWWWWWWWWWWWW#
###################

请留神,策略文件保留了围墙和指标单元,以进步可读性。该政策的制订有两个指标:

  1. 代理应该可能达到目标。 对于未实现此属性的策略,策略评估将不会给出正当的后果,因为永远不会取得指标回报。
  2. 该策略应该不是最现实的。这意味着在某些状态下,业务代表没有采取最短的门路达到目标。这样的策略使咱们能够看到尝试改良初始策略的算法的成果。

为了加载该策略,我实现了一个  策略解析器,该解析器将策略存储为  策略对象。应用这些对象,咱们能够像这样加载初始策略:

policyParser = PolicyParser()
policy = policyParser.parsePolicy("../data/map01.policy")

策略对象具备用于建模 π(s,a)π(s,a)的性能:

def pi(self, cell, action):
    if len(self.policy) == 0:
        # no policy: try all actions (for value iteration)
        return 1

    if self.policyActionForCell(cell) == action:
        # policy allows this action
        return  1
    else:
        # policy forbids this action
        return 0

强化学习的筹备

为了筹备施行强化学习算法,咱们依然须要提供过渡和处分性能。

过渡函数

要定义转换函数 Pass’Pss’a,咱们首先须要辨别非法行为和法律行为。在 gridworld 中,有两种办法能够使动作不非法:

  1. 如果该动作会使代理脱离网格
  2. 如果该动作会使代理人陷入困境

这为咱们提供了转换函数的第一条规定:

1. When an action is illegal, the agent should remain in its previous state.

此外,咱们还必须要求:

2. When an action is illegal, transitions to states other than its previous state should be forbidden.

 当然,状态转换对于所选动作必须无效。因为每个动作仅将代理挪动一个地位,因而倡议状态 s ’s’ 必须在与状态 ss 相邻的单元格中具备代理:

3. Only allow transitions through actions that would lead the agent to an adjacent cell.

对于此规定,咱们假如有一个谓词 adj(s,s’)adj(s,s’)来批示主体从 ss 到 s ’s’ 的过渡是否波及相邻单元格。

最初,一旦达到目标状态 s ∗ s ∗,咱们就不心愿代理再次来到。为了阐明这一点,咱们引入了最终规定:

4. Don't transition from the goal cell.

基于这四个规定,咱们能够定义转换函数如下:

所提供的 Python 实现  getTransitionProbability 并不像数学公式那样明确:

def getTransitionProbability(self, oldState, newState, action, gridWorld):
    proposedCell = gridWorld.proposeMove(action)
    if proposedCell is None:
        # Rule 1 and 2: illegal move 
        return transitionProbabilityForIllegalMoves(oldState, newState)
    if oldState.isGoal():
        # Rule 4: stay at goal
        return 0
    if proposedCell != newState:
        # Rule 3: move not possible
        return 0
    else:
        # Rule 3: move possible
        return 1

请留神,它  proposeMove 模仿了操作的胜利执行,并返回了代理的新网格单元。

处分函数

在 gridworld 中,咱们想找到达到终端状态的最短门路。咱们要最大化取得的处分,因而指标状态 s ∗ s ∗的处分应高于其余状态的处分。对于 gridworld,咱们将应用以下简略函数:

 评估

策略评估算法的指标   是评估策略 π(s,a)π(s,a),即依据 V(s)∀sV(s)∀s 确定所有状态的值。该算法基于 Bellman 方程:

对于迭代 k + 1k + 1,该方程式通过以下公式得出状态 ss 的值:

  • π(s,a)π(s,a):在状态 ss 中抉择动作 aa 的概率
  • Pass’Pss’a:应用动作 aa 从状态 ss 过渡到状态 s ’s’ 的概率
  • Rass’Rss’a:应用动作 aa 从状态 ss 过渡到状态 s ’s’ 时的预期处分
  • γγ:贴现率
  • Vπk(s’)Vkπ(s’):在给定策略 ππ 的状况下,步骤 kk 中状态 s ’s’ 的值

为了更好地了解方程式,让咱们在 gridworld 的上下文中逐个思考:

  • π(s,a)π(s,a):因为咱们处于确定性环境中,因而策略仅指定一个动作 aa,其中 π(s,a)=1π(s,a)= 1,而所有其余动作 a ’a’ 具备 π(s,a’)=0π(s,a’)= 0。因而,乘以 π(s,a)π(s,a)只会抉择策略指定的操作。
  • ∑s′∑s′:该和是所有状态 s′s′的总和,能够从以后状态 ss 失去。在 gridworld 中,咱们只须要思考相邻像元和以后像元自身,即 s ’∈{x | adj(x,s)∨x= s}s’∈{x | adj(x,s)∨x= s}。
  • Pass’Pss’a:这是通过动作 aa 从状态 ss 过渡到 s ’s’ 的概率。
  • Rass’Rss’a:这是通过 aa 从 ss 过渡到 s ’s’ 的处分。请留神,在 gridworld 中,处分仅由下一个状态 s ’s’ 确定。
  • γγ:折现因子调节预期处分的影响。
  • Vk(s’)Vk(s’):在提议状态 s ’s’ 的预期处分。该术语的存在是政策评估是动静编程的起因:咱们正在应用先前计算的值来更新以后值。

咱们将应用 γ =1γ= 1,因为咱们处在一个情景 中,在达到目标状态时学习 进行。因而,值函数示意达到指标单元格的最短门路的长度。更精确地说,让 d(s,s ∗)d(s,s ∗)示意从状态 ss 到指标的最短门路。而后,对于 s≠s ∗ s≠s ∗,Vπ(s)=-d(s,s ∗)+1Vπ(s)=-d(s,s ∗)+ 1。

为了施行策略评估,咱们通常将对状态空间进行屡次扫描。每次扫描都须要前一次迭代中的值函数。新值和旧值函数之间的差别通常用作算法的终止条件:

def findConvergedCells(self, V\_old, V\_new, theta = 0.01):
    diff = abs(V\_old-V\_new)
    return np.where(diff < theta)\[0\]

该函数确定值函数差别小于 θθ 的网格单元的索引。当所有状态的值都收敛到稳固值时,咱们能够进行。因为状况并非总是如此(例如,如果策略指定状态的动作不会导致指标,或者过渡概率 / 处分配置不当),咱们还要指定最大迭代次数。

达到进行条件后,evaluatePolicy 返回最新的状态值函数:

def evaluatePolicy(self, gridWorld, gamma = 1):
    if len(self.policy) != len(gridWorld.getCells()):
        # sanity check whether policy matches dimension of gridWorld
        raise Exception("Policy dimension doesn't fit gridworld dimension.")
    maxIterations = 500
    V_old = None
    V_new = initValues(gridWorld) # sets value of 0 for viable cells
    iter = 0
    convergedCellIndices = np.zeros(0)
    while len(ConvergedCellIndices) != len(V_new):
        V\_old = V\_new
        iter += 1
        V\_new = self.evaluatePolicySweep(gridWorld, V\_old, gamma, convergedCellIndices)
        convergedCellIndices = self.findConvergedCells(V\_old, V\_new)
        if iter > maxIterations:
            break
    print("Policy evaluation converged after iteration:" + str(iter))
    return V_new

evaluatePolicySweep 性能执行一次策略评估扫描。该函数遍历网格中的所有单元并确定状态的新值.

请留神,该  ignoreCellIndices 参数示意后续扫描未更改值函数的像元索引。这些单元在进一步的迭代中将被疏忽以进步性能。这对于咱们的 gridworld 示例来说很好,因为咱们只是想找到最短的门路。因而,状态值函数第一次不变时,这是其最佳值。

应用该evaluatePolicyForState 函数计算状态值。该函数的外围实现了咱们先前探讨的 Bellman 方程。此函数的重要思维是,在计算状态 ss 的值函数时,咱们不想扫描所有状态 s ’s’。这就是为什么  状态生成器  仅生成可能理论产生的状态(即,转换概率大于零)的起因。

 评估后果

有了适当的实现后,咱们能够通过执行以下命令找到策略的状态值函数.

为了将值函数与策略一起绘制,咱们能够在将用于示意地图的一维数组转换为二维数组后,应用 matplotlib 中的 pyplot:

def drawValueFunction(V, gridWorld, policy):
    fig, ax = plt.subplots()
    im = ax.imshow(np.reshape(V, (-1, gridWorld.getWidth())))
    for cell in gridWorld.getCells():
        p = cell.getCoords()
        i = cell.getIndex()
        if not cell.isGoal():
            text = ax.text(p\[1\], p\[0\], str(policy\[i\]),
                       ha="center", va="center", color="w")
        if cell.isGoal():
            text = ax.text(p\[1\], p\[0\], "X",
                       ha="center", va="center", color="w")
    plt.show()

应用该函数,咱们能够可视化策略的状态值函数:

对于非指标单元,将应用策略指定的操作对图进行正文。X 标签上方示意右上方单元格中的指标。

其余单元格的值由色彩批示。最差的状态(具备最低的处分)以紫色显示,坏的状态以蓝色显示,蓝绿色的中间状态以绿色显示,良好的状态以绿色显示,十分好的状态(具备最高的处分)显示为黄色。

查看这些值,咱们能够看到后果与策略规定的操作相匹配。例如,间接位于指标西侧的状态的值非常低,因为该状态的动作(GO_WEST)会导致较长的弯路。位于指标正南方的单元格具备很高的价值,因为其作用(GO_NORTH)间接导致指标。

请留神,在当前的工作中,的性能  evaluatePolicy 至关重要,因为咱们会屡次调用它。对于计算的示例,该函数须要进行 61 次迭代,这在我的笔记本电脑上大概转换了半秒钟。请留神,对于更靠近最佳策略的策略,策略评估将须要较少的迭代,因为值将更快地流传。

可能确定状态值函数十分好 - 当初咱们能够量化所提议策略的长处了。然而,咱们尚未解决寻找最佳政策的问题。这就是策略迭代起作用的中央。

策略迭代

当初咱们曾经可能计算状态值函数,咱们应该可能  改良现有的策略。一种简略的策略是贪心算法,该算法遍历网格中的所有单元格,而后依据值函数抉择使预期处分最大化的操作。

 其定义为

 improvePolicy 函数确定策略的值函数,而后调用  findGreedyPolicy 以标识每种状态的最佳操作.

要做的  findGreedyPolicy 是思考每个单元并抉择使预期处分最大化的动作,从而结构输出策略的改良版本。例如,执行  improvePolicy 一次并从新评估策略后,咱们失去以下后果:

与原始值函数相比,指标旁边的所有单元格当初都给了咱们很高的回报,因为操作已失去优化。然而,咱们能够看到这些改良仅仅是部分的。那么,咱们如何取得最优政策呢?

策略迭代算法的思维   是,咱们能够通过 迭代评估新策略的状态值函数来找到最优策略,并应用贪婪算法对该策略进行改良,直到达到最优:

def policyIteration(policy, gridWorld):
    lastPolicy = copy.deepcopy(policy)
    lastPolicy.resetValues() # reset values to force re-evaluation of policy
    improvedPolicy = None
    while True:
        improvedPolicy = improvePolicy(lastPolicy, gridWorld)
        if improvedPolicy == lastPolicy:
            break
        improvedPolicy.resetValues() # to force re-evaluation of values on next run
        lastPolicy = improvedPolicy
    return(improvedPolicy)

策略迭代的后果

在 gridworld 上运行该算法能够在 20 次迭代中找到最佳解决方案 - 在我的笔记本上大概须要 4.5 秒。20 次迭代后的终止并不令人诧异:gridworld 贴图的宽度为 19。因而,咱们须要进行 19 次迭代能力优化程度走廊的值。而后,咱们须要进行一次额定的迭代来确定该算法能够终止,因为该策略未更改。

了解策略迭代的一个很好的工具是可视化每个迭代:

下图显示了应用策略迭代结构的最优值函数:

目视查看表明值函数正确,因为它为网格中的每个单元格抉择了最短门路。

价值迭代

借助咱们迄今为止摸索的工具,呈现了一个新问题:为什么咱们基本须要思考初始策略?价值迭代算法的思维   是咱们能够在没有策略的状况下计算价值函数。与其让政策 ππ 批示抉择了哪些操作,咱们不抉择那些使预期处分最大化的操作:

因为价值迭代的计算与策略评估十分类似,所以我曾经实现了将价值迭代evaluatePolicyForState 用于我先前定义的办法中的性能。

只有没有可用的策略,此函数就会执行值迭代算法。在这种状况下,len(self.policy) 将为零,从而  pi 始终返回一个值,并且  V 被确定为所有动作的预期处分的最大值。

因而,要实现值迭代,咱们不用做很多编码。咱们只须要evaluatePolicySweep 在Policy 对象的值函数未知的状况下迭代调用该  函数,直到该过程为咱们提供最佳后果为止。而后,要确定相应的策略,咱们只需调用findGreedyPolicy 咱们先前定义的  函数.

价值迭代的后果

当执行值迭代时,处分(高:黄色,低:光明)从指标的最终状态(右上方  X)扩大到其余状态:

摘要

咱们曾经看到了如何在 MDP 中利用强化学习。咱们的工作假如是咱们对环境有全面的理解,并且代理齐全理解环境。基于此,咱们可能促成动静编程来解决三个问题。首先,咱们应用策略评估来确定给定策略的状态值函数。接下来,咱们利用策略迭代算法来优化现有策略。第三,咱们利用价值迭代从头开始寻找最佳策略。

正文完
 0