Python并發(fā)編程線程消息通信機(jī)制詳解
前面我已經(jīng)向大家介紹了,如何使用創(chuàng)建線程,啟動(dòng)線程。相信大家都會(huì)有這樣一個(gè)想法,線程無非就是創(chuàng)建一下,然后再start()
下,實(shí)在是太簡單了。
可是要知道,在真實(shí)的項(xiàng)目中,實(shí)際場景可要我們舉的例子要復(fù)雜的多得多,不同線程的執(zhí)行可能是有順序的,或者說他們的執(zhí)行是有條件的,是要受控制的。如果僅僅依靠前面學(xué)的那點(diǎn)淺薄的知識(shí),是遠(yuǎn)遠(yuǎn)不夠的。
那今天,我們就來探討一下如何控制線程的觸發(fā)執(zhí)行。
要實(shí)現(xiàn)對多個(gè)線程進(jìn)行控制,其實(shí)本質(zhì)上就是消息通信機(jī)制在起作用,利用這個(gè)機(jī)制發(fā)送指令,告訴線程,什么時(shí)候可以執(zhí)行,什么時(shí)候不可以執(zhí)行,執(zhí)行什么內(nèi)容。
經(jīng)過我的總結(jié),線程中通信方法大致有如下三種:
threading.Event
threading.Condition
queue.Queue
接下來我們來一一探討下。
1Event事件
Python提供了非常簡單的通信機(jī)制 Threading.Event
,通用的條件變量。多個(gè)線程可以等待某個(gè)事件的發(fā)生,在事件發(fā)生后,所有的線程都會(huì)被激活。
關(guān)于Event的使用也超級(jí)簡單,就三個(gè)函數(shù)
event = threading.Event() # 重置event,使得所有該event事件都處于待命狀態(tài) event.clear() # 等待接收event的指令,決定是否阻塞程序執(zhí)行 event.wait() # 發(fā)送event指令,使所有設(shè)置該event事件的線程執(zhí)行 event.set()
舉個(gè)例子來看下。
import time import threading class MyThread(threading.Thread): def __init__(self, name, event): super().__init__() self.name = name self.event = event def run(self): print('Thread: {} start at {}'.format(self.name, time.ctime(time.time()))) # 等待event.set()后,才能往下執(zhí)行 self.event.wait() print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time()))) threads = [] event = threading.Event() # 定義五個(gè)線程 [threads.append(MyThread(str(i), event)) for i in range(1,5)] # 重置event,使得event.wait()起到阻塞作用 event.clear() # 啟動(dòng)所有線程 [t.start() for t in threads] print('等待5s...') time.sleep(5) print('喚醒所有線程...') event.set()
Thread: 1 start at Sun May 13 20:38:08 2018 Thread: 2 start at Sun May 13 20:38:08 2018 Thread: 3 start at Sun May 13 20:38:08 2018 Thread: 4 start at Sun May 13 20:38:08 2018 等待5s... 喚醒所有線程... Thread: 1 finish at Sun May 13 20:38:13 2018 Thread: 4 finish at Sun May 13 20:38:13 2018 Thread: 2 finish at Sun May 13 20:38:13 2018 Thread: 3 finish at Sun May 13 20:38:13 2018
可見在所有線程都啟動(dòng)(start()
)后,并不會(huì)執(zhí)行完,而是都在self.event.wait()
止住了,需要我們通過event.set()
來給所有線程發(fā)送執(zhí)行指令才能往下執(zhí)行。
2 Condition
Condition和Event 是類似的,并沒有多大區(qū)別。
同樣,Condition也只需要掌握幾個(gè)函數(shù)即可。
cond = threading.Condition() # 類似lock.acquire() cond.acquire() # 類似lock.release() cond.release() # 等待指定觸發(fā),同時(shí)會(huì)釋放對鎖的獲取,直到被notify才重新占有瑣。 cond.wait() # 發(fā)送指定,觸發(fā)執(zhí)行 cond.notify()
舉個(gè)網(wǎng)上一個(gè)比較趣的捉迷藏的例子來看看
import threading, time class Hider(threading.Thread): def __init__(self, cond, name): super(Hider, self).__init__() self.cond = cond self.name = name def run(self): time.sleep(1) #確保先運(yùn)行Seeker中的方法 self.cond.acquire() print(self.name + ': 我已經(jīng)把眼睛蒙上了') self.cond.notify() self.cond.wait() print(self.name + ': 我找到你了哦 ~_~') self.cond.notify() self.cond.release() print(self.name + ': 我贏了') class Seeker(threading.Thread): def __init__(self, cond, name): super(Seeker, self).__init__() self.cond = cond self.name = name def run(self): self.cond.acquire() self.cond.wait() print(self.name + ': 我已經(jīng)藏好了,你快來找我吧') self.cond.notify() self.cond.wait() self.cond.release() print(self.name + ': 被你找到了,哎~~~') cond = threading.Condition() seeker = Seeker(cond, 'seeker') hider = Hider(cond, 'hider') seeker.start() hider.start()
通過cond來通信,阻塞自己,并使對方執(zhí)行。從而,達(dá)到有順序的執(zhí)行。
看下結(jié)果
hider:我已經(jīng)把眼睛蒙上了 seeker: 我已經(jīng)藏好了,你快來找我吧 hider:我找到你了 ~_~ hider:我贏了 seeker: 被你找到了,哎~~~
3 Queue隊(duì)列
最后一個(gè),隊(duì)列,它是本節(jié)的重點(diǎn),因?yàn)樗俏覀內(nèi)粘i_發(fā)中最使用頻率最高的。
從一個(gè)線程向另一個(gè)線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊(duì)列了。創(chuàng)建一個(gè)被多個(gè)線程共享的 Queue 對象,這些線程通過使用put()
和 get()
操作來向隊(duì)列中發(fā)送和獲取元素。
同樣,對于Queue,我們也只需要掌握幾個(gè)函數(shù)即可。
from queue import Queue # maxsize默認(rèn)為0,不受限 # 一旦>0,而消息數(shù)又達(dá)到限制,q.put()也將阻塞 q = Queue(maxsize=0) # 默認(rèn)阻塞程序,等待隊(duì)列消息,可設(shè)置超時(shí)時(shí)間 q.get(block=True, timeout=None) # 發(fā)送消息:默認(rèn)會(huì)阻塞程序至隊(duì)列中有空閑位置放入數(shù)據(jù) q.put(item, block=True, timeout=None) # 等待所有的消息都被消費(fèi)完 q.join() # 通知隊(duì)列任務(wù)處理已經(jīng)完成,當(dāng)所有任務(wù)都處理完成時(shí),join() 阻塞將會(huì)解除 q.task_done()
以下三個(gè)方法,知道就好,一般不需要使用
# 查詢當(dāng)前隊(duì)列的消息個(gè)數(shù) q.qsize() # 隊(duì)列消息是否都被消費(fèi)完,返回 True/False q.empty() # 檢測隊(duì)列里消息是否已滿 q.full()
函數(shù)會(huì)比之前的多一些,同時(shí)也從另一方面說明了其功能更加豐富。
我來舉個(gè)老師點(diǎn)名的例子。
# coding=utf-8 # /usr/bin/env python ''' Author: wangbm Email: wongbingming@163.com Wechat: mrbensonwon Blog: python-online.cn 公眾號(hào):Python編程時(shí)光 date: 2020/9/20 下午7:30 desc: ''' __author__ = 'wangbm' from queue import Queue from threading import Thread import time class Student: def __init__(self, name): self.name = name def speak(self): print("{}:到!".format(self.name)) class Teacher: def __init__(self, queue): super().__init__() self.queue=queue def call(self, student_name): if student_name == "exit": print("點(diǎn)名結(jié)束,開始上課..") else: print("老師:{}來了沒?".format(student_name)) # 發(fā)送消息,要點(diǎn)誰的名 self.queue.put(student_name) class CallManager(Thread): def __init__(self, queue): super().__init__() self.students = {} self.queue = queue def put(self, student): self.students.setdefault(student.name, student) def run(self): while True: # 阻塞程序,時(shí)刻監(jiān)聽老師,接收消息 student_name = queue.get() if student_name == "exit": break elif student_name in self.students: self.students[student_name].speak() else: print("老師,咱班,沒有 {} 這個(gè)人".format(student_name)) queue = Queue() teacher = Teacher(queue=queue) s1 = Student(name="小明") s2 = Student(name="小亮") cm = CallManager(queue) cm.put(s1) cm.put(s2) cm.start() print('開始點(diǎn)名~') teacher.call('小明') time.sleep(1) teacher.call('小亮') time.sleep(1) teacher.call("exit")
運(yùn)行結(jié)果如下
開始點(diǎn)名~
老師:小明來了沒?
小明:到!
老師:小亮來了沒?
小亮:到!
點(diǎn)名結(jié)束,開始上課..
其實(shí) queue 還有一個(gè)很重要的方法,Queue.task_done()
如果不明白它的原理,我們在寫程序,就很有可能卡死。
當(dāng)我們使用 Queue.get() 從隊(duì)列取出數(shù)據(jù)后,這個(gè)數(shù)據(jù)有沒有被正常消費(fèi),是很重要的。
如果數(shù)據(jù)沒有被正常消費(fèi),那么Queue會(huì)認(rèn)為這個(gè)任務(wù)還在執(zhí)行中,此時(shí)你使用 Queue.join() 會(huì)一直阻塞,即使此時(shí)你的隊(duì)列里已經(jīng)沒有消息了。
那么如何解決這種一直阻塞的問題呢?
就是在我們正常消費(fèi)完數(shù)據(jù)后,記得調(diào)用一下 Queue.task_done(),說明隊(duì)列這個(gè)任務(wù)已經(jīng)結(jié)束了。
當(dāng)隊(duì)列內(nèi)部的任務(wù)計(jì)數(shù)器歸于零時(shí),調(diào)用 Queue.join() 就不會(huì)再阻塞了。
要理解這個(gè)過程,請參考 http://python.iswbm.com/en/latest/c02/c02_06.html 里自定義線程池的的例子。
4 總結(jié)一下
學(xué)習(xí)了以上三種通信方法,我們很容易就能發(fā)現(xiàn)Event
和 Condition
是threading模塊原生提供的模塊,原理簡單,功能單一,它能發(fā)送 True
和 False
的指令,所以只能適用于某些簡單的場景中。
而Queue
則是比較高級(jí)的模塊,它可能發(fā)送任何類型的消息,包括字符串、字典等。其內(nèi)部實(shí)現(xiàn)其實(shí)也引用了Condition
模塊(譬如put
和get
函數(shù)的阻塞),正是其對Condition
進(jìn)行了功能擴(kuò)展,所以功能更加豐富,更能滿足實(shí)際應(yīng)用。
以上就是Python并發(fā)編程線程消息通信機(jī)制詳解的詳細(xì)內(nèi)容,更多關(guān)于Python并發(fā)線程消息通信機(jī)制的資料請關(guān)注本站其它相關(guān)文章!
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。