人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

Apache?Kafka?分區(qū)重分配的實(shí)現(xiàn)原理解析

發(fā)布日期:2022-07-15 19:38 | 文章來源:源碼之家

本文作者為中國移動(dòng)云能力中心大數(shù)據(jù)團(tuán)隊(duì)軟件開發(fā)工程師孫大鵬,本文結(jié)合 2.0.0 版本的 Kafka 源碼,詳細(xì)介紹了 Kafka 分區(qū)副本重分配的流程和邏輯,供大家參考。

一、

Kafka 是由 Apache 軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái),旨在提供一個(gè)統(tǒng)一的、高吞吐、低延遲的實(shí)時(shí)數(shù)據(jù)處理平臺(tái)。其持久化層本質(zhì)上是一個(gè)“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊(duì)列”,這使它作為企業(yè)級基礎(chǔ)設(shè)施來處理流式數(shù)據(jù)非常有價(jià)值。

在 Kafka 中,用 topic 來對消息進(jìn)行分類,每個(gè)進(jìn)入到 Kafka 的信息都會(huì)被放到一個(gè) topic 下,同時(shí)每個(gè) topic 中的消息又可以分為若干 partition 以此來提高消息的處理效率。存儲(chǔ)消息數(shù)據(jù)的主機(jī)服務(wù)器被命名為 broker。通常為了保證數(shù)據(jù)的可靠性,數(shù)據(jù)是以多副本的形式保存在不同 broker 的不同磁盤上的。對于每一個(gè) topic 的每一個(gè) partition,如果多個(gè)副本之間完成了數(shù)據(jù)同步,保證了數(shù)據(jù)的一致性,則此時(shí)的多個(gè)副本所在的 broker 的集合稱為 Isr。同一時(shí)間,某個(gè) topic 的某個(gè) partition 的多個(gè)副本中僅有一個(gè)對外提供服務(wù),此時(shí)對外提供服務(wù)的 broker 被認(rèn)定為該 partition 的 leader,客戶端的請求都集中到 leader 上。

對于 2 副本 3 分區(qū)的 topic 其描述信息及存儲(chǔ)狀態(tài)如下所示:

test的描述信息:
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:min.insync.replic
as=1
Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

test的副本分布

健康狀態(tài)的 Kafka 集群,對于每個(gè) topic 的每個(gè) partition,其 Isr 都應(yīng)該等于預(yù)期的副本集合(后面均已 Replicas 表示),但在實(shí)際場景中,不可避免的存在磁盤/主機(jī)故障,或者 由于某些原因需要將部分 broker 節(jié)點(diǎn)下線的情況,此時(shí)就需要將故障/要下線的 broker 從 Replicas 中移除。對此 Kafka 提供了 kafka-reassign-partitions 工具來進(jìn)行手動(dòng)的分區(qū)副本遷移。

二、工具的使用

在 Kafka 的根路徑下,通過執(zhí)行如下命令,來完成分區(qū)副本的重分配:

./bin/kafka‐reassign‐partitions.sh ‐‐zookeeper localhost:2181/kafka ‐‐reassignment‐json‐file reassign‐topic.json ‐‐execute

其中:reassign‐topic.json 文件指定了分區(qū)副本的分布情況,示例如下:

{   
"version": 1,   
"partitions": [       
{         
"topic": "test",         
"partition": 2,         
"replicas": [            
2,             
1         
],         
"log_dirs": [             
"any",             
"any"         
]        
} 
}

文件中指明了將 topic=test,partition=2 的分區(qū)的兩副本分別移動(dòng)到 brokerId=2 和 brokerId=1 的節(jié)點(diǎn)的任意磁盤路徑上。

下面將結(jié)合 2.0.0 版本的 Kafka 源碼簡單的介紹下 Kafka 分區(qū)副本重分配的流程和邏輯。

三、元數(shù)據(jù)管理及協(xié)調(diào)器

在開始之前先簡單介紹下在 Kafka 分區(qū)副本重分配中涉及到的兩個(gè)概念:ZooKeeper 和 Kafka Controller。

3.1 ZooKeeper

Kafka 的元數(shù)據(jù),是存儲(chǔ)在ZooKeeper中的。ApacheZooKeeper是一個(gè)提供高可靠性的分布式協(xié)調(diào)服務(wù)框架。它使用的數(shù)據(jù)模型類似于文件系統(tǒng)的樹形結(jié)構(gòu),根目錄也是以“/”開始。該結(jié)構(gòu)上的每個(gè)節(jié)點(diǎn)被稱為 znode,用來保存一些元數(shù)據(jù)協(xié)調(diào)信息。同時(shí) ZooKeeper 賦予客戶端監(jiān)控 znode 變更的能力,即所謂的 Watch 通知功能。一旦 znode 節(jié)點(diǎn)被創(chuàng)建、刪除,子節(jié)點(diǎn)數(shù)量發(fā)生變化,或是 znode 所存的數(shù)據(jù)本身變更, ZooKeeper 會(huì)通過節(jié)點(diǎn)變更監(jiān)聽器 (ChangeHandler) 的方式顯式通知客戶端以便客戶端 觸發(fā)對應(yīng)的處理操作。

3.2 Kafka Controller

Kafka Controller 是 Apache Kafka 的核心組件,它的主要作用是在 Apache ZooKeeper 的幫助下管理和協(xié)調(diào)整個(gè) Kafka 集群。集群中任意一臺(tái) Broker 都能充當(dāng)控制器的角色,但是,在運(yùn)行過程中,只能有一個(gè) Broker 成為控制器,行使其管理和協(xié)調(diào)的職責(zé)。

四、分區(qū)重分配流程分析

Kafka 的分區(qū)重分配就是在 client、broker 和 controller 的協(xié)同運(yùn)行下完成的。即:

1.客戶端發(fā)起分區(qū)重分配任務(wù),在ZooKeeper中創(chuàng)建/admin/reassign_partitions 節(jié)點(diǎn),然 后向涉及的 broker 發(fā)送 alterReplicaLogDirs 請求

2.controller 監(jiān)測到ZooKeeper中/admin/reassign_partitions 的變化,觸發(fā) Kafka 分區(qū)元 數(shù)據(jù)的變更維護(hù)操作

3.broker 接收到客戶端發(fā)送的 alterReplicaLogDirs 請求,根據(jù)具體任務(wù)內(nèi)容在服務(wù)端實(shí)際完成分區(qū)副本移動(dòng)

流程總結(jié)如下圖所示:

下面將針對這三部分分別展開介紹:

4.1 kafka-reassign-partitions 客戶端

分區(qū)重分配任務(wù)是由客戶端發(fā)起的,其入口主類為 ReassignPartitionsCommand.scala 中,調(diào)用 executeAssignment 方法??蛻舳说?executeAssignment 方法主要完成了如下操作:

1.解析 json 文件并進(jìn)行相關(guān)校驗(yàn)
•讀取 json 文件內(nèi)容,校驗(yàn)“partitions”的“version”,僅為 1 時(shí),繼續(xù)執(zhí)行副本重分 配
•校驗(yàn)分區(qū)副本數(shù)和副本數(shù)據(jù)路徑數(shù)是否一致
•校驗(yàn) partition/replica 是否為空/重復(fù)
2.檢查待重分配的分區(qū)在集群中是否存在(根據(jù) zk 中的/brokers/topics/${topic})
3.檢查確認(rèn)所有目標(biāo) broker 均在線(zk 中/brokers/ids 的子 znode 列表)
4.檢查是否已存在分區(qū)副本重分配任務(wù),如果已存在相關(guān)任務(wù),則退出
5.將分區(qū)重分配任務(wù)記錄到 zk 中,即在 zk 中創(chuàng)建/admin/reassign_partitions,以便 controller 可以發(fā)現(xiàn)并協(xié)調(diào) broker 進(jìn)行相關(guān)操作
6.根據(jù)解析的 json 內(nèi)容,逐個(gè) topic 向相關(guān)的 broker 發(fā)送 alterReplicaLogDirs 請求

客戶端的處理邏輯可總結(jié)為如下流程圖:

4.2 controller 維護(hù)分區(qū)的元數(shù)據(jù)信息

在 controller 啟動(dòng)時(shí)會(huì)創(chuàng)建 partitionReassignmentHandler,kafkaController 主線程回調(diào) onControllerFailover 時(shí),檢測到/admin/reassign_partitions 發(fā)生變化時(shí),觸發(fā)分區(qū)副本重分配操作,在 maybeTriggerPartitionReassignment 中通過調(diào)用 onPartitionReassignment 真正執(zhí)行分區(qū)副本重分配。在 onPartitionReassignment 中定 義了三個(gè)概念:

•RAR:指定的分區(qū)副本放置策略
•OAR:原始的分區(qū)副本放置策略
•AR:當(dāng)前的分區(qū)副本放置策略

onPartitionReassignment 的執(zhí)行過程可以總結(jié)為如下步驟:

檢查指定的分區(qū)副本是否處在 isr 中,如果不在則執(zhí)行以下前 3 步,否則直接執(zhí)行第 4 步

1.在 zk 中將 AR 更新為 RAR+OAR (/broker/topics/${topicName})
2.向所有副本(RAR+OAR)中發(fā)送 LeaderAndIsr 請求
3.將 RAR-OAR 的副本狀態(tài)置為 NewReplica,等待 NewReplica 中的數(shù)據(jù)與 leader 中的數(shù)據(jù) 完成同步
4.等待直到所有 RAR 中的副本完成與 leader 的同步
5.將所有 RAR 的副本置為 OnlineReplica 狀態(tài)
6.將 RAR 作為 AR
7.如果當(dāng)前的 leader 不在 RAR 中,發(fā)送 LeaderAndIsr Request 從 RAR 中選出一個(gè)新的 leader;如果當(dāng)前 leader 在 RAR 中,檢查 leader 狀態(tài),如果 leader 健康則更新 LeaderEpoch,否則重新選擇 leader
8.將 OAR-RAR 的副本置為 Offline 狀態(tài)
9.將 OAR-RAR 的副本置為 NonExistentReplica 狀態(tài)(真實(shí)刪除對應(yīng)的分區(qū)副本)
10.將 zk 中的 AR 置為 RAR(/brokers/topics/${topicName}數(shù)據(jù)格式:{"version":1,"partitions":{"0":[${brokerId}]}})
11.更新 zk 中/admin/reassign_partitions 的值,將完成遷移的分區(qū)刪除
12.同步所有 broker,更新元數(shù)據(jù)信息

邏輯流程圖如下:

4.3 broker 端數(shù)據(jù)跨路徑遷移

底層數(shù)據(jù)跨路徑遷移,是由 broker 端完成的,broker 接收到客戶端發(fā)來的 ALTER_REPLICA_LOG_DIRS 請求后,調(diào)用 alterReplicaLogDirs 方法,相關(guān)流程如下:

1.確保目的路徑/待移動(dòng)分區(qū)在線
2.如果當(dāng)前分區(qū)副本的 log 路徑不存在給定的目的路徑并且 futureLogs(用于跨路徑數(shù)據(jù)遷移的中間過程)也不包含目的路徑,則在內(nèi)存中記錄當(dāng)前分區(qū)副本和目的 logDir,即標(biāo)記那些需要進(jìn)行遷移的分區(qū)副本路徑
3.對于需要移動(dòng)的分區(qū)副本,目的 broker 的路徑中創(chuàng)建 future Log
4.停止當(dāng)前 Log 的清理工作,等待 future Log 同步完再清理
5.創(chuàng)建 ReplicaAlterLogDirsThread,逐個(gè) topic 逐個(gè) partition 獲取 fetchOffset、 logStartOffset 、fetchSize 等數(shù)據(jù)構(gòu)造 Fetch 請求
6.通過 ReplicaManager.fetchMessages 從分區(qū)副本 leader 獲取數(shù)據(jù),完成數(shù)據(jù)同步

更詳細(xì)的處理流程如下圖所示:

五、總結(jié)

Kafka 分區(qū)重分配,通過 kafka-reassign-partitions 啟動(dòng)任務(wù),將任務(wù)記錄在元數(shù)據(jù)管理器ZooKeeper中,Kafka controller 通過對ZooKeeper的監(jiān)測,發(fā)現(xiàn)相關(guān)任務(wù)通過和 broker 的交互按序處理相關(guān)的遷移任務(wù),同時(shí) controller 實(shí)時(shí)維護(hù)ZooKeeper中的元數(shù)據(jù)信息并進(jìn)行相關(guān)變化的記錄,保證在重分配過程中,不影響 topic 分區(qū)的正常使用,在任務(wù)完成后,再由 controller 負(fù)責(zé)ZooKeeper中重分配任務(wù)標(biāo)記的清理,以便客戶端驗(yàn)證重分配任務(wù)的結(jié)果。

到此這篇關(guān)于ApacheKafka分區(qū)重分配的實(shí)現(xiàn)原理解析的文章就介紹到這了,更多相關(guān)ApacheKafka分區(qū)重分配內(nèi)容請搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!

香港服務(wù)器租用

版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。

實(shí)時(shí)開通

自選配置、實(shí)時(shí)開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對1客戶咨詢顧問

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部