劉 宇周 虎
(1.南京烽火星空通信發(fā)展有限公司 南京 210000)(2.武漢郵電科學研究院 武漢 430074)
隨著電子商務的高速發(fā)展和普及應用,個性化推薦的推薦系統(tǒng)已成為一個重要研究領域。個性化推薦算法是推薦系統(tǒng)中最核心的技術,在很大程度上決定了電子商務推薦系統(tǒng)性能的優(yōu)劣,決定著是否能夠推薦用戶真正感興趣的信息,而面對用戶不斷提升的需求,推薦系統(tǒng)不僅需要正確的推薦,還要實時地根據(jù)用戶的行為進行分析并推薦最新的結果。實時推薦系統(tǒng)的任務就是為每個用戶實時地、精準地推送個性化的服務,甚至到達讓用戶體會到推薦系統(tǒng)比他們更了解自己的感覺。
推薦系統(tǒng)要處理的數(shù)據(jù)量是巨大的。為了快速滿足用戶對消息的需求,推薦系統(tǒng)必須有大數(shù)據(jù)處理能力。第一代大數(shù)據(jù)處理框架Hadoop以前一直被國內(nèi)外推薦系統(tǒng)作為解決方案,但是隨著推薦算法的進步,Hadoop的Map Reduce計算模型已經(jīng)難以滿足性能要求。大數(shù)據(jù)下基于Hadoop平臺構建的推薦系統(tǒng)存在著計算緩慢,不能快速地處理數(shù)據(jù),不適合實時計算、多次迭代訓練協(xié)同過濾算法模型,無法根據(jù)用戶實時行為作出推薦的問題[6]。針對以上問題,本文設計和實現(xiàn)基于Spark平臺的實時推薦系統(tǒng)。
本文采用Kafka作為消息的訂閱-發(fā)布,用Spark Streaming做實時計算,不斷從Kafka消費數(shù)據(jù),訓練模型,做出推薦,推薦的結果存放在HDFS中。其次在統(tǒng)一數(shù)據(jù)源的基礎上,采用基于Spark的矩陣分解推薦模型進行離線訓練,提升離線推薦訓練的效率;進而在離線推薦的基礎上,提出一種使用Spark Streaming實時流技術對數(shù)據(jù)進行處理,并將離線推薦結果與實時推薦結果通過統(tǒng)一介質融合的方案,實現(xiàn)對用戶隱式行為進行實時推薦反饋的功能[7]。 系統(tǒng)架構圖1。
圖1 系統(tǒng)設計流程圖
Spark基于內(nèi)存計算,適合迭代,計算速度快,但是其數(shù)據(jù)量大的時候經(jīng)常存在數(shù)據(jù)傾斜、任務卡住的問題。Spark Shuffle數(shù)據(jù)混洗、網(wǎng)絡間數(shù)據(jù)傳輸較慢,是影響大數(shù)據(jù)集群性能的重要指標[8]。
Spark靜態(tài)內(nèi)存管理機制下,儲存內(nèi)存、Shuffle內(nèi)存和Unroll內(nèi)存三部分,儲存內(nèi)存和執(zhí)行內(nèi)存共享一塊空間,可以動態(tài)占用對方的空閑區(qū)域,但用戶可以在應用程序啟動前進行配置[15],為了避免內(nèi)存溢出一般只使用90%,通過Spark.storage.safety Fraction控制[9]。Spark將要處理的數(shù)據(jù)儲存在Stor?age部分,這個部分占Safe的60%,通過Spark.stor?age.memory Fraction控制。Shuffle可用的內(nèi)存大小占Safe的20%,由spark.shuffle.menory控制,Unroll menory用作數(shù)據(jù)的序列化和反序列化,由spark.storage.unroll Fraction控制,占Safe得的20%[10]。內(nèi)存分配如圖2。
圖2 Spark內(nèi)存分配
Spark自身的Shuffle內(nèi)存分配算法試圖為內(nèi)存池中每一個Stage中的每一個Task平均分配內(nèi)存,但是在實驗中發(fā)現(xiàn),由于各Stage中Task大小不一樣對于內(nèi)存需求的不同導致了內(nèi)存的不足和浪費,使Spark集群運行效率較低、卡頓、卡住等問題。針對上述問題,根據(jù)應用中的實際情況,本文采用了一種根據(jù)Task大小和內(nèi)存溢出情況,分多級應用來拆分一個應用,盡量使任務大小相近的在一個應用中,這樣有效地避免內(nèi)存分配不足和浪費的情況[14]。
基于模型的協(xié)同過濾推薦就是基于樣本的用戶喜好信息,訓練一個推薦模型,然后根據(jù)實時的用戶喜好的信息進行預測,計算推薦。
對于一個users-products-rating的評分數(shù)據(jù)集,ALS會建立一個userproduct的mn的矩陣(其中,m為users的數(shù)量,n為products的數(shù)量)[2],如圖3。
圖3 評分數(shù)據(jù)集
這個矩陣的每一行代表一個用戶(u1,u2,…,u9)、每一列代表一個產(chǎn)品(v1,v2,…,v9)。用戶隔天產(chǎn)品的打分在1~9之間。但是在這個數(shù)據(jù)集中,并不是每個用戶都對每個產(chǎn)品進行過評分,所以這個矩陣往往是稀疏的[3],用戶i對產(chǎn)品j的評分往往是空的。ALS所做的事情就是將這個稀疏矩陣通過一定的規(guī)律填滿,這樣就可以從矩陣中得到任意一個user對任意一個product的評分,ALS填充的評分項也稱為用戶i對產(chǎn)品j的預測得分。
ALS算法的核心就是將稀疏評分矩陣分解為用戶特征向量矩陣和產(chǎn)品特征向量矩陣的乘積交替使用最小二乘法逐步計算用戶/產(chǎn)品特征向量[13],使得差平方和最小,通過用戶/產(chǎn)品特征向量的矩陣來預測某個用戶對某個產(chǎn)品的評分[11]。
最小交替二乘法確定特征值原理如下:
在式(1)中,a表示評分數(shù)據(jù)集中用戶i對產(chǎn)品j的真實評分,另外一部分表示用戶i的特征向量(轉置)產(chǎn)品j的特征向量(這里可以得到預測的i對j的評分)用真實評分減去預測評分然后求平方,對下一個用戶,下一個產(chǎn)品進行相同的計算,將所有結果累加起來[4]。
但是這里之前問題還是存在,就是用戶和產(chǎn)品的特征向量都是未知的,這個式子存在兩個未知變量,解決的辦法是交替的最小二乘法。首先對于上面的公式,定義為式(2):
為了防止過度擬合,加上正則化參數(shù):
式中I表示一個d*d的單位矩陣[12]。
首先用一個小于1的隨機數(shù)初始化V根據(jù)式(4)求U,此時就可以得到初始的UV矩陣了,計算上面說過的差平方和根據(jù)計算得到的U和式(5),重新計算并覆蓋V,計算差平方和,反復進行以上兩步的計算,直到差平方和小于一個預設的數(shù),或者迭代次數(shù)滿足要求則停止,取得最新的UV矩陣,則原本的稀疏矩陣R就可以用R=U(V)T來表示了[5]。
本文中搭建的實時推薦系統(tǒng)采用三臺Linux服務器,其中一個Master節(jié)點和兩個Worker節(jié)點,系統(tǒng)為Centos6.5,具體參數(shù)如圖4。
圖4 環(huán)境參數(shù)
本實驗采用的是Movie Lens數(shù)據(jù)集(數(shù)據(jù)集含有來自6000名用戶對4000部電影的100萬條評分數(shù)據(jù))。
測試部分:測試最佳的參數(shù),如隱性因子個數(shù),正則等,測試在Spark Streaming框架上算法的可用性。將整個數(shù)據(jù)集上傳至HDFS中,在spark程序中讀取ratings.dat文件,并隨機劃出80%作為訓練數(shù)據(jù)集,20%作為測試數(shù)據(jù)集,置隱性因子、正則式參數(shù)列表(由于物理機配置不好,集群能夠支持的最大迭代次數(shù)只有7次,再多就會內(nèi)存溢出,所以這里直接將迭代次數(shù)設置為7),對參數(shù)列表的全排列分別進行模型訓練,并計算MSE、RMSE,結果如圖5。
圖5 Spark實時推薦結果
從Kafka消費數(shù)據(jù),實時訓練協(xié)同過濾模型,實時的進行推薦結果如圖6。
圖6 實時推薦的MSE的值
實驗驗證:根據(jù)表1實驗推薦結果可以得出本Spark實時推薦系統(tǒng)可以正確地進行推薦,準確率MSE提升0.025(與參考文獻[12]相比),根據(jù)圖5可以得出結論隨著實時推薦的進行,實時訓練批次增加,準確率提升。
本文研究基于Spark Streaming實時推薦系統(tǒng)的設計與實現(xiàn)和協(xié)同過濾算法,Spark基于內(nèi)存計算,適合迭代可以滿足實時性要求,在Spark Streaming框架中加入?yún)f(xié)同過濾推薦算法,可以完成實時、動態(tài)的推薦。但是基于Spark的實時推薦還有很多工作要做:1)優(yōu)化Spark內(nèi)存分配策略,使內(nèi)存可以按照需求動態(tài)分配等。2)找到一種能自動選擇最優(yōu)參數(shù)的方式。3)優(yōu)化Spark數(shù)據(jù)傾斜問題。