nginx、uvicorn、gunicorn 这些 HTTP server 都是 master-slave 架构
你好奇他们是怎么实现的吗?
如果你也是一个好奇宝宝,就接着往下看吧
多进程
参考:Python 进程池的两种不同实现
使用 multiprocessing.Pool 的实现
import os
import socket
import sys
import time
import threading
from typing import Optional
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing

default_encoding: str = 'utf-8'

# Nothing in this script submits work to this thread pool: the __main__
# section rebinds `pool` to a multiprocessing.Pool.
pool = ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix='simple-work-thread-pool'
)


def init_serversocket() -> socket.socket:
    """Create the listening TCP socket used by the master process.

    Returns:
        An IPv4 stream socket bound to 0.0.0.0:6001 and already listening.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    # The local host name is only logged; it is not used for binding.
    host = socket.gethostname()
    logger.debug(f'host {host}')
    port = 6001
    # Bind the port on all interfaces.
    serversocket.bind(('0.0.0.0', port))
    # Backlog: how many not-yet-accepted connections may wait in the queue.
    serversocket.listen(2048)
    return serversocket


def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send ``response_body`` to the client, then close the connection.

    Args:
        clientsocket: Connected client socket; it is always closed on return.
        addr: Peer address (unused here, kept for the callers' signature).
        response_body: Raw bytes to send.

    Returns:
        The number of bytes sent, i.e. ``len(response_body)``.

    Raises:
        OSError: If sending fails; the socket is closed before propagating.
    """
    try:
        # sendall() keeps writing until the whole payload has been handed to
        # the kernel, whereas a single send() may transmit only part of it.
        clientsocket.sendall(response_body)
    finally:
        clientsocket.close()
    return len(response_body)


def start_request(clientsocket: socket.socket, addr: tuple) -> Optional[int]:
    """Handle one connection inside a worker: read once, echo back, close.

    Reads at most 2048 bytes, decodes them with ``default_encoding`` and
    answers with ``'server get message: <request text>'``.

    Args:
        clientsocket: Connected client socket handed over by the master.
        addr: Peer address, used for logging.

    Returns:
        The number of response bytes sent, or ``None`` when handling failed
        (the exception is logged, not re-raised).
    """
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(
            clientsocket=clientsocket,
            addr=addr,
            response_body=response_body
        )
        logger.debug(f'发送了响应')
        return send_len
    except Exception as error:
        logger.exception(error)
        return None
    finally:
        # Covers failures before send_response() runs (recv/decode errors);
        # closing an already-closed socket is a no-op.
        clientsocket.close()


def start_request_callback(future: Future) -> None:
    """Done-callback that logs the payload length produced by ``start_request``.

    Args:
        future: Completed future whose result is the value returned by
            ``start_request``.
    """
    send_len: Optional[int] = future.result()
    logger.debug(
        f'{threading.current_thread().name}, send payload len is {send_len}')


if __name__ == "__main__":
    serversocket = init_serversocket()
    pool = multiprocessing.Pool(processes=16)
    try:
        # The master accepts every connection and hands the socket to a worker.
        while True:
            clientsocket, addr = serversocket.accept()
            clientsocket: socket.socket
            addr: tuple
            # Executor-style variant (submit + done-callback):
            # future: Future = pool.submit(start_request, clientsocket, addr)
            # future.add_done_callback(start_request_callback)
            pool.apply_async(start_request, (clientsocket, addr))
    except KeyboardInterrupt:
        logger.debug('interrupted, shutting down')
    finally:
        # The accept loop never ends on its own, so cleanup has to live in
        # `finally`. terminate() stops the workers without waiting for
        # outstanding work, so shutdown cannot block on a connection whose
        # client never sends anything.
        pool.terminate()
        pool.join()
        serversocket.close()
使用 ProcessPoolExecutor 的实现
import os
import socket
import sys
import time
import threading
from typing import Optional
from loguru import logger
from concurrent.futures._base import Future
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

default_encoding: str = 'utf-8'


def init_serversocket() -> socket.socket:
    """Create the listening TCP socket used by the master process.

    Returns:
        An IPv4 stream socket bound to 0.0.0.0:6001 and already listening.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    # The local host name is only logged; it is not used for binding.
    host = socket.gethostname()
    logger.debug(f'host {host}')
    port = 6001
    # Bind the port on all interfaces.
    serversocket.bind(('0.0.0.0', port))
    # Backlog: how many not-yet-accepted connections may wait in the queue.
    serversocket.listen(2048)
    return serversocket


def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send ``response_body`` to the client, then close the connection.

    Args:
        clientsocket: Connected client socket; it is always closed on return.
        addr: Peer address (unused here, kept for the callers' signature).
        response_body: Raw bytes to send.

    Returns:
        The number of bytes sent, i.e. ``len(response_body)``.

    Raises:
        OSError: If sending fails; the socket is closed before propagating.
    """
    try:
        # sendall() keeps writing until the whole payload has been handed to
        # the kernel, whereas a single send() may transmit only part of it.
        clientsocket.sendall(response_body)
    finally:
        clientsocket.close()
    return len(response_body)


def start_request(clientsocket: socket.socket, addr: tuple) -> Optional[int]:
    """Handle one connection inside a worker: read once, echo back, close.

    Reads at most 2048 bytes, decodes them with ``default_encoding`` and
    answers with ``'server get message: <request text>'``.

    Args:
        clientsocket: Connected client socket handed over by the master.
        addr: Peer address, used for logging.

    Returns:
        The number of response bytes sent, or ``None`` when handling failed
        (the exception is logged, not re-raised).
    """
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(
            clientsocket=clientsocket,
            addr=addr,
            response_body=response_body
        )
        logger.debug(f'发送了响应')
        return send_len
    except Exception as error:
        logger.exception(error)
        return None
    finally:
        # Covers failures before send_response() runs (recv/decode errors);
        # closing an already-closed socket is a no-op.
        clientsocket.close()


def start_request_callback(future: Future) -> None:
    """Done-callback that logs the payload length produced by ``start_request``.

    Args:
        future: Completed future whose result is the value returned by
            ``start_request``.
    """
    send_len: Optional[int] = future.result()
    logger.debug(
        f'{threading.current_thread().name}, send payload len is {send_len}')


if __name__ == "__main__":
    serversocket = init_serversocket()
    pool = ProcessPoolExecutor(
        max_workers=multiprocessing.cpu_count(),
        mp_context=multiprocessing.get_context('spawn')
    )
    try:
        # The master accepts every connection and hands the socket to a worker.
        while True:
            clientsocket, addr = serversocket.accept()
            clientsocket: socket.socket
            addr: tuple
            # Variant that also logs the result via start_request_callback:
            # future: Future = pool.submit(start_request, clientsocket, addr)
            # future.add_done_callback(start_request_callback)
            pool.submit(start_request, clientsocket, addr)
    except KeyboardInterrupt:
        logger.debug('interrupted, shutting down')
    finally:
        # ProcessPoolExecutor has no apply_async()/close()/join(); shutdown()
        # is its cleanup call. It lives in `finally` because the accept loop
        # never ends on its own.
        pool.shutdown()
        serversocket.close()
上述两种实现的潜在问题
问题一:无法完美运行在 mac 平台
上面两种方式在 Linux 上都能够工作得很好,然而在 mac 上却不行
服务端会有很大概率报错(客户端请求的时候,随机出现报错):
- ConnectionRefusedError: [Errno 61] Connection refused
- concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
问题二:master 负载太高,容易成为瓶颈,无法实现横向扩展
master 进程都干冒烟了,slave 都闲着
为什么呢?因为 slave 干的事情太少了,而 master 进程负责了 serversocket.accept 等等操作,压力山大