蔡洪山
摘要:隨著處理數(shù)據(jù)規(guī)模的迅速增長,對算法的執(zhí)行速度要求越來越高。Kmean是聚類分析分析中的一個經(jīng)典算法,雖然其已在Hadoop平臺上有并行化實現(xiàn),但Hadoop的計算模型并不適合像Kmeans這種迭代計算。Spark被看作下一代大數(shù)據(jù)并行處理框架,非常適合進(jìn)行迭代計算。該文論述了Kmeans算法在Spark平臺上的并行原理,給出了實現(xiàn)方法,并通過實驗證明該實現(xiàn)能夠快速完成在大數(shù)據(jù)集上的聚類。
關(guān)鍵詞:Kmeans;并行化;Spark;大數(shù)據(jù);聚類
中圖分類號:TP312 文獻(xiàn)標(biāo)識碼:A 文章編號:1009-3044(2016)04-0074-02
Research of Parallelized Kmeans Algorithm on Spark
CAI Hong-shan
(Anhui University of Science and Technology, School of Computer Science and Engineering,Huainan 232001, China)
Abstract: With the mount of data to deal with growing rapidly, peoples requirement on the execution speed of algorithms is higher and higher. Kmeans algorithm, which is a classical one in cluster analysis, has been implemented on Hadoop platform; but programming paradigm implemented by Hadoop is not good at addressing iterative computation such Kmeans. Spark , which is regarded as the next-generation data processing engine, excels at iterative computation. This paper discusses how Kmeans algorithm is executed parallel on spark and shows the implementation. The result turns out that the method work fine on large data set.
Key words: Kmeans algorithm; parallelization; big data; spark
聚類是一種無監(jiān)督學(xué)習(xí)的過程[1],它是在沒有給定分類的情況下,通過計算數(shù)據(jù)之間預(yù)先指定的屬性上的相識性,將數(shù)據(jù)劃分為相交或不相交的簇。聚類分析作為數(shù)據(jù)挖掘領(lǐng)域中一種工具,已經(jīng)在許多領(lǐng)域廣泛應(yīng)用[2],包括生物學(xué),信息檢索。當(dāng)前,現(xiàn)實和虛擬世界的數(shù)據(jù)產(chǎn)生速度越來越迅猛,聚類計算任務(wù)所面臨的數(shù)據(jù)規(guī)模越來越大,k-means算法是一種常用而有效的聚類算法,但其串行計算方法的時間復(fù)雜度比較高[3],處理能力存在局限性。因此如何實現(xiàn)其并行化以處理海量數(shù)據(jù)是一個很有價值的研究方向。
Hadoop是目前廣泛使用的并行計算平臺[4],但Hadoop的MapReduce比不適合迭代計算。在Hadoop的計算模型中[5],一個任務(wù)只有map和reduce兩個階段,復(fù)雜的計算需要多個的Job完成,Job之間的依賴關(guān)系是由開發(fā)者自己管理的;并且map階段的中間結(jié)果要寫到本地磁盤,這對需要多次迭代才能完成的計算顯然是不合適的。而迭代計算在數(shù)據(jù)處理中是很常見的,尤其在機(jī)器學(xué)習(xí)、數(shù)據(jù)挖掘、信息檢索等領(lǐng)域[6],很多算法都是運用多次迭代完成的。作為新一代并行處理框架,Spark彌補(bǔ)了Hadoop在迭代計算方面的不足,已成為發(fā)展最快的大數(shù)據(jù)處理平臺之一。本文了論述使用Kmeans算法在Spark平臺的并行實現(xiàn)原理并給出了其實現(xiàn)。
1 k-mean聚類算法原理
k-mean算法是一種應(yīng)用范圍非常廣泛的一種聚類方法,它是一種典型的劃分聚類方法。它有一個基本假設(shè):對于每一個類簇(cluster),我們可以選出一個中心點,使得該類簇中的所有的點到該中心點的距離小于到其他類簇的中心的距離。k-means 算法的基本思想是:先隨機(jī)或者按照某種啟發(fā)式方法選擇k個質(zhì)心(cluster centroids)作為起始類簇,把它們做k個類的聚類中心。然后,對數(shù)據(jù)集中的每個對象根據(jù)其與各個類簇中心的距離,將它劃分與其最近的簇,然后重新計算每個簇的平均值,更新聚類中心。這個過程不斷重復(fù),直到準(zhǔn)則函數(shù)收斂。
算法的具體步驟描述如下。
輸入:訓(xùn)練數(shù)據(jù)集 [D=] [{x1,x2,...,xn}] ,目標(biāo)簇個數(shù)k
輸出:類簇集合 [{N1,N2,...,Nk}]
1) 從訓(xùn)練數(shù)據(jù)集[D]中隨機(jī)選擇k個數(shù)據(jù)點 [μ1,μ2,μ3,...,μk∈D]作為k個類簇的中心
2) 重復(fù)下面的步驟,直到(3)式的函數(shù)值不在發(fā)生變化。
① 對每個樣本i,計算其應(yīng)該屬于的類:
[c(i)=argminj||xi-μj||] (1)
②對每個類簇j,重新計算該類的質(zhì)心:
[uj=i=1nsign(c(i)=j)xii=1nsign(c(i)=j)] (2)
在以上兩式中,[c(i)∈{1,2,...,k}] 表示距離數(shù)據(jù)點[xi] 最近的質(zhì)心[uj] 所屬的類簇,[sign(c(i)=j)] 表示當(dāng)[c(i)] 與j 相等是值為1,否則值為零。
k-mean算法目標(biāo)是使各個類簇中的數(shù)據(jù)點與所在類簇質(zhì)心的誤差平方和SSE(Sum of Squared Error)達(dá)到最小,這也是評價K-means算法最后聚類效果的評價準(zhǔn)則,準(zhǔn)則函數(shù)表示為
[J(c,μ)=i=1n||xi-μc(i)||] (3)
2 k-mean聚類算法在Spark平臺的并行化
2.1 Spark并行計算框架
Spark是由加州大學(xué)伯克利分校 AMP 實驗室開發(fā)的一個基于內(nèi)存并行集群計算框架,可用來構(gòu)建大型的、低延遲的數(shù)據(jù)分析應(yīng)用程序。Spark最核心的概念是RDD(Resilient Distributed Datasets),它表示已被分區(qū)、不可變的并能夠被并行操作的數(shù)據(jù)集合,不同的數(shù)據(jù)集格式對應(yīng)不同的RDD實現(xiàn)。RDD可以緩存(cache)到內(nèi)存中,每次對RDD數(shù)據(jù)集的操作之后的結(jié)果,都可以存放到內(nèi)存中,下一個操作可以直接從內(nèi)存中輸入,不像Hadoop的MapReduce那樣需要將中間結(jié)果寫到磁盤,因而省去了大量的磁盤IO操作。這對于迭代運算比較常見的機(jī)器學(xué)習(xí)算法、交互式數(shù)據(jù)挖掘來說,效率無疑會大大提升。
如圖1所示,spark使用master-slave的架構(gòu)[7],它由一個中心協(xié)調(diào)者和許多分布式的worker組成。這個中心協(xié)調(diào)者被稱為驅(qū)動程序(driver)。驅(qū)動程序和大量分布的worker進(jìn)行通信,這些worker則被稱為執(zhí)行器(executor)。驅(qū)動程序運行在自己的java進(jìn)程中,而每一個執(zhí)行器也都是獨立的java進(jìn)程。一個驅(qū)動程序和它的多個執(zhí)行器共同組成了一個Spark應(yīng)用。
2.2 基于Spark 的k-mean算法的并行原理
如前所述,RDD(Resilient Distributed Datasets)是Spark的核心抽象,它是不可變的基于內(nèi)存的數(shù)據(jù)結(jié)構(gòu),是對要處理的數(shù)據(jù)和數(shù)據(jù)操作的封裝,任何數(shù)據(jù)在Spark中都被表示為RDD。RDD中的數(shù)據(jù)是可分區(qū)存儲的,即將邏輯上作為一個整體的數(shù)據(jù)分為若干部分,這樣不同分區(qū)(partition)的數(shù)據(jù)就可以分布在不同的機(jī)器上,可以同時被并行處理。因此,Spark應(yīng)用程序所做的無非是把需要處理的數(shù)據(jù)轉(zhuǎn)換為RDD,然后對RDD進(jìn)行一系列的變換和操作從而得到結(jié)果,即創(chuàng)建RDD,轉(zhuǎn)換現(xiàn)有的RDD,對RDD進(jìn)行運算得到計算結(jié)果。
可以通過以下兩種方式創(chuàng)建RDD:一個是由Scala內(nèi)建集創(chuàng)建(如Aarry、Collecton、List等);二是由外部文件系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有Hadoop支持的數(shù)據(jù)集,比如HDFS、Cassandra、HBase、Amazon S3等,Spark可以支持TextFile,SequenceFiles以及其它任何Hadoop輸入格式。RDD創(chuàng)建后,就可以在RDD上進(jìn)行數(shù)據(jù)處理。RDD支持兩種操作:轉(zhuǎn)換(transformation),即從現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個新的數(shù)據(jù)集;動作(action),即在數(shù)據(jù)集上進(jìn)行計算后,返回一個最終的值給驅(qū)動程序(Driver)。 例如,map就是一種transformation,它將數(shù)據(jù)集每一個元素都傳遞給函數(shù),并返回一個新的分布數(shù)據(jù)集表示結(jié)果。reduce則是一種action,通過一些函數(shù)將所有的元素疊加起來,并將最終結(jié)果返回給Driver程序。
k-means 算法需要多次相同操作的迭代才能得到最終結(jié)果,這正好可以利用RDD實現(xiàn)。首先載入數(shù)據(jù),生產(chǎn)RDD,對于RDD每個分區(qū)里的數(shù)據(jù),隨機(jī)選出k個樣本的點作為初始的k個聚類中心,然后計算每個樣本所屬類簇,最后再更新每個類簇的聚類中心,這樣就完成了一次迭代。重復(fù)進(jìn)行多次迭代,直到達(dá)到最大的迭代次數(shù)或者準(zhǔn)則函數(shù)收斂。由于RDD的每個分區(qū)可以分布在多個不同的機(jī)器上,對不同的分區(qū)的操作可以并行的進(jìn)行,這樣就實現(xiàn)了算法的并行化。在上面的迭代過程中,中間結(jié)果是存在與內(nèi)存中的,這省去了很多對磁盤的IO操作,而IO操作是一種在時間上比較昂貴的操作,因此算法的效率會比較高。
Spark框架本身使用Scala語言開發(fā),提供了Java、Python、Scala等語言接口。Scala 是一種運行與Java虛擬機(jī)的編程語言,它支持面向?qū)ο蠛秃瘮?shù)式編程,此外Scala還有強(qiáng)大類型推斷功能,因而與Java相比,Scala程序非常簡潔。使用Scala語言實并行k-means并行算法的核心代碼如下:
var iterationCount = 0
do {
val mappedPoints = points.map(p => (closestCenter(p, centroids), (p, 1)))
val newCentroids = mappedPoints.reduceByKey {
case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2)
}.map {
case (id, (sum, count)) => (id, sum / count)
}.collect
tempDist = 0.0
for ((id, value) <- newCentroids) {
tempDist+= centroids(id).squaredDist(value)
centroids(id) = value
} while (tempDist > convergeDist && iterationCount < maxIteration)
在上面的代碼中, centroids表示K個類的質(zhì)心。在循環(huán)體中,函數(shù)losestCenter從K個質(zhì)心中找出與點p最近的哪一個,并返回哪個類的序號。迭代次數(shù)達(dá)到限制或質(zhì)心不再發(fā)生變化是循環(huán)結(jié)束。
3 實驗與結(jié)果分析
Spark集群有多種運行方式,實驗構(gòu)建的集群采用standalone模式。機(jī)器配置為: Intel Xeon CPU E5310 1.60GHz,48GB內(nèi)存,操作系統(tǒng)為CentOS 5.5。實驗采用ELKI的Data Set Generator產(chǎn)出兩個數(shù)據(jù)集DS1和DS2,數(shù)據(jù)的大小分別為418MB (5079000條記錄)980MB(12302000條記錄)。兩個數(shù)據(jù)集的加速比如圖2所示。
從圖2可以看出,隨著節(jié)點數(shù)的增加,加速比也隨著增加,但增加速度減慢,這是因為節(jié)點間的通信也會增加。在節(jié)點數(shù)量相同的情況下,數(shù)據(jù)集越大,加速效果越明顯。
4 結(jié)束語
Spark平臺是一種新的并行計算框架,被普遍認(rèn)為將會是Hadoop計算框架的替代者。本文分析了Spark平臺的并行計算的原理,探討了如何將Kmeans在Spark平臺并行化,給出了具體實現(xiàn),實驗證明,該實現(xiàn)能夠快速完成對大數(shù)據(jù)集的聚類。
參考文獻(xiàn):
[1] 羅杰斯. 機(jī)器學(xué)習(xí)基礎(chǔ)教程[M].北京:機(jī)械工業(yè)出版社, 2014.
[2] 高榕,李晶,肖雅夫,等. 基于云環(huán)境K-means聚類的并行算法[J]. 武漢大學(xué)學(xué)報:理學(xué)版,2015,61(4): 368-374
[3] 毛典輝. 基于MapRecude的Canopy-kmeans改進(jìn)算法[J]. 計算機(jī)工程與應(yīng)用,2012,48(27):22-26.
[4] 朱為盛,王鵬. 基于Hadoop云計算平臺的大規(guī)模圖像檢索方案[J].計算機(jī)應(yīng)用,2014,34(3): 695-699.
[5] Tom White. Hadoop權(quán)威指南[M].3版. 北京: 清華大學(xué)出版社, 2015.
[6] 唐振坤. 基于Spark的機(jī)器學(xué)習(xí)平臺設(shè)計與實現(xiàn)[D]. 廈門:廈門大學(xué), 2014.
[7] Holden Karau. Learning Spark[M]. Sebastopol, CA:O'Reilly Media,2015-02.
[8] 劉鵬. 基于Spark的大規(guī)模文本k-means并行聚類算法[EB/OL]. http://www.chinacloud.cn/show.aspx?id=20832&cid=28.