人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

Ruby3多線程并行Ractor使用方法詳解

發(fā)布日期:2022-07-15 19:05 | 文章來源:源碼中國

Ruby 3 Ractor官方手冊:https://github.com/ruby/ruby/blob/master/doc/ractor.md

在Ruby3之前,使用Thread來創(chuàng)建新的線程,但這種方式創(chuàng)建的多線程是并發(fā)而非并行的,MRI有一個(gè)全局解釋器鎖GIL來控制同一時(shí)刻只能有一個(gè)線程在執(zhí)行:

# main Thread
t1 = Thread.new do 
  # new Thread
  sleep 3
end
t1.join

Ruby3通過Ractor(Ruby Actor,Actor模型通過消息傳遞的方式來修改狀態(tài))支持真正的多線程并行,多個(gè)Ractor之間可并行獨(dú)立運(yùn)行。

# main Ractor
# 創(chuàng)建一個(gè)可與main Ractor并行運(yùn)行的Ractor
r = Ractor.new do
  sleep 2
  Ractor.yield "hello"
end
puts r.take

需注意,每個(gè)Ractor中至少有一個(gè)原生Ruby線程,但每個(gè)Ractor內(nèi)部都擁有獨(dú)立的GIL,使得Ractor內(nèi)部在同一時(shí)刻最多只能有一個(gè)線程在運(yùn)行。從這個(gè)角度來看,Ractor實(shí)際上是解釋器線程,每個(gè)解釋器線程擁有一個(gè)全局解釋器鎖。

如果main Ractor退出,則其他Ractor也會(huì)收到退出信號(hào),就像main Thread退出時(shí),其他Thread也會(huì)退出一樣。

創(chuàng)建Ractor

使用Ractor.new創(chuàng)建一個(gè)Ractor實(shí)例,創(chuàng)建實(shí)例時(shí)需指定一個(gè)語句塊,該語句塊中的代碼會(huì)在該Ractor中運(yùn)行。

r = Ractor.new do
  puts "new Ractor"
end

可在new方法的參數(shù)上為該Ractor實(shí)例指定名稱:

r = Ractor.new(name: "ractor1") do
  puts "new Ractor"
end
puts r.name  # ractor 1

new方法也可指定其他參數(shù),這些參數(shù)必須在name參數(shù)之前,且這些參數(shù)將直接原樣傳遞給語句塊參數(shù):

arr = [11, 22, 33]
r = Ractor.new(arr, name: "r1") do |arr|
  puts "arr"
end
sleep 1

關(guān)于new的參數(shù),稍后還會(huì)有解釋。

可使用Ractor.current獲取當(dāng)前的Ractor實(shí)例,使用Ractor.count獲取當(dāng)前存活的Ractor實(shí)例數(shù)量。

Ractor之間傳遞消息

Ractor傳遞消息的方式分兩種:

  • Push方式:向某個(gè)特定的Ractor實(shí)例推送消息,可使用r.send(Msg)或別名r << Msg向該Ractor實(shí)例傳送消息,并在該Ractor實(shí)例內(nèi)部使用Ractor.receive或別名Ractor.recv或它們的同名私有方法來接收推送進(jìn)來的消息
    • Ractor還提供了Ractor.receive_if {expr}方法,表示只在expr為true時(shí)才接收消息,receive等價(jià)于receive_if {true}
  • Pull方式:從某個(gè)特定的Ractor實(shí)例拉取消息,可在該Ractor實(shí)例內(nèi)部使用Ractor.yield向外傳送消息,并在需要的地方使用r.take獲取傳輸出來的消息
    • Ractor.new的語句塊返回值,相當(dāng)于Ractor.yield,它也可被r.take接收

因此,對于Push方式,要求知道消息傳遞的目標(biāo)Ractor,對于Pull方式,要求知道消息的來源Ractor。

# yield + take
r = Ractor.new {Ractor.yield "hello"}
puts r.take
# send + receive
r1 = Ractor.new do
  # Ractor.receive或Ractor.recv
  # 或同名私有方法:receive、recv
  puts Ractor.receive
end
r1.send("hello")
r1.take # 本次take取得r1語句塊的返回值,即puts的返回值nil

使用new方法創(chuàng)建Ractor實(shí)例時(shí),可指定new的參數(shù),這些參數(shù)會(huì)被原樣傳遞給Ractor的語句塊參數(shù)。

arr = [11, 22, 33]
r = Ractor.new(arr) { |arr| ...}

實(shí)際上,new的參數(shù)等價(jià)于在Ractor語句塊的開頭使用了Ractor.receive接收消息:

r = Ractor.new 'ok' { |msg| msg }
r.take #=> 'ok'
# 基本等價(jià)于
r = Ractor.new do
  msg = Ractor.receive
  msg
end
r.send 'ok'
r.take #=> 'ok'

消息端口

Ractor之間傳遞消息時(shí),實(shí)際上是通過Ractor的消息端口進(jìn)行傳遞的。

每個(gè)Ractor都有自己的incoming port和outgoing port:

  • incoming port:是該Ractor接收消息的端口,r.sendRactor.receive使用該端口
    • 每個(gè)incoming port都連接到一個(gè)大小不限的隊(duì)列上
    • r.send傳入的消息都會(huì)寫入該隊(duì)列,由于該隊(duì)列大小不限,因此r.send從不阻塞
    • Ractor.receive從該隊(duì)列彈出消息,當(dāng)隊(duì)列為空時(shí),Ractor.receive被阻塞直到新消息出現(xiàn)
    • 可使用r.close_incoming關(guān)閉incoming port,關(guān)閉該端口后,r.send將直接報(bào)錯(cuò),Ractor.receive將先從隊(duì)列中取數(shù)據(jù),當(dāng)隊(duì)列為空后,再調(diào)用Ractor.receive將報(bào)錯(cuò)
  • outgoing port:是該Ractor向外傳出消息的端口,Ractor.yieldr.take使用該端口
    • Ractor.yield或Ractor語句塊返回時(shí),消息從outgoing port流出
    • 當(dāng)沒有r.take接收消息時(shí),r內(nèi)部的Ractor.yield將被阻塞
    • 當(dāng)r內(nèi)部沒有Ractor.yield時(shí),r.take將被阻塞
    • Ractor.yield從outgoing port傳出的消息可被任意多個(gè)r.take等待,但只有一個(gè)r.take可獲取到該消息
    • 可使用r.close_outgoing關(guān)閉outgoing port,關(guān)閉該端口后,再調(diào)用r.takeRactor.yield將直接報(bào)錯(cuò)。如果r.take正被阻塞(等待Ractor.yield傳出消息),關(guān)閉outgoing port操作將取消所有等待中的take并報(bào)錯(cuò)

Ractor.select等待消息

可使用Ractor.select(r1,r2,r3...)等待一個(gè)或多個(gè)Ractor實(shí)例outgoing port上的消息(因此,select主要用于等待Ractor.yield的消息),等待到第一個(gè)消息后立即返回。

Ractor.select的返回值格式為[r, obj],其中:

  • r表示等待到的那個(gè)Ractor實(shí)例
  • obj表示接收到的消息對象

例如:

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
as = []
# Wait for r1 or r2's Ractor.yield
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj
# Second try (rs only contain not-closed ractors)
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj
as.sort == ['r1', 'r2'] #=> true

通常來說,會(huì)使用Ractor.select來輪詢等待多個(gè)Ractor實(shí)例的消息,通用化的處理流程參考如下:

# 充當(dāng)管道功能的Ractor:接收消息并發(fā)送出去,并不斷循環(huán)
pipe = Ractor.new do
  loop do
 Ractor.yield Ractor.receive
  end
end
RN = 10
# rs變量保存了10個(gè)Ractor實(shí)例
# 每個(gè)Ractor實(shí)例都從管道pipe中取一次消息然后由本Ractor發(fā)送出去
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
 msg = pipe.take
 msg # ping-pong
  end
}
# 向管道中發(fā)送10個(gè)數(shù)據(jù)
RN.times{|i| pipe << i}
# 輪詢等待10個(gè)Ractor實(shí)例的outgoing port
# 每等待成功一次,從rs中刪除所等待到的Ractor實(shí)例,
# 然后繼續(xù)等待剩下的Ractor實(shí)例
RN.times.map{
  r, n = Ractor.select(*rs)
  rs.delete r
  n
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

此外,Ractor.select除了可等待消息外,也可以用來yield傳遞消息,更多用法參考官方手冊:Ractor.select。

Ractor并行時(shí)如何避免競態(tài)

多個(gè)Ractor之間是可并行運(yùn)行的,為了避免Ractor之間傳遞數(shù)據(jù)時(shí)出現(xiàn)競態(tài)問題,Ractor采取了一些措施:

  • 對于不可變對象,它們可直接在Ractor之間共享,此時(shí)傳遞它們的引用
  • 對于可變對象,它們不可直接在Ractor之間共享,此時(shí)傳遞數(shù)據(jù)時(shí),默認(rèn)先按字節(jié)逐字節(jié)拷貝,然后后傳遞副本
  • 也可以顯式指定移動(dòng)數(shù)據(jù),將某份數(shù)據(jù)從Ractor1移動(dòng)到另一個(gè)Ractor2中,即轉(zhuǎn)移數(shù)據(jù)的所有權(quán)(參考Rust的所有權(quán)規(guī)則),轉(zhuǎn)移所有權(quán)后,原始所有者Ractor中將無法再訪問該數(shù)據(jù)

傳遞可共享對象:傳遞引用

可共享的對象:自動(dòng)傳遞它們的引用,效率高

  • 不可變對象可在Ractor之間直接共享(如Integer、symbol、true/false、nil),如:
    • i=123:i是可共享的
    • s="str".freeze:s是可共享的
    • h={c: Object}.freeze:h是可共享的,因?yàn)镺bject是一個(gè)類對象,類對象是可共享的
    • a=[1,[2],3].freeze:a不可共享,因?yàn)閮鼋Y(jié)后仍然包含可變的[2]
  • Class/Module對象,即類對象自身和模塊對象自身是可共享的
  • Ractor對象自身是可共享的

例如:

i = 33
r = Ractor.new do
  m = recv
  puts m.object_id
end
r.send(i)  # 傳遞i
r.take  # 等待Ractor執(zhí)行結(jié)束(語句塊返回)
puts i.object_id  # i傳遞后仍然可用
=begin
67
67
=end

值得注意的是,Ractor對象是可共享的,因此可將某個(gè)Ractor實(shí)例傳遞給另一個(gè)Ractor實(shí)例。例如:

pipe = Ractor.new do
  loop do
 Ractor.yield Ractor.receive
  end
end
RN = 10
rs = RN.times.map{|i|
  # pipe是一個(gè)Ractor實(shí)例,這里作為參數(shù)傳遞給其他的Ractor實(shí)例
  Ractor.new pipe, i do |pipe, i|
 pipe << i
  end
}
RN.times.map{
  pipe.take
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

傳遞不可共享對象:傳遞副本

絕大多數(shù)對象不是可直接共享的。在Ractor之間傳遞不可共享的對象時(shí),默認(rèn)會(huì)傳遞deep-copy后的副本,即按字節(jié)拷貝的方式拷貝該對象的每一個(gè)字節(jié)。這種方式效率較低。

例如:

arr = [11, 22, 33]  # 數(shù)組是可變的,不可共享
r = Ractor.new do
  m = recv
  puts "copied: #{m.object_id}"
end
r.send(arr)  # 傳遞數(shù)組,此時(shí)將逐字節(jié)拷貝數(shù)組
r.take
puts "origin: #{arr.object_id}"
=begin
copied: 60
origin: 80
=end

從結(jié)果看,兩個(gè)Ractor內(nèi)的arr不是同一個(gè)對象。

需注意,對于全局唯一的對象來說(比如數(shù)值、nil、false、true、symbol),逐字節(jié)拷貝時(shí)并不會(huì)拷貝它們。例如:

arr = %i[lang action sub]
r = Ractor.new do
  m = recv
  puts "copied: #{m.object_id}, #{m[0].object_id}, #{m[1].object_id}"
end
r.send(arr)
r.take
puts "origin: #{arr.object_id}, #{arr[0].object_id}, #{arr[1].object_id}"
=begin
copied: 60, 80, 1046748
origin: 100, 80, 1046748
=end

注意,Thread對象無法拷貝,因此無法在Ractor之間傳遞。

轉(zhuǎn)移數(shù)據(jù)所有權(quán)

還可以讓r.send(msg, move: true)Ractor.yield(msg, move: true)傳遞數(shù)據(jù)時(shí),明確表示要移動(dòng)而非拷貝數(shù)據(jù),即轉(zhuǎn)移數(shù)據(jù)的所有權(quán)(從原來的所有者Ractor實(shí)例轉(zhuǎn)移到目標(biāo)Ractor實(shí)例)。

無論是可共享還是不可共享的對象,都可以轉(zhuǎn)移所有權(quán),只不過轉(zhuǎn)移可共享對象的所有權(quán)沒有意義,因?yàn)檗D(zhuǎn)移之后,原所有者仍然擁有所有權(quán)。

因此,通常只對不可共享的數(shù)據(jù)來轉(zhuǎn)移所有權(quán),轉(zhuǎn)移所有權(quán)后,原所有者將無法訪問該數(shù)據(jù)。

str = "hello"
puts str.object_id
r = Ractor.new do
  m = recv
  puts m.object_id
end
r.send(str, move: true)  # 轉(zhuǎn)移str的所有權(quán)
r.take
#puts str.object_id  # 轉(zhuǎn)移所有權(quán)后再訪問str,將報(bào)錯(cuò)
=begin
60
80
=end

值得注意的是,移動(dòng)的本質(zhì)是內(nèi)存拷貝,它底層也一樣是逐字節(jié)拷貝原始數(shù)據(jù)的過程,所以移動(dòng)傳遞數(shù)據(jù)的效率和傳遞副本數(shù)據(jù)的效率是類似的。移動(dòng)傳遞和傳遞副本的區(qū)別之處在于所有權(quán),移動(dòng)傳遞后,原所有者Ractor實(shí)例將無法訪問該數(shù)據(jù),而拷貝傳遞方式則允許原所有者訪問。

注意,Thread對象無法轉(zhuǎn)移所有權(quán),因此無法在Ractor之間傳遞。

不可共享變成可共享:Ractor.make_shareable

對于不可共享的數(shù)據(jù)obj,可通過Ractor.make_shareable(obj)方法將其轉(zhuǎn)變?yōu)榭晒蚕淼臄?shù)據(jù),默認(rèn)轉(zhuǎn)變的方式是逐層次地遞歸凍結(jié)obj。也可指定額外的參數(shù)Ractor.make_shareable(obj, copy: true),此時(shí)將深拷貝obj得其副本,再讓副本(逐層遞歸凍結(jié))轉(zhuǎn)變?yōu)榭晒蚕頂?shù)據(jù)。

例如:

arr = %w[lang action sub]
puts arr.object_id
r = Ractor.new do
  m = recv
  puts m.object_id
end
r.send(Ractor.make_shareable(arr))
r.take
puts arr.object_id
puts arr.frozen?

輸出:

60
60
60
true

示例

工作者線程池:

require 'prime'
pipe = Ractor.new do
  loop do
 Ractor.yield Ractor.receive
  end
end
N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
 while n = pipe.take
Ractor.yield [n, n.prime?]
 end
  end
end
(1..N).each{|i|
  pipe << i
}
pp (1..N).map{
  _r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}

Pipeline:

# pipeline with yield/take
r1 = Ractor.new do
  'r1'
end
r2 = Ractor.new r1 do |r1|
  r1.take + 'r2'
end
r3 = Ractor.new r2 do |r2|
  r2.take + 'r3'
end
p r3.take #=> 'r1r2r3'

更多關(guān)于Ruby3多線程并行Ractor使用方法請查看下面的相關(guān)鏈接

香港穩(wěn)定服務(wù)器

版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。

相關(guān)文章

實(shí)時(shí)開通

自選配置、實(shí)時(shí)開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對1客戶咨詢顧問

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部