共计 4205 个字符,预计需要花费 11 分钟才能阅读完成。
nginx、uvicorn、gunicorn 这些 HTTP server 都是 master-slave 架构
你好奇他们是怎么实现的吗?
如果你也是一个好奇宝宝,就接着往下看吧
多进程
参考:python 进程池的两种不同实现
使用 multiprocessing.Pool 的实现
import os
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
# Text encoding shared by request decoding and response encoding.
default_encoding: str = 'utf-8'

# Module-level thread pool; the __main__ section rebinds the name `pool`
# to a multiprocessing.Pool before serving requests.
pool = ThreadPoolExecutor(thread_name_prefix='simple-work-thread-pool', max_workers=20)
def init_serversocket() -> socket.socket:
    """Create the listening TCP socket for the server.

    The socket is bound to every interface ('0.0.0.0') on port 6001 and put
    into listening mode with a backlog of 2048 pending connections.

    Returns:
        The bound, listening socket. The caller owns it and must close it.

    Raises:
        OSError: if the socket cannot be configured, bound or put into
            listening mode. The socket is closed before the error propagates.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    try:
        # Allow an immediate restart while old connections on this port are
        # still in TIME_WAIT; otherwise bind() can fail with
        # "Address already in use".
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # The local host name is only logged; it is not used for binding.
        host = socket.gethostname()
        logger.debug(f'host {host}')
        port = 6001
        # Bind to all interfaces on the fixed port.
        serversocket.bind(('0.0.0.0', port))
        # Backlog: connections beyond this number wait in the kernel queue.
        serversocket.listen(2048)
    except Exception:
        # Do not leak the file descriptor when setup fails half-way.
        serversocket.close()
        raise
    return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send the whole response to the client and close the connection.

    Args:
        clientsocket: connected client socket; it is always closed on return.
        addr: peer address of the client (not used by this function).
        response_body: raw bytes to send.

    Returns:
        The number of bytes sent, i.e. ``len(response_body)``.

    Raises:
        OSError: if sending fails. The socket is closed before the error
            propagates.
    """
    try:
        # send() may transmit only part of the buffer; sendall() keeps going
        # until every byte has been written or an error occurs.
        clientsocket.sendall(response_body)
        return len(response_body)
    finally:
        # Close on both the success and the failure path to avoid leaking
        # the file descriptor.
        clientsocket.close()
def start_request(clientsocket: socket.socket, addr: tuple) -> "int | None":
    """Handle one client connection: read a request and echo it back.

    Reads at most 2048 bytes from the client, decodes them with
    ``default_encoding``, prefixes the text with ``'server get message: '``
    and sends the result back through ``send_response``.

    Args:
        clientsocket: connected client socket; it is always closed on return.
        addr: peer address of the client, used for logging.

    Returns:
        The number of bytes sent, or ``None`` when handling failed (the
        exception is logged, not re-raised).
    """
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        send_len = send_response(clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug('发送了响应')
        return send_len
    except Exception as error:
        # Top-level boundary of a worker task: log and report failure as None
        # so that one bad request does not surface as a failed task.
        logger.exception(error)
        return None
    finally:
        # recv()/decode() can fail before send_response() gets the chance to
        # close the socket; closing an already closed socket is a no-op.
        clientsocket.close()
def start_request_callback(future: Future) -> None:
    """Log the payload length produced by a finished request task.

    Args:
        future: completed future whose result is the number of bytes sent.
            ``future.result()`` re-raises any exception stored in the future.
    """
    payload_len: int = future.result()
    thread_name = threading.current_thread().name
    logger.debug(f'{thread_name}, send payload len is {payload_len}')
if __name__ == "__main__":
    # Master process: accept connections and hand each one to a worker
    # process from the pool.
    serversocket = init_serversocket()
    pool = multiprocessing.Pool(processes=16)
    try:
        while True:
            clientsocket, addr = serversocket.accept()
            pool.apply_async(start_request, (clientsocket, addr))
    except KeyboardInterrupt:
        logger.info('received KeyboardInterrupt, shutting down')
        # Ctrl-C is normally delivered to the workers as well, so results of
        # in-flight tasks may never arrive; stop the workers instead of
        # waiting for the queue to drain.
        pool.terminate()
    finally:
        # The accept loop never ends on its own, so cleanup has to live in
        # `finally` to be reachable. close() is a no-op after terminate().
        pool.close()
        pool.join()
        serversocket.close()
使用 ProcessPoolExecutor 的实现
import os
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures._base import Future
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
# Text encoding shared by request decoding and response encoding.
default_encoding: str = 'utf-8'
def init_serversocket() -> socket.socket:
    """Create the listening TCP socket for the server.

    The socket is bound to every interface ('0.0.0.0') on port 6001 and put
    into listening mode with a backlog of 2048 pending connections.

    Returns:
        The bound, listening socket. The caller owns it and must close it.

    Raises:
        OSError: if the socket cannot be configured, bound or put into
            listening mode. The socket is closed before the error propagates.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    try:
        # Allow an immediate restart while old connections on this port are
        # still in TIME_WAIT; otherwise bind() can fail with
        # "Address already in use".
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # The local host name is only logged; it is not used for binding.
        host = socket.gethostname()
        logger.debug(f'host {host}')
        port = 6001
        # Bind to all interfaces on the fixed port.
        serversocket.bind(('0.0.0.0', port))
        # Backlog: connections beyond this number wait in the kernel queue.
        serversocket.listen(2048)
    except Exception:
        # Do not leak the file descriptor when setup fails half-way.
        serversocket.close()
        raise
    return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send the whole response to the client and close the connection.

    Args:
        clientsocket: connected client socket; it is always closed on return.
        addr: peer address of the client (not used by this function).
        response_body: raw bytes to send.

    Returns:
        The number of bytes sent, i.e. ``len(response_body)``.

    Raises:
        OSError: if sending fails. The socket is closed before the error
            propagates.
    """
    try:
        # send() may transmit only part of the buffer; sendall() keeps going
        # until every byte has been written or an error occurs.
        clientsocket.sendall(response_body)
        return len(response_body)
    finally:
        # Close on both the success and the failure path to avoid leaking
        # the file descriptor.
        clientsocket.close()
def start_request(clientsocket: socket.socket, addr: tuple) -> "int | None":
    """Handle one client connection: read a request and echo it back.

    Reads at most 2048 bytes from the client, decodes them with
    ``default_encoding``, prefixes the text with ``'server get message: '``
    and sends the result back through ``send_response``.

    Args:
        clientsocket: connected client socket; it is always closed on return.
        addr: peer address of the client, used for logging.

    Returns:
        The number of bytes sent, or ``None`` when handling failed (the
        exception is logged, not re-raised).
    """
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        send_len = send_response(clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug('发送了响应')
        return send_len
    except Exception as error:
        # Top-level boundary of a worker task: log and report failure as None
        # so that one bad request does not surface as a failed task.
        logger.exception(error)
        return None
    finally:
        # recv()/decode() can fail before send_response() gets the chance to
        # close the socket; closing an already closed socket is a no-op.
        clientsocket.close()
def start_request_callback(future: Future) -> None:
    """Log the payload length produced by a finished request task.

    Args:
        future: completed future whose result is the number of bytes sent.
            ``future.result()`` re-raises any exception stored in the future.
    """
    payload_len: int = future.result()
    thread_name = threading.current_thread().name
    logger.debug(f'{thread_name}, send payload len is {payload_len}')
if __name__ == "__main__":
    # Master process: accept connections and hand each one to a worker
    # process from the executor. Workers are started with the 'spawn'
    # context, one per CPU core.
    serversocket = init_serversocket()
    pool = ProcessPoolExecutor(
        max_workers=multiprocessing.cpu_count(),
        mp_context=multiprocessing.get_context('spawn'),
    )
    try:
        while True:
            clientsocket, addr = serversocket.accept()
            pool.submit(start_request, clientsocket, addr)
    except KeyboardInterrupt:
        logger.info('received KeyboardInterrupt, shutting down')
    finally:
        # ProcessPoolExecutor has no close()/join(); shutdown() is its
        # cleanup API. The accept loop never ends on its own, so cleanup has
        # to live in `finally` to be reachable.
        pool.shutdown(wait=True)
        serversocket.close()
上述两种实现的潜在问题
问题一:无法完美运行在 mac 平台
上面两种方式在 Linux 上都能够工作得很好,然而在 mac 上却不行
服务端会有很大概率报错(客户端请求的时候,随机出现报错):
- ConnectionRefusedError: [Errno 61] Connection refused
- concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
问题二:master 负载太高,容易成为瓶颈,无法实现横向扩展
master 进程都干冒烟了,slave 都闲着
为什么呢?因为 slave 干的事情太少了,而 master 进程负责了 serversocket.accept 等等操作,压力山大
改进版:支持横向扩展的 master-slave 架构
正文完