看到一篇文章nameko 的应用及注意事项 ,中有一段对于发动近程调用的形容

rpc后紧跟的是微服务定义时的类变量 name 的值即为微服务名称,接着紧跟rpc办法,应用 call_async 为异步调用,而调用 result_async.result() 时会期待异步工作返回后果。须要留神的是, 运行 ClusterRpcProxy(config) 时会创立与队列的连贯,该操作比拟耗时,如果有大量的微服务调用,不应该反复创立连贯,应在语句块内实现所有调用。异步调用的后果只能在语句块内获取,即调用 .result() 期待后果。语句块之外连贯断开就无奈获取了。

大略意思就是说频繁实例化 ClusterRpcProxy,联合我之前应用在 RabbitMQ 后盾看到的奇怪景象,感觉有必要来钻研一下:

上面是常见的 nameko 调用形式

api.py

from fastapi import FastAPIfrom rpc import (    upload_service_rpc)app = FastAPI()@app.get('/')async def root():    return {"message": "Hello World"}@app.post('/upload/')def upload(data: UploadRequestBody):    logger.debug(data.json(ensure_ascii=False))    success: bool = upload_service_rpc(data)  # 此处发动 rpc 调用    return {        'status': success    }

upload_service_rpc 办法是对 nameko 提供的 ClusterRpcProxy 的一层函数包装

rpc.py

from nameko.standalone.rpc import ClusterRpcProxyimport settingsfrom schemas import (    UploadRequestBody,)from loguru import loggerconfig = {    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'}def upload_service_rpc(data: UploadRequestBody) -> bool:    """ 给 fatapi 裸露的 rpc 接口 """    with ClusterRpcProxy(config) as cluster_rpc:   # 通过 ClusterRpcProxy 发动 RPC 申请        success: bool = cluster_rpc.console_service.upload(            data=data.json(ensure_ascii=False)        )        return success

然而下面的写法,看起来很完满,然而 nameko 的实现是每次实例化 ClusterRpcProxy 都会在 RabbitMQ 中创立一个新的 queue ,如果咱们每次 rpc 申请都想下面的代码一样频繁实例化 ClusterRpcProxy 会导致大量的工夫消耗在创立 queue 上。

下图是 RabbmitMQ 后盾治理界面的截图,能够看到当发动屡次申请的时候,会呈现大量 rpc.reply-standalone_rpc_proxy_{routing_key} 格局的 queue

这些 rpc.reply-standalone_rpc_proxy_{routing_key} 队列会在没有音讯之后几秒后被敞开,不会始终存在上来

接下来革新代码:

api.py

import settingsfrom loguru import loggerfrom fastapi import FastAPIfrom schemas import (    UploadRequestBody)from rpc import (    init_rpc_proxy)app = FastAPI()rpc_proxy = init_rpc_proxy()    #  把 rpc_proxy 对象变成一个全局变量,生命周期随同整个程序@app.post('/upload/')def upload(data: UploadRequestBody):    logger.debug(data.json(ensure_ascii=False))    success: bool = rpc_proxy.console_service.upload(  # 执行 rpc 调用 console_service 的 upload 办法        data=data.json(ensure_ascii=False)    )    return {        'status': success    }

rpc.py

# coding=utf-8from nameko.standalone.rpc import ClusterRpcProxyimport settingsfrom schemas import (    UploadRequestBody,)from loguru import loggerconfig = {    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'}def init_rpc_proxy():    return ClusterRpcProxy(config) # init_rpc_proxy 只负责返回对象,不执行代码

然而当咱们执行下面的新代码的时候,就报错

AttributeError: 'ClusterRpcProxy' object has no attribute 'console_service'

为什么呢?起因在于 ClusterRpcProxy 类的 __enter__ 办法,但咱们不应用 with 上下文管理器的时候,就不会执行 __enter__ 办法中的内容,而机密就在 __enter__ 办法中,让咱们来看看 __enter__ 办法中有什么吧!

nameko/standalone/rpc.py

class StandaloneProxyBase(object):   # StandaloneProxyBase 是 ClusterRpcProxy 的父类    class ServiceContainer(object):        """ Implements a minimum interface of the        :class:`~containers.ServiceContainer` to be used by the subclasses        and rpc imports in this module.        """        service_name = "standalone_rpc_proxy"        def __init__(self, config):            self.config = config            self.shared_extensions = {}    class Dummy(Entrypoint):        method_name = "call"    _proxy = None    def __init__(        self, config, context_data=None, timeout=None,        reply_listener_cls=SingleThreadedReplyListener    ):        container = self.ServiceContainer(config)        self._worker_ctx = WorkerContext(            container, service=None, entrypoint=self.Dummy,            data=context_data)        self._reply_listener = reply_listener_cls(            timeout=timeout).bind(container)    def __enter__(self):        return self.start()    def __exit__(self, tpe, value, traceback):        self.stop()    def start(self):        self._reply_listener.setup()        return self._proxy    def stop(self):        self._reply_listener.stop()class ClusterRpcProxy(StandaloneProxyBase):    def __init__(self, *args, **kwargs):        super(ClusterRpcProxy, self).__init__(*args, **kwargs)        self._proxy = ClusterProxy(self._worker_ctx, self._reply_listener)

StandaloneProxyBase 是 ClusterRpcProxy 的父类,能够看到 __enter__ 办法执行了 return self.start(),start 办法返回 return self._proxy, 而不是常见的 return self,所以这就导致了咱们后面的谬误。

晓得了问题的起因,改起来就很快了!

api.py

import settingsfrom loguru import loggerfrom fastapi import FastAPIfrom schemas import (    UploadRequestBody)from rpc import (    init_rpc_proxy)app = FastAPI()_rpc_proxy = init_rpc_proxy()  # 辨别两个 _rpc_proxy 和 rpc_proxyrpc_proxy = _rpc_proxy.start()@app.post('/upload/')def upload(data: UploadRequestBody):    logger.debug(data.json(ensure_ascii=False))    # success: bool = upload_service_rpc2(data)    success: bool = rpc_proxy.console_service.upload( # 应用 rpc_proxy 调用 rpc 办法        data=data.json(ensure_ascii=False)    )    return {        'status': success    }

rpc.py

# coding=utf-8from nameko.standalone.rpc import ClusterRpcProxyimport settingsfrom schemas import (    UploadRequestBody,)from loguru import loggerconfig = {    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'}def init_rpc_proxy():    return ClusterRpcProxy(config)

好了,咱们看看前后的速度差别:

测试代码:

import requestsdata = {    # 暗藏了这部分内容}for i in range(20):    response = requests.post('http://localhost:63000/upload/', json=data)    print(response.status_code, response.text)

循环跑 20 次:

批改前:

─➤  time python test_api.py200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}python test_api.py  0.14s user 0.05s system 1% cpu 14.696 total

批改后:

─➤  time python test_api.py200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}200 {"status":true}python test_api.py  0.14s user 0.05s system 2% cpu 7.271 total

因为防止了每次 RPC 申请都创立一个 queue,所以速度实现了比拟大的晋升。

14 秒比照 7 秒,实现了速度的翻倍!