nginx, uvicorn, and gunicorn are all HTTP servers built on a master-slave architecture.
Curious how they pull it off?
If you are a curious one too, read on.
Multiprocessing
Reference: two different implementations of a process pool in Python
Implementation using multiprocessing.Pool
import os
import socket
import time
import threading
import multiprocessing
from concurrent.futures import Future, ThreadPoolExecutor
from loguru import logger

default_encoding: str = 'utf-8'

# thread-pool variant kept for reference; __main__ below rebinds `pool`
pool = ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix='simple-work-thread-pool'
)

def init_serversocket() -> socket.socket:
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    # log the local hostname (informational only; binding uses 0.0.0.0)
    host = socket.gethostname()
    logger.debug(f'host {host}')
    port = 6001
    # bind the port on all interfaces
    serversocket.bind(('0.0.0.0', port))
    # listen backlog: pending connections beyond this limit queue up
    serversocket.listen(2048)
    return serversocket

def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    send_len: int = clientsocket.send(response_body)
    clientsocket.close()
    return send_len

def start_request(clientsocket: socket.socket, addr: tuple) -> int:
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug('response sent')
        return send_len
    except Exception as error:
        logger.exception(error)

def start_request_callback(future: Future) -> None:
    send_len: int = future.result()
    logger.debug(f'{threading.current_thread().name}, send payload len is {send_len}')

if __name__ == "__main__":
    serversocket = init_serversocket()
    # the master process: accept connections and hand each one to a worker process
    pool = multiprocessing.Pool(processes=16)
    while True:
        clientsocket, addr = serversocket.accept()
        clientsocket: socket.socket
        addr: tuple
        # thread-pool variant:
        # future: Future = pool.submit(start_request, clientsocket, addr)
        # future.add_done_callback(start_request_callback)
        pool.apply_async(start_request, (clientsocket, addr))
    # unreachable: the accept loop above never exits
    pool.close()
    pool.join()
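The listing does not come with a client. To exercise the server above from a second terminal, a throwaway client along the following lines should do; the address, the message text, and the function name are arbitrary choices for illustration, not part of the original post.

import socket

def main() -> None:
    # connect to the server started above (it binds 0.0.0.0:6001)
    client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    client.connect(('127.0.0.1', 6001))
    client.send('hello from client'.encode('utf-8'))
    # the server answers with 'server get message: <request>' and closes
    print(client.recv(2048).decode('utf-8'))
    client.close()

if __name__ == '__main__':
    main()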
Implementation using ProcessPoolExecutor
import os
import socket
import time
import threading
import multiprocessing
from concurrent.futures import Future, ProcessPoolExecutor
from loguru import logger

default_encoding: str = 'utf-8'

def init_serversocket() -> socket.socket:
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    # log the local hostname (informational only; binding uses 0.0.0.0)
    host = socket.gethostname()
    logger.debug(f'host {host}')
    port = 6001
    # bind the port on all interfaces
    serversocket.bind(('0.0.0.0', port))
    # listen backlog: pending connections beyond this limit queue up
    serversocket.listen(2048)
    return serversocket

def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    send_len: int = clientsocket.send(response_body)
    clientsocket.close()
    return send_len

def start_request(clientsocket: socket.socket, addr: tuple) -> int:
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug('response sent')
        return send_len
    except Exception as error:
        logger.exception(error)

def start_request_callback(future: Future) -> None:
    send_len: int = future.result()
    logger.debug(f'{threading.current_thread().name}, send payload len is {send_len}')

if __name__ == "__main__":
    serversocket = init_serversocket()
    # multiprocessing.Pool variant (Pool has no mp_context argument,
    # so the context itself would have to create the pool):
    # pool = multiprocessing.get_context('spawn').Pool(processes=16)
    pool = ProcessPoolExecutor(
        max_workers=multiprocessing.cpu_count(),
        mp_context=multiprocessing.get_context('spawn')
    )
    while True:
        clientsocket, addr = serversocket.accept()
        clientsocket: socket.socket
        addr: tuple
        # future: Future = pool.submit(start_request, clientsocket, addr)
        # future.add_done_callback(start_request_callback)
        pool.submit(start_request, clientsocket, addr)
        # multiprocessing.Pool variant:
        # pool.apply_async(start_request, (clientsocket, addr))
    # unreachable: the accept loop never exits; ProcessPoolExecutor has no
    # close()/join(), shutdown() plays both roles
    pool.shutdown(wait=True)
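Both listings define start_request_callback but never attach it; it only matters if the Future returned by submit is kept. A minimal sketch of how it could be wired inside the accept loop, mirroring the commented-out lines rather than anything the listings actually run:

# keep the Future instead of discarding it
future: Future = pool.submit(start_request, clientsocket, addr)
# the callback fires in the master process once the worker finishes;
# future.result() inside it re-raises any exception from start_request
future.add_done_callback(start_request_callback)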
Potential problems with the two implementations above
Problem 1: they do not run reliably on macOS
Both approaches work fine on Linux, but not on macOS.
On macOS the server has a high chance of raising errors (they show up at random while clients are sending requests):
- ConnectionRefusedError: [Errno 61] Connection refused
- concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
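One detail that is easy to overlook here: since Python 3.8 the default multiprocessing start method on macOS is spawn (Linux still defaults to fork), and the ProcessPoolExecutor listing even forces spawn explicitly. With a pool, each accepted socket has to be serialized and its file descriptor handed over to an already-running worker, and a failure on that path can kill the worker, which is what the BrokenProcessPool message reports. Checking which start method is in effect is a cheap first step; forcing fork is a workaround sometimes tried, though fork has its own known issues on macOS, so treat the snippet below as a diagnostic sketch rather than a fix.

import multiprocessing

if __name__ == '__main__':
    # 'spawn' on macOS/Windows, 'fork' on Linux (Python 3.8+ defaults)
    print(multiprocessing.get_start_method())
    # optional workaround, use with care on macOS: fall back to fork
    # multiprocessing.set_start_method('fork', force=True)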
Problem 2: the master carries too much load, easily becomes the bottleneck, and cannot scale horizontally
The master process gets worked to exhaustion while the slaves sit idle.
Why? Because the slaves have too little to do: the master process is stuck handling serversocket.accept and everything around it, so all the pressure lands on it.
Improved version: a master-slave structure that supports horizontal scaling
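What this heading points at is the pre-fork pattern used by nginx and gunicorn: the master only binds and listens, then forks N workers, and every worker calls accept() on the shared listening socket itself, so accepting and handling both scale with the number of workers. Below is a minimal sketch of that idea, assuming this is the intended direction; the function and variable names are illustrative, not from the original.

import os
import socket
import multiprocessing

def worker(serversocket: socket.socket) -> None:
    # every worker blocks in accept() on the same listening socket;
    # the kernel hands each new connection to exactly one of them
    while True:
        clientsocket, addr = serversocket.accept()
        data = clientsocket.recv(2048)
        clientsocket.send(b'pid %d echoes: %s' % (os.getpid(), data))
        clientsocket.close()

if __name__ == '__main__':
    # sketch assumes the default fork start method on Linux, where child
    # processes inherit the listening socket from the master
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(('0.0.0.0', 6001))
    serversocket.listen(2048)
    # the master only creates the listening socket and forks the slaves;
    # it never calls accept(), so it is no longer the bottleneck
    workers = [
        multiprocessing.Process(target=worker, args=(serversocket,))
        for _ in range(multiprocessing.cpu_count())
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()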