乐趣区

关于python:nameko-避坑笔记避免频繁实例化-ClusterRpcProxy

看到一篇文章 nameko 的应用及注意事项,中有一段对于发动近程调用的形容

rpc 后紧跟的是微服务定义时的类变量 name 的值即为微服务名称,接着紧跟 rpc 办法,应用 call_async 为异步调用,而调用 result_async.result() 时会期待异步工作返回后果。须要留神的是,运行 ClusterRpcProxy(config) 时会创立与队列的连贯,该操作比拟耗时, 如果有大量的微服务调用,不应该反复创立连贯,应在语句块内实现所有调用 。异步调用的后果只能在语句块内获取,即调用 .result() 期待后果。语句块之外连贯断开就无奈获取了。

大略意思就是说频繁实例化 ClusterRpcProxy, 联合我之前应用在 RabbitMQ 后盾看到的奇怪景象,感觉有必要来钻研一下:

上面是常见的 nameko 调用形式

api.py

from fastapi import FastAPI
from rpc import (upload_service_rpc)

app = FastAPI()

@app.get('/')
async def root():
    return {"message": "Hello World"}

@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    success: bool = upload_service_rpc(data)  # 此处发动 rpc 调用
    return {'status': success}

upload_service_rpc 办法是对 nameko 提供的 ClusterRpcProxy 的一层函数包装

rpc.py

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (UploadRequestBody,)
from loguru import logger

config = {'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}

def upload_service_rpc(data: UploadRequestBody) -> bool:
    """给 fatapi 裸露的 rpc 接口"""
    with ClusterRpcProxy(config) as cluster_rpc:   # 通过 ClusterRpcProxy 发动 RPC 申请
        success: bool = cluster_rpc.console_service.upload(data=data.json(ensure_ascii=False)
        )
        return success

然而下面的写法,看起来很完满,然而 nameko 的实现是每次实例化 ClusterRpcProxy 都会在 RabbitMQ 中创立一个新的 queue,如果咱们每次 rpc 申请都想下面的代码一样频繁实例化 ClusterRpcProxy 会导致大量的工夫消耗在创立 queue 上。

下图是 RabbmitMQ 后盾治理界面的截图,能够看到当发动屡次申请的时候,会呈现大量 rpc.reply-standalone_rpc_proxy_{routing_key} 格局的 queue

这些 rpc.reply-standalone_rpc_proxy_{routing_key} 队列会在没有音讯之后几秒后被敞开,不会始终存在上来

接下来革新代码:

api.py

import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (UploadRequestBody)
from rpc import (init_rpc_proxy)

app = FastAPI()


rpc_proxy = init_rpc_proxy()    #  把 rpc_proxy 对象变成一个全局变量,生命周期随同整个程序


@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    success: bool = rpc_proxy.console_service.upload(  # 执行 rpc 调用 console_service 的 upload 办法
        data=data.json(ensure_ascii=False)
    )

    return {'status': success}


rpc.py

# coding=utf-8

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (UploadRequestBody,)
from loguru import logger

config = {'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}


def init_rpc_proxy():
    return ClusterRpcProxy(config) # init_rpc_proxy 只负责返回对象,不执行代码

然而当咱们执行下面的新代码的时候,就报错

AttributeError: 'ClusterRpcProxy' object has no attribute 'console_service'

为什么呢?起因在于 ClusterRpcProxy 类的 __enter__ 办法,但咱们不应用 with 上下文管理器的时候,就不会执行 __enter__ 办法中的内容,而机密就在 __enter__ 办法中,让咱们来看看 __enter__ 办法中有什么吧!

nameko/standalone/rpc.py

class StandaloneProxyBase(object):   # StandaloneProxyBase 是 ClusterRpcProxy 的父类
    class ServiceContainer(object):
        """ Implements a minimum interface of the
        :class:`~containers.ServiceContainer` to be used by the subclasses
        and rpc imports in this module.
        """service_name ="standalone_rpc_proxy"

        def __init__(self, config):
            self.config = config
            self.shared_extensions = {}

    class Dummy(Entrypoint):
        method_name = "call"

    _proxy = None

    def __init__(
        self, config, context_data=None, timeout=None,
        reply_listener_cls=SingleThreadedReplyListener
    ):
        container = self.ServiceContainer(config)

        self._worker_ctx = WorkerContext(
            container, service=None, entrypoint=self.Dummy,
            data=context_data)
        self._reply_listener = reply_listener_cls(timeout=timeout).bind(container)

    def __enter__(self):
        return self.start()

    def __exit__(self, tpe, value, traceback):
        self.stop()

    def start(self):
        self._reply_listener.setup()
        return self._proxy

    def stop(self):
        self._reply_listener.stop()

class ClusterRpcProxy(StandaloneProxyBase):
    def __init__(self, *args, **kwargs):
        super(ClusterRpcProxy, self).__init__(*args, **kwargs)
        self._proxy = ClusterProxy(self._worker_ctx, self._reply_listener)

StandaloneProxyBase 是 ClusterRpcProxy 的父类,能够看到 __enter__ 办法执行了 return self.start(),start 办法返回 return self._proxy,而不是常见的 return self,所以这就导致了咱们后面的谬误。

晓得了问题的起因,改起来就很快了!

api.py

import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (UploadRequestBody)
from rpc import (init_rpc_proxy)

app = FastAPI()


_rpc_proxy = init_rpc_proxy()  # 辨别两个 _rpc_proxy 和 rpc_proxy
rpc_proxy = _rpc_proxy.start()


@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    # success: bool = upload_service_rpc2(data)
    success: bool = rpc_proxy.console_service.upload( # 应用 rpc_proxy 调用 rpc 办法
        data=data.json(ensure_ascii=False)
    )

    return {'status': success}

rpc.py

# coding=utf-8

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (UploadRequestBody,)
from loguru import logger

config = {'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}


def init_rpc_proxy():
    return ClusterRpcProxy(config)

好了,咱们看看前后的速度差别:

测试代码:

import requests

data = {# 暗藏了这部分内容}

for i in range(20):
    response = requests.post('http://localhost:63000/upload/', json=data)
    print(response.status_code, response.text)

循环跑 20 次:

批改前:

─➤  time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py  0.14s user 0.05s system 1% cpu 14.696 total

批改后:

─➤  time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py  0.14s user 0.05s system 2% cpu 7.271 total

因为防止了每次 RPC 申请都创立一个 queue,所以速度实现了比拟大的晋升。

14 秒比照 7 秒,实现了速度的翻倍!

退出移动版