共计 1952 个字符,预计需要花费 5 分钟才能阅读完成。
为什么要多过程 + 协程?因为这能够取得极高的性能,倡议先通读 Python 黑魔法 — 异步 IO(asyncio)协程 一文。
废话不多说,上代码。
<!–more–>
import asyncio
import multiprocessing
import os
import time
from multiprocessing import Manager
# 业务类
class BaiJiaHao():
async def get_author(self, rec):
"""协程代码"""
print('enter get author,wait for: %d' % rec['num'])
# 模仿 IO 操作,耗时依据传进来的 num 决定
await asyncio.sleep(rec['num'])
# 返回协程工作实现后的后果
return rec
def run(self):
# 假设咱们有 11 个工作要跑,每个工作耗时为 num 秒,串行的话须要 43 秒。# 但咱们这个 demo 跑完只须要这些工作中的最大值:8 秒
list = [{'title': 'title1', 'num': 2},
{'title': 'title2', 'num': 1},
{'title': 'title3', 'num': 3},
{'title': 'title4', 'num': 8},
{'title': 'title5', 'num': 2},
{'title': 'title6', 'num': 5},
{'title': 'title7', 'num': 7},
{'title': 'title8', 'num': 3},
{'title': 'title9', 'num': 4},
{'title': 'title10', 'num': 3},
{'title': 'title11', 'num': 5},
]
result = run_get_author_in_multi_process(list)
print('result', result)
def get_chunks(iterable, chunks=1):
"""此函数用于宰割若干工作到不同的过程里去"""
lst = list(iterable)
return [lst[i::chunks] for i in range(chunks)]
def run_get_author(lists, queue):
"""这个就是子过程运行的函数,接管工作列表和用于过程间通信的 Queue"""
print('exec run_get_author.child process id : %s, parent process id : %s' % (os.getpid(), os.getppid()))
# 每个子过程调配一个新的 loop
loop = asyncio.new_event_loop()
# 初始化业务类,转成 task 或 future
spider = BaiJiaHao()
tasks = [loop.create_task(spider.get_author(rec)) for rec in lists]
# 协程走起
loop.run_until_complete(asyncio.wait(tasks))
# 往 queue 写入每个工作的后果
for task in tasks:
queue.put(task.result())
def run_get_author_in_multi_process(task_lists):
"""父过程函数,次要是宰割工作并初始化过程池,启动过程并返回后果"""
# process_count = len(tasks) % 2
# 过程数这里我用机器上的外围数,留神:未思考外围数比工作多的状况
process_count = multiprocessing.cpu_count()
print('process_count: %d' % process_count)
split_lists = get_chunks(task_lists, process_count)
pool = multiprocessing.Pool(process_count)
queue = Manager().Queue()
for lists in split_lists:
pool.apply_async(run_get_author, args=(lists, queue,))
pool.close()
pool.join()
result = []
# 从子过程读取后果并返回
while not queue.empty():
result.append(queue.get())
return result
now = lambda : time.time()
if __name__ == '__main__':
start = now()
spider = BaiJiaHao()
spider.run()
print('done','TIME:', now() - start)
运行后果:
正文完