李團結(jié)+從新法+李光明
【摘要】 日志對于每個系統(tǒng)來說都是不可或缺的一部分,而現(xiàn)階段對日志的處理效率卻不盡如人意。實時性日志考驗的是大數(shù)據(jù)處理框架的實時計算能力,基于Storm 并借助開源框架 Kafka,設(shè)計了一個實時數(shù)據(jù)收集與處理的系統(tǒng),將數(shù)據(jù)轉(zhuǎn)為流的形式,對收集來的數(shù)據(jù)直接在內(nèi)存以流的形式進行計算,輸出有價值的信息保存到Redis。最后對系統(tǒng)進行性能測試以及計算能力的測試。實驗結(jié)果表明,該系統(tǒng)可擴展性良好,且并行計算能力穩(wěn)定,適合大量實時數(shù)據(jù)處理。
【關(guān)鍵字】 Storm Kafka Redis
一、引言
大數(shù)據(jù)時代,與互聯(lián)網(wǎng)行業(yè)息息相關(guān)的諸多領(lǐng)域中用戶數(shù)量和其產(chǎn)生的數(shù)據(jù)在不斷地累加,為之提供支撐的服務(wù)器端存放的日志信息量也隨之劇增,如何準確及時的篩選海量日志中的關(guān)鍵信息成為了亟待解決的問題。眾所周知,Hadoop架構(gòu)可以使用戶可以在不了解分布式底層細節(jié)的情況下,開發(fā)分布式程序。充分利用集群的威力進行高速運算和存儲,但是對于實時性極強的流式數(shù)據(jù),顯然流處理框架Strom更適合,并且處理效率客觀。
二、Storm計算框架
Storm是由BackType開發(fā)并被Twitter于2011 年開源的分布式實時計算系統(tǒng)[1],能夠很容易可靠地處理無界持續(xù)的流數(shù)據(jù),進行實時計算 [2]。
任務(wù)拓撲是Storm的邏輯單元,一個實時的應(yīng)用打包為拓撲后發(fā)送,拓撲是由Spout和Bolt組成,其二者的關(guān)系如圖1所示。Spout節(jié)點從數(shù)據(jù)源中源源不斷的消費數(shù)據(jù)并把數(shù)據(jù)發(fā)送到后面的Bolt節(jié)點,而Topology是將Spout和Bolt組合在一起完成一項具體的計算任務(wù)。Topology一旦提交就會一直執(zhí)行。
Storm主從架構(gòu)圖包含一個主節(jié)點Nimbus和多個從節(jié)點Supervisor,Zookeeper完成兩者之間的協(xié)調(diào)。每個 Worker都執(zhí)行且只執(zhí)行任務(wù)拓撲中的一個子集, 在每個Worker 內(nèi)部,會有多個 Executor,每個 Executor對應(yīng)一個任務(wù),負責具體數(shù)據(jù)的計算,即用戶所實現(xiàn)的 Spout /Bolt 實例。
三、日志綜合管理平臺基于Storm的實現(xiàn)方案
3.1開發(fā)環(huán)境及采用的測試數(shù)據(jù)集
硬件環(huán)境包括Storm集群,Kakfa集群,Zookeeper集群,Storm包括1個Nimbus和4個Supervisor;Kafka集群包括5個節(jié)點;Zookeeper集群也包括5個節(jié)點,集體配置如表1所示。
軟件環(huán)境:jdk-1.7.0_79、logstash-2.3.4、elasticsearch-2.3.4、storm-0.9.5、kafka_2.9.1-0.8.2.0
zookeeper-3.3.5、python-2.7.12。
操作系統(tǒng):Linux version 3.10.0-327.el7.x86_64
數(shù)據(jù)集:用戶話單日志信息(約2 billon/day)。
3.2平臺架構(gòu)及處理流程
日志綜合處理平臺主要由三層組成,包括:數(shù)據(jù)采集層、數(shù)據(jù)分析及存儲層以及數(shù)據(jù)展示層。可以實現(xiàn)對日志從采集到分析處理的全過程并在頁面監(jiān)控平臺顯示。
本實驗方案使用 Kafka為消息中間件傳遞消息。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),其依賴Zookeeper保存每組消費者消費的相應(yīng)Topic的偏移量。
SpoutA接收待初始化的數(shù)據(jù),并將其發(fā)K-means&DBSCANBolt 通過數(shù)據(jù)簇形態(tài)識別以初始化微簇;SpoutB從Kafka中接收初始化后待處理的流數(shù)據(jù),將其發(fā)送至LocalBolt進行局部微聚類;SpoutC用作處理時間戳,每單位時間向LocalBolt發(fā)送一次信息,當接收到時間戳消息,將局部微聚類更新結(jié)果存放到Redis做實時局部微聚類更新結(jié)果的保存,并合并原有的增量信息發(fā)送到GlobalBolt;SpoutD通過消息中間件 Kafka接收用戶發(fā)送的查詢參數(shù)。
K-means&DBSCANBolt接收 SpoutA傳輸?shù)拇跏蓟瘮?shù)據(jù)與聚類參數(shù) k(簇數(shù)),進行標準 k-means聚類或者DBSCAN聚類,聚類的結(jié)果以微簇形式發(fā)送至 LocalBolt隨后根據(jù)時間戳信息保存結(jié)果到Redis,并由滑動窗口觸發(fā)機制合并局部微簇到全局微簇GlobalBolt。RL-DSCA算法的微簇在線維護微簇進行的在線增量更新是由LocalBolt來實現(xiàn)的,體現(xiàn)了RL-DSCA算法分布式數(shù)據(jù)的處理,到達的待處理流數(shù)據(jù)將會分配到各個LocalBolt節(jié)點,這些節(jié)點具體的功能均不相同,LocalBolt各節(jié)點處理流程如圖2所示。主要處理Bolt的實現(xiàn)功能如下。
extractBolt:該Bolt主要實現(xiàn)從初始化后的數(shù)據(jù)流中篩選目標信息,并將篩選出來的數(shù)據(jù)發(fā)送到下一個處理bolt。
judgeSysTimeBolt:該Bolt用來判斷系統(tǒng)時間和時間戳的關(guān)系檢測拓撲停止工作的異常情況,如出現(xiàn)拓撲異常,系統(tǒng)時間>時間戳?xí)r間,對時間戳補齊并進行更新(updateTimestampBolt)結(jié)果存放到Redis。
judgeLogTimeBolt:改Bolt主要是判斷來的日志是實時日志還是歷史日志,如果日志時間在時間戳范圍內(nèi)即為實時日志,否則按照歷史日志來處理。
sendAndUpdateRedisBolt:實時日志的發(fā)送,根據(jù)SpoutC傳來的時間戳消息,將局部微聚類更新結(jié)果存放到Redis。
submitLastValueBolt:該Bolt用于處理歷史日志的最后一個時間戳,根據(jù)來的一條正常日志觸發(fā)將歷史日志的微簇發(fā)送到Redis。
abnormalHandleBolt:該Bolt主要對歷史日志進行處理,避免影響實時流數(shù)據(jù)的處理,并將歷史日志的處理結(jié)果合并到Redis供全局微簇的合并。
現(xiàn)將該平臺的主要功能概述如下:
接收 K-means&DBSCANBolt生成的初始化微簇生成初始緩存集Kafka;對于到達拓撲的待處理的數(shù)據(jù)流,LocalBolt按照單位時間生成局部聚類增量,并將該中間結(jié)果發(fā)送至Redis供合并;Redis實現(xiàn)RL-DSCA算法的合并部分,即合并局部增量結(jié)果進行全局微簇增量更新:接收LocalBolt生成的初始化微簇生成初始全局微簇;緩存各局部線程傳輸?shù)闹虚g結(jié)果;使用滑動窗口觸發(fā)機制,達到觸發(fā)時間點則合并暫存的中間結(jié)果,將結(jié)果打上相應(yīng)時間標記Tag,生成實時全局微簇快照發(fā)送至GlobalBolt。GlobalBolt實現(xiàn)RL-DSCA算法的查詢輸出;接收GlobalBolt生成的全局微簇快照,將其存儲至金字塔時間幀結(jié)構(gòu)中供后續(xù)查詢;當用戶輸入查詢參數(shù)時,通過SpoutD接收查詢參數(shù),查找金字塔時間幀結(jié)構(gòu)中的相應(yīng)數(shù)據(jù),將查詢結(jié)果發(fā)送至SendBolt 進行輸出。
四、結(jié)束語
本文設(shè)計開發(fā)了流數(shù)據(jù)計算平臺 Storm 的計算架構(gòu)處理海量數(shù)據(jù)日志綜合管理平臺,結(jié)合Kafka和Redis對日志進行了實時性的分析和處理。滿足了用戶對大數(shù)據(jù)量日志信息的使用需要,并達到了客觀的處理效率。
參 考 文 獻
[1] The Apaehe Foundation. Storm official website- [EB/OL].https://storm.apache.org/.
[2] Github Inc. Storm Wiki[EB/OL]. https://github.com/apache/storm.