国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

可擴(kuò)展的流數(shù)據(jù)Join處理框架

2018-05-03 06:07賽影輝
關(guān)鍵詞:處理單元組塊負(fù)載量

賽影輝 黃 浩

1(奇瑞汽車股份有限公司 安徽 蕪湖 241006) 2(武漢大學(xué)計(jì)算機(jī)學(xué)院 湖北 武漢 430072)

0 引 言

在傳感網(wǎng)絡(luò)中,流數(shù)據(jù)的產(chǎn)生、存儲和實(shí)時分析等操作愈見頻繁。為了實(shí)時地從繁冗復(fù)雜的數(shù)據(jù)中提取有用的信息,有關(guān)流數(shù)據(jù)的查詢操作非常普遍。對于不同的查詢操作,我們用于應(yīng)答的數(shù)據(jù)處理主要有三種,即選擇(selection)、投影(projection)和連接(join)操作。相對于作為一元操作的選擇和投影操作,join操作的處理就復(fù)雜得多。由于兩個輸入的流數(shù)據(jù)之間可能是相關(guān)聯(lián)的,join操作的結(jié)果并不一目了然[6]。此外,對于同一對輸入流數(shù)據(jù)的多個join操作同時進(jìn)行的情況十分常見,join操作和查詢處理也會同時出現(xiàn)。為能在可接受的時間內(nèi)應(yīng)答相應(yīng)的查詢請求,流數(shù)據(jù)join處理的框架要能夠高效地完成join操作。

例如,圖1中Query 1和Query 2是有關(guān)環(huán)境污染在線分析的兩條典型的查詢語句,所得到的查詢結(jié)果蘊(yùn)含有關(guān)當(dāng)前空氣質(zhì)量指數(shù)AQI(Air Quality Index)的重要信息。其中,兩個流數(shù)據(jù)C和H分別代表目前和歷史(一個月前)的空氣質(zhì)量指數(shù),兩個查詢語句對這些流數(shù)據(jù)進(jìn)行join操作。由于每個查詢對結(jié)果有時效性的要求,即30分鐘之內(nèi)的數(shù)據(jù)結(jié)果,所以兩個join任務(wù):C??|C.PM2_5-H.PM2.5|>tH和C??|C.PM10-H.PM10|>tH不僅需要并行地執(zhí)行數(shù)據(jù)流C和H上的join操作,還要在30分鐘內(nèi)(保證數(shù)據(jù)的有效性)完成每一個join操作。在實(shí)際操作中,并行join操作任務(wù)的個數(shù)、有效時間(join window)的長度、流數(shù)據(jù)的輸入速率在不同的應(yīng)用中都截然不同,所以在設(shè)計(jì)可擴(kuò)展的流數(shù)據(jù)join處理框架時,以上幾個因素都需要考慮在內(nèi)。

圖1 AQI中的流數(shù)據(jù)查詢

一個可擴(kuò)展的流數(shù)據(jù)join處理框架需要具備:

(1) 通用性:一個好的join處理框架應(yīng)該是應(yīng)用無關(guān)的,即不是為某個特定的應(yīng)用而設(shè)計(jì)的,要能夠盡可能多的支持不同種類的join操作。

(2) 并發(fā)處理:對于一對輸入的流數(shù)據(jù),通常會有多個查詢語句同時對其進(jìn)行操作,這樣就會出現(xiàn)許多并發(fā)的join操作任務(wù)。所以,join處理框架應(yīng)能夠高效地安排并處理好這些并發(fā)的join操作任務(wù)。

(3) 高負(fù)載量:流數(shù)據(jù)的高輸入率和join操作冗長的有效時間增加了join操作的工作量,挑戰(zhàn)著流數(shù)據(jù)join處理的效率和能力。因此,join處理框架應(yīng)能容納并處理很大的工作量并避免工作量失衡。此外,對于操作時效的控制和維持應(yīng)避免成為效率瓶頸。

雖然在獨(dú)立機(jī)器上的流數(shù)據(jù)join處理已經(jīng)有較完善的方法[4,7,10,16,18],但大多數(shù)現(xiàn)有方法[1,9,13,17,20]無法擴(kuò)展到分布式的環(huán)境中?,F(xiàn)有的分布式j(luò)oin處理的研究成果在結(jié)果完整性或通信開銷上有一定的限制,且通常是為特定的應(yīng)用而設(shè)計(jì)或優(yōu)化。為能處理同一輸入流數(shù)據(jù)對上的并發(fā)join操作,可直接將輸入的流數(shù)據(jù)復(fù)制成多份并分配給每一個join操作任務(wù)。但這樣的復(fù)制操作會顯著地增加通信開銷,并不可行。

本文中我們提出一種可擴(kuò)展的流數(shù)據(jù)join處理框架S2J(Scalable Stream Join),能夠高效地完成單輸入流數(shù)據(jù)對上大量并發(fā)的join操作。S2J自動地分配適當(dāng)個數(shù)的串聯(lián)的處理單元(join worker)來分擔(dān)join處理的工作量,并用資源共享的方式將并行的多個join操作任務(wù)分配給不同的處理單元,不必復(fù)制流數(shù)據(jù),節(jié)省了通信開銷。為處理join處理單元間信息交換,S2J采用了基于元組塊的信息傳遞協(xié)議MP-2PF,以保證對于每一個目標(biāo)元組對的join操作只執(zhí)行一次。元組塊的大小適當(dāng),則S2J能夠縮小帶寬,并有效地發(fā)掘每一個join處理單元的處理能力。此外,S2J還應(yīng)用了可以動態(tài)改變以適應(yīng)多種不同輸入源及其不同輸入速率的輸入適配器和減輕負(fù)載的單元。

本文的主要貢獻(xiàn)如下:

提出了能夠動態(tài)地適應(yīng)變化的工作量的,包括巨大的工作量,可擴(kuò)展的流數(shù)據(jù)join處理框架S2J;提供了一個基于元組塊的信息傳輸協(xié)議MP-2PF和元組塊大小的選擇方法,使得S2J框架能夠保證join處理結(jié)果的完整性,減少通信開銷,避免工作量安排失衡,同時減小輸入流速率的波動帶來的影響。

1 相關(guān)工作

目前流數(shù)據(jù)join處理研究大致可分為兩種:(1) 單機(jī)上的流數(shù)據(jù)join處理;(2) 分布式的流數(shù)據(jù)join處理。

1.1 單機(jī)上的流數(shù)據(jù)join處理

1.1.1 集中式流數(shù)據(jù)join

早期流數(shù)據(jù)join方法需要集中維護(hù)join狀態(tài)(如中間結(jié)果),并采用基于哈?;蚧谂判虻膉oin方法。

基于哈希的join方法[18]:管道哈希join是最經(jīng)典的流數(shù)據(jù)join方法之一,它利用并行內(nèi)存加速join處理。然而,為了保持join狀態(tài),此方法要求內(nèi)存容量充足。為解決此問題,出現(xiàn)了雙管道哈希join[7],XJoin[16],以及將部分哈希表刷新到磁盤以便后續(xù)處理的哈希歸并join[10]。為使輸出率最大化,一些方法采用統(tǒng)計(jì)刷新策略[3,5,15],即只有更可能參與到j(luò)oin操作中的數(shù)據(jù)元組才能保存到內(nèi)存中。

基于排序的join方法:哈希join方法適用于等值join,而基于排序的join方法適用于非等值join,但往往在join處理前需要完整的輸入。為此,逐步合并join[4]將內(nèi)存分為兩部分,每部分各保存一個流數(shù)據(jù),當(dāng)內(nèi)存空間已滿時進(jìn)行join處理。不過,以上操作在輸出結(jié)果時可能帶來嚴(yán)重的延遲。

1.1.2 基于多核的流數(shù)據(jù)join

多核技術(shù)支持了單機(jī)并行流數(shù)據(jù)join[5,8,12,14]。如Gedik等[5]提出利用多核單元處理器提高流數(shù)據(jù)join效率,但效率依賴于硬件并行性,而商用硬件通常不能很好地支持這種并行性。此外,多核和共享內(nèi)存相結(jié)合的方法也可以進(jìn)一步提高join處理能力和效率。例如,HandShake join[14]中,流數(shù)據(jù)中的每個元組與其他流數(shù)據(jù)中的元組分別進(jìn)行join操作。但這些基于多核的方法難以擴(kuò)展到分布式環(huán)境中從而實(shí)現(xiàn)并行化。

1.2 分布式的流數(shù)據(jù)join處理

為得到高質(zhì)量和可擴(kuò)展的join處理,很多研究著重于分布式的流數(shù)據(jù)join處理,但是現(xiàn)有方法大多用于特定環(huán)境,有些方法為了高效率而犧牲結(jié)果完整性。

Photon[1]是Google公司提出的一種容錯的分布式流數(shù)據(jù)join系統(tǒng),被特定地用于處理網(wǎng)頁搜索查詢和用戶對廣告點(diǎn)擊的數(shù)據(jù)join操作,缺少通用性。

D-stream[20]將連續(xù)的流數(shù)據(jù)分為離散的單元,在Spark中對其進(jìn)行批處理[19]。然而,由于一些目標(biāo)元組對在批處理時被分至不同批次,無法進(jìn)行相應(yīng)join操作,無法保證結(jié)果的完整性。很多基于MapReduce的流數(shù)據(jù)join處理方法[2,9]也面臨同樣的問題。

TimeStream[13]利用元組依賴關(guān)系執(zhí)行流數(shù)據(jù)join操作。但依賴關(guān)系的維護(hù)會導(dǎo)致通信開銷,可能成為其性能瓶頸。多join謂詞可能使此解決方法更加復(fù)雜。

PSP[17]通過狀態(tài)的時間切片將整體的join處理轉(zhuǎn)換成一系列相互聯(lián)系的較小的子運(yùn)算過程,并分散為環(huán)形結(jié)構(gòu)。但它需要同步分布式的join狀態(tài),通信開銷可能會很高,一般達(dá)到子運(yùn)算符數(shù)量的指數(shù)級。

storm是一個開源的分布式實(shí)時計(jì)算系統(tǒng),同樣,storm具有無法保證join結(jié)果完整性的缺點(diǎn),應(yīng)進(jìn)行join操作的一組數(shù)據(jù)可能被包含在不同的數(shù)據(jù)元組中。

Flink是一個針對流數(shù)據(jù)的分布式處理引擎,將所有任務(wù)當(dāng)作流來處理是其最大的特點(diǎn)。然而在執(zhí)行Flink過程中,若對其計(jì)算資源的分配和定義進(jìn)行干預(yù),則有可能會出現(xiàn)資源不公平利用的現(xiàn)象。

2 S2J處理框架

為了使流數(shù)據(jù)join處理框架能動態(tài)適應(yīng)變化的工作量,減少通信帶寬同時又能保證join處理結(jié)果的完整性,本文提出S2J框架,且具有以下三個特點(diǎn):(1) 對變化的工作負(fù)載有良好的適應(yīng)性;(2) 保證join處理結(jié)果完整性同時減少帶寬;(3) 信息處理的高效性。

為實(shí)現(xiàn)動態(tài)適應(yīng)變化的工作負(fù)載這一特點(diǎn),S2J框架采用基于處理單元平均利用率的分段式解決方法,并使用自適應(yīng)分級卸載機(jī)制,動態(tài)管理處理單元。

為保證join處理結(jié)果的完整性,同時縮小通信帶寬,減少信息過載現(xiàn)象,S2J框架采用MP-2PF傳輸協(xié)議和獨(dú)特的元組塊大小選擇方法。

為了實(shí)現(xiàn)信息處理的高效性,S2J框架使用了特殊的優(yōu)化策略以提高join處理效率。

2.1 系統(tǒng)結(jié)構(gòu)

圖2即S2J框架的系統(tǒng)結(jié)構(gòu),由可擴(kuò)展的join引擎、輸入適配器、負(fù)載均衡單元、物化單元、查詢代理和查詢處理器組成。

2.1.1 可擴(kuò)展的join引擎

為使計(jì)算可擴(kuò)展性最大化,S2J框架的join引擎采用以worker為基本處理單元的流數(shù)據(jù)處理模型,部署在分布式平臺的節(jié)點(diǎn)上,一個節(jié)點(diǎn)可以負(fù)載一個或多個處理單元。所有處理單元通過流數(shù)據(jù)通道以級聯(lián)的方式連接,系統(tǒng)運(yùn)行時可根據(jù)需要增加處理單元。

S2J框架采用數(shù)據(jù)流為導(dǎo)向的處理方式,各進(jìn)入join引擎的元組有一個生命周期。如圖2中,數(shù)據(jù)源R的一個元組r到達(dá)join引擎的左側(cè)終端時,其生命周期開始,生命周期與join操作的有效時間(join window)長度一致。以1 min為例,元組r沿各處理單元向右移動,每遇到一個向左移動的數(shù)據(jù)源S的目標(biāo)元組就與其進(jìn)行join操作,1 min后,元組r從引擎的右側(cè)終端流出,生命周期終止。元組r進(jìn)入引擎的同時,上一周期數(shù)據(jù)源S的所有元組已在引擎中,下一周期的元組在元組r終止之前移至引擎中。與元組r有關(guān)的所有join操作能在限定時間內(nèi)完成,保證引擎的時效性。

圖2 S2J框架結(jié)構(gòu)

為使全局負(fù)載平衡,join引擎中信息傳遞規(guī)則如下:當(dāng)處理單元的負(fù)載超過其后繼單元,且超過部分大于一定閾值,該處理單元將一定的負(fù)載傳遞給后繼單元。這樣,全局的負(fù)載能平均分配到每個處理單元。

同時,為能并發(fā)處理在同一對流數(shù)據(jù)上的多個join任務(wù),join引擎將一組獨(dú)立的worker實(shí)例鏈安排給每個join任務(wù)。當(dāng)新的join任務(wù)到達(dá)流數(shù)據(jù)對時,新的worker實(shí)例自動初始化。同一個處理單元中的所有實(shí)例共享流數(shù)據(jù)通道和流數(shù)據(jù),節(jié)省通信開銷。

2.1.2 輸入適配器和負(fù)載均衡

為使框架獨(dú)立于不同的輸入流數(shù)據(jù),S2J采用輸入適配器將輸入的源數(shù)據(jù)轉(zhuǎn)換為標(biāo)準(zhǔn)的流數(shù)據(jù),并根據(jù)查詢語句預(yù)先對數(shù)據(jù)進(jìn)行選擇和投影操作。此外,負(fù)載均衡單元用于解決流數(shù)據(jù)輸入速率暴增的問題。

2.1.3 物化單元

Join引擎最新的輸出存儲于內(nèi)存緩沖區(qū),而之前的輸出記錄以快照的方式快速備份存儲于數(shù)據(jù)庫中。每個快照擁有共同標(biāo)識符(如執(zhí)行時間),并根據(jù)這個標(biāo)識在記錄上建索引。此外,S2J框架支持快照的定期執(zhí)行或根據(jù)輸入流中的標(biāo)點(diǎn)符號執(zhí)行。

2.1.4 查詢代理和查詢處理

S2J框架支持連續(xù)查詢(要求輸出最新的結(jié)果)和一次查詢(只要求輸出一段時間內(nèi)的結(jié)果)。通過查詢處理上Client/Server模式的應(yīng)用,可同時處理多個查詢。在Client端,查詢代理將查詢請求轉(zhuǎn)化為流數(shù)據(jù),將查詢應(yīng)答流數(shù)據(jù)轉(zhuǎn)換為Client版本。在Server端,查詢處理單元從流數(shù)據(jù)中解析出查詢請求,作出應(yīng)答,通過查詢應(yīng)答流數(shù)據(jù)將查詢結(jié)果返回至相應(yīng)的Client端。

2.2 動態(tài)工作負(fù)載的適應(yīng)

join任務(wù)的工作負(fù)載大小取決于join有效時間的長度和輸入流數(shù)據(jù)對的輸入速率,以上兩者隨應(yīng)用程序不斷變化,使不同應(yīng)用程序有不同的工作負(fù)載。在特定的應(yīng)用程序中,當(dāng)join有效時間固定,數(shù)據(jù)流輸入速率的波動亦會引起join過程中的動態(tài)工作負(fù)載。因此,適應(yīng)動態(tài)的工作負(fù)載至關(guān)重要。

圖3闡明這種分段式方法,其中τ*是τ的初始值,所有的閾值滿足以下關(guān)系:0<τ0<τ*<τ1≤τ2≤1。

圖3 動態(tài)適應(yīng)工作負(fù)載

基于以上閾值,我們將討論不同應(yīng)用中計(jì)算資源(即S2J的join處理單元)的初始配置,介紹執(zhí)行過程中S2J動態(tài)適應(yīng)不同工作負(fù)載量的方法,即自適應(yīng)地將工作負(fù)載分級卸載以及在線調(diào)整join處理單元的個數(shù)。

2.2.1 初始化

在join任務(wù)初始化之前,需指定m個join處理單元用于S2J框架的初始部署。由于部署少量的處理單元能減少計(jì)算資源的浪費(fèi),而部署更多的處理單元能增加整體的處理能力,承擔(dān)更大的工作負(fù)載的波動,因此m值的選取尤為重要。

由于τ*<τ1,我們設(shè)定τ*=β·τ1(0<β<1)。如果τ*→1·τ1,雖然所有的處理單元都將被有效的利用,但輸入流速率短暫的增加也會觸發(fā)假陽性分級卸載。而如果τ*→0·τ1,大量處理單元利用率降低,引起額外的通信開銷。綜上,我們在實(shí)驗(yàn)中測試τ*·0.5·τ1,這樣,在保證每一個處理單元達(dá)到合理的利用率的同時,還能夠承受相對較大的輸入流波動。

2.2.2 自適應(yīng)分級卸載

在join任務(wù)的執(zhí)行過程中,短暫的工作量增加造成輸入流波動的情況經(jīng)常發(fā)生。為此,S2J框架采用自適應(yīng)分級卸載,當(dāng)處理單元狀態(tài)(或即將)飽和時,卸載一定比例的元組。我們將這個比率定義為卸載率SR,具體如公式所示:

式中:B∈[0,1]是基本卸載率。

以上卸載率運(yùn)用線性的卸載模型,τ超過閾值τ1時,卸載率與負(fù)載的超出部分成正比。實(shí)際操作時,可運(yùn)用其他卸載模型確定卸載率,如二次卸載模型。圖4闡示了這兩種卸載模型中卸載比率的變化趨勢。

圖4 分級卸載模型示例

2.2.3 join處理單元管理

為處理執(zhí)行過程中流輸入速率的持續(xù)增加和減少,S2J框架對join處理單元運(yùn)用動態(tài)的管理機(jī)制。

若τ>τ2且持續(xù)一定時間,S2J通過分配額外的處理單元以增加其處理能力。由于配置額外的處理單元,平均利用率τ下降,S2J將增加新的處理單元直到τ的值減少至接近τ*(見2.2.1)或不存在多余節(jié)點(diǎn)。

若τ>τ0持續(xù)相對長的時間,為解決欠載問題,S2J釋放多余的處理單元以節(jié)省計(jì)算資源。根據(jù)τ值,S2J框架從一端釋放處理單元,使剩余處理單元的平均利用率增加至τ*左右。被釋放的處理單元被S2J框架回收并轉(zhuǎn)換為可用狀態(tài)。

2.3 信息傳遞機(jī)制

在分布式處理中,處理單元之間的信息傳遞會導(dǎo)致大量的通信,可能會成為系統(tǒng)的性能瓶頸,特別是網(wǎng)絡(luò)帶寬有限的情況下。為了節(jié)約通信成本,S2J框架令并發(fā)join任務(wù)共享流通道和數(shù)據(jù)流,以減少流數(shù)據(jù)對的數(shù)目。對于給定的流數(shù)據(jù)對,主要的通信過載是由信息序列化造成的。為減少這種過載,S2J框架使用由多個元組組成的元組塊進(jìn)行信息傳遞。

對于基于元組塊的信息傳遞,我們提出MP-2PF協(xié)議部署信息的傳遞過程,使其與join處理過程同步,并提出元組塊大小的選擇方法,動態(tài)調(diào)整元組塊粒度以滿足不同需求,如減少帶寬和避免假陽性分級卸載。

2.3.1 MP-2PF協(xié)議

S2J框架在join過程中采用two-phase forwarding model[14]。基于此模型,我們提出MP-2PF信息傳遞協(xié)議,實(shí)現(xiàn)處理單元之間被動的信息交換,即當(dāng)處理單元的狀態(tài)發(fā)生改變的時候,它立即將這一轉(zhuǎn)變信息送至依賴于此信息來完成下一操作的近鄰單元。

MP-2PF協(xié)議中共有三種信息類型,即:

SIZE_CHG:將其工作負(fù)載大小發(fā)送至前趨。

TUPLE_BLK:在相鄰處理單元之間傳遞元組塊。

ACK:確認(rèn)接收元組塊。

當(dāng)join處理單元的工作負(fù)載量(以元組數(shù)為計(jì)量單位)發(fā)生改變,SIZE_CHG信息會被發(fā)送至其前趨單元。若工作負(fù)載量小于其前趨單元且兩者之差至少為一個元組塊大小,那么這兩個相鄰的處理單元之間會有工作負(fù)載傳遞。此過程中,在收到相應(yīng)的確認(rèn)反饋之前,前趨單元將一直保存?zhèn)鬟f元組的副本,TUPLE_BLK信息包含傳遞的元組塊,當(dāng)收到后繼單元發(fā)送的ACK信息時,刪除副本信息。這樣,S2J框架避免了錯失join數(shù)據(jù)對的問題[14],同時保證一對元組之間的每一個join操作執(zhí)行且只執(zhí)行一次。

圖5概述了MP-2PF協(xié)議內(nèi)容。處理元組塊的過程中,S2J的處理單元于三個狀態(tài)中相互轉(zhuǎn)換。每個單元的初始狀態(tài)為Processing狀態(tài),并且在接收一個新的元組塊之后回到此狀態(tài)。當(dāng)收到新的元組塊時,處理單元利用新的元組進(jìn)行join操作,并將確認(rèn)信息發(fā)送至其前趨單元,隨后檢查forwarding條件以決定是否調(diào)用元組塊的forwarding步驟。Forwarding條件是指兩個近鄰處理單元的工作負(fù)載量差值達(dá)到閾值(此閾值即元組塊的大小),若forwarding條件滿足,則單元轉(zhuǎn)換到forwarding狀態(tài)。隨后處理單元將元組塊發(fā)送至其后繼單元,在其站點(diǎn)中留下轉(zhuǎn)發(fā)信息的副本,并最終返回processing狀態(tài)。若處于processing狀態(tài)的處理單元接收到確認(rèn)信息,其狀態(tài)轉(zhuǎn)變?yōu)閐eleting狀態(tài),刪除之前轉(zhuǎn)發(fā)元組的副本以及相應(yīng)的確認(rèn)信息,并將其工作負(fù)載量的改變信息發(fā)送至其前趨單元。最后,處理單元回到processing狀態(tài)。

圖5 MP-2PF協(xié)議

2.3.2 元組塊大小的選擇

對每次信息傳遞,兩個處理單元之間傳遞的元組需序列化為帶信息頭的信息,這增加了通信量,傳輸固定數(shù)量的元組的通信開銷與序列化的次數(shù)成正比。因此S2J不用每次只轉(zhuǎn)發(fā)一個元組的傳統(tǒng)策略[14],而利用包含ζ個元組的元組塊作為信息單元進(jìn)行轉(zhuǎn)發(fā),即僅當(dāng)處理單元的工作負(fù)載量高于其后繼單元ζ個元組時,它才將ζ個元組發(fā)送給其后繼單元,如圖6(a)所示;無元組傳輸?shù)那闆r如圖6(b)所示。

圖6 不同Worker間的workload差異

為最小化序列化的開銷,ζ值應(yīng)盡量大。隨著ζ增長,減少的通信過載會更小,若ζ足夠大,這種過載可忽略不計(jì)。此現(xiàn)象可由以下引理和定理來解釋。

引理1令?為一對給定的輸入數(shù)據(jù)流的輸入速率,常量c為信息頭的長度,在有m個處理單元的S2J框架中,元組塊的大小ζ與帶寬b的關(guān)系如下:

(1)

式中:bout為用于輸出join結(jié)果的帶寬,對于給定的join任務(wù),輸出帶寬是固定的。

基于引理1,我們可以得到以下定理。

定理1提供了一個定量保證:當(dāng)元組塊大小適當(dāng)時,S2J能達(dá)到一個相對低的帶寬。圖7描繪了式(1)的函數(shù)曲線,從中可發(fā)現(xiàn)當(dāng)ζ超過一個相對小的數(shù)值如20,S2J能夠得到前5%的最小帶寬。

圖7 S2J中變化量為ζ的帶寬

基于以上分析,我們提出了元組塊大小ζ值的選擇方法如下:

(2)

式中:ζ*是用戶定義的參數(shù),決定系統(tǒng)能夠達(dá)到的最小帶寬(ζ≤ζ*)。在我們的實(shí)驗(yàn)中,ζ*值設(shè)置為20。

圖8闡述了此選擇方法,從中可以看出:(1) 在τ達(dá)到閾值τ1之前,此方法在多數(shù)情況下選擇的元組塊大小ζ保持在ζ*左右,減少了所需帶寬;(2) 當(dāng)τ接近τ1時,選擇更小的ζ值,如前所述,較大的元組塊大小會增加假陽性分級卸載的可能,為了解決這一問題,S2J以增加帶寬為代價(jià)運(yùn)用了更小的元組塊大小;(3) 當(dāng)τ超過τ1時,元組塊大小縮小至1以幫助S2J平均地分配工作負(fù)載,以有效地開發(fā)每個處理單元。

圖8 不同τ值對應(yīng)的ζ

利用此元組塊大小選擇方法,S2J能夠有效地避免假陽性分級卸載。接下來,我們將闡述在每個join處理單元的利用率達(dá)到τ1之前,S2J所有m個處理單元的平均利用率τ不會超過分級卸載的閾值τ1。

2.4 優(yōu)化策略

為了輸出一定數(shù)量的join結(jié)果,所需計(jì)算量與找到目標(biāo)元組對的比較次數(shù)成正比。因此,join處理效率P可以通過以下公式計(jì)算:

(3)

更大的P值意味著框架能夠?yàn)榱鲾?shù)據(jù)的join安排更高效的處理過程。

為提高join操作的效率,S2J使用內(nèi)存中索引,減少不必要的join操作,加快處理進(jìn)度。對輸入流數(shù)據(jù)中的每個元組塊生成join鍵的內(nèi)存索引。當(dāng)元組塊到達(dá)join處理單元時,索引條目插入,當(dāng)元組塊移出處理單元時,索引條目刪除。在join過程中,每個元組搜索滿足索引中謂詞的join鍵,并與關(guān)聯(lián)于匹配的join鍵的元組進(jìn)行join操作。

為進(jìn)行不同謂詞的join操作,S2J對于等值join采用哈希索引直接定位目標(biāo)join鍵,得到鍵值相同的相應(yīng)元組;對于非等值join,利用平衡二叉搜索樹(BST)快捷訪問由謂詞如<,>,≤,≥指示的join鍵。

3 實(shí)驗(yàn)評估

我們在分布式實(shí)時流數(shù)據(jù)處理平臺Apache S4[11]基礎(chǔ)上實(shí)現(xiàn)了S2J框架,并在以下實(shí)驗(yàn)環(huán)境中對S2J進(jìn)行評估:Intel Xeon X3430 @ 2.4 GHz CPU以及CentOS 5.11,單元內(nèi)存為2 GB。在本實(shí)驗(yàn)中,我們將分別在單機(jī)(單核運(yùn)行一個join處理單元)和分布式環(huán)境(同一集群的2~8個結(jié)點(diǎn))中評估S2J的處理能力,并分別在單機(jī)4核環(huán)境以及擁有8個結(jié)點(diǎn)的集群上評估處理并發(fā)任務(wù)的效率,同時驗(yàn)證不同參數(shù)對框架的影響。

3.1 處理性能研究

由于S2J處理器在單機(jī)(單核運(yùn)行一個join處理單元)和分布式環(huán)境中都能夠部署展開,我們將研究其在兩種不同模式下運(yùn)行的性能。此外,我們也對其處理并發(fā)join任務(wù)的性能進(jìn)行報(bào)告。

3.1.1 輸入適配器和負(fù)載均衡

在此實(shí)驗(yàn)中,將S2J的join處理性能與傳統(tǒng)多核的流數(shù)據(jù)處理框架HandShake Join(握手Join)[14]性能進(jìn)行比較。為此,通過提高流數(shù)據(jù)對的輸入速率找到系統(tǒng)處理能力的上限(執(zhí)行過程中join的有效執(zhí)行時間固定),記錄使CPU利用率達(dá)到95%的數(shù)據(jù)輸入率,此輸入率表示系統(tǒng)可承受的最大輸入速率,即系統(tǒng)的最大處理性能。為研究系統(tǒng)在不同工作負(fù)載量下的處理性能,將join有效操作時間從5分鐘逐漸調(diào)整至20分鐘,在同一單機(jī)上依次運(yùn)行所測驗(yàn)的框架,每次執(zhí)行運(yùn)用不同數(shù)量的核(2~8個)。

圖9闡述了在不同join有效操作時間及不同數(shù)量核的情況下,HandShake Join和S2J在單機(jī)上的系統(tǒng)處理性能對比。如圖9(a),有效操作時間為5分鐘,核的數(shù)量由2增至8,利用哈希索引的S2J能承受的最大數(shù)據(jù)輸入率由每秒4 000元組線性增至8 500元組,利用二叉搜索樹索引的S2J同樣具有線性增長趨勢且增長幅度較大,而HandShake Join能承受的最大數(shù)據(jù)輸入率最小,且增加核的數(shù)量時,其增長幅度不明顯。從中發(fā)現(xiàn):(1) 與HandShake Join相比,S2J框架能承擔(dān)更高的最大輸入速率,具有更高的join操作處理性能;(2) 運(yùn)行的核數(shù)量越多,相比于HandShake Join框架,S2J最大輸入率的增長更快,即S2J在join處理性能上亦有更好的擴(kuò)展性。這是由于S2J獨(dú)特的優(yōu)化策略和系統(tǒng)結(jié)構(gòu)使其在處理join操作時具有更高的效率,也使其較HandShake Join具有更高的處理性能和更好的擴(kuò)展性。

圖9 單機(jī)環(huán)境下S2J和HandShake Join所能承受的最大輸入率

3.1.2 分布式環(huán)境中的處理性能

此實(shí)驗(yàn)將S2J框架與D-Stream[20]在分布式環(huán)境下的性能進(jìn)行比較。由于其底層實(shí)現(xiàn)部分是不支持非等值join的基于哈希的map和reduce,D-Stream只處理等值join操作。且1.2節(jié)指出,D-Stream將輸入的流數(shù)據(jù)分解成離散的處理單元分批處理,可能會使本應(yīng)執(zhí)行join操作的兩個元組因存在于不同的兩個單元而相互錯過,處理結(jié)果不完整。因此,這個部分僅報(bào)告D-Stream在處理等值join方面的系統(tǒng)處理性能,單機(jī)環(huán)境下的HandShake Join不參與這部分的對比。我們在同一組集群上部署S2J和D-Stream,每次執(zhí)行的時候使用不同數(shù)目的結(jié)點(diǎn)(2~8個),并且通過測驗(yàn)不同長度的有效執(zhí)行時間(5~20分鐘)來研究每個框架在不同的工作負(fù)載量下所能承受的最大數(shù)據(jù)輸入率。

圖10展示了在不同join有效操作時間以及不同數(shù)量的核的情況下,D-Stream和S2J在分布式環(huán)境下進(jìn)行等值join操作的系統(tǒng)處理性能的對比結(jié)果(其中基于二叉搜索樹索引的S2J的實(shí)驗(yàn)數(shù)據(jù)僅作為參考)。如圖10(a)中,有效的join操作時間為5分鐘,運(yùn)用的核的數(shù)量由2增加至8,D-Stream和S2J能夠承受的最大數(shù)據(jù)輸入率都有所增加且兩者數(shù)值幾乎相等,D-Stream能夠承受的最大數(shù)據(jù)輸入率略高。

圖10 分布式環(huán)境下S2J和D-Stream所能承受的最大輸入率

對于等值join任務(wù),S2J與D-Stream性能相當(dāng)。然而,D-Stream只能進(jìn)行等值join操作,且僅能輸出部分目標(biāo)join結(jié)果,而S2J處理框架能保證得到結(jié)果的完整性。所以,S2J在支持高效的θ-join和保證join結(jié)果的完整性方面優(yōu)于D-Stream。

3.1.3 執(zhí)行并發(fā)join任務(wù)的處理性能

本實(shí)驗(yàn)從兩方面研究S2J執(zhí)行并發(fā)join任務(wù)的處理性能。即并發(fā)join任務(wù)如何影響:(1) S2J框架能夠承受的最大數(shù)據(jù)輸入率;(2) S2J框架所需帶寬。我們在S2J中運(yùn)行2~8個join處理單元,使用不同數(shù)量(2~8個)的并發(fā)join任務(wù),有效執(zhí)行時間為10分鐘。

處理性能方面,圖11描述結(jié)點(diǎn)數(shù)量不同時,S2J能承受的最大數(shù)據(jù)輸入率隨并發(fā)任務(wù)數(shù)量變化的趨勢。如圖11(a)描述了結(jié)點(diǎn)數(shù)量為2,當(dāng)并發(fā)任務(wù)數(shù)量從2增至8時,基于哈希索引的S2J(等值join)能承受的最大數(shù)據(jù)輸入率從接近2 700元組每秒減少至2 000。從圖中發(fā)現(xiàn)隨任務(wù)數(shù)量的增加,最大輸入率減小,且趨勢逐漸減緩。對于相同的并發(fā)任務(wù)數(shù)量,結(jié)點(diǎn)數(shù)量增加時,S2J能承受的最大數(shù)據(jù)輸入率增加,這是由于結(jié)點(diǎn)數(shù)量的增加能有效提高S2J的處理性能。

圖11 并發(fā)任務(wù)時S2J能承受的最大輸入率(有效時間10分鐘)

通信帶寬方面,圖11為S2J在執(zhí)行不同數(shù)量的并發(fā)join任務(wù)時所需帶寬。僅作為參考,這里有S2J的樸素(naive)版本的實(shí)驗(yàn)結(jié)果,即對于每個任務(wù)都采用復(fù)制其流數(shù)據(jù)的方法來進(jìn)行join時的所需要的帶寬,將此結(jié)果作為實(shí)驗(yàn)數(shù)據(jù)的對比基準(zhǔn)。從圖12我們可以發(fā)現(xiàn):(1) 基于流數(shù)據(jù)復(fù)制的join處理框架所需帶寬隨著并發(fā)join任務(wù)數(shù)量的增加呈線性增長趨勢,而S2J所需帶寬趨于穩(wěn)定,幾乎不受并發(fā)join任務(wù)數(shù)量的影響;(2) 與基于流數(shù)據(jù)復(fù)制的join處理所需帶寬相比,當(dāng)存在大量并發(fā)join任務(wù)時,S2J框架能顯著節(jié)約帶寬。這是由于其獨(dú)特的消息傳遞機(jī)制減少了通信開銷。

圖12 不同數(shù)量的并發(fā)join任務(wù)時所需帶寬(有效時間5分鐘)

總之,S2J處理框架能夠以更少的性能代價(jià)處理更多并發(fā)的join任務(wù),同時對于帶寬幾乎沒有損耗。

3.2 處理效率研究

在實(shí)際操作中,并不是處理框架部署的所有計(jì)算都能夠得到j(luò)oin結(jié)果的輸出,通常一些計(jì)算會由于低效的join處理而被浪費(fèi)。有效計(jì)算的百分比可以通過join操作的效率進(jìn)行估算(見2.4節(jié)中的式(3))。Join處理效率越高,框架可通過更少的計(jì)算量獲得同樣的join操作結(jié)果,這就為更多的進(jìn)程節(jié)約了更多的CPU周期。我們在單機(jī)4核環(huán)境下對S2J和HandShake Join的處理效率進(jìn)行對比,以及在有8個結(jié)點(diǎn)的集群上對S2J和D-Stream的處理效率進(jìn)行對比。有效執(zhí)行時間為5秒至1分鐘,兩組輸入流對分別以1 000元組每秒和2 000元組每秒進(jìn)行輸入。

圖13展示了對比結(jié)果。從中可以發(fā)現(xiàn):(1) S2J的處理效率遠(yuǎn)高于HandShake Join和D-Stream,并隨由有效時間增加或流數(shù)據(jù)輸入率增加引起的工作負(fù)載量增加而提高;(2) HandShake Join和D-Stream的處理效率很低(在本實(shí)驗(yàn)中,其效率低于1%),由于HandShake Join對每一個元組對進(jìn)行比較,降低處理效率;而D-Stream丟失了許多用于join操作的目標(biāo)元組對,只輸出部分join結(jié)果,降低處理效率。

3.3 元組塊大小的影響

在本實(shí)驗(yàn)中,首先確認(rèn)元組塊的大小和S2J的帶寬之間的關(guān)系,接著核實(shí)元組塊大小選擇方法在承受輸入流輸入速率波動方面的有效性。

如2.3.2節(jié)中,S2J的帶寬與元組塊大小ζ成反比,基于這一點(diǎn),當(dāng)ζ增大超過一定閾值(如20)時,縮小的帶寬(如5%的變化)幾乎可被忽略。為證明此結(jié)論,我們部署有效執(zhí)行時間5分鐘,具有8個join處理單元,流數(shù)據(jù)輸入率每秒2 000至4 000元組的join任務(wù)。執(zhí)行過程中,將元組塊大小從1個元組調(diào)整至200,并報(bào)告相應(yīng)的帶寬如圖14。從圖中可以發(fā)現(xiàn),帶寬的變化率呈現(xiàn)出與圖7中S2J帶寬與ζ之間的關(guān)系相同的趨勢,由此證明上述結(jié)論。

圖14 不同元組塊大小的S2J中的帶寬

運(yùn)用元組塊大小選擇方法,S2J能容忍數(shù)據(jù)輸入率的波動,避免假陽性分級卸載。為證實(shí)這一點(diǎn),卸載的基本比率(見2.2.2節(jié))設(shè)為B=30%,τ1=80%,有效執(zhí)行時間為1分鐘,在8個join處理單元上運(yùn)行join任務(wù)。輸入的流數(shù)據(jù)對具有每秒1 000元組的平均傳入速率,同時其波動具有不同的振幅和頻率。

圖15展示S2J使用選定的ζ值和使用固定元組塊大小(即20,50,500)時不同結(jié)果的對比??砂l(fā)現(xiàn),使用選定的元組塊大小時,S2J成功承受數(shù)據(jù)輸入率的波動。而使用其他的ζ值時發(fā)生假陽性分級卸載現(xiàn)象。

圖15 不同輸入數(shù)據(jù)率波動振幅及頻率下元組塊大小對分級卸載的影響

3.4 適應(yīng)不同工作負(fù)載量的案例研究

本實(shí)驗(yàn)中,為闡述S2J框架在執(zhí)行過程中如何適應(yīng)不同的工作負(fù)載量,我們安排了一個案例研究。為此,采用具有不同輸入率的流數(shù)據(jù)對作為輸入,如圖16(a),S2J中安排4個結(jié)點(diǎn)作為其初始處理單元(一個處理單元運(yùn)行在一個結(jié)點(diǎn)上),有效執(zhí)行時間為60秒,設(shè)置閾值τ0=20%,τ1=80%,τ2=82%。添加和釋放處理單元的規(guī)則如下:(1) 若所有處理單元的平均利用率τ滿足τ>τ2且延續(xù)超過30秒,系統(tǒng)將添加額外的結(jié)點(diǎn)使得τ減少至τ*左右(即40%,因?yàn)棣?=0.5·τ1);(2) 若τ<τ0且延續(xù)5分鐘,將卸載一定數(shù)量的結(jié)點(diǎn)以使τ上升至τ*附近。

圖16 S2J適應(yīng)不同工作負(fù)載量的案例研究

圖16的(b)至(d)分別展示τ值,實(shí)時分級卸載,執(zhí)行過程中的節(jié)點(diǎn)數(shù)量,從中得出以下幾點(diǎn)發(fā)現(xiàn)(按照時間順序):(1) S2J能有效容忍在第400至500秒時由于短暫的數(shù)據(jù)輸入率增加而造成的工作負(fù)載量波動;(2) 一旦每個join處理單元的利用率將要達(dá)到飽和(即τ>τ1),如圖16(c)所示,S2J卸載一部分負(fù)載使得其處理單元恢復(fù)處理性能;(3) 當(dāng)工作負(fù)載量持續(xù)增加,τ>τ2的情況持續(xù)超過30秒時,如圖16(d)所示,4個新的處理單元添加至S2J框架來幫助處理增加的工作負(fù)載,執(zhí)行此操作之后,所有處理單元的平均工作負(fù)載在幾秒內(nèi)降至40%左右。這是因?yàn)楦鶕?jù)MP-2PF協(xié)議,原始單元的元組被迅速傳遞給新添加的單元,直到工作負(fù)載量達(dá)到平衡;(4) 若平均利用率降至τ0以下,且此狀態(tài)持續(xù)5分鐘以上,S2J卸載多余的join處理單元。系統(tǒng)可自動選擇合適數(shù)量的處理單元(此例中為4個)來卸載,以使τ恢復(fù)到接近于τ*的值。卸載操作在60秒內(nèi)完成,即有效執(zhí)行時間,其原因是每一個新添加的元組將被S2J中的處理單元傳遞,其生命周期終止時被移除。

4 結(jié) 語

我們提出了S2J——一個基于高度分布式環(huán)境的可擴(kuò)展流數(shù)據(jù)join處理框架。S2J能在不同工作負(fù)載量下有效地使用計(jì)算資源,有能力處理由流數(shù)據(jù)輸入率的增加帶來的暫時性過載情況,高效處理θ-join且提供實(shí)時保證,并保證join結(jié)果的完整性。S2J還可減少通信開銷,避免流數(shù)據(jù)輸入速率波動時過大的元組塊造成的假陽性分級卸載。我們分別于單機(jī)和分布式環(huán)境下進(jìn)行實(shí)驗(yàn),實(shí)驗(yàn)結(jié)果闡述了S2J框架處理join操作的高效性和有效性,同時證實(shí)其能夠容忍波動的數(shù)據(jù)輸入率并適應(yīng)不同的工作負(fù)載量。

[1] Ananthanarayanan R,Basker V,Das S,et al.Photon:fault-tolerant and scalable joining of continuous data streams[C]//Proceedings of the 2013 ACM SIGMOD International Conference on Management of Data.ACM,2013:577-588.

[2] Blanas S,Patel J M,Ercegovac V,et al.A comparison of join algorithms for log processing in mapreduce[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data.ACM,2010:975-986.

[3] Chen S,Gibbons P B,Nath S.PR-join:a non-blocking join achieving higher early result rate with statistical guarantees[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data.ACM,2010:147-158.

[4] Dittrich J P,Seeger B,Taylor D S,et al.Progressive merge join:a generic and non-blocking sort-based join algorithm[C]//Proceedings of the 28th International Conference on Very Large Data Bases.ACM,2002:299-310.

[5] Gedik B,Yu P S,Bordawekar R R.Executing stream joins on the cell processor[C]//Proceedings of the 33th International Conference on Very Large Data Bases.ACM,2007:363-374.

[6] Golab L,?zsu M T.Issues in data stream management[J].SIGMOD Record,2003,32(2):5-14.

[7] Ives A G,Florescu D,Friedman M,et al.An adaptive query execution system for data integration[C]//Proceedings of the 1999 ACM SIGMOD International Conference on Management of Data.ACM,1999:299-310.

[8] Karnagel T,Habich D,Schlegel B,et al.The hells-join:a heterogeneous stream join for extremely large windows[C]//Proceedings of the 19th International Workshop on Data Management on New Hardware.ACM,2013:1-7.

[9] Logothetis D,Yocum K.Ad-hoc data processing in the cloud[J].VLDB Endowment,2008,1(1):1472-1475.

[10] Mokbel M,Lu M,Aref W G.Hash-merge join:a non-blocking join algorithm for producing fast and early join results[C]//Proceedings of the 20th IEEE International Conference on Data Engineering.IEEE,2004:251-262.

[11] Neumeyer L,Robbins B,Nair A,et al.S4:distributed stream computing platform[C]//Proceedings of the 12th IEEE International Conference on Data Mining.IEEE,2010:170-177.

[12] Qian J,Li Y,Wang Y,et al.An embedded co-processor for accelerating window joins over uncertain data streams[J].Microprocessors and Microsystems,2012,36(6):489-504.

[13] Qian Z,He Y,Su C,et al.Timestream:reliable stream computation in the cloud[C]//Proceedings of the 8th ACM European Conference on Computer Systems.ACM,2013:1-14.

[14] Teubner J,Mueller R.How soccer players would do stream joins[C]//Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data.ACM,2011:625-636.

[15] Tok W H,Bressan S,Lee M L.RRPJ:result-rate based progressive relational join[C]//Proceedings of the 12th International Conference on Database Systems for Advanced Applications.Springer,2007:43-54.

[16] Urhan T,Franklin M J.Dynamic pipeline scheduling for improving interactive query performance[C]//Proceedings of the 27th International Conference on Very Large DataBases.ACM,2001:501-510.

[17] Wang S,Rundensteiner E.Scalable stream join processing with expensive predicates:workload distribution and adaptation by time-slicing[C]//Proceedings of the 12th International Conference on Extending Database Technology.ACM,2009:299-310.

[18] Wilschut A N,Apers P M G.Dataflow query execution in a parallel main-memory environment[C]//Proceedings of the 1st International Conference on Parallel and Distributed Information Systems.IEEE,1991:68-77.

[19] Zaharia M,Chowdhury M,Das T,et al.Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation.ACM,2012:25-27.

[20] Zaharia M,Das T,Li H,et al.Discretized streams:fault-tolerant streaming computation at scale[C]//Proceedings of the 24th ACM Symposium on Operating Systems Principles.ACM,2013:423-438.

猜你喜歡
處理單元組塊負(fù)載量
“廠”字形克瑞森無核葡萄負(fù)載量對果實(shí)質(zhì)量的影響
城市污水處理廠設(shè)備能耗及影響因素分析研究
不同負(fù)載量對‘馬瑟蘭’枝條貯藏營養(yǎng)的影響
空氣處理系統(tǒng)應(yīng)用性測試
組塊理論的解讀及啟示
大型半潛式起重船塢內(nèi)建造整體合攏方案論證
上海地區(qū)不同果實(shí)負(fù)載量對信濃樂葡萄品質(zhì)影響研究
為什么聽得懂卻不會做
榛子主干橫截面積和產(chǎn)量關(guān)系的研究
電動汽車主控制器雙機(jī)熱備的設(shè)計(jì)
信宜市| 平顶山市| 青铜峡市| 任丘市| 泰宁县| 台东县| 伊吾县| 金坛市| 合阳县| 桐乡市| 文登市| 越西县| 上高县| 平江县| 临朐县| 汝阳县| 大冶市| 建瓯市| 屏南县| 颍上县| 新营市| 迭部县| 广汉市| 莒南县| 白沙| 阳春市| 永嘉县| 荃湾区| 宾阳县| 大洼县| 陕西省| 沙坪坝区| 颍上县| 吴忠市| 南雄市| 左贡县| 葵青区| 惠东县| 怀集县| 禹州市| 无为县|