卞 琛,于 炯,修位蓉,英昌甜,錢育蓉
(新疆大學 信息科學與工程學院,烏魯木齊 830046) (*通信作者電子郵箱bianchen0720@126.com)
基于迭代填充的內存計算框架分區(qū)映射算法
卞 琛*,于 炯,修位蓉,英昌甜,錢育蓉
(新疆大學 信息科學與工程學院,烏魯木齊 830046) (*通信作者電子郵箱bianchen0720@126.com)
針對內存計算框架Spark在作業(yè)Shuffle階段一次分區(qū)產(chǎn)生的數(shù)據(jù)傾斜問題,提出一種內存計算框架的迭代填充分區(qū)映射算法(IFPM)。首先,分析Spark作業(yè)的執(zhí)行機制,建立作業(yè)效率模型和分區(qū)映射模型,給出作業(yè)執(zhí)行時間和分配傾斜度的定義,證明這些定義與作業(yè)執(zhí)行效率的因果邏輯關系;然后,根據(jù)模型和定義求解,設計擴展式數(shù)據(jù)分區(qū)算法(EPA)和迭代式分區(qū)映射算法(IMA),在Map端建立一對多分區(qū)函數(shù),并通過分區(qū)函數(shù)將部分數(shù)據(jù)填入擴展區(qū)內,在數(shù)據(jù)分布局部感知后再執(zhí)行擴展區(qū)迭代式的多輪數(shù)據(jù)分配,根據(jù)Reduce端已分配數(shù)據(jù)量建立適應性的擴展區(qū)映射規(guī)則,對原生區(qū)的數(shù)據(jù)傾斜進行逐步修正,以此保障數(shù)據(jù)分配的均衡性。實驗結果表明,在不同源數(shù)據(jù)分布條件下,算法均提高了作業(yè)Shuffle過程分區(qū)映射合理性,縮減了寬依賴Stage的同步時間,提高了作業(yè)執(zhí)行效率。
內存計算;數(shù)據(jù)均衡;擴展式分區(qū);迭代式映射
近年來,利用內存的低延遲特性改進并行計算框架性能成為新的研究方向。內存計算框架避免了頻繁訪問磁盤的I/O性能瓶頸,解放了大內存+多核處理器硬件架構的潛在高性能,成為學術界一致認可的高性能并行計算系統(tǒng)[1-2]。雖然內存計算框架的性能表現(xiàn)優(yōu)異,但與大數(shù)據(jù)時代的即時應用需求相比,還存在不小的差距,因此,從計算模型的角度研究內存計算框架的性能優(yōu)化方法具有一定的現(xiàn)實意義。本文選取開源內存計算框架Spark[3-4]為研究對象,Spark以HDFS(Hadoop Distributed File System)為底層文件系統(tǒng),采用彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets, RDD)[5]作為數(shù)據(jù)結構,通過數(shù)據(jù)集血統(tǒng)(lineage)[5-6]和檢查點機制(checkpoint)[7-8]實現(xiàn)系統(tǒng)容錯,編程模式則借鑒了函數(shù)式編程語言的設計思想,簡化了多階段作業(yè)的流程跟蹤、任務重新執(zhí)行和周期性檢查點機制的實現(xiàn)。
在Spark作業(yè)的寬依賴Stage執(zhí)行過程中,Mapper將數(shù)據(jù)按key劃分并填入不同的Bucket,Bucket與Reducer為一一對應關系。由于原始數(shù)據(jù)分布的傾斜性,這樣的單一輪次分區(qū)映射過程使各Reducer計算數(shù)據(jù)量有較大差異,任務執(zhí)行時間長短不一,從而增加了寬依賴Stage的計算延時,降低了作業(yè)執(zhí)行效率。雖然系統(tǒng)支持用戶設定自定義分區(qū)函數(shù),但由于真實的數(shù)據(jù)分布難以預知,無法確保自定義分區(qū)函數(shù)的合理性和準確性,因此數(shù)據(jù)分配的傾斜問題不可規(guī)避。為解決這一問題,本文主要做了以下工作:
1)首先對內存計算框架的作業(yè)執(zhí)行機制進行分析,建立作業(yè)效率模型,給出了RDD計算代價和作業(yè)執(zhí)行時間的定義。
2)通過分析寬依賴RDD的計算過程,建立了分區(qū)映射模型,給出了源數(shù)據(jù)分布、分區(qū)映射、分配傾斜度的定義,并證明這些定義與作業(yè)執(zhí)行效率的因果關系。
3)通過模型的相關定義求解,設計了擴展式數(shù)據(jù)分區(qū)算法和迭代式分區(qū)映射算法,并對算法執(zhí)行的細節(jié)問題進行詳細的分析和說明。
在提出MapReduce的文獻[9]中,Dean等采用Hash函數(shù)對數(shù)據(jù)進行一次簡單的劃分,由于這種方法實現(xiàn)簡單且通用性高,成為開源的Hadoop系統(tǒng)默認的分區(qū)方案。Spark作為類MapReduce系統(tǒng),在實現(xiàn)中也自然承接了MapReduce的分區(qū)方法,但實際應用表明,在不了解數(shù)據(jù)分布的情況下,一次Hash劃分的方法很難實現(xiàn)數(shù)據(jù)的合理分配。
一些研究成果致力于通過優(yōu)化原生的分區(qū)映射策略解決數(shù)據(jù)分配的均衡性問題,文獻[10]研究Map和Reduce兩個階段的任務執(zhí)行過程,通過分析數(shù)據(jù)不均衡分配的原因,歸納出數(shù)據(jù)傾斜的5個類別。文獻[11]提出SkewReduce策略,該策略建立用戶定義的代價模型,在作業(yè)執(zhí)行過程逐步收集元數(shù)據(jù),鄰近代價閾值時啟動分區(qū)映射過程,以實現(xiàn)計算數(shù)據(jù)量的均勻分配。文獻[12]提出MapReduce的增量式分區(qū)策略,將原始數(shù)據(jù)劃分為細粒度的微分區(qū),通過數(shù)據(jù)分布的逐步感知和已分配數(shù)據(jù)量的統(tǒng)計,采用Max-Min算法進行數(shù)據(jù)增量分配,達到數(shù)據(jù)分配逐漸均衡的目標。文獻[13]提出SkewTune,與上述的研究成果不同,SkewTune建立Reducer的任務剩余代價評估模型,通過對Reducer執(zhí)行進度進行統(tǒng)計,決定是否將數(shù)據(jù)向其他Reducer遷移。由于數(shù)據(jù)的二次遷移將延遲Reducer計算任務,因此相比設計分區(qū)策略保證數(shù)據(jù)均衡分配的方法,SkewTune具有較大的額外開銷。文獻[14]為實現(xiàn)分區(qū)數(shù)據(jù)的實時統(tǒng)計,在系統(tǒng)中增加額外的數(shù)據(jù)構Sketch-based,通過設計的分包算法進行Reducer計算數(shù)據(jù)量的動態(tài)調配,達到數(shù)據(jù)均衡分配的目標。
另外一些研究成果期望通過數(shù)據(jù)分布的逐步感知建立合理的分區(qū)映射方案。文獻[15]通過在Mapper增加采樣進程感知原始數(shù)據(jù)分布,已生成的分區(qū)容量達到閾值后進行重組或拆分,保障分配數(shù)據(jù)的均衡性。文獻[16-17]提出精細分區(qū)和動態(tài)拆分兩種算法,精細分區(qū)算法采樣獲得近似數(shù)據(jù)分布,動態(tài)拆分函數(shù)在Map任務完成一定比例后觸發(fā),進行分區(qū)容量的二次調整,達到數(shù)據(jù)合理分配的目標。文獻[18-19]提出基于〈block,entity〉數(shù)據(jù)塊的分區(qū)方法,通過評估函數(shù)對超出閾值的數(shù)據(jù)塊進行調整,但沒有精確定義分區(qū)調整的時機問題。文獻[20]提出提前采樣的策略,在Map任務執(zhí)行前先對輸入數(shù)據(jù)進行25%的隨機采樣,通過采樣結果獲得數(shù)據(jù)分布并制定分區(qū)函數(shù)。文獻[21]提出LEEN策略,通過對輸入數(shù)據(jù)的預掃描獲取數(shù)據(jù)分布,在Map任務執(zhí)行過程中逐步統(tǒng)計key的頻率,然后綜合數(shù)據(jù)分布和key頻率設定合理的分區(qū)函數(shù)。
本文與上述研究成果的不同之處在于從寬依賴Stage數(shù)據(jù)分配的基本原理入手,以提高作業(yè)的整體執(zhí)行效率為目的,設計了迭代填充分區(qū)映射算法,解決同構集群環(huán)境下數(shù)據(jù)分配的均衡性問題。通過分析作業(yè)的執(zhí)行過程,建立了作業(yè)效率模型,提出了RDD計算代價和作業(yè)執(zhí)行時間的定義。建立分區(qū)映射模型,提出了源數(shù)據(jù)分布、分配傾斜度的定義,并證明了兩個定義與作業(yè)執(zhí)行時間的因果關系。根據(jù)模型和定義求解,設計了擴展式數(shù)據(jù)分區(qū)算法和迭代式分區(qū)映射算法。通過擴展式分區(qū)預留部分原始數(shù)據(jù),并設計擴展區(qū)的延遲映射機制,為迭代式分區(qū)映射奠定基礎。通過擴展區(qū)迭代式的多輪數(shù)據(jù)分配,對原生區(qū)的數(shù)據(jù)傾斜進行逐步修正,減少各Reducer分配數(shù)據(jù)量差異,從而從整體上提高寬依賴Stage的計算速度,提高作業(yè)執(zhí)行效率。相比已有的研究工作,迭代填充分區(qū)映射算法更適宜于內存計算框架的性能優(yōu)化,并具有較高的普適性和易用性。
本章首先分析Spark作業(yè)的執(zhí)行機制,建立作業(yè)效率模型和分區(qū)映射模型,然后提出迭代填充分區(qū)映射的優(yōu)化目標,為第3章的算法設計提供理論基礎。
2.1 作業(yè)執(zhí)行機制
Spark將操作分為Transformation和Action兩類,調度策略采用延時調度機制,即當Action操作執(zhí)行時,作業(yè)才會分發(fā)到集群執(zhí)行?;谘訒r調度的原理,Spark會首先根據(jù)RDD的血統(tǒng)生成作業(yè)的有向無環(huán)圖(Directed Acyclic Graph,DAG),如圖1所示。其中虛線框代表Stage,圓角矩形代表RDD,填充方框表示RDD分區(qū)。Stage的劃分以寬依賴為邊界,各Stage順序執(zhí)行,直至計算出最終結果。集群任務分配則以數(shù)據(jù)本地性作為依據(jù),即任務總是調度給具有最佳數(shù)據(jù)本地性的工作節(jié)點,以減少網(wǎng)絡通信延時,提高作業(yè)執(zhí)行效率。
圖1 Spark作業(yè)的有向無環(huán)圖
2.2 作業(yè)效率模型
根據(jù)2.1節(jié)的描述,Spark作業(yè)在執(zhí)行時劃分為多個Stage同步執(zhí)行,每個Stage由一個或多個RDD構成,每個RDD由多個分區(qū)并行計算生成,因此,記一個作業(yè)的Stage集合為stages={stg1,stg2,…,stgi},每個Stage包含的RDD表示為集合stgi={RDDi1,RDDi2,…,RDDij}, 其中RDDij表示第i個Stage中第j個RDD,對于每個RDD,其分區(qū)集合記為RDDij={Pij1,Pij2,…,Pijk},這里Pijk表示RDDij中的第k個分區(qū)。
定義1 RDD計算代價。Spark任務中,分區(qū)是最基本的計算單位,分區(qū)計算首先要讀取輸入,再根據(jù)閉包運算符和操作符進行運算。設Parentsijk為分區(qū)Pijk的父分區(qū)集合,用于表示分區(qū)計算的輸入數(shù)據(jù),那么分區(qū)Pijk的計算代價為數(shù)據(jù)讀取代價與數(shù)據(jù)處理代價之和,本文以分區(qū)計算時間作為衡量計算代價的唯一指標,即:
TPijk=read(Parentsijk)+proc(Parentsijk)
(1)
每個RDD的分區(qū)分配到不同的工作節(jié)點并行計算生成,因此RDD計算代價為所有分區(qū)計算代價的最大值,即:
TRDDij=max(TPij1,TPij2,…,TPijk)
(2)
定義2 作業(yè)執(zhí)行時間。如圖1所示,Spark將Stage分為窄依賴和寬依賴兩類。對于窄依賴Stage,每個Stage包括多條流水線(每條流水線包括多個RDD的不同分區(qū))。設窄依賴stagei共有h個RDD,所有RDD劃分為x條流水線,單條流水線的分區(qū)集合為pipeix={Pi1x,Pi2x,…,Pijx},那么單條流水線的執(zhí)行時間可表示為:
(3)
對于stagei,記其流水線集合為Pipesi={pipei1,pipei2,…,pipeix},那么stagei的執(zhí)行時間應為各流水線執(zhí)行時間最大值,即:
Tstagei=max(Tpipei1,Tpipei2,…,Tpipeix)
(4)
設stagei+1為寬依賴,則其中僅包含一個RDD的計算任務,記為RDD(i+1)j,那么stagei的執(zhí)行時間與RDD(i+1)j的計算代價相同,即:
Tstagei+1=TRDD(i+1)j
(5)
若Spark作業(yè)共有n個Stage(其中包括若干個窄依賴和寬依賴Stage),則各Stage順序執(zhí)行,因此作業(yè)執(zhí)行總時長為:
(6)
2.3 分區(qū)映射模型
作業(yè)的寬依賴Stage分Map和Reduce兩個階段執(zhí)行,其中Map階段將前一Stage的生成結果轉化為〈key,value〉元組,放入不同的Bucket中,每個Bucket對應一個Reduce任務,所有Map任務執(zhí)行結束后,由Reducer到各個工作節(jié)點拉取對應Bucket的數(shù)據(jù),完成后續(xù)計算。由于工作節(jié)點內存空間有限,為防止頻繁內存回收,Spark將Bucket數(shù)據(jù)寫入磁盤,以保證Reducer輸入數(shù)據(jù)的可用性。
定義3 源數(shù)據(jù)分布。用于描述輸入數(shù)據(jù)在Mapper端的分布情況。記源數(shù)據(jù)的key集合為keys={key1,key2,…,keyl},即源數(shù)據(jù)有l(wèi)個不同的key,記作業(yè)的Mapper集合為mps={1, 2,…,m},那么對于編號為m的任意Mapper,其數(shù)據(jù)分布可表示為:
Am=(Am1,Am2,…,Aml)T
(7)
其中Aml表示第l個key在第m個Mapper上的數(shù)據(jù)量。將所有Mapper的數(shù)據(jù)分布向量進行歸并,那么源數(shù)據(jù)的整體分布可表示為m×l矩陣:
(8)
矩陣中同行元素表示相同key在不同Mapper上的數(shù)據(jù)分布,映射過程同行元素也由相同的Reducer完成計算,因此將數(shù)據(jù)按key進行數(shù)據(jù)量統(tǒng)計,任意key的數(shù)據(jù)總量可表示為:
(9)
那么將源數(shù)據(jù)按key進行劃分,可表示為如下集合:
S={c1,c2,…,cl};l∈keys
(10)
定義4 分區(qū)映射。用于描述Mapper數(shù)據(jù)分布中key與Reducer之間的映射關系,分區(qū)映射也表示與Reducer對應Bucket的填充規(guī)則。Spark系統(tǒng)延用MapReduce的一次分區(qū)機制,默認對key進行哈希值轉換,再與Reducer的數(shù)量取模,以此決定數(shù)據(jù)所對應的Bucket,因此原生的分區(qū)函數(shù)可表示為:
f(Bucket)=hash(key)mod(n)
(11)
通過上述的分區(qū)函數(shù)可以看出,分區(qū)函數(shù)保證相同key值數(shù)據(jù)存放在同一個Bucket。由于所有Mapper采用同一分區(qū)函數(shù)劃分數(shù)據(jù),因此源數(shù)據(jù)中所有相同key數(shù)據(jù)都映射到同一Reducer。
記作業(yè)的Reducer集合為rds={rd1,rd2,…,rdn},那么任意Reducer的分區(qū)映射關系可表示為:
inputrdi|→ {ci,cn+i,…,cj×n+i};j∈[0,l/n]
(12)
定義5 分配傾斜度。用于描述Reducer分配數(shù)據(jù)與均值的差異程度。由定義3可知,Reducer集合要處理的數(shù)據(jù)總量為S,那么在同構集群環(huán)境下,各Reducer分配數(shù)據(jù)量的均值應表示為:
(13)
根據(jù)定義4的描述,Spark依舊延用MapReduce的一次分區(qū)技術,將key的哈希值與Reducer數(shù)量取模,以判定該key數(shù)據(jù)與Reducer的對應關系,但由于key的哈希值與其在數(shù)據(jù)分布的出現(xiàn)頻率無關,即與相同key的元組數(shù)無關,因此在多數(shù)情況下,各Reducer的數(shù)據(jù)分配量與均值不匹配,根據(jù)式(11)、(12),將任意Reducer的分配傾斜度定義為:
Qi=inputrdi/E
(14)
定理1 在同構集群環(huán)境下,對于所有執(zhí)行寬依賴Stage的Reducer,其分配傾斜度越小,作業(yè)的執(zhí)行效率越高。
證明 設作業(yè)當前執(zhí)行寬依賴Stagei,基于定義3,寬依賴Stage僅包含一個RDD的計算工作,因此Stagei的執(zhí)行時間等于RDDij的計算代價。記任意的Reducer計算時間為Tfinishn,由于在任務分配中,每個Reducer負責計算RDDij的一個分區(qū),因此Stagei的執(zhí)行時間也可表示為:
Tstagei=TRDDij=max(Tfinish1,Tfinish2,…Tfinishn)
(15)
同構集群環(huán)境下,各Reducer的計算能力基本一致,因此輸入數(shù)據(jù)量成為決定計算時長的唯一因素。根據(jù)定義5的描述,分配傾斜度表示Reducer數(shù)據(jù)分配與均值的差異,均值代表完全均勻的數(shù)據(jù)分配,因此對于所有執(zhí)行寬依賴Stage的Reducer,其分配傾斜度越小,作業(yè)的執(zhí)行效率越高。
本章基于模型相關的定義和證明,提出擴展式數(shù)據(jù)分區(qū)算法和迭代式分區(qū)映射算法,并對算法的執(zhí)行細節(jié)進行分析和說明。
3.1 算法的總體描述
根據(jù)2.3節(jié)定義4,傳統(tǒng)Spark的分區(qū)方法延用MapReduce的一次劃分方法,數(shù)據(jù)分配與key的個數(shù)有關而與分配數(shù)據(jù)量無關,導致數(shù)據(jù)發(fā)生傾斜影響作業(yè)執(zhí)行效率,因此,迭代填充分區(qū)映射算法的目標是提高數(shù)據(jù)分配策略與數(shù)據(jù)量的關聯(lián)度,以此增加分配策略的合理性,但由于在所有Map任務完成之前難以預知真實的數(shù)據(jù)分布,因此考慮改進既有的一次分區(qū)策略,通過多輪的分區(qū)映射過程達到數(shù)據(jù)適應性分配的目標。
迭代填充分區(qū)映射算法的主要思想是:1)將Mapper與Reducer之間的數(shù)據(jù)緩沖區(qū)劃分為原生區(qū)和擴展區(qū)兩部分,每個區(qū)域包含的Bucket數(shù)量與Reducer的個數(shù)相同。2)在原生分區(qū)策略的基礎上加以改進,保證大部分數(shù)據(jù)寫入原生區(qū),而小部分key的數(shù)據(jù)寫入擴展區(qū),并能夠對應到擴展區(qū)中不同的Bucket編號。3)原生區(qū)中的Bucket與Reducer之間為固定對應關系,當某個Mapper計算完畢后,所有Reducer即可開始進行原生區(qū)的數(shù)據(jù)拉取。4)初始狀態(tài)下,擴展區(qū)中的Bucket與Reducer無對應關系,達到特定時機則啟動后續(xù)輪次分配,將擴展區(qū)的數(shù)據(jù)逐步映射到Reducer。
3.2 擴展式數(shù)據(jù)分區(qū)算法
擴展式數(shù)據(jù)分區(qū)算法的主要步驟如下:
1)確定擴展參數(shù)x,原生區(qū)和擴展區(qū)生成Bucket,原生區(qū)的Bucket數(shù)量與Reducer的個數(shù)n相同,擴展區(qū)的Bucket數(shù)量為n×x。
2)Mapper計算hash(key)mod(n+x)獲得寫入數(shù)據(jù)的Bucket編號,若編號小于n,則寫入數(shù)據(jù),本次過程結束;若編號大于等于n,則表示數(shù)據(jù)應放入擴展區(qū),繼續(xù)執(zhí)行步驟3)。
3)對于hash(key)mod(n+x)≥n的情況,繼續(xù)計算(hash(key)/(n+x))mod(n×x),確定該數(shù)據(jù)在擴展區(qū)中的Bucket編號并寫入數(shù)據(jù),本次過程結束。
擴展式數(shù)據(jù)分區(qū)算法的偽代碼如下:
算法1 擴展式數(shù)據(jù)分區(qū)算法。
輸入:原生區(qū)native;擴展區(qū)extension;Reducer個數(shù)n;源數(shù)據(jù)鍵值key;擴展參數(shù)x。
初始化:bukNo←-1;
1)
native.creatBucket(n);
2)
extension.creatBucket(n*x);
3)
bukNo=hash(key) mod (n+x);
4)
if(bukNo 5) write(key,native[bukNo]); 6) else 7) bukNo= (hash(key) /(n+x)) mod (n*x); 8) write(key,extension[bukNo]); 9) end if 由算法描述可以看出,擴展參數(shù)決定了原生區(qū)與擴展區(qū)的劃分比例,而擴展區(qū)則為后續(xù)的分區(qū)映射算法服務,通過多輪分配漸進填充,提高數(shù)據(jù)分配的合理性。 3.3 迭代式分區(qū)映射算法 根據(jù)3.1節(jié)的描述,原生區(qū)的Bucket數(shù)量與Reducer個數(shù)相同,兩者之間為一一對應關系,由于原生區(qū)的生成方式與MapReduce的一次分區(qū)策略相同,難以保證數(shù)據(jù)的均勻分配,因而擴展區(qū)的后續(xù)輪次分配的合理性成為算法目標實現(xiàn)的關鍵問題。為達到精準分配,本文方法在原生Spark系統(tǒng)中增加了1個計數(shù)器counter和1個數(shù)據(jù)構RelationSchema,counter用于統(tǒng)計擴展區(qū)內各Bucket的數(shù)據(jù)量,RelationSchema用于表示Bucket與Reducer的映射關系。原生區(qū)映射過程與傳統(tǒng)Spark相同,不再贅述,下面重點討論擴展區(qū)的映射過程,其主要步驟如下: 1)將擴展區(qū)中的Bucket倒序排列,并選取前n個Bucket生成待分配列表。 2)對所有Reducer的RelationSchema進行映射數(shù)據(jù)量統(tǒng)計,挑選出映射數(shù)據(jù)量最小的Reducer。 3)將分配列表容量最大的Bucket與映射數(shù)據(jù)量最小的Reducer建立一一對應的映射關系,更新RelationSchema。 4)重復步驟2),直到n個Bucket都映射完畢。 5)啟動數(shù)據(jù)拉取進程,等待下一輪映射過程。 算法2 迭代式分區(qū)映射算法。 輸入:擴展區(qū)extension;Reducer集合rds; 初始化:candis←newList〈Bucket〉; //待分配列表 1) extension.orderDesc(); 2) candis=extension.getTop(n); 3) fori=0 ton-1 do 4) rds.RelationSchema.statistics(); 5) minload=min(rds); //負載最小Reducer 6) minload.mapping(candis[i]); //建立映射 7) minload.RelationSchema.update(); 8) end for 9) start pull; //啟動數(shù)據(jù)拉取進程 10) waitfor nextround; //等待下一輪分配 接下來討論分區(qū)映射算法執(zhí)行的時機問題,原生區(qū)的映射過程依舊采用傳統(tǒng)Spark的處理方式,即當?shù)?個Mapper計算完成后,所有Reducer即可從該Mapper拉取數(shù)據(jù)。而對于擴展區(qū)的映射過程,由于僅當所有Mapper都計算完成才能獲得精確的擴展區(qū)數(shù)據(jù)分布,因此若算法過早執(zhí)行,計數(shù)器counter的統(tǒng)計結果不夠精確,影響分區(qū)映射的合理性,而過晚執(zhí)行則會使Reducer處于饑餓狀態(tài),影響了作業(yè)的執(zhí)行效率,因此分區(qū)映射算法的執(zhí)行時機應設定為不影響作業(yè)執(zhí)行效率的最晚時間,即當任意1個Reducer完成原生區(qū)的拉取工作,即啟動第1次分區(qū)映射算法,而后續(xù)輪次分配的執(zhí)行時機均為上一輪拉取工作結束時間,以此類推,完成整個擴展區(qū)的映射過程。因此,每一輪分區(qū)映射過程都是對上一輪因統(tǒng)計結果不精確而產(chǎn)生的分配誤差進行修正,從而經(jīng)過多輪迭代求得數(shù)據(jù)分配的近似最優(yōu)解。 本章通過實驗比較和評價,驗證迭代填充分區(qū)映射算法的有效性。 4.1 實驗環(huán)境 實驗環(huán)境搭建采用1臺服務器和8個工作節(jié)點組成的集群,其中服務器作為Hadoop的NameNode和Spark的Master,主要配置為16顆4核心處理器陣列、256GB內存和4個千兆網(wǎng)卡。8個工作節(jié)點作為DataNode和Slave,配置如表1所示。參數(shù)配置方面,HDFS的默認備份數(shù)為3,Block大小為64MB,Spark的并行參數(shù)值(spark.default.parallelism)設置為16。作業(yè)執(zhí)行時間的監(jiān)測通過Spark控制臺,各種資源的使用狀況數(shù)據(jù)來源于nmon。 實驗數(shù)據(jù)選取Zipf數(shù)據(jù)集和有向圖兩種類型,其中Zipf數(shù)據(jù)集主要包括9個子數(shù)據(jù)集,總量為7.3GB,用于執(zhí)行WordCount作業(yè)。每個子數(shù)據(jù)集滿足指數(shù)為γ的標準Zipf分布,γ取值范圍為0.2~1.0的小數(shù),增量為0.1,γ的取值越大,表示數(shù)據(jù)分布越傾斜。有向圖主要包括SNAP(StanfordNetworkAnalysisProject)[22]提供的標準數(shù)據(jù)集,用于執(zhí)行PageRank作業(yè),如表2所示。 表1 工作節(jié)點配置參數(shù) 表2 測試數(shù)據(jù)集列表 4.2 擴展參數(shù)評估實驗 迭代填充分區(qū)映射算法通過引入擴展參數(shù),確定原生區(qū)與擴展區(qū)的劃分比例,同時擴展參數(shù)也決定了數(shù)據(jù)分配的輪數(shù),因此實驗首先驗證擴展參數(shù)對作業(yè)執(zhí)行效率的影響。實驗選取Zipf數(shù)據(jù)集中γ取值為0.3、0.6和0.9的3個子數(shù)據(jù)集執(zhí)行WordCount作業(yè),實驗結果如圖2所示。 圖2 擴展參數(shù)影響實驗 由圖2可以看出,對于Zipf-0.3數(shù)據(jù)集,由于數(shù)據(jù)分布的傾斜度較低,其作業(yè)執(zhí)行效率隨擴展參數(shù)值的增大,優(yōu)化效果并不明顯。而對于Zipf-0.6和Zipf-0.9,在前4個監(jiān)測點,隨著擴展參數(shù)值的增大,作業(yè)執(zhí)行時間急劇下降,這是因為在數(shù)據(jù)分布傾斜度較大的情況下,原生區(qū)中各Bucket數(shù)據(jù)量差異較大,通過擴展參數(shù)的介入,能夠附加額外的數(shù)據(jù)分配,修正原生區(qū)數(shù)據(jù)分配產(chǎn)生的誤差,因此擴展參數(shù)值越大,修正效果越明顯。當擴展參數(shù)值為4時,作業(yè)執(zhí)行效率的優(yōu)化效果趨于穩(wěn)定,在后2個監(jiān)測點,作業(yè)執(zhí)行時間又出現(xiàn)小幅提高,這是由于擴展系數(shù)具有最優(yōu)上限,在此基礎上繼續(xù)增加分配輪數(shù)也無法提高作業(yè)執(zhí)行效率;達到最優(yōu)上限后,算法的額外開銷開始顯現(xiàn),額外開銷導致了作業(yè)執(zhí)行效率的輕微下降。 4.3WordCount對比實驗 實驗選取5個不同分布的Zipf數(shù)據(jù)集執(zhí)行WordCount作業(yè),對比迭代填充分區(qū)映射算法與傳統(tǒng)Spark的性能差異。其中擴展參數(shù)值統(tǒng)一設置為4,Spark啟動的Reducer數(shù)量為16。實驗首先監(jiān)測最大負載節(jié)點和最小負載節(jié)點的分配數(shù)據(jù)量變化,實驗結果如圖3所示。 由圖3可以看出,與傳統(tǒng)Spark環(huán)境相對比,迭代填充分區(qū)映射算法降低了最大負載工作節(jié)點的計算數(shù)據(jù)量,提高了最小負載節(jié)點的數(shù)據(jù)量。這是因為傳統(tǒng)Spark一次分區(qū)策略對原始數(shù)據(jù)的分布不敏感,也缺乏有效的應對策略,因此數(shù)據(jù)分布的傾斜性導致了最大、最小負載節(jié)點之間的數(shù)據(jù)分配差。而對于迭代填充分區(qū)算法,擴展區(qū)的分配是對原生區(qū)數(shù)據(jù)傾斜分配有效彌補,從而產(chǎn)生相對均衡的數(shù)據(jù)分配。綜合來看,隨著Zipf分布指數(shù)的增大,傳統(tǒng)Spark的數(shù)據(jù)分配量差異越來越明顯,而迭代填充分區(qū)映射算法始終保持較為穩(wěn)定的均勻狀態(tài)。這是由于在傳統(tǒng)Spark環(huán)境下,數(shù)據(jù)傾斜度越大,工作節(jié)點數(shù)據(jù)量差異也越大,分配效果也越差。而迭代填充分區(qū)映射算法的映射過程是通過多輪次分配完成,每一輪分配都是對上一輪分配誤差的修正,有效降低了數(shù)據(jù)分布對分配效果的影響,因此,從數(shù)據(jù)分配合理性的角度來看,迭代填充分區(qū)映射算法具有良好的優(yōu)化效果。 圖3 分配數(shù)據(jù)量對比 圖4顯示了不同分布數(shù)據(jù)集作業(yè)執(zhí)行時間的對比,從實驗結果來看,對于不同分布的Zipf數(shù)據(jù)集,迭代填充分區(qū)映射算法的作業(yè)執(zhí)行時間均小于傳統(tǒng)Spark的執(zhí)行時間。根據(jù)最大、最小節(jié)點的數(shù)據(jù)分配量可知,迭代填充分區(qū)映射算法保障了同構集群環(huán)境上數(shù)據(jù)分配的均衡性,具有相同計算能力的Reducer分配計算量差異較小,各任務完成時間也較為接近,因此寬依賴Stage的執(zhí)行時間較短,作業(yè)的執(zhí)行效率更高。而對于傳統(tǒng)Spark,由于其對數(shù)據(jù)傾斜分布無任何有效應對策略,往往導致相同計算能力Reducer所分配的計算數(shù)據(jù)量有較大差異,各任務完成時間長短不一,因此寬依賴Stage的執(zhí)行時間較長,降低了作業(yè)的整體執(zhí)行效率。從優(yōu)化效果來看,數(shù)據(jù)分布傾斜度越大,迭代填充分區(qū)映射算法的優(yōu)化效果越明顯,但優(yōu)化效果并未隨分布指數(shù)據(jù)的增大呈線性增加趨勢,這是因為無法預知精確的數(shù)據(jù)分布,迭代填充分區(qū)映射算法僅是在現(xiàn)有條件下提供較為均衡的數(shù)據(jù)分配方案,而且擴展區(qū)的預留也采用固定的公式計算,不能根據(jù)不同數(shù)據(jù)分布進行靈活的適應性調整,因此對于不同的數(shù)據(jù)集其優(yōu)化效果無明顯規(guī)律。 圖4 WordCount作業(yè)執(zhí)行時間對比 4.4PageRank對比實驗 上一節(jié)通過WordCount作業(yè)驗證了算法的有效性,但由于WordCount僅包含一個依賴操作,Reducer也僅是作簡單的加法運算,不能完全體現(xiàn)迭代填充分區(qū)映射算法的優(yōu)化效果,因此本節(jié)選擇寬依賴個數(shù)更多、操作復雜度更高的PageRank作業(yè)對算法作進一步評估。實驗選擇了2個不同大小的數(shù)據(jù)集進行,擴展參數(shù)值為4,Reducer個數(shù)為16,實驗結果如圖5所示。 圖5 PageRank作業(yè)執(zhí)行時間對比 由圖5可以看出,對于每一個數(shù)據(jù)集,傳統(tǒng)Spark和迭代填充分區(qū)映射算法的作業(yè)執(zhí)行時間都隨迭代次數(shù)的增加而上升;在每一個監(jiān)測點,傳統(tǒng)Spark的作業(yè)執(zhí)行時間均大于迭代填充分區(qū)映射算法,從而證明了本文算法對Spark框架的性能優(yōu)化具有良好的效果,也驗證了理論模型及算法設計的正確性。從不同迭代次數(shù)的效率差異來看,作業(yè)的迭代次數(shù)越多,執(zhí)行時間的優(yōu)化度越高,作業(yè)執(zhí)行時間的縮減比基本隨迭代次數(shù)據(jù)增加呈線性增長趨勢,這是由于在PageRank作業(yè)中,每輪迭代的數(shù)據(jù)分布都相同,因此迭代填充分區(qū)映射算法每輪迭代的優(yōu)化效果也基本相同。從作業(yè)執(zhí)行的整體趨勢來看,隨著迭代次數(shù)的增加,傳統(tǒng)Spark的作業(yè)執(zhí)行時間上升幅度較大,而迭代填充分區(qū)映射算法由于其多輪分配均衡了不同Reducer間的計算數(shù)據(jù)量,加速了寬依賴Stage的執(zhí)行,因此作業(yè)執(zhí)行時間上升趨勢較為緩和。由此可以看出,傳統(tǒng)Spark的作業(yè)執(zhí)行效率受寬依賴的影響較大,而迭代填充分區(qū)映射算法對寬依賴的敏感度較低,寬依賴Stage越多,越能夠體現(xiàn)算法的優(yōu)化效果,作業(yè)執(zhí)行的加速效應也越明顯。 本文針對Spark寬依賴Stage數(shù)據(jù)分區(qū)的傾斜問題,首先分析Spark作業(yè)執(zhí)行機制,建立了作業(yè)效率模型,給出了RDD計算代價和作業(yè)執(zhí)行時間的定義。通過對Spark框架原始的分區(qū)策略進行研究和分析,建立了分區(qū)映射模型,給出了源數(shù)據(jù)分布、分區(qū)映射和分配傾斜度的定義,并證明了這些定義對作業(yè)執(zhí)行效率影響,為算法設計提供依據(jù)。其次,在相關定義和證明的基礎上,提出擴展式數(shù)據(jù)分區(qū)算法和迭代式分區(qū)映射算法,并對算法的執(zhí)行細節(jié)進行分析和說明。最后,通過不同的實驗驗證算法的有效性,實驗結果表明,迭代填充分區(qū)映射算法提高了數(shù)據(jù)分配的合理性,優(yōu)化了寬依賴Stage的作業(yè)執(zhí)行效率。下一步的研究方向是探索異構集群下適應工作節(jié)點計算能力的分區(qū)映射策略。 ) [1]STRANDESM,CICOTTIP,SINKOVITSRS,etal.Gordon:design,performance,andexperiencesdeployingandsupportingadataintensivesupercomputer[C]//Proceedingsofthe1stConferenceoftheExtremeScienceandEngineeringDiscoveryEnvironment:BridgingfromtheExtremetotheCampusandBeyond.NewYork:ACM, 2012:ArticleNo. 3. [2]BRONEVETSKYG,MOODYA.ScalableI/Osystemsvianode-localstorage:approaching1TB/secfileI/O,LLNL-TR-415791 [R].Livermore,CA:LawrenceLivermoreNationalLaboratory, 2009: 1-6. [3]ZAHARIAM,CHOWDHURYM,DAST,etal.FastandinteractiveanalyticsoverHadoopdatawithSpark[J].Login, 2012, 37(4): 45-51. [4]ApacheSpark.Sparkoverview[EB/OL]. [2015- 03- 18].http://spark.apache.org. [5]ZAHARIAM,CHOWDHURYM,DAST,etal.Resilientdistributeddatasets:afault-tolerantabstractionforin-memoryclustercomputing[C]//Proceedingsofthe9thUSENIXConferenceonNetworkedSystemsDesignandImplementation.Berkeley,CA:USENIXAssociation, 2012: 2. [6]LINX,WANGP,WUB.LoganalysisincloudcomputingenvironmentwithHadoopandSpark[C]//Proceedingsofthe5thIEEEInternationalConferenceonBroadbandNetworkandMultimediaTechnology.Piscataway,NJ:IEEE, 2013: 273-276. [7]DONGX,XIEY,MURALIMANOHARN,etal.Hybridcheckpointingusingemergingnonvolatilememoriesforfutureexascalesystems[J].ACMTransactionsonArchitectureandCodeOptimization, 2011, 8(2): 510-521. [8] 慈軼為,張展,左德承,等.可擴展的多周期檢查點設置[J].軟件學報,2010,21(2):218-230.(CIYW,ZHANGZ,ZUODC,etal.Scalabletime-basedmulti-cyclecheckpointing[J].JournalofSoftware, 2010, 21(2): 218-230.) [9]DEANJ,GHEMAWATS.MapReduce:simplifieddataprocessingonlargeclusters[C]//Proceedingsofthe6thConferenceonSymposiumonOpeartingSystemsDesignandImplementation.Berkeley,CA:USENIXAssociation, 2004,6: 10. [10]KWONY,BALAZINSKAM,HOWEB,etal.AstudyofskewinMapReduceapplication[EB/OL]. [2016- 03- 18].https://www.researchgate.net/publication/228941278_A_Study_of_Skew_in_MapReduce_Applications. [11]KWONY,BALAZINSKAM,HOWEB,etal.Skew-resistantparallelprocessingoffeature-extractingscientificuser-definedfunctions[C]//Proceedingsofthe1stACMSymposiumonCloudComputing.NewYork:ACM, 2010: 75-86. [12] 王卓,陳群,李戰(zhàn)懷,等.基于增量式分區(qū)策略的MapReduce數(shù)據(jù)均衡方法[J].計算機學報,2016,39(1):19-35.(WANGZ,CHENQ,LIZH,etal.AnincrementalpartitioningstrategyfordatabalanceonMapReduce[J].ChineseJournalofComputers, 2016, 39(1): 19-35.) [13]KWONY,BALAZINSKAM,HOWEB,etal.SkewTune:mitigatingskewinMapReduceapplications[C]//Proceedingsofthe2012ACMSIGMODInternationalConferenceonManagementofData.NewYork:ACM, 2012: 25-36. [14]YANW,XUEY,MALINB.ScalableandrobustkeygroupsizeestimationforreducerloadbalancinginMapReduce[C]//Proceedingsofthe2013IEEEInternationalConferenceonBigData.Piscataway,NJ:IEEE, 2013: 156-162. [15]RAMAKRISHNANSR,SWARTG,URMANOVA,etal.BalancingreducerskewinMapReduceworkloadsusingprogressivesampling[C]//Proceedingsofthe3rdACMSymposiumonCloudComputing.NewYork:ACM, 2012:ArticleNo. 16. [16]GUFLERB,AUGSTENN,REISERA,etal.HandingdataskewinMapReduce[C]//Proceedingsofthe1stInternationalConferenceonCloudComputingandServicesScience.Berlin:Springer, 2011: 574-583. [17]GUFLERB,AUGSTENN,REISERA,etal.LoadbalancinginMapReducebasedonscalablecardinalityestimates[C]//Proceedingsofthe2012IEEE28thInternationalConferenceonDataEngineering.Washington,DC:IEEEComputerSociety, 2012: 522-533. [18]KOLBL,THORA,RAHME.LoadbalancingforMapReduce-basedentityresolution[C]//Proceedingsofthe2012IEEE28thInternationalConferenceonDataEngineering.Washington,DC:IEEEComputerSociety, 2012: 618-629. [19]KOLBL,THORA,RAHME,etal.Block-basedloadbalancingforentityresolutionwithMapReduce[C]//Proceedingsofthe20thACMInternationalConferenceonInformationandKnowledgeManagement.NewYork:ACM, 2011: 2397-2400. [20]RACHASC.LoadbalancingMap-Reducecommunicationsforefficientexecutionsofapplicationsinacloud[D].Bangalore,India:IndianInstituteofScience, 2012: 12-16. [21]IBRAHIMS,JINH,LUL,etal.HandlingpartitioningskewinMapReduceusingLEEN[J].Peer-to-PeerNetworkingandApplications, 2013, 6(4): 409-424. [22]JUREL.Stanfordnetworkanalysisproject[EB/OL]. [2015- 03- 18].http://snap.stanford.edu. ThisworkispartiallysupportedbytheNationalNaturalScienceFoundationofChina(61262088, 61462079, 61363083, 61562086),theEducationalResearchProgramofXinjiangUygurAutonomousRegion(XJEDU2016S106). BIAN Chen, born in 1981, Ph. D. candidate, associate professor. His research interests include parallel computing, distributed system. YU Jiong, born in 1964, Ph. D., professor. His research interests include grid computing, high performance computing. XIU Weirong, born in 1979, M. S., lecturer. Her research interests include data mining, distributed applications. YING Changtian, born in 1989. Ph. D. candidate. Her research interests include big data storage, in-memory computing. Qian Yurong, born in 1980. Ph. D., associate professor. Her research interests include cloud computing, graphics and image processing. Partitioning and mapping algorithm for in-memory computing framework based on iterative filling BIAN Chen*, YU Jiong, XIU Weirong, YING Changtian, QIAN Yurong (SchoolofInformationScienceandEngineering,XinjiangUniversity,UrumqiXinjiang830046,China) Focusing on the issue that the only one Hash/Range partitioning strategy in Spark usually results in unbalanced data load at Reduce phase and increases job duration sharply, an Iterative Filling data Partitioning and Mapping algorithm (IFPM) which include several innovative approaches was proposed. First of all, according to the analysis of job execute scheme of Spark, the job efficiency model and partition mapping model were established, the definitions of job execute timespan and allocation incline degree were given. Moreover, the Extendible Partitioning Algorithm (EPA) and Iterative Mapping Algorithm (IMA) were proposed, which reserved partial data into extend region by one-to-many partition function at Map phase. Data in extended region would be mapped by extra iterative allocation until the approximate data distribution was obtained, and the adaptive mapping function was executed by awareness of calculated data size at Reduce phase to revise the unbalanced data load in original region allocation. Experimental results demonstrate that for any distribution of the data, IFPM promotes the rationality of data load allocation from Map phase to Reduce phase and optimize the job efficiency of in-memory computing framework. in-memory computing; load balance; extendible partitioning; iterative mapping 2016- 09- 26; 2016- 10- 17。 國家自然科學基金資助項目(61262088, 61462079, 61363083, 61562086);新疆維吾爾自治區(qū)高??蒲杏媱濏椖?XJEDU2016S106)。 卞琛(1981—),男,江蘇南京人,副教授,博士研究生,CCF會員,主要研究方向:網(wǎng)絡計算、分布式系統(tǒng); 于炯(1964—),男,北京人,教授,博士,CCF高級會員,主要研究方向:網(wǎng)格計算、高性能計算; 修位蓉(1979—),女,重慶人,講師,碩士,主要研究方向:數(shù)據(jù)挖掘、分布式應用; 英昌甜(1989—),女,新疆烏魯木齊人,博士研究生,主要研究方向:大數(shù)據(jù)存儲、內存計算; 錢育蓉(1979—),女,新疆烏魯木齊人,副教授,博士,CCF會員,主要研究方向:云計算、圖形圖像處理。 1001- 9081(2017)03- 0647- 07 10.11772/j.issn.1001- 9081.2017.03.647 TP A4 實驗與評價
5 結語