樊春美,朱建生,單杏花,楊立鵬,李 雯
(中國(guó)鐵道科學(xué)研究院,北京 100081)
隨著鐵路售票系統(tǒng)業(yè)務(wù)的多樣化,來(lái)自各個(gè)業(yè)務(wù)系統(tǒng)的日志解析變得越來(lái)越復(fù)雜。對(duì)于日志數(shù)據(jù)進(jìn)行實(shí)時(shí)的采集和解析是當(dāng)前需要解決的重要問(wèn)題。鑒于大數(shù)據(jù)的應(yīng)用比較廣泛,尤其是分布式的架構(gòu)和實(shí)時(shí)計(jì)算框架,如Flink[1]、Spark[2]計(jì)算平臺(tái)變得越來(lái)越熱門(mén),因此現(xiàn)在大部分互聯(lián)網(wǎng)公司都會(huì)采用實(shí)時(shí)計(jì)算來(lái)完成日志的采集和存儲(chǔ)。而對(duì)于復(fù)雜的鐵路售票系統(tǒng),不僅需要針對(duì)不同服務(wù)層進(jìn)行日志的采集,還需要對(duì)不同業(yè)務(wù)場(chǎng)景的不同類(lèi)型的日志進(jìn)行解析,為了業(yè)務(wù)數(shù)據(jù)分析的需要,可能同一份數(shù)據(jù)還要存儲(chǔ)到不同的渠道中;與此同時(shí)系統(tǒng)會(huì)隨著生產(chǎn)的需求而不斷完善,因此也會(huì)導(dǎo)致日志存儲(chǔ)格式的變動(dòng),從而需要不停地調(diào)整解析程序來(lái)適應(yīng)新的需求。在每一次變動(dòng)中,都需要測(cè)試整套解析程序,開(kāi)發(fā)效率較慢,甚至對(duì)于一個(gè)數(shù)據(jù)流的解析要啟動(dòng)多個(gè)程序,造成資源的浪費(fèi),維護(hù)多個(gè)程序也變得更加復(fù)雜。于是可以采用現(xiàn)在常用的改變配置文件的方式來(lái)實(shí)時(shí)更新解析相關(guān)的配置。但是分布式的計(jì)算框架刷新配置文件存在著兩個(gè)問(wèn)題,一個(gè)是配置文件的存放問(wèn)題,一般分布式的計(jì)算框架,真正的計(jì)算程序是在每個(gè)節(jié)點(diǎn)上執(zhí)行的,配置文件如果在master節(jié)點(diǎn)上,而每個(gè)執(zhí)行節(jié)點(diǎn)是不能讀取master節(jié)點(diǎn)的配置文件的,如果每個(gè)執(zhí)行節(jié)點(diǎn)都放一份配置文件,也無(wú)法保證每個(gè)執(zhí)行節(jié)點(diǎn)的路徑是一致的,只要其中的一個(gè)節(jié)點(diǎn)路徑不一致就會(huì)導(dǎo)致程序出錯(cuò),無(wú)法執(zhí)行;另外一個(gè)問(wèn)題是,假設(shè)能夠保證每臺(tái)機(jī)器的路徑都是一致的,配置文件可以放在每臺(tái)機(jī)器上,這時(shí)程序每解析一條數(shù)據(jù)都要讀取一次配置文件,頻繁的讀取必然會(huì)影響解析的效率。由于對(duì)于分布式的計(jì)算框架刷新配置存在著上述兩個(gè)問(wèn)題,文中提出了一種通過(guò)自動(dòng)化流控制的方法,實(shí)現(xiàn)配置文件的實(shí)時(shí)刷新。
目前針對(duì)大型數(shù)據(jù)的分析框架主要有Hadoop、Spark、Flink等[3-5]。以Hadoop為代表的大數(shù)據(jù)技術(shù)的出現(xiàn),可以很好地解決大量靜態(tài)數(shù)據(jù)集的數(shù)據(jù)處理與分析,但是很多數(shù)據(jù)都是實(shí)時(shí)產(chǎn)生的,用戶(hù)希望可以實(shí)時(shí)地處理這些數(shù)據(jù),這就需要使用流計(jì)算技術(shù)來(lái)實(shí)時(shí)處理這些數(shù)據(jù),及時(shí)產(chǎn)出應(yīng)用價(jià)值。
而Apache Spark是專(zhuān)為大規(guī)模數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎,是一種與Hadoop相似的開(kāi)源集群計(jì)算環(huán)境,其擁有Hadoop MapReduce所具有的優(yōu)點(diǎn),但與MapReduce存在的不同是任務(wù)中間輸出結(jié)果可以保存在內(nèi)存中,從而不再需要讀寫(xiě)HDFS,因此Spark除了能夠提供交互式查詢(xún)外,還可以?xún)?yōu)化迭代工作,能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce的算法。
隨著Spark計(jì)算引擎的出現(xiàn),促進(jìn)了上層應(yīng)用快速發(fā)展,如對(duì)各種迭代計(jì)算的性能以及對(duì)流計(jì)算和SQL等的支持,F(xiàn)link就在這樣的背景下誕生了。Flink的突出優(yōu)勢(shì)是對(duì)流計(jì)算的支持及更一步的實(shí)時(shí)性。為了對(duì)流式引擎處理有更深刻的認(rèn)識(shí),Sanket Chintapalli等人[6]基于常用的流式計(jì)算構(gòu)建了三種不同的流引擎,進(jìn)行對(duì)比分析。其中Spark streaming是將流劃分解成一系列短小的批處理作業(yè),F(xiàn)link才是真正的流處理,批處理只是流數(shù)據(jù)的一個(gè)極限特例而已。Flink不僅可以支持本地的快速迭代,以及一些環(huán)形的迭代任務(wù),而且Flink可以定制化內(nèi)存管理,其并沒(méi)有將內(nèi)存完全交給應(yīng)用層,因此Flink處理大數(shù)據(jù)速度快,能夠更好地滿(mǎn)足大數(shù)據(jù)背景下應(yīng)用實(shí)時(shí)計(jì)算平臺(tái)的需求。
由于Flink框架的優(yōu)勢(shì),現(xiàn)在有很多關(guān)于Flink應(yīng)用的相關(guān)研究。蔡鯤鵬[7]研究了Flink的概念、生態(tài)系統(tǒng)和相關(guān)技術(shù)等理論基礎(chǔ)并對(duì)Hadoop和Flink在處理大批量數(shù)據(jù)上的耗時(shí)和準(zhǔn)確率進(jìn)行了對(duì)比分析,針對(duì)不同的流式處理平臺(tái),分析總結(jié)了Flink所面臨的一些挑戰(zhàn),為Flink的進(jìn)一步研究提供了參考。麥冠華等[8]基于Flink的計(jì)算框架,設(shè)計(jì)了對(duì)大規(guī)模軌跡數(shù)據(jù)進(jìn)行實(shí)時(shí)運(yùn)動(dòng)模式檢測(cè)的算法,彌補(bǔ)了對(duì)于當(dāng)前大規(guī)模軌跡數(shù)據(jù)只能做范圍查詢(xún)、近鄰查詢(xún)的簡(jiǎn)單處理的不足,很好地應(yīng)用了Flink實(shí)時(shí)計(jì)算的優(yōu)勢(shì)。Marciani G[9]利用Flink框架對(duì)社交網(wǎng)絡(luò)進(jìn)行實(shí)時(shí)分析,系統(tǒng)架構(gòu)的設(shè)計(jì)重點(diǎn)在于利用Flink的并行性和內(nèi)存效率,以便能夠在分布式基礎(chǔ)設(shè)施上有效地處理大容量數(shù)據(jù)流。不僅Flink的應(yīng)用比較廣泛,對(duì)其優(yōu)化的相關(guān)研究也比較多。Verbitskiy I等[10]分析了Flink的執(zhí)行效率,通過(guò)各種實(shí)驗(yàn)評(píng)估表明Apache Flink的性能是高度依賴(lài)于問(wèn)題的;李梓楊等人[11]通過(guò)針對(duì)大數(shù)據(jù)流式計(jì)算平臺(tái)中輸入數(shù)據(jù)流急劇上升所導(dǎo)致的計(jì)算延遲升高問(wèn)題進(jìn)行優(yōu)化,有效地提高了現(xiàn)在Flink框架集群的吞吐量;文獻(xiàn)[12-13]對(duì)基于多查詢(xún)和狀態(tài)管理進(jìn)行了優(yōu)化,研究了Flink的可擴(kuò)展性等等。但是對(duì)于Flink使用過(guò)程中數(shù)據(jù)解析邏輯的控制研究相對(duì)較少。
文中主要從對(duì)Flink進(jìn)行實(shí)時(shí)解析時(shí)邏輯的更改角度進(jìn)行優(yōu)化,通過(guò)使用流控制的方式,減少代碼的開(kāi)發(fā)量,提高Flink應(yīng)用實(shí)時(shí)解析的效率。
隨著業(yè)務(wù)越來(lái)越復(fù)雜,需要采集和存儲(chǔ)的數(shù)據(jù)越來(lái)越多,由于存在著不同的業(yè)務(wù)系統(tǒng),日志的存儲(chǔ)格式多種多樣。為了對(duì)不同的日志進(jìn)行解析,同時(shí)能夠根據(jù)不同的需求將解析的數(shù)據(jù)輸出到相應(yīng)的存儲(chǔ)空間,需要開(kāi)發(fā)一套滿(mǎn)足靈活地適配各種日志格式的數(shù)據(jù)解析架構(gòu),從而減少同類(lèi)解析代碼的開(kāi)發(fā),將不同的數(shù)據(jù)解析進(jìn)行集中式的管理。文中基于隊(duì)列和分布式流處理架構(gòu)構(gòu)建了大數(shù)據(jù)的實(shí)時(shí)采集計(jì)算和存儲(chǔ)平臺(tái)。數(shù)據(jù)處理架構(gòu)如圖1所示。
1.數(shù)據(jù)采集。
日志數(shù)據(jù)都是實(shí)時(shí)產(chǎn)生的,在采集的過(guò)程中,也是在不斷生成的,因此數(shù)據(jù)采集模塊需要完成實(shí)時(shí)采集。目前應(yīng)用較多的有Tcollector、Filebeat[14]等采集工具。
其中Filebeat具有兩個(gè)較大的優(yōu)勢(shì):
(1)性能穩(wěn)健。
圖1 數(shù)據(jù)處理架構(gòu)
無(wú)論什么樣的應(yīng)用都可能存在程序中斷的情況,F(xiàn)ilebeat能夠讀取并轉(zhuǎn)發(fā)日志行,如果出現(xiàn)中斷,還會(huì)在一切恢復(fù)正常后,從中斷前停止的位置繼續(xù)開(kāi)始。
(2)部署簡(jiǎn)單。
Filebeat內(nèi)置有多種模塊(Apache、System、MySQL等等),可以針對(duì)常見(jiàn)格式的日志大大簡(jiǎn)化收集、解析和可視化過(guò)程。
基于Filebeat的優(yōu)勢(shì),在構(gòu)建數(shù)據(jù)的采集平臺(tái)時(shí)采用該服務(wù)進(jìn)行日志的實(shí)時(shí)采集。在部署采集程序時(shí),將不同的業(yè)務(wù)發(fā)送到不同的topic數(shù)據(jù)流中;通過(guò)Filebeat的配置文件實(shí)現(xiàn)數(shù)據(jù)采集的機(jī)器、日志文件、采集的路徑、數(shù)據(jù)的輸出端的配置。
2.數(shù)據(jù)傳輸。
數(shù)據(jù)傳輸采用Kafka[15]隊(duì)列,每個(gè)topic隊(duì)列作為一個(gè)單獨(dú)的數(shù)據(jù)流,并與數(shù)據(jù)的采集和解析構(gòu)成完整的數(shù)據(jù)處理流。除了采集業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)流,這里增加一個(gè)空流,用來(lái)進(jìn)行流控制。Kafka采集的各個(gè)數(shù)據(jù)流如圖2所示。
圖2 采集的數(shù)據(jù)流
3.數(shù)據(jù)解析。
數(shù)據(jù)解析部分需要獲取多種數(shù)據(jù)的配置,如系統(tǒng)配置、數(shù)據(jù)源配置、數(shù)據(jù)解析邏輯配置、數(shù)據(jù)存儲(chǔ)配置、監(jiān)控配置等。數(shù)據(jù)解析模塊主要是通過(guò)使用Flink的各種算子組合完成業(yè)務(wù)數(shù)據(jù)解析邏輯的。該模塊是整個(gè)實(shí)時(shí)采集計(jì)算和存儲(chǔ)平臺(tái)的核心部分,對(duì)數(shù)據(jù)的實(shí)時(shí)計(jì)算能力要求較高。該模塊不僅需要完成對(duì)數(shù)據(jù)流的實(shí)時(shí)解析,同時(shí)還要支持對(duì)數(shù)據(jù)流解析的實(shí)時(shí)更改。例如一個(gè)流能夠解析多個(gè)topic的數(shù)據(jù),一個(gè)topic能夠通過(guò)解析程序分流到不同的存儲(chǔ)路徑,對(duì)一個(gè)流能夠?qū)崟r(shí)地更改解析邏輯,而不需要重啟。
4.數(shù)據(jù)存儲(chǔ)。
日志數(shù)據(jù)不僅用來(lái)進(jìn)行業(yè)務(wù)的分析,還需要對(duì)各種業(yè)務(wù)的指標(biāo)進(jìn)行監(jiān)控,所以同一份日志的數(shù)據(jù)需要存儲(chǔ)到不同的存儲(chǔ)介質(zhì),因此數(shù)據(jù)流的輸出結(jié)果也會(huì)有多種,如hdfs、hive、clickhouse、opentsdb等多個(gè)存儲(chǔ)渠道。
基于第2節(jié)介紹的實(shí)時(shí)計(jì)算架構(gòu),提出了使用更新算子的方式來(lái)改變數(shù)據(jù)流的解析邏輯。
現(xiàn)在通用的數(shù)據(jù)流處理方式是流stream1處理完,將得到的結(jié)果作為stream2的輸入,在stream2流的處理中完成對(duì)stream1的結(jié)果的處理,將stream流的解析通過(guò)不同的map邏輯依次處理,直到得到想要的輸出結(jié)果。解析邏輯如圖3所示。
圖3 解析邏輯
而第2節(jié)介紹的實(shí)時(shí)計(jì)算架構(gòu),不再使用這種方式進(jìn)行業(yè)務(wù)邏輯的處理,而是通過(guò)配置文件來(lái)實(shí)現(xiàn),為了滿(mǎn)足通過(guò)更新配置文件代替代碼開(kāi)發(fā)完成日志解析的需求,提出流迭代的算法,具體算法邏輯如下:
輸入:需要解析的數(shù)據(jù)流stream。
輸出:解析結(jié)果。
Step1:將每個(gè)要處理的數(shù)據(jù)流的名稱(chēng)通過(guò)hashmap進(jìn)行存儲(chǔ),假設(shè)
Step2:按照對(duì)datastream1的流處理算子得到流處理結(jié)果dataset1;
Step3:更新hashmap中stream1的value值為dataset1;
Step4:遍歷下一個(gè)需要處理的算子,直接讀取key=stream1的value值,對(duì)stream1的value值執(zhí)行相應(yīng)的解析邏輯得到數(shù)據(jù)集dataset2;
Step5:更新stream1的value值為dataset2;
Step6:依次迭代對(duì)數(shù)據(jù)流處理的各個(gè)算子,直到完成所有的解析邏輯,最后結(jié)果依然保存在stream1中。
通過(guò)流迭代的方法,每次算子執(zhí)行的時(shí)候都是對(duì)同一個(gè)stream1進(jìn)行處理,只需要遍歷定義好的算子即可,這樣很多算子在不同的數(shù)據(jù)流解析中可以共用,不僅可以減少代碼的開(kāi)發(fā)量,還可以把開(kāi)發(fā)的重點(diǎn)放在業(yè)務(wù)邏輯處理中,解析日志的程序開(kāi)發(fā)變得更加簡(jiǎn)單。在算子中還可以添加復(fù)制的算子,將一個(gè)數(shù)據(jù)流復(fù)制成多個(gè)數(shù)據(jù)流,再針對(duì)不同的數(shù)據(jù)流配置不同的日志解析算子,實(shí)現(xiàn)分流的效果。
程序的重新啟動(dòng)會(huì)中斷正在運(yùn)行的解析邏輯,有些數(shù)據(jù)實(shí)時(shí)性要求較高,中間重啟程序會(huì)造成一些數(shù)據(jù)的缺失。同時(shí)針對(duì)3.1介紹的流切換算法中的stream值更新的時(shí)候也需要實(shí)時(shí)地傳入,因此文中設(shè)計(jì)了免更新、免重啟的流控制算法。
輸入:解析算子γ,算子γ是可以實(shí)現(xiàn)數(shù)據(jù)流選擇和各種解析業(yè)務(wù)邏輯的配置,里面通過(guò)設(shè)置一個(gè)參數(shù)source,實(shí)現(xiàn)對(duì)不同數(shù)據(jù)流的解析邏輯控制。
輸出:按照算子指定的執(zhí)行邏輯輸出結(jié)果。
Step1:假設(shè)需要解析的數(shù)據(jù)流為dataA,在現(xiàn)有需要解析的數(shù)據(jù)流中增加一個(gè)空的數(shù)據(jù)流temp,該數(shù)據(jù)流開(kāi)始時(shí)不存儲(chǔ)任何數(shù)據(jù),同時(shí)增加一個(gè)內(nèi)部類(lèi)的變量用來(lái)存儲(chǔ)解析的算子γ;
Step2:在實(shí)時(shí)的代碼解析邏輯中,增加一個(gè)對(duì)temp流的解析;
Step3:在需要更新解析邏輯時(shí),通過(guò)注入的方式將最新的解析邏輯注入到temp流中;
Step4:通過(guò)解析temp流中的數(shù)據(jù),獲取針對(duì)當(dāng)前數(shù)據(jù)流的解析邏輯,并更新為γ的值;
Step5:再次解析數(shù)據(jù)流dataA的時(shí)候,就會(huì)使用最新的解析邏輯來(lái)處理數(shù)據(jù),從而實(shí)現(xiàn)解析邏輯的實(shí)時(shí)控制。
算法的實(shí)現(xiàn)邏輯如圖4所示,數(shù)據(jù)流處理主要分為數(shù)據(jù)的采集和解析,業(yè)務(wù)數(shù)據(jù)流主要是從各個(gè)業(yè)務(wù)系統(tǒng)實(shí)時(shí)采集對(duì)應(yīng)的數(shù)據(jù),而邏輯數(shù)據(jù)流是在需要解析某個(gè)業(yè)務(wù)數(shù)據(jù)時(shí),傳入業(yè)務(wù)流對(duì)應(yīng)的解析邏輯;在數(shù)據(jù)解析環(huán)節(jié)首先獲取解析邏輯的解析算子,從而實(shí)現(xiàn)對(duì)業(yè)務(wù)數(shù)據(jù)流解析的控制。
圖4 業(yè)務(wù)數(shù)據(jù)流處理流程
注入temp數(shù)據(jù)流的解析算子γ配置如下:
#系統(tǒng)配置
#數(shù)據(jù)源
#業(yè)務(wù)邏輯(任務(wù)解析)
#配置輸出
#監(jiān)控配置
通過(guò)java生成相應(yīng)的文件,注入到temp數(shù)據(jù)流中。在整個(gè)架構(gòu)中有一個(gè)控制類(lèi),該類(lèi)通過(guò)讀取XML文件,解析一個(gè)配置類(lèi),配置的成員包括系統(tǒng)配置類(lèi)、數(shù)據(jù)源類(lèi)、業(yè)務(wù)邏輯算子類(lèi)、配置輸出sink類(lèi)等,其中算子類(lèi)在實(shí)現(xiàn)的時(shí)候會(huì)繼承一個(gè)基類(lèi),這樣不同類(lèi)型的算子都可以組成一個(gè)基類(lèi)算子的list列表。
以從Kafka隊(duì)列接收數(shù)據(jù),使用Flink解析,輸出到hdfs、Opentsdb存儲(chǔ)數(shù)據(jù)中為例,介紹該算法在具體的實(shí)時(shí)流計(jì)算框架中的應(yīng)用。
下面給出了注入temp的主要配置信息。
Flink是一種典型的分布式計(jì)算,對(duì)于內(nèi)部類(lèi)外的變量會(huì)在程序啟動(dòng)后存儲(chǔ)在master節(jié)點(diǎn)上,且后續(xù)都不能改變,而對(duì)于內(nèi)部類(lèi)中的變量在每次代碼執(zhí)行時(shí)都會(huì)執(zhí)行,因此采用3.2介紹的流控制方法在內(nèi)部類(lèi)中添加一個(gè)變量,用來(lái)存儲(chǔ)解析算子,在對(duì)不同業(yè)務(wù)數(shù)據(jù)進(jìn)行處理時(shí),更新這個(gè)值就可以達(dá)到解析的目的。
偽代碼邏輯如下:
創(chuàng)建一個(gè)臨時(shí)變量str=temp;
#解析數(shù)據(jù)流DataA
{
#根據(jù)str,實(shí)現(xiàn)map的解析
DataStream
}
#解析數(shù)據(jù)流temp
{
Parse(temp_map)
輸入新的解析算子new_temp
If(Source=stream1){ str=new_temp;}
else{Return;}
目前生產(chǎn)上共配置了5臺(tái)CPU16核,內(nèi)存為64 G的服務(wù)器,搭建了實(shí)時(shí)解析架構(gòu)平臺(tái),每秒的日志處理流量大概15 W左右,處理業(yè)務(wù)日志種類(lèi)多達(dá)50個(gè),隨著業(yè)務(wù)的變動(dòng),實(shí)時(shí)平臺(tái)的調(diào)整也會(huì)比較頻繁。
完成一個(gè)在線(xiàn)運(yùn)行的業(yè)務(wù)更改邏輯過(guò)程對(duì)比如下:
不使用文中提出的自動(dòng)化流控制算法的業(yè)務(wù)處理過(guò)程:
(1)從Filebeat采集的每條數(shù)據(jù)中獲取需要的字段;
(2)對(duì)指定的日志數(shù)據(jù)寫(xiě)代碼進(jìn)行解析,調(diào)試;
(3)查看解析結(jié)果,是否滿(mǎn)足業(yè)務(wù)的需求;
(4)寫(xiě)入庫(kù)的sink代碼,調(diào)試;
(5)查看入庫(kù)的結(jié)果;
(6)將代碼打包、上傳jar包到各臺(tái)服務(wù)上;
(7)重啟各個(gè)進(jìn)程,查看程序是否正常啟動(dòng)。
使用文中提出的自動(dòng)化流控制算法的業(yè)務(wù)處理過(guò)程:
(1)在配置文件中實(shí)現(xiàn)所有的邏輯;
(2)查看解析和入庫(kù)的結(jié)果;
(3)將配置文件注入到topic流中;
(4)自動(dòng)生效,完成邏輯的更改。
消耗和用時(shí)結(jié)果對(duì)比如表1所示。
表1 是否使用流處理的對(duì)比結(jié)果
從實(shí)現(xiàn)的流程可以看到,使用文中提出的自動(dòng)化流控制算法可以很大程度地減少代碼的開(kāi)發(fā)與測(cè)試,減少生產(chǎn)部署的工作量,最大程度地保證了生產(chǎn)數(shù)據(jù)的實(shí)時(shí)性。
通過(guò)Filebeat、Kafka隊(duì)列和Flink流式處理架構(gòu),構(gòu)建了一套實(shí)時(shí)數(shù)據(jù)流解析的平臺(tái),不僅能夠針對(duì)大量的數(shù)據(jù)進(jìn)行實(shí)時(shí)解析,還能夠滿(mǎn)足實(shí)時(shí)更新解析邏輯的策略,極大地提高了生產(chǎn)數(shù)據(jù)的采集和分析效率。通過(guò)該實(shí)時(shí)解析架構(gòu)平臺(tái),可以實(shí)現(xiàn)多流合并,單流分流,及業(yè)務(wù)邏輯實(shí)時(shí)更新等多個(gè)功能,提高了分布式流處理架構(gòu)Flink的應(yīng)用性能,為當(dāng)前各個(gè)互聯(lián)網(wǎng)公司復(fù)雜業(yè)務(wù)邏輯的大數(shù)據(jù)處理提供了解決方案,具有一定的現(xiàn)實(shí)意義。后續(xù)將會(huì)對(duì)Flink使用的資源做進(jìn)一步的優(yōu)化,提高數(shù)據(jù)解析平臺(tái)的資源利用率。