乐趣区

关于python:Python多线程

多线程根底概念

并行与并发

  • 并行:同时解决多个工作,必须在多核环境下
  • 一段时间内同时解决多个工作,单核也能够并发

并发伎俩

  • 线程:内核空间的调度
  • 过程:内核空间的调度
  • 协程:用户空间的调度

线程能够容许程序在同一过程空间中并发运行多个操作。本次次要介绍 Python 规范库中的多线程模块 threading。

<!–more–>

threading 模块

线程初始化

应用 threading 模块的 Thread 类初始化对象而后调用 start 办法启动线程。

import threading
import time

def worker(num):
    time.sleep(1)
    print('worker-{}'.format(num))

# 创立线程对象 target 参数是一个函数,这个函数即线程要执行的逻辑
threads = [threading.Thread(target=worker, args=(i,))for i in range(5)]
for t in threads:
    t.start()
    # start 办法启动一个线程,当这个线程的逻辑执行结束的时候,线程主动退出, Python 没有提供被动退出线程的办法

# 输入以下后果
worker-0worker-1worker-2worker-3



worker-4

初始化的五个线程的执行逻辑中的 print 办法打印字符串及换行符呈现了随机散布,即呈现了资源竞争。

给线程传递参数

import threading
import time

def worker(*args, **kwargs):
    time.sleep(1)
    print(args)
    print(kwargs)

threads = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a':'b'}).start()

# 输入
(1, 2, 3)
{'a': 'b'}

args 传递地位参数,kwargs 传递关键字参数。

Thread 罕用参数和办法

>>> help(threading.Thread)

能够看到 Thread 函数的初始化办法中的参数如下:

 |  __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
 |      This constructor should always be called with keyword arguments. Arguments are:
 |      
 |      *group* should be None; reserved for future extension when a ThreadGroup
 |      class is implemented.
 |      
 |      *target* is the callable object to be invoked by the run()
 |      method. Defaults to None, meaning nothing is called.
 |      
 |      *name* is the thread name. By default, a unique name is constructed of
 |      the form "Thread-N" where N is a small decimal number.
 |      
 |      *args* is the argument tuple for the target invocation. Defaults to ().
 |      
 |      *kwargs* is a dictionary of keyword arguments for the target
 |      invocation. Defaults to {}.

name

示意线程名称,默认状况下,线程名称是Thread-N,N 是一个较小的十进制数。咱们能够传递 name 参数,控制线程名称。

以下会导入 logging 模块来显示线程的名称等详细信息

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(num):
    time.sleep(1)
    logging.info('worker-{}'.format(num))

threads = [threading.Thread(target=worker, args=(i,), name='workerthread-{}'.format(i)) for i in range(5)]
for t in threads:
    t.start()

# 输入
2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0
2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1
2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2
2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3
2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4

其中 logging 模块的 basicConfig 函数的 format 中的 %(threadName)s 就是用来输入以后线程的名称的。

线程能够重名, 线程名并不是线程的惟一标识,然而通常应该防止线程重名,通常的解决伎俩是加前缀

daemon

Daemon:守护

和 Daemon 线程绝对应的还有 Non-Daemon 线程,在此 Thread 初始化函数中的 daemon 参数即示意线程是否是 Daemon 线程。

  • Daemon 线程:会随同主线程完结而完结(能够了解为主线程完结,守护线程完结)
  • Non-Daemon 线程:不会随着主线程完结而完结,主线程须要期待 Non-Daemon 完结
import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')


def worker():
    logging.info('starting')
    time.sleep(2)
    logging.info('stopping')


if __name__ == '__main__':
    logging.info('starting')
    t1 = threading.Thread(target=worker, name='worker1', daemon=False)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=worker, name='worker2', daemon=True)
    t2.start()
    logging.info('stopping')

# 输入
2017-03-20 23:28:06,404 INFO [MainThread] starting
2017-03-20 23:28:06,436 INFO [worker1] starting
2017-03-20 23:28:07,492 INFO [worker2] starting
2017-03-20 23:28:07,492 INFO [MainThread] stopping  # 主线程执行实现
2017-03-20 23:28:08,439 INFO [worker1] stopping  # 主线程执行实现之后会等 Non-Daemon 线程执行实现,然而并不会等 Daemon 线程执行实现,即 Daemon 线程会随着主线程执行实现而开释

Thread.join()

如果想等 Daemon 线程执行实现之后主线程再退出,能够应用线程对象的 join() 办法

import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')


def worker():
    logging.info('starting')
    time.sleep(2)
    logging.info('stopping')


if __name__ == '__main__':
    logging.info('starting')
    t1 = threading.Thread(target=worker, name='worker1', daemon=False)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=worker, name='worker2', daemon=True)
    t2.start()
    logging.info('stopping')
    t1.join()
    t2.join()

# 输入
2017-03-20 23:41:07,217 INFO [MainThread] starting
2017-03-20 23:41:07,243 INFO [worker1] starting
2017-03-20 23:41:08,245 INFO [worker2] starting
2017-03-20 23:41:08,246 INFO [MainThread] stopping
2017-03-20 23:41:09,243 INFO [worker1] stopping
2017-03-20 23:41:10,248 INFO [worker2] stopping

应用 join 函数只有主线程就须要期待 Daemon 线程执行实现在推出。

join 函数的原型:join(self, timeout=None)

join 办法会阻塞直到线程退出或者超时, timeout 是可选的,如果不设置 timeout,会始终期待线程退出。如果设置了 timeout,会在超时之后退出或者线程执行实现退出。

因为 join 函数总是返回 None,因而在超时工夫达到之后如果要晓得线程是否还是存活的,能够调用 is_alive()办法判断线程是否存活。

threading 罕用办法

enumerate()

列出以后所有的 存活 的线程

>>> threading.enumerate()
[<_MainThread(MainThread, started 140209670301504)>, <Thread(worker1, started 140209545410304)>, <Thread(worker2, started daemon 140209537017600)>]

local()

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

ctx = threading.local()
ctx.data = 5
data = 'a'

def worker():
    logging.info(data)
    logging.info(ctx.data)

worker()
threading.Thread(target=worker).start()

# 输入
2017-03-21 00:02:08,102 INFO [MainThread] a
2017-03-21 00:02:08,113 INFO [MainThread] 5
2017-03-21 00:02:08,119 INFO [Thread-34] a
Exception in thread Thread-34:
Traceback (most recent call last):
  File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-28-5395bd925d87>", line 7, in worker
    logging.info(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data'

线程共享内存、状态和资源。然而 thread 模块的 local 类的对象的属性,只在以后线程可见。

Thread 类的派生

Python 中能够通过继承 Thread 类并重写 run 办法来编写多线程的逻辑,此时逻辑函数就是 run。

class mythread(threading.Thread):
    def run(self):
        print('mythread run')

t = mythread()
t.run()  # 输入 mythread run
t.start()  # 输入 mythread run

通过继承形式派生而来的子类对象能够同时执行 start 办法和 run 办法,后果是一样的,都是执行子类的 run 办法。然而非继承的形式不能同时应用 start 办法和 run 办法,会报错。

派生时逻辑函数的参数传递

class mythread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__()  # 须要调用父类的初始化办法初始化
        self.args = args
        self.kwargs = kwargs

    def run(self):
        print('mythread run', self.args, self.kwargs)

t = mythread(1, 2, 3, a='b')
t.start()  # 输入 mythread run (1, 2, 3) {'a': 'b'}

Timer 类

Timer 类:Thread 类的派生类,也在 threading 模块中。意为定时器,用作线程的提早执行。

>>> help(threading.Timer)

Timer 类的初始化办法:__init__(self, interval, function, args=None, kwargs=None)

  • interval:工夫距离,即几秒之后开始执行 function
  • function:线程执行的逻辑函数
  • args:地位参数
  • kwargs:关键字参数

代码

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():
    logging.info('worker running')

t1 = threading.Timer(interval=3, function=worker)
t2 = threading.Timer(interval=3, function=worker)
t1.setName('t1')
t2.setName('t2')
logging.info('start')
t1.start()
t2.start()
time.sleep(2)
logging.info('canceling {}'.format(t1.name))
t1.cancel()  # 2s 之后依然能够勾销 t1
logging.info('end')

# 输入
2017-03-21 19:28:52,801 INFO [MainThread] start
2017-03-21 19:28:54,811 INFO [MainThread] canceling t1
2017-03-21 19:28:54,819 INFO [MainThread] end
2017-03-21 19:28:55,808 INFO [t2] worker running

Timer.cancel():勾销依然存活的定时器,如果定时器曾经开始执行 function,则无奈勾销。

Timer.setDaemon(True):设置定时器为守护线程

线程同步

当应用多个线程来拜访同一个数据时,会经常出现资源争用等线程平安问题(比方多个线程都在操作同一数据导致数据不统一),这时候咱们就能够应用一些同步技术来解决这类问题。比方 Event,Lock,Condition,Barrier,Semaphore 等等。

Event

>>> help(threading.Event)

Event 对象内置一个标记,这个标记能够由 set() 办法和 clear() 办法设定。线程能够应用 wait() 办法进行阻塞期待,晓得 Event 对象内置标记被 set。

  1. clear(self):设置内置标记为 False
  2. set(self):设置内置标记为 True
  3. wait(self, timeout=None):开始阻塞,直到内置标记被设置为 True(即 wait 会阻塞线程直到 set 办法被调用或者超时)
  4. is_set(self):当且仅当内置标记为 True 的时候返回 True

代码

以下代码实现的逻辑是:一个 boss 和五个睡觉工人,只有有一个工人实现了睡觉工作,那么就唤醒 boss 和其余工人。

import datetime
import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(event: threading.Event):
    s = random.randint(1, 5)
    event.wait(s)  # wait 办法而不应用 sleep 办法,能够让其余工人收到告诉后不再期待
    logging.info('sleep {}'.format(s))
    event.set()


def boss(event:threading.Event):
    start = datetime.datetime.now()
    event.wait()
    end = datetime.datetime.now()
    logging.info('that boss exit takes {}s'.format(end - start))

def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event,), name='boss')
    b.start()
    for i in range(5):
        t = threading.Thread(target=worker, args=(event,), name='worker-{}'.format(i))
        t.start()

执行 start()办法,测试后果

>>> start()

2017-03-21 21:20:17,195 INFO [worker-2] sleep 1
2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s
2017-03-21 21:20:17,199 INFO [worker-0] sleep 2
2017-03-21 21:20:17,199 INFO [worker-1] sleep 3
2017-03-21 21:20:17,199 INFO [worker-3] sleep 2
2017-03-21 21:20:17,198 INFO [worker-4] sleep 1

能够看到:worker- 2 退出之后,boss 和另外四个 worker 也霎时就退出了。所以 event 对象的内置状态被 set 之后,相干线程就不再 wait 了。

  • event:在线程之间发送信号,通常用于某个线程须要期待其余线程解决实现某些动作之后能力启动

wait()办法的 timeout 参数

def worker(event: threading.Event):
    while not event.wait(3):
        logging.info('run run run')

event = threading.Event()
threading.Thread(target=worker, args=(event,)).start()

# 输入
2017-03-21 21:32:47,275 INFO [Thread-8] run run run
2017-03-21 21:32:50,277 INFO [Thread-8] run run run
2017-03-21 21:32:53,281 INFO [Thread-8] run run run
2017-03-21 21:32:56,284 INFO [Thread-8] run run run
...

程序每隔 3s 就会输入一次后果,直到执行 set()办法才会进行。因而咱们能够写一个定时器(相似于 Thread 类的派生类 Timer)。

代码

class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.__target(), args=args, kwargs=kwargs)

    def __target(self):
        if not self.event.wait(self.interval):
            return self.function

    def start(self):
        self.thread.start()

    def cancel(self):
        self.event.set()

def worker(act):
    logging.info('run-{}'.format(act))

t = Timer(5, worker, 'hahaha')
t.start()  # 输入 2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha

提早 5s 之后执行了逻辑函数,也能够应用 cancel 函数勾销。(要留神参数的传递,此处 Timer 初始化不能应用关键字参数

Lock

event 是用来同步线程之间的操作的,然而如果要管制共享资源的拜访那就须要用到锁机制了,在 Python 规范库中的实现就是内置的 lock 类。

>>> help(threading.Lock)

threading.Lock()函数会创立一个 lock 类的对象。

>>> help(threading.Lock())

锁对象是一个同步原语(synchronization primitive),lock 对象次要有以下三个办法:

  1. acquire():acquire(blocking=True, timeout=-1) -> bool 取得锁(即锁定锁)。胜利取得锁返回 True,没有取得锁则返回 False。
  2. release():release() 开释锁
  3. locked():locked() -> bool 查看锁是否被锁住

代码

以下代码实现了在多个过程同时对资源进行拜访时,进行加锁和解锁的操作,保障加减操作和赋值操作组合之后的原子性。

class Counter:  # 计时器有加减办法,都会批改 value 值,因而都须要加锁解决
    def __init__(self, start=0):
        self.value = start
        self.lock = threading.Lock()

    def inc(self):
        self.lock.acquire()
        try:
            self.value += 1
        finally:
            self.lock.release()  # 须要用 finally 语句保障锁肯定会被开释,否则资源永远不可拜访

    def dec(self):
        self.lock.acquire()
        try:
            self.value -= 1
        finally:
            self.lock.release()

def inc_worker(c: Counter):
    pause = random.random()
    logging.info('sleeping-{}'.format(pause))
    time.sleep(pause)
    c.inc()
    logging.info('cur_value:{}'.format(c.value))

def dec_worker(c: Counter):
    pause = random.random()
    logging.info('sleeping-{}'.format(pause))
    time.sleep(pause)
    c.dec()
    logging.info('cur_value:{}'.format(c.value))

c = Counter()
for i in range(2):
    threading.Thread(target=inc_worker, args=(c,), name='inc_worker-{}'.format(i)).start()

for i in range(3):
    threading.Thread(target=dec_worker, args=(c,), name='dec_worker-{}'.format(i)).start()

测试输入

2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327
2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873
2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459
2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681
2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247
2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1
2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0
2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1
2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2
2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1

可见,各项操作之间放弃互相原子性,没有呈现烦扰。

因为 lock 类实现了 __enter____exit__两个魔术办法,因而反对上下文管理器,能够批改以上 Counter 类的实现办法如下:

class Counter:
    def __init__(self, start=0):
        self.value = start
        self.lock = threading.Lock()

    def inc(self):
        self.lock.acquire()
        with self.lock:
            self.value += 1

    def dec(self):
        self.lock.acquire()
        with self.lock:
            self.value -= 1

即应用上下文管理器来代替 try...finally... 语句,测试输入应该以以上后果统一。

acquire 办法的 blocking 参数

当 blocking=True 时,A 线程中执行了 lock.acquire()办法之后并且没有执行到 lock.release()办法,如果在 B 线程中再次执行 lock.acquire()办法,则 B 线程阻塞。

  • 正如以上代码实现,当有 n 个线程须要批改一个共享资源的时候,其余线程在获取锁之前都处于阻塞状态。(python 的阻塞都会让出 cpu 的工夫片,因而不是忙期待)

当 blocking=Fasle 时,A 线程中执行了 lock.acquire()办法之后并且没有执行到 lock.release()办法,如果在 B 线程中再次执行 lock.acquire()办法,则 B 线程不会阻塞,并且 acquire 函数返回 False。

acquire 办法的 timeout 参数

当 blocking=True 并且 timeout>0 时,acquire 会始终阻塞到超时或者锁被开释。

acquire(0)的参数传递

模仿 acquire 办法的默认参数,编写一下函数进行模仿参数传递的过程:

def print1(blocking=True, timeout=-1):
    print(blocking, timeout)

print1(0)  # 输入 0 -1
print1(10)  # 输入 10 -1

可见第一个地位参数,代替了 blocking。也就是说 lock.acquire(0)等效于 lock.acquire(blocking=False)

RLock

失常的 lock 对象是不能屡次调用 acquire 的,然而可重用锁 RLock 能够屡次调用 acquire 而不阻塞,而且 release 时也要执行和 acquire 一样的次数。

Condition

除了 Event 对象之外,线程同步还能够应用条件同步机制 Condition。一类线程期待特定条件,而另一类线程收回特定条件满足的信号。

>>> help(threading.Condition)

在 Condition 的帮忙中有以下几个办法:

  • 初始化办法:init(self, lock=None)。如果给定了 lock 参数,那么必须是 Lock 或者 Rlock 对象,并且被当做底层锁来应用。如果没有指定,那么会创立一个 RLock 对象的锁,也被当做底层锁来应用。
  • 实现了 __enter____exit__办法,反对上下文管理器。
  • notify(self, n=1):唤醒一个或多个在以后 Condition 上期待的其余线程,如果此办法的调用线程没有取得锁,那么在调用的时候就会报错 RuntimeError
  • notify_all(self):唤醒所有线程
  • wait(self, timeout=None):始终期待着晓得被 notifyed 或者产生超时

实例代码

以下代码实现的是:有一个生产者线程,会生产若干次,每次生产完结后须要告诉所有的消费者线程来生产,因而上面代码应用的是 notify_all 办法。

import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:
    def __init__(self):
        self.data = None
        self.event = threading.Event()  # 用来管制消费者退出
        self.condition = threading.Condition()

    def Consumer(self):
        while not self.event.is_set():
            with self.condition:
                self.condition.wait()  # 始终期待直到收到生产者告诉 notify_all
                logging.info(self.data)  # 收到告诉之后,开始执行消费者的业务逻辑局部

    def Producer(self):
        for _ in range(4):  # 每个生产者生产 4 次
            data = random.randint(0, 100)
            logging.info(data)
            with self.condition:
                self.data = data  # 写入胜利就示意生产胜利,因而须要在此加锁并且可能告诉消费者线程去生产,因而抉择应用 condition 来解决
                self.condition.notify_all()  # 生产胜利之后告诉所有的消费者去生产
            self.event.wait(1)  # 没生产一次期待 1s
        self.event.set()  # 所有的生产实现之后告诉消费者退出

m = Producer_Consumer_Model()
for i in range(3):
    threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()

p = threading.Thread(target=m.Producer, name='Producer')
p.start()

测试后果(一个生产者,三个消费者)

2017-03-22 22:07:42,875 INFO [Producer] 16
2017-03-22 22:07:42,883 INFO [Consumer-0] 16
2017-03-22 22:07:42,890 INFO [Consumer-2] 16
2017-03-22 22:07:42,894 INFO [Consumer-1] 16
2017-03-22 22:07:43,884 INFO [Producer] 76
2017-03-22 22:07:43,888 INFO [Consumer-0] 76
2017-03-22 22:07:43,895 INFO [Consumer-2] 76
2017-03-22 22:07:43,898 INFO [Consumer-1] 76
2017-03-22 22:07:44,889 INFO [Producer] 31
2017-03-22 22:07:44,891 INFO [Consumer-0] 31
2017-03-22 22:07:44,911 INFO [Consumer-2] 31
2017-03-22 22:07:44,913 INFO [Consumer-1] 31
2017-03-22 22:07:45,892 INFO [Producer] 17
2017-03-22 22:07:45,894 INFO [Consumer-0] 17
2017-03-22 22:07:45,907 INFO [Consumer-2] 17
2017-03-22 22:07:45,910 INFO [Consumer-1] 17

可见,生产者每生产一次,所有的消费者就会去生产。如果想管制每次生产之后告诉几个消费者来生产,那么就能够应用 notify 办法,指定消费者线程个数。

代码如下

import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:
    def __init__(self):
        self.data = None
        self.event = threading.Event()  # 用来管制消费者退出
        self.condition = threading.Condition()

    def Consumer(self):
        while not self.event.is_set():
            with self.condition:
                self.condition.wait()  # 始终期待直到收到生产者告诉 notify_all
                logging.info(self.data)  # 收到告诉之后,开始执行消费者的业务逻辑局部

    def Producer(self):
        for _ in range(4):  # 每个生产者生产 4 次
            data = random.randint(0, 100)
            logging.info(data)
            with self.condition:
                self.data = data  # 写入胜利就示意生产胜利,因而须要在此加锁并且可能告诉消费者线程去生产,因而抉择应用 condition 来解决
                self.condition.notify(1)  # 生产胜利之后告诉所有的消费者去生产
            self.event.wait(1)  # 没生产一次期待 1s
        self.event.set()  # 所有的生产实现之后告诉消费者退出

m = Producer_Consumer_Model()
for i in range(3):
    threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()

p = threading.Thread(target=m.Producer, name='Producer')
p.start()

测试后果(一个生产者,三个消费者,每次生产之后只告诉一个消费者去生产)

2017-03-22 22:24:52,933 INFO [Producer] 11
2017-03-22 22:24:52,948 INFO [Consumer-0] 11
2017-03-22 22:24:53,949 INFO [Producer] 47
2017-03-22 22:24:53,967 INFO [Consumer-1] 47
2017-03-22 22:24:54,967 INFO [Producer] 14
2017-03-22 22:24:54,983 INFO [Consumer-2] 14
2017-03-22 22:24:55,986 INFO [Producer] 54
2017-03-22 22:24:55,993 INFO [Consumer-0] 54
  1. Condition 通常用于生产者消费者模式,生产者生产音讯之后,应用 notify 或者 notify_all 告诉消费者生产。
  2. 消费者应用 wait 办法阻塞期待生产者告诉
  3. notify 告诉指定个 wait 的线程,notify_all 告诉所有的 wait 线程
  4. 无论 notify/notify_all 还是 wait 都必须先 acqurie,实现后必须确保 release,通常应用 with 语法

Barrier

Barrier 类存在于 threading 模块中,中文能够翻译成栅栏

>>> help(threading.Barrier)

能够看到 Barrier 的次要办法和属性:

  • __init__(self, parties, action=None, timeout=None):初始化办法,创立一个 Barrier

    • parties:所有参加的线程的数量
    • action:所有的线程都 wait 之后并且在线程开释之前就会执行这个 action 函数,相当于集结之后要做的事件。
    • timeout:相当于给须要期待的每个线程的 wait 办法加上 timeout 参数,超时则 barrier 不再失效
  • abort(self):将 Barrier 设置成 broken 状态
  • reset(self):将 Barrier 重置为最后状态
  • wait(self, timeout=None):在 Barrier 前期待,返回在 Barrier 前期待的下标,从 0 到 parties-1
  • broken:如果 Barrier 处于 broken 状态则返回 True
  • n_waiting:以后曾经在 Barrier 处期待的线程的数量
  • parties:须要在 Barrier 处期待的线程的数量

示例代码

import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

barrier = threading.Barrier(parties=3)

def worker(barrier: threading.Barrier):
    logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
    worker_id = barrier.wait()
    logging.info('after barrier {}'.format(worker_id))

for i in range(3):
    threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start()

测试后果

2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2
2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0
2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1

可见,所有的线程都会始终期待,晓得所有的线程都到期了,而后就通过 barrier,继续执行后续操作。

Barrier 会建设一个控制点,所有参加的线程都会阻塞,直到所有参加的“各方”达到这一点。它让线程离开启动,而后暂停,直到它们都筹备好再持续。因而,这一点能够了解为各个线程的一个集结点。

abort 函数的应用

将 Barrier 设置成 broken 状态。所有线程在参加集结过程中,只有执行了 barrier.abort 办法,那么正在期待的线程都会抛出 threading.BrokenBarrierError 异样。能够了解为,只有有一个线程确定曾经到不了 Barrier 并且告诉了 Barrier,那么 Barrier 就会执行 abort 办法,告诉所有正在 wait 的线程放弃集结。

实例代码

import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(barrier: threading.Barrier):
    logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.info('aborting')
    else:
        logging.info('after barrier {}'.format(worker_id))

barrier = threading.Barrier(4)  # 须要期待 4 个线程
for i in range(3):
    threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start()  # 3 个线程都开始 wait
barrier.abort()  # 还有一个线程没有到 wait,此时执行 abort 办法,则所有正在 wait 的线程都抛出异样

测试后果

2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:47:43,211 INFO [worker-2] aborting
2017-03-22 23:47:43,207 INFO [worker-1] aborting
2017-03-22 23:47:43,207 INFO [worker-0] aborting

Semaphore

Semaphore 类存在于 threading 模块中

help(threading.Semaphore)

信号量外部管理者一个计数器,这个计数器的值等于 release()办法调用的次数减去 acquire()办法调用的次数而后再加上初始值 value,value 默认为 1。

能够看到 Semaphore 的次要办法:

  • __init__(self, value=1):初始化一个信号量,value 为外部计数器赋初值,默认为 1
  • acquire(self, blocking=True, timeout=None):获取信号量,外部计数器减一
  • release(self):开释信号量,外部计数器加一

示例代码

import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Pool:
    def __init__(self, num):
        self.num = num
        self.conns = [self._make_connect(i) for i in range(num)]  # 寄存连贯
        self.sem = threading.Semaphore(num)  # 信号量外部计数器初始为连接数

    def _make_connect(self, name):  # 依据连贯名称创立连贯
        conn = 'connect-{}'.format(name)
        return conn

    def get_connect(self):  # 从连接池获取连贯
        self.sem.acquire()
        return self.conns.pop()

    def return_connect(self, conn):  # 将连贯 conn 返还到连接池中
        self.conns.insert(0, conn)
        self.sem.release()

def worker(pool: Pool):
    logging.info('starting')
    conn = pool.get_connect()  # 如果获取不到则会阻塞在 acquire 处
    logging.info('get connect {}'.format(conn))
    t = random.randint(1, 3)
    time.sleep(t)
    logging.info('takes {}s'.format(t))
    pool.return_connect(conn)
    logging.info('return connect {}'.format(conn))

pool = Pool(2)  # 连接池中有两个连贯能够应用
for i in range(3):  # 三个线程应用两个连贯开始工作
    threading.Thread(target=worker, args=(pool,), name='worker-{}'.format(i)).start()

测试后果

2017-03-23 00:54:36,056 INFO [worker-0] starting
2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1
2017-03-23 00:54:36,074 INFO [worker-1] starting
2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0
2017-03-23 00:54:36,089 INFO [worker-2] starting
2017-03-23 00:54:39,074 INFO [worker-0] takes 3s
2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1
2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1
2017-03-23 00:54:39,093 INFO [worker-1] takes 3s
2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0
2017-03-23 00:54:40,093 INFO [worker-2] takes 1s
2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1

这个测试结果显示:三个线程获取连接池中的两个连贯,后果呈现了一个线程期待其余线程执行实现之后再获取连贯的过程。

Queue

Condition 线程同步局部用来传递数据的是一个封装在生产者消费者模型中的元素 data(失常应用状况下个别封装的都是一个列表,相似与 Barrier 局部的连接池中的 conns 列表)。

Python 的 queue 模块中提供了同步的、线程平安的队列类,包含三种队列:

  • FIFO(先入先出)队列 Queue
  • LIFO(后入先出)队列 LifoQueue
  • 优先级队列 PriorityQueue

这些队列都实现了锁原语,可能在多线程中间接应用。能够应用队列来实现线程间的同步。因而咱们能够应用 queue 模块来替换掉生产者消费者中的全局元素,代码如下:

import random
import queue
import threading

class Producer_Consumer_Model:
    def __init__(self):
        self.q = queue.Queue()
        self.event = threading.Event()

    def Consumer(self):
        while not self.event.is_set():
            logging.info(self.q.get())

    def Producer(self):
        while not self.event.wait(3):
            data = random.randint(1, 100)
            logging.info(data)
            self.q.put(data)

m = Producer_Consumer_Model()
threading.Thread(target=m.Consumer, name='Consumer').start()
threading.Thread(target=m.Producer, name='Producer').start()

测试后果

2017-03-23 10:11:22,990 INFO [Producer] 26
2017-03-23 10:11:22,993 INFO [Consumer] 26
2017-03-23 10:11:25,993 INFO [Producer] 89
2017-03-23 10:11:26,003 INFO [Consumer] 89
2017-03-23 10:11:29,004 INFO [Producer] 14
2017-03-23 10:11:29,006 INFO [Consumer] 14
2017-03-23 10:11:32,007 INFO [Producer] 17
2017-03-23 10:11:32,009 INFO [Consumer] 17

每生产一次,消费者就会生产一次。当消费者线程,读取 Queue 则调用 Queue.get()办法,若 Queue 为空时消费者线程获取不到内容,就会阻塞在这里,直到胜利获取内容。

线程同步总结

  • Event:次要用于线程之间的事件告诉
  • Lock,Rlock:次要用于爱护共享资源
  • Condition:次要用于生产者消费者模型,能够了解为 Event 和 Lock 的结合体
  • Barrier:同步指定个期待的线程
  • Semaphore:次要用于爱护资源,和 Lock 的区别在于能够多个线程访问共享资源,而锁一次只能一个线程拜访到共享资源,即锁是 value= 1 的信号量
  • Queue:应用 FIFO 队列进行同步,实用于生产者消费者模型

GIL

GIL(Global Interpreter Lock):全局解释器锁

Python 代码的执行由 Python 主循环来管制,Python 在设计之初就思考到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对 Python 主循环的拜访由全局解释器锁(GIL)来管制,正是这个锁能保障同一时刻只有一个线程在运行。

因而 Python 多线程程序的执行程序如下:

  1. 设置 GIL
  2. 切换到一个线程去运行
  3. 运行
  4. 完结线程
  5. 解锁 GIL
  6. 反复以上步骤

因而,Python 的多线程并没有实现并行,只是实现了并发而已。如果要实现真正的并行,那就须要应用 Python 的多过程模块 multiprocessing(multiprocessing 模块的主旨是像治理线程一样来治理过程)。


参考资料

  1. threading — Manage Concurrent Operations Within a Process
  2. Python 线程同步机制: Locks, RLocks, Semaphores, Conditions, Events 和 Queues

记得帮我点赞哦!

精心整顿了计算机各个方向的从入门、进阶、实战的视频课程和电子书,依照目录正当分类,总能找到你须要的学习材料,还在等什么?快去关注下载吧!!!

朝思暮想,必有回响,小伙伴们帮我点个赞吧,非常感谢。

我是职场亮哥,YY 高级软件工程师、四年工作教训,回绝咸鱼争当龙头的斜杠程序员。

听我说,提高多,程序人生一把梭

如果有幸能帮到你,请帮我点个【赞】,给个关注,如果能顺带评论给个激励,将不胜感激。

职场亮哥文章列表:更多文章

自己所有文章、答复都与版权保护平台有单干,著作权归职场亮哥所有,未经受权,转载必究!

退出移动版