摘 要:針對電信大數(shù)據(jù)在流動人口統(tǒng)計中的處理需求,采用Intel?Hadoop發(fā)行版,設(shè)計Hive數(shù)據(jù)倉庫并進(jìn)行優(yōu)化,重點對性能影響較大的join連接和數(shù)據(jù)傾斜問題進(jìn)行了優(yōu)化。實驗表明,對于TB級數(shù)據(jù),簡單統(tǒng)計如count、sum等可在10分鐘以內(nèi)完成,聚合統(tǒng)計如join、group by等可在30分鐘左右完成,能有效支撐大數(shù)據(jù)環(huán)境下的流動人口統(tǒng)計和監(jiān)測。
關(guān)鍵詞:Hive;優(yōu)化;join;數(shù)據(jù)傾斜
中圖分類號:TP301 文獻(xiàn)標(biāo)識碼:A
1 引言(Introduction)
電信運營商在移動通信業(yè)務(wù)運營過程中,獲取了大量客觀、真實的用戶歷史數(shù)據(jù),這些歷史數(shù)據(jù)可以客觀反映用戶的消費行為,也可以反映影響用戶消費行為的內(nèi)外部因素的變化情況[1]。根據(jù)移動通信客戶的來話與去話等話務(wù)信息,結(jié)合客戶身份資料,可以實現(xiàn)對特定區(qū)域人口的流入、流出情況及流動類型等進(jìn)行分析。
然而,基于移動通信數(shù)據(jù)的流動人口統(tǒng)計面臨諸多挑戰(zhàn):①數(shù)據(jù)源多樣化:CDR(語音、SMS、GPRS、3G、4G等)、計費信息、客戶信息、基站參數(shù)等;②數(shù)據(jù)量大:高達(dá)360TB原始數(shù)據(jù)(某省電信公司);③數(shù)據(jù)增長快速:2TB/天。通信數(shù)據(jù)呈現(xiàn)出大數(shù)據(jù)的特征,既有的技術(shù)架構(gòu)和路線,已不能處理如此海量的電信數(shù)據(jù)。
近年來,涌現(xiàn)出了眾多的大數(shù)據(jù)處理架構(gòu),其中Hadoop開源架構(gòu)應(yīng)用最廣泛,在移動、電信等部門通過部署Hadoop架構(gòu)開展電信大數(shù)據(jù)服務(wù)取得了一定的成效。本文針對電信大數(shù)據(jù)在流動人口統(tǒng)計中的處理需求,采用Intel?Hadoop發(fā)行版,設(shè)計Hive數(shù)據(jù)倉庫并進(jìn)行優(yōu)化,重點對性能影響較大的join連接和數(shù)據(jù)傾斜問題進(jìn)行了優(yōu)化,實現(xiàn)海量數(shù)據(jù)的高效查詢和統(tǒng)計,滿足流動人口的快速統(tǒng)計和分析。
2 Hive數(shù)據(jù)倉庫設(shè)計(Hive data warehouse design)
移動通信大數(shù)據(jù)的流動人口業(yè)務(wù)需求分析:移動通信數(shù)據(jù)的抽取、轉(zhuǎn)換和導(dǎo)入;基于日、月、年的報表統(tǒng)計和數(shù)據(jù)規(guī)模;數(shù)據(jù)倉庫30TB數(shù)據(jù)。現(xiàn)方案采用10臺服務(wù)器,以實現(xiàn)數(shù)據(jù)的高速裝載、查詢和統(tǒng)計分析,如圖1所示。
圖1 Hive數(shù)據(jù)倉庫設(shè)計
Fig.1 Hive data warehouse design
Hive是一個建立在Hadoop之上的數(shù)據(jù)倉庫,用于查詢和分析結(jié)構(gòu)化海量數(shù)據(jù)。采用HDFS進(jìn)行數(shù)據(jù)存儲和Map/Reduce進(jìn)行數(shù)據(jù)操作?;咎攸c包括:
(1)提供類似于SQL的查詢語言。
(2)高擴(kuò)展性(scale-out),動態(tài)擴(kuò)容無須停機(jī)。
(3)針對海量數(shù)據(jù)的高性能查詢和分析系統(tǒng)。
(4)提供靈活的擴(kuò)展性。
(5)復(fù)雜數(shù)據(jù)類型,擴(kuò)展函數(shù)和腳本等。
在運行count、sum等聚合函數(shù)進(jìn)行統(tǒng)計計算時發(fā)現(xiàn),將數(shù)據(jù)從普通數(shù)據(jù)庫導(dǎo)入Hive中,分區(qū)的個數(shù)以及各分區(qū)數(shù)據(jù)量的均衡性會影響Hive的性能。解決辦法就是給導(dǎo)入的表增加一個自增的int類型的字段,用這個字段來進(jìn)行數(shù)據(jù)分割,最后得到的分區(qū)就是均衡的,如圖2所示。
圖2 數(shù)據(jù)分區(qū)
Fig.2 Data partition
3 Hive性能分析和優(yōu)化(Hive performance analysis
and optimization)
Hadoop的分配優(yōu)化主要包含以下三個層面:①底層Map和Reduce的參數(shù)調(diào)優(yōu);②Hive內(nèi)部邏輯優(yōu)化;③SQL代碼邏輯優(yōu)化。
3.1 Map/Reduce端的優(yōu)化
Map/Reduce端的優(yōu)化主要通過分析各個可調(diào)參數(shù)在Map/Reduce任務(wù)運行過程中起到的作用,通過改變參數(shù)大小優(yōu)化底層分配策略。
表1 Map side調(diào)優(yōu)參數(shù)表
Tab.1 Tuning parameter table of map side
選項 類型 默認(rèn)值 描述
io.sort.mb Int 100 緩存Map中間結(jié)果的buffer大?。╥n MB)
io.sort.record.percent float 0.05 io.sort.mb中間來保存Map output記錄邊界的百分比
io.sort.spill.percent float 0.80 Map開始做spill操作的閥值
io.sort.factor Int 10 做merge操作時同時操作的stream數(shù)的上限
min.num.spill.for.combine Int 3 Combiner函數(shù)運行的最小spill數(shù)
Mapred.compress.map.output Boolean False Map中間結(jié)果是否采用壓縮
Mapred.map.output.compression.codec Class name Org.apache.Haddoop.io.compress.defaultcodec Map中間結(jié)果的壓縮格式
表2 Reduce side調(diào)優(yōu)參數(shù)表
Tab.2 Tuning parameter table of Reduce side
選項 類型 默認(rèn)值 描述
Mapred.reduce.parallel.copies Int 5 每個Reduce并行下載Map結(jié)果的最大線程數(shù)
Mapred.reduce.copy.backoff Int 300 Reduce下載線程最大等待時間(in sec)
io.sort.factor Int 10 同上
Mapred.job.shuffle.input.buffer.percent Float 0.7 用來緩存shuffle數(shù)據(jù)的Reduce task heap百分比
Mapred.job.reduce.input.buffer.percent Float 0.0 Sort完成后Reduce計算階段緩存數(shù)據(jù)的百分比
Mapred.job.shuffle.merge.percent Float 0.66 緩存占內(nèi)存多少百分比后做merge操作
3.2 Hive內(nèi)部邏輯優(yōu)化和代碼邏輯優(yōu)化
Hive使用HQL(Hibernate Query Language),HQL不僅提供了類似標(biāo)準(zhǔn)SQL語句的查詢方式,而且提供更加豐富靈活、更為強(qiáng)大的查詢能力,允許用戶自定義Mapper和Reducer來處理更為復(fù)雜的查詢分析任務(wù)。導(dǎo)致Hive性能不佳的原因有兩個:①沒有索引支持,查詢需要暴力掃描全表;②在處理小量數(shù)據(jù)時Map/Reduce框架耗費資源比例過大,即Map/Reduce框架本身具有較高的延遲,導(dǎo)致基于此框架下的HQL查詢也體現(xiàn)高延遲性。優(yōu)化思路:
由于Hive的HQL語言是自動轉(zhuǎn)化為Map/Reduce程序進(jìn)行執(zhí)行的。每個job對應(yīng)一個Map/Reduce框架,所以盡可能減少job的個數(shù)可以減少執(zhí)行時間。
Map/Reduce有其數(shù)據(jù)特性,Hive也有優(yōu)化約定,所以編寫Hive語言時需注意一些規(guī)則,才能提高查詢效率。
本文對性能影響較大的join多表連接和數(shù)據(jù)傾斜等問題實施優(yōu)化。
3.2.1 join優(yōu)化
Hive只支持等值連接(equality joins)、外連接(outer joins)和左半連接(left semi joins)。
join時,每次Map/Reduce任務(wù)的執(zhí)行過程如下:reducer會緩存join序列中前面所有表的記錄,然后通過最后一個表將結(jié)果序列化到HDFS。這有助于減少在reduce端內(nèi)存的使用量。無論是外關(guān)聯(lián)outer join還是內(nèi)關(guān)聯(lián)inner join,如果join的key相同,無論jion多少個表都會合并成一個Map/Reduce任務(wù)。
查詢時,應(yīng)該盡量將小表放在join的左邊,否則會因為緩存浪費大量內(nèi)存。例如:
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key1)
三個表使用同一個join key,生成一次Map/Reduce任務(wù)計算。Reduce端先緩存x表和y表的記錄,然后每次取得z表中的一個記錄就計算一次join結(jié)果。
兩次Map/Reduce任務(wù):
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key2)
生成兩次Map/Reduce任務(wù):第一次緩存x表,用y表序列化;第二次緩存第一次Map/Reduce計算的結(jié)果,然后用z表序列化。
Hive不支持where子句中的子查詢,SQL常用的IN/EXISTS子句需要改寫。IN/EXISTS子查詢在HIVE中一個更高效的實現(xiàn)是利用LEFT SEMI JOIN重寫子查詢語句。LEFT SEMI JOIN的限制是,JOIN右邊的表不能在WHERE子句、SELECT子句或其他地方過濾,只能在ON子句中設(shè)置過濾條件。
SELECT x.key, x.value FROM x WHERE x.key in (SELECT y.key FROM y)
被重寫為:
SELECT x.key, x.val FROM x LEFT SEMI JOIN y on (x.key=y.key)
對于多個子查詢SQL無關(guān)且計算量過大的SQL,可以開啟并行執(zhí)行MR任務(wù),減少計算壓力。
hive.exec.parallel[=true]
hive.exec.parallel.thread.number[=8]
hive.exec.parallel可以控制一個SQL中多個可并行執(zhí)行的job的運行方式。當(dāng)hive.exec.parallel為true的時候,同一個SQL中可以并行執(zhí)行的job會并發(fā)的執(zhí)行。參數(shù)hive.exec.parallel.thread.number就是控制對于同一個SQL來說同時可以運行的job的最大值,該參數(shù)默認(rèn)為8。此時最大可以同時運行8個job。通過修改參數(shù)hive.exec.parallel和hive.exec.parallel.thread.number測試不同情況的執(zhí)行速度,實現(xiàn)性能優(yōu)化和負(fù)載均衡。
3.2.2 數(shù)據(jù)傾斜問題的解決
數(shù)據(jù)傾斜表現(xiàn):任務(wù)進(jìn)度長時間維持在99%(或100%),查看任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量Reduce子任務(wù)未完成。因為其處理的數(shù)據(jù)量和其他Reduce差異過大。單一Reduce的記錄數(shù)與平均記錄數(shù)差異過大,通常可能達(dá)到三倍甚至更多。最長時長遠(yuǎn)大于平均時長。造成數(shù)據(jù)傾斜的主要原因:①key分布不均勻;②業(yè)務(wù)數(shù)據(jù)本身的特性;③建表時考慮不周;④某些SQL語句本身就有數(shù)據(jù)傾斜。
表3 數(shù)據(jù)傾斜
Tab.3 Data skew
關(guān)鍵詞 情形 后果
Join 其中一個表較小,但是key集中 分發(fā)到某一個或幾個Reduce上的數(shù)據(jù)遠(yuǎn)高于平均值
大表與大表,但是分桶的判斷字段0值或空值過多 這些空值都由一個Reduce處理,非常慢
group by group by維度過小,
某特殊值過多 處理某值的Reduce非常耗時
Count Distinct 某值的數(shù)量過多 處理此特殊值的Reduce耗時
(1)參數(shù)調(diào)節(jié)
hive.map.aggr=true
Map 端部分聚合,相當(dāng)于Combiner。
hive.groupby.skewindata=true
數(shù)據(jù)傾斜聚合優(yōu)化,設(shè)置參數(shù)hive.groupby.skewindata=true,控制生成兩個MR Job,第一個MR Job中,Map的輸出結(jié)果會隨機(jī)分配到reduce做一次預(yù)匯總,減少某些key值條數(shù)過多或某些key值條數(shù)過少而造成的數(shù)據(jù)傾斜問題。
(2)SQL語句調(diào)節(jié)
如何Join:關(guān)于驅(qū)動表的選取,選用join key分布最均勻的表作為驅(qū)動表,做好列裁剪和filter操作,以達(dá)到兩表做join的時候,數(shù)據(jù)量相對變小的效果。
大小表Join:使用map join讓小的維度表先進(jìn)內(nèi)存。在map端完成reduce。
大表Join大表:把空值的key變成一個字符串加上隨機(jī)數(shù),把傾斜的數(shù)據(jù)分到不同的reduce上,由于null值關(guān)聯(lián)不上,處理后并不影響最終結(jié)果。
count distinct大量相同特殊值:count distinct時,將特殊值單獨處理。如果還有其他計算需要進(jìn)行g(shù)roup by,可以先將特殊值的記錄單獨處理,再和其他計算結(jié)果進(jìn)行union。
group by維度過?。翰捎胹um() group by的方式來替換count(distinct)完成計算。
舉例:空值產(chǎn)生的數(shù)據(jù)傾斜
日志中,常會有信息丟失的問題,比如log中的user_id和users表中的user_id關(guān)聯(lián),會碰到數(shù)據(jù)傾斜的問題。
解決方法1:user_id為空的不參與關(guān)聯(lián)
select * from log x join users y on x.user_id is not null and x.user_id=y.user_id union all select * from log x where x.user_id is null;
解決方法2:空值的key變成一個字符串加上隨機(jī)數(shù)形成新的key值
select * from log x left outer join users y on case when x.user_id is null then concat(‘hive,rand() ) else x.user_id end=y.user_id;
方法2比方法1效率更高,IO和作業(yè)數(shù)都少了。方法1中l(wèi)og讀取兩次,jobs數(shù)是2,方法2中job數(shù)是1。以上優(yōu)化方法適合無效id,比如-99,“”,null等產(chǎn)生的傾斜問題。
4 結(jié)論(Conclusion)
針對移動通信數(shù)據(jù)中的流動人口統(tǒng)計業(yè)務(wù)需求,設(shè)計Hive數(shù)據(jù)倉庫并進(jìn)行優(yōu)化,重點對性能影響較大的join連接和數(shù)據(jù)傾斜問題進(jìn)行了優(yōu)化,實現(xiàn)了數(shù)據(jù)的高速查詢和統(tǒng)計。簡單統(tǒng)計,如count,sum等10分鐘以內(nèi)完成;聚合統(tǒng)計,如join、group by等30分鐘左右完成,高效完成了流動人口的統(tǒng)計。
參考文獻(xiàn)(References)
[1] 智勇.基于移動通信信息資源的人口流動趨勢研究[J].山東社 會科學(xué),2013(5):102-105.
[2] 王大力.基于移動通信數(shù)據(jù)處理的公安流動人口管理系統(tǒng)設(shè) 計與研究[D].同濟(jì)大學(xué),2012.
[3] 朱珠.基于Hadoop的海量數(shù)據(jù)處理模型研究和應(yīng)用[D].北京 郵電大學(xué),2011.
作者簡介:
周天綺(1976-),男,碩士,講師.研究領(lǐng)域:大數(shù)據(jù)處理.