Starting tomorrow, I will be publishing a series of OpenStack-related articles.
How OpenStack Services Start
Essentially all OpenStack services rely on eventlet for concurrency. Their processes fall into two categories:
1. WSGIService: receives and handles HTTP requests, relying on eventlet.wsgi's WSGI server, e.g. nova-api
2. Service: receives and handles RPC requests, e.g. nova-compute
Whichever type the process is, every time it receives a request (HTTP or RPC), a green thread is allocated from a pool to handle it.
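The dispatch pattern described above can be sketched with the standard library's ThreadPoolExecutor standing in for eventlet's GreenPool (an illustrative substitution; the real services use green threads, not OS threads):

```python
# Each incoming request is handed to a worker drawn from a pool,
# so the accept loop never blocks on request processing.
from concurrent.futures import ThreadPoolExecutor

def handle_request(req_id):
    # stand-in for parsing and serving one HTTP or RPC request
    return req_id * 2

pool = ThreadPoolExecutor(max_workers=100)
futures = [pool.submit(handle_request, i) for i in range(5)]
print(sorted(f.result() for f in futures))  # [0, 2, 4, 6, 8]
```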
1. Starting a WSGIService
The following uses the nova service as an example.
nova-api is started from nova/cmd/api.py, which initializes a WSGIService object (defined in service.py).
def main():
    objects.register_all()
    CONF(sys.argv[1:], project='nova',
         version=version.version_string())
    logging.setup(CONF, "nova")
    rpc.init(CONF)
    launcher = service.get_launcher()
    server = service.WSGIService('osapi_nova')
    launcher.launch_service(server, workers=server.workers)
    launcher.wait()
main() obtains a launcher object from the service layer, then passes the server object to the launcher's launch_service method. launch_service(server, workers=server.workers) is defined as follows:
class Launcher(object):
    def __init__(self):
        super(Launcher, self).__init__()
        self.launch_service = serve
        self.wait = wait
The attribute is bound to the serve function, which is defined as follows:
def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))
    _launcher = service.launch(CONF, server, workers=workers)
This ultimately calls the launch function in oslo_service/service.py, defined as follows:
def launch(conf, service, workers=1, restart_method='reload'):
    ...
    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))
    if workers is None or workers == 1:
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    launcher.launch_service(service, workers=workers)
As you can see, two kinds of launchers are used here. Before going further into the startup flow, let's first look at the launchers in OpenStack.
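The dispatch rule in launch() above boils down to a simple decision on the workers argument, which can be condensed into a small sketch (launcher names returned as strings for illustration only):

```python
# Condensed form of oslo_service's launch() dispatch:
# one worker (or None) selects ServiceLauncher, more selects ProcessLauncher.
def pick_launcher(workers):
    if workers is not None and workers <= 0:
        raise ValueError("Number of workers should be positive!")
    if workers is None or workers == 1:
        return "ServiceLauncher"
    return "ProcessLauncher"

print(pick_launcher(None))  # ServiceLauncher
print(pick_launcher(4))     # ProcessLauncher
```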
2. Launchers in OpenStack
OpenStack has the concept of a Launcher, a class dedicated to starting services, which lives in the oslo_service package. Launchers come in two kinds:
one is ServiceLauncher;
the other is ProcessLauncher.
ServiceLauncher starts single-process services,
while ProcessLauncher starts services with multiple worker child processes, such as the various API services (nova-api, cinder-api, etc.)
oslo_service/service.py
1) ServiceLauncher
ServiceLauncher inherits from Launcher. The key member for starting a service is launch_service, which ServiceLauncher inherits from Launcher:
def launch_service(self, service, workers=1):
    ...
    if workers is not None and workers != 1:
        raise ValueError(_("Launcher asked to start multiple workers"))
    _check_service_base(service)
    service.backdoor_port = self.backdoor_port
    self.services.add(service)
launch_service simply adds the service to the self.services member, which is an instance of the Services class. Let's look at its add method:
class Services(object):
    def __init__(self):
        self.services = []
        self.tg = threadgroup.ThreadGroup()
        self.done = event.Event()

    def add(self, service):
        """Add a service to a list and create a thread to run it.

        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)
Initialization of the Services class is simple: it creates a ThreadGroup, which is really a wrapper around eventlet's GreenPool (OpenStack uses eventlet for concurrency). The add method puts self.run_service into the pool, with the service as its argument. run_service itself just calls the service's start method, and with that the service is up and running.
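The add()/run_service() handoff can be sketched with stdlib pieces, using ThreadPoolExecutor as a stand-in for the ThreadGroup/GreenPool (class names below are illustrative, not oslo_service code):

```python
# Services.add() in miniature: register the service, then hand
# run_service (which just calls service.start()) to a pool.
from concurrent.futures import ThreadPoolExecutor

class Service:
    def __init__(self):
        self.started = False

    def start(self):
        self.started = True

def run_service(service):
    service.start()

pool = ThreadPoolExecutor()
svc = Service()
pool.submit(run_service, svc).result()  # wait so we can observe the effect
print(svc.started)  # True
```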
2) ProcessLauncher
ProcessLauncher inherits directly from object, and it also provides a launch_service method:
def launch_service(self, service, workers=1):
    ...
    _check_service_base(service)
    wrap = ServiceWrapper(service, workers)
    LOG.info('Starting %d workers', wrap.workers)
    while self.running and len(wrap.children) < wrap.workers:
        self._start_child(wrap)
Besides the service, launch_service also takes a workers argument, the number of child processes, and then calls _start_child to start multiple children:
def _start_child(self, wrap):
    if len(wrap.forktimes) > wrap.workers:
        # Limit ourselves to one process a second (over the period of
        # number of workers * 1 second). This will allow workers to
        # start up quickly but ensure we don't fork off children that
        # die instantly too quickly.
        if time.time() - wrap.forktimes[0] < wrap.workers:
            LOG.info('Forking too fast, sleeping')
            time.sleep(1)
        wrap.forktimes.pop(0)
    wrap.forktimes.append(time.time())

    pid = os.fork()
    if pid == 0:
        self.launcher = self._child_process(wrap.service)
        while True:
            self._child_process_handle_signal()
            status, signo = self._child_wait_for_exit_or_signal(self.launcher)
            if not _is_sighup_and_daemon(signo):
                self.launcher.wait()
                break
            self.launcher.restart()
        os._exit(status)

    LOG.debug('Started child %d', pid)
    wrap.children.add(pid)
    self.children[pid] = wrap
See the familiar fork? It is a plain call to os.fork(), after which each child process starts running and calls _child_process:
def _child_process(self, service):
    self._child_process_handle_signal()

    # Reopen the eventlet hub to make sure we don't share an epoll
    # fd with parent and/or siblings, which would be bad
    eventlet.hubs.use_hub()

    # Close write to ensure only parent has it open
    os.close(self.writepipe)
    # Create greenthread to watch for parent to close pipe
    eventlet.spawn_n(self._pipe_watcher)

    # Reseed random number generator
    random.seed()

    launcher = Launcher(self.conf, restart_method=self.restart_method)
    launcher.launch_service(service)
    return launcher
_child_process is straightforward: it creates a Launcher and calls its launch_service method. As introduced earlier, ServiceLauncher inherits launch_service from Launcher, so from here on the steps are the same as before: ultimately service.start is called to start the service.
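The parent/child split driving _start_child can be reduced to a bare os.fork() sketch (POSIX only; the real code additionally handles signals, SIGHUP restarts, and fork-rate limiting):

```python
# The essential fork pattern: the parent records the child's pid and
# keeps supervising; the child does its own work and must leave via
# os._exit() so it never returns into the parent's code path.
import os

pid = os.fork()
if pid == 0:
    # child: would normally build a Launcher and serve until signalled
    os._exit(0)

# parent: track the worker and (here) reap it immediately
_, status = os.waitpid(pid, 0)
print(os.WEXITSTATUS(status))  # 0
```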
3. Starting a WSGIService, continued
Back to the startup flow. From the launcher section we know that starting a service ultimately calls the service's start method, and the service here is the one we created at the very beginning in api.py and then passed down layer by layer into the launchers. Let's return to the start(self) method of the WSGIService class:
def start(self):
    ...
    if self.manager:
        self.manager.init_host()
    self.server.start()
    self.port = self.server.port
This in turn calls start(self) in oslo_service/wsgi.py:
def start(self):
    ...
    self.dup_socket = self.socket.dup()

    if self._use_ssl:
        self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)

    wsgi_kwargs = {
        'func': eventlet.wsgi.server,
        'sock': self.dup_socket,
        'site': self.app,
        'protocol': self._protocol,
        'custom_pool': self._pool,
        'log': self._logger,
        'log_format': self.conf.wsgi_log_format,
        'debug': False,
        'keepalive': self.conf.wsgi_keep_alive,
        'socket_timeout': self.client_socket_timeout
    }

    if self._max_url_len:
        wsgi_kwargs['url_length_limit'] = self._max_url_len

    self._server = eventlet.spawn(**wsgi_kwargs)
Note the func entry in wsgi_kwargs: its value is eventlet.wsgi.server, defined in eventlet/wsgi.py as follows:
def server(sock, site,
    ...
    try:
        serv.log.info("(%s) wsgi starting up on %s" % (serv.pid, socket_repr(sock)))
        while is_accepting:
            try:
                client_socket = sock.accept()
                client_socket[0].settimeout(serv.socket_timeout)
                serv.log.debug("(%s) accepted %r" % (serv.pid, client_socket[1]))
                try:
                    pool.spawn_n(serv.process_request, client_socket)
                except AttributeError:
                    warnings.warn("wsgi's pool should be an instance of "
                                  "eventlet.greenpool.GreenPool, is %s. Please convert your "
                                  "call site to use GreenPool instead" % type(pool),
                                  DeprecationWarning, stacklevel=2)
                    pool.execute_async(serv.process_request, client_socket)
            except ACCEPT_EXCEPTIONS as e:
                if support.get_errno(e) not in ACCEPT_ERRNO:
                    raise
            except (KeyboardInterrupt, SystemExit):
                serv.log.info("wsgi exiting")
                break
    finally:
        pool.waitall()
        ...
See the familiar scene? sock.accept() listens for requests, and every time a new request arrives, pool.spawn_n() starts a green thread to handle it.
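The accept-and-spawn loop can be sketched with the standard library, with ThreadPoolExecutor standing in for the GreenPool (one connection, handled once, for illustration; process_request here is a stand-in, not eventlet's):

```python
# Accept a connection, then hand it to a pool instead of serving inline,
# so the accept loop stays free to take the next connection.
import socket
from concurrent.futures import ThreadPoolExecutor

def process_request(conn):
    with conn:
        conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")

server = socket.socket()
server.bind(("127.0.0.1", 0))   # ephemeral port
server.listen(1)
port = server.getsockname()[1]
pool = ThreadPoolExecutor(max_workers=4)

client = socket.create_connection(("127.0.0.1", port))
conn, _ = server.accept()                  # sock.accept() in the loop above
pool.submit(process_request, conn).result()  # pool.spawn_n() equivalent
reply = client.recv(1024)
print(reply.split(b"\r\n")[0])  # b'HTTP/1.1 200 OK'
client.close()
server.close()
```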
4. Starting a Service
Service-type processes are likewise created by files under the nova/cmd/ directory:
nova-scheduler: nova/cmd/scheduler.py
...
As consumers of the message broker, these processes listen on their own queues, and whenever an RPC request arrives they create a new green thread to handle it. Taking nova-scheduler as an example, startup initializes a Server object (defined in service.py).
The whole launcher flow is the same as for WSGIService; only the service's start() differs:
def start(self):
    ...
    target = messaging.Target(topic=self.topic, server=self.host)
    endpoints = [self.manager]
    endpoints.extend(self.manager.additional_endpoints)
    serializer = objects_base.KarborObjectSerializer()
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    self.rpcserver.start()
After several layers of calls, this produces an RPCServer object:
class RPCServer(msg_server.MessageHandlingServer):
    def __init__(self, transport, target, dispatcher, executor='blocking'):
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        self._target = target
This class inherits from MessageHandlingServer.
Note: all nova components rely on oslo.messaging to access the message broker; oslo/messaging/server.py initializes a MessageHandlingServer object that listens on the message queue.
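Conceptually, what MessageHandlingServer's dispatcher does is look up the requested method on the registered endpoint objects and invoke it with the request's arguments. A toy sketch (class and method names are illustrative, not oslo.messaging API; the real server adds targets, serialization, and an executor):

```python
# Endpoint dispatch in miniature: the first endpoint exposing the
# requested method handles the call.
class ComputeEndpoint:
    def add(self, ctxt, a, b):
        return a + b

class ToyRPCServer:
    def __init__(self, endpoints):
        self.endpoints = endpoints

    def dispatch(self, method, ctxt, **kwargs):
        for ep in self.endpoints:
            fn = getattr(ep, method, None)
            if fn is not None:
                return fn(ctxt, **kwargs)
        raise AttributeError("no endpoint implements %s" % method)

server = ToyRPCServer([ComputeEndpoint()])
print(server.dispatch("add", {}, a=2, b=3))  # 5
```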
Eventually this server's start method is called:
def start(self, override_pool_size=None):
    ...
    if self._started:
        LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
                        'racy. It is deprecated, and will become a noop '
                        'in a future release of oslo.messaging. If you '
                        'need to restart MessageHandlingServer you should '
                        'instantiate a new object.'))
    self._started = True

    executor_opts = {}
    if self.executor_type in ("threading", "eventlet"):
        executor_opts["max_workers"] = (override_pool_size or
                                        self.conf.executor_thread_pool_size)
    self._work_executor = self._executor_cls(**executor_opts)

    try:
        self.listener = self._create_listener()
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)

    # HACK(sileht): We temporary pass the executor to the rabbit
    # listener to fix a race with the deprecated blocking executor.
    # We do this hack because this is need only for 'synchronous'
    # executor like blocking. And this one is deprecated. Making
    # driver working in an sync and an async way is complicated
    # and blocking have 0% tests coverage.
    if hasattr(self.listener, '_poll_style_listener'):
        l = self.listener._poll_style_listener
        if hasattr(l, "_message_operations_handler"):
            l._message_operations_handler._executor = (self.executor_type)

    self.listener.start(self._on_incoming)
This object in turn initializes an executor of type EventletExecutor (from oslo/messaging/_executors/impl_eventlet.py), which calls self.listener.poll() to listen for RPC requests and spawns a green thread for each request it receives:
class EventletExecutor(base.ExecutorBase):
    ...

    def start(self):
        if self._thread is not None:
            return

        @excutils.forever_retry_uncaught_exceptions
        def _executor_thread():
            try:
                while True:
                    incoming = self.listener.poll()
                    spawn_with(ctxt=self.dispatcher(incoming),
                               pool=self._greenpool)
            except greenlet.GreenletExit:
                return

        self._thread = eventlet.spawn(_executor_thread)
Blog: https://tunsuy.github.io/
GitHub: https://github.com/tunsuy