徐超一,劉曉清,顧 淼,王 巍
(1 復(fù)旦大學(xué) 計(jì)算機(jī)科學(xué)技術(shù)學(xué)院,上海 200441;2 新浪,北京 100193)
數(shù)據(jù)量爆炸式增長(zhǎng)的同時(shí),數(shù)據(jù)分析的重要性也日益凸顯。多年來(lái),數(shù)據(jù)分析的各類需求日益旺盛,人們對(duì)數(shù)據(jù)分析的要求也不再局限于更強(qiáng)的數(shù)據(jù)處理能力,多個(gè)計(jì)算(分析)平臺(tái)共同進(jìn)行一個(gè)分析任務(wù)流的需求也隨之而生。
現(xiàn)實(shí)中,人們經(jīng)常根據(jù)數(shù)據(jù)分析需求的不同,來(lái)確定分析任務(wù)所使用的執(zhí)行平臺(tái),甚至一個(gè)分析任務(wù)流中的分析算子,也可能運(yùn)行在不同的計(jì)算平臺(tái)上。分析任務(wù)流在不同平臺(tái)之間的調(diào)度成為人們關(guān)注的一個(gè)重要問題。
RHEEM[1]是卡塔爾大學(xué)開源的跨平臺(tái)數(shù)據(jù)處理系統(tǒng),其支持在一個(gè)任務(wù)流中自動(dòng)調(diào)用多種平臺(tái),來(lái)優(yōu)化處理時(shí)間和處理性能。RHEEM借由內(nèi)部的優(yōu)化器,通過代價(jià)模型自動(dòng)為任務(wù)流中的算子選擇平臺(tái),獲取最優(yōu)的執(zhí)行計(jì)劃。Apache Beam[2]為開源的統(tǒng)一編程模型,用于進(jìn)行跨平臺(tái)的大數(shù)據(jù)分析處理。Beam是基于Google 的Dataflow Model[3]論文的一種實(shí)現(xiàn)。通過對(duì)數(shù)據(jù)分析的多維度規(guī)范和總結(jié),構(gòu)成了一套編程范式,實(shí)現(xiàn)了不同平臺(tái)間的統(tǒng)一,但并沒有關(guān)注任務(wù)流的調(diào)度問題。Kumar等人基于Actors模型[4]研發(fā)了一個(gè)帶圖形界面的任務(wù)流系統(tǒng)Amber[5]。但Amber的研究側(cè)重于任務(wù)流執(zhí)行過程中的實(shí)時(shí)調(diào)試,而不關(guān)注任務(wù)的調(diào)度順序。而目前針對(duì)單個(gè)任務(wù)流調(diào)度的研究已經(jīng)趨于成熟,HEFT[6]、CPOP[7]、PETS[8]等算法,以較高的調(diào)度效率被廣泛接受。但其都需要知道每個(gè)算子的時(shí)間、資源開銷以及算子間的傳輸開銷,并且都沒有考慮多平臺(tái)的條件。
為此,本文研究了多平臺(tái)環(huán)境下,數(shù)據(jù)分析任務(wù)流的調(diào)度和優(yōu)化問題。具體研究?jī)?nèi)容包括:根據(jù)多平臺(tái)的特性,提出了一種基于啟發(fā)式規(guī)則優(yōu)化的拓?fù)湔{(diào)度算法,來(lái)完成任務(wù)流調(diào)度;針對(duì)SQL算子加入了代價(jià)模型,能對(duì)任務(wù)流的SQL分析開銷做出估計(jì);訓(xùn)練了GBDT樹,為用戶動(dòng)態(tài)選擇機(jī)器學(xué)習(xí)任務(wù)的運(yùn)行平臺(tái)并確定開銷;通過實(shí)驗(yàn),驗(yàn)證了調(diào)度算法的優(yōu)化性能和代價(jià)模型的準(zhǔn)確率。
通常來(lái)說,一個(gè)大任務(wù)可以拆分成一個(gè)子任務(wù)的集合(不同的子任務(wù)間存在一定的依賴關(guān)系)。此時(shí),可以通過一個(gè)有向無(wú)環(huán)圖DAG(Directed Acyclic Graph)的形式來(lái)表示這些關(guān)系,形成了任務(wù)的DAG模型。因此,任務(wù)流的調(diào)度問題就轉(zhuǎn)化為DAG的調(diào)度問題。顯然,當(dāng)考慮任務(wù)調(diào)度時(shí),首要滿足的是子任務(wù)之間的依賴關(guān)系。
拓?fù)渑判?,是指?duì)一個(gè)有向無(wú)環(huán)圖G進(jìn)行排序。對(duì)于圖中邊的集合E(G),若存在邊∈E(G),則在排序序列中,頂點(diǎn)u一定出現(xiàn)在頂點(diǎn)v之前??梢?,拓?fù)渑判虻奶匦钥梢院芎玫仄鹾献尤蝿?wù)之間的依賴限制。通過對(duì)DAG做拓?fù)渑判蚓涂梢缘玫揭粋€(gè)滿足任務(wù)流依賴關(guān)系的執(zhí)行順序。
Kahn算法[9]是進(jìn)行拓?fù)渑判虻某R娝惴ǎ蒏ahn在1962年提出。其中心思路是:每次取出一個(gè)沒有先驅(qū)節(jié)點(diǎn)(即入度等于0)的節(jié)點(diǎn),將其放入排序序列中,然后將這個(gè)節(jié)點(diǎn)的所有后繼節(jié)點(diǎn)的入度減一,重復(fù)這一過程直至排序完成。該算法的時(shí)間復(fù)雜度為O(E+V)。但需要注意的是:有效的拓?fù)湫蛄胁⒉皇俏ㄒ坏?,每次使用Kahn算法得到的拓?fù)湫蛄幸膊灰欢ㄊ且恢碌摹?/p>
如圖1所示,給出的一種存在兩個(gè)不同平臺(tái)算子的任務(wù)流。從A算子讀取輸入后,分別通過后續(xù)的其它算子計(jì)算,直至結(jié)束。對(duì)于這樣一個(gè)任務(wù)流,其拓?fù)湫蛄酗@然不唯一??赡艿男蛄杏?A,B,C,D,E,F,G,H,I,J,K,L),(A,B,D,F,C,E,G,I,J,K,L,H)等等。在非常多的可能序列中,再次考慮兩種比較極端的調(diào)度序列。第一種是序列L1(A,B,C,D,E,J,F,G,K,I,L,H),第二種是序列L2(A,B,D,F,C,E,G,H,I,J,K,L)。顯然,序列L1和序列L2都是滿足拓?fù)渑判虻男蛄校蛄蠰1的調(diào)度效果是一個(gè)平臺(tái)P1的算子和平臺(tái)P2的算子交錯(cuò)運(yùn)行。而序列L2的調(diào)度效果是優(yōu)先將滿足依賴的同平臺(tái)算子全部調(diào)度完后,再考慮調(diào)度其他平臺(tái)的算子。
圖1 多平臺(tái)任務(wù)流示例
對(duì)于序列L1來(lái)說,頻繁的平臺(tái)切換會(huì)帶來(lái)額外的切換開銷。更關(guān)鍵的是在進(jìn)行平臺(tái)切換的同時(shí),需要將之前的計(jì)算結(jié)果保存,又會(huì)帶來(lái)額外的內(nèi)存開銷。而過多的中間結(jié)果緩存會(huì)在一定程度上影響后續(xù)節(jié)點(diǎn)的計(jì)算速度。以Spark平臺(tái)為例,過多的中間結(jié)果被緩存,會(huì)導(dǎo)致可用內(nèi)存不足。此時(shí),在讀入當(dāng)前節(jié)點(diǎn)數(shù)據(jù)時(shí),需要進(jìn)行GC和內(nèi)存置換,從而會(huì)降低任務(wù)的處理速度。另一方面,如果一個(gè)平臺(tái)內(nèi)的算子操作可以連續(xù)進(jìn)行,那么計(jì)算的中間結(jié)果可以用平臺(tái)的內(nèi)部結(jié)構(gòu)表示,不僅使得整體的運(yùn)算速度變快,也使整個(gè)流程的類型十分安全,可以應(yīng)對(duì)復(fù)雜的計(jì)算流程,大大提高系統(tǒng)的魯棒性。而例如Spark這樣的平臺(tái),其原生數(shù)據(jù)結(jié)構(gòu)還使得運(yùn)算結(jié)果的展示具有極高的靈活性。
可以想象,當(dāng)任務(wù)流的復(fù)雜度提升,或是任務(wù)流中平臺(tái)數(shù)量增加,這兩種調(diào)度序列所帶來(lái)的開銷差別是巨大的。
樸素的拓?fù)渑判蚩梢越鉀Q任務(wù)流的調(diào)度問題,但在任務(wù)流的整體開銷上,普通的拓?fù)渑判蚴谴嬖谝欢▎栴}的。由于拓?fù)湫蛄惺遣晃ㄒ坏模瑢?duì)于一個(gè)給定的任務(wù)流J,使用不同的拓?fù)湫蛄袑?duì)其進(jìn)行調(diào)度,最終整體的內(nèi)存和時(shí)間開銷是不同的。
因此,該問題的描述是給定一個(gè)有向無(wú)環(huán)圖G(V,E),圖中節(jié)點(diǎn)E表示系統(tǒng)中不同平臺(tái)的算子,圖中有向邊V表示算子之間的依賴關(guān)系。在滿足依賴關(guān)系約束的情況下,本文希望找到一種節(jié)點(diǎn)序列,使得通過該順序執(zhí)行任務(wù)流時(shí),較少的其它中間結(jié)果被緩存,且進(jìn)行的平臺(tái)切換最少,從而能優(yōu)化整體的時(shí)間開銷。
針對(duì)這一問題,如果預(yù)先將所有的拓?fù)湫蛄星蟪觯僖灰贿M(jìn)行比較,選出最優(yōu)解,算法的復(fù)雜度會(huì)過高。所以本文選擇通過啟發(fā)式規(guī)則來(lái)求解。
本算法思路:先將輸入節(jié)點(diǎn)置入結(jié)果序列,然后將其指向的節(jié)點(diǎn)入度減1,每次選擇下一個(gè)節(jié)點(diǎn)(入度為0的節(jié)點(diǎn))時(shí),遵循三條規(guī)則:
(1)總是優(yōu)先選擇與之前節(jié)點(diǎn)同平臺(tái)的節(jié)點(diǎn);
(2)如果有多個(gè)同平臺(tái)的節(jié)點(diǎn),優(yōu)先選擇當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn);
(3)如果不存在同一平臺(tái)的節(jié)點(diǎn),則選擇任意滿足依賴約束的節(jié)點(diǎn)。算法偽代碼如算法1所示。
算法1基于啟發(fā)式規(guī)則優(yōu)化的拓?fù)湔{(diào)度算法
輸入有向無(wú)環(huán)圖G=(V,E),S為入度為0的節(jié)點(diǎn)集合
輸出調(diào)度序列L
L={}
WhileSis not empty do
//啟發(fā)式規(guī)則優(yōu)化
ifShas the same-platform-node with tail ofL
ifShas the successor of the tail ofL
remove the successornfromS
addntoL
else
remove the same-platform-noden’ fromS
addn’ toL
else
remove another nodekfromS
addktoL
for each node m with edge e fromntomdo
remove edgeefromG
ifmhas no other incoming edges then
insertmtoS
returnL
仍以圖1為例,序列L1(A,B,C,D,E,J,F,G,K,I,L,H)和序列L2(A,B,D,F,C,E,G,I,H,J,K,L)。其中序列L2為本算法得出的調(diào)度序列。輸入算子A與后繼算子的切換不計(jì)入考量,L1序列總共切換了8次平臺(tái),L2序列總共切換了2次。同時(shí)可以看出,L1序列中,在計(jì)算大部分節(jié)點(diǎn)時(shí),都有無(wú)關(guān)的中間結(jié)果被緩存。如在計(jì)算G節(jié)點(diǎn)時(shí),節(jié)點(diǎn)F和節(jié)點(diǎn)G的結(jié)果緩存,則增加了內(nèi)存和時(shí)間開銷。而在L2序列中,基本保證了任務(wù)能夠以一種近乎深度遍歷的方式執(zhí)行,減少了與當(dāng)前計(jì)算無(wú)關(guān)的中間結(jié)果的緩存。
如算法1所示,本算法對(duì)圖中的每個(gè)節(jié)點(diǎn)和每條邊都會(huì)進(jìn)行一次遍歷。因此,本算法的復(fù)雜度為O(E+V),其中E為圖中的節(jié)點(diǎn)數(shù),V為圖中的邊數(shù)。
本文主要考慮的任務(wù)流算子為SQL類算子和機(jī)器學(xué)習(xí)算子,這些算子的主要運(yùn)行平臺(tái)是Spark和Python。因此,本文針對(duì)這兩種情況分別加入相應(yīng)的代價(jià)模型。
2.2.1 針對(duì)SparkSQL的代價(jià)模型
代價(jià)模型涵蓋了3個(gè)基本的SQL操作:Projection、Selection和Join。
首先,系統(tǒng)收集一些SparkSQL相關(guān)的物理參數(shù)。參考Baldacci[10]等人的工作,系統(tǒng)收集的物理參數(shù)見表1。
表1 物理參數(shù)收集表
根據(jù)表1可以初步得到公式(1)和公式(2):
(1)
(2)
其中,Read(RSize)為根據(jù)讀入大小得到的讀入時(shí)間,Write(WSize)為根據(jù)寫出大小得到的寫出時(shí)間。
計(jì)算執(zhí)行開銷最簡(jiǎn)單的方式就是將其運(yùn)行一遍,但這樣就失去了代價(jià)模型的意義。事實(shí)上,代價(jià)模型不需要知道一個(gè)精確的代價(jià),而只需要一個(gè)估計(jì)值。所以,代價(jià)模型可以對(duì)結(jié)果的數(shù)據(jù)量進(jìn)行估計(jì),再通過數(shù)據(jù)量的估算值來(lái)推斷執(zhí)行的代價(jià)。
在查詢語(yǔ)句中,用戶常常會(huì)使用一些過濾條件(即本文中提到的Filter操作),在SystemR[11]中對(duì)過濾條件的估算只考慮到了數(shù)據(jù)連續(xù)且分布均勻的情況。但現(xiàn)實(shí)中的數(shù)據(jù)往往不是連續(xù)均勻分布的。所以考慮數(shù)據(jù)的分布情況,對(duì)數(shù)據(jù)量的估計(jì)是至關(guān)重要的。
其中,直方圖是一種能夠表示數(shù)據(jù)分布的統(tǒng)計(jì)方式。其通過分桶策略對(duì)數(shù)據(jù)做出劃分,從而得到大致的分布情況。本系統(tǒng)選擇了等深直方圖來(lái)了解數(shù)據(jù)分布情況。與普通的等寬直方圖不同,等深直方圖盡可能的保證桶的深度相同。Piatetsky-Shapiro[12]的研究指出,等深直方圖的魯棒性更強(qiáng)。
直方圖的構(gòu)建需要有序數(shù)據(jù)??紤]到數(shù)據(jù)量增大后導(dǎo)致的排序開銷,所以本系統(tǒng)使用蓄水池采樣,采樣后再進(jìn)行排序和直方圖的構(gòu)建。
系統(tǒng)將過濾條件分為3類:?jiǎn)瘟械姆秶樵儭瘟械牡戎挡樵?、多列查詢。下面將分別介紹3種情況的估計(jì)方法。
(1)單列范圍查詢。對(duì)于單列的范圍查詢,可以通過等深直方圖來(lái)進(jìn)行估計(jì)。對(duì)于給定的一個(gè)范圍查詢,只需要知道其覆蓋范圍內(nèi)的所有桶的深度即可。如果遇到桶的范圍與查詢范圍有部分交集的情況,可以交集占桶大小的比例再做一次估算。此時(shí),需要假設(shè)數(shù)據(jù)的分布是均勻且連續(xù)的。對(duì)于其它類型的數(shù)據(jù),一般是將其映射成數(shù)字后再計(jì)算比例。
(2)等值查詢。對(duì)于等值查詢,需要知道記錄出現(xiàn)的頻率。對(duì)于一般情況下頻率的計(jì)算,人們傾向于使用HashMap來(lái)統(tǒng)計(jì)。而當(dāng)數(shù)據(jù)量非常大時(shí),一則要求的內(nèi)存非常大,二則當(dāng)HashMap的沖突很高時(shí),時(shí)間復(fù)雜度的上升,導(dǎo)致無(wú)法滿足實(shí)時(shí)性的需要。
本系統(tǒng)使用了Count-Min Sketch[13]算法,其是一種可以處理等值查詢的方法,可以提供很強(qiáng)的準(zhǔn)確性保證。該算法的基本思路是維護(hù)一個(gè)初始為0的D×W大小的數(shù)組。對(duì)于數(shù)據(jù)中出現(xiàn)的每一個(gè)值,分別用D個(gè)獨(dú)立的哈希函數(shù)進(jìn)行映射和計(jì)數(shù)。查詢頻率時(shí),依舊對(duì)其進(jìn)行D次哈希,找到每一行中對(duì)應(yīng)的計(jì)數(shù)值,再取其中的最小值作為估計(jì)值。
Count-Min Sketch可以看作布隆過濾器在統(tǒng)計(jì)方面的一個(gè)變形。其缺點(diǎn)是估計(jì)值總是大于等于真實(shí)值。
(3)多列查詢。本系統(tǒng)假設(shè)不同列之間是相互獨(dú)立的,只需要把不同列的過濾結(jié)果相乘即可。
綜上,可以得到一個(gè)函數(shù)Filter(cols,type)。
其中,cols表示過濾的列,type表示過濾的種類。可得對(duì)Project操作的估計(jì),如公式(3)所示:
(3)
其中,projCols表示被選中的列;all表示表中所有列;attr.Size表示該列的平均大小。
接下來(lái)可以考慮就可以完成代價(jià)模型,本文考慮3種任務(wù)。
第一種任務(wù)類型是全表掃描任務(wù),記為SCAN。SCAN任務(wù)可以包含F(xiàn)ilter、Project和Aggregate操作。在SparkSQL中,F(xiàn)ilter操作和Project操作被Spark的優(yōu)化器Catalyst,通過謂詞下推和列值剪裁來(lái)優(yōu)化執(zhí)行,減小無(wú)用的元組和列對(duì)整體開銷的影響。此時(shí),數(shù)據(jù)大小的估計(jì)如公式(4)所示。
WSize=RDDSize·Project(projCols,all)
·Filter(projCols,type)
(4)
若不存在聚合操作,則整個(gè)任務(wù)可以在一個(gè)管道中邊讀入,邊寫出,整體開銷如公式(5)所示。而存在聚合操作時(shí),任務(wù)的寫出必須在所有數(shù)據(jù)被讀入后才能進(jìn)行。此時(shí),整體開銷如公式(6)所示。
Write(WSize))
(5)
Write(WSize))
(6)
(7)
其中,TableSize為表大小,RDDSize為Spark的RDD分區(qū)大小。若不存在寫出操作,Write(WSize)取0。
如果涉及Join操作,SparkSQL將根據(jù)不同情況,使用以下幾種不同的Join方式:
(1)Broadcast Hash Join應(yīng)用于小表(默認(rèn)閾值為10 MB)和大表之間的Join。使用Broadcast的方式來(lái)完成Join操作,犧牲空間換取時(shí)間。此時(shí),通過將小表廣播到每個(gè)運(yùn)行節(jié)點(diǎn)上,避免了Shuffle帶來(lái)的大量時(shí)間開銷。
(2)Shuffle Hash Join適合較小表和大表之間的Join。如果小表的大小大于10 MB,此時(shí)將小表廣播出去會(huì)造成較大的數(shù)據(jù)冗余和帶寬內(nèi)存消耗,使得運(yùn)行節(jié)點(diǎn)的壓力較大。所以,SparkSQL轉(zhuǎn)為使用Shuffle Hash join,通過Join的key將兩張表進(jìn)行分區(qū),即Shuffle操作。集群內(nèi)的每個(gè)工作節(jié)點(diǎn)都會(huì)參與Shuffle操作,每個(gè)工作節(jié)點(diǎn)處理每個(gè)Bucket的一部分,然后對(duì)每個(gè)分區(qū)內(nèi)的記錄進(jìn)行Hash Join的操作。
(3)Sort Merge Join則適合兩張大表之間的Join。Hash Join的方法是將其中一張表完全讀入內(nèi)存中,然后使用哈希的方法對(duì)另一張表進(jìn)行探測(cè)和連接。而當(dāng)兩張表都較大的情況下,使用哈希方法對(duì)內(nèi)存的壓力過大。此時(shí)SparkSQL通過Join Key將兩張表進(jìn)行Shuffle分區(qū),以便后續(xù)的分布式處理,然后分別對(duì)每個(gè)分區(qū)進(jìn)行排序和合并。
第二種任務(wù)類型是Broadcast Join任務(wù)。該任務(wù)的開銷可以用函數(shù)BJ()表示。在進(jìn)行Broadcast Join時(shí),大表仍然通過一個(gè)SCAN任務(wù)讀入,而小表要進(jìn)行廣播,所以不需要寫操作。Broadcast過程的開銷,參考文獻(xiàn)[10]的研究,得到公式(8)。
(8)
由于整個(gè)Broadcast Join是在內(nèi)存中,通過Hash的方式來(lái)完成,速度非???,瓶頸主要體現(xiàn)在最后寫出的速度上。所以Broadcast Join的開銷函數(shù)如公式(9)所示:
(9)
其中,PartitionNum為Spark中設(shè)置的分區(qū)數(shù)。
第三種任務(wù)類型是Shuffle Join。因?yàn)镾huffle Hash Join和Sort Merge Join的開銷瓶頸都是Shuffle階段,所以其開銷都可以通過Shuffle Join任務(wù)來(lái)描述。該任務(wù)的開銷可以用函數(shù)SJ()來(lái)表示。在進(jìn)行Shuffle Join時(shí),顯然Shuffle的過程是瓶頸所在。另外,Shuffle Join只有在整個(gè)分區(qū)的數(shù)據(jù)都被讀入后才能進(jìn)行,不能邊讀邊寫。但是在第一階段的Shuffle過程中,數(shù)據(jù)是一邊被分配到分區(qū),一邊被讀取的。Shuffle階段的開銷如公式(10)所示:
(10)
對(duì)于Shuffle Join階段的開銷如公式(11)所示:
(11)
綜上所述,不同任務(wù)的開銷可以通過上述3種函數(shù)的組合來(lái)計(jì)算。
2.2.2 針對(duì)機(jī)器學(xué)習(xí)任務(wù)的代價(jià)模型
由于本文考慮的機(jī)器學(xué)習(xí)任務(wù)的運(yùn)行平臺(tái)包括Spark和Python(Scikit-Learn),因此會(huì)在確定運(yùn)行平臺(tái)后完成對(duì)機(jī)器學(xué)習(xí)任務(wù)的開銷估計(jì)。
在確定了運(yùn)行平臺(tái)之后,就可以根據(jù)硬件信息、平臺(tái)選擇、任務(wù)種類(分類、聚類、回歸)和訓(xùn)練數(shù)據(jù)的維度、數(shù)量等信息估計(jì)出這個(gè)機(jī)器學(xué)習(xí)分析算子的大致時(shí)間開銷,可通過一個(gè)回歸任務(wù)來(lái)完成這一開銷估計(jì)??紤]到GBDT(Gradient Boosting Decision Tree)的本質(zhì)是回歸樹,且具有很強(qiáng)的泛化能力,因此本文使用GBDT回歸樹來(lái)完成對(duì)機(jī)器學(xué)習(xí)算子時(shí)間開銷的估計(jì)。
本文在 Spark 集群上進(jìn)行實(shí)驗(yàn),其中包括 1 個(gè) master 節(jié)點(diǎn)和 2 個(gè) worker 節(jié)點(diǎn)。節(jié)點(diǎn)的硬件配置如下: Intel(R)Xeon(R)Silver 4208@2.10 GHz,24核;64 GB 內(nèi)存;8 TB硬盤。軟件配置為:Linux Ubuntu 18.04、Spark-2.3.1、Python 3.6.8版本。
用于數(shù)據(jù)分析流程的訓(xùn)練數(shù)據(jù)為Numpy隨機(jī)生成。而用于文本分析流程的數(shù)據(jù),來(lái)源于新浪公司CMS內(nèi)容管理系統(tǒng)真實(shí)發(fā)布的新聞文章(為2019.01.01~2019.04.22發(fā)布在新浪新聞娛樂頻道的所有新聞文章),去重后共有2 172 925篇。實(shí)驗(yàn)時(shí)先根據(jù)關(guān)鍵詞篩選出1 000篇文章進(jìn)行訓(xùn)練。
第一組實(shí)驗(yàn):測(cè)試調(diào)度算法的優(yōu)化對(duì)系統(tǒng)性能的影響。
實(shí)驗(yàn)比較了樸素拓?fù)湔{(diào)度和基于啟發(fā)式規(guī)則優(yōu)化的拓?fù)湔{(diào)度在任務(wù)流調(diào)度中的開銷差異。
本文首先通過Numpy生成了12 GB的訓(xùn)練數(shù)據(jù),并限制系統(tǒng)內(nèi)存為10 GB,模擬復(fù)雜任務(wù)下的情況,并使用多種不同的算子組合構(gòu)成任務(wù)流,進(jìn)行實(shí)驗(yàn)。其中一種任務(wù)流如圖2所示。實(shí)驗(yàn)結(jié)果見表2。
圖2 實(shí)驗(yàn)任務(wù)流示例
表2 調(diào)度性能測(cè)試
第二組實(shí)驗(yàn):測(cè)試代價(jià)模型的準(zhǔn)確率。
本文在TPC-H的標(biāo)準(zhǔn)下,生成了12 GB的表數(shù)據(jù)。分別測(cè)試了不含Join操作的簡(jiǎn)單SQL和包含了Broadcast Join或Shuffle Join操作的稍復(fù)雜SQL的實(shí)際執(zhí)行時(shí)間,并將之與代價(jià)模型給出的預(yù)估時(shí)間進(jìn)行對(duì)比。實(shí)驗(yàn)結(jié)果見表3。
表3 代價(jià)模型準(zhǔn)確性測(cè)試
誤差率的定義如公式(12)所示:
(12)
由表3中可以看出,Join操作帶來(lái)的復(fù)雜性提升,使得不含Join的簡(jiǎn)單SQL的估計(jì)更加準(zhǔn)確,對(duì)帶Join的SQL的開銷估計(jì)則稍微有所下降。而在兩種Join類型之間,由于Shuffle Join涉及兩張表的分桶和更多的數(shù)據(jù)傳輸,其過程更加復(fù)雜,使得這種類型下的平均誤差率最高,達(dá)到了33%。但3種情況下,代價(jià)模型的誤差控制在35%以內(nèi),達(dá)到了預(yù)期。
針對(duì)多平臺(tái)條件的任務(wù)調(diào)度問題,本文提出了基于啟發(fā)式規(guī)則優(yōu)化的拓?fù)湔{(diào)度算法,并且結(jié)合代價(jià)模型完成了對(duì)SparkSQL任務(wù)和機(jī)器學(xué)習(xí)任務(wù)的開銷估計(jì)。通過實(shí)驗(yàn)證明了調(diào)度算法的有效性。在后續(xù)工作中,可以結(jié)合歷史運(yùn)行數(shù)據(jù),通過一個(gè)端到端的機(jī)器學(xué)習(xí)模型完善和改進(jìn)系統(tǒng)的代價(jià)模型。