看到一篇文章nameko 的应用及注意事项 ,中有一段对于发动近程调用的形容
rpc后紧跟的是微服务定义时的类变量 name 的值即为微服务名称,接着紧跟rpc办法,应用 call_async 为异步调用,而调用 result_async.result() 时会期待异步工作返回后果。须要留神的是, 运行 ClusterRpcProxy(config) 时会创立与队列的连贯,该操作比拟耗时,如果有大量的微服务调用,不应该反复创立连贯,应在语句块内实现所有调用。异步调用的后果只能在语句块内获取,即调用 .result() 期待后果。语句块之外连贯断开就无奈获取了。
大略意思就是说频繁实例化 ClusterRpcProxy
,联合我之前应用在 RabbitMQ 后盾看到的奇怪景象,感觉有必要来钻研一下:
上面是常见的 nameko
调用形式
api.py
from fastapi import FastAPIfrom rpc import ( upload_service_rpc)app = FastAPI()@app.get('/')async def root(): return {"message": "Hello World"}@app.post('/upload/')def upload(data: UploadRequestBody): logger.debug(data.json(ensure_ascii=False)) success: bool = upload_service_rpc(data) # 此处发动 rpc 调用 return { 'status': success }
upload_service_rpc
办法是对 nameko
提供的 ClusterRpcProxy
的一层函数包装
rpc.py
from nameko.standalone.rpc import ClusterRpcProxyimport settingsfrom schemas import ( UploadRequestBody,)from loguru import loggerconfig = { 'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:' f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:' f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'}def upload_service_rpc(data: UploadRequestBody) -> bool: """ 给 fatapi 裸露的 rpc 接口 """ with ClusterRpcProxy(config) as cluster_rpc: # 通过 ClusterRpcProxy 发动 RPC 申请 success: bool = cluster_rpc.console_service.upload( data=data.json(ensure_ascii=False) ) return success
然而下面的写法,看起来很完满,然而 nameko
的实现是每次实例化 ClusterRpcProxy
都会在 RabbitMQ
中创立一个新的 queue
,如果咱们每次 rpc
申请都想下面的代码一样频繁实例化 ClusterRpcProxy
会导致大量的工夫消耗在创立 queue
上。
下图是 RabbmitMQ
后盾治理界面的截图,能够看到当发动屡次申请的时候,会呈现大量 rpc.reply-standalone_rpc_proxy_{routing_key}
格局的 queue
这些 rpc.reply-standalone_rpc_proxy_{routing_key}
队列会在没有音讯之后几秒后被敞开,不会始终存在上来
接下来革新代码:
api.py
import settingsfrom loguru import loggerfrom fastapi import FastAPIfrom schemas import ( UploadRequestBody)from rpc import ( init_rpc_proxy)app = FastAPI()rpc_proxy = init_rpc_proxy() # 把 rpc_proxy 对象变成一个全局变量,生命周期随同整个程序@app.post('/upload/')def upload(data: UploadRequestBody): logger.debug(data.json(ensure_ascii=False)) success: bool = rpc_proxy.console_service.upload( # 执行 rpc 调用 console_service 的 upload 办法 data=data.json(ensure_ascii=False) ) return { 'status': success }
rpc.py
# coding=utf-8from nameko.standalone.rpc import ClusterRpcProxyimport settingsfrom schemas import ( UploadRequestBody,)from loguru import loggerconfig = { 'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:' f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:' f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'}def init_rpc_proxy(): return ClusterRpcProxy(config) # init_rpc_proxy 只负责返回对象,不执行代码
然而当咱们执行下面的新代码的时候,就报错
AttributeError: 'ClusterRpcProxy' object has no attribute 'console_service'
为什么呢?起因在于 ClusterRpcProxy 类的 __enter__
办法,但咱们不应用 with
上下文管理器的时候,就不会执行 __enter__
办法中的内容,而机密就在 __enter__
办法中,让咱们来看看 __enter__
办法中有什么吧!
nameko/standalone/rpc.py
class StandaloneProxyBase(object): # StandaloneProxyBase 是 ClusterRpcProxy 的父类 class ServiceContainer(object): """ Implements a minimum interface of the :class:`~containers.ServiceContainer` to be used by the subclasses and rpc imports in this module. """ service_name = "standalone_rpc_proxy" def __init__(self, config): self.config = config self.shared_extensions = {} class Dummy(Entrypoint): method_name = "call" _proxy = None def __init__( self, config, context_data=None, timeout=None, reply_listener_cls=SingleThreadedReplyListener ): container = self.ServiceContainer(config) self._worker_ctx = WorkerContext( container, service=None, entrypoint=self.Dummy, data=context_data) self._reply_listener = reply_listener_cls( timeout=timeout).bind(container) def __enter__(self): return self.start() def __exit__(self, tpe, value, traceback): self.stop() def start(self): self._reply_listener.setup() return self._proxy def stop(self): self._reply_listener.stop()class ClusterRpcProxy(StandaloneProxyBase): def __init__(self, *args, **kwargs): super(ClusterRpcProxy, self).__init__(*args, **kwargs) self._proxy = ClusterProxy(self._worker_ctx, self._reply_listener)
StandaloneProxyBase
是 ClusterRpcProxy 的父类,能够看到 __enter__
办法执行了 return self.start()
,start 办法返回 return self._proxy
, 而不是常见的 return self
,所以这就导致了咱们后面的谬误。
晓得了问题的起因,改起来就很快了!
api.py
import settingsfrom loguru import loggerfrom fastapi import FastAPIfrom schemas import ( UploadRequestBody)from rpc import ( init_rpc_proxy)app = FastAPI()_rpc_proxy = init_rpc_proxy() # 辨别两个 _rpc_proxy 和 rpc_proxyrpc_proxy = _rpc_proxy.start()@app.post('/upload/')def upload(data: UploadRequestBody): logger.debug(data.json(ensure_ascii=False)) # success: bool = upload_service_rpc2(data) success: bool = rpc_proxy.console_service.upload( # 应用 rpc_proxy 调用 rpc 办法 data=data.json(ensure_ascii=False) ) return { 'status': success }
rpc.py
# coding=utf-8from nameko.standalone.rpc import ClusterRpcProxyimport settingsfrom schemas import ( UploadRequestBody,)from loguru import loggerconfig = { 'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:' f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:' f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'}def init_rpc_proxy(): return ClusterRpcProxy(config)
好了,咱们看看前后的速度差别:
测试代码:
import requestsdata = { # 暗藏了这部分内容}for i in range(20): response = requests.post('http://localhost:63000/upload/', json=data) print(response.status_code, response.text)
循环跑 20 次:
批改前:
─➤ time python test_api.py200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}python test_api.py 0.14s user 0.05s system 1% cpu 14.696 total
批改后:
─➤ time python test_api.py200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}python test_api.py 0.14s user 0.05s system 2% cpu 7.271 total
因为防止了每次 RPC 申请都创立一个 queue
,所以速度实现了比拟大的晋升。
14
秒比照 7
秒,实现了速度的翻倍!