關(guān)鍵詞:大數(shù)據(jù);流式計算;窗口計算;Flink
中圖分類號:TP316.4 文獻標識碼:A
1引言(Introduction)
在大數(shù)據(jù)技術(shù)發(fā)展早期,批處理技術(shù)應(yīng)用廣泛,如阿帕奇軟件基金會開發(fā)的Hadoop MapReduce框架、加州大學(xué)伯克利分校開發(fā)的Spark框架等,取得了令人矚目的成果[1];但隨著業(yè)務(wù)要求的不斷提高,離線計算高延遲的弊端逐漸暴露,流式計算引擎應(yīng)運而生,包括最早由推特公司開發(fā)的Storm框架、Spark框架的流計算擴展Spark Streaming、谷歌公司開發(fā)的Google Dataflow框架、最早由柏林工業(yè)大學(xué)開發(fā)的Flink框架等[2],它們能在數(shù)據(jù)連續(xù)到達的同時進行實時計算,被廣泛應(yīng)用在對時間性要求很高的場景中。流式計算的數(shù)據(jù)源沒有邊界,由計算引擎負責確定窗口范圍,但在如“雙11”網(wǎng)購促銷日、春運搶票等高負載應(yīng)用中,原生窗口的性能常無法滿足實際應(yīng)用需要[3]。分析原因,一是這些框架采用的全量創(chuàng)建窗口的方式難以支持毫秒級的刷新頻率,生成的窗口數(shù)量巨大;二是交易數(shù)據(jù)流存在非均勻性,為及時計算活躍用戶的數(shù)據(jù),窗口必須密集,但也會導(dǎo)致系統(tǒng)為低活躍的用戶構(gòu)造大量的重復(fù)窗口,造成資源浪費。
本文以由阿帕奇軟件基金會孵化的Apache Flink計算引擎為例,分析其窗口機制性能缺陷的根源。之后提出一種優(yōu)化密集滑動窗口的方案,能減少系統(tǒng)需要構(gòu)造的窗口數(shù)量,并通過計算比較兩種方案,闡明優(yōu)化方案的有效性。
2原生窗口機制及其問題(The native windowmechanism and its problems)
2.1原生窗口機制
流式計算中的數(shù)據(jù)源不斷產(chǎn)生數(shù)據(jù)形成流,需要由窗口劃定一段時間范圍的計算結(jié)果。例如,一個商務(wù)平臺中對每個商戶的交易進行1 min的統(tǒng)計,每500 ms更新一次計算結(jié)果,生成一個滑動窗口,則1 min為滑動窗口的長度(size),500 ms為此窗口的步長(slide),商戶號為此窗口的聚合鍵(key)。
在理想情況下,流處理引擎應(yīng)該為每個key預(yù)先分配好滑動窗口,這樣在有數(shù)據(jù)到達時,數(shù)據(jù)就可以直接落到對應(yīng)的窗口中,但實際上流式計算框架普遍采用懶構(gòu)造方式[4],這種方式為了節(jié)省資源是不會為還未出現(xiàn)的key預(yù)先分配窗口的,只有當一個key有對應(yīng)數(shù)據(jù)到達后才會創(chuàng)建,然后向前追溯,補充生成之前由于懶策略而跳過的所有窗口。圖1展示了Flink計算滑動窗口的創(chuàng)建過程:數(shù)據(jù)在t時刻到達后,除了創(chuàng)建從t到t+size范圍的窗口,還通過循環(huán)不斷向前補充創(chuàng)建需要追溯的滑動窗口。
在后續(xù)不斷有新數(shù)據(jù)抵達時,窗口構(gòu)建策略依然不變,繼續(xù)循環(huán)和向前追溯。此外,F(xiàn)link會將準備創(chuàng)建的新窗口和已有的窗口進行比較,合并相同的窗口,這在Flink源代碼中streaming.runtime模塊的WindowOperator類的processElement方法中實現(xiàn)。
2.2滑窗機制存在的問題
首先是數(shù)據(jù)傾斜帶來的問題,在流式系統(tǒng)中的數(shù)據(jù)源是非均勻性的,在相同時間內(nèi),不同key產(chǎn)生的數(shù)據(jù)量級存在顯著的差異,或者對于同一個key,其數(shù)據(jù)產(chǎn)生頻率在不同時間段存在顯著的差異。對于頻繁更新的key,為了保證數(shù)據(jù)的及時性,必須使用較小的步長,滿足熱點key的刷新頻率要求,導(dǎo)致系統(tǒng)不得不為稀疏key也配置同樣的窗口創(chuàng)建策略,而這些窗口內(nèi)部大部分都保存了相同的狀態(tài)、具有相同的輸出,實際上是多余的,造成內(nèi)存資源的浪費。
其次是實時性問題,在很多系統(tǒng)中,響應(yīng)時間是評估系統(tǒng)能力的硬指標,例如對于風險監(jiān)控系統(tǒng),風險行為越早被檢測到,被攔截或挽回損失的可能性就越大。流式計算引擎的窗口刷新頻率決定了一個風險行為從發(fā)生到體現(xiàn)在計算結(jié)果的延遲時間。假設(shè)窗口計算的步長是10 min,那么無論將系統(tǒng)處理和數(shù)據(jù)傳輸?shù)难舆t壓縮到多低,在最壞情況下,一個風險事件也要在10 min后才從窗口統(tǒng)計中被輸出。在這個時間差之內(nèi),系統(tǒng)無法感知到風險的發(fā)生,也無法及時響應(yīng)。因此,理想情況下,毫秒級的窗口步長是最佳的,但根據(jù)窗口的定義方式,對每個新key所需要創(chuàng)建窗口的個數(shù)=窗口長度/窗口步長,當窗口長度達到小時甚至天數(shù)時,維持毫秒級的步長會導(dǎo)致窗口數(shù)量巨大,如果實際系統(tǒng)中有百萬級以上數(shù)量的key,就會帶來大型分布式計算架構(gòu)也難以承擔的負載。
同時,密集的多余窗口在創(chuàng)建和銷毀時的高并發(fā)會導(dǎo)致CPU占用過高和I/O負載高,這使得高頻(毫秒級延遲)的窗口很難用原生窗口實現(xiàn),因為在下一輪窗口創(chuàng)建時,上一輪窗口產(chǎn)生的CPU占用和I/O負載可能還未被完全消化,導(dǎo)致系統(tǒng)性能雪崩問題[5]。
3現(xiàn)有優(yōu)化及其不足(Existing optimization and itsdeficiencies)
3.1 ProcessFunction優(yōu)化方法
應(yīng)用層目前使用處理函數(shù)(ProcessFunction)替代傳統(tǒng)窗口是一種常用的優(yōu)化方法,它本質(zhì)上是應(yīng)用一個流處理函數(shù),在其中定義對每條上游數(shù)據(jù)的獨立處理邏輯[ 6 ]。
ProcessFunction對一個key只構(gòu)造一個實例,接受與保存新到達的數(shù)據(jù),不斷地對過期的舊數(shù)據(jù)進行清理。以Flink為例,其提供了MapState類用于保存ProcessFunction的狀態(tài),之后可以調(diào)用windowState.put()方法向其中添加和更新狀態(tài)。這樣就可以將ProcessFunction當作一個窗口,雖然此優(yōu)化避免了窗口冗余,但是要求在函數(shù)內(nèi)部保存原始數(shù)據(jù),因此削弱了優(yōu)化效果。此外,如果Flink上游數(shù)據(jù)更新非???,而ProcessFunction沒有滑窗策略的頻率控制的步長,下游的I/O負載壓力會顯著增大。
3.2采用分桶策略的ProcessFunction優(yōu)化
為了減少ProcessFunction優(yōu)化方法中保存原始數(shù)據(jù)過多消耗的內(nèi)存空間,可以在犧牲一定精度的情況下采用分桶策略,在新數(shù)據(jù)到達時進行部分聚合。例如,對一個長度為10 min的窗口,每分鐘分為一桶,這樣窗口中只需要存10 份子狀態(tài),每分鐘清理一個桶即可,不再需要保存原始數(shù)據(jù)。圖2展示了采用分桶策略的ProcessFunction優(yōu)化方法的原理。
此優(yōu)化方法雖然在一定程度上解決了內(nèi)存問題,但同時限制了計算的粒度,如果分桶的間隔過?。ɡ绾撩爰壏滞埃碗y以達到優(yōu)化效果,甚至因加入了額外的邏輯而進一步加重了系統(tǒng)的負擔。擴大分桶的間隔相當于放大了滑動窗口的步長,雖然能夠起到優(yōu)化內(nèi)存占用的效果,但是無法滿足上文所述的對于高刷新頻率的需求。
綜上所述,應(yīng)用層的常用優(yōu)化方法雖然能夠在一定程度上減少窗口冗余、降低一定的內(nèi)存負載,但其優(yōu)化策略均存在一定的副作用,難以滿足非均勻數(shù)據(jù)源、高窗口刷新率場景的應(yīng)用要求。除此以外,使用這些優(yōu)化方法不僅要求重新編碼實現(xiàn)計算邏輯,還必須手動維護和清理狀態(tài),這與直接使用引擎提供的窗口功能相比,額外增加了開發(fā)工作量。
4基于關(guān)鍵窗口機制的優(yōu)化(Optimization basedon key-window mechanism)
為解決非均勻數(shù)據(jù)源中稀疏key導(dǎo)致的狀態(tài)內(nèi)存資源浪費及CPU占用高和I/O負載高的問題,本文提出一種基于關(guān)鍵窗口機制的窗口實現(xiàn)優(yōu)化方案,即在不影響計算結(jié)果正確性的情況下,流處理引擎只進行真正組成計算結(jié)果的關(guān)鍵窗口的創(chuàng)建,省略多余的原生窗口,從而較少了內(nèi)部窗口數(shù)量,優(yōu)化了系統(tǒng)性能。
4.1關(guān)鍵窗口的定義
如上文所述,流式處理引擎的原生窗口機制的問題根源在于,為稀疏key構(gòu)造了大量的內(nèi)部狀態(tài)和輸出都相同的冗余窗口。為了從根本上優(yōu)化系統(tǒng)性能,就需要設(shè)計一種方法使得系統(tǒng)跳過對多余窗口的創(chuàng)建。在排除冗余窗口之后,剩下的窗口就是關(guān)鍵窗口。
具體可以將“關(guān)鍵窗口”定義為所有窗口中那些輸出能使得計算結(jié)果實際產(chǎn)生變化的窗口。例如,對于圖3中的數(shù)據(jù)序列,每個方格表示一個時間單位,方格中的圓形符號表示該時間單位內(nèi)上游系統(tǒng)向流計算引擎發(fā)送了數(shù)據(jù)。
在本研究中,采用長度為4 個單位時間、步長為1 個單位時間的滑動窗口進行統(tǒng)計,如果采用原生的窗口邏輯,則需要創(chuàng)建的窗口如圖4所示,圖中數(shù)據(jù)流上下方的每個線段表示一個窗口,共有13個。
對同樣的數(shù)據(jù)源若采用優(yōu)化的策略,根據(jù)上文對“關(guān)鍵窗口”的定義,需要創(chuàng)建的關(guān)鍵窗口如圖5所示,共有8個。這些關(guān)鍵窗口分別對應(yīng)了各個計算結(jié)果發(fā)生變化的時間節(jié)點:“1”表示首個數(shù)據(jù)點進入統(tǒng)計,“2”表示首個數(shù)據(jù)點離開統(tǒng)計,“3”表示第二個數(shù)據(jù)點進入統(tǒng)計,“4”表示第二個數(shù)據(jù)點離開統(tǒng)計,“5”表示第三個數(shù)據(jù)點進入統(tǒng)計,“6”表示第四個數(shù)據(jù)點進入統(tǒng)計,“7”表示第三個數(shù)據(jù)點離開統(tǒng)計,“8”表示第四個數(shù)據(jù)點離開統(tǒng)計。
在本研究中,關(guān)鍵窗口的數(shù)量比默認窗口少5 個,而隨著窗口長度越長、窗口步長越短,以及數(shù)據(jù)源的不均勻程度越高,關(guān)鍵窗口和原生窗口相比,在數(shù)量上的優(yōu)勢就會越發(fā)明顯。
4.2關(guān)鍵窗口優(yōu)化方案的實現(xiàn)
本文提出的關(guān)鍵窗口優(yōu)化方案,不需要推倒現(xiàn)有的流計算框架從零設(shè)計新系統(tǒng),而是可以通過對現(xiàn)有的流計算引擎進行適當?shù)母脑?,使其支持關(guān)鍵窗口的創(chuàng)建和計算。這樣能充分利用成熟引擎在功能和穩(wěn)定性上的諸多優(yōu)勢,使優(yōu)化方案低成本地投入工程應(yīng)用。
改造現(xiàn)有流計算引擎實現(xiàn)關(guān)鍵窗口,關(guān)鍵在于使關(guān)鍵窗口的創(chuàng)建和計算都基于原生的方法實現(xiàn),不增加額外的方法和狀態(tài)存儲。以Flink為例,其原生的窗口機制是基于Accumulator(也稱Aggregate Function,聚合函數(shù))的方式實現(xiàn)的[7],這要求在僅利用窗口的add()、merge()和getResult()方法的前提下,完成對所有關(guān)鍵窗口的生成和計算,而不需要借助額外的Evictor(即刪除元素的方法)或其他的狀態(tài)管理手段:不使用Evictor,是因為指定Evictor之后,會使得窗口不再進行預(yù)聚合,實際上導(dǎo)致系統(tǒng)喪失了有狀態(tài)計算的優(yōu)勢;而借助其他底層狀態(tài)管理函數(shù),如ProcessFunction,則會丟失窗口的語義性,使其和更高抽象的API(如SQL API)的兼容出現(xiàn)問題[8]。
基于上述前提,本文提出的關(guān)鍵窗口實現(xiàn)方案如下。
將關(guān)鍵窗口分為左關(guān)鍵窗口和右關(guān)鍵窗口,關(guān)鍵窗口定義示意圖如圖6所示。左關(guān)鍵窗口是有數(shù)據(jù)進入而使得計算結(jié)果發(fā)生改變的窗口,右關(guān)鍵窗口是有數(shù)據(jù)離開而使得計算結(jié)果發(fā)生改變的窗口。
對窗口創(chuàng)建邏輯的改變不影響數(shù)據(jù)進入窗口后的計算方式,需要相應(yīng)改變的是窗口狀態(tài)的初始化方法:在傳統(tǒng)的窗口機制中,窗口的創(chuàng)建是時間驅(qū)動的,窗口按照步長逐個創(chuàng)建,新數(shù)據(jù)到達時,它所屬于的所有窗口在邏輯上已經(jīng)處于預(yù)備的狀態(tài),在等待這個新數(shù)據(jù);而在關(guān)鍵窗口機制中,窗口的創(chuàng)建是數(shù)據(jù)驅(qū)動的,所有窗口均是隨著相對應(yīng)的某個數(shù)據(jù)點的到達而創(chuàng)建的。因此,在新數(shù)據(jù)到達時,它一方面會落入此前已經(jīng)創(chuàng)建好的窗口,另一方面同時需要創(chuàng)建其自身所對應(yīng)的關(guān)鍵窗口。
右關(guān)鍵窗口反映的是這個數(shù)據(jù)點失效而使計算結(jié)果發(fā)生的變化。因為在創(chuàng)建時,當前數(shù)據(jù)點是最新的,其后的數(shù)據(jù)還未到來,所以為了表示當前數(shù)據(jù)點失效而產(chǎn)生的變化,直接創(chuàng)建一個空窗口即可。
左關(guān)鍵窗口反映的是這個數(shù)據(jù)點加入統(tǒng)計后使得計算結(jié)果發(fā)生的變化。左關(guān)鍵窗口在創(chuàng)建時需要繼承此前的歷史狀態(tài)。根據(jù)對“關(guān)鍵窗口”的定義,系統(tǒng)會在有數(shù)據(jù)點進入和失效的關(guān)鍵時間位置創(chuàng)建窗口,而系統(tǒng)的統(tǒng)計結(jié)果是完全由窗口輸出的。因此,當一個新數(shù)據(jù)點抵達時,必然能夠找到一個已經(jīng)存在的窗口,其中保存的狀態(tài)正是新窗口需要繼承的歷史狀態(tài)。最簡單的一種情況是,在前一個窗口時間內(nèi),沒有其他數(shù)據(jù)抵達,即無歷史狀態(tài)可以繼承,這種情況下直接創(chuàng)建空窗口即可。如果在前一個窗口時間內(nèi)存在數(shù)據(jù)點,就要將當前的數(shù)據(jù)合并到最新的歷史狀態(tài)中,即取到系統(tǒng)輸出的前一個統(tǒng)計結(jié)果,再將新數(shù)據(jù)加入計算。在流計算引擎中,窗口管理器只記錄各個窗口起始和結(jié)束的位置信息[9],它無法直接感知到系統(tǒng)的前一個統(tǒng)計結(jié)果是什么,或者是由哪個窗口觸發(fā)的,因此需要通過判斷窗口位置的確定從哪個窗口繼承歷史狀態(tài)信息。
具體策略如下:當新數(shù)據(jù)到達時,如上文所述,首先創(chuàng)建一個空的右關(guān)鍵窗口;然后確定此數(shù)據(jù)對應(yīng)的左關(guān)鍵窗口的位置;檢查即將創(chuàng)建的左關(guān)鍵窗口和此前存在的左關(guān)鍵窗口是否存在重合;若無重合,直接創(chuàng)建此窗口;若有重合,檢查即將創(chuàng)建的左關(guān)鍵窗口與最近一個左關(guān)鍵窗口的非重疊部分是否包含數(shù)據(jù)(即其中是否存在右關(guān)鍵窗口的起點);若沒有這樣的右關(guān)鍵窗口,調(diào)用merge()方法,合并最接近的一個左關(guān)鍵窗口;若存在這樣的右關(guān)鍵窗口,調(diào)用merge()方法,合并最新的一個右關(guān)鍵窗口。
對下游使用計算結(jié)果的系統(tǒng)而言,采用關(guān)鍵窗口后,下游系統(tǒng)依然能夠感知到所有計算結(jié)果的更新,而唯一的區(qū)別在于,原生窗口會每隔一個窗口步長的時間就向下游發(fā)送一次數(shù)據(jù),而關(guān)鍵窗口僅在計算結(jié)果更新時發(fā)送。為了配合優(yōu)化,下游系統(tǒng)需要對失效時間的利用做簡單的調(diào)整。
具體來說,在使用原生窗口時,引擎不會發(fā)送某個key統(tǒng)計結(jié)果歸零的數(shù)據(jù),需要下游系統(tǒng)自行觀察該key的數(shù)據(jù)不再更新,從而得知該key的最后一個統(tǒng)計結(jié)果過期。在使用關(guān)鍵窗口優(yōu)化后,數(shù)據(jù)過期時會將空窗口的輸出發(fā)送給下游,因此下游系統(tǒng)僅需將數(shù)據(jù)過期時間調(diào)整為窗口長度即可,如果下游系統(tǒng)在此數(shù)據(jù)過期之前收到了新的輸出,則說明有新數(shù)據(jù)抵達,要對數(shù)據(jù)過期時間進行重置。因此,改用關(guān)鍵窗口后,僅需調(diào)整下游系統(tǒng)的數(shù)據(jù)過期時間即可兼容此優(yōu)化,不會影響系統(tǒng)本身的功能。
4.3優(yōu)化效果
設(shè)某系統(tǒng)正在使用長度為p s,步長為k s的窗口。對目前的流式計算引擎而言,接收到n 個數(shù)據(jù)后,需要創(chuàng)建的窗口最大個數(shù)為p×n/k個。這是因為每接收到一個數(shù)據(jù),系統(tǒng)都需要創(chuàng)建最多p/k個窗口。在引入關(guān)鍵窗口優(yōu)化后,接收到n 個數(shù)據(jù)后,系統(tǒng)需要創(chuàng)建的窗口數(shù)量為2n 個。這是因為關(guān)鍵窗口僅體現(xiàn)數(shù)據(jù)加入統(tǒng)計、失效離開統(tǒng)計的變化,系統(tǒng)每接收到一個數(shù)據(jù),只需要創(chuàng)建2 個關(guān)鍵窗口。
在實際應(yīng)用中,p/k幾乎總是大于2的。例如,統(tǒng)計每個用戶在過去120 s內(nèi)的操作次數(shù),每2 s更新一次統(tǒng)計結(jié)果,那么p/k=60。顯然,統(tǒng)計窗口的長度越大、刷新頻率越快時,p/k值也就越大,將遠遠超過2??梢姡P(guān)鍵窗口優(yōu)化能夠有效地減少需要的窗口數(shù)量。
5結(jié)論(Conclusion)
首先,本文分析了目前流式計算引擎在密集窗口情況下在性能方面存在的潛在問題,指出采用原生的窗口策略是導(dǎo)致高負載的原因。其次,說明了現(xiàn)有的優(yōu)化方法雖然能降低內(nèi)存消耗,但是存在不足之處。最后,本文提出了基于關(guān)鍵窗口的優(yōu)化方案,通過減少計算中創(chuàng)建的窗口數(shù)量,能有效地降低系統(tǒng)在內(nèi)存和I/O兩方面的性能壓力。目前,本優(yōu)化方案是基于系統(tǒng)時間的,若要推廣到使用事件時間的應(yīng)用,未來可以進一步優(yōu)化對數(shù)據(jù)流水位線等機制的兼容。
作者簡介:
程盛陽(1996-),男,碩士,助理工程師.研究領(lǐng)域:計算機應(yīng)用.