国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

分布式環(huán)境下大規(guī)模維表關(guān)聯(lián)技術(shù)優(yōu)化

2022-02-23 10:03趙恒泰趙宇海季航旭喬百友王國仁
計算機(jī)與生活 2022年2期
關(guān)鍵詞:數(shù)據(jù)量算子數(shù)據(jù)處理

趙恒泰,趙宇海+,袁 野,季航旭,喬百友,王國仁

1.東北大學(xué) 計算機(jī)科學(xué)與工程學(xué)院,沈陽110169

2.北京理工大學(xué) 計算機(jī)學(xué)院,北京100081

隨著互聯(lián)網(wǎng)的發(fā)展和普及,網(wǎng)絡(luò)中每天產(chǎn)生的數(shù)據(jù)量在迅速增加。傳統(tǒng)的數(shù)據(jù)處理方式已經(jīng)無法面對當(dāng)前的數(shù)據(jù)規(guī)模,為了獲取這些海量的數(shù)據(jù)中潛在的價值,開發(fā)者們提出了大數(shù)據(jù)處理技術(shù)。隨著大數(shù)據(jù)處理技術(shù)的不斷發(fā)展和計算需求的不斷更迭,到現(xiàn)在為止大數(shù)據(jù)處理技術(shù)已經(jīng)經(jīng)歷了三代計算引擎的變化。第一代大數(shù)據(jù)計算引擎以Apache Hadoop為代表,利用MapReduce進(jìn)行大數(shù)據(jù)處理。這一代計算的顯著特點(diǎn)基于物理存儲的計算模式。這類計算有著非常高的吞吐量,但由于每一步的計算操作都要寫入到物理存儲中,基于內(nèi)存的計算速度與磁盤的I/O 開銷的不匹配導(dǎo)致了非常高的處理延遲。這一代計算引擎適合處理實(shí)時性要求不高的離線批處理任務(wù)。這個時代的大數(shù)據(jù)分析技術(shù)以離線分析為主,需要先統(tǒng)計和獲取全部數(shù)據(jù)再進(jìn)行分析,對分析結(jié)果的實(shí)時性要求不高,更偏重于對歷史數(shù)據(jù)的總結(jié)。第二代大數(shù)據(jù)計算引擎以Apache Spark為代表,利用內(nèi)存進(jìn)行批處理計算。相對于第一代技術(shù),這一代的顯著特點(diǎn)是將計算數(shù)據(jù)移入了內(nèi)存中,基于內(nèi)存的數(shù)據(jù)進(jìn)行計算。這種基于內(nèi)存的計算方式大大降低了第一代技術(shù)中每一步計算都需要把結(jié)果寫入磁盤產(chǎn)生的I/O 開銷所帶來的延遲。但是由于技術(shù)仍然是建立在批處理的計算模式之上,每一個批次的數(shù)據(jù)處理都有一定的時間間隔,在面對一些實(shí)時性比較強(qiáng)的計算任務(wù)時仍無法保證極低的延遲。這個時代的大數(shù)據(jù)分析技術(shù)逐漸向在線分析靠攏,已經(jīng)有了對實(shí)時數(shù)據(jù)處理的需求。第三代大數(shù)據(jù)計算引擎以Apache Flink為代表,是完全基于流計算的數(shù)據(jù)處理引擎。Apache Flink 所提供的計算平臺可以在實(shí)現(xiàn)毫秒級的延遲下,每秒處理上億次的消息或者事件。極高的數(shù)據(jù)處理能力和極低的延遲,使以Apache Flink 為代表的流計算技術(shù)成為了實(shí)時大數(shù)據(jù)分析的首選。越來越多的公司使用流計算技術(shù)構(gòu)建自己的實(shí)時數(shù)據(jù)分析引擎來替代傳統(tǒng)的數(shù)據(jù)倉庫分析。在這類數(shù)據(jù)處理和分析過程中,實(shí)時產(chǎn)生的流數(shù)據(jù)往往信息量不足,需要與離線存儲的數(shù)據(jù)進(jìn)行關(guān)聯(lián),擴(kuò)充數(shù)據(jù)屬性。離線存儲的數(shù)據(jù)稱為維表數(shù)據(jù),流數(shù)據(jù)和維表數(shù)據(jù)進(jìn)行關(guān)聯(lián)的過程稱為維表關(guān)聯(lián)。

維表關(guān)聯(lián)是當(dāng)前在線大數(shù)據(jù)分析的關(guān)鍵技術(shù)之一。針對維表關(guān)聯(lián)技術(shù)在分布式環(huán)境下的優(yōu)化,主要是對維表數(shù)據(jù)查詢進(jìn)行優(yōu)化以降低查詢維表數(shù)據(jù)所帶來的I/O 開銷和延遲。維表關(guān)聯(lián)的數(shù)據(jù)查詢優(yōu)化主要是通過異步I/O 技術(shù)增加單位時間的查詢數(shù)量和通過使用數(shù)據(jù)緩存技術(shù)將查詢到的維表緩存在分布式引擎的計算節(jié)點(diǎn)的內(nèi)存中加速查詢。通常有緩存查詢結(jié)果和緩存整個維表兩種緩存模式,其原理如圖1 所示。

圖1 傳統(tǒng)維表關(guān)聯(lián)邏輯Fig.1 Traditional dimension table connection

這兩種基于計算節(jié)點(diǎn)的優(yōu)化方式,都存在著各自的不足。數(shù)據(jù)全部緩存的方式因?yàn)閮?nèi)存問題無法支持過大規(guī)模的維表。緩存查詢結(jié)果的方式對每條未緩存數(shù)據(jù)的處理時間受限于數(shù)據(jù)庫I/O 能力,會隨著維表數(shù)據(jù)規(guī)模的增大而線性上升。當(dāng)數(shù)據(jù)流速處于一個較低水平,保證數(shù)據(jù)有效性的緩存超時機(jī)制會讓每一次數(shù)據(jù)查詢都指向數(shù)據(jù)庫,使本地緩存無效化,降低數(shù)據(jù)處理效率。

針對單節(jié)點(diǎn)優(yōu)化技術(shù)的不足,本文首先提出了一種可以用于離線存儲的數(shù)據(jù)和實(shí)時生成的數(shù)據(jù)進(jìn)行混合計算的計算模型,然后基于該模型設(shè)計了一種新型的維表關(guān)聯(lián)技術(shù)優(yōu)化方案,其原理如圖2所示。

圖2 優(yōu)化的維表關(guān)聯(lián)邏輯Fig.2 Optimized dimension table connection

該方案單點(diǎn)讀取維表數(shù)據(jù),將數(shù)據(jù)切分后分發(fā)到計算節(jié)點(diǎn),然后與流數(shù)據(jù)進(jìn)行關(guān)聯(lián)。這個優(yōu)化方案每個計算節(jié)點(diǎn)只需緩存部分維表數(shù)據(jù),提高了維表數(shù)據(jù)的緩存規(guī)模,同時大幅降低維表數(shù)據(jù)查詢所產(chǎn)生的消耗。該方案將批處理技術(shù)和流計算技術(shù)進(jìn)行了結(jié)合,同是對離線的批數(shù)據(jù)和實(shí)時的流數(shù)據(jù)進(jìn)行混合計算的一種探究。

本文主要貢獻(xiàn)如下:

(1)提出了一種適用于對離線的批數(shù)據(jù)和實(shí)時的流數(shù)據(jù)進(jìn)行混合計算的計算模型,該計算模型可以在一套API(application programming interface)中同時處理流數(shù)據(jù)和批數(shù)據(jù),也可以單獨(dú)處理流數(shù)據(jù)或單獨(dú)處理批數(shù)據(jù)。

(2)提出了一種單點(diǎn)讀取維表數(shù)據(jù),切分后進(jìn)行分發(fā)和計算的維表關(guān)聯(lián)數(shù)據(jù)緩存方式,降低了維表數(shù)據(jù)查詢對數(shù)據(jù)庫產(chǎn)生的壓力并提高了集群環(huán)境中計算系統(tǒng)對維表規(guī)模支持的上限。同時針對維表關(guān)聯(lián)計算邏輯進(jìn)行了優(yōu)化,使維表關(guān)聯(lián)技術(shù)不再局限于對數(shù)據(jù)的連接。

(3)在流計算引擎Apache Flink 中對該優(yōu)化的維表關(guān)聯(lián)技術(shù)和傳統(tǒng)的維表關(guān)聯(lián)技術(shù)進(jìn)行了實(shí)現(xiàn)。通過實(shí)驗(yàn)對該維表關(guān)聯(lián)技術(shù)進(jìn)行了驗(yàn)證,實(shí)驗(yàn)結(jié)果對比顯示該方法相對于同等條件下傳統(tǒng)的維表關(guān)聯(lián)方法,可以使計算任務(wù)的吞吐量有8~9 倍的提升,同時在計算能力滿足的情況下降低40%以上的計算延遲。

1 相關(guān)工作

在過去的幾年時間中,針對分布式環(huán)境下維表關(guān)聯(lián)操作以及以其為代表的典型的流數(shù)據(jù)與靜態(tài)數(shù)據(jù)交互的計算的優(yōu)化研究已經(jīng)存在不少的進(jìn)展。

為了解決海量數(shù)據(jù)關(guān)聯(lián)計算問題,文獻(xiàn)[9]提出了一種MESHJOIN 算法,該算法優(yōu)化了單點(diǎn)計算時連續(xù)數(shù)據(jù)流和維度數(shù)據(jù)的連接過程,但該算法對內(nèi)存分配的方式不夠高效。為此,文獻(xiàn)[10]提出了一種改進(jìn)的算法用于改善內(nèi)存分配問題。文獻(xiàn)[11]提出基于分塊思想的算法來提高M(jìn)ESHJOIN 算法的連接性能。文獻(xiàn)[12]提出MESHJOIN*算法,該算法采用多線程并發(fā)連接技術(shù),并根據(jù)工程學(xué)原理,實(shí)現(xiàn)了連接操作和關(guān)系R 讀取操作的最佳調(diào)度,保證了連接算法效率的最大化,進(jìn)一步提高連接效率。在MESHJOIN 算法的基礎(chǔ)上,文獻(xiàn)[13]又提出EHJOIN 算法,對傳統(tǒng)散列連接方法進(jìn)行改進(jìn),利用索引將部分頻繁使用的主數(shù)據(jù)存儲在內(nèi)存中,解決了高速數(shù)據(jù)流下的磁盤頻繁訪問問題。以上的算法都是基于維度數(shù)據(jù)的角度對維表連接過程進(jìn)行優(yōu)化,沒有考慮流數(shù)據(jù)的傾斜問題,據(jù)此文獻(xiàn)[14]提出了CSJR(cachedbased stream-relation join)算法,優(yōu)化了在流數(shù)據(jù)傾斜環(huán)境下的數(shù)據(jù)連接效率。

在分布式計算引擎領(lǐng)域,大量的開發(fā)貢獻(xiàn)都來自于社區(qū)。而主流的分布式系統(tǒng)Apache Spark 和Apache Flink 都針對維表關(guān)聯(lián)這種場景做出了自己的適配。

Apache Spark 提出了Spark Streaming用以改善計算延遲并提供流計算支持,這一計算模式通過將無限的流數(shù)據(jù)拆分成離散流(discretized stream),盡可能縮小每個彈性分布式數(shù)據(jù)集(resilient distributed dataset,RDD)的大小,構(gòu)建微量批數(shù)據(jù)集,來達(dá)到近似于流計算的效果,這個階段流計算的延遲在100 ms 級。在Spark Streaming 的技術(shù)背景下,Apache Spark 可以將維表數(shù)據(jù)定義為一個獨(dú)立的RDD,并緩存到每一個關(guān)聯(lián)節(jié)點(diǎn)上,在每一個批次的微量批數(shù)據(jù)集到達(dá)后,以局部批處理的方式進(jìn)行關(guān)聯(lián)。同時可以對RDD 內(nèi)容進(jìn)行更新以達(dá)到和離線維表數(shù)據(jù)相同的狀態(tài)。以上的更新操作需要用戶自己定義更新的邏輯。

由于Spark Streaming 的數(shù)據(jù)傳輸是基于微量批數(shù)據(jù)集的方式進(jìn)行數(shù)據(jù)傳輸而不是真正意義上的持續(xù)傳輸,Spark 又提出了結(jié)構(gòu)化流計算(structured streaming)。在該計算模式中引入了連續(xù)處理的概念,將流計算的延遲降低到了1 ms 級的層面。同時Structured Streaming原生地支持了Stream-Static Join,但是其底層的實(shí)現(xiàn)邏輯仍是針對每個計算節(jié)點(diǎn)進(jìn)行獨(dú)立的緩存操作。

Apache Flink 本身是基于流計算的分布式計算引擎,但是為了兼容批處理,仍獨(dú)立維護(hù)了一套用于批處理的DataSet API。Apache Flink 在1.8 版本中并未提供對維表關(guān)聯(lián)計算的官方支持,需要用戶通過一些算子手動實(shí)現(xiàn)流計算中對維表關(guān)聯(lián)計算的支持。

阿里巴巴公司基于Apache Flink 而構(gòu)建的開源引擎Blink 在Table API 的層面上提出了維表關(guān)聯(lián)的操作模式,用戶需要手動實(shí)現(xiàn)對維表數(shù)據(jù)的查詢邏輯。依靠異步查詢維表數(shù)據(jù)并緩存查詢結(jié)果的方式降低數(shù)據(jù)庫I/O 開銷,提高查詢效率。該方案的緩存模式有數(shù)據(jù)全部緩存并定時更新和LRU Cache兩種,都是基于計算節(jié)點(diǎn)的緩存方式。

以上的維表關(guān)聯(lián)計算設(shè)計,實(shí)質(zhì)上都是在流計算過程中直接向數(shù)據(jù)庫發(fā)起查詢,將查詢邏輯都綁定在每個獨(dú)立的計算節(jié)點(diǎn)中,所有的計算節(jié)點(diǎn)都要同時訪問數(shù)據(jù)源或緩存完整的維表信息。

在面對高速流數(shù)據(jù)輸入時,異步查詢數(shù)據(jù)庫的方式會對數(shù)據(jù)庫造成極大的壓力;在面對海量的維表數(shù)據(jù)時,由于每個計算節(jié)點(diǎn)的內(nèi)存限制,維表數(shù)據(jù)無法完整地寫入內(nèi)存中并導(dǎo)致執(zhí)行引擎產(chǎn)生內(nèi)存異常。

2 維表關(guān)聯(lián)技術(shù)的設(shè)計與優(yōu)化

如前所述,現(xiàn)有的大數(shù)據(jù)計算平臺并沒有支持緩存水平擴(kuò)展的維表關(guān)聯(lián)機(jī)制。在面對大規(guī)模維表數(shù)據(jù)時,全緩存模式下,維表無法完整地緩存到每一個計算節(jié)點(diǎn)中。而使用異步連接方式則會在高速計算時產(chǎn)生大量數(shù)據(jù)I/O,對數(shù)據(jù)庫造成過大的壓力,甚至導(dǎo)致數(shù)據(jù)庫連接超時,失去響應(yīng)。針對這個問題,本章設(shè)計并提出了一種面向大規(guī)模分布式計算的維表關(guān)聯(lián)機(jī)制。

本章介紹了維表關(guān)聯(lián)技術(shù)優(yōu)化的幾個具體設(shè)計。首先介紹了一種混合計算模型,做到了在同一個計算任務(wù)中統(tǒng)一批數(shù)據(jù)處理和流數(shù)據(jù)處理。然后介紹了維表關(guān)聯(lián)技術(shù)的緩存優(yōu)化,包含對維表數(shù)據(jù)源的緩存設(shè)計以及各個計算節(jié)點(diǎn)對維表數(shù)據(jù)的緩存設(shè)計。最后介紹了維表關(guān)聯(lián)技術(shù)的計算設(shè)計,包含對并行計算下的數(shù)據(jù)分發(fā)的處理和每個計算節(jié)點(diǎn)對關(guān)聯(lián)計算的處理。

2.1 定義和概念

本節(jié)給出了文中所涉及的一些基本定義和概念。

(維表)維表是指存儲在外部數(shù)據(jù)庫中的,具有數(shù)據(jù)規(guī)模大、更新時間慢等特征的一類數(shù)據(jù)表。

(算子)算子是指在計算過程中對數(shù)據(jù)處理的最小計算單位,不同的算子具有不同的計算邏輯。批處理和流計算擁有功能相同但運(yùn)行邏輯不同的算子。

(混合計算)混合計算是指在同一個計算任務(wù)中,同時存在流計算算子和批處理算子,也被稱作批流融合計算。

(維表關(guān)聯(lián))維表關(guān)聯(lián)指數(shù)據(jù)流和維表之間存在一定關(guān)系,并根據(jù)這種關(guān)系進(jìn)行的數(shù)據(jù)處理,根據(jù)關(guān)系確定連接信息并將數(shù)據(jù)表進(jìn)行組合的過程稱為連接。從廣義上來看,維表關(guān)聯(lián)包含但不限于連接操作,通過維表數(shù)據(jù)計算分析等操作也屬于維表關(guān)聯(lián)。

(背壓)背壓指當(dāng)某個計算節(jié)點(diǎn)數(shù)據(jù)處理速度低于數(shù)據(jù)傳輸速度時,接收到的數(shù)據(jù)產(chǎn)生積壓。進(jìn)而使單個計算節(jié)點(diǎn)的輸入緩存超出限制,拒絕接收新數(shù)據(jù)。最終讓整個流計算系統(tǒng)從過載的節(jié)點(diǎn)開始的一系列上游節(jié)點(diǎn)直至數(shù)據(jù)源節(jié)點(diǎn)鏈?zhǔn)骄彺嬉绯鐾V菇邮諗?shù)據(jù),等待過載節(jié)點(diǎn)處理數(shù)據(jù)。

2.2 混合計算模型

本節(jié)介紹本文所提出的混合計算模型。傳統(tǒng)的批式計算架構(gòu)中,計算的運(yùn)行邏輯如圖3 所示。計算節(jié)點(diǎn)分階段啟動,每一組計算只有全部完成以后才會解除同步等待,啟動下一組計算任務(wù),并將計算數(shù)據(jù)重新分發(fā)到下一輪的計算節(jié)點(diǎn)中。

圖3 批式計算架構(gòu)Fig.3 Batch computing architecture

傳統(tǒng)的流式計算架構(gòu)中,計算的運(yùn)行邏輯如圖4所示。所有的計算節(jié)點(diǎn)在計算開始時全部創(chuàng)建,然后數(shù)據(jù)在計算節(jié)點(diǎn)之間不斷流動,每個節(jié)點(diǎn)通過不斷拉取和消費(fèi)上游計算節(jié)點(diǎn)的數(shù)據(jù),來做到持續(xù)不停的流式處理。流計算會盡可能地不進(jìn)行數(shù)據(jù)重新分發(fā)操作,從而讓數(shù)據(jù)連續(xù)處理。

圖4 流式計算架構(gòu)Fig.4 Stream computing architecture

為了做到在流式計算中兼容對批數(shù)據(jù)的處理,本文對計算算子進(jìn)行了重新設(shè)計。在流計算中構(gòu)建標(biāo)志為批處理的算子,進(jìn)而提出了混合計算模型,如圖5 所示。該類模型算子分為三類:流計算算子、批處理算子、混合計算算子。該模型中的流算子仍和傳統(tǒng)流計算的計算邏輯一致,隨著數(shù)據(jù)流入實(shí)時計算。批處理算子通過修改流計算環(huán)境中的數(shù)據(jù)分發(fā)和計算邏輯,做到和傳統(tǒng)的批式計算一樣效果?;旌嫌嬎闼阕佑糜诹鲾?shù)據(jù)和批數(shù)據(jù)的混合計算,上游算子分別為流計算算子和批處理算子?;旌嫌嬎闼阕釉谂鷶?shù)據(jù)完全到達(dá)之前,會阻塞流計算,等待批數(shù)據(jù)分發(fā)完成以后,開始進(jìn)行計算。

圖5 混合計算架構(gòu)Fig.5 Mixed computing architecture

2.3 維表關(guān)聯(lián)技術(shù)的緩存優(yōu)化

本節(jié)介紹大規(guī)模維表關(guān)聯(lián)計算環(huán)境下對數(shù)據(jù)源節(jié)點(diǎn)緩存和計算節(jié)點(diǎn)緩存的設(shè)計和優(yōu)化。

由于維表數(shù)據(jù)也是存在變化的,若每次都將讀取到的數(shù)據(jù)全部發(fā)送給計算節(jié)點(diǎn),在更新時仍會造成較大的網(wǎng)絡(luò)傳輸開銷并阻塞流數(shù)據(jù)的處理。本文參考數(shù)據(jù)庫操作記錄日志的思想,在數(shù)據(jù)更新時只分發(fā)需要修改的數(shù)據(jù),降低網(wǎng)絡(luò)傳輸開銷。為了確定數(shù)據(jù)更新的具體內(nèi)容,本文根據(jù)維表變化緩慢的特性,在數(shù)據(jù)源處建立了緩存機(jī)制緩存已分發(fā)數(shù)據(jù)記錄。在緩存機(jī)制的鍵值對中,Key 值由關(guān)聯(lián)所需主鍵列數(shù)據(jù)合并得出,Value 值則改為數(shù)組,存儲主鍵相同其他數(shù)據(jù)不同的多版本的維表數(shù)據(jù)。但緩存完整的維表數(shù)據(jù)會占用數(shù)據(jù)源算子過多的內(nèi)存,無法支持太大的維表。由于數(shù)據(jù)源并不需對數(shù)據(jù)進(jìn)行操作,故而將維表數(shù)據(jù)拼接成字符串后計算哈希值,并存入對應(yīng)的數(shù)組中,不保存原始數(shù)據(jù)。

每輪讀取維表數(shù)據(jù)時,將數(shù)據(jù)主鍵和哈希值進(jìn)行緩存,并根據(jù)上次緩存結(jié)果判斷數(shù)據(jù)分發(fā)行為,數(shù)據(jù)存在則跳過,數(shù)據(jù)不存在則向下游計算節(jié)點(diǎn)發(fā)送新增行為,具體算法如算法1 所示。首先通過調(diào)用關(guān)聯(lián)信息選擇函數(shù)構(gòu)建鍵值并構(gòu)建整條數(shù)據(jù)的哈希值,然后將鍵值數(shù)據(jù)和哈希值寫入新緩存,最后將鍵值與舊緩存進(jìn)行比較,確定數(shù)據(jù)是否存在,存在則跳過數(shù)據(jù)并清理對應(yīng)舊緩存,不存在則發(fā)出新增行為請求。

算法1 源節(jié)點(diǎn)新數(shù)據(jù)分發(fā)算法

輸入:上一次分發(fā)所構(gòu)建的數(shù)據(jù)緩存,本次數(shù)據(jù)分發(fā)所構(gòu)建的數(shù)據(jù)緩存,待判斷數(shù)據(jù)。

輸出:無。

1.=buildKey();/*計算緩存主鍵*/

2.=Hash();/*計算維表哈希值*/

3.將與存入中;

4.If.()

5.If.().()

6.=“”;

7..().();

8.Else

9.Action=“ADD”;

10.End if

11.Else

12.Action=“ADD”;

13.End if

14.If Action==“ADD”;

15.將newData和Action 發(fā)送給下游計算節(jié)點(diǎn);

16.End if

在第一次數(shù)據(jù)分發(fā)時舊緩存的初始值為空,因此第一次將會分發(fā)所有數(shù)據(jù)。在數(shù)據(jù)讀取分發(fā)完成后,由于在數(shù)據(jù)讀取過程中已經(jīng)將讀取的數(shù)據(jù)和舊緩存進(jìn)行了匹配和刪除操作,仍存在于舊緩存的數(shù)據(jù)則全部是需要刪除的數(shù)據(jù)。此時執(zhí)行針對過期數(shù)據(jù)的數(shù)據(jù)刪除行為的分發(fā),具體操作如算法2 所示。遍歷數(shù)據(jù)讀取完成后的舊緩存,針對舊緩存中的每一條數(shù)據(jù)生成數(shù)據(jù)刪除請求并發(fā)送給下游計算節(jié)點(diǎn),完成過期數(shù)據(jù)清理。

算法2 源節(jié)點(diǎn)過期數(shù)據(jù)刪除算法

輸入:上一次分發(fā)所構(gòu)建的數(shù)據(jù)緩存。

輸出:無。

1.遍歷,獲取每一個對應(yīng)的列表;

2.遍歷列表,獲取存儲在舊緩存中的;

3.針對每一個,發(fā)送=“”的數(shù)據(jù)刪除行為,通知下游計算節(jié)點(diǎn)刪除過期緩存;

4.清空舊緩存,釋放空間.

最后在添加和刪除請求都發(fā)送完成后將當(dāng)前緩存賦值給舊緩存,即可完成一輪數(shù)據(jù)緩存和分發(fā)。

在傳統(tǒng)的維表關(guān)聯(lián)計算中,對數(shù)據(jù)的緩存均為Key-Value 的形式,其中Key 值通過關(guān)聯(lián)的主鍵列計算得出,Value 值保存維表具體數(shù)據(jù)內(nèi)容。這種傳統(tǒng)的數(shù)據(jù)緩存模式,無法支持主鍵相同的多版本的維表數(shù)據(jù)緩存,進(jìn)而無法支持對歷史維表數(shù)據(jù)的全量緩存和查詢。

本文針對該問題,在計算節(jié)點(diǎn)中提出了一種二層緩存策略。在原有的Key-Value 緩存模式下,對Value 格式進(jìn)行優(yōu)化,將Key 值相同的維表數(shù)據(jù),轉(zhuǎn)換為Json 字符串,并以數(shù)組的形式存儲于Value 中,使緩存結(jié)構(gòu)支持歷史維表數(shù)據(jù)。同時相較于存儲完整的維表結(jié)構(gòu),轉(zhuǎn)換為Json 字符串可以降低一定的內(nèi)存存儲需求。

在計算節(jié)點(diǎn)獲取數(shù)據(jù)的過程中,為了支持優(yōu)化后的數(shù)據(jù)源節(jié)點(diǎn)的數(shù)據(jù)分發(fā),數(shù)據(jù)獲取算法也要進(jìn)行重新設(shè)計,具體算法如算法3 所示。對數(shù)據(jù)變更的行為進(jìn)行判斷,如果是增加數(shù)據(jù)(ADD)就將數(shù)據(jù)寫入緩存,如果是刪除數(shù)據(jù)(DEL)就根據(jù)鍵值對緩存相關(guān)數(shù)據(jù)進(jìn)行刪除。

算法3 計算節(jié)點(diǎn)數(shù)據(jù)獲取算法

輸入:節(jié)點(diǎn)數(shù)據(jù)緩存,數(shù)據(jù)主鍵,數(shù)據(jù)文本,數(shù)據(jù)操作。

輸出:更新后的節(jié)點(diǎn)數(shù)據(jù)緩存。

1.If=“”

2.根據(jù)存儲數(shù)據(jù)在中;

3.Else

4.If=“”

5.根據(jù),刪除中數(shù)據(jù);

6.End if

7.End if

8.Return.

2.4 維表關(guān)聯(lián)技術(shù)的計算設(shè)計

維表關(guān)聯(lián)技術(shù)的計算設(shè)計分為兩部分:數(shù)據(jù)分區(qū)方式設(shè)計、數(shù)據(jù)計算模式設(shè)計。

傳統(tǒng)的維表關(guān)聯(lián)中,維表數(shù)據(jù)在各個計算節(jié)點(diǎn)間獨(dú)立存儲,因此流數(shù)據(jù)可以根據(jù)不同計算系統(tǒng)的分發(fā)策略進(jìn)行分發(fā),在流經(jīng)計算節(jié)點(diǎn)時向遠(yuǎn)程數(shù)據(jù)庫查詢獲取維表數(shù)據(jù)。全局分發(fā)的維表關(guān)聯(lián)策略中,如果流數(shù)據(jù)分發(fā)規(guī)則與維表數(shù)據(jù)分發(fā)規(guī)則不匹配,則會導(dǎo)致關(guān)聯(lián)結(jié)果為空。

因此,為了優(yōu)化后的維表關(guān)聯(lián)可以正常執(zhí)行,將對流數(shù)據(jù)和維表數(shù)據(jù)的數(shù)據(jù)分發(fā)策略進(jìn)行重新設(shè)計,統(tǒng)一計算出關(guān)聯(lián)主鍵并對主鍵進(jìn)行哈希計算獲得對應(yīng)的數(shù)據(jù)分區(qū),計算分區(qū)方式的算法如算法4 所示。第1 行根據(jù)輸入的記錄通過調(diào)用關(guān)聯(lián)信息選擇函數(shù)構(gòu)建鍵值,第2、3 行根據(jù)鍵值計算最終的數(shù)據(jù)分區(qū)。

算法4 Hash 分區(qū)算法

輸入:待分區(qū)的記錄,分區(qū)數(shù)量。

輸出:待分區(qū)記錄的分區(qū)編號。

1.key=()/*提取記錄的key*/

2.=.;

3.=%;

4.Return.

傳統(tǒng)的維表關(guān)聯(lián)計算,通過指定輸入數(shù)據(jù)和維表的主鍵列確定連接信息,輸出的是關(guān)聯(lián)以后的數(shù)據(jù)列集合。本文首先對數(shù)據(jù)緩存進(jìn)行了重新設(shè)計,支持了分布式的緩存策略以及多版本的數(shù)據(jù)緩存。為了適配對多版本數(shù)據(jù)的連接選擇問題,本文將數(shù)據(jù)關(guān)聯(lián)計算模式進(jìn)行了重新的設(shè)計。將關(guān)聯(lián)邏輯和關(guān)聯(lián)鍵值選擇邏輯進(jìn)行了抽象,獨(dú)立出了用戶可自行定義和實(shí)現(xiàn)的關(guān)聯(lián)計算函數(shù)和關(guān)聯(lián)鍵值選擇函數(shù)。用戶在連接計算的基礎(chǔ)上,可以根據(jù)實(shí)際的計算需求,確定數(shù)據(jù)流和維表的數(shù)據(jù)關(guān)聯(lián)鍵值,并自定義關(guān)聯(lián)的計算輸出結(jié)果。

由于關(guān)聯(lián)計算函數(shù)每次的調(diào)用是傳入通過關(guān)聯(lián)信息選擇函數(shù)所獲取的全部版本維表數(shù)據(jù),用戶可以做到對維表關(guān)聯(lián)的靈活處理,以支持指定關(guān)聯(lián)特定歷史版本、計算特定關(guān)系等需求。

在實(shí)際的生產(chǎn)生活場景中,一個典型的不是連接的關(guān)聯(lián)場景為商品關(guān)聯(lián)度計算。通過在計算函數(shù)處理輸入商品和歷史訂單中的存在該商品的訂單的相似關(guān)系,計算出與商品關(guān)聯(lián)的其他商品,從而獲得輸入商品的關(guān)聯(lián)商品。

3 實(shí)驗(yàn)分析

本章針對前文所提出的優(yōu)化的維表關(guān)聯(lián)技術(shù)在Apache Flink 進(jìn)行了實(shí)現(xiàn),同時也實(shí)現(xiàn)了傳統(tǒng)的維表關(guān)聯(lián)技術(shù)。然后對比和驗(yàn)證了在不同流數(shù)據(jù)和維表數(shù)據(jù)規(guī)模下,優(yōu)化的維表關(guān)聯(lián)技術(shù)和傳統(tǒng)的維表關(guān)聯(lián)技術(shù)的效率差異。

3.1 編碼實(shí)現(xiàn)

編碼實(shí)現(xiàn)分為傳統(tǒng)的維表關(guān)聯(lián)實(shí)現(xiàn)與優(yōu)化后的維表關(guān)聯(lián)實(shí)現(xiàn)。

在Apache Flink 1.8.0 中,DataStream API 并沒有原生提供對維表關(guān)聯(lián)計算的支持。由于傳統(tǒng)的維表關(guān)聯(lián)本質(zhì)上還是流計算,本文根據(jù)計算邏輯,對流計算中的異步計算算子進(jìn)行了改造,添加的對數(shù)據(jù)庫查詢的支持和基于LRU(least recently used)策略的數(shù)據(jù)緩存機(jī)制。然后對該算子進(jìn)行了封裝,使算子支持維表的定義和讀取以及連接條件的設(shè)置。

優(yōu)化后的維表關(guān)聯(lián)基于Mixed API 進(jìn)行實(shí)現(xiàn)。通過設(shè)計新的雙輸入算子,使一個輸入為流數(shù)據(jù),一個輸入為維表數(shù)據(jù),做到對混合計算的支持。然后單獨(dú)設(shè)計了維表數(shù)據(jù)的數(shù)據(jù)源方法,并對系統(tǒng)中原有的數(shù)據(jù)分發(fā)過程進(jìn)行改造以適應(yīng)新的數(shù)據(jù)分發(fā)邏輯。最后對整個過程進(jìn)行了封裝,對數(shù)據(jù)連接信息選擇函數(shù)和維表關(guān)聯(lián)計算函數(shù)進(jìn)行了抽象和默認(rèn)邏輯的實(shí)現(xiàn)。對于緩存,依托Mixed API 的特性,在批數(shù)據(jù)計算和分發(fā)完成前,流數(shù)據(jù)處理是阻塞的,保證了維表緩存的完整性。

3.2 數(shù)據(jù)集

本文針對維表關(guān)聯(lián)這一實(shí)際生產(chǎn)活動中所遇到的計算類型,使用了阿里巴巴“雙十一”中的真實(shí)生產(chǎn)場景進(jìn)行性能測試。該場景在數(shù)據(jù)脫敏和業(yè)務(wù)簡化后,共涉及3 個數(shù)據(jù)表:用戶信息表、商品信息表、用戶點(diǎn)擊數(shù)據(jù)表。本實(shí)驗(yàn)根據(jù)數(shù)據(jù)表格式在MySQL數(shù)據(jù)庫中建立了對應(yīng)數(shù)據(jù)表并進(jìn)行相應(yīng)的數(shù)據(jù)生成,數(shù)據(jù)表數(shù)據(jù)記錄數(shù)及數(shù)據(jù)容量如表1所示。同時根據(jù)該表同比例構(gòu)建了10 萬條用戶信息數(shù)據(jù)和100 萬條用戶信息數(shù)據(jù)的對應(yīng)數(shù)據(jù)表,用于進(jìn)行對比測試。查詢所需的鍵值列均構(gòu)建了索引,用于保證異步訪問時的查詢效率。

表1 數(shù)據(jù)表大小Table 1 Size of data table

該生產(chǎn)場景的具體業(yè)務(wù)為對用戶進(jìn)行廣告投放,簡化業(yè)務(wù)流程為:

(1)向推薦系統(tǒng)流入用戶ID;

(2)系統(tǒng)根據(jù)用戶ID 獲取用戶一天內(nèi)的商品點(diǎn)擊數(shù)據(jù)和用戶信息數(shù)據(jù),通過機(jī)器學(xué)習(xí)算法計算出推薦商品ID;

(3)通過推薦商品ID 獲取商品具體信息并返回。

本實(shí)驗(yàn)為專注于測試維表關(guān)聯(lián)效率,簡化了推薦計算部分,只保留維表關(guān)聯(lián)部分,修改后的業(yè)務(wù)邏輯如下:

(1)向測試系統(tǒng)流入用戶ID;

(2)根據(jù)用戶ID,擴(kuò)展出用戶具體信息和用戶點(diǎn)擊商品ID;

(3)對每個用戶點(diǎn)擊商品ID 追加一個隨機(jī)數(shù)作為推薦商品ID;

(4)通過推薦商品ID 獲取商品具體信息并返回。

3.3 實(shí)驗(yàn)環(huán)境

本文所描述的相關(guān)技術(shù)細(xì)節(jié),均在Flink 1.8.0 中進(jìn)行實(shí)現(xiàn),采用Java 語言進(jìn)行編寫。實(shí)驗(yàn)的運(yùn)行環(huán)境及軟硬件環(huán)境如下。

(1)硬件環(huán)境。實(shí)驗(yàn)采用的分布式環(huán)境由4 臺服務(wù)器組成,1 臺主節(jié)點(diǎn),3 臺從節(jié)點(diǎn)。從節(jié)點(diǎn)采用的服務(wù)器配置為:①CPU Intel Xeon E5-2603 V4 *2,核心數(shù)目6 核心;主頻1.7 GHz。②內(nèi)存64 GB,主頻2 400 MHz。③硬盤400 GB SSD。主節(jié)點(diǎn)配置為:①CPU Intel Xeon E5-2603 V4 *2,核心數(shù)目6 核心;主頻1.7 GHz。②內(nèi)存128 GB,主頻2 400 MHz。③硬盤400 GB SSD。

(2)軟件環(huán)境。①操作系統(tǒng)Centos 7;②存儲環(huán)境MySQL 5.6.45;③Apache Flink 版本1.8.0,JDK 版本1.8.0。

3.4 實(shí)驗(yàn)結(jié)果與分析

本文通過使用阿里巴巴所提供的advertising測試工具,通過修改其中業(yè)務(wù)計算部分的邏輯來進(jìn)行測試。該測試會在計算開始和結(jié)束時對數(shù)據(jù)添加時間戳并寫入Redis,然后通過對Redis 中的數(shù)據(jù)進(jìn)行分析獲取相關(guān)統(tǒng)計結(jié)果。本實(shí)驗(yàn)所有計算的并行度固定為12,忽略維表全緩存的冷啟動時間。生成100 萬條流數(shù)據(jù)作為實(shí)驗(yàn)1,生成1 000 萬條流數(shù)據(jù)作為實(shí)驗(yàn)2,對比流數(shù)據(jù)的數(shù)據(jù)量和維表的數(shù)據(jù)量對計算吞吐量和延遲的影響。

(1)吞吐量對比分析

吞吐量(throughput)是指單位時間內(nèi)計算引擎所處理的數(shù)據(jù)量,反映了系統(tǒng)的負(fù)載能力。吞吐量越高,系統(tǒng)的極限負(fù)載就越大,有助于在單位時間內(nèi)處理更多的數(shù)據(jù)。本實(shí)驗(yàn)通過計算在Redis 中單位時間寫入的數(shù)據(jù)量來統(tǒng)計吞吐,吞吐量計算公式如下:

其中,代表當(dāng)前讀到的數(shù)據(jù)編號,表示前一次讀到的數(shù)據(jù)編號,表示讀取當(dāng)前數(shù)據(jù)表編號所對應(yīng)的時間,表示讀取上一次數(shù)據(jù)編號所對應(yīng)的時間。

在實(shí)驗(yàn)1 中,不同規(guī)模的維表數(shù)據(jù)量所產(chǎn)生的數(shù)據(jù)平均吞吐量如圖6 所示。實(shí)驗(yàn)結(jié)果表明數(shù)據(jù)全量緩存可以有效提升計算吞吐量,在三種維表規(guī)模下,均有10 倍的提升。異步連接方式隨著維表數(shù)據(jù)增大,吞吐量呈現(xiàn)下降趨勢。經(jīng)分析得出,這是由于異步連接所構(gòu)建的緩存沒有被命中,還有大量查詢打入了數(shù)據(jù)庫中。數(shù)據(jù)查詢所用的時間延長了每條數(shù)據(jù)的處理時間,使單位時間內(nèi)可以處理的數(shù)據(jù)量減少。

圖6 實(shí)驗(yàn)1 平均吞吐量統(tǒng)計Fig.6 Statistics of experiment 1 mean throughput

在實(shí)驗(yàn)2 中,不同規(guī)模的維表數(shù)據(jù)量所產(chǎn)生的數(shù)據(jù)平均吞吐量如圖7 所示。與實(shí)驗(yàn)1 相同,全緩存方式吞吐量統(tǒng)計結(jié)果相似,說明數(shù)據(jù)處理效率已經(jīng)和流入數(shù)據(jù)量無關(guān),增大并行度可以進(jìn)一步提高吞吐量。隨著維表數(shù)據(jù)增大,異步連接吞吐量趨于穩(wěn)定。這種現(xiàn)象的產(chǎn)生是由于流數(shù)據(jù)量增大,短時間內(nèi)查詢到的維表數(shù)據(jù)已經(jīng)全部加載到了本地緩存中。異步計算方式吞吐量仍處于較低水平是異步連接機(jī)制并發(fā)度不足導(dǎo)致的,然而過高的并發(fā)度會在計算初期直接導(dǎo)致存儲系統(tǒng)連接崩潰。

圖7 實(shí)驗(yàn)2 平均吞吐量統(tǒng)計Fig.7 Statistics of experiment 2 mean throughput

(2)延遲對比分析

延遲(latency)是指數(shù)據(jù)從進(jìn)入計算系統(tǒng)到真正被處理并輸出所使用的時間,單位為毫秒(ms)。延遲的大小反映了系統(tǒng)對數(shù)據(jù)的處理速度和實(shí)時性。實(shí)時推薦的廣告系統(tǒng)對延遲有較高的需求,延遲越小,用戶體驗(yàn)越好。本實(shí)驗(yàn)通過在Redis中記錄的每條數(shù)據(jù)的起止時間戳來計算延遲,延遲計算公式如下:

表示數(shù)據(jù)被處理完成的時間,指數(shù)據(jù)流入系統(tǒng)后被標(biāo)記的時間。

在實(shí)驗(yàn)1 中,不同規(guī)模的維表數(shù)據(jù)量所產(chǎn)生的數(shù)據(jù)平均延遲如圖8 所示。全緩存模式下,數(shù)據(jù)都已經(jīng)寫入到內(nèi)存中,沒有數(shù)據(jù)I/O 開銷,因此平均延遲沒有太大的起伏。異步連接的計算模式由于前期沒有維表數(shù)據(jù)的緩存,需要與數(shù)據(jù)庫進(jìn)行大量的交互,隨著維表規(guī)模的增大,單次查詢的處理時間越來越高,導(dǎo)致平均延遲越來越大。

圖8 實(shí)驗(yàn)1 平均延遲Fig.8 Experiment 1 mean delay

在實(shí)驗(yàn)2 中,不同規(guī)模的維表數(shù)據(jù)量所產(chǎn)生的數(shù)據(jù)平均延遲如圖9 所示。全緩存模式下,延遲相對于實(shí)驗(yàn)1 有所增加是由于為了平攤內(nèi)存壓力,多次維表關(guān)聯(lián)計算間沒有共享計算槽,數(shù)據(jù)分發(fā)需要經(jīng)歷序列化與反序列化。過高的吞吐量和數(shù)據(jù)量導(dǎo)致數(shù)據(jù)傳輸壓力過大,從而產(chǎn)生阻塞。同時由于緩存機(jī)制是手動實(shí)現(xiàn)的,沒有對接Flink 計算系統(tǒng)的背壓機(jī)制,導(dǎo)致計算節(jié)點(diǎn)數(shù)據(jù)過量積壓。異步連接模式由于計算前期的大量I/O 已經(jīng)暫時緩存了所有需要的維表數(shù)據(jù)。多次維表關(guān)聯(lián)共享了計算槽,數(shù)據(jù)傳輸不需要經(jīng)歷序列化和反序列化過程,直接發(fā)給下游計算節(jié)點(diǎn)。同時由于Apache Flink 的背壓機(jī)制,數(shù)據(jù)實(shí)際被標(biāo)記的流入系統(tǒng)的時間可能晚于數(shù)據(jù)實(shí)際產(chǎn)生的時間。

圖9 實(shí)驗(yàn)2 平均延遲Fig.9 Experiment 2 mean delay

4 總結(jié)

本文對面向分布式流計算的維表關(guān)聯(lián)技術(shù)進(jìn)行了研究和優(yōu)化。與現(xiàn)有的大數(shù)據(jù)計算平臺的關(guān)聯(lián)機(jī)制相比,首先提供了一套可以實(shí)現(xiàn)流批處理一體化的數(shù)據(jù)處理邏輯,其次提供了一種可以支持縱向擴(kuò)展的維表關(guān)聯(lián)數(shù)據(jù)全緩存方案,并對數(shù)據(jù)分發(fā)進(jìn)行了優(yōu)化。大量實(shí)驗(yàn)結(jié)果表明,該優(yōu)化技術(shù)可以有效提高全緩存條件下對維表數(shù)據(jù)的加載量,在高速處理環(huán)節(jié)最多可以提高10 倍的吞吐量,顯著降低了數(shù)據(jù)的查詢壓力,提高了任務(wù)的并行度。在維表數(shù)據(jù)量較大的情況下,異步查詢方式在高并行度時會有很大概率導(dǎo)致數(shù)據(jù)庫假死,并且由于數(shù)據(jù)I/O 的關(guān)系,無法做到很高的吞吐量。普通全量緩存模式則直接會導(dǎo)致系統(tǒng)產(chǎn)生內(nèi)存溢出異常。本文所提出的維表關(guān)聯(lián)技術(shù),支持在高并行度下運(yùn)行,并且對數(shù)據(jù)庫造成的讀寫壓力不隨并行度提高而提升。

猜你喜歡
數(shù)據(jù)量算子數(shù)據(jù)處理
認(rèn)知診斷缺失數(shù)據(jù)處理方法的比較:零替換、多重插補(bǔ)與極大似然估計法*
有界線性算子及其函數(shù)的(R)性質(zhì)
基于低頻功率數(shù)據(jù)處理的負(fù)荷分解方法
無人機(jī)測繪數(shù)據(jù)處理關(guān)鍵技術(shù)及運(yùn)用
基于大數(shù)據(jù)量的初至層析成像算法優(yōu)化
高刷新率不容易顯示器需求與接口標(biāo)準(zhǔn)帶寬
Domestication or Foreignization:A Cultural Choice
高層建筑沉降監(jiān)測數(shù)據(jù)處理中多元回歸分析方法的應(yīng)用研究
高層建筑沉降監(jiān)測數(shù)據(jù)處理中多元回歸分析方法的應(yīng)用研究
QK空間上的疊加算子