[TOC]

一.过程

1.相干概念

  • 什么是程序?

程序:例如XXXX.py这是程序,处于动态的。

  • 什么是过程

过程:一个程序运行起来后,代码+用到的资源称之为过程,它是操作系统分配资源的根本单元。

在晚期面向过程设计的计算机构造中,过程是程序的根本执行实体;
在当代面向线程设计的计算机构造中,过程是线程的容器。

  • 同步/异步

所谓同步就是一个工作的实现须要依赖另外一个工作时,只有期待被依赖的工作实现后,依赖的工作能力算实现,这是一种牢靠的工作序列。
所谓异步是不须要期待被依赖的工作实现,只是告诉被依赖的工作要实现什么工作,依赖的工作也立刻执行,只有本人实现了整个工作就算实现了。至于被依赖的工作最终是否真正实现,依赖它的工作无奈确定,所以它是不牢靠的工作序列。

  • 阻塞/非阻塞

阻塞和非阻塞跟同步和异步无关,次要与程序期待音讯告诉时的状态无关。也就是说阻塞与非阻塞次要是从程序(线程)期待音讯告诉时的状态角度来讲的。

  • 并发/并行

1)并行,parallel 强调同一时刻同时执行
2)并发 concurrency 则指的一个时间段内去一起执行

2.过程的状态


就绪态:运行的条件都曾经慢去,正在等在cpu执行
执行态:cpu正在执行其性能
期待态:期待某些条件满足,例如一个程序sleep了,此时就处于期待态

3.Python中应用多过程

multiprocessing模块就是跨平台版本的多过程模块,提供了一个Process类来代表一个过程对象,这个对象能够了解为是一个独立的过程,能够执行另外的事件

  • 示例1
from multiprocessing import Processimport timedef run_process():    while True:        print("子过程----2----")        time.sleep(1)if __name__=='__main__':    p = Process(target=run_process) # target指定指标函数    p.start()    while True:        print("主过程----1----")        time.sleep(1)

Process语法:
Process([group [, target [, name [, args [, kwargs]]]]])

参数--------------------------
target:如果传递了函数的援用,能够工作这个子过程就执行这里的代码
args:给target指定的函数传递的参数,以元组的形式传递
kwargs:给target指定的函数传递命名参数
name:给过程设定一个名字,能够不设定
group:指定过程组,大多数状况下用不到
Process创立的实例对象的罕用办法:

办法--------------------------
start():启动子过程实例(创立子过程)
is_alive():判断过程子过程是否还在活着
join([timeout]):是否期待子过程执行完结,或期待多少秒
terminate():不论工作是否实现,立刻终止子过程
Process创立的实例对象的罕用属性:

属性-------------------------
name:以后过程的别名,默认为Process-N,N为从1开始递增的整数
pid:以后过程的pid(过程号)

  • 示例2 过程pid
from multiprocessing import Processimport timeimport osdef run_process():    while True:        print("子过程----pid:{}----".format(os.getpid()))        print()        time.sleep(1)if __name__=='__main__':    p = Process(target=run_process)    p.start()    while True:        print("主过程----pid:{}----".format(os.getpid()))        time.sleep(1)
  • 示例3 子过程指标办法传参
from multiprocessing import Processimport timeimport osdef run_process(course, teacher, *args, **kwargs):    while True:        print("子过程----pid:{}----{}上{}课".format(os.getpid(), teacher, course))        print()        time.sleep(1)if __name__=='__main__':    p = Process(target=run_process, args=('语文',), kwargs={'teacher':'张三'})    p.start()    while True:        print("主过程----pid:{}----{}上{}课".format(os.getpid(),'李四','数学'))        time.sleep(1)
  • 示例4 过程间不会共享全局变量
from multiprocessing import Processimport timeimport osnum_list = [0 , 1, 3, 4, 5, 6, 7, 8, 9, 10]i = 3def run_process1():    global  i    while i:        print("子过程----pid:{}----".format(os.getpid()))        num_list.pop()        print(num_list)        i = i - 1        time.sleep(1)def run_process2():    global i    while i:        print("子过程----pid:{}----".format(os.getpid()))        num_list.append(i+1)        print(num_list)        i = i - 1        time.sleep(1)if __name__=='__main__':    p = Process(target=run_process1)    p.start()    p = Process(target=run_process2)    p.start()

输入

子过程----pid:10187----[0, 1, 3, 4, 5, 6, 7, 8, 9]子过程----pid:10188----[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4]子过程----pid:10187----[0, 1, 3, 4, 5, 6, 7, 8]子过程----pid:10188----[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3]子过程----pid:10187----[0, 1, 3, 4, 5, 6, 7]子过程----pid:10188----[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3, 2]

4.过程间通信

能够应用multiprocessing模块的Queue实现多过程之间的数据传递,Queue自身是一个音讯列队程序。

示例

from multiprocessing import Process, Queueimport time, randomdef worker(q):    """实现文件"""    for i in range(10):        file_num = random.randint(1, 100)        print('已实现工作{}...'.format(file_num))        q.put(file_num)        time.sleep(1)def boss(q):    """查看文件"""    while True:        if not q.empty():            file_num = q.get(True)            print('已查看工作{}...'.format(file_num))            time.sleep(1)        else:            breakif __name__=='__main__':    # 创立Queue,并传给各个子过程:    q = Queue(5)    pw = Process(target=worker, args=(q,))    pb = Process(target=boss, args=(q,))    pw.start()    # pw.join()    pb.start()    # pb.join()

5.过程池

当须要创立的子过程数量不多时,能够间接利用multiprocessing中的Process动静成生多个过程,但如果是上百甚至上千个指标,手动的去创立过程的工作量微小,此时就能够用multiprocessing模块提供的Pool办法。

初始化Pool时,能够指定一个最大过程数,当有新的申请提交到Pool中时,如果池还没有满,那么就会创立一个新的过程用来执行该申请;但如果池中的过程数曾经达到指定的最大值,那么该申请就会期待,直到池中有过程完结,才会用之前的过程来执行新的工作。

示例

from multiprocessing import Poolimport os, time, randomdef worker(msg):    t_start = time.time()    print("start pro {},pid为{}".format(msg,os.getpid()))    # random.random()随机生成0~1之间的浮点数    time.sleep(random.random()*2)    t_stop = time.time()    print(msg,"执行结束,耗时{:.2f}" .format(t_stop-t_start))# 定义一个过程池,最大过程数3po = Pool(5)for i in range(0,10):    # Pool().apply_async(要调用的指标,(传递给指标的参数元祖,))    # 每次循环将会用闲暇进去的子过程去调用指标    po.apply_async(worker,(i,))print("----start----")time.sleep(10)po.close()  # 敞开过程池,敞开后po不再接管新的申请po.join()  # 期待po中所有子过程执行实现,必须放在close语句之后print("-----end-----")

6.过程间过程的通信

应用Pool创立过程,就须要应用multiprocessing.Manager()中的Queue()

示例

from multiprocessing import Pool, Managerimport time, randomdef worker(q):    """实现文件"""    for i in range(10):        file_num = random.randint(1, 100)        print('已实现工作{}...'.format(file_num))        q.put(file_num)        time.sleep(1)def boss(q):    """查看文件"""    while True:        if not q.empty():            file_num = q.get(True)            print('已查看工作{}...'.format(file_num))            time.sleep(1)        else:            breakif __name__=='__main__':    # 创立Queue,并传给各个子过程:    q = Manager().Queue()    po = Pool()    po.apply_async(worker, (q,))    time.sleep(1)  # 先让下面的工作向Queue存入数据,而后再让上面的工作开始从中取数据    po.apply_async(boss, (q,))    po.close()    po.join()

二.线程

当初操作系统提出过程的概念,每一个过程都认为本人独占所有的计算机硬件资源。
过程就是独立的王国,过程间不能够轻易的共享数据。
线程就是省份,同一个过程内的线程能够共享过程的资源,每一个线程都领有本人独立的堆栈。

线程同样有着相似过程的状态
1)运行态:该时刻,该线程正在占用CPU
2)就绪态:可随时转换为运行态,因为其余线程正在运行而暂停,该过程不占用CPU
3)阻塞态:除非某些内部事件产生,否则线程不能运行

Python线程的操作能够应用threading模块,threading模块是对更底层thread做了一些包装的,能够更加不便的被应用。

1.threading.Thread

Thread类:
def __init__(self, group=None, target=Nonoe, name=None, args=(), kwargs=None, daemon=None)

target 线程调用的对象,就是指标函数
name 为线程起个名字(能够重名,因为线程辨别靠ID,不靠名字)
args,为指标函数传递实参,元组
kwargs, 为指标函数关键字传参,字典

threading的属性和办法
current_thread() 返回以后线程对象
main_thread() 返回主线程对象
active_count() 以后处于alive状态的线程个数
enumerate() 返回所有活着的线程的列表,不包含曾经终止的和未开始的线程
get_ident() 返回以后线程的ID,非0整数

Thread实例的属性和办法
name 只是一个名字
ident 线程ID
is_alive() 返回线程是否活着

start() 启动线程,每一个线程必须且只能执行该办法一次
run() 运行线程函数

  • 示例1 线程的启动
import threadingimport timedef worker():    for _ in range(10):        time.sleep(0.5)        print('start')        print(threading.get_ident()) # 返回以后线程对象线程id        print('Thread over')t = threading.Thread(target=worker)t.start()
  • 示例2 多线程
import threadingimport timedef finish_working():    for i in range(5):        print("线程:{} --实现工作加{}".format(threading.currentThread(), i))        print(threading.current_thread())        time.sleep(1)if __name__ == "__main__":    for i in range(5):        t = threading.Thread(target=finish_working, name=str(i))        t.start() #启动线程,即让线程开始执行
  • .线程执行代码的封装

通过应用threading模块能实现多任务的程序开发,为了让每个线程的封装性更完满,所以应用threading模块时,往往会定义一个新的子类class,只有继承threading.Thread就能够了,而后重写run办法。

import threadingimport timeclass MyThread(threading.Thread):    def run(self):        print('run')        super().run()    def start(self):        print('start')        super().start()def worker1():    for _ in range(5):        time.sleep(0.5)        print('线程:{}-woring'.format(threading.currentThread()))        print('Thread over')t = MyThread(target=worker1,name='w')t.start()
  • 线程之间共享全局变量
import timecount = 100def work1():    global count    for i in range(3):        count += 1    print("----in work1, g_num is {}---".format(count))def work2():    global count    print("----in work2, g_num is {}---".format(count))print("---线程创立之前g_num is {}---".format(count))t1 = Thread(target=work1)t1.start()#延时一会,保障t1线程中的事件做完time.sleep(1)t2 = Thread(target=work2)t2.start()

输入

---线程创立之前g_num is 100-------in work1, g_num is 103-------in work2, g_num is 103---

2.线程同步

线程同步,线程间协同,通过某种技术,让一个线程拜访某些数据时,其余线程不能拜访这些数据,直到该线程实现对数据的操作。

  • Event

Event事件,是线程间通信机制中最简略的实现,应用一个外部的标记flag,通过flag的True或False的变动来进行操作。

名称含意
set()标记设置为True
clear()标记设置为False
is_set()标记是否设置为True
wait(timeout=None)设置期待标记为True的时长,None为有限期待。失去返回True, 未等到超时了返回False。
from threading import Event, Threadimport timedef boss(event:Event):    """    期待员工所有工作实现,点评    """    print("I'm boss, waiting for u.")    event.wait()    print('good job')def worker(event:Event, count=10):    print("I am working for u")    cups = []    while True:        print('make 1')        time.sleep(0.5)        cups.append(1)        if len(cups) >= count:            event.set()            break        print('I finished my job. cups={}'.format(cups))event = Event()w = Thread(target=worker, args=(event, ))b = Thread(target=boss, args=(event, ))w.start()time.sleep(1)b.start(

总结:
应用同一个event对象的标记flag
谁wait就是等到flag变为True,或者等到超时返回False,不限度期待的个数。

  • Lock

锁,但凡存在共享资源争抢的中央都能够应用锁,从而保障只有一个使用者都能够齐全应用这个资源。
示例 不加锁:

import threadingcups = []def worker(task=100):    while True:        count = len(cups)        print(count)        if count >= task:            break        cups.append(1)        print('{}'.format(threading.current_thread()))    print('I finished {} cups'.format(count))for x in range(10):    threading.Thread(target=worker, args=(100, )).start()

以上工作实现的数量会大于100,应用锁能够解决
示例

import loggingimport threadinglogging.basicConfig(level=logging.INFO)cups = []# 实例一把锁lock = threading.Lock()def worker(lock:threading.Lock,task=100):    while True:        lock.acquire() # 加锁        count = len(cups)        logging.info(count)        if count >= task:            lock.release() #记得退出循环时开释锁            break        cups.append(1)        lock.release() # 开释锁        logging.info('{}'.format(threading.current_thread()))    logging.info('I finished {} cups'.format(count))for x in range(10):    threading.Thread(target=worker, args=(lock, 100)).start()

一般来说加锁后还有一些代码实现,在开释锁之前还有可能抛异样,一旦出现异常,锁是无奈开释,然而以后线程可能因为这个异样被终止了,这就产生了死锁。

加锁、解锁罕用语句:
1)应用try...finally语句保障锁的开释
2)with上下文治理,锁对象反对上下文治理

示例:

from threading import Thread, Lockimport time, logginglogging.basicConfig(level=logging.INFO)class Counter:    def __init__(self):        self.c = 0        self.lock = Lock()    def inc(self):        try:            self.lock.acquire()            self.c += 1            logging.info('add {}'.format(self.c))        finally:            self.lock.release()    def dec(self):        try:            self.lock.acquire()            self.c -= 1            logging.info('sub {}'.format(self.c))        finally:            self.lock.release()    @property    def value(self):        with self.lock:            return self.cdef do(c:Counter, count=100):    for  _ in range(count):        for i in range(-50, 50):            if i < 0:                c.dec()            else:                c.inc()c = Counter()c1 = 10c2 = 10for i in range(c1):    Thread(target=do, args=(c,c2)).start()time.sleep(5)logging.info(c.value)

3.Condition

Condition 用于生产者、生产模型,为了解决生产者生产速度匹配问题。

构造方法Condition(lock=None), 能够传入一个lock或者RLock对象,默认是RLock。

名称含意
acquire(*args)获取锁
wait(self, timoout=None)期待或超工夫
notify(n=1)唤醒至少指定个数的期待的线程,没有期待的线程没有操作
notiy_all()唤醒所有期待的线程

示例1 不应用Condition

import threadingimport loggingimport randomlogging.basicConfig(level=logging.INFO)class Dispatcher:    def __init__(self, data=0):        self.data = data        self.event = threading.Event()    def produce(self):        for i in range(100):            data =  random.randint(1,100)            self.data = data            logging.info("produce--{}".format(self.data))            self.event.wait(1)      def custom(self):        while True:            logging.info("curstom---{}".format(self.data))            self.event.wait(1)d = Dispatcher()p = threading.Thread(target=d.produce)c = threading.Thread(target=d.custom)c.start()p.start()

示例2 应用Condition

import threadingimport loggingimport randomlogging.basicConfig(level=logging.INFO)class Dispatcher:    def __init__(self, data=0):        self.data = data        self.event = threading.Event()        self.cond = threading.Condition()    def produce(self):        for i in range(100):            data =  random.randint(1,100)            with self.cond:                self.data = data                self.cond.notify_all()            logging.info('produce {}'.format(self.data))            self.event.wait(1)    def custom(self):        while True:            with self.cond:                self.cond.wait()                logging.info('custom {}'.format(self.data))            self.event.wait(0.5)d = Dispatcher()p = threading.Thread(target=d.produce)c = threading.Thread(target=d.custom)c.start()p.start()

示例3 多个消费者

import threadingimport loggingimport randomlogging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')class Dispatcher:    def __init__(self, data=0):        self.data = data        self.event = threading.Event()        self.cond = threading.Condition()    def produce(self):        for i in range(100):            data =  random.randint(1,100)            with self.cond:                self.data = data                self.cond.notify(1)            logging.info('pru {}'.format(self.data))            self.event.wait(1)    def custom(self):        while True:            with self.cond:                self.cond.wait()                logging.info("线程{}--生产{}".format(threading.get_ident(), self.data))            self.event.wait(0.5)d = Dispatcher()p = threading.Thread(target=d.produce)c = threading.Thread(target=d.custom)c1 = threading.Thread(target=d.custom)c.start()c1.start()p.start()

总结:
Condition是用于生产者消费者模型中,解决生产者消费者速度匹配的问题。
采纳告诉机制,十分有效率。

应用形式:
应用Condition必须先acquire,用玩release,因为外部应用锁,默认应用RLock,最好的形式是应用with上下文。
消费者wait,期待告诉。
生产者生产好消息,对消费者发告诉,能够应用notify或者notify_all办法。

4 .Barrier

名称含意
Barrier(parties, action=None, timeout=None)构建Barrier对象,指定参与方数目。timeout是wait办法未指定超时的默认值。
n_waiting以后在屏障中期待的线程数
parties各方数,就是须要多少个期待
wait(timeout=None)期待通过屏障,返回0到[线程数-1]的整数,每个线程返回不同。如果wait办法设置了超时,并超时发送,屏障将处于broken状态。

办法:

名称含意
broken如果屏障处于突破状态,返回True
abort()将屏障至于broken状态,期待中的线程或者调用期待办法的线程都会抛出BrokenBarrierError异样, 直到reset办法来复原屏障
reset()复原屏障,从新开始拦挡

示例

import threadingimport logginglogging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')def worker(barrier:threading.Barrier):    logging.info('n_waiting={}'.format(barrier.n_waiting))    try:        bid = barrier.wait()        logging.info("after barrier {}".format(bid))    except threading.BrokenBarrierError:        logging.info('Broken Barrier in {}'.format(threading.current_thread().name))barrier = threading.Barrier(3)for i in range(5): #调整数字看后果    threading.Thread(target=worker, args=(barrier, )).start()

所有线程冲到了Barrier前期待,直到达到parties的数目,屏障关上,所有线程进行期待,继续执行。
再有线程wait,屏障就绪等到达到参数方数目。

Barrier利用场景:
并发初始化
所有线程都必须初始化实现后,能力持续工作,例如运行前加载数据、查看,如果这些工作没实现,就开始运行,将不能失常工作。
10个线程做10种筹备工作,每个线程负责一种工作,只有这10个线程都实现后,能力持续工作,先实现的要期待后实现的线程。
例如,启动一个程序,须要先加载磁盘文件、缓存预热、初始化连接池等工作。这些工作能够齐头并进,不过只有都满足了,程序能力持续向后执行。假如数据库连贯失败,则初始化工作失败,就要about,屏障broken,所有线程收到异样退出。