为什么要多过程+协程?因为这能够取得极高的性能,倡议先通读 Python黑魔法 --- 异步IO( asyncio) 协程 一文。

废话不多说,上代码。

<!--more-->

import asyncioimport multiprocessingimport osimport timefrom multiprocessing import Manager# 业务类class BaiJiaHao():    async def get_author(self, rec):        """        协程代码        """        print('enter get author,wait for: %d' % rec['num'])        # 模仿IO操作,耗时依据传进来的num决定        await asyncio.sleep(rec['num'])        # 返回协程工作实现后的后果        return rec    def run(self):        # 假设咱们有11个工作要跑,每个工作耗时为num秒,串行的话须要43秒。        # 但咱们这个demo跑完只须要这些工作中的最大值:8秒        list = [{'title': 'title1', 'num': 2},                {'title': 'title2', 'num': 1},                {'title': 'title3', 'num': 3},                {'title': 'title4', 'num': 8},                {'title': 'title5', 'num': 2},                {'title': 'title6', 'num': 5},                {'title': 'title7', 'num': 7},                {'title': 'title8', 'num': 3},                {'title': 'title9', 'num': 4},                {'title': 'title10', 'num': 3},                {'title': 'title11', 'num': 5},                ]        result = run_get_author_in_multi_process(list)        print('result', result)def get_chunks(iterable, chunks=1):    """    此函数用于宰割若干工作到不同的过程里去    """    lst = list(iterable)    return [lst[i::chunks] for i in range(chunks)]def run_get_author(lists, queue):    """    这个就是子过程运行的函数,接管工作列表和用于过程间通信的Queue    """    print('exec run_get_author.child process id : %s, parent process id : %s' % (os.getpid(), os.getppid()))    # 每个子过程调配一个新的loop    loop = asyncio.new_event_loop()    # 初始化业务类,转成task或future    spider = BaiJiaHao()    tasks = [loop.create_task(spider.get_author(rec)) for rec in lists]    # 协程走起    loop.run_until_complete(asyncio.wait(tasks))    # 往queue写入每个工作的后果    for task in tasks:        queue.put(task.result())def run_get_author_in_multi_process(task_lists):    """    父过程函数,次要是宰割工作并初始化过程池,启动过程并返回后果    """    # process_count = len(tasks) % 2    # 过程数这里我用机器上的外围数,留神:未思考外围数比工作多的状况    process_count = multiprocessing.cpu_count()    print('process_count: %d' % process_count)    split_lists = get_chunks(task_lists, process_count)    pool = multiprocessing.Pool(process_count)    queue = Manager().Queue()    for lists in split_lists:        pool.apply_async(run_get_author, args=(lists, queue,))    pool.close()    pool.join()    result = []    # 从子过程读取后果并返回    while not queue.empty():        result.append(queue.get())    return resultnow = lambda : time.time()if __name__ == '__main__':    start = now()    spider = BaiJiaHao()    spider.run()    print('done','TIME: ', now() - start)

运行后果: