Python線程池的正確使用方法
Python線程池的正確使用
1、為什么要使用線程池呢?
因為線程執(zhí)行完任務(wù)之后就會被系統(tǒng)銷毀,下次再執(zhí)行任務(wù)的時候再進行創(chuàng)建。這種方式在邏輯上沒有啥問題。但是系統(tǒng)啟動一個新線程的成本是比較高,因為其中涉及與操作系統(tǒng)的交互,操作系統(tǒng)需要給新線程分配資源。打個比方吧!就像軟件公司招聘員工干活一樣。當(dāng)有活干時,就招聘一個外包人員干活。當(dāng)活干完之后就把這個人員辭退掉。你說在這過程中所耗費的時間成本和溝通成本是不是很大。那么公司一般的做法是:當(dāng)項目立項時就確定需要幾名開發(fā)人員,然后將這些人員配齊。然后這些人員就常駐在項目組,有活就干,沒活就摸魚。線程池也是同樣的道理。線程池可以定義最大線程數(shù),這些線程有任務(wù)就執(zhí)行任務(wù),沒任務(wù)就進入線程池中歇著。
2、線程池怎么用呢?
線程池的基類是concurrent.futures模塊中的Executor類,而Executor
類提供了兩個子類,即ThreadPoolExecutor
類和ProcessPoolExecutor
類。其中ThreadPoolExecutor
用于創(chuàng)建線程池
,而ProcessPoolExecutor
用于創(chuàng)建進程池
。本文將重點介紹ThreadPoolExecutor類的使用。首先,讓我們來看看ThreadPoolExecutor類的構(gòu)造函數(shù)。這里使用的Python版本是:3.6.7。
def __init__(self, max_workers=None, thread_name_prefix=''): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. """ if max_workers is None: # Use this number because ThreadPoolExecutor is often # used to overlap I/O instead of CPU work. max_workers = (os.cpu_count() or 1) * 5 if max_workers <= 0: raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter()))
他的構(gòu)造函數(shù)只有兩個參數(shù):一個是max_workers
參數(shù),用于指定線程池的最大線程數(shù),如果不指定的話則默認(rèn)是CPU核數(shù)的5倍。另一個參數(shù)是thread_name_prefix
,它用來指定線程池中線程的名稱前綴。其他參數(shù):
_shutdown
初始值值為False,默認(rèn)情況下線程池不銷毀,即線程池的生命周期跟項目的生命周期一致。self._work_queue = queue.Queue
()生成緩沖隊列。_threads
沒有任務(wù)被提交時,線程的數(shù)量設(shè)置為0。_shutdown_lock
指定線程池的鎖是Lock鎖。- 說完了線程池的創(chuàng)建之后,接著來看看線程池中比較常用的幾個方法吧。
submit(self, fn, *args, **kwargs):
- 該方法用提交任務(wù),即將fn函數(shù)提交給線程池,*args代表傳給fn函數(shù)的參數(shù),**kwargs代表以關(guān)鍵字參數(shù)的形式為fn函數(shù)傳入?yún)?shù)。
shutdown(self, wait=True):
- 關(guān)閉線程池
map(func, *iterables, timeout=None, chunksize=1):
- 該函數(shù)類似于全局函數(shù)
map(func,*iterables
),只是該函數(shù)將會啟動多個線程,以異步方式立即對iterables執(zhí)行map處理。
程序?qū)ask函數(shù)通過submit方法提交給線程池之后,線程池會返回一個Future對象,該對象的作用主要是用于獲取線程任務(wù)函數(shù)的返回值。Future提供了如下幾個方法。
cancel():
取消該Future代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回False;否則,程序會取消該任務(wù),并返回True。result(timeout=None):
獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。add_done_callback(fn):
為該 Future 代表的線程任務(wù)注冊一個“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時,程序會自動觸發(fā)該 fn 函數(shù)。done():
如果該Future代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回True。
來個簡單的例子:
該例中創(chuàng)建了一個最大線程數(shù)是2的線程池來執(zhí)行async_add函數(shù)。
from concurrent.futures import ThreadPoolExecutor import threading import time def async_add(max): sum = 0 for i in range(max): sum = sum + i time.sleep(1) print(threading.current_thread().name + "執(zhí)行求和操作求得的和是=" + str(sum)) return sum # 創(chuàng)建兩個線程 pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='測試線程') # 向線程池提交一個task,20作為async_add()函數(shù)的參數(shù) future1 = pool.submit(async_add, 20) # 向線程池再提交一個task future2 = pool.submit(async_add, 50) # 判斷future1代表的任務(wù)是否執(zhí)行完 time.sleep(2) print(future1.done()) print(future2.done()) # 查看future1代表的任務(wù)返回的結(jié)果 print('線程一的執(zhí)行結(jié)果是=' + str(future1.result())) # 查看future2代表的任務(wù)的返回結(jié)果 print('線程二的執(zhí)行結(jié)果是=' + str(future2.result())) print("----" + threading.current_thread().name + "----主線程執(zhí)行結(jié)束-----")
運行結(jié)果是:
測試線程_0執(zhí)行求和操作求得的和是=190
測試線程_1執(zhí)行求和操作求得的和是=1225
True
True
線程一的執(zhí)行結(jié)果是=190
線程二的執(zhí)行結(jié)果是=1225
----MainThread----主線程執(zhí)行結(jié)束-----
本例中定義了一個最大線程數(shù)是2的線程池,并向線程池中提交了兩個任務(wù),其中async_add函數(shù)就是要執(zhí)行的任務(wù)。在async_add
函數(shù)中添加 time.sleep(1)
休眠一秒是為了驗證done()方法返回的結(jié)果。最后才打印主線程執(zhí)行結(jié)束表明result()方法是阻塞的。如果將result()屏蔽掉。
改成如下形式:
# 創(chuàng)建兩個線程 pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='測試線程') # 向線程池提交一個task,20作為async_add()函數(shù)的參數(shù) future1 = pool.submit(async_add, 20) # 向線程池再提交一個task future2 = pool.submit(async_add, 50) # 判斷future1代表的任務(wù)是否執(zhí)行完 print(future1.done()) print(future2.done()) print("----" + threading.current_thread().name + "----主線程執(zhí)行結(jié)束-----")
則運行結(jié)果是:
False
False
----MainThread----主線程執(zhí)行結(jié)束-----
測試線程_0執(zhí)行求和操作求得的和是=190
測試線程_1執(zhí)行求和操作求得的和是=1225
3、如何非阻塞的獲取線程執(zhí)行的結(jié)果
前面介紹的result()方法是通過阻塞的方式來獲取線程的運行結(jié)果的。那么如果通過非阻塞的方法來獲取線程任務(wù)最后的返回結(jié)果呢?這里就需要使用線程的回調(diào)函數(shù)來獲取線程的返回結(jié)果。
from concurrent.futures import ThreadPoolExecutor import threading import time def async_add(max): sum = 0 for i in range(max): sum = sum + i time.sleep(1) print(threading.current_thread().name + "執(zhí)行求和操作求得的和是=" + str(sum)) return sum with ThreadPoolExecutor(max_workers=2) as pool: # 向線程池提交一個task future1 = pool.submit(async_add, 20) future2 = pool.submit(async_add, 50) # 定義獲取結(jié)果的函數(shù) def get_result(future): print(threading.current_thread().name + '運行結(jié)果:' + str(future.result())) # 查看future1代表的任務(wù)返回的結(jié)果 future1.add_done_callback(get_result) # 查看future2代表的任務(wù)的返回結(jié)果 future2.add_done_callback(get_result) print('------------主線程執(zhí)行結(jié)束----')
運行結(jié)果是:
------------主線程執(zhí)行結(jié)束----
ThreadPoolExecutor-0_1執(zhí)行求和操作求得的和是=1225
ThreadPoolExecutor-0_1運行結(jié)果:1225
ThreadPoolExecutor-0_0執(zhí)行求和操作求得的和是=190
ThreadPoolExecutor-0_0運行結(jié)果:190
從結(jié)果可以看出獲取線程執(zhí)行結(jié)果的方法完全沒有阻塞到主線程的運行。這里通過add_done_callback
函數(shù)向線程池中注冊了一個獲取線程執(zhí)行結(jié)果的函數(shù)get_result。
由于線程池實現(xiàn)了上下文管理協(xié)議(Context Manage Protocol),因此程序可以使用with語句來管理線程池,這樣即可避免手動關(guān)閉線程池。
4、線程池的運行策略
這里有必要介紹一下線程池的執(zhí)行策略,也就是說當(dāng)線程池中的任務(wù)數(shù)大于線程池的最大線程數(shù)時,線程池該如何處理這些任務(wù)呢?處理不了的任務(wù)是直接丟棄還是慢慢處理呢?再回答這個問題之前,讓我們來看下下面這個例子:這里定義了一個最大線程數(shù)是4個線程池,然后向線程池中提交了100個task任務(wù)。
def async_add(max): sum = 0 for i in range(max): sum = sum + i time.sleep(1) print(threading.current_thread().name + "執(zhí)行求和操作求得的和是=" + str(sum)) return sum with ThreadPoolExecutor(max_workers=4) as pool: for i in range(100): pool.submit(async_add, i) print('------------主線程執(zhí)行結(jié)束----')
運行結(jié)果是:
------------主線程執(zhí)行結(jié)束----
ThreadPoolExecutor-0_1執(zhí)行求和操作求得的和是=0
ThreadPoolExecutor-0_0執(zhí)行求和操作求得的和是=0
ThreadPoolExecutor-0_3執(zhí)行求和操作求得的和是=3
ThreadPoolExecutor-0_2執(zhí)行求和操作求得的和是=1
...省略部分結(jié)果.....
ThreadPoolExecutor-0_1執(zhí)行求和操作求得的和是=4656
ThreadPoolExecutor-0_2執(zhí)行求和操作求得的和是=4753
ThreadPoolExecutor-0_0執(zhí)行求和操作求得的和是=4560
ThreadPoolExecutor-0_3執(zhí)行求和操作求得的和是=4851
從運行結(jié)果可以看出:一直都是相同的線程來執(zhí)行這些任務(wù),并且所有的任務(wù)都沒有被丟棄。并且任務(wù)按照先來后到的順序來執(zhí)行。這里就需要說到線程池默認(rèn)的緩沖隊列了。self._work_queue = queue.Queue()
該語句會創(chuàng)建一個大小無限制的緩沖隊列。該隊列是一個 FIFO(先進先出)的常規(guī)隊列。所以當(dāng)任務(wù)數(shù)超過最大線程數(shù)時,任務(wù)會暫時放在緩沖隊列queue中。當(dāng)線程空閑之后會從緩沖隊列中取出任務(wù)來執(zhí)行。
該隊列有個參數(shù)maxsize可以限制隊列的大小。如果隊列的大小達(dá)到隊列的上限,就會加鎖,再次加入元素時,就會被阻塞,直到隊列中的元素被消費。如果將maxsize的設(shè)置為0或者負(fù)數(shù)時,則該隊列的大小就是無限制的。
到此這篇關(guān)于Python線程池的正確使用方法的文章就介紹到這了,更多相關(guān)Python線程池的正確使用內(nèi)容請搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。