王立友 鄭海鵬
(淮南聯(lián)合大學計算機系 安徽淮南 232001)
在現(xiàn)實生活中,人們經(jīng)常會將類似的事物或事情劃分為同一類。即所謂的“物以類聚、人以群分”。在計算機程序設計過程中,分類算法是一種監(jiān)督學習方法。但是,在許多情況下,如果數(shù)據(jù)通過預處理滿足分類算法的要求,則無法滿足上述條件,特別是在處理大量數(shù)據(jù)時,成本非常大,您可以考慮使用K-Means聚類算法。K-Means算法屬于無監(jiān)督學習方法[1],作為較為經(jīng)典的分類算法,在很多應用領域中都有其自身的優(yōu)勢,因為它與分類進行比較,所以聚類不依賴于預定義的類和類標簽的訓練實例。
K-Means算法,也稱為K-Means方法或K均值法,具有良好的可擴展性和快速收斂性[2],是最廣泛使用的聚類算法。K均值算法是不斷改變聚類中心點的過程(聚類過程如圖1所示)。計算所有成員到每個質(zhì)心的距離,然后重新劃分其內(nèi)部集群成員。將對應成員劃分給到其距離最小的那個質(zhì)心。K值表示簇的質(zhì)心數(shù)(即聚類中心的數(shù)量);K-means可以自動將對應的成員分配給不同的類,但是不可能決定要分割哪些類。K必須是小于訓練集中樣本數(shù)的正整數(shù)。
假設輸入成員樣本集合為C=X1,X2,…,Xn-1,Xn(含有n個成員樣本),K-means算法具體的過程如下:
1)選擇初始化過程中中心點的個數(shù)(K值):A1,A2,…,Ak;
2)計算成員樣本X1,X2,…,Xn-1,Xn到A1,A2,…,Ak的各自的距離;選取各個成員樣本到各個中心點的最小距離,將樣本成員劃分到最小距離的中心點中(如式1);
1<=j<=k
3)重新為中心點Aj為隸屬該類別的所有樣本的均值;
4)重復2、3以上兩個步驟,直到迭代終止(即上一次和本次聚類的中心點不再發(fā)生變化)。
圖1 K-Means聚類示意圖
Hadoop是Apache Foundation開發(fā)的分布式系統(tǒng)基礎架構[3]。其核心是MapReduce和HDFS(分布式文件系統(tǒng))。在現(xiàn)代社會中,只要和海量數(shù)據(jù)有關的應用領域都會出現(xiàn)Hadoop 的身影[4]。隨著Hadoop 版本的不斷發(fā)展,其自身的功能正在變得越來越強大,并且已經(jīng)形成了相對完整的系統(tǒng)。如圖2所示。
圖2 hadoop平臺架構
(一)HDFS 分布式文件存儲系統(tǒng)。HDFS 代表Hadoop分布式文件系統(tǒng),是一種具有高容錯性,高吞吐量和高可擴展性的分布式文件存儲系統(tǒng)。HDFS 在存儲數(shù)據(jù)時使用塊存儲。它將節(jié)點分為兩類:NameNode 和DataNode,其中NameNode是HDFS集群的主節(jié)點。DataNode是HDFS集群從節(jié)點,用來存儲真正的數(shù)據(jù)。每個數(shù)據(jù)塊可以在多個DataNode上存儲多個副本,以提高數(shù)據(jù)安全性。
(二)MapReduce 框架。MapReduce 是一個開源并行計算框架,主要由兩個函數(shù)組成:Map()和Reduce()。它可以將大型任務分解為相對相反的多個子任務,并且可以并行處理。Map()函數(shù)負責指定數(shù)據(jù)集上的相反元素,并生成一系列[key,value]作為中間結果[5]。而Reduce()函數(shù)則對中間結果中相同Key的所有“value”進行規(guī)約,并得到最終結果。
(三)HBase數(shù)據(jù)庫。HBase是一個針對結構化數(shù)據(jù)的可伸縮、高性能的非關系型分布式動態(tài)模式數(shù)據(jù)庫,與傳統(tǒng)數(shù)據(jù)庫相比,HBase采用BigTable數(shù)據(jù)模型。在Hadoop平臺上運行時,HBase 中的表可用作MapReduce 任務的輸入和輸出,并可通過Java API檢索。
(四)Hive數(shù)據(jù)倉庫。Hive是數(shù)據(jù)倉庫管理文件。它提供完整的SQL 查詢功能,可將數(shù)據(jù)文件映射到單個數(shù)據(jù)表中。將其轉(zhuǎn)化為MapReduce任務進行運行。
(一)K-Means算法實現(xiàn)過程解析。
1.使用全局變量在最后一次迭代后存儲質(zhì)心。
2.使用Hadoop中的地圖框架(map)來計算每個質(zhì)心和樣本之間的距離,并獲得最接近樣本的質(zhì)心。
3.在Hadoop 中使用reduce 框架,輸入鍵(Key)是質(zhì)心,值(Value)是其他樣本。
4.將先前質(zhì)心和最新質(zhì)心作比較,若發(fā)生了變化,就需要程序繼續(xù)運行,進行下一次迭代,否則將停止迭代,終止當前程序,輸出最終的聚類結果。
(二)算法實現(xiàn)過程需要解決的問題。本文的思路基本上是按照上面的步驟來做的,有以下幾個問題急需解決:
1.Hadoop沒有自定義全局變量,因此無法定義存儲質(zhì)心的全局變量,因此需要另一種想法,即質(zhì)心存儲在文件中。
2.存儲質(zhì)心的文件讀取。我們選擇在主main函數(shù)中讀取質(zhì)心,然后將質(zhì)心設置保存到配置中。configuration 在map和reduce均可讀
3.如何比較質(zhì)心的變化,你可以在主要的主函數(shù)中進行比較,讀取質(zhì)心的文件和最新的質(zhì)心并進行比較。這個方法是可以實現(xiàn)的,我們使用自定義計數(shù)器,計數(shù)器是一個全局變量,可在地圖中讀寫并減少,在上面的想法中,我們看到reduce具有最后一次迭代的質(zhì)心和計算的質(zhì)心,因此完全可以直接在reduce中進行比較。如果發(fā)生變化,counter加1。
(三)基于手肘法的K-Means 算法中最佳K 值的確定。本文選取手肘法用來確定最佳K值[6]。手肘法的核心指標是平方誤差的總和:SSE(如式2所示),其中,Ci是第i個聚類中心,p是聚類中心的元素,mi是Ci的質(zhì)心(Ci中所有元素的平均值),SSE則可以表示聚類的質(zhì)量。
SSE 方法的原理為:隨著聚類中心數(shù)量的增加,聚類將更加細化,每個聚類中心的聚合度將逐漸增加,因此誤差SSE將逐漸減小。當K小于實際簇數(shù)時,隨著K值的增加,簇中心的聚集程度將大大加快。加速誤差的平方和SSE減小的幅度。SSE 的下降趨勢急劇減少,然后趨于平緩,即SSE和K之間的關系是肘形。而對應肘部的K值為最佳。
為測試數(shù)據(jù)選擇最佳簇編號K值,具體實施過程為K=1-6,針對對每一個K值記下對應的SSE,制作K和SSE關系圖,并選擇與肘相對應的K 值作為最佳簇數(shù)。如3 所示:即K=4聚類結果最優(yōu)。
圖3 K與SSE關系圖
(四)Hadoop實現(xiàn)K-Means算法的主要代碼。
1.定義Center類、初始化質(zhì)心.首先定義一個Center類,它主要存儲質(zhì)心數(shù)量K值,以及兩個從HDFS讀取質(zhì)心文件的方法。一個用于讀取初始質(zhì)心,一個用于在每次迭代后讀取質(zhì)心文件夾。這個是在文件夾中的,代碼如下:
public class Center{
protected static int k=4;//質(zhì)心的個數(shù)(由手肘法確定K=4為最佳)
/**
從初始質(zhì)心文件加載質(zhì)心并返回字符串,由質(zhì)心之間的制表符分隔
*/
public String loadInitCenter(Path path) throws IOException {
StringBuffer sb = new StringBuffer();
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FSDataInputStream dis = hdfs.open(path);
LineReader in = new LineReader(dis,conf);
Text line = new Text();
while(in.readLine(line) >0) {
sb.append(line.toString().trim());
sb.append(" "); }
return sb.toString().trim();}
2.每次迭代從質(zhì)心文件中讀取質(zhì)心并返回字符串代碼
public String loadCenter(Path path) throws IOException{
StringBuffer sb = new StringBuffer();
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileStatus[]files = hdfs.listStatus(path);
for(int i = 0; i < files.length; i++) {
Path filePath = files[i].getPath();
if(!filePath.getName().contains("part")) continue;
FSDataInputStream dis = hdfs.open(filePath);
LineReader in = new LineReader(dis,conf);
Text line = new Text();
while(in.readLine(line) >0) {
sb.append(line.toString().trim());
sb.append(" ");}}
return sb.toString().trim();}}
public class KmeansMR {
private static String FLAG = "KCLUSTER";
public static class TokenizerMapper extends Mapper