武守曉,房 俊
(1.北方工業(yè)大學(xué) 大規(guī)模流數(shù)據(jù)集成與分析技術(shù)北京市重點實驗室 北京 100144;2.北方工業(yè)大學(xué) 數(shù)據(jù)工程研究院 北京 100144)
目前處理亂序流的主流方法是基于緩存的方法?;诰彺娴姆椒ㄊ褂瞄_辟的緩沖區(qū)等待遲到數(shù)據(jù),對緩沖區(qū)內(nèi)數(shù)據(jù)進行排序,以避免系統(tǒng)處理亂序數(shù)據(jù)。傳統(tǒng)的基于緩存的K-slack[1]方法和MP-K-slack[2]方法無法做到對緩沖區(qū)大小自適應(yīng),在時延變小的情況下會浪費系統(tǒng)資源。AQ-K-slack[3]雖然實現(xiàn)了緩沖區(qū)自適應(yīng),但無法應(yīng)用于top-k連續(xù)查詢這類復(fù)雜的聚合函數(shù)。在具體的top-k連續(xù)查詢算法中,SMA算法[4]利用k-skyband對象在有序流上快速進行top-k連續(xù)查詢,但需要維護大量的k-skyband對象,內(nèi)存耗費大,并且該方法只能在有序流上使用。MinTopk算法[5]維護了一個最小的top-k候選集,每次計算都從該候選集中得出結(jié)果,大大減少了計算量,但在亂序流上使用會有誤差。GSTopk算法[6]對MinTopk算法進行了一些改良,使其能在亂序流下立刻給出近似結(jié)果,可以在時效性要求較高的情況下使用。在上述算法的基礎(chǔ)上,本文提出一種面向高速亂序數(shù)據(jù)流的top-k連續(xù)查詢方法。首先使用基于緩存的亂序流處理技術(shù),舍棄緩存數(shù)據(jù)重排序步驟,緩存時長的確定使用緩存時長自適應(yīng)算法,在保證用戶允許的最小正確率的情況下計算出最小緩存時長,其次使用改造的MinTopk算法計算當(dāng)前窗口的top-k結(jié)果集。實驗結(jié)果表明,該方法能有效權(quán)衡查詢精度和查詢時延之間的關(guān)系,對窗口內(nèi)數(shù)據(jù)執(zhí)行快速且高效的查詢并得出結(jié)果,使亂序流下的top-k連續(xù)查詢收到了良好的效果。
在亂序數(shù)據(jù)流處理方面,研究工作按處理機制的不同大致分為基于緩存的方法[1-3, 7-8]、基于標(biāo)點的方法[9-12]、基于推測的方法和基于近似的方法?;诰彺娴姆椒ㄊ情_辟緩沖區(qū)來緩存亂序數(shù)據(jù)以等待遲到數(shù)據(jù),以一定延遲開銷換取結(jié)果質(zhì)量的提升。K-slack[1]是基于緩存的典型方法,其中參數(shù)K與緩沖區(qū)的大小密切相關(guān)。具體來說,K-slack技術(shù)維護緩沖區(qū)用來緩存到達(dá)的元組,緩沖區(qū)內(nèi)的數(shù)據(jù)最多等待K個時間單位,然后被提交至查詢處理模塊進行查詢。MP-K-slack方法[2]是基于流元組延遲的動態(tài)變化來不斷調(diào)整K值,如果延遲不斷增大,會使數(shù)據(jù)越積越多,導(dǎo)致查詢時延的上升和查詢吞吐量的下降。AQ-K-slack方法[3]以用戶給定的結(jié)果精度為目標(biāo),通過聚合函數(shù)與窗口覆蓋率的定值關(guān)系,動態(tài)調(diào)整K值大小。但由于top-k查詢這類聚合函數(shù)過于復(fù)雜,會造成AQ-K-slack方法難以實施。另外,基于緩存的方法大多會對緩存的數(shù)據(jù)進行排序,以保證計算時的有序,代價比較大?;跇?biāo)點的方法依賴于數(shù)據(jù)流內(nèi)被稱為標(biāo)點的特殊元組,表示沒有時間戳小于標(biāo)點的元組。當(dāng)收到一個標(biāo)點,查詢算子確定未來將沒有數(shù)據(jù)到達(dá),然后得到這些窗口的查詢結(jié)果[10],如心跳[12]、部分排序[11]是標(biāo)點的特殊類型。標(biāo)點顯式地通知查詢算子什么時候返回窗口的結(jié)果,因此查詢算子能夠直接消費無序輸入。然而,查詢結(jié)果的準(zhǔn)確性從根本上會受到標(biāo)點準(zhǔn)確性的限制[9]。假定標(biāo)點是由外部數(shù)據(jù)源提供,或者是由應(yīng)用程序語義和數(shù)據(jù)流亂序特征的先驗知識通過系統(tǒng)非常簡單地生成,但這個假設(shè)不一定在現(xiàn)實世界的場景中成立。基于推測的方法和基于近似的方法基本上采用了激進處理方法。激進處理方法與保守等待方法相反,它不管亂序問題是否存在,總是優(yōu)先快速地處理數(shù)據(jù)流,直到遲到元組出現(xiàn)以后再彌補錯誤。激進處理方法通常應(yīng)用于實時性要求較高且急需獲取處理結(jié)果的分析處理系統(tǒng)。但是這種方法的場景局限性很大,并且有可能得不到正確結(jié)果。
在top-k連續(xù)查詢具體算法方面,SMA算法[4]根據(jù)數(shù)據(jù)特征提出k-skyband對象的概念。該算法需要維護k-skyband對象之間的支配關(guān)系,總體代價較大,并且不具備過濾新增數(shù)據(jù)(即新流入窗口的數(shù)據(jù))的能力,不能處理亂序數(shù)據(jù)流。MinTopk算法[5]維護了一個最小top-k候選集,對于流入窗口的新元組,高效地過濾掉不可能成為top-k結(jié)果的元組,將可能成為top-k結(jié)果的元組插入候選集,每次只要查找候選集即可找到top-k結(jié)果。但是該算法只能處理順序流,在亂序流中會導(dǎo)致查詢錯誤。GSTopk算法[6]改造了MinTopk算法,使其能夠快速地處理亂序數(shù)據(jù)流,但是該算法得出的僅僅是當(dāng)前窗口內(nèi)的top-k結(jié)果,沒有對當(dāng)前窗口的遲到數(shù)據(jù)進行處理,導(dǎo)致其計算結(jié)果往往不夠準(zhǔn)確。由于該算法的高效性,在正確率要求不高而實時性要求特別高的情況下可以使用。基于以上研究,本文使用基于緩存的亂序處理方法等待遲到元組,但不對緩沖區(qū)內(nèi)數(shù)據(jù)進行排序,配合使用改造的MinTopk算法,保證top-k連續(xù)查詢正確率在用戶可接受范圍內(nèi),減少了查詢時延。
圖1為面向亂序流的top-k連續(xù)查詢算法流程。為了解決亂序數(shù)據(jù)流中top-k連續(xù)查詢結(jié)果不準(zhǔn)確的問題,使用基于緩存的亂序流處理方法,該方法的難點在于緩存時長的確定?;诰彺娴姆椒ú豢赡軣o限等待遲到元組,不能保證查詢的絕對正確性。使用緩存時長自適應(yīng)算法對top-k查詢進行正確率和緩存時長的統(tǒng)計,在保證用戶允許的最小正確率的情況下,周期性地計算出所需要的最小緩存時長。接下來通過具體的top-k查詢方法,計算出當(dāng)前窗口的top-k結(jié)果。為了方便計算,靈活地實施緩存時長自適應(yīng)算法,使用元組的Event Time[13]劃分窗口,也就是使用元組自身的時間戳作為滑動窗口的劃分依據(jù)。
圖1 面向亂序流的top-k連續(xù)查詢算法流程
圖2為基于緩存的亂序流處理方法。當(dāng)前滑動窗口為W0,W0在tend時刻閉合,閉合后等待K個時間單位,即在tlate時刻計算并輸出W0的top-k結(jié)果。在這K個時間單位中,對于到來的每一個元組,若其屬于當(dāng)前滑動窗口W0,該元組就會被發(fā)送到W0處理;若其屬于W0前面或后面的窗口,則進行相應(yīng)的處理。
圖2 基于緩存的亂序流處理方法
基于緩存的亂序流處理方法,其難點在于緩存時長K的確定。緩存時間越長,時延越高,正確率也就越高。網(wǎng)絡(luò)延遲的制約因素有很多,不可能準(zhǔn)確地計算出最晚元組到達(dá)的時間。另外,在高速亂序數(shù)據(jù)流下,數(shù)據(jù)流量巨大,緩存時間越長,對緩沖區(qū)和系統(tǒng)吞吐量造成的壓力越大。因此,在保證用戶允許的最小正確率的情況下選擇一個恰當(dāng)?shù)木彺鏁r長K,可以有效地緩解系統(tǒng)壓力,減少查詢時延。
計算單次的top-k結(jié)果的正確性是沒有意義的,但統(tǒng)計多次的top-k結(jié)果的正確率足以證明某種方法的有效性。所以,通過統(tǒng)計不同緩存時長下top-k查詢結(jié)果的正確率,以質(zhì)量驅(qū)動的方式[3]選出最小緩存時長,即在保證用戶允許的最小正確率的情況下計算出最小緩存時長。具體步驟如下。
1)參數(shù)初始化。系統(tǒng)指定一個初始緩存時長K,即窗口的緩存時長到達(dá)K時輸出查詢結(jié)果。初始化用于計算恰當(dāng)緩存時長的區(qū)間,用(Kdown,Kup]表示。在初始情況下,(Kdown,Kup]將被初始化為(0,K]。另外,需要用戶指定能承受的最小正確率εmin。
1.解戒人員社區(qū)康復(fù)時間長短與操守率之間的關(guān)系。為分析社區(qū)康復(fù)是否對保持操守率存在積極影響,筆者以廣州市某強制隔離戒毒所2017年7月至2018年3月期間解戒的221名解戒人員為樣本,協(xié)同禁毒社工赴解戒人員所在戶籍街道,通過現(xiàn)場訪談、電話訪問和尿樣檢測等方式,于2018年5月及2018年9月分兩次,對同一批221名解戒人員進行跟蹤調(diào)查,來了解戒斷鞏固率情況。
2)統(tǒng)計計算。將(Kdown,Kup]平均劃分得到m個緩存時長,即{Kdown+(Kup-Kdown)/m,Kdown+2*(Kup-Kdown)/m, …,Kdown+(m-1)*(Kup-Kdown)/m,Kup}。對于每次top-k查詢,記錄下這m個不同緩存時長得到的top-k結(jié)果集,同時,后臺等待所有的遲到元組計算出此次查詢正確的top-k結(jié)果集。對于每一個緩存時長對應(yīng)的top-k結(jié)果集,將其與正確的結(jié)果集進行比較,計算該top-k結(jié)果集的命中率,即top-k結(jié)果集與正確結(jié)果集一致的項數(shù)與總項數(shù)的比值。經(jīng)過n次top-k查詢?nèi)∑骄?,就能計算出不同緩存時長的查詢準(zhǔn)確率。
3)求最小緩存時長。根據(jù)用戶給定的所能承受的最小正確率εmin,即可定位出可以達(dá)到該正確率的最小緩存時長所在的區(qū)間(Kdown,Kup],那么最小緩存時長Kmin改為Kup。若此時符合要求的緩存時長不在(Kdown,Kup]內(nèi),則區(qū)間相應(yīng)前移或者后移(Kup-Kdown)/m個單位。為了避免區(qū)間太小,收斂速度太慢,(Kup-Kdown)/m不能太小。重復(fù)上一個步驟,統(tǒng)計出(Kdown,Kup]中不同緩存時長對應(yīng)的正確率。
表1為根據(jù)不同緩存時長統(tǒng)計的top-3查詢結(jié)果示例。當(dāng)前緩存時長區(qū)間為(0 s, 4.5 s],Kmin=4.5 s,m=3,n=20,εmin=0.8,則平均劃分為1.5 s、3 s、4.5 s三個緩存時長。在每次top-3查詢中,記錄下這三個緩存時長對應(yīng)的top-3結(jié)果,最后和此次查詢正確的top-3結(jié)果集進行比較,得到這三個緩存時長對應(yīng)結(jié)果的命中率。這個過程重復(fù)20次,每一個緩存時長會得到一個查詢正確率。其中,緩存時長為1.5 s的正確率為72%,緩存時長為3 s的正確率為83%,緩存時長為4.5 s的正確率為94%。由于用戶允許的最小正確率εmin=0.8,所以下一次用于計算最小緩存時長的區(qū)間改為(1.5 s, 3 s],最小緩存時長Kmin改為3 s,重復(fù)進行以上操作。
表1 不同緩存時長的統(tǒng)計結(jié)果示例
Top-k連續(xù)查詢依托于滑動窗口模型,給定滑動窗口W和top-k查詢q,每當(dāng)窗口滑動后,q返回W中分值最高的k個元組。由于算法需要實時處理大量數(shù)據(jù),且每次窗口滑動前后有大量重疊數(shù)據(jù),計算這些重復(fù)數(shù)據(jù)耗時費力。因此,本文借鑒MinTopk算法的思想,利用滑動窗口的特性過濾掉大量對結(jié)果無貢獻(xiàn)的元組,維護一個top-k結(jié)果候選集C。當(dāng)窗口滑動后,更新候選集C,只需要訪問候選集C便可得出top-k結(jié)果集,這樣既大大削減了數(shù)據(jù)規(guī)模[14],又保證了查詢結(jié)果的準(zhǔn)確性。
圖3展示了相鄰滑動窗口的數(shù)據(jù)歸屬,其中Wi表示某編號窗口,si表示由滑動步長劃分的某批數(shù)據(jù)。每次窗口滑動后,最早的一批數(shù)據(jù)被釋放,最新的一批數(shù)據(jù)流入窗口。新來的元組有可能一直成為top-k結(jié)果,直到它被窗口釋放。如s3中的某元組可能成為W3或W2、W1、W0的top-k結(jié)果??梢钥闯?,W0包含所有批次數(shù)據(jù),W1包括批次s1、批次s2、批次s3的數(shù)據(jù),W2包括批次s2、批次s3的數(shù)據(jù),W3僅包括批次s3的數(shù)據(jù)。對于W0中的數(shù)據(jù),為了避免重復(fù)計算,首先計算出W3(s3)的top-k結(jié)果集,然后計算出W2(s2和s3)的top-k結(jié)果集,再計算出W1(s1、s2和s3)的top-k結(jié)果集,最后計算出W0的top-k結(jié)果集。如此計算則可以充分利用上一次的計算結(jié)果,避免重復(fù)計算。
圖3 相鄰滑動窗口的數(shù)據(jù)歸屬
圖4為不同窗口的示例數(shù)據(jù),圖5為候選集C和候選集D。如圖4(a)所示,當(dāng)前窗口為W0,每個元組的標(biāo)簽表示元組的到達(dá)順序。如圖5(a)所示,僅對于W0窗口中的元組,計算出窗口W0、W1、W2、W3的top-3結(jié)果集,使用一個有序列表來維護這些元組。如圖5(b)所示, 按元組分值從大到小排列,元組右側(cè)表示該元組會對哪些窗口做出貢獻(xiàn),這個有序列表就是候選集C。由于一個元組作出貢獻(xiàn)的窗口集合是連續(xù)的,只維護起始貢獻(xiàn)窗口id和結(jié)束貢獻(xiàn)窗口id即可。同時,為了快速過濾掉不作貢獻(xiàn)的元組,還需要維護各個窗口的最小元組指針。由于候選集列表C是有序的,所以當(dāng)前窗口的top-k結(jié)果集為候選集C前k個元組的集合。
圖4 不同窗口的示例數(shù)據(jù)
圖5 候選集C和候選集D
由于本文算法的復(fù)雜性,為了避免在極端情況下對維護的候選集列表C進行頻繁插入,需要對原MinTopk算法進行改造后使用。當(dāng)窗口滑動后,對應(yīng)圖2中[tstart,tlate]時刻,對于其中的每一個元組有可能屬于前面窗口,或?qū)儆诋?dāng)前窗口,或?qū)儆谙乱粋€窗口。并且由于需要維護不同緩存時長的top-k結(jié)果,所以不僅要維護前面窗口不同緩存時長的top-k結(jié)果,還需要維護當(dāng)前窗口的候選集C和下一批次數(shù)據(jù)的top-k結(jié)果D。下面給出tlate時刻執(zhí)行計算的具體流程。
1)獲取當(dāng)前top-k結(jié)果。此時,當(dāng)前窗口候選集列表C中的前k個元組為當(dāng)前窗口的top-k結(jié)果。創(chuàng)建一個空的候選集列表,將當(dāng)前窗口的top-k結(jié)果復(fù)制到該列表,用以后臺繼續(xù)記錄不同緩存時長的top-k結(jié)果。
2)刪除過期元組。把當(dāng)前窗口候選集列表C中最早一批元組(即候選集列表前k個元組)的起始貢獻(xiàn)窗口id加1。當(dāng)起始貢獻(xiàn)窗口id大于結(jié)束貢獻(xiàn)窗口id時,該元組被淘汰,從列表中刪除。
3)合并候選集列表D到候選集列表C。通過指針?biāo)饕?,對于候選集列表D中的每一個元組,按從小到大順序和各個窗口的元組最小分值進行比較,快速計算出起始貢獻(xiàn)窗口id和結(jié)束貢獻(xiàn)窗口id。若其對某個窗口有貢獻(xiàn),將其插入到候選集C中,使C保持有序,同時刪除對候選集C不作貢獻(xiàn)的元組。
對于當(dāng)前窗口之前的窗口,針對不同的緩存時長,記錄每個窗口的top-k結(jié)果,對于晚到的元組持續(xù)進行處理,直到該窗口沒有元組到達(dá)。如圖4(b)所示,窗口滑動后,當(dāng)前窗口由W0變?yōu)閃1,新流入了元組21~25,元組1~5被釋放。圖5(b)展示了候選集C和候選集D的合并過程。對于元組25,依次和元組17、元組15、元組10進行比較,得出元組25的起始貢獻(xiàn)窗口為W1,結(jié)束貢獻(xiàn)窗口為W4,并將其有序地插入候選集C中。
實驗環(huán)境使用CPU為3.2 GHz,內(nèi)存為16 GB的ubuntu18.04電腦。緩存時長自適應(yīng)算法的實驗參數(shù)如下:緩存時長K初始為2 s,緩存時長劃分份數(shù)m為10,迭代次數(shù)n為30,允許的最小正確率εmin為0.95;top-k查詢的實驗參數(shù)如下:偏好函數(shù)為求元組最大值,k值為5,滑動窗口總大小為60 s,滑動窗口的滑動步長為5 s。
由于網(wǎng)絡(luò)延遲通常遵循指數(shù)分布等長拖尾型概率分布[15],故使用指數(shù)分布生成亂序數(shù)據(jù)。為了營造高速亂序流的環(huán)境,盡量增大窗口中的元組數(shù)目。另外,生成的數(shù)據(jù)應(yīng)包含時間戳字段、值字段。通過指數(shù)分布生成了充足的亂序數(shù)據(jù),選擇SMA算法、MinTopk算法、GSTopk算法作為本實驗的對比算法,使用相同的數(shù)據(jù)進行top-k連續(xù)查詢,記錄下運行參數(shù),得出top-k結(jié)果正確率與算法運行時間的關(guān)系以及top-k結(jié)果查詢時延與算法運行時間的關(guān)系。
為了測試算法對亂序程度不同的數(shù)據(jù)的有效性,構(gòu)造一個定量的數(shù)據(jù)集,并將其打亂為三種不同亂序程度的數(shù)據(jù)集。本文算法和對比算法分別使用三種不同亂序程度的數(shù)據(jù)集作為輸入進行top-k連續(xù)查詢,并記錄下運行參數(shù),得出top-k結(jié)果正確率與數(shù)據(jù)亂序程度的關(guān)系。另外,為了避免偶然性所帶來的實驗誤差,以上所述實驗均在參數(shù)及數(shù)據(jù)不變的情況下進行多次,并取平均值作為最終實驗結(jié)果。
不同算法的實驗結(jié)果對比如圖6所示。圖6(a)展示了隨著運行時間的增加,不同算法的正確率變化情況??梢钥闯?,本文算法比其他算法的正確率高很多,顯示了本文算法處理亂序流的優(yōu)越性。這是由于本文算法使用基于緩存的方法等待遲到元組,使邊界元組被包含在正確的滑動窗口內(nèi),提高了正確率。本文算法的正確率隨著運行時間一直在變化,這是由于本文算法可以根據(jù)數(shù)據(jù)流的亂序程度自適應(yīng)緩存時長。若當(dāng)前緩存時長的正確率大于允許的最小正確率,則應(yīng)減小緩存時長;否則進行相反的操作。圖6(b)展示了隨著運行時間的增加,不同算法的查詢時延變化情況。查詢時延表示當(dāng)滑動窗口閉合后到計算得出該滑動窗口的top-k結(jié)果集所需要的時間??梢钥闯觯疚乃惴ǖ牟樵儠r延比其他三種算法的查詢時延要高。這是由于本文算法使用了基于緩存的亂序流處理方法,等待屬于當(dāng)前滑動窗口的遲到元組,以時延換取了正確率的上升。隨著運行時間的改變,查詢時延還會不斷變化。這是由于本文算法可以自適應(yīng)改變緩存時長,使得時延增大,正確率提高。當(dāng)面臨實時性要求特別高而正確率要求不太高的情況,應(yīng)盡量避免使用本文算法。GSTopk算法可以快速處理亂序數(shù)據(jù)流,只是結(jié)果不是那么精確。由此可見,本文算法的一個缺點是查詢時延較高。圖6(c)展示了數(shù)據(jù)集輕度、中度和重度亂序時,不同算法的正確率變化情況。在亂序數(shù)據(jù)集上,隨著亂序程度的增加,SMA算法和MinTopk算法的正確率偏低,這是由于這兩種算法沒有針對亂序數(shù)據(jù)流做處理,屬于當(dāng)前滑動窗口的遲到數(shù)據(jù)被丟棄或者是延遲到下一個窗口執(zhí)行,導(dǎo)致查詢結(jié)果不準(zhǔn)確。GSTopk算法可以處理亂序流,但正確率也較低,而本文算法的正確率較高,證明本文算法適合處理亂序數(shù)據(jù)流下的top-k連續(xù)查詢問題。
圖6 不同算法的實驗結(jié)果對比
本文研究了高速亂序流環(huán)境下的top-k連續(xù)查詢問題,盡管已有一些相關(guān)方法研究了此類問題,但是查詢結(jié)果誤差較大。本文通過已有的亂序流處理方法和滑動窗口的數(shù)據(jù)特征,首先使用基于緩存的方法等待遲到元組,但不對緩沖區(qū)排序,并運用統(tǒng)計的思想實現(xiàn)了緩存時長自適應(yīng)。然后使用改造的MinTopk算法,在保證用戶允許的最小正確率的情況下計算出最小緩存時長,減少了查詢時延。后續(xù)工作將優(yōu)化緩存時長自適應(yīng)算法,減小算法資源消耗,進一步加快算法的計算速度。