馬晟 劉雅倫 陳曉男 沈漪
摘? 要:城市的發(fā)展使得運(yùn)營車輛日益增長,車輛調(diào)度愈發(fā)困難,傳統(tǒng)系統(tǒng)無法滿足現(xiàn)有眾多車輛的監(jiān)控調(diào)度與運(yùn)營。該程序基于大數(shù)據(jù)流處理系統(tǒng),實現(xiàn)了大批量的車輛信息監(jiān)測和實時處理以及車輛的精細(xì)監(jiān)控與軌跡回放??捎糜诰W(wǎng)約車、公交車以及貨運(yùn)集團(tuán)的調(diào)度中心進(jìn)行實時監(jiān)控和訂單把控,以提高車輛調(diào)度的靈活性,達(dá)到最優(yōu)調(diào)度、減少成本的效果。
關(guān)鍵詞:SparkStreaming? 大數(shù)據(jù)? 軌跡回放? 交通
中圖分類號:TP31? ? ? ?文獻(xiàn)標(biāo)識碼:A
Abstract: With the development of the city, the number of operating vehicles is increasing, and the vehicle scheduling is becoming more and more difficult. The traditional system can not meet the monitoring, scheduling and operation of many existing vehicles. Based on the large data stream processing system, the program realizes a large number of vehicle information monitoring and real-time processing, as well as vehicle fine monitoring and track playback. It can be used for real-time monitoring and order control in the dispatching center of online car hailing, buses and freight groups, so as to improve the flexibility of vehicle scheduling, achieve optimal scheduling and reduce costs.
Key Words:SparkStreaming;Bigdata;Track playback;Traffic
隨著城市的發(fā)展,運(yùn)營車輛日益增長,車輛調(diào)度愈發(fā)困難,傳統(tǒng)系統(tǒng)無法滿足現(xiàn)有眾多車輛的監(jiān)控調(diào)度與運(yùn)營。基于大數(shù)據(jù)系統(tǒng)的車輛實時監(jiān)控與調(diào)度需求隨著大數(shù)據(jù)技術(shù)的日趨發(fā)展有了實現(xiàn)的可能。
1? 數(shù)據(jù)處理系統(tǒng)的設(shè)計
該系統(tǒng)實現(xiàn)對海量車輛軌跡數(shù)據(jù)的采集、存儲、實時處理、軌跡回放功能。軌跡數(shù)據(jù)在蓋亞數(shù)據(jù)平臺申請達(dá)到,編程模擬產(chǎn)生實時數(shù)據(jù)流,經(jīng)大數(shù)據(jù)平臺采集處理存入數(shù)據(jù)庫,然后在前臺顯示實時的車輛軌跡[1]。
1.1系統(tǒng)整體架構(gòu)
基于系功能需求,該系統(tǒng)的總體設(shè)計為:先由車輛端上傳坐標(biāo)數(shù)據(jù)(編程模擬產(chǎn)生),flume多源采集,然后寫入kafka的topic,接著通過SparkStreming實時消費(fèi)kafka,再根據(jù)訂單存入redis,最后實現(xiàn)訂單數(shù)據(jù)列表生成以及訂單車輛軌跡回放[2]。它異于傳統(tǒng)數(shù)據(jù)系統(tǒng)的地方是:采用大數(shù)據(jù)流處理框架,具有高吞吐率、高負(fù)載、高可用性、實時性高的優(yōu)點[3]。整個系統(tǒng)的邏輯實現(xiàn)如圖1所示。
1.2數(shù)據(jù)回放模塊設(shè)計
為了模擬真實業(yè)務(wù)場景,該程序基于蓋亞平臺坐標(biāo)數(shù)據(jù)通過數(shù)據(jù)回放模塊模擬數(shù)據(jù)流產(chǎn)生[4]。使用python讀出坐標(biāo)數(shù)據(jù),用多線程并行輸出,從而模擬實際場景中車輛移動匯報的坐標(biāo)打點數(shù)據(jù),達(dá)到采集流數(shù)據(jù)的需求[5]。
核心代碼邏輯如下所示。
#坐標(biāo)數(shù)據(jù)文件寫入
def consumer(queue, writer, csv_file):
while True:
line = queue.get()
deal_line(line, writer, csv_file)
queue.task_done()
#流數(shù)據(jù)文件生成
def producer(queue):
with open(‘test.txt’, ‘r’) as f:
for line in f:
queue.put(line)
queue = JoinableQueue(8)
pc = Process(target=producer, args=(queue,))
for _ in range(cpu_count()):
c1 = Process(target=consumer, args=(queue, writer, csv_file))
#等待生產(chǎn)者進(jìn)程全部生成完畢
pc.join()
#等待所有數(shù)據(jù)全部處理完畢
queue.join()
1.3數(shù)據(jù)采集消費(fèi)模塊設(shè)計
該模塊實現(xiàn)了通過flume采集車輛軌跡流數(shù)據(jù),進(jìn)而推送到消息隊列kafka中。
首先進(jìn)行flume數(shù)據(jù)采集,在采集過程中通過集群形式達(dá)到大數(shù)據(jù)量及多源數(shù)據(jù)采集情況下的負(fù)載均衡及并行采集。設(shè)置flume靜態(tài)攔截器實現(xiàn)在采集到的數(shù)據(jù)的頭數(shù)據(jù)中插入自定義的key-value鍵值對以區(qū)分不同數(shù)據(jù)源,主要配置如下:
a1.sources.r1.interceptors.i1.type = static? ? ? ? ? #設(shè)置靜態(tài)攔截器
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = test_gps_topic#不同的數(shù)據(jù)源取不同的名稱
接著通過kafka集群接收flume采集的大量數(shù)據(jù),以實現(xiàn)數(shù)據(jù)高吞吐率、高可用數(shù)據(jù)傳遞以及數(shù)據(jù)的實時處理,同時通過不同的topic保證不同數(shù)據(jù)流的分區(qū)。
flume監(jiān)聽的文件數(shù)據(jù)發(fā)送到此kafka的主題當(dāng)中,主要配置如下:
a1.sinks.k1.topic = test_gps_topic? ? ? ? ? ?#與前面的靜態(tài)攔截器value值配置相一致
1.4數(shù)據(jù)實時處理模塊設(shè)計
該模塊通過sparkStreaming程序?qū)崿F(xiàn)消費(fèi)kafka中的數(shù)據(jù)存到HBase中,其中的GPS位置經(jīng)緯度信息保存到redis中,存為后續(xù)實時監(jiān)控以及軌跡回放的數(shù)據(jù)源[6]。核心邏輯的Scala代碼如下:
//從kafka里消費(fèi)數(shù)據(jù),把經(jīng)緯度信息存到redis
val result: InputDStream[ConsumerRecord[String, String]] = Tools.getStreamingContextFromHBase
(streamingContext,kafkaParams,topics,group,"(.*)gps_topic")
result.foreachRDD(eachRdd =>{
eachRdd.foreachPartition(eachPartition =>{
val connection: Connection = HBaseUtil.getConnection
val jedis: Jedis = JedisUtil.getJedis
eachPartition.foreach(record =>{
Tools.saveToRedis(connection,jedis,record)
})
1.5軌跡回放模塊設(shè)計
得益于redis內(nèi)存數(shù)據(jù)庫高性能以及可持久化的穩(wěn)定性,該模塊實現(xiàn)回放每個訂單車輛軌跡同時并發(fā)實時讀取到前端,通過高德地圖提供的地圖api接口,訂單號為同一個key的value坐標(biāo)數(shù)據(jù)軌跡點按時間順序呈現(xiàn)在地圖上,從而監(jiān)控每條車輛訂單的車輛軌跡情況[7]。
2? 實驗驗證
2.1 實驗環(huán)境
該次實驗采用了一主二從的CDH集群,機(jī)器配置如圖2所示,集群角色配置如圖3所示。
2.2數(shù)據(jù)集
此次實驗數(shù)據(jù)集來自滴滴蓋亞數(shù)據(jù)平臺的開放數(shù)據(jù),形如表1所示。
首先是數(shù)據(jù)回放模塊的驗證,通過多線程輸出,flume采集源目錄,數(shù)據(jù)如期以多訂單并發(fā)每秒三條的流數(shù)據(jù)形式生成。其次是數(shù)據(jù)實時處理模塊,經(jīng)檢查redis數(shù)據(jù)庫,回放的流數(shù)據(jù)以秒級單位處理寫入到數(shù)據(jù)庫。最后是數(shù)據(jù)回放模塊的驗證,經(jīng)前端程序的讀取,車輛軌跡坐標(biāo)成功呈現(xiàn)在了高德地圖上。
3 結(jié)語
該系統(tǒng)實現(xiàn)了大規(guī)模軌跡數(shù)據(jù)的處理,數(shù)據(jù)的吞吐量、延遲性、精準(zhǔn)度已達(dá)到預(yù)期。程序通過Python模擬車輛軌跡數(shù)據(jù)流的產(chǎn)生,然后通過flume和kafka采集消費(fèi)數(shù)據(jù),sparkStreaming處理數(shù)據(jù)流,完成了模擬現(xiàn)實生活多車輛多數(shù)據(jù)流場景的數(shù)據(jù)產(chǎn)生、處理與軌跡回放。目前程序還停留在雛形階段,未來將在耦合度、靈活度上做出提高。
參考文獻(xiàn)
[1]? 楊小潤.基于深度學(xué)習(xí)的車輛軌跡特征識別與分析[D].南京:南京郵電大學(xué),2020.
[2]? 陸鍵,王可,蔣愚明.基于車輛行駛軌跡的道路不良駕駛行為實時辨識方法[J].交通運(yùn)輸工程學(xué)報,2020,20(6):227-235.
[3] 潘偉博,汪海濤,姜瑛,等.Hadoop集群異常節(jié)點實時檢測與診斷算法[J].陜西理工大學(xué)學(xué)報:自然科學(xué)版,2021,37(4):24-31.
[4]? 鮑裕麟.深度學(xué)習(xí)應(yīng)用場景下的HDFS性能優(yōu)化[D].合肥:中國科學(xué)技術(shù)大學(xué),2021.
[5] 謝楓,婁靜濤,趙凱,等.基于行為識別和曲率約束的車輛軌跡預(yù)測方法研究[J].汽車工程,2019,41(9):1036-1042.
[6] 柯杰.基于SparkStreaming日志實時監(jiān)測系統(tǒng)的設(shè)計與實現(xiàn)[D].南京:東南大學(xué),2017.
[7] 苗莉.大數(shù)據(jù)云計算環(huán)境下的數(shù)據(jù)安全[J].科技資訊,2021,19(2):31-33.