Python并發(fā)編程隊列與多線程最快發(fā)送http請求方式
Python 并發(fā)編程有很多方法,多線程的標(biāo)準(zhǔn)庫 threading,concurrency,協(xié)程 asyncio,當(dāng)然還有 grequests 這種異步庫,每一個都可以實(shí)現(xiàn)上述需求,下面一一用代碼實(shí)現(xiàn)一下,本文的代碼可以直接運(yùn)行,給你以后的并發(fā)編程作為參考:
隊列+多線程
定義一個大小為 400 的隊列,然后開啟 200 個線程,每個線程都是不斷的從隊列中獲取 url 并訪問。
主線程讀取文件中的 url 放入隊列中,然后等待隊列中所有的元素都被接收和處理完畢。代碼如下:
from threading import Thread import sys from queue import Queue import requests concurrent = 200 def doWork(): while True: url = q.get() status, url = getStatus(url) doSomethingWithResult(status, url) q.task_done() def getStatus(ourl): try: res = requests.get(ourl) return res.status_code, ourl except: return "error", ourl def doSomethingWithResult(status, url): print(status, url) q = Queue(concurrent * 2) for i in range(concurrent): t = Thread(target=doWork) t.daemon = True t.start() try: for url in open("urllist.txt"): q.put(url.strip()) q.join() except KeyboardInterrupt: sys.exit(1)
運(yùn)行結(jié)果如下:
有沒有 get 到新技能?
線程池
如果你使用線程池,推薦使用更高級的 concurrent.futures 庫:
import concurrent.futures import requests out = [] CONNECTIONS = 100 TIMEOUT = 5 urls = [] with open("urllist.txt") as reader: for url in reader: urls.append(url.strip()) def load_url(url, timeout): ans = requests.get(url, timeout=timeout) return ans.status_code with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor: future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls) for future in concurrent.futures.as_completed(future_to_url): try: data = future.result() except Exception as exc: data = str(type(exc)) finally: out.append(data) print(data)
協(xié)程 + aiohttp
協(xié)程也是并發(fā)非常常用的工具了:
import asyncio from aiohttp import ClientSession, ClientConnectorError async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple: try: resp = await session.request(method="GET", url=url, **kwargs) except ClientConnectorError: return (url, 404) return (url, resp.status) async def make_requests(urls: set, **kwargs) -> None: async with ClientSession() as session: tasks = [] for url in urls: tasks.append( fetch_html(url=url, session=session, **kwargs) ) results = await asyncio.gather(*tasks) for result in results: print(f'{result[1]} - {str(result[0])}') if __name__ == "__main__": import sys assert sys.version_info >= (3, 7), "Script requires Python 3.7+." with open("urllist.txt") as infile: urls = set(map(str.strip, infile)) asyncio.run(make_requests(urls=urls))
grequests
這是個第三方庫,目前有 3.8K 個星,就是 Requests + Gevent,讓異步 http 請求變得更加簡單。Gevent 的本質(zhì)還是協(xié)程。
使用前:
pip install grequests
使用起來那是相當(dāng)?shù)暮唵危?/p>
import grequests urls = [] with open("urllist.txt") as reader: for url in reader: urls.append(url.strip()) rs = (grequests.get(u) for u in urls) for result in grequests.map(rs): print(result.status_code, result.url)
注意 grequests.map(rs)
是并發(fā)執(zhí)行的。運(yùn)行結(jié)果如下:
也可以加入異常處理:
>>> def exception_handler(request, exception): ... print("Request failed") >>> reqs = [ ... grequests.get('http://httpbin.org/delay/1', timeout=0.001), ... grequests.get('http://fakedomain/'), ... grequests.get('http://httpbin.org/status/500')] >>> grequests.map(reqs, exception_handler=exception_handler) Request failed Request failed [None, None, <Response [500]>]
最后的話
今天分享了并發(fā) http 請求的幾種實(shí)現(xiàn)方式,有人說異步(協(xié)程)性能比多線程好,其實(shí)要分場景看的,沒有一種方法適用所有的場景,筆者就曾做過一個實(shí)驗(yàn),也是請求 url,當(dāng)并發(fā)數(shù)量超過 500 時,協(xié)程明顯變慢。
以上就是Python并發(fā)編程隊列與多線程最快發(fā)送http請求方式的詳細(xì)內(nèi)容,更多關(guān)于Python并發(fā)編程隊列與多線程的資料請關(guān)注本站其它相關(guān)文章!
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。