代明竹,高嵩峰
(北京建筑大學(xué),北京 100044)
過去十年中,大數(shù)據(jù)[1]知識已經(jīng)得到了社會的普遍認同和采用,這主要得益于提供給終端用戶強大功能的新技術(shù),使得終端用戶[2]的關(guān)注點放在數(shù)據(jù)執(zhí)行的轉(zhuǎn)換上,而非算法的并行化方面。
目前,很多研究學(xué)者在大數(shù)據(jù)的處理方面進行了很多研究。如Apache Hadoop[3]就是處理大數(shù)據(jù)的技術(shù),即一個開源的MapReduce模型。Hadoop的成功主要得益于其并行化抽象性、容錯性和可擴展性的體系結(jié)構(gòu),該結(jié)構(gòu)支持分布式存儲和大型數(shù)據(jù)集[4]的處理。然而,Hadoop對數(shù)據(jù)進行處理時所執(zhí)行的冗余內(nèi)存復(fù)制和磁盤操作[5],對其性能造成了嚴重的限制。文獻[6]綜述了12個典型的基于MapReduce的大數(shù)據(jù)處理平臺,分析對比它們的實現(xiàn)原理和適用場景,抽象其共性;介紹基于了MapReduce的大數(shù)據(jù)分析算法,包括搜索算法[7]、數(shù)據(jù)清洗/變換算法等,將這些算法按照MapReduce實現(xiàn)方式分類,分析影響算法性能的因素;將大數(shù)據(jù)處理算法抽象為外存算法,并對外存算法的特征加以梳理。
Apache Spark[8]和Apache Flink[9]具有易于使用的API,高級數(shù)據(jù)處理操作符和增強的性能,因此得到了更多的關(guān)注。文獻[10]對Hadoop與Spark的應(yīng)用場景進行了分析,闡述了Hadoop與Spark各自所適應(yīng)的應(yīng)用場景。雖然Spark和Flink的開發(fā)人員都給出了性能試驗的結(jié)果,但目前缺乏在這些框架之間公平的橫向比較。此類分析對于識別當(dāng)前技術(shù)的優(yōu)缺點來說非常重要,且有助于開發(fā)者確定未來的大數(shù)據(jù)系統(tǒng)的最優(yōu)特性。
因此,本文旨在對Hadoop,Spark和Flink進行同等條件下的性能評估。本文主要工作總結(jié)如下:1)通過各種批處理和迭代的工作負載,對Hadoop,Spark和Flink的性能做出比較評價;2)描述了一些實驗參數(shù)在整體性能上的影響特征。結(jié)果表明,Spark性能最優(yōu),是更大的Apache項目,其框架更加成熟,且在市場份額和社會關(guān)注方面均占據(jù)優(yōu)勢。然而,F(xiàn)link包含了一些有趣和新穎的設(shè)計理念,且部分理念得到了Spark的采用。Flink通過一個自定義的對象序列化程序,實現(xiàn)了持久性內(nèi)存管理的透明化使用,減少了垃圾收集的開銷,而且Flink使用的顯式迭代程序,極大提高了迭代算法的性能。
作為MapReduce模型的實際標準實現(xiàn),Hadoop已經(jīng)被很多機構(gòu)廣泛采用,以存儲和計算大型數(shù)據(jù)集。包括兩個組件:1)Hadoop分布式文件系統(tǒng)(HDFS);2)Hadoop MapReduce引擎。MapReduce模型基于兩個用戶定義函數(shù),即映射(map)和約減(reduce),這兩個函數(shù)計算由鍵值對表示的數(shù)據(jù)記錄。映射函數(shù)提取每個鍵值對的相關(guān)特征,而約減函數(shù)則使用這些特征得到期望的結(jié)果。
Spark提供了用戶能夠執(zhí)行的數(shù)據(jù)轉(zhuǎn)換的多樣性,同時依然包括了一些用于基于鍵的計算的操作符,這使得Spark特別適用于實施經(jīng)典的、基于鍵的MapReduce算法。Spark的編程模型以被稱為“彈性分布式數(shù)據(jù)集”(RDD)的抽象概念為基礎(chǔ),RDD將數(shù)據(jù)對象保留在內(nèi)存中,以降低磁盤和網(wǎng)絡(luò)操作帶來的開銷[8]。此類處理方式特別適用于在同一數(shù)據(jù)集上進行多次轉(zhuǎn)換的算法,例如迭代算法等。通過在內(nèi)存中對中間結(jié)果進行存儲,Spark避免了迭代之間的HDFS使用,由此實現(xiàn)了對此類工作負載的性能優(yōu)化。
Flink通過使用內(nèi)存處理技術(shù),改善了Hadoop的性能。其使用的高效內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,包含序列化的數(shù)據(jù)而非Java對象,避免了過多的垃圾收集。Flink進行批處理的編程模型以DataSet的概念為基礎(chǔ),DataSet通過高階操作進行轉(zhuǎn)換。與Spark不同,F(xiàn)link代表著一個真正的流式處理引擎,因為其能夠從一個操作到另一個操作、按元組逐個發(fā)送數(shù)據(jù),而不需要在批處理中執(zhí)行計算。對于批處理,批量數(shù)據(jù)被視為流數(shù)據(jù)的有限集合。Flink中包括顯式迭代操作符,應(yīng)用了批量迭代器,以得到完整的DataSets,而增量迭代器則僅被應(yīng)用于在上一次迭代地過程中發(fā)生了變化的數(shù)據(jù)項上。
2.1.1 測試平臺配置
本文在DAS-4[11]上執(zhí)行了評估實驗,DAS-4是一個多核集群,通過InfiniBand(IB)和千兆以太網(wǎng)(GbE)互連。表1給出了這個系統(tǒng)主要的硬件和軟件特性。每個節(jié)點包括8個核心,14 G內(nèi)存和2個1TB的磁盤。
本文使用大數(shù)據(jù)評價工具(BDEv)進行實驗,該工具是MapReduce評價程序的改進版本。BDEv可以自動化實施框架的配置、輸入數(shù)據(jù)集的生成、實驗的執(zhí)行、以及結(jié)果的采集。
表1 DAS-4節(jié)點配置
2.1.2 框架
在軟件設(shè)置方面,本文的實驗評價中使用了穩(wěn)定版本的Hadoop(2.7.2)、Spark(1.6.1)和Flink(1.0.2)。Spark和Flink均以獨立模式部署在HDFS 2.7.2中。本文根據(jù)相對應(yīng)的用戶指南和系統(tǒng)特性對所有框架進行了精心配置。
表2給出了本文實驗使用的框架配置。除了GbE實驗之外,所有框架的網(wǎng)絡(luò)接口被配置于使用IP over InfiniBand(IPoIB)。
表2 Hadoop,Spark和Flink框架的配置
2.1.3 基準
表3給出了實驗使用的基準及其特征,例如CPU限制、I/O限制(磁盤和網(wǎng)絡(luò))或迭代次數(shù)。其中第三列和第四列給出了輸入數(shù)據(jù)集的大小和使用的數(shù)據(jù)集生成器。其中還包括了基準代碼來源。因此,每個框架以相同的算法為基礎(chǔ),使用一個基準實施,從HDFS讀取相同的輸入,并向HDFS寫入相同的輸出。雖然算法保持不變,每個框架根據(jù)其可用的功能,采用一個優(yōu)化后的版本。因此,每個基準使用相同算法的不同實施,以得到相同的結(jié)果。
表3 基準來源
(1)WordCount:對輸入數(shù)據(jù)集中每個詞語出現(xiàn)進行計數(shù)。Hadoop發(fā)行版中,WordCount及其輸入數(shù)據(jù)生成器,RandomTextWriter均作為一個樣例(表中的“ex.”)給出。
(2)Grep:對輸入數(shù)據(jù)集中正則表達式的匹配進行計數(shù)。Hadoop發(fā)行版中包括了Grep,Spark和Flink的源代碼則改編自樣例。Grep的數(shù)據(jù)生成器也是RandomTextWriter。
(3)TeraSort:對大小為100字節(jié)的鍵值元組進行排序。Hadoop中已經(jīng)包括了TeraSort和TeraGen數(shù)據(jù)生成器的實現(xiàn)。然而,TeraSort沒有作為一個樣例提供給Spark和Flink,其源代碼源于文獻[12]。
(4)連接組件:尋找一個圖形的連接組件[13]的迭代式圖算法。其被囊括在Pegasus[14]中,Pegasus是建立在Hadoop之上的一個圖形挖掘系統(tǒng)。而對于Spark和Flink,連接組件則分別由Graphx[14]和Gelly[15]提供支持,Graphx和Gelly均為面向圖形的API。使用DataGen工具對輸入數(shù)據(jù)集進行設(shè)置,該工具被包括在HiBench基準套件中。
(5)PageRank:通過統(tǒng)計鏈接到每個元素的其他元素的數(shù)量和質(zhì)量,對元素進行評級的迭代圖算法。Pegasus中包含了用于Hadoop的PageRank,用于Spark和Flink的源代碼則來自于該樣例。
(6)K均值:將一組樣本劃分到K個聚類[16]中的迭代聚類算法。Apache Mahout[17]在Hadoop上實現(xiàn)了這一算法,并給出了數(shù)據(jù)集生成器,GenKmeansDataSet,而Spark則使用了其機器學(xué)習(xí)庫MLlib提供的高效實現(xiàn)。
2.1.4 實驗評價
為了進行全面的實驗分析,本文的評價包括了兩個方面:框架的性能和一些配置參數(shù)的影響。使用13、25、37、49個節(jié)點執(zhí)行了測試基準。每個集群大小n表示1個主節(jié)點和n-1個從屬節(jié)點。表3中給出了每個基準的輸入數(shù)據(jù)大小。
實驗中使用了不同的HDFS數(shù)據(jù)塊大小、輸入數(shù)據(jù)大小,并考慮到了最大集群規(guī)模下的網(wǎng)絡(luò)互連和線程配置。本文在實驗中選取了三個基準:WordCount、TeraSort和PageRank,分別代表著三種類型的工作負載:CPU限制、IO限制和迭代。
使用上述相同大小的輸入數(shù)據(jù),分別對數(shù)據(jù)塊大小為64、128、256和512 MB的HDFS進行了評價(見表3)。在數(shù)據(jù)規(guī)模實驗中,WordCount和TeraSort處理了100、150、200和250 GB的數(shù)據(jù),而PageRank則處理了9、12.5、16和19.5 GB的數(shù)據(jù)。本文實驗評價的網(wǎng)絡(luò)互連為GbE和IPoIB,使用每個接口對網(wǎng)絡(luò)進行配置,以進行shuffle操作和HDFS復(fù)制。這些實驗都使用了最大化的數(shù)據(jù)規(guī)模,以最大限度提高測試基準的計算要求,其中WordCount和TeraSort的最大數(shù)據(jù)為250 GB,PageRank的最大數(shù)據(jù)為19.5 GB。
網(wǎng)絡(luò)的線程配置決定了每個節(jié)點的計算資源被分配到Java進程和線程的方式。另一方面,Hadoop在映射器和約減器之間分配CPU核心,其中映射器和約減器均為單線程進程。另一方面,Spark和Flink則分別使用Worker和Taskmanager作為并行處理多個任務(wù)的多線程管理器。本文的評價試驗中,每個管理器的管理器數(shù)量/核心數(shù)量的配置為1/8,2/4,4/2和8/1。
2.2.1 性能和可擴展性
所有基準的執(zhí)行時間如圖1所示。這些圖證明了與Hadoop相比,Spark和Flink實現(xiàn)了重要的性能改進。當(dāng)使用最大的集群規(guī)模時,Spark在WordCount和K均值中取得了最優(yōu)的結(jié)果,F(xiàn)link則在PageRank中表現(xiàn)較佳。Spark和Flink在Grep、TeraSort和連接組件上則得到了相似的結(jié)果。
Spark在WordCount中得到了最好的結(jié)果,這是因為其API提供了reduceByKey()函數(shù)(按鍵約減函數(shù)),對每個詞語的出現(xiàn)次數(shù)進行加和。Flink則使用groupBy().sum()方法(分組后加和),該方法針對此類工作負載的優(yōu)化較差。此外,與其他基準相比,WordCount的CPU約束性讓Flink的內(nèi)存優(yōu)化變得不太明顯,甚至在計算結(jié)果時帶來了一定量的額外開銷。在Grep中,Spark和Flink的性能大幅超越了Hadoop。最重要的原因是MapReduce的API不適用于這一基準。在Hadoop中,該基準使用兩個MapReduce作業(yè):一個用于搜索模式,另一個對結(jié)果進行排序。這使得生成并寫入HDFS的內(nèi)存副本的數(shù)量非常大。Spark和Flink采用的方法則與之不同,其依靠filer()函數(shù)(過濾函數(shù))對輸入行進行匹配,而不用對其進行復(fù)制。接下來,對選出的行進行計數(shù),并在內(nèi)存中排序。此外,Hadoop在map()函數(shù)內(nèi)執(zhí)行模式匹配,該函數(shù)僅能使用一半的節(jié)點CPU核心。在Spark和Flink中,所有操作的并行度被設(shè)為集群中的CPU核心的總數(shù)量。
圖1 實驗性能結(jié)果
在TeraSort基準中,Hadoop與Spark和Flink表現(xiàn)出了最小的性能差異。這主要是因為Hadoop最初就是針對排序而設(shè)計,這是MapReduce引擎的核心組件之一。盡管Spark和Flink的性能優(yōu)于Hadoop,但Hadoop的高擴展性使其可以得到較好的結(jié)果,特別是當(dāng)使用的節(jié)點數(shù)量為49個的時候。Spark和Flink的在此基準上的實驗統(tǒng)計結(jié)果則不相上下。
對于迭代算法(圖1),Spark和Flink的性能明顯優(yōu)于Hadoop。如上所述,F(xiàn)link和Spark都提供了圖算法Graphx和Gelly的優(yōu)化庫,這兩個框架的連接組件的實驗結(jié)果非常相似。與之不同,PageRank的實現(xiàn)則是從樣例中推導(dǎo)出的。在該基準中,F(xiàn)link得到了最優(yōu)的性能,這主要是由于其使用了增量迭代,即僅處理那些沒有達到最終值的元素。但是,Spark在K均值上得到了最優(yōu)的結(jié)果,這得益于其優(yōu)化的MLlib庫。
圖2 不同HDFS數(shù)據(jù)塊大小的影響
圖3 執(zhí)行不同規(guī)模問題時的時間比較
2.2.2 配置參數(shù)的影響
(1)HDFS數(shù)據(jù)塊大小:圖2給出了使用不同的HDFS數(shù)據(jù)塊大小所得到的性能數(shù)值。HDFS在一定大小的數(shù)據(jù)塊中對數(shù)據(jù)進行管理,數(shù)據(jù)塊的大小決定了每個任務(wù)中讀取的數(shù)據(jù)量。在WordCount基準的測試中,數(shù)據(jù)塊的大小對性能的影響不大,唯一的例外是Hadoop,其在數(shù)據(jù)塊被配置為128MB時得到了最優(yōu)的結(jié)果。Spark在數(shù)據(jù)塊設(shè)為64MB時得到了最優(yōu)的結(jié)果,而Flink則在數(shù)據(jù)塊被設(shè)為128 MB和512 MB時取得了幾乎相同的結(jié)果。數(shù)據(jù)塊大小對TeraSort基準的影響較大,而最優(yōu)的數(shù)值則取決于框架:Hadoop為256 MB,Spark和Flink為 64 MB。在PageRank基準中,Hadoop在數(shù)據(jù)塊設(shè)為64 MB時得到了最優(yōu)的結(jié)果,其性能與數(shù)據(jù)塊的大小成反比。在該基準中,Spark和Flink沒有受到HDFS數(shù)據(jù)塊大小的影響。Hadoop在HDFS中存儲中間結(jié)果,因此其受到數(shù)據(jù)塊大小配置的影響較大。Spark和Flink將中間結(jié)果存儲在內(nèi)存中,因此僅在讀取初始數(shù)據(jù)和寫入最終輸出時,會受到HDFS配置的影響。
表4 不同網(wǎng)絡(luò)配置的運行時間比較(秒)
表5 不同線程配置的運行時間比較(s)
(2)輸入數(shù)據(jù)的規(guī)模:圖3給出了執(zhí)行不同規(guī)模問題時的性能。在WorkCount基準中,Hadoop和Flink的曲線變化幅度要大于Spark,因此對于WorkCount基準來說,Spark的擴展性更好。TeraSort基準的測試中也是如此,且隨著輸入數(shù)據(jù)的增大,Spark與Flink之間的差距也隨之變大。因此,對于TeraSort基準,Spark是最具擴展性的框架。在PageRank基準測試中,F(xiàn)link表現(xiàn)出了高于Hadoop和Spark的可擴展性,后兩者的變化幅度更大。由于Flink使用的增量迭代避免了對整個數(shù)據(jù)集的重新處理。而且Flink通過高效內(nèi)存管理避免了主要的垃圾收集,使得Flink成為PageRank測試中擴展性最好的框架。
(3)互連網(wǎng)絡(luò):表4給出了當(dāng)使用GbE和IPoIB時三個框架的性能。網(wǎng)絡(luò)不僅會影響到shuffle階段節(jié)點間的通信,而且在向HDFS進行寫入操作的過程中也會造成影響。在WordCount中,Spark是唯一從IPoIB的使用中獲得略微的性能提升的框架。TeraSort是本文實驗評價中,網(wǎng)絡(luò)密集程度最高的基準,其中所有框架的性能都得到了提升。在TeraSort中使用IPoIB分別給Hadoop,Spark和Flink帶來了12%,41%和11%的性能改善。在PageRank中,當(dāng)使用IPoIB時,Hadoop和Spark都出現(xiàn)了性能提升,而Flink的數(shù)值則保持不變。因此,IPoIB提供的高帶寬對基于塊的流水線數(shù)據(jù)處理的Spark的適用性要高于基于元組的Flink。
(4)線程配置:表5給出了在不同的線程配置下,三個框架的性能。在Hadoop中,最佳配置為4個映射器和4個約減器,WordCount是一個例外,其中的最佳配置為7個映射器和1個約減器。這是因為WordCount的CPU約束的行為,其中大部分計算通過映射器執(zhí)行,因此增加映射器的數(shù)量能夠減少執(zhí)行時間。Spark中,8核心/1個Worker是最優(yōu)配置,PageRank則是一個例外,其中最優(yōu)配置為2核心/4個Worker。Spark采用相同的JVM對PageRank的不同迭代進行計算,涉及到了大量的目標創(chuàng)建/破壞。因此,使用更小的JVM能夠降低垃圾收集停止的開銷。然而,這樣也會降低每個Worker中的并行度,并對在其中計算的一些服務(wù)進行復(fù)制。盡管該配置帶來的性能很差,但Spark成功執(zhí)行了所有的基準。不同于Spark,當(dāng)在Flink中配置8個任務(wù)管理器/1個核心時,實驗沒有順利完成完成(內(nèi)存不足)。從中可以看到,F(xiàn)link中不適合采用較小的JVM。在Flink中,迭代算法受到垃圾收集的影響較小,因為其為了避免Java目標的創(chuàng)建和破壞而進行了內(nèi)存優(yōu)化。
本文對Hadoop,Spark和Flink的可擴展性方面的性能進行了評價,并且在實驗中考慮到了不同的框架配置。實驗結(jié)果表明:當(dāng)使用最大的集群規(guī)模時,Spark在WordCount和K均值中取得了最優(yōu)的結(jié)果,F(xiàn)link則在PageRank中表現(xiàn)較佳。Spark和Flink在Grep、TeraSort和連接組件上則得到了相似的結(jié)果,Spark和Flink各有優(yōu)勢。此外,Spark納入了更豐富的操作集合和更多的工具。而Flink包含了一些有趣和新穎的設(shè)計理念,部分設(shè)計理念也得到Spark的采用。
未來,本文計劃進一步研究更多的配置參數(shù)對這些框架的性能所造成的影響(例如溢出閾值、網(wǎng)絡(luò)緩沖等)。還打算開展一個類似的評價,但主要著眼于利用Spark、Flink和其他流處理框架,對工作負載進行流式處理。
:
[1] 王元卓, 靳小龍, 程學(xué)旗. 網(wǎng)絡(luò)大數(shù)據(jù):現(xiàn)狀與展望[J]. 計算機學(xué)報, 2013, 36(6): 1125-1138.
[2] 徐計,王國胤,于洪. 基于粒計算的大數(shù)據(jù)處理[J]. 計算機學(xué)報,2015,38(08):1497-1517.
[3] White T, Cutting D. Hadoop : the definitive guide[J]. O’reilly Media Inc Gravenstein Highway North, 2009, 215, 41(11):1-4.
[4] 孫競, 余宏亮, 鄭緯民. 支持分布式存儲刪冗的相似文件元數(shù)據(jù)集合索引[J]. 計算機研究與發(fā)展, 2013, 50(1): 197-205.
[5] Veiga J, Expósito R R, Taboada G L, et al. Analysis and evaluation of MapReduce solutions on an HPC cluster [J]. Computers & Electrical Engineering, 2016, 50(C): 200-216.
[6] 宋杰,孫宗哲,毛克明,鮑玉斌,于戈. MapReduce大數(shù)據(jù)處理平臺與算法研究進展[J]. 軟件學(xué)報,2017,28(03):514-543.
[7] 王李進,尹義龍,鐘一文. 逐維改進的布谷鳥搜索算法[J]. 軟件學(xué)報,2013,24(11): 2687-2698.
[8] Zaharia M, Chowdhury M, Franklin M J, et al. Spark: cluster computing with working sets[C]// Usenix Confer-ence on Hot Topics in Cloud Computing. USENIX Association, 2010: 10-17.
[9] Alexandrov A, Bergmann R, Ewen S, et al. The Stratosphere platform for big data analytics[J]. Vldb Journal, 2014, 23(6): 939-964.
[10] 馮興杰,王文超. Hadoop與Spark應(yīng)用場景研究[J]. 計算機應(yīng)用研究,2018, (09):1-8.
[11] Karasti H, Baker K S, Millerand F. Infrastructure Time: Long-term Matters in Collaborative Development[J]. Computer Supported Cooperative Work, 2010, 19(3-4): 377-415.
[12] Garcíagil D, Ramírezgallego S, García S, et al. A comparison on scalability for batch big data processing on Apache Spark and Apache Flink[J]. Big Data Analytics, 2017, 2(1): 1-12.
[13] 林香,黃致建,郝艷華. 弧形燕尾型榫連接組件三維接觸分析[J]. 武漢理工大學(xué)學(xué)報(信息與管理工程版),2010,32(3): 427-429.
[14] Kang U, Tsourakakis C E, Faloutsos C. PEGASUS: A Peta-Scale Graph Mining System Implementation and Observations[C]// Ninth IEEE International Conference on Data Mining. IEEE Computer Society, 2009: 229-238.
[15] Junghanns M, Teichmann N, Rahm E. Analyzing extended property graphs with Apache Flink[C]// ACM SIGMOD Workshop on Network Data Analytics. ACM, 2016: 301-310.
[16] 王慧賢,靳惠佳,王嬌龍,江萬壽. k均值聚類引導(dǎo)的遙感影像多尺度分割優(yōu)化方法[J]. 測繪學(xué)報,2015,44(5): 526-532.
[17] Faghri F, Hashemi S H, Babaeizadeh M, et al. Toward Scalable Machine Learning and Data Mining: the Bioinformatics Case[J]. 2017, 35(7): 1098-1106.