多道技术
单核实现并发的成果
必备知识点
并发
看起来像同时运行的就能够称之为并发
并行
真正意义上的同时执行
空间与工夫上的复用
空间上:多个程序专用一套计算机硬件
工夫上:切换+保留状态
ps:
- 并行必定算并发
- 单核的计算机必定不能实现并行,然而能够实现并发
补充:假如单核就是一个核,不思考CPU的内核数
多道技术图解
多道技术重点常识
"""切换(cpu)分为两种状况 1.当一个程序遇到IO操作的时候,操作系统会剥夺该程序的CPU执行权限 作用:进步了CPU的利用率,并且也不影响程序的执行效率 2.当一个程序长时间占用CPU的时候,操作系统也会剥夺该程序的CPU执行权限 作用:升高了程序的执行效率(程序运行工夫+切换工夫)"""
过程实践
必备知识点
过程与程序的区别
"""程序就是一堆躺在硬盘上的代码,是“死”的过程则示意程序正在执行的过程,是“活”的"""
过程调度
先来先服务调度算法
对长作业无利,对短作业有益
短作业优先调度算法
对短作业无利,对长作业有益
- 工夫片轮转法+多级反馈队列
过程运行的三状态图
示例:
两对重要概念
同步和异步
形容的是工作的提交形式
同步:工作提交之后,原地期待工作的返回后果,期待的过程中不做任何事(干等)
程序层面上体现的感觉就是卡住了
异步:工作提交之后,不原地期待工作的返回后果,间接去做其余事
我提交的工作后果如何获取?
工作的返回后果会有一个异步回调机制主动解决
阻塞非阻塞
形容的程序的运行状态
阻塞:阻塞态
非阻塞:就绪态、运行态
上述概念的组合:最高效的一种组合就是 <u>异步非阻塞</u>
现实状态:咱们写的代码永远处于就绪态和运行态之间切换(但根本不可能)
开启过程的两种形式
# 第一种 间接定义函数 罕用from multiprocessing import Processimport timedef task(name): print('%s is running' % name) time.sleep(3) print('%s is over' % name)if __name__ == '__main__': # 1、创立一个过程对象 p = Process(target=task, args=('jason',)) """ target=要运行的函数 args=(函数须要的参数) """ # 2、开启过程 p.start() # 通知操作系统帮你创立一个过程 异步 print('主')"""windows操作系统下 创立过程肯定要在main内创立因为windows下创立过程相似于模块导入的形式会从上往下顺次执行代码Linux中则是间接将代码残缺拷贝一份"""
# 第二种形式 类的继承 不罕用from multiprocessing import Processimport timeclass MyProcess(Process): # 定义一个类继承自Process,类名能够自定义 def run(self): # 将要运行的性能写入函数,函数名必须叫run!! print('hello beautiful girl!') time.sleep(1) print('get out!')if __name__ == '__main__': p = MyProcess() p.start() print('主')
总结
创立过程就是在内存中申请一块内存空间将须要运行的代码丢进去
一个过程对应在内存中就是一块独立的内存空间
多个过程对应在内存中就是多块独立的内存空间
过程与过程之间数据默认状况下是无奈间接交互,想交互能够借助第三方工具、模块
join办法
join是让主过程的代码期待子过程代码运行完结之后,再持续运行。不影响其余子过程的执行
from multiprocessing import Processimport timedef task(name, n): print('%s is running' % name) time.sleep(n) print('%s is over' % name)if __name__ == '__main__': # p1 = Process(target=task, args=('jason', 1)) # p2 = Process(target=task, args=('egon', 2)) # p3 = Process(target=task, args=('tank', 3)) # start_time = time.time() # p1.start() # p2.start() # p3.start() # 仅仅是通知操作系统要创立过程 # p1.join() # 主过程期待子过程p运行完结之后再持续往后执行 # p2.join() # p3.join() start_time = time.time() p_list = [] for i in range(1, 4): p = Process(target=task, args=('子过程%s' % i, i)) p.start() p_list.append(p) # 将起起来的所有过程对象放入一个列表中 for p in p_list: p.join() # 主过程期待每一个子过程p运行完结之后再持续往后执行 print('主', time.time() - start_time) >>>:子过程1 is running 子过程3 is running 子过程2 is running 子过程1 is over 子过程2 is over 子过程3 is over 主 3.059741973876953
过程间数据隔离(默认状况下)
from multiprocessing import Processmoney = 100def task(): global money # 部分批改全局 money = 666 print('子', money)if __name__ == '__main__': p = Process(target=task) p.start() p.join() print(money)
过程对象及其他办法
一台计算机下面运行着很多过程,那么计算机是如何辨别并治理这些过程服务端的呢?
计算机会给每一个运行的过程调配一个PID号
如何查看
Windows 进入 cmd 输出tasklist
查看全副过程,tasklist |findstr PID
查看具体的过程(PID是具体的过程号)
Mac 进入 终端 输出ps aux
查看全副过程,ps aux|grep PID
查看具体的过程(PID是具体的过程号)
from multiprocessing import Process, current_processimport oscurrent_process().pid # 查看以后过程号os.getpid() # 查看以后过程号os.getppid() # 查看以后过程的父过程号
from multiprocessing import Process, current_processimport timedef task(): print('%s is running' % current_process().pid) # current_process().pid 查看以后过程的过程号 time.sleep(3)if __name__ == '__main__': p = Process(target=task) p.start() print('主', current_process().pid)
from multiprocessing import Processimport timeimport osdef task(): print('%s is running' % os.getpid()) # 查看以后过程的过程号 print('子过程的主过程号:%s' % os.getppid()) # 查看以后过程的父过程的过程号 time.sleep(3)if __name__ == '__main__': p = Process(target=task) p.start() print('主', os.getpid()) print('主主', os.getppid())
p.terminate() # 杀死以后过程# 通知操作系统帮你去杀死以后过程 然而须要肯定的工夫 而代码的运行速度极快time.sleep(0.1) # 退出一个 time.sleep() 就能够失去正确后果print(p.is_alive()) # 判断以后过程是否存活
僵尸过程与孤儿过程
僵尸过程
死了然而没有死透
当你开设了子过程之后 该过程死后不会立即开释占用的过程号
因为我要让父过程可能查看到它开设的子过程的一些根本信息(占用的pid号、运行工夫等)
所有的过程都会步入僵尸过程
无害的状况:父过程不死并且在无限度的创立子过程并且子过程也不完结
回收子过程占用的pid号两种状况:
1、父过程期待子过程运行完结
2、父过程调用 join 办法(等同于期待子过程运行完结)
孤儿过程
子过程存活,父过程意外死亡
操作系统会开设一个“儿童福利院”专门治理孤儿过程回收相干资源
守护过程
from multiprocessing import Processimport timedef task(name): print('%s 总管正在活着' % name) time.sleep(3) print('%s 总管正在死亡' % name)if __name__ == '__main__': p = Process(target=task, args=('纵观',)) p.daemon = True # 将过程 p 设置成守护过程 这一句代码要放在 p.start 下面才无效 p.start() print('皇帝死于非命')
互斥锁
多个过程操作同一份数据的时候,会呈现数据错乱的问题
针对上述问题,解决形式就是加锁解决:将并发变成串行,就义效率然而保障了数据的平安
from multiprocessing import Process, Lockimport jsonimport timeimport random# 查票def search(i): # 文件操作读取票数 with open('data', 'r', encoding='utf8') as f: dic = json.load(f) print('用户 %s 查问余票:%s' % (i, dic.get('ticket_num'))) # 字典取值不要用[],要用.get()# 买票 1、先查 2、再买def buy(i): # 先查票 with open('data', 'r', encoding='utf8') as f: dic = json.load(f) # 模仿网络提早 time.sleep(random.randint(1, 3)) # 判断以后是否有票 if dic.get('ticket_num') > 0: # 批改数据库买票 dic['ticket_num'] -= 1 # 写入数据库 with open('data', 'w', encoding='utf8') as f: json.dump(dic, f) print('用户 %s 买票胜利!' % i) else: print('用户 %s 买票失败' % i)# 整合下面两个函数def run(i, mutex): search(i) # 给买票环节加锁解决 # 抢锁 mutex.acquire() # 所有人随机抢锁,一个人抢到,程序持续运行 buy(i) # 开释锁 mutex.release() # 买完票,将锁开释,供剩下的人抢if __name__ == '__main__': # 在主过程中生成一把锁 让所有的子过程抢 谁先抢到谁先买票 mutex = Lock() for i in range(1, 11): p = Process(target=run, args=(i, mutex)) p.start()
留神:
1、锁不要轻易应用,容易造成死锁景象(个别不会用到,都是外部封装好的)
2、锁只在解决数据的局部来保障数据安全(只在争抢数据的环节加锁解决即可)
过程间通信
过程之间是无奈间接进行数据交互的,然而能够通过队列或管道实现数据交互
队列Queue模块
管道:subprocess(stdin stdout stderr)
队列:管道 + 锁
队列:先进先出
堆栈:先进后出
from multiprocessing import Queue# import queue 也能够# 创立一个队列q = Queue(5) # 括号内能够传数字 示意生成的队列最大能够同时寄存的数据量# 往队列中存数据q.put(111)q.put(222)q.put(333)print(q.full()) # 判断以后队列是否满了print(q.empty()) # 判断以后队列是否空了q.put(444)q.put(555)print(q.full()) # 判断以后队列是否满了# q.put(666) # 当队列数据放满了之后 如果还有数据要放 程序会阻塞 直到有地位让进去# 去队列中取数据v1 = q.get()v2 = q.get()v3 = q.get()v4 = q.get()v5 = q.get()print(q.empty()) # 判断以后队列是否空了# v6 = q.get_nowait() # 没有数据间接报错 queue.Empty# v6 = q.get(timeout=3) # 没有数据之后原地期待三秒 没有再报错# v6 = q.get() # 队列中如果曾经没有数据的话 get办法会原地阻塞# print(v1, v2, v3, v4, v5, v6)"""q.full()q.empty()q.get_nowait()在多过程的状况下是不准确的本地测试的时候才可能会用到Queue,理论生产用的都是他人封装好的性能十分弱小的工具"""
IPC机制
钻研思路
1、主过程跟子过程借助于队列通信:
from multiprocessing import Queue, Processdef producer(q): q.put('我是23号技师 很快乐为您服务') # 在子过程中向主过程的队列中增加值 print('hello big baby')if __name__ == '__main__': q = Queue() # 在主过程中创立一个队列 p = Process(target=producer, args=(q,)) # 将producer函数当作子过程创立 p.start() print(q.get()) # 在主过程中获取并打印队列里的值
2、子过程跟子过程借助于队列通信:
from multiprocessing import Queue, Processdef producer(q): q.put('我是23号技师 很快乐为您服务') # 往队列里放数据def consumer(q): print(q.get()) # 在子过程中获取队列中的数据if __name__ == '__main__': q = Queue() # 在主过程中创立一个队列 p = Process(target=producer, args=(q,)) # 将producer函数当作子过程创立 p1 = Process(target=consumer, args=(q,)) # 将consumer函数当作子过程创立 p.start() p1.start()
生产者消费者模型
生产者:生产/制作货色的
消费者:生产/解决货色的
该模型除了上述两个之外还须要一个媒介
生存中的例子:做包子的将包子做好后放在蒸笼里,买包子的去蒸笼里拿
厨师做好的菜用盘子装着,给消费者端过去
生产者和消费者之间不是间接做交互的,而是借助于媒介做交互
生产者(做包子的)+ 音讯队列(蒸笼)+ 消费者(吃包子的)
音讯队列的存在是为了解决供需不均衡的问题
from multiprocessing import Process, Queue, JoinableQueueimport timeimport random# 生产者def producer(name, food, q): for i in range(5): data = '%s 生产了 %s%s' % (name, food, i) # 模仿提早 time.sleep(random.randint(1, 3)) print(data) # 将数据放入队列中 q.put(data)# 消费者def consumer(name, q): # 消费者胃口很大 光盘口头 while True: food = q.get() # 没有数据就会卡住 # 判断以后是否有完结的标识 # if food is None: break time.sleep(random.randint(1, 3)) print('%s吃了%s' % (name, food)) q.task_done() # 通知队列你曾经从外面取出了一个数据并且处理完毕了if __name__ == '__main__': # q = Queue() q = JoinableQueue() p1 = Process(target=producer, args=('大厨1号', '包子', q)) p2 = Process(target=producer, args=('大厨2号', '泔水', q)) c1 = Process(target=consumer, args=('春哥', q)) c2 = Process(target=consumer, args=('新哥', q)) p1.start() p2.start() # 将消费者设置成守护过程 c1.daemon = True c2.daemon = True c1.start() c2.start() p1.join() p2.join() # 期待生产者生产结束之后 往队列中增加特定的完结符号 # q.put(None) # 必定在所有生产者生产的数据的开端 # q.put(None) # 有两个消费者 所以要放两个 None q.join() # 期待队列中所有的数据被取完了再往下执行代码 """ JoinableQueue 每当你往该队列中存入数据的时候 外部会有一个计数器+1 每当你调用 task_done 的时候 计数器-1 q.join() 当计数器为0的时候 才往后运行 """ # 只有 q.join() 执行结束 阐明消费者曾经解决完数据了 消费者就没有存在的必要了 # 因而能够在主程序中将消费者设置为守护过程
线程实践
致命三问
什么是线程
过程:资源单位
线程:执行单位
- 将操作系统比喻成一个大的工厂,过程就相当于工厂外面的车间,线程就是车间外面的流水线
- 每一个过程必定自带一个线程
总结:起一个过程只是在内存空间中开拓一块独立的空间,真正被CPU执行的其实是过程外面的线程,线程指的就是代码的执行过程,执行代码中所须要应用到的资源都找所在的过程要。
过程和线程都是虚构单位,只是为了咱们更加不便的形容问题
为何要有线程
开设过程
1、申请内存空间 耗资源
2、“拷贝代码” 耗资源
开线程
一个过程内能够开设多个线程,在用一个过程内开多个线程无需再次申请内存空间及拷贝代码操作
总结:开线程比开过程省资源
同一个过程下的线程,数据是共享的
如何应用
开启线程的两种形式:
# # 第一种形式from multiprocessing import Processfrom threading import Threadimport timedef task(name): print('%s is running' % name) time.sleep(1) print('%s is over' % name) # 开启线程不须要在main上面执行代码 间接书写就能够t = Thread(target=task, args=('egon',))t.start() # 创立线程的开销十分小 简直是代码一执行线程就曾经创立了print('主')############################################################# 第二种形式from threading import Threadimport timeclass MyThead(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s is running' % self.name) time.sleep(1) print('egon DSB')if __name__ == '__main__': t = MyThead('egon') # 开一个线程 t.start() print('主')
TCP服务端实现并发的成果:
"""服务端 1、要有固定的IP和端口 2、24小时不间断提供服务 3、可能反对并发"""import socketfrom threading import Threadfrom multiprocessing import Processserver = socket.socket() # 括号内不加参数默认就是tcp协定server.bind(('127.0.0.1', 8080))server.listen(5)# 将服务的代码独自封装成一个函数def talk(conn): # 通信循环 while True: try: data = conn.recv(1024) if len(data) == 0: break print(data.decode('utf-8')) conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close()# 链接循环while True: conn, addr = server.accept() # 接客 # 叫其他人来服务客户 t = Thread(target=talk, args=(conn,)) # t = Process(target=talk, args=(conn,)) # 开过程也是一样的成果 t.start()###########################################################"""客户端"""import socketclient = socket.socket()client.connect(('127.0.0.1', 8080))while True: client.send(b'hello world') data = client.recv(1024) print(data.decode('utf-8'))
线程对象的join办法
from threading import Threadimport timedef task(name): print('%s is running' % name) time.sleep(3) print('%s is over' % name)if __name__ == '__main__': t = Thread(target=task, args=('egon',)) t.start() t.join() # 主线程期待子线程运行完结再执行 print('主')
同一个过程下的多个线程 数据是共享的
from threading import Threadimport timemoney = 100def task(): global money money = 666 print(money)if __name__ == '__main__': t = Thread(target=task) t.start() t.join() print(money)
线程对象属性及其他办法
from threading import Thread, active_count, current_threadimport timedef task(n): # print('hello world', os.getpid()) print('hello world', current_thread().name) # 查看以后所在线程的名字 time.sleep(n)if __name__ == '__main__': t = Thread(target=task, args=(1,)) t1 = Thread(target=task, args=(2,)) t.start() t1.start() t1.join() # 期待 t1 运行结束后再运行前面的代码 # print('主', os.getpid()) # print('主', current_thread().name) print('主', active_count()) # 统计以后正在沉闷的线程数
守护线程
from threading import Threadimport timedef task(name): print('%s is running' % name) time.sleep(1) print('%s is over' % name)if __name__ == '__main__': t = Thread(target=task, args=('xxxx',)) t.daemon = True # 将 t 线程变为守护线程 只有主线程完结了 t线程也完结 t.start() print('主') """主线程运行完结之后不会立即完结 会期待所有其余非守护线程完结才会完结因为主线程的完结意味着所在的过程的完结"""
略微有点迷惑性的例子:
from threading import Threadimport timedef foo(): print('123') time.sleep(1) print('end123')def func(): print('456') time.sleep(3) print('end456')if __name__ == '__main__': t1 = Thread(target=foo) t2 = Thread(target=func) t1.daemon = True t1.start() t2.start() print('主.......')
线程互斥锁
from threading import Thread, Lockimport timemoney = 100mutex = Lock() # 学生成一把锁def task(): global money mutex.acquire() # 在抢数据的代码前加锁 tmp = money time.sleep(0.1) money = tmp - 1 mutex.release() # 抢完当前开释锁if __name__ == '__main__': t_list = [] for i in range(100): # 起100个线程 t = Thread(target=task) t.start() t_list.append(t) # 先将线程逐个增加进列表中以备后续对立操作 for t in t_list: t.join() # 后续对立操作:保障每一个线程运行结束后再完结主线程 print(money)
GIL全局解释器锁
定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
python解释器有多个版本
Cpython
Jpython
Pypypython
然而广泛应用的都是Cpython解释器
在Cpython解释器中GIL是一把互斥锁,用来阻止同一个过程下的多个线程的同时执行
同一个过程下的多个线程无奈利用多核优势!!!
疑难:python的多线程是不是一点用都没有???
因为python中的内存治理不是线程平安的
内存治理(垃圾回收机制)
1、援用计数
2、标记革除
3、分代回收
重点:
1、GIL不是python的特点,而是Cpython解释器的特点
2、GIL是保障解释器级别的数据的平安(即线程与线程之间的数据的平安)
3、GIL会导致同一个过程下的多个线程无奈同时执行即无奈利用多核优势
4、针对不同的数据还是须要加不同的锁解决
5、解释性语言的通病:同一个过程下多个线程无奈利用多核优势
GIL与一般互斥锁的区别
from threading import Thread, Lockimport timemoney = 100mutex = Lock() # 学生成一把锁def task(): global money # with mutex: # 等同于上面的写法 # tmp = money # time.sleep(0.1) # money = tmp - 1 mutex.acquire() # 在抢数据的代码前加锁 tmp = money time.sleep(0.1) money = tmp - 1 mutex.release() # 抢完当前开释锁if __name__ == '__main__': t_list = [] for i in range(100): # 起100个线程 t = Thread(target=task) t.start() t_list.append(t) # 先将线程逐个增加进列表中以备后续对立操作 for t in t_list: t.join() # 后续对立操作:保障每一个线程运行结束后再完结主线程 print(money)"""100个线程起来之后要先去抢GIL我进入IO GIL主动开释 然而我手上还有一把互斥锁其余线程尽管抢到了GIL然而抢不到互斥锁最终GIL还是回到我的手上 我去操作数据"""
同一个过程下的多线程无奈利用多核优势,是不是就没有用了
多线程是否有用要看具体情况
单核:四个工作(IO密集型\计算密集型)
多核:四个工作(IO密集型\计算密集型)
计算密集型:每个工作都须要10s
单核:(不必思考了,时代已过)
多过程:额定的耗费资源
多线程:节俭资源
多核:
多过程:总耗时 10s+
多线程:总耗时 40s+
IO密集型:
多核:
多过程:绝对浪费资源
多线程:更加节俭资源
代码验证
# 计算密集型from multiprocessing import Processfrom threading import Threadimport os, timedef work(): res = 0 for i in range(10000000): res *= iif __name__ == '__main__': l = [] print(os.cpu_count()) # 获取以后计算机CPU个数 start_time = time.time() for i in range(8): # p = Process(target=work) # 0.4691619873046875 多过程更快 t = Thread(target=work) # 1.6921021938323975 # p.start() t.start() # l.append(p) l.append(t) for p in l: p.join() print(time.time() - start_time)
# IO密集型from multiprocessing import Processfrom threading import Threadimport os, timedef work(): time.sleep(2)if __name__ == '__main__': l = [] print(os.cpu_count()) # 获取以后计算机CPU个数 start_time = time.time() for i in range(2000): # p = Process(target=work) # 14.634779930114746 t = Thread(target=work) # 2.142601251602173 # p.start() t.start() # l.append(p) l.append(t) for p in l: p.join() print(time.time() - start_time)
总结:
多过程和多线程各有本人的劣势
通常能够 多过程上面开设多线程
这样的话既能够利用多核也能够节俭资源耗费
死锁
当你晓得锁的应用(抢锁必须要开释锁),其实你在操作锁的时候也极其容易产生死锁景象(整个程序卡死、阻塞)
from threading import Thread, Lockimport timemutexA = Lock()mutexB = Lock()# 类只有加括号屡次 产生的必定是不同的对象# 如果你想要实现屡次加括号等到的是雷同的对象 ---> 单例模式class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('%s 抢到A锁' % self.name) # 获取以后线程名 mutexB.acquire() print('%s 抢到B锁' % self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('%s 抢到B锁' % self.name) time.sleep(2) mutexA.acquire() print('%s 抢到A锁' % self.name) mutexA.release() mutexB.release()if __name__ == '__main__': for i in range(10): t = MyThread() t.start()
递归锁
特点:
能够被间断的acquire和release
然而只能被第一个抢到这把锁执行上述操作
它的外部有一个计数器 每acquire一次计数加一 每release一次计数减一
只有计数不为0 其他人都无奈抢到该锁
# 递归锁from threading import Thread, Lock, RLockimport timemutexA = mutexB = RLock()# 类只有加括号屡次 产生的必定是不同的对象# 如果你想要实现屡次加括号等到的是雷同的对象 ---> 单例模式class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('%s 抢到A锁' % self.name) # 获取以后线程名 mutexB.acquire() print('%s 抢到B锁' % self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('%s 抢到B锁' % self.name) time.sleep(2) mutexA.acquire() print('%s 抢到A锁' % self.name) mutexA.release() mutexB.release()if __name__ == '__main__': for i in range(10): t = MyThread() t.start()
信号量
信号量在不同的阶段可能对应不同的技术点
在并发编程中信号量指的是锁!
如果咱们将互斥锁比喻成一个厕所的话
那么信号量就相当于多个厕所
from threading import Thread, Semaphoreimport timeimport randomsm = Semaphore(5) # 括号内写数字 写几就示意开设几个坑位def task(name): sm.acquire() print('%s 正在蹲坑' % name) time.sleep(random.randint(1, 5)) sm.release()if __name__ == '__main__': for i in range(20): t = Thread(target=task, args=('伞兵%s号' % i,)) t.start()
Event事件
一些过程/线程须要期待另外一些过程/线程运行结束之后能力运行,相似于发射信号一样
from threading import Thread, Eventimport timeevent = Event() # 造了一个红绿灯def light(): print('红灯亮着的') time.sleep(3) print('绿灯亮了') event.set() # 通知等红灯的人能够走了def car(name): print('%s 号车正在等红灯' % name) event.wait() # 期待 event.set() 被触发 print('%s 号车加油门开走了..' % name)if __name__ == '__main__': t = Thread(target=light) # 设置一个红绿灯 t.start() for i in range(1, 21): t = Thread(target=car, args=('%s' % i,)) t.start()
线程q
"""同一个过程下多个线程 数据是共享的为什么同一个过程下还会应用队列来共享数据呢因为同一过程下的多个线程共享数据时,数据是不平安的队列: 管道 + 锁所以用队列是为了保证数据的平安"""import queue# 咱们当初应用的队列(queue)都是只能在本地测试应用# 1 队列q 先进先出q = queue.Queue(3)q.put(1)q.get()q.get_nowait()q.get(timeout=3)q.full()q.empty()############################################### 2 后进先出qq = queue.LifoQueue(3) # last in first outq.put(1)q.put(2)q.put(3)print(q.get()) # 3############################################### 3 优先级q (你能够给放入队列中的数据设置进出的优先级)q = queue.PriorityQueue(4)q.put((10, '111'))q.put((100, '222'))q.put((0, '333'))q.put((-5, '444'))print(q.get()) # (-5, '444')# put括号内放一个元组 第一个放数字示意优先级 第二个放数据# 须要留神的是 数字越小优先级越高!!!
过程池与线程池
先回顾tcp服务端实现并发的成果是怎么玩的?
之前是每来一个人就开设一个过程或者线程去解决
"""无论是开过程还是线程 都要耗费资源只不过线程的资源略微小一点而已咱们是不可能做到无限度的开设过程和线程 因为计算机硬件的资源跟不上!硬件的开发速度远远赶不上软件咱们的主旨应该是在保障计算机硬件可能失常工作的状况下最大限度的利用它"""
池的概念
什么是池?
池是用来保障计算机硬件平安的状况下最大限度的利用计算机
它升高了程序的运行效率,然而保障了计算机硬件的平安,从而让程序可能失常运行
根本应用:
# 线程池from concurrent.futures import ThreadPoolExecutorimport timepool = ThreadPoolExecutor(5) # 变量名轻易起 参数 5 示意池子外面固定只有五个线程# 括号内能够传数字 不传的话默认会开设以后计算机CPU个数五倍的线程"""池子造出来之后 外面会固定存在五个线程这五个线程不会呈现反复创立和销毁的过程这样也节俭了反复创立线程的过程 ---> 节俭资源池子的应用十分的简略你只须要将须要做的工作往池子中提交即可"""def task(n): print(n) time.sleep(2) return n*n"""工作的提交形式分两种 同步: 异步:"""# pool.submit(task, 1) # 朝池子中提交工作 异步提交# print('主')t_list = []for i in range(20): # 朝池子中提交20个工作 res = pool.submit(task, i) # print(res.result()) # submit类中的 result 办法 这行代码会导致程序同步提交 t_list.append(res)# 期待线程池中所有的工作执行结束之后再打印列表中的后果pool.shutdown() # 敞开线程池 期待线程池中所有的工作运行结束for t in t_list: print('>>>: ', t.result()) # 有序输入
然而上述线程池的代码用列表的形式有点顺当,能够用上面的办法
最终实现一有后果立即获取并打印
# 过程池from concurrent.futures import ProcessPoolExecutorimport timepool = ProcessPoolExecutor()# 括号内能够传数字 不传的话默认会开设以后计算机CPU个数的过程"""池子造出来之后 外面会固定存在几个过程这几个过程不会呈现反复创立和销毁的过程池子的应用十分的简略你只须要将须要做的工作往池子中提交即可"""def task(n): print(n) time.sleep(2) return n*ndef call_back(n): # n ==> pool.submit(task, i) 返回的对象 print('call_back: ', n.result()) # 对象的值用 .result() 查看"""工作的提交形式分两种 同步: 异步:提交后的返回后果应该通过回调来获取 回调机制:就相当于给每个异步工作绑定了一个定时炸弹,一旦工作有后果立即触发"""if __name__ == '__main__': for i in range(20): # 朝池子中提交20个工作 # res = pool.submit(task, i) pool.submit(task, i).add_done_callback(call_back)
总结
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorpool = ProcessPoolExecutor() # 过程池pool = ThreadPoolExecutor(5) # 线程池res = pool.submit(task, i).add_done_callback(call_back)
协程
过程:资源单位
线程:执行单位
协程:这个概念齐全是程序员本人意淫进去的 基本不存在
为的是“单线程下实现并发”
咱们程序员本人在代码层面上检测咱们所有的IO操作
一旦遇到IO 在代码级别实现切换
这样给CPU的感觉是你这个程序始终在运行 没有IO
从而晋升程序的运行效率
多道技术
切换+保留状态
CPU两种状况下切换
1、程序遇到IO
2、程序长时间占用
适宜的例子:
tcp服务端
accept
recv
代码如何做到 切换+保留状态?
切换:切换不肯定晋升效率 也有可能升高效率
IO切 晋升
没有IO切 升高
保留状态:保留上一次执行的状态 下一次接着上一次的操作持续往后执行
yield
验证切换是否就肯定晋升效率
# 串行执行计算密集型的工作import timedef func1(): for i in range(100000000): i + 1def func2(): for i in range(100000000): i + 1start_time = time.time()func1()func2()print(time.time() - start_time)
# 切换 + yieldimport timedef func1(): while True: 100000000 + 1 yielddef func2(): g = func1() # 先初始化出生成器 for i in range(100000000): i + 1 next(g)start_time = time.time()func2()print(time.time() - start_time)
gevent模块
能够检测程序的IO操作
from gevent import monkey; monkey.patch_all()import timefrom gevent import spawn"""gevent模块自身无奈检测常见的一些IO操作在应用的时候须要你额定的导入一句话from gevent import monkeymonkey.patch_all()又因为下面两句话在应用gevent模块时是必定要导入的所以还反对简写from gevent import monkey; monkey.patch_all()"""def heng(): print('哼') time.sleep(2) print('哼')def ha(): print('哈') time.sleep(3) print('哈')def heiheihei(): print('嘿嘿嘿') time.sleep(5) print('嘿嘿嘿')start_time = time.time()g1 = spawn(heng)g2 = spawn(ha)g3 = spawn(heiheihei)g1.join() # 期待被检测的工作执行结束 再往后继续执行g2.join()g3.join()print(time.time() - start_time) # 5.001976728439331
协程实现tcp服务端并发
# 服务端from gevent import monkey; monkey.patch_all()import socketfrom gevent import spawndef communication(conn): while True: try: data = conn.recv(1024) if len(data) == 0:break conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close()def server(ip, port): server = socket.socket() server.bind((ip, port)) server.listen(5) while True: conn, addr = server.accept() spawn(communication, conn)if __name__ == '__main__': g1 = spawn(server, '127.0.0.1', 8080) g1.join()
# 客户端from threading import Thread, current_threadimport socketdef x_client(): client = socket.socket() client.connect(('127.0.0.1', 8080)) n = 0 while True: msg = '%s say hello %s' % (current_thread().name, n) n += 1 client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8'))if __name__ == '__main__': for i in range(500): t = Thread(target=x_client) t.start()
总结
现实状态:
咱们能够通过
多过程上面开设多线程
多线程上面开设协程
从而使咱们的程序执行效率晋升
IO模型简介
咱们这里钻研的IO模型都是针对网络IO的
Stevens在文章中一共比拟了五种IO Model:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路复用
* signal driven IO 信号驱动IO
* asynchronous IO 异步IO
由signal driven IO(信号驱动IO)在理论中并不罕用,所以次要介绍其余四种IO Model
1)期待数据筹备(Waiting for the data to be ready)
2)将数据从内核拷贝到过程中(Copying the data from the kernel to the process)
同步异步
阻塞非阻塞
常见的网络状态:
accept
recv
recvftom
send尽管它也有IO行为 然而不在咱们的思考范畴
阻塞IO
咱们之前写的都是阻塞IO模型 协程除外
import socketserver = socket.socket()server.bind(('127.0.0.1', 8080))server.listen(5)while True: conn, addr = server.accept() while True: try: data = conn.recv(1024) if len(data) == 0: break print(data) conn.send(data.upper()) except ConnectionResetError as e: break conn.close() # 在服务端无论是开设多过程、多线程,还是过程池、线程池,其实还是没有解决IO问题"""该等的中央还是得等 没有躲避只不过多集体期待的彼此互不烦扰"""
非阻塞IO
"""要本人实现一个非阻塞IO模型"""# 服务端import socketserver = socket.socket()server.bind(('127.0.0.1', 8080))server.listen(5)server.setblocking(False) # 参数默认是True 改为False会将所有的网络阻塞变为非阻塞r_list = []del_list = []while True: try: conn, addr = server.accept() r_list.append(conn) except BlockingIOError: for conn in r_list: try: data = conn.recv(1024) # 没有音讯 报错 if len(data) == 0: # 客户端断开链接 conn.close() # 敞开conn # 将无用的conn从r_list删除 del_list.append(conn) continue conn.send(data.upper()) # 给客户端返回大写 except BlockingIOError: continue except ConnectionResetError: conn.close() del_list.append(conn) # 回收无用的链接 for conn in del_list: r_list.remove(conn) del_list.clear() ############################################################# 客户端import socketclient = socket.socket()client.connect(('127.0.0.1', 8080))while True: client.send(b'hello world') data = client.recv(1024) print(data)
总结
尽管非阻塞IO给你的感觉十分的牛逼,然而该模型会长时间占用着CPU并且不干活(让CPU不停的空转)
咱们理论利用中也不会思考应用非阻塞IO模型
然而任何的技术点都有它存在的意义
理论利用或者是思维借鉴
IO多路复用
当监管的对象只有一个的时候 其实IO多路复用连阻塞IO都比不上!
然而IO多路复用能够一次性监管很多个对象:
server = socket.socket()
conn, add = server.accept()
监管机制是操作系统自身就有的 如果你想用该监管机制(select)
须要你导入对应的select模块
# 服务端import socketimport selectserver = socket.socket()server.bind(('127.0.0.1', 8080))server.listen(5)server.setblocking(False)read_list = [server]while True: r_list, w_list, x_list = select.select(read_list, [], []) for i in r_list: """针对不同的对象做不同的解决""" if i is server: conn, addr = i.accept() read_list.append(conn) # conn 对象也应该增加到监管的队列中 else: res = i.recv(1024) if len(res) == 0: i.close() # 将有效的监管对象 移除 read_list.remove(i) continue print(res) i.send(b'heiheihei')
# 客户端import socketclient = socket.socket()client.connect(('127.0.0.1', 8080))while True: client.send(b'hello world') data = client.recv(1024) print(data)
总结
监管机制其实有很多:
select机制 ==> Windows Linux 都有
poll机制 ===> 只在 Linux 有,poll和select都能够监管多个对象 然而poll监管的数量更多
<u>上述select和poll机制都不是很完满 当监管的对象特地多的时候 可能会呈现极其大的提早响应</u>
epoll机制 ===> 只在 Linux 有
它给每一个监管对象都绑定了一个回调机制,一旦有响应 回调机制立即发动揭示
针对不同的操作系统还须要思考不同的检测机制 书写代码太过繁琐
有一个模块可能依据你跑的平台的不同主动帮你抉择对应的监管机制 => selectors模块
异步IO
异步IO模型是所有模型中效率最高的,也是应用最宽泛的
相干的模块和框架
模块:asyncio模块
异步框架:sanic、tronado、twisted
对立特点:速度快!!!
import threadingimport asyncioasync def hello(): print('hello world %s' % threading.current_thread()) await asyncio.sleep(1) # 此处模仿真正的IO操作的耗时 print('hello world %s' % threading.current_thread())loop = asyncio.get_event_loop()tasks = [hello(), hello()]loop.run_until_complete(asyncio.wait(tasks))loop.close()