国产日韩欧美一区二区三区三州_亚洲少妇熟女av_久久久久亚洲av国产精品_波多野结衣网站一区二区_亚洲欧美色片在线91_国产亚洲精品精品国产优播av_日本一区二区三区波多野结衣 _久久国产av不卡

?

空氣質(zhì)量監(jiān)測(cè)大數(shù)據(jù)區(qū)間的統(tǒng)計(jì)問(wèn)題

2019-05-23 02:55劉黎志何經(jīng)緯
關(guān)鍵詞:服務(wù)端調(diào)用空氣質(zhì)量

劉黎志,何經(jīng)緯

智能機(jī)器人湖北省重點(diǎn)實(shí)驗(yàn)室(武漢工程大學(xué)),湖北 武漢 430205

城市空氣質(zhì)量監(jiān)測(cè)站的監(jiān)測(cè)過(guò)程需要記錄大量實(shí)時(shí)數(shù)據(jù),以及根據(jù)實(shí)時(shí)數(shù)據(jù)計(jì)算出的小時(shí)均值數(shù)據(jù)、日均值數(shù)據(jù)和評(píng)價(jià)數(shù)據(jù)[1-3]。湖北省環(huán)境中心站所管轄的102個(gè)自動(dòng)化站每天產(chǎn)生的海量數(shù)據(jù),如果使用關(guān)系型數(shù)據(jù)庫(kù)存儲(chǔ),數(shù)據(jù)檢索的實(shí)時(shí)性和效率將無(wú)法保證。基于Hadoop的大數(shù)據(jù)解決方案的研究為有效存儲(chǔ)和快速檢索空氣質(zhì)量監(jiān)測(cè)數(shù)據(jù)提供了新途徑,其中HBase是建立在Hadoop之上,具有高可靠性、高性能、列存儲(chǔ)、可伸縮、實(shí)時(shí)讀寫(xiě)等特點(diǎn)的數(shù)據(jù)庫(kù)系統(tǒng),HBase通過(guò)指定行鍵(row key)的范圍來(lái)查詢(xún)數(shù)據(jù),為海量的數(shù)據(jù)提供高效率的數(shù)據(jù)維護(hù)及檢索功能[4-11]。

在對(duì)空氣質(zhì)量監(jiān)測(cè)數(shù)據(jù)進(jìn)行查詢(xún)時(shí),通常需要對(duì)某個(gè)監(jiān)測(cè)值或評(píng)價(jià)值進(jìn)行區(qū)間統(tǒng)計(jì),如統(tǒng)計(jì)宜昌市全年NO2的實(shí)時(shí)濃度值在0.00~41.00 μg/m3,43.05~82.00 μg/m3,84.05~123.00 μg/m3,125.05~164.00 μg/m3,166.05~205.00 μg/m3區(qū)間的分布情況;統(tǒng)計(jì)宜昌市的伍家崗站2016年6月輕度污染以上的天數(shù),即 AQI指數(shù)分別在 101~150,151~200,201~ 300,>300的天數(shù)。HBase提供的 Scan方法,每執(zhí)行一次next操作,只會(huì)從服務(wù)端讀取一行數(shù)據(jù),因此掃描多個(gè)Region會(huì)在客戶(hù)端和服務(wù)端之間形成大量的遠(yuǎn)程過(guò)程調(diào)用(remote procedure call,RPC)通訊,從而影響查詢(xún)效率。HBase0.92版本中提出的終端(Endpoint)協(xié)處理器可以在服務(wù)端完成計(jì)數(shù)、求和、求最大值等統(tǒng)計(jì)工作,并將結(jié)果返回到客戶(hù)端,減少了客戶(hù)端到服務(wù)端的RPC調(diào)用,從而極大地提高了統(tǒng)計(jì)查詢(xún)的效率[12-14]。本文將對(duì)如何使用終端(Endpoint)協(xié)處理器對(duì)空氣質(zhì)量監(jiān)測(cè)大數(shù)據(jù)進(jìn)行區(qū)間統(tǒng)計(jì)進(jìn)行討論。

1 空氣質(zhì)量監(jiān)測(cè)大數(shù)據(jù)存儲(chǔ)模式設(shè)計(jì)

基于HBase的空氣質(zhì)量監(jiān)測(cè)大數(shù)據(jù)的存儲(chǔ)模式設(shè)計(jì)如圖1所示。

空氣質(zhì)量存儲(chǔ)模式的具體描述見(jiàn)文獻(xiàn)[15],實(shí)際的應(yīng)用證明該模式可以有效地對(duì)空氣質(zhì)量監(jiān)測(cè)數(shù)據(jù)進(jìn)行存儲(chǔ)及滿足地區(qū)、站點(diǎn)之間的數(shù)據(jù)同比、數(shù)據(jù)環(huán)比、趨勢(shì)分析等查詢(xún)所需的要求。

圖1 空氣質(zhì)量監(jiān)測(cè)數(shù)據(jù)存儲(chǔ)模式Fig.1 Store schema of air quality monitoring data

2 區(qū)間統(tǒng)計(jì)協(xié)處理器

協(xié)處理器分為觀察者(Observer)模式及終端(Endpoint)模式兩種。終端協(xié)處理器可以將數(shù)據(jù)檢索統(tǒng)計(jì)過(guò)程放在服務(wù)端完成,減少客戶(hù)端到服務(wù)端的遠(yuǎn)程過(guò)程調(diào)用產(chǎn)生的通訊開(kāi)銷(xiāo),從而提高統(tǒng)計(jì)效率,使用終端協(xié)處理器對(duì)數(shù)據(jù)進(jìn)行區(qū)間統(tǒng)計(jì)的過(guò)程如圖2所示。

圖2 協(xié)處理器調(diào)用過(guò)程Fig.2 Process procedure of co-processor

數(shù)據(jù)區(qū)間統(tǒng)計(jì)的步驟為:1)定義EMCStat.proto文件,按照protobuf協(xié)議定義區(qū)間統(tǒng)計(jì)協(xié)處理器的 request,response消息格式及 RPC服務(wù);2)定義協(xié)處理器類(lèi)EMCStatEndPoint,實(shí)現(xiàn)EMCStat.proto文件中定義的RPC服務(wù)EMCStatService,服務(wù)中的getEMCStat方法實(shí)現(xiàn)區(qū)間統(tǒng)計(jì)的業(yè)務(wù)邏輯;3)為EMCData表加載 EMCStatEndpoint協(xié)處理器;4)客戶(hù)端調(diào)用EMCStatEndpoint協(xié)處理器,對(duì)分布在不同Region上的數(shù)據(jù)進(jìn)行區(qū)間統(tǒng)計(jì),并輸出結(jié)果。

2.1 Protobuf協(xié)議

終端協(xié)處理器使用protobuf協(xié)議來(lái)定義客戶(hù)端與服務(wù)端進(jìn)行通信的消息格式,實(shí)現(xiàn)空氣質(zhì)量區(qū)間統(tǒng)計(jì)終端協(xié)處理器的protobuf協(xié)議的定義為:

message EMCStatRequest

{ //定義客戶(hù)端請(qǐng)求協(xié)議格式

required string areacode=1;//地區(qū)碼

optional string ssid=2 ;//站點(diǎn)編碼

required string stattime=3;//統(tǒng)計(jì)開(kāi)始時(shí)間

required string endtime=4;//統(tǒng)計(jì)結(jié)束時(shí)間

required string cf=5;//列簇名

required string qual=6;//列限定符

message LHLimit//區(qū)間嵌套消息

required float ll=1;//區(qū)間下限

required float hl=2;//區(qū)間上限

repeated LHLimit lh=7;

//區(qū)間消息可重復(fù),表示可以定義多個(gè)區(qū)間

message EMCStatResponse

{ //定義服務(wù)端返回協(xié)議格式

required string areacode=1;//地區(qū)碼

optional string ssid=2 ;//站點(diǎn)編碼

required string cf=3;//列簇名

required string qual=4 ;//列限定符

message LHCount/區(qū)間統(tǒng)計(jì)結(jié)果嵌套消息

required float ll=1 ;//區(qū)間下限

required float hl=2;//區(qū)間上限

required int64 count=3 ;//區(qū)間計(jì)數(shù)

repeated LHCount lhc=5;

}//可以輸出多個(gè)區(qū)間統(tǒng)計(jì)結(jié)果

service EMCStatService

{//協(xié)議服務(wù)名

//rpc調(diào)用方法名

rpc getEMCStat(EMCStatRequest)

returns(EMCStatResponse);

客戶(hù)端在調(diào)用服務(wù)端的終端協(xié)處理器時(shí),會(huì)根據(jù)EMCStatRequest協(xié)議的格式,向協(xié)處理器傳遞參數(shù),包括:區(qū)間統(tǒng)計(jì)的地區(qū)碼、站點(diǎn)編碼、統(tǒng)計(jì)時(shí)間段,需要統(tǒng)計(jì)的列簇名及列限定符名,統(tǒng)計(jì)區(qū)間集合列表。協(xié)議服務(wù)EMCStatService表示其RPC方法 getEMCStat以EMCStatRequest消息為輸入,在獲取其定義的所需參數(shù)后,執(zhí)行區(qū)間統(tǒng)計(jì)程序,服務(wù)端協(xié)處理器按照EMCStatResponse協(xié)議格式將區(qū)間統(tǒng)計(jì)的結(jié)果返回給客戶(hù)端。所有協(xié)議被定義在EMCStat.proto文件中,使用protoc工具,執(zhí)行protoc--java_out=./src EMCStat.proto命令,可以在項(xiàng)目中生成EMCStatProtos.java文件,該Java文件是區(qū)間統(tǒng)計(jì)協(xié)處理器數(shù)據(jù)交換協(xié)議的代碼實(shí)現(xiàn),文件中定義了EMCStatService抽象類(lèi)以及抽象方法getEMCStat。

2.2 區(qū)間統(tǒng)計(jì)協(xié)處理器實(shí)現(xiàn)

定義EMCStatEndPoint類(lèi)為區(qū)間統(tǒng)計(jì)協(xié)處理器的實(shí)現(xiàn)邏輯類(lèi),該類(lèi)繼承于EMCStatService抽象類(lèi),并實(shí)現(xiàn)了Coprocessor和CoprocessorService接口,EMCStatEndPoint類(lèi)中的getEMCStat方法用于實(shí)現(xiàn)區(qū)間統(tǒng)計(jì)過(guò)程,主要過(guò)程如下:

算 法 getEMCStat(RpcController rpcCt,EMCStatRequest emcsRequest,RpcCallback <EMCStatResponse> done){

輸入:emcsRequest;

輸出:done;

1:Scan sc=new Scan();sc.setMaxVersions();

2:讀取emcsRequest消息的各個(gè)字段,包括地區(qū)碼、站點(diǎn)編碼、統(tǒng)計(jì)時(shí)間段、列簇名、列限定符賦值到對(duì)應(yīng)的變量;

3:根據(jù)emcsRequest消息提供的統(tǒng)計(jì)區(qū)間對(duì)區(qū)間類(lèi)集合列表進(jìn)行初始化,將每個(gè)區(qū)間的計(jì)數(shù)設(shè)置為0;

4:if(站點(diǎn)編碼為空){將地區(qū)下的所有站點(diǎn)編碼添加到lstSSIDS集合,表示統(tǒng)計(jì)所有站點(diǎn)};

5:else{將站點(diǎn)編碼添加到lstSSIDS集合}

6:EMCStatResponse response=null;InternalScanner itScanner=null;

7:for(String assid:lstSSIDS){

8:sc.setStartRow(startKey);//區(qū)間統(tǒng)計(jì) startKey為地區(qū)碼_站點(diǎn)編碼_統(tǒng)計(jì)開(kāi)始時(shí)間

9:sc.setStopRow(endKey);//區(qū)間統(tǒng)計(jì)endKey為地區(qū)碼_站點(diǎn)編碼_統(tǒng)計(jì)結(jié)束時(shí)間

//判斷是否需要對(duì)該region進(jìn)行統(tǒng)計(jì)

10:if(startKey > env.getRegion().getEndKey()||end-Key < env.getRegion().getStartKey()){break;}

11:sc.addColumn(Bytes.toBytes(列簇名),Bytes.to-Bytes(列限定符));

12:itScanner=env.getRegion().getScanner(sc);

13:List<Cell> cellResults=new ArrayList<Cell>();boolean isHasMore=false;

14:do{

15:isHasMore=itScanner.next(cellResults);

16: for(Cell cell:cellResults){根據(jù)cell的值,確定其所在的區(qū)間后,將其集合列表中對(duì)應(yīng)的記數(shù)加1;}

17:cellResults.clear();}}

區(qū)間統(tǒng)計(jì)協(xié)處理器在對(duì)每個(gè)Region進(jìn)行統(tǒng)計(jì)時(shí),可以根據(jù)Region的StartKey和EndKey來(lái)判斷該Region是否參與統(tǒng)計(jì),當(dāng)進(jìn)行區(qū)間統(tǒng)計(jì)的Start-Key大于Region的EndKey或區(qū)間統(tǒng)計(jì)的EndKey小于Region的StartKey時(shí),可直接跳過(guò)該Region。在如圖3所示的5個(gè)區(qū)間統(tǒng)計(jì)中,Region-A參與區(qū)間統(tǒng)計(jì) 2、3、4,不參與區(qū)間統(tǒng)計(jì) 1、5。在區(qū)間統(tǒng)計(jì)過(guò)程中跳過(guò)不需要進(jìn)行統(tǒng)計(jì)的Region,可以加快掃描速度,提高統(tǒng)計(jì)效率。

圖3 Region統(tǒng)計(jì)邏輯Fig.3 Statistic logic of region

算法中的區(qū)間類(lèi)的定義和EMCStatResponse中的消息LHCount的格式一致。EMCStatEndPoint類(lèi)編譯成功后,將其所在的jar包導(dǎo)出,并上傳到Hadoop集群的HDFS分布式文件系統(tǒng)中,使用alter‘EMCData’,‘coprocessor’=>‘hdfs:///jar包的路徑/jar包|EMCStatEndPoint協(xié)處理器的完整類(lèi)名|表示優(yōu)先級(jí)的整數(shù)值|參數(shù)’命令將協(xié)處理器加載到EMCData表。

2.3 客戶(hù)端調(diào)用

客戶(hù)端區(qū)間統(tǒng)計(jì)業(yè)務(wù)邏輯按EMCStatRequest的消息格式定義協(xié)處理器統(tǒng)計(jì)過(guò)程所需要的參數(shù)后,以Batch Call方式調(diào)用EMCData表的區(qū)間統(tǒng)計(jì)協(xié)處理器,由于Batch Call只負(fù)責(zé)對(duì)每個(gè)Region進(jìn)行區(qū)間統(tǒng)計(jì),所以還需要對(duì)每個(gè)Region的區(qū)間統(tǒng)計(jì)結(jié)果進(jìn)行匯總后輸出,過(guò)程如下:

算法:main(String[]args){

輸入:args[0]:地區(qū)碼,args[1]ssid :站點(diǎn)編碼,若為“-”,表示查詢(xún)所有子站點(diǎn);args[2]startTime-endTime 以-分隔;args[3]列簇名:限定符;args[4]lh-hh:lh-hh表示統(tǒng)計(jì)區(qū)間

輸出:每個(gè)region的統(tǒng)計(jì)結(jié)果;

1:根據(jù)args數(shù)組,將用戶(hù)輸入的參數(shù)地區(qū)碼、站點(diǎn)編碼、統(tǒng)計(jì)時(shí)間段、列簇名、列限定符賦值讀取到對(duì)應(yīng)的變量;

2:將統(tǒng)計(jì)區(qū)間讀取到LHLimit類(lèi)型的集合列表中;構(gòu)造EMCStatRequest消息;

3:long beginTime=System.currentTimeMillis();Configuration config=HBaseConfiguration.create();

4:HTable htb=new HTable(config,“EMCData”);

5:Map<byte[],String> resultMaps=htb.coprocessorService(EMCStatService.class,null,null,

6:new Batch.Call<EMCStatService,String>(){//調(diào)用協(xié)處理器

7:public String call(EMCStatService emcStat){

8:ServerRpcController srController=new ServerRpcController();

9:BlockingRpcCallback<EMCStatResponse> bRpcCb=

10:new BlockingRpcCallback<EMCStatResponse>();

11:emcStat.getEMCStat(controller,request,bRpcCb);

12:EMCStatProtos.EMCStatResponse emcsResponse=bRpcCb.get();//得到Response返回消息

13:if(emcsResponse ! =null){List<LHCount> lstlh-Count=emcsResponse.getLhcList();

14:for(LHCount lhc:lstlhCount){輸出每個(gè) region的統(tǒng)計(jì)結(jié)果;將每個(gè)region的區(qū)間統(tǒng)計(jì)結(jié)果進(jìn)行累加;}}

15:return“”;}});

通過(guò)記錄區(qū)間統(tǒng)計(jì)的開(kāi)始和結(jié)束時(shí)間,得到協(xié)處理器區(qū)間統(tǒng)計(jì)所需的時(shí)間,可以快速地與直接使用Scan操作進(jìn)行區(qū)間統(tǒng)計(jì)所需的時(shí)間進(jìn)行比較。

3 實(shí)驗(yàn)部分

實(shí)驗(yàn)環(huán)境的安裝和配置和文獻(xiàn)[16]中描述的一致。模擬的數(shù)據(jù)寫(xiě)入程序按每個(gè)監(jiān)測(cè)項(xiàng)目,每小時(shí)40~60個(gè)實(shí)時(shí)值寫(xiě)入EMCData表,實(shí)時(shí)數(shù)據(jù)寫(xiě)入完成后,自動(dòng)計(jì)算并寫(xiě)入小時(shí)均值及評(píng)價(jià),全天的小時(shí)均值計(jì)算完成后,自動(dòng)計(jì)算并寫(xiě)入全天的日均值及評(píng)價(jià)。

在數(shù)據(jù)的寫(xiě)入過(guò)程中,當(dāng)Region的數(shù)量分別為 3、5、7、9、11時(shí),對(duì)存儲(chǔ) NO2實(shí)時(shí)濃度數(shù)據(jù)的列RTData 按 0.00~41.00 μg/m3,43.05~82.00 μg/m3,84.05~123.00 μg/m3,125.05~164.00 μg/m3,166.05~205.00 μg/m3進(jìn)行區(qū)間統(tǒng)計(jì),參數(shù)為:地區(qū)碼 4201,站點(diǎn)編碼為空,表示統(tǒng)計(jì)該地區(qū)下的所有站點(diǎn)(9個(gè)子站+城區(qū)),統(tǒng)計(jì)時(shí)間段覆蓋所有Region。為減少客戶(hù)端Scan統(tǒng)計(jì)過(guò)程RPC調(diào)用,可以為Scan操作設(shè)置一個(gè)掃描緩存值,表示一次RPC調(diào)用可以從服務(wù)端讀取的行數(shù),從而減少客戶(hù)端RPC請(qǐng)求次數(shù),但掃描緩存值不能設(shè)置太高,否則會(huì)過(guò)多消耗客戶(hù)端內(nèi)存,嚴(yán)重時(shí)還會(huì)導(dǎo)致內(nèi)存溢出,且延長(zhǎng)next操作的時(shí)間,反而降低了查詢(xún)效率。掃描緩存值的設(shè)置需要在減少RPC請(qǐng)求及客戶(hù)端內(nèi)存消耗之間取得平衡,實(shí)驗(yàn)中將掃描緩存值設(shè)置為256。客戶(hù)端Scan的統(tǒng)計(jì)過(guò)程的具體實(shí)現(xiàn)算法類(lèi)似于區(qū)間統(tǒng)計(jì)協(xié)處理器,這里不再具體描述。各區(qū)間值統(tǒng)計(jì)的結(jié)果,使用協(xié)處理器進(jìn)行區(qū)間統(tǒng)計(jì)及客戶(hù)端Scan進(jìn)行統(tǒng)計(jì)所需的時(shí)間如表1所示,時(shí)間對(duì)比如圖4所示。

表1 實(shí)驗(yàn)結(jié)果Tab.1 Experimental results

圖4 協(xié)處理器和客戶(hù)端Scan區(qū)間統(tǒng)計(jì)時(shí)間對(duì)比Fig.4 Time comparison of interval statistics by co-processor and client Scan

從實(shí)驗(yàn)結(jié)果分析,隨著區(qū)間統(tǒng)計(jì)需要掃描Region數(shù)量的增長(zhǎng),客戶(hù)端Scan統(tǒng)計(jì)所需的時(shí)間呈直線增長(zhǎng),而使用協(xié)處理器所需的時(shí)間則增長(zhǎng)平緩,且當(dāng)Region數(shù)量較少時(shí),時(shí)間幾乎沒(méi)有增長(zhǎng)。使用協(xié)處理器進(jìn)行區(qū)間統(tǒng)計(jì)較使用客戶(hù)端Scan至少快一個(gè)數(shù)量級(jí)(10倍)。

4 結(jié) 語(yǔ)

在服務(wù)器端使用Endpoint協(xié)處理器對(duì)城市空氣質(zhì)量監(jiān)測(cè)數(shù)據(jù)進(jìn)行常規(guī)的統(tǒng)計(jì)工作,會(huì)顯著的減少統(tǒng)計(jì)所需的時(shí)間。理論上,若HBase的數(shù)據(jù)表在Hadoop集群的每個(gè)數(shù)據(jù)節(jié)點(diǎn)上的Region數(shù)量相同,且每個(gè)Region的大小相同,由于可以進(jìn)行并行計(jì)算,此時(shí)Endpoint協(xié)處理器的工作效率達(dá)到最佳,這也是實(shí)驗(yàn)中,當(dāng)Region的數(shù)量較少時(shí),區(qū)間統(tǒng)計(jì)的時(shí)間幾乎沒(méi)有增長(zhǎng)的原因。但隨著數(shù)據(jù)的增長(zhǎng),Region的不斷分裂導(dǎo)致其數(shù)量的增加,Region在每個(gè)數(shù)據(jù)節(jié)點(diǎn)上的數(shù)量不再相同,數(shù)據(jù)在各個(gè)Region上的分布也不再均衡,實(shí)驗(yàn)中在進(jìn)行區(qū)間統(tǒng)計(jì)時(shí),客戶(hù)端和ZooKeeper服務(wù)進(jìn)行RPC通訊時(shí)會(huì)出現(xiàn)延遲阻塞的現(xiàn)象,從而導(dǎo)致Region數(shù)量從7增加到9時(shí),區(qū)間統(tǒng)計(jì)所需時(shí)間發(fā)生突變(增加近3倍)。如何有效解決這一問(wèn)題,將是今后的研究方向。

猜你喜歡
服務(wù)端調(diào)用空氣質(zhì)量
烏海市霧對(duì)空氣質(zhì)量的影響
核電項(xiàng)目物項(xiàng)調(diào)用管理的應(yīng)用研究
系統(tǒng)虛擬化環(huán)境下客戶(hù)機(jī)系統(tǒng)調(diào)用信息捕獲與分析①
多人聯(lián)機(jī)對(duì)戰(zhàn)游戲的設(shè)計(jì)與實(shí)現(xiàn)
基于三層結(jié)構(gòu)下機(jī)房管理系統(tǒng)的實(shí)現(xiàn)分析
基于三層結(jié)構(gòu)下機(jī)房管理系統(tǒng)的實(shí)現(xiàn)分析
車(chē)內(nèi)空氣質(zhì)量標(biāo)準(zhǔn)進(jìn)展
重視車(chē)內(nèi)空氣質(zhì)量工作 制造更環(huán)保、更清潔、更健康的汽車(chē)
利用RFC技術(shù)實(shí)現(xiàn)SAP系統(tǒng)接口通信
C++語(yǔ)言中函數(shù)參數(shù)傳遞方式剖析
陆河县| 祁门县| 元江| 临汾市| 蒲城县| 项城市| 达拉特旗| 石屏县| 桂平市| 乌兰察布市| 伊金霍洛旗| 柞水县| 东港市| 宜阳县| 竹山县| 中宁县| 台东市| 昌平区| 徐水县| 太仆寺旗| 固安县| 瓮安县| 连州市| 民县| 邹城市| 颍上县| 元江| 临清市| 伊春市| 西昌市| 屏东县| 长岛县| 石景山区| 柳江县| 高邑县| 黄石市| 会宁县| 德清县| 华安县| 诏安县| 永丰县|