国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

異構(gòu)Spark集群數(shù)據(jù)傾斜修正調(diào)度策略*

2022-04-21 05:05修位蓉
計算機工程與科學 2022年4期
關(guān)鍵詞:鍵值數(shù)據(jù)量異構(gòu)

卞 琛,修位蓉,于 炯

(1廣東金融學院互聯(lián)網(wǎng)金融與信息工程學院,廣東 廣州 510521;2.廣州商學院信息技術(shù)與工程學院,廣東 廣州 511363;3新疆大學信息科學與工程學院,新疆 烏魯木齊 830046)

1 引言

隨著非易失內(nèi)存的發(fā)展和高復雜度計算需求的提升,能夠較好地支持迭代計算和數(shù)據(jù)密集型應(yīng)用的內(nèi)存計算框架[1-3]已經(jīng)得到了廣泛的應(yīng)用。分布式內(nèi)存計算框架Spark[4,5]作為其中的典型代表,利用分布式集群的節(jié)點,在內(nèi)存對數(shù)據(jù)進行高效處理和臨時緩存,具有高性能、高可靠和低延遲的特性。傳統(tǒng)基于磁盤的分布式計算框架MapReduce[6]將應(yīng)用簡單劃分為多個Map和Reduce操作,磁盤I/O的開銷很大。與之不同的是,Spark將作業(yè)基于操作間的前后關(guān)系轉(zhuǎn)化成DAG圖,并根據(jù)處理的操作類型不同(寬依賴操作或窄依賴操作)劃分成多個計算階段(stage),各階段串行處理,階段內(nèi)部由多個任務(wù)(task)并行處理。作業(yè)執(zhí)行時間取決于所有階段執(zhí)行時間之和,若能夠盡可能減少每個階段的執(zhí)行時間,則最終整個作業(yè)的執(zhí)行時間就能夠得到優(yōu)化。由于每個階段內(nèi)的任務(wù)需要同步,完成時長取決于執(zhí)行時間最長的任務(wù),讓每個任務(wù)都盡快完成才成達到優(yōu)化目標,因此并發(fā)任務(wù)數(shù)(并行度)和計算量分配是影響任務(wù)執(zhí)行速度的關(guān)鍵因素。

Spark框架的任務(wù)并行度是分布式內(nèi)存計算環(huán)境中各階段內(nèi)部任務(wù)執(zhí)行的并發(fā)程度。任務(wù)并行度包含物理并行度和邏輯并行度:物理并行度是集群資源限定下能夠承載的最大并發(fā)任務(wù)個數(shù),由注冊的工作節(jié)點數(shù)及每個節(jié)點貢獻的CPU核心數(shù)確定;而邏輯并行度為用戶/程序所預期的任務(wù)并發(fā)程度,由Spark的執(zhí)行參數(shù)確定。理想情況下,物理并行度和邏輯并行度一致時能夠獲得最佳效率,而由于存儲容量限制,數(shù)據(jù)量較大或分布不均衡可能會導致內(nèi)存空間不足或內(nèi)存溢出,反而影響執(zhí)行效率。同時,邏輯并行度設(shè)置過大時,也同樣會帶來調(diào)度和任務(wù)切換的額外開銷。Spark的任務(wù)并行度通常在系統(tǒng)配置參數(shù)default.parallelism中設(shè)定,往往由程序員依靠經(jīng)驗預先設(shè)定,較難契合各類型的作業(yè)和集群環(huán)境,增加了溢寫、容錯甚至是程序異常的風險。同時,并行度在整個作業(yè)所有階段的執(zhí)行過程中恒定不變,與作業(yè)類型、數(shù)據(jù)特性和節(jié)點能力無關(guān),每個階段的數(shù)據(jù)分布和數(shù)據(jù)大小都不一樣,不能很好地適應(yīng)作業(yè)特點,提高并發(fā)執(zhí)行效率。

在不改變分區(qū)規(guī)則的情況下,由原始數(shù)據(jù)分布和并行度參數(shù)共同決定計算數(shù)據(jù)量的分配,對作業(yè)執(zhí)行效率和集群資源利用率均產(chǎn)生重要影響,在異構(gòu)Spark集群中的表現(xiàn)尤為突出。因此,本文針對異構(gòu)Spark集群節(jié)點性能與數(shù)據(jù)分配的適配性問題,研究數(shù)據(jù)分布和并行度設(shè)置對作業(yè)執(zhí)行效率的影響,建立節(jié)點資源模型、數(shù)據(jù)分配模型和任務(wù)執(zhí)行模型,分析數(shù)據(jù)分布、并行度參數(shù)和節(jié)點任務(wù)分配的耦合關(guān)系,異構(gòu)Spark集群的數(shù)據(jù)傾斜修正調(diào)度策略DSCS(Data Skew Correction Scheduling strategy),包括并行度預估算法、數(shù)據(jù)傾斜修正和異構(gòu)節(jié)點任務(wù)分配算法。并行度預估算法基于統(tǒng)計的輸入數(shù)據(jù)總量、可用節(jié)點數(shù)量、可用核心數(shù)量和可用內(nèi)存數(shù)量,對并行度和輪數(shù)進行預估;數(shù)據(jù)傾斜修正算法對輸入數(shù)據(jù)分布傾斜度較高的階段進行處理,盡可能均衡不同數(shù)據(jù)桶(Bucket)中的分配量,當部分鍵值(key)數(shù)據(jù)量明顯大于其他鍵值時,對該健值的數(shù)據(jù)進行拆分并增加并行度;異構(gòu)節(jié)點任務(wù)分配算法分別根據(jù)節(jié)點計算能力和內(nèi)存空間大小倒序生成2個隊列,優(yōu)先分配更多的任務(wù)到計算能力更強的節(jié)點和內(nèi)存可用空間更大的節(jié)點,從而使異構(gòu)Spark集群能夠通過并行調(diào)度策略,選擇具有更小內(nèi)存溢出量和更小任務(wù)調(diào)度開銷的并行計算方案。

2 相關(guān)工作

在內(nèi)存計算框架中,任務(wù)并行度設(shè)置的合理性將直接影響到作業(yè)執(zhí)行效率和集群資源利用率。針對內(nèi)存計算框架的任務(wù)并行調(diào)度效率問題,研究者從不同角度提出了優(yōu)化方案。文獻[7]針對嵌套型列存儲Impala,利用多層查詢樹的結(jié)構(gòu)提高了查詢?nèi)蝿?wù)的并行度,有效地降低了系統(tǒng)廣播和查詢開銷。文獻[8]分析發(fā)現(xiàn),在MapReduce環(huán)境中,并行度的設(shè)置會顯著影響緩存在作業(yè)執(zhí)行中的作用,合理的并行度能夠有效提高作業(yè)執(zhí)行效率,在此基礎(chǔ)上提出了基于并行度優(yōu)化的緩存設(shè)置算法。文獻[9]研究表明,在Spark環(huán)境中,各計算階段的任務(wù)并行度設(shè)置不合理會降低作業(yè)執(zhí)行性能,增加作業(yè)的計算開銷,在此基礎(chǔ)上提出通過調(diào)整并行度來提高內(nèi)存資源利用率的任務(wù)調(diào)度算法。文獻[10]在MapReduce環(huán)境中,為了合理地設(shè)置節(jié)點任務(wù)并行度,有效地均衡節(jié)點通信成本,提高作業(yè)執(zhí)行效率,設(shè)置了基于通信成本模型的任務(wù)調(diào)度算法。文獻[11]在并行度設(shè)置的基礎(chǔ)上,提出了任務(wù)本地化調(diào)度算法,從而優(yōu)化任務(wù)執(zhí)行機制,提高任務(wù)本地性,降低網(wǎng)絡(luò)開銷。文獻[12]基于并行度分布的情況,提出了動態(tài)任務(wù)調(diào)度策略,利用任務(wù)的數(shù)據(jù)分布情況,結(jié)合節(jié)點的計算能力為任務(wù)分配相應(yīng)的系統(tǒng)資源。文獻[13,14]對Spark環(huán)境中的具體參數(shù)進行建模和預測,實現(xiàn)對包含并行度等多個作業(yè)參數(shù)的優(yōu)化。

針對任務(wù)計算量分配問題,也涌現(xiàn)出大量的研究成果。文獻[15]提出了SkewReduce,通過建立代價模型評估分區(qū)容量,在作業(yè)執(zhí)行過程中進行元數(shù)據(jù)的逐步收集,達到契機時執(zhí)行分區(qū)優(yōu)化函數(shù)并實施新的分區(qū)方案。文獻[16]提出了SkewTune策略,該策略并不期望在執(zhí)行計劃階段就建立均衡的分區(qū),而是建立了Reduce端的任務(wù)剩余代價評估模型,任何Reduce端的任務(wù)完成后,都將觸發(fā)其他未完成任務(wù)的剩余代價評估,并將未處理數(shù)據(jù)向已完成任務(wù)的工作節(jié)點遷移,從而達到數(shù)據(jù)分配的整體均衡。文獻[17]提出了一種基于采樣的分區(qū)策略,該策略通過在Map端增加獨立的采樣進程獲得近似數(shù)據(jù)分布,采樣達到閾值后對已生成的分區(qū)進行拆分和重組,從而提高數(shù)據(jù)分配的均衡性。文獻[18,19]提出了精細分區(qū)和動態(tài)拆分2種算法,首先通過精細分區(qū)算法生成固定數(shù)量的分區(qū),同時進行采樣獲得近似數(shù)據(jù)分布,當完成一定比例的Map任務(wù)后,觸發(fā)動態(tài)拆分函數(shù),達到數(shù)據(jù)合理分配的目標。文獻[20,21]提出了基于數(shù)據(jù)塊的采樣分區(qū)方法,該方法將原生的鍵值對轉(zhuǎn)換為〈blocking_key,entity〉形式,通過設(shè)計評估函數(shù)對塊內(nèi)數(shù)據(jù)進行評估,對不符合條件的數(shù)據(jù)塊進行調(diào)整。文獻[22]提出先對輸入數(shù)據(jù)進行25%的隨機采樣,通過采樣結(jié)果獲得數(shù)據(jù)分布并制定分區(qū)函數(shù),然后啟動任務(wù)填充數(shù)據(jù)。文獻[23]提出了LEEN策略,通過對輸入數(shù)據(jù)的預掃描獲取數(shù)據(jù)分布,在Map任務(wù)執(zhí)行過程中對數(shù)據(jù)的鍵值進行頻率統(tǒng)計,然后綜合數(shù)據(jù)分布和鍵值頻率統(tǒng)計設(shè)定合理的分區(qū)函數(shù)。文獻[24]提出了一種任務(wù)級別的啟發(fā)式調(diào)度策略,通過收集任務(wù)特性和資源需求,將任務(wù)發(fā)送到最適合的工作節(jié)點,監(jiān)測慢任務(wù)并進行重定向,但對任務(wù)劃分的合理性則沒有進行評估和改進。H-Scheduler[25]和Selecta[26]主要針對計算集群的存儲結(jié)構(gòu),評估HDD+SSD混合存儲結(jié)構(gòu)對作業(yè)執(zhí)行效率的影響。H-Scheduler利用存儲類型和數(shù)據(jù)本地性對任務(wù)進行分類,再根據(jù)分類結(jié)果重新定義任務(wù)調(diào)度優(yōu)先級和工作節(jié)點映射。Selecta則分析影響作業(yè)效率的潛在因素,采用協(xié)同過濾算法對作業(yè)在不同配置環(huán)境中的性能表現(xiàn)進行預測,預測結(jié)果有94%的最佳性能配置命中率和80%的最優(yōu)成本命中率。

本文與現(xiàn)有研究成果的不同之處在于,發(fā)現(xiàn)并行度和計算量分配都是決定任務(wù)執(zhí)行時間的重要因素且兩者存在耦合,無法通過已知直接求得2個相互耦合未知數(shù)的最優(yōu)解,因此也并不期望在作業(yè)計劃階段就確定適合異構(gòu)Spark集群的最優(yōu)并行度和最佳數(shù)據(jù)分配,而是在多階段任務(wù)執(zhí)行過程中為每個階段制定相對合理的并行度,通過后期調(diào)整來優(yōu)化數(shù)據(jù)分配。首先通過建立計算節(jié)點資源模型、數(shù)據(jù)分配模型和任務(wù)執(zhí)行模型,確定并行度設(shè)置的約束,分析任務(wù)并行度和計算量分配的耦合關(guān)系,在此基礎(chǔ)上提出異構(gòu)Spark集群的數(shù)據(jù)傾斜修正調(diào)度策略DSCS,包括并行度預估、數(shù)據(jù)傾斜修正和異構(gòu)節(jié)點任務(wù)分配算法。并行度預估算法基于有限的數(shù)據(jù)集信息和Spark工作節(jié)點的資源狀況,預估并行度、分配輪數(shù)的相對合理值,實施作業(yè)的發(fā)送。在作業(yè)并行計算過程中,首個計算階段任務(wù)運行完畢后進行數(shù)據(jù)分布的采集和存儲,根據(jù)傾斜情況對數(shù)據(jù)進行切分,更新并行度參數(shù)。最后,在考慮節(jié)點異構(gòu)的情況下,根據(jù)節(jié)點計算能力的不同,將數(shù)據(jù)分配到不同節(jié)點,從而有效均衡節(jié)點計算能力和計算數(shù)據(jù)量之間的關(guān)系。

3 問題的建模與分析

本節(jié)主要分析Spark作業(yè)的并行執(zhí)行機制,進行模型定義和相關(guān)定理證明,為第4節(jié)算法設(shè)計提供理論基礎(chǔ)。

3.1 模型定義

定義1(節(jié)點計算能力) 定義由n個節(jié)點構(gòu)成的Spark集群w={w1,w2,…,wn},其中wx為第x個Spark計算節(jié)點(簡稱節(jié)點x)。內(nèi)存計算框架的節(jié)點計算資源主要包括CPU資源、內(nèi)存資源和網(wǎng)絡(luò)資源,通常同一個集群的網(wǎng)絡(luò)資源及傳輸能力相近,因此節(jié)點x的節(jié)點計算能力可定義為cx=(CPUx,Memx)。對節(jié)點計算能力進行統(tǒng)計分析時,計算能力主要反映單位時間處理的數(shù)據(jù)量,其中CPU貢獻用主頻和核數(shù)之積來評判,表示為CPUx=fx×corex。內(nèi)存則以吞吐量為指標,由于吞吐量牽涉的因素較多,而內(nèi)存計算的最大瓶頸是容量問題,因此可定義內(nèi)存處理能力為Memx,用于表示內(nèi)存空間大小。

(1)

(2)

則節(jié)點x的綜合歸一化計算能力則可表示如式(3)所示:

(3)

定義2(節(jié)點資源占用率) 節(jié)點資源占用率是指系統(tǒng)中節(jié)點資源的已使用比率,占用率越高,節(jié)點計算能力越低;反之,則計算能力越高。節(jié)點x的資源利用率ux可定義如式(4)所示:

ux={ucx,umx|0≤ucx≤1,0≤umx≤1}

(4)

其中,ucx表示CPU占用率,umx表示內(nèi)存占用率。

ucx和umx均可通過系統(tǒng)性能監(jiān)測軟件獲得,本文利用nmon軟件進行監(jiān)測。當CPU利用率為ucx時,則實際的CPU計算能力為fx×corex×(1-ucx),當內(nèi)存資源占用率為umx時,則實際的內(nèi)存可用空間為Memx×(1-umn)。

定義3(數(shù)據(jù)分布) 輸入數(shù)據(jù)分布對應(yīng)Spark作業(yè)的每個階段,第1個階段的輸入數(shù)據(jù)分布是原始數(shù)據(jù)集的鍵值分布情況,若輸入數(shù)據(jù)中包含m個鍵值,且km對應(yīng)的數(shù)據(jù)量為vm時,則輸入數(shù)據(jù)分布如式(5)所示:

d=(〈k1,v1〉,〈k2,v2〉,…,〈km,vm〉)

(5)

后續(xù)計算階段的輸入數(shù)據(jù)分布對應(yīng)前一個階段的輸出結(jié)果,假設(shè)某個階段包含r個數(shù)據(jù)分區(qū)(partition),則需統(tǒng)計r個分區(qū)中鍵值的數(shù)量,則第b個分區(qū)的數(shù)據(jù)分布db=(〈k1,vb1〉,〈k2,vb2〉,…,〈km,vbm〉),而輸入總數(shù)據(jù)量則如式(6)所示:

(6)

定義4(數(shù)據(jù)傾斜度) 輸入數(shù)據(jù)傾斜是某個鍵值的數(shù)據(jù)量相較于其他鍵值數(shù)據(jù)量的差異程度,因此可定義為某個鍵值的數(shù)據(jù)量與總體數(shù)據(jù)量均值的方差。對于輸入數(shù)據(jù)的第j個鍵值,其數(shù)據(jù)傾斜度可定義為:

(7)

skewj越大,表示該鍵值數(shù)據(jù)傾斜度越高;反之,則表示該鍵值的數(shù)據(jù)量與平均數(shù)據(jù)量差距越小。

因此,輸入數(shù)據(jù)分布d的數(shù)據(jù)傾斜度可表示如式(8)所示:

(8)

S越大,表示數(shù)據(jù)分布越不均勻;S越趨近于0,則數(shù)據(jù)分布越均勻。

定義5(key-bucket分布) 對于作業(yè)中每個計算階段的輸出,需要將輸出數(shù)據(jù)按鍵值分配到多個數(shù)據(jù)桶中。Spark系統(tǒng)根據(jù)哈希函數(shù)映射,將每個分區(qū)數(shù)據(jù)的鍵值關(guān)聯(lián)到對應(yīng)的桶中,如式(9)所示:

bucketid=hashfunction(key)

(9)

每個計算階段輸出的數(shù)據(jù)桶個數(shù)由設(shè)置的并行度參數(shù)決定,若當前并行度數(shù)為p,則數(shù)據(jù)桶的id取值為0到p-1。

定義6(數(shù)據(jù)桶傾斜度) 數(shù)據(jù)桶傾斜度是對于同一計算階段的多個數(shù)據(jù)桶,某個數(shù)據(jù)桶的數(shù)據(jù)量與總體數(shù)據(jù)量均值的方差。對于第p-1個數(shù)據(jù)桶,其數(shù)據(jù)傾斜度可定義為:

(10)

skewbucketp-1越大,表示該數(shù)據(jù)桶傾斜度越高。若所有數(shù)據(jù)桶的傾斜度越趨近于0,表示各數(shù)據(jù)桶的數(shù)據(jù)量分配越均勻。

定義7(任務(wù)內(nèi)存需求) 由于Spark利用內(nèi)存處理來提升計算效率,因此分配足夠內(nèi)存既能夠保障內(nèi)存計算的執(zhí)行速度,同時也能避免因不正確配置參數(shù)導致的不可預期程序異常。記Memtaskh為第h個任務(wù)的內(nèi)存需求,若共有q個任務(wù),則作業(yè)總體內(nèi)存需求如式(11)所示:

(11)

內(nèi)存需求量主要包括3部分:(1)基本內(nèi)存需求,即數(shù)據(jù)集中對象大小的2~3倍;(2)訪問這些對象的內(nèi)存消耗;(3)垃圾回收消耗。因此,某個任務(wù)的執(zhí)行區(qū)需求量與所需計算的數(shù)據(jù)量有關(guān)。實際執(zhí)行區(qū)需求量可通過實時查看內(nèi)存所占用空間大小和溢出量來綜合判斷。

3.2 定理分析

定理1并行度適度法則:并行度的選擇符合適度法則,既不能太少,也不能太多,需要和執(zhí)行作業(yè)的需求以及集群計算能力相匹配。若分區(qū)數(shù)較少,會產(chǎn)生額外磁盤開銷,影響執(zhí)行效率;若分區(qū)數(shù)較多,會增加任務(wù)切換的開銷,且生成較多的小文件。

因此,并行度的設(shè)置既不能太小也不能太大,想要獲得較好的性能,需根據(jù)作業(yè)DAG及具體數(shù)據(jù)狀況選擇合適的并行度。

定理2數(shù)據(jù)溢出規(guī)避法則:在某個計算階段中產(chǎn)生數(shù)據(jù)傾斜時,分配更大的并行度,使得每個任務(wù)內(nèi)存需求量減小,其內(nèi)存溢出量也會隨之降低。

證明記節(jié)點x分配的內(nèi)存大小和核數(shù)分別為Memx和fx。設(shè)分配的任務(wù)數(shù)量為l。

任務(wù)內(nèi)存需求量分別為{Memtask1,Memtask2,…,Memtaskl}且Memtask1>Memtask2>Memtaskl,即將l個任務(wù)按照所需內(nèi)存大小降序排列,隨著l變大,則每個任務(wù)的內(nèi)存需求量隨之減小。

若lfx,由于輸入數(shù)據(jù)量衡定,因此l越大,Memtaska的均值越小,而Memx/fx為常量,則Memtaska-Memx/l>0的概率越低,即任務(wù)數(shù)越多則單任務(wù)的內(nèi)存需求越小,內(nèi)存溢出量也越少。

4 異構(gòu)并行調(diào)度策略

本節(jié)基于模型的相關(guān)定義及定理證明,首先描述算法的程序模塊和代碼更新,然后進行異構(gòu)Spark集群并行調(diào)度策略的整體描述,最后提出并行度預估算法、數(shù)據(jù)傾斜修正算法和異構(gòu)節(jié)點任務(wù)分配算法。

4.1 并行調(diào)度策略總體描述

自適應(yīng)異構(gòu)并行調(diào)度策略根據(jù)數(shù)據(jù)的傾斜情況和節(jié)點計算能力,在作業(yè)執(zhí)行過程中對并行度和任務(wù)分配進行調(diào)整。計算各階段并行度的前提是獲取前序階段的任務(wù)數(shù)及其對應(yīng)輸出數(shù)據(jù)量,然后根據(jù)計算節(jié)點的資源情況對任務(wù)進行有效分配。對于迭代執(zhí)行的算法,首輪執(zhí)行Spark任務(wù)時,根據(jù)并行度預估算法的默認并行度執(zhí)行,在第2輪開始,根據(jù)前一輪的數(shù)據(jù)統(tǒng)計和傾斜分析,利用預估算法和數(shù)據(jù)傾斜修正算法對當前的并行度進行調(diào)整,任務(wù)分配時,則需要通過異構(gòu)節(jié)點任務(wù)分配算法根據(jù)當前節(jié)點的資源狀況進行任務(wù)量分配。自適應(yīng)異構(gòu)并行調(diào)度策略的主要過程如下所示:

(1)除第1個階段之外,其余每個計算階段收集前序階段輸出數(shù)據(jù)大小、數(shù)據(jù)分布和可用節(jié)點的空閑內(nèi)存及核數(shù)資源情況,構(gòu)建算法所需的元數(shù)據(jù)。

(2)根據(jù)采集的元數(shù)據(jù),執(zhí)行初始并行度預估算法(見4.2節(jié)),計算當前計算階段的初始并行度。

(3)根據(jù)并行調(diào)度策略的機制,首先預判斷不均衡的鍵值,利用數(shù)據(jù)傾斜修正算法(見4.3節(jié))對鍵值進行拆分評估, 對數(shù)據(jù)傾斜度較高的鍵值增加前綴,使其能夠映射到多個數(shù)據(jù)桶,然后將傾斜的數(shù)據(jù)桶進一步劃分,增加相應(yīng)的并行度。

(4)當確定并行度后,根據(jù)異構(gòu)節(jié)點任務(wù)分配算法(見4.4節(jié))對(3)中待分配數(shù)據(jù)桶進行劃分和映射。

(5)迭代(1)~(4),在多寬依賴作業(yè)中實施多次分配,為后續(xù)計算階段確定合理的并行度和數(shù)據(jù)分配。

4.2 并行度預估算法

根據(jù)3.2節(jié)定理1可知,并行度設(shè)置應(yīng)當在執(zhí)行任務(wù)內(nèi)存需求不超過所分配工作節(jié)點可用容量的情況下,選擇額外調(diào)度開銷盡可能少的并行度設(shè)定值。因此,算法通過前序計算階段的數(shù)據(jù)量和可用節(jié)點的內(nèi)存空間計算并行度,并且保障任務(wù)平均分配數(shù)據(jù)大小不超過平均內(nèi)存空間大小。并行度預估算法詳細步驟如下所示:

(1)統(tǒng)計系統(tǒng)各節(jié)點資源情況,并將其存儲在node表中,其中包括節(jié)點內(nèi)存容量和核心數(shù)等。

(2)在計算開始前,基于作業(yè)DAG生成計算階段結(jié)構(gòu)樹stageTree。

(3)遍歷計算階段結(jié)構(gòu)樹,記錄計算階段集合stagei(wdi,inputRDDi和outputRDDi),其中inputRDDi和outputRDDi分別為stagei中輸入RDD和輸出RDD,可以是一個或多個RDD。

(4)初始化stage0,其并行度劃分取決于作業(yè)輸入數(shù)據(jù)在HDFS中分塊的個數(shù)blockNum,該值由輸入數(shù)據(jù)大小inputSize和默認HDFS分塊大小blockSize共同決定。

(5)在stage1到stagen執(zhí)行階段中,獲取當前可用節(jié)點的可用資源情況,存儲在availableResource表,包括內(nèi)存和核心數(shù),統(tǒng)計可用內(nèi)存總?cè)萘縯ms和可用核心總數(shù)tcn。

(6)同時統(tǒng)計每個任務(wù)的鍵值分布和數(shù)據(jù)大小。當前序stagei執(zhí)行結(jié)束,統(tǒng)計各分區(qū)大小ptSize和數(shù)據(jù)分布情況〈key,Value〉,并計算所有parentRDD的總?cè)萘縫rSize。

(7)假定內(nèi)存需求量與數(shù)據(jù)量的比值關(guān)系為xp,默認情況下將xp設(shè)置為2,即數(shù)據(jù)在內(nèi)存中所需的內(nèi)存空間為數(shù)據(jù)量大小的2倍,則計算parentRDD總?cè)萘颗c可用內(nèi)存總?cè)萘縯ms和核心總數(shù)tcn乘積的商值,即(xp×prSize/tms)×tcn。

(8)若(xp×prSize/tms)×tcn≤1,則初始并行度設(shè)置為tcn。

并行度預估算法如算法1所示。

算法1并行度預估算法

輸入:作業(yè)DAG,系統(tǒng)配置systemconfigruarion,輸入數(shù)據(jù)塊個數(shù)blocknum,可用資源availableResource,內(nèi)存需求比xp。

輸出:初始并行度stagei.Pa0

初始化:prSize←0;xp←2;

①stageTree←generate(DAG);//獲取執(zhí)行計劃

②stage0.Pa←blockNum;/*初始化第1個計算階段*/

③for(i=1 tostageTree.length-1)

④stagei(wdi,parentRDD)←get(DAG);

⑤ptnumi←stagei.partitionnum;

⑥tms←sum(availableResource.mem);

⑦tcn←sum(availableResource.core);

⑧for(j=0 toptnumi-1)

⑨ptSizeij←getsize(i,j);

⑩ (kid,ksize)←partitionij(keyid,keysize);

4.3 數(shù)據(jù)傾斜修正算法

根據(jù)3.2節(jié)定理2可知,當節(jié)點產(chǎn)生數(shù)據(jù)傾斜時,為了盡可能地減少內(nèi)存溢出,需要對數(shù)據(jù)傾斜且內(nèi)存需求超過內(nèi)存的鍵值進行拆分。傾斜修正算法的主要目的是利用分區(qū)函數(shù)將數(shù)據(jù)傾斜且明顯大于需求內(nèi)存的鍵值進行重新劃分,進而劃分到不同的分區(qū)中去,在分配之后去掉添加的前綴,恢復成原本的鍵值,再重新執(zhí)行一個Reduce操作。

數(shù)據(jù)傾斜修正算法的主要步驟如下所示:

(4)將所有拆分sk_id的拆分個數(shù)統(tǒng)計求和,記錄為tskn;

(6)此時將默認數(shù)據(jù)桶數(shù)量修改為修正并行度Pa1=stagei.Pa0+tskn;

(7)通過計算hash(key) mod (Pa0+tskn)獲得寫入數(shù)據(jù)的數(shù)據(jù)桶編號,將數(shù)據(jù)存入相應(yīng)數(shù)據(jù)桶;

(8)數(shù)據(jù)桶則暫不建立映射關(guān)系,交由后續(xù)異構(gòu)節(jié)點任務(wù)分配算法進行操作。

數(shù)據(jù)傾斜修正算法如算法2所示。

算法2數(shù)據(jù)傾斜修正算法

輸入:當前執(zhí)行階段stagei,總元組數(shù)vinput,鍵值的個數(shù)m,可用內(nèi)存總?cè)萘縯ms,可用核心總數(shù)tcn,初始并行度Pa0。

初始化:vj+=Get(keyj_size);

//獲取按鍵值劃分的數(shù)據(jù)量

①forkeyjinstagei(j=0 tom-1)

③if(skewj>0 andvj>tms/tcn)then

④sk_id[l2]←kj(key_id);

⑤l2++;

⑥svl2←sk_id[l2].size;

⑧tskn+←svnl2;

⑨for(k=0 tosvnl2)

⑩key[n2]←addPrefix(sk_id[k]);

4.4 異構(gòu)節(jié)點任務(wù)分配算法

數(shù)據(jù)桶填充完畢之后,需要將桶映射到節(jié)點相應(yīng)的Reducer中,桶的數(shù)量與Reducer個數(shù)相同,系統(tǒng)默認方式是根據(jù)編號直接建立映射關(guān)系,由于此時未考慮節(jié)點CPU計算能力和內(nèi)存大小的差異,因此不能有效地平衡節(jié)點計算能力和任務(wù)量之間的關(guān)系。異構(gòu)節(jié)點任務(wù)分配算法根據(jù)節(jié)點計算能力進行排序,將各數(shù)據(jù)桶數(shù)據(jù)量大小進行排序,盡可能將符合節(jié)點計算要求的數(shù)據(jù)桶分配到相應(yīng)的節(jié)點中。異構(gòu)節(jié)點任務(wù)分配算法主要過程如下所示:

(1)將并行度Pa1擴展至Pa2=α×tcn,其中α為正整數(shù)且(α-1)×tcn≤Pa1≤α×tcn,α≥1;

(2)同時,將Pa1個數(shù)據(jù)桶劃分為α輪進行分配,其中前α-1輪,每輪分配tcn個數(shù)據(jù)桶,第α輪分配Pa1-(α-1)×tcn個數(shù)據(jù)桶;

(4)將bucket[Pa1]根據(jù)數(shù)據(jù)量大小進行排序存入數(shù)組bs;

(5)從第1輪到第α-1輪中,每輪從bs[(round-1)×tcn]到bs[round×tcn-1]開始選擇,將bs[(round-1) ×tcn]到bs[round×tcn-1]一一映射到reducer1到reducertcn;

(6)第α輪,將bs[(α-1)×tcn]至bs[Pa1-1]一一映射到節(jié)點計算能力較強的Reducer上,即Pa1-(α-1)×tcn到reducertcn。

異構(gòu)節(jié)點任務(wù)分配算法如算法3所示。

算法3異構(gòu)節(jié)點任務(wù)分配算法

輸入:當前執(zhí)行階段stagei,可用核心總數(shù)tcn,修正并行度Pa1。

初始化:k←1;q1←1;

//將節(jié)點根據(jù)計算能力排序

③bs[e]←descendBy(vbucket[e]);

//將數(shù)據(jù)桶根據(jù)數(shù)據(jù)量大小排序

④for(round=1 toα-1)

⑤for(u=round-1)*tcntotcn)

⑥for(v=bs[(α-1)*tcntotcn)

⑦reducerk1←assign(bs[(round-1)*tcn+q1]);

⑧k1++;

⑨q1++;

⑩endfor/*將第1到α-1輪的數(shù)據(jù)桶映射到Reducer*/

5 實驗與評價

本節(jié)將通過實驗進行比較和評價,驗證異構(gòu)并行調(diào)度策略的有效性。

5.1 實驗環(huán)境

本文的Spark集群共有10個節(jié)點,包括1個主節(jié)點與9個工作節(jié)點,各節(jié)點軟硬件配置如表1所示。Spark集群為5個工作節(jié)點各分配4 GB內(nèi)存和2個CPU核心,其余2個工作節(jié)點各分配2 GB內(nèi)存和1個CPU核心,2個工作節(jié)點各分配8 GB內(nèi)存和4個CPU核心,因此集群中共有40 GB內(nèi)存以及20個CPU邏輯核心用于執(zhí)行迭代應(yīng)用。

Table 1 Configuration parameters of worker nodes

實驗采用nmon來監(jiān)控Spark集群的資源使用情況,由于主節(jié)點主要負責任務(wù)調(diào)度與資源分配,不需要實際執(zhí)行任務(wù),為方便起見,本文將主節(jié)點與監(jiān)控服務(wù)器集成到一個節(jié)點,避免對集群計算性能造成影響。實驗使用基準測試集BigDataBench[27]中多個作業(yè),包括WordCount算法、TeraSort算法、k-means聚類算法和PageRank算法進行評估。

5.2 算法單獨評估

實驗采用數(shù)據(jù)密集型應(yīng)用PageRank作業(yè)對提出的3個算法分別進行評估,數(shù)據(jù)選取SNAP(Stanford Network Analysis Project)提供的3個數(shù)據(jù)量差異較大的標準數(shù)據(jù)集,均為有向圖,如表2所示。任務(wù)選用數(shù)據(jù)密集型算法PageRank,因為數(shù)據(jù)密集型算法對系統(tǒng)的并行度策略更加敏感,更有利于驗證算法的有效性。為了明顯體現(xiàn)并行度對內(nèi)存溢出情況的顯著影響可用節(jié)點數(shù)量調(diào)整為4個工作節(jié)點和1個主節(jié)點,將工作節(jié)點可用內(nèi)存空間調(diào)整為1 GB,其中2個節(jié)點的可用核心數(shù)為2個,另外2個節(jié)點的可用核心數(shù)為4個。

Table 2 Information of test datasets

(1)初始并行度生成算法。

利用PageRank作業(yè)進行10輪迭代,驗證初始并行度生成算法的效率,與系統(tǒng)默認并行度設(shè)置為2×40=80的情況進行對比,實驗結(jié)果如圖1所示。其中,圖1a為不同輸入數(shù)據(jù)類型,利用動態(tài)并行度進行設(shè)置與默認固定并行度的執(zhí)行時間對比;圖1b為不同輸入數(shù)據(jù)類型,利用動態(tài)并行度設(shè)置算法進行配置時,算法在不同計算階段的并行度變化。

Figure 1 Parallelism prediction algorithm

由圖1a可知,對3類數(shù)據(jù)類型而言,使用動態(tài)并行度都能夠有效地縮短作業(yè)執(zhí)行時間,不同的并行度對作業(yè)執(zhí)行效率的影響很明顯,不恰當?shù)牟⑿卸瓤赡軙沟貌糠止?jié)點資源利用率較低,其余節(jié)點內(nèi)存資源不足而溢出,從而增加任務(wù)執(zhí)行時間,降低作業(yè)執(zhí)行效率。其中Cit-Patents受到并行度影響更大,由于Cit-Patents具有較大的數(shù)據(jù)量,因此在執(zhí)行過程中,需要占用的內(nèi)存空間較大,更有可能發(fā)生溢寫,因此選擇合理的并行度能夠更好地縮短作業(yè)執(zhí)行時間。

結(jié)合圖1b可知,各個階段的并行度具有明顯的變化,DSCS能夠有效結(jié)合計算階段的執(zhí)行狀態(tài)進行調(diào)整,隨著并行度的變化,優(yōu)化了階段內(nèi)任務(wù)的完成時間,最終縮短了作業(yè)總執(zhí)行時間。

(2)數(shù)據(jù)傾斜修正算法。

使用PageRank作業(yè)3個不同數(shù)據(jù)集,驗證數(shù)據(jù)傾斜修正算法效率,并與系統(tǒng)默認的數(shù)據(jù)桶劃分算法進行對比。算法未修改的參數(shù)保持Spark系統(tǒng)默認,迭代次數(shù)為10,并行度固定設(shè)置為20,傾斜度分別設(shè)置為0.2,0.4,0.6和0.8。圖2a為不同傾斜度時,數(shù)據(jù)傾斜修正算法與系統(tǒng)默認數(shù)據(jù)桶劃分算法的執(zhí)行時間對比;圖2b為不同傾斜度時,3個數(shù)據(jù)集平均內(nèi)存溢寫情況的對比。

由圖2a可知,PageRank在數(shù)據(jù)量較大且產(chǎn)生數(shù)據(jù)傾斜時,數(shù)據(jù)傾斜修正算法對執(zhí)行時間的影響較大;而在數(shù)據(jù)量較小時,即使產(chǎn)生數(shù)據(jù)傾斜,該算法的效果也不明顯,因為不同任務(wù)之間數(shù)據(jù)傾斜的差異較小。由圖2b可知,執(zhí)行作業(yè)數(shù)據(jù)量越大、數(shù)據(jù)傾斜越嚴重,在默認數(shù)據(jù)桶劃分算法的情況下,產(chǎn)生的磁盤溢寫量就越大。

(3)異構(gòu)節(jié)點任務(wù)分配算法。

利用PageRank作業(yè)驗證異構(gòu)節(jié)點任務(wù)分配算法的效率,并與系統(tǒng)默認任務(wù)分配一一映射的算法進行對比。圖3a為異構(gòu)節(jié)點任務(wù)分配算法與系統(tǒng)默認的節(jié)點分配算法的執(zhí)行時間對比;圖3b為利用異構(gòu)節(jié)點任務(wù)分配算法進行設(shè)置時,3個數(shù)據(jù)作業(yè)在不同迭代次數(shù)時的高性能節(jié)點3的資源利用率情況。由圖3a可知,PageRank在節(jié)點異構(gòu)情況下,使用系統(tǒng)默認的數(shù)據(jù)桶分配算法與異構(gòu)節(jié)點任務(wù)分配算法相比,執(zhí)行時間相對略長。在各節(jié)點之間計算能力差異明顯的情況下,隨著數(shù)據(jù)傾斜度的增加,效率差異會更加顯著。由圖3b可知,PageRank算法在使用異構(gòu)節(jié)點任務(wù)分配算法進行配置時,節(jié)點資源利用率得到了較好的均衡,表明充分利用了計算能力更強的節(jié)點。

5.3 算法綜合評估

使用多個作業(yè)包括WordCount算法、TeraSort算法、k-means聚類算法和PageRank算法進行綜合測試,將優(yōu)化算法嵌在插件中,通過建立的Spark平臺對DSCS策略進行驗證和評估,對比優(yōu)化前后的作業(yè)執(zhí)行時間和內(nèi)存利用率,實驗結(jié)果如圖4所示。其中WordCount算法輸入集為Wiki,數(shù)據(jù)大小為7.8 GB;TeraSort算法輸入數(shù)據(jù)大小為3.5 GB;k-means算法輸入數(shù)據(jù)大小為16.4 GB;PageRank算法輸入數(shù)據(jù)大小為1.2 GB。

Figure 4 Overall evaluation of algorithm

通過圖4a和圖4b可知,經(jīng)過并行度優(yōu)化策略的調(diào)整,4個作業(yè)的作業(yè)執(zhí)行時間均得到降低,DSCS有效地提高了工作節(jié)點的平均內(nèi)存利用率。對并行度預估算法而言,作業(yè)處理的數(shù)據(jù)量和工作節(jié)點的內(nèi)存容量是評估并行度的重要依據(jù),能夠盡量減少工作節(jié)點內(nèi)存溢出和頻繁垃圾收集,縮短作業(yè)執(zhí)行時間;對于數(shù)據(jù)傾斜修正算法,數(shù)據(jù)分布越傾斜,寬依賴的同步代價越大,延時也越大。數(shù)據(jù)傾斜修正算法能夠?qū)A斜度較大的數(shù)據(jù)進行劃分,為合理的分區(qū)映射提供依據(jù);異構(gòu)節(jié)點任務(wù)分配算法能夠根據(jù)節(jié)點的計算能力分配數(shù)據(jù)塊,解決節(jié)點計算能力不均衡的問題,提高工作節(jié)點資源利用率和計算效率。因此,從總體上來看,數(shù)據(jù)傾斜修正調(diào)度策略具有良好的優(yōu)化效果。

6 結(jié)束語

現(xiàn)有的研究較少關(guān)注異構(gòu)Spark系統(tǒng)設(shè)置中的并行度參數(shù),往往根據(jù)經(jīng)驗進行設(shè)定,很難契合不同的作業(yè)類型和數(shù)據(jù)量大小,不能在作業(yè)各計算階段的執(zhí)行過程中發(fā)生變化。因此,本文根據(jù)作業(yè)的數(shù)據(jù)量、數(shù)據(jù)分布傾斜情況和節(jié)點的計算能力進行評估,提出了適應(yīng)計算階段狀態(tài)變化的數(shù)據(jù)傾斜修正調(diào)度策略,其中包括并行度預估算法、數(shù)據(jù)傾斜修正算法和異構(gòu)節(jié)點任務(wù)分配算法3個部分。計算細粒度任務(wù)的并行度并進行計算數(shù)據(jù)合理分配。實驗表明,該策略能夠較好地貼合作業(yè)類型、數(shù)據(jù)分布和節(jié)點計算能力,有效地提高了作業(yè)執(zhí)行效率。下一步將針對于每階段任務(wù)的具體操作,計劃采用回歸的方法,爭取對數(shù)據(jù)的內(nèi)存需求進行更精確的預估,從而實現(xiàn)作業(yè)執(zhí)行效率更高級別的提升,并將研究內(nèi)容延伸至流式計算平臺的并行效率優(yōu)化。

猜你喜歡
鍵值數(shù)據(jù)量異構(gòu)
ETC拓展應(yīng)用場景下的多源異構(gòu)交易系統(tǒng)
試論同課異構(gòu)之“同”與“異”
基于大數(shù)據(jù)量的初至層析成像算法優(yōu)化
高刷新率不容易顯示器需求與接口標準帶寬
非請勿進 為注冊表的重要鍵值上把“鎖”
寬帶信號采集與大數(shù)據(jù)量傳輸系統(tǒng)設(shè)計與研究
多源異構(gòu)數(shù)據(jù)整合系統(tǒng)在醫(yī)療大數(shù)據(jù)中的研究
吳健:多元異構(gòu)的數(shù)字敦煌
一鍵直達 Windows 10注冊表編輯高招
注冊表值被刪除導致文件夾選項成空白
樟树市| 遵义县| 新邵县| 灵石县| 织金县| 岳西县| 梓潼县| 通渭县| 津南区| 阿荣旗| 新干县| 松潘县| 嘉黎县| 高阳县| 海安县| 庆安县| 通州市| 乐亭县| 册亨县| 扬中市| 玉门市| 二手房| 德保县| 农安县| 聂拉木县| 共和县| 上高县| 景洪市| 虎林市| 湖南省| 理塘县| 东城区| 亳州市| 北碚区| 万全县| 龙泉市| 封丘县| 临邑县| 平邑县| 辽阳县| 安泽县|