阅读本文大约需要 10 分钟。

14.说一下进程、线程、以及多任务(多进程、多线程和协程)

  • 进程

    • 概念
      一个程序对应一个进程,这个进程被叫做主进程,而一个主进程下面还有许多子进程。
    • 实现方式

      • fork()
        示例:

        import os                  print('current_pid :%d' % os.getpid())     res = os.fork()     # 子进程返回的是 0if res == 0:print('res: %d' % res)print('sub_pid: %d' % os.getpid())     # 主进程返回的是子进程的 pidelse:    print('main_pid: %d' % os.getpid())    print('res:%d' % res)     # 结果为current_pid :12775main_pid: 12775res:12776res: 0sub_pid: 12776multiprocessing.Process
      • multiprocessing.Process
        示例:

        from multiprocessing import Processimport os, time          print('man_process pid : %d' % os.getpid())     class NewProcess(Process):    def __init__(self):        Process.__init__(self)         def run(self):        time.sleep(3)        print('%d process was runing' % os.getpid())     np = NewProcess()np.start()     # 结果为man_process pid : 78467847 process was runing
      • multiprocessing.Pool

        • 同步(apply)

          示例:

          from multiprocessing import Poolimport time, os, random          print('main_process pid: %d' % os.getpid())     def run():    time.sleep(random.random())  # random.random() 随机生成一个小于 1 的浮点数    print('%d process was runing' % os.getpid())     p = Pool(3)     for i in range(4):    p.apply(run, args=())     p.close()print('waiting for sub_process')     while True:    # 获取 Pool 中剩余的进程数量    count = len(p._cache)    if count != 0:        print('there was %d sub_process' % count)        time.sleep(random.random())    else:        break             print('sub_process has done')     # 结果为main_process pid: 42954297 process was runing4296 process was runing4298 process was runing4297 process was runingwating for sub_processsub_process has done
        • 异步(apply_async)
          示例:

          from multiprocessing import Poolimport time, os, random                    print('main_process pid: %d' % os.getpid())          def run():    # random.random() 随机生成一个小于 1 的浮点数    time.sleep(random.random())      print('%d process was runing' % os.getpid())   p = Pool(3)          for i in range(4):    p.apply_async(run, args=())              p.close()          while True:    # 获取 Pool 中剩余的进程数量    count = len(p._cache)    if count != 0:        print('there was %d sub_process' % count)        time.sleep(random.random())    else:        break                  print('wiating for sub_process..')p.join()          print('sub_process has done')          # 结果为main_process pid: 4342wiating for sub_process..there was 4 sub_process4344 process was runingthere was 3 sub_process4345 process was runing4344 process was runing4343 process was runingsub_process has done
    • 优缺点

      • fork()是计算机最底层的进程实现方式,一个fork()方法创建出来的进程有两个:主进程、子进程。fork()创建出来的进程,主进程不会等待子进程。
      • multiprocessing模块通过将fork方法封装成一个Process类,该类有一个start()方法,当调用该方法时,会自动调用run()方法,开启一个进程。并且由Process创建出来的进程,可以使用join()方法,使得主进程堵塞,被迫等待子进程。
      • multiprocess下另一种开启进程的方式是通过Pool进程池来实现。进程池可以开启多个进程来执行多个任务,但是进程数最大不会超过系统 CPU 核数。同样的,由Pool创建出来的进程,主进程也不会等待子进程,通过join()方法可以迫使主进程等待子进程,或者使用apply()同步的方式。
    • 进程通信
      进程之间的通信可以通过队列(Queue)来进行,多个进程一部分向队列里写入数据,一部分从队列里读取数据,从而完成多进程之间的通信问题。
      示例:

      from multiprocessing import Process, Queueimport random, time, os    def write(q):    if not q.full():        for i in range(4):           q.put(i)           print('%d was writing data[%d] to queue' % (os.getpid(), i))              time.sleep(random.random())    else:        print('queue is full')  def read(q):    # 等待队列被写入数据    time.sleep(random.random())    while True:        if not q.empty():            data = q.get()            print('%d was reading data{%d} from queue' % (os.getpid(), data))        else:            print('queue is empty')            break      # 创建通信队列,进程之间,全局变量不共享q = Queue()pw = Process(target=write, args=(q,))pr = Process(target=read, args=(q,))      pw.start()pr.start()      pw.join()pr.join()print('end')      # 结果为4640 was writing data[0] to queue4640 was writing data[1] to queue4640 was writing data[2] to queue4641 was reading data{0} from queue4641 was reading data{1} from queue4641 was reading data{2} from queuequeue is empty4640 was writing data[3] to queueend

      由于进程的执行顺序问题,造成了 pr 先于 pw 执行,所以 pr 未读取到数据,pr 进程任务结束,堵塞解开,主进程继续向下运行,最后 pw 任务结束。

    • 进程通信改良
      示例:

      from multiprocessing import Process, Queueimport random, time, os        def write(q):    if not q.full():        for i in range(4):            q.put(i)            print('%d was writing data[%d] to queue' % (os.getpid(), i))                  time.sleep(random.random())    else:        print('queue is full')        def read(q):        # 等待队列被写入数据        time.sleep(random.random())        while True:            data = q.get()            print('%d was reading data{%d} from queue' % (os.getpid(), data))    # 创建通信队列,进程之间,没有全局变量共享之说q = Queue()pw = Process(target=write, args=(q,))pr = Process(target=read, args=(q,))    pw.start()pr.start()    pw.join()# pr 进程立刻结束pr.terminate()print('end')    # 结果为12898 was writing data[0] to queue12898 was writing data[1] to queue12898 was writing data[2] to queue12899 was reading data{0} from queue12899 was reading data{1} from queue12899 was reading data{2} from queue12898 was writing data[3] to queue12899 was reading data{3} from queueend
  • 线程

    • 概念
      线程是进程下的一部分,进程下负责执行代码程序的就是线程,一个进程下会有很多个线程。同样的,一个主线程下面也有很多子线程。

      另外,Python 中的线程依据的是 Java 中的线程模型,如果有兴趣的同学可以研究一下。

    • 实现方式

      示例:

      import threading, time    def run():    time.sleep(1)    # currentThread() 返回的是当前的线程对象信息    print('%s was runing' % threading.currentThread())    print('current thread\'name: %s' % threading.currentThread().getName())  # 创建一个线程t = threading.Thread(target=run, args=())  # 启动线程t.start()  # get_ident 返回的是当前线程对象所在的内存地址(id),该地址是唯一可以验证线程的数据# 也可使用 currentThread().getName() 来简单的区分线程print('current thread\'name: %s' % threading.currentThread().getName())print('main_thread tid: %s' % threading.get_ident())  # 结果为current thread'name: MainThreadmain_thread tid: 140427132020480<Thread(Thread-1, started 140427100555008)> was runingcurrent thread'name: Thread-1
    • 线程通信

      • 通信队列
        通信队列作为相对来说最为安全的线程通信手段,其中Queue模块自身拥有所有所需的锁,这使得通信队列中的对象可以安全的在多线程之间共享。

        这里用常见的「生产者-消费者模型」来介绍。

        示例:

        import threading, queue, time, random    flag = object()    def producter(q):    for i in range(4):        q.put(i)    print('%s put data{%d} in queue' % (threading.currentThread().getName(), i))    time.sleep(random.random())    q.put(flag)    def consumer(q):    time.sleep(random.random())    while True:        res = q.get()        if res == flag:            q.put(flag)            break        else:            print('%s get data{%d} from queue' % (threading.currentThread().getName(), res))    # 创建队列q = queue.Queue()    # 创建线程pro = threading.Thread(target=producter, args=(q,))con = threading.Thread(target=consumer, args=(q,))    pro.start()con.start()    # 结果为Thread-1 put data{0} in queueThread-1 put data{1} in queueThread-2 get data{0} from queueThread-2 get data{1} from queueThread-1 put data{2} in queueThread-2 get data{2} from queueThread-1 put data{3} in queueThread-2 get data{3} from queueend

        这里有一个细节。在多线程下,当生产者任务完成之后,向队列queue里添加了一个特殊对象(终止信号)flag,这样当消费者从queue中取出任务时,当取到flag时,意味着所有任务被取出,并再次将flag添加至queue中,这样其他线程中的消费者在接收到这个终止信号后,也会得知当前生产者任务已经全部发布。

      • 轮询
        通过为数据操作添加while循环判断,迫使线程被迫等待操作。(为了优化等待时间,应在最核心的位置添加判断条件)

        示例:

        import threading                class NewThread(threading.Thread):    flag = 0    g_num = 0            def __init__(self):         super().__init__()            def run(self):        print('%s was runing' % threading.currentThread().getName())        if self.name == 'Thread-1':            self.add_num()            NewThread.flag = 1        else:            # 轮询            # Thread-2 被迫等待 Thread-1 完成任务之后才能执行            while True:                if NewThread.flag:                    self.add_num()                    break            @classmethod    def add_num(cls):        global g_num        for i in range(1000000):            cls.g_num += 1        print('on the %s, g_num: %d' % (threading.currentThread().getName(), cls.g_num))        t1 = NewThread()t2 = NewThread()        t1.start()t2.start()        # 结果为Thread-1 was runingThread-2 was runingon the Thread-1, g_num: 1000000on the Thread-2, g_num: 2000000
      • 互斥锁
        互斥锁是专门为了针对线程安全而设计的一种结构,锁可以强制线程排序,保护线程安全,但是加锁、解锁会消耗系统 CPU 资源。
      • 互斥锁优化

        示例:

        import threading            class NewThread(threading.Thread):    g_num = 0    # 生成锁对象    lock = threading.Lock()          def __init__(self):         super().__init__()               def run(self):               # 判断当前线程是否上锁,若未上锁,则一直尝试上锁(acquire)直至成功             with NewThread.lock:                 print('%s was runing' % self.name)                 self.add_num()               @classmethod         def add_num(cls):             for i in range(1000000):                 cls.g_num += 1             print('on the %s g_num: %d' % (threading.currentThread().getName(), cls.g_num))      t1 = NewThread()t2 = NewThread()      t1.start()t2.start()      # 结果为Thread-1 was runingon the Thread-1 g_num: 1000000Thread-2 was runingon the Thread-2 g_num: 2000000
      • 死锁问题
        当多线程下出现多个锁,判断条件又是另一个线程里的锁时,就会出现一种情况:当另一个线程任务执行时间过长,或是线程结束,未解锁。当前线程由于迟迟无法上锁,程序始终阻塞,此时就会陷入死锁问题。
      • 死锁问题解决

        • 设置超时时间threading.Lock().acquire(timeout=3)只要在上锁时设置超时时间timeout=,只要超过时间,线程就会不再等待是否解锁,而是直接运行。但是这种方式很危险,可能会带来大量的等待时间。
        • 为每个锁添加一个特殊编号,多线程在获取锁的时候严格按照该编号的升序方式来获取,相当于为线程排序,这样就避免了多线程因为资源争抢,而陷入死锁的可能。
        • 银行家算法
  • 进程与线程的区别

    • 线程和进程的执行顺序都是一样的,都是由操作系统的调度算法决定,不是根据程序的编写顺序来决定。
    • 进程是资源分配的单位,而线程是 CPU 调度的单位。
    • 进程在主程序结束后,程序立马结束,需要手动利用join()方法使得主程序发生堵塞,来等待子进程。而主线程的任务结束后,程序会等待子线程结束才会结束。故不需要特意使用join()方法来使主线程等待子线程。
    • 多进程适合 CPU 密集型,多线程适合 I/O 密集型。
  • 协程

    • 概念
      线程下的一种,也叫微线程,单线程自身控制切换任务时机,达到多任务的效果。避免了由于系统在处理多进程或者多线程时,切换任务时需要的等待时间。这一点很像操作系统里的中断。
    • 实现方式

      • 生成器(yield)
        生成器相关内容可看问题 13。

        这里以一个简单的「生产者-消费者模型」来解释如何使用生成器实现协程。

        示例:

        import threading          def producter(c):    next(c)    n = 4    print('%s was running' % threading.currentThread().getName())        while n:        print('product data: %d' % n)        res = c.send(n)        print(res)        n -= 1    print('sale out')          def consumer():    res = ''         print('%s was running' % threading.currentThread().getName())    while True:        n = yield res             print('consume data: %d' % n)        res = '200 OK'     print('%s was running' % threading.currentThread().getName())c = consumer()     producter(c)     # 结果为MainThread was runningMainThread was runningMainThread was runningproduct data: 4consume data: 4200 OKproduct data: 3consume data: 3200 OKproduct data: 2consume data: 2200 OKproduct data: 1consume data: 1200 OKsale out

        可以看到,生产者事先不知道消费者具体要消费多少数据,生产者只是一直在生产。而消费者则是利用生成器的中断特性,consumer函数中,程序每一次循环遇到yield关键字就会停下,等待producter函数启动生成器,再继续下一次循环。

        在这中间只有一个线程在运行,任务的切换时机由程序员自己控制,避免了由于多线程之间的切换消耗,这样就简单实现了协程。

      • 异步 I/O(asyncio)
        由于生成器在未来的 Python 3.10 版本中将不在支持协程,而是推荐使用asyncio库,该库适用于高并发。

        自己目前不会,就不瞎 BB 了,具体可看文档。

        asyncio 中文文档

未写完,下次更新补上