馬卿云 季航旭 趙宇海 毛克明 王國仁
1(東北大學(xué)計算機(jī)科學(xué)與工程學(xué)院 沈陽 110169)
2(東北大學(xué)軟件學(xué)院 沈陽 110169)
3(北京理工大學(xué)計算機(jī)學(xué)院 北京 100081)
隨著物聯(lián)網(wǎng)、移動互聯(lián)網(wǎng)、產(chǎn)業(yè)互聯(lián)網(wǎng)和社交媒體等技術(shù)的飛速發(fā)展,每天都會產(chǎn)生大量的數(shù)據(jù),人們已經(jīng)身處大數(shù)據(jù)時代[1].根據(jù)國際數(shù)據(jù)公司(International Data Corporation, IDC)的預(yù)測,到2025年,全球的數(shù)據(jù)量將是現(xiàn)在的10倍,達(dá)到175 ZB.
大數(shù)據(jù)中有著豐富的信息,并且蘊(yùn)含著巨大的價值[2].谷歌通過用戶搜索詞頻的變化成功對冬季流感進(jìn)行了預(yù)測,沃爾瑪通過分析消費者購物行為對紙尿褲和啤酒進(jìn)行共同銷售,這些耳熟能詳?shù)陌咐加∽C了這一點.但隨著數(shù)據(jù)產(chǎn)生速度的加快,數(shù)據(jù)量的急劇增長,如何對龐大的數(shù)據(jù)進(jìn)行處理成為了新的難題.傳統(tǒng)的單機(jī)處理已經(jīng)無法滿足大數(shù)據(jù)的需求,分布式的大數(shù)據(jù)處理框架應(yīng)運而生.谷歌首先提出了用于大規(guī)模數(shù)據(jù)并行計算的編程模型MapReduce[3],引起了極大的反響,也因此促使了Hadoop[4]的誕生.之后為了改進(jìn)傳統(tǒng)MapReduce中運行效率低下的問題,基于內(nèi)存計算的Spark[5]被提出.時至今日,為了追求更快的處理速度、更低的時延,F(xiàn)link[6]開始嶄露頭角,并得到了飛速的發(fā)展.
與此同時,隨著云計算[7]的興起,包括谷歌、微軟、阿里巴巴等在內(nèi)的互聯(lián)網(wǎng)公司都提供了大數(shù)據(jù)存儲與分析的相關(guān)服務(wù),眾多企業(yè)開始選擇將自己的業(yè)務(wù)上“云”.這些提供云服務(wù)的公司需要存儲和處理的數(shù)據(jù)同樣是海量的,為了更好地為客戶提供服務(wù),提供云服務(wù)的公司通常都會在各地建立數(shù)據(jù)中心[8],例如微軟和谷歌在世界各地就分布著超過十個的數(shù)據(jù)中心.各數(shù)據(jù)中心之間經(jīng)常需要聯(lián)合進(jìn)行數(shù)據(jù)分析,此時分布式大數(shù)據(jù)處理框架依然是不二之選.跨數(shù)據(jù)中心的大數(shù)據(jù)分析業(yè)務(wù)許多都是數(shù)據(jù)密集型作業(yè),作業(yè)運行過程中,通常需要使用數(shù)據(jù)分區(qū)方法將相同鍵的數(shù)據(jù)發(fā)送到同一數(shù)據(jù)中心進(jìn)行處理,而各個數(shù)據(jù)中心之間通常相隔較遠(yuǎn),這樣會產(chǎn)生大量的網(wǎng)絡(luò)傳輸開銷,導(dǎo)致數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸時間成為大數(shù)據(jù)分析作業(yè)的瓶頸.由于網(wǎng)絡(luò)提供商硬件設(shè)備的不同,各數(shù)據(jù)中心之間的帶寬通常差異較大,這樣便會形成異構(gòu)帶寬的分布式環(huán)境[9].當(dāng)然,即使在同構(gòu)的集群中,也可能因為某些節(jié)點上的作業(yè)搶占了帶寬而導(dǎo)致集群環(huán)境中各節(jié)點帶寬異構(gòu).綜上所述,在異構(gòu)帶寬環(huán)境下,如何高效地進(jìn)行數(shù)據(jù)分區(qū)是一個急需解決的問題.
數(shù)據(jù)分區(qū)是大數(shù)據(jù)框架的一個基本功能,通過數(shù)據(jù)分區(qū)可以將各分區(qū)數(shù)據(jù)交給不同的節(jié)點進(jìn)行處理.常用的數(shù)據(jù)分區(qū)方式有隨機(jī)分區(qū)、Hash分區(qū)和Range分區(qū)[10].其中Hash分區(qū)和Range分區(qū)都能保證具有相同鍵的數(shù)據(jù)分發(fā)到同一節(jié)點,這也為許多需要這種保證的算子提供了保障.現(xiàn)有的研究很少在數(shù)據(jù)分區(qū)時對節(jié)點的帶寬進(jìn)行考慮,在節(jié)點間異構(gòu)帶寬的情況下,傳統(tǒng)的數(shù)據(jù)分區(qū)方法效率低下,完成數(shù)據(jù)分區(qū)的時間開銷較大.針對該問題,本文提出了一種基于帶寬的數(shù)據(jù)分區(qū)方法,在帶寬異構(gòu)的集群環(huán)境下可以有效減少數(shù)據(jù)分區(qū)完成的時間.
本文的主要貢獻(xiàn)有3個方面:
1) 提出了一種基于帶寬的數(shù)據(jù)分區(qū)方法,該方法在異構(gòu)帶寬的集群下能有效減少數(shù)據(jù)分區(qū)所需的時間;
2) 在新一代大數(shù)據(jù)計算框架Flink中,對基于帶寬的數(shù)據(jù)分區(qū)方法進(jìn)行了實現(xiàn);
3) 通過實驗對基于帶寬的數(shù)據(jù)分區(qū)方法進(jìn)行了驗證,實驗結(jié)果顯示該方法可以有效地減少完成數(shù)據(jù)分區(qū)所需的時間.
針對異構(gòu)集群環(huán)境下的大數(shù)據(jù)框架優(yōu)化的研究已有不少,主要的研究方向是針對節(jié)點間計算能力的不同,為各節(jié)點分配不同的數(shù)據(jù)量或者不同的計算任務(wù).如在異構(gòu)Hadoop集群中,文獻(xiàn)[11]針對集群中節(jié)點計算性能不同的特點,以數(shù)據(jù)本地性策略為基礎(chǔ),通過在計算能力更強(qiáng)的節(jié)點放置更多的數(shù)據(jù)塊,使得計算能力強(qiáng)的節(jié)點處理更多的數(shù)據(jù),從而提升系統(tǒng)的性能;文獻(xiàn)[12-13]則針對異構(gòu)的Hadoop集群,考慮提交至集群的作業(yè)運行時需要的資源大小和集群中可用資源的數(shù)量,提出了一種新的調(diào)度系統(tǒng)COSHH,該調(diào)度系統(tǒng)可以結(jié)合Hadoop中原始的調(diào)度策略進(jìn)行使用,進(jìn)一步減少異構(gòu)Hadoop集群中作業(yè)的平均完成時間,使得MapReduce模型在異構(gòu)集群中的運行效率更高.
在異構(gòu)Spark集群中同樣有著相應(yīng)的研究,如文獻(xiàn)[14]提出了一種在異構(gòu)Spark集群下的自適應(yīng)任務(wù)調(diào)度策略,其主要考慮的是集群中各節(jié)點的計算能力不同,通過對各節(jié)點的負(fù)載和資源利用率進(jìn)行監(jiān)測來動態(tài)地調(diào)整節(jié)點任務(wù)的分配;文獻(xiàn)[15]則采用了一種主動式的數(shù)據(jù)放置策略,通過對任務(wù)所需的計算時間進(jìn)行預(yù)測,在初始數(shù)據(jù)加載過程中將數(shù)據(jù)放置在適當(dāng)?shù)墓?jié)點上,并在作業(yè)執(zhí)行的過程中進(jìn)一步對數(shù)據(jù)的放置進(jìn)行調(diào)整,縮短作業(yè)的整體運行時間.
對于數(shù)據(jù)在節(jié)點之間的傳輸,主要的研究方向是針對同構(gòu)集群中的數(shù)據(jù)傾斜問題,比如文獻(xiàn)[16]提出了一種用于MapReduce的采樣算法,在Hadoop集群中不需要對輸入數(shù)據(jù)運行額外的預(yù)采樣程序,就能比較精確地估計出中間結(jié)果的分布,從而均衡各節(jié)點的數(shù)據(jù)量;同樣是針對MapReduce框架中出現(xiàn)的數(shù)據(jù)傾斜問題,文獻(xiàn)[17]基于對Map端中間結(jié)果的采樣,提出了一種基于動態(tài)劃分的負(fù)載均衡方法,可以保證每個Reduce任務(wù)處理的數(shù)據(jù)量盡量均衡;文獻(xiàn)[18]則針對Spark提出了一種基于鍵重分配和分區(qū)切分的算法,該算法作用于中間結(jié)果的產(chǎn)生和shuffle過程中,同樣用于解決數(shù)據(jù)傾斜問題;針對數(shù)據(jù)傳輸過程的優(yōu)化通常都需要使用采樣算法來獲取數(shù)據(jù)的信息,文獻(xiàn)[19]針對大規(guī)模數(shù)據(jù)流,提出了一種改進(jìn)的水塘抽樣方法,F(xiàn)link中使用該抽樣方法實現(xiàn)了Range分區(qū).
以上研究都沒有考慮在異構(gòu)帶寬情況下,如何對數(shù)據(jù)分區(qū)方法進(jìn)行優(yōu)化.對于數(shù)據(jù)密集型作業(yè),網(wǎng)絡(luò)傳輸往往是瓶頸所在,在異構(gòu)帶寬條件下,傳統(tǒng)考慮負(fù)載均衡的數(shù)據(jù)分區(qū)方法運行效率反而低下.針對該問題,本文通過建立基于傳輸時間的數(shù)據(jù)分發(fā)模型,提供了一種基于帶寬的數(shù)據(jù)分區(qū)方法,在異構(gòu)帶寬的集群環(huán)境下可以有效地減少數(shù)據(jù)的傳輸時間.
Flink與大多數(shù)大數(shù)據(jù)框架一樣也可以分為Master和Slave節(jié)點,如圖1所示,其中充當(dāng)Master的稱為JobManager,充當(dāng)Slave的稱為TaskManager,除此之外,提交作業(yè)的節(jié)點通常稱為Client.
Fig. 1 The architecture of Flink圖1 Flink架構(gòu)圖
Flink中JobManager將接收Client提交的作業(yè),對作業(yè)進(jìn)行調(diào)度并選定TaskManager進(jìn)行任務(wù)的執(zhí)行,收集作業(yè)運行的狀態(tài),并在作業(yè)運行失敗時進(jìn)行容錯和恢復(fù),TaskManager上則真正運行著作業(yè)的各個子任務(wù)[20].通常Flink集群中會有一個JobManager和多個TaskManager.
Flink中作業(yè)會被抽象為數(shù)據(jù)流圖,通常都是一個DAG結(jié)構(gòu)[21].具體來講,作業(yè)在Client端提交后,如果是批處理作業(yè)會通過優(yōu)化器生成Optimized-Plan,如果是流處理作業(yè)則會生成StreamGraph,之后會繼續(xù)在Client統(tǒng)一轉(zhuǎn)化為JobGraph,提交給JobManager.在JobManager處接收到JobGraph之后,會將其轉(zhuǎn)化為ExecutionGraph,最后調(diào)度執(zhí)行.
大數(shù)據(jù)計算框架通常都會為用戶提供數(shù)據(jù)分區(qū)的功能[22],F(xiàn)link在其批處理API中也提供了3種常用的數(shù)據(jù)分區(qū)方法,包括Rebalance分區(qū)、Hash分區(qū)和Range分區(qū).
2.2.1 Rebalance分區(qū)
Rebalance分區(qū)是Flink中最簡單的數(shù)據(jù)分區(qū)方法,通過該分區(qū)方法可以很好地均衡每個節(jié)點上的數(shù)據(jù),但其無法保證具有相同鍵的數(shù)據(jù)分發(fā)到同一節(jié)點上.Flink中使用Round-Robin算法實現(xiàn)了Rebalance分區(qū),具體算法如算法1所示:
算法1.Rebalance分區(qū)算法.
輸入:待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions、當(dāng)前分區(qū)編號partitionToSendTo;
輸出:待分區(qū)記錄的分區(qū)編號partitionToSendTo.
①partitionToSendTo++;
② IFpartitionToSendTo≥numPartitions
③partitionToSendTo=0;
④ END IF
⑤ RETURNpartitionToSendTo.
2.2.2 Hash分區(qū)
Hash分區(qū)是使用最普遍的數(shù)據(jù)分區(qū)方法,該分區(qū)方法是基于Hash算法實現(xiàn)的[23].使用該分區(qū)方法首先會根據(jù)待分區(qū)記錄的key值得到相應(yīng)的Hash值,之后利用Hash值對分區(qū)數(shù)量取余,得到的結(jié)果作為該條記錄所屬的分區(qū)編號.因為相同的key值一定有相同的Hash值,因此Hash分區(qū)可以保證鍵相同的記錄分發(fā)到同一節(jié)點上.具體算法如算法2所示:
算法2.Hash分區(qū)算法.
輸入:待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions;
輸出:待分區(qū)記錄的分區(qū)編號partitionToSendTo.
①key=extractKey(record); /*提取記錄的key*/
② IFkey==null
③partitionToSendTo=0;
④ ELSE
⑤hash=key.hashCode;
⑥partitionToSendTo=Hash%numPartitions;
⑦ END IF
⑧ RETURNpartitionToSendTo.
2.2.3 Range分區(qū)
Range分區(qū)是一種根據(jù)所有待分區(qū)記錄的鍵的范圍進(jìn)行數(shù)據(jù)分區(qū)的方法[24],也就是說每個分區(qū)結(jié)果都包含互不相交的鍵在一定范圍內(nèi)的記錄,也因此使用Range分區(qū)方法時需要確定每個分區(qū)的邊界.為了確定每個分區(qū)的邊界,通常使用的方式是對輸入數(shù)據(jù)進(jìn)行抽樣.Flink中使用的抽樣算法是改進(jìn)后的蓄水池抽樣算法,各分區(qū)邊界的確定則是對抽樣數(shù)據(jù)進(jìn)行排序后按等比例獲取各個分區(qū)的邊界.
舉例來說,假設(shè)抽樣得到的數(shù)據(jù)按鍵排序后的結(jié)果為{(10,value1),(20,value2),(30,value3),(40,value4),(50,value5),(60,value6),(70,value7),(80,value8),(90,value9)},分區(qū)數(shù)量為3,則計算得到的邊界為{30,60},也就是說key≤30的記錄將會被發(fā)往第1個分區(qū),key∈(30,60]之間的數(shù)據(jù)會發(fā)往第2個分區(qū),key>60的數(shù)據(jù)則會發(fā)往第3個分區(qū).Range分區(qū)方法通過抽樣并等比例劃分各個分區(qū)的邊界,可以在保證鍵相同的記錄發(fā)往同一節(jié)點的同時,使得各分區(qū)擁有的數(shù)據(jù)大致相等.具體算法如算法3所示:
算法3.Range分區(qū)算法.
輸入:輸入源的分區(qū)數(shù)量numInputPartitions、待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions;
輸出:待分區(qū)記錄的分區(qū)編號partitionToSendTo.
① 使用改進(jìn)后的蓄水池抽樣算法在每個輸入源分區(qū)上進(jìn)行抽樣;
② 將各輸入源分區(qū)上的抽樣結(jié)果進(jìn)行匯總,得到sampleData[],并排序;
③ 根據(jù)分區(qū)數(shù)量numPartitions和sampleData[],計算出分區(qū)邊界rangeBoundary[];
④ 對于每條待分區(qū)的記錄record,在分區(qū)邊界rangeBoundary[]中查找出所屬的分區(qū)編號partitionToSendTo;
⑤ RETURNpartitionToSendTo.
本節(jié)我們先對最優(yōu)數(shù)據(jù)分發(fā)比例的計算建立模型.之后舉例說明異構(gòu)帶寬的集群中不同的數(shù)據(jù)分發(fā)比例對數(shù)據(jù)分區(qū)完成時間的影響,體現(xiàn)基于帶寬的數(shù)據(jù)分區(qū)方法的重要性.最后介紹針對異構(gòu)帶寬的數(shù)據(jù)分區(qū)方法的算法流程以及在Flink中的實現(xiàn).
本節(jié)對異構(gòu)帶寬環(huán)境下各節(jié)點最優(yōu)數(shù)據(jù)分發(fā)比例的計算建立模型,首先對所要用到的變量進(jìn)行定義.
Di:節(jié)點i上的初始數(shù)據(jù)量大小;
ui:節(jié)點i的上行帶寬;
di:節(jié)點i的下行帶寬;
cost:數(shù)據(jù)分發(fā)所要花費的總時間.
(1)
(2)
(3)
(4)
數(shù)據(jù)分發(fā)所要花費的總時間cost則是各節(jié)點傳輸數(shù)據(jù)所需時間的最大值.我們的目標(biāo)是最小化數(shù)據(jù)分發(fā)所需的時間,則形式化地針對數(shù)據(jù)傳輸時間的優(yōu)化模型表示為
mincost
s.t. ?i:xi≥0,
該模型是一個典型的線性規(guī)劃問題,使用計算機(jī)可以比較方便地求解.
考慮集群中參與數(shù)據(jù)分區(qū)的2個節(jié)點Slave1和Slave2,它們初始的節(jié)點信息如表1所示:
Table 1 Information of Nodes表1 節(jié)點信息表
其中Slave1節(jié)點上的初始數(shù)據(jù)量D1=320 MB,上行帶寬u1=2 Mbps,下行帶寬d1=10 Mbps.Slave2節(jié)點上的初始數(shù)據(jù)量D2=160 MB,上行帶寬u2=10 Mbps,下行帶寬d2=10 Mbps.
當(dāng)Slave1和Slave2以50%和50%的比例進(jìn)行數(shù)據(jù)分發(fā)時,可以分別計算出Slave1和Slave2傳輸?shù)臄?shù)據(jù)量大小和所需時間,具體如表2所示:
Table 2 Transmission Information on Proportional 50%∶50%表2 50%∶50%比例分配數(shù)據(jù)傳輸信息表
其中Slave1需要傳出50%的數(shù)據(jù),即160 MB,接收來自Slave2的80 MB數(shù)據(jù).Slave2則需傳出80 MB數(shù)據(jù),接收來自Slave1的160 MB數(shù)據(jù).根據(jù)Slave1和Slave2的上下行帶寬可以計算得出相應(yīng)的傳輸時間,而最終數(shù)據(jù)分區(qū)完成需要取決于傳輸最慢的節(jié)點,也就是說以50%和50%的比例進(jìn)行數(shù)據(jù)分區(qū),最終需要花費640 s來完成.
考慮以90%和10%的比例進(jìn)行數(shù)據(jù)分發(fā),也就是說數(shù)據(jù)分發(fā)結(jié)束后Slave1保留90%的數(shù)據(jù),Slave2保留10%的數(shù)據(jù).同樣可以計算出各節(jié)點所需傳輸數(shù)據(jù)量大小和相應(yīng)的時間,如表3所示:
Table 3 Transmission Information on Proportional 90%∶10%表3 90%∶10%比例分配數(shù)據(jù)傳輸信息表
其中Slave1需要傳出10%的數(shù)據(jù),即32 MB,接收來自Slave2的144 MB的數(shù)據(jù).Slave2則需傳出144 MB數(shù)據(jù),接收來自Slave1的32 MB數(shù)據(jù).同理計算出傳輸時間后,可以得到最終傳輸結(jié)束所需時間為128 s,與分配比例為50%時相比速度提高了4倍.
以建立的最優(yōu)數(shù)據(jù)分發(fā)比例模型為基礎(chǔ),可以設(shè)計出基于帶寬的數(shù)據(jù)分區(qū)算法,如算法4所示:
算法4.基于帶寬的數(shù)據(jù)分區(qū)算法.
輸入:輸入源的分區(qū)數(shù)量numInputPartitions、待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions、參與分區(qū)的節(jié)點信息instanceInfo;
輸出:待分區(qū)記錄的分區(qū)編號partitionToSendTo.
① 使用改進(jìn)后的蓄水池抽樣算法在每個輸入源分區(qū)上進(jìn)行抽樣;
② 將各輸入源分區(qū)上的抽樣結(jié)果進(jìn)行匯總,得到sampleData[],并排序;
③ 根據(jù)參與分區(qū)的節(jié)點信息instanceInfo計算出最優(yōu)數(shù)據(jù)分發(fā)比例ratio[];
④ 根據(jù)最優(yōu)數(shù)據(jù)分發(fā)比例ratio[]和得到的抽樣結(jié)果sampleData[],計算出分區(qū)邊界rangeBoundary[];
⑤ 對于每條待分區(qū)的記錄record,在分區(qū)邊界rangeBoundary[]中查找出所屬的分區(qū)編號partitionToSendTo;
⑥ RETURNpartitionToSendTo.
鑒于新一代大數(shù)據(jù)計算框架Flink的出色性能,選用Flink對基于帶寬的數(shù)據(jù)分區(qū)算法進(jìn)行了實現(xiàn).
實現(xiàn)基于帶寬的數(shù)據(jù)分區(qū)方法,需要完成最優(yōu)數(shù)據(jù)分發(fā)比例的計算和作業(yè)圖邏輯的修改.3.4節(jié)已經(jīng)提到過,計算節(jié)點的最優(yōu)數(shù)據(jù)分發(fā)比例需要節(jié)點的帶寬信息和數(shù)據(jù)量.原始的Flink無法獲取集群中各節(jié)點的帶寬信息,考慮實現(xiàn)簡便性,我們在Flink的配置文件中添加了各節(jié)點上下行帶寬的配置項,在Flink集群啟動時,各TaskManager會將自身的帶寬信息匯總到JobManager處.各節(jié)點的數(shù)據(jù)量則根據(jù)作業(yè)JobGraph中的Source算子,獲取相應(yīng)的數(shù)據(jù)源分布情況后推算得出.作業(yè)圖邏輯的修改包括采樣算法的加入和分區(qū)方法的重寫,這部分將在生成OptimizedPlan時完成,這樣可以減少JobManager處的負(fù)載.
圖2對一個基于帶寬的數(shù)據(jù)分區(qū)作業(yè)的整體流程進(jìn)行了詳細(xì)描述.如圖2中Step1~3所示,作業(yè)在Client端提交后,首先會通過優(yōu)化器優(yōu)化生成OptimizedPlan,之后將以生成的OptimizedPlan為基礎(chǔ),生成作業(yè)圖JobGraph.在作業(yè)圖生成的過程中我們添加了采樣的邏輯和用于計算分區(qū)邊界的算子,并重寫了數(shù)據(jù)分區(qū)的方法.其中計算分區(qū)邊界的算子會根據(jù)最優(yōu)數(shù)據(jù)分發(fā)比例得到數(shù)據(jù)分區(qū)的界,該結(jié)果將會通過廣播的方式發(fā)送到每個分區(qū)算子.需要注意的是,此時還在Client端,計算分區(qū)邊界的算子還沒有實際獲取到最優(yōu)數(shù)據(jù)分發(fā)比例,最優(yōu)數(shù)據(jù)分發(fā)比例的獲取需要在JobManager處完成.完成作業(yè)邏輯的修改后, 通過Step3生成的JobGraph將被提交到JobManager.
如圖2中Step4~7所示,JobManager收到作業(yè)的JobGraph后,首先會遍歷JobGraph中的算子并找到Source算子,通過Source算子中存儲的數(shù)據(jù)源信息去獲取待處理數(shù)據(jù)在集群中的分布情況,并結(jié)合作業(yè)的并行度選擇運行該作業(yè)的節(jié)點.考慮網(wǎng)絡(luò)傳輸是作業(yè)運行的瓶頸,節(jié)點的選擇策略是盡可能選擇擁有數(shù)據(jù)的節(jié)點來運行作業(yè),這樣根據(jù)數(shù)據(jù)本地性策略,可以減少Source算子讀取數(shù)據(jù)源時的網(wǎng)絡(luò)傳輸.確定作業(yè)運行的節(jié)點后,通過節(jié)點的帶寬信息和初始數(shù)據(jù)量大小,使用數(shù)據(jù)分發(fā)比例計算模塊就可以計算出各節(jié)點的最優(yōu)數(shù)據(jù)比例,該比例將會被寫回JobGraph中用于計算分界的算子.至此,包含最終作業(yè)執(zhí)行邏輯的作業(yè)圖JobGraph才真正構(gòu)建完成.最后根據(jù)JobGraph中各算子的并行度,會生成對應(yīng)的執(zhí)行圖ExecutionGraph,執(zhí)行圖中的每個任務(wù)通過Step7將部署至對應(yīng)的節(jié)點,進(jìn)行調(diào)度執(zhí)行.
Fig. 2 The process of bandwidth partitioning job圖2 基于帶寬的數(shù)據(jù)分區(qū)方法作業(yè)運行過程
實驗所用環(huán)境為4個節(jié)點的分布式集群,每個節(jié)點的處理器為Intel Xeon E5-2603 V4(6核6線程),內(nèi)存為64 GB,節(jié)點間通過千兆以太網(wǎng)連接,安裝的操作系統(tǒng)為CentOS7.集群上通過Standalone模式搭建了修改后的Flink集群,其中1臺master節(jié)點作為JobManager,另外3臺Slave節(jié)點作為TaskManager,使用的版本為Flink1.7.2.除此之外集群中還基于Hadoop2.7.5搭建了Hadoop集群,使用其中的HDFS作為分布式文件存儲系統(tǒng).集群中各節(jié)點帶寬的控制則通過工具Wondershaper[25]來實現(xiàn).
實驗使用的數(shù)據(jù)是通過TPC-H[26]基準(zhǔn)測試工具生成的數(shù)據(jù)集,該工具可以生成8種表,選取了其中較大的Lineitem表和Orders表作為數(shù)據(jù)源.其中Lineitem有16個字段,前3個字段Orderkey,Partkey,Suppkey,其中Suppkey是主鍵.Orders表有9個字段,前2個字段Orderkey和Custkey,其中Custkey是主鍵.
本節(jié)從算法開銷和算法效果2方面進(jìn)行實驗結(jié)果的說明與分析.
4.3.1 算法開銷
基于帶寬的數(shù)據(jù)分區(qū)方法的算法開銷主要包括作業(yè)圖邏輯修改、最優(yōu)數(shù)據(jù)分發(fā)比例的計算、數(shù)據(jù)采樣3部分,本文針對這3部分所需的開銷分別進(jìn)行了實驗.
作業(yè)圖邏輯的修改發(fā)生在Flink作業(yè)圖生成的過程中,主要包括采樣算子的添加、計算分界算子的添加以及分區(qū)方法的重寫等步驟.通過與未修改作業(yè)圖邏輯進(jìn)行對比,可以得到作業(yè)圖邏輯修改所需的時間.經(jīng)過5次實驗并取平均值,得到從作業(yè)提交到作業(yè)圖生成完畢所需的平均時間為185 ms,如果進(jìn)行作業(yè)圖邏輯的修改,所需的平均時間則為232 ms,即作業(yè)圖邏輯修改平均所需時間為47 ms.
最優(yōu)數(shù)據(jù)分發(fā)比例的計算是利用數(shù)學(xué)規(guī)劃優(yōu)化器Gurobi Optimizer[27]實現(xiàn)的數(shù)據(jù)分發(fā)比例計算模塊完成的,實驗對不同節(jié)點數(shù)量下最優(yōu)比例計算所需的時間進(jìn)行了測試,每個節(jié)點的帶寬和數(shù)據(jù)量大小則隨機(jī)生成.實驗結(jié)果如圖3所示,當(dāng)節(jié)點數(shù)量為5時所需的計算時間為32 ms,節(jié)點數(shù)量擴(kuò)大至1 000時計算時間仍在100 ms以內(nèi),當(dāng)節(jié)點數(shù)量達(dá)到6 000時所需的計算時間也僅為293 ms.
Fig. 3 The time of optimal ratio calculation圖3 最優(yōu)比例計算時間
為了測試數(shù)據(jù)采樣所需的開銷,我們分別運行了添加了采樣過程的作業(yè)和未添加采樣過程的作業(yè),使用作業(yè)運行的時間差作為采樣所需的開銷.實驗中使用了不同數(shù)據(jù)量大小的Lineitem表作為輸入,具體實驗結(jié)果如圖4所示,在數(shù)據(jù)量大小分別為3.6 GB,7.2 GB,14.6 GB,29.4 GB時,所需的采樣時間分別為21 s,44 s,86 s,167 s,平均每GB數(shù)據(jù)所需的采樣時間約為5.8 s.
Fig. 4 The time of sampling圖4 采樣時間
總體來說,基于帶寬的數(shù)據(jù)分區(qū)方法在作業(yè)圖邏輯修改和最優(yōu)數(shù)據(jù)分發(fā)比例的計算過程中所需的時間開銷都較小,為毫秒級別.數(shù)據(jù)采樣則相對時間開銷較大,且與數(shù)據(jù)量大小相關(guān),但在計算資源更為充足的情況下,采樣所需時間可以進(jìn)一步減少.
4.3.2 算法效果
為了探究本文提出的算法在不同的異構(gòu)帶寬條件下的效果,我們設(shè)置了帶寬異構(gòu)程度不同的4個實驗.同時為了實驗的方便,主要針對Slave1節(jié)點設(shè)置了不同的下行帶寬,這樣已經(jīng)可以涵蓋不同的數(shù)據(jù)分發(fā)比例,其他情形的異構(gòu)帶寬集群則與此類似.實驗中各節(jié)點的具體帶寬如表4所示,表4中上行帶寬在前、下行帶寬在后.
Table 4 Bandwidth Information表4 帶寬信息表 Mbps
實驗中使用的數(shù)據(jù)為3.6 GB的Lineitem表和1.63 GB的Orders表.實驗程序會先對數(shù)據(jù)集進(jìn)行分區(qū)操作,數(shù)據(jù)分區(qū)結(jié)束之后將在每個分區(qū)中進(jìn)行一次聚合,統(tǒng)計各分區(qū)最終的記錄數(shù)量,以便計算出最終各分區(qū)數(shù)據(jù)的比例.程序的執(zhí)行模式設(shè)置為Batch模式,分區(qū)方法使用Hash分區(qū)、Range分區(qū)與基于帶寬的Bandwidth分區(qū)進(jìn)行比較,驗證基于帶寬的Bandwidth分區(qū)效果.
因為實驗主要針對的是節(jié)點帶寬的影響,而實驗所使用的集群TaskManager的數(shù)量是3,因此作業(yè)運行的并行度同樣設(shè)置為3.使用的數(shù)據(jù)源則被上傳到HDFS中,3個節(jié)點上的數(shù)據(jù)量幾乎是相等的,因此可以認(rèn)為Source算子的每個并行度讀入的數(shù)據(jù)量都是相同的.對數(shù)據(jù)集進(jìn)行介紹時有過說明,Lineitem表中有3個字段是主鍵,Orders表中有2個字段是主鍵,在實驗過程中我們發(fā)現(xiàn)這2個表中并沒有明顯的數(shù)據(jù)傾斜,通過主鍵中的任一字段做數(shù)據(jù)分區(qū),實驗結(jié)果都是相似的.后續(xù)的實驗結(jié)果都是以各個表的第1個字段作為鍵來進(jìn)行數(shù)據(jù)分區(qū),也就是說數(shù)據(jù)源Lineitem和Orders都使用Orderkey作為鍵進(jìn)行數(shù)據(jù)分區(qū).
如圖5所示,在實驗1條件下,使用Bandwidth分區(qū)的作業(yè)時間在Lineitem上所需時間為198 s,在Orders上所需時間為92 s,明顯小于Hash分區(qū)和Range分區(qū)所需時間,作業(yè)運行完成整體速度提升了為2.5~3倍.
Fig. 5 Running time in different partition modes in experiment 1圖5 實驗1中不同分區(qū)模式下的執(zhí)行時間
在實驗1的條件下,可以計算出3個節(jié)點數(shù)據(jù)的最優(yōu)分配比例為4∶48∶48,通過表5和表6可以看出,使用Bandwidth分區(qū)很好地契合了最優(yōu)數(shù)據(jù)分配比例,特別是在Lineitem上實際數(shù)據(jù)分區(qū)比例與最優(yōu)比例幾乎完全相同.
Table 5 Proportion of Lineitem After Partition in Experiment 1表5 實驗1中Lineitem分區(qū)后各節(jié)點數(shù)據(jù)比例 %
Table 6 Proportion of Orders After Partition in Experiment 1表6 實驗1中Orders分區(qū)后各節(jié)點數(shù)據(jù)比例 %
如圖6所示,在實驗2條件下,使用Bandwidth分區(qū)的作業(yè)時間在Lineitem上所需時間為209 s,在Orders上所需時間為99 s,相較于Hash分區(qū)和Range分區(qū),效果同樣不錯,提升為0.6~0.7倍.
Fig. 6 Running time in different partition modes in experiment 2圖6 實驗2中不同分區(qū)模式下的執(zhí)行時間
在實驗2的條件下,計算出的各節(jié)點數(shù)據(jù)的最優(yōu)分配比例為11∶44∶44,通過表7和表8可以看出,使用Bandwidth分區(qū)后的數(shù)據(jù)分布也與最優(yōu)數(shù)據(jù)分配比例比較契合.
如圖7所示,在實驗3條件下,使用Bandwidth分區(qū)的作業(yè)時間在Lineitem上所需時間為184 s,在Orders上Bandwidth分區(qū)所需時間為68 s,相較Hash分區(qū)所需時間194 s和88 s已經(jīng)沒有太大的優(yōu)勢,但仍然能節(jié)省一些時間.
Table 7 Proportion of Lineitem After Partition in Experiment 2表7 實驗2中Lineitem分區(qū)后各節(jié)點數(shù)據(jù)比例 %
Table 8 Proportion of Orders After Partition in Experiment 2表8 實驗2中Orders分區(qū)后各節(jié)點數(shù)據(jù)比例 %
Fig. 7 Running time in different partition modes in experiment 3圖7 實驗3中不同分區(qū)模式下的執(zhí)行時間
此時節(jié)點之間數(shù)據(jù)傳輸?shù)淖顑?yōu)比例已經(jīng)是20∶40∶40,與平均分配差距已經(jīng)沒有那么大,同時還可以發(fā)現(xiàn),此次實驗條件下Range分區(qū)表現(xiàn)較差,分區(qū)完成所用時間與其他實驗相比明顯變長.分析表9和表10可以發(fā)現(xiàn),Range分區(qū)在實驗4中,對需要數(shù)據(jù)比例少的Slave1節(jié)點反而分配了更多的數(shù)據(jù),導(dǎo)致Range分區(qū)所需時間遠(yuǎn)超了其他2種分區(qū)方法.
如圖8所示,在實驗4條件下,與Hash分區(qū)相比,Bandwidth分區(qū)所需時間反而變得更長.此時節(jié)點間的最優(yōu)比例是27∶36∶36,與平均分配比例已經(jīng)十分接近,而Range分區(qū)和Bandwidth分區(qū)需要額外的采樣時間.除此之外,結(jié)合表11和表12可以發(fā)現(xiàn),采樣得到的結(jié)果并不是特別準(zhǔn)確,導(dǎo)致并不能完全按計算得到的最優(yōu)比例進(jìn)行數(shù)據(jù)分發(fā).
Table 9 Proportion of Lineitem After Partition in Experiment 3表9 實驗3中Lineitem分區(qū)后各節(jié)點數(shù)據(jù)比例 %
Table 10 Proportion of Orders After Partition in Experiment 3表10 實驗3中Orders分區(qū)后各節(jié)點數(shù)據(jù)比例 %
Fig. 8 Running time in different partition modes in experiment 4圖8 實驗4中不同分區(qū)模式下的執(zhí)行時間
Table 11 Proportion of Lineitem After Partition in Experiment 4表11 實驗4中Lineitem分區(qū)后各節(jié)點數(shù)據(jù)比例 %
結(jié)合4個實驗,可以發(fā)現(xiàn)Hash分區(qū)十分穩(wěn)定,每次實驗分區(qū)結(jié)果都十分均衡,說明數(shù)據(jù)源中并沒有明顯的數(shù)據(jù)傾斜.Range分區(qū)和Bandwidth分區(qū)則并不能每次都保證數(shù)據(jù)按預(yù)設(shè)的比例分配,主要是因為它們都需要使用采樣算法來估計數(shù)據(jù)分布,有時候采樣的結(jié)果并不是十分精確.同樣由于采樣算法的存在,Range分區(qū)和Bandwidth分區(qū)都需要額外的開銷,這也導(dǎo)致大多數(shù)時候Range分區(qū)都比Hash分區(qū)花費更多的時間.唯一例外的是在實驗1中對Orders表進(jìn)行分區(qū),原因是Range分區(qū)恰好給瓶頸節(jié)點Slave1分配了更小的比例,而Slave1下行帶寬很小,較小的數(shù)據(jù)量就會對傳輸時間產(chǎn)生較大的影響.
Table 12 Proportion of Orders After Partition in Experiment 4表12 實驗4中Orders分區(qū)后各節(jié)點數(shù)據(jù)比例 %
綜合來看,當(dāng)帶寬異構(gòu)性強(qiáng),各節(jié)點之間最優(yōu)數(shù)據(jù)分發(fā)比例比較不均衡時,基于帶寬的數(shù)據(jù)分區(qū)方法可以取得較好的效果,甚至帶來數(shù)倍的速度提升.當(dāng)帶寬異構(gòu)性較弱時,由于采樣算法需要額外的開銷,基于帶寬的數(shù)據(jù)分區(qū)方法所需時間可能會長于Hash分區(qū)方法,這種情況下可以通過更充足的計算資源來降低采樣過程所需的開銷.在實際應(yīng)用過程中,則可以綜合考慮最優(yōu)比例的計算結(jié)果和采樣所需的時間,在速度提升較為明顯時選擇使用基于帶寬的數(shù)據(jù)分區(qū)方法.
在異構(gòu)帶寬的條件下,傳統(tǒng)的數(shù)據(jù)分區(qū)方法會因為瓶頸節(jié)點的存在,導(dǎo)致數(shù)據(jù)分發(fā)效率低下.通過對各節(jié)點之間數(shù)據(jù)傳輸模型進(jìn)行分析,本文提出了一種針對異構(gòu)帶寬集群的數(shù)據(jù)分區(qū)方法,并在Flink中進(jìn)行了實現(xiàn).實驗證明:在節(jié)點間帶寬異構(gòu)的情況下,基于帶寬的數(shù)據(jù)分區(qū)方法可以極大地提升數(shù)據(jù)分區(qū)完成的速度.