付 晨,鐘 誠,葉 波
(1.廣西大學(xué) 計(jì)算機(jī)與電子信息學(xué)院,南寧 530004; 2.廣西科技信息網(wǎng)絡(luò)中心,南寧 530012)
(*通信作者電子郵箱chzhong@gxu.edu.cn)
MapReduce并行加速數(shù)據(jù)流多模式相似性搜索
付 晨1,鐘 誠1*,葉 波2
(1.廣西大學(xué) 計(jì)算機(jī)與電子信息學(xué)院,南寧 530004; 2.廣西科技信息網(wǎng)絡(luò)中心,南寧 530012)
(*通信作者電子郵箱chzhong@gxu.edu.cn)
設(shè)計(jì)時(shí)間序列數(shù)據(jù)在Hadoop分布式文件系統(tǒng)(HDFS)中的有效存儲(chǔ)方式,利用分布式緩存工具Distributed Cache將各子序列分發(fā)到Hadoop集群的計(jì)算節(jié)點(diǎn)上,將動(dòng)態(tài)時(shí)間彎曲距離矩陣劃分成多個(gè)子矩陣,采取并行迭代計(jì)算每條反對(duì)角線上子矩陣的方法,基于MapReduce編程模型,實(shí)現(xiàn)高效并行計(jì)算時(shí)間序列動(dòng)態(tài)彎曲距離,通過改進(jìn)剪裁冗余計(jì)算方法,設(shè)計(jì)實(shí)現(xiàn)一種數(shù)據(jù)流多模式相似性搜索并行算法。中國雪深長(zhǎng)時(shí)間序列數(shù)據(jù)集的實(shí)驗(yàn)結(jié)果表明,當(dāng)每條時(shí)間序列的長(zhǎng)度達(dá)到5 000以上時(shí),并行計(jì)算動(dòng)態(tài)彎曲距離所需時(shí)間少于串行計(jì)算所需時(shí)間,當(dāng)每條時(shí)間序列的長(zhǎng)度達(dá)到9 000以上時(shí),參與計(jì)算的集群節(jié)點(diǎn)越多,并行計(jì)算所需時(shí)間越少;當(dāng)模式長(zhǎng)度達(dá)到4 000、參與計(jì)算的集群節(jié)點(diǎn)數(shù)達(dá)5個(gè)以上時(shí),從數(shù)據(jù)流中并行搜索出與模式匹配的相似子序列所需時(shí)間約為串行搜索所需時(shí)間的20%。
時(shí)間序列;數(shù)據(jù)流;動(dòng)態(tài)時(shí)間彎曲距離;模式搜索;Hadoop
時(shí)間序列數(shù)據(jù)流相似性搜索在網(wǎng)絡(luò)點(diǎn)擊流分析、金融分析、氣象監(jiān)測(cè)、語音識(shí)別等諸多領(lǐng)域具有廣泛的應(yīng)用[1]。大數(shù)據(jù)環(huán)境下長(zhǎng)時(shí)間序列數(shù)據(jù)流多模式相似性搜索十分耗時(shí)。因此,在分布式并行計(jì)算Hadoop平臺(tái)上,研究設(shè)計(jì)實(shí)現(xiàn)高效的數(shù)據(jù)流多模式相似性搜索并行算法具有重要應(yīng)用價(jià)值。
歐氏距離及其擴(kuò)展的相似性度量方法針對(duì)時(shí)間序列在時(shí)間軸上拉伸、收縮、平移等變形的健壯性不強(qiáng)[2]。動(dòng)態(tài)時(shí)間彎曲(Dynamic Time Warping, DTW)距離[3]能夠度量不同步不等長(zhǎng)的時(shí)間序列,使它們通過一定的變形進(jìn)行比較,適合于時(shí)間序列相似性比較。文獻(xiàn)[4]利用MPI(Message Passing Interface)和Open MP機(jī)制,采取均勻劃分?jǐn)?shù)據(jù)分配方法,設(shè)計(jì)機(jī)群計(jì)算時(shí)間序列動(dòng)態(tài)彎曲距離的并行算法,獲得了較好的加速。文獻(xiàn)[5]針對(duì)無線傳感器網(wǎng)絡(luò)環(huán)境下不確定異常時(shí)間序列檢測(cè)效率低下的問題,對(duì)不確定時(shí)間序列進(jìn)行壓縮變換,以減少不確定數(shù)據(jù)量,利用MapReduce架構(gòu)實(shí)現(xiàn)基于期望距離的不確定時(shí)間序列動(dòng)態(tài)彎曲距離算法并行化,同時(shí)提出了基于顯著特征匹配的局部約束算法,對(duì)彎曲路徑進(jìn)行局部限制,提高了檢測(cè)效率。文獻(xiàn)[6]提出一種判斷累加距離是否超出閾值的推算方法,以減少一些不必要的冗余計(jì)算。文獻(xiàn)[7]利用值域劃分柱圖,將時(shí)間序列映射到k維空間,構(gòu)造新的距離函數(shù),獲得動(dòng)態(tài)時(shí)間彎曲距離的新的上、下界,從而縮小原始候選集,使算法僅需在小規(guī)模候選集上計(jì)算DTW距離,降低了計(jì)算復(fù)雜度。時(shí)間序列動(dòng)態(tài)彎曲距離的一個(gè)重要應(yīng)用是被用來搜索數(shù)據(jù)流中的相似模式。通過保存時(shí)間序列累計(jì)距離和候選序列的起始點(diǎn),采取計(jì)算時(shí)間子序列匹配矩陣的方法,文獻(xiàn)[8]的SPRING算法解決了針對(duì)數(shù)據(jù)流連續(xù)、實(shí)時(shí)、無限制等特性的時(shí)間序列的子序列匹配問題,但該算法存在冗余計(jì)算。文獻(xiàn)[9]構(gòu)造一個(gè)計(jì)分函數(shù)來降低動(dòng)態(tài)時(shí)間彎曲距離的計(jì)算量,然后利用得到的時(shí)間彎曲距離來搜索發(fā)現(xiàn)數(shù)據(jù)流中公共的局部相似模式。
本文基于Hadoop平臺(tái),通過設(shè)計(jì)時(shí)間序列數(shù)據(jù)在HDFS中的有效存儲(chǔ)方式和改進(jìn)剪裁減少冗余計(jì)算方法,采取MapReduce并行迭代計(jì)算每條反對(duì)角線上動(dòng)態(tài)時(shí)間彎曲距離子矩陣的方法,設(shè)計(jì)實(shí)現(xiàn)高效的數(shù)據(jù)流多模式相似性搜索并行算法,在獲得較好匹配效果的同時(shí),大大縮短計(jì)算時(shí)間。
1.1 遞推計(jì)算動(dòng)態(tài)時(shí)間彎曲距離
采用遞推方法計(jì)算時(shí)間序列X={x1,x2,…,xm}和Y={y1,y2,…,yn}之間的動(dòng)態(tài)彎曲距離矩陣D(X,Y)[9]:
(1)
其中c=min{d(i,j-1),d(i-1,j),d(i-1,j-1)},i=1,2,…,m,j=1,2,…,n。從動(dòng)態(tài)時(shí)間彎曲距離矩陣中可以尋找出彎曲路徑DTW(X,Y)={w1,w2,…,wk},從而得到X和Y之間的匹配關(guān)系。
1.2 數(shù)據(jù)存儲(chǔ)與算法設(shè)計(jì)
1.2.1 算法設(shè)計(jì)
采用MapReduce編程模型可有效處理大規(guī)??茖W(xué)計(jì)算問題[10]。為了在Hadoop平臺(tái)上實(shí)現(xiàn)并行計(jì)算動(dòng)態(tài)時(shí)間彎曲距離矩陣,將序列X劃分為長(zhǎng)度分別為「m/p?的p個(gè)子序列X0,X1,…,Xp-1,并將序列Y劃分為長(zhǎng)度分別為「n/q?的q個(gè)子序列Y0,Y1,…,Yq-1。利用Hadoop分布式緩存工具Distributed Cache將各子序列分發(fā)到Hadoop集群計(jì)算節(jié)點(diǎn)上,于是構(gòu)造p×q個(gè)子矩陣D(f,g),f=1,2,…,p,g=1,2,…,q,每個(gè)子矩陣規(guī)模為「m/p?×「n/q?,這些子矩陣D(f,g)可以并行計(jì)算[11]。
算法1描述了Hadoop平臺(tái)上并行計(jì)算動(dòng)態(tài)時(shí)間彎曲距離(ParallelComputingDistancesofTimeWinding,PCDTW)算法。
算法1PCDTW算法。
輸入:時(shí)間序列X和Y。 輸出:動(dòng)態(tài)時(shí)間彎曲距離矩陣D。Begin1) 計(jì)算子矩陣D(1,1),將其最后一行和最后一列存入HDFS中; 2) 讀取HDFS中D(1,1)最后一行,存入D(2,1)的第0行,用于D(2,1)的計(jì)算;讀取HDFS中D(1,1)最后一列,存入D(1,2)的第0列,用于D(1,2)的計(jì)算;將D(2,1)的最后一行與D(1,2)的最后一列存儲(chǔ)到HDFS中; 3) 獲取子矩陣D(2,1)的最后一行,存入D(3,1)的第0行;獲取D(1,2)的最后一行及D(2,1)的最后一列,分別存入D(2,2)的第0行和第0列;獲取D(1,2)的最后一列,存入D(1,3)的第0列;并行計(jì)算D(3,1)、D(2,2)和D(1,3); 4) 依此類推,獲取子矩陣D(f-1,g)的最后一行及D(f,g-1)的最后一列,存入D(f,g)的第0行和第0列,計(jì)算D(f,g),將D(f,g)的最后一行傳送給D(f+1,g)計(jì)算,將D(f,g)的最后一列傳送給D(f,g+1)計(jì)算;并行計(jì)算每條反對(duì)角線上的子矩陣,每次Map/Reduce過程完成計(jì)算一條反對(duì)角線上的子矩陣; 5) 若距離累加超過事先設(shè)定的閾值,則算法結(jié)束;否則迭代k=p+q-1次,并行計(jì)算D(f,g);End
1.2.2 動(dòng)態(tài)時(shí)間彎曲距離矩陣數(shù)據(jù)存儲(chǔ)設(shè)計(jì)
HDFS文件中每一行存儲(chǔ)動(dòng)態(tài)時(shí)間彎曲距離子矩陣中每個(gè)元素的信息,子矩陣信息用坐標(biāo)(子矩陣分塊行號(hào),子矩陣分塊列號(hào))表示,元素信息包括(行號(hào),列號(hào)#元素值)。HDFS中每一行的存儲(chǔ)格式為〈(子矩陣分塊行號(hào),子矩陣分塊列號(hào))(元素行號(hào),元素列號(hào)#元素值)〉,即以〈(f,g)(i,j#d[i][j])〉的形式存儲(chǔ),f和g代表子矩陣在動(dòng)態(tài)時(shí)間彎曲距離矩陣分塊中的行、列號(hào),i和j為子矩陣元素在子矩陣中的行、列號(hào),d[i][j]代表元素值。
1.2.3Map過程的設(shè)計(jì)
并行計(jì)算Map階段的主要工作:從HDFS文件中逐行讀取動(dòng)態(tài)時(shí)間彎曲距離矩陣數(shù)據(jù)〈(f,g)(i,j#d[i][j])〉;篩選出本次計(jì)算的子矩陣分塊坐標(biāo)(f,g),獲取子矩陣的第0行元素和第0列元素。
如果上一輪迭代計(jì)算有相關(guān)子矩陣傳遞最后一行記錄數(shù)據(jù),那么本次計(jì)算的子矩陣用第0行接收。如果上一輪迭代計(jì)算有相關(guān)子矩陣傳遞最后一列記錄數(shù)據(jù),那么本次計(jì)算的子矩陣用第0列接收。
算法2 并行計(jì)算DTW距離的Map函數(shù)。
輸入:〈key1,value1〉為〈行號(hào),HDFS中每一行記錄〉。 輸出:中間結(jié)果〈key2,value2〉。Begin1) 逐行讀取HDFS文件,獲取子矩陣中每個(gè)元素信息〈(f,g)(i,j#d[i][j])〉: ①key1←行號(hào); ②value1←〈(f,g)(i,j#d[i][j])〉。 2) 篩選出本輪迭代計(jì)算的子矩陣分塊坐標(biāo)(f,g),記錄其第0行和第0列信息; 3) 將中間結(jié)果〈key2,value2〉寫入本地節(jié)點(diǎn)的中間文件: ①key2←(f,g);value2←(0,j#d[0][j]); 將中間結(jié)果〈key2,value2〉以“〈(子矩陣分塊行號(hào),子矩陣分塊列號(hào))(第0行,元素列號(hào)#元素值)〉”的形式(即〈(f,g)(0,j#d[0][j])〉)寫入本地中間文件; ②key2←(f,g);value2←(i,0#d[i][0]); 將中間結(jié)果〈key2,value2〉以“〈(子矩陣分塊行號(hào),子矩陣分塊列號(hào))(元素行號(hào),第0列#元素值)〉”的形式(即〈(f,g)(i,0#d[i][0])〉)寫入中間文件;End
算法2中步驟3)的中間結(jié)果key2值為本次計(jì)算的子矩陣坐標(biāo)(f,g),相應(yīng)的value2值為本次計(jì)算子矩陣第0行每個(gè)元素信息(0,j#d[0][j])或者第0列元素的信息(i,0#d[i][0])。通過算法2可篩選本次計(jì)算的子矩陣分塊坐標(biāo)(f,g),獲取本次計(jì)算的子矩陣的第0行元素和第0列元素,并將其記錄到中間文件。
1.2.4Reduce函數(shù)的設(shè)計(jì)
并行計(jì)算Reduce階段的任務(wù):根據(jù)輸入的鍵值對(duì)〈key2,list〈value2〉〉,接收相同key2值對(duì)應(yīng)的list〈value2〉得到本次計(jì)算子矩陣第0行元素集合和第0列元素集合。提取分布式緩存中本次參與計(jì)算子序列Xf和Yg。記錄本輪迭代計(jì)算的反對(duì)角線上的各個(gè)DTW距離子矩陣,同時(shí)將子矩陣最后一行和最后一列寫到HDFS結(jié)果文件中,以傳遞給下一輪迭代計(jì)算時(shí)Map函數(shù)使用。
算法3 并行計(jì)算DTW距離的Reduce函數(shù)。
輸入:key2值為子矩陣坐標(biāo)(f,g),list〈value2〉值為計(jì)算子矩陣第0行元素集合和第0列元素集合。 輸出:結(jié)果〈key3,value3〉。
Begin1)for(相同key2值(f,g)對(duì)應(yīng)的list〈value2〉)dobegin2) 獲取value2,給子矩陣第0行和第0列賦值; 3) 提取分布式緩存中參與計(jì)算子矩陣D(f,g)的子序列Xf和Yg; 4) 計(jì)算并記錄子矩陣中各DTW距離d[i][j];
5) 將〈key3,value3〉寫入HDFS的結(jié)果文件: ①key3←本輪迭代計(jì)算的子矩陣坐標(biāo)(f,g);value3←(i,j#d[i][j]);將本次計(jì)算的子矩陣結(jié)果〈key3,value3〉以“〈(子矩陣分塊行號(hào),子矩陣分塊列號(hào))(元素行號(hào),元素列號(hào)#元素值)〉”的形式(即〈(f,g)(i,j#d[i][j])〉)寫入HDFS的結(jié)果文件中; ②d[c][j] ←本次計(jì)算的子矩陣最后一行元素值;key3←下輪迭代要計(jì)算的子矩陣坐標(biāo)(f+1,g);value3←(0,j#d[c][j]);將下輪迭代計(jì)算的子矩陣〈key3,value3〉以“〈(子矩陣分塊行號(hào),子矩陣分塊列號(hào))(第0行,元素列號(hào)#元素值)〉”的形式(即〈(f+1,g)(0,j#d[c][j])〉)寫入HDFS的結(jié)果文件中; ③d[i][c]←本次計(jì)算的子矩陣最后一列元素值;key3←下輪迭代計(jì)算的子矩陣坐標(biāo)(f,g+1);value3←(i,0#d[i][c]);將下輪迭代計(jì)算的子矩陣〈key3,value3〉以“〈(子矩陣分塊行號(hào),子矩陣分塊列號(hào))(元素行號(hào),第0列#元素值)〉”的形式(即〈(f,g+1)(i,0#d[i][c])〉)寫入HDFS的結(jié)果文件中;end
End
并行計(jì)算DTW距離矩陣的主函數(shù)迭代地啟動(dòng)Job調(diào)用算法2和3的Map/Reduce函數(shù)。一輪迭代過程結(jié)束則計(jì)算完成一條反對(duì)角線上的子矩陣塊,迭代次數(shù)加1,輸出路徑編號(hào)加1。算法采用MapReduce的多目錄輸出,將要傳遞的子矩陣最后一行及最后一列與本次計(jì)算的子矩陣分開不同目錄存放。僅將本次MapReduce的輸出路徑中存放子矩陣最后一行及最后一列的目錄,作為下輪迭代計(jì)算的輸入目錄,從而避免了不必要的數(shù)據(jù)復(fù)制和傳遞。在Reduce過程中,若出現(xiàn)DTW距離超過閾值,則算法提前結(jié)束。所有Reduce寫入的子矩陣結(jié)果集合即為最終結(jié)果的DTW距離矩陣。
模式Q與數(shù)據(jù)流S子序列S[ts,te]之間的DTW距離為d(S[ts,te],Q),起始點(diǎn)位置記為sp(t,i)[8]:
(2)
其中d(t,0)=0,d(0,i)=∞,t=1,2,…,n,i=1,2,…,m。
(3)
從式(2)可看出,若min{d(t,i-1),d(t-1,i-1),d(t-1,i)}>ε且d(t,i)≥0,則d(t,i)>ε,ε為相似性閾值。因此,可以按式(2)裁剪計(jì)算,提前排除超出閾值的子序列,以提高搜索效率。
算法4描述了Hadoop平臺(tái)上時(shí)間序列數(shù)據(jù)流多模式相似性并行搜索(Multi-PatternParallelSearchingOfDistributedStreams,MPPSODS)算法。
算法4MPPSODS算法。
輸入:數(shù)據(jù)流S,模式Q1,Q2,Q3,…。 輸出:S中與每個(gè)模式相匹配的前k條數(shù)據(jù)流子序列對(duì)應(yīng)的起止點(diǎn)。Begin1)Map階段。將數(shù)據(jù)流分段分配到不同的Map任務(wù)中,每個(gè)任務(wù)在相應(yīng)的DataNode上采用改進(jìn)的SPRING算法進(jìn)行相似性搜索: ①將數(shù)據(jù)流分段S1,S2,S3,…以及模式Q1,Q2,Q3,…發(fā)送到各計(jì)算節(jié)點(diǎn)上,Map獲取計(jì)算的數(shù)據(jù)流分段及其id號(hào)、模式及其id號(hào); ②每個(gè)節(jié)點(diǎn)運(yùn)行改進(jìn)的SPRING算法,判斷相似性閾值是否符合提前終止的條件; ③將模式id號(hào)與每次計(jì)算得到的DTW距離連接作為中間結(jié)果〈key,value〉中的key,將匹配子序列的起始點(diǎn)、結(jié)束點(diǎn)存儲(chǔ)在value中。 2) 中間階段。將所有中間結(jié)果〈key,value〉經(jīng)過一個(gè)名為Shuffle的過程,按key值排序并分配給對(duì)應(yīng)的Reducer處理; 3) Reduce階段。輸入的〈key,list〈value〉〉已按從小到大的順序排列好,分解key中模式id與DTW距離,對(duì)于模式id相同的記錄只讀取前k條記錄作為結(jié)果輸出,同時(shí)將前k條記錄匹配的數(shù)據(jù)流子序列對(duì)應(yīng)的起止點(diǎn)寫入文件;End
3.1 實(shí)驗(yàn)數(shù)據(jù)及環(huán)境
實(shí)驗(yàn)數(shù)據(jù)來源于“寒區(qū)旱區(qū)科學(xué)數(shù)據(jù)中心”(http://westdc.westgis.ac.cn)的中國雪深長(zhǎng)時(shí)間序列數(shù)據(jù)集(1978—2012)[12-14],將其中的時(shí)間序列文件預(yù)處理為時(shí)間序列的每個(gè)數(shù)值點(diǎn)占一行,以〈序號(hào),數(shù)值〉的形式存儲(chǔ)。
Hadoop平臺(tái)采用通過交換機(jī)連接的6臺(tái)計(jì)算機(jī)組成分布式并行計(jì)算環(huán)境,其中每臺(tái)計(jì)算機(jī)的內(nèi)存容量為2GB、處理器為IntelPentiumCPUG2020、主頻為2.90GHz,將1臺(tái)計(jì)算機(jī)的角色作為主節(jié)點(diǎn)(Master)、名稱節(jié)點(diǎn)(NameNode)和作業(yè)跟蹤節(jié)點(diǎn)(JobTracker),其余5臺(tái)計(jì)算機(jī)的角色作為從節(jié)點(diǎn)(Slave)、數(shù)據(jù)節(jié)點(diǎn)(DataNode)和任務(wù)跟蹤節(jié)點(diǎn)(TaskTracker),系統(tǒng)網(wǎng)絡(luò)傳輸速率為100Mb/s。Hadoop版本為hadoop-1.0.4。運(yùn)行的操作系統(tǒng)為ubuntu-10.04.4版本。開發(fā)環(huán)境為Eclipse,采用Java語言編程實(shí)現(xiàn)算法。
3.2 實(shí)驗(yàn)結(jié)果
首先測(cè)試并行計(jì)算動(dòng)態(tài)時(shí)間彎曲距離(PCDTW)算法對(duì)運(yùn)行時(shí)間改進(jìn)的程度。
對(duì)于12個(gè)不同規(guī)模的數(shù)據(jù)集,當(dāng)Hadoop平臺(tái)上參與計(jì)算的節(jié)點(diǎn)數(shù)目逐步增多時(shí),圖1給出了PCDTW并行算法和串行算法的運(yùn)行時(shí)間。
從圖1的結(jié)果可以看出,當(dāng)時(shí)間序列長(zhǎng)度小于5 000時(shí),PCDTW并行算法的運(yùn)行時(shí)間多于串行算法的運(yùn)行時(shí)間,且集群中參與計(jì)算節(jié)點(diǎn)越多反而越耗時(shí)。這是因?yàn)閷?duì)于較短的時(shí)間序列,需要計(jì)算的DTW距離矩陣也相對(duì)較小,而每次計(jì)算一條反對(duì)角線上的子矩陣,都要啟動(dòng)一次Job調(diào)用Map/Reduce過程,通信交互和輸入輸出IO操作都相對(duì)耗時(shí)。
當(dāng)每條時(shí)間序列的長(zhǎng)度達(dá)到5 000以上時(shí),在Hadoop上運(yùn)行PCDTW算法的耗時(shí)低于串行算法。時(shí)間序列越長(zhǎng),PCDTW算法加速效果越明顯。當(dāng)每條時(shí)間序列的長(zhǎng)度達(dá)到9 000以上時(shí),集群中參與計(jì)算的節(jié)點(diǎn)越多,PCDTW算法所需運(yùn)行時(shí)間越少。
值得注意的是,當(dāng)每條時(shí)間序列的長(zhǎng)度達(dá)到8 000以上時(shí),運(yùn)行串行算法產(chǎn)生內(nèi)存溢出,無法獲得計(jì)算結(jié)果。
圖1 PCDTW算法與串行算法的運(yùn)行時(shí)間
對(duì)于時(shí)間序列X和Y的長(zhǎng)度在10 000以上的數(shù)據(jù)集A、B、C、D和E,PCDTW算法計(jì)算DTW距離矩陣需要的存儲(chǔ)容量如表1所示。
表1 計(jì)算DTW距離矩陣需要的存儲(chǔ)容量
從表1可知,當(dāng)時(shí)間序列較長(zhǎng)時(shí),計(jì)算兩序列之間的DTW距離矩陣需要較大的存儲(chǔ)容量。
圖2給出了Hadoop集群中參與計(jì)算的節(jié)點(diǎn)數(shù)目分別為2,3,4,5和6時(shí),PCDTW算法計(jì)算數(shù)據(jù)集A、B、C、D和E的DTW距離所需的時(shí)間。
圖2 參與計(jì)算節(jié)點(diǎn)增多時(shí)PCDTW算法的運(yùn)行時(shí)間
從圖2可以看出,當(dāng)參與計(jì)算的節(jié)點(diǎn)數(shù)目固定時(shí),時(shí)間序列數(shù)據(jù)集規(guī)模越大,PCDTW算法所需時(shí)間也越多;對(duì)于同一個(gè)數(shù)據(jù)集,當(dāng)集群中參與計(jì)算的節(jié)點(diǎn)增多時(shí),PCDTW算法所需時(shí)間逐漸減少。這表明,Hadoop平臺(tái)上并行計(jì)算時(shí)間序列DTW矩陣的PCDTW算法能夠利用集群中不斷增多的節(jié)點(diǎn)的計(jì)算能力。
加速比反映了算法的并行性對(duì)運(yùn)行時(shí)間的改進(jìn)程度。Hadoop集群中參與計(jì)算的節(jié)點(diǎn)數(shù)目分別為2,3,4,5和6時(shí),PCDTW并行算法獲得的加速比如圖3所示。
圖3的結(jié)果表明,隨著時(shí)間序列的增長(zhǎng),PCDTW算法獲得的加速比逐漸提高。當(dāng)每條時(shí)間序列長(zhǎng)度達(dá)到7 000以上時(shí),在Hadoop集群上運(yùn)行的PCDTW算法加速更加明顯。這說明,PCDTW并行算法適用于計(jì)算時(shí)間序列長(zhǎng)、數(shù)據(jù)集大的動(dòng)態(tài)時(shí)間彎曲距離。
圖3 參與計(jì)算節(jié)點(diǎn)增多時(shí)PCDTW算法的加速比
為了測(cè)試數(shù)據(jù)流多模式相似性搜索并行算法MPPSODS相對(duì)于串行算法STRING運(yùn)行時(shí)間的改進(jìn)程度,將包含25 000個(gè)數(shù)據(jù)點(diǎn)的數(shù)據(jù)流劃分為5段,依次對(duì)表2中的H、I、J和K這4組模式集中每組的3個(gè)模式(查詢序列)進(jìn)行并行搜索,算法所需的運(yùn)行時(shí)間如圖4所示。
表2 模式(查詢序列)集及相似性閾值
圖4 MPPSODS算法與串行算法的運(yùn)行時(shí)間
從圖4中可看出,隨著模式(查詢序列)中時(shí)間序列長(zhǎng)度的增大,并行算法MPPSODS和串行算法SPRING搜索時(shí)間也隨之增長(zhǎng),但SPRING算法所需的運(yùn)行時(shí)間增長(zhǎng)較快,而MPPSODS算法的運(yùn)行時(shí)間增長(zhǎng)較為緩慢,且Hadoop集群中參與計(jì)算的節(jié)點(diǎn)越多,MPPSODS算法運(yùn)行時(shí)間越少。
對(duì)于H、I、J和K這4組模式(查詢序列),圖5給出了Hadoop集群參與計(jì)算的節(jié)點(diǎn)增多時(shí),MPPSODS算法的運(yùn)行時(shí)間。
圖5的結(jié)果表明,對(duì)于同一組模式(查詢序列),隨著Hadoop集群中參與計(jì)算的節(jié)點(diǎn)增多,MPPSODS算法運(yùn)行時(shí)間逐漸減少;而且模式越長(zhǎng)、參與計(jì)算的節(jié)點(diǎn)越多,MPPSODS算法所需時(shí)間越少。這說明,MPPSODS算法有效利用了Hadoop集群中逐步增多的節(jié)點(diǎn)的計(jì)算能力。
圖6~8分別給出MPPSODS算法對(duì)模式(查詢序列)集H中3條序列HQ1、HQ2、HQ3進(jìn)行相似性搜索得到的結(jié)果。
圖5 參與計(jì)算節(jié)點(diǎn)增多時(shí)MPPSODS算法的運(yùn)行時(shí)間
圖6 MPPSODS算法對(duì)HQ1搜索的結(jié)果
圖7 MPPSODS算法對(duì)HQ2搜索的結(jié)果
從圖6可以看到,在數(shù)據(jù)流上搜索出與模式(查詢序列)HQ1最為匹配的數(shù)據(jù)流子序列的開始位置為6 362、結(jié)束位置為7 337、長(zhǎng)度為986。
從圖7可以看到,在數(shù)據(jù)流上搜索出與模式(查詢序列)HQ2最為匹配的數(shù)據(jù)流子序列的開始位置為2 008、結(jié)束位置為3 001、長(zhǎng)度為994。
從圖8可以看到,在數(shù)據(jù)流上搜索出與模式(查詢序列)HQ3最為匹配的數(shù)據(jù)流子序列的開始位置為11 455、結(jié)束位置為12 425、長(zhǎng)度為971。
圖6~8的結(jié)果表明,MPPSODS算法并行搜索數(shù)據(jù)流得到匹配的子序列與模式(查詢序列)在形態(tài)上具有很高的相似性。
圖8 MPPSODS算法對(duì)HQ3搜索的結(jié)果
時(shí)間序列數(shù)據(jù)流具有連續(xù)、實(shí)時(shí)和無限制特性。大數(shù)據(jù)環(huán)境下長(zhǎng)時(shí)間序列的動(dòng)態(tài)彎曲距離計(jì)算和數(shù)據(jù)流多模式相似性搜索相當(dāng)耗時(shí),并行求解算法提供了有效的解決方案?;贖adoop平臺(tái)和MapReduce編程模型,本文設(shè)計(jì)實(shí)現(xiàn)的并行算法能夠利用集群中不斷增加的節(jié)點(diǎn)的計(jì)算能力,高效地計(jì)算動(dòng)態(tài)時(shí)間彎曲距離和從數(shù)據(jù)流中搜索出與模式相似的子序列,所得到的匹配子序列與模式序列在形態(tài)上具有很高的相似性。下一步的工作將在Hadoop平臺(tái)上開發(fā)一個(gè)時(shí)間序列數(shù)據(jù)流多模式相似性并行搜索軟件包供人們?cè)诰€使用。
References)
[1] MATSUBARA Y, SAKURAI Y, FALOUTSOS C, et al.Fast mining and forecasting of complex time-stamped events [C]// Proceedings of the 18th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining.New York: ACM, 2012: 271-279.
[2] WANG L, LEEDHAM G.Near and far infrared imaging for vein pattern biometrics [C]// Proceedings of the 2006 IEEE International Conference on Video and Signal Based Surveillance.Piscataway, NJ: IEEE, 2006: 52-52.
[3] 陳乾,胡谷雨.一種新的DTW最佳彎曲窗口學(xué)習(xí)方法[J].計(jì)算機(jī)科學(xué),2012,39(8):191-195.(CHEN Q, HU G Y.New leaning method for optimal warping window of DTW [J].Computer Science, 2012, 39(8): 191-195.)
[4] 莫倩蕓,鐘誠.機(jī)群系統(tǒng)上并行計(jì)算時(shí)間序列的動(dòng)態(tài)彎曲距離[J].微電子學(xué)與計(jì)算機(jī),2008,25(10):155-158.(MO Q Y, ZHONG C.Parallel computing dynamic warping distances for time sequences on the cluster computing systems [J].Microelectronics & Computer, 2008, 25(10): 155-158.)
[5] 張建平,李斌,劉學(xué)軍,等.基于Hadoop的不確定異常時(shí)間序列檢測(cè)[J].傳感技術(shù)學(xué)報(bào),2014,27(12):1659-1665.(ZHANG J P, LI B, LIU X J, et al.Uncertain abnormal time series detection based on Hadoop [J].Chinese Journal of Sensors and Actuators, 2014, 27(12): 1659-1665.)
[6] 沙劍.基于GPU的時(shí)間序列并行檢索算法研究[D].大連:大連理工大學(xué),2011:42-55.(SHA J.The research on parallel time series retrieval method based on GPU [D].Dalian: Dalian University of Technology, 2011: 42-55.)[7] 歐陽一村.基于DTW距離的兩步式時(shí)間序列相似搜索[D].廣州:中山大學(xué),2010:33-54.(OUYANG Y C.Two-step similarity search of time series based on DTW distance [D].Guangzhou: Sun Yat-sen University, 2010: 33-54.)
[8] SAKURAI Y, FALOUTSOS C, YAMAMURO M.Stream monitoring under the time warping distance [C]// Proceedings of the 23rd International Conference on Data Engineering.Piscataway, NJ: IEEE, 2007: 1046-1055.
[9] TOYODA M, SAKURAI Y, ISHIKAWA Y.Pattern discovery in data streams under the time warping distance [J].VLDB Journal, 2013, 22(3): 295-318.
[10] SRIRAMA S N, JAKOVITS P, VAINIKKO E.Adapting scientific computing problems to clouds using MapReduce [J].Future Generations Computer System, 2012, 28(1): 184-192.
[11] 鐘誠,陳國良.PRAM和LARPBS模型上的近似串匹配并行算法[J].軟件學(xué)報(bào),2004,15(2):159-169.(ZHONG C, CHEN G L.Parallel algorithms for approximate string matching on PRAM and LARPBS [J].Journal of Software, 2004, 15(2): 159-169.)
[12] 寒區(qū)旱區(qū)科學(xué)數(shù)據(jù)中心.中國雪深長(zhǎng)時(shí)間序列數(shù)據(jù)集(1978—2012) [EB/OL].[2014-09-28].http://westdc.westgis.ac.cn.(Cold and Arid Regions Science Data Center at Lanzhou.Snow depth long time series data set in China (1978—2012) [EB/OL].[2014-09-28].http://westdc.westgis.ac.cn.)
[13] CHE T, LI X, JIN R, et al.Snow depth derived from passive microwave remote-sensing data in China [J].Annals of Glaciology, 2008, 49(1): 145-154.
[14] DAI L, CHE T, WANG J, et al.Snow depth and snow water equivalent estimation from AMSR-E data based on a priori snow characteristics in Xinjiang, China [J].Remote Sensing of Environment, 2012, 127: 14-29.
This work is supported by the Natural Science Foundation of Guangxi (2014GXNSFAA118396).
FU Chen, born in 1988, M.S.Her research interests include network software engineering, high-performance computing for big data.
ZHONG Cheng, born in 1964, Ph.D., professor.His research interests include high-performance computing for big data, network software engineering.
YE Bo, born in 1974, Ph.D., senior engineer.His research interests include network software engineering, management information system.
Accelerating parallel searching similar multiple patterns from data streams by using MapReduce
FU Chen1, ZHONG Cheng1*, YE Bo2
(1.SchoolofComputer,ElectronicsandInformation,GuangxiUniversity,NanningGuangxi530004,China;2.GuangxiScientificandTechnologicalInformationCenter,NanningGuangxi530012,China)
The effective storage mode for time series was designed on Hadoop Distributed File System (HDFS), the sub-series were distributed to the compute nodes on Hadoop cluster by applying Distributed Cache tool, and the matrix of dynamic time warping distances was partitioned into several sub-matrixes.Based on MapReduce programming mode, by parallel computing sub-matrixes in each back-diagonal iteratively, the parallel computation of dynamic time warping distances was implemented, and an efficient parallel algorithm for searching similar patterns from data streams was developed by improving pruning redundant computation.The experimental results on the data set of snow depth long time series in China show that when the length of each time series is equal to or longer than 5 000, the required time of parallel computing dynamic time warping distances is less than that of the corresponding sequential computation, and when the length of each time series is equal to or longer than 9 000, the more the compute nodes used, the less the required parallel computation time; furthermore, when the length of each pattern is equal to or longer than 4 000 and the number of compute nodes is equal to or larger than 5, the required time of parallel searching similar sub-series from data streams is 20% of the corresponding sequential searching time.
time series; data stream; dynamic time warping distance; pattern searching; Hadoop
2016-08-25;
2016-09-03。 基金項(xiàng)目:廣西自然科學(xué)基金資助項(xiàng)目(2014GXNSFAA118396)。
付晨(1988—),女,江西瑞昌人,碩士,主要研究方向:網(wǎng)絡(luò)軟件工程、大數(shù)據(jù)高性能計(jì)算; 鐘誠(1964—),男,廣西桂平人,教授,博士,CCF高級(jí)會(huì)員,主要研究方向:并行計(jì)算、大數(shù)據(jù)高性能計(jì)算、網(wǎng)絡(luò)軟件工程; 葉波(1974—),男,廣西南寧人,教授級(jí)高級(jí)工程師,博士,主要研究方向:網(wǎng)絡(luò)軟件工程、管理信息系統(tǒng)。
1001-9081(2017)01-0037-05
10.11772/j.issn.1001-9081.2017.01.0037
TP338.6; TP301.6
A