劉邦 余華平
摘要:消息代理的使用有多種原因(將處理與數(shù)據(jù)生成器分離,緩沖未處理的消息等)。Kafka作為一個(gè)分布式消息隊(duì)列,可以替代更傳統(tǒng)的消息代理,與大多數(shù)消息傳遞系統(tǒng)相比,具有更好的吞吐量,內(nèi)置分區(qū),高性能,復(fù)制和容錯(cuò)功能,這使其成為大規(guī)模消息處理應(yīng)用程序的理想解決方案。Kafka對(duì)外使用topic的概念,生產(chǎn)者往topic里寫消息,消費(fèi)者從各個(gè)top-ic中讀取消息。每個(gè)topic是由多個(gè)partition組成,雖然partition中的消息是有序的,但是多個(gè)partition是無序的,需要保證消息的有序讀寫,并且提高Kafka的性能。
關(guān)鍵詞:Kafka;topic;partition;高性能;分布式消息隊(duì)列
中圖分類號(hào):TP391 文獻(xiàn)標(biāo)識(shí)碼:A
文章編號(hào):1009-3044(2019132-0004-03
1概述
大數(shù)據(jù)階段為了分析用戶的行為,我們將各類日志信息收集并保存到hadoop上做離線的處理,與此同時(shí),我們將日志信息置于檢索系統(tǒng)中,方便高效定位問題所在。核心上,該問題是集成數(shù)據(jù)的問題,但是一個(gè)系統(tǒng)并不能解決所有的問題,邏輯上我們用不同的系統(tǒng)處理不同的業(yè)務(wù)數(shù)據(jù),例如歸類、查找、分析、緩存等。系統(tǒng)中產(chǎn)生的數(shù)據(jù)冗余沒有問題,但是將不同系統(tǒng)中的數(shù)據(jù)進(jìn)行同步時(shí),就會(huì)產(chǎn)生一系列問題。Kafka處理數(shù)據(jù)冗余的做法是提供一個(gè)分布式消息隊(duì)列,讓數(shù)據(jù)生產(chǎn)者向隊(duì)列的末尾添加數(shù)據(jù),然后消費(fèi)者依次從隊(duì)列里面讀取數(shù)據(jù),這樣保證數(shù)據(jù)以合適的形式出現(xiàn)在合適的地方。
2Kafka架構(gòu)組件及數(shù)據(jù)流程
2.1Kafka架構(gòu)組件
每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic。我們?cè)诰唧w操作中為每類的數(shù)據(jù)信息創(chuàng)建一個(gè)topic,生產(chǎn)者(producer)即數(shù)據(jù)的發(fā)布者,該角色將消息發(fā)布到Kafka的topic中,消費(fèi)者(consumer)可以從broker中讀取數(shù)據(jù),消費(fèi)者可以消費(fèi)多個(gè)topic中的數(shù)據(jù)。Producers和consumers進(jìn)行操作時(shí),可同時(shí)在多個(gè)topic中讀寫數(shù)據(jù)。Kafka集群包含一個(gè)或多個(gè)服務(wù)器,服務(wù)器節(jié)點(diǎn)稱為broker,它的職責(zé)是負(fù)責(zé)持久化和備份具體的kafka消息。
topic:消息存放的目錄即主題
Producer:生產(chǎn)消息到topic的一方
Consumer:訂閱topic消費(fèi)消息的一方
Broker:Kafka的服務(wù)實(shí)例就是一個(gè)broker
2.1.1Kafka中的topic和partition
在Kafka中,Topic是一個(gè)存儲(chǔ)消息的邏輯概念,可以認(rèn)為是一個(gè)消息集合。每條發(fā)送到Kafka集群的消息都有一個(gè)類別。我們?cè)诓僮髦挟a(chǎn)生的不同類型的數(shù)據(jù),都能將其分類為不同的topic。一把情況下,一個(gè)topic會(huì)有多個(gè)消息的訂閱者,當(dāng)producer發(fā)布消息到某個(gè)topic時(shí),訂閱了該topic的consumer都可以接收到producer寫入的新消息。Kafka為每個(gè)topic維護(hù)了分布式的分區(qū)(Partition)日志文件,所有的partition在Kafka存儲(chǔ)的層面都是Append Log。所有發(fā)布到該parti-tion的消息都將會(huì)置于Log日志文件的尾部,在partition中按照時(shí)間順序,每條消息均會(huì)分配一個(gè)單調(diào)遞增的順序編號(hào),這也是我們的位移offset。系統(tǒng)中Offset默認(rèn)是一個(gè)Long型的數(shù)字。我們可以通過該offset確定一條在該分區(qū)下的唯一消息。在分區(qū)中保證了消息的有序性,但是在topic中,信息的有序性沒有得到保證。
2.2 Kafka數(shù)據(jù)流程
使用Kafka作為消息中間件,我們需要涉及包括Kafka集群,分布式協(xié)調(diào)中心(Zookeeperl,生產(chǎn)者,消費(fèi)者在內(nèi)的四個(gè)部分對(duì)象,它們協(xié)同工作,讓消息高吞吐高可靠的存儲(chǔ)和流通。生產(chǎn)者往topic中寫數(shù)據(jù),消費(fèi)者從中讀數(shù)據(jù),每當(dāng)新增一條消息時(shí),kafka就會(huì)在對(duì)應(yīng)的文件append寫,用這種方式處理消息,確保kafka的性能非常高。
2.3Kafka消費(fèi)模型
消息由生產(chǎn)者發(fā)送到Kafka集群后,會(huì)被消費(fèi)者消費(fèi)。一般來說我們的消費(fèi)模型有兩種:一種是推送模型(Push);另一種是拉取模型(pull)。
在推送模型(Push)的消息系統(tǒng)中,是由消息代理記錄消息的消費(fèi)狀態(tài)。消息代理將消息推送到consumer后,然后標(biāo)記該消息為已經(jīng)被消費(fèi)狀態(tài),但是這種方式有個(gè)缺點(diǎn),它無法很好地保證消息消費(fèi)的處理語義。例如,當(dāng)我們已經(jīng)把消息發(fā)送給consumer之后,由于網(wǎng)絡(luò)原因或者消費(fèi)進(jìn)程宕機(jī)等原因,消費(fèi)者沒有收到該消息,如果此時(shí)我們?cè)谙M(fèi)代理中將該消息標(biāo)記為已消費(fèi),那將會(huì)出現(xiàn)該消息永久丟失的情況。如果我們采用producer收到消息后回復(fù)這種方法,消息代理需要自己記錄消息的消費(fèi)狀態(tài),這種方法不合適。如果我們采用推送模型,消費(fèi)代理將會(huì)完全控制消息消費(fèi)的速率,如果consumer發(fā)生突發(fā)情況,形成阻塞,就會(huì)出現(xiàn)一系列問題?;谶@種情況,Kafka采取拉取模型(Poll),由自己控制消費(fèi)速度及進(jìn)度,consumer可以按照任意的offset進(jìn)行消費(fèi)。例如,消費(fèi)者可以對(duì)已經(jīng)消費(fèi)過的消息進(jìn)行重新處理,或者是消費(fèi)近期的消息等。
3 Kafka高性能的實(shí)現(xiàn)
3.1分區(qū)
kafka是個(gè)分布式集群的系統(tǒng),整個(gè)系統(tǒng)可以包含多個(gè)bro-ker,也就是多個(gè)服務(wù)器實(shí)例。每個(gè)主題topic會(huì)有多個(gè)分區(qū),kafka將分區(qū)均勻地分配到整個(gè)集群中,當(dāng)生產(chǎn)者向?qū)?yīng)主題傳遞消息,消息通過負(fù)載均衡機(jī)制傳遞到不同的分區(qū)以減輕單個(gè)服務(wù)器實(shí)例的壓力。一個(gè)Consumer Group中可以有多個(gè)consumer,多個(gè)consumer可以同時(shí)消費(fèi)不同分區(qū)的消息,大大地提高了消費(fèi)者的并行消費(fèi)能力。但是一個(gè)分區(qū)中的消息只能被一個(gè)Consumer Group中的一個(gè)consumer消費(fèi)。如圖3所示。
3.2網(wǎng)絡(luò)傳輸上減少開銷
3.2.1批量發(fā)送
在發(fā)送消息的時(shí)候,kafka不會(huì)直接將少量數(shù)據(jù)發(fā)送出去,否則每次發(fā)送少量的數(shù)據(jù)會(huì)增加網(wǎng)絡(luò)傳輸頻率,降低網(wǎng)絡(luò)傳輸效率。kafka會(huì)先將消息緩存在內(nèi)存中,當(dāng)超過一個(gè)的大小或者超過一定的時(shí)間,那么會(huì)將這些消息進(jìn)行批量發(fā)送。
3.2.2端到端壓縮
網(wǎng)絡(luò)傳輸時(shí)數(shù)據(jù)量小時(shí)也可以減小網(wǎng)絡(luò)負(fù)載,kafaka會(huì)將這些批量的數(shù)據(jù)進(jìn)行壓縮,將一批消息打包后進(jìn)行壓縮,發(fā)送broker服務(wù)器后,最終這些數(shù)據(jù)還是提供給消費(fèi)者用,所以數(shù)據(jù)在服務(wù)器上還是保持壓縮狀態(tài),不會(huì)進(jìn)行解壓,而且頻繁的壓縮和解壓也會(huì)降低性能,最終還是以壓縮的方式傳遞到消費(fèi)者的手上。
3.3順序讀寫
kafka是個(gè)可持久化的日志服務(wù),它將數(shù)據(jù)以數(shù)據(jù)日志的形式進(jìn)行追加,最后持久化在磁盤中。katka消息存儲(chǔ)時(shí)依賴于文件系統(tǒng),我們普遍認(rèn)為磁盤的性能比不上內(nèi)存性能,但是kafka卻將磁盤性能發(fā)揮得淋漓盡致。在一個(gè)由6個(gè)7200rpm的SATA硬盤組成的RAID-5磁盤陣列上,線性寫入(linearwrite)的速度大約是300MB/秒,但隨即寫入?yún)s只有50k/秒??梢姶疟P的線性和隨機(jī)讀寫的速度差距甚大。為了利用數(shù)據(jù)的局部相關(guān)性,操作系統(tǒng)從磁盤中讀取數(shù)據(jù)以數(shù)據(jù)塊為單位,將一個(gè)數(shù)據(jù)塊讀入內(nèi)存中,如果有相鄰的數(shù)據(jù),就不用再去磁盤中讀取。在某些情況下,順序磁盤訪問能比隨機(jī)內(nèi)存訪問還要快。同時(shí)在寫數(shù)據(jù)的時(shí)候也是將一整塊數(shù)據(jù)塊寫人磁盤中,大大提升了10效率。
現(xiàn)代操作系統(tǒng)樂于將更多的空閑內(nèi)存來當(dāng)作磁盤緩存。當(dāng)我們?cè)诔绦蛑袑?duì)數(shù)據(jù)進(jìn)行緩存時(shí),可能這些數(shù)據(jù)已經(jīng)緩存在了操作系統(tǒng)的緩存頁中。我們將緩存的操作邏輯交給操作系統(tǒng),那么比我們自己維護(hù)來得更加高效。所以使用磁盤的方式進(jìn)行線性地讀取數(shù)據(jù)也有很高的效率。
kafka將消息追加到日志文件中,正是利用了磁盤的順序讀寫,來提高讀寫效率。我們平時(shí)操作磁盤可能會(huì)用Btree這種數(shù)據(jù)結(jié)構(gòu),但是運(yùn)算的時(shí)間復(fù)雜度為0(10gN),持久化隊(duì)列利用追加日志的方式構(gòu)建,生產(chǎn)者將消息追加到日志尾部,消費(fèi)者讀取頭部的消息,兩者互不干擾,也不需要加鎖,提高了性能,同時(shí)時(shí)間復(fù)雜度為O(1)。
3.4零拷貝
kafka將數(shù)據(jù)以日志的形式保存在磁盤中。當(dāng)消費(fèi)者向服務(wù)器請(qǐng)求數(shù)據(jù),那么需要從文件傳輸?shù)絪ocket中。那么從文件到sueker需要以下這些步驟:
①調(diào)用read陷入內(nèi)核模式,操作系統(tǒng)將數(shù)據(jù)從磁盤讀到內(nèi)核緩沖區(qū);
②然后從內(nèi)核態(tài)切換到用戶態(tài),應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀取到用戶空間的緩沖區(qū);
③然后應(yīng)用程序?qū)?shù)據(jù)寫帶內(nèi)核空間的socket緩沖區(qū);
④最后操作系統(tǒng)將socket緩沖區(qū)的數(shù)據(jù)拷貝到網(wǎng)卡接口緩沖區(qū)并發(fā)出去。
從上面可見,當(dāng)我們將數(shù)據(jù)從文件傳輸?shù)絪ocket最后發(fā)送出去經(jīng)過了好幾次拷貝,同時(shí)還有好幾次的用戶態(tài)和內(nèi)核態(tài)的切換,我們知道用戶態(tài)和內(nèi)核態(tài)的切換也是很耗時(shí)的,那么多次拷貝對(duì)性能的影響更是雪上加霜。
從上面的過程來看,可以看出沒必要從內(nèi)核空間的緩沖區(qū)拷貝到用戶空間。所以零拷貝技術(shù)正是改進(jìn)了這項(xiàng)缺點(diǎn),零拷貝將文件內(nèi)容從磁盤通過DMA引擎復(fù)制到內(nèi)核緩沖區(qū),而且沒有把數(shù)據(jù)復(fù)制到socket緩沖區(qū),只是將數(shù)據(jù)位置和長度信息的描述符復(fù)制到了socket緩存區(qū),然后直接將數(shù)據(jù)傳輸?shù)骄W(wǎng)絡(luò)接口,最后發(fā)送。這樣大大減小了拷貝的次數(shù),提高了效率,kafka正是調(diào)用linux系統(tǒng)給出的sendfile系統(tǒng)調(diào)用來使用零拷貝。
3.5優(yōu)秀的文件存儲(chǔ)機(jī)制
之前說過一個(gè)主題可以有多個(gè)分區(qū),假設(shè)只有一個(gè)服務(wù)器broker,那么多個(gè)分區(qū)必然是存在一個(gè)服務(wù)器上。kafka將一個(gè)分區(qū)以一個(gè)目錄的方式存儲(chǔ),目錄的命名為topicname-分區(qū)下標(biāo),例如有個(gè)topic叫作hello,有3個(gè)分區(qū),那么就有三個(gè)文件夾分別為hello-0,hello-1,hello-2。在一個(gè)分區(qū)文件夾中,又分為多個(gè)段文件。段文件又由一個(gè)index索引文件和一個(gè)log實(shí)質(zhì)的數(shù)據(jù)日志文件構(gòu)成。文件的命名規(guī)則為日志文件中第一個(gè)消息的offset值一1,offset可以理解為消息id,例如一個(gè)000…0015354.10g這個(gè)文件中消息最小的offset為15353。
log數(shù)據(jù)文件由消息和偏移量構(gòu)成,而索引文件中的索引用的是稀疏索引。稀疏索引減少的索引文件的大小,索引文件中存著消息的物理偏移量。
4總結(jié)
Kafka中生產(chǎn)者及消費(fèi)者是直接與broker進(jìn)行交互實(shí)現(xiàn)生產(chǎn)消費(fèi)功能,Kalka在設(shè)計(jì)上并未采用傳統(tǒng)系統(tǒng)中通過增加一層代理實(shí)現(xiàn)系統(tǒng)的平行擴(kuò)展能力。Kafka在設(shè)計(jì)中通過內(nèi)部路由協(xié)議,實(shí)現(xiàn)了生產(chǎn)者與消費(fèi)者可以直接與broker進(jìn)行路由協(xié)商,從而實(shí)現(xiàn)了客戶端直接與broker進(jìn)行生產(chǎn)消費(fèi),而不需要借助第三方代理。無代理的方式不僅會(huì)減少整個(gè)數(shù)據(jù)鏈路的長度,降低延遲,也可以提高整個(gè)系統(tǒng)的穩(wěn)定性,而且也會(huì)節(jié)省大量的成本。
在優(yōu)化其性能方面,采用分區(qū)、網(wǎng)絡(luò)傳輸減少開銷、順序讀寫、零拷貝、優(yōu)秀的文件存儲(chǔ)機(jī)制等方式,提高Kafka性能,另外我們也可以通過進(jìn)一步了解Kafka的架構(gòu),找出它可能的瓶頸點(diǎn),然后在針對(duì)瓶頸點(diǎn)進(jìn)行優(yōu)化優(yōu)化,根據(jù)實(shí)際的需求,我們可以做出對(duì)應(yīng)的優(yōu)化,提高其性能。