nginx, uvicorn, and gunicorn are all HTTP servers built around a master-slave (master-worker) architecture.

Have you ever wondered how they implement it?

If you are curious too, read on.

Multi-process

Reference: two different implementations of a process pool in Python

Implementation using multiprocessing.Pool

import os
import socket
import threading
import multiprocessing
from concurrent.futures import Future
from loguru import logger

default_encoding: str = 'utf-8'


def init_serversocket() -> socket.socket:
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    # Get the local hostname (for logging only)
    host = socket.gethostname()
    logger.debug(f'host {host}')
    port = 6001
    # Bind the port
    serversocket.bind(('0.0.0.0', port))
    # Maximum backlog; connections beyond this queue up
    serversocket.listen(2048)
    return serversocket


def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    send_len: int = clientsocket.send(response_body)
    clientsocket.close()
    return send_len


def start_request(clientsocket: socket.socket, addr: tuple) -> int:
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(
            clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug('response sent')
        return send_len
    except Exception as error:
        logger.exception(error)


def start_request_callback(future: Future) -> None:
    send_len: int = future.result()
    logger.debug(
        f'{threading.current_thread().name}, send payload len is {send_len}')


if __name__ == "__main__":
    serversocket = init_serversocket()
    # The master process accepts every connection and hands each client
    # socket off to a worker process in the pool
    pool = multiprocessing.Pool(processes=16)
    while True:
        clientsocket, addr = serversocket.accept()
        clientsocket: socket.socket
        addr: tuple
        # future: Future = pool.submit(start_request, clientsocket, addr)
        # future.add_done_callback(start_request_callback)
        pool.apply_async(start_request, (clientsocket, addr))
    # Unreachable as written, since the accept loop never exits
    pool.close()
    pool.join()

Implementation using ProcessPoolExecutor

import os
import socket
import threading
import multiprocessing
from concurrent.futures import Future, ProcessPoolExecutor
from loguru import logger

default_encoding: str = 'utf-8'


def init_serversocket() -> socket.socket:
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    # Get the local hostname (for logging only)
    host = socket.gethostname()
    logger.debug(f'host {host}')
    port = 6001
    # Bind the port
    serversocket.bind(('0.0.0.0', port))
    # Maximum backlog; connections beyond this queue up
    serversocket.listen(2048)
    return serversocket


def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    send_len: int = clientsocket.send(response_body)
    clientsocket.close()
    return send_len


def start_request(clientsocket: socket.socket, addr: tuple) -> int:
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(
            clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug('response sent')
        return send_len
    except Exception as error:
        logger.exception(error)


def start_request_callback(future: Future) -> None:
    send_len: int = future.result()
    logger.debug(
        f'{threading.current_thread().name}, send payload len is {send_len}')


if __name__ == "__main__":
    serversocket = init_serversocket()
    pool = ProcessPoolExecutor(
        max_workers=multiprocessing.cpu_count(),
        mp_context=multiprocessing.get_context('spawn')
    )
    while True:
        clientsocket, addr = serversocket.accept()
        clientsocket: socket.socket
        addr: tuple
        # future: Future = pool.submit(start_request, clientsocket, addr)
        # future.add_done_callback(start_request_callback)
        pool.submit(start_request, clientsocket, addr)
    # Unreachable as written; ProcessPoolExecutor uses shutdown() instead of close()/join()
    pool.shutdown(wait=True)
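Either server above can be exercised with a small test client. The snippet below is a hypothetical example; the host, port 6001, and UTF-8 encoding simply mirror the server code above:

import socket

# Hypothetical test client for the echo servers above
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 6001))
client.send('hello'.encode('utf-8'))
print(client.recv(2048).decode('utf-8'))
client.close()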

Potential problems with the two implementations above

Problem 1: they cannot run reliably on macOS

Both implementations above work well on Linux, but not on macOS.

The server raises errors with high probability (they appear at random when a client sends a request); see the sketch after this list for a likely cause:

  • ConnectionRefusedError: [Errno 61] Connection refused
  • concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
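A likely, though not certain, contributing factor: since Python 3.8, macOS defaults to the spawn start method while Linux defaults to fork, so on macOS the accepted client socket has to be serialized and handed to a freshly started worker process, which is fragile. A quick way to check which start method is in effect:

import multiprocessing

# 'fork' on Linux, 'spawn' on macOS (Python 3.8+)
print(multiprocessing.get_start_method())

# A possible, unverified workaround is forcing fork before creating the pool:
# multiprocessing.set_start_method('fork')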

Problem 2: the master carries too much load, easily becomes the bottleneck, and cannot scale out horizontally

The master process is worked to exhaustion while the slaves sit idle.

Why? Because the slaves have too little to do, while the master process is responsible for serversocket.accept and everything around it, which puts it under enormous pressure.

Improved version: a master-slave structure that supports horizontal scaling
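As a minimal sketch of the idea (assuming the pre-fork pattern used by gunicorn-style servers, on a Unix-like system): the master only creates the listening socket and forks the workers, and every worker calls accept() itself, so the accept load is spread across all workers.

import os
import socket
import multiprocessing


def worker(serversocket: socket.socket) -> None:
    # Each worker blocks on accept() itself; the kernel hands a new
    # connection to whichever worker is waiting, so the master is no
    # longer in the hot path at all
    while True:
        clientsocket, addr = serversocket.accept()
        data = clientsocket.recv(2048)
        clientsocket.send(b'pid %d got: ' % os.getpid() + data)
        clientsocket.close()


if __name__ == '__main__':
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 6001))
    serversocket.listen(2048)

    # The master only binds the socket and forks workers; the 'fork'
    # start method lets every child inherit the already-bound socket
    ctx = multiprocessing.get_context('fork')
    workers = [ctx.Process(target=worker, args=(serversocket,))
               for _ in range(multiprocessing.cpu_count())]
    for p in workers:
        p.start()
    for p in workers:
        p.join()

This is roughly how gunicorn's sync workers operate: the master keeps only supervision duties (spawning and restarting workers), which is what makes scaling out by adding workers possible.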