呂 悅
(公安部第一研究所,北京 100048)
隨著中共中央、國務院印發(fā)了《數(shù)字中國建設整體布局規(guī)劃》[1],以及國家數(shù)據(jù)局的成立,我國對數(shù)據(jù)治理和利用越來越重視?,F(xiàn)階段,數(shù)據(jù)已經(jīng)成為除土地、勞動力、資本、技術以外的又一項關鍵生產(chǎn)要素,數(shù)據(jù)的戰(zhàn)略價值越來越重要。當前各類企業(yè)中,隨著互聯(lián)網(wǎng)技術的發(fā)展,各信息系統(tǒng)中數(shù)據(jù)量呈現(xiàn)指數(shù)級增長。如何充分釋放海量數(shù)據(jù)的價值,進行數(shù)據(jù)資源的整合利用,逐漸成為各數(shù)據(jù)擁有者關注的重點。
在傳統(tǒng)企業(yè)中,各類業(yè)務需求冗雜,內部信息系統(tǒng)獨立而分散,缺失統(tǒng)一的異構數(shù)據(jù)集成平臺。傳統(tǒng)數(shù)據(jù)開發(fā)模式存在開發(fā)效率低下、相同開發(fā)流程重復建設、各流程需要不同的人工干預等問題[2],須建立一套具備支持異構系統(tǒng)、不同數(shù)據(jù)開發(fā)流程的企業(yè)級數(shù)據(jù)集成平臺。本文基于消息隊列Kafka和開源數(shù)據(jù)同步工具DataX,設計并實現(xiàn)了異構數(shù)據(jù)源之間、可配置開發(fā)流程的企業(yè)級異構數(shù)據(jù)集成平臺。
傳統(tǒng)的數(shù)據(jù)開發(fā)存在以下問題:
1) 各信息系統(tǒng)數(shù)據(jù)源復雜。數(shù)據(jù)類型可分為結構化和非結構化。結構化的數(shù)據(jù)包括MySQL、Oracle等關系型數(shù)據(jù)庫、Excel 文件、xml 文件、JSON 文件等。非結構化數(shù)據(jù)包括視頻、音頻、圖片等。
2) 各項業(yè)務數(shù)據(jù)的集成過程中,存在不同的開發(fā)流程,需單獨配置。由于不同的數(shù)據(jù)存在不同的網(wǎng)絡結構中,如各業(yè)務專網(wǎng)、內外網(wǎng)、DMZ區(qū)等。在數(shù)據(jù)的傳輸過程中,需要額外的配置。同時,不同數(shù)據(jù)的清洗邏輯存在業(yè)務耦合性,需要專業(yè)人員進行邏輯處理。
3) 由于企業(yè)內部各項數(shù)據(jù)種類繁雜,數(shù)據(jù)集成任務多樣,需建立任務監(jiān)控告警流程,對異常任務信息及時告知相關數(shù)據(jù)開發(fā)人員和業(yè)務負責人員。同時,對異常任務記錄并分析出錯原因,建立常見問題庫。
因此,急需建立異構數(shù)據(jù)源、數(shù)據(jù)開發(fā)流程可配置、數(shù)據(jù)任務可監(jiān)控的異構數(shù)據(jù)集成平臺。該平臺應設置備份機制,提供穩(wěn)定的數(shù)據(jù)集成服務,同時采取訪問控制、傳輸加密等安全方案,保障安全性,并支持動態(tài)功能擴展,滿足各類企業(yè)級業(yè)務需求。
通過對現(xiàn)有數(shù)據(jù)集成工具進行調研,發(fā)現(xiàn)常用開源工具包括DataX[3]、Kettle、Apache Sqoop[4]等。
DataX 是阿里巴巴開源的數(shù)據(jù)同步工具,可實現(xiàn)多種數(shù)據(jù)源之間的數(shù)據(jù)同步,如MySQL、Oracle、HDFS、HBase、Hive、Kafka、ES等各種不同類型的數(shù)據(jù)源。DataX 采用了Framework+Plugin 的架構設計,是一種可插拔式的架構設計方式。針對每種類型的數(shù)據(jù)源,DataX 實現(xiàn)讀插件Reader 和寫插件Writer。在數(shù)據(jù)同步時,對各種數(shù)據(jù)源讀寫插件的兩兩組合就可以實現(xiàn)相互同步,架構設計如圖1所示[5]。按照DataX的規(guī)范,將Reader 與Writer 的配置編寫成JSON 腳本文件,然后利用DataX命令就可以實現(xiàn)數(shù)據(jù)同步操作。在進行數(shù)據(jù)同步時,Reader負責將源數(shù)據(jù)表中的數(shù)據(jù)按照用戶的配置以規(guī)定的格式讀取到內存中,然后Writer將內存中的數(shù)據(jù)按照用戶的配置寫入到目的數(shù)據(jù)表中,并可設置Transformer 轉換自定義業(yè)務邏輯。此外,DataX可單獨部署,只需提供JDK與Python環(huán)境即可,并不依賴于其他環(huán)境。
圖1 DataX架構
Kettle 是一款純Java 編寫的開源ETL 工具,可實現(xiàn)對不同數(shù)據(jù)源之間的讀取、轉換以及寫入等操作。Kettle 可對不同數(shù)據(jù)源之間進行數(shù)據(jù)讀取、轉換和寫入,如關系型數(shù)據(jù)庫、非關系型數(shù)據(jù)庫HDFS、HTTP接口、中間件Kafka 等。Kettle 提供可視化的界面,通過對已封裝組件拖拉拽的方式,編排數(shù)據(jù)各個處理流程,實現(xiàn)復雜業(yè)務邏輯的數(shù)據(jù)操作[6]。但是,Kettle 工具本身相比其他工具不夠輕量級,并且編排任務的操作相對復雜,學習成本較高。
Sqoop 是Apache 軟件基金會下的一款開源數(shù)據(jù)同步工具,是命令行界面的應用程序,支持在各種關系型數(shù)據(jù)庫與大數(shù)據(jù)集群之間完成數(shù)據(jù)同步操作,如MySQL和Hive之間進行數(shù)據(jù)同步。Sqoop的實現(xiàn)原理是將用戶輸入的命令轉換成MapReduce任務,然后執(zhí)行該MapReduce任務即可完成數(shù)據(jù)同步操作。因此,Sqoop 需要依賴Hadoop 環(huán)境才能正常運行。而一套大數(shù)據(jù)hadoop 環(huán)境的搭建、運維、性能調優(yōu)等工作成本較高。
由于Sqoop 依賴于大數(shù)據(jù)環(huán)境和Kettle 的學習,成本較高、不夠輕量級,本文選擇DataX作為底層數(shù)據(jù)集成工具。雖然Datax每次執(zhí)行數(shù)據(jù)集成任務只能完成單次數(shù)據(jù)同步,然而將其納入Quartz[7]定時管理,不但實現(xiàn)了數(shù)據(jù)同步的自主進行,還解決了數(shù)據(jù)增量時的同步問題。
另外,為了彌補DataX的延時性,可通過消息隊列實時接收各種數(shù)據(jù)集成通知,并根據(jù)通知內容,觸發(fā)對應數(shù)據(jù)集成任務。消息隊列是一種應用程序之間的通信方案。應用程序通過讀寫出入隊列的消息來通信,而無須專用連接。其中,Kafka是基于發(fā)布/訂閱模式的分布式消息隊列,是應用最廣泛的消息中間件之一[8]。由于其分布式、易于擴展的特點,相比于傳統(tǒng)的消息系統(tǒng)有著巨大優(yōu)勢。其內部支持多訂閱模式,自動平衡消費者與生產(chǎn)者,為發(fā)布和訂閱提供高吞吐量,同時提供消息的持久化選項。另外,在接收數(shù)據(jù)集成通知的數(shù)據(jù)量較多時,Kafka 還可對數(shù)據(jù)進行流量削峰,降低服務器壓力。
因此,結合以上需求分析和技術調研,為了向用戶提供便捷統(tǒng)一數(shù)據(jù)集成服務,采用微服務Spring Boot 框架[9]和Spring Cloud[10]框架,應用開源數(shù)據(jù)開發(fā)工具DataX 和消息隊列Kafka,實現(xiàn)具備高性能、大吞吐量、高可擴展、動態(tài)發(fā)布等特性的異構數(shù)據(jù)集成平臺。
如圖2所示,異構數(shù)據(jù)集成平臺的功能,共分為六大模塊,分別是數(shù)據(jù)源管理、集成任務設計、任務調度、告警管理、用戶管理和數(shù)據(jù)大屏。
圖2 異構數(shù)據(jù)集成平臺功能設計
數(shù)據(jù)源管理,主要用于對不同數(shù)據(jù)源信息的錄入、更新、查詢、刪除等管理功能。根據(jù)數(shù)據(jù)類型,將數(shù)據(jù)源分為結構化和非結構化。其中,對于結構化數(shù)據(jù),常見的數(shù)據(jù)源包括傳統(tǒng)關系型數(shù)據(jù)庫,如,MySQL、Oracle、SQL Server 等。同時,HTTP 接口的形式更是數(shù)據(jù)共享的常用手段。對于非結構化數(shù)據(jù),常見的數(shù)據(jù)源為大數(shù)據(jù)集群、各種小眾數(shù)據(jù)庫、圖片、視頻、音頻等。在用戶錄入數(shù)據(jù)源時,針對關系型數(shù)據(jù)庫、大數(shù)據(jù)集群、HTTP 接口等方式,通過JDBC 連接、發(fā)送HTTP請求等方式,增加測試數(shù)據(jù)源聯(lián)通功能,對用戶填寫的url連接進行驗證,保證數(shù)據(jù)源的準確性。
集成任務設計,主要用于對不同的集成任務流程進行設計。通過對數(shù)據(jù)流轉過程的研究,數(shù)據(jù)集成任務設計為四大階段,即,數(shù)據(jù)抽取、數(shù)據(jù)清洗、數(shù)據(jù)轉換、數(shù)據(jù)加載。在數(shù)據(jù)抽取階段中,可選擇所需數(shù)據(jù)源中對應數(shù)據(jù)。對于關系型數(shù)據(jù)庫和大數(shù)據(jù)集群,可選擇所需數(shù)據(jù)表和對應字段內容。對于HTTP 接口,可增加不同查詢參數(shù),獲取模板JSON數(shù)據(jù)內容,并對模板JSON 數(shù)據(jù)進行解析,選擇所需字段內容。在數(shù)據(jù)清洗階段中,主要對抽取的數(shù)據(jù)進行簡單的邏輯清洗,如,增加數(shù)值范圍判斷、規(guī)范數(shù)據(jù)內容、補全缺失值等。在數(shù)據(jù)轉換階段中,主要對規(guī)范的數(shù)據(jù)進行復雜業(yè)務邏輯處理,增加了數(shù)據(jù)集成任務的靈活性。在數(shù)據(jù)加載階段中,主要對已完成業(yè)務邏輯處理的數(shù)據(jù)加載到目標數(shù)據(jù)源中,須確認處理完成的數(shù)據(jù)內容,并選擇目標數(shù)據(jù)源中對應表信息、字段信息。經(jīng)過以上四個階段后,標準的數(shù)據(jù)集成任務已被建立,可滿足大多數(shù)據(jù)集成任務需求。
任務調度,主要用于對各種數(shù)據(jù)集成任務設定調度計劃。數(shù)據(jù)集成任務的調度可分為定時任務和實時任務。對于定時的數(shù)據(jù)集成任務,通常根據(jù)用戶的業(yè)務需要,設定為每天、每小時、每時、每分等,編排對任務的調度計劃。對于有實時需求的數(shù)據(jù)集成任務,采用對消息隊列Kafka的集成,接收實時數(shù)據(jù)通知,選擇數(shù)據(jù)集成任務進行調度。
告警管理,主要用于對任務執(zhí)行情況進行監(jiān)控,并對異常任務發(fā)送告警內容。每次數(shù)據(jù)集成任務的執(zhí)行日志都可通過此模塊查看。用戶可對已開啟調度的數(shù)據(jù)集成任務設定告警配置,包括告警人、聯(lián)系方式、告警方式、異常任務信息、異常原因等。由于企業(yè)內部的網(wǎng)絡結構復雜,因此,告警方式包括郵箱通知、WebHook 告警、自定義告警等。自定義告警可根據(jù)企業(yè)網(wǎng)絡結構的設置,定制化集成告警功能包,解決網(wǎng)絡隔離的問題,進行告警內容的傳送。
用戶管理,主要用于對各類用戶的賬號、密碼、權限等進行配置。用戶包括管理員、平臺運維和普通用戶三種。普通用戶可使用數(shù)據(jù)源管理、集成任務設計、任務調度、任務告警等功能,進行各類數(shù)據(jù)的集成。由于不同的用戶屬于不同部門,各個部門下的各類數(shù)據(jù)源、數(shù)據(jù)集成任務、告警配置等信息,應只被當前部門下員工獲取和操作,所以,應對不同種類數(shù)據(jù)增加權限控制功能,防止不同部門下的用戶越權訪問和越權操作的行為。針對平臺運維,可檢查各類任務調度計劃和告警配置,修正不合理任務配置并通過平臺的告警配置通知到對應任務負責人。平臺運維還需定期清除各個模塊的無用數(shù)據(jù),保證平臺運行的準確性和穩(wěn)定性。
數(shù)據(jù)大屏,主要用于對異構數(shù)據(jù)集成平臺的整體展示,包括集成任務的執(zhí)行情況、任務告警情況、任務執(zhí)行成功率、任務集成配置、任務告警配置等信息。對于集成任務的執(zhí)行情況,采用條形圖,展示最近一周任務執(zhí)行成功和失敗的數(shù)量。對于任務告警情況,采用折線圖,展示最近一周內任務的告警總數(shù)。對于任務執(zhí)行成功率,采用儀表盤的形式展示。對于任務集成配置和告警配置,采用表格形式展示,并增加自動向上滾動的效果,展示多條數(shù)據(jù)信息。
在系統(tǒng)的實現(xiàn)上,采用SpringBoot 框架和微服務架構作為后端開發(fā)框架,采用vue作為前端開發(fā)框架,采用MySQL數(shù)據(jù)庫,對數(shù)據(jù)源管理、集成任務設計、任務調度、告警管理、用戶管理和數(shù)據(jù)大屏模塊,設計數(shù)據(jù)庫表,基于MySQL 保存各模塊數(shù)據(jù),采用Kafka 消息隊列,用于實時消息推送和訂閱。平臺部署采用Nginx 負載均衡,將用戶請求轉發(fā)到多臺應用服務器上,各臺服務器的運行環(huán)境為Tomcat8 和JDK1.8。整體架構如圖3所示。
對于數(shù)據(jù)源管理模塊,需要完成數(shù)據(jù)源錄入和檢測聯(lián)通的功能。通過JDBC 連接的方式,探測連接情況。JDBC連接是在Java程序與數(shù)據(jù)庫系統(tǒng)之間建立了一條通信的渠道,并為各種數(shù)據(jù)庫提供了連接數(shù)據(jù)庫的規(guī)范。JDBC 應用步驟如圖4 所示。平臺中實現(xiàn)JDBC 連接基類,提供JDBC 驅動、數(shù)據(jù)庫連接等抽象方法,對MySQL、Oracle、SQL Server 等常用數(shù)據(jù)庫實現(xiàn)連接子類,可直接通過url、賬戶名、密碼等配置獲取數(shù)據(jù)庫內容,讀取對應數(shù)據(jù)表結構和存儲內容。當存在小眾數(shù)據(jù)庫連接需求時,可實現(xiàn)基類,自定義數(shù)據(jù)庫連接、數(shù)據(jù)庫驅動和讀取數(shù)據(jù)方式等。
圖4 JDBC連接流程
對于集成任務設計模塊,用戶可自定義編排數(shù)據(jù)集成任務,步驟包括選擇數(shù)據(jù)源、選擇源數(shù)據(jù)表和字段、選擇目的數(shù)據(jù)源、選擇目的數(shù)據(jù)表和字段、增加轉換邏輯等,自動生成調用datax 任務所需的JSON 文件,并存至數(shù)據(jù)庫。平臺以依賴的形式引入datacommon 和data-core 包,集成DataX 執(zhí)行器,用于執(zhí)行DataX任務。并在需要調用數(shù)據(jù)集成任務時,執(zhí)行Engine.entry(params),其中params 用于配置必要參數(shù),包括,任務job、任務模式mode、任務編號jobID 等。另外,也可以增加動態(tài)參數(shù),設定任務執(zhí)行中用戶自定義的參數(shù)內容和轉換邏輯;通過Communication 類,收集每次job運行之后的日志信息,并存儲到日志表中,另外,對于異常運行的任務,應立即執(zhí)行對應告警配置,通知到運維人員和任務開發(fā)人員。整體實現(xiàn)如圖5所示。
圖5 集成任務技術實現(xiàn)
對于任務調度模塊,用戶可對設計完畢的數(shù)據(jù)集成任務設定調度策略。任務調度分為定時和實時兩種策略。兩種任務均繼承任務執(zhí)行類TaskRunner。定時調度,是以cron 表達式的方式,配置定時執(zhí)行方法,并通過Hutool中的定時CronUtil工具包,讀取解析cron表達式,設置定時任務執(zhí)行方式和具體任務執(zhí)行函數(shù)。實時調度,是以集成消息隊列Kafka的形式,由業(yè)務人員作為Kafka 的生產(chǎn)者端發(fā)送通知,統(tǒng)一數(shù)據(jù)集成平臺作為Kafka 中的消費者端接收實時通知,執(zhí)行相關數(shù)據(jù)集成任務。在生產(chǎn)者端,注入KafkaTemplate 模板,調用send 函數(shù),發(fā)送調度命令消息到指定主題,主題名即為任務名;在消費者端使用注解@KafkaListener,監(jiān)聽指定主題的消息,即通過任務名和調度命令消息,之后調用指定數(shù)據(jù)集成任務的執(zhí)行函數(shù)。整體技術實現(xiàn)如圖6所示。
圖6 任務調度技術實現(xiàn)
對于告警管理模塊,一旦出現(xiàn)異常任務執(zhí)行,通常采用郵箱告警方式通知運維人員和開發(fā)人員。郵箱告警實現(xiàn)方式為,在項目中引入javax.mail 郵件依賴,通過MimeMessageHelper 類設置郵件發(fā)件人、收件人、郵件主題、發(fā)送內容等。對于用戶管理模塊,通過Spring Security + OAuth 實現(xiàn)安全認證,設計用戶權限表、系統(tǒng)權限表,記錄各用戶對各模塊數(shù)據(jù)的訪問權限。同時,針對各個模塊的數(shù)據(jù)表,增加創(chuàng)建人字段,記錄數(shù)據(jù)的所屬情況。對于數(shù)據(jù)大屏,用于對統(tǒng)一數(shù)據(jù)集成平臺的整體展示,采用Echarts庫,利用折線圖、Sanky 圖、柱狀圖、散點圖,可視化展示整體數(shù)據(jù)集成情況,包括集成任務基本信息、執(zhí)行成功率、任務告警情況等。
數(shù)據(jù)的重要性已經(jīng)不言而喻。傳統(tǒng)企業(yè)發(fā)揮數(shù)據(jù)價值的第一步就是將來自不同數(shù)據(jù)源、不同業(yè)務領域、不同信息系統(tǒng)的數(shù)據(jù)集成匯聚。而傳統(tǒng)企業(yè)的數(shù)據(jù)開發(fā)模式效率低下,流程煩瑣。同時,不同企業(yè)內部的業(yè)務需求、網(wǎng)絡結構、開發(fā)習慣等不同,需要定制化數(shù)據(jù)集成任務。因此,本文通過開源數(shù)據(jù)開發(fā)工具DataX 和消息隊列Kafka,基于SpringBoot 框架和微服務架構,設計并實現(xiàn)了異構數(shù)據(jù)集成平臺,可靈活配置各類數(shù)據(jù)集成任務,簡化數(shù)據(jù)開發(fā)流程,顯著降低數(shù)據(jù)集成成本,提高人員效率。但本系統(tǒng)仍存在一定不足,例如,對非結構化的圖片、視頻等數(shù)據(jù)的支持度不夠,對大數(shù)據(jù)任務集成的設計略少。下一步可將非結構化數(shù)據(jù)識別、大數(shù)據(jù)任務的構建作為研究的重點。