摘 ?要:Apache Kafka作為一種分布式的消息隊列中間件,由于其具有高可靠性、高吞吐量、可持久化、可擴展性好等特點。在大數(shù)據(jù)項目中,如日志聚合、流數(shù)據(jù)處理等應(yīng)用場景中被廣泛使用。由于Kafka的消息需要持久化到磁盤中,磁盤故障會影響Kafka的使用,嚴重時會造成數(shù)據(jù)丟失。所以基于Kafka的存儲特性,通過復(fù)盤和分析由于磁盤問題導(dǎo)致的Kafka集群故障,提出了一系列的磁盤故障處理方法,從而縮短Kafka集群故障的恢復(fù)時間。
關(guān)鍵詞:Kafka;分布式;消息隊列;磁盤故障;處理方法
中圖分類號:TQ587.22;TP309.3 ? ? 文獻標(biāo)識碼:A 文章編號:2096-4706(2020)13-0148-03
Abstract:Apache Kafka is a distributed middleware used for message queue. It has merits of high reliability,high throughput,data persistence,good scalability,and therefore has be widely used in big data project such as log aggregation,streaming data processing and so on. The messages of Kafka are persisted to disk,so Kafka is not work when its disk malfunction. Some severe cases may result in subsequent loss of data. Therefore,based on the storage characteristics of Kafka,this paper proposes a series of methods to deal with the failure of Kafka cluster through the re-disk and analysis of Kafka cluster failure caused by disk problems,so as to shorten the recovery time of Kafka cluster failure.
Keywords:Kafka;distributed;message queue;disk malfunction;solution
0 ?引 ?言
Apache Kafka[1],最初由LinkedIn公司開發(fā),并于2011年開源[2]。2012年被孵化成為Apache軟件基金會頂級項目。如今,Kafka應(yīng)用于眾多大數(shù)據(jù)項目中,很多互聯(lián)網(wǎng)公司也在自己的生產(chǎn)環(huán)境中將Kafka作為消息中間件使用。
1 ?Kafka組件及架構(gòu)
Kafka作為一種分布式的消息隊列中間件,部署多采用若干節(jié)點構(gòu)成集群的方式。在這個Kafka集群中,每個節(jié)點被稱作Broker,可以理解為Kafka提供服務(wù)的一個實例。在消息(message)隊列系統(tǒng)中,通常都會有生產(chǎn)者(Producer)發(fā)送消息,消費者(Consumer)消費消息,這樣就構(gòu)成了一個消息“流水線”的上下游,如圖1所示。每條被Producer發(fā)布到Kafka集群的消息都屬于一個Topic。
2 ?Kafka中的文件存儲介紹
Topic經(jīng)過Producer發(fā)布到Kafka集群中,這條Topic會根據(jù)配置被劃分為多個分區(qū)(Partition),這些分區(qū)又會被均勻地分布到Kafka集群所有的Broker節(jié)點上。這樣做可以通過增加分區(qū)的數(shù)量來橫向增加Topic的存儲數(shù)據(jù)量,并且均勻分布也可以起到負載均衡的作用。
在存儲層面,任何發(fā)布到此分區(qū)的消息都會被追加(append)到數(shù)據(jù)文件的尾部,文件以“.log”為后綴。消息被追加到分區(qū)中因為是順序?qū)懭耄╳rite)磁盤的,因此效率非常高。如圖2所示,圖中不同顏色的數(shù)據(jù)文件對應(yīng)的是不同的分區(qū)數(shù)據(jù),append操作正在寫入對應(yīng)虛線數(shù)據(jù)文件。
除了log文件,分區(qū)中還有一個以“.index”為后綴的索引文件,它們共同組成段(Segment)文件。在分區(qū)中會存在多個段文件,它們大小相等,但其中包含的消息數(shù)不一定相等。這種特性方便舊的段文件可以被快速刪除,這樣可以清理空間供新的消息進行存儲,提高磁盤利用率。
作為分布式系統(tǒng),Kafka在設(shè)計上也充分考慮了高可用,從Broker的多節(jié)點到Topic的多副本。Topic的副本機制則是通過分區(qū)的副本實現(xiàn)的,被稱為Replica,即在另一個或多個Broker節(jié)點上存在這個分區(qū)的副本。
3 ?故障復(fù)盤與分析
在公司某生產(chǎn)環(huán)境里的Kafka集群中,一個Broker節(jié)點的磁盤發(fā)生故障,導(dǎo)致這個Broker節(jié)點的進程退出[3],進而影響了Kafka中的某一個Topic的正常使用。
如果啟用副本,Kafka至少不會因為單個節(jié)點不能對外服務(wù)而發(fā)生Topic不能正常使用的情況,這就是Topic的高可用性。本次故障影響使用的主要原因就是Topic沒有設(shè)置副本,采用系統(tǒng)默認值1。在Broker節(jié)點發(fā)生磁盤故障停止服務(wù)時,由于這個Topic在故障Broker的分區(qū)沒有可以使用的副本,導(dǎo)致了此Topic不能正常寫入和消費數(shù)據(jù)的問題。
當(dāng)發(fā)生磁盤故障,通常快速恢復(fù)Kafka服務(wù)的方法就是修改Kafka的server.properties配置中l(wèi)og.dirs參數(shù),將故障磁盤從配置中刪除,Broker就可以啟動了。Broker啟動之后,節(jié)點上故障磁盤的分區(qū)會在此Broker的其他磁盤中創(chuàng)建。但對于這次的Kafka故障還遇到了下文提到的兩種意外情況。
3.1 ?啟動時觸發(fā)了特定版本Kafka的bug
啟動Broker時,日志出現(xiàn)異常報錯,顯示讀取index文件損壞,不能啟動,如圖3所示。遇到這種問題時一般是刪除拋出異常的index文件。
index文件存放的元數(shù)據(jù)指向?qū)?yīng)的log文件中消息的物理偏移地址,如圖4所示。
那為什么index會發(fā)生損壞呢?這是因為index文件是一個索引文件映射,它不會對每條消息都建立索引,而是間隔indexIntervalBytes大小之后才寫入一條索引條目,所以是一個稀疏文件。Kafka運行時會創(chuàng)建一個log.index.size.max.bytes大小的index文件,向其中寫入稀疏索引,內(nèi)容達到閾值后會進行滾動覆蓋。根據(jù)社區(qū)jira的內(nèi)容[4],在Kafka非正常退出后會出現(xiàn)index損壞的情況,而在0.8及以前版本,Kafka在讀取這個損壞的index文件后會出現(xiàn)報錯退出無法啟動的問題,在0.9版本中對此問題進行了修復(fù)[5],處理的邏輯是自動清理這個文件后重建,不拋出異常。
3.2 ?轉(zhuǎn)移的分區(qū)將磁盤空間寫滿
將故障磁盤從配置文件中刪除后重新啟動Broker,故障磁盤中所有Topic的分區(qū)副本會在剩余磁盤中重新創(chuàng)建,并同步消息數(shù)據(jù),此時出現(xiàn)了多個大數(shù)據(jù)量的分區(qū)副本被放入同一個磁盤中,導(dǎo)致磁盤空間被迅速寫滿。這種情況下,就不能再使用剔除磁盤的方法了。緊急處理時可以采取縮短Topic的保存時間,從物理上減小Topic數(shù)據(jù)大小,然后分階段刪除磁盤中過期數(shù)據(jù),最后重啟Broker節(jié)點恢復(fù)。
4 ?實驗驗證
對于磁盤故障這類服務(wù)器常見問題,如何能將故障對Kafka集群的影響減少至最低是研究的重點。對此總結(jié)了如下的恢復(fù)步驟,并通過實驗進行了驗證,可供參考使用。
4.1 ?緊急恢復(fù)故障時可以剔除故障磁盤后重啟
Step1:刪除Kafka配置文件config/server.properties中損壞的磁盤(例如data2為故障磁盤,原配置為:“l(fā)og.dirs=/data0/Kafka-logs,/data1/Kafka-logs,/data2/Kafka-logs,/data3/Kafka-logs...”,更改后配置為:“l(fā)og.dirs=/data0/Kafka-logs,/ data1/Kafka-logs,/data3/Kafka-logs...”)。
Step2:重啟Kafka進程。
結(jié)果:原“/data2/Kafka-logs,”目錄下的分區(qū)會被重新分配到當(dāng)前Broker的其他磁盤上。
影響:會產(chǎn)生數(shù)據(jù)傾斜的情況,大數(shù)據(jù)量的分區(qū)疊加到同一個磁盤,可能造成個別磁盤被寫滿。
4.2 ?最小化影響恢復(fù)故障
集群可允許一個Broker下線時,可暫不重啟Kafka進程,待磁盤更換完成后直接重啟Kafka進程。
如果當(dāng)前Broker有多余的一塊磁盤作備盤。當(dāng)Kafka進程下線時,修改配置文件config/server.properties(例如data2為故障磁盤,data9為備盤,原配置為:“l(fā)og.dirs=/data0/Kafka-logs,/data1/Kafka-logs,/data2/Kafka-logs,/data3/Kafka-logs...”,替換后配置為:“l(fā)og.dirs=/data0/Kafka-logs,/data1/Kafka-logs,/data9/Kafka-logs,/data3/Kafka-logs...”,用data9替換data2),直接啟動Kafka進程。之后在最近的系統(tǒng)維護周期時間點更換壞盤。
4.3 ?具體實驗驗證過程
環(huán)境:測試集群共3個Broker(Broker1至Broker3),每個主機上掛載5塊磁盤(data0至data4)。Kafka配置文件config/server.properties配置了4塊磁盤,即data0、data1、data2、data3,data4作為備盤。
Step1:新建測試Topic為testKafka,配置分區(qū)為12,副本數(shù)為2。此時分區(qū)均勻分布,每個Broker中8個分區(qū)。
Step2:測試Topic testKafka,進行正常的信息生產(chǎn)和消費,此時查看Broker3中data3目錄下面文件,存在2個分區(qū),如圖5所示。
Step3:直接刪除data3目錄,模擬磁盤故障,Kafka進程退出。此時,修改Kafka配置文件config/server.properties,將data3換成data4,即四塊磁盤變成了data0、data1、data2、data4。
Step4:上述步驟完成后,重啟Broker3服務(wù),此時會發(fā)現(xiàn)消費Topic數(shù)據(jù)時會有短暫告警打印,后續(xù)恢復(fù)正常。
結(jié)果:磁盤data3中的2個分區(qū)轉(zhuǎn)移到備用磁盤data4中,如圖6所示。
5 ?結(jié) ?論
本文描述了在磁盤損壞后導(dǎo)致Kafka集群出現(xiàn)的幾種異常情況,提出了在這些情況下的幾種故障處理方法,并通過實驗進行模擬驗證。這些方法可以應(yīng)用于日常運維Kafka集群的工作中,有效提高了Kafka集群可用性,為避免數(shù)據(jù)丟失提供了參考方案。
參考文獻:
[1] KREPS J,NARKHEDE N,RAOJ.Kafka:Adistributied messaging system for log processing [C]//Proceedings of the NetDB11.[S.l.:s.n.],2012:129-140.
[2] GOODHOPE K,KOSHY J,KREPS J,et a1. Building LinkedIns Real—time Activity Data Pipeline [J].Data Engineering,2012,35:33-45.
[3] ASF JIRA. Shutdown Kafka when there is any disk IO error [EB/OL].(2011-07-19).https://issues.apache.org/jira/browse/KAFKA-55.
[4] ACHANTA V S. Corrupt index after safe shutdown and restart [EB/OL].(2014-11-20).https://issues.apache.org/jira/browse/KAFKA-1791.
[5] PALINO T. Broker should automatically handle corrupt index files [EB/OL].(2015-03-09).https://issues.apache.org/jira/browse/KAFKA-2012.
作者簡介:汪濤(1990—),男,漢族,江西九江人,工程師,碩士,研究方向:系統(tǒng)運維。