王 艷,潘晨光
(公安部第一研究所,北京 100048)
實(shí)時(shí)計(jì)算是不斷獲取、計(jì)算和分析大流量數(shù)據(jù),迅速洞察變化原委,自動(dòng)化響應(yīng)變化的數(shù)據(jù)[1]。交互式即席查詢和報(bào)表查詢面臨整合異構(gòu)數(shù)據(jù)源,統(tǒng)一元數(shù)據(jù)存儲(chǔ)和大規(guī)模迭代運(yùn)算模型等難點(diǎn)。公共安全領(lǐng)域積累了大量的人員、案件、軌跡和社會(huì)行為等數(shù)據(jù)信息。實(shí)時(shí)分析和計(jì)算這些持續(xù)大流量的公共安全數(shù)據(jù)是巨大的挑戰(zhàn)。
Hadoop上的Hive追求高吞吐量,導(dǎo)致時(shí)間延遲較高。Hive可支持百億級(jí)的數(shù)據(jù)量,但很難應(yīng)對(duì)秒級(jí)響應(yīng)的需求,只適合做分鐘級(jí)別的離線分析系統(tǒng)而不支持實(shí)時(shí)分析系統(tǒng)[2-3]。Hive的缺陷導(dǎo)致其不能滿足業(yè)務(wù)高速發(fā)展所帶來(lái)的實(shí)時(shí)和高維的數(shù)據(jù)處理需求,但公共安全的情報(bào)分析需實(shí)時(shí)獲取當(dāng)前正在發(fā)生的案件和嫌疑人的狀況。
如何基于公共安全數(shù)據(jù)構(gòu)建大數(shù)據(jù)查詢系統(tǒng)實(shí)現(xiàn)關(guān)系查詢和實(shí)時(shí)跟蹤,是公共安全大數(shù)據(jù)迫切需要解決的問(wèn)題。本文提出了HDFS和Impala相結(jié)合的架構(gòu),搭建了存儲(chǔ)海量數(shù)據(jù)的分布式文件系統(tǒng),實(shí)現(xiàn)了交互式數(shù)據(jù)查詢和分析,提供即席查詢的功能,便于快速獲取數(shù)據(jù)和決策支持?;贖DFS和Impala構(gòu)建大數(shù)據(jù)查詢系統(tǒng)可提供統(tǒng)一的元數(shù)據(jù)訪問(wèn)和管理接口,支持SQL查詢優(yōu)化、列存儲(chǔ)、查詢謂詞下推、高效壓縮技術(shù)、預(yù)先計(jì)算、高效索引和并行查詢等,可按照時(shí)間、空間和業(yè)務(wù)進(jìn)行分層和元數(shù)據(jù)管理,方便構(gòu)建兼容應(yīng)用。
為解決公共安全大數(shù)據(jù)的實(shí)時(shí)查詢問(wèn)題,本文首次并創(chuàng)新性地將Impala計(jì)算引擎應(yīng)用于公共安全大數(shù)據(jù)的智能分析,整合非結(jié)構(gòu)化、半結(jié)構(gòu)化和結(jié)構(gòu)化的數(shù)據(jù)存儲(chǔ)和分析,設(shè)計(jì)了數(shù)據(jù)存儲(chǔ)組織結(jié)構(gòu)和數(shù)據(jù)分層策略,盡量隱藏查詢對(duì)原始文件訪問(wèn)的需求,即席查詢共享存儲(chǔ)、統(tǒng)一計(jì)算,可擴(kuò)展性強(qiáng),實(shí)現(xiàn)了以人查案和以案找人的業(yè)務(wù)功能,取得了較好的實(shí)戰(zhàn)效果。
分布式文件系統(tǒng)適合處理非結(jié)構(gòu)化數(shù)據(jù),而已存儲(chǔ)在數(shù)據(jù)庫(kù)中的數(shù)據(jù)是結(jié)構(gòu)化的,結(jié)構(gòu)化數(shù)據(jù)轉(zhuǎn)換為非結(jié)構(gòu)化數(shù)據(jù)會(huì)丟失很多重要價(jià)值的信息[4]。MapReduce是基于磁盤進(jìn)行數(shù)據(jù)處理,每次計(jì)算要經(jīng)歷從磁盤讀取數(shù)據(jù)、計(jì)算數(shù)據(jù)和保存數(shù)據(jù)等階段,導(dǎo)致運(yùn)行過(guò)程復(fù)雜,迭代任務(wù)時(shí)效低,不適合對(duì)延時(shí)要求高的交互式分析或復(fù)雜迭代的數(shù)據(jù)分析任務(wù)。
Hive是面向行存儲(chǔ)的數(shù)據(jù)庫(kù),不存儲(chǔ)和計(jì)算數(shù)據(jù),底層執(zhí)行依賴MapReduce引擎,不能解決已有關(guān)系數(shù)據(jù)庫(kù)中數(shù)據(jù)的遷移和查詢操作[5]。運(yùn)行機(jī)制是將結(jié)構(gòu)化數(shù)據(jù)文件映射為數(shù)據(jù)庫(kù)表,提供類SQL查詢,并將SQL語(yǔ)句轉(zhuǎn)換為MapReduce任務(wù)運(yùn)行[6]。查詢先轉(zhuǎn)化為映射-歸約作業(yè),再提交給集群以批量方式執(zhí)行。MapReduce調(diào)度只適合批量和周期長(zhǎng)的任務(wù),類似查詢結(jié)構(gòu)化數(shù)據(jù)的業(yè)務(wù)效率低,Hive的運(yùn)行機(jī)制導(dǎo)致查詢速度慢[7]。Hive的缺陷原理如表1所示。
表1 Hive的缺陷原理表
Impala是Cloudera參考Google Dremel思想實(shí)現(xiàn)交互SQL大數(shù)據(jù)查詢,支持Parquet列存儲(chǔ)格式,結(jié)構(gòu)嵌套記錄轉(zhuǎn)換成列存儲(chǔ)[8],高效狀態(tài)機(jī)實(shí)現(xiàn)記錄正向和反向轉(zhuǎn)換,減少了查詢數(shù)據(jù)量;支持多層樹,查詢樹根節(jié)點(diǎn)接收查詢,底層節(jié)點(diǎn)獲取數(shù)據(jù)執(zhí)行查詢,使任務(wù)在數(shù)千個(gè)節(jié)點(diǎn)上并行執(zhí)行和聚合;采用推送方式傳輸數(shù)據(jù),分散了網(wǎng)絡(luò)壓力,提高了任務(wù)的執(zhí)行效率。
Impala主要分為Impalad、StateStore和CLI等模塊[9]。Impalad與DataNode在相同節(jié)點(diǎn)上運(yùn)行,接收查詢請(qǐng)求Coordinator。通過(guò)JNI調(diào)用Java前端解釋SQL查詢語(yǔ)句,生成查詢計(jì)劃樹,通過(guò)調(diào)度器把執(zhí)行計(jì)劃分發(fā)給數(shù)據(jù)對(duì)應(yīng)的Impalad運(yùn)行,讀寫數(shù)據(jù)并行執(zhí)行查詢。StateStore跟蹤集群中Impalad運(yùn)行狀態(tài)和位置信息,創(chuàng)建多線程處理注冊(cè)訂閱和心跳檢測(cè)。進(jìn)程離線后,進(jìn)入recovery模式反復(fù)注冊(cè);進(jìn)程重新加入集群后,自動(dòng)恢復(fù)正常,更新緩存數(shù)據(jù)。CLI提供查詢的命令行、Hue、JDBC和ODBC使用接口。查詢的執(zhí)行過(guò)程如圖1所示。
圖1 Impala的運(yùn)行架構(gòu)
1)客戶端SQL查詢通過(guò)ODBC發(fā)送到集群內(nèi)任一Impalad。查詢規(guī)劃器采用Jflex和CUP解析SQL語(yǔ)句,解析查詢請(qǐng)求為多個(gè)執(zhí)行片段發(fā)送至查詢協(xié)調(diào)器,查詢節(jié)點(diǎn)單獨(dú)原子執(zhí)行相關(guān)操作。
2)查詢規(guī)劃器初始化Impalad執(zhí)行任務(wù),RDBMS存儲(chǔ)表的元數(shù)據(jù)信息。進(jìn)程StateStored調(diào)度查詢請(qǐng)求,分發(fā)metadata數(shù)據(jù),提供對(duì)外的Thrift服務(wù),存儲(chǔ)集群中進(jìn)程的資源。
3)查詢協(xié)調(diào)器執(zhí)行聚合函數(shù)Limit n,截取Top-n,完成局部Aggregation回傳結(jié)果至客戶端。查詢工作引擎通過(guò)流式交換輸出,協(xié)調(diào)客戶端提交查詢請(qǐng)求,分配任務(wù)至其他Impalad并收集執(zhí)行結(jié)果。Impalad執(zhí)行分配的任務(wù),操作本地HDFS和HBase的數(shù)據(jù)完成查詢請(qǐng)求。
由于Hive本身的缺陷,本文提出了采用Impala直接為存儲(chǔ)在HDFS中的數(shù)據(jù)提供快速、交互式SQL查詢的技術(shù)方案。HDFS和Impala結(jié)合的原理是把HDFS接入Impala后端作為存儲(chǔ)引擎,直接從HDFS獲取查詢所需數(shù)據(jù),請(qǐng)求被解析成片段調(diào)度至相應(yīng)節(jié)點(diǎn)上執(zhí)行,某些源數(shù)據(jù)或中間數(shù)據(jù)存放在HDFS中[10]。Impala把多個(gè)執(zhí)行計(jì)劃分配到內(nèi)存中并行執(zhí)行,高效I/O調(diào)度和優(yōu)化的LLVM本地代碼完成初始化,中間結(jié)果在進(jìn)程間進(jìn)行流式回傳。HDFS和Impala架構(gòu)比MapReduce和Hive架構(gòu)的優(yōu)勢(shì)分析如表2所示。
表2 HDFS和Impala結(jié)合架構(gòu)的優(yōu)勢(shì)分析表
HDFS和Impala架構(gòu)的優(yōu)勢(shì)體現(xiàn)在:1)Impala直接在HDFS中存取數(shù)據(jù),不必把中間過(guò)程寫入磁盤,節(jié)省了大量I/O開銷;2)減小了MapReduce的啟動(dòng)作業(yè)開銷,Impala直接從對(duì)應(yīng)服務(wù)進(jìn)程進(jìn)行作業(yè)調(diào)度,提高了執(zhí)行效率;3)去掉MapReduce不太適合做SQL查詢的范式,Impala支持實(shí)時(shí)分析的MPP查詢引擎,降低了不必要的shuffle和sort等開銷;4)采用LLVM統(tǒng)一編譯代碼,減少了通用編譯的開銷;5)支持?jǐn)?shù)據(jù)的I/O調(diào)度機(jī)制,盡量將數(shù)據(jù)分布到所在節(jié)點(diǎn)內(nèi)存中并行完成,省去了大量I/O網(wǎng)絡(luò)開銷。
Impala由JAVA前端與C++后端組成,接收客戶端連接進(jìn)行查詢的Coordinator,通過(guò)JNI接口調(diào)用JAVA前端對(duì)查詢SQL分析生成執(zhí)行計(jì)劃樹。JAVA前端的執(zhí)行計(jì)劃樹以Thrift數(shù)據(jù)格式回傳Impala C++后端。其原子操作由計(jì)劃片段表示,查詢語(yǔ)句可由多個(gè)片段組成,片段0表示執(zhí)行樹的根,匯聚結(jié)果回傳查詢,執(zhí)行樹的葉子結(jié)點(diǎn)由Scan操作,可分布式并行執(zhí)行。
數(shù)據(jù)存儲(chǔ)信息通過(guò)Libhdfs與HDFS進(jìn)行交互,通過(guò)HDFSGetHosts方式獲取文件數(shù)據(jù)塊所在節(jié)點(diǎn)位置信息,Simplescheduler由Round-robin算法實(shí)現(xiàn),通過(guò)調(diào)度器Exec對(duì)生成執(zhí)行計(jì)劃樹分配給對(duì)應(yīng)的后端執(zhí)行器執(zhí)行。調(diào)用GetNext方法獲取計(jì)算結(jié)果,執(zhí)行insert語(yǔ)句將計(jì)算結(jié)果通過(guò)Libhdfs寫回HDFS。Shuffle Join有穩(wěn)定性能,適用大型復(fù)雜關(guān)聯(lián)操作。其流程框圖如圖2所示。
圖2 Impala查詢請(qǐng)求的流程控制
Broadcast Join將右表作小表分發(fā)在Join,Shuffle Join是分發(fā)后左表驅(qū)動(dòng)右表進(jìn)行Join。嵌套類型數(shù)據(jù)Parquet列存儲(chǔ)格式及擴(kuò)展SQL查詢語(yǔ)義通過(guò)基于LLVM的Just-In-Time運(yùn)行時(shí)代碼生成,查詢以最大CPU速度執(zhí)行,能快速擴(kuò)展系統(tǒng)功能。Parquet格式實(shí)現(xiàn) Dictionary Encoding、Bit Packing、Delta Encoding、Run-Length Encoding等壓縮技術(shù),過(guò)濾無(wú)關(guān)數(shù)據(jù)減少I/O。Run Length Encoding在列壓縮中減少3個(gè)數(shù)量級(jí)存儲(chǔ),提升2~3個(gè)數(shù)量級(jí)的內(nèi)存應(yīng)用,Dictionary Encoding對(duì)磁盤空間的占用約為之前的1/20,對(duì)內(nèi)存的占用約為之前的1/5。
碰撞比對(duì)算法的應(yīng)用是對(duì)嫌疑人多種信息進(jìn)行分析處理,查找與嫌疑人或案件的相關(guān)信息、活動(dòng)軌跡和網(wǎng)絡(luò)行為等。為滿足碰撞比對(duì)的需要,將數(shù)據(jù)從HDFS同步到Impala的表中。Impala上運(yùn)行CURE聚類算法設(shè)計(jì)是將改進(jìn)的CURE聚類算法對(duì)訓(xùn)練集進(jìn)行聚類,對(duì)簇進(jìn)行標(biāo)識(shí)基于矩形的建模建立相關(guān)性模型,將待檢測(cè)數(shù)據(jù)與該模型進(jìn)行碰撞比對(duì)。若符合該模型則是與嫌疑人相關(guān)的數(shù)據(jù),否則判斷為與嫌疑人不相關(guān)的數(shù)據(jù)。
CURE算法是自下而上的層次聚類,用定量特征點(diǎn)來(lái)表示簇,合并相鄰簇直到簇的數(shù)目在特定閾值范圍內(nèi)。由于簇的個(gè)數(shù)無(wú)法提前預(yù)設(shè),需對(duì)多個(gè)簇進(jìn)行強(qiáng)制合并或把簇強(qiáng)行分割,影響聚類效果。為提高聚類的質(zhì)量,本文提出將聚合條件設(shè)定為相鄰簇間距離達(dá)到設(shè)定閾值時(shí)聚類形成,簇間相似度決定簇的個(gè)數(shù)。Impala處理大數(shù)據(jù)量時(shí),CURE聚類算法采用隨機(jī)取樣數(shù)據(jù)技術(shù),分區(qū)聚類后將局部聚類的中間結(jié)果進(jìn)行分析得到最后結(jié)果。先局部后整體的方法應(yīng)用到分布式Impala系統(tǒng)中,CURE聚類算法可高效處理海量數(shù)據(jù)。Impala上改進(jìn)CURE算法描述如下:
Dis(X1,X2)表示X1和X2間的距離,其距離度量是歐幾里得距離、曼哈頓距離或閔可夫距離等,本文采用歐幾里得距離。X1和X2是簇時(shí),定義Dis(X1,X2)為相鄰簇中特征點(diǎn)間的距離,即Di(s X1,X2)=min{Di(s ri,r)j,ri∈Q(X1),rj∈Q(X2)}。
步驟1,輸入<key,value>,從源數(shù)據(jù)集中抽取隨機(jī)樣本S,向量di創(chuàng)建簇Ci,實(shí)現(xiàn)S={C1,C2,…,Cn},Q(Ci),Q(Ci)={di}。
步驟2,將樣本S分割,若|S|<2,終止。
步驟3,將S聚類,找出簇集S中特征點(diǎn)相鄰距離的簇Cu、Cv,Dis(Ci,Cj)=min{Dis(Ci,Cj),Ci∈S,Cj∈S,i≠j}。若Dis(Cu、Cv)>w,終止。
步驟4,隨機(jī)取樣剔除孤立點(diǎn),合并簇Cu和Cv,Cnew←Cu?Cv,tmpSet←φ ,計(jì)算 Cnew的 中 心 :
步驟5,對(duì)局部簇聚類,合并距離近的簇,從Cnew中選擇di,若 tmpSet=φ ,Dis(di,tmpSet)=max{dist(dj,tmpSet),dj∈ Cnew},Dis( dj,tmpSet )=min{Dis( di,dk) ,dk∈ tmpSet},將 di并入tmpSet,tmpSet←tmpSet?{di}。
步驟6,簇標(biāo)簽標(biāo)記數(shù)據(jù),若|tmpSet|<min{|Cnew|,λ},執(zhí)行步驟5。
步驟 7,輸出<key,value>收縮代表點(diǎn):Q(Cnew)←{dk+a*(hnew-dk∈ tmpSet),dk},更 新 簇 集 S ← SCu-Cv+Cnew,執(zhí)行步驟2。KD數(shù)存放數(shù)據(jù)點(diǎn),小頂堆存放簇,將簇按照與其最近鄰簇間距離升序排序。
Hadoop平臺(tái)下使用Hive類SQL語(yǔ)句實(shí)現(xiàn)不同粒度的聚合,類SQL語(yǔ)句會(huì)轉(zhuǎn)化為Map和Reduce任務(wù)去執(zhí)行,在某粒度上聚合實(shí)際數(shù)據(jù)時(shí)會(huì)造成的較大開銷,而Hive無(wú)法一次性實(shí)現(xiàn)多粒度融合。為提高在不同粒度的查詢響應(yīng)時(shí)間,基于Impala的改進(jìn)CURE聚類算法將不同粒度上的實(shí)時(shí)數(shù)據(jù)一次性聚合后存儲(chǔ)到Impala中,可識(shí)別任意形狀的簇,不斷凝聚或分裂簇,對(duì)非球形簇的識(shí)別度較高。改進(jìn)CURE聚類算法對(duì)孤立點(diǎn)敏感度低。在簇識(shí)別的過(guò)程中,若簇增長(zhǎng)緩慢或異常的小,可作為異常點(diǎn)來(lái)剔除,降低了孤立點(diǎn)敏感度。
碰撞比對(duì)系統(tǒng)通過(guò)界面拖拽可實(shí)現(xiàn)數(shù)據(jù)的任意碰撞或根據(jù)自定義規(guī)則進(jìn)行碰撞,支持兩兩數(shù)據(jù)源碰撞和多數(shù)據(jù)源碰撞,方便實(shí)現(xiàn)以人找案和以案找人的功能。碰撞比對(duì)系統(tǒng)支持單點(diǎn)碰撞比對(duì)和分布式碰撞比對(duì)。省廳里某些數(shù)據(jù)在本地?cái)?shù)據(jù)源里沒(méi)有碰撞出來(lái),可分步到各地市數(shù)據(jù)源進(jìn)行碰撞,將結(jié)果分別返回并且合并匯總再統(tǒng)一展示。碰撞效率高,比傳統(tǒng)的架構(gòu)要快數(shù)10倍,同時(shí)支持?jǐn)?shù)據(jù)源上傳和碰撞結(jié)果下載。該應(yīng)用準(zhǔn)確并極速地實(shí)現(xiàn)單類多源、多類多源數(shù)據(jù)間的碰撞比對(duì),比傳統(tǒng)基于Oracle數(shù)據(jù)庫(kù)的碰撞比對(duì)性能提高上百倍,大大提高了破案效率。其系統(tǒng)的界面實(shí)現(xiàn)如圖3所示。
由于Hadoop和Hive處理數(shù)據(jù)存在不足,不適合對(duì)延時(shí)要求高的交互式分析、復(fù)雜迭代的數(shù)據(jù)處理和實(shí)時(shí)分析系統(tǒng)。為適應(yīng)公共安全領(lǐng)域?qū)崟r(shí)查詢的應(yīng)用需求,本文創(chuàng)新性提出將Impala框架應(yīng)用于公共安全領(lǐng)域數(shù)據(jù)的實(shí)時(shí)查詢分析中,研制了在Impala和HDFS上運(yùn)行的改進(jìn)CURE碰撞比對(duì)算法,為存儲(chǔ)在HDFS的數(shù)據(jù)提供快速、交互式的ANSI-92 SQL所有子集的SQL查詢,實(shí)現(xiàn)了異構(gòu)數(shù)據(jù)源的統(tǒng)一查詢,其并發(fā)客戶端處理的速度上超越了Hive。Impala不使用緩慢的Hive和MapReduce批處理,通過(guò)與商用并行關(guān)系數(shù)據(jù)庫(kù)中類似分布式查詢引擎,直接從HDFS中用SELECT、Join和統(tǒng)計(jì)函數(shù)查詢數(shù)據(jù)降低了延遲。該系統(tǒng)的實(shí)現(xiàn)對(duì)公安構(gòu)建大規(guī)模的數(shù)據(jù)分析查詢系統(tǒng)具有借鑒意義,可提供技術(shù)參考。
[1]MELNIK S,GUBAREV A,LONG Jingjing,et al.Dremel:innteractive analysis of Web-scale datasets[J].Proceedings of the VLDB Endowment,2010,3(1):330-339.
[2] CDH4里的Impala安裝使用文檔[EB/OL].[2015-01-03].http://download.csdn.net/detail/lostage2/4911752.
[3] ENGLE C,LUPHER A,XIN R,et al.Shark:fast data analysis using coarse-grained distributed memory[EB/OL].[2015-02-03].http://libra.msra.cn/Publication/56916420.
[5]D'ORAZIO L,BIMONTE S.Multidimensional arrays for warehousing data on clouds[C]//Proc.the Data Management in Grid and Peer-to-Peer Systems.Berlin,Heidelberg:Spring-Verlag,2010:26-37.
[6]OLSTON C,REED B,SRIVASTAVA U,et al.Pig latin:A notso-foreign language for data processing[EB/OL].[2015-02-03].http://citeseer.ist.psu.edu/viewdoc/summary?doi=10.1.1.124.5496.
[7] DEBRABANT J,PAVLO A,TU S,et al.Anti-Caching:a new approach to database management system architecture[EB/OL].[2015-02-03].http://www.dajudeng.com/d20120810089e6ef5158fb 770bf68a5518.htm l.
[8] YOU J G,XI J Q,ZHANG P J,et al.A parallel algorithm for closed cube computation[J].Computer and Information Science,2008(8):103-115.
[9]HAN H,LEE Y C,CHOI S,et al.Cloud-aware processing of MapReduce-based OLAP applications[EB/OL].[2015-02-03].http://www.researchgate.net/publication/262242831_Cloud-aware_proces sing_of_MapReduce-based_OLAP_applications.
[10]LICHTENWALTER R N,LUSSIER JT,CHAWLA N V.New perspectives and methods in link prediction[EB/OL].[2015-02-03].http://videolectures.net/kdd2010_lichtenwalter_npml/.