共计 5858 个字符,预计需要花费 15 分钟才能阅读完成。
看到一篇文章 nameko 的应用及注意事项,中有一段对于发动近程调用的形容
rpc 后紧跟的是微服务定义时的类变量 name 的值即为微服务名称,接着紧跟 rpc 办法,应用 call_async 为异步调用,而调用 result_async.result() 时会期待异步工作返回后果。须要留神的是,运行 ClusterRpcProxy(config) 时会创立与队列的连贯,该操作比拟耗时, 如果有大量的微服务调用,不应该反复创立连贯,应在语句块内实现所有调用 。异步调用的后果只能在语句块内获取,即调用 .result() 期待后果。语句块之外连贯断开就无奈获取了。
大略意思就是说频繁实例化 ClusterRpcProxy
, 联合我之前应用在 RabbitMQ 后盾看到的奇怪景象,感觉有必要来钻研一下:
上面是常见的 nameko
调用形式
api.py
from fastapi import FastAPI
from rpc import (upload_service_rpc)
app = FastAPI()
@app.get('/')
async def root():
return {"message": "Hello World"}
@app.post('/upload/')
def upload(data: UploadRequestBody):
logger.debug(data.json(ensure_ascii=False))
success: bool = upload_service_rpc(data) # 此处发动 rpc 调用
return {'status': success}
upload_service_rpc
办法是对 nameko
提供的 ClusterRpcProxy
的一层函数包装
rpc.py
from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (UploadRequestBody,)
from loguru import logger
config = {'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}
def upload_service_rpc(data: UploadRequestBody) -> bool:
"""给 fatapi 裸露的 rpc 接口"""
with ClusterRpcProxy(config) as cluster_rpc: # 通过 ClusterRpcProxy 发动 RPC 申请
success: bool = cluster_rpc.console_service.upload(data=data.json(ensure_ascii=False)
)
return success
然而下面的写法,看起来很完满,然而 nameko
的实现是每次实例化 ClusterRpcProxy
都会在 RabbitMQ
中创立一个新的 queue
,如果咱们每次 rpc
申请都想下面的代码一样频繁实例化 ClusterRpcProxy
会导致大量的工夫消耗在创立 queue
上。
下图是 RabbmitMQ
后盾治理界面的截图,能够看到当发动屡次申请的时候,会呈现大量 rpc.reply-standalone_rpc_proxy_{routing_key}
格局的 queue
这些
rpc.reply-standalone_rpc_proxy_{routing_key}
队列会在没有音讯之后几秒后被敞开,不会始终存在上来
接下来革新代码:
api.py
import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (UploadRequestBody)
from rpc import (init_rpc_proxy)
app = FastAPI()
rpc_proxy = init_rpc_proxy() # 把 rpc_proxy 对象变成一个全局变量,生命周期随同整个程序
@app.post('/upload/')
def upload(data: UploadRequestBody):
logger.debug(data.json(ensure_ascii=False))
success: bool = rpc_proxy.console_service.upload( # 执行 rpc 调用 console_service 的 upload 办法
data=data.json(ensure_ascii=False)
)
return {'status': success}
rpc.py
# coding=utf-8
from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (UploadRequestBody,)
from loguru import logger
config = {'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}
def init_rpc_proxy():
return ClusterRpcProxy(config) # init_rpc_proxy 只负责返回对象,不执行代码
然而当咱们执行下面的新代码的时候,就报错
AttributeError: 'ClusterRpcProxy' object has no attribute 'console_service'
为什么呢?起因在于 ClusterRpcProxy 类的 __enter__
办法,但咱们不应用 with
上下文管理器的时候,就不会执行 __enter__
办法中的内容,而机密就在 __enter__
办法中,让咱们来看看 __enter__
办法中有什么吧!
nameko/standalone/rpc.py
class StandaloneProxyBase(object): # StandaloneProxyBase 是 ClusterRpcProxy 的父类
class ServiceContainer(object):
""" Implements a minimum interface of the
:class:`~containers.ServiceContainer` to be used by the subclasses
and rpc imports in this module.
"""service_name ="standalone_rpc_proxy"
def __init__(self, config):
self.config = config
self.shared_extensions = {}
class Dummy(Entrypoint):
method_name = "call"
_proxy = None
def __init__(
self, config, context_data=None, timeout=None,
reply_listener_cls=SingleThreadedReplyListener
):
container = self.ServiceContainer(config)
self._worker_ctx = WorkerContext(
container, service=None, entrypoint=self.Dummy,
data=context_data)
self._reply_listener = reply_listener_cls(timeout=timeout).bind(container)
def __enter__(self):
return self.start()
def __exit__(self, tpe, value, traceback):
self.stop()
def start(self):
self._reply_listener.setup()
return self._proxy
def stop(self):
self._reply_listener.stop()
class ClusterRpcProxy(StandaloneProxyBase):
def __init__(self, *args, **kwargs):
super(ClusterRpcProxy, self).__init__(*args, **kwargs)
self._proxy = ClusterProxy(self._worker_ctx, self._reply_listener)
StandaloneProxyBase
是 ClusterRpcProxy 的父类,能够看到 __enter__
办法执行了 return self.start()
,start 办法返回 return self._proxy
,而不是常见的 return self
,所以这就导致了咱们后面的谬误。
晓得了问题的起因,改起来就很快了!
api.py
import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (UploadRequestBody)
from rpc import (init_rpc_proxy)
app = FastAPI()
_rpc_proxy = init_rpc_proxy() # 辨别两个 _rpc_proxy 和 rpc_proxy
rpc_proxy = _rpc_proxy.start()
@app.post('/upload/')
def upload(data: UploadRequestBody):
logger.debug(data.json(ensure_ascii=False))
# success: bool = upload_service_rpc2(data)
success: bool = rpc_proxy.console_service.upload( # 应用 rpc_proxy 调用 rpc 办法
data=data.json(ensure_ascii=False)
)
return {'status': success}
rpc.py
# coding=utf-8
from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (UploadRequestBody,)
from loguru import logger
config = {'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}
def init_rpc_proxy():
return ClusterRpcProxy(config)
好了,咱们看看前后的速度差别:
测试代码:
import requests
data = {# 暗藏了这部分内容}
for i in range(20):
response = requests.post('http://localhost:63000/upload/', json=data)
print(response.status_code, response.text)
循环跑 20 次:
批改前:
─➤ time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py 0.14s user 0.05s system 1% cpu 14.696 total
批改后:
─➤ time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py 0.14s user 0.05s system 2% cpu 7.271 total
因为防止了每次 RPC 申请都创立一个 queue
,所以速度实现了比拟大的晋升。
14
秒比照 7
秒,实现了速度的翻倍!