多线程多进程多协程(转载)
- Bilibili 蚂蚁学 Python UP 主说得很好
- 工作中看视频不方便,截取重点部分改为文字版,方便抄作业
- 原地址:https://www.bilibili.com/video/BV1bK411A7tV
多线程
-
定义一个函数
def my_func(a, b):
    """Forward both arguments to do_something; used as the thread target."""
    do_something(a, b)
-
创建线程
import threading

# Build a thread that will call my_func(a, b) once it is started.
t = threading.Thread(target=my_func, args=(a, b))
-
启动线程
t.start()
-
等待结束
t.join()
队列
-
queue.Queue 是线程安全的
import queue

q = queue.Queue()

# Add an item, then take one back out.
q.put(item)
item = q.get()

# Inspect the queue's state.
q.qsize()
q.empty()
q.full()
线程安全(锁)
-
try-finally
import threading

lock = threading.Lock()

lock.acquire()
try:
    pass  # do something
finally:
    # Always release, even if the guarded code raises.
    lock.release()
-
with
import threading

lock = threading.Lock()

# The with statement acquires the lock on entry and releases it on exit.
with lock:
    pass  # do something
线程池
-
map 函数,结果与入参顺序对应
from concurrent.futures import ThreadPoolExecutor

arg_list = []

with ThreadPoolExecutor() as pool:
    # map() yields results in the same order as the entries of arg_list.
    results = pool.map(my_func, arg_list)
    for result in results:
        # Print each individual result, not the results iterator.
        print(result)
-
submit 函数,as_completed 顺序可按完成顺序
from concurrent.futures import ThreadPoolExecutor, as_completed

arg_list = []

with ThreadPoolExecutor() as pool:
    # One future per argument, kept in submission order.
    futures = [pool.submit(my_func, arg) for arg in arg_list]

    # In submission order: each result() call blocks until that future is done.
    for future in futures:
        print(future.result())

    # In completion order: as_completed() yields futures as they finish.
    for future in as_completed(futures):
        print(future.result())
Flask 中应用线程池
import time
from concurrent.futures import ThreadPoolExecutor
from flask import Flask
# Flask application object; the "/" route below is registered on it.
app = Flask(__name__)
# Thread pool created once at import time; index() submits its jobs to it.
pool = ThreadPoolExecutor()
def do_1():
    """Simulate a one-second blocking job and return the label 'do_1'."""
    time.sleep(1)
    return 'do_1'
def do_2():
    """Simulate a one-second blocking job and return the label 'do_2'."""
    time.sleep(1)
    return 'do_2'
def do_3():
    """Simulate a one-second blocking job and return the label 'do_3'."""
    time.sleep(1)
    return 'do_3'
@app.route("/")
def index():
    """Run do_1, do_2 and do_3 on the thread pool and return their results.

    All three jobs are submitted before any result is awaited, so their
    one-second sleeps overlap instead of adding up.

    Returns:
        dict: The three return values keyed by '1', '2' and '3'.
    """
    future_1 = pool.submit(do_1)
    future_2 = pool.submit(do_2)
    future_3 = pool.submit(do_3)
    # result() blocks until the corresponding job has finished.
    return {
        '1': future_1.result(),
        '2': future_2.result(),
        '3': future_3.result(),
    }
if __name__ == "__main__":
    # Start the Flask development server only when run as a script.
    app.run()
多进程
图片截图自 蚂蚁学 Python Bilibili 03:00
Flask 使用多进程
import time
from concurrent.futures import ProcessPoolExecutor
from flask import Flask
# Flask application object; the "/" route below is registered on it.
app = Flask(__name__)
def do_1():
    """Simulate a one-second blocking job and return the label 'do_1'."""
    time.sleep(1)
    return 'do_1'
def do_2():
    """Simulate a one-second blocking job and return the label 'do_2'."""
    time.sleep(1)
    return 'do_2'
def do_3():
    """Simulate a one-second blocking job and return the label 'do_3'."""
    time.sleep(1)
    return 'do_3'
@app.route("/")
def index():
    """Run do_1, do_2 and do_3 on the process pool and return their results.

    ``pool`` is the module-level ProcessPoolExecutor created inside the
    ``__main__`` guard at the bottom of this listing. All three jobs are
    submitted before any result is awaited, so their one-second sleeps
    overlap instead of adding up.

    Returns:
        dict: The three return values keyed by '1', '2' and '3'.
    """
    future_1 = pool.submit(do_1)
    future_2 = pool.submit(do_2)
    future_3 = pool.submit(do_3)
    # result() blocks until the corresponding job has finished.
    return {
        '1': future_1.result(),
        '2': future_2.result(),
        '3': future_3.result(),
    }
if __name__ == "__main__":
    # Create the process pool inside the main guard, after the worker
    # functions are defined, so that worker processes importing this module
    # do not create pools of their own.
    pool = ProcessPoolExecutor()
    app.run()
协程:asyncio、await
import asyncio
import aiohttp
# Create the event loop explicitly: calling asyncio.get_event_loop() when no
# loop is running is deprecated and fails on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def get_url(url):
    """Fetch *url* with aiohttp and print the URL with the length of its body.

    Args:
        url: Address to request with HTTP GET.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"url:{url},{len(result)}")
# One URL per listing page, pages 1 through 50.
page_numbers = range(1, 50 + 1)
urls = [f"https://www.cnblogs.com/#p{page}" for page in page_numbers]

# Schedule every download on the loop up front, then block until all finish.
tasks = []
for url in urls:
    tasks.append(loop.create_task(get_url(url)))
wait_for_all = asyncio.wait(tasks)
loop.run_until_complete(wait_for_all)
控制 asyncio 并发数
-
try-finally
import asyncio

sem = asyncio.Semaphore(10)


async def limited():
    """Run the guarded section while holding one of the semaphore's slots."""
    # await is only valid inside a coroutine, so the pattern lives in one.
    await sem.acquire()
    try:
        pass  # do something
    finally:
        # Always hand the slot back, even if the guarded code raises.
        sem.release()
-
with
import asyncio

sem = asyncio.Semaphore(10)


async def limited():
    """Run the guarded section while holding one of the semaphore's slots."""
    # async with acquires a slot on entry and releases it on exit.
    async with sem:
        pass  # do something
-
举例
import asyncio
import aiohttp

# Create the event loop explicitly: calling asyncio.get_event_loop() when no
# loop is running is deprecated and fails on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Allow at most 10 requests in flight at any time.
semaphore = asyncio.Semaphore(10)


async def get_url(url):
    """Fetch *url* with aiohttp, holding a semaphore slot for the whole request.

    Prints the URL together with the length of the response body.

    Args:
        url: Address to request with HTTP GET.
    """
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                result = await resp.text()
                print(f"url:{url},{len(result)}")


# One URL per listing page, pages 1 through 50.
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50 + 1)]
# All 50 tasks are scheduled immediately; the semaphore inside get_url is
# what keeps the number of simultaneous requests at 10.
tasks = [loop.create_task(get_url(url)) for url in urls]
loop.run_until_complete(asyncio.wait(tasks))