張明敏,張功萱,周秀敏
(南京理工大學(xué) 計算機科學(xué)與工程學(xué)院,江蘇 南京210094)
Mahout是Apache軟件基金會 (ASF)旗下的一個開源項目,它是一種機器學(xué)習(xí)軟件庫,提供了一些包括聚類、分類和協(xié)同過濾等機器學(xué)習(xí)領(lǐng)域經(jīng)典的算法,旨在幫助開發(fā)人員更加方便高效地創(chuàng)建智能應(yīng)用程序。Mahout還支持Apache社區(qū)的Hadoop平臺,并且已經(jīng)將一部分算法實現(xiàn)MapReduce并行化,以此來作為一種解決機器學(xué)習(xí)問題的廉價方案[1-4]。
Mahout提供了大量的協(xié)同過濾算法的功能,其中比較典型的兩類算法:基于用戶的協(xié)同過濾 (user CF)和基于物品的協(xié)同過濾 (item CF),都在Mahout中得到了實現(xiàn)。兩種算法都依賴于兩個事物 (用戶或者物品)之間的相似性度量,或者說等同性定義[5]。在協(xié)同過濾算法中,相似性的度量是關(guān)鍵步驟,也是算法效率的瓶頸所在[6]。然而,Mahout中并沒有提供基于MapReduce的相似度計算方法。在實際應(yīng)用中,單機的相似度算法受限于內(nèi)存和效率,無法處理海量的數(shù)據(jù)。
因此,本文引入了Hadoop集群和MapReduce并行計算模型,研究并實現(xiàn)對數(shù)似然相似度算法的并行化。根據(jù)算法自身的特點,采用復(fù)合鍵對和同現(xiàn)矩陣的思想[7]對MapReduce過程進(jìn)行優(yōu)化,然后在Hadoop平臺上運行優(yōu)化后的算法。
MapReduce是谷歌公司在2004 年提出的一種軟件架構(gòu),主要用來解決大規(guī)模數(shù)據(jù)集在集群上的計算問題,它是一種用于處理和生成大數(shù)據(jù)集的編程模型[8]。文獻(xiàn) [9]詳細(xì)地描述了MapReduce的核心思想。
MapReduce框架使用兩種類型的組件來控制作業(yè)的執(zhí)行過程。一個是JobTracker,負(fù)責(zé)作業(yè)的分解和狀態(tài)的調(diào)控。另一個是TaskTracker,負(fù)責(zé)執(zhí)行具體的MapReduce程序[10]。根據(jù)作業(yè)中輸入數(shù)據(jù)的位置,JobTracker把任務(wù)分配給一些TaskTracker,并保證它們協(xié)調(diào)工作。Task-Tracker運行分配到的任務(wù),同時向Jobtracker匯報任務(wù)的進(jìn)展情況。在執(zhí)行具體的程序時,每個MapReduce任務(wù)又分為map和reduce兩個階段,分別負(fù)責(zé)分解任務(wù)和匯總結(jié)果。
在作業(yè)開始之前,存儲在Hadoop 分布式文件系統(tǒng)(HDFS)[11]中的數(shù)據(jù)被切分成很多小的數(shù)據(jù)集或者碎片(split)。每一個split對應(yīng)一個map任務(wù),每個map任務(wù)又會被分配給靠近該split的節(jié)點中的TaskTracker運行。在map階段,map函數(shù)接收鍵值對形式的輸入,經(jīng)過處理產(chǎn)生同樣格式的輸出。然后通過重新洗牌過程[12],將鍵值對分配到各個Reduce節(jié)點。在reduce階段,reduce函數(shù)將具有相同key值的鍵值對合并,然后對value集合進(jìn)行處理,產(chǎn)生一個<key,value>形式的輸出。其原理如圖1所示。
圖1 分布式的MapReduce處理過程
Ted Dunnning在1993年提出一種對數(shù)似然比的概念,主要應(yīng)用于自然文本語言庫中兩個詞的搭配關(guān)系問題。它是基于這樣一種思想,即統(tǒng)計假設(shè)可以確定一個空間的很多子空間,而這個空間是被統(tǒng)計模型的未知參數(shù)所描述。似然比檢驗假設(shè)模型是已知的,但是模型的參數(shù)是未知的。2.1.1 二項分布的對數(shù)似然比
對于二項分布的情況,似然函數(shù)為
式中:H——給定的統(tǒng)計模型,k1,k2,n1,n2——給定實驗結(jié)果的參數(shù)。p1,p2——給定模型的參數(shù)。
假設(shè)二項分布有相同的基本參數(shù)集合 {(p1,p2)|p1=p2},那么對數(shù)似然比λ就是
式中:maxpH ——當(dāng)p取得某值時,統(tǒng)計模型H 的最大值。
當(dāng)p1=,p2=時,分母取得最大值。當(dāng)p =時,分子取得最大值。
所以對數(shù)似然比簡化為
式中:L——二項分布,n——實驗重復(fù)的次數(shù),p——某事件發(fā)生的概率,k——該事件發(fā)生的次數(shù),L(p,k,n)=pk(1-p)n-k。
兩邊取對數(shù)可以將對數(shù)似然比的公式變形為,-2logλ=2[logL(p1,k1,n1)+logL(p2,k2,n2)- logL(p,k1,n1)-logL(p,k2,n2)]。
2.1.2 Mahout中對數(shù)似然相似度算法的實現(xiàn)
由于二項分布的對數(shù)似然比能夠合理地描述兩個事物相似的模型,所以Mahout中利用對數(shù)似然比來計算兩個事物(用戶或者物品)的相似度。對數(shù)似然相似度基于兩個用戶共同評估過的物品數(shù)目,但在給定物品總數(shù)和每個用戶評價物品數(shù)量的情況下,其最終結(jié)果衡量的是兩個用戶有這么多共同物品的 “不可能性”,它是一種不考慮具體偏好值的方法。對數(shù)似然相似相似度算法在Mahout中的具體實現(xiàn)為
其中
entropy(int...elements)實質(zhì)上是一種簡單計算熵值的函數(shù)。以計算用戶1和用戶2的相似度為例,k11表示兩個用戶共同偏好的item 數(shù)量,k12表示用戶1偏好而用戶2不偏好的item 數(shù)量,k21表示用戶2偏好而用戶1不偏好的item 數(shù)量,k22表示用戶1和用戶2都不偏好的item 數(shù)量,可以將這4個變量看成一個二維矩陣),然后計算這個矩陣的行熵 (rowEntropy)、列熵 (columnEntropy)和矩陣的熵(matrixEntropy),從而得出相似度值。
通過上述過程可以發(fā)現(xiàn),相似度的計算最終可以歸結(jié)為計算k11,k12,k21和k22的值。進(jìn)一步分析可以得到,k12=k1-k11,k21=k2-k11,k22=item 總數(shù)-k1-k2+k11。其中k1,k2分別為用戶1和用戶2偏好的物品個數(shù)。區(qū)域具體分布情況如圖2所示。在計算過程中,只需獲得item 總數(shù),用戶1和用戶2分別偏好item 的個數(shù)以及他們共同偏好的物品個數(shù),就能得出所需的4個參數(shù)值。
圖2 4個區(qū)域的分布
下面以計算用戶之間的相似度為例,具體介紹對數(shù)似然相似度算法的并行化過程。以表1中的二維矩陣為例,其中U1,U2,U3為用戶,I1,I2,I3,I4為物品。表格中的數(shù)字1表示某個用戶偏好該物品,空白表示用戶不偏好該物品。
表1 二維偏好矩陣
Loglikelihood相似度并行計算將拆分成4 個MapReduce任務(wù)。第1 個MapReduce任務(wù)計算每個用戶偏好的物品總數(shù)。第2 個MapReduce任務(wù)將偏好某個物品的用戶放到一條記錄中,形成以物品為鍵、偏好該物品的所有用戶為值的倒排列表。第3個MapReduce任務(wù)計算兩兩用戶共同偏好的物品個數(shù),并且記錄物品的總個數(shù)numItems。第4個MapReduce任務(wù)計算相似度。具體過程如圖3所示。
圖3 MapReduce的并行化過程
第1個MapReduce過程稱為倒排索引,其輸入數(shù)據(jù)在文件中的存儲格式為<用戶,物品,偏好值>。首先將文件分割成splits并按行作為程序的輸入,然后將<每行的偏移量,每行的內(nèi)容>形式的鍵值對交付給程序中定義好的map函數(shù)進(jìn)行處理。對每個用戶,以用戶和用戶偏好的物品個數(shù)為鍵,該用戶偏好的所有物品列表為值,中間用分號隔開,這樣我們就能得到如<U1:2,<I1;I2>>、<U2:2,<I2;I3>>以及<U3:3,<I2;I3;I4>>格式的輸出。這樣做的目的是,能夠統(tǒng)計出每個用戶偏好的物品總數(shù),為下面計算k12和k21打下基礎(chǔ)。
第2個MapReduce過程還是倒排索引,其輸入為第1步MapReduce輸出的結(jié)果。map階段以每一個物品為鍵,偏好該物品的用戶和用戶偏好的物品總數(shù)為值作為輸出。然后經(jīng)過重新洗牌過程發(fā)送到Reduce節(jié)點,reduce函數(shù)將具有相同key值 (物品)的所有value組合起來,中間用分號隔開,這樣就能得到如<I1,U1:2>、<I2,<U1:2;U2:2;U3:3>>、<I3,<U2:2;U3:3>>以及<I4,U3:3>格式的輸出。這樣做的好處是,將偏好同一物品的所有用戶都聚集在同一個value下,方便下一步MapReduce任務(wù)的處理。
第3個MapReduce過程主要是計算用戶兩兩之間共同偏好物品的個數(shù)即k11,以及物品的總個數(shù),其輸入為第2步MapReduce輸出的結(jié)果。對每一條記錄,map函數(shù)丟棄掉key值,將value中的用戶兩兩配對,并作為鍵值,value值設(shè)置為1。同時,map函數(shù)通過逐行讀入數(shù)據(jù),記錄下總行數(shù),即物品的總個數(shù)numItems。Reduce函數(shù)將具有相同鍵值的value值相加,得出兩兩用戶共同偏好物品的總個數(shù)。最終可以得到以下輸出:<<U1:2;U2:2>,1>,<<U1:2;U3:3>,1>,<<U2:2;U3:3>,2>。
第4個MapReduce過程真正用來計算用戶兩兩之間的相似度。其以上一步MapReduce輸出的結(jié)果為輸入,從每一條記錄里面能夠提取到k11,k1,k2以及上一步MapReduce過程計算好的物品總個數(shù)numItems。通過k1,k2,numItems能夠分別計算出k12,k21和k22的值,然后調(diào)用函數(shù)loglikelihoodRatio (k11,k12,k21,k22),最終計算出相似度的數(shù)值。
4個MapReduce任務(wù)采用順序組合的方式[13],每個MapReduce任務(wù)都需要配置自己的運行代碼,并按照前后關(guān)系正確的配置輸入/輸出的路徑。程序運行后,會按照MapReduce任務(wù)之間的順序逐個運行作業(yè)。因為前一個MapReduce任務(wù)的輸出要作為后一個MapReduce任務(wù)的輸入,所以需要調(diào)用job.waitForCompletion (true)來保證前一個子任務(wù)執(zhí)行完成后再執(zhí)行下一個子任務(wù)。
初步分析和運行上述4 步MapReduce任務(wù)可以發(fā)現(xiàn),第3步MapReduce任務(wù)執(zhí)行的時間最長,其原因主要是:這一步MapReduce任務(wù)產(chǎn)生大量的鍵值對,而且這些鍵值對無法用combiner處理,Hadoop將它們寫到磁盤上時需要耗費大量的時間。針對一條記錄,假設(shè)同時對一個物品有偏好的用戶數(shù)有n個,那么在第3步map函數(shù)將產(chǎn)生n* (n-1)/2個鍵值對,時間復(fù)雜度為O(n*n)。當(dāng)n=10000時,鍵值對數(shù)目將達(dá)到10億數(shù)量級。即使一臺機器處理一條記錄,也會非常耗費資源,達(dá)不到預(yù)期的效果。而且map階段產(chǎn)生的鍵值對需要傳輸給Reduce節(jié)點,不但增加網(wǎng)絡(luò)通信的開銷,而且使得reduce階段的copy和sort過程非常緩慢。因此,本文根據(jù)Jimmy Lin的單詞同現(xiàn)矩陣的思想,提出一種將大量小的鍵值對合并為較大鍵值對的方法,大幅減少傳送給Reduce節(jié)點的鍵值對數(shù)量。
如圖4所示,針對第2步MapReduce產(chǎn)生的一條記錄<I2,<U1:2;U2:2;U3:3>>,原先產(chǎn)生的許多小鍵值對可以合并成右側(cè)大的鍵值對。然后,在Reduce階段,將具有相同key值的鍵值對進(jìn)行累加,即可獲得一個用戶同其他用戶共同偏好物品的關(guān)系及其具體的個數(shù)。還是假設(shè)同時對一個物品有偏好的用戶數(shù)有n個,那么在這一步map函數(shù)將產(chǎn)生n-1 個鍵值對,時間復(fù)雜度為O(n),所產(chǎn)生的鍵值對數(shù)量遠(yuǎn)遠(yuǎn)小于原來的步驟。此時的MapReduce相應(yīng)的偽代碼如圖5所示。
圖4 第3步MapReduce的優(yōu)化過程
圖5 MapReduce過程的偽代碼
Hadoop集群為建立在openstack云平臺上的6臺虛擬機,其中,1臺為主節(jié)點 (master),5臺為從節(jié)點(slave)。每臺虛擬機的主要配置如下:兩個虛擬內(nèi)核,內(nèi)存為2G,磁盤為10G。Java版本為Java-7-oracle,Linux系統(tǒng)為Ubuntu12.04,Hadoop版本為1.2.1。
實驗數(shù)據(jù):本實驗采用的數(shù)據(jù)集來自于GroupLens提供的電影評分集。該數(shù)據(jù)集包含6000 多位用戶對3900 多部電影的一百多萬條評分記錄。評分?jǐn)?shù)據(jù)集中包含用戶ID,電影ID,評分和時間戳。用戶ID 的區(qū)間為1 到6040,電影ID 的區(qū)間為0到3952,評分區(qū)間為0到5,每個用戶至少對20部電影的進(jìn)行評分。因為,對數(shù)似然相似度是處理無評分?jǐn)?shù)據(jù)的,所以可以將用戶對某部電影評分,視為用戶看過該電影,用戶沒有對某部電影,視為用戶沒有看過該電影。
實驗設(shè)置:本實驗采用Eclipse作為集成開發(fā)環(huán)境。首先,在單機環(huán)境中,調(diào)用Mahout中計算對數(shù)似然相似度的函數(shù),統(tǒng)計運行時間。然后,分別采用1,2,3,4,5 個節(jié)點的集群,運行本文所提出的并行化的算法,統(tǒng)計運行時間。最后,將單機運行時間與集群運行時間進(jìn)行比較。
實驗結(jié)果:由圖6可以看出,當(dāng)節(jié)點數(shù)為1~2個的時候,集群運行的效率遠(yuǎn)低于單機運行效率。其主要原因有兩個:對于集群而言,一是任務(wù)的啟動和交互占據(jù)一定的時間,尤其當(dāng)實際的計算量比較小時,集群的優(yōu)勢無法體現(xiàn)出來;二是數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)挠绊憽螜C版的相似度算法首先會將數(shù)據(jù)全部讀入內(nèi)存,然后進(jìn)行計算,所以處理的速度比較快。而在Hadoop集群中,Map函數(shù)先將數(shù)據(jù)寫到磁盤上,然后Reduce函數(shù)再從磁盤上讀取數(shù)據(jù),增加了數(shù)據(jù)傳輸?shù)臅r間。但是當(dāng)集群節(jié)點數(shù)大于3個的時候,集群的優(yōu)勢就開始逐漸體現(xiàn)出來。由此可知:當(dāng)節(jié)點數(shù)達(dá)到一定數(shù)量時,集群的運行效率要優(yōu)于單機的運行效率。
圖6 集群和單機運行時間的對比
加速比S=Ts/Tm 是衡量并行系統(tǒng)或程序并行化性能的重要指標(biāo)。其中,S是加速比,Ts是單機算法的運行時間,Tm 是m 個節(jié)點運行的時間。由圖7 可以看出,加速比隨著集群節(jié)點數(shù)的增加而增大,當(dāng)節(jié)點數(shù)大于4時,加速比大于1。這說明,基于Hadoop集群的對數(shù)似然相似度算法具有較好的加速比。而且,隨著集群節(jié)點數(shù)量的增加,這種優(yōu)勢將會更加明顯。
圖7 集群的加速比
Hadoop集群和MapReduce編程模型是當(dāng)前解決海量數(shù)據(jù)問題的主要解決方案,Mahout結(jié)合Hadoop將使得數(shù)據(jù)的挖掘和分析更加高效和便捷。本文主要探討了Mahout中對數(shù)似然相似度算法的并行化問題,并使用MapReduce編程模型在Hadoop 平臺上實現(xiàn)了該算法,并且優(yōu)化了其中的MapReduce過程。相關(guān)的實驗結(jié)果表明,在處理大數(shù)據(jù)集時,并行算法的運行效率要優(yōu)于單機算法的運行效率。集群規(guī)模越大,算法的執(zhí)行效率越高,加速比越明顯。
[1]DanEr CHEN.The collaborative filtering recommendation algrorithm based on BP netral networks[J].Computer Society of IEEE,2009,121:234-235.
[2]Apache Mahout.The apache software foundation [EB/OL].[2012-02-06].http://mahout.apache.org.
[3]Apache Hadoop.The apache software foundation [EB/OL].[2012-04-01].http://hadoop.apache.org.
[4]Esteves RM,Rong C.K-means clustering in the cloud-a mahout test [J].Computer Society of IEEE,2011,136:515-516.
[5]Sean Owen,Robin Anil,Ted Dunning,et al.Mahout in action [M].US:Manning Publications,2010:41-42.
[6]MA Ning.Research and implementation of recommendation system based on mahout[D].Lanzhou:Lanzhou University,2012:30 (in Chinese).[馬寧.基于Mahout推薦系統(tǒng)的研究與實現(xiàn) [D].蘭州:蘭州大學(xué),2012:30.]
[7]Jimmy Lin,Chris Dyer.Data-intensive text processing with MapReduce [M].US:University of Maryland,College Park,2010:39-52.
[8]Dean J,Ghemawat S.MapReduce:Simplified data processing on large clusters [J].Communications of ACM,2008,51(1):107-113.
[9]LANG Weimin,YANG Depeng.MapReduce technology on cloud computing [J].Telecommunications Information,2012,3:3-5 (in Chinese).[郎為民,楊德鵬.云計算中的MapReduce技術(shù) [J].電信快報,2012,3:3-5.]
[10]Narayan S,Bailey S,Daga A.Hadoop acceleration in an open flow-based cluster [J].Computer Society of IEEE,2012,76:535-538.
[11]Konstantin Shvachko,Hairong Kuang,SanjayRadia,et al.The Hadoop distributed file system [C]//Mass Storage Systems and Technologies,2010:1-10.
[12]YAN Yonggang,MA Tinghuai,WANG Jian.Parallel implementing KNN classification algorithm using MapReduce programming mode[J].Journal of Nanjing University of Aeronautics and Astronautics,2013,45 (4):551-554 (in Chinese).[閆永剛,馬廷淮,王建.KNN 分類算法的MapReduce并行化實現(xiàn) [J].南京航空航天大學(xué)學(xué)報,2013,45(4):551-554.]
[13]LIU Peng.Hadoop in action [M].Beijing:Electronic Industry Press,2011:142-143 (in Chinese). [劉鵬.實戰(zhàn)Ha-doop [M].北京:電子工業(yè)出版社,2011:142-143.]