張譯天 于炯 魯亮 李梓楊
摘 要:新型大數(shù)據(jù)流式計(jì)算框架Apache Heron默認(rèn)使用輪詢調(diào)度算法進(jìn)行任務(wù)調(diào)度,忽略了拓?fù)溥\(yùn)行時狀態(tài)以及任務(wù)實(shí)例間不同通信方式對系統(tǒng)性能的影響。針對這個問題,提出Heron環(huán)境下流分類任務(wù)調(diào)度策略(DSC-Heron),包括流分類算法、流簇分配算法和流分類調(diào)度算法。首先通過建立Heron作業(yè)模型明確任務(wù)實(shí)例間不同通信方式的通信開銷差異;其次基于流分類模型,根據(jù)任務(wù)實(shí)例間實(shí)時數(shù)據(jù)流大小對數(shù)據(jù)流進(jìn)行分類;最后將相互關(guān)聯(lián)的高頻數(shù)據(jù)流整體作為基本調(diào)度單元構(gòu)建任務(wù)分配計(jì)劃,在滿足資源約束條件的同時盡可能多地將節(jié)點(diǎn)間通信轉(zhuǎn)化為節(jié)點(diǎn)內(nèi)通信以最小化系統(tǒng)通信開銷。在包含9個節(jié)點(diǎn)的Heron集群環(huán)境下分別運(yùn)行SentenceWordCount、WordCount和FileWordCount拓?fù)洌Y(jié)果表明DSC-Heron相對于Heron默認(rèn)調(diào)度策略,在系統(tǒng)完成時延、節(jié)點(diǎn)間通信開銷和系統(tǒng)吞吐量上分別平均優(yōu)化了8.35%、7.07%和6.83%;在負(fù)載均衡性方面,工作節(jié)點(diǎn)的CPU占用率和內(nèi)存占用率標(biāo)準(zhǔn)差分別平均下降了41.44%和41.23%。實(shí)驗(yàn)結(jié)果表明,DSC-Heron對測試拓?fù)涞倪\(yùn)行性能有一定的優(yōu)化作用,其中對接近真實(shí)應(yīng)用場景的FileWordCount拓?fù)鋬?yōu)化效果最為顯著。
關(guān)鍵詞:大數(shù)據(jù);流式計(jì)算;Apache Heron;任務(wù)調(diào)度;數(shù)據(jù)流分類;通信開銷
中圖分類號:TP311
文獻(xiàn)標(biāo)志碼:A
文章編號:1001-9081(2019)04-1106-011
0?引言
隨著云計(jì)算、物聯(lián)網(wǎng)、移動互聯(lián)、社交媒體和人工智能等新型信息技術(shù)和應(yīng)用模式的不斷發(fā)展,數(shù)據(jù)正以前所未有的方式推動人類社會進(jìn)入大數(shù)據(jù)時代[1]。國際數(shù)據(jù)公司(International Data Corporation, IDC)發(fā)布的白皮書《數(shù)據(jù)時代2025》中顯示,預(yù)計(jì)到2025年全球互聯(lián)網(wǎng)數(shù)據(jù)總量達(dá)162ZB,其中超過1/4的數(shù)據(jù)為實(shí)時數(shù)據(jù),而物聯(lián)網(wǎng)實(shí)時數(shù)據(jù)將占這部分?jǐn)?shù)據(jù)的95%以上[2]。面對實(shí)時大數(shù)據(jù)具有的實(shí)時性、易失性、突發(fā)性、無序性和無限性的新特征[3],傳統(tǒng)的MapReduce等批處理方式不再適用,分布式大數(shù)據(jù)流式計(jì)算應(yīng)運(yùn)而生。流數(shù)據(jù)處理摒棄了傳統(tǒng)批處理中對數(shù)據(jù)先存儲后計(jì)算的方式,將數(shù)據(jù)以數(shù)據(jù)流的形式在數(shù)據(jù)產(chǎn)生初期進(jìn)行計(jì)算,使用可靠傳輸模式而不對計(jì)算中間結(jié)果進(jìn)行存儲,在對數(shù)據(jù)分析實(shí)時性要求較高的場景中得到了廣泛的應(yīng)用,并不斷融入實(shí)時圖像識別、智慧城市等人工智能的發(fā)展中。
目前,在典型的流數(shù)據(jù)處理系統(tǒng)(例如,Storm[4]、Flink[5])中,默認(rèn)調(diào)度算法常采用靜態(tài)輪詢調(diào)度算法。該算法實(shí)現(xiàn)簡單,在拓?fù)涮峤粫r對拓?fù)溥M(jìn)行任務(wù)分配,拓?fù)溥\(yùn)行期間任務(wù)分配狀態(tài)不再發(fā)生改變;但靜態(tài)輪詢調(diào)度算法在進(jìn)行任務(wù)分配時,僅考慮資源的可滿足性將任務(wù)均勻分配到各個工作節(jié)點(diǎn),未考慮拓?fù)渲腥蝿?wù)間的關(guān)聯(lián)關(guān)系和節(jié)點(diǎn)間的通信開銷,會對集群性能產(chǎn)生一定影響。針對這一問題,國內(nèi)外研究人員針對不同流式計(jì)算平臺提出在線調(diào)度算法,在拓?fù)溥\(yùn)行過程中通過實(shí)時監(jiān)測集群運(yùn)行狀態(tài),對已分配的任務(wù)進(jìn)行重調(diào)度或調(diào)整,以使集群具有更高效的性能。針對Storm環(huán)境,文獻(xiàn)[6]提出自適應(yīng)調(diào)度策略,分為離線調(diào)度和在線調(diào)度。其中在線調(diào)度通過實(shí)時監(jiān)測拓?fù)溥\(yùn)行過程中CPU負(fù)載和工作節(jié)點(diǎn)間數(shù)據(jù)流流量等數(shù)據(jù),依次將通信開銷較大的一對任務(wù)調(diào)度到CPU負(fù)載較小的工作節(jié)點(diǎn)中。該策略可以較好地降低Storm的通信開銷,但實(shí)驗(yàn)中使用的是自定義鏈?zhǔn)酵負(fù)?,缺乏一定的代表性。文獻(xiàn)[7]提出流量感知在線調(diào)度策略T-Storm,旨在最小化進(jìn)程間和工作節(jié)點(diǎn)間通信開銷,同時實(shí)現(xiàn)了細(xì)粒度的任務(wù)分配控制,可以通過調(diào)整預(yù)設(shè)參數(shù)控制工作節(jié)點(diǎn)數(shù)量。但該策略忽略了直接通信的一對任務(wù)之間的數(shù)據(jù)流情況且調(diào)度執(zhí)行開銷較大。文獻(xiàn)[8]提出資源感知調(diào)度策略R-Storm,通過將CPU、內(nèi)存和網(wǎng)絡(luò)帶寬資源映射為三維空間向量,使用最小化向量距離的方法尋找任務(wù)和工作節(jié)點(diǎn)的分配關(guān)系,從而最大化資源利用并提高系統(tǒng)吞吐量。該策略充分考慮了集群資源的有效利用,但拓?fù)渲懈魅蝿?wù)的資源需求由編程人員設(shè)定而非實(shí)時監(jiān)測獲得,很難應(yīng)用于資源需求變化較大的在線調(diào)度。文獻(xiàn)[9]提出一種異構(gòu)環(huán)境下的任務(wù)遷移策略TMSH-Storm,將超出閾值節(jié)點(diǎn)中的阻尼線程細(xì)粒度地遷移至滿足條件的目標(biāo)節(jié)點(diǎn),避免了資源溢出后的任務(wù)重部署,可以較好地降低調(diào)度時延和節(jié)點(diǎn)間通信開銷。針對Flink環(huán)境,文獻(xiàn)[10]提出流網(wǎng)絡(luò)的流式計(jì)算動態(tài)任務(wù)調(diào)度策略。該策略通過建立流網(wǎng)絡(luò)模型,基于最大流算法使模型在滿足延遲約束前提下提高集群的實(shí)際吞吐量,能在一定程度上解決輸入速率增加階段出現(xiàn)的計(jì)算延遲升高問題。
但該策略僅關(guān)注集群輸入速率急劇上升階段的性能優(yōu)化,且沒有考慮流網(wǎng)絡(luò)容量動態(tài)變化的問題。針對其他流式計(jì)算系統(tǒng),文獻(xiàn)[11]從資源分配的角度出發(fā),對多代分布式流處理系統(tǒng)的彈性資源調(diào)度機(jī)制進(jìn)行對比并提出未來研究方向。文獻(xiàn)[12]基于拓?fù)淙蝿?wù)與集群資源的分配與映射關(guān)系提出模型驅(qū)動的調(diào)度策略,為流處理系統(tǒng)提供高效的資源利用率和吞吐量。文獻(xiàn)[13-14]側(cè)重于流處理系統(tǒng)的穩(wěn)定性,分別提出一種基于隊(duì)列的穩(wěn)定性預(yù)測模型和一種穩(wěn)定的在線調(diào)度策略以優(yōu)化系統(tǒng)性能。
Heron[15-17]是Twitter為解決其上一代分布式流處理平臺Storm在可擴(kuò)展性、可調(diào)試性、可管理性以及集群資源共享等方面問題,而構(gòu)建的新一代分布式流處理平臺[15]。Heron在2015年已經(jīng)取代Storm,成為Twitter實(shí)際使用的實(shí)時數(shù)據(jù)處理系統(tǒng)[17],現(xiàn)已進(jìn)入Apache開源項(xiàng)目孵化器。Heron在設(shè)計(jì)層面對Storm進(jìn)行多方面優(yōu)化的同時,其默認(rèn)調(diào)度策略仍使用輪詢(Round Robin,RR)算法進(jìn)行任務(wù)實(shí)例分配。該算法根據(jù)拓?fù)渲腥蝿?wù)實(shí)例資源需求和容器資源需求,創(chuàng)建用于調(diào)度的任務(wù)分配計(jì)劃,然后交由Heron調(diào)度器(Aurora[18]、Mesos[19]、YARN[20]等)分配至工作節(jié)點(diǎn)。對于Twitter目前使用的Aurora調(diào)度器,Heron將一個拓?fù)鋵?yīng)于Aurora中一個工作(Job),一個容器與其中分配的任務(wù)實(shí)例作為Aurora的一個資源分配單元,由Aurora負(fù)責(zé)將這些單元分配至集群中滿足資源需求的工作節(jié)點(diǎn)中運(yùn)行。在配置了Aurora調(diào)度器的Heron集群中,工作節(jié)點(diǎn)資源分配由Mesos負(fù)責(zé)進(jìn)行,其采用DRF(Dominant Resource Fairness)資源分配策略為Aurora框架中提交的任務(wù)尋找集群中注冊的合適工作節(jié)點(diǎn)進(jìn)行任務(wù)分配。
在Heron調(diào)度策略和任務(wù)分配過程中仍存在如下問題:1)任務(wù)分配沒有考慮任務(wù)實(shí)例間通信開銷,忽略了節(jié)點(diǎn)間通信和節(jié)點(diǎn)內(nèi)通信的差異;2)任務(wù)分配僅考慮了任務(wù)實(shí)例和工作節(jié)點(diǎn)資源的約束關(guān)系,且算法采用資源最大化對齊的方式容易造成資源浪費(fèi);3)僅提供了靜態(tài)調(diào)度策略,無法針對運(yùn)行狀態(tài)下的拓?fù)溥M(jìn)行實(shí)時調(diào)度。
針對Heron默認(rèn)調(diào)度策略中存在的上述問題,本文提出流分類調(diào)度策略(task scheduling strategy based on Data Stream Classification in Heron, DSC-Heron),主要工作如下:1)提出Heron作業(yè)模型,將拓?fù)渲腥蝿?wù)實(shí)例通信方式劃分為節(jié)點(diǎn)間、容器間和實(shí)例間通信,明確不同通信方式間的通信開銷差異。
2)以Heron作業(yè)模型為基礎(chǔ),提出資源約束模型、最優(yōu)通信開銷模型和流分類模型,作為提出DSC-Heron任務(wù)調(diào)度策略的理論依據(jù)。
3)提出流分類調(diào)度策略,包括流分類算法、流簇分配算法和流分類調(diào)度算法。該策略首先根據(jù)數(shù)據(jù)流實(shí)時大小對數(shù)據(jù)流進(jìn)行分類,然后以高頻數(shù)據(jù)流關(guān)聯(lián)的高頻流簇為單位進(jìn)行任務(wù)調(diào)度,使得拓?fù)涞娜蝿?wù)分配在滿足資源約束條件的同時最小化節(jié)點(diǎn)間通信開銷。
4)使用Heron示例拓?fù)浜妥远x拓?fù)鋵α鞣诸愓{(diào)度策略進(jìn)行性能評估。實(shí)驗(yàn)結(jié)果表明,相較于Heron默認(rèn)調(diào)度策略,DSC-Heron在系統(tǒng)完成時延、節(jié)點(diǎn)間數(shù)據(jù)流大小和吞吐量方面均有一定的優(yōu)化效果。
1?Heron作業(yè)模型
在Heron中,拓?fù)涫怯脩舳x流式作業(yè)的抽象,使用有向無環(huán)圖(Directed Acyclic Graph, DAG)表示,由組件和數(shù)據(jù)流構(gòu)成。組件分為Spout和Bolt兩類:Spout為數(shù)據(jù)源編程單元,可以從Kafka[21]、DistributedLog[22]或HDFS(Hadoop Distributed File System)中不間斷地讀取數(shù)據(jù),以數(shù)據(jù)流的形式傳遞給下游組件;Bolt為數(shù)據(jù)流處理單元,用于實(shí)現(xiàn)數(shù)據(jù)處理邏輯。數(shù)據(jù)流是對組件間以元組形式進(jìn)行數(shù)據(jù)傳遞的抽象,可以通過不同的流組模式定義元組的傳遞和分組方式。由此定義拓?fù)溥壿嬆P腿缦隆?/p>
Heron中為提高系統(tǒng)并行度和數(shù)據(jù)處理速度可以為拓?fù)渲忻總€組件定義運(yùn)行并行度,并在拓?fù)涮峤粫r為每個組件創(chuàng)建相應(yīng)數(shù)量的任務(wù)實(shí)例。每個任務(wù)實(shí)例運(yùn)行一個Java進(jìn)程且運(yùn)行在一個JVM中。由此定義拓?fù)鋵?shí)例模型如下。
根據(jù)實(shí)例分配模型可知,Heron任務(wù)實(shí)例之間的通信需要經(jīng)過所在容器中的SM進(jìn)行路由。集群中各個SM彼此連接形成一個全連接網(wǎng)絡(luò),將復(fù)雜度為O(N(N-1)/2)的N個任務(wù)實(shí)例間通信,通過M個SM簡化為O(M(M-1)/2),其中NM。因此,在Heron集群中存在三種不同的通信方式:1)工作節(jié)點(diǎn)間通信,即集群不同物理工作節(jié)點(diǎn)間任務(wù)實(shí)例的通信方式。這種通信方式中,數(shù)據(jù)流需要經(jīng)過源任務(wù)實(shí)例、源任務(wù)所屬SM、目的任務(wù)所屬SM和目的任務(wù)實(shí)例進(jìn)行傳輸,會占用大量的網(wǎng)絡(luò)帶寬資源,是集群中通信開銷最大的一種通信方式。如圖3中,任務(wù)實(shí)例Ia和Id1間的通信即屬于工作節(jié)點(diǎn)間通信。
2)容器間通信,即同一工作節(jié)點(diǎn)、不同容器中任務(wù)實(shí)例之間的通信方式。這種通信方式不占用網(wǎng)絡(luò)帶寬,數(shù)據(jù)流在同一節(jié)點(diǎn)的不同任務(wù)間進(jìn)行傳遞,但仍需要經(jīng)過源任務(wù)實(shí)例、目的任務(wù)實(shí)例以及各自所屬的SM,屬于進(jìn)程間通信且通信開銷較小。如圖3中,任務(wù)實(shí)例Id2和Ic間的通信即為容器間通信方式。
3)實(shí)例間通信,即同一工作節(jié)點(diǎn)且同一容器中任務(wù)實(shí)例間的直接通信。這種通信方式在一個容器中進(jìn)行,數(shù)據(jù)流只經(jīng)過一個SM進(jìn)行傳輸,是三種通信方式中通信開銷最小的一種。由于在Heron中每個任務(wù)實(shí)例都是一個Java進(jìn)程,因此這種通信方式也屬于進(jìn)程間通信,但由于減少了數(shù)據(jù)流經(jīng)過的SM數(shù)量,因此通信開銷較容器間通信開銷小。如圖3中,任務(wù)實(shí)例Ib和Ig間即屬于實(shí)例間通信方式。
2?問題建模與分析
本章在Heron作業(yè)模型的基礎(chǔ)之上提出資源約束模型、最優(yōu)通信開銷模型和流分類模型。其中資源約束模型為任務(wù)分配的基礎(chǔ)條件;最優(yōu)通信開銷模型論證了節(jié)點(diǎn)間和節(jié)點(diǎn)內(nèi)通信開銷的相互關(guān)系,為最小化通信開銷的任務(wù)調(diào)度過程提供依據(jù);流分類模型定義了拓?fù)渲袛?shù)據(jù)流分類的理論基礎(chǔ)。
2.1?資源約束模型
在Heron應(yīng)用環(huán)境中,為使各個工作節(jié)點(diǎn)不會出現(xiàn)滿負(fù)荷運(yùn)行狀態(tài)以影響集群運(yùn)行性能,需要為每個工作節(jié)點(diǎn)預(yù)留少量的計(jì)算資源。因此,在上述資源約束中,α、 β、γ分別為集群管理人員為CPU、內(nèi)存和網(wǎng)絡(luò)帶寬資源設(shè)定的資源閾值參數(shù),該參數(shù)可根據(jù)集群資源情況進(jìn)行設(shè)置。
2.2?最優(yōu)通信開銷模型
2.3?流分類模型
3?流分類調(diào)度策略
本章基于上述模型提出流分類調(diào)度策略(DSC-Heron),包括流分類算法、流簇分配算法和流分類調(diào)度算法。其中流分類算法以流分類模型為基礎(chǔ),以數(shù)據(jù)流實(shí)時大小為依據(jù)對數(shù)據(jù)流進(jìn)行分類;流簇分配算法以高頻流簇為基本單元進(jìn)行任務(wù)分配;流分類調(diào)度算法對不同類別的數(shù)據(jù)流依次進(jìn)行調(diào)度,最終完成目標(biāo)任務(wù)分配計(jì)劃的構(gòu)建。
3.1?流分類算法
通過使用3.4節(jié)提出的負(fù)載監(jiān)測模塊對集群中任務(wù)實(shí)例以及實(shí)例間數(shù)據(jù)流大小進(jìn)行監(jiān)測,實(shí)時獲取拓?fù)渲腥蝿?wù)實(shí)例間數(shù)據(jù)流大小,得到拓?fù)涞臄?shù)據(jù)流集合S={s1,2,s1,3,…,sI-1,I}作為輸入;然后根據(jù)流分類模型將集合S劃分為高頻數(shù)據(jù)流集Hf、中頻數(shù)據(jù)流集Mf和低頻數(shù)據(jù)流集Nf。具體算法如算法1所示。
算法1?流分類算法。
輸出?高頻數(shù)據(jù)流集Hf;中頻數(shù)據(jù)流集Mf;低頻數(shù)據(jù)流集Nf。
步驟1?根據(jù)S中各個數(shù)據(jù)流大小vij,kl,使用式(8)計(jì)算數(shù)據(jù)流總量V;使用式(9)計(jì)算數(shù)據(jù)流平均值;使用式(10)計(jì)算sij,kl的絕對偏差值;使用式(11)計(jì)算數(shù)據(jù)流總絕對偏差ΔV;使用式(12)平均偏差Δ。
步驟2?根據(jù)vij,kl大小降序排序S并進(jìn)行遍歷,進(jìn)行如下判斷:若數(shù)據(jù)流sij,kl滿足式(13),將其加入高頻數(shù)據(jù)流集Hf;若滿足式(14)或式(15),將其加入中頻數(shù)據(jù)流集合Mf;若滿足式(16),將其加入低頻數(shù)據(jù)流集合Nf。
步驟3?返回Hf、Mf和Nf。
流分類算法步驟1中包括兩次對數(shù)據(jù)流集合S的遍歷。第一次遍歷數(shù)據(jù)流集合S,計(jì)算出拓?fù)鋽?shù)據(jù)流總量V,進(jìn)而可以計(jì)算數(shù)據(jù)流平均值。第二次遍歷數(shù)據(jù)流集合S,由數(shù)據(jù)流平均值計(jì)算各個數(shù)據(jù)流的絕對偏差值,同時累加計(jì)算所有數(shù)據(jù)流的總絕對偏差值ΔV和平均偏差值Δ。算法步驟2~3根據(jù)流分類模型中的式(13)~(16),使用步驟1計(jì)算所得值,對數(shù)據(jù)流集合遍歷的同時進(jìn)行數(shù)據(jù)流的分類并返回分類所得不同數(shù)據(jù)流集合。由此可知,該算法的時間復(fù)雜度為數(shù)據(jù)流集合S的大小,即O(S)。
3.2?流簇分配算法
由定義4可知,DSC-Heron將高頻數(shù)據(jù)流關(guān)聯(lián)的高頻流簇作為調(diào)度的基本單元,盡可能得將同一高頻流簇相關(guān)的任務(wù)實(shí)例調(diào)度到同一工作節(jié)點(diǎn)。在調(diào)度的過程中,需要根據(jù)當(dāng)前高頻數(shù)據(jù)流關(guān)聯(lián)的任務(wù)實(shí)例在高頻流集合Hf中遞歸搜索與之關(guān)聯(lián)的流簇,然后對該流簇中的任務(wù)實(shí)例進(jìn)行分配,由此得到流簇分配算法。具體算法如算法2所示。
算法2?流簇分配算法。
輸入?數(shù)據(jù)流sij,kl關(guān)聯(lián)的任務(wù)實(shí)例Iij和Ikl;高頻數(shù)據(jù)流集合Hf;目標(biāo)節(jié)點(diǎn)nk;原始任務(wù)分配計(jì)劃PPold;當(dāng)前目標(biāo)任務(wù)分配計(jì)劃PPnew。
輸出?更新后的目標(biāo)任務(wù)分配計(jì)劃PPnew。
步驟1?根據(jù)Iij和Ikl在Hf中搜索關(guān)聯(lián)的高頻數(shù)據(jù)流sgh,ij和skl,mn,若不存在相關(guān)聯(lián)高頻數(shù)據(jù)流或存在任務(wù)實(shí)例Igh、Imn且均已分配則遞歸結(jié)束。若存在且未分配,根據(jù)PPold判斷Igh和Imn是否分別位于目標(biāo)節(jié)點(diǎn)nk中:如果是,判斷將該任務(wù)實(shí)例仍分配至nk中是否滿足資源約束條件:若滿足則進(jìn)行分配,更新PPnew并進(jìn)行步驟2;若不滿足則查找當(dāng)前PPnew中負(fù)載最小的工作節(jié)點(diǎn)進(jìn)行調(diào)度。
如果否,判斷將未分配任務(wù)實(shí)例調(diào)度到nk節(jié)點(diǎn)之后是否滿足資源約束條件:若滿足進(jìn)行調(diào)度,更新PPnew并進(jìn)行步驟2;若不滿足則查找當(dāng)前PPnew中負(fù)載最小的節(jié)點(diǎn)進(jìn)行調(diào)度。
步驟2?在Hf中遞歸查找Igh和Imn關(guān)聯(lián)的高頻數(shù)據(jù)流sef,gh和smn,op,若存在任務(wù)實(shí)例Ief,Iop未分配,重復(fù)步驟1直至遞歸結(jié)束。
流簇分配算法是一個遞歸的集合搜索過程,它將一個高頻數(shù)據(jù)流關(guān)聯(lián)的源任務(wù)實(shí)例、目的任務(wù)實(shí)例和高頻數(shù)據(jù)流集合作為輸入,分別對源任務(wù)實(shí)例和目的任務(wù)實(shí)例在高頻數(shù)據(jù)流集合中搜索相關(guān)聯(lián)的高頻數(shù)據(jù)流,并依次對搜索結(jié)果中包含的未分配任務(wù)實(shí)例進(jìn)行調(diào)度,以更新目標(biāo)任務(wù)分配計(jì)劃。同時,該搜索過程也是構(gòu)建該數(shù)據(jù)流高頻流簇SC的過程。若一條高頻數(shù)據(jù)流在高頻數(shù)據(jù)流集合中不存在關(guān)聯(lián)的高頻流或存在且均已經(jīng)分配完成則遞歸結(jié)束,返回對該條數(shù)據(jù)流構(gòu)建完成的目標(biāo)任務(wù)分配計(jì)劃。流簇分配算法將每個高頻數(shù)據(jù)流的關(guān)聯(lián)的流簇集合整體作為任務(wù)調(diào)度對象,旨在最大化節(jié)點(diǎn)內(nèi)任務(wù)間數(shù)據(jù)流,根據(jù)最優(yōu)通信開銷模型即等價于最小化節(jié)點(diǎn)間數(shù)據(jù)流,以減少拓?fù)湔w通信開銷。根據(jù)該遞歸算法可知,其時間復(fù)雜度為:O(Hf·SC)。其中,由于SC的最大值等于拓?fù)潢P(guān)鍵路徑長度,因此該算法的運(yùn)行時間與拓?fù)涞膶訑?shù)以及高頻數(shù)據(jù)流集合規(guī)模有關(guān)。
3.3?流分類任務(wù)調(diào)度算法
流分類調(diào)度算法整合了流分類算法和流簇分配算法,將流分類算法中得到的高頻流集合作為輸入,通過遍歷數(shù)據(jù)流集合S,首先對其中高頻數(shù)據(jù)流進(jìn)行分配并對該數(shù)據(jù)流調(diào)用流簇分配算法進(jìn)行任務(wù)調(diào)度,直至所有的高頻數(shù)據(jù)流分配完成,然后對未分配的中頻數(shù)據(jù)流和低頻數(shù)據(jù)流分別調(diào)度,完成構(gòu)建目標(biāo)任務(wù)分配計(jì)劃。具體算法如算法3所示。
算法3?流分類調(diào)度算法。
輸出?目標(biāo)任務(wù)分配計(jì)劃PPnew。
初始化?由負(fù)載監(jiān)測模塊獲取當(dāng)前各數(shù)據(jù)流大小vij,kl和各任務(wù)實(shí)例CPU負(fù)載wIij以初始化數(shù)據(jù)流集合S與任務(wù)實(shí)例負(fù)載集合W;使用流分類算法得到各數(shù)據(jù)流分類集合Hf、Mf、Nf;初始化PPnew為空。
步驟1?根據(jù)數(shù)據(jù)流vij,kl的大小對集合S進(jìn)行降序排序,遍歷集合S中各數(shù)據(jù)流sij,kl。
步驟2?如果sij,kl屬于高頻數(shù)據(jù)流集合Hf,進(jìn)行如下步驟:①判斷數(shù)據(jù)流sij,kl關(guān)聯(lián)的兩個任務(wù)實(shí)例Iij和Ikl是否在PPnew中已經(jīng)重新分配,若都已經(jīng)重新分配則對該數(shù)據(jù)流的調(diào)度結(jié)束。
②若僅其中一個任務(wù)已分配,這里以Iij已分配至工作節(jié)點(diǎn)nk且Ikl未分配為例,進(jìn)行如下步驟:(a)根據(jù)PPold判斷Iij和Ikl是否位于同一節(jié)點(diǎn)。若在同一節(jié)點(diǎn)nk,更新PPnew并調(diào)用流簇分配算法,分配sij,kl關(guān)聯(lián)的其他高頻數(shù)據(jù)流,調(diào)度結(jié)束。
(b)若Iij和Ikl位于不同節(jié)點(diǎn),根據(jù)資源約束模型判斷Ikl調(diào)度到工作nk后是否滿足資源約束,若滿足則將Ikl調(diào)度到工作節(jié)點(diǎn)nk,更新PPnew并調(diào)用流簇分配算法,分配sij,kl關(guān)聯(lián)的高頻數(shù)據(jù)流,調(diào)度結(jié)束。若不滿足,則根據(jù)W查找當(dāng)前PPnew中負(fù)載最小節(jié)點(diǎn)分配任務(wù)Ikl。
③若任務(wù)實(shí)例Iij和Ikl均未分配,進(jìn)行如下步驟:(a)根據(jù)PPold判斷Iij和Ikl是否位于同一節(jié)點(diǎn),若在同一節(jié)點(diǎn)nk,更新PPnew并調(diào)用流簇分配算法,分配sij,kl關(guān)聯(lián)的高頻數(shù)據(jù)流,調(diào)度結(jié)束。
(b)若Iij和Ikl位于不同節(jié)點(diǎn)ni和nk中,計(jì)算當(dāng)前分配計(jì)劃PPnew中ni和nk已分配任務(wù)的負(fù)載,將Iij和Ikl分配到負(fù)載較小的節(jié)點(diǎn)中,并根據(jù)資源約束模型判斷調(diào)度過程中是否滿足資源約束,若滿足則逐個調(diào)度任務(wù),更新PPnew并調(diào)用流簇分配算法,分配sij,kl關(guān)聯(lián)的高頻數(shù)據(jù)流,調(diào)度結(jié)束。若在調(diào)度任務(wù)實(shí)例Ikl時已不滿足資源約束條件,則將其調(diào)度至當(dāng)前負(fù)載最小的節(jié)點(diǎn)。
步驟3?如果sij,kl屬于中頻數(shù)據(jù)流集合Mf,則重復(fù)步驟2中的①~③,但不再調(diào)用流簇分配算法,此時高頻數(shù)據(jù)流已經(jīng)調(diào)度完成。
步驟4?結(jié)束數(shù)據(jù)流集合S的遍歷,對剩下低頻數(shù)據(jù)流集合Nf中sij,kl計(jì)算當(dāng)前PPnew中各個節(jié)點(diǎn)的任務(wù)負(fù)載情況,優(yōu)先將任務(wù)Iij、Ikl調(diào)度到負(fù)載較輕的節(jié)點(diǎn)中并保證滿足資源約束條件,直到全部任務(wù)調(diào)度完成。
算法步驟1中根據(jù)集合中數(shù)據(jù)流的大小對數(shù)據(jù)流集合S進(jìn)行降序排序,這樣可以保證在對集合S進(jìn)行遍歷時,對高頻數(shù)據(jù)流進(jìn)行優(yōu)先處理。
步驟2對集合S進(jìn)行遍歷,對屬于高頻數(shù)據(jù)流集合的當(dāng)前數(shù)據(jù)流進(jìn)行調(diào)度。該步驟中包含的①~③分別是對當(dāng)前高頻數(shù)據(jù)流關(guān)聯(lián)的兩個任務(wù)實(shí)例是否在目標(biāo)任務(wù)分配計(jì)劃中分配完成進(jìn)行判斷,目的是為了避免對可能出現(xiàn)在不同高頻數(shù)據(jù)流中的同一任務(wù)實(shí)例進(jìn)行重復(fù)調(diào)度。步驟①中,如果當(dāng)前的兩個任務(wù)實(shí)例均已經(jīng)在目標(biāo)任務(wù)分配計(jì)劃中分配完成,則不再進(jìn)行重復(fù)調(diào)度。步驟②中,當(dāng)前兩個任務(wù)實(shí)例中僅有一個任務(wù)實(shí)例已經(jīng)分配,則優(yōu)先將未分配的任務(wù)實(shí)例調(diào)度到已分配任務(wù)實(shí)例當(dāng)前所在的工作節(jié)點(diǎn)中,此舉是為了最大化節(jié)點(diǎn)內(nèi)任務(wù)實(shí)例間的直接通信以最小化節(jié)點(diǎn)間的通信開銷。但在這種調(diào)度的過程中,需要判斷調(diào)度發(fā)生后目標(biāo)節(jié)點(diǎn)是否滿足資源約束條件:若滿足則可以進(jìn)行調(diào)度,完成最大化節(jié)點(diǎn)內(nèi)通信開銷;若不滿足,則需要使用當(dāng)前目標(biāo)任務(wù)分配計(jì)劃PPnew和任務(wù)負(fù)載集合W尋找當(dāng)前集群中負(fù)載最小的工作節(jié)點(diǎn)分配該任務(wù),目的是為了在未能滿足最小化通信開銷的情況下,盡量平衡集群各工作節(jié)點(diǎn)負(fù)載從而在負(fù)載均衡的角度優(yōu)化集群性能。步驟③中為當(dāng)前兩個任務(wù)實(shí)例均未重新分配,需要依次對兩個未分配任務(wù)實(shí)例進(jìn)行調(diào)度。首先根據(jù)原始任務(wù)分配計(jì)劃PPold判斷兩個任務(wù)實(shí)例是否位于同一節(jié)點(diǎn),并優(yōu)先將兩個任務(wù)調(diào)度到其中當(dāng)前負(fù)載較輕的工作節(jié)點(diǎn)中,以最小化通信開銷的同時均衡集群負(fù)載。但在對兩個任務(wù)進(jìn)行依次調(diào)度時,均需要對目標(biāo)工作節(jié)點(diǎn)計(jì)算調(diào)度完成后的資源約束情況,若不滿足資源約束,則同步驟②中的方法相同,將該任務(wù)實(shí)例分配至當(dāng)前集群中負(fù)載最小的工作節(jié)點(diǎn)。
通過步驟1~2,算法將數(shù)據(jù)流集合S中包含的高頻數(shù)據(jù)流及其關(guān)聯(lián)高頻流簇調(diào)度完成,因此步驟3對中頻數(shù)據(jù)流集合中的數(shù)據(jù)流進(jìn)行調(diào)度時,不需要考慮高頻流簇的分配,僅對中頻數(shù)據(jù)流本身繼續(xù)調(diào)度。
算法步驟4進(jìn)行之前,前序算法步驟已經(jīng)對數(shù)據(jù)流集合S中高頻數(shù)據(jù)流和中頻數(shù)據(jù)流調(diào)度結(jié)束,此時只剩下低頻數(shù)據(jù)流,而低頻數(shù)據(jù)流在拓?fù)渲袑φw通信開銷的影響最小,因此僅對其進(jìn)行負(fù)載均衡處理,依次將低頻數(shù)據(jù)流調(diào)度至集群當(dāng)前負(fù)載較小的工作節(jié)點(diǎn)中,最終完成全部數(shù)據(jù)流的調(diào)度。根據(jù)該算法步驟可知,該算法在對數(shù)據(jù)流集合S遍歷的同時分別將屬于各個類別的數(shù)據(jù)流調(diào)度至符合條件的工作節(jié)點(diǎn)中,因此算法時間復(fù)雜度為O(S·N)(其中N為工作節(jié)點(diǎn)數(shù)量)。
3.4?算法部署與實(shí)現(xiàn)
Heron為編程人員提供了可擴(kuò)展的Custom Scheduler[23]實(shí)現(xiàn)。為實(shí)現(xiàn)自定義調(diào)度器,需要實(shí)現(xiàn)與Heron調(diào)度器相關(guān)的IPacking、ILauncher、IScheduler和IUploader四個Java接口。在本文實(shí)驗(yàn)中,DSC-Heron基于Heron中默認(rèn)AuroraScheduler進(jìn)行部署和實(shí)現(xiàn),Uploader仍使用HDFS不作修改,但分別實(shí)現(xiàn)了以下三個接口:1)DSCPacking。實(shí)現(xiàn)IPacking接口,用于部署DSC-Heron以構(gòu)建目標(biāo)任務(wù)分配計(jì)劃,為調(diào)度控制模塊對拓?fù)淙蝿?wù)的重調(diào)度提供依據(jù)。
2)DSCLauncher。實(shí)現(xiàn)ILauncher接口,替換原有的AuroraLauncher。用于在拓?fù)涮峤缓髣?chuàng)建DSCScheduler對象實(shí)例并調(diào)用其onSchedule方法啟動自定義調(diào)度器。
3)DSCScheduler。實(shí)現(xiàn)IScheduler接口,替換默認(rèn)AuroraScheduler,拓?fù)涮峤缓髮⒂稍撜{(diào)度器完成拓?fù)涞某醮握{(diào)度。其中部署調(diào)度觸發(fā)模塊和調(diào)度控制模塊,用于重調(diào)度的觸發(fā)以及使用DSCPacking創(chuàng)建的目標(biāo)任務(wù)分配計(jì)劃更新拓?fù)?,完成重調(diào)度過程。
在拓?fù)渲卣{(diào)度的過程中,需要實(shí)時獲取各工作節(jié)點(diǎn)以及工作節(jié)點(diǎn)內(nèi)各任務(wù)實(shí)例的CPU負(fù)載。對于各任務(wù)進(jìn)程的CPU資源占用信息,可以通過Java API中ThreadMXBean類的getThreadCpuTime(long id)方法獲取,其中id為各Java進(jìn)程中運(yùn)行任務(wù)實(shí)例的線程ID。對于各工作節(jié)點(diǎn)的CPU負(fù)載,可以通過對運(yùn)行在該工作節(jié)點(diǎn)中的任務(wù)實(shí)例CPU負(fù)載進(jìn)行累加求得。此外,工作節(jié)點(diǎn)中相關(guān)硬件參數(shù)可通過/proc目錄下的相關(guān)文件獲得。在代碼編寫完成后,使用Maven創(chuàng)建自定義調(diào)度器的jar文件,將其放置到${HERON_HOME}/lib/scheduler目錄下,并在${HERON_HOME}/conf/aurora目錄下的scheduler.yaml文件中進(jìn)行配置DSCScheduler和DSCLauncher類名后即可使用。
改進(jìn)后的Heron系統(tǒng)結(jié)構(gòu)如圖4所示。其中,在Heron系統(tǒng)結(jié)構(gòu)中新增的四個自定義模塊分別是:
1)負(fù)載監(jiān)測模塊。部署在各個工作節(jié)點(diǎn)中,負(fù)責(zé)在一定時間窗口內(nèi)監(jiān)測工作節(jié)點(diǎn)中運(yùn)行的任務(wù)實(shí)例CPU負(fù)載、任務(wù)間數(shù)據(jù)流大小和內(nèi)存資源占用等信息,并將監(jiān)測信息實(shí)時寫入數(shù)據(jù)存儲模塊。使用該模塊,需要在拓?fù)渲懈鱏pout的open()和nextTuple()方法以及各Bolt的prepare()和execute()方法中調(diào)用該模塊。
2)調(diào)度觸發(fā)模塊。部署在DSCScheduler中并在該調(diào)度器對象實(shí)例化時啟動。負(fù)責(zé)在滿足重調(diào)度觸發(fā)條件時調(diào)用調(diào)度控制模塊中重調(diào)度方法完成DSC-Heron的重調(diào)度過程。
3)數(shù)據(jù)存儲模塊。存儲并實(shí)時更新負(fù)載監(jiān)測模塊獲取的任務(wù)實(shí)例監(jiān)測信息,這里使用MySQL數(shù)據(jù)庫實(shí)現(xiàn)。
4)調(diào)度控制模塊。完成流分類調(diào)度策略的核心模塊,通過調(diào)度觸發(fā)模塊調(diào)用。根據(jù)原始任務(wù)分配計(jì)劃獲取由DSC-Heron構(gòu)建的目標(biāo)任務(wù)分配計(jì)劃,更新拓?fù)淙蝿?wù)分配以完成任務(wù)調(diào)度過程。該模塊采用與DSCScheduler松耦合的設(shè)計(jì)范式,便于未來部署其他在線重調(diào)度算法。
4?實(shí)驗(yàn)
4.1?實(shí)驗(yàn)環(huán)境
實(shí)驗(yàn)環(huán)境采用硬件配置相同的PC搭建一個9節(jié)點(diǎn)的Heron集群。其中一個主控節(jié)點(diǎn)運(yùn)行Heron、Heron Tracker、Heron UI、Mesos Master和Aurora Scheduler;一個協(xié)調(diào)節(jié)點(diǎn)運(yùn)行ZooKeeper和MySQL等服務(wù);一個節(jié)點(diǎn)運(yùn)行Heron自動創(chuàng)建的拓?fù)涔芾磉M(jìn)程(Topology Master)用于管理拓?fù)湔麄€生命周期;其余節(jié)點(diǎn)為工作節(jié)點(diǎn)分別運(yùn)行Mesos Agent、Aurora Observer和Aurora Executor,負(fù)責(zé)實(shí)際運(yùn)行拓?fù)涞娜蝿?wù)實(shí)例。此外,集群中各節(jié)點(diǎn)共同運(yùn)行HDFS作為Heron的Uploader系統(tǒng)組件并負(fù)責(zé)共享Heron Binaries文件。實(shí)驗(yàn)集群的軟硬件配置如表1所示。
實(shí)驗(yàn)采用Heron Github開源項(xiàng)目[24]提供的Sentence WordCount和WordCount示例拓?fù)湟约白远xFileWordCount拓?fù)?,三種拓?fù)渲胁捎貌煌慕Y(jié)構(gòu)和數(shù)據(jù)源以評估DSC-Heron在不同場景中的表現(xiàn)。其中SentenceWordCount拓?fù)浒龑咏Y(jié)構(gòu):
第一層Spout組件隨機(jī)創(chuàng)建一個長度為128×1024的句子數(shù)組并隨機(jī)發(fā)射;
第二層Bolt組件(名為Split)通過空格字符分割句子產(chǎn)生單詞;
第三層Bolt組件(名為Count)接受Split中發(fā)送的單詞并進(jìn)行計(jì)數(shù)。
WordCount為兩層結(jié)構(gòu),Spout組件(名為word)隨機(jī)生成單詞并發(fā)射,由Bolt(名為consumer)組件進(jìn)行統(tǒng)計(jì)。FileWordCount拓?fù)浣Y(jié)構(gòu)與SentenceWordCount相同,但數(shù)據(jù)源來自原版英文歷史小說《雙城記》,格式為txt。SentenceWordCount拓?fù)湎鄬τ赪ordCount的兩層結(jié)構(gòu),包含的數(shù)據(jù)流數(shù)量和流簇規(guī)模較大,兩者對比有利于評估系統(tǒng)性能的優(yōu)化效果。
FileWordCount拓?fù)鋽?shù)據(jù)源采用真實(shí)文本文檔,其中各單詞出現(xiàn)的頻率不盡相同,在實(shí)際的應(yīng)用場景中有一定代表性。
實(shí)驗(yàn)拓?fù)渲性O(shè)置了各組件的并行度和資源需求,數(shù)據(jù)流在各組件間的傳遞模式,可用的容器數(shù)量以及資源需求,詳細(xì)的測試拓?fù)溥\(yùn)行參數(shù)配置如表2所示。
表2中:topology.max.spout.pending(簡稱為pending)的值為Spout緩存隊(duì)列的最大容量,當(dāng)隊(duì)列長度達(dá)到設(shè)置的容量時Spout停止發(fā)送數(shù)據(jù),當(dāng)隊(duì)列長度小于設(shè)定值時Spout持續(xù)發(fā)送數(shù)據(jù),從而實(shí)現(xiàn)對拓?fù)鋽?shù)據(jù)傳輸速率的控制。topology.message.timeout.secs(簡稱為timeout)的值配合ATLEAST_ONCE可靠性語義模式和Acknowledgement[25]機(jī)制使用。實(shí)驗(yàn)拓?fù)湓贐olt組件中設(shè)置ack機(jī)制,唯一標(biāo)識標(biāo)記的元組從Spout中發(fā)射,在timeout設(shè)定的時間內(nèi)經(jīng)過各個組件處理完成后由Spout中ack方法進(jìn)行確認(rèn),若沒有在該參數(shù)規(guī)定的時間內(nèi)接受到指定元組,Heron則會重新發(fā)送以保證ATLESAT_ONCE的有效進(jìn)行。
對于參數(shù)pending和timeout的取值,在默認(rèn)調(diào)度策略下使用SentenceWordCount拓?fù)浣?jīng)過多次實(shí)驗(yàn)得到表3所示參數(shù)取值對拓?fù)湓M失敗率的影響,在表2所示參數(shù)下其他測試拓?fù)鋵?shí)驗(yàn)結(jié)果與此類似。其中, pending的值設(shè)置為100且timeout值設(shè)置為60s時,雖然拓?fù)溥\(yùn)行前5min的失敗率較低,但由于pending的值較小無法正常發(fā)揮集群運(yùn)行性能且無法體現(xiàn)真實(shí)應(yīng)用場景;當(dāng)pending的值設(shè)置為1000且timeout的值設(shè)置為60s時,拓?fù)涮峤缓笄?min內(nèi)的元組失敗率為6.6%,相對于相同pending值但timeout值為30s時拓?fù)溥\(yùn)行的20%失敗率,元組失敗率明顯降低并且有少量拓?fù)涑霈F(xiàn)重新發(fā)送的情況,該場景較符合真實(shí)應(yīng)用場景且集群能夠快速地趨于穩(wěn)定。而當(dāng)pending的值設(shè)置為10000且timeout的值為60s時,元組失敗率較高,此時雖然提高timeout參數(shù)值可以降低元組失敗率,但會導(dǎo)致CPU負(fù)載過高從而使集群運(yùn)行情況不可預(yù)測。因此, pending的值設(shè)置為1000,timeout的值設(shè)置為60s,在當(dāng)前集群的配置下能夠較好地滿足實(shí)驗(yàn)的需要。
此外,由于集群中主控節(jié)點(diǎn)獨(dú)立運(yùn)行,拓?fù)涔芾砥鲉为?dú)運(yùn)行于一個工作節(jié)點(diǎn)的容器中,ZooKeeper和MySQL等服務(wù)進(jìn)程占用一個節(jié)點(diǎn)資源,因此集群中可分配任務(wù)實(shí)例的工作節(jié)點(diǎn)數(shù)量為6。在表2中將topology.stmgrs的數(shù)量設(shè)置為6,即容器數(shù)量與集群中工作節(jié)點(diǎn)的數(shù)量相同,意味著每個工作節(jié)點(diǎn)中僅運(yùn)行一個容器,從而消除容器間通信帶來的開銷,重點(diǎn)關(guān)注節(jié)點(diǎn)間通信和節(jié)點(diǎn)內(nèi)實(shí)例間通信的轉(zhuǎn)換對集群性能的影響。
為驗(yàn)證DSC-Heron的有效性,本文與Heron默認(rèn)的輪詢調(diào)度策略進(jìn)行了對比,表4中列出了DSC-Heron的參數(shù)設(shè)置,其中主控節(jié)點(diǎn)根據(jù)reschedule.timeout參數(shù)觸發(fā)重調(diào)度。工作節(jié)點(diǎn)中α、 β和γ為資源約束模型中設(shè)置的資源閾值參數(shù),為避免節(jié)點(diǎn)滿負(fù)荷運(yùn)行影響集群性能,α值設(shè)置為0.7,由于Heron容器中已為系統(tǒng)級進(jìn)程留有內(nèi)存資源,因此β和γ的值設(shè)置為1。time.window.length和time.window.count為負(fù)載監(jiān)測模塊中設(shè)置的數(shù)據(jù)統(tǒng)計(jì)窗口大小和數(shù)量,即使用長度為5s的時間窗口對數(shù)據(jù)采樣3次統(tǒng)計(jì)平均值。此外Heron集群的其他配置參數(shù)與默認(rèn)輪詢調(diào)度算法的參數(shù)均取默認(rèn)值。