杜政頡, 王 鵬, 黃 焱, 郎福通
(1.成都信息工程學(xué)院軟件工程學(xué)院并行計算實驗室,四川成都610225;2.中國科學(xué)院成都計算機(jī)應(yīng)用研究所,四川 成都 610041;3.中國科學(xué)院大學(xué),北京 100049)
大數(shù)據(jù)處理分為兩類模式,一類是批處理模式,數(shù)據(jù)源為靜態(tài);一類是流處理模式,數(shù)據(jù)源為動態(tài)。批處理模式系統(tǒng)有Hadoop、Spark、Disco、HPCC等,流處理模式系統(tǒng)來自 Twitter的 Storm和來自 Yahoo的 S4系統(tǒng)[1]。MapReduce是Hadoop中被廣泛應(yīng)用的一種大數(shù)據(jù)處理模型,側(cè)重于批處理,Topology是來自于Storm系統(tǒng)中的一種編程模型,側(cè)重于流處理。
MapReduce無法解決具有迭代結(jié)構(gòu)的應(yīng)用程序,迭代結(jié)構(gòu)程序在實際應(yīng)用中很普遍,因此,有人基于MapReduce提出一種迭代MapReduce。文獻(xiàn)[2]提出一種名為Twister的迭代MapReduce處理方案,文獻(xiàn)[3]提出一種名為HaLoop的MapReduce迭代方案。
Storm是一款應(yīng)用于實時流處理領(lǐng)域的大數(shù)據(jù)處理工具。在Storm中,Nathan Mar提出一種新的并行編程模型Topology。這種模型改進(jìn)了MapReduce需要存儲中間數(shù)據(jù)這一繁瑣過程[2],采用類似于流水線作業(yè)方式的任務(wù)分解模型,側(cè)重于處理動態(tài)數(shù)據(jù)源的任務(wù),實時性更強(qiáng)。Storm編程模型與MapReduce一樣,并沒有考慮這種迭代結(jié)構(gòu)應(yīng)用程序的實現(xiàn)過程。對于這一缺陷,還沒有人提出一種改進(jìn)方案。因此,文中基于這種Topology編程模型,通過增加組件Receiver、IBolt、Checker組建迭代Topology,設(shè)計了一種新的可以解決迭代結(jié)構(gòu)應(yīng)用程序Topology模型,并對這種模型的新增組件和其對應(yīng)的API進(jìn)行了介紹和分析,在Storm系統(tǒng)架構(gòu)基礎(chǔ)上設(shè)計了一種迭代Topology的實現(xiàn)方案,描述了在這種實現(xiàn)方式下解決具有迭代結(jié)構(gòu)程序的具體過程,并使用這種模型實現(xiàn)了K-Means算法,實例論證這種迭代模型的可行性。
Storm編程模型原理[4]:一般任務(wù)都可以用流水線作業(yè)方式表現(xiàn)出來,其中的組件就相當(dāng)于流水作業(yè)中的一個工人,不同的組件負(fù)責(zé)任務(wù)中不同的部分,一個組件處理完自己的工作即提交給下一個組件,直至整個任務(wù)處理完成。整個任務(wù)實現(xiàn)的過程可以用圖1的Topology來表示。這種模型的一個突出特點(diǎn):數(shù)據(jù)源可以靜態(tài)可以動態(tài),動態(tài)環(huán)境中它的表現(xiàn)更能體現(xiàn)出它的優(yōu)勢。一方面,這種模型采用消息傳遞方式交互數(shù)據(jù),數(shù)據(jù)量相比于從磁盤獲取要小,動態(tài)環(huán)境中,數(shù)據(jù)量動態(tài)讀取,每次讀取量小,很好滿足了這種模型的特點(diǎn);另一方面,這種模型是一種實時性處理模型,動態(tài)環(huán)境中更能夠體現(xiàn)這種特點(diǎn)。所以,這種處理模型側(cè)重于流處理。模型包含兩類組件:Spout和Bolt,Spout組件負(fù)責(zé)讀取數(shù)據(jù)源,Bolt組件負(fù)責(zé)實際的數(shù)據(jù)操作運(yùn)算。組件在實現(xiàn)中有對應(yīng)的API,Spout組件API為setSpout(),Bolt組件API為setBolt()。
圖1 Storm Topology
有許多并行算法內(nèi)部都帶有簡單的迭代結(jié)構(gòu)。這些算法大多分布在數(shù)據(jù)聚類、維度縮減、鏈接分析、機(jī)器學(xué)習(xí)和計算機(jī)視覺等領(lǐng)域。K-Means、確定性退火聚類、PageRank和SMACOF算法就是其中的例子。這種具有迭代結(jié)構(gòu)的算法可以用以下公式來描述[3]:
其中,R0表示初始化時的結(jié)果,L表示一種不變的關(guān)系。這種公式表示的程序,只有當(dāng)?shù)竭_(dá)某檢查點(diǎn)時才將終止運(yùn)行。比如,迭代后的結(jié)果與前面結(jié)果相比已經(jīng)不會出現(xiàn)變化就可以作為一個檢查點(diǎn),這也是很多優(yōu)化算法迭代的檢查點(diǎn)。
迭代程序的一個關(guān)鍵點(diǎn)是結(jié)果與輸入具有相關(guān)性,上面的Topology中輸出結(jié)果無法再次作為輸入,所以這種迭代部分只能放入一個組件Bolt作為一個任務(wù)來處理,這樣就增加了該組件的處理負(fù)載。對于低配置集群,這種高負(fù)載一方面導(dǎo)致整體系統(tǒng)效率降低,另一方面還有可能使節(jié)點(diǎn)失效。因此,下面設(shè)計一種具有迭代結(jié)構(gòu)的Topology模型,在這種模型中,處理迭代結(jié)構(gòu)程序時,就可以把迭代結(jié)構(gòu)部分按照功能拆分開來,而不是把整個迭代部分放入一個組件中,使得這種模型處理迭代問題時更加靈活。
基于Storm的基礎(chǔ)編程模型,文中改進(jìn)其Topology,增加了迭代模塊,能夠解決迭代類的問題,如圖2所示。迭代Topology與之前Topology相比多了一個新拓?fù)?Iterator。Iterator負(fù)責(zé)處理具有迭代結(jié)構(gòu)部分程序。里面同時包含了IBolt組件、迭代檢查器Checker和外部數(shù)據(jù)源接收器Receiver。IBolt組件負(fù)責(zé)任務(wù)處理,迭代檢查器負(fù)責(zé)判斷迭代是否結(jié)束,外部數(shù)據(jù)接收器負(fù)責(zé)接收迭代器外部發(fā)來的數(shù)據(jù)。迭代Topology好比在流水線上的工人中增加一個迭代管理員,負(fù)責(zé)管理處理迭代任務(wù)的工人,并告訴他們什么時候進(jìn)行迭代操作,什么時候結(jié)束迭代。
圖2中描述的這種迭代模型看起來只能模仿Do-While循環(huán)模式,但While-Do或者Do-While在一定條件下可以互相轉(zhuǎn)化,所以這種模型其實可以解決任何迭代問題。
迭代 Topology新增了 Receiver、IBolt、Checker 3個組件,通過新增的 3個API(setReceiver()、setIBolt()、setChecker())來實現(xiàn)。
(1)新增組件
Receiver組件:Receiver組件用來接收外部組件和內(nèi)部迭代組件Checker組件發(fā)送來的消息。將接收到的消息進(jìn)行排隊處理后發(fā)往迭代開始的IBolt組件。Receiver組件一方面解決了Checker組件和Spout組件同時向IBolt組件發(fā)送消息的功能,另一方面也可以控制數(shù)據(jù)傳入IBolt組件的速率。
IBolt組件:IBolt組件與Bolt組件一樣,負(fù)責(zé)實際任務(wù)處理,但它處理的是需要迭代運(yùn)算的任務(wù),區(qū)別于非迭代功能的任務(wù)。
Checker組件:Checker組件是實現(xiàn)迭代過程的關(guān)鍵,迭代控制主要由它完成。主要功能是檢查迭代處理是否結(jié)束。與其它組件不同的是,有兩個發(fā)射口,一個是發(fā)往Receive組件,一個是發(fā)往外部Bolt組件。如果進(jìn)入下次迭代,消息發(fā)往Receiver;如果迭代結(jié)束,消息發(fā)往外部Bolt組件。
圖2 迭代Topology
(2)新增組件API
setReceiver():構(gòu)造1個接收器,2個參數(shù),參數(shù)1設(shè)定接收器名,參數(shù)2設(shè)定并行數(shù)目;
setIBolt():構(gòu)造1個迭代處理組件,3個參數(shù),參數(shù)1設(shè)定迭代處理組件名,參數(shù)2設(shè)定要處理的任務(wù),參數(shù)3設(shè)定并行數(shù)目;
setChecker():構(gòu)造1個迭代檢查器,4個參數(shù),參數(shù)1設(shè)定迭代檢查器名,參數(shù)2設(shè)定消息接收器名,參數(shù)3設(shè)定迭代檢查器任務(wù),參數(shù)4設(shè)定并行數(shù)目;
圖3為Storm中實現(xiàn)Topology模型的一種架構(gòu)[4]。該架構(gòu)由3個進(jìn)程組成:Nimbus進(jìn)程為主進(jìn)程,負(fù)責(zé)接收客戶端提交的代碼,并將代碼序列化,為客戶機(jī)分發(fā)任務(wù);Zookeeper進(jìn)程負(fù)責(zé)Nimbus進(jìn)程和Supervisor進(jìn)程之間的消息協(xié)同工作;Supervisor進(jìn)程負(fù)責(zé)接收任務(wù)并執(zhí)行任務(wù),將任務(wù)結(jié)果返回給用戶。
實現(xiàn)這種迭代Topology,仍然延用Storm系統(tǒng)基礎(chǔ)架構(gòu),需要改變的是系統(tǒng)的調(diào)度策略、組件類型。一個完整迭代任務(wù)執(zhí)行過程如下:
(1)Nimbus進(jìn)程接收客戶端提交過來的具有迭代結(jié)構(gòu)的Topology,然后將每個組件序列化,并分發(fā)任務(wù)到Supervisor;
(2)Supervisor接收分發(fā)的任務(wù)并開始執(zhí)行;
(3)Spout組件發(fā)射一條消息到迭代消息接收器Receiver,消息接收器將消息進(jìn)行排隊,按照先來先服務(wù)策略,把消息發(fā)往第一個要開始迭代操作的組件IBolt;
(4)IBolt組件處理完自己的任務(wù),根據(jù)用戶設(shè)定,決定任務(wù)結(jié)果是發(fā)往Checker組件還是發(fā)往下一個IBolt組件;
(5)Checker組件根據(jù)接收到的消息和用戶定義的檢查點(diǎn)決定是否繼續(xù)迭代操作,如果迭代完成則將消息發(fā)往外部Bolt組件,否則,將消息發(fā)往Receiver組件開始下一次循環(huán)操作。
圖3 Storm框架
K-Means算法是數(shù)據(jù)挖掘中應(yīng)用廣泛的數(shù)據(jù)聚類算法,算法程序結(jié)構(gòu)包含了迭代處理部分,這種具有迭代結(jié)構(gòu)的算法很多,為了驗證迭代Topology模型的可行性,文中選用比較典型并且為大家熟知的K-Means算法來實現(xiàn)。
K-Means算法的核心思想[5]是找出K個聚類中心c1,c2,…,ck,使每個數(shù)據(jù)點(diǎn)xi和與其最近的聚類中心cv的平方距離和被最小化(該平方距離和被稱為偏差D)。對n個樣本進(jìn)行聚類的過程如下:
(1)初始化:隨機(jī)指定k個聚類中心(c1,c2,…,ck);
(2)重復(fù)下面過程直到D收斂:
①分配xi:對每個樣本xi,找到離它最近的聚類中心cv,并將其分配到cv所標(biāo)明類;
②修正cv:對每一個cv移動到其標(biāo)明的類中心;
藥士道:“此人之前乃是假死,蓋因體力透支嚴(yán)重,精神高度緊繃,再加之外傷失血過多,導(dǎo)致身體機(jī)能衰竭。如今能重新恢復(fù)氣息,實數(shù)罕見。容我為她配些養(yǎng)神滋補(bǔ)的草藥,至于她能不能徹底醒過來,還要看她自身的造化了。”
因此,在迭代 Topology模型上設(shè)計K-Means算法的Topology可以由圖4表示。
該Topology中,Spout組件為spout,Receiver組件為receiver,Checker組件為checker,IBolt處理組件包括caldistance和move,Bolt組件為writeResult,K-Means算法在迭代Topology上的實現(xiàn)過程的具體描述如圖5所示。
圖4 K-Means算法的迭代Topology
該實現(xiàn)采用5個點(diǎn)作為點(diǎn)群,選取2個種子點(diǎn),圖5描述了這種歸類的大致過程,具體實現(xiàn)過程如下:
(1)spout組件隨機(jī)產(chǎn)生5個點(diǎn)作為點(diǎn)群,隨機(jī)產(chǎn)生2個點(diǎn)作為基點(diǎn),然后將點(diǎn)群和基點(diǎn)作為數(shù)據(jù)輸入發(fā)往receiver組件;
(2)receiver組件接收到輸入后把輸入數(shù)據(jù)發(fā)往迭代開始組件caldistance,該組件用來計算基點(diǎn)與各個點(diǎn)群之間的距離;
(3)caldistance將計算完后的距離以及點(diǎn)群和基點(diǎn)位置發(fā)往move組件,move組件根據(jù)距離計算出點(diǎn)群中心,并將基點(diǎn)移動到點(diǎn)群中心,將移動后的基點(diǎn)與點(diǎn)群作為數(shù)據(jù)輸入發(fā)往checker組件;
(4)checker組件將第一次接收到的數(shù)據(jù)與初始化的基點(diǎn)相比較,比較完后將接收到的基點(diǎn)代替初始化的基點(diǎn)作為下一次比較的對象,初始化的基點(diǎn)位置一般為(0,0),因此第一次比較必然要進(jìn)行迭代處理,所以checker組件將接收到的點(diǎn)群和基點(diǎn)發(fā)往receiver,然后開始第二次重復(fù)處理過程;
下面是構(gòu)建K-Means迭代Topology的主要模擬實現(xiàn)代碼段:
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout(“spout” ,new RandomProducePoints(5,2),2);
builder.setReceiver(“receiver” ,4);
builder.setIBolt(“caldistance” ,new calDistance(2),2).shufflegrouping(“ receiver”);
builder.setIBolt(“move”,new MovetoCenter(),2).shufflegrouping(“caldistance”);
builder.setChecker(“checker” ,”receiver” ,new checkChanges(),2).shufflegrouping(“move”);
builder.setBolt(“writeResult”,new writeResult(),1).shufflegrouping(“checker”);
以上代碼都是模擬仿真Storm系統(tǒng)中實現(xiàn)Topology的代碼來表述的。TopologyBuilder類是構(gòu)建一個Topology圖需要的類,類中包含了設(shè)置各種組件的方法,一個應(yīng)用的Topology就是通過這個類來實現(xiàn)的,組件中的參數(shù)主要有4類:組件名;組件任務(wù);接收消息的方式;并行數(shù)目。接受消息有6種分組方式:隨機(jī)分組、字段分組、全部分組、全局分組、無分組、直接分組,示例中都為隨機(jī)分組。隨機(jī)分組有一個參數(shù),這個參數(shù)指定接受消息的組件名。隨機(jī)分組的意思就是隨機(jī)接受組件發(fā)送來的消息。例如,組件move有4個線程在同時工作,組件caldistance有5個線程同時工作,move隨機(jī)接收caldistance發(fā)送來的消息,就表示move的任意一個線程接受caldistance任意線程計算完后的結(jié)果。并行數(shù)目,指共同執(zhí)行該組件任務(wù)的線程數(shù)目。并行數(shù)目都需要用戶根據(jù)任務(wù)情況來設(shè)置。示例中receiver采用4個線程,writeResult采用1個線程,其它的都是2個線程。receiver線程數(shù)目為4,因為它既要接收spout的消息還要接收checker的消息。
圖5 迭代 Topology實現(xiàn)K-Means算法過程
迭代 Topology在Storm Topology原型基礎(chǔ)上增加了 Receiver、IBolt、Checker組件,Receiver和Checker組件與IBolt組件連接組成了一個具有迭代功能的Topology圖,使用這種迭代Topology圖成功解決了具有迭代功能的K-Means算法,這種方案因為是在以前基礎(chǔ)上添加組件完成,所以就很好保留了原Topology的特點(diǎn)。這種迭代Topology在實現(xiàn)方式上,保留了Storm基礎(chǔ)架構(gòu),只是調(diào)整了主進(jìn)程的調(diào)度策略和組件類型,實現(xiàn)上降低了后續(xù)開發(fā)難度。
致謝:感謝成都市科技局創(chuàng)新發(fā)展戰(zhàn)略研究項目(11RKYB016ZF)對本文的資助
[1] 孟小峰,慈祥.大數(shù)據(jù)管理:概念,技術(shù)與挑戰(zhàn)[J].計算機(jī)研究與發(fā)展,2013,50(1):146-169.
[2] Ekanayake J,Li H,Zhang B,et al.Twister:a runtime for iterative mapreduce[C].Proceedingsof the 19th ACM International Symposium on High Performance Distributed Computing.ACM,2010:810-818.
[3] Bu Y,Howe B,Balazinska M,et al.HaLoop:Efficient iterative data processing on large clusters[J].Proceedings of the VLDB Endowment,2010,3(1-2):285-296.
[4] Storm-wiki.[EB/OL].http://github.com/nathanmarz/storm/wiki/,2013-06-10.
[5] 孫吉貴,劉杰,趙連宇.聚類算法研究[J].軟件學(xué)報,2008,19(1):48-61.
[6] Dean J,Ghemawat S.MapReduce:simplified data processing on large clusters[J].Communications of the ACM,2008,51(1):107-113.
[7] Neumeyer L,Robbins B,Nair A,et al.S4:Distributed stream computing platform[C].Data Mining Workshops(ICDMW),2010 IEEE International Conference on.IEEE,2010:170-177.
[8] Cherniack M,Balakrishnan H,Balazinska M,et al.Scalable Distributed Stream Processing[C].CIDR.2003,3:257-268.
[9] Nathanmarz-blog[EB/OL].http://nathanmarz.com/.2013-07-10.
[10] Storm-berkeley[EB/OL].storm-berkeley.pdf.2013-09-01.
[11] 張建萍,劉希玉.基于聚類分析的K-means算法研究及應(yīng)用[J].計算機(jī)應(yīng)用研究,2007,24(5):166-168.
[12] 鄧華鋒,劉云生,肖迎元.分布式數(shù)據(jù)流處理系統(tǒng)的動態(tài)負(fù)載平衡技術(shù)[J].計算機(jī)科學(xué),2007,34(7):120-123.
[13] 亓開元,趙卓峰,房俊,等.針對高速數(shù)據(jù)流的大規(guī)模數(shù)據(jù)實時處理方法[J].計算機(jī)學(xué)報,2012,35(3):477-490.
[14] Getting Started with Storm[EB/OL].Getting Started with Storm.pdf.2013-08-18.
[15] S4 vs Storm[EB/OL].s4vStorm.pdf.2013-08-08.