楊冬暉
摘要:現(xiàn)如今,互聯(lián)網(wǎng)已經(jīng)滲透到日常生活的方方面面。生活的各個(gè)領(lǐng)域都迎來(lái)大數(shù)據(jù)的影響,數(shù)據(jù)傳輸過(guò)程的可靠性,以及如何有效地使用數(shù)據(jù)尤為重要。分布式消息系統(tǒng)可以有效地解決大規(guī)模分布式系統(tǒng)中消息傳遞問(wèn)題。因此需要一個(gè)高吞吐量,高性能以及具有一定可靠性的分布式消息系統(tǒng)。Kafka是一個(gè)處理海量數(shù)據(jù)的分布式消息系統(tǒng)。[1]Kafka具有高效的數(shù)據(jù)傳輸速率,相對(duì)于其他的消息隊(duì)列系統(tǒng)具有較高的性能,采用發(fā)布/訂閱模式。該文主要總結(jié)介紹了kafka系統(tǒng)的架構(gòu)以及特征,重點(diǎn)介紹了分布式集群下kafka如何通過(guò)副本模式保證其消息的可靠傳輸。并通過(guò)實(shí)驗(yàn)驗(yàn)證其副本模式的可靠性,以支持后續(xù)相關(guān)研究。
關(guān)鍵詞:分布式消息系統(tǒng);分布式系統(tǒng);卡夫卡;可靠性
中圖分類號(hào):TP311 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1009-3044(2015)21-0075-02
An Research on the Reliability of a Distributed Message Queue
YANG Dong-hui
(School of Software Engineering, Tongji University, Shanghai 201804, China)
Abstract:Nowadays, the Internet has penetrated into every aspect of our daily lives. Every field of social life have the impact of big data transmission, and how to effectively use the data is particularly important. Distributed information system can effectively solve the problem of transmission information in large scale distributed systems. So we need a high throughput, high performance and distributed information system with a certain reliability. Kafka is a distributed information system with a massive data processing. Kafka has efficient data transfer rate, and compared with other message queuing the system has higher performance. Using the publish / subscribe model. This paper mainly introduces the Kafka system structure and characteristics, introduced the Kafka how to ensure reliable transmission of messages with replica. And ensure reliability by experiments, in order to support the follow-up study.
Key words:distributed messaging system; distributed system; Kafka; reliability
在現(xiàn)如今生活中,網(wǎng)上購(gòu)物已成為日常生活的一部分,電商的發(fā)展規(guī)模越來(lái)越龐大。大型互聯(lián)網(wǎng)電商公司的業(yè)務(wù)越來(lái)越復(fù)雜,不同業(yè)務(wù)需要不同的子系統(tǒng)進(jìn)行支持,每個(gè)子系統(tǒng)之間的通信需要依靠消息系統(tǒng)支持,這就進(jìn)一步加大了對(duì)消息系統(tǒng)的依賴。當(dāng)消息系統(tǒng)出現(xiàn)故障,消息不能按時(shí)傳遞、消息丟失等等都會(huì)造成不可估量的后果,公司可能造成重大損失。因此分布式系統(tǒng)中的消息通信系統(tǒng)的可靠性尤為重要。
分布式系統(tǒng)[2,11]是指分散的物理機(jī)通過(guò)互聯(lián)網(wǎng)連接建立起的一套軟件系統(tǒng),分布式系統(tǒng)具有高度的內(nèi)聚性和透明性。分布式系統(tǒng)中數(shù)據(jù)的傳輸需要面臨比傳統(tǒng)系統(tǒng)更多的挑戰(zhàn)。如何保證分布式系統(tǒng)中大量,高速以及可靠的數(shù)據(jù)傳輸是分布式消息隊(duì)列需要解決的問(wèn)題。消息隊(duì)列是操作系統(tǒng)的進(jìn)程之間用于通信的一種機(jī)制,兩個(gè)或多個(gè)進(jìn)程間通過(guò)訪問(wèn)共同的消息隊(duì)列完成消息的交換。在分布式環(huán)境下分布式的消息隊(duì)列能在客戶端和服務(wù)端提供同步和異步的連接,實(shí)現(xiàn)應(yīng)用程序之間的協(xié)同。
1 KAFKA可靠性研究
1.1 相關(guān)背景
在分布式消息隊(duì)列[3]發(fā)展初期,消息傳遞通常采用點(diǎn)對(duì)點(diǎn)的傳輸結(jié)構(gòu),發(fā)送方需要事先指明接收方的地址,雖然消息的接收方和發(fā)送方是松耦合連接的,不必保持相互之間通信的同步。但是由于消息在傳遞過(guò)程中需要綁定接收方的地址,系統(tǒng)不夠靈活難以擴(kuò)展。由于以上的原因分布式消息系統(tǒng)向發(fā)布/訂閱[4]模式轉(zhuǎn)變,發(fā)布/訂閱模式具有異步松耦合和多對(duì)多通信的特點(diǎn),此模式適應(yīng)目前大多數(shù)企業(yè)分布式計(jì)算環(huán)境的要求。發(fā)布/訂閱模式下發(fā)送消息的一方稱為發(fā)布者,接收消息的一方稱為訂閱者。消息不再被發(fā)送到指定的接收者中而是轉(zhuǎn)而發(fā)送給一個(gè)中間的消息代理服務(wù)器。訂閱者只需去服務(wù)器中接收自己感興趣的消息,發(fā)布者不需要知道何種訂閱者接收了發(fā)布的消息。這種發(fā)布/訂閱的模式更易于擴(kuò)展。常見(jiàn)的發(fā)布/訂閱消息隊(duì)列包括Kafka[5]、RabbitMQ[6]、ActiveMQ[7]以及Microsoft MSMQ等。其中RabbitMQ是使用Erlang語(yǔ)言編寫(xiě)的一個(gè)開(kāi)源的消息隊(duì)列,是AMQP協(xié)議的一個(gè)實(shí)現(xiàn)。它實(shí)現(xiàn)了代理(Broker)架構(gòu),使得消息在發(fā)送到客戶端之前可以在中央節(jié)點(diǎn)上排隊(duì)。
1.2 Kafka結(jié)構(gòu)
Kafka由Linkedin公司開(kāi)發(fā),并使用Scala語(yǔ)言編寫(xiě)。Kafka是一個(gè)分布式的、分區(qū)的、多副本的、多訂閱者的日志提交系統(tǒng)[9]。Kafka系統(tǒng)包括消息的生產(chǎn)者(producer),消息的消費(fèi)者(consumer),消息代理者(broker)和管理者(zookeeper) [8]四個(gè)部分。生產(chǎn)者生產(chǎn)的消息被放在topic中。一個(gè)topic中可以設(shè)置多個(gè)partition,每一個(gè)partition可以對(duì)應(yīng)一個(gè)消費(fèi)者被消費(fèi)。當(dāng)設(shè)置了多個(gè)partition,生產(chǎn)者在生產(chǎn)消息的時(shí),需指定對(duì)應(yīng)的partition,如果沒(méi)有指定,則會(huì)使用默認(rèn)值。每個(gè)partition是一個(gè)有序的、不可變的消息序列,這個(gè)序列可以被連續(xù)地追加。每條消息會(huì)有一個(gè)序列號(hào),在文件中的位置被稱為offset(偏移量),offset為一個(gè)long型數(shù)字,唯一標(biāo)識(shí)一條消息。生產(chǎn)者生產(chǎn)消息,產(chǎn)生的消息發(fā)送到broker,等待消費(fèi)者的接收。消息在broker中根據(jù)topic和partition來(lái)區(qū)分不同。消費(fèi)者根據(jù)topic和partition的值從broker中訂閱消息。消費(fèi)者在想要訂閱消息時(shí)向broker端發(fā)送請(qǐng)求,告知其topic和partition值來(lái)獲取消息。消費(fèi)者可以決定從partition何處開(kāi)始消費(fèi),并通過(guò)重置offset的值來(lái)重新消費(fèi)已經(jīng)消費(fèi)的消息。在Kafka中消費(fèi)者在和broker建立連接后,主動(dòng)拉取消息。生產(chǎn)者和消費(fèi)者之間消息的傳遞可以用圖1所示[9]。
管理者zookeeper在其中進(jìn)行協(xié)調(diào)控制,管理broker和consumer的動(dòng)態(tài)加入和離開(kāi),維護(hù)了生產(chǎn)者和消費(fèi)者之間的關(guān)系和topic中的信息,并提供一定的負(fù)載均衡支持。Kafka server為生產(chǎn)者消費(fèi)者提供服務(wù),當(dāng)生產(chǎn)者和消費(fèi)者運(yùn)行時(shí),都需要配置相應(yīng)的zookeeper信息。Zookeeper服務(wù)器主要功能為與kafka服務(wù)器進(jìn)行交互,zookeeper服務(wù)器對(duì)kafka服務(wù)器進(jìn)行簡(jiǎn)單的管理。每個(gè)broker啟動(dòng)后會(huì)在zookeeper上臨時(shí)注冊(cè),注冊(cè)信息包含broker的ip地址和端口號(hào),broker上存在的topic和此topic的partition。在kafka中producer可以將生產(chǎn)的消息指定發(fā)送到某一個(gè)具體的partition。相對(duì)應(yīng)的對(duì)于consumer來(lái)說(shuō),每一個(gè)consumer屬于一個(gè)消費(fèi)者集群,每個(gè)group中的consumer消費(fèi)消息相互獨(dú)立。
1.3 Kafka可靠性研究
Kafka消息隊(duì)列為分布式系統(tǒng)提供一定的可靠性保證,針對(duì)生產(chǎn)者生產(chǎn)的消息,在集群模式下,kafka可以將每一個(gè)partition中的數(shù)據(jù)復(fù)制到多個(gè)kafka server中。當(dāng)復(fù)制為多個(gè)副本時(shí),每個(gè)partition會(huì)由zookeeper指定一個(gè)leader節(jié)點(diǎn)和多個(gè)follower節(jié)點(diǎn)(當(dāng)副本數(shù)多于2個(gè))。備份的個(gè)數(shù)可以通過(guò)修改broker配置文件或者手動(dòng)配置更改。Follower節(jié)點(diǎn)需要和leader節(jié)點(diǎn)保持同步。其中l(wèi)eader節(jié)點(diǎn)負(fù)責(zé)處理讀寫(xiě)請(qǐng)求,leader節(jié)點(diǎn)還需要監(jiān)控它所控制的所有follower節(jié)點(diǎn)的運(yùn)行狀態(tài),當(dāng)follower節(jié)點(diǎn)與leader節(jié)點(diǎn)嚴(yán)重不一致或者節(jié)點(diǎn)失效,leader節(jié)點(diǎn)會(huì)刪除此follower節(jié)點(diǎn)。當(dāng)leader節(jié)點(diǎn)失效時(shí),通過(guò)選舉將在存活的follower節(jié)點(diǎn)中重新選舉出新的leader節(jié)點(diǎn),新的leader節(jié)點(diǎn)替代失效leader節(jié)點(diǎn)繼續(xù)負(fù)責(zé)監(jiān)控整個(gè)集群運(yùn)行。因此當(dāng)分布式集群中某一個(gè)業(yè)務(wù)節(jié)點(diǎn)失效時(shí),只要還有一個(gè)節(jié)點(diǎn)存活,即還有一個(gè)partition的副本。此消息都可以進(jìn)行正常發(fā)送和接收[10]。
Kafka集群的可靠性通過(guò)partition的多副本方式得到提高。在kafka集群的運(yùn)行過(guò)程中,每一個(gè)kafka server上可以運(yùn)行多個(gè)生產(chǎn)者或多個(gè)消費(fèi)者,kafka server啟動(dòng)時(shí)會(huì)在zookeeper上注冊(cè)相應(yīng)的信息。對(duì)相應(yīng)的partition保存了多個(gè)副本時(shí),當(dāng)其中某個(gè)kafka server發(fā)生故障,zookeeper會(huì)將失效的kafka server部署到集群中處于存活狀態(tài)的kafka server中去完成此操作后,分布式集群中消息通信會(huì)通過(guò)備用的kafka server進(jìn)行傳遞,由此保證消息通信服務(wù)不受影響。
Kafka集群的建立使得partition可以復(fù)制為多個(gè)副本,當(dāng)正在使用的kafka server因故障而停止工作時(shí),可以使用副本繼續(xù)進(jìn)行生產(chǎn)者消費(fèi)者的消息之間的傳遞,不需要重啟kafka server,保證了進(jìn)程的正常運(yùn)行。如圖2所示,當(dāng)kafka服務(wù)開(kāi)始時(shí),生產(chǎn)者與消費(fèi)者進(jìn)程正常運(yùn)行,經(jīng)過(guò)broker進(jìn)行消息的傳遞,生產(chǎn)者將消息推送給已訂閱的消費(fèi)者。Zookeeper server對(duì)kafka server進(jìn)行管理,本圖包含兩個(gè)kafka server所以topic最多保存了兩個(gè)副本。如圖中所示,正在提供服務(wù)的kafka server出現(xiàn)故障,zookeeper server選舉新的leader節(jié)點(diǎn)并調(diào)用其副本繼續(xù)提供服務(wù)。生產(chǎn)者和消費(fèi)者正常的消息傳遞會(huì)通過(guò)副本繼續(xù)進(jìn)行,消息不會(huì)丟失,保證了分布式集群中消息的可靠傳遞。
2 實(shí)驗(yàn)分析
通過(guò)實(shí)驗(yàn)驗(yàn)證分析kafka分布式集群的可靠性。實(shí)驗(yàn)環(huán)境為三臺(tái)kafka服務(wù)器與一臺(tái)zookeeper服務(wù)器。實(shí)驗(yàn)過(guò)程為:
1)啟動(dòng)zookeeper服務(wù)與3臺(tái)kafka服務(wù);
2)創(chuàng)建1個(gè)生產(chǎn)者與1個(gè)消費(fèi)者,并創(chuàng)建1個(gè)topic;
3)針對(duì)創(chuàng)建的topic將其設(shè)置為3個(gè)副本模式,并觀察哪一個(gè)節(jié)點(diǎn)為leader節(jié)點(diǎn),哪兩個(gè)節(jié)點(diǎn)為follower節(jié)點(diǎn);
4)生產(chǎn)者發(fā)送信息,手動(dòng)終止leader節(jié)點(diǎn)kafka服務(wù),觀察消費(fèi)者有無(wú)正常接收消息;
5)手動(dòng)終止還存活的kafka服務(wù)節(jié)點(diǎn)中的一個(gè),觀察消費(fèi)者是否可以正常接收消息;
6)手動(dòng)終止最后一個(gè)存活的kafka服務(wù)節(jié)點(diǎn),觀察消費(fèi)者是否可以正常接收消息。
通過(guò)上述實(shí)驗(yàn),得到實(shí)驗(yàn)結(jié)果如下:當(dāng)?shù)谝淮谓K止leader節(jié)點(diǎn)后,zookeeper會(huì)重新選舉出新的leader節(jié)點(diǎn),生產(chǎn)者發(fā)布消息后,訂閱此topic的消費(fèi)者仍可以正常的接收消息。當(dāng)終止的 服務(wù)節(jié)點(diǎn)不為leader節(jié)點(diǎn)時(shí),leader節(jié)點(diǎn)不會(huì)重新選舉,消費(fèi)者仍可接收訂閱的消息。當(dāng)終止所有kafka服務(wù)節(jié)點(diǎn)時(shí),生產(chǎn)者生產(chǎn)的消息無(wú)法發(fā)布到broker中,消費(fèi)者也無(wú)法從broker中訂閱消息。
通過(guò)上述實(shí)驗(yàn)結(jié)果,可以得出kafka分布式集群通過(guò)多副本,選取leader節(jié)點(diǎn)對(duì)多個(gè)副本進(jìn)行管理的形式,保證集群消息傳遞具有一定的可靠性。
3 結(jié)束語(yǔ)
分布式消息隊(duì)列在如今互聯(lián)網(wǎng)企業(yè)得到越來(lái)越多的應(yīng)用。Kafka為分布式集群中消息傳遞提供了一定的可靠性保證。本文分析了kafka的基本結(jié)構(gòu),并對(duì)其副本模式進(jìn)行了分析。實(shí)驗(yàn)驗(yàn)證了其副本模式下生產(chǎn)者消費(fèi)者消息的傳遞是否能容忍一定的故障。為后續(xù)深入研究kafka提供前期支持。
參考文獻(xiàn):
[1] Goodhope K K G, Goodhope K. Building LinkedIn's Real-time Activity Data Pipeline[J]. Bulletin of the Technical Committee on Data Engineering, 2012(2).
[2] 胡華平, 金士堯, 王召福. 分布式系統(tǒng)的可信性研究[J]. 計(jì)算機(jī)工程與科學(xué), 1998(1): 48-53.
[3] Patel D, Khasib F, Sadooghi I, et al. Towards In-Order and Exactly-Once Delivery Using Hierarchical Distributed Message Queues[C]//IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing. IEEE, 2014:883-892.
[4] 馬建剛, 黃濤, 汪錦嶺, 等. 面向大規(guī)模分布式計(jì)算發(fā)布訂閱系統(tǒng)核心技術(shù)[J]. 軟件學(xué)報(bào), 2006, 17(1): 134-147.
[5] Jay Kreps, Neha Narkhede, Jun Rao. Kafka: a Distributed Messaging System for Log Processing[Z].2011.
[6] 袁佳, 郭燕慧. 基于rabbitmq的海量日志的分布式處理[J]. 軟件, 2013(7): 19-23.
[7] 戴俊, 朱曉民. 基于 ActiveMQ 的異步消息總線的設(shè)計(jì)與實(shí)現(xiàn)[J]. 計(jì)算機(jī)系統(tǒng)應(yīng)用, 2010(8): 254-257.
[8] Okorafor E, Patrick M K. Availability of Jobtracker machine in hadoop/mapreduce zookeeper coordinated clusters[J]. Advanced Computing: An International Journal (ACIJ), 2012, 3(3): 19-30.
[9] Apacher Kafka[EB/OL]. http://kafka.apache.org/.
[10] Chen J, Arumaithurai M, Fu X, et al. Reliable publish/subscribe in content-centric networks[C]//Proceedings of the 3rd ACM SIGCOMM workshop on Information-centric networking. ACM, 2013: 21-26.
[11] 馬浩然. 基于NS3的分布式消息系統(tǒng)Kafka的仿真實(shí)現(xiàn)[J]. 軟件, 2015(1).