"""Stress-test publisher that floods a nameko event with a fixed JSON payload.

A pool of worker processes repeatedly calls ``run``; every call dispatches the
same JSON document through the module-level ``dispatch`` function.
"""
import json
import logging
import multiprocessing

from nameko.constants import NON_PERSISTENT, PERSISTENT
from nameko.standalone.events import event_dispatcher
from nameko.standalone.rpc import ClusterRpcProxy

import settings

logger = logging.getLogger(__name__)

# Broker URI assembled from the project's RabbitMQ settings.
config = {
    'AMQP_URI': f'amqp://{settings.RABBITMQ_CONFIG.username}:'
                f'{settings.RABBITMQ_CONFIG.password}@{settings.RABBITMQ_CONFIG.host}:'
                f'{settings.RABBITMQ_CONFIG.port}/{settings.RABBITMQ_CONFIG.vhost}'
}

# Dispatcher configured for non-persistent delivery with publisher confirms
# turned off.
dispatch = event_dispatcher(
    config,
    delivery_mode=NON_PERSISTENT,
    use_confirms=False,
)


def run(iterations=10000000):
    """Dispatch the sample payload ``iterations`` times.

    Args:
        iterations: Number of dispatch calls to make. Defaults to the
            original hard-coded count of 10,000,000.
    """
    data = {
        'name': 'jike',
        'age': 18,
        'score': {
            'math': 100,
            'science': 99.5,
            'english': 59,
        },
    }
    # The payload never changes, so serialize it once instead of once per
    # iteration.
    payload = json.dumps(data)
    for _ in range(iterations):
        dispatch('worker_service', 'to_rubbish', payload)


def _log_worker_error(exc):
    """Log an exception raised by a pool task instead of dropping it."""
    logger.error('run() failed in a worker process', exc_info=exc)


def main():
    """Submit 100000 ``run`` tasks to an 8-process pool and wait for them."""
    pool = multiprocessing.Pool(processes=8)
    for _ in range(100000):
        # Without an error_callback, an exception raised inside a task is
        # only stored on the discarded AsyncResult and never surfaces.
        pool.apply_async(run, error_callback=_log_worker_error)
    # close() must precede join(): stop accepting tasks, then wait for the
    # workers to finish.
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
很简单,就是先调用 pool.close(),再调用 pool.join()。close() 会阻止进程池继续接收新任务,join() 则等待所有工作进程退出;join() 必须在 close()(或 terminate())之后调用。