摘? 要:一般的大數(shù)據(jù)平臺在歷史數(shù)據(jù)處理方面大多都是先通過一些數(shù)據(jù)導入工具比如Sqoop、DataX等進行數(shù)據(jù)全量導入,而實時數(shù)據(jù)處理更加關注的是數(shù)據(jù)的實時性。針對實時數(shù)據(jù)處理問題,文章基于Spark Streaming設計實現(xiàn)了一種實時數(shù)據(jù)處理系統(tǒng),能夠?qū)崿F(xiàn)高效的實時數(shù)據(jù)接入、傳輸、計算校驗和存儲。該系統(tǒng)具有實時數(shù)據(jù)獲取、實時數(shù)據(jù)計算、實時數(shù)據(jù)存儲等特點,為進一步從實時數(shù)據(jù)中獲取有效信息提供了必要的基礎支撐。
關鍵詞:實時;Flume;Kafka;Spark Streaming;HBase
中圖分類號:TP274 ? ? ?文獻標識碼:A 文章編號:2096-4706(2020)20-0010-03
Design and Implementation of Real-time Data Processing System
Based on Spark Streaming
SHI Zhao
(Nanjing University of Posts and Telecommunications,Nanjing? 210023,China)
Abstract:In terms of historical data processing,general big data platforms mostly import full data through some data import tools such as Sqoop,DataX,etc.,while real-time data processing pays more attention to the real-time nature of the data. Aiming at the real-time data processing problem,this paper designs and implements a real-time data processing system based on Spark Streaming,which can realize efficient real-time data access,transmission,calculation,verification and storage. The system has the characteristics of real-time data acquisition,real-time data calculation,real-time data storage,etc.,which provides the necessary basic support for further obtaining effective information from real-time data.
Keywords:real-time;Flume;Kafka;Spark Streaming;HBase
0? 引? 言
隨著大數(shù)據(jù)技術(shù)的發(fā)展,人們越來越重視數(shù)據(jù)中潛藏著的價值。利用大數(shù)據(jù)技術(shù),我們可以從數(shù)據(jù)中挖掘其隱藏的價值,為我們的生產(chǎn)、生活和學習提供有力的指導。社會生產(chǎn)和生活中每時每刻在產(chǎn)生新的實時數(shù)據(jù),但我們對這些數(shù)據(jù)的處理還不充分。如何從這些數(shù)據(jù)中獲取更多有效的信息支撐應用中的實時響應,是目前的研究熱點,Spark Streaming技術(shù)的出現(xiàn),為我們實現(xiàn)高效實時數(shù)據(jù)處理提供了技術(shù)支撐。本文使用Spark Streaming技術(shù)設計了一個實時數(shù)據(jù)處理系統(tǒng),該系統(tǒng)是作者在參與上海德拓信息技術(shù)股份有限公司南京分公司關于離線數(shù)據(jù)采集存儲系統(tǒng)的開發(fā)與維護的經(jīng)驗基礎上提出的一種實時數(shù)據(jù)處理系統(tǒng)。
1? 實時數(shù)據(jù)處理系統(tǒng)的需求
實時數(shù)據(jù)的產(chǎn)生多種多樣,如用戶的瀏覽信息、游客的出行記錄信息、顧客的消費信息等,這些信息會被系統(tǒng)記錄在數(shù)據(jù)庫或日志中。導入這些數(shù)據(jù),需要對這些數(shù)據(jù)庫或者日志監(jiān)控,數(shù)據(jù)寫入數(shù)據(jù)庫或者日志后通過監(jiān)控獲取這些數(shù)據(jù),還需要經(jīng)過數(shù)據(jù)的清洗。數(shù)據(jù)錄入錯誤、數(shù)據(jù)的數(shù)值不正確等多種原因?qū)е碌呐K數(shù)據(jù),若不經(jīng)過校驗和清洗就直接導入到庫中,會產(chǎn)生極大的成本和時間代價。因此獲取到的數(shù)據(jù)需要經(jīng)過數(shù)據(jù)清洗后存入到數(shù)據(jù)庫以避免臟數(shù)據(jù)帶來的影響。
2? 實時數(shù)據(jù)處理系統(tǒng)分析
一個實時數(shù)據(jù)處理過程包含了數(shù)據(jù)的接入、數(shù)據(jù)的傳輸、數(shù)據(jù)的計算校驗和數(shù)據(jù)的存儲,其具體流程如圖1所示。首先需要有數(shù)據(jù)接入,有了數(shù)據(jù)之后需要將數(shù)據(jù)傳輸?shù)较鄳恢玫却龜?shù)據(jù)計算校驗,經(jīng)過計算校驗之后的數(shù)據(jù)才能存儲進數(shù)據(jù)庫。
2.1? 數(shù)據(jù)接入分析
根據(jù)所需獲取的數(shù)據(jù)的來源不同,可分為兩種。若是獲取數(shù)據(jù)庫中的實時增加的數(shù)據(jù),可以開啟數(shù)據(jù)庫的binlog日志[1],然后配置canal中deployer的instances.properties配置文件和adapter中的application.yml以及hbase.yml配置文件,實現(xiàn)源庫和目標庫之間數(shù)據(jù)同步,原理就是數(shù)據(jù)庫之間的主從復制。若是獲取的數(shù)據(jù)為日志文件中的新增數(shù)據(jù),我們可以使用Flume這樣一個高可用的海量日志采集聚合傳輸?shù)墓ぞ遊2]。Flume用來監(jiān)測日志文件的變化,一旦日志文件的內(nèi)容發(fā)生變化,F(xiàn)lume便可以獲取新增的數(shù)據(jù)內(nèi)容。Flume由Source,Channel和Sink組成,Source負責完成數(shù)據(jù)的收集,Channel對Source提供的數(shù)據(jù)進行緩存,Sink取出Channel中的數(shù)據(jù),存入到相應的文件系統(tǒng)(HDFS)、數(shù)據(jù)庫或者Kafka中,其系統(tǒng)架構(gòu)圖如圖2所示。
2.2? 數(shù)據(jù)傳輸分析
數(shù)據(jù)的接入速率與數(shù)據(jù)的處理速率不同,這就需要在數(shù)據(jù)接入和數(shù)據(jù)處理之間加上一個緩沖區(qū)。這個緩沖區(qū)必須是高性能的,而且可用于實時事件響應的場景,Kafka正滿足這些需求。Kafka是Apache下的一個開源流處理平臺,可以處理用戶在日常生活中的所有動作流數(shù)據(jù),同時Kafka可以以集群模式運行,緩解節(jié)點或者服務器之間的壓力。因此我們可以將由Flume監(jiān)測日志文件而獲取的數(shù)據(jù)發(fā)送給Kafka的topic,存儲在Kafka的緩沖區(qū)中,等待Kafka消費者將這些采集到的數(shù)據(jù)“消費”。
2.3? 數(shù)據(jù)計算校驗分析
一般的數(shù)據(jù)計算方法是通過MapReduce完成,但Map-Reduce僅僅支持map和reduce操作,操作單一,map的中間結(jié)果寫入磁盤,reduce的結(jié)果寫入HDFS,大數(shù)據(jù)量的MapReduce操作所花費的時間會很高,因此MapReduce不適合用于實時計算的場景。spark是內(nèi)存計算,避免了多次計算的中間結(jié)果寫到HDFS的I/O開銷,且spark提供的RDD操作很多[3]。因此Spark Streaming[4]用在數(shù)據(jù)計算校驗部分正合適。
2.4? 數(shù)據(jù)存儲分析
實時數(shù)據(jù)導入系統(tǒng)中將經(jīng)過Spark Streaming流式處理[5]后的數(shù)據(jù)存儲到HBase中[6]。HBase具有海量存儲,高并發(fā),極易擴展,成本較低等特點,而且HBase可以同時存儲多版本數(shù)據(jù)(HBase使用不同Timestamp來標識相同Rowkey行對應的不同版本數(shù)據(jù))。HBase查詢數(shù)據(jù)的響應速度很快,這是因為HBase的特殊尋址方式(請求ZooKeeper獲取元數(shù)據(jù),訪問元數(shù)據(jù)獲取RegionServer地址,訪問RegionServer獲取所需數(shù)據(jù)),尋址訪問的同時會把元數(shù)據(jù)的相關信息緩存下來。這樣的訪問方式成就了HBase的快速響應的特點。HBase表中Rowkey的設計至關重要。HBase中有很多region,每個region都有startRowKey和stopRowKey,若Rowkey設計不合理,會導致某個region被頻繁訪問,造成熱點現(xiàn)象,引起節(jié)點性能下降。因此HBase的Rowkey設計需要注意Rowkey的長度一般不超過16個字節(jié);需要保證Rowkey的唯一性(HBase表中的數(shù)據(jù)是以KeyValue的形式存在的,若插入相同Rowkey的值則原先的數(shù)據(jù)會被覆蓋);Rowkey設計要充分利用其有序性;設計的Rowkey應該均勻分布在各個HBase節(jié)點上。
3? 實時數(shù)據(jù)處理系統(tǒng)的設計與實現(xiàn)
通過上一節(jié)的分析,本文實現(xiàn)了一種實時數(shù)據(jù)處理系統(tǒng),通過Flume實時數(shù)據(jù)獲取模塊獲取日志文件和數(shù)據(jù)庫中的實時新增數(shù)據(jù),再將數(shù)據(jù)暫存到Kafka消息中間件,由Spark Streaming調(diào)用Kafka中的數(shù)據(jù)做實時處理,處理后的結(jié)構(gòu)存儲到HBase中。其結(jié)構(gòu)圖如圖3所示。
3.1? 數(shù)據(jù)接入設計
數(shù)據(jù)接入以Flume為例,F(xiàn)lume是由Source、Channel和Sink組成,需要將這三部分配置好并串聯(lián)起來。設置Source端的監(jiān)測命令為tail -F/root/flume.log,監(jiān)測flume.log文件內(nèi)容的變化,若tail-F/root/flume.log命令監(jiān)測到日志文件內(nèi)容變動,F(xiàn)lume會獲取這些內(nèi)容。
3.2? 數(shù)據(jù)傳輸設計
設置flume sink端的類型為Kafka,配置Kafka的boot-strapserver地址并設置Kafka的topic。除此之外還需要設置序列化方式為kafka.serializer.StringEncoder。通過Kafka的bootstrapserver和對應的topic,才能將Source端收集到的數(shù)據(jù)準確無誤的傳輸?shù)終afka指定的topic。配置好Source和Sink后,需要再配置一個Channel將Source端收集到的數(shù)據(jù)傳輸?shù)終afka中。
3.3? 數(shù)據(jù)計算校驗設計
數(shù)據(jù)存儲到了Kafka的topic中,需要創(chuàng)建一個消費者消費采集到的數(shù)據(jù)。這里的消費者是通過Spark Streaming實現(xiàn)的,通過Spark Streaming對采集到的數(shù)據(jù)進行計算校驗。首先需要將Spark Streaming與Kafka連接,才能讀取到topic中的數(shù)據(jù)。因此需要配置bootstrapserver、key.deserializer、value.deserializer,設置topic、groupid,kafkaoffset的維護等級,關閉自動提交。
經(jīng)過上一步取到topic中的數(shù)據(jù)后,就需要對數(shù)據(jù)進行計算校驗了。先將數(shù)據(jù)以空格切分得到一個字符串數(shù)組,若數(shù)組長度或者數(shù)組中的數(shù)據(jù)值不符合需求則不存儲這條數(shù)據(jù)。以WordCount為例,讀取topic中的數(shù)據(jù)后首先需要將這一條數(shù)據(jù)以空格分隔存儲在數(shù)組中,然后遍歷這個數(shù)組并將其轉(zhuǎn)換成一個元組,最后將相同key值的元組聚合得到單詞出現(xiàn)的次數(shù)。其中獲取到的數(shù)據(jù)經(jīng)flatmapRDD、mapToPairRDD和reduceByKeyRDD計算獲得最終結(jié)果。
3.4? 數(shù)據(jù)存儲設計
將Spark Streaming處理計算后的數(shù)據(jù)存入HBase,需要先在HBase數(shù)據(jù)庫中創(chuàng)建好對應的表,建表語句如下:create ‘wordcount,sz,指定表名為WordCount,列族為sz。這里以WordCount為例,統(tǒng)計每個單詞出現(xiàn)的次數(shù),因此Rowkey就是每個RDD中tuple的key值,也就是這些出現(xiàn)過的單詞。表建好后,需要將經(jīng)過RDD處理計算好的數(shù)據(jù)插入到HBase表中。
首先輪訓RDD中所有的元組,然后創(chuàng)建HBase配置對象,指定ZooKeeper的端口號和ZooKeeper的節(jié)點,指定HBase的HMaster節(jié)點,創(chuàng)建連接對象并指定表名從而與HBase連接并把每個元組的key和value插入HBase表中。其中key為HBase表的Rowkey,value為對應列族sz,對應列num的cell中的值。
當?shù)谝慌鷶?shù)據(jù)處理完后第二批數(shù)據(jù)處理時,有可能會有相同的key的數(shù)據(jù),由于HBase可以存儲多版本數(shù)據(jù),這樣的相同key的數(shù)據(jù)插入進數(shù)據(jù)庫時,會被HBase當作新版本的數(shù)據(jù)存儲下來,并不能實現(xiàn)累加,這顯然與WordCount計數(shù)的需求不同。因此需要刪除表中原有的這一條數(shù)據(jù),插入新值與原值的和。因此需要獲取表中的所有Rowkey,每當要插入數(shù)據(jù)時,先檢測是否會有相同的Rowkey出現(xiàn),若表中不存在相同的Rowkey則直接插入該條數(shù)據(jù);若表中存在相同的Rowkey,則需要記錄原有的值并刪除這條數(shù)據(jù),插入原值和現(xiàn)有值的和從而達到計數(shù)的目的。
3.5? 系統(tǒng)運行介紹
整個系統(tǒng)是檢測日志文件的,所以先要向日志文件里寫入數(shù)據(jù),如圖4所示。
運行Spark Streaming代碼,讀取監(jiān)測到的數(shù)據(jù)并做計算處理,程序輸出如圖5所示。
查看HBase中的數(shù)據(jù),檢驗計算后的數(shù)據(jù)是否成功存入,HBase中的數(shù)據(jù)如圖6所示。
4? 結(jié)? 論
隨著大數(shù)據(jù)技術(shù)的不斷發(fā)展,企業(yè)越來越意識到實時數(shù)據(jù)中的價值。因此本文設計了一種基于Spark Streaming的實時數(shù)據(jù)處理系統(tǒng),能夠?qū)崿F(xiàn)實時數(shù)據(jù)的接入、傳輸、計算校驗和存儲。在互聯(lián)網(wǎng)時代,大數(shù)據(jù)技術(shù)已經(jīng)日益成熟,許多企業(yè)已經(jīng)開始著眼于數(shù)據(jù)的隱藏價值,并開始著手建立基于大數(shù)據(jù)的分布式數(shù)據(jù)采集和數(shù)據(jù)應用平臺,該實時數(shù)據(jù)處理系統(tǒng)可以使企業(yè)快人一步處理實時數(shù)據(jù)。但此系統(tǒng)也有些不足,實時數(shù)據(jù)處理系統(tǒng)只是將實時數(shù)據(jù)采集并存儲起來,并沒有對數(shù)據(jù)進行應用,后續(xù)可用機器學習技術(shù)對數(shù)據(jù)的價值進行深層次的挖掘。
參考文獻:
[1] 蘇子權(quán).基于MySQL Binlog的數(shù)據(jù)增量同步系統(tǒng)的設計與實現(xiàn) [D].南京:南京大學,2018.
[2] 袁昌權(quán),胡益群,許光,等.基于Hadoop的高可用數(shù)據(jù)采集與存儲方案 [J].電子技術(shù)與軟件工程,2019(18):169-170.
[3] 吳信東,嵇圣硙.MapReduce與Spark用于大數(shù)據(jù)分析之比較 [J].軟件學報,2018,29(6):1770-1791.
[4] 柯杰.基于Spark Streaming日志實時監(jiān)測系統(tǒng)的設計與實現(xiàn) [D].南京:東南大學,2017.
[5] 李欣.基于Spark/HBase的交通流數(shù)據(jù)存儲及索引模型探討 [J].地理與地理信息科學,2019,35(4):1-8.
作者簡介:施炤(1995.07—),男,漢族,江蘇鎮(zhèn)江人,碩士研究生,研究方向:大數(shù)據(jù)技術(shù)。