Multithreading Basics
Parallelism vs. Concurrency
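- Parallelism: handling multiple tasks at literally the same instant; requires a multi-core environment.
- Concurrency: handling multiple tasks within the same period of time; even a single core can do this by switching between them.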
Ways to achieve concurrency
- Threads: scheduled in kernel space
- Processes: scheduled in kernel space
- Coroutines: scheduled in user space
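Threads allow a program to run multiple operations concurrently within the same process space. This article focuses on threading, the multithreading module in the Python standard library.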
The threading module
Creating threads
Create objects with the Thread class of the threading module, then call their start() method to start the threads.
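```python
import threading
import time

def worker(num):
    time.sleep(1)
    print('worker-{}'.format(num))

# Create the thread objects; target is a function holding the logic the thread runs
threads = [threading.Thread(target=worker, args=(i, )) for i in range(5)]
for t in threads:
    t.start()  # start() launches the thread; it exits automatically once its logic finishes (Python offers no way to forcibly stop a thread)

# Output (order and line breaks may vary between runs):
# worker-0
# worker-1
# worker-2
# worker-3
# worker-4
```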
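The strings and newlines printed by the five threads can come out interleaved in random order, because the threads are competing for the same resource (standard output).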
Passing arguments to a thread
```python
import threading
import time

def worker(*args, **kwargs):
    time.sleep(1)
    print(args)
    print(kwargs)

t = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a': 'b'})
t.start()  # start() returns None, so keep the Thread object in its own variable

# Output:
# (1, 2, 3)
# {'a': 'b'}
```
args passes positional arguments and kwargs passes keyword arguments.
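Because start() does not hand back the target's return value, one common pattern is to pass a shared container through args/kwargs and read it after join(). A minimal sketch (the helper square_into is hypothetical):

```python
import threading

def square_into(results, index, value):
    # Each thread writes only to its own slot, so no lock is needed here
    results[index] = value * value

values = [1, 2, 3, 4, 5]
results = [None] * len(values)
threads = [
    threading.Thread(target=square_into, args=(results, i), kwargs={'value': v})
    for i, v in enumerate(values)
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every thread before reading results

print(results)  # [1, 4, 9, 16, 25]
```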
Common Thread parameters and methods
>>> help(threading.Thread)
The parameters of the Thread initializer are as follows:
```
__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
    This constructor should always be called with keyword arguments. Arguments are:

    *group* should be None; reserved for future extension when a ThreadGroup
    class is implemented.

    *target* is the callable object to be invoked by the run()
    method. Defaults to None, meaning nothing is called.

    *name* is the thread name. By default, a unique name is constructed of
    the form "Thread-N" where N is a small decimal number.

    *args* is the argument tuple for the target invocation. Defaults to ().

    *kwargs* is a dictionary of keyword arguments for the target
    invocation. Defaults to {}.
```
name
The thread name. By default it is Thread-N, where N is a small decimal number; passing the name argument lets us control the thread name.
The example below imports the logging module to show details such as the thread name:
```python
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(num):
    time.sleep(1)
    logging.info('worker-{}'.format(num))

threads = [threading.Thread(target=worker, args=(i, ), name='workerthread-{}'.format(i)) for i in range(5)]
for t in threads:
    t.start()

# Output:
# 2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0
# 2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1
# 2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2
# 2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3
# 2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4
```
Here, %(threadName)s in the format argument of logging.basicConfig outputs the name of the current thread.
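Thread names can repeat: a name is not a unique identifier of a thread. Still, duplicate names should usually be avoided; the usual approach is to add a prefix.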
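A small sketch of the prefix convention: threading.current_thread() returns the Thread object running the current code, so each worker can read its own name. The prefix 'download-' and the function record_name are illustrative only.

```python
import threading

names = []
names_lock = threading.Lock()  # guards the shared list

def record_name():
    # current_thread() returns the Thread object running this code
    with names_lock:
        names.append(threading.current_thread().name)

threads = [
    threading.Thread(target=record_name, name='download-{}'.format(i))
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(names))  # ['download-0', 'download-1', 'download-2']
```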
daemon
Daemon: a background ("guardian") thread.
The counterpart of a daemon thread is a non-daemon thread; the daemon parameter of the Thread initializer specifies whether the thread is a daemon thread.
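- Daemon thread: ends together with the main thread (more precisely, daemon threads are stopped abruptly when the interpreter exits, which happens after the main thread and all non-daemon threads have finished)
- Non-daemon thread: does not end when the main thread ends; the main thread has to wait for non-daemon threads to finish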
```python
import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():
    logging.info('starting')
    time.sleep(2)
    logging.info('stopping')

if __name__ == '__main__':
    logging.info('starting')
    t1 = threading.Thread(target=worker, name='worker1', daemon=False)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=worker, name='worker2', daemon=True)
    t2.start()
    logging.info('stopping')

# Output:
# 2017-03-20 23:28:06,404 INFO [MainThread] starting
# 2017-03-20 23:28:06,436 INFO [worker1] starting
# 2017-03-20 23:28:07,492 INFO [worker2] starting
# 2017-03-20 23:28:07,492 INFO [MainThread] stopping  # the main thread has finished
# 2017-03-20 23:28:08,439 INFO [worker1] stopping     # after the main thread finishes, the program waits for the non-daemon thread but not for the daemon thread, which is stopped when the interpreter exits
```
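When daemon is not passed, a new thread inherits the daemon flag of the thread that creates it, and the flag can no longer be changed once the thread has started. A quick check, assuming it is run from the main thread (the function noop is hypothetical):

```python
import threading

def noop():
    pass

# When daemon is not given, the flag is inherited from the creating thread
t_default = threading.Thread(target=noop)
t_daemon = threading.Thread(target=noop, daemon=True)

print(threading.current_thread().daemon)  # False when run from the main thread
print(t_default.daemon)  # same as the creating thread, so False here
print(t_daemon.daemon)   # True

t_daemon.start()
error = None
try:
    t_daemon.daemon = False  # the flag cannot be changed once the thread has started
except RuntimeError as exc:
    error = exc
t_daemon.join()
print(type(error).__name__)  # RuntimeError
```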
Thread.join()
If you want the main thread to wait for the daemon thread to finish before exiting, use the thread object's join() method.
```python
import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():
    logging.info('starting')
    time.sleep(2)
    logging.info('stopping')

if __name__ == '__main__':
    logging.info('starting')
    t1 = threading.Thread(target=worker, name='worker1', daemon=False)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=worker, name='worker2', daemon=True)
    t2.start()
    logging.info('stopping')
    t1.join()
    t2.join()  # the main thread now also waits for the daemon thread

# Output:
# 2017-03-20 23:41:07,217 INFO [MainThread] starting
# 2017-03-20 23:41:07,243 INFO [worker1] starting
# 2017-03-20 23:41:08,245 INFO [worker2] starting
# 2017-03-20 23:41:08,246 INFO [MainThread] stopping
# 2017-03-20 23:41:09,243 INFO [worker1] stopping
# 2017-03-20 23:41:10,248 INFO [worker2] stopping
```
Once join() is called on it, the main thread has to wait for the daemon thread to finish before it exits.
Prototype of join(): join(self, timeout=None)
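join() blocks until the thread exits or the timeout expires. timeout is optional: without it, join() waits until the thread exits; with it, join() returns when the timeout expires or when the thread finishes, whichever comes first.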
Because join() always returns None, after a timeout you need to call is_alive() to find out whether the thread is still running.
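A minimal sketch of join(timeout) followed by is_alive(); the Event named release is only there to keep the worker busy for as long as the example needs:

```python
import threading

release = threading.Event()

def worker():
    release.wait()  # blocks until the main thread sets the event

t = threading.Thread(target=worker)
t.start()

result = t.join(timeout=0.1)        # join() always returns None
alive_after_timeout = t.is_alive()  # True: the worker is still blocked

release.set()
t.join()                            # no timeout: waits until the thread exits
alive_after_exit = t.is_alive()     # False

print(result, alive_after_timeout, alive_after_exit)  # None True False
```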
Common threading functions
enumerate()
Lists all threads that are currently alive.
```
>>> threading.enumerate()
[<_MainThread(MainThread, started 140209670301504)>, <Thread(worker1, started 140209545410304)>, <Thread(worker2, started daemon 140209537017600)>]
```
local()
```python
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

ctx = threading.local()
ctx.data = 5
data = 'a'

def worker():
    logging.info(data)
    logging.info(ctx.data)

worker()
threading.Thread(target=worker).start()

# Output:
# 2017-03-21 00:02:08,102 INFO [MainThread] a
# 2017-03-21 00:02:08,113 INFO [MainThread] 5
# 2017-03-21 00:02:08,119 INFO [Thread-34] a
# Exception in thread Thread-34:
# Traceback (most recent call last):
#   File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner
#     self.run()
#   File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run
#     self._target(*self._args, **self._kwargs)
#   File "<ipython-input-28-5395bd925d87>", line 7, in worker
#     logging.info(ctx.data)
# AttributeError: '_thread._local' object has no attribute 'data'
```
Threads share memory, state and resources. However, the attributes of a threading.local object are visible only in the thread that set them.
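A small sketch showing that each thread sees its own value of the same threading.local attribute. The Barrier (covered later in this article) only makes all three threads assign ctx.value before any of them reads it; the names seen and worker are illustrative.

```python
import threading

ctx = threading.local()
seen = {}
seen_lock = threading.Lock()
barrier = threading.Barrier(3)  # makes all three threads set ctx.value before any reads it

def worker(n):
    ctx.value = n * 10   # stored separately for each thread
    barrier.wait()       # by now every thread has assigned its own ctx.value
    with seen_lock:
        seen[threading.current_thread().name] = ctx.value

threads = [threading.Thread(target=worker, args=(i,), name='t{}'.format(i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)                   # each thread saw only its own value
print(hasattr(ctx, 'value'))  # False: the main thread never set ctx.value
```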
Subclassing Thread
In Python you can write multithreaded logic by subclassing Thread and overriding its run method; run then holds the thread's logic.
```python
import threading

class mythread(threading.Thread):
    def run(self):
        print('mythread run')

t = mythread()
t.run()    # prints: mythread run (runs in the calling thread)
t.start()  # prints: mythread run (runs in a new thread)
```
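An object of a Thread subclass can call both start() and run(), and both print the same thing because both execute the subclass's run method. The difference is that run() executes in the calling thread, whereas start() creates a new thread that then calls run(). With a plain Thread(target=...) object, calling both reports an error, because Thread.run discards the target after running it once.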
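To make the run()/start() difference visible, this sketch records which thread executes run(), and also shows that a Thread object can be started only once. The subclass NameRecorder is hypothetical.

```python
import threading

class NameRecorder(threading.Thread):
    # Hypothetical subclass that records which thread executes run()
    def __init__(self):
        super().__init__(name='recorder')
        self.ran_in = []

    def run(self):
        self.ran_in.append(threading.current_thread().name)

t = NameRecorder()
t.run()    # a plain method call: runs in the calling thread
t.start()  # creates the 'recorder' thread, which then calls run()
t.join()
print(t.ran_in)  # ['MainThread', 'recorder'] when run from the main thread

second_start_failed = False
try:
    t.start()  # a Thread object can be started only once
except RuntimeError:
    second_start_failed = True
print(second_start_failed)  # True
```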
Passing arguments to the logic function of a subclass
```python
import threading

class mythread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__()  # the parent initializer must be called
        self.args = args
        self.kwargs = kwargs

    def run(self):
        print('mythread run', self.args, self.kwargs)

t = mythread(1, 2, 3, a='b')
t.start()  # prints: mythread run (1, 2, 3) {'a': 'b'}
```
The Timer class
Timer: a subclass of Thread, also in the threading module. As the name suggests, it is a timer, used to run a function in a thread after a delay.
>>> help(threading.Timer)
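Timer's initializer: __init__(self, interval, function, args=None, kwargs=None)
- interval: the delay, i.e. how many seconds to wait before running function
- function: the function the thread runs
- args: positional arguments for function
- kwargs: keyword arguments for function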
Example
```python
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():
    logging.info('worker running')

t1 = threading.Timer(interval=3, function=worker)
t2 = threading.Timer(interval=3, function=worker)
t1.name = 't1'  # setName() also works but is deprecated since Python 3.10
t2.name = 't2'
logging.info('start')
t1.start()
t2.start()
time.sleep(2)
logging.info('canceling {}'.format(t1.name))
t1.cancel()  # after 2 s t1 has not fired yet, so it can still be canceled
logging.info('end')

# Output:
# 2017-03-21 19:28:52,801 INFO [MainThread] start
# 2017-03-21 19:28:54,811 INFO [MainThread] canceling t1
# 2017-03-21 19:28:54,819 INFO [MainThread] end
# 2017-03-21 19:28:55,808 INFO [t2] worker running
```
Timer.cancel(): cancels a timer that is still waiting; once the timer has started executing function, it can no longer be canceled.
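Timer.setDaemon(True): makes the timer a daemon thread (setDaemon() is deprecated since Python 3.10; setting the daemon attribute to True does the same).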
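A short, self-contained sketch of cancel() on a timer that has not fired yet, using the daemon attribute instead of setDaemon(); the intervals and the function remind are arbitrary choices for illustration:

```python
import threading

fired = []

def remind(msg):
    fired.append(msg)

t1 = threading.Timer(0.5, remind, args=('t1',))
t2 = threading.Timer(0.1, remind, args=('t2',))
t1.daemon = True  # attribute form of t1.setDaemon(True)
t1.start()
t2.start()
t1.cancel()  # t1 is still inside its 0.5 s wait, so it will not call remind
t1.join()
t2.join()
print(fired)  # ['t2']
```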
Thread synchronization
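When multiple threads access the same data, thread-safety problems such as resource contention often arise (for example, several threads operating on the same data can leave it inconsistent). Synchronization techniques such as Event, Lock, Condition, Barrier and Semaphore solve this kind of problem.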
Event
>>> help(threading.Event)
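An Event object holds an internal flag that is set by set() and cleared by clear(). Threads can call wait() to block until the flag is set.
- clear(self): sets the internal flag to False
- set(self): sets the internal flag to True, waking all waiting threads
- wait(self, timeout=None): blocks until the internal flag is True (i.e. until set() is called) or until the timeout expires; it returns the flag's value, so it returns False only when it timed out
- is_set(self): returns True if and only if the internal flag is True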
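A compact sketch of the methods listed above, including the return value of wait(); it borrows threading.Timer from the previous section to set the flag from another thread:

```python
import threading

event = threading.Event()
initial = event.is_set()              # False: the flag starts out False
timed_out = event.wait(timeout=0.1)   # False: nobody set the flag within 0.1 s

setter = threading.Timer(0.1, event.set)  # another thread sets the flag shortly
setter.start()
woken = event.wait(timeout=5)         # True: returned because set() was called
setter.join()
after_set = event.is_set()            # True

event.clear()
after_clear = event.is_set()          # False again

print(initial, timed_out, woken, after_set, after_clear)  # False False True True False
```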
Example
The following code models one boss and five sleeping workers: as soon as any worker finishes sleeping, the boss and the other workers are woken up.
```python
import datetime
import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(event: threading.Event):
    s = random.randint(1, 5)
    event.wait(s)  # wait() instead of sleep(): the other workers stop waiting as soon as they are notified
    logging.info('sleep {}'.format(s))
    event.set()

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    end = datetime.datetime.now()
    logging.info('that boss exit takes {}s'.format(end - start))

def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event, ), name='boss')
    b.start()
    for i in range(5):
        t = threading.Thread(target=worker, args=(event, ), name='worker-{}'.format(i))
        t.start()
```
Call start() to test:
```
>>> start()
2017-03-21 21:20:17,195 INFO [worker-2] sleep 1
2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s
2017-03-21 21:20:17,199 INFO [worker-0] sleep 2
2017-03-21 21:20:17,199 INFO [worker-1] sleep 3
2017-03-21 21:20:17,199 INFO [worker-3] sleep 2
2017-03-21 21:20:17,198 INFO [worker-4] sleep 1
```
As you can see, once worker-2 exited, the boss and the other four workers exited immediately as well: once the Event's internal flag is set, the related threads stop waiting.
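- Event: sends signals between threads; typically used when a thread must wait until other threads have finished certain actions before it can start.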
The timeout parameter of wait()
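```python
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(event: threading.Event):
    while not event.wait(3):  # wait() returns False on each 3 s timeout, so the loop runs until set() is called
        logging.info('run run run')

event = threading.Event()
threading.Thread(target=worker, args=(event, )).start()

# Output:
# 2017-03-21 21:32:47,275 INFO [Thread-8] run run run
# 2017-03-21 21:32:50,277 INFO [Thread-8] run run run
# 2017-03-21 21:32:53,281 INFO [Thread-8] run run run
# 2017-03-21 21:32:56,284 INFO [Thread-8] run run run
# ...
```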
The program prints a line every 3 s and stops only when set() is called. We can therefore use this to write our own timer, similar to Timer, the Thread subclass.
Example
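```python
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        # Pass the method itself, not self.__target(): calling it here would block __init__ for the whole interval
        self.thread = threading.Thread(target=self.__target)

    def __target(self):
        # wait() returns False only on timeout, i.e. cancel() was not called in time
        if not self.event.wait(self.interval):
            self.function(*self.args, **self.kwargs)

    def start(self):
        self.thread.start()

    def cancel(self):
        self.event.set()

def worker(act):
    logging.info('run-{}'.format(act))

t = Timer(5, worker, 'hahaha')
t.start()

# Output:
# 2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha
```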
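The logic function runs after a 5 s delay, and the timer can be canceled with cancel() before then. Note how arguments are passed: this Timer takes the function's arguments directly after interval and function, not wrapped in args=/kwargs= as threading.Timer does.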
Lock
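Event synchronizes operations between threads, but to control access to a shared resource you need a locking mechanism; in the Python standard library that is the built-in Lock class.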
>>> help(threading.Lock)
threading.Lock() creates a lock object.
>>> help(threading.Lock())
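A lock object is a synchronization primitive. It has three main methods:
- acquire(): acquire(blocking=True, timeout=-1) -> bool. Acquires (locks) the lock; returns True if the lock was acquired and False if it was not.
- release(): release(). Releases the lock.
- locked(): locked() -> bool. Returns whether the lock is currently held.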
Example
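When several threads access the resource at the same time, the following code locks and unlocks around each access, so that each add/subtract combined with its assignment is atomic.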
```python
import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Counter:  # both inc and dec modify value, so both need the lock
    def __init__(self, start=0):
        self.value = start
        self.lock = threading.Lock()

    def inc(self):
        self.lock.acquire()
        try:
            self.value += 1
        finally:
            self.lock.release()  # finally guarantees the lock is released; otherwise the resource could stay inaccessible forever

    def dec(self):
        self.lock.acquire()
        try:
            self.value -= 1
        finally:
            self.lock.release()

def inc_worker(c: Counter):
    pause = random.random()
    logging.info('sleeping-{}'.format(pause))
    time.sleep(pause)
    c.inc()
    logging.info('cur_value:{}'.format(c.value))

def dec_worker(c: Counter):
    pause = random.random()
    logging.info('sleeping-{}'.format(pause))
    time.sleep(pause)
    c.dec()
    logging.info('cur_value:{}'.format(c.value))

c = Counter()
for i in range(2):
    threading.Thread(target=inc_worker, args=(c, ), name='inc_worker-{}'.format(i)).start()
for i in range(3):
    threading.Thread(target=dec_worker, args=(c, ), name='dec_worker-{}'.format(i)).start()
```
Test output
```
2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327
2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873
2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459
2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681
2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247
2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1
2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0
2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1
2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2
2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1
```
As you can see, the operations stay atomic with respect to each other and do not interfere.
Because Lock implements the __enter__ and __exit__ magic methods, it supports the context manager protocol, so the Counter class above can be rewritten as:
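```python
import threading

class Counter:
    def __init__(self, start=0):
        self.value = start
        self.lock = threading.Lock()

    def inc(self):
        # with acquires on entry and releases on exit, even on exceptions.
        # Do not call acquire() before it: Lock is not reentrant, so that would deadlock.
        with self.lock:
            self.value += 1

    def dec(self):
        with self.lock:
            self.value -= 1
```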
That is, the context manager takes the place of the try...finally... statement; the test output should be the same as above.
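A rough stress test of the with-based Counter under real contention; 8 threads doing 10,000 increments each is an arbitrary choice, and with the lock in place the final value must equal the total number of increments:

```python
import threading

class Counter:
    def __init__(self, start=0):
        self.value = start
        self.lock = threading.Lock()

    def inc(self):
        with self.lock:
            self.value += 1

    def dec(self):
        with self.lock:
            self.value -= 1

def hammer(c, n):
    # Each thread increments the shared counter n times
    for _ in range(n):
        c.inc()

c = Counter()
threads = [threading.Thread(target=hammer, args=(c, 10_000)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(c.value)  # 80000
```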
The blocking parameter of acquire()
When blocking=True, if thread A has called lock.acquire() but has not yet reached lock.release(), a call to lock.acquire() in thread B blocks thread B.
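- As in the code above, when n threads need to modify a shared resource, every thread that has not acquired the lock stays blocked. (A blocked Python thread gives up its CPU time slice, so this is not busy waiting.)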
When blocking=False, if thread A has called lock.acquire() but has not yet reached lock.release(), a call to lock.acquire() in thread B does not block; acquire() simply returns False.
The timeout parameter of acquire()
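When blocking=True and timeout>0, acquire() blocks until the timeout expires or the lock is released, whichever comes first; it returns False if it timed out.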
How acquire(0) is interpreted
To see how acquire's default parameters are bound, write the following function that mimics its signature:
```python
def print1(blocking=True, timeout=-1):
    print(blocking, timeout)

print1(0)   # prints: 0 -1
print1(10)  # prints: 10 -1
```
So the first positional argument takes the place of blocking; that is, lock.acquire(0) is equivalent to lock.acquire(blocking=False).
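A sketch that exercises locked(), acquire(0) and acquire(timeout=...) against a lock held by another thread; the helper thread holder and the two Events are illustrative scaffolding:

```python
import threading
import time

lock = threading.Lock()
holding = threading.Event()
done = threading.Event()

def holder():
    with lock:
        holding.set()  # tell the main thread the lock is now held
        done.wait()    # keep holding the lock until the main thread says stop

t = threading.Thread(target=holder)
t.start()
holding.wait()

is_locked = lock.locked()          # True: held by the other thread
non_blocking = lock.acquire(0)     # False at once: 0 is bound to blocking
start = time.monotonic()
timed = lock.acquire(timeout=0.2)  # False after roughly 0.2 s
waited = time.monotonic() - start

done.set()
t.join()
free_now = lock.acquire(blocking=False)  # True: the holder has released it
lock.release()

print(is_locked, non_blocking, timed, free_now)  # True False False True
```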
RLock
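A regular Lock cannot be acquired again by the thread that already holds it (a second blocking acquire() would block forever), but a reentrant lock, RLock, can be acquired multiple times by the same thread without blocking; release() must then be called the same number of times as acquire().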
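A minimal sketch of the difference: nested with blocks on the same RLock work, release() must match acquire() one for one, and a plain Lock refuses a second acquire even from its holder (shown non-blocking so the example does not deadlock). The functions outer and inner are hypothetical.

```python
import threading

rlock = threading.RLock()

def outer():
    with rlock:        # first acquire by this thread
        return inner()

def inner():
    with rlock:        # same thread acquires again: fine for an RLock
        return 'inner ran'

result = outer()
print(result)  # inner ran

rlock.acquire()
rlock.acquire()
rlock.release()
rlock.release()  # one release() per acquire()
extra_release_failed = False
try:
    rlock.release()  # one release too many
except RuntimeError:
    extra_release_failed = True

# With a plain Lock the nested pattern would deadlock; a non-blocking attempt shows why
lock = threading.Lock()
lock.acquire()
second = lock.acquire(blocking=False)  # False: even the holder cannot acquire it again
lock.release()
print(second)  # False
```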
Condition
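Besides Event, threads can also be synchronized with a condition variable, Condition: one group of threads waits for a particular condition, while another group signals that the condition has been met.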
>>> help(threading.Condition)
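Condition's help output shows the following methods:
- Initializer: __init__(self, lock=None). If lock is given, it must be a Lock or RLock object and is used as the underlying lock; otherwise an RLock is created and used as the underlying lock.
- Implements __enter__ and __exit__, so it supports the context manager protocol.
- notify(self, n=1): wakes up at most n threads waiting on this condition; if the calling thread has not acquired the lock, RuntimeError is raised.
- notify_all(self): wakes up all waiting threads.
- wait(self, timeout=None): waits until notified or until the timeout expires. It must be called with the lock held; it releases the lock while waiting and reacquires it before returning.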
Example
The following code has one producer thread that produces several times; after each production it must notify all consumer threads to consume, so it uses notify_all.
```python
import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:
    def __init__(self):
        self.data = None
        self.event = threading.Event()  # tells the consumers to exit
        self.condition = threading.Condition()

    def Consumer(self):
        with self.condition:
            while not self.event.is_set():
                self.condition.wait()  # blocks until the producer calls notify_all (the lock is released while waiting)
                if self.event.is_set():  # woken by the final notification: exit
                    break
                logging.info(self.data)  # notified: run the consumer's business logic

    def Producer(self):
        for _ in range(4):  # the producer produces 4 times
            data = random.randint(0, 100)
            logging.info(data)
            with self.condition:
                self.data = data  # writing data means production succeeded; it is done under the lock and consumers must be notified, hence the Condition
                self.condition.notify_all()  # after producing, notify all consumers
            self.event.wait(1)  # wait 1 s after each production
        self.event.set()  # all production done: tell the consumers to exit
        with self.condition:
            self.condition.notify_all()  # wake consumers blocked in wait() so they see the event and exit

m = Producer_Consumer_Model()
for i in range(3):
    threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()
p = threading.Thread(target=m.Producer, name='Producer')
p.start()
```
Test result (one producer, three consumers)
```
2017-03-22 22:07:42,875 INFO [Producer] 16
2017-03-22 22:07:42,883 INFO [Consumer-0] 16
2017-03-22 22:07:42,890 INFO [Consumer-2] 16
2017-03-22 22:07:42,894 INFO [Consumer-1] 16
2017-03-22 22:07:43,884 INFO [Producer] 76
2017-03-22 22:07:43,888 INFO [Consumer-0] 76
2017-03-22 22:07:43,895 INFO [Consumer-2] 76
2017-03-22 22:07:43,898 INFO [Consumer-1] 76
2017-03-22 22:07:44,889 INFO [Producer] 31
2017-03-22 22:07:44,891 INFO [Consumer-0] 31
2017-03-22 22:07:44,911 INFO [Consumer-2] 31
2017-03-22 22:07:44,913 INFO [Consumer-1] 31
2017-03-22 22:07:45,892 INFO [Producer] 17
2017-03-22 22:07:45,894 INFO [Consumer-0] 17
2017-03-22 22:07:45,907 INFO [Consumer-2] 17
2017-03-22 22:07:45,910 INFO [Consumer-1] 17
```
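So every time the producer produces, all consumers consume. To control how many consumers are notified after each production, use notify and specify the number of consumer threads.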
The code is as follows:
```python
import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:
    def __init__(self):
        self.data = None
        self.event = threading.Event()  # tells the consumers to exit
        self.condition = threading.Condition()

    def Consumer(self):
        with self.condition:
            while not self.event.is_set():
                self.condition.wait()  # blocks until the producer notifies (the lock is released while waiting)
                if self.event.is_set():  # woken by the final notification: exit
                    break
                logging.info(self.data)  # notified: run the consumer's business logic

    def Producer(self):
        for _ in range(4):  # the producer produces 4 times
            data = random.randint(0, 100)
            logging.info(data)
            with self.condition:
                self.data = data  # writing data means production succeeded; it is done under the lock and a consumer must be notified, hence the Condition
                self.condition.notify(1)  # after producing, notify only one consumer
            self.event.wait(1)  # wait 1 s after each production
        self.event.set()  # all production done: tell the consumers to exit
        with self.condition:
            self.condition.notify_all()  # wake every consumer blocked in wait() so they see the event and exit

m = Producer_Consumer_Model()
for i in range(3):
    threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()
p = threading.Thread(target=m.Producer, name='Producer')
p.start()
```
Test result (one producer, three consumers; only one consumer is notified after each production)
```
2017-03-22 22:24:52,933 INFO [Producer] 11
2017-03-22 22:24:52,948 INFO [Consumer-0] 11
2017-03-22 22:24:53,949 INFO [Producer] 47
2017-03-22 22:24:53,967 INFO [Consumer-1] 47
2017-03-22 22:24:54,967 INFO [Producer] 14
2017-03-22 22:24:54,983 INFO [Consumer-2] 14
2017-03-22 22:24:55,986 INFO [Producer] 54
2017-03-22 22:24:55,993 INFO [Consumer-0] 54
```
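- Condition is typically used for the producer-consumer pattern: after producing a message, the producer calls notify or notify_all to tell consumers to consume it.
- Consumers call wait to block until the producer notifies them.
- notify wakes up a given number of waiting threads; notify_all wakes up all waiting threads.
- notify/notify_all and wait must all be called after acquire, and release must be guaranteed afterwards; the with statement is the usual way to do this.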
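A notification sent before a consumer reaches wait() is simply lost, so a common refinement keeps the shared state in a container and waits in a loop that re-checks a predicate; Condition.wait_for(predicate) runs that loop for you. A minimal sketch, not the code above; the deque, the DONE sentinel and the function names are illustrative assumptions:

```python
import threading
from collections import deque

items = deque()
cond = threading.Condition()
consumed = []
DONE = object()  # sentinel that tells the consumer to stop (an illustrative convention)

def consumer():
    while True:
        with cond:
            # wait_for() calls wait() until the predicate is true, so an item
            # produced before this thread started waiting is not missed
            cond.wait_for(lambda: len(items) > 0)
            item = items.popleft()
        if item is DONE:
            break
        consumed.append(item)

def producer():
    for i in range(5):
        with cond:
            items.append(i)
            cond.notify()  # one new item, so waking one consumer is enough
    with cond:
        items.append(DONE)
        cond.notify()

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
p.join()
c.join()
print(consumed)  # [0, 1, 2, 3, 4]
```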
Barrier
The Barrier class also lives in the threading module.
>>> help(threading.Barrier)
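Barrier's main methods and attributes:
- __init__(self, parties, action=None, timeout=None): creates a Barrier.
  - parties: the number of participating threads.
  - action: a callable run by one of the threads after all threads have called wait() and before any of them is released; think of it as the thing to do once everyone has gathered.
  - timeout: the default timeout for every thread's wait(); if it expires, the barrier becomes broken.
- abort(self): puts the Barrier into the broken state.
- reset(self): returns the Barrier to its initial state.
- wait(self, timeout=None): waits at the Barrier; returns this thread's index among the waiting threads, from 0 to parties-1.
- broken: True if the Barrier is in the broken state.
- n_waiting: the number of threads currently waiting at the Barrier.
- parties: the number of threads required to pass the Barrier.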
Example
```python
import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

barrier = threading.Barrier(parties=3)

def worker(barrier: threading.Barrier):
    logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
    worker_id = barrier.wait()  # blocks until all 3 parties have called wait()
    logging.info('after barrier {}'.format(worker_id))

for i in range(3):
    threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start()
```
Test result
```
2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2
2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0
2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1
```
As you can see, each thread keeps waiting until all threads have arrived; then they all pass the barrier and continue.
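A Barrier establishes a control point: every participating thread blocks until all participating "parties" have reached it. This lets threads start separately and then pause until they are all ready before continuing, so the point can be seen as a rendezvous point for the threads.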
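A sketch of the action and timeout parameters described above: the action runs exactly once per trip, the indices returned by wait() are distinct, and the barrier resets itself afterwards. The callback name on_all_arrived and the 5 s timeout are arbitrary.

```python
import threading

actions = []
indices = []
indices_lock = threading.Lock()

def on_all_arrived():
    # Runs once, in one of the threads, after all parties have called wait()
    # and before any of them is released
    actions.append(threading.current_thread().name)

barrier = threading.Barrier(parties=3, action=on_all_arrived, timeout=5)

def worker():
    idx = barrier.wait()  # returns a distinct index in range(parties)
    with indices_lock:
        indices.append(idx)

threads = [threading.Thread(target=worker, name='w{}'.format(i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(actions), sorted(indices))      # 1 [0, 1, 2]
print(barrier.broken, barrier.n_waiting)  # False 0: ready to be reused
```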
Using abort()
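abort() puts the Barrier into the broken state. If barrier.abort() is called while threads are gathering, every thread waiting at the barrier raises threading.BrokenBarrierError (as does any later call to wait() until reset() is called). Think of it this way: once one thread knows it can never reach the Barrier and tells the Barrier so by calling abort(), all waiting threads give up on gathering.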
Example
```python
import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(barrier: threading.Barrier):
    logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.info('aborting')
    else:
        logging.info('after barrier {}'.format(worker_id))

barrier = threading.Barrier(4)  # 4 threads must arrive
for i in range(3):
    threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start()  # 3 threads start waiting
barrier.abort()  # the 4th party never arrives; abort() makes every waiting thread raise BrokenBarrierError
```
Test result
```
2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:47:43,211 INFO [worker-2] aborting
2017-03-22 23:47:43,207 INFO [worker-1] aborting
2017-03-22 23:47:43,207 INFO [worker-0] aborting
```
Semaphore
The Semaphore class is in the threading module.
help(threading.Semaphore)
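A semaphore manages an internal counter whose value is the initial value plus the number of release() calls minus the number of acquire() calls; the initial value defaults to 1. When the counter is zero, acquire() blocks until another thread calls release().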
Semaphore's main methods:
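- __init__(self, value=1): initializes a semaphore; value is the initial value of the internal counter (default 1).
- acquire(self, blocking=True, timeout=None): acquires the semaphore, decrementing the internal counter by one.
- release(self): releases the semaphore, incrementing the internal counter by one.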
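A small sketch that measures how many threads are inside a Semaphore(2)-protected section at once; the stats dictionary and the 0.05 s "work" are illustrative. Whatever the scheduling, the peak can never exceed the semaphore's initial value.

```python
import threading
import time

sem = threading.Semaphore(2)  # at most two threads inside at once
stats = {'active': 0, 'peak': 0}
stats_lock = threading.Lock()

def use_resource():
    with sem:  # acquire() on entry, release() on exit
        with stats_lock:
            stats['active'] += 1
            stats['peak'] = max(stats['peak'], stats['active'])
        time.sleep(0.05)  # pretend to use the shared resource
        with stats_lock:
            stats['active'] -= 1

threads = [threading.Thread(target=use_resource) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(stats['peak'])  # never greater than 2
```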
Example
```python
import threading
import time
import logging
import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Pool:
    def __init__(self, num):
        self.num = num
        self.conns = [self._make_connect(i) for i in range(num)]  # holds the connections
        self.sem = threading.Semaphore(num)  # the semaphore's counter starts at the number of connections

    def _make_connect(self, name):
        # create a connection from its name
        conn = 'connect-{}'.format(name)
        return conn

    def get_connect(self):
        # take a connection from the pool
        self.sem.acquire()
        return self.conns.pop()

    def return_connect(self, conn):
        # put the connection conn back into the pool
        self.conns.insert(0, conn)
        self.sem.release()

def worker(pool: Pool):
    logging.info('starting')
    conn = pool.get_connect()  # blocks in acquire() if no connection is available
    logging.info('get connect {}'.format(conn))
    t = random.randint(1, 3)
    time.sleep(t)
    logging.info('takes {}s'.format(t))
    pool.return_connect(conn)
    logging.info('return connect {}'.format(conn))

pool = Pool(2)  # the pool holds two usable connections
for i in range(3):  # three threads share the two connections
    threading.Thread(target=worker, args=(pool, ), name='worker-{}'.format(i)).start()
```
Test result
```
2017-03-23 00:54:36,056 INFO [worker-0] starting
2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1
2017-03-23 00:54:36,074 INFO [worker-1] starting
2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0
2017-03-23 00:54:36,089 INFO [worker-2] starting
2017-03-23 00:54:39,074 INFO [worker-0] takes 3s
2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1
2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1
2017-03-23 00:54:39,093 INFO [worker-1] takes 3s
2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0
2017-03-23 00:54:40,093 INFO [worker-2] takes 1s
2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1
```
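This result shows three threads sharing the pool's two connections: one thread (worker-2) has to wait until another thread returns a connection before it can get one.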
Queue
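In the Condition section, the data passed between threads was the single attribute data encapsulated in the producer-consumer model (in practice this is usually a list, like the conns list of the connection pool in the Semaphore section).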
Python's queue module provides synchronized, thread-safe queue classes, including three kinds of queue:
- FIFO (first in, first out) queue: Queue
- LIFO (last in, first out) queue: LifoQueue
- Priority queue: PriorityQueue
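All these queues implement the necessary locking internally, so they can be used directly from multiple threads, and a queue can be used to synchronize threads. We can therefore replace the shared attribute in the producer-consumer model with a queue from the queue module: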
```python
import logging
import random
import queue
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:
    def __init__(self):
        self.q = queue.Queue()
        self.event = threading.Event()

    def Consumer(self):
        while not self.event.is_set():
            logging.info(self.q.get())  # get() blocks while the queue is empty

    def Producer(self):
        while not self.event.wait(3):  # produce one item every 3 s until the event is set
            data = random.randint(1, 100)
            logging.info(data)
            self.q.put(data)

m = Producer_Consumer_Model()
threading.Thread(target=m.Consumer, name='Consumer').start()
threading.Thread(target=m.Producer, name='Producer').start()
```
Test result
```
2017-03-23 10:11:22,990 INFO [Producer] 26
2017-03-23 10:11:22,993 INFO [Consumer] 26
2017-03-23 10:11:25,993 INFO [Producer] 89
2017-03-23 10:11:26,003 INFO [Consumer] 89
2017-03-23 10:11:29,004 INFO [Producer] 14
2017-03-23 10:11:29,006 INFO [Consumer] 14
2017-03-23 10:11:32,007 INFO [Producer] 17
2017-03-23 10:11:32,009 INFO [Consumer] 17
```
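Each time the producer produces an item, the consumer consumes it. The consumer thread reads the Queue by calling Queue.get(); if the Queue is empty, the consumer gets nothing and blocks there until it successfully receives an item.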
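The consumer above never exits, because nothing ever sets the event while it is blocked in get(). One common way to shut a queue consumer down cleanly is to send a sentinel value and to track completion with task_done()/join(). A minimal sketch; the STOP sentinel is an assumed convention, not part of the queue API:

```python
import queue
import threading

q = queue.Queue()
results = []
STOP = None  # sentinel value: an assumed convention, not part of the queue API

def consumer():
    while True:
        item = q.get()      # blocks while the queue is empty
        try:
            if item is STOP:
                return
            results.append(item * 2)
        finally:
            q.task_done()   # one task_done() per get()

t = threading.Thread(target=consumer)
t.start()
for i in range(5):
    q.put(i)
q.join()      # blocks until every item put so far has been marked done
q.put(STOP)   # now ask the consumer to exit
t.join()
print(results)  # [0, 2, 4, 6, 8]
```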
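Summary of thread synchronization
- Event: mainly for notifying threads of events
- Lock, RLock: mainly for protecting shared resources
- Condition: mainly for the producer-consumer model; can be thought of as a combination of Event and Lock
- Barrier: synchronizes a given number of waiting threads
- Semaphore: mainly for protecting resources; unlike a Lock, it lets several threads access a shared resource at once, whereas a lock lets only one thread in at a time (a lock is effectively a semaphore with value=1)
- Queue: synchronizes threads through a FIFO queue; suited to the producer-consumer model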
GIL
GIL (Global Interpreter Lock)
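Execution of Python code is controlled by the Python main loop. Python (the CPython interpreter) was designed so that only one thread executes in the interpreter's main loop at a time: at any moment, only one thread is running in the interpreter. Access to the main loop is controlled by the global interpreter lock (GIL), and it is this lock that guarantees only one thread runs at a time.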
So a multithreaded Python program executes as follows:
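- Acquire the GIL
- Switch to a thread to run
- Run it until its time slice is used up (in Python 3 the switch interval defaults to 0.005 s, see sys.getswitchinterval()) or until the thread voluntarily yields, for example by blocking on I/O
- Put the thread to sleep
- Release the GIL
- Repeat the steps above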
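Therefore, in CPython, multithreading does not achieve parallelism for CPU-bound Python code, only concurrency. Blocking I/O calls release the GIL, so I/O-bound programs still benefit from threads. For true parallelism, use Python's multiprocessing module (its goal is to let you manage processes the same way you manage threads).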
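A rough sketch of the I/O case: time.sleep stands in for a blocking I/O call here, and because CPython releases the GIL while a thread is blocked, five 0.2 s waits overlap instead of adding up. Timings vary by machine, so treat the numbers as approximate.

```python
import threading
import time

def fake_io():
    time.sleep(0.2)  # a blocking call; CPython releases the GIL while sleeping

start = time.perf_counter()
threads = [threading.Thread(target=fake_io) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The five waits overlap, so elapsed is typically close to 0.2 s rather than 1.0 s
print('{:.2f}s'.format(elapsed))
```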
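All of my articles and answers are registered with a copyright protection platform. Copyright belongs to 职场亮哥; reproduction without authorization is prohibited.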