[TOC]
一. 过程
1. 相干概念
- 什么是程序?
程序:例如 XXXX.py 这是程序,处于动态的。
- 什么是过程
过程:一个程序运行起来后,代码 + 用到的资源称之为过程,它是操作系统分配资源的根本单元。
在晚期面向过程设计的计算机构造中,过程是程序的根本执行实体;
在当代面向线程设计的计算机构造中,过程是线程的容器。
- 同步 / 异步
所谓同步就是一个工作的实现须要依赖另外一个工作时,只有期待被依赖的工作实现后,依赖的工作能力算实现,这是一种牢靠的工作序列。
所谓异步是不须要期待被依赖的工作实现,只是告诉被依赖的工作要实现什么工作,依赖的工作也立刻执行,只有本人实现了整个工作就算实现了。至于被依赖的工作最终是否真正实现,依赖它的工作无奈确定,所以它是不牢靠的工作序列。
- 阻塞 / 非阻塞
阻塞和非阻塞跟同步和异步无关,次要与程序期待音讯告诉时的状态无关。也就是说阻塞与非阻塞次要是从程序(线程)期待音讯告诉时的状态角度来讲的。
- 并发 / 并行
1)并行,parallel 强调同一时刻同时执行
2)并发 concurrency 则指的一个时间段内去一起执行
2. 过程的状态
就绪态:运行的条件都曾经慢去,正在等在 cpu 执行
执行态:cpu 正在执行其性能
期待态:期待某些条件满足,例如一个程序 sleep 了,此时就处于期待态
3.Python 中应用多过程
multiprocessing 模块就是跨平台版本的多过程模块,提供了一个 Process 类来代表一个过程对象,这个对象能够了解为是一个独立的过程,能够执行另外的事件
- 示例 1
from multiprocessing import Process
import time
def run_process():
while True:
print("子过程 ----2----")
time.sleep(1)
if __name__=='__main__':
p = Process(target=run_process) # target 指定指标函数
p.start()
while True:
print("主过程 ----1----")
time.sleep(1)
Process 语法:
Process([group [, target [, name [, args [, kwargs]]]]])
参数 ————————–
target:如果传递了函数的援用,能够工作这个子过程就执行这里的代码
args:给 target 指定的函数传递的参数,以元组的形式传递
kwargs:给 target 指定的函数传递命名参数
name:给过程设定一个名字,能够不设定
group:指定过程组,大多数状况下用不到
Process 创立的实例对象的罕用办法:
办法 ————————–
start():启动子过程实例(创立子过程)
is_alive():判断过程子过程是否还在活着
join([timeout]):是否期待子过程执行完结,或期待多少秒
terminate():不论工作是否实现,立刻终止子过程
Process 创立的实例对象的罕用属性:
属性 ————————-
name:以后过程的别名,默认为 Process-N,N 为从 1 开始递增的整数
pid:以后过程的 pid(过程号)
- 示例 2 过程 pid
from multiprocessing import Process
import time
import os
def run_process():
while True:
print("子过程 ----pid:{}----".format(os.getpid()))
print()
time.sleep(1)
if __name__=='__main__':
p = Process(target=run_process)
p.start()
while True:
print("主过程 ----pid:{}----".format(os.getpid()))
time.sleep(1)
- 示例 3 子过程指标办法传参
from multiprocessing import Process
import time
import os
def run_process(course, teacher, *args, **kwargs):
while True:
print("子过程 ----pid:{}----{}上 {} 课".format(os.getpid(), teacher, course))
print()
time.sleep(1)
if __name__=='__main__':
p = Process(target=run_process, args=('语文',), kwargs={'teacher':'张三'})
p.start()
while True:
print("主过程 ----pid:{}----{}上 {} 课".format(os.getpid(),'李四','数学'))
time.sleep(1)
- 示例 4 过程间不会共享全局变量
from multiprocessing import Process
import time
import os
num_list = [0 , 1, 3, 4, 5, 6, 7, 8, 9, 10]
i = 3
def run_process1():
global i
while i:
print("子过程 ----pid:{}----".format(os.getpid()))
num_list.pop()
print(num_list)
i = i - 1
time.sleep(1)
def run_process2():
global i
while i:
print("子过程 ----pid:{}----".format(os.getpid()))
num_list.append(i+1)
print(num_list)
i = i - 1
time.sleep(1)
if __name__=='__main__':
p = Process(target=run_process1)
p.start()
p = Process(target=run_process2)
p.start()
输入
子过程 ----pid:10187----
[0, 1, 3, 4, 5, 6, 7, 8, 9]
子过程 ----pid:10188----
[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4]
子过程 ----pid:10187----
[0, 1, 3, 4, 5, 6, 7, 8]
子过程 ----pid:10188----
[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3]
子过程 ----pid:10187----
[0, 1, 3, 4, 5, 6, 7]
子过程 ----pid:10188----
[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3, 2]
4. 过程间通信
能够应用 multiprocessing 模块的 Queue 实现多过程之间的数据传递,Queue 自身是一个音讯列队程序。
示例
from multiprocessing import Process, Queue
import time, random
def worker(q):
"""实现文件"""
for i in range(10):
file_num = random.randint(1, 100)
print('已实现工作{}...'.format(file_num))
q.put(file_num)
time.sleep(1)
def boss(q):
"""查看文件"""
while True:
if not q.empty():
file_num = q.get(True)
print('已查看工作{}...'.format(file_num))
time.sleep(1)
else:
break
if __name__=='__main__':
# 创立 Queue,并传给各个子过程:q = Queue(5)
pw = Process(target=worker, args=(q,))
pb = Process(target=boss, args=(q,))
pw.start()
# pw.join()
pb.start()
# pb.join()
5. 过程池
当须要创立的子过程数量不多时,能够间接利用 multiprocessing 中的 Process 动静成生多个过程,但如果是上百甚至上千个指标,手动的去创立过程的工作量微小,此时就能够用 multiprocessing 模块提供的 Pool 办法。
初始化 Pool 时,能够指定一个最大过程数,当有新的申请提交到 Pool 中时,如果池还没有满,那么就会创立一个新的过程用来执行该申请;但如果池中的过程数曾经达到指定的最大值,那么该申请就会期待,直到池中有过程完结,才会用之前的过程来执行新的工作。
示例
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("start pro {},pid 为{}".format(msg,os.getpid()))
# random.random()随机生成 0~1 之间的浮点数
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"执行结束,耗时{:.2f}" .format(t_stop-t_start))
# 定义一个过程池,最大过程数 3
po = Pool(5)
for i in range(0,10):
# Pool().apply_async(要调用的指标,(传递给指标的参数元祖,))
# 每次循环将会用闲暇进去的子过程去调用指标
po.apply_async(worker,(i,))
print("----start----")
time.sleep(10)
po.close() # 敞开过程池,敞开后 po 不再接管新的申请
po.join() # 期待 po 中所有子过程执行实现,必须放在 close 语句之后
print("-----end-----")
6. 过程间过程的通信
应用 Pool 创立过程,就须要应用 multiprocessing.Manager()中的 Queue()
示例
from multiprocessing import Pool, Manager
import time, random
def worker(q):
"""实现文件"""
for i in range(10):
file_num = random.randint(1, 100)
print('已实现工作{}...'.format(file_num))
q.put(file_num)
time.sleep(1)
def boss(q):
"""查看文件"""
while True:
if not q.empty():
file_num = q.get(True)
print('已查看工作{}...'.format(file_num))
time.sleep(1)
else:
break
if __name__=='__main__':
# 创立 Queue,并传给各个子过程:q = Manager().Queue()
po = Pool()
po.apply_async(worker, (q,))
time.sleep(1) # 先让下面的工作向 Queue 存入数据,而后再让上面的工作开始从中取数据
po.apply_async(boss, (q,))
po.close()
po.join()
二. 线程
当初操作系统提出过程的概念,每一个过程都认为本人独占所有的计算机硬件资源。
过程就是独立的王国,过程间不能够轻易的共享数据。
线程就是省份,同一个过程内的线程能够共享过程的资源,每一个线程都领有本人独立的堆栈。
线程同样有着相似过程的状态
1)运行态:该时刻,该线程正在占用 CPU
2)就绪态:可随时转换为运行态,因为其余线程正在运行而暂停,该过程不占用 CPU
3)阻塞态:除非某些内部事件产生,否则线程不能运行
Python 线程的操作能够应用 threading 模块,threading 模块是对更底层 thread 做了一些包装的,能够更加不便的被应用。
1.threading.Thread
Thread 类:
def __init__(self, group=None, target=Nonoe, name=None, args=(), kwargs=None, daemon=None)
target 线程调用的对象,就是指标函数
name 为线程起个名字(能够重名,因为线程辨别靠 ID,不靠名字)
args,为指标函数传递实参,元组
kwargs,为指标函数关键字传参,字典
threading 的属性和办法
current_thread() 返回以后线程对象
main_thread() 返回主线程对象
active_count() 以后处于 alive 状态的线程个数
enumerate() 返回所有活着的线程的列表,不包含曾经终止的和未开始的线程
get_ident() 返回以后线程的 ID,非 0 整数
Thread 实例的属性和办法
name 只是一个名字
ident 线程 ID
is_alive() 返回线程是否活着
start() 启动线程,每一个线程必须且只能执行该办法一次
run() 运行线程函数
- 示例 1 线程的启动
import threading
import time
def worker():
for _ in range(10):
time.sleep(0.5)
print('start')
print(threading.get_ident()) # 返回以后线程对象线程 id
print('Thread over')
t = threading.Thread(target=worker)
t.start()
- 示例 2 多线程
import threading
import time
def finish_working():
for i in range(5):
print("线程:{} -- 实现工作加{}".format(threading.currentThread(), i))
print(threading.current_thread())
time.sleep(1)
if __name__ == "__main__":
for i in range(5):
t = threading.Thread(target=finish_working, name=str(i))
t.start() #启动线程,即让线程开始执行
- . 线程执行代码的封装
通过应用 threading 模块能实现多任务的程序开发,为了让每个线程的封装性更完满,所以应用 threading 模块时,往往会定义一个新的子类 class,只有继承 threading.Thread 就能够了,而后重写 run 办法。
import threading
import time
class MyThread(threading.Thread):
def run(self):
print('run')
super().run()
def start(self):
print('start')
super().start()
def worker1():
for _ in range(5):
time.sleep(0.5)
print('线程:{}-woring'.format(threading.currentThread()))
print('Thread over')
t = MyThread(target=worker1,name='w')
t.start()
- 线程之间共享全局变量
import time
count = 100
def work1():
global count
for i in range(3):
count += 1
print("----in work1, g_num is {}---".format(count))
def work2():
global count
print("----in work2, g_num is {}---".format(count))
print("--- 线程创立之前 g_num is {}---".format(count))
t1 = Thread(target=work1)
t1.start()
#延时一会,保障 t1 线程中的事件做完
time.sleep(1)
t2 = Thread(target=work2)
t2.start()
输入
--- 线程创立之前 g_num is 100---
----in work1, g_num is 103---
----in work2, g_num is 103---
2. 线程同步
线程同步,线程间协同,通过某种技术,让一个线程拜访某些数据时,其余线程不能拜访这些数据,直到该线程实现对数据的操作。
- Event
Event 事件,是线程间通信机制中最简略的实现,应用一个外部的标记 flag,通过 flag 的 True 或 False 的变动来进行操作。
名称 | 含意 |
---|---|
set() | 标记设置为 True |
clear() | 标记设置为 False |
is_set() | 标记是否设置为 True |
wait(timeout=None) | 设置期待标记为 True 的时长,None 为有限期待。失去返回 True,未等到超时了返回 False。 |
from threading import Event, Thread
import time
def boss(event:Event):
"""期待员工所有工作实现,点评"""
print("I'm boss, waiting for u.")
event.wait()
print('good job')
def worker(event:Event, count=10):
print("I am working for u")
cups = []
while True:
print('make 1')
time.sleep(0.5)
cups.append(1)
if len(cups) >= count:
event.set()
break
print('I finished my job. cups={}'.format(cups))
event = Event()
w = Thread(target=worker, args=(event,))
b = Thread(target=boss, args=(event,))
w.start()
time.sleep(1)
b.start(
总结:
应用同一个 event 对象的标记 flag
谁 wait 就是等到 flag 变为 True,或者等到超时返回 False,不限度期待的个数。
- Lock
锁,但凡存在共享资源争抢的中央都能够应用锁,从而保障只有一个使用者都能够齐全应用这个资源。
示例 不加锁:
import threading
cups = []
def worker(task=100):
while True:
count = len(cups)
print(count)
if count >= task:
break
cups.append(1)
print('{}'.format(threading.current_thread()))
print('I finished {} cups'.format(count))
for x in range(10):
threading.Thread(target=worker, args=(100,)).start()
以上工作实现的数量会大于 100,应用锁能够解决
示例
import logging
import threading
logging.basicConfig(level=logging.INFO)
cups = []
# 实例一把锁
lock = threading.Lock()
def worker(lock:threading.Lock,task=100):
while True:
lock.acquire() # 加锁
count = len(cups)
logging.info(count)
if count >= task:
lock.release() #记得退出循环时开释锁
break
cups.append(1)
lock.release() # 开释锁
logging.info('{}'.format(threading.current_thread()))
logging.info('I finished {} cups'.format(count))
for x in range(10):
threading.Thread(target=worker, args=(lock, 100)).start()
一般来说加锁后还有一些代码实现,在开释锁之前还有可能抛异样,一旦出现异常,锁是无奈开释,然而以后线程可能因为这个异样被终止了,这就产生了死锁。
加锁、解锁罕用语句:
1)应用 try…finally 语句保障锁的开释
2)with 上下文治理,锁对象反对上下文治理
示例:
from threading import Thread, Lock
import time, logging
logging.basicConfig(level=logging.INFO)
class Counter:
def __init__(self):
self.c = 0
self.lock = Lock()
def inc(self):
try:
self.lock.acquire()
self.c += 1
logging.info('add {}'.format(self.c))
finally:
self.lock.release()
def dec(self):
try:
self.lock.acquire()
self.c -= 1
logging.info('sub {}'.format(self.c))
finally:
self.lock.release()
@property
def value(self):
with self.lock:
return self.c
def do(c:Counter, count=100):
for _ in range(count):
for i in range(-50, 50):
if i < 0:
c.dec()
else:
c.inc()
c = Counter()
c1 = 10
c2 = 10
for i in range(c1):
Thread(target=do, args=(c,c2)).start()
time.sleep(5)
logging.info(c.value)
3.Condition
Condition 用于生产者、生产模型,为了解决生产者生产速度匹配问题。
构造方法 Condition(lock=None), 能够传入一个 lock 或者 RLock 对象, 默认是 RLock。
名称 | 含意 |
---|---|
acquire(*args) | 获取锁 |
wait(self, timoout=None) | 期待或超工夫 |
notify(n=1) | 唤醒至少指定个数的期待的线程,没有期待的线程没有操作 |
notiy_all() | 唤醒所有期待的线程 |
示例 1 不应用 Condition
import threading
import logging
import random
logging.basicConfig(level=logging.INFO)
class Dispatcher:
def __init__(self, data=0):
self.data = data
self.event = threading.Event()
def produce(self):
for i in range(100):
data = random.randint(1,100)
self.data = data
logging.info("produce--{}".format(self.data))
self.event.wait(1)
def custom(self):
while True:
logging.info("curstom---{}".format(self.data))
self.event.wait(1)
d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)
c.start()
p.start()
示例 2 应用 Condition
import threading
import logging
import random
logging.basicConfig(level=logging.INFO)
class Dispatcher:
def __init__(self, data=0):
self.data = data
self.event = threading.Event()
self.cond = threading.Condition()
def produce(self):
for i in range(100):
data = random.randint(1,100)
with self.cond:
self.data = data
self.cond.notify_all()
logging.info('produce {}'.format(self.data))
self.event.wait(1)
def custom(self):
while True:
with self.cond:
self.cond.wait()
logging.info('custom {}'.format(self.data))
self.event.wait(0.5)
d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)
c.start()
p.start()
示例 3 多个消费者
import threading
import logging
import random
logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')
class Dispatcher:
def __init__(self, data=0):
self.data = data
self.event = threading.Event()
self.cond = threading.Condition()
def produce(self):
for i in range(100):
data = random.randint(1,100)
with self.cond:
self.data = data
self.cond.notify(1)
logging.info('pru {}'.format(self.data))
self.event.wait(1)
def custom(self):
while True:
with self.cond:
self.cond.wait()
logging.info("线程{}-- 生产{}".format(threading.get_ident(), self.data))
self.event.wait(0.5)
d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)
c1 = threading.Thread(target=d.custom)
c.start()
c1.start()
p.start()
总结:
Condition 是用于生产者消费者模型中,解决生产者消费者速度匹配的问题。
采纳告诉机制,十分有效率。
应用形式:
应用 Condition 必须先 acquire,用玩 release,因为外部应用锁,默认应用 RLock,最好的形式是应用 with 上下文。
消费者 wait,期待告诉。
生产者生产好消息,对消费者发告诉,能够应用 notify 或者 notify_all 办法。
4 .Barrier
名称 | 含意 |
---|---|
Barrier(parties, action=None, timeout=None) | 构建 Barrier 对象,指定参与方数目。timeout 是 wait 办法未指定超时的默认值。 |
n_waiting | 以后在屏障中期待的线程数 |
parties | 各方数,就是须要多少个期待 |
wait(timeout=None) | 期待通过屏障,返回 0 到 [线程数 -1] 的整数,每个线程返回不同。如果 wait 办法设置了超时,并超时发送,屏障将处于 broken 状态。 |
办法:
名称 | 含意 |
---|---|
broken | 如果屏障处于突破状态,返回 True |
abort() | 将屏障至于 broken 状态,期待中的线程或者调用期待办法的线程都会抛出 BrokenBarrierError 异样,直到 reset 办法来复原屏障 |
reset() | 复原屏障,从新开始拦挡 |
示例
import threading
import logging
logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')
def worker(barrier:threading.Barrier):
logging.info('n_waiting={}'.format(barrier.n_waiting))
try:
bid = barrier.wait()
logging.info("after barrier {}".format(bid))
except threading.BrokenBarrierError:
logging.info('Broken Barrier in {}'.format(threading.current_thread().name))
barrier = threading.Barrier(3)
for i in range(5): #调整数字看后果
threading.Thread(target=worker, args=(barrier,)).start()
所有线程冲到了 Barrier 前期待,直到达到 parties 的数目,屏障关上,所有线程进行期待,继续执行。
再有线程 wait,屏障就绪等到达到参数方数目。
Barrier 利用场景:
并发初始化
所有线程都必须初始化实现后,能力持续工作,例如运行前加载数据、查看,如果这些工作没实现,就开始运行,将不能失常工作。
10 个线程做 10 种筹备工作,每个线程负责一种工作,只有这 10 个线程都实现后,能力持续工作,先实现的要期待后实现的线程。
例如,启动一个程序,须要先加载磁盘文件、缓存预热、初始化连接池等工作。这些工作能够齐头并进,不过只有都满足了,程序能力持续向后执行。假如数据库连贯失败,则初始化工作失败,就要 about,屏障 broken,所有线程收到异样退出。