人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

Python并發(fā)編程線程消息通信機(jī)制詳解

發(fā)布日期:2021-12-20 12:56 | 文章來源:站長之家

前面我已經(jīng)向大家介紹了,如何使用創(chuàng)建線程,啟動(dòng)線程。相信大家都會(huì)有這樣一個(gè)想法,線程無非就是創(chuàng)建一下,然后再start()下,實(shí)在是太簡單了。

可是要知道,在真實(shí)的項(xiàng)目中,實(shí)際場景可要我們舉的例子要復(fù)雜的多得多,不同線程的執(zhí)行可能是有順序的,或者說他們的執(zhí)行是有條件的,是要受控制的。如果僅僅依靠前面學(xué)的那點(diǎn)淺薄的知識(shí),是遠(yuǎn)遠(yuǎn)不夠的。

那今天,我們就來探討一下如何控制線程的觸發(fā)執(zhí)行。

要實(shí)現(xiàn)對多個(gè)線程進(jìn)行控制,其實(shí)本質(zhì)上就是消息通信機(jī)制在起作用,利用這個(gè)機(jī)制發(fā)送指令,告訴線程,什么時(shí)候可以執(zhí)行,什么時(shí)候不可以執(zhí)行,執(zhí)行什么內(nèi)容。

經(jīng)過我的總結(jié),線程中通信方法大致有如下三種:

threading.Event

threading.Condition

queue.Queue

接下來我們來一一探討下。

1Event事件

Python提供了非常簡單的通信機(jī)制 Threading.Event,通用的條件變量。多個(gè)線程可以等待某個(gè)事件的發(fā)生,在事件發(fā)生后,所有的線程都會(huì)被激活。

關(guān)于Event的使用也超級(jí)簡單,就三個(gè)函數(shù)

event = threading.Event()
# 重置event,使得所有該event事件都處于待命狀態(tài)
event.clear()
# 等待接收event的指令,決定是否阻塞程序執(zhí)行
event.wait()
# 發(fā)送event指令,使所有設(shè)置該event事件的線程執(zhí)行
event.set()

舉個(gè)例子來看下。

import time
import threading
class MyThread(threading.Thread):
 def __init__(self, name, event):
  super().__init__()
  self.name = name
  self.event = event
 def run(self):
  print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
  # 等待event.set()后,才能往下執(zhí)行
  self.event.wait()
  print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))
threads = []
event = threading.Event()
# 定義五個(gè)線程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]
# 重置event,使得event.wait()起到阻塞作用
event.clear()
# 啟動(dòng)所有線程
[t.start() for t in threads]
print('等待5s...')
time.sleep(5)
print('喚醒所有線程...')
event.set()
Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018
等待5s...
喚醒所有線程...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018

可見在所有線程都啟動(dòng)(start())后,并不會(huì)執(zhí)行完,而是都在self.event.wait()止住了,需要我們通過event.set()來給所有線程發(fā)送執(zhí)行指令才能往下執(zhí)行。

2 Condition

Condition和Event 是類似的,并沒有多大區(qū)別。

同樣,Condition也只需要掌握幾個(gè)函數(shù)即可。

cond = threading.Condition()
# 類似lock.acquire()
cond.acquire()
# 類似lock.release()
cond.release()
# 等待指定觸發(fā),同時(shí)會(huì)釋放對鎖的獲取,直到被notify才重新占有瑣。
cond.wait()
# 發(fā)送指定,觸發(fā)執(zhí)行
cond.notify()

舉個(gè)網(wǎng)上一個(gè)比較趣的捉迷藏的例子來看看

import threading, time
class Hider(threading.Thread):
 def __init__(self, cond, name):
  super(Hider, self).__init__()
  self.cond = cond
  self.name = name
 def run(self):
  time.sleep(1)  #確保先運(yùn)行Seeker中的方法
  self.cond.acquire()
  print(self.name + ': 我已經(jīng)把眼睛蒙上了')
  self.cond.notify()
  self.cond.wait()
  print(self.name + ': 我找到你了哦 ~_~')
  self.cond.notify() 
  self.cond.release()
  print(self.name + ': 我贏了')
class Seeker(threading.Thread):
 def __init__(self, cond, name):
  super(Seeker, self).__init__()
  self.cond = cond
  self.name = name
  
 def run(self):
  self.cond.acquire()
  self.cond.wait()
  print(self.name + ': 我已經(jīng)藏好了,你快來找我吧')
  self.cond.notify()
  self.cond.wait()
  self.cond.release()
  print(self.name + ': 被你找到了,哎~~~')
  
cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

通過cond來通信,阻塞自己,并使對方執(zhí)行。從而,達(dá)到有順序的執(zhí)行。
看下結(jié)果

hider:我已經(jīng)把眼睛蒙上了
seeker:  我已經(jīng)藏好了,你快來找我吧
hider:我找到你了 ~_~
hider:我贏了
seeker:  被你找到了,哎~~~

3 Queue隊(duì)列

最后一個(gè),隊(duì)列,它是本節(jié)的重點(diǎn),因?yàn)樗俏覀內(nèi)粘i_發(fā)中最使用頻率最高的。

從一個(gè)線程向另一個(gè)線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊(duì)列了。創(chuàng)建一個(gè)被多個(gè)線程共享的 Queue 對象,這些線程通過使用put()get() 操作來向隊(duì)列中發(fā)送和獲取元素。

同樣,對于Queue,我們也只需要掌握幾個(gè)函數(shù)即可。

from queue import Queue
# maxsize默認(rèn)為0,不受限
# 一旦>0,而消息數(shù)又達(dá)到限制,q.put()也將阻塞
q = Queue(maxsize=0)
# 默認(rèn)阻塞程序,等待隊(duì)列消息,可設(shè)置超時(shí)時(shí)間
q.get(block=True, timeout=None)
# 發(fā)送消息:默認(rèn)會(huì)阻塞程序至隊(duì)列中有空閑位置放入數(shù)據(jù)
q.put(item, block=True, timeout=None)
# 等待所有的消息都被消費(fèi)完
q.join()
# 通知隊(duì)列任務(wù)處理已經(jīng)完成,當(dāng)所有任務(wù)都處理完成時(shí),join() 阻塞將會(huì)解除
q.task_done()

以下三個(gè)方法,知道就好,一般不需要使用

# 查詢當(dāng)前隊(duì)列的消息個(gè)數(shù)
q.qsize()
# 隊(duì)列消息是否都被消費(fèi)完,返回 True/False
q.empty()
# 檢測隊(duì)列里消息是否已滿
q.full()

函數(shù)會(huì)比之前的多一些,同時(shí)也從另一方面說明了其功能更加豐富。

我來舉個(gè)老師點(diǎn)名的例子。

# coding=utf-8
# /usr/bin/env python
'''
Author: wangbm
Email: wongbingming@163.com
Wechat: mrbensonwon
Blog: python-online.cn
公眾號(hào):Python編程時(shí)光
date: 2020/9/20 下午7:30
desc: 
'''
__author__ = 'wangbm'
from queue import Queue
from threading import Thread
import time
class Student:
 def __init__(self, name):
  self.name = name
 def speak(self):
  print("{}:到!".format(self.name))
class Teacher:
 def __init__(self, queue):
  super().__init__()
  self.queue=queue
 def call(self, student_name):
  if student_name == "exit":
print("點(diǎn)名結(jié)束,開始上課..")
  else:
print("老師:{}來了沒?".format(student_name))
# 發(fā)送消息,要點(diǎn)誰的名
  self.queue.put(student_name)
class CallManager(Thread):
 def __init__(self, queue):
  super().__init__()
  self.students = {}
  self.queue = queue
 def put(self, student):
  self.students.setdefault(student.name, student)
 def run(self):
  while True:
# 阻塞程序,時(shí)刻監(jiān)聽老師,接收消息
student_name = queue.get()
if student_name == "exit":
 break
elif student_name in self.students:
 self.students[student_name].speak()
else:
 print("老師,咱班,沒有 {} 這個(gè)人".format(student_name))
queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name="小明")
s2 = Student(name="小亮")
cm = CallManager(queue)
cm.put(s1)
cm.put(s2)
cm.start()
print('開始點(diǎn)名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')
time.sleep(1)
teacher.call("exit")

運(yùn)行結(jié)果如下

開始點(diǎn)名~
老師:小明來了沒?
小明:到!
老師:小亮來了沒?
小亮:到!
點(diǎn)名結(jié)束,開始上課..

其實(shí) queue 還有一個(gè)很重要的方法,Queue.task_done()

如果不明白它的原理,我們在寫程序,就很有可能卡死。

當(dāng)我們使用 Queue.get() 從隊(duì)列取出數(shù)據(jù)后,這個(gè)數(shù)據(jù)有沒有被正常消費(fèi),是很重要的。

如果數(shù)據(jù)沒有被正常消費(fèi),那么Queue會(huì)認(rèn)為這個(gè)任務(wù)還在執(zhí)行中,此時(shí)你使用 Queue.join() 會(huì)一直阻塞,即使此時(shí)你的隊(duì)列里已經(jīng)沒有消息了。

那么如何解決這種一直阻塞的問題呢?

就是在我們正常消費(fèi)完數(shù)據(jù)后,記得調(diào)用一下 Queue.task_done(),說明隊(duì)列這個(gè)任務(wù)已經(jīng)結(jié)束了。

當(dāng)隊(duì)列內(nèi)部的任務(wù)計(jì)數(shù)器歸于零時(shí),調(diào)用 Queue.join() 就不會(huì)再阻塞了。

要理解這個(gè)過程,請參考 http://python.iswbm.com/en/latest/c02/c02_06.html 里自定義線程池的的例子。

4 總結(jié)一下

學(xué)習(xí)了以上三種通信方法,我們很容易就能發(fā)現(xiàn)EventCondition 是threading模塊原生提供的模塊,原理簡單,功能單一,它能發(fā)送 TrueFalse 的指令,所以只能適用于某些簡單的場景中。

Queue則是比較高級(jí)的模塊,它可能發(fā)送任何類型的消息,包括字符串、字典等。其內(nèi)部實(shí)現(xiàn)其實(shí)也引用了Condition模塊(譬如putget函數(shù)的阻塞),正是其對Condition進(jìn)行了功能擴(kuò)展,所以功能更加豐富,更能滿足實(shí)際應(yīng)用。

以上就是Python并發(fā)編程線程消息通信機(jī)制詳解的詳細(xì)內(nèi)容,更多關(guān)于Python并發(fā)線程消息通信機(jī)制的資料請關(guān)注本站其它相關(guān)文章!

版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。

相關(guān)文章

實(shí)時(shí)開通

自選配置、實(shí)時(shí)開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對1客戶咨詢顧問

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部