李子乾 朱青 徐雨申
摘 要:隨著市場上自主研發(fā)的數(shù)據(jù)庫的大量出現(xiàn),面對各具特色的數(shù)據(jù)庫因差異性而導致的數(shù)據(jù)etl過程困難的問題,本文探討了一種通過kafka作為可靠的實時數(shù)據(jù)中轉(zhuǎn),然后通過spark streaming任務(wù)來實現(xiàn)數(shù)據(jù)入庫的技術(shù)路線,最終解決了基于電力客服業(yè)務(wù)的數(shù)據(jù)倉庫實時數(shù)據(jù)接入的問題。該技術(shù)路線能夠一定程度上解決一部分數(shù)據(jù)實時數(shù)據(jù)接入的困難。與此同時,可以在數(shù)據(jù)接入過程中進行復雜的數(shù)據(jù)流式計算。
關(guān)鍵詞:實時數(shù)據(jù)接入;kafka offset;spark streaming
中圖分類號:TM769 文獻標識碼:A 文章編號:1671-2064(2020)01-0038-03
1 背景與問題
現(xiàn)如今大數(shù)據(jù)技術(shù)發(fā)展日新月異,數(shù)據(jù)處理框架及方法也與日俱增,但是,數(shù)據(jù)集成的接入方法缺存在一定短板,如在滿足實時數(shù)據(jù)接入的需求上存在一定的難度及不實用性。伴隨著電力客服業(yè)務(wù)體量的擴展及海量業(yè)務(wù)數(shù)據(jù)的不斷增加,數(shù)據(jù)接入的實時性問題成了阻礙進一步數(shù)據(jù)分析應(yīng)用的絆腳石,因此,一種數(shù)據(jù)倉庫實時數(shù)據(jù)接入方法應(yīng)運而生,本文將主要針對電力客服業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)倉庫實時接入方法進行的研究及論證。
在數(shù)據(jù)倉庫建設(shè)過程,一般數(shù)據(jù)倉庫的技術(shù)選型大多采用了mpp集群?;谠朴嬎憷砟畹牟⑿袛?shù)據(jù)庫集群,能夠支持TB到PB級別的結(jié)構(gòu)化數(shù)據(jù)存儲、高效查詢[3]。電力客服數(shù)據(jù)倉庫采用了分層的架構(gòu),在最接近原始業(yè)務(wù)系統(tǒng)數(shù)據(jù)的一層稱為數(shù)據(jù)明細層,詳細數(shù)據(jù)倉庫架構(gòu)如圖1所示。
將業(yè)務(wù)數(shù)據(jù)從數(shù)據(jù)貼源區(qū)接入到數(shù)據(jù)倉庫明細層的過程中,需要實現(xiàn)部分結(jié)果化數(shù)據(jù)實時接入。在傳統(tǒng)的關(guān)系型數(shù)據(jù)庫中,實時數(shù)據(jù)接入常使用oracle goldengate(下簡稱為ogg)來進行數(shù)據(jù)接入。所以,首先考慮使用ogg來進行實時數(shù)據(jù)接入。ogg出于對數(shù)據(jù)可靠性的保證,在數(shù)據(jù)接入的過程中,需要創(chuàng)建checkpoint來記錄數(shù)據(jù)接入任務(wù)的執(zhí)行狀態(tài),以保證如果接入進行中斷的時候,可以通過讀取checkpoint的狀態(tài)數(shù)據(jù)來恢復接入進程中斷前的狀態(tài)。ogg創(chuàng)建的checkpoint的主鍵是包含4個字段,而mpp集群支持數(shù)據(jù)表的最大聯(lián)合主鍵數(shù)為3。因此,mpp集群作為ogg數(shù)據(jù)接入的目的端的時候,無法正常使用檢查點功能,數(shù)據(jù)傳輸?shù)目煽啃允盏搅司薮蟮挠绊憽?/p>
本文在接下來的篇幅中,將詳細介紹,一種可以解決可靠實時數(shù)據(jù)傳輸?shù)募夹g(shù)。
2 解決辦法
為了解決在mpp集群中無法創(chuàng)建checkpointtable的問題。首先嘗試,通過手動的方式來創(chuàng)建checkpointtable,但是自行創(chuàng)建的表,ogg數(shù)據(jù)抽取進程無法識別。
在經(jīng)過一些方式的嘗試后,決定通過在源端和目標端添加中轉(zhuǎn)的方式來實現(xiàn)mpp集群中的實時數(shù)據(jù)接入。
在仔細分析了ogg的checkpoint table的實現(xiàn)機制后,發(fā)現(xiàn)其與kafka的offset機制十分類似。在kafka中,offset是一個用于存儲每個消息被追加到分區(qū)的序列號的變量,offset的值是隨著消息的消費情況不斷更新的。[1]
具體來說,kafka中的Offset分為兩種:Current Offset和Committed Offset。
Current Offset保存在消費者側(cè),表示consumer消費者已經(jīng)接收的消息序號。舉個例子來說,consumer目前接收了10條消息,則當前current offset的值為10。于是消費者下一次消費的時候,就會從第11條消息開始,這樣可以避免每次消費者從topic中獲取消息的時候可以避免重復。
而commited offset保存在broker上,表示consumer消費者消費過的消息序號。舉例來說,消費者接收了10條數(shù)據(jù),此時消費者這邊的current offset是10。但是消費者接收到消息后,是否真正意義上消費了該條消息是不確定的。這里就涉及到kafka的可靠信息傳輸機制,kafka的消息在被消費者消費后,是需要消費者反饋和同步消費情況的。這一特性實現(xiàn)的機制是依靠commitSync和commitAsync兩個方法來實現(xiàn)的。當消費者接收到topic的消息后,current offset會立刻更新到最新的消息序號,然后消費者拿到接收到的數(shù)據(jù)后,開始進行消費也就是計算和處理。完成消費過程后,就會通過調(diào)用commitSync和commitAsync將消息的消費情況返回給topic,broker在接收到commitSync和commitAsync信號后,會將commited offset更新為最新的序號。表示當前已確認消費的序號。
如圖2所示,committed offset為3,current offset為5。這表明當前時間,消費者雖然接收到了5條消息,但是第4條與第5條并未被消費,已經(jīng)消費的消息才到第三條,當?shù)谒臈l和第五條消費之后,消費者會返回一個同步信號給broker,然后committed offset才會更新。而與此同時,current offset會不斷增長,消費者接收數(shù)據(jù)的過程是不會停止的。與消費是同時進行的,兩者并不干擾,但current offset始終比committed offset要大。
Committed offset在broker一端是單獨由一個topic來記錄和管理的。當其更新的時候,最新的commited offset就會被寫入__consumer_offsets的topic中。這樣當kafka出現(xiàn)進程意外停止或者是consumer group成員出現(xiàn)變化,需要consumer rebalance的時候,commited offset就可以保證新的Consumer能夠從正確的位置開始消費一條消息,從而避免重復消費。這樣的話,就可以實現(xiàn)類似于ogg中的checkpoint table的功能。[1]
3 接入設(shè)計
基于上述的kafka offset機制,可以完美地解決mpp集群無法創(chuàng)建檢查點的問題。首先將原始貼源區(qū)數(shù)據(jù)通過ogg的方式實時接入到kafka中,然后通過spark streaming程序訂閱kafka中的消息,將ogg格式的消息轉(zhuǎn)換處理成mpp格式數(shù)據(jù),最終實現(xiàn)數(shù)據(jù)倉庫實時數(shù)據(jù)接入。
3.1 oracle實時數(shù)據(jù)接入kafka
在ogg將數(shù)據(jù)接入到kafka過程中,首先需要通過抽取進程,將數(shù)據(jù)抽取放進本地指定的數(shù)據(jù)文件隊列,然后通過投遞進程,將數(shù)據(jù)文件傳送到目的端,目的端ogg客戶端在接收到數(shù)據(jù)文件后,將文件放入指定的遠程數(shù)據(jù)隊列中,然后通過復制進程,將數(shù)據(jù)文件解析后,以生產(chǎn)者的方式將數(shù)據(jù)發(fā)布到kafka的topic中。Oracle goldengate原理詳細情況如圖3所示。
3.1.1 源與目標端配置管理進程
在源端和目標端完成ogg的安裝后,兩端都需要配置mgr管理進程。Manager進程是ogg的控制進程,運行在源端和目標端上。它主要作用有以下幾個方面:啟動、監(jiān)控、重啟Goldengate的其他進程,報告錯誤及事件,分配數(shù)據(jù)存儲空間,發(fā)布閥值報告等。
在管理進程中需要配置的有:ogg進程的監(jiān)聽端口;出了指定的固定端口以外還需要指定一系列的動態(tài)端口列表,當指定的默認的監(jiān)聽端口不可用時,會在動態(tài)端口中隨機挑選一個作為進程的監(jiān)聽端口;另外需要設(shè)置自動重啟的參數(shù)用于管理進程下某個進程中斷或者是重啟管理進程,管理進程會自動重啟這些進程,設(shè)定重啟的最大次數(shù)以及時間間隔;此外還需要設(shè)定定期清理ogg的傳輸數(shù)據(jù)文件的周期。
3.1.2 oracle源端配置抽取進程以及投遞進程
在源端配置抽取進程,將需要進行實時傳輸?shù)臄?shù)據(jù)表配置進抽取進程。
配置過程中需要配置動態(tài)解析源端數(shù)據(jù)表;需要設(shè)置環(huán)境變量,指定源端數(shù)據(jù)庫以及字符集以及連接源端數(shù)據(jù)庫的密碼;然后指定數(shù)據(jù)庫中抽取出的數(shù)據(jù)的保存位置以及文件名;最后配置需要復制表的清單。
在配置完抽取進程后,需要配置相配合的投遞進程,用于將抽取出來的隊列文件發(fā)送到指定的目的端服務(wù)器上。在配置過程重要配置參數(shù)有,除了同樣需要配置禁止ogg與oracle交互以及動態(tài)解析以外,還需要配置遠程目標端的ip地址和目的端的管理進程的監(jiān)聽端口,用于構(gòu)建點到點的數(shù)據(jù)傳輸通道,同樣需要配置目的端用于存放傳輸?shù)年犃形募穆窂健?/p>
配置好抽取和投遞進程后,需要分別將本地隊列文件路徑和目標端的隊列文件路徑和抽取進程進行綁定。
3.1.3 配置數(shù)據(jù)表define文件
ogg在傳輸數(shù)據(jù)的過程中,需要將傳輸?shù)臄?shù)據(jù)表的詳細定義信息發(fā)送到目標端。
首先,配置一個需要導出表定義文件的表清單,然后在在ogg根目錄下調(diào)用defgen指令執(zhí)行對應(yīng)的配置文件,即可自動生成數(shù)據(jù)表定義文件,然后將表定義文件發(fā)送到目的端的指定目錄下。
3.1.4 kafka目標端配置檢查點
kafka端配置好檢查點后,會自動記錄數(shù)據(jù)同步的當前進度,當程序中斷恢復的時候,將從檢查點表中記錄的最新狀態(tài)還原。
3.1.5 kafka目標端配置復制進程
源端通過抽取進程從數(shù)據(jù)表中抽取了數(shù)據(jù)文件,然后通過投遞進程發(fā)送到目的端。目的端接收到數(shù)據(jù)文件后,需要使用復制進程,將接收的數(shù)據(jù)文件插入到目標端指定的數(shù)據(jù)庫中,此處即將數(shù)據(jù)復制進kafka中,復制進程中設(shè)計詳細的參數(shù)有:
復制進程中需要指定從源端服務(wù)器上傳輸過來的表定義文件;然后定義kafka的詳細配置,以及對數(shù)據(jù)進入kafka指定了固定格式以及方式和復制任務(wù)的報告生成頻率;然后需要設(shè)置復制進程以事務(wù)傳輸時,事務(wù)合并的單位,用這種方式來減少IO操作;最后需要詳細配置源端與目標端的映射關(guān)系。
配置好復制進程后,同樣需要將隊列文件路徑綁定到復制進程上。
3.1.6 配置kafka.props
此處主要需要配置的參數(shù)為topicname,表示數(shù)據(jù)進入的topic名稱,以及數(shù)據(jù)進入topic的指定數(shù)據(jù)格式。
3.1.7 啟動全部進程
將上述配置好的抽取,投遞,復制進程逐一啟動,啟動任務(wù)后,數(shù)據(jù)將實時從源端數(shù)據(jù)庫接入到kafka中。
3.2 kafka數(shù)據(jù)實時接入mpp集群
將kafka topic中數(shù)據(jù)接入到數(shù)據(jù)倉庫中,首先需要將kafka的數(shù)據(jù)取出,需要定義一個消費者來消費topic中數(shù)據(jù)。然后在消費者的程序中將kafka中的數(shù)據(jù)解析成可以加載進入mpp集群的的數(shù)據(jù)格式,可以解析成純數(shù)據(jù)文件,也可以直接將數(shù)據(jù)換成sql語句。
首先,需要通過sparkStreaming來讀取kafka的數(shù)據(jù),這里存在兩種模式,是receiver-base和direct。兩者存在一定的區(qū)別。
Receiver模式首先創(chuàng)建一個receiver也就是接收器從kafka接收數(shù)據(jù)并存儲在Spark executor中,然后用觸發(fā)的方式去處理接收到的數(shù)據(jù)。為了不丟數(shù)據(jù),需要開啟WAL機制,這會將receiver接收到的數(shù)據(jù)寫一份備份到其他的存儲組件中去。
Direct模式,是定期查詢kafka中的每個partition的最新的offset,每個批次拉取上次處理的offset和當前查詢的offset的范圍的數(shù)據(jù)進行處理。這種模式為了保證數(shù)據(jù)傳輸?shù)目煽啃裕琽ffset是需要手動保存的。
這里,本文介紹的方法中,選用了direct模式來消費kafka。
在配置好連接kafka的功能后,將每個接受到的消息轉(zhuǎn)化為sparkStreaming rdd任務(wù),對每個rdd任務(wù)進行數(shù)據(jù)的解析。[4]
由于kafka中保存的ogg同步數(shù)據(jù)格式可以解析成json格式,獲取到j(luò)son數(shù)據(jù)后,可以通過數(shù)據(jù)中的操作類型來將數(shù)據(jù)還原成sql。
在完成sql的解析還原后,只需要通過執(zhí)行jdbc去執(zhí)行該條sql,就可以完成數(shù)據(jù)的入庫。
在整個過程中,通過創(chuàng)建streamingContext類來實現(xiàn)數(shù)據(jù)流的監(jiān)聽功能。[5]
這里通過終端等待的方式,來監(jiān)聽kafka中的數(shù)據(jù),當kafka中產(chǎn)生新的數(shù)據(jù)時,kafka就會給spark streaming發(fā)送signal,而spark程序則會以響應(yīng)的方式,立刻去消費kafka中新產(chǎn)生的數(shù)據(jù)。[2]但是這樣的過程是存在一定的延遲的。在實際的測試過程中,這樣的延遲可能會達到5s左右,對于數(shù)據(jù)倉庫實時數(shù)據(jù)接入的需求來看,這樣程度的延遲是可以接受的。
綜上,上述過程即為數(shù)據(jù)倉庫實時數(shù)據(jù)接入的一種實現(xiàn)方式。
4 結(jié)語
本文詳細介紹了,在傳統(tǒng)的實時數(shù)據(jù)傳輸工具(ogg)存在一定局限性的時候,首先分析了限制工具的影響因素,從中分析出了根本原因,由于mpp集群的主鍵數(shù)量限制,采用將數(shù)據(jù)先實時傳輸?shù)絢afka中,并且通過kafka實現(xiàn)了數(shù)據(jù)的可靠傳輸,然后通過spark Streaming實現(xiàn)了響應(yīng)式的數(shù)據(jù)入庫過程。最終以幾秒的延遲代價,解決了數(shù)據(jù)倉庫mpp集群的實時數(shù)據(jù)接入問題。
從這個技術(shù)路線向外延展,通過kafka中轉(zhuǎn),再到spark Streaming流處理。該條技術(shù)實現(xiàn)完全可以實現(xiàn)更多的數(shù)據(jù)處理與計算。通過spark Streaming,kafka中的數(shù)據(jù)完全可以流向更多的數(shù)據(jù)組件而不僅僅局限于本文所提到的mpp集群。
參考文獻
[1] 費秀宏.基于Kafka的日志處理平臺的研究[D].長春:吉林大學,2017.
[2] 薛瑞,朱曉民.基于Spark Streaming的實時日志處理平臺設(shè)計與實現(xiàn)[J].電信工程技術(shù)與標準化,2015(09):55-58.
[3] EfemG.Mallach.決策支持與數(shù)據(jù)倉庫系統(tǒng)[M].李昭智,李昭勇,譯.北京:電子工業(yè)出版社,2001.
[4] 黨壽江,劉學,王星凱,等.基于Spark Streaming的實時數(shù)據(jù)采集分析系統(tǒng)設(shè)計[J].網(wǎng)絡(luò)新媒體技術(shù),2017(05):48-53.
[5] 韓德志,陳旭光,雷雨馨,等.基于Spark Streaming的實時數(shù)據(jù)分析系統(tǒng)及其應(yīng)用[J].計算機運用,2017(05):1263-1269.