王巖++王純
摘要:伴隨著互聯(lián)網(wǎng)和移動互聯(lián)網(wǎng)的發(fā)展,各種新興應(yīng)用層出不窮,對大數(shù)據(jù)處理的實時性和高并發(fā)能力要求也在不斷提高。Apache Kafka,作為一種分布式的消息系統(tǒng),具有可水平擴(kuò)展和高吞吐率而被廣泛的使用。對于數(shù)據(jù)業(yè)務(wù)的基礎(chǔ)支撐系統(tǒng),除了能夠滿足高并發(fā)度和實時性以外,數(shù)據(jù)的質(zhì)量即數(shù)據(jù)可靠性也是關(guān)鍵的一環(huán)。但是,由Kafka原生提供的數(shù)據(jù)消費者不能夠保障數(shù)據(jù)的可靠性。本文首先簡單介紹了Kafka的組成、架構(gòu)特性等技術(shù)背景,然后闡述了原生Consumer的原理和缺陷;最后,基于Kafka提出一個可靠的消費者的設(shè)計方案。本方案是基于Kafka的low-level的接口集,解決了Kafka原生Consumer由于將用戶消費數(shù)據(jù)的動作與數(shù)據(jù)消費位置的記錄獨立而引起的數(shù)據(jù)質(zhì)量問題,保障了數(shù)據(jù)的可靠性。最后,搭建Kafka集群測試環(huán)境,驗證了方案的可行性和正確性。
關(guān)鍵詞:Kafka;數(shù)據(jù)可靠性;zookeeper;實時
中圖分類號:TP311.5
文獻(xiàn)標(biāo)識碼:B
DOI: 10.3969/j.issn.1003-6970.2016.01.015
0 引言
隨著互聯(lián)網(wǎng)行業(yè)的不斷發(fā)展,各種業(yè)務(wù)的數(shù)據(jù)量不斷增多,在大數(shù)據(jù)處理環(huán)境下,對數(shù)據(jù)的實時性要求不斷提高。筆者原有的技術(shù)環(huán)境采用ftp技術(shù)作為數(shù)據(jù)傳輸手段和傳統(tǒng)關(guān)系型數(shù)據(jù)庫和文件系統(tǒng)作為存儲介質(zhì),效率較低,無法滿足客戶對數(shù)據(jù)實時性的要求。Apache KafkaⅢ,作為一種分布式的消息系統(tǒng),具有可水平擴(kuò)展、高吞吐率和實時性而被廣泛的使用。筆者為迎合項目的需求采用Kaika作為數(shù)據(jù)訂閱和發(fā)布系統(tǒng),完成數(shù)據(jù)的傳輸和緩存功能。最初,由于初學(xué)Katka采用Kafka原生提供的High Level的Api,編寫數(shù)據(jù)生產(chǎn)者和消費者。隨著使用的深入和業(yè)務(wù)數(shù)據(jù)量的增大,發(fā)現(xiàn)數(shù)據(jù)質(zhì)量不能得到保障,雖然偏差不大,但是對于某些敏感數(shù)據(jù),對于數(shù)據(jù)質(zhì)量要求十分嚴(yán)苛。
對于數(shù)據(jù)業(yè)務(wù)的基礎(chǔ)支撐系統(tǒng),除了能夠滿足高并發(fā)度和實時性以外,數(shù)據(jù)的質(zhì)量,也即數(shù)據(jù)可靠性也是關(guān)鍵的一環(huán)。本文的研究目的在于基于Kafka的底層Api給出一種具有數(shù)據(jù)可靠性的數(shù)據(jù)消費者(Consumer)的設(shè)計方案。
1 技術(shù)背景簡介
1.1 名詞介紹
主題(topic)是Kafka用于區(qū)分所發(fā)布消息的類別或是名,即一個主題包含一類消息。
分區(qū)(partitions)是Kafka為于每一個主題維護(hù)了若干個隊列,稱為分區(qū)。
假設(shè)有一個擁有3個分區(qū)的主題,其中主題(topic)和分區(qū)關(guān)系如下圖,
Kafka中每個主題的每一個分區(qū)是一個有序?qū)懭搿⒉豢勺兊南⑿蛄?,一個topic下可以擁有多個分區(qū)。
消息偏移量(offset)是Kafka賦予每個分區(qū)(partition)內(nèi)的每條消息一個唯一的遞增的序列號,稱為消息偏移量(offset)。
生產(chǎn)者(producer)是根據(jù)對于主題的選擇向Kafka的發(fā)布消息,即向broker push消息的一系列進(jìn)程。生產(chǎn)者負(fù)責(zé)決定某一條消息該被被發(fā)往選定主題(topic)的哪一個分區(qū)(partition)。
消費者(consumer)是向主題注冊,并且接收發(fā)布到這些主題的消息,即消費一類消息的進(jìn)程或集群。
代理(broker)是組成Kafka集群的單元。Kafka以一個擁有一臺或多臺服務(wù)器的分布式集群形式運行著,每一臺服務(wù)器稱為broker。
副本(replications)即分區(qū)的備份,以便容錯,分布在其他broker上,每個broker上只能有這個分區(qū)的0到1個副本,即最多只能有一個。
消費者群組(Consumer Group)是有若干個消費者組成的集體。每個Consumer屬于一個特定的Consumer Group。Kafka采用將Consumer分組的方式實現(xiàn)一個主題(Topic)的消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個Consumer)。
1.2 Kafka的基本架構(gòu)
Kafka是一個分布式的消息訂閱和發(fā)布的系統(tǒng)。消息的發(fā)布者稱作producer,將消息的訂閱者稱consumer.將中間的存儲陣列稱作broker。
圖2極為簡要的描述了一個消息訂閱和發(fā)布系統(tǒng),所必須具備的角色和工作機(jī)制。生產(chǎn)者(producer)將數(shù)據(jù)生產(chǎn)出來,推送給代理者(broker)進(jìn)行存儲,消費者需要消費數(shù)據(jù)了,就從broker中拉取數(shù)據(jù)來,然后完成一系列對數(shù)據(jù)的處理。
圖3展示了Kafka作為消息訂閱和發(fā)布系統(tǒng)的典型系統(tǒng)架構(gòu)模型。多個代理者(broker)協(xié)同合作,組成了Kafka集群。Kafka的集群架構(gòu)采用P2P (peerto peer)模式。集群中沒有主節(jié)點,所有節(jié)點都平等作為消息的處理節(jié)點。優(yōu)點是沒有單點問題,一部分節(jié)點宕機(jī),服務(wù)仍能夠正常,缺點是很難達(dá)成數(shù)據(jù)的一致性和多機(jī)備份,如果一部分節(jié)點宕機(jī)會導(dǎo)致數(shù)據(jù)的丟失。
Kafka為避免上述的問題采用主節(jié)點選舉機(jī)制,利用zookeeper,對于每一個主題(topic)的分區(qū)(partitions),選出一個leader-broker(主節(jié)點),其余broker為followers(從節(jié)點),leader處理消息的寫入和備份;當(dāng)leader宕機(jī),采用選舉算法,從followers中選出新的leader,以保障服務(wù)的可用,同時保障了消息的備份和一致性。
生產(chǎn)者(producer)和消費者(consumer)部署在各個業(yè)務(wù)邏輯中被頻繁的調(diào)用,三者通過zookeeper管理協(xié)調(diào)請求和轉(zhuǎn)發(fā)。這樣一個高性能的分布式消息發(fā)布與訂閱系統(tǒng)就完成了。producer到broker的過程是push,也就是有數(shù)據(jù)就推送到broker,而consumer到broker的過程是pull,是通過consumer主動去拉取數(shù)據(jù)的,而不是broker把數(shù)據(jù)主動發(fā)送到consumer端的。
2 原生Kafka-Consumer的原理和缺陷
2.1 設(shè)計原理
圖4就是Kafka原生的Consumer的架構(gòu)的簡要圖示。zkConnector提供一些關(guān)于與zookeeper交互操作的API;FetchDataChunk只要是提供獲取主題數(shù)據(jù)的API;ConsumerConnector即實現(xiàn)kakfa-Consumer的主體部分,即用戶API的接口類,提供Consumer鏈接和主題(topic)數(shù)據(jù)訪問的接口。
Kafka中的offset用于描述消息在一個分區(qū)中的位置偏移量,依從一個分區(qū)內(nèi)的消息的達(dá)到順序遞增;同時,Kafka的Consumer利用消息的offset來記錄在一個topic中每個分區(qū)中的消費的水位線。在kakfa-Consumer中, 對于offset的處理是在ConsumerConnect建立連接的同時,開啟一個定時器,每隔一定時間(用戶可配置),就將現(xiàn)在用戶consumer的在每個分區(qū)的offset記錄到zookeeper中;因此,每次consumer啟動的時候都會先從zookeeper中讀取記錄其中的offset,作為這次消費的起始點。
以上,就是Kafka原生的Consumer的基本設(shè)計原理,下面我們闡述一下他的缺陷,以及會造成的問題。
2.2 非可靠性的缺陷
由上述kakfa-Consumer的設(shè)計原理,標(biāo)記Consumer消費水位的offset的記錄是跟用戶對數(shù)據(jù)消費和處理是分離的。考慮如下場景,例如用戶的Consumer程序由于種種原因(程序異常、主機(jī)宕機(jī)、JVM異常、錯誤操作等)異常退出,此時用戶Consumer在異常退出前消費的數(shù)據(jù),就很有可能恰好處于ConsumerConnect中記錄offset的定時器的運行周期,使得退出是丟失的數(shù)據(jù)的offset被記錄到了zookeepero這樣,當(dāng)應(yīng)用重新啟動,向zookeeper同步offset的時候,就會拿到錯誤的偏移量,導(dǎo)致數(shù)據(jù)的丟失,使得數(shù)據(jù)不可靠。
2.3 本文設(shè)計方案的創(chuàng)新性
本文由于生產(chǎn)業(yè)務(wù)對數(shù)據(jù)質(zhì)量的需求,摒棄了Kafka提供的不可靠的High-Ievel接口集,而采用Kafka內(nèi)部底層的low-level的接口集,即只使用Kakfa獲取數(shù)據(jù)的接口,不使用原生的的對于offset的維護(hù)服務(wù)。本文的設(shè)計方案重新封裝了kafka的消息結(jié)構(gòu),并且利用zookeeper自行構(gòu)造了保存offset的結(jié)構(gòu)和方式,定義了用戶獲取數(shù)據(jù)的接口以及用戶提交offset的接口,使得用戶的數(shù)據(jù)消費行為與offset提交的行為朕動起來,保障了數(shù)據(jù)的可靠性。具體方案的設(shè)計原理會在下面章節(jié)詳細(xì)闡述。
3 可靠的Consumer設(shè)計
3.1 可靠性的定義和條件
3.1.1 Consumer可靠性和本文的選型
對于Consumer的讀取數(shù)據(jù)的可靠性有如下三種可達(dá)標(biāo)準(zhǔn):
l.At most once消息可能會丟,但絕不會重復(fù)傳輸
2.At least one消息絕不會丟,但可能會重復(fù)傳輸
3.Exactly once每條消息肯定會被傳輸一次且僅傳輸一次
對于Kafka的原生Consumer,實現(xiàn)的Consumer屬于第一種可靠性。在Kafka中是通過對于offset的保持,來控制數(shù)據(jù)的消費位置,即數(shù)據(jù)消費水位線。在Kafka的原生Consumer對于數(shù)據(jù)消費和數(shù)據(jù)水位線(offset)的保持是分離的。即在系統(tǒng)出現(xiàn)異常退出的時候,如果Consumer已經(jīng)將數(shù)據(jù)消費,但是并未提交offset,當(dāng)系統(tǒng)恢復(fù)重啟時,同步上次記錄的水位線時,就回讀取到較早提交的offset,就會造成數(shù)據(jù)的重復(fù)消費;如果Consumer還未來得及消費數(shù)據(jù),但是offset已經(jīng)提交,在下次系統(tǒng)恢復(fù)重啟,就回讀取到錯誤的水位線,導(dǎo)致一部分?jǐn)?shù)據(jù)無法被消費而丟失。
這種模式下,即Consumer的數(shù)據(jù)消費與offset的不同步,造成系統(tǒng)故障后可能丟失數(shù)據(jù)也可能重復(fù)讀取數(shù)據(jù),這就對應(yīng)于At most once的可靠性。
本文要是現(xiàn)實的就是第二種可靠性,即At leastonce。將Consumer的數(shù)據(jù)消費與offset的提交同步起來,即Consumer在讀完消息先處理再commit。這種模式下,如果在處理完消息之后commit之前Consumer宕機(jī)了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經(jīng)被處理過了。這就對應(yīng)于At least once。
由上面的描述,我們看出這種Consumer讀取的可靠性,有可能會導(dǎo)致讀取后的數(shù)據(jù)有重復(fù)的情況,這種情況很好解決。由于,在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認(rèn)為是Exactly once;如果數(shù)據(jù)中沒有主鍵,我們也可以人為的在producer端對每一條數(shù)據(jù)加入一個唯一的ID作為主見,而后在Consumer后端的業(yè)務(wù)端進(jìn)行去重,就能夠?qū)崿F(xiàn)Exactly once。
如果一定要做到Exactly once,就需要協(xié)調(diào)offset和實際操作的輸出。經(jīng)調(diào)研發(fā)現(xiàn)一般的做法有兩個:
1、引入兩階段提交。由于,許多輸出系統(tǒng)可能不支持兩階段提交,這種做法通用性很差,而且引入兩階段提交,這種類似同步的做法,會降低系統(tǒng)通用消息消費的性能,使吞吐大打折扣;
2、如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好。比如,Consumer拿到數(shù)據(jù)后可能把數(shù)據(jù)放到HDFS,如果把最新的offset和數(shù)據(jù)一起寫到HDFS,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都完成,要么都不完成,間接實現(xiàn)Exactly once。但是這種做法,也限制了Consumer端的輸出形式,并將業(yè)務(wù)和接口耦合在一起,是系統(tǒng)具有很差的擴(kuò)展性。
因此,綜合考慮了系統(tǒng)的性能和可擴(kuò)展性,以及通過后端數(shù)據(jù)再處理達(dá)到Exactly once可靠性的可實現(xiàn)性,本文選擇了實現(xiàn)能夠保障At least once可靠性的Consumer。
3.1.2 本文的可靠性設(shè)計的外部依賴條件
本文只設(shè)計并實現(xiàn)一個可靠的kakfa-consumer,只關(guān)注Consumer從broker拉取數(shù)據(jù)到處理完成數(shù)據(jù)輸出到業(yè)務(wù)層這段的數(shù)據(jù)可靠性,這就需要一些外部條件的保障:
1、假設(shè)producer是可靠的,即不會丟失數(shù)據(jù),能夠建數(shù)據(jù)源的數(shù)據(jù)不丟失的推送到broker;
2、假設(shè)broker是可靠的,不會有丟失數(shù)據(jù);不會有超過replication數(shù)目的broker不能夠提供服務(wù);
3、假設(shè)zookeeper是可靠的,能夠保障服務(wù)的提供以及數(shù)據(jù)的一致性。
3.2 Zookeeper技術(shù)
Zookeeper分布式服務(wù)框架是Apache Hadoop的一個子項目,它主要是用來解決分布式應(yīng)用中經(jīng)常遇到的一些數(shù)據(jù)管理問題,如:統(tǒng)一命名服務(wù)、狀態(tài)同步服務(wù)、集群管理、分布式應(yīng)用配置項的管理等。Zookeeper的典型的應(yīng)用場景:配置文件的管理、集群管理、同步鎖、Leader選舉、隊列管理等。
本文使用Zookeeper開源工具,保持offset數(shù)據(jù),利用zookeeper的數(shù)據(jù)一致性的特性,來保障offset的數(shù)據(jù)可靠性。
3.3 設(shè)計詳述
3.3.1 Consumer的模塊設(shè)計
本文的Consumer設(shè)計如圖5所示
ZKTools模塊,主要負(fù)責(zé)與zookeeper相關(guān)的交互操作,提供與Zookeeper進(jìn)行讀寫操作的相關(guān)操作的API;
ConsumerClient是主體模塊,主要負(fù)責(zé)針對主體的每個分區(qū)的數(shù)據(jù)讀取和可靠性維護(hù)的操作;
BatchMessage是對Kafka的原生的消息結(jié)構(gòu)進(jìn)行封裝,加入了該消息的offset和讀取該消息所在分區(qū)的ConsumerClient類的對象,為數(shù)據(jù)可靠性的實現(xiàn)提供支持,豐富了原生的Message結(jié)構(gòu)的功能;
ClientEngine是整個kafka-consumer的對用戶的接口模塊;提供了設(shè)置c onsumer鏈接,獲取數(shù)據(jù)讀取入口的API。
3.3.2 Consumer的詳細(xì)設(shè)計
圖6是kafka-consuemr的詳細(xì)設(shè)計的類圖,具體闡述了每一個模塊在代碼層面完成的功能:
ZkTools是一個單件類,提供與zookeeper相關(guān)的操作:
_init_ (confMap):構(gòu)造函數(shù),參數(shù)是用戶配置,配置主要包括zookeeper的ip和端口、要消費的主題名稱、consumer的群組名稱。構(gòu)造函數(shù)完成,與zookeeper的鏈接、初始化一些類變量。
setData (zkPath,offset):設(shè)置zkPath指定的zookeeper中的文件的內(nèi)容
getData (zkPath):獲取zkPath指定的zookeeper中的文件內(nèi)容
createPath (zkPath):創(chuàng)建zkPath指定的目錄
checkExist (zkPath):判斷zkPath是否存在
getPartitions (topic):獲取主題的所有分區(qū)編號的列表。
BatchMessage是原生Message的擴(kuò)展類,提供一些系列g(shù)et和set方法,是用戶完成消費動作的入口:
getThisOffset():獲取該消息的起始o(jì)ffset
setThisOffset():設(shè)置該條消息的offset
setConsumerClient 0:設(shè)置ConsumerClient對象,當(dāng)該條消息,被用戶讀取并執(zhí)行完處理邏輯,可以利用該對象,調(diào)用fnish()方法,完成對該消息的消費,從而提交該消息的偏移位置,只是保障消息不丟失,達(dá)到可靠性重要的一環(huán),即用戶每當(dāng)對一條消息完成用戶邏輯的時候就調(diào)用frnish (),這樣使得數(shù)據(jù)消費的偏移量和用戶處理邏輯能夠協(xié)同工作,保證數(shù)據(jù)的可靠性;
getsumerClient():獲取ConsumerClinet對象
ConsumerClient是整個kafka-Consumer的核心,完成主要的功能,ConsumerClient類,是Runable類的實現(xiàn)類,實現(xiàn)run()方法,是一個線程類,每個線程針對主題的某一個分區(qū)進(jìn)行處理:
_init_():構(gòu)造函數(shù),獲取用戶配置confMap、處理的分區(qū)編號partld_存放消息的共享隊列bq。構(gòu)造函數(shù)完成一系列的初始化工作:
1、與zookeeper建立連接
2、根據(jù)主題名稱、和分區(qū)編號獲取該主題的leader-broker的ip和port
3、根據(jù)主題名稱、分區(qū)編號、groupld獲取當(dāng)前最新的消費offset作為起始o(jì)ffset
4、初始化各種數(shù)據(jù)結(jié)構(gòu)
5、建立用于周期性提交offset的Timer
CommitOffset():是Timer定時器的定時調(diào)用函數(shù),周期性向zookeeper提交offSet。
利用offset的存儲結(jié)構(gòu)和提交策略保障可靠性
在ConsumerClient類中有一個排序的數(shù)據(jù)結(jié)構(gòu),對象名稱叫msgWait,是一個存放offset的有序列表。
另外,與msgWait,相關(guān)的是fnish()函數(shù),功能為從msgWait中刪除最小的的offset,而由于msgWait本身有序,即刪除第一個元素。
run()方法根據(jù)初始化的leader-broker的ip和port,利用Kafak底層Api向broker拉取數(shù)據(jù)(Message),并將數(shù)據(jù)、當(dāng)前ConsumerClient對象、該數(shù)據(jù)的起始o(jì)ffset,也即消費的curOffSet(當(dāng)前offset)構(gòu)造為BatchMessage對象,裝填到共享消息隊列bq里面;然后將curOffSet追加到msgWait中。
用戶讀取共享隊列中消息并執(zhí)行完處理邏輯,調(diào)用fnish()方法,將該消息的起始o(jì)ffset從msgWait中刪除。
CommitOffset()方法,是從msgWait中取出最小的offset.并將其提交到zookeeper()。也即如果用戶沒有處理完該消息,就不會調(diào)用finish()方法,那么CommitOffset就一直在提交上一條已經(jīng)消費完成的消息的偏移量;當(dāng)用戶消費完成后,調(diào)用了finish方法,將該條消息的起始位置的offset從msgWait中刪除,那么msgWait中最小的offset就是該消息的偏移量位置,就會在下一個周期被提交。本文的方案就是利用一個offset有序的結(jié)構(gòu)和finish的方法,將用戶的處理邏輯和Consumer對于offset的提交,聯(lián)系到了一起,確保只有當(dāng)用戶處理完成數(shù)據(jù)后,才會提交消息的offset,從而保障數(shù)據(jù)的可靠性。
getLeader():獲取該分區(qū)的le ader-broker的的ip和port。
getLeaderAfterElection():前文提過, 當(dāng)leader-broker異常時,kafka會采用某種選舉方式,重新選舉leader-broker,但是這個過程不是原子的,會產(chǎn)生獲取數(shù)據(jù)失敗的情況,該函數(shù)就是在獲取數(shù)據(jù)失敗的情況下,重新獲取選舉后的leader-broker。
getLastestOffset():獲取該分區(qū)最新的offset。
ClientEngine是kafka-consumer提供給用戶的入口類,主要完成ConsumerClient現(xiàn)成的啟動,提供消費數(shù)據(jù)接口:
init():構(gòu)造函數(shù),獲取用戶配置、初始化共享消息隊列bq、向zookeeper獲取主題的分區(qū)編號的列表:
Start():根據(jù)分區(qū)列表,啟動ConsumerClient線程
getConsumerlterator():返回共享消息隊列的迭代器,作為用戶消費數(shù)據(jù)的入口。
4 方案驗證
4.1 測試環(huán)境
測試環(huán)境采用實驗室的pc機(jī)進(jìn)行測試。機(jī)器配置如表1所示,
測試主機(jī)有三臺,組成kaika和zookeeper的測試集群,三臺主機(jī)網(wǎng)絡(luò)配置信息如下,
4.2 測試用例和結(jié)果
由于篇幅有限,上表實例性的展示了10次測試的對比結(jié)果,為了清晰的對比展現(xiàn)測試結(jié)果,見如下對比圖,圖7中,紅線代表原生Consumer在測試用例下的結(jié)果數(shù)據(jù),藍(lán)線代表本文設(shè)計方案下的Consumer的結(jié)果數(shù)據(jù),黑線代表著原始數(shù)據(jù)的記錄條數(shù)。由圖我們可以清楚的看出,原生Consumer無法保證數(shù)據(jù)的可靠性,時而多數(shù)據(jù),時而缺失數(shù)據(jù);而本文實現(xiàn)的可保證At least once可靠性的Consumer的線圖一直在黑線之上,表明本文的設(shè)計方案下數(shù)據(jù)在系統(tǒng)意外宕機(jī)時不會缺失,能夠保障At least once的可靠性,從而證明了本文設(shè)計方案的可行性和正確性。
5 結(jié)論
隨著互聯(lián)網(wǎng)的飛速發(fā)展,新的業(yè)務(wù)對數(shù)據(jù)處理的實時性、高并發(fā)、高吞吐的要求在不斷的提高。然而,數(shù)據(jù)的可靠性也是十分重要的一環(huán)。本文基于Kafka分布式消息隊列,提出了一種可靠的Consumer的設(shè)計方案,保障了數(shù)據(jù)的可靠性,能夠在業(yè)務(wù)端保證數(shù)據(jù)冪等的條件下,達(dá)到數(shù)據(jù)不會丟失也不重復(fù)的效果。然而,本文實現(xiàn)的Consumer對于可靠性的保障是有局限性的、并且對于主題分區(qū)較多的情況效率也會下降。所以,筆者也會不斷的學(xué)習(xí),為了使用信息社會的瞬息萬變,需要不斷地變革和創(chuàng)新,才能為社會創(chuàng)造更好的互聯(lián)網(wǎng)服務(wù)。