乐趣区

关于人工智能:强化学习落地竞态场景下基于锁机制的闲置端口查用

本文首发于:行者 AI

在强化学习的游戏畛域落地中,咱们常把逻辑简单的实在游戏当做一个黑盒子,应用网络通信与其数据交互达到训练的目标。为了训练效率,在 Python 中常应用多过程表演多个 Actor 的角色来与游戏黑盒子进行数据交互;因而在这样的环境下,极大可能呈现多过程争抢同一通信端口的景象(竞态场景),容易导致过程报错。

本文依据以上背景,首先给出一个惯例的解决办法,接着剖析惯例方法的局限性,而后介绍基于锁机制的解决办法,最初给出理论 python 例程。本文次要分为以下三个局部:

(1)惯例解决办法

(2)基于锁机制的闲置端口查用

(3)python 代码实现

1. 惯例解决办法

针对上述问题,有一种最容易想到的解决办法:每次过程调用端口时都查看端口号是否曾经被应用了,如果是那就跳过持续搜查下一个端口,直到找到一个未应用的端口号将它返回给调用者。咱们简略用 python 实现这一过程:

import socket 

def get_free_port(ip, init_port): 
    while True: 
        try:
            sock = socket.socket()
            sock.bind((ip, init_port))
            ip, port = sock.getnameinfo()
            sock.close()
            return port
        except Exception:
            init_port += 1
            continue

对以上思路稍加剖析,能够将之大抵拆分为 3 个步骤:

(1)应用 sock.bind() 函数去主动绑定端口,以此判断端口是否可用;

(2)若能够绑定证实该端口可用,则应用 sock.close() 开释掉该端口;若不可绑定则证实该端口已被应用,那么端口号加 1 持续尝试绑定直到胜利而后同样应用 sock.close() 开释掉该端口为止;

(3)把开释掉的端口号返回给须要调用的程序。

上述思路看上去能工作,然而其实远远不够。采纳这个思路只能让端口在十分短的工夫内不被绑定,难以满足在竞态场景中应用。在竞态环境中,随着过程数的增多,大概率会产生以下状况:


上图所示,同一个端口号 X 被过程 A 和过程 B 查用。对于过程 B,查用进度比过程 A 稍慢,在过程 A 将端口 X 开释并筹备返回程序进行绑定时,过程 B 绑定了端口 X 进行查看,此时过程 A 将无奈再绑定端口 X 导致报错。

因而能够看出,竞态场景中如何返回一个平安的端口号并不简略,即便利用随机种子随机初始端口号也不能彻底防止这种状况;咱们须要一种机制保障一个未绑定的端口号不能被其它过程任意绑定。

2. 基于锁机制的闲置端口查用

应用 lock file 可能保障即便端口号未绑定,在未拿到锁之前也不会被其余过程绑定。利用 fasteners.process_lock.InterProcessLock 对端口加锁,具体思路如下:

(1)sock.bind()主动绑定检测端口是否可用;

(2)若端口可用则用 InterProcessLock 对端口加锁;否则持续查看,直到检测到可用端口后加锁;

(3)解除之前的端口绑定;

(4)平安返回这个端口号;

(5)对该端口解锁;

流程如下:


回顾上述过程,在端口 X 被绑定后的每一个步骤之间,端口都是平安的。

3. python 代码实现

首先定义两个类 class BindFreePort()class FreePort(),前者用于搜寻并检测可用端口;后者输出前者的检测后果,应用第 2 章所述的机制将端口平安的输入给须要的程序。

import os
import fasteners
import threading


class BindFreePort(object):
    def __init__(self, start, stop):
        self.port = None

        import random, socket

        self.sock = socket.socket()

        while True:
            port = random.randint(start, stop)
            try:
                self.sock.bind(('127.0.0.1', port))
                self.port = port
                import time
                time.sleep(0.01)
                break
            except Exception:
                continue

    def release(self):
        assert self.port is not None
        self.sock.close()
    

class FreePort(object):
    used_ports = set()

    def __init__(self, start=4000, stop=6000):
        self.lock = None
        self.bind = None
        self.port = None

        from fasteners.process_lock import InterProcessLock
        import time
        pid = os.getpid()

        while True:
            bind = BindFreePort(start, stop)

            print(f'{pid} got port : {bind.port}')

            if bind.port in self.used_ports:
                print(f'{pid} will release port : {bind.port}')
                bind.release()
                continue

            '''Since we cannot be certain the user will bind the port'immediately' (actually it is not possible using
            this flow. We must ensure that the port will not be reacquired even it is not bound to anything
            '''lock = InterProcessLock(path='/tmp/socialdna/port_{}_lock'.format(bind.port))
            success = lock.acquire(blocking=False)

            if success:
                self.lock = lock
                self.port = bind.port
                self.used_ports.add(bind.port)
                bind.release()
                break

            bind.release()
            time.sleep(0.01)

    def release(self):
        assert self.lock is not None
        assert self.port is not None
        self.used_ports.remove(self.port)
        self.lock.release()

而后,基于这两个类的性能,咱们简略测试一下:

def get_and_bind_freeport(*args):
    freeport = FreePort(start=4000, stop=4009)
    import time
    time.sleep(0.5)
    return freeport.port

def test():
    from multiprocessing.pool import Pool
    jobs = 10
    p = Pool(jobs)
    ports = p.map(get_and_bind_freeport, range(jobs))
    print(f'[ports]: {ports}')
    assert len(ports) == len(set(ports))
    p.close()

if __name__ == '__main__':
    test()

上述代码中,咱们构建了函数 get_and_bind_freeport() 依照第 2 章所述机制返回一个端口,用 time.sleep(0.5) 模仿过程内的工夫扰动,其中端口的搜寻范畴是 4000~4009;函数test() 从过程池中启动 10 个过程,每个过程映射一个函数 get_and_bind_freeport()4000~4009中搜寻一个端口号并将之平安返回。

如果整个过程中端口号是平安的,那么返回后果该当是 len(ports) == len(set(ports)) 即 10 个端口别离被 10 个过程查用,不存在多个过程返回同一端口号的状况。

4. 总结

本文比照了在事实游戏的强化学习训练中,解决多过程竞争通信端口这一景象的两种办法;通过原理剖析比照以及理论入手试验,咱们得出结论:在竞态场景中查用通信端口号时,比照惯例思路基于锁机制的端口查用可能更平安地实用于这个场景。同时也揭示咱们,在游戏畛域的强化学习落地过程中会面临诸多事实问题,只有在理论工程中一直入手实操一直总结,能力达成指标。

5. 参考

[1] Fasteners

[2] Synchronization between processes

退出移动版