Hello,大家好,我是程序员小二!
如果有一个文件,外面有 10 万个 url,须要对每个 url 发送 http 申请,并打印申请后果的状态码,如何编写代码尽可能快的实现这些工作呢?
Python 并发编程有很多办法,多线程的规范库 threading,concurrency,协程 asyncio,当然还有 grequests 这种异步库,每一个都能够实现上述需要,上面一一用代码实现一下,本文的代码能够间接运行,给你当前的并发编程作为参考:
队列+多线程
定义一个大小为 400 的队列,而后开启 200 个线程,每个线程都是一直的从队列中获取 url 并拜访。
主线程读取文件中的 url 放入队列中,而后期待队列中所有的元素都被接管和处理完毕。代码如下:
from threading import Threadimport sysfrom queue import Queueimport requestsconcurrent = 200def doWork():while True:url = q.get()status, url = getStatus(url)doSomethingWithResult(status, url)q.task_done()def getStatus(ourl):try:res = requests.get(ourl)return res.status_code, ourlexcept:return "error", ourldef doSomethingWithResult(status, url):print(status, url)q = Queue(concurrent * 2)for i in range(concurrent):t = Thread(target=doWork)t.daemon = Truet.start()try:for url in open("urllist.txt"):q.put(url.strip())q.join()except KeyboardInterrupt:sys.exit(1)
运行后果如下:
有没有 get 到新技能?
线程池
如果你应用线程池,举荐应用更高级的 concurrent.futures 库:
import concurrent.futuresimport requestsout = []CONNECTIONS = 100TIMEOUT = 5urls = []with open("urllist.txt") as reader:for url in reader:urls.append(url.strip())def load_url(url, timeout):ans = requests.get(url, timeout=timeout)return ans.status_codewith concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)for future in concurrent.futures.as_completed(future_to_url):try:data = future.result()except Exception as exc:data = str(type(exc))finally:out.append(data)print(data)
协程 + aiohttp
协程也是并发十分罕用的工具了,
import asynciofrom aiohttp import ClientSession, ClientConnectorErrorasync def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:try:resp = await session.request(method="GET", url=url, **kwargs)except ClientConnectorError:return (url, 404)return (url, resp.status)async def make_requests(urls: set, **kwargs) -> None:async with ClientSession() as session:tasks = []for url in urls:tasks.append(fetch_html(url=url, session=session, **kwargs))results = await asyncio.gather(*tasks)for result in results:print(f'{result[1]} - {str(result[0])}')if __name__ == "__main__":import sysassert sys.version_info >= (3, 7), "Script requires Python 3.7+."with open("urllist.txt") as infile:urls = set(map(str.strip, infile))asyncio.run(make_requests(urls=urls))
grequests[1]
这是个第三方库,目前有 3.8K 个星,就是 Requests + Gevent[2],让异步 http 申请变得更加简略。Gevent 的实质还是协程。
应用前:
pip install grequests
应用起来那是相当的简略:
import grequestsurls = []with open("urllist.txt") as reader:for url in reader:urls.append(url.strip())rs = (grequests.get(u) for u in urls)for result in grequests.map(rs):print(result.status_code, result.url)
留神 grequests.map(rs) 是并发执行的。运行后果如下:
也能够退出异样解决:
>>> def exception_handler(request, exception):... print("Request failed")>>> reqs = [... grequests.get('http://httpbin.org/delay/1', timeout=0.001),... grequests.get('http://fakedomain/'),... grequests.get('http://httpbin.org/status/500')]>>> grequests.map(reqs, exception_handler=exception_handler)Request failedRequest failed[None, None, <Response [500]>]
最初的话
明天分享了并发 http 申请的几种实现形式,有人说异步(协程)性能比多线程好,其实要分场景看的,没有一种办法实用所有的场景,笔者就曾做过一个试验,也是申请 url,当并发数量超过 500 时,协程显著变慢。所以,不能说哪个肯定比哪个好,须要划分状况。