高澤+李常寶+楊淙鈞+劉忠麟+艾中良
摘 要: 多表關(guān)聯(lián)查詢是進(jìn)行數(shù)據(jù)挖掘與分析的有效技術(shù)手段。隨著大數(shù)據(jù)時代的到來,當(dāng)前的數(shù)據(jù)分析技術(shù)在進(jìn)行海量數(shù)據(jù)多表聯(lián)查操作時存在明顯的性能瓶頸,為此提出一種基于MapReduce計算模型的多表聯(lián)查算法UGS用以提升多表關(guān)聯(lián)查詢效率。實驗表明,在海量數(shù)據(jù)背景下,該算法的查詢效率明顯優(yōu)于大數(shù)據(jù)領(lǐng)域的SparkSQL,Hive及關(guān)系型數(shù)據(jù)庫的MySQL。
關(guān)鍵詞: MapReduce; 多表聯(lián)查; 關(guān)聯(lián)空間剪枝; Spark
中圖分類號: TN911?34 文獻(xiàn)標(biāo)識碼: A 文章編號: 1004?373X(2015)14?0081?04
在當(dāng)今的生產(chǎn)生活中,圍繞著每個人每件事都會產(chǎn)生大量的數(shù)據(jù),而這些數(shù)據(jù)往往是分布在不同的數(shù)據(jù)文件中,想對這些數(shù)據(jù)進(jìn)行處理分析就必然要用到多表聯(lián)合查詢,聯(lián)合查詢在實際的生產(chǎn)生活中非常有必要。當(dāng)前的多表聯(lián)合查詢主要通過兩種方式實現(xiàn):一種是基于傳統(tǒng)數(shù)據(jù)庫的表JOIN方式,這種方式存在數(shù)據(jù)規(guī)模瓶頸問題,無法支撐大規(guī)模數(shù)據(jù)關(guān)聯(lián);另一種是基于大數(shù)據(jù)技術(shù)的多源數(shù)據(jù)融合[1]方式,雖然能夠解決關(guān)聯(lián)查詢在數(shù)據(jù)規(guī)模方面的瓶頸問題,但在運行效率方面存在較大的優(yōu)化空間,目前難以滿足交互式查詢需求。因此,針對當(dāng)前多表關(guān)聯(lián)查詢領(lǐng)域存在的問題,本文提出了一種基于MapReduce計算模型[2]的新型多表聯(lián)查方法;實驗表明,在解決多表關(guān)聯(lián)數(shù)據(jù)規(guī)模瓶頸的基礎(chǔ)上,較當(dāng)前大數(shù)據(jù)領(lǐng)域的多表關(guān)聯(lián)模式能夠顯著提升運行效率。
1 相關(guān)工作介紹
當(dāng)前多表關(guān)聯(lián)查詢主要借助2種方式實現(xiàn):關(guān)系型數(shù)據(jù)庫方式和分布式并行計算方式。下面通過一個關(guān)聯(lián)查詢實例對2種實現(xiàn)方式進(jìn)行復(fù)雜性分析。假設(shè)2張待聯(lián)查的表table1和table2,其中table1的數(shù)據(jù)量為C1條,table2的數(shù)據(jù)量為C2條。要求輸出table1.Key =table2.Key的條件下2張表的所有行,即:“SELECT * FROM table1 INNER JOIN table2 ON table1.Key=table2.Key”。
1.1 關(guān)系型數(shù)據(jù)庫的實現(xiàn)
傳統(tǒng)關(guān)系型數(shù)據(jù)庫的多表關(guān)聯(lián)查詢采用基于關(guān)聯(lián)條件的集合相乘思路實現(xiàn)[3],針對table1的每行數(shù)據(jù),在table2中對其關(guān)鍵字(Key)進(jìn)行查找,如果找到滿足條件的數(shù)據(jù),那么把它們組合成一條新數(shù)據(jù)存儲到結(jié)果數(shù)據(jù)集中。此時數(shù)據(jù)庫需要處理的條數(shù)為C1C2,也就是說時間復(fù)雜度為O(C1C2)。在此模式下,兩表規(guī)模均為10萬條數(shù)據(jù)時其響應(yīng)時間已經(jīng)達(dá)到5 min以上,兩表規(guī)模達(dá)到百萬條時,運行2 h仍未得到結(jié)果。
1.2 分布式處理引擎的實現(xiàn)
分布式并行計算實現(xiàn)方式主要基于大數(shù)據(jù)技術(shù)體系中的MapReduce模式展開,目前方法主要有Hive[4],Spark[5]兩種方式。
1.2.1 MapReduce編程模型概述
MapReduce編程模型以(Key,Value)元組為基本單位展開數(shù)據(jù)處理,整個處理過程分為Map、Reduce兩個階段:Map階段處理輸入數(shù)據(jù)并將處理結(jié)果基于Key值通過哈希計算映射到Reduce處理節(jié)點;Reduce階段處理本地數(shù)據(jù)并輸出結(jié)果。由于相同Key值的哈希計算結(jié)果是確定的,因此,每個Reduce處理節(jié)點上完整保存了該key值的所有數(shù)據(jù),編程人員在只需在每個Reduce節(jié)點處理本地數(shù)據(jù)即可完成對全局?jǐn)?shù)據(jù)的處理。
1.2.2 分布式處理引擎執(zhí)行多表聯(lián)查
Hive提供SQL查詢接口,通過對用戶輸入的查詢?nèi)蝿?wù)進(jìn)行語法樹解析,將SQL查詢轉(zhuǎn)化成Hadoop的MapReduce任務(wù)集,基于MapReduce展開數(shù)據(jù)處理[6],由于MapReduce存在中間數(shù)據(jù)磁盤讀寫瓶頸,從而在很大程度上限制了Hive的執(zhí)行效率。Spark分析引擎針對Hadoop的MapReduce中間數(shù)據(jù)磁盤讀寫瓶頸基于內(nèi)存計算展開優(yōu)化,使得同樣功能的任務(wù)在大部分情況下比Hadoop執(zhí)行效率更優(yōu),Spark在執(zhí)行多表關(guān)聯(lián)查詢時采用優(yōu)化的笛卡爾積關(guān)聯(lián)算法,雖然性能較傳統(tǒng)的笛卡爾積有所優(yōu)化,但是復(fù)雜度依舊為笛卡爾積的O(C1C2),并且空間復(fù)雜度為O(C1C2)。Hive在進(jìn)行數(shù)據(jù)關(guān)聯(lián)查詢時,單作業(yè)單機數(shù)據(jù)規(guī)模超過2 000 000×10 000 000時,執(zhí)行時間在1 min以上,存在較大的優(yōu)化空間;Spark對Hive的執(zhí)行過程進(jìn)行了基于內(nèi)存的執(zhí)行效率優(yōu)化,但關(guān)聯(lián)計算過程存在內(nèi)存占用不可控的問題,當(dāng)單作業(yè)單機數(shù)據(jù)規(guī)模超過20 000 000[×]100 000 000時,會因內(nèi)存溢出導(dǎo)致關(guān)聯(lián)查詢無法完成,數(shù)據(jù)規(guī)模相對較小時也存在一定的運行效率優(yōu)化空間。
因此需要設(shè)計一種空間膨脹相對可控,并且時間復(fù)雜度更低的算法來提高海量數(shù)據(jù)多表關(guān)聯(lián)效率,從而提升海量分析能力。
2 算法的設(shè)計與實現(xiàn)
2.1 算法思路
本算法主要借助MapReduce計算模型展開,在Map階段,對各表記錄添加來源標(biāo)記,并將各表數(shù)據(jù)采用相同的散列算法進(jìn)行映射分發(fā),使各表相同的Key值被集中到相同的處理節(jié)點上;在Reduce階段,基于各表標(biāo)記進(jìn)行關(guān)聯(lián)結(jié)果篩選,本地化獲取關(guān)聯(lián)查詢結(jié)果集。算法介紹如下:
算法名:UGS(Union Group and Segmentation)算法。
輸入?yún)?shù):參與關(guān)聯(lián)查詢的表路徑及關(guān)聯(lián)條件集,關(guān)聯(lián)查詢結(jié)果輸出路徑。
輸出數(shù)據(jù):關(guān)聯(lián)查詢結(jié)果。
執(zhí)行步驟:在上述實例中,算法在集群上的執(zhí)行過程如下:
(1) 在Map階段通過數(shù)據(jù)格式變換,將參與關(guān)聯(lián)查詢的各表數(shù)據(jù)統(tǒng)一為相同格式。將聯(lián)合查詢條件中的Key單獨抽取出來,其他數(shù)據(jù)存放在OtherRecord中,并添加標(biāo)記以記錄來源的TableID,Map階段輸出為 (Key,TableID,OtherRecord)。
(2) 在Reduce階段,輸入Map階段的輸出結(jié)果,對Key值相同的記錄進(jìn)行關(guān)聯(lián)篩選,如果某個Key存在于所有表中,那么是1條或多條(可能存在1個Key在某一table下存在多行)有效的結(jié)果。并將結(jié)果按表格式處理后輸出。
算法首先需要遍歷數(shù)據(jù),對每條數(shù)據(jù)通過Key計算出Reduce標(biāo)識,Reduce端完成數(shù)據(jù)收集后,在每個Reduce內(nèi)通過排序?qū)ey相同的記錄整合在一起然后進(jìn)行檢索條件的完備性判斷。在這種計算模式下,假設(shè)有N張表參與聯(lián)查,第i張表的數(shù)據(jù)量為[Ci],共有M個Map和R個Reduce參與并發(fā)計算。令[Sum= 1NCi],那么算法的期望時間復(fù)雜度為[O(SumM+SumR×log2SumR)],空間復(fù)雜度為O(Sum)。如在兩表聯(lián)查下,時間復(fù)雜度為[OC1+C2M+C1+C2R×log2C1+C2R],空間復(fù)雜度為O(C1+C2)。在最壞情況下,即多張表內(nèi)的Key列所有數(shù)據(jù)只有一個相同的值X,那么此方法的結(jié)果會退化為笛卡爾積結(jié)果,時間復(fù)雜度會退化到[O(1NCi)]。但是,在實際條件下很難有這種情況發(fā)生,并且如果關(guān)鍵字完全相同,那么結(jié)果數(shù)據(jù)集的數(shù)據(jù)量為[1NCi]條,此次聯(lián)查不論在任何方法下復(fù)雜度都不會小于[O(1NCi)]??梢钥吹?,由于復(fù)雜度數(shù)量級不同,在表規(guī)模較大,并且關(guān)鍵字離散的條件下,本算法的執(zhí)行時間相較于笛卡爾積優(yōu)化算法會大幅縮短,并且很好地解決了空間膨脹問題。
2.2 基于Spark的算法實現(xiàn)
本文基于大數(shù)據(jù)分析引擎Spark展開算法實現(xiàn),首先介紹Spark相關(guān)的幾個概念和操作:
SparkContext:Spark程序的入口,可以在聲明時定義各種系統(tǒng)參數(shù),如集群主節(jié)點位置,單個任務(wù)使用的最大內(nèi)存量,需要核心數(shù)等等。
RDD(Resilient Distributed Datasets):彈性分布式數(shù)據(jù)集,它是Spark系統(tǒng)提供的一種分布式內(nèi)存抽象,可以支持基于工作集的應(yīng)用,同時具有數(shù)據(jù)流模型自動容錯,位置感知調(diào)度和可伸縮性的特點。它允許用戶在執(zhí)行任務(wù)時顯示的將工作集緩存在內(nèi)存中,后續(xù)的操作能夠重用工作集,極大地提升了執(zhí)行速度。
TextFile:讀取本地或者分布式文件系統(tǒng)的數(shù)據(jù)并生成RDD。使用方法為RDD=sparkContext.textFile(FilePath)。其中FilePath為字符串類型,可以為本地文件路徑或者h(yuǎn)dfs路徑。
union:將相同格式的兩個RDD合并為一個,使用方式為RDD.union(OtherRDD)。
GroupByKey:是將數(shù)據(jù)按Key排序,并將相同Key的所有其他數(shù)據(jù)合并為一個List。使用方式為RDD.GroupByKey()。
算法實現(xiàn)如下:
輸入:結(jié)果輸出路徑OutputPath,多張表詳細(xì)信,每張表以(表路徑,關(guān)鍵字列的列號)二元組形式描述。
輸出:以文件形式返回分布式文件系統(tǒng)。
實現(xiàn)步驟:
(1) 讀取數(shù)據(jù)并將每張表的數(shù)據(jù)處理為統(tǒng)一格式。使用Spark調(diào)用hdfs數(shù)據(jù)的系統(tǒng)接口TextFile從分布式文件系統(tǒng)中讀取數(shù)據(jù),對于每張表所對應(yīng)的文件,生成文件的惟一標(biāo)識(TableID)并添加到文件的每行數(shù)據(jù)之內(nèi),再通過Map操作處理為固定格式的數(shù)據(jù),即RDD(String, (String, String)),存儲的數(shù)據(jù)為(Key, (TableID, OtherRecord))。
(2) 將多張表的數(shù)據(jù)合并到一起。由于經(jīng)過步驟(1)處理后數(shù)據(jù)格式相同,可以使用RDD的union操作來進(jìn)行合并,這樣合并后的數(shù)據(jù)可以使用Spark本身提供的方法GroupByKey來對數(shù)據(jù)進(jìn)行處理。
(3) 使用GroupByKey將關(guān)鍵字相同的數(shù)據(jù)合并為一條記錄。即將Key相同的數(shù)據(jù)行中的(TableID,Record)放在一個List下。
(4) 檢索數(shù)據(jù),剔除不滿足條件的數(shù)據(jù)。對步驟(3)執(zhí)行過GroupByKey操作的數(shù)據(jù),對每一行數(shù)據(jù)根據(jù)用戶需求的連接方式進(jìn)行數(shù)據(jù)的整理刪除,如INNER JOIN就是對每一個Key判斷該Key對應(yīng)的數(shù)據(jù)是否包含所有表的內(nèi)容,如果是則是滿足條件的結(jié)果,如果缺少某張表的數(shù)據(jù),那么便不滿足要求,對其進(jìn)行刪除操作。而LEFT JOIN和RIGHT JOIN等則只要存在指定表的數(shù)據(jù)就會被保留下來。
(5) 將符合條件的數(shù)據(jù)拆分還原。由于某些表中,相同的Key可能存在多條數(shù)據(jù)與之對應(yīng),需要將這種數(shù)據(jù)還原、補全成多條。如在在學(xué)校學(xué)生的數(shù)據(jù)庫中,同一姓名“A”可能對應(yīng)著多個學(xué)生,這樣在與其他表進(jìn)行以姓名為關(guān)鍵字的聯(lián)查時,“A”的結(jié)果數(shù)據(jù)應(yīng)該為多行,而由于GroupByKey操作會將這些數(shù)據(jù)化為1行,所以需要進(jìn)行拆分,將之還原為多行“A”。而在實現(xiàn)上,對每行數(shù)據(jù)生成若干的ArrayBuffer,然后將這些ArrayBuffer進(jìn)行全乘就可以獲得拆分后的結(jié)果。
(6) 將最終數(shù)據(jù)存入文件系統(tǒng)中。
3 對比實驗
實驗環(huán)境:
集群硬件:5臺實驗機組成的集群環(huán)境,其中主節(jié)點擁有4核心16 GB內(nèi)存,4臺從節(jié)點擁有4核心4 GB內(nèi)存,每個核心擁有3.4 GHz的主頻。
軟件部署:操作系統(tǒng)為Ubuntu 12.04;MySQL為MySQL Ver14.14 Distrib 5.5.29;Hadoop集群為Hadoop?2.2.0;Spark集群為Spark?1.1.0;Hive為0.12;Scala為2.10.4。
實驗方案:本算法需要與現(xiàn)有的關(guān)系型數(shù)據(jù)庫、傳統(tǒng)分布式文件系統(tǒng)處理方案進(jìn)行橫向?qū)Ρ龋陉P(guān)系型數(shù)據(jù)庫可接受的數(shù)據(jù)范圍內(nèi),做出數(shù)據(jù)量從小到大的對比實驗;并在數(shù)據(jù)規(guī)模較大的前提下,與SPARKJOIN[7]和Hive進(jìn)行對比實驗。
與關(guān)系型數(shù)據(jù)庫對比實驗:設(shè)計5組不同的數(shù)據(jù)規(guī)模的數(shù)據(jù),每組數(shù)據(jù)由2張表構(gòu)成。其中關(guān)系型數(shù)據(jù)庫使用INNER JOIN命令進(jìn)行查找。由于聯(lián)查需要生成2個表條數(shù)相乘的中間數(shù)據(jù)集,所以在規(guī)模分別為10萬條與100萬條數(shù)據(jù)的兩表進(jìn)行聯(lián)查時,會生成[1011]規(guī)模的數(shù)據(jù),并在1 h內(nèi)無法返回結(jié)果。故5組聯(lián)查數(shù)據(jù)數(shù)據(jù)量分別為(1 000[×]1 000),(1 000[×]10 000),(10 000[×]10 000),(10 000×100 000),(100 000×100 000)。
與傳統(tǒng)分布式文件系統(tǒng)處理方案的比較:由于數(shù)據(jù)分發(fā)、I/O等條件的限制,分布式文件系統(tǒng)處理數(shù)據(jù)有一定的數(shù)據(jù)傳遞時間,所以在小規(guī)模數(shù)據(jù)處理方面,數(shù)據(jù)分析時間占比較少,所以需要在一定規(guī)模的數(shù)據(jù)下進(jìn)行橫向?qū)Ρ?。因此設(shè)計5組數(shù)據(jù)進(jìn)行對比實驗。聯(lián)查數(shù)據(jù)規(guī)模分別為(104[×]104),(105[×]105), (106[×]106),(106[×]107),(107[×]107)。2張表的格式如表1所示,其中待聯(lián)查列均是ID列。
表1 表結(jié)構(gòu)
3.1 UGS算法與常用關(guān)系型數(shù)據(jù)庫比較
UGS算法與MySQL比較如圖1所示。表2為UGS算法與MySQL執(zhí)行時間對比。
可以看到,由于磁盤I/O、網(wǎng)絡(luò)I/O、任務(wù)劃分、數(shù)據(jù)分發(fā)收集需要占用一定時間,故在數(shù)據(jù)量較少的情況下,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫仍有著較大的優(yōu)勢,但是在數(shù)據(jù)量增大時,中間數(shù)據(jù)集每增大10倍,關(guān)系型數(shù)據(jù)庫所需處理時間都會增大約10倍,在對一張100 000條記錄的表與1 000 000條記錄的表進(jìn)行聯(lián)查時,MySQL運行了1 h仍未返回結(jié)果。而本文的UGS算法在數(shù)據(jù)量較小時,雖然也需要進(jìn)行幾秒的查詢,但是增長穩(wěn)定,在2張100 000條與100 000條的表進(jìn)行聯(lián)查時,效率比MySQL提升了將近100倍,在數(shù)據(jù)量繼續(xù)增長的條件下,將會有著更大地提升。
圖1 UGS算法與MySQL比較
表2 UGS算法與MySQL執(zhí)行時間對比
3.2 UGS算法與其他大數(shù)據(jù)平臺實現(xiàn)的比較
UGS算法與Hive,SPARKJOIN比較如圖2所示。而對于傳統(tǒng)的大數(shù)據(jù)方案,SPARKJOIN相對于Hive優(yōu)化了任務(wù)分發(fā)收集等步驟,所以效率相差穩(wěn)定為10~20 s之間,而UGS算法相對SPARKJOIN和Hive來講,由于算法復(fù)雜度的優(yōu)化,所以隨著數(shù)據(jù)規(guī)模增大,處理效率相較于Hive和SPARKJOIN有著較大地提升。
3.3 UGS算法在多表聯(lián)查下與當(dāng)前大數(shù)據(jù)實現(xiàn)方案的比較
對于多表聯(lián)查而言,由于UGS算法本身的復(fù)雜度為[O(SumM+SumR×log2SumR)],導(dǎo)致添加一張表所需的時間開銷較少;而當(dāng)前的大數(shù)據(jù)實現(xiàn)方案中,復(fù)雜度為[O(1NCiR)],每添加一張表復(fù)雜度都會提升一個數(shù)量級。所以在多表聯(lián)查下,UGS算法相較于當(dāng)前大數(shù)據(jù)實現(xiàn)方案優(yōu)勢更加明顯。
圖2 UGS算法與Hive,SPARKJOIN比較
表3 UGS算法與SPARKJOIN處理時間
對于3張數(shù)據(jù)規(guī)模均為1 000萬條的表,以相同的Key列進(jìn)行聯(lián)查,SparkJoin使用了182.170 s得出結(jié)果,Hive使用了207.281 s獲取結(jié)果,而UGS算法僅僅需要56.494 s就可以得出結(jié)果,可以看到由于增加表之后增加了任務(wù)的并發(fā)程度,并且更好的數(shù)據(jù)本地化降低了系統(tǒng)I/O開銷,導(dǎo)致了處理時間相對于2張表聯(lián)查增加了僅10 s。實驗表明,在多表聯(lián)查(表數(shù)大于等于3)的條件下,UGS算法相對于當(dāng)前的大數(shù)據(jù)解決方案效率提升更高。
4 結(jié) 語
本文提出了一種基于MapReduce的多表聯(lián)查算法用于實現(xiàn)海量多源數(shù)據(jù)的快速關(guān)聯(lián)查詢。實驗表明,在數(shù)據(jù)量為10萬條與100萬條的兩表聯(lián)查中,UGS算法相較于傳統(tǒng)關(guān)系型數(shù)據(jù)庫有著7~8倍的提升,在每張表數(shù)據(jù)量均為100萬條的兩表聯(lián)查中,相較于關(guān)系型數(shù)據(jù)庫有著100倍的性能提升,隨著數(shù)據(jù)量提升UGS算法的優(yōu)勢有著更明顯的體現(xiàn)。
在基于大數(shù)據(jù)技術(shù)的實現(xiàn)方案比較中,當(dāng)參與關(guān)聯(lián)的單表數(shù)據(jù)規(guī)模達(dá)到1 000萬級時,UGS相對于SPARKJOIN性能提升了約1倍,相對于Hive提升了1倍有余,并且隨著數(shù)據(jù)規(guī)模增大、待聯(lián)查表數(shù)量增多性能提升將更為明顯。
參考資料
[1] WHITE T. Hadoop: the definitive guide [M]. 3rd ed. BeiJing: OReilly Media, 2013.
[2] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large clusters [J]. Communications of the ACM: 50th Anniversary Issue, 2008, 51(1): 107?113.
[3] VARDI M. The complexity of relational query languages [C]// Proceedings of the fourteenth annual ACM symposium on Theory of computing. USA: ACM, 1982: 137?146.
[4] THUSOO A, SARMA J S, JAIN N, et al. Hive: a warehousing solution over a map?reduce framework [J]. Proceedings of the VLDB Endowment, 2009, 2(2): 1626?1629.
[5] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: A fault?tolerant abstraction for in?memory cluster computing [C]// Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation. [S.l.]: USENIX Association, 2012: 2?12.
[6] YANG H, DASDAN A, HSIAO R L, et al. Map?reduce?merge: simplified relational data processing on large clusters [C]// Proceedings of the ACM SIGMOD International Conference on Management of Data. New York: ACM, 2007: 1029?1040.
[7] LUO Yi, WANG Wei, LIN Xuemin. Spark: A keyword search engine on relational databases [C]// Proceedings of 2013 IEEE 29th International Conference on Data Engineering (ICDE). [S.l.]: IEEE, 2008: 1111?1118.
[8] CYGANIAK R. A relational algebra for SPARQL, HPL?2005?170 [D]. Bristol: Digital Media Systems Laboratory, 2005.