关于python:并发编程

41次阅读

共计 21725 个字符,预计需要花费 55 分钟才能阅读完成。

多道技术

单核 实现并发的成果

必备知识点

  • 并发

    看起来像同时运行的就能够称之为并发

  • 并行

    真正意义上的同时执行

空间与工夫上的复用

​ 空间上:多个程序专用一套计算机硬件

​ 工夫上:切换 + 保留状态

ps:

  • 并行必定算并发
  • 单核的计算机必定不能实现并行,然而能够实现并发

补充:假如单核就是一个核,不思考 CPU 的内核数

多道技术图解

多道技术重点常识

"""
切换(cpu)分为两种状况
    1. 当一个程序遇到 IO 操作的时候,操作系统会剥夺该程序的 CPU 执行权限
            作用:进步了 CPU 的利用率,并且也不影响程序的执行效率
            
    2. 当一个程序长时间占用 CPU 的时候,操作系统也会剥夺该程序的 CPU 执行权限
            作用:升高了程序的执行效率(程序运行工夫 + 切换工夫)"""

过程实践

必备知识点

过程与程序的区别

"""
程序就是一堆躺在硬盘上的代码,是“死”的
过程则示意程序正在执行的过程,是“活”的
"""

过程调度

  • 先来先服务调度算法

    ​ 对长作业无利,对短作业有益

  • 短作业优先调度算法

    ​ 对短作业无利,对长作业有益

  • 工夫片轮转法 + 多级反馈队列

过程运行的三状态图

示例:

两对重要概念

  • 同步和异步

    形容的是工作的提交形式

    同步:工作提交之后,原地期待工作的返回后果,期待的过程中不做任何事(干等)

    ​ 程序层面上体现的感觉就是卡住了

    异步:工作提交之后,不原地期待工作的返回后果,间接去做其余事

​ 我提交的工作后果如何获取?

工作的返回后果会有一个异步回调机制主动解决

  • 阻塞非阻塞

    形容的程序的运行状态

    阻塞:阻塞态

    非阻塞:就绪态、运行态

上述概念的组合:最高效的一种组合就是 <u> 异步非阻塞 </u>
现实状态:咱们写的代码永远处于就绪态和运行态之间切换(但根本不可能)

开启过程的两种形式

# 第一种 间接定义函数 罕用
from multiprocessing import Process
import time


def task(name):
    print('%s is running' % name)
    time.sleep(3)
    print('%s is over' % name)


if __name__ == '__main__':
    # 1、创立一个过程对象
    p = Process(target=task, args=('jason',))
    """
    target= 要运行的函数
    args=(函数须要的参数)
    """
    # 2、开启过程
    p.start()  # 通知操作系统帮你创立一个过程 异步
    print('主')
"""
windows 操作系统下 创立过程肯定要在 main 内创立
因为 windows 下创立过程相似于模块导入的形式
会从上往下顺次执行代码
Linux 中则是间接将代码残缺拷贝一份
"""
# 第二种形式 类的继承 不罕用
from multiprocessing import Process
import time


class MyProcess(Process):  # 定义一个类继承自 Process,类名能够自定义
    def run(self):  # 将要运行的性能写入函数,函数名必须叫 run!!print('hello beautiful girl!')
        time.sleep(1)
        print('get out!')


if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print('主')

总结

创立过程就是在内存中申请一块内存空间将须要运行的代码丢进去

一个过程对应在内存中就是一块独立的内存空间

多个过程对应在内存中就是多块独立的内存空间

过程与过程之间数据默认状况下是无奈间接交互,想交互能够借助第三方工具、模块

join 办法

join 是让主过程的代码期待子过程代码运行完结之后,再持续运行。不影响其余子过程的执行

from multiprocessing import Process
import time


def task(name, n):
    print('%s is running' % name)
    time.sleep(n)
    print('%s is over' % name)


if __name__ == '__main__':
    # p1 = Process(target=task, args=('jason', 1))
    # p2 = Process(target=task, args=('egon', 2))
    # p3 = Process(target=task, args=('tank', 3))
    # start_time = time.time()
    # p1.start()
    # p2.start()
    # p3.start()  # 仅仅是通知操作系统要创立过程
    # p1.join()  # 主过程期待子过程 p 运行完结之后再持续往后执行
    # p2.join()
    # p3.join()
    start_time = time.time()
    p_list = []
    for i in range(1, 4):
        p = Process(target=task, args=('子过程 %s' % i, i))
        p.start()
        p_list.append(p)  # 将起起来的所有过程对象放入一个列表中
    for p in p_list:
        p.join()  # 主过程期待每一个子过程 p 运行完结之后再持续往后执行
    print('主', time.time() - start_time)


    
>>>: 子过程 1 is running
    子过程 3 is running
    子过程 2 is running
    子过程 1 is over
    子过程 2 is over
    子过程 3 is over
    主 3.059741973876953

过程间数据隔离(默认状况下)

from multiprocessing import Process

money = 100


def task():
    global money  # 部分批改全局
    money = 666
    print('子', money)


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print(money)

过程对象及其他办法

一台计算机下面运行着很多过程,那么计算机是如何辨别并治理这些过程服务端的呢?

计算机会给每一个运行的过程调配一个 PID 号

如何查看

​ Windows 进入 cmd 输出 tasklist 查看全副过程,tasklist |findstr PID查看具体的过程(PID 是具体的过程号)

​ Mac 进入 终端 输出 ps aux 查看全副过程,ps aux|grep PID查看具体的过程(PID 是具体的过程号)

from multiprocessing import Process, current_process
import os

current_process().pid  # 查看以后过程号
os.getpid()  # 查看以后过程号
os.getppid()  # 查看以后过程的父过程号
from multiprocessing import Process, current_process
import time


def task():
    print('%s is running' % current_process().pid)  # current_process().pid 查看以后过程的过程号
    time.sleep(3)


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print('主', current_process().pid)
from multiprocessing import Process
import time
import os


def task():
    print('%s is running' % os.getpid())  # 查看以后过程的过程号
    print('子过程的主过程号:%s' % os.getppid())  # 查看以后过程的父过程的过程号
    time.sleep(3)


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print('主', os.getpid())
    print('主主', os.getppid())
p.terminate()  # 杀死以后过程
# 通知操作系统帮你去杀死以后过程 然而须要肯定的工夫 而代码的运行速度极快
time.sleep(0.1)  # 退出一个 time.sleep() 就能够失去正确后果
print(p.is_alive())  # 判断以后过程是否存活

僵尸过程与孤儿过程

  • 僵尸过程

    死了然而没有死透

    当你开设了子过程之后 该过程死后不会立即开释占用的过程号

    因为我要让父过程可能查看到它开设的子过程的一些根本信息(占用的 pid 号、运行工夫等)

    所有的过程 都会步入僵尸过程

    ​ 无害的状况:父过程不死并且在无限度的创立子过程并且子过程也不完结

    ​ 回收子过程占用的 pid 号两种状况:

    ​ 1、父过程期待子过程运行完结

    ​ 2、父过程调用 join 办法(等同于期待子过程运行完结)

  • 孤儿过程

    子过程存活,父过程意外死亡

    操作系统会开设一个“儿童福利院”专门治理孤儿过程回收相干资源

守护过程

from multiprocessing import Process
import time


def task(name):
    print('%s 总管正在活着' % name)
    time.sleep(3)
    print('%s 总管正在死亡' % name)


if __name__ == '__main__':
    p = Process(target=task, args=('纵观',))
    p.daemon = True  # 将过程 p 设置成守护过程 这一句代码要放在 p.start 下面才无效
    p.start()
    print('皇帝死于非命')

互斥锁

多个过程操作同一份数据的时候,会呈现数据错乱的问题

针对上述问题,解决形式就是加锁解决:将并发变成串行,就义效率然而保障了数据的平安

from multiprocessing import Process, Lock
import json
import time
import random


# 查票
def search(i):
    # 文件操作读取票数
    with open('data', 'r', encoding='utf8') as f:
        dic = json.load(f)
    print('用户 %s 查问余票:%s' % (i, dic.get('ticket_num')))  # 字典取值不要用[],要用.get()


# 买票  1、先查  2、再买
def buy(i):
    # 先查票
    with open('data', 'r', encoding='utf8') as f:
        dic = json.load(f)
    # 模仿网络提早
    time.sleep(random.randint(1, 3))
    # 判断以后是否有票
    if dic.get('ticket_num') > 0:
        # 批改数据库买票
        dic['ticket_num'] -= 1
        # 写入数据库
        with open('data', 'w', encoding='utf8') as f:
            json.dump(dic, f)
        print('用户 %s 买票胜利!' % i)
    else:
        print('用户 %s 买票失败' % i)


# 整合下面两个函数
def run(i, mutex):
    search(i)
    # 给买票环节加锁解决
    # 抢锁
    mutex.acquire()  # 所有人随机抢锁,一个人抢到,程序持续运行
    buy(i)
    # 开释锁
    mutex.release()  # 买完票,将锁开释,供剩下的人抢


if __name__ == '__main__':
    # 在主过程中生成一把锁 让所有的子过程抢 谁先抢到谁先买票
    mutex = Lock()
    for i in range(1, 11):
        p = Process(target=run, args=(i, mutex))
        p.start()

留神

​ 1、锁不要轻易应用,容易造成死锁景象(个别不会用到,都是外部封装好的)

​ 2、锁只在解决数据的局部来保障数据安全(只在争抢数据的环节加锁解决即可)

过程间通信

过程之间是无奈间接进行数据交互的,然而能够通过队列或管道实现数据交互

队列 Queue 模块

​ 管道:subprocess(stdin stdout stderr)

​ 队列:管道 + 锁

​ 队列:先进先出

​ 堆栈:先进后出

from multiprocessing import Queue
# import queue 也能够


# 创立一个队列
q = Queue(5)  # 括号内能够传数字 示意生成的队列最大能够同时寄存的数据量

# 往队列中存数据
q.put(111)
q.put(222)
q.put(333)
print(q.full())  # 判断以后队列是否满了
print(q.empty())  # 判断以后队列是否空了
q.put(444)
q.put(555)
print(q.full())  # 判断以后队列是否满了
# q.put(666)  # 当队列数据放满了之后 如果还有数据要放 程序会阻塞 直到有地位让进去

# 去队列中取数据
v1 = q.get()
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
print(q.empty())  # 判断以后队列是否空了
# v6 = q.get_nowait()  # 没有数据间接报错 queue.Empty
# v6 = q.get(timeout=3)  # 没有数据之后原地期待三秒 没有再报错
# v6 = q.get()  # 队列中如果曾经没有数据的话 get 办法会原地阻塞
# print(v1, v2, v3, v4, v5, v6)

"""
q.full()
q.empty()
q.get_nowait()
在多过程的状况下是不准确的
本地测试的时候才可能会用到 Queue,理论生产用的都是他人封装好的性能十分弱小的工具
"""

IPC 机制

钻研思路

1、主过程跟子过程借助于队列通信:
from multiprocessing import Queue, Process


def producer(q):
    q.put('我是 23 号技师 很快乐为您服务')  # 在子过程中向主过程的队列中增加值
    print('hello big baby')


if __name__ == '__main__':
    q = Queue()  # 在主过程中创立一个队列
    p = Process(target=producer, args=(q,))  # 将 producer 函数当作子过程创立
    p.start()
    print(q.get())  # 在主过程中获取并打印队列里的值
2、子过程跟子过程借助于队列通信:
from multiprocessing import Queue, Process


def producer(q):
    q.put('我是 23 号技师 很快乐为您服务')  # 往队列里放数据


def consumer(q):
    print(q.get())  # 在子过程中获取队列中的数据


if __name__ == '__main__':
    q = Queue()  # 在主过程中创立一个队列
    p = Process(target=producer, args=(q,))  # 将 producer 函数当作子过程创立
    p1 = Process(target=consumer, args=(q,))  # 将 consumer 函数当作子过程创立
    p.start()
    p1.start()

生产者消费者模型

生产者:生产 / 制作货色的

消费者:生产 / 解决货色的

该模型除了上述两个之外还须要一个媒介

生存中的例子:做包子的将包子做好后放在蒸笼里,买包子的去蒸笼里拿

​ 厨师做好的菜用盘子装着,给消费者端过去

生产者和消费者之间不是间接做交互的,而是借助于媒介做交互

生产者(做包子的)+ 音讯队列(蒸笼)+ 消费者(吃包子的)

音讯队列 的存在是为了解决供需不均衡的问题

from multiprocessing import Process, Queue, JoinableQueue
import time
import random


# 生产者
def producer(name, food, q):
    for i in range(5):
        data = '%s 生产了 %s%s' % (name, food, i)
        # 模仿提早
        time.sleep(random.randint(1, 3))
        print(data)
        # 将数据放入队列中
        q.put(data)


# 消费者
def consumer(name, q):
    # 消费者胃口很大 光盘口头
    while True:
        food = q.get()  # 没有数据就会卡住
        # 判断以后是否有完结的标识
        # if food is None: break
        time.sleep(random.randint(1, 3))
        print('%s 吃了 %s' % (name, food))
        q.task_done()  # 通知队列你曾经从外面取出了一个数据并且处理完毕了


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer, args=('大厨 1 号', '包子', q))
    p2 = Process(target=producer, args=('大厨 2 号', '泔水', q))
    c1 = Process(target=consumer, args=('春哥', q))
    c2 = Process(target=consumer, args=('新哥', q))
    p1.start()
    p2.start()
    # 将消费者设置成守护过程
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    # 期待生产者生产结束之后 往队列中增加特定的完结符号
    # q.put(None)  # 必定在所有生产者生产的数据的开端
    # q.put(None)  # 有两个消费者 所以要放两个 None
    q.join()  # 期待队列中所有的数据被取完了再往下执行代码
    """
    JoinableQueue 每当你往该队列中存入数据的时候 外部会有一个计数器 +1
    每当你调用 task_done 的时候 计数器 -1
    q.join() 当计数器为 0 的时候 才往后运行
    """
    # 只有 q.join() 执行结束 阐明消费者曾经解决完数据了 消费者就没有存在的必要了
    # 因而能够在主程序中将消费者设置为守护过程

线程实践

致命三问

  • 什么是线程

    过程:资源单位

    线程:执行单位

    • 将操作系统比喻成一个大的工厂,过程就相当于工厂外面的车间,线程就是车间外面的流水线
    • 每一个过程必定自带一个线程

    总结 :起一个过程只是在内存空间中开拓一块独立的空间,真正被 CPU 执行的其实是 过程外面的线程 线程指的就是代码的执行过程,执行代码中所须要应用到的资源都找所在的过程要。

    过程和线程都是虚构单位,只是为了咱们更加不便的形容问题

  • 为何要有线程

    开设过程

    ​ 1、申请内存空间 耗资源

    ​ 2、“拷贝代码”耗资源

    开线程

    ​ 一个过程内能够开设多个线程,在用一个过程内开多个线程无需再次申请内存空间及拷贝代码操作

    总结:开线程比开过程省资源

    同一个过程下的线程,数据是共享的

  • 如何应用

    开启线程的两种形式:

    # # 第一种形式
    from multiprocessing import Process
    from threading import Thread
    import time
    
    
    def task(name):
        print('%s is running' % name)
        time.sleep(1)
        print('%s is over' % name)
        
        
    # 开启线程不须要在 main 上面执行代码 间接书写就能够
    t = Thread(target=task, args=('egon',))
    t.start()  # 创立线程的开销十分小 简直是代码一执行线程就曾经创立了
    print('主')
    
    ############################################################
    
    # 第二种形式
    from threading import Thread
    import time
    
    
    class MyThead(Thread):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):
            print('%s is running' % self.name)
            time.sleep(1)
            print('egon DSB')
    
    
    if __name__ == '__main__':
        t = MyThead('egon')  # 开一个线程
        t.start()
        print('主')

TCP 服务端实现并发的成果:

"""
服务端
    1、要有固定的 IP 和端口
    2、24 小时不间断提供服务
    3、可能反对并发
"""

import socket
from threading import Thread
from multiprocessing import Process


server = socket.socket()  # 括号内不加参数默认就是 tcp 协定
server.bind(('127.0.0.1', 8080))
server.listen(5)


# 将服务的代码独自封装成一个函数
def talk(conn):
    # 通信循环
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


# 链接循环
while True:
    conn, addr = server.accept()  # 接客
    # 叫其他人来服务客户
    t = Thread(target=talk, args=(conn,))
    # t = Process(target=talk, args=(conn,))  # 开过程也是一样的成果
    t.start()

###########################################################

"""客户端"""

import socket


client = socket.socket()
client.connect(('127.0.0.1', 8080))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data.decode('utf-8'))

线程对象的 join 办法

from threading import Thread
import time


def task(name):
    print('%s is running' % name)
    time.sleep(3)
    print('%s is over' % name)


if __name__ == '__main__':
    t = Thread(target=task, args=('egon',))
    t.start()
    t.join()  # 主线程期待子线程运行完结再执行
    print('主')

同一个过程下的多个线程 数据是共享的

from threading import Thread
import time

money = 100


def task():
    global money
    money = 666
    print(money)


if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()
    print(money)

线程对象属性及其他办法

from threading import Thread, active_count, current_thread
import time


def task(n):
    # print('hello world', os.getpid())
    print('hello world', current_thread().name)  # 查看以后所在线程的名字
    time.sleep(n)


if __name__ == '__main__':
    t = Thread(target=task, args=(1,))
    t1 = Thread(target=task, args=(2,))
    t.start()
    t1.start()
    t1.join()  # 期待 t1 运行结束后再运行前面的代码
    # print('主', os.getpid())
    # print('主', current_thread().name)
    print('主', active_count())  # 统计以后正在沉闷的线程数

守护线程

from threading import Thread
import time


def task(name):
    print('%s is running' % name)
    time.sleep(1)
    print('%s is over' % name)


if __name__ == '__main__':
    t = Thread(target=task, args=('xxxx',))
    t.daemon = True  # 将 t 线程变为守护线程 只有主线程完结了 t 线程也完结
    t.start()
    print('主')
    
"""
主线程运行完结之后不会立即完结 会期待所有其余非守护线程完结才会完结
因为主线程的完结意味着所在的过程的完结
"""

略微有点迷惑性的例子:

from threading import Thread
import time


def foo():
    print('123')
    time.sleep(1)
    print('end123')


def func():
    print('456')
    time.sleep(3)
    print('end456')


if __name__ == '__main__':
    t1 = Thread(target=foo)
    t2 = Thread(target=func)
    t1.daemon = True
    t1.start()
    t2.start()
    print('主.......')

线程互斥锁

from threading import Thread, Lock
import time

money = 100
mutex = Lock()  # 学生成一把锁


def task():
    global money
    mutex.acquire()  # 在抢数据的代码前加锁
    tmp = money
    time.sleep(0.1)
    money = tmp - 1
    mutex.release()  # 抢完当前开释锁


if __name__ == '__main__':
    t_list = []
    for i in range(100):  # 起 100 个线程
        t = Thread(target=task)
        t.start()
        t_list.append(t)  # 先将线程逐个增加进列表中以备后续对立操作
    for t in t_list:
        t.join()  # 后续对立操作:保障每一个线程运行结束后再完结主线程
    print(money)

GIL 全局解释器锁

定义:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

python 解释器有多个版本

​ Cpython

​ Jpython

​ Pypypython

然而广泛应用的都是 Cpython 解释器

在 Cpython 解释器中 GIL 是一把互斥锁,用来阻止同一个过程下的多个线程的同时执行

同一个过程下的多个线程无奈利用多核优势!!!

疑难:python 的多线程是不是一点用都没有???

因为 python 中的内存治理不是线程平安的

内存治理(垃圾回收机制)

​ 1、援用计数

​ 2、标记革除

​ 3、分代回收

重点

1、GIL 不是 python 的特点,而是 Cpython 解释器的特点

2、GIL 是保障解释器级别的数据的平安(即线程与线程之间的数据的平安)

3、GIL 会导致同一个过程下的多个线程无奈同时执行即无奈利用多核优势

4、针对不同的数据还是须要加不同的锁解决

5、解释性语言的通病:同一个过程下多个线程无奈利用多核优势

GIL 与一般互斥锁的区别

from threading import Thread, Lock
import time

money = 100
mutex = Lock()  # 学生成一把锁


def task():
    global money
    # with mutex:   # 等同于上面的写法
    #     tmp = money
    #     time.sleep(0.1)
    #     money = tmp - 1
    mutex.acquire()  # 在抢数据的代码前加锁
    tmp = money
    time.sleep(0.1)
    money = tmp - 1
    mutex.release()  # 抢完当前开释锁


if __name__ == '__main__':
    t_list = []
    for i in range(100):  # 起 100 个线程
        t = Thread(target=task)
        t.start()
        t_list.append(t)  # 先将线程逐个增加进列表中以备后续对立操作
    for t in t_list:
        t.join()  # 后续对立操作:保障每一个线程运行结束后再完结主线程
    print(money)

"""
100 个线程起来之后要先去抢 GIL
我进入 IO GIL 主动开释 然而我手上还有一把互斥锁
其余线程尽管抢到了 GIL 然而抢不到互斥锁
最终 GIL 还是回到我的手上 我去操作数据
"""

同一个过程下的多线程无奈利用多核优势,是不是就没有用了

多线程是否有用要看具体情况

单核:四个工作(IO 密集型 \ 计算密集型)

多核:四个工作(IO 密集型 \ 计算密集型)

计算密集型:每个工作都须要 10s

单核:(不必思考了,时代已过)

​ 多过程:额定的耗费资源

​ 多线程:节俭资源

多核:

​ 多过程:总耗时 10s+

​ 多线程:总耗时 40s+

IO 密集型:

多核:

​ 多过程:绝对浪费资源

​ 多线程:更加节俭资源

代码验证

# 计算密集型
from multiprocessing import Process
from threading import Thread
import os, time


def work():
    res = 0
    for i in range(10000000):
        res *= i


if __name__ == '__main__':
    l = []
    print(os.cpu_count())  # 获取以后计算机 CPU 个数
    start_time = time.time()
    for i in range(8):
        # p = Process(target=work)  # 0.4691619873046875  多过程更快
        t = Thread(target=work)  # 1.6921021938323975
        # p.start()
        t.start()
        # l.append(p)
        l.append(t)
    for p in l:
        p.join()
    print(time.time() - start_time)
# IO 密集型
from multiprocessing import Process
from threading import Thread
import os, time


def work():
    time.sleep(2)


if __name__ == '__main__':
    l = []
    print(os.cpu_count())  # 获取以后计算机 CPU 个数
    start_time = time.time()
    for i in range(2000):
        # p = Process(target=work)  # 14.634779930114746
        t = Thread(target=work)  # 2.142601251602173
        # p.start()
        t.start()
        # l.append(p)
        l.append(t)
    for p in l:
        p.join()
    print(time.time() - start_time)

总结:

多过程和多线程各有本人的劣势

通常能够 多过程上面开设多线程

这样的话既能够利用多核也能够节俭资源耗费

死锁

当你晓得锁的应用(抢锁必须要开释锁),其实你在操作锁的时候也极其容易产生死锁景象(整个程序卡死、阻塞)

from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()


# 类只有加括号屡次 产生的必定是不同的对象
# 如果你想要实现屡次加括号等到的是雷同的对象 ---> 单例模式


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s 抢到 A 锁' % self.name)  # 获取以后线程名
        mutexB.acquire()
        print('%s 抢到 B 锁' % self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('%s 抢到 B 锁' % self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 抢到 A 锁' % self.name)
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

递归锁

特点:

​ 能够被间断的 acquire 和 release

​ 然而只能被第一个抢到这把锁执行上述操作

​ 它的外部有一个计数器 每 acquire 一次计数加一 每 release 一次计数减一

​ 只有计数不为 0 其他人都无奈抢到该锁

# 递归锁
from threading import Thread, Lock, RLock
import time

mutexA = mutexB = RLock()


# 类只有加括号屡次 产生的必定是不同的对象
# 如果你想要实现屡次加括号等到的是雷同的对象 ---> 单例模式


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s 抢到 A 锁' % self.name)  # 获取以后线程名
        mutexB.acquire()
        print('%s 抢到 B 锁' % self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('%s 抢到 B 锁' % self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 抢到 A 锁' % self.name)
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

信号量

信号量在不同的阶段可能对应不同的技术点

在并发编程中信号量指的是锁

如果咱们将互斥锁比喻成一个厕所的话

那么信号量就相当于多个厕所

from threading import Thread, Semaphore
import time
import random


sm = Semaphore(5)  # 括号内写数字 写几就示意开设几个坑位


def task(name):
    sm.acquire()
    print('%s 正在蹲坑' % name)
    time.sleep(random.randint(1, 5))
    sm.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task, args=('伞兵 %s 号' % i,))
        t.start()

Event 事件

一些过程 / 线程须要期待另外一些过程 / 线程运行结束之后能力运行,相似于发射信号一样

from threading import Thread, Event
import time

event = Event()  # 造了一个红绿灯


def light():
    print('红灯亮着的')
    time.sleep(3)
    print('绿灯亮了')
    event.set()  # 通知等红灯的人能够走了


def car(name):
    print('%s 号车正在等红灯' % name)
    event.wait()  # 期待 event.set() 被触发
    print('%s 号车加油门开走了..' % name)


if __name__ == '__main__':
    t = Thread(target=light)  # 设置一个红绿灯
    t.start()

    for i in range(1, 21):
        t = Thread(target=car, args=('%s' % i,))
        t.start()

线程 q

"""
同一个过程下多个线程 数据是共享的
为什么同一个过程下还会应用队列来共享数据呢
因为同一过程下的多个线程共享数据时,数据是不平安的
队列:管道 + 锁
所以用队列是为了保证数据的平安
"""
import queue
# 咱们当初应用的队列(queue)都是只能在本地测试应用

# 1 队列 q 先进先出
q = queue.Queue(3)
q.put(1)
q.get()
q.get_nowait()
q.get(timeout=3)
q.full()
q.empty()

##############################################

# 2 后进先出 q
q = queue.LifoQueue(3)  # last in first out
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 3

##############################################

# 3 优先级 q(你能够给放入队列中的数据设置进出的优先级)q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get())  # (-5, '444')
# put 括号内放一个元组 第一个放数字示意优先级 第二个放数据
# 须要留神的是 数字越小优先级越高!!!

过程池与线程池

先回顾 tcp 服务端实现并发的成果是怎么玩的?

之前是每来一个人就开设一个过程或者线程去解决

"""
无论是开过程还是线程 都要耗费资源
只不过线程的资源略微小一点而已

咱们是不可能做到无限度的开设过程和线程 因为计算机硬件的资源跟不上!硬件的开发速度远远赶不上软件

咱们的主旨应该是在保障计算机硬件可能失常工作的状况下最大限度的利用它
"""

池的概念

什么是池?

​ 池是用来保障计算机硬件平安的状况下最大限度的利用计算机

​ 它 升高 了程序的运行效率,然而保障了 计算机硬件的平安,从而让程序可能失常运行

根本应用:

# 线程池

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(5)  # 变量名轻易起  参数 5 示意池子外面固定只有五个线程
# 括号内能够传数字 不传的话默认会开设以后计算机 CPU 个数五倍的线程
"""
池子造出来之后 外面会固定存在五个线程
这五个线程不会呈现反复创立和销毁的过程
这样也节俭了反复创立线程的过程 ---> 节俭资源
池子的应用十分的简略
你只须要将须要做的工作往池子中提交即可
"""


def task(n):
    print(n)
    time.sleep(2)
    return n*n


"""
工作的提交形式分两种
    同步:异步:"""

# pool.submit(task, 1)  # 朝池子中提交工作  异步提交
# print('主')
t_list = []
for i in range(20):  # 朝池子中提交 20 个工作
    res = pool.submit(task, i)
    # print(res.result())  # submit 类中的 result 办法  这行代码会导致程序同步提交
    t_list.append(res)
# 期待线程池中所有的工作执行结束之后再打印列表中的后果
pool.shutdown()  # 敞开线程池 期待线程池中所有的工作运行结束
for t in t_list:
    print('>>>:', t.result())  # 有序输入

然而上述线程池的代码用列表的形式有点顺当,能够用上面的办法

最终实现一有后果立即获取并打印

# 过程池

from concurrent.futures import ProcessPoolExecutor
import time

pool = ProcessPoolExecutor()
# 括号内能够传数字 不传的话默认会开设以后计算机 CPU 个数的过程
"""
池子造出来之后 外面会固定存在几个过程
这几个过程不会呈现反复创立和销毁的过程
池子的应用十分的简略
你只须要将须要做的工作往池子中提交即可
"""


def task(n):
    print(n)
    time.sleep(2)
    return n*n


def call_back(n):  # n ==> pool.submit(task, i) 返回的对象
    print('call_back:', n.result())  # 对象的值用 .result() 查看


"""
工作的提交形式分两种
    同步:异步:提交后的返回后果应该通过回调来获取
        回调机制:就相当于给每个异步工作绑定了一个定时炸弹,一旦工作有后果立即触发
"""if __name__ =='__main__':
    for i in range(20):  # 朝池子中提交 20 个工作
        # res = pool.submit(task, i)
        pool.submit(task, i).add_done_callback(call_back)

总结

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

pool = ProcessPoolExecutor()  # 过程池
pool = ThreadPoolExecutor(5)  # 线程池

res = pool.submit(task, i).add_done_callback(call_back)

协程

过程:资源单位

线程:执行单位

协程:这个概念齐全是程序员本人意淫进去的 基本不存在

​ 为的是“单线程下实现并发”

​ 咱们程序员本人在代码层面上检测咱们所有的 IO 操作

​ 一旦遇到 IO 在代码级别实现切换

​ 这样给 CPU 的感觉是你这个程序始终在运行 没有 IO

​ 从而晋升程序的运行效率

多道技术

​ 切换 + 保留状态

​ CPU 两种状况下切换

​ 1、程序遇到 IO

​ 2、程序长时间占用

适宜的例子:

​ tcp 服务端

​ accept

​ recv

代码如何做到 切换 + 保留状态?

​ 切换:切换不肯定晋升效率 也有可能升高效率

​ IO 切 晋升

​ 没有 IO 切 升高

​ 保留状态:保留上一次执行的状态 下一次接着上一次的操作持续往后执行

​ yield

验证切换是否就肯定晋升效率

# 串行执行计算密集型的工作
import time


def func1():
    for i in range(100000000):
        i + 1


def func2():
    for i in range(100000000):
        i + 1


start_time = time.time()
func1()
func2()
print(time.time() - start_time)
# 切换 + yield
import time


def func1():
    while True:
        100000000 + 1
        yield


def func2():
    g = func1()  # 先初始化出生成器
    for i in range(100000000):
        i + 1
        next(g)


start_time = time.time()
func2()
print(time.time() - start_time)

gevent 模块

能够检测程序的 IO 操作

from gevent import monkey; monkey.patch_all()
import time
from gevent import spawn

"""
gevent 模块自身无奈检测常见的一些 IO 操作
在应用的时候须要你额定的导入一句话
from gevent import monkey
monkey.patch_all()
又因为下面两句话在应用 gevent 模块时是必定要导入的
所以还反对简写
from gevent import monkey; monkey.patch_all()
"""


def heng():
    print('哼')
    time.sleep(2)
    print('哼')


def ha():
    print('哈')
    time.sleep(3)
    print('哈')


def heiheihei():
    print('嘿嘿嘿')
    time.sleep(5)
    print('嘿嘿嘿')


start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()  # 期待被检测的工作执行结束 再往后继续执行
g2.join()
g3.join()
print(time.time() - start_time)  # 5.001976728439331

协程实现 tcp 服务端并发

# 服务端
from gevent import monkey; monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()
# 客户端
from threading import Thread, current_thread
import socket


def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    n = 0
    while True:
        msg = '%s say hello %s' % (current_thread().name, n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=x_client)
        t.start()

总结

现实状态:

咱们能够通过

多过程上面开设多线程

多线程上面开设协程

从而使咱们的程序执行效率晋升

IO 模型简介

咱们这里钻研的 IO 模型都是针对网络 IO 的

Stevens 在文章中一共比拟了五种 IO Model:
* blocking IO 阻塞 IO
* nonblocking IO 非阻塞 IO
* IO multiplexing IO 多路复用
* signal driven IO 信号驱动 IO
* asynchronous IO 异步 IO
由 signal driven IO(信号驱动 IO)在理论中并不罕用,所以次要介绍其余四种 IO Model

1)期待数据筹备(Waiting for the data to be ready)

2)将数据从内核拷贝到过程中(Copying the data from the kernel to the process)

同步异步

阻塞非阻塞

常见的网络状态:

​ accept

​ recv

​ recvftom

​ send 尽管它也有 IO 行为 然而不在咱们的思考范畴

阻塞 IO

咱们之前写的都是阻塞 IO 模型 协程除外

import socket


server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)


while True:
    conn, addr = server.accept()
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data)
            conn.send(data.upper())
        except ConnectionResetError as e:
            break
    conn.close()
    
# 在服务端无论是开设多过程、多线程,还是过程池、线程池,其实还是没有解决 IO 问题
"""
该等的中央还是得等 没有躲避
只不过多集体期待的彼此互不烦扰
"""

非阻塞 IO

"""要本人实现一个非阻塞 IO 模型"""
# 服务端
import socket

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)  # 参数默认是 True 改为 False 会将所有的网络阻塞变为非阻塞

r_list = []
del_list = []
while True:
    try:
        conn, addr = server.accept()
        r_list.append(conn)
    except BlockingIOError:
        for conn in r_list:
            try:
                data = conn.recv(1024)  # 没有音讯 报错
                if len(data) == 0:  # 客户端断开链接
                    conn.close()  # 敞开 conn
                    # 将无用的 conn 从 r_list 删除
                    del_list.append(conn)
                    continue
                conn.send(data.upper())  # 给客户端返回大写
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_list.append(conn)
        # 回收无用的链接
        for conn in del_list:
            r_list.remove(conn)
        del_list.clear()
        
############################################################

# 客户端
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8080))


while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)
总结

尽管非阻塞 IO 给你的感觉十分的牛逼,然而该模型会 长时间占用着 CPU 并且不干活(让 CPU 不停的空转)

咱们理论利用中也不会思考应用非阻塞 IO 模型

然而任何的技术点都有它存在的意义

理论利用或者是 思维借鉴

IO 多路复用

当监管的对象只有一个的时候 其实 IO 多路复用连阻塞 IO 都比不上!

然而 IO 多路复用能够一次性监管很多个对象:

​ server = socket.socket()

​ conn, add = server.accept()

监管机制 是操作系统自身就有的 如果你想用该监管机制(select)

须要你导入对应的 select 模块

# 服务端
import socket
import select

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
read_list = [server]

while True:
    r_list, w_list, x_list = select.select(read_list, [], [])
    for i in r_list:
        """针对不同的对象做不同的解决"""
        if i is server:
            conn, addr = i.accept()
            read_list.append(conn)  # conn 对象也应该增加到监管的队列中
        else:
            res = i.recv(1024)
            if len(res) == 0:
                i.close()
                # 将有效的监管对象 移除
                read_list.remove(i)
                continue
            print(res)
            i.send(b'heiheihei')
# 客户端
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8080))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)
总结

监管机制其实有很多:

​ select 机制 ==> Windows Linux 都有

​ poll 机制 ===> 只在 Linux 有,poll 和 select 都能够监管多个对象 然而 poll 监管的数量更多

​ <u> 上述 select 和 poll 机制都不是很完满 当监管的对象特地多的时候 可能会呈现极其大的提早响应 </u>

​ epoll 机制 ===> 只在 Linux 有

它给每一个监管对象都绑定了一个回调机制,一旦有响应 回调机制立即发动揭示

针对不同的操作系统还须要思考不同的检测机制 书写代码太过繁琐

有一个模块可能依据你跑的平台的不同主动帮你抉择对应的监管机制 => selectors 模块

异步 IO

异步 IO 模型是所有模型中效率最高的,也是应用最宽泛的

相干的模块和框架

​ 模块:asyncio 模块

​ 异步框架:sanic、tronado、twisted

对立特点:速度快!!!

import threading
import asyncio


async def hello():
    print('hello world %s' % threading.current_thread())
    await asyncio.sleep(1)  # 此处模仿真正的 IO 操作的耗时
    print('hello world %s' % threading.current_thread())


loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

四个 IO 模型比照

正文完
 0