倪政君,夏哲雷
(中國計(jì)量大學(xué) 信息工程學(xué)院,浙江 杭州 310018)
聚類、分類、關(guān)聯(lián)規(guī)則等是從大量數(shù)據(jù)中發(fā)現(xiàn)有價(jià)值信息的數(shù)據(jù)挖掘算法.其中關(guān)聯(lián)規(guī)則挖掘是數(shù)據(jù)挖掘的重要研究方向,主要研究從事務(wù)數(shù)據(jù)庫、關(guān)系型數(shù)據(jù)庫或數(shù)據(jù)倉庫等海量數(shù)據(jù)的項(xiàng)集之間發(fā)現(xiàn)有價(jià)值的頻繁出現(xiàn)的數(shù)據(jù)項(xiàng)集合[1].
關(guān)聯(lián)規(guī)則挖掘擁有諸多算法,如Apriori[2]、Eclat[3]、FP-Growth[4]等,大多數(shù)的關(guān)聯(lián)規(guī)則算法都是通過掃描數(shù)據(jù)用于找到頻繁項(xiàng)集.Apriori算法會對每一個(gè)候選項(xiàng)集進(jìn)行數(shù)據(jù)掃描判斷是否是頻繁項(xiàng)集,并通過維度數(shù)較低的k項(xiàng)集迭代生成維度數(shù)較高的k+1項(xiàng)集,最終獲得所有的頻繁項(xiàng)集.Apriori的原理和實(shí)現(xiàn)都較為簡單,因此在數(shù)據(jù)挖掘中得到大量應(yīng)用,但是當(dāng)處理大數(shù)據(jù)量的數(shù)據(jù)集時(shí),單機(jī)串行處理的Apriori算法的挖掘速度會出現(xiàn)下降.因此有些研究人員提出了如CD、DD和CaD等并行Apriori算法[5],并行Apriori算法同時(shí)通過多臺機(jī)器進(jìn)行并行計(jì)算,能夠處理大數(shù)據(jù)量的數(shù)據(jù)集,但此類并行Apriori算法的實(shí)現(xiàn)較復(fù)雜,實(shí)現(xiàn)成本較高,并且有諸如同步、數(shù)據(jù)復(fù)制等問題.因此,現(xiàn)有的并行Apriori算法都采用一些成熟的并行計(jì)算框架和平臺進(jìn)行實(shí)現(xiàn),如現(xiàn)在廣泛應(yīng)用的MapReduce計(jì)算框架[6],MapReduce中鍵值對模型和Apriori算法匹配度較高.Apriori算法的每一次迭代都可以使用鍵值對的形式進(jìn)行數(shù)據(jù)傳遞,所以Apriori算法在MapReduce上進(jìn)行并行計(jì)算設(shè)計(jì)會較為簡單.Hadoop是實(shí)現(xiàn)MapReduce計(jì)算框架最佳的開源平臺之一[7],具有開源、穩(wěn)定性高、性能優(yōu)秀等優(yōu)點(diǎn),因此在Hadoop上出現(xiàn)了如MRApriori[8]等并行Apriori算法.但是基于Hadoop平臺的并行Apriori算法存在一些限制,Hadoop平臺將每一次迭代后的結(jié)果都存儲到基于硬盤的hdfs,然后在下一次迭代時(shí)從hdfs讀取數(shù)據(jù),如此會產(chǎn)生大量的I/O讀寫,造成算法挖掘速度的下降.而基于彈性分布式數(shù)據(jù)集架構(gòu)的Spark平臺[9]解決了此類問題,在每一次迭代結(jié)束時(shí)存儲迭代的中間結(jié)果到內(nèi)存并直接提供給下一次迭代進(jìn)行讀取,從而避免了硬盤的I/O消耗,使得算法的挖掘速度得到提高,如基于Spark的YAFIM算法[10].然而Spark平臺中新的迭代只能在更早的迭代都執(zhí)行完畢生成結(jié)果后才能執(zhí)行,會造成一定的時(shí)間延遲,從而算法的挖掘速度受到限制.
Apache公司的Flink平臺[11]使用完全基于流處理的結(jié)構(gòu)解決了這一問題,一個(gè)新的迭代可以只獲得部分結(jié)果就可以開始計(jì)算,避免了迭代延遲,并且同樣使用內(nèi)存對迭代的中間結(jié)果進(jìn)行存儲,使得算法的挖掘速度得到提高.
本文使用Flink平臺對并行Apriori算法進(jìn)行設(shè)計(jì)和實(shí)現(xiàn),采用Flink的流處理的結(jié)構(gòu)解決了算法批處理時(shí)出現(xiàn)的迭代延遲問題,并且將迭代后的結(jié)果存儲到基于內(nèi)存的緩存當(dāng)中,從而降低了迭代時(shí)的I/O消耗,提高了算法并行計(jì)算下的挖掘速度.在機(jī)器集群上進(jìn)行測試的結(jié)果表明,本文提出的基于Flink平臺實(shí)現(xiàn)的并行Apriori算法對大數(shù)據(jù)處理有著良好的適應(yīng)能力,并且挖掘速度得到提高.
Apriori算法為布爾關(guān)聯(lián)規(guī)則挖掘頻繁項(xiàng)集的原創(chuàng)性算法,使用一種稱為逐層搜索的迭代思想,其中k項(xiàng)集用于探索k+1項(xiàng)集.Apriori算法的迭代過程如下:
若為第一次迭代,則只是掃描事務(wù)數(shù)據(jù)從中獲得頻繁1項(xiàng)集,將其存儲.第一次迭代結(jié)束.
第2次迭代將頻繁1項(xiàng)集作為輸入數(shù)據(jù),通過連接操作生成候選項(xiàng)集,并掃描事務(wù)數(shù)據(jù)篩選出頻繁2項(xiàng)集.
…
第k+1次迭代,將頻繁k項(xiàng)集作為輸入數(shù)據(jù),通過連接操作生成候選項(xiàng)集,并掃描事務(wù)數(shù)據(jù)篩選出頻繁k+1項(xiàng)集.直到迭代無法再產(chǎn)生頻繁項(xiàng)集,算法迭代結(jié)束.
傳統(tǒng)的Apriori算法采用批處理的方式進(jìn)行迭代,每一次迭代的輸入數(shù)據(jù)都依賴于上一次迭代產(chǎn)生的中間結(jié)果,即頻繁項(xiàng)集.在接收到上一次迭代完整的頻繁項(xiàng)集之后,新的迭代采用連接操作,將頻繁項(xiàng)集進(jìn)行交叉組合并去重,產(chǎn)生新的候選項(xiàng)集,并對產(chǎn)生的候選項(xiàng)集進(jìn)行支持度統(tǒng)計(jì)篩選出頻繁項(xiàng)集,作為下一次迭代的輸入數(shù)據(jù).
可見,批處理的迭代方式存在迭代延遲現(xiàn)象.只有當(dāng)上一次的迭代完成之后,才能進(jìn)行新的迭代,會使得上一次迭代產(chǎn)生的部分中間結(jié)果處于等待的狀態(tài),無法進(jìn)行下一次的迭代.
當(dāng)采用流處理對迭代進(jìn)行實(shí)現(xiàn)時(shí),可在只接收到部分中間結(jié)果的情況下也可以進(jìn)行連接產(chǎn)生新的候選項(xiàng)集,并對產(chǎn)生的候選項(xiàng)集進(jìn)行實(shí)時(shí)的支持度統(tǒng)計(jì)篩選頻繁項(xiàng)集.當(dāng)有新的部分中間結(jié)果再次輸入時(shí),連接操作可以增量更新候選項(xiàng)集,從而避免了迭代延遲現(xiàn)象.
Apache公司的Flink平臺是一個(gè)面向分布式數(shù)據(jù)流處理和批數(shù)據(jù)處理的開源云計(jì)算平臺,針對數(shù)據(jù)流的分布式計(jì)算提供了數(shù)據(jù)分布、數(shù)據(jù)通信以及容錯(cuò)機(jī)制等功能.其中統(tǒng)一的批處理和流處理系統(tǒng)極為重要.Flink完全基于流處理,作為流處理看待時(shí)輸入數(shù)據(jù)流是無界的,而作為批處理看待時(shí)輸入數(shù)據(jù)流被定義為有界的.所以當(dāng)對關(guān)聯(lián)規(guī)則等批處理的算法進(jìn)行執(zhí)行時(shí),實(shí)際上調(diào)用的是有界輸入數(shù)據(jù)流的流處理功能.而基于流處理進(jìn)行迭代時(shí),只需要上一次的迭代生成部分結(jié)果就可以進(jìn)行下一次迭代,從而可以減少時(shí)間開銷.
并行Apriori算法主要計(jì)算量集中在迭代生成頻繁項(xiàng)集過程中,其它步驟的計(jì)算量相對較小.因此,基于Flink的并行Apriori算法的關(guān)鍵是實(shí)現(xiàn)迭代生成頻繁項(xiàng)集過程.Apriori算法本身的迭代機(jī)制和MapReduce計(jì)算模型非常契合,并且Flink平臺也支持MapReduce進(jìn)行實(shí)現(xiàn).因此,本文在Flink平臺下使用MapReduce計(jì)算模型對并行Apriori算法進(jìn)行設(shè)計(jì).
算法的設(shè)計(jì)如圖1,基于Flink的并行Apriori算法初始數(shù)據(jù)經(jīng)過劃分子數(shù)據(jù)之后,分別輸入到各個(gè)Map當(dāng)中,并生成
圖1 算法設(shè)計(jì)Figure 1 Algorithm design
本文提出了使用Flink平臺來實(shí)現(xiàn)Apriori算法,使用Flink的內(nèi)存緩存和流結(jié)構(gòu)的批處理等方法,解決了用Hadoop平臺實(shí)現(xiàn)存在的I/O消耗大和用Spark平臺實(shí)現(xiàn)存在的迭代延遲問題.Flink實(shí)現(xiàn)Apriori有兩個(gè)步驟.
圖2 頻繁1項(xiàng)集生成Figure 2 1-frequent itemset generation
步驟一實(shí)現(xiàn)過程如圖2:首先從分布式文件系統(tǒng)拿到輸入文件,一般是以文本文件的格式輸入.將文本中的數(shù)據(jù)先后用換行符和空白符進(jìn)行分割,得到適合算法進(jìn)行迭代的數(shù)據(jù)格式.下一步按照MapReduce的過程進(jìn)行執(zhí)行,最終可以得到所有的頻繁1項(xiàng)集.最后將得到的所有頻繁1項(xiàng)集依然以鍵值對的形式進(jìn)行存儲,在Flink中可以存儲到Flink緩存當(dāng)中,而Flink的緩存是基于內(nèi)存的,故而在步驟二中讀取輸入數(shù)據(jù)時(shí)可以節(jié)省一定的I/O開銷.
步驟二實(shí)現(xiàn)過程如圖3:首先從從緩存中拿到鍵值對形式的頻繁1項(xiàng)集,然后進(jìn)行連接操作生成候選項(xiàng)集,并建立哈希索引樹用于查詢候選項(xiàng)集是否在事務(wù)數(shù)據(jù)當(dāng)中.若在事務(wù)數(shù)據(jù)當(dāng)中,則輸出Map結(jié)果
圖3 算法迭代Figure 3 Algorithm iteration
本文提出的Flink平臺下并行Apriori算法的實(shí)現(xiàn),使用MapReduce計(jì)算框架對算法進(jìn)行設(shè)計(jì),避免了并行算法的實(shí)現(xiàn)復(fù)雜度,使得算法在大數(shù)據(jù)環(huán)境下有著良好的適應(yīng)能力.并且,在算法實(shí)現(xiàn)過程中使用Flink內(nèi)存緩存迭代結(jié)果和Flink流處理結(jié)構(gòu)避免了I/O消耗過大和迭代延遲等問題,使得在迭代次數(shù)較多和迭代輸出結(jié)果較多的情況下,算法的挖掘速度得到提高.
本文搭建了三臺機(jī)器的Flink集群進(jìn)行實(shí)驗(yàn),其中有一個(gè)master節(jié)點(diǎn)和2個(gè)slave節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都是CPU為I5 4590,8 G內(nèi)存的機(jī)器,并且是操作系統(tǒng)為Centos6.4和Flink版本為0.9.1的軟件環(huán)境.
實(shí)驗(yàn)用的數(shù)據(jù)集是FIMI存儲庫(http://fimi.ua.ac.be/data/)提供的webdocks事務(wù)數(shù)據(jù)集,此數(shù)據(jù)集是爬蟲抓取的并經(jīng)過處理后公開的網(wǎng)頁文檔數(shù)據(jù).數(shù)據(jù)集事務(wù)條數(shù)為169萬.本文在此數(shù)據(jù)集的基礎(chǔ)上將此數(shù)據(jù)集分割成50 MB、100 MB、150 MB、200 MB、250 MB、300 MB、500 MB等塊數(shù)據(jù)集.
本文采用加速比、數(shù)據(jù)伸縮率和擴(kuò)展率對實(shí)驗(yàn)結(jié)果進(jìn)行評價(jià).其中加速比代表著當(dāng)算法采用機(jī)器集群實(shí)現(xiàn)時(shí),相比較于算法單機(jī)實(shí)現(xiàn)的時(shí)性能提高速率,等于算法單臺機(jī)器的串行處理時(shí)間除以機(jī)器集群的并行處理時(shí)間,當(dāng)加速比呈現(xiàn)線性增長時(shí)代表著該算法擁有較好的并行計(jì)算性能;數(shù)據(jù)伸縮率代表著隨著數(shù)據(jù)規(guī)模的變大,并行算法處理相應(yīng)規(guī)模數(shù)據(jù)所花費(fèi)時(shí)間的變化情況,當(dāng)數(shù)據(jù)伸縮率呈現(xiàn)線性時(shí),代表著算法可以隨著數(shù)據(jù)規(guī)模的變化,依然具有較好的計(jì)算能力;擴(kuò)展率用來查看隨著機(jī)器規(guī)模增加時(shí),并行算法處理能力的變化情況.
加速比分析:算法的加速比實(shí)驗(yàn)結(jié)果如圖4,圖4中橫坐標(biāo)是數(shù)據(jù)集大小,采用50 MB到500 MB的區(qū)間,縱坐標(biāo)是算法在單機(jī)情況和3臺機(jī)器的集群情況下的加速比.從圖4可以看出算法的加速比總體上是接近線性的.因?yàn)镕link集群的啟動和通信需要一定時(shí)間,所以當(dāng)數(shù)據(jù)集較小的時(shí)候,并行Apriori算法啟動時(shí)間占總時(shí)間的比例較大,此時(shí)算法的加速比性能不是很好.而當(dāng)數(shù)據(jù)集規(guī)模變大,算法的加速比性能越來越好,并呈現(xiàn)線性增長,說明基于Flink實(shí)現(xiàn)的并行Apriori算法具有良好的加速比,算法的運(yùn)行性能有良好的提升空間.
圖4 加速比Figure 4 Acceleration ratio
數(shù)據(jù)伸縮率分析:算法的數(shù)據(jù)伸縮率實(shí)驗(yàn)結(jié)果如圖5,圖5中橫坐標(biāo)是數(shù)據(jù)集與最小數(shù)據(jù)集的比值,縱坐標(biāo)是數(shù)據(jù)集處理時(shí)間與最小數(shù)據(jù)集的處理時(shí)間的比值.從圖5可以看出,在Flink集群上的Apriori算法的執(zhí)行時(shí)間是隨著數(shù)據(jù)集的規(guī)模同時(shí)增加的,說明算法可以有效應(yīng)用于大規(guī)模數(shù)據(jù)處理.
圖5 數(shù)據(jù)伸縮率Figure 5 Data scalability
擴(kuò)展率分析:算法的擴(kuò)展率實(shí)驗(yàn)結(jié)果如圖6,圖6中橫坐標(biāo)是集群的機(jī)器臺數(shù),縱坐標(biāo)是加速比.從圖6可以看出,在Flink集群上的Apriori算法隨著集群機(jī)器臺數(shù)的增加,加速比也呈現(xiàn)線性增長,可以看出機(jī)器臺數(shù)的增加能夠明顯提高挖掘效率,體現(xiàn)了并行算法良好的擴(kuò)展性.
圖6 擴(kuò)展率Figure 6 Expansion rate
由以上實(shí)驗(yàn)可以看出,本文基于Flink平臺實(shí)現(xiàn)的Apriori算法具有良好的性能提升空間和擴(kuò)展能力,能夠適應(yīng)大數(shù)據(jù)量的挖掘.
本文將基于Flink實(shí)現(xiàn)的并行Apriori算法和在Spark平臺上的YAFIM算法在webdocks數(shù)據(jù)集上進(jìn)行速度性能評估,支持度設(shè)為0.25,算法在此支持度上需要進(jìn)行迭代的次數(shù)較多并且將生成較多的迭代結(jié)果.
圖7 算法性能對比Figure 7 Algorithm performance comparison
從圖7可以看出,在數(shù)據(jù)集較小的情況下,兩個(gè)算法的挖掘速度相差不多,是因?yàn)閿?shù)據(jù)集較小時(shí),所需要迭代的次數(shù)和時(shí)間都較少,而隨著數(shù)據(jù)集數(shù)量的增長,迭代次數(shù)和迭代時(shí)間也隨之增長,此時(shí)本文基于Flink平臺的算法的挖掘效率得到提高,在迭代次數(shù)和迭代結(jié)果較多的情況下挖掘速度要快于基于Spark平臺的YAFIM算法.可見,基于Flink平臺實(shí)現(xiàn)的并行Apriori算法的挖掘速度得到提高.
本文基于Flink平臺對并行Apriori算法進(jìn)行設(shè)計(jì)和實(shí)現(xiàn),通過MapReduce計(jì)算框架進(jìn)行設(shè)計(jì),并采用Flink流處理結(jié)構(gòu)和內(nèi)存緩存提高了算法挖掘速度.通過實(shí)驗(yàn)可見本文實(shí)現(xiàn)的算法能夠適應(yīng)大數(shù)據(jù)量的挖掘,并且擁有較快的挖掘速度.