安建民,周一波,彭送庭
(英大泰和人壽保險股份有限公司,北京 100089)
統(tǒng)計分析系統(tǒng)是公司各級管理人員了解前一日公司經(jīng)營情況的重要系統(tǒng)。該系統(tǒng)通過數(shù)據(jù)抽取軟件,每日批處理生成各種統(tǒng)計指標(biāo)。隨著互聯(lián)網(wǎng)的高速發(fā)展,數(shù)據(jù)規(guī)模的不斷增加,數(shù)據(jù)變化速度越來越快[1],用戶希望得到更快的數(shù)據(jù)處理和響應(yīng)時間[2],因此建設(shè)數(shù)據(jù)實時展示系統(tǒng),實現(xiàn)對指標(biāo)數(shù)據(jù)的實時計算變得越來越重要。
為讓公司各級管理人員能夠?qū)崟r看到指標(biāo)數(shù)據(jù),本文設(shè)計的保險大寬表系統(tǒng),利用分布式流計算技術(shù)[3]解決了在分布式環(huán)境下數(shù)據(jù)處理的一致性問題[4]、在數(shù)據(jù)從源數(shù)據(jù)庫Oracle同步到MPP數(shù)據(jù)庫的時效問題以及在界面展示時數(shù)據(jù)的主動推送問題。對比統(tǒng)計分析系統(tǒng)是以T+1[5]的模式對數(shù)據(jù)進行分析處理,無法實時查看最新保單數(shù)據(jù)信息,該系統(tǒng)極大地縮短了數(shù)據(jù)處理時間,實現(xiàn)了保險數(shù)據(jù)基本指標(biāo)T+0模式下的數(shù)據(jù)實時同步和計算[6]。
本文主要采用分布式流計算實現(xiàn)個險渠道大寬表保單數(shù)據(jù)實時計算、多表關(guān)聯(lián)和分布式處理。通過采用分布式流計算技術(shù),應(yīng)用Kafka和Spark Streaming[7]實現(xiàn)分布式并行處理,提升系統(tǒng)計算性能和高可用性[8];應(yīng)用MPP數(shù)據(jù)庫,提高了對海量數(shù)據(jù)的分析處理[9]和并行數(shù)據(jù)計算能力,讓業(yè)務(wù)人員能即時知曉最新保單和匯總數(shù)據(jù)的變化情況。
圖1詳細展示了保險大寬表系統(tǒng)設(shè)計框架。
圖1 保險大寬表系統(tǒng)設(shè)計框架
保險大寬表系統(tǒng)通過采用OGG(oracle golden gate)源端去抽取和投遞保單變化的所有數(shù)據(jù)信息。OGG-BD(OGG-bigdata)端會通過復(fù)制進程把發(fā)送過來的Trail file數(shù)據(jù)存入到分布式消息隊列Kafka中。傳統(tǒng)保險系統(tǒng)數(shù)據(jù)庫應(yīng)用以O(shè)racle為主,在進行數(shù)據(jù)采集和遷移過程中,對數(shù)據(jù)業(yè)務(wù)的處理有很高的時效性要求。OGG是一種成熟的數(shù)據(jù)遷移產(chǎn)品,可以在異構(gòu)的基礎(chǔ)上實現(xiàn)大量數(shù)據(jù)的秒級數(shù)據(jù)采集、轉(zhuǎn)換和投遞。通過解析源數(shù)據(jù)庫在線日志或歸檔日志獲得數(shù)據(jù)的增、刪、改變化,再將這些變化應(yīng)用到目標(biāo)數(shù)據(jù)庫,實現(xiàn)源數(shù)據(jù)庫與目標(biāo)數(shù)據(jù)庫的同步和遷移。
本文針對海量、多源、處理效率要求高的保險業(yè)務(wù)數(shù)據(jù)特點設(shè)計了基于Kafka和Spark Stream?ing的分布式并行處理方案。Kafka是具有高吞吐量的分布式消息訂閱和發(fā)布系統(tǒng)[10]。主要由Pro?ducer(生產(chǎn)者)、Broker(代理)和Consumer(消費者)三大部分構(gòu)成[11]。其中生產(chǎn)者負責(zé)將收集到的數(shù)據(jù)推送到代理,而代理負責(zé)接收這些數(shù)據(jù)信息,并將這些數(shù)據(jù)本地持久化。消費者則直接對這些數(shù)據(jù)進行處理[12]。
在技術(shù)上,本文應(yīng)用SharePlex監(jiān)控Oracle日志文件,實時獲取數(shù)據(jù)庫的操作消息,獲取數(shù)據(jù)庫系統(tǒng)中增、刪、改的數(shù)據(jù)。同時應(yīng)用Kafka記錄監(jiān)控的數(shù)據(jù)信息,Kafka進行消息隊列分發(fā),將數(shù)據(jù)推送到下方進行處理加工操作。
Spark Streaming主要是用來抽取數(shù)據(jù),對數(shù)據(jù)進行并行計算處理。
Spark Streaming是基于Spark上用于處理實時計算業(yè)務(wù)的框架,其實現(xiàn)是把輸入的流數(shù)據(jù)進行切分,切分的數(shù)據(jù)塊用批處理方式進行并行計算處理。
基本的分布式流處理框架包括數(shù)據(jù)接入層、消息緩存層、流處理業(yè)務(wù)層和集群服務(wù)[13]。Kafka主要作用于消息緩存層,Spark Streaming主要作用于流處理業(yè)務(wù)層。如圖2所示。
圖2 流數(shù)據(jù)處理基礎(chǔ)框架
從圖2可以看出,存儲過來的數(shù)據(jù)指將數(shù)據(jù)源加載到消息隊列的過程。對于Kafka來說,是生產(chǎn)者將數(shù)據(jù)載入消息隊列,并解耦數(shù)據(jù)的生產(chǎn)方和使用方,對數(shù)據(jù)進行消息緩存,重建分布式查詢系統(tǒng),提供增量數(shù)據(jù)加載接口模式。通過這種情況可以分布到多個節(jié)點上,不同的數(shù)據(jù)節(jié)點,將其對應(yīng)的數(shù)據(jù)源以消息的形式發(fā)送到對應(yīng)的節(jié)點上。流處理業(yè)務(wù)層負責(zé)消費消息隊列中的數(shù)據(jù),對這些數(shù)據(jù)進行分析處理并得到相應(yīng)的結(jié)果,解決在數(shù)據(jù)載入過程中數(shù)據(jù)庫中可能存在的性能劣勢問題[14]。
Spring Boot主要是用于多層架構(gòu)體系的模型業(yè)務(wù)層,具有降低多層模塊間的耦合性、分層模塊化架構(gòu)應(yīng)用業(yè)務(wù)系統(tǒng)的優(yōu)勢[15]。
保險大寬表系統(tǒng)通過Spring Boot和Spark Streaming實時消費Kafka中的數(shù)據(jù),對數(shù)據(jù)做質(zhì)控、關(guān)聯(lián)的操作,最終形成大寬表的數(shù)據(jù)。Spark Streaming從Kafka消息隊列中按照時間窗口不斷提取數(shù)據(jù),然后進行批處理[16],其主要是對保險大寬表系統(tǒng)里的保單數(shù)據(jù)進行統(tǒng)計分析。然后對處理的數(shù)據(jù)結(jié)果進行存儲,以保單號和時間字符串為key進行存儲,主要存儲到Redis中,如圖3所示。本系統(tǒng)中數(shù)據(jù)質(zhì)控分析中的一些字典信息是通過Redis內(nèi)存緩存來提高查詢分析的效率。在關(guān)聯(lián)過程中,首先判斷已有的Redis里是否已經(jīng)存在這些數(shù)據(jù)信息,如果存在則直接調(diào)用,如果不存在,通過查找數(shù)據(jù)庫里面的內(nèi)容并將其存入Redis且設(shè)置好TTL,方便下次可以直接在Redis中調(diào)用,而不需要再去數(shù)據(jù)庫中進行查找,提高了查詢效率。
圖3 Redis存儲數(shù)據(jù)
保險大寬表系統(tǒng)數(shù)據(jù)存儲采用MPP數(shù)據(jù)庫,其擁有海量的計算能力、容錯能力及優(yōu)秀的擴展性。不僅如此,它還可以讓所有數(shù)據(jù)分布到每個節(jié)點上,使得每個節(jié)點去計算自己的部分數(shù)據(jù),達到并行處理無需人工干預(yù)的目的,并可以通過增加通用硬件去擴充新的計算節(jié)點。
針對普通的節(jié)點,可以將一份數(shù)據(jù)分布在多個節(jié)點上,由此避免由于單個節(jié)點出現(xiàn)故障而導(dǎo)致數(shù)據(jù)丟失和數(shù)據(jù)不可用現(xiàn)象的發(fā)生,對于管理節(jié)點而言,一般也采用高可用設(shè)計,避免單點故障,提高了對數(shù)據(jù)查詢和分析的效率。
將數(shù)據(jù)實時發(fā)送給系統(tǒng)前端,保險大寬表系統(tǒng)使用WebSocket協(xié)議作為服務(wù)化接口。Web?Socket作為一個獨立在TCP上的協(xié)議,本質(zhì)是基于TCP為客戶端和服務(wù)器提供一種socket通信連接,使得客戶端和服務(wù)器端實現(xiàn)雙向通信[17]。對比傳統(tǒng)實時數(shù)據(jù)更新方案,WebSocket[18]可極大地減少網(wǎng)絡(luò)流量與延遲[19]。
保險大寬表系統(tǒng)中所有數(shù)據(jù)采用push方式進行推送,服務(wù)器接收到數(shù)據(jù)后會立即發(fā)送到客戶端上??蛻舳撕头?wù)器之間通過進程創(chuàng)建基于WebSocket技術(shù)的通信連接,系統(tǒng)就能利用服務(wù)器的推送功能實現(xiàn)對保險數(shù)據(jù)指標(biāo)的實時查看。
在采用分布式技術(shù)提高數(shù)據(jù)計算和系統(tǒng)本身的性能時,需要考慮到數(shù)據(jù)的一致性問題,比如同一份保單數(shù)據(jù)在很短的時間內(nèi)先后發(fā)生兩次變更,兩次變更后的數(shù)據(jù)分別是U1和U2。在分布式環(huán)境下,U1和U2可能被隨機分配到不同機器的不同線程上執(zhí)行,執(zhí)行的順序可能會變成先U2后U1,這樣數(shù)據(jù)的一致性無法得到保證,導(dǎo)致目標(biāo)數(shù)據(jù)庫的數(shù)據(jù)發(fā)生錯誤。
基于以上問題,保險大寬表系統(tǒng)利用Kafka同一個Topic的同一個partition順序消費且同一個Topic的同一個partition只能被同一個線程消費數(shù)據(jù)的特點。處理程序時,首先把OGG-Bigdata發(fā)送到Kafka的數(shù)據(jù)根據(jù)數(shù)據(jù)特征分類到同一張保單上,再根據(jù)保單編號為key通過shuffle動作發(fā)布到另外一個Topic,這樣就保證了同一個保單編號的數(shù)據(jù)肯定會分配到同一個partition,處理程序處理shuffle后的Topic,從而保證了數(shù)據(jù)的一致性。如圖4所示。
圖4 數(shù)據(jù)一致性處理過程
在保險大寬表系統(tǒng)中,通過利用Kafka實時收集數(shù)據(jù)并傳到Spark Streaming可進行實時分析,Spark Streaming可以將接收到的數(shù)據(jù)匯總成多個小數(shù)據(jù)集,Spark Streaming每次處理的是一個時間窗口的數(shù)據(jù)流,實質(zhì)上是對這些數(shù)據(jù)進行批量的實時處理[20],如圖5所示。這一特點可以很好的避免并發(fā)數(shù)據(jù)處理中頻繁的任務(wù)分配和調(diào)度問題,能達到次秒級延時的實時處理。
圖5 數(shù)據(jù)實時處理
當(dāng)數(shù)據(jù)從中轉(zhuǎn)數(shù)據(jù)庫導(dǎo)入到MPP數(shù)據(jù)庫中時,關(guān)鍵部分是要保證數(shù)據(jù)同步。MPP數(shù)據(jù)庫具備高性能、高可用和高擴展特性,但其對數(shù)據(jù)的增刪操作比較差,在并發(fā)量大的情況下,性能會更低。因為每天保單系統(tǒng)上都會有大量的變化數(shù)據(jù)產(chǎn)生,從源數(shù)據(jù)庫中采集而得到的數(shù)據(jù)直接導(dǎo)出文本在進行導(dǎo)入會占用很大的空間,耗時長,導(dǎo)致數(shù)據(jù)時效性差,不能達到數(shù)據(jù)實時同步。
保險大寬表系統(tǒng)通過利用Batch(批處理)操作,先定期刪除發(fā)生變化的數(shù)據(jù),把這些變化的數(shù)據(jù)導(dǎo)出為CSV格式文件,通過load操作解決數(shù)據(jù)在增加和更改過程中效率慢的問題,實現(xiàn)數(shù)據(jù)的近實時變化。
本研究通過分析目前保險系統(tǒng)中業(yè)務(wù)人員不能實時查看當(dāng)前保單系統(tǒng)的保單信息和各個指標(biāo)匯總數(shù)據(jù),給出了一種基于分布式流計算實現(xiàn)一個流式大寬表數(shù)據(jù)系統(tǒng),可以實時采集保單系統(tǒng)各項數(shù)據(jù)的變化。通過將Kafka和Spark Streaming結(jié)合的方式實時關(guān)聯(lián)多表數(shù)據(jù),處理數(shù)據(jù)增刪改變化,最后存儲到MPP數(shù)據(jù)庫中,能夠讓保險業(yè)務(wù)人員實時查看最新保單數(shù)據(jù)信息及各個保單指標(biāo)匯總信息。