魯 亮 于 炯 卞 琛 劉月超 廖 彬 李慧娟
1(新疆大學信息科學與工程學院 烏魯木齊 830046)
2(新疆財經(jīng)大學統(tǒng)計與信息學院 烏魯木齊 830012)
3(國網(wǎng)烏魯木齊供電公司 烏魯木齊 830011)
(luliang19891108@gmail.com)
近些年來,大數(shù)據(jù)相關(guān)研究及應(yīng)用已成為學術(shù)界和企業(yè)界關(guān)注的熱點,其計算模式包括批量計算、流式計算、交互計算、圖計算等[1-5],并以前兩者應(yīng)用居多.批量計算為先存儲后計算(如Hadoop生態(tài)系統(tǒng)),適合實時性不高且需覆蓋全局數(shù)據(jù)的應(yīng)用場景;流式計算打破了Hadoop中MapReduce[6]框架一統(tǒng)天下的局面,它無需存儲,只要數(shù)據(jù)源處于活動狀態(tài),數(shù)據(jù)就會持續(xù)生成,并以流(由時間上無窮的元組序列組成)的形式在各工作節(jié)點的內(nèi)存中進行計算,適合實時性要求嚴格且僅需針對窗口內(nèi)的局部數(shù)據(jù)進行處理的應(yīng)用場景.流式大數(shù)據(jù)處理平臺大大提高了在線數(shù)據(jù)密集型(on line data intensive, OLDI)應(yīng)用[7]的用戶體驗,可廣泛應(yīng)用于金融銀行業(yè)、互聯(lián)網(wǎng)和物聯(lián)網(wǎng)等諸多領(lǐng)域,涵蓋股市實時分析、搜索引擎與社交網(wǎng)站、交通流量實時預(yù)警等各類典型應(yīng)用[8].現(xiàn)有的流式大數(shù)據(jù)處理框架以Twitter公司的Storm系統(tǒng)為代表[9].Storm是一個采用主從式架構(gòu)的開源分布式實時計算平臺,其編程模型簡單,支持包括Java在內(nèi)的多種編程語言,橫向可擴展性良好.相較于目前同樣主流的Flink[10]和Spark Streaming[11],Storm在大數(shù)據(jù)流式處理方面的實時性更佳;相較于不開源的Puma[12]和社區(qū)冷淡的S4[13],Storm的商用前景更為廣闊.加之新版本特性的加入、更多庫的支持以及與其他開源項目的無縫融合,Storm逐漸成為學術(shù)界和工業(yè)界的研究熱點,被稱為“實時處理領(lǐng)域的Hadoop”.
一個流式計算作業(yè)及其包含的一系列任務(wù)可用有向無環(huán)圖(directed acyclic graph, DAG)表示,Storm中稱之為拓撲(topology).從拓撲實例模型的角度來看,拓撲的1個頂點代表某一特定任務(wù),1條有向邊代表任務(wù)之間的依賴關(guān)系.Storm在進行任務(wù)分配時采用輪詢調(diào)度策略(round-robin scheduling),即先將拓撲中包含的每個任務(wù)均勻地分布到各個工作進程中,再將各工作進程均勻地分布到各工作節(jié)點上,并未考慮到不同工作節(jié)點的性能和負載差異,以及工作節(jié)點之間的網(wǎng)絡(luò)傳輸開銷和節(jié)點內(nèi)部的進程與線程通信開銷,無法最大限度地發(fā)揮Storm集群的實時計算能力.本文針對Storm輪詢調(diào)度策略存在的不足,主要作了4個方面研究工作:
1) 分析已有流式計算框架調(diào)度策略的優(yōu)缺點,闡述本文的優(yōu)化方向和實施思路.
2) 從拓撲的邏輯模型、實例模型和任務(wù)分配模型出發(fā),比較Storm集群中3種不同的通信方式.由此建立資源約束模型和最優(yōu)通信開銷模型,提出并證明了最優(yōu)通信開銷原則.
3) 為解決異構(gòu)Storm系統(tǒng)中工作節(jié)點任務(wù)過載和節(jié)點間通信開銷大的問題,建立任務(wù)遷移模型,提出并證明了遷移優(yōu)化原則和節(jié)點間數(shù)據(jù)流最優(yōu)性原理,并由此推出最優(yōu)遷移原則,為任務(wù)遷移策略的設(shè)計提供理論依據(jù).
4) 從源節(jié)點選擇、阻尼線程選擇和目的節(jié)點選擇3個方面出發(fā),提出異構(gòu)Storm環(huán)境下的任務(wù)遷移策略(task migration strategy for heterogeneous Storm cluster, TMSH-Storm),包括源節(jié)點選擇算法和任務(wù)遷移算法,使系統(tǒng)在拓撲執(zhí)行過程中根據(jù)各工作節(jié)點和各任務(wù)的實時負載情況以及任務(wù)間的數(shù)據(jù)流大小,實現(xiàn)任務(wù)的優(yōu)化遷移.實驗通過4個基準測試從不同角度證明了算法的有效性.
針對實時大規(guī)模數(shù)據(jù)的處理,現(xiàn)有解決方案可大致歸納為3類:高性能批量計算模式、流式計算模式和兩者混合的模式.其中,高性能批量計算模式的核心思想是修改以Hadoop為代表的批處理框架,通過減少中間結(jié)果的磁盤讀寫次數(shù)以及增加作業(yè)間的流水化程度來提高吞吐量[14-19].混合模式的主要思想是基于MapReduce模型增加或修改其中的某些處理步驟,以實現(xiàn)流式處理[20-25].這2種方案在一定程度上解決了大數(shù)據(jù)處理的實時性問題,但其性能仍遜色于流式計算模式,且其研究成果無法直接移植于現(xiàn)有流式計算平臺中.針對這一問題,已有國內(nèi)外學者提出了流式計算框架下的各種性能優(yōu)化方向和實施策略.文獻[8,26]在大數(shù)據(jù)流式計算綜述中總結(jié)了流式大數(shù)據(jù)在典型應(yīng)用領(lǐng)域中呈現(xiàn)出的實時性、易失性、突發(fā)性、無序性和無限性等特征,給出了理想的大數(shù)據(jù)流式計算系統(tǒng)在系統(tǒng)結(jié)構(gòu)、數(shù)據(jù)傳輸、應(yīng)用接口和高可用性等方面應(yīng)該具備的關(guān)鍵技術(shù)特征,闡述了已有各類流式計算系統(tǒng)在可伸縮性、容錯機制、狀態(tài)一致性、負載均衡和吞吐量等方面所面臨的技術(shù)挑戰(zhàn).此外,為獲得更好的服務(wù)質(zhì)量,已有學者提出各類性能感知的數(shù)據(jù)流調(diào)度算法,例如混合啟發(fā)式遺傳調(diào)度算法[27]、競爭感知的任務(wù)復(fù)制調(diào)度算法[28]、基于支持向量域描述和支持向量聚類算法[29]以及基于雙分子結(jié)構(gòu)的化學反應(yīng)優(yōu)化算法[30]等.
以上研究工作的側(cè)重點各異,但大多只適用于任務(wù)和節(jié)點信息變化不夠迅速的場景.如果數(shù)據(jù)流大小和速率經(jīng)常變化,以上調(diào)度策略的部署可能引起較大的系統(tǒng)波動.針對大數(shù)據(jù)流式計算環(huán)境下數(shù)據(jù)量大、變化迅速且無法追蹤的特點,有學者提出各類流式計算環(huán)境下的虛擬機監(jiān)測和可擴展平臺,從監(jiān)測對象[31-33](CPU等硬件負載、工作節(jié)點和線程的數(shù)量、作業(yè)的執(zhí)行情況等)、虛擬機部署和調(diào)整方式[33-35](人工干預(yù)、半自動和全自動)、優(yōu)化范圍[36](基于當前狀態(tài)的局部優(yōu)化、基于預(yù)測的全局優(yōu)化)等方面彈性調(diào)整虛擬機的數(shù)量和任務(wù)的部署,以達到性能優(yōu)化和負載均衡的效果.為更好地處理復(fù)雜作業(yè)中的大規(guī)模流式數(shù)據(jù),文獻[37]針對高可擴展的分布式中間件System S[38]提出一種分布式應(yīng)用調(diào)度優(yōu)化算法.該算法分為4個階段:前2個階段計算作業(yè)運行的備選節(jié)點,后2個階段決定各處理單元在各節(jié)點上的分配.該算法在滿足系統(tǒng)高可擴展性的同時,提高了調(diào)度的實時性.文獻[39]從有向無環(huán)圖優(yōu)化的角度出發(fā),提出彈性自適應(yīng)數(shù)據(jù)流圖模型,并使用該策略進行資源分配,以尋求最大化吞吐量和最小化響應(yīng)時間.文獻[40]從虛擬化網(wǎng)絡(luò)數(shù)據(jù)中心(virtualized networked data centers, VNetDCs)的角度,提出云計算SaaS計算模型下針對實時流式應(yīng)用的最小化能耗調(diào)度策略.該研究充分考慮到大數(shù)據(jù)實時流數(shù)據(jù)量大、數(shù)據(jù)到達速率不可控和不穩(wěn)定等特性,在響應(yīng)時間約束的前提下,最小化計算和網(wǎng)絡(luò)傳輸總能耗.文獻[41]提出大數(shù)據(jù)流式計算環(huán)境下實時且節(jié)能的資源調(diào)度模型Re-Stream.作者建立了能耗、響應(yīng)時間和CPU利用率之間的數(shù)學關(guān)系,定義了有向無環(huán)圖的關(guān)鍵路徑,并綜合運用關(guān)鍵路徑上性能感知的任務(wù)調(diào)度策略和非關(guān)鍵路徑上能耗感知的任務(wù)整合策略,達到了響應(yīng)時間和能耗雙目標優(yōu)化的效果.以上研究均在兼顧了流式大數(shù)據(jù)特征的前提下提出了各類流式計算平臺下的優(yōu)化策略,但針對Storm這一具體框架,在資源感知和通信開銷的優(yōu)化方面仍存在很大的探索空間.
針對Storm框架下的調(diào)度優(yōu)化策略,已有少量學者開展了部分研究.針對地理位置分散的Storm集群而言,文獻[42]提出并實現(xiàn)了一種服務(wù)質(zhì)量感知的Storm分布式調(diào)度器,其在網(wǎng)絡(luò)時延和可靠性等方面均優(yōu)于Storm默認調(diào)度策略的執(zhí)行結(jié)果,但這與本文集中式數(shù)據(jù)中心的研究背景不符.文獻[43]提出Storm環(huán)境下的自適應(yīng)調(diào)度策略,分為離線調(diào)度和在線調(diào)度2種.離線調(diào)度是在拓撲運行前分析其結(jié)構(gòu),將相互關(guān)聯(lián)的1對任務(wù)盡量調(diào)度到同一個工作進程中,然后將所有工作進程輪詢調(diào)度到各個工作節(jié)點上;在線調(diào)度則是通過插件實時監(jiān)測拓撲運行過程中系統(tǒng)的CPU負載和任務(wù)間數(shù)據(jù)流大小,當某節(jié)點上的任務(wù)持續(xù)過載時觸發(fā)重分配機制,依次將通信開銷較大的1對任務(wù)分配在CPU負載較輕的工作進程中,同理將通信開銷較大的1對工作進程分配在CPU負載較輕的工作節(jié)點上.這種自適應(yīng)調(diào)度策略很好地解決了Storm環(huán)境中的通信開銷問題,但仍有4點有待優(yōu)化:1)算法復(fù)雜度很高,在線調(diào)度策略執(zhí)行時將導(dǎo)致延遲極大,對性能造成了一定的沖擊;2)算法有效降低了工作節(jié)點間通信開銷,但對于同一節(jié)點內(nèi)的進程間開銷仍比較大;3)Storm的拓撲結(jié)構(gòu)錯綜復(fù)雜,而實驗中采用的線性拓撲缺乏代表性;4)該策略僅考慮CPU這一資源負載,度量角度較為單一.
文獻[44]提出Storm框架下流量感知的在線調(diào)度策略T-Storm.T-Storm通過監(jiān)控各任務(wù)的CPU負載、數(shù)據(jù)流傳輸(流量)負載以及各工作節(jié)點的CPU負載,實現(xiàn)任務(wù)的在線動態(tài)重部署,在保證沒有工作節(jié)點過載的情況下最小化網(wǎng)絡(luò)傳輸開銷;同時,針對運行于同一個工作節(jié)點且屬于同一個拓撲的所有任務(wù),T-Storm只為其分配1個工作進程(槽),從而消除了進程間通信開銷.此外,T-Storm能通過參數(shù)調(diào)整增加或減少啟動的工作節(jié)點數(shù)量.然而該文獻仍存在2方面不足:1)與文獻[43]類似,T-Storm仍存在調(diào)度執(zhí)行開銷大和資源負載度量角度單一的問題;2)T-Storm將各任務(wù)發(fā)送和接收的數(shù)據(jù)流大小孤立考慮,忽略了直接通信的1對任務(wù)之間的數(shù)據(jù)流情況,對于通信復(fù)雜的拓撲而言可能無法達到近似最優(yōu).
文獻[45]提出Storm框架下資源感知的調(diào)度策略R-Storm.R-Storm將資源約束分為針對內(nèi)存的硬約束以及針對CPU和網(wǎng)絡(luò)帶寬的軟約束,其中硬約束條件必須滿足,而軟約束條件則盡量滿足.與文獻[44]提出的T-Storm相同,R-Storm不考慮任務(wù)與工作進程的映射關(guān)系,而直接考慮拓撲中各任務(wù)在工作節(jié)點上的分配,在最小化網(wǎng)絡(luò)時延的同時最大化資源利用率,最終達到提高整個集群吞吐量的效果.R-Storm充分考慮了資源池中各類資源的有效利用,但仍存在2點不足:1)雖然R-Storm為編程人員提供了豐富的應(yīng)用程序接口(application programming interface, API),但對于拓撲中各任務(wù)的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬需求需要通過API人為設(shè)置而非實時監(jiān)測獲得,無法用于數(shù)據(jù)流快速變化場景下的在線調(diào)度.與此同時,Storm編程人員往往專注于業(yè)務(wù)功能的開發(fā),對任務(wù)運行時的資源評估缺乏經(jīng)驗,可能對資源的有效利用造成一定影響.2)R-Storm只適合于同構(gòu)環(huán)境下,不能通用于異構(gòu)環(huán)境.
本文與上述研究成果的不同之處在于:
1) 本文提出的任務(wù)遷移策略針對以上不足進行了改進,能夠?qū)崟r監(jiān)測并統(tǒng)籌兼顧拓撲中各線程的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負載,在各類資源約束的前提下最小化通信開銷,保證集群的低延遲.
2) 現(xiàn)有研究[43-45]均是在拓撲的運行過程中,發(fā)現(xiàn)資源溢出后針對所有任務(wù)進行重新部署.這種做法雖然能夠帶來較為明顯的優(yōu)化效果,但算法的執(zhí)行過程將引起極大的延遲,進而影響Storm系統(tǒng)的運行效率.本文提出的任務(wù)遷移策略開銷較小,能夠保證任意時刻系統(tǒng)的實時性處理需求.
3) 實驗基于Intel公司Zhang等人[46]發(fā)布在GitHub上的storm-benchmark-master基準測試,而非已有文獻中作者定義的拓撲結(jié)構(gòu),更具代表性.
4) 該策略可適用于異構(gòu)集群環(huán)境中,應(yīng)用范圍更為廣闊.
本節(jié)針對Storm架構(gòu)和部分概念做簡要介紹.
如圖1所示,一個完整的Storm分布式系統(tǒng)由4類節(jié)點組成:
1) 主控節(jié)點(master node).它是運行Storm Nimbus后臺服務(wù)的節(jié)點.Nimbus是Storm系統(tǒng)的中心,負責接收用戶提交的拓撲作業(yè),向工作節(jié)點分配任務(wù)(進程級和線程級)并傳輸作業(yè)副本,依賴協(xié)調(diào)節(jié)點的服務(wù)監(jiān)控集群運行狀態(tài),提供狀態(tài)獲取接口,在Storm中的地位類似于Hadoop中的JobTracker.
2) 工作節(jié)點(work node).它是運行Storm Supervisor后臺服務(wù)的節(jié)點.Supervisor負責監(jiān)聽Nimbus分配的任務(wù)并下載作業(yè)副本,啟動、暫?;虺蜂N任務(wù)的工作進程(Worker)及工作線程(Executor).Supervisor是分布式部署的,在Storm中的地位類似于Hadoop中的TaskTracker.
3) 控制臺節(jié)點(Web console node).它是運行Storm 用戶界面(user interface, UI)后臺服務(wù)的節(jié)點.它實際上是一個Web服務(wù)器,在指定端口提供網(wǎng)頁服務(wù).用戶可以使用瀏覽器訪問控制臺節(jié)點的Web頁面,提交、暫停和撤銷作業(yè),也可以以只讀的形式獲取系統(tǒng)配置、作業(yè)及各個組件的運行時狀態(tài).
4) 協(xié)調(diào)節(jié)點(coordinate node).它是運行Zoo-Keeper進程的節(jié)點.Nimbus和Supervisor之間所有的協(xié)調(diào),包括分布式狀態(tài)維護和分布式配置管理,都是通過該協(xié)調(diào)節(jié)點實現(xiàn)的.在協(xié)調(diào)節(jié)點的幫助下,Nimbus和Supervisor的無狀態(tài)服務(wù)可以快速失敗,這也是Storm系統(tǒng)的穩(wěn)定性和可用性較高的原因之一.
為便于后文理解,現(xiàn)對工作節(jié)點內(nèi)部結(jié)構(gòu)以及拓撲相關(guān)概念做簡要說明:
1) 拓撲(topology).即流式作業(yè)本身,可用一個有向無環(huán)圖表示,由Spout、Bolt和數(shù)據(jù)流組成.
2) 元組(tuple).Storm數(shù)據(jù)處理的基本單元,是包含了1個或者多個鍵值對的列表.
3) 數(shù)據(jù)流(stream).由無限的元組組成的序列,是Storm中對傳遞的數(shù)據(jù)進行的抽象.
4) Spout.Storm中的數(shù)據(jù)源編程單元,用于為拓撲生產(chǎn)數(shù)據(jù).一般地,Spout從消息隊列、關(guān)系型數(shù)據(jù)庫、非關(guān)系型數(shù)據(jù)庫(NoSQL)、實時日志、Hadoop分布式文件系統(tǒng)(Hadoop distributed file system, HDFS)等外部數(shù)據(jù)源不間斷地讀取數(shù)據(jù),并以元組的形式傳遞給拓撲進行處理.
5) Bolt.Storm中的數(shù)據(jù)處理編程單元,實現(xiàn)拓撲中的處理邏輯.一般地,編程人員可在Bolt中實現(xiàn)數(shù)據(jù)過濾、聚合和查詢數(shù)據(jù)庫等操作,處理的結(jié)果以元組形式流式傳遞至其下游組件進行處理.
6) 組件(component).Spout和Bolt的統(tǒng)稱.
7) 流組模式(stream grouping).Storm各組件之間的數(shù)據(jù)傳遞模式,目前共有8種,分別是隨機分組(shuffle grouping)、按域分組(field grouping)、副本分組(all grouping)、全局分組(global grouping)、直接分組(direct grouping)、本地分組(local or shuffle grouping)、不區(qū)分分組(non grouping)和自定義流組.
8) 工作進程(Worker).實際上是一個Java虛擬機(Java virtual machine, JVM),由它執(zhí)行指定的子拓撲.同一個拓撲可以由多個工作進程共同完成,但1個工作進程只能執(zhí)行1個子拓撲.
9) 槽(Slot).配置在工作節(jié)點上用于接收數(shù)據(jù)的端口,1個工作進程占用1個槽,槽的數(shù)量表明該工作節(jié)點最多可容納的工作進程的數(shù)量.一般為了拓撲的進程級并行,槽的個數(shù)配置為工作節(jié)點CPU的核心數(shù).
10) 任務(wù)(task).每個任務(wù)為其對應(yīng)組件的1個實例,是拓撲執(zhí)行的代碼單元.
11) 工作線程(Executor).每個工作進程由多個工作線程組成,并且運行1個組件中包含的1個或多個任務(wù),因此系統(tǒng)中工作線程的數(shù)量總是小于或等于任務(wù)的數(shù)量.一般地,為了實現(xiàn)任務(wù)的線程級并行,1個工作線程只包含1個任務(wù).在這種情況下,Storm任務(wù)的調(diào)度即相當于該任務(wù)所對應(yīng)的工作線程的調(diào)度.本文即在此場景下研究Storm的任務(wù)遷移策略,下文簡稱為線程.
為簡便起見,除第5節(jié)之外,下文出現(xiàn)“節(jié)點”之處均指工作節(jié)點,出現(xiàn)“進程”之處均指工作進程;第5節(jié)實驗部分為避免混淆,均對具體意義上的“節(jié)點”和“進程”做了說明.
本節(jié)從Storm拓撲的邏輯模型、實例模型和任務(wù)分配模型出發(fā),比較Storm集群中3種不同的通信方式.由此建立資源約束模型和最優(yōu)通信開銷模型,為任務(wù)遷移策略的設(shè)計與實現(xiàn)提供了理論依據(jù).
定義1. 拓撲邏輯模型.任意拓撲可使用二元組(C,S)表示,其中C={c1,c2,…,c|C|}為拓撲的頂點集合,每個元素表示1個組件,即Spout或Bolt;S={s1,2,s1,3,…,s|C|-i,|C|}為拓撲的有向邊集合,每個元素表示2個組件間傳遞的數(shù)據(jù)流.如果存在si,j∈S且i≠j,則ci,cj∈C,表示數(shù)據(jù)由組件ci發(fā)出并由cj接收.這樣的有向無環(huán)圖表示的拓撲稱為拓撲邏輯模型.如圖2所示,其中{ca,cb,…,cg}為組件集,{sa,c,sa,d,…,se,g}為數(shù)據(jù)流集.ca和cb為數(shù)據(jù)源編程單元Spout,負責讀取外部數(shù)據(jù)源并發(fā)送至流式計算集群進行處理;其余的組件為數(shù)據(jù)處理編程單元Bolt,負責處理上游組件發(fā)送的數(shù)據(jù)并以某種流組模式將結(jié)果發(fā)送至其下游組件;特別地,組件cf和cg為數(shù)據(jù)終點,通常用于將最終結(jié)果展示至終端或持久化至數(shù)據(jù)庫.
Fig. 3 Instance model of a topology圖3 拓撲實例模型
定義3. 子拓撲.由拓撲實例模型中原拓撲的線程的子集以及這些線程之間的數(shù)據(jù)流構(gòu)成.設(shè)原拓撲的線程集合為E,拓撲的線程的子集為E′,對于emn∈E和ei j,ek l∈E′,若在原拓撲中存在si j,mn和smn,k l,則必然有emn∈E′.例如在圖3中,eb,ed2,ef2以及它們之間的數(shù)據(jù)流可構(gòu)成子拓撲,而僅由eb和ef2則無法構(gòu)成子拓撲.
定義4. 任務(wù)分配模型.在Storm集群中,資源池由一系列節(jié)點構(gòu)成,定義該集合為N={n1,n2,…,n|N|}.每個節(jié)點內(nèi)配置有若干個槽,對于任意ni∈N,有Sloti={sloti1,sloti2,…,sloti|Sloti|},其中sloti j表示第i個節(jié)點的第j個槽.對于任意1個拓撲,用戶將在代碼中設(shè)置其運行所需的進程數(shù)量和每個組件的線程數(shù)量.設(shè)某拓撲的進程集合為Wi={wi1,wi2,…,wi|Wi|},其中wi j表示第i個節(jié)點上運行的第j個進程.由槽和工作進程的定義可知,有Wi?Sloti.若組件ci運行的第j個線程ei j分配到了第k個節(jié)點上,則記f(ei j)=nk;若組件ci運行的第j個線程ei j分配到了第k個節(jié)點的第l個槽上,則記g(ei j)=slotk l.其中f和g均為分配函數(shù).如圖4為圖3的拓撲運行于Storm集群中的真實場景,其中橢圓形虛線表示進程,矩形實線表示節(jié)點,圖4的含義為拓撲的10個線程分布在2個節(jié)點的3個進程中,以ed2和ee為例可表示為:f(ed2)=n1,g(ed2)=slot12,f(ee)=n2,g(ee)=slot21.
Fig. 4 Model for task assignment圖4 任務(wù)分配模型
由此可見,在Storm系統(tǒng)中存在3種通信方式:
1) 節(jié)點間通信,即節(jié)點間的直接通信.這種通信開銷最大,需要占用大量的網(wǎng)絡(luò)資源.如果網(wǎng)絡(luò)負載過高或帶寬過小,將對數(shù)據(jù)處理的實時性產(chǎn)生很大影響.如圖4所示,線程eb和ed3之間的通信即屬于節(jié)點間通信,數(shù)據(jù)流sb,d3為節(jié)點間數(shù)據(jù)流.
2) 進程間通信,即同一個節(jié)點內(nèi)的進程間直接通信.這種通信開銷介于節(jié)點間通信開銷和線程間通信開銷之間.如圖4所示,線程ed1和ef2之間的通信即屬于進程間通信,數(shù)據(jù)流sd1,f2為進程間數(shù)據(jù)流.
3) 線程間通信,即同一個節(jié)點且同一進程內(nèi)的線程間直接通信.這種通信開銷最小且不可避免.如圖4所示,線程ed1和ef1之間的通信即屬于線程間通信,數(shù)據(jù)流sd1,f1為線程間數(shù)據(jù)流.
Storm默認調(diào)度器EvenScheduler采用輪詢策略進行任務(wù)分配.EvenScheduler首先遍歷拓撲中的所有實例并為每個實例分配1個線程;然后將其均勻分配到各個進程中;接著根據(jù)各個節(jié)點上槽的空閑情況,將進程均勻分配到各個節(jié)點上.這樣的調(diào)度策略并未考慮到節(jié)點間和進程間較大的通信開銷以及異構(gòu)節(jié)點間的性能差異.針對這一問題,本文從以下3個方面提出優(yōu)化構(gòu)思:
1) 文獻[44]通過Storm吞吐量測試[47]表明,針對運行在1個節(jié)點上的子拓撲而言,若為其分配多個進程(即占用多個槽),將會增加進程間通信開銷進而影響性能.因此,在節(jié)點數(shù)量不變的情況下,僅為位于每個節(jié)點的子拓撲各分配1個進程,能夠達到性能最優(yōu)的效果,這為解決最小化進程間通信開銷問題提供了思路.
2) 對于節(jié)點間通信問題,需在充分利用有限資源的基礎(chǔ)上,盡量將彼此間數(shù)據(jù)流較大的1對任務(wù)部署到同一節(jié)點,即將節(jié)點間通信轉(zhuǎn)化為節(jié)點內(nèi)的線程間通信,從而達到最小化節(jié)點間通信開銷的目的.
3) 針對異構(gòu)節(jié)點間的性能差異問題,需實時監(jiān)測不同節(jié)點上各線程的負載,并預(yù)測某線程遷移到目的節(jié)點上各資源的變化情況,為線程的遷移提供決策.
(1)
(2)
在Storm應(yīng)用環(huán)境中,為保證集群的可靠性目標,各節(jié)點不能滿負荷運行,一般需由運維人員設(shè)定相應(yīng)閾值以預(yù)留少量的計算資源,當超出閾值后則發(fā)出警告.設(shè)CPU占用率閾值為α,內(nèi)存占用率閾值為β,網(wǎng)絡(luò)帶寬占用率閾值為γ,則式(1)~(3)可進一步完善為
(4)
(5)
在Storm集群中,各節(jié)點的資源占用由運行在該節(jié)點上的子拓撲所包含的所有線程占用的各類資源共同構(gòu)成,則剩余資源為各資源總量與資源占用的差值,有:
(7)
(8)
(10)
(11)
(12)
轉(zhuǎn)化后的資源約束模型將為后文線程遷移的決策過程提供理論依據(jù).
由3.1節(jié)可知,拓撲中包含3種通信開銷,節(jié)點間通信開銷最大,進程間通信開銷次之,線程間通信開銷最小.因此需要在滿足資源約束模型的同時最小化這3類開銷.然而拓撲一旦提交,線程數(shù)量和數(shù)據(jù)流數(shù)量即固定下來,且為保證元組的流式傳遞,各線程之間的通信開銷不可避免.若要達到最小化通信開銷的效果,需最小化節(jié)點間通信開銷和進程間通信開銷;換言之,即需盡可能地將節(jié)點間通信開銷和進程間通信開銷轉(zhuǎn)化為線程間通信開銷.同一類型的通信開銷可由數(shù)據(jù)流大小體現(xiàn),數(shù)據(jù)流越大,則傳輸時間越長,即通信開銷越大,反之亦然.設(shè)拓撲中線程ei j與ek l之間的數(shù)據(jù)流大小為vi j,k l或vk l,i j(v的下標與數(shù)據(jù)流向無關(guān)),則可建立優(yōu)化模型:
(13)
約束條件為式(10)~(12).
定理1. 最優(yōu)通信開銷原則.最小化節(jié)點間通信開銷和進程間通信開銷等價于最大化線程間通信開銷,即上述目標函數(shù)等價于
(15)
證明. 根據(jù)Storm流式計算模型,拓撲一旦提交到集群,其包含的線程數(shù)量和數(shù)據(jù)流數(shù)量即不可改變.因此在不發(fā)生擁塞的情況下,總數(shù)據(jù)流大小為一定值C,即
(16)
證畢.
為了消除進程間通信開銷,本文僅為調(diào)度到各節(jié)點上的子拓撲各分配1個進程,可看作是將節(jié)點上原有的多個進程合并,這樣就將原來存在的進程間通信開銷轉(zhuǎn)化為線程間通信開銷,在節(jié)點數(shù)量不變的前提下優(yōu)化了通信效率.因此下面將著重解決節(jié)點間通信開銷問題.
為滿足最優(yōu)通信開銷模型的要求,需修改Storm默認調(diào)度策略,將已提交拓撲的各個線程重新分配至各個節(jié)點.若將各節(jié)點理解成不同容量的背包,拓撲中的線程理解為不同的物品,可使用0-1背包問題的思想分析和解決此類NP問題.然而為同時滿足3.2節(jié)資源約束模型的要求,各背包應(yīng)同時存在CPU、內(nèi)存和網(wǎng)絡(luò)帶寬3類不同的資源約束,且為保證最小化節(jié)點間開銷,需成對考慮線程的放置,這便構(gòu)成了一個二次型三維約束條件的多背包問題.已有學者提出多種方法解決背包問題的變形,例如動態(tài)規(guī)劃[48]、搜索樹(如A*算法)[49]、近似算法[50-51]等.然而,在分布式流式計算框架中,此類算法時間復(fù)雜度較高,拓撲的部署過程開銷很大,將對集群的運行效率帶來較大的負面影響;此外,在拓撲各線程的調(diào)度過程中,數(shù)據(jù)源仍在源源不斷地產(chǎn)生數(shù)據(jù),若無法及時得到處理,可能導(dǎo)致元組積壓甚至因超時而處理失敗,無法滿足Storm用戶的實時性業(yè)務(wù)需求.可見調(diào)度策略的優(yōu)化設(shè)計應(yīng)盡可能減少受影響的線程數(shù)量,由此提出Storm框架下的任務(wù)遷移模型.
定理2. 遷移優(yōu)化原則1.若存在這樣1個節(jié)點,使得某線程與該節(jié)點的節(jié)點間數(shù)據(jù)流大于該線程與其所在節(jié)點內(nèi)的線程間數(shù)據(jù)流,則該線程遷移后將獲得更優(yōu)的通信開銷.
證明. 設(shè)待遷移線程為節(jié)點ns上的線程ei j,與ei j存在節(jié)點間數(shù)據(jù)流且剩余資源充裕的節(jié)點集合為Nd,其中nd為Nd中任意節(jié)點.則線程ei j與其所在節(jié)點ns內(nèi)的線程形成的數(shù)據(jù)流大小為
(17)
線程ei j與其他節(jié)點之間形成的數(shù)據(jù)流大小總和為
(18)
線程ei j與nd之間的數(shù)據(jù)流大小為
(19)
因此,對于線程ei j,與其相關(guān)的數(shù)據(jù)流總量為
vi j=vinterExecutor+vinterNode=
vinterExecutor+vinter_nd+(vinterNode-vinter_nd).
(20)
若將線程ei j遷移至節(jié)點nd,則原來的vinterExecutor必然由節(jié)點ns內(nèi)的線程間數(shù)據(jù)流變?yōu)閚d與ns之間的節(jié)點間數(shù)據(jù)流,即:
(21)
而原來的vinter_nd變?yōu)榱斯?jié)點nd內(nèi)部的線程間數(shù)據(jù)流,即:
(22)
此時遷移至節(jié)點nd上的線程ei j形成的節(jié)點間數(shù)據(jù)流總和為
(23)
因此對于遷移后的線程ei j,與其相關(guān)的數(shù)據(jù)流總量為
(24)
線程遷移前后拓撲實例模型并未改變,因此有:
(25)
要使得遷移后的拓撲具有更優(yōu)的通信開銷,則根據(jù)定理1需獲得更大的線程間數(shù)據(jù)流,即令:
(26)
(27)
故式(26)等價于
vinter_nd-vinterExecutor>0.
(28)
因此,若存在這樣的節(jié)點nd,使得線程ei j與nd的節(jié)點間數(shù)據(jù)流大于ei j與其所在節(jié)點ns內(nèi)的線程間數(shù)據(jù)流,則該線程遷移至節(jié)點nd后將獲得更優(yōu)的通信開銷.
證畢.
定理2闡述了為滿足最優(yōu)通信開銷模型的要求,選擇遷移目的節(jié)點時的理論依據(jù).實際上,定理2還可以從選擇待遷移線程的角度出發(fā)進行如下描述:
定理2′. 遷移優(yōu)化原則2.若存在這樣1個線程,使得該線程與某節(jié)點的節(jié)點間數(shù)據(jù)流大于該線程與其所在節(jié)點內(nèi)的線程間數(shù)據(jù)流,則該線程遷移后將獲得更優(yōu)的通信開銷.
定理2′的證明過程與定理2類似,在此不再贅述.
引理1. 節(jié)點間數(shù)據(jù)流最優(yōu)性原理.若某線程遷移前與某一節(jié)點之間的數(shù)據(jù)流最大,則遷移到該節(jié)點后將轉(zhuǎn)化為最大的線程間數(shù)據(jù)流,且與其他節(jié)點之間的數(shù)據(jù)流達到最小.
證畢.
由定理2和引理1可以得出以下結(jié)論:
結(jié)論1. 最優(yōu)遷移原則.為了通過線程遷移達到最優(yōu)通信開銷原則的要求,需在存在若干節(jié)點與待遷移線程之間的數(shù)據(jù)流大于該線程與其所在節(jié)點內(nèi)的線程形成的數(shù)據(jù)流的基礎(chǔ)上,將線程遷移至具有最大節(jié)點間數(shù)據(jù)流的節(jié)點,即
(29)
s.t.
(30)
為保證Storm的性能需求,任務(wù)遷移策略需滿足3個原則:1)為減少線程的遷移對Storm運行效率的沖擊,需最小化遷移開銷;2)遷移后各節(jié)點的各類資源占用不超過閾值;3)遷移后集群的運行效率得以提高,即令通信開銷達到最優(yōu).為同時滿足以上3個原則的要求,需要實時監(jiān)測各節(jié)點的各類資源剩余情況,當可用資源不足時,選擇對該節(jié)點運行效率影響最大的線程執(zhí)行遷移,且同時兼顧降低節(jié)點之間的通信開銷.本節(jié)將圍繞任務(wù)遷移策略的具體設(shè)計過程展開討論.
當某節(jié)點的某類資源占用率持續(xù)一段時間超出閾值后,則將該節(jié)點標記為源節(jié)點,意味著運行在該節(jié)點上的某些線程將被遷出.這里需要考慮2種特殊情況:
1) 多個節(jié)點的資源占用率同時超出閾值.當這種情況發(fā)生時,不便于同時將各節(jié)點均標記為源節(jié)點.這是因為源節(jié)點上的線程在遷移之前需要預(yù)測遷移后目的節(jié)點的資源剩余情況,而為了尋求最優(yōu)遷移目標,符合本節(jié)提出的3個原則的節(jié)點均可作為各源節(jié)點上待遷出線程的目的節(jié)點,不具有排他性.若多個源節(jié)點同時選擇了相同的目的節(jié)點且并發(fā)地將部分線程遷移至此,則遷移后目的節(jié)點的資源剩余情況將遠小于線程遷移前預(yù)測的結(jié)果,并可能發(fā)生資源溢出.因此就本文研究背景來看,需在時間開銷可接受范圍內(nèi)將資源占用率超出閾值的節(jié)點分別處理.
2) 同一節(jié)點上不同類資源(CPU、內(nèi)存和網(wǎng)絡(luò)帶寬)的占用率均超出閾值.當某2類資源或者這3類資源的占用率同時超過閾值時,需根據(jù)流式計算的特點為各類資源分配不同的優(yōu)先級.在Storm系統(tǒng)中,內(nèi)存作為數(shù)據(jù)臨時性存儲的唯一介質(zhì),一旦溢出將會造成災(zāi)難性后果,所以解決內(nèi)存資源的緊缺性問題最為迫切,優(yōu)先級應(yīng)設(shè)為最高.其次,CPU執(zhí)行具體任務(wù),并不可避免地產(chǎn)生序列化、反序列化和保證消息可靠傳輸?shù)阮~外開銷,網(wǎng)絡(luò)則負責將CPU計算后的結(jié)果在節(jié)點之間進行傳輸,這2類資源均會對拓撲的執(zhí)行效率產(chǎn)生直接影響,其優(yōu)先級需針對不同應(yīng)用中資源占用的傾向不同而人為設(shè)定.
綜合以上2種情況,我們在主控節(jié)點中采用了LinkedHashMap這一數(shù)據(jù)結(jié)構(gòu).與HashMap不同的是,LinkedHashMap保存了記錄的插入順序,讀取時可按序輸出,保證最先發(fā)生任務(wù)過載的節(jié)點被最先處理.當某個節(jié)點的某類資源占用率超出閾值后,將節(jié)點ID,資源類型這一鍵值對發(fā)送并插入到位于主控節(jié)點的LinkedHashMap中;當同一節(jié)點上不同類型的資源占用率超出閾值后,執(zhí)行插入操作時則需根據(jù)資源優(yōu)先級類型進行判斷:若當前資源的優(yōu)先級高于LinkedHashMap中已存有的資源類型,則自動執(zhí)行替換;否則保持原狀.由此設(shè)計算法1.
算法1. 源節(jié)點選擇算法.
輸入:節(jié)點n1,n2,…,ni,…;CPU占用率閾值α,內(nèi)存占用率閾值β,網(wǎng)絡(luò)帶寬占用率閾值γ;CPU優(yōu)先級pr_cpu,內(nèi)存優(yōu)先級pr_ram,網(wǎng)絡(luò)帶寬優(yōu)先級pr_bandwidth;
輸出:源節(jié)點集Ns;
初始化:Ns←newLinkedHashMapNode,Resource. /*源節(jié)點集*/
① if節(jié)點ni的內(nèi)存占用率在時間間隔T內(nèi)持續(xù)大于βthen
②Ns.put(ni, “RAM”);
③ end if
④ if節(jié)點ni的CPU占用率在時間間隔T內(nèi)持續(xù)大于αthen
⑤ ifNs.containsKey(ni)=false then
/*ni為新加入源節(jié)點集的節(jié)點*/
⑥Ns.put(ni, “CPU”);
⑦ else
⑧pr←getPriority(Ns.get(ni));
/*獲取ni已超出閾值資源的優(yōu)先級*/
⑨ ifpr_cpu>prthen
/*CPU優(yōu)先級更高*/
⑩Ns.put(ni, “CPU”);
出于篇幅原因,算法1僅以CPU和內(nèi)存資源為例說明了不同優(yōu)先級的資源占用率超出閾值時源節(jié)點的選擇情況.若某一節(jié)點的網(wǎng)絡(luò)帶寬資源占用率超出閾值時,其算法邏輯與行④~相同,只需把CPU資源替換為網(wǎng)絡(luò)帶寬資源即可.可見,當同一節(jié)點上不同類資源占用率均超出閾值時,算法1能夠保證優(yōu)先級高的資源被優(yōu)先考慮,而優(yōu)先級低的資源則暫被取代.其可行的原因是后文采用了線程遷移策略,隨著線程的遷出,源節(jié)點上優(yōu)先級高的資源問題解決之后,其余類型的資源問題可能隨之解決;若沒有解決,該節(jié)點將由于某類資源占用率超出閾值而繼續(xù)觸發(fā)行①和行④的判定條件,進而重新加入源節(jié)點集等候處理.各源節(jié)點的處理時間很短,不會因某源節(jié)點未及時處理而導(dǎo)致資源溢出等嚴重后果,這將在4.3節(jié)的算法評估中詳細介紹.
任務(wù)遷移分為遷移決策和遷移執(zhí)行2個過程,其中執(zhí)行僅作為決策結(jié)果的實施,因此本節(jié)的重點在于論述任務(wù)遷移的決策過程,分為阻尼線程的選擇和遷移目的節(jié)點的選擇2個步驟.
1) 阻尼線程的選擇
源節(jié)點資源占用率超出閾值后,應(yīng)遷出運行在該節(jié)點上的部分線程以降低負載.為選擇合適的待遷移線程,首先引入阻尼線程的概念.
定義5. 阻尼線程.即決定從源節(jié)點遷移出的線程.由于這樣的線程對源節(jié)點運行效率起到了一定的阻礙作用,故稱之為“阻尼”.為使得Storm集群的性能達到最優(yōu),需選擇阻尼線程進行遷移.
從表面上看,資源占用越大的線程則阻尼越大,為使得某節(jié)點的資源占用率迅速降至閾值以下,應(yīng)采用貪心策略依次將該節(jié)點上資源占用最大的線程遷出,直到滿足資源約束模型為止.這種做法雖然可以減少遷移次數(shù)從而最小化遷移開銷,但未將線程之間的通信類型考慮在內(nèi),可能導(dǎo)致遷移后節(jié)點間通信開銷的增加.因此,為同時滿足最優(yōu)通信開銷模型的要求,應(yīng)在盡可能減少遷移次數(shù)的前提下,選擇那些能夠使得節(jié)點間通信開銷轉(zhuǎn)化為更多節(jié)點內(nèi)線程間通信開銷的線程.這樣的線程才滿足阻尼線程的標準.
2) 目的節(jié)點的選擇
阻尼線程選定之后,另一個關(guān)鍵問題是目的節(jié)點的選擇,由結(jié)論1可知,選擇的目的節(jié)點需與阻尼線程之間存在最大節(jié)點間數(shù)據(jù)流,且需滿足這樣的節(jié)點間數(shù)據(jù)流大于與阻尼線程相關(guān)的源節(jié)點內(nèi)線程間數(shù)據(jù)流.這種做法雖然優(yōu)化了通信開銷,但需在滿足資源約束模型的前提下進行遷移,否則可能導(dǎo)致遷移后目的節(jié)點負載過重形成新的瓶頸.為了保證容納阻尼線程遷入的節(jié)點資源占用不超出閾值,在阻尼線程遷出源節(jié)點之前務(wù)必完成剩余資源的估算,下面將以CPU資源為例展開討論.
(31)
(32)
例如,若源節(jié)點采用2.5 GHz的雙核CPU,備選目的節(jié)點采用2.0 GHz的4核CPU,某線程在源節(jié)點上的CPU占用率為10%,則根據(jù)式(32),得該線程在備選目的節(jié)點上的CPU占用率為6.25%.
在遷移的決策過程中,可通過式(33)(34)預(yù)測線程遷移后源節(jié)點和備選目的節(jié)點的資源剩余情況:
(33)
(34)
算法2. 任務(wù)遷移算法.
輸入:節(jié)點n1,n2,…,ni,…;源節(jié)點集Ns;
初始化:ns←newNode; /*源節(jié)點*/
/*分配在源節(jié)點上的有序線程集合*/
vinterExecutor←0; /*當前線程與源節(jié)點內(nèi)的前驅(qū)和后繼線程構(gòu)成的線程間數(shù)據(jù)流大小*/
Nd←newArrayNode; /*備選目的節(jié)點集*/
vinter_nd←0; /*當前線程與某備選目的節(jié)點上的前驅(qū)和后繼線程構(gòu)成的節(jié)點間數(shù)據(jù)流大小*/
vmax←0. /*最大節(jié)點間數(shù)據(jù)流*/
①ns←獲取Ns中的第1個元素;
④ 若源節(jié)點剩余資源充裕則跳出該循環(huán),不再遷移下一個線程,否則執(zhí)行下述語句;
⑤ 根據(jù)式(17)計算vinterExecutor;
⑥Nd←與線程i存在直接節(jié)點間通信且當前剩余資源充裕的節(jié)點;
⑦ fornd=Nd[0] toNd.Length-1 do
⑧ if預(yù)測節(jié)點nd容納線程i后依然滿足
資源約束模型then
⑨ 根據(jù)式(19)計算vinter_nd;
⑩ ifvinter_nd>vmaxthen
/*當前線程與目的節(jié)點的節(jié)點間數(shù)據(jù)流大于該線程與所在源節(jié)點內(nèi)的線程間數(shù)據(jù)流*/
/*該源節(jié)點上所有阻尼線程及其遷出的目的節(jié)點決策完成,從源節(jié)點集中刪去該節(jié)點*/
4.3.1 算法復(fù)雜度分析
算法1實質(zhì)上為LinkedHashMap的插入和替換算法,時間復(fù)雜度為O(1).下面將針對算法2中遷移決策的計算開銷和遷移執(zhí)行開銷進行分析.
4.3.2 算法執(zhí)行效果分析
如3.3節(jié)所述,Storm拓撲在各節(jié)點上的分配是一個二次型三維約束條件的多背包問題,無法在多項式時間內(nèi)找到全局最優(yōu)解,但可以采用已有研究的方法將拓撲重部署以尋求近似全局最優(yōu)[43-45].而本文提出的線程遷移僅針對阻尼線程重新調(diào)度,可看作是任務(wù)分配模型的局部優(yōu)化.本節(jié)將通過對比分析,針對局部優(yōu)化后的效果展開討論.
流式計算的性能可使用各任務(wù)計算的時間開銷和任務(wù)間傳輸?shù)臅r間開銷進行衡量.為了實現(xiàn)任務(wù)的線程級并行,通常1個線程只包含1個任務(wù),因此文中線程與任務(wù)的含義一致.根據(jù)文獻[41]中流式計算模式下計算和通信開銷的評估方法可知,對于運行在源節(jié)點ns上的任意1個線程ei j,其計算開銷可表示為
(35)
(36)
(37)
(38)
隨著遷移次數(shù)k的增加,每次遷移后源節(jié)點ns可獲得的計算能力提升將愈加有限,即:
且
(39)
同理可分析各對線程之間的通信開銷,其函數(shù)關(guān)系為
(40)
(41)
(42)
式(42)為遷移前線程ei j與ek l之間的通信開銷與遷移后線程ei j與ek l之間的通信開銷的差值,表明了線程因遷移而降低的通信開銷,可見Δti j,k l僅與線程ei j與ek l之間剩余的網(wǎng)絡(luò)帶寬資源有關(guān).接下來即可采用與上述評估計算開銷相同的方法進行各次遷移過程中通信開銷的評估.結(jié)果表明,若在第m次遷移完成后,各對線程之間的通信開銷已達最優(yōu),則在共計m次的線程遷移過程中,通信開銷逐漸降低且下降的速率逐漸變緩,能夠由節(jié)點間通信轉(zhuǎn)化為節(jié)點內(nèi)線程間通信的數(shù)據(jù)流將越來越少.
設(shè)采用Storm默認調(diào)度策略的線程分配結(jié)果為G,采用本文策略進行局部遷移后的線程分配結(jié)果為G′,拓撲重部署后達到的近似全局最優(yōu)的線程分配結(jié)果為G″.如4.3.1節(jié)所述,在G向G′演變的過程中,對于?ns∈N,線程的遷移次數(shù)最多為Δhns;若要使得G′與G″相等,則需在G′的基礎(chǔ)上繼續(xù)遷移位于ns上的m-Δhns個線程.設(shè)第i次線程遷移可降低的計算和通信開銷共為Δti,則由G演變到G′的Δhns次遷移過程中可降低的開銷總和為
(43)
平均每次遷移可降低的開銷為
(44)
在此基礎(chǔ)上若再進行m-Δhns次遷移,即由G′演變到G″的過程中可降低的開銷總和為
(45)
平均每次遷移可降低的開銷為
(46)
由G到G″的m次遷移過程中可降低的開銷總和為
(47)
平均每次遷移可降低的開銷為
(48)
Storm為編程人員提供了可插拔的自定義調(diào)度器.為部署本文提出的任務(wù)遷移策略,需實現(xiàn)backtype.storm.scheduler.IScheduler接口中的schedule方法,其原型為public voidschedule(Topologiestopologies, Clustercluster).其中對象topologies包含當前集群運行的所有拓撲信息,包括各類參數(shù)的配置信息以及線程到組件ID的映射關(guān)系等;對象cluster包含當前集群的所有狀態(tài)信息,包括拓撲中各線程在節(jié)點和進程上的映射信息、節(jié)點和槽的使用與空閑信息等.以上信息均可通過各對象的API獲得.對于拓撲中各線程的CPU資源占用信息,可通過Java API中ThreadMXBean類的getThreadCpuTime(longid)方法獲取,其中id為線程ID;對于各線程的網(wǎng)絡(luò)帶寬資源占用信息,可通過Storm UI提供的REST API獲取節(jié)點間各線程的元組傳輸速率,并結(jié)合實驗中設(shè)置的元組大小,通過累加簡單估算求得;而由于各線程存在共享內(nèi)存,則對于各線程的內(nèi)存資源占用情況,僅能結(jié)合storm.yaml文件中配置的worker.childopts參數(shù)和jstack等JVM性能監(jiān)控工具進行粗略估計;此外,操作系統(tǒng)中硬件相關(guān)參數(shù)和負載信息可通過/proc目錄下相關(guān)文件獲取.代碼編寫完畢后,將其打jar包至${STORM_HOME}/lib目錄下,并在主控節(jié)點的storm.yaml文件中配置參數(shù)storm.scheduler即可運行.
改進的Storm架構(gòu)如圖5所示.需要說明的是,運行進程UI的控制臺節(jié)點和運行進程ZooKeeper的協(xié)調(diào)節(jié)點仍保持原狀,故圖5中將其相關(guān)部分省去.改進的Storm系統(tǒng)架構(gòu)中新增4個模塊:
1) 負載監(jiān)視器(load monitor).在一定時間窗口內(nèi),收集各線程占用的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負載信息及各線程之間的數(shù)據(jù)流大小.部署時需在各Spout的open()和nextTuple()方法以及各Bolt的prepare()和execute()方法中調(diào)用該模塊.
2) MySQL數(shù)據(jù)庫(MySQL Database).存儲任務(wù)分配信息和負載監(jiān)視器傳來的負載信息,并實時更新.
3) 遷移發(fā)生器(migration generator).部署算法1和算法2.負責讀取數(shù)據(jù)庫中的負載信息,并作出任務(wù)遷移決策.
4) 自定義調(diào)度器(custom scheduler).覆蓋主控節(jié)點的默認調(diào)度策略,讀取遷移發(fā)生器的調(diào)度決策并執(zhí)行遷移.
Fig. 5 Improved architecture of Storm圖5 改進的Storm系統(tǒng)架構(gòu)
為驗證任務(wù)遷移策略的有效性,本節(jié)將通過下述實驗進行比較和評價.
為更好地觀測資源有限且節(jié)點異構(gòu)的情況下任務(wù)遷移策略的執(zhí)行過程,實驗環(huán)境采用不同硬件配置的PC機搭建1個包含有20個節(jié)點的Storm集群,其中運行進程Nimbus、進程UI和數(shù)據(jù)庫服務(wù)MySQL的主控節(jié)點1個,運行進程ZooKeeper的協(xié)調(diào)節(jié)點3個,其余16個為運行進程Supervisor的工作節(jié)點.表1列出了各節(jié)點具體的硬件配置,其中各工作節(jié)點的CPU僅使用其單核的處理能力,硬盤容量為250 GB,轉(zhuǎn)速為7 200 r/min,接口為SATA3.0.在表1中,根據(jù)運行進程Supervisor的16個工作節(jié)點的硬件配置的高低,可大體將工作節(jié)點分為低端、中端和高端3類,為簡便起見,下文將運行進程Supervisor 1~3的工作節(jié)點簡稱為低配節(jié)點,將運行進程Supervisor 4~13的工作節(jié)點簡稱為中配節(jié)點,將運行進程Supervisor 14~16的工作節(jié)點簡稱為高配節(jié)點.除硬件配置之外,各節(jié)點軟件方面配置相同,如表2所示.
Table 1 Hardware Configuration of Storm Cluster表1 Storm集群硬件配置
Table 2 Software Configuration of Storm Cluster表2 Storm集群軟件配置
為全面測試任務(wù)遷移策略在各類不同資源開銷下的有效性,實驗數(shù)據(jù)選取GitHub上storm-benchmark-master提供的4組基準測試用例,分別是CPU敏感型(CPU-sensitive)的WordCount、內(nèi)存敏感型(memory-sensitive)的RollingSort、網(wǎng)絡(luò)帶寬敏感型(network-sensitive)的SOL以及Storm真實場景下的應(yīng)用RollingCount[46].各基準測試均采用其自帶的文本文檔作為輸入數(shù)據(jù).表3列出了各項參數(shù)配置,需要進一步解釋的參數(shù)如下:1)component.xxx_num為該基準測試中設(shè)置的組件并行度,即1個Spout或Bolt運行的實例(線程)數(shù)量.2)topology.pr_xxx為運行該項基準測試時設(shè)置的資源優(yōu)先級,其值越大表示該類資源的優(yōu)先級越高,在源節(jié)點選擇算法運行時將被優(yōu)先考慮.3)SOL中的topology.level表示拓撲的層次,即其包含的組件數(shù)量,需設(shè)置為大于或等于2的整數(shù);本文設(shè)置該值為3,結(jié)合component.xxx_num參數(shù)配置來看,該拓撲應(yīng)包含有1個運行著64個實例的
Table 3 Configuration of Benchmarks表3 基準測試參數(shù)配置
Spout和2個運行著128個實例的Bolt,其包含的線程總數(shù)與WordCount和RollingCount一致,但與RollingSort不同.除表3所示配置之外還需進行一些通用配置:1)為消除進程間通信開銷,各基準測試運行時僅在1個工作節(jié)點內(nèi)分配1個工作進程,即設(shè)置topology.workers為16;2)為保證數(shù)據(jù)流的可靠傳輸,各工作進程除了運行分配給它的線程之外,還額外運行1個Acker Bolt實例,即設(shè)置topology.acker.executors為16;3)為方便實驗觀測而有意提高工作節(jié)點負載,但需保證在表3的配置下防止元組傳輸因超時而重傳,通過多次實驗結(jié)果設(shè)置topology.max.spout.pending的合適值為100;4)為結(jié)合以上配置而適時觸發(fā)任務(wù)遷移,設(shè)定CPU占用率閾值α、內(nèi)存占用率閾值β和網(wǎng)絡(luò)帶寬占用率閾值γ均為70%,設(shè)定任務(wù)遷移策略的觸發(fā)周期T為30 s,表示系統(tǒng)在趨于穩(wěn)定后,若某類資源占用率在30 s內(nèi)持續(xù)超過70%,則觸發(fā)任務(wù)遷移策略.
為驗證本文遷移策略的有效性,文中除了與Storm默認輪詢策略進行對比之外,還部署了文獻[43]的Storm自適應(yīng)在線調(diào)度策略.其核心思想是實時監(jiān)測CPU負載情況和各對線程之間的數(shù)據(jù)流大小,當CPU負載持續(xù)超出閾值時觸發(fā)任務(wù)重部署機制,即首先按照大小遞減的順序排列拓撲中各對線程之間的數(shù)據(jù)流,然后將線程逐對調(diào)度至那些能夠令其部署后產(chǎn)生最低CPU負載的工作進程和工作節(jié)點中.該策略可看作是任務(wù)重部署策略的代表,下文簡稱為在線策略.表4列出了采用在線策略時的各項參數(shù)配置.需要說明的是,表4中的reschedule.timeout為在線策略的觸發(fā)周期,capacity為在線策略中CPU的使用率上限,這2個值分別與本文算法中的T和α值設(shè)置一致,目的是在同等CPU負載條件下觸發(fā)任務(wù)調(diào)度;此外,最后4項為在線策略中為優(yōu)化拓撲執(zhí)行效率而人為設(shè)定的值,與本文提到的資源占用率閾值α,β,γ無關(guān).
Table 4 Configuration of Online Scheduler表4 在線策略參數(shù)設(shè)置
本節(jié)首先使用WordCount,RollingSort,SOL這3組資源敏感型的基準測試在延遲、資源占用和工作節(jié)點間通信開銷這3個方面進行任務(wù)遷移策略的評估,最后使用RollingCount這一Storm環(huán)境下的真實場景在延遲方面進行測試.為便于數(shù)據(jù)統(tǒng)計,以下測試均設(shè)置metrics.poll=5 s,metrics.time=300 s,即每組實驗每5 s進行1次采樣,時長為5 min.
5.2.1 延遲測試
Fig. 6 Comparison of latency among different task scheduling strategies圖6 不同任務(wù)調(diào)度策略下的系統(tǒng)延遲對比
延遲表明了1個元組從Spout發(fā)射到最終被成功處理的時間消耗,反應(yīng)了拓撲執(zhí)行1次的響應(yīng)時間,刻畫了系統(tǒng)的運行效率.圖6統(tǒng)計了基準測試WordCount,RollingSort,SOL在Storm默認策略(Default)、在線策略(Online)和任務(wù)遷移策略(TMSH-Storm)下的系統(tǒng)延遲.
如圖6所示,從0開始到第1個峰值結(jié)束時間段表明各拓撲提交時的部署過程,此時的調(diào)度均遵循Storm默認策略,故在同一個基準測試中,各策略均需在這一階段耗費幾乎相同的時間,元組的處理延遲也大體相同.第1個峰值過后,系統(tǒng)延遲逐漸趨于收斂,在線策略與任務(wù)遷移策略開始收集集群中各工作節(jié)點以及工作節(jié)點上各線程占用的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負載信息及各線程之間的數(shù)據(jù)流大小,為各線程的優(yōu)化配置提供決策依據(jù).
圖6(a)展示了各策略執(zhí)行1次WordCount過程中的系統(tǒng)延遲.可見在第160 s時在線策略觸發(fā),延遲出現(xiàn)極高峰值.這是由于在線策略需根據(jù)各線程的CPU負載和各對線程之間數(shù)據(jù)流的大小情況,將拓撲中包含的所有線程在各工作節(jié)點上重新分配,整個過程相當于拓撲提交時的初始化任務(wù)分配,執(zhí)行開銷較大,此時數(shù)據(jù)流因無法及時處理而導(dǎo)致延遲出現(xiàn)極高峰值.由圖6(a)可知,該策略執(zhí)行對系統(tǒng)延遲的影響時長約在第160~205 s范圍內(nèi),共耗時約45 s,平均延遲約2.369 s,這將影響Storm系統(tǒng)處理數(shù)據(jù)的實時性并易導(dǎo)致數(shù)據(jù)流處理失敗.而任務(wù)遷移策略因只影響超出閾值的一小部分線程,執(zhí)行開銷較小.遷移發(fā)生在第165~185 s之間,共耗時約20 s,平均延遲約920.1 ms,有效降低了算法執(zhí)行過程對集群性能的沖擊.
接下來對內(nèi)存敏感型的RollingSort和網(wǎng)絡(luò)帶寬敏感型的SOL進行測試.需要說明的是,僅當CPU負載在30 s內(nèi)持續(xù)超過70%時在線策略才會被觸發(fā)生效,對于其他資源占用率超出閾值的情況并不作任何處理.本文為了更全面地開展對比實驗,特經(jīng)過反復(fù)調(diào)參設(shè)定各值(如表3所示),以保證在RollingSort和SOL的運行過程中,存在1個以上的工作節(jié)點在超出內(nèi)存和網(wǎng)絡(luò)帶寬閾值但不發(fā)生資源溢出的同時,令其CPU占用率也超出閾值,這樣即滿足了在線策略觸發(fā)的條件.由圖6(b)和圖6(c)可以看出,RollingSort和SOL的執(zhí)行過程與Word-Count類似,分別約在第140~170 s和第155~195 s期間執(zhí)行在線策略,平均延遲分別約為471.4 ms和983.6 ms;在第140~160 s和160~185 s期間執(zhí)行任務(wù)遷移策略,平均延遲分別約為217.4 ms和408.1 ms,分別僅為在線策略的46.1%和41.5%.其中Rolling-Sort的初次部署和重部署消耗的時間都略小于其他基準測試,這是因為它的組件中僅存在1個包含有64個線程的Spout和1個包含有128個線程的Bolt,相對于WordCount和SOL,其包含的線程數(shù)更少,因此具有更低的部署開銷;而SOL的遷移時間略大于其他的基準測試,這是因為它需要遷移的線程數(shù)量更多,具體原因?qū)⒃?.2.2節(jié)中進行闡述.
在線策略和任務(wù)遷移策略執(zhí)行完畢后,系統(tǒng)延遲再次趨于收斂,且表現(xiàn)明顯低于默認策略.在WordCount中,2種策略的系統(tǒng)延遲分別穩(wěn)定在332.4 ms和340.8 ms,相對于默認策略分別降低約28.7%和26.9%;在RollingSort中,延遲分別穩(wěn)定在93.3 ms和88.4 ms,相比默認策略降低約21.1%和25.3%;在SOL中,延遲分別穩(wěn)定在123.1 ms和118.6 ms,相比默認策略降低約24.6%和27.4%.可以看到,在WordCount基準測試中,任務(wù)遷移策略穩(wěn)定后的表現(xiàn)稍遜色于在線策略;而在RollingSort和SOL基準測試中,任務(wù)遷移策略穩(wěn)定后的表現(xiàn)甚至略優(yōu)于在線策略.這是因為在線策略在進行拓撲中各線程重部署時僅考慮到CPU資源剩余情況,對于集群中各工作節(jié)點而言,這樣的分配策略僅可在一定程度上滿足CPU這一資源層面上的負載均衡,適合于WordCount這類CPU敏感型的應(yīng)用;而就本實驗中各工作節(jié)點的硬件配置來看,10個中配節(jié)點和3個高配節(jié)點均具有相同的內(nèi)存大小和網(wǎng)絡(luò)帶寬,若僅從CPU資源方面考慮,3個高配節(jié)點必將承載更多的線程放置,進而導(dǎo)致內(nèi)存和網(wǎng)絡(luò)帶寬剩余資源緊缺形成性能瓶頸.而任務(wù)遷移策略充分考慮到各工作節(jié)點中各類資源的剩余情況,優(yōu)化遷移負載超出閾值的線程,保證各類資源的剩余情況均在閾值設(shè)定范圍之內(nèi),因此在RollingSort和SOL測試中表現(xiàn)更佳.
綜上所述,相對于Storm默認調(diào)度機制,在線策略和任務(wù)遷移策略均能有效降低系統(tǒng)延遲.而由于任務(wù)遷移策略能夠統(tǒng)籌兼顧工作節(jié)點中各類資源的剩余情況,且只針對負載超出閾值的少量線程進行遷移,因此更加適用于不同種類的應(yīng)用場景,且執(zhí)行過程不會對集群運行效率造成較大影響,保證了大數(shù)據(jù)流式處理的實時性.
5.2.2 資源占用測試
本節(jié)討論在Storm默認調(diào)度策略、在線策略和任務(wù)遷移策略下,CPU敏感型的WordCount、內(nèi)存敏感型的RollingSort和網(wǎng)絡(luò)帶寬敏感型的SOL分別運行時的資源占用情況.由于這3組基準測試具有明顯不同的資源占用傾向,因此只需分別測試其傾向于占用的資源類型即可.圖7展示了各基準測試在3類調(diào)度策略下運行穩(wěn)定后各工作節(jié)點的資源占用情況.
Fig. 7 Comparison of resource occupancy rate among different task scheduling strategies圖7 不同任務(wù)調(diào)度策略下的資源占用率對比
由圖7可知,由于Storm輪詢的調(diào)度機制為各工作節(jié)點分配相同的線程個數(shù),忽略了彼此之間的性能差異,因此各工作節(jié)點上的負載分配并不均勻,3個低配節(jié)點上的各類資源占用均已超出閾值.圖7(a)表示W(wǎng)ordCount運行時各工作節(jié)點的CPU占用率.由于低配、中配和高配3類節(jié)點具有明顯的CPU性能差異,因此在默認策略中顯示出了明顯的階梯特征,性能越低的工作節(jié)點CPU占用率越高,而同等配置的若干工作節(jié)點之間CPU占用率差距不大.圖7(b)(c)分別表示RollingSort運行時各工作節(jié)點的內(nèi)存占用率和SOL運行時各工作節(jié)點的網(wǎng)絡(luò)帶寬占用率.由于在表1的硬件配置中,中高配節(jié)點的內(nèi)存和網(wǎng)絡(luò)帶寬性能相同,僅與3個低配節(jié)點存在性能差異,因此中高配節(jié)點上這2類資源的占用率相差不大,而低配節(jié)點的占用率明顯更高,且資源占用與其擁有的資源總量基本呈現(xiàn)反比關(guān)系.以圖7(b)為例,對于配置了1 GB內(nèi)存的3個低配節(jié)點,其內(nèi)存占用率平均值為78.3%;而對于配置了2 GB內(nèi)存的其他13個工作節(jié)點,其內(nèi)存占用平均值僅為39.2%,約為低配節(jié)點的一半左右.可見默認策略僅適用于Storm同構(gòu)環(huán)境,異構(gòu)環(huán)境中則極易造成嚴重的資源占用傾斜甚至溢出.
在線策略根據(jù)CPU負載和各對線程之間的數(shù)據(jù)流大小情況實現(xiàn)在線任務(wù)重部署.這種做法執(zhí)行開銷較大,但針對CPU敏感型的拓撲而言,能夠達到異構(gòu)環(huán)境下CPU層面上負載均衡的效果.圖7(a)充分說明了這一點,可以看出在線策略運行穩(wěn)定后各工作節(jié)點的CPU占用率基本均衡.但在圖7(b)和(c)中,高配節(jié)點的內(nèi)存和網(wǎng)絡(luò)帶寬占用率明顯更高,甚至在圖7(c)的Supervisor 15中,網(wǎng)絡(luò)帶寬占用率已超出設(shè)定的閾值.這是由于在線策略僅孤立地考慮CPU負載,而忽略了其他資源的剩余情況.在本實驗的硬件配置環(huán)境下,10個中配節(jié)點和3個高配節(jié)點的CPU配置不同,而內(nèi)存和網(wǎng)絡(luò)帶寬配置一致,當在線策略執(zhí)行后,勢必導(dǎo)致更多的線程分配至高配節(jié)點,內(nèi)存和網(wǎng)絡(luò)帶寬占用率必將大幅上升.特別地,當高配節(jié)點CPU性能更高,而其他類型的硬件配置與中配節(jié)點持平甚至更低的情況下,內(nèi)存與網(wǎng)絡(luò)帶寬資源可能發(fā)生溢出并導(dǎo)致拓撲無法執(zhí)行.這是在線策略的另一個缺陷.
探討本文提出的任務(wù)遷移策略.當某工作節(jié)點上某類資源的占用率持續(xù)30 s超出閾值時,遷移發(fā)生器根據(jù)本文提出的源節(jié)點選擇算法和任務(wù)遷移算法,選擇少量線程執(zhí)行遷移,直到集群中不存在任何工作節(jié)點的任意類型資源負載超出閾值為止.由于任務(wù)遷移策略觸發(fā)前采用的依舊是Storm默認輪詢的調(diào)度機制,因此可將圖中的Default策略看成是遷移策略執(zhí)行前各工作節(jié)點的資源占用情況.由圖7可知,WordCount,RollingSort,SOL在默認策略下運行時,3個低配節(jié)點的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬占用已分別超出閾值.當任務(wù)遷移策略執(zhí)行完畢并趨于穩(wěn)定后,低配節(jié)點上的若干線程遷移到了其他節(jié)點,任務(wù)過載問題均得以解決.此時通過Storm UI觀測發(fā)現(xiàn),在圖7(a)所示的WordCount運行過程中,原分布在3個低配節(jié)點上的11個線程分別遷移到了節(jié)點Supervisor 7,8,9,11,14上,其中節(jié)點Supervisor 9容納3個線程,其余節(jié)點各容納2個線程;同理,在圖7(b)所示的RollingSort運行過程中,原分布在3個低配節(jié)點上的8個線程分別遷移到了節(jié)點Supervisor 4,5,6,11,16上,其中節(jié)點Supervisor 4和11分別容納1個線程,其余節(jié)點各容納2個線程.在圖7(c)所示的SOL運行過程中,3個低配節(jié)點的網(wǎng)絡(luò)帶寬占用率分別為87.9%,92.1%,90%,已較大程度超出閾值,因此遷移的線程數(shù)量較多.據(jù)統(tǒng)計,低配節(jié)點中共計27個線程分別遷移到了節(jié)點Supervisor 5,9,12,14上,各節(jié)點分別容納8個、6個、8個和5個線程.由此可見,任務(wù)遷移策略能夠統(tǒng)籌兼顧Storm異構(gòu)環(huán)境下各類資源的剩余情況,有效解決任務(wù)過載問題,但出于最小遷移開銷考慮,尚無法實現(xiàn)集群中各工作節(jié)點的負載均衡.為更好地解決這一問題,需在拓撲運行前充分分析其內(nèi)部結(jié)構(gòu),使用改進的任務(wù)分配方式取代輪詢方式的初次部署,未來將繼續(xù)開展研究.
5.2.3 節(jié)點間通信開銷測試
本節(jié)討論在Storm默認策略、在線策略和任務(wù)遷移策略下,WordCount,RollingSort,SOL運行時的工作節(jié)點間通信開銷.圖8展示了10次實驗中各基準測試運行穩(wěn)定后工作節(jié)點間單位時間通信總量的均值.
Fig. 8 Comparison of inter-node communication overhead among different task scheduling strategies圖8 不同任務(wù)調(diào)度策略下的節(jié)點間通信開銷對比
由圖8可知,使用在線策略和任務(wù)遷移策略執(zhí)行3組基準測試之后,工作節(jié)點間傳輸?shù)臄?shù)據(jù)流大小均有所下降.在線策略執(zhí)行后,工作節(jié)點間數(shù)據(jù)流大小的平均值分別為61 743 tuple/s,27 504 tuple/s,33 046 tuple/s,相對于默認策略分別降低了13.8%,19.6%,23.8%;任務(wù)遷移策略執(zhí)行后,工作節(jié)點間數(shù)據(jù)流大小的平均值分別為64 130 tuple/s,29 665 tuple/s,35 213 tuple/s,相比默認策略的運行結(jié)果分別降低了10.4%,13.3%,18.8%,效果稍落后于在線策略.這是因為在線策略是以降低工作節(jié)點間通信開銷為目的進行拓撲中各線程的重新部署,雖然執(zhí)行開銷大且易導(dǎo)致資源占用不均,但優(yōu)化的范圍更廣.然而從優(yōu)化效率的角度來看,當任務(wù)遷移策略執(zhí)行結(jié)束之后,各基準測試中遷移的線程數(shù)量分別為11個、8個和27個,平均遷移1個線程可降低的工作節(jié)點間通信開銷約為0.9%,1.6%,0.7%;而對于在線策略而言,所需重部署的線程數(shù)量即為該拓撲中包含的線程總數(shù),分別為336個、208個和336個,平均遷移1個線程可降低的工作節(jié)點間通信開銷微乎其微,遠小于任務(wù)遷移策略的優(yōu)化效率,這與4.3.2節(jié)中算法執(zhí)行效果分析的結(jié)果是一致的.
5.2.4 真實應(yīng)用場景下的測試
Fig. 9 Comparison of latency on RollingCount among different task scheduling strategies圖9 RollingCount在不同任務(wù)調(diào)度策略下的系統(tǒng)延遲對比
RollingCount是Storm環(huán)境下的一個典型大數(shù)據(jù)應(yīng)用程序,它用于在內(nèi)存中持續(xù)按照某個統(tǒng)計指標(如出現(xiàn)次數(shù))計算窗口內(nèi)的TopN,然后每隔一段時間輸出實時計算后的TopN結(jié)果,能夠廣泛應(yīng)用于各類大數(shù)據(jù)實時排序需求的場景,例如實時熱門微博、廣告和商品等的統(tǒng)計.表3中的參數(shù)window.length和emit.frequency即為設(shè)定的窗口長度和統(tǒng)計頻率,單位為s.本組實驗采用與5.2.1節(jié)中描述相同的方法統(tǒng)計RollingCount分別在Storm默認策略、在線策略和任務(wù)遷移策略下運行的系統(tǒng)延遲,結(jié)果如圖9所示:
由圖9可知,與之前3個基準測試結(jié)果類似,RollingCount的部署需要一個過程.第1個峰值過后,系統(tǒng)延遲趨于平穩(wěn),在線策略和任務(wù)遷移策略開始統(tǒng)計集群中各工作節(jié)點以及工作節(jié)點上各線程占用的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負載信息及各線程之間的數(shù)據(jù)流大小.第155 s時在線策略觸發(fā),拓撲中各任務(wù)在各工作節(jié)點上重新部署,約耗時40 s,平均延遲約2.145 s;任務(wù)遷移策略觸發(fā)于第155 s,約耗時20 s,平均延遲約877.8 ms,僅為在線策略的40.9%左右,執(zhí)行過程中共有17個線程發(fā)生遷移.由此可見,任務(wù)遷移策略有效降低了調(diào)度的執(zhí)行過程對系統(tǒng)實時性造成的負面影響.2種策略執(zhí)行完畢后,系統(tǒng)延遲再次趨于收斂并分別穩(wěn)定在約331.1 ms和339.9 ms,相對于默認策略分別降低約23.7%和21.7%,兩者差距很小,實驗結(jié)果較為理想.可見,在數(shù)據(jù)流大小變化迅速且任務(wù)過載時有發(fā)生的Storm商業(yè)應(yīng)用領(lǐng)域中,使用任務(wù)遷移策略平滑調(diào)整將更有利于保證Storm處理的實時性.
Storm作為大數(shù)據(jù)流式計算的主流框架,已逐漸引起學術(shù)界和工業(yè)界的廣泛關(guān)注.然而其默認的輪詢調(diào)度機制并未考慮到不同工作節(jié)點的自身性能和負載差異,以及工作節(jié)點之間的網(wǎng)絡(luò)傳輸開銷和節(jié)點內(nèi)部的進程與線程通信開銷,無法最大化發(fā)揮Storm集群的性能.近年來已有研究改進了Storm默認調(diào)度機制存在的不足,但仍存在應(yīng)用場景單一和算法開銷過大等問題.本文通過分析Storm基本模型和3種不同的通信方式,建立了Storm異構(gòu)環(huán)境下的資源約束模型、最優(yōu)通信開銷模型和任務(wù)遷移模型,并在此基礎(chǔ)上提出了包含源節(jié)點選擇算法和任務(wù)遷移算法的任務(wù)遷移策略,使系統(tǒng)能夠根據(jù)各工作節(jié)點和各任務(wù)的實時負載情況和任務(wù)間的數(shù)據(jù)流大小,決策并實施任務(wù)的優(yōu)化遷移.最后通過4個基準測試從延遲、資源占用、通信開銷角度證明了算法的有效性.
下一步研究工作主要集中在3個方面:
1) 將本文提出的任務(wù)遷移策略進一步推廣至更為復(fù)雜的Storm商業(yè)應(yīng)用領(lǐng)域,使其適用于多租戶且種類更多的業(yè)務(wù)場景.
2) 目前拓撲執(zhí)行需要的進程和線程數(shù)量完全由用戶(程序員)設(shè)置,研究拓撲中各組件的自適應(yīng)并行度調(diào)節(jié)機制將能在提高節(jié)點資源利用率的同時,有效提高拓撲的執(zhí)行效率.
3) 從拓撲自身的結(jié)構(gòu)特征出發(fā)優(yōu)化算法,在保證異構(gòu)Storm集群高效運行的同時達到負載均衡的效果.
[1] Meng Xiaofeng, Ci Xiang. Big data management: Concepts, techniques and challenges[J]. Journal of Computer Research and Development, 2013, 50(1): 146-169 (in Chinese)
(孟小峰, 慈祥. 大數(shù)據(jù)管理: 概念、技術(shù)與挑戰(zhàn)[J]. 計算機研究與發(fā)展, 2013, 50(1): 146-169)
[2] Chen C L P, Zhang Chunyang. Data-intensive applications, challenges, techniques and technologies: A survey on big data[J]. Information Sciences, 2014, 275(11): 314-347
[3] Kambatla K, Kollias G, Kumar V, et al. Trends in big data analytics[J]. Journal of Parallel and Distributed Computing, 2014, 74(7): 2561-2573
[4] Sun Dawei. Big data stream comuting: Features and challenges[J]. Big Data Research, 2015,1(3): 99-105 (in Chinese)
(孫大為. 大數(shù)據(jù)流式計算: 應(yīng)用特征和技術(shù)挑戰(zhàn)[J]. 大數(shù)據(jù), 2015,1(3): 99-105)
[5] Ranjan R. Streaming big data processing in datacenter clouds[J]. IEEE Cloud Computing, 2014, 1(1): 78-83
[6] Apache. Apache Hadoop[EB/OL]. [2016-08-05]. http://hadoop.apache.org
[7] Vamanan B, Sohail H B, Hasan J, et al. Timetrader: Exploiting latency tail to save datacenter energy for online search[C] //Proc of the 48th Int Symp on Microarchitecture. New York: ACM, 2015: 585-597
[8] Sun Dawei, Zhang Guangyan, Zheng Weimin. Big data stream computing: Technologies and instances[J]. Journal of Software, 2014, 25(4): 839-862 (in Chinese)
(孫大為, 張廣艷, 鄭緯民. 大數(shù)據(jù)流式計算: 關(guān)鍵技術(shù)及系統(tǒng)實例[J]. 軟件學報, 2014, 25(4): 839-862)
[9] Toshniwal A, Taneja S, Shukla A, et al. Storm@Twitter[C] //Proc of the 2014 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2014: 147-156
[10] Alexandrov A, Bergmann R, Ewen S, et al. The stratosphere platform for big data analytics[J]. The VLDB Journal, 2014, 23(6): 939-964
[11] Zaharia M, Das T, Li Haoyuan, et al. Discretized streams: An efficient and fault-tolerant model for stream processing on large clusters[C] //Proc of the 4th USENIX Conf on Hot Topics in Cloud Computing. Berkeley, CA: USENIX Association, 2012: 1-6
[12] Borthakur D, Gray J, Sarma J S, et al. Apache Hadoop goes realtime at Facebook[C] //Proc of the 2011 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2011: 1071-1080
[13] Neumeyer L, Robbins B, Nair A, et al. S4: Distributed stream computing platform[C] //Proc of the 10th IEEE Int Conf on Data Mining Workshops (ICDMW 2010). Piscataway, NJ: IEEE, 2010: 170-177
[14] Fischer M J, Su Xueyuan, Yin Yitong. Assigning tasks for efficiency in Hadoop[C] //Proc of the 22nd Annual ACM Symp on Parallelism in Algorithms and Architectures. New York: ACM, 2010: 30-39
[15] Bhatotia P, Wieder A, Rodrigues R, et al. Incoop: MapReduce for incremental computations[C] //Proc of the 2nd ACM Symp on Cloud Computing. New York: ACM, 2011: 1-14
[16] Borkar V, Carey M, Grover R, et al. Hyracks: A flexible and extensible foundation for data-intensive computing[C] //Proc of the 27th Int Conf on Data Engineering. Piscataway, NJ: IEEE, 2011: 1151-1162
[17] Chen Fangfei, Kodialam M, Lakshman T V. Joint scheduling of processing and shuffle phases in MapReduce systems[C] //Proc of the 2012 IEEE INFOCOM. Piscataway, NJ: IEEE, 2012: 1143-1151
[18] Chen Gang, Chen Ke, Jiang Dawei, et al. E3: An elastic execution engine for scalable data processing[J]. Journal of Information Processing, 2012, 20(1): 65-76
[19] Jin Hui, Yang Xi, Sun Xianhe, et al. ADAPT: Availability-aware MapReduce data placement for non-dedicated distributed computing[C] //Proc of the 32nd Int Conf on Distributed Computing Systems (ICDCS). Piscataway, NJ: IEEE, 2012: 516-525
[20] Kumar V, Andrade H, Gedik B, et al. DEDUCE: At the intersection of MapReduce and stream processing[C] //Proc of the 13th Int Conf on Extending Database Technology. New York: ACM, 2010: 657-662
[21] Condie T, Conway N, Alvaro P, et al. Online aggregation and continuous query support in MapReduce[C] //Proc of the 2010 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2010: 1115-1118
[22] Karve R, Dahiphale D, Chhajer A. Optimizing cloud MapReduce for processing stream data using pipelining[C] //Proc of the 5th UKSim European Symp on Computer Modeling and Simulation (EMS). Piscataway, NJ: IEEE, 2011: 344-349
[23] Backman N, Pattabiraman K, Fonseca R, et al. C-MR: Continuously executing MapReduce workflows on multi-core processors[C] //Proc of the 3rd Int Workshop on MapReduce and Its Applications Date. New York: ACM, 2012: 1-8
[24] Lam W, Liu Lu, Prasad S T S, et al. Muppet: MapReduce-style processing of fast data[J]. Proceedings of the VLDB Endowment, 2012, 5(12): 1814-1825
[25] Aly A M, Sallam A, Gnanasekaran B M, et al. M3: Stream processing on main-memory MapReduce[C] //Proc of the 28th Int Conf on Data Engineering. Piscataway, NJ: IEEE, 2012: 1253-1256
[26] Li K C, Jiang Hai, Yang L T, et al. Big Data: Algorithms, Analytics, and Applications[M]. Boca Raton, FL: CRC Press, 2015: 193-214
[27] Daoud M I, Kharma N. A hybrid heuristic-genetic algorithm for task scheduling in heterogeneous processor networks[J]. Journal of Parallel & Distributed Computing, 2011, 71(11): 1518-1531
[28] Sinnen O, To A, Kaur M. Contention-aware scheduling with task duplication[J]. Journal of Parallel and Distributed Computing, 2011, 71(1): 77-86
[29] Wang Changdong, Lai Jianhuang, Huang Dong, et al. SVStream: A support vector-based algorithm for clustering data streams[J]. IEEE Trans on Knowledge & Data Engineering, 2013, 25(6): 1410-1424
[30] Xu Yuming, Li Kenli, He Ligang, et al. A DAG scheduling scheme on heterogeneous computing systems using double molecular structure-based chemical reaction optimization[J]. Journal of Parallel & Distributed Computing, 2013, 73(9): 1306-1322
[31] Saikrishna P S, Pasumarthy R. Automated control of webserver performance in a cloud environment[C] //Proc of the 2013 IEEE Recent Advances in Intelligent Computational Systems (RAICS). Piscataway, NJ: IEEE, 2013: 239-244
[32] Al-Haidari F, Sqalli M, Salah K. Impact of CPU utilization thresholds and scaling size on autoscaling cloud resources[C] //Proc of the 5th IEEE Int Conf on Cloud Computing Technology and Science (CloudCom). Piscataway, NJ: IEEE, 2013: 256-261
[33] Van d V J S, Van D W B, Lazovik E, et al. Dynamically scaling Apache Storm for the analysis of streaming data[C] //Proc of the 1st IEEE Int Conf on Big Data Computing Service and Applications. Piscataway, NJ: IEEE, 2015: 154-161
[34] Lorido-Botran T, Miguel-Alonso J, Lozano J A. A review of auto-scaling techniques for elastic applications in cloud environments[J]. Journal of Grid Computing, 2014, 12(4): 559-592
[35] Trihinas D, Pallis G, Dikaiakos M D. JCatascopia: Monitoring elastically adaptive applications in the cloud[C] //Proc of the 14th IEEE/ACM Int Symp on Cluster, Cloud and Grid Computing (CCGrid). Piscataway, NJ: IEEE, 2014: 226-235
[36] Nikravesh A Y, Ajila S A, Lung C H. Cloud resource auto-scaling system based on hidden Markov model (HMM)[C] //Proc of the 2014 IEEE Int Conf on Semantic Computing (ICSC). Piscataway, NJ: IEEE, 2014: 124-127
[37] Wolf J, Bansal N, Hildrum K, et al. SODA: An optimizing scheduler for large-scale stream-based distributed computer systems[C] //Proc of the 9th ACM/IFIP/USENIX Int Conf on Middleware. Berlin: Springer, 2008: 306-325
[38] Amini L, Andrade H, Bhagwan R, et al. SPC: A distributed, scalable platform for data mining[C] //Proc of the 4th Int Workshop on Data Mining Standards, Services and Platforms. New York: ACM, 2006: 27-37
[39] Sun Dawei, Fu Ge, Liu Xinran, et al. Optimizing data stream graph for big data stream computing in cloud datacenter environments[J]. International Journal of Advancements in Computing Technology, 2014, 6(5): 53-65
[40] Cordeschi N, Shojafar M, Amendola D, et al. Energy-efficient adaptive networked datacenters for the QoS support of real-time applications[J]. The Journal of Supercomputing, 2014, 71(2): 448-478
[41] Sun Dawei, Zhang Guangyan, Yang Songlin, et al. Re-Stream: Real-time and energy-efficient resource scheduling in big data stream computing environments[J]. Information Sciences, 2015, 319: 92-112
[42] Cardellini V, Grassi V, Lo Presti F, et al. Distributed QoS-aware scheduling in Storm[C] //Proc of the 9th ACM Int Conf on Distributed Event-Based Systems. New York: ACM, 2015: 344-347
[43] Aniello L, Baldoni R, Querzoni L. Adaptive online scheduling in Storm[C] //Proc of the 7th ACM Int Conf on Distributed Event-Based Systems. New York: ACM, 2013: 207-218
[44] Xu Jielong, Chen Zhenhua, Tang Jian, et al. T-Storm: Traffic-aware online scheduling in Storm[C] //Proc of the 34th IEEE Int Conf on Distributed Computing Systems. Piscataway, NJ: IEEE, 2014: 535-544
[45] Peng Boyang, Hosseini M, Hong Zhihao, et al. R-Storm: Resource-aware scheduling in Storm[C] //Proc of the 16th Annual Middleware Conf. New York: ACM, 2015: 149-161
[46] Zhang Manu. Intel-hadoop/storm-benchmark forked from manuzhang/storm-benchmark[EB/OL]. (2015-11-02) [2016-08-05]. https://github.com/intel-hadoop/storm-benchmark
[47] Marz N. Public stormprocessor/storm-benchmark[EB/OL]. (2012-08-20) [2016-08-05]. https://github.com/stormprocessor/storm-benchmark
[48] Martello S, Toth P. Dynamic programming and strong bounds for the 0-1 knapsack problem[J]. Management Science, 1999, 45(3): 414-424
[49] Sarkar U K, Chakrabarti P P, Ghose S, et al. Reducing reexpansions in iterative-deepening search by controlling cutoff bounds[J]. Artificial Intelligence, 1991, 50(2): 207-221
[50] Chekuri C, Khanna S. A polynomial time approximation scheme for the multiple knapsack problem[J]. SIAM Journal on Computing, 2005, 35(3): 713-728
[51] Fayard D, Zissimopoulos V. An approximation algorithm for solving unconstrained two-dimensional knapsack problems[J]. European Journal of Operational Research, 1995, 84(3): 618-632