关于python:nameko-避坑笔记避免频繁实例化-ClusterRpcProxy

看到一篇文章nameko 的应用及注意事项 ,中有一段对于发动近程调用的形容

rpc后紧跟的是微服务定义时的类变量 name 的值即为微服务名称,接着紧跟rpc办法,应用 call_async 为异步调用,而调用 result_async.result() 时会期待异步工作返回后果。须要留神的是, 运行 ClusterRpcProxy(config) 时会创立与队列的连贯,该操作比拟耗时,如果有大量的微服务调用,不应该反复创立连贯,应在语句块内实现所有调用。异步调用的后果只能在语句块内获取,即调用 .result() 期待后果。语句块之外连贯断开就无奈获取了。

大略意思就是说频繁实例化 ClusterRpcProxy,联合我之前应用在 RabbitMQ 后盾看到的奇怪景象,感觉有必要来钻研一下:

上面是常见的 nameko 调用形式

api.py

from fastapi import FastAPI
from rpc import (
    upload_service_rpc
)

app = FastAPI()

@app.get('/')
async def root():
    return {"message": "Hello World"}

@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    success: bool = upload_service_rpc(data)  # 此处发动 rpc 调用
    return {
        'status': success
    }

upload_service_rpc 办法是对 nameko 提供的 ClusterRpcProxy 的一层函数包装

rpc.py

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (
    UploadRequestBody,
)
from loguru import logger

config = {
    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}

def upload_service_rpc(data: UploadRequestBody) -> bool:
    """ 给 fatapi 裸露的 rpc 接口 """
    with ClusterRpcProxy(config) as cluster_rpc:   # 通过 ClusterRpcProxy 发动 RPC 申请
        success: bool = cluster_rpc.console_service.upload(
            data=data.json(ensure_ascii=False)
        )
        return success

然而下面的写法,看起来很完满,然而 nameko 的实现是每次实例化 ClusterRpcProxy 都会在 RabbitMQ 中创立一个新的 queue ,如果咱们每次 rpc 申请都想下面的代码一样频繁实例化 ClusterRpcProxy 会导致大量的工夫消耗在创立 queue 上。

下图是 RabbmitMQ 后盾治理界面的截图,能够看到当发动屡次申请的时候,会呈现大量 rpc.reply-standalone_rpc_proxy_{routing_key} 格局的 queue

这些 rpc.reply-standalone_rpc_proxy_{routing_key} 队列会在没有音讯之后几秒后被敞开,不会始终存在上来

接下来革新代码:

api.py

import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (
    UploadRequestBody
)
from rpc import (
    init_rpc_proxy
)

app = FastAPI()


rpc_proxy = init_rpc_proxy()    #  把 rpc_proxy 对象变成一个全局变量,生命周期随同整个程序


@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    success: bool = rpc_proxy.console_service.upload(  # 执行 rpc 调用 console_service 的 upload 办法
        data=data.json(ensure_ascii=False)
    )

    return {
        'status': success
    }


rpc.py

# coding=utf-8

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (
    UploadRequestBody,
)
from loguru import logger

config = {
    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}


def init_rpc_proxy():
    return ClusterRpcProxy(config) # init_rpc_proxy 只负责返回对象,不执行代码

然而当咱们执行下面的新代码的时候,就报错

AttributeError: 'ClusterRpcProxy' object has no attribute 'console_service'

为什么呢?起因在于 ClusterRpcProxy 类的 __enter__ 办法,但咱们不应用 with 上下文管理器的时候,就不会执行 __enter__ 办法中的内容,而机密就在 __enter__ 办法中,让咱们来看看 __enter__ 办法中有什么吧!

nameko/standalone/rpc.py

class StandaloneProxyBase(object):   # StandaloneProxyBase 是 ClusterRpcProxy 的父类
    class ServiceContainer(object):
        """ Implements a minimum interface of the
        :class:`~containers.ServiceContainer` to be used by the subclasses
        and rpc imports in this module.
        """
        service_name = "standalone_rpc_proxy"

        def __init__(self, config):
            self.config = config
            self.shared_extensions = {}

    class Dummy(Entrypoint):
        method_name = "call"

    _proxy = None

    def __init__(
        self, config, context_data=None, timeout=None,
        reply_listener_cls=SingleThreadedReplyListener
    ):
        container = self.ServiceContainer(config)

        self._worker_ctx = WorkerContext(
            container, service=None, entrypoint=self.Dummy,
            data=context_data)
        self._reply_listener = reply_listener_cls(
            timeout=timeout).bind(container)

    def __enter__(self):
        return self.start()

    def __exit__(self, tpe, value, traceback):
        self.stop()

    def start(self):
        self._reply_listener.setup()
        return self._proxy

    def stop(self):
        self._reply_listener.stop()

class ClusterRpcProxy(StandaloneProxyBase):
    def __init__(self, *args, **kwargs):
        super(ClusterRpcProxy, self).__init__(*args, **kwargs)
        self._proxy = ClusterProxy(self._worker_ctx, self._reply_listener)

StandaloneProxyBase 是 ClusterRpcProxy 的父类,能够看到 __enter__ 办法执行了 return self.start(),start 办法返回 return self._proxy, 而不是常见的 return self,所以这就导致了咱们后面的谬误。

晓得了问题的起因,改起来就很快了!

api.py

import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (
    UploadRequestBody
)
from rpc import (
    init_rpc_proxy
)

app = FastAPI()


_rpc_proxy = init_rpc_proxy()  # 辨别两个 _rpc_proxy 和 rpc_proxy
rpc_proxy = _rpc_proxy.start()


@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    # success: bool = upload_service_rpc2(data)
    success: bool = rpc_proxy.console_service.upload( # 应用 rpc_proxy 调用 rpc 办法
        data=data.json(ensure_ascii=False)
    )

    return {
        'status': success
    }

rpc.py

# coding=utf-8

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (
    UploadRequestBody,
)
from loguru import logger

config = {
    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}


def init_rpc_proxy():
    return ClusterRpcProxy(config)

好了,咱们看看前后的速度差别:

测试代码:

import requests

data = {
    # 暗藏了这部分内容
}

for i in range(20):
    response = requests.post('http://localhost:63000/upload/', json=data)
    print(response.status_code, response.text)

循环跑 20 次:

批改前:

─➤  time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py  0.14s user 0.05s system 1% cpu 14.696 total

批改后:

─➤  time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py  0.14s user 0.05s system 2% cpu 7.271 total

因为防止了每次 RPC 申请都创立一个 queue,所以速度实现了比拟大的晋升。

14 秒比照 7 秒,实现了速度的翻倍!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理