人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動態(tài)

深入剖析美團(tuán)基于Flume的網(wǎng)站日志收集系統(tǒng)

發(fā)布日期:2021-12-17 08:58 | 文章來源:CSDN

美團(tuán)的日志收集系統(tǒng)負(fù)責(zé)美團(tuán)的所有業(yè)務(wù)日志的收集,并分別給Hadoop平臺提供離線數(shù)據(jù)和Storm平臺提供實時數(shù)據(jù)流。美團(tuán)的日志收集系統(tǒng)基于Flume設(shè)計和搭建而成。

《基于Flume的美團(tuán)日志收集系統(tǒng)》將分兩部分給讀者呈現(xiàn)美團(tuán)日志收集系統(tǒng)的架構(gòu)設(shè)計和實戰(zhàn)經(jīng)驗。

第一部分架構(gòu)和設(shè)計,將主要著眼于日志收集系統(tǒng)整體的架構(gòu)設(shè)計,以及為什么要做這樣的設(shè)計。

第二部分改進(jìn)和優(yōu)化,將主要著眼于實際部署和使用過程中遇到的問題,對Flume做的功能修改和優(yōu)化等。

1 日志收集系統(tǒng)簡介
日志收集是大數(shù)據(jù)的基石。

許多公司的業(yè)務(wù)平臺每天都會產(chǎn)生大量的日志數(shù)據(jù)。收集業(yè)務(wù)日志數(shù)據(jù),供離線和在線的分析系統(tǒng)使用,正是日志收集系統(tǒng)的要做的事情。高可用性,高可靠性和可擴(kuò)展性是日志收集系統(tǒng)所具有的基本特征。

目前常用的開源日志收集系統(tǒng)有Flume, Scribe等。Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),目前已經(jīng)是Apache的一個子項目。Scribe是Facebook開源的日志收集系統(tǒng),它為日志的分布式收集,統(tǒng)一處理提供一個可擴(kuò)展的,高容錯的簡單方案。

2 常用的開源日志收集系統(tǒng)對比
下面將對常見的開源日志收集系統(tǒng)Flume和Scribe的各方面進(jìn)行對比。對比中Flume將主要采用Apache下的Flume-NG為參考對象。同時,美團(tuán)將常用的日志收集系統(tǒng)分為三層(Agent層,Collector層和Store層)來進(jìn)行對比。

3 美團(tuán)日志收集系統(tǒng)架構(gòu)
美團(tuán)的日志收集系統(tǒng)負(fù)責(zé)美團(tuán)的所有業(yè)務(wù)日志的收集,并分別給Hadoop平臺提供離線數(shù)據(jù)和Storm平臺提供實時數(shù)據(jù)流。美團(tuán)的日志收集系統(tǒng)基于Flume設(shè)計和搭建而成。目前每天收集和處理約T級別的日志數(shù)據(jù)。

下圖是美團(tuán)的日志收集系統(tǒng)的整體框架圖。

a. 整個系統(tǒng)分為三層:Agent層,Collector層和Store層。其中Agent層每個機(jī)器部署一個進(jìn)程,負(fù)責(zé)對單機(jī)的日志收集工作;Collector層部署在中心服務(wù)器上,負(fù)責(zé)接收Agent層發(fā)送的日志,并且將日志根據(jù)路由規(guī)則寫到相應(yīng)的Store層中;Store層負(fù)責(zé)提供永久或者臨時的日志存儲服務(wù),或者將日志流導(dǎo)向其它服務(wù)器。

b. Agent到Collector使用LoadBalance策略,將所有的日志均衡地發(fā)到所有的Collector上,達(dá)到負(fù)載均衡的目標(biāo),同時并處理單個Collector失效的問題。

c. Collector層的目標(biāo)主要有三個:SinkHdfs, SinkKafka和SinkBypass。分別提供離線的數(shù)據(jù)到Hdfs,和提供實時的日志流到Kafka和Bypass。其中SinkHdfs又根據(jù)日志量的大小分為SinkHdfs_b,SinkHdfs_m和SinkHdfs_s三個Sink,以提高寫入到Hdfs的性能,具體見后面介紹。

d. 對于Store來說,Hdfs負(fù)責(zé)永久地存儲所有日志;Kafka存儲最新的7天日志,并給Storm系統(tǒng)提供實時日志流;Bypass負(fù)責(zé)給其它服務(wù)器和應(yīng)用提供實時日志流。

下圖是美團(tuán)的日志收集系統(tǒng)的模塊分解圖,詳解Agent, Collector和Bypass中的Source, Channel和Sink的關(guān)系。

a. 模塊命名規(guī)則:所有的Source以src開頭,所有的Channel以ch開頭,所有的Sink以sink開頭;

b. Channel統(tǒng)一使用美團(tuán)開發(fā)的DualChannel,具體原因后面詳述;對于過濾掉的日志使用NullChannel,具體原因后面詳述;

c. 模塊之間內(nèi)部通信統(tǒng)一使用Avro接口;

4 架構(gòu)設(shè)計考慮
下面將從可用性,可靠性,可擴(kuò)展性和兼容性等方面,對上述的架構(gòu)做細(xì)致的解析。

4.1 可用性(availablity)
對日志收集系統(tǒng)來說,可用性(availablity)指固定周期內(nèi)系統(tǒng)無故障運(yùn)行總時間。要想提高系統(tǒng)的可用性,就需要消除系統(tǒng)的單點(diǎn),提高系統(tǒng)的冗余度。下面來看看美團(tuán)的日志收集系統(tǒng)在可用性方面的考慮。

4.1.1 Agent死掉
Agent死掉分為兩種情況:機(jī)器死機(jī)或者Agent進(jìn)程死掉。

對于機(jī)器死機(jī)的情況來說,由于產(chǎn)生日志的進(jìn)程也同樣會死掉,所以不會再產(chǎn)生新的日志,不存在不提供服務(wù)的情況。

對于Agent進(jìn)程死掉的情況來說,確實會降低系統(tǒng)的可用性。對此,美團(tuán)有下面三種方式來提高系統(tǒng)的可用性。首先,所有的Agent在supervise的方式下啟動,如果進(jìn)程死掉會被系統(tǒng)立即重啟,以提供服務(wù)。其次,對所有的Agent進(jìn)行存活監(jiān)控,發(fā)現(xiàn)Agent死掉立即報警。最后,對于非常重要的日志,建議應(yīng)用直接將日志寫磁盤,Agent使用spooldir的方式獲得最新的日志。

4.1.2 Collector死掉
由于中心服務(wù)器提供的是對等的且無差別的服務(wù),且Agent訪問Collector做了LoadBalance和重試機(jī)制。所以當(dāng)某個Collector無法提供服務(wù)時,Agent的重試策略會將數(shù)據(jù)發(fā)送到其它可用的Collector上面。所以整個服務(wù)不受影響。

4.1.3 Hdfs正常停機(jī)
美團(tuán)在Collector的HdfsSink中提供了開關(guān)選項,可以控制Collector停止寫Hdfs,并且將所有的events緩存到FileChannel的功能。

4.1.4 Hdfs異常停機(jī)或不可訪問
假如Hdfs異常停機(jī)或不可訪問,此時Collector無法寫Hdfs。由于美團(tuán)使用DualChannel,Collector可以將所收到的events緩存到FileChannel,保存在磁盤上,繼續(xù)提供服務(wù)。當(dāng)Hdfs恢復(fù)服務(wù)以后,再將FileChannel中緩存的events再發(fā)送到Hdfs上。這種機(jī)制類似于Scribe,可以提供較好的容錯性。

4.1.5 Collector變慢或者Agent/Collector網(wǎng)絡(luò)變慢
如果Collector處理速度變慢(比如機(jī)器load過高)或者Agent/Collector之間的網(wǎng)絡(luò)變慢,可能導(dǎo)致Agent發(fā)送到Collector的速度變慢。同樣的,對于此種情況,美團(tuán)在Agent端使用DualChannel,Agent可以將收到的events緩存到FileChannel,保存在磁盤上,繼續(xù)提供服務(wù)。當(dāng)Collector恢復(fù)服務(wù)以后,再將FileChannel中緩存的events再發(fā)送給Collector。

4.1.6 Hdfs變慢
當(dāng)Hadoop上的任務(wù)較多且有大量的讀寫操作時,Hdfs的讀寫數(shù)據(jù)往往變的很慢。由于每天,每周都有高峰使用期,所以這種情況非常普遍。

對于Hdfs變慢的問題,美團(tuán)同樣使用DualChannel來解決。當(dāng)Hdfs寫入較快時,所有的events只經(jīng)過MemChannel傳遞數(shù)據(jù),減少磁盤IO,獲得較高性能。當(dāng)Hdfs寫入較慢時,所有的events只經(jīng)過FileChannel傳遞數(shù)據(jù),有一個較大的數(shù)據(jù)緩存空間。

4.2 可靠性(reliability)
對日志收集系統(tǒng)來說,可靠性(reliability)是指Flume在數(shù)據(jù)流的傳輸過程中,保證events的可靠傳遞。

對Flume來說,所有的events都被保存在Agent的Channel中,然后被發(fā)送到數(shù)據(jù)流中的下一個Agent或者最終的存儲服務(wù)中。那么一個Agent的Channel中的events什么時候被刪除呢?當(dāng)且僅當(dāng)它們被保存到下一個Agent的Channel中或者被保存到最終的存儲服務(wù)中。這就是Flume提供數(shù)據(jù)流中點(diǎn)到點(diǎn)的可靠性保證的最基本的單跳消息傳遞語義。

那么Flume是如何做到上述最基本的消息傳遞語義呢?

首先,Agent間的事務(wù)交換。Flume使用事務(wù)的辦法來保證event的可靠傳遞。Source和Sink分別被封裝在事務(wù)中,這些事務(wù)由保存event的存儲提供或者由Channel提供。這就保證了event在數(shù)據(jù)流的點(diǎn)對點(diǎn)傳輸中是可靠的。在多級數(shù)據(jù)流中,如下圖,上一級的Sink和下一級的Source都被包含在事務(wù)中,保證數(shù)據(jù)可靠地從一個Channel到另一個Channel轉(zhuǎn)移。

其次,數(shù)據(jù)流中 Channel的持久性。Flume中MemoryChannel是可能丟失數(shù)據(jù)的(當(dāng)Agent死掉時),而FileChannel是持久性的,提供類似mysql的日志機(jī)制,保證數(shù)據(jù)不丟失。

4.3 可擴(kuò)展性(scalability)
對日志收集系統(tǒng)來說,可擴(kuò)展性(scalability)是指系統(tǒng)能夠線性擴(kuò)展。當(dāng)日志量增大時,系統(tǒng)能夠以簡單的增加機(jī)器來達(dá)到線性擴(kuò)容的目的。

對于基于Flume的日志收集系統(tǒng)來說,需要在設(shè)計的每一層,都可以做到線性擴(kuò)展地提供服務(wù)。下面將對每一層的可擴(kuò)展性做相應(yīng)的說明。

4.3.1 Agent層
對于Agent這一層來說,每個機(jī)器部署一個Agent,可以水平擴(kuò)展,不受限制。一個方面,Agent收集日志的能力受限于機(jī)器的性能,正常情況下一個Agent可以為單機(jī)提供足夠服務(wù)。另一方面,如果機(jī)器比較多,可能受限于后端Collector提供的服務(wù),但Agent到Collector是有Load Balance機(jī)制,使得Collector可以線性擴(kuò)展提高能力。

4.3.2 Collector層
對于Collector這一層,Agent到Collector是有Load Balance機(jī)制,并且Collector提供無差別服務(wù),所以可以線性擴(kuò)展。其性能主要受限于Store層提供的能力。

4.3.3 Store層
對于Store這一層來說,Hdfs和Kafka都是分布式系統(tǒng),可以做到線性擴(kuò)展。Bypass屬于臨時的應(yīng)用,只對應(yīng)于某一類日志,性能不是瓶頸。

4.4 Channel的選擇
Flume1.4.0中,其官方提供常用的MemoryChannel和FileChannel供大家選擇。其優(yōu)劣如下:

MemoryChannel: 所有的events被保存在內(nèi)存中。優(yōu)點(diǎn)是高吞吐。缺點(diǎn)是容量有限并且Agent死掉時會丟失內(nèi)存中的數(shù)據(jù)。
FileChannel: 所有的events被保存在文件中。優(yōu)點(diǎn)是容量較大且死掉時數(shù)據(jù)可恢復(fù)。缺點(diǎn)是速度較慢。
上述兩種Channel,優(yōu)缺點(diǎn)相反,分別有自己適合的場景。然而,對于大部分應(yīng)用來說,美團(tuán)希望Channel可以同提供高吞吐和大緩存?;诖耍缊F(tuán)開發(fā)了DualChannel。

DualChannel:基于 MemoryChannel和 FileChannel開發(fā)。當(dāng)堆積在Channel中的events數(shù)小于閾值時,所有的events被保存在MemoryChannel中,Sink從MemoryChannel中讀取數(shù)據(jù); 當(dāng)堆積在Channel中的events數(shù)大于閾值時, 所有的events被自動存放在FileChannel中,Sink從FileChannel中讀取數(shù)據(jù)。這樣當(dāng)系統(tǒng)正常運(yùn)行時,美團(tuán)可以使用MemoryChannel的高吞吐特性;當(dāng)系統(tǒng)有異常時,美團(tuán)可以利用FileChannel的大緩存的特性。
4.5 和scribe兼容
在設(shè)計之初,美團(tuán)就要求每類日志都有一個category相對應(yīng),并且Flume的Agent提供AvroSource和ScribeSource兩種服務(wù)。這將保持和之前的Scribe相對應(yīng),減少業(yè)務(wù)的更改成本。

4.6 權(quán)限控制
在目前的日志收集系統(tǒng)中,美團(tuán)只使用最簡單的權(quán)限控制。只有設(shè)定的category才可以進(jìn)入到存儲系統(tǒng)。所以目前的權(quán)限控制就是category過濾。

如果權(quán)限控制放在Agent端,優(yōu)勢是可以較好地控制垃圾數(shù)據(jù)在系統(tǒng)中流轉(zhuǎn)。但劣勢是配置修改麻煩,每增加一個日志就需要重啟或者重載Agent的配置。

如果權(quán)限控制放在Collector端,優(yōu)勢是方便進(jìn)行配置的修改和加載。劣勢是部分沒有注冊的數(shù)據(jù)可能在Agent/Collector之間傳輸。

考慮到Agent/Collector之間的日志傳輸并非系統(tǒng)瓶頸,且目前日志收集屬內(nèi)部系統(tǒng),安全問題屬于次要問題,所以選擇采用Collector端控制。

4.7 提供實時流
美團(tuán)的部分業(yè)務(wù),如實時推薦,反爬蟲服務(wù)等服務(wù),需要處理實時的數(shù)據(jù)流。因此美團(tuán)希望Flume能夠?qū)С鲆环輰崟r流給Kafka/Storm系統(tǒng)。

一個非常重要的要求是實時數(shù)據(jù)流不應(yīng)該受到其它Sink的速度影響,保證實時數(shù)據(jù)流的速度。這一點(diǎn),美團(tuán)是通過Collector中設(shè)置不同的Channel進(jìn)行隔離,并且DualChannel的大容量保證了日志的處理不受Sink的影響。

5 系統(tǒng)監(jiān)控
對于一個大型復(fù)雜系統(tǒng)來說,監(jiān)控是必不可少的部分。設(shè)計合理的監(jiān)控,可以對異常情況及時發(fā)現(xiàn),只要有一部手機(jī),就可以知道系統(tǒng)是否正常運(yùn)作。對于美團(tuán)的日志收集系統(tǒng),美團(tuán)建立了多維度的監(jiān)控,防止未知的異常發(fā)生。

5.1 發(fā)送速度,擁堵情況,寫Hdfs速度
通過發(fā)送給zabbix的數(shù)據(jù),美團(tuán)可以繪制出發(fā)送數(shù)量、擁堵情況和寫Hdfs速度的圖表,對于超預(yù)期的擁堵,美團(tuán)會報警出來查找原因。

下面是Flume Collector HdfsSink寫數(shù)據(jù)到Hdfs的速度截圖:

下面是Flume Collector的FileChannel中擁堵的events數(shù)據(jù)量截圖:

5.2 flume寫hfds狀態(tài)的監(jiān)控
Flume寫入Hdfs會先生成tmp文件,對于特別重要的日志,美團(tuán)會每15分鐘左右檢查一下各個Collector是否都產(chǎn)生了tmp文件,對于沒有正常產(chǎn)生tmp文件的Collector和日志美團(tuán)需要檢查是否有異常。這樣可以及時發(fā)現(xiàn)Flume和日志的異常.

5.3 日志大小異常監(jiān)控
對于重要的日志,美團(tuán)會每個小時都監(jiān)控日志大小周同比是否有較大波動,并給予提醒,這個報警有效的發(fā)現(xiàn)了異常的日志,且多次發(fā)現(xiàn)了應(yīng)用方日志發(fā)送的異常,及時給予了對方反饋,幫助他們及早修復(fù)自身系統(tǒng)的異常。

通過上述的講解,美團(tuán)可以看到,基于Flume的美團(tuán)日志收集系統(tǒng)已經(jīng)是具備高可用性,高可靠性,可擴(kuò)展等特性的分布式服務(wù)。

改進(jìn)和優(yōu)化
下面,美團(tuán)將會講述在實際部署和使用過程中遇到的問題,對Flume的功能改進(jìn)和對系統(tǒng)做的優(yōu)化。

1 Flume的問題總結(jié)
在Flume的使用過程中,遇到的主要問題如下:

a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰時常報隊列大小不夠的異常;使用FileChannel又導(dǎo)致IO繁忙的問題;

b. HdfsSink的性能問題:使用HdfsSink向Hdfs寫日志,在高峰時間速度較慢;

c. 系統(tǒng)的管理問題:配置升級,模塊重啟等;

2 Flume的功能改進(jìn)和優(yōu)化點(diǎn)
從上面的問題中可以看到,有一些需求是原生Flume無法滿足的,因此,基于開源的Flume美團(tuán)增加了許多功能,修改了一些Bug,并且進(jìn)行一些調(diào)優(yōu)。下面將對一些主要的方面做一些說明。

2.1 增加Zabbix monitor服務(wù)
一方面,F(xiàn)lume本身提供了http, ganglia的監(jiān)控服務(wù),而美團(tuán)目前主要使用zabbix做監(jiān)控。因此,美團(tuán)為Flume添加了zabbix監(jiān)控模塊,和sa的監(jiān)控服務(wù)無縫融合。

另一方面,凈化Flume的metrics。只將美團(tuán)需要的metrics發(fā)送給zabbix,避免 zabbix server造成壓力。目前美團(tuán)最為關(guān)心的是Flume能否及時把應(yīng)用端發(fā)送過來的日志寫到Hdfs上, 對應(yīng)關(guān)注的metrics為:

Source : 接收的event數(shù)和處理的event數(shù)
Channel : Channel中擁堵的event數(shù)
Sink : 已經(jīng)處理的event數(shù)


2.2 為HdfsSink增加自動創(chuàng)建index功能
首先,美團(tuán)的HdfsSink寫到hadoop的文件采用lzo壓縮存儲。 HdfsSink可以讀取hadoop配置文件中提供的編碼類列表,然后通過配置的方式獲取使用何種壓縮編碼,美團(tuán)目前使用lzo壓縮數(shù)據(jù)。采用lzo壓縮而非bz2壓縮,是基于以下測試數(shù)據(jù):

其次,美團(tuán)的HdfsSink增加了創(chuàng)建lzo文件后自動創(chuàng)建index功能。Hadoop提供了對lzo創(chuàng)建索引,使得壓縮文件是可切分的,這樣Hadoop Job可以并行處理數(shù)據(jù)文件。HdfsSink本身lzo壓縮,但寫完lzo文件并不會建索引,美團(tuán)在close文件之后添加了建索引功能。

Java Code復(fù)制內(nèi)容到剪貼板
  1. /**
  2. *RenamebucketPathfilefrom.tmptopermanentlocation.
  3. */
  4. privatevoidrenameBucket()throwsIOException,InterruptedException{
  5. if(bucketPath.equals(targetPath)){
  6. return;
  7. }
  8. finalPathsrcPath=newPath(bucketPath);
  9. finalPathdstPath=newPath(targetPath);
  10. callWithTimeout(newCallRunner<Object>(){
  11. @Override
  12. publicObjectcall()throwsException{
  13. if(fileSystem.exists(srcPath)){//couldblock
  14. LOG.info("Renaming"+srcPath+"to"+dstPath);
  15. fileSystem.rename(srcPath,dstPath);//couldblock
  16. //indexthedstPathlzofile
  17. if(codeC!=null&&".lzo".equals(codeC.getDefaultExtension())){
  18. LzoIndexerlzoIndexer=newLzoIndexer(newConfiguration());
  19. lzoIndexer.index(dstPath);
  20. }
  21. }
  22. returnnull;
  23. }
  24. });
  25. }


2.3 增加HdfsSink的開關(guān)
美團(tuán)在HdfsSink和DualChannel中增加開關(guān),當(dāng)開關(guān)打開的情況下,HdfsSink不再往Hdfs上寫數(shù)據(jù),并且數(shù)據(jù)只寫向DualChannel中的FileChannel。以此策略來防止Hdfs的正常停機(jī)維護(hù)。

2.4 增加DualChannel
Flume本身提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久化;FileChannel則剛好相反。美團(tuán)希望利用兩者的優(yōu)勢,在Sink處理速度夠快,Channel沒有緩存過多日志的時候,就使用MemoryChannel,當(dāng)Sink處理速度跟不上,又需要Channel能夠緩存下應(yīng)用端發(fā)送過來的日志時,就使用FileChannel,由此美團(tuán)開發(fā)了DualChannel,能夠智能的在兩個Channel之間切換。

其具體的邏輯如下:

Java Code復(fù)制內(nèi)容到剪貼板
  1. /***
  2. *putToMemChannelindicateputeventtomemChannelorfileChannel
  3. *takeFromMemChannelindicatetakeeventfrommemChannelorfileChannel
  4. **/
  5. privateAtomicBooleanputToMemChannel=newAtomicBoolean(true);
  6. privateAtomicBooleantakeFromMemChannel=newAtomicBoolean(true);
  7. voiddoPut(Eventevent){
  8. if(switchon&&putToMemChannel.get()){
  9. //往memChannel中寫數(shù)據(jù)
  10. memTransaction.put(event);
  11. if(memChannel.isFull()||fileChannel.getQueueSize()>100){
  12. putToMemChannel.set(false);
  13. }
  14. }else{
  15. //往fileChannel中寫數(shù)據(jù)
  16. fileTransaction.put(event);
  17. }
  18. }
  19. EventdoTake(){
  20. Eventevent=null;
  21. if(takeFromMemChannel.get()){
  22. //從memChannel中取數(shù)據(jù)
  23. event=memTransaction.take();
  24. if(event==null){
  25. takeFromMemChannel.set(false);
  26. }
  27. }else{
  28. //從fileChannel中取數(shù)據(jù)
  29. event=fileTransaction.take();
  30. if(event==null){
  31. takeFromMemChannel.set(true);
  32. putToMemChannel.set(true);
  33. }
  34. }
  35. returnevent;
  36. }


2.5 增加NullChannel
Flume提供了NullSink,可以把不需要的日志通過NullSink直接丟棄,不進(jìn)行存儲。然而,Source需要先將events存放到Channel中,NullSink再將events取出扔掉。為了提升性能,美團(tuán)把這一步移到了Channel里面做,所以開發(fā)了NullChannel。

2.6 增加KafkaSink
為支持向Storm提供實時數(shù)據(jù)流,美團(tuán)增加了KafkaSink用來向Kafka寫實時數(shù)據(jù)流。其基本的邏輯如下:

Java Code復(fù)制內(nèi)容到剪貼板
  1. publicclassKafkaSinkextendsAbstractSinkimplementsConfigurable{
  2. privateStringzkConnect;
  3. privateIntegerzkTimeout;
  4. privateIntegerbatchSize;
  5. privateIntegerqueueSize;
  6. privateStringserializerClass;
  7. privateStringproducerType;
  8. privateStringtopicPrefix;
  9. privateProducer<String,String>producer;
  10. publicvoidconfigure(Contextcontext){
  11. //讀取配置,并檢查配置
  12. }
  13. @Override
  14. publicsynchronizedvoidstart(){
  15. //初始化producer
  16. }
  17. @Override
  18. publicsynchronizedvoidstop(){
  19. //關(guān)閉producer
  20. }
  21. @Override
  22. publicStatusprocess()throwsEventDeliveryException{
  23. Statusstatus=Status.READY;
  24. Channelchannel=getChannel();
  25. Transactiontx=channel.getTransaction();
  26. try{
  27. tx.begin();
  28. //將日志按category分隊列存放
  29. Map<String,List<String>>topic2EventList=newHashMap<String,List<String>>();
  30. //從channel中取batchSize大小的日志,從header中獲取category,生成topic,并存放于上述的Map中;
  31. //將Map中的數(shù)據(jù)通過producer發(fā)送給kafka
  32. tx.commit();
  33. }catch(Exceptione){
  34. tx.rollback();
  35. thrownewEventDeliveryException(e);
  36. }finally{
  37. tx.close();
  38. }
  39. returnstatus;
  40. }
  41. }


2.7 修復(fù)和scribe的兼容問題
Scribed在通過ScribeSource發(fā)送數(shù)據(jù)包給Flume時,大于4096字節(jié)的包,會先發(fā)送一個Dummy包檢查服務(wù)器的反應(yīng),而Flume的ScribeSource對于logentry.size()=0的包返回TRY_LATER,此時Scribed就認(rèn)為出錯,斷開連接。這樣循環(huán)反復(fù)嘗試,無法真正發(fā)送數(shù)據(jù)。現(xiàn)在在ScribeSource的Thrift接口中,對size為0的情況返回OK,保證后續(xù)正常發(fā)送數(shù)據(jù)。

3. Flume系統(tǒng)調(diào)優(yōu)經(jīng)驗總結(jié)
3.1 基礎(chǔ)參數(shù)調(diào)優(yōu)經(jīng)驗
HdfsSink中默認(rèn)的serializer會每寫一行在行尾添加一個換行符,美團(tuán)日志本身帶有換行符,這樣會導(dǎo)致每條日志后面多一個空行,修改配置不要自動添加換行符;
lc.sinks.sink_hdfs.serializer.appendNewline = false
調(diào)大MemoryChannel的capacity,盡量利用MemoryChannel快速的處理能力;
調(diào)大HdfsSink的batchSize,增加吞吐量,減少hdfs的flush次數(shù);
適當(dāng)調(diào)大HdfsSink的callTimeout,避免不必要的超時錯誤;


3.2 HdfsSink獲取Filename的優(yōu)化
HdfsSink的path參數(shù)指明了日志被寫到Hdfs的位置,該參數(shù)中可以引用格式化的參數(shù),將日志寫到一個動態(tài)的目錄中。這方便了日志的管理。例如美團(tuán)可以將日志寫到category分類的目錄,并且按天和按小時存放:

lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H
HdfsS ink中處理每條event時,都要根據(jù)配置獲取此event應(yīng)該寫入的Hdfs path和filename,默認(rèn)的獲取方法是通過正則表達(dá)式替換配置中的變量,獲取真實的path和filename。因為此過程是每條event都要做的操作,耗時很長。通過美團(tuán)的測試,20萬條日志,這個操作要耗時6-8s左右。

由于美團(tuán)目前的path和filename有固定的模式,可以通過字符串拼接獲得。而后者比正則匹配快幾十倍。拼接定符串的方式,20萬條日志的操作只需要幾百毫秒。

3.3 HdfsSink的b/m/s優(yōu)化
在美團(tuán)初始的設(shè)計中,所有的日志都通過一個Channel和一個HdfsSink寫到Hdfs上。美團(tuán)來看一看這樣做有什么問題。

首先,美團(tuán)來看一下HdfsSink在發(fā)送數(shù)據(jù)的邏輯:

Java Code復(fù)制內(nèi)容到剪貼板
  1. //從Channel中取batchSize大小的events
  2. for(txnEventCount=0;txnEventCount<batchSize;txnEventCount++){
  3. //對每條日志根據(jù)categoryappend到相應(yīng)的bucketWriter上;
  4. bucketWriter.append(event);
  5. for(BucketWriterbucketWriter:writers){
  6. //然后對每一個bucketWriter調(diào)用相應(yīng)的flush方法將數(shù)據(jù)flush到Hdfs上
  7. bucketWriter.flush();

假設(shè)美團(tuán)的系統(tǒng)中有100個category,batchSize大小設(shè)置為20萬。則每20萬條數(shù)據(jù),就需要對100個文件進(jìn)行append或者flush操作。

其次,對于美團(tuán)的日志來說,基本符合80/20原則。即20%的category產(chǎn)生了系統(tǒng)80%的日志量。這樣對大部分日志來說,每20萬條可能只包含幾條日志,也需要往Hdfs上flush一次。

上述的情況會導(dǎo)致HdfsSink寫Hdfs的效率極差。下圖是單Channel的情況下每小時的發(fā)送量和寫hdfs的時間趨勢圖。

鑒于這種實際應(yīng)用場景,美團(tuán)把日志進(jìn)行了大小歸類,分為big, middle和small三類,這樣可以有效的避免小日志跟著大日志一起頻繁的flush,提升效果明顯。下圖是分隊列后big隊列的每小時的發(fā)送量和寫hdfs的時間趨勢圖。

4 未來發(fā)展
目前,F(xiàn)lume日志收集系統(tǒng)提供了一個高可用,高可靠,可擴(kuò)展的分布式服務(wù),已經(jīng)有效地支持了美團(tuán)的日志數(shù)據(jù)收集工作。

后續(xù),美團(tuán)將在如下方面繼續(xù)研究:

日志管理系統(tǒng):圖形化的展示和控制日志收集系統(tǒng);
跟進(jìn)社區(qū)發(fā)展:跟進(jìn)Flume 1.5的進(jìn)展,同時回饋社區(qū);

版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。

實時開通

自選配置、實時開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對1客戶咨詢顧問

在線
客服

在線客服:7*24小時在線

客服
熱線

400-630-3752
7*24小時客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部