人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動態(tài)

分析python并發(fā)網(wǎng)絡(luò)通信模型

發(fā)布日期:2022-03-10 19:27 | 文章來源:gibhub

一、常見模型分類

1.1、循環(huán)服務(wù)器模型

循環(huán)接收客戶端請求,處理請求。同一時刻只能處理一個請求,處理完畢后再處理下一個。

  • 優(yōu)點(diǎn):實(shí)現(xiàn)簡單,占用資源少
  • 缺點(diǎn):無法同時處理多個客戶端請求
  • 適用情況:處理的任務(wù)可以很快完成,客戶端無需長期占用服務(wù)端程序。udp比tcp更適合循環(huán)。

1.2、IO并發(fā)模型

利用IO多路復(fù)用,異步IO等技術(shù),同時處理多個客戶端IO請求。

  • 優(yōu)點(diǎn) : 資源消耗少,能同時高效處理多個IO行為
  • 缺點(diǎn) : 只能處理并發(fā)產(chǎn)生的IO事件,無法處理cpu計(jì)算
  • 適用情況:HTTP請求,網(wǎng)絡(luò)傳輸?shù)榷际荌O行為。

1.3、多進(jìn)程/線程網(wǎng)絡(luò)并發(fā)模型

每當(dāng)一個客戶端連接服務(wù)器,就創(chuàng)建一個新的進(jìn)程/線程為該客戶端服務(wù),客戶端退出時再銷毀該進(jìn)程/線程。

  • 優(yōu)點(diǎn):能同時滿足多個客戶端長期占有服務(wù)端需求,可以處理各種請求。
  • 缺點(diǎn): 資源消耗較大
  • 適用情況:客戶端同時連接量較少,需要處理行為較復(fù)雜情況。

二、基于fork的多進(jìn)程網(wǎng)絡(luò)并發(fā)模型

1.創(chuàng)建監(jiān)聽套接字

2.等待接收客戶端請求

3.客戶端連接創(chuàng)建新的進(jìn)程處理客戶端請求

4.原進(jìn)程繼續(xù)等待其他客戶端連接

5.如果客戶端退出,則銷毀對應(yīng)的進(jìn)程

from socket import *
import os
import signal
# 創(chuàng)建監(jiān)聽套接字
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)
# 客戶端服務(wù)函數(shù)
def handle(c):
  while True:
 data = c.recv(1024)
 if not data:
break
 print(data.decode())
 c.send(b'OK')
  c.close()
s = socket()  # tcp套接字
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)# 設(shè)置套接字端口重用
s.bind(ADDR)
s.listen(3)
signal.signal(signal.SIGCHLD,signal.SIG_IGN) # 處理僵尸進(jìn)程
print("Listen the port %d..." % PORT)
# 循環(huán)等待客戶端連接
while True:
  try:
 c,addr = s.accept()
  except KeyboardInterrupt:
 os._exit(0)
  except Exception as e:
 print(e)
 continue
  # 創(chuàng)建子進(jìn)程處理這個客戶端
  pid = os.fork()
  if pid == 0:  # 處理客戶端請求
 s.close()
 handle(c)
 os._exit(0)  # handle處理完客戶端請求子進(jìn)程也退出
  # 無論出錯或者父進(jìn)程都要循環(huán)回去接受請求
  # c對于父進(jìn)程沒用
  c.close()

三、基于threading的多線程網(wǎng)絡(luò)并發(fā)

1.創(chuàng)建監(jiān)聽套接字

2.循環(huán)接收客戶端連接請求

3.當(dāng)有新的客戶端連接創(chuàng)建線程處理客戶端請求

4.主線程繼續(xù)等待其他客戶端連接

5.當(dāng)客戶端退出,則對應(yīng)分支線程退出

from socket import *
from threading import Thread
import sys
# 創(chuàng)建監(jiān)聽套接字
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)
# 處理客戶端請求
def handle(c):
  while True:
 data = c.recv(1024)
 if not data:
break
 print(data.decode())
 c.send(b'OK')
  c.close()
s = socket()  # tcp套接字
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(ADDR)
s.listen(3)
print("Listen the port %d..."%PORT)
# 循環(huán)等待客戶端連接
while True:
  try:
 c,addr = s.accept()
  except KeyboardInterrupt:
 sys.exit("服務(wù)器退出")
  except Exception as e:
 print(e)
 continue
  # 創(chuàng)建線程處理客戶端請求
  t = Thread(target=handle, args=(c,))
  t.setDaemon(True)# 父進(jìn)程結(jié)束則所有進(jìn)程終止
  t.start()

四、ftp 文件服務(wù)器

4.1、項(xiàng)目功能

客戶端有簡單的頁面命令提示:功能包含:

  • 查看服務(wù)器文件庫中的文件列表(普通文件)
  • 可以下載其中的某個文件到本地
  • 可以上傳客戶端文件到服務(wù)器文件庫

服務(wù)器需求 :

  • 允許多個客戶端同時操作
  • 每個客戶端可能回連續(xù)發(fā)送命令

技術(shù)分析:

  • tcp套接字更適合文件傳輸
  • 并發(fā)方案 ---》 fork 多進(jìn)程并發(fā)
  • 對文件的讀寫操作獲取
  • 文件列表 ----》 os.listdir()

粘包的處理

4.2、整體結(jié)構(gòu)設(shè)計(jì)

  • 服務(wù)器功能封裝在類中(上傳,下載,查看列表)
  • 創(chuàng)建套接字,流程函數(shù)調(diào)用 main()
  • 客戶端負(fù)責(zé)發(fā)起請求,接受回復(fù),展示
  • 服務(wù)端負(fù)責(zé)接受請求,邏輯處理

ftp server:

from socket import *
from threading import Thread
import os
import time
# 全局變量
HOST = '0.0.0.0'
PORT = 8080
ADDR = (HOST,PORT)
FTP = "/home/tarena/FTP/"  # 文件庫位置
# 創(chuàng)建文件服務(wù)器服務(wù)端功能類
class FTPServer(Thread):
  def __init__(self,connfd):
 self.connfd = connfd
 super().__init__()
  def do_list(self):
 # 獲取文件列表
 files = os.listdir(FTP)
 if not files:
self.connfd.send("文件庫為空".encode())
return
 else:
self.connfd.send(b'OK')
time.sleep(0.1)  # 防止和后面發(fā)送內(nèi)容粘包
 # 拼接文件列表
 files_ = ""
 for file in files:
if file[0] != '.' and \
  os.path.isfile(FTP+file):
  files_ += file + '\n'
 self.connfd.send(files_.encode())
  def do_get(self,filename):
 try:
fd = open(FTP+filename,'rb')
 except Exception:
self.connfd.send("文件不存在".encode())
return
 else:
self.connfd.send(b'OK')
time.sleep(0.1)
 # 文件發(fā)送
 while True:
data = fd.read(1024)
if not data:
  time.sleep(0.1)
  self.connfd.send(b'##')
  break
self.connfd.send(data)
  # 循環(huán)接收客戶端請求
  def run(self):
 while True:
data = self.connfd.recv(1024).decode()
if not data or data == 'Q':
  return 
elif data == 'L':
  self.do_list()
elif data[0] == 'G':# G filename
  filename = data.split(' ')[-1]
  self.do_get(filename)
# 網(wǎng)絡(luò)搭建
def main():
  # 創(chuàng)建套接字
  sockfd = socket()
  sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
  sockfd.bind(ADDR)
  sockfd.listen(3)
  print("Listen the port %d..."%PORT)
  while True:
 try:
connfd,addr = sockfd.accept()
print("Connect from",addr)
 except KeyboardInterrupt:
print("服務(wù)器程序退出")
return
 except Exception as e:
print(e)
continue
 # 創(chuàng)建新的線程處理客戶端
 client = FTPServer(connfd)
 client.setDaemon(True)
 client.start()# 運(yùn)行run方法

if __name__ == "__main__":
  main()

ftp client:

from socket import *
import sys
ADDR = ('127.0.0.1',8080) # 服務(wù)器地址
# 客戶端功能處理類
class FTPClient:
  def __init__(self,sockfd):
 self.sockfd = sockfd
  def do_list(self):
 self.sockfd.send(b'L')  # 發(fā)送請求
 # 等待回復(fù)
 data = self.sockfd.recv(128).decode()
 if data == 'OK':
# 一次接收文件列表字符串
data = self.sockfd.recv(4096)
print(data.decode())
 else:
print(data)
  def do_get(self,filename):
 # 發(fā)送請求
 self.sockfd.send(('G '+filename).encode())
 # 等待回復(fù)
 data = self.sockfd.recv(128).decode()
 if data == 'OK':
fd = open(filename,'wb')
# 接收文件
while True:
  data = self.sockfd.recv(1024)
  if data == b'##':
 break
  fd.write(data)
fd.close()
 else:
print(data)
  def do_quit(self):
 self.sockfd.send(b'Q')
 self.sockfd.close()
 sys.exit("謝謝使用")
# 創(chuàng)建客戶端網(wǎng)絡(luò)
def main():
  sockfd = socket()
  try:
 sockfd.connect(ADDR)
  except Exception as e:
 print(e)
 return
  ftp = FTPClient(sockfd) # 實(shí)例化對象
  # 循環(huán)發(fā)送請求
  while True:
 print("\n=========命令選項(xiàng)==========")
 print("****list****")
 print("**** get file ****")
 print("**** put file ****")
 print("****quit****")
 print("=============================")
 cmd = input("輸入命令:")
 if cmd.strip() == 'list':
ftp.do_list()
 elif cmd[:3] == 'get':
# get filename
filename = cmd.strip().split(' ')[-1]
ftp.do_get(filename)
 elif cmd[:3] == 'put':
# put ../filename
filename = cmd.strip().split(' ')[-1]
ftp.do_put(filename)
 elif cmd.strip() == 'quit':
ftp.do_quit()
 else:
print("請輸入正確命令")

if __name__ == "__main__":
  main()

五、IO并發(fā)

定義:在內(nèi)存中數(shù)據(jù)交換的操作被定義為IO操作,IO------輸入輸出

內(nèi)存和磁盤進(jìn)行數(shù)據(jù)交換: 文件的讀寫 數(shù)據(jù)庫更新

內(nèi)存和終端數(shù)據(jù)交換 :input print sys.stdin sys.stdout sys.stderr

內(nèi)存和網(wǎng)絡(luò)數(shù)據(jù)的交換: 網(wǎng)絡(luò)連接 recv send recvfrom

IO密集型程序 : 程序執(zhí)行中有大量的IO操作,而較少的cpu運(yùn)算操作。消耗cpu較少,IO運(yùn)行時間長

CPU(計(jì)算)密集型程序:程序中存在大量的cpu運(yùn)算,IO操作相對較少,消耗cpu大。

5.1、IO分類

IO分為:阻塞IO、非阻塞IO、IO多路復(fù)用、事件驅(qū)動IO、異步IO

阻塞IO

  • 定義: 在執(zhí)行IO操作時如果執(zhí)行條件不滿足則阻塞。阻塞IO是IO的默認(rèn)形態(tài)。
  • 效率: 阻塞IO是效率很低的一種IO。但是由于邏輯簡單所以是默認(rèn)IO行為。

阻塞情況:

  • 因?yàn)槟撤N執(zhí)行條件沒有滿足造成的函數(shù)阻塞  e.g. accept input recv
  • 處理IO的時間較長產(chǎn)生的阻塞狀態(tài)  e.g. 網(wǎng)絡(luò)傳輸, 大文件讀寫

非阻塞IO

定義 : 通過修改IO屬性行為, 使原本阻塞的IO變?yōu)榉亲枞臓顟B(tài)。

設(shè)置套接字為非阻塞IO

  • sockfd.setblocking(bool)
  • 功能: 設(shè)置套接字為非阻塞IO
  • 參數(shù): 默認(rèn)為True,表示套接字IO阻塞;設(shè)置為False則套接字IO變?yōu)榉亲枞?/li>

超時檢測 :設(shè)置一個最長阻塞時間,超過該時間后則不再阻塞等待。

  • sockfd.settimeout(sec)
  • 功能:設(shè)置套接字的超時時間
  • 參數(shù):設(shè)置的時間

5.2、IO多路復(fù)用

定義 :通過一個監(jiān)測,可以同時監(jiān)控多個IO事件的行為。當(dāng)哪個IO事件可以執(zhí)行,即讓這個IO事件發(fā)生。

rs, ws, xs = select(rlist, wlist, xlist[, timeout])  監(jiān)控IO事件,阻塞等待監(jiān)控的IO時間發(fā)生

參數(shù) :

  • rlist列表,存放(被動)等待處理的IO (接收)
  • wlist列表,存放主動處理的IO(發(fā)送)
  • xlist列表,存放出錯,希望去處理的IO(異常)
  • timeout 超時檢測

返回值:

  • rs列表rlist中準(zhǔn)備就緒的IO
  • ws列表wlist中準(zhǔn)備就緒的IO
  • xs列表xlist中準(zhǔn)備就緒的IO

select 實(shí)現(xiàn)tcp服務(wù)

1.將關(guān)注的IO放入對應(yīng)的監(jiān)控類別列表

2.通過select函數(shù)進(jìn)行監(jiān)控

3.遍歷select返回值列表,確定就緒IO事件

4.處理發(fā)生的IO事件

from socket import *
from select import select
# 創(chuàng)建一個監(jiān)聽套接字作為關(guān)注的IO
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# 設(shè)置關(guān)注列表
rlist = [s]
wlist = []
xlist = [s]
# 循環(huán)監(jiān)控IO
while True:
  rs,ws,xs = select(rlist,wlist,xlist)
  # 遍歷三個返回列表,處理IO
  for r in rs:
 # 根據(jù)遍歷到IO的不同使用if分情況處理
 if r is s:
c,addr = r.accept()
print("Connect from",addr)
rlist.append(c) # 增加新的IO事件
 # else為客戶端套接字就緒情況
 else:
data = r.recv(1024)
# 客戶端退出
if not data:
  rlist.remove(r) # 從關(guān)注列表移除
  r.close()
  continue # 繼續(xù)處理其他就緒IO
print("Receive:",data.decode())
# r.send(b'OK')
# 我們希望主動處理這個IO對象
wlist.append(r)
  for w in ws:
 w.send(b'OK')
 wlist.remove(w) # 使用后移除
  for x in xs:
 pass

注意:

  • wlist中如果存在IO事件,則select立即返回給ws
  • 處理IO過程中不要出現(xiàn)死循環(huán)占有服務(wù)端的情況
  • IO多路復(fù)用消耗資源較少,效率較高擴(kuò)展:

5.3、位運(yùn)算

將整數(shù)轉(zhuǎn)換為二進(jìn)制, 按照二進(jìn)制位進(jìn)行運(yùn)算符操作
& 按位與   | 按位或   ^ 按位異或   << 左移 >> 右移
11 1011    14 1110
(11 & 14 1010)   (11| 14 1111)  (11^ 14 0101)
11 << 2 ===> 44右側(cè)補(bǔ)0    14 >> 2 ===> 3 擠掉右側(cè)的數(shù)字

使用 :

  • 在做底層硬件時操作寄存器
  • 做標(biāo)志位的過濾

5.4、poll方法實(shí)現(xiàn)IO多路復(fù)用

創(chuàng)建poll對象:p = select.poll()

注冊關(guān)注的IO事件:p.register(fd,event)

  • fd要關(guān)注的IO
  • event 要關(guān)注的IO事件類型

常用類型:

  • POLLIN 讀IO事件(rlist)
  • POLLOUT 寫IO事件 (wlist)
  • POLLERR 異常IO (xlist)
  • POLLHUP 斷開連接

取消對IO的關(guān)注:p.unregister(fd)

參數(shù): IO對象或者IO對象的fileno

events = p.poll():

  • 功能:   阻塞等待監(jiān)控的IO事件發(fā)生
  • 返回值: 返回發(fā)生的IO事件

events是一個列表 [(fileno,evnet),(),()....]

每個元組為一個就緒IO,元組第一項(xiàng)是該IO的fileno,第二項(xiàng)為該IO就緒的事件類型

poll_server 步驟

1.創(chuàng)建套接字

2.將套接字register

3.創(chuàng)建查找字典,并維護(hù)

4.循環(huán)監(jiān)控IO發(fā)生

5.處理發(fā)生的IO

from socket import *
from select import *
# 創(chuàng)建套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# 創(chuàng)建poll對象關(guān)注s
p = poll()
# 建立查找字典,用于通過fileno查找IO對象
fdmap = {s.fileno():s}
# 關(guān)注s
p.register(s,POLLIN|POLLERR)
# 循環(huán)監(jiān)控
while True:
  events = p.poll()
  # 循環(huán)遍歷發(fā)生的事件?。妫?->fileno
  for fd,event in events:
 # 區(qū)分事件進(jìn)行處理
 if fd == s.fileno():
c,addr = fdmap[fd].accept()
print("Connect from",addr)
# 添加新的關(guān)注IO
p.register(c,POLLIN|POLLERR)
fdmap[c.fileno()] = c # 維護(hù)字典
 # 按位與判定是POLLIN就緒
 elif event & POLLIN:
data = fdmap[fd].recv(1024)
if not data:
  p.unregister(fd) # 取消關(guān)注
  fdmap[fd].close()
  del fdmap[fd]  # 從字典中刪除
  continue
print("Receive:",data.decode())
fdmap[fd].send(b'OK')

5.5、epoll方法

1. 使用方法 : 基本與poll相同

  • 生成對象改為 epoll()
  • 將所有事件類型改為EPOLL類型

2. epoll特點(diǎn)

  • epoll 效率比select poll要高
  • epoll 監(jiān)控IO數(shù)量比select要多
  • epoll 的觸發(fā)方式比poll要多 (EPOLLET邊緣觸發(fā))
from socket import *
from select import *
# 創(chuàng)建套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# 創(chuàng)建epoll對象關(guān)注s
ep = epoll()
# 建立查找字典,用于通過fileno查找IO對象
fdmap = {s.fileno():s}
# 關(guān)注s
ep.register(s,EPOLLIN|EPOLLERR)
# 循環(huán)監(jiān)控
while True:
  events = ep.poll()
  # 循環(huán)遍歷發(fā)生的事件 fd-->fileno
  for fd,event in events:
 print("親,你有IO需要處理哦")
 # 區(qū)分事件進(jìn)行處理
 if fd == s.fileno():
c,addr = fdmap[fd].accept()
print("Connect from",addr)
# 添加新的關(guān)注IO
# 將觸發(fā)方式變?yōu)檫吘売|發(fā)
ep.register(c,EPOLLIN|EPOLLERR|EPOLLET)
fdmap[c.fileno()] = c # 維護(hù)字典
 # 按位與判定是EPOLLIN就緒
 # elif event & EPOLLIN:
 #data = fdmap[fd].recv(1024)
 #if not data:
 #  ep.unregister(fd) # 取消關(guān)注
 #  fdmap[fd].close()
 #  del fdmap[fd]  # 從字典中刪除
 #  continue
 #print("Receive:",data.decode())
 #fdmap[fd].send(b'OK')

以上就是分析python并發(fā)網(wǎng)絡(luò)通信模型的詳細(xì)內(nèi)容,更多關(guān)于python 并發(fā)網(wǎng)絡(luò)通信模型的資料請關(guān)注本站其它相關(guān)文章!

美國服務(wù)器租用

版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。

相關(guān)文章

實(shí)時開通

自選配置、實(shí)時開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對1客戶咨詢顧問

在線
客服

在線客服:7*24小時在線

客服
熱線

400-630-3752
7*24小時客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部