張敬環(huán),張志東,劉 鑫
(山西工程技術(shù)學(xué)院,山西 陽(yáng)泉 045097)
隨著經(jīng)濟(jì)的發(fā)展和生活水平的提高,空氣質(zhì)量受到人們前所未有的關(guān)注。為了實(shí)時(shí)監(jiān)測(cè)我校的空氣質(zhì)量,通過(guò)部署的傳感器每隔五分鐘傳輸一次空氣中各個(gè)污染物的含量數(shù)據(jù)至精簡(jiǎn)云儲(chǔ)存,其中主要包括溫度、濕度、PM2.5、PM10、NO2、O3、SO2、CO。通過(guò)在Python中引入Selenium庫(kù)虛擬登錄的方式從精簡(jiǎn)云平臺(tái)爬取這些數(shù)據(jù),并將數(shù)據(jù)存入MySQL數(shù)據(jù)庫(kù)中,然后利用PySpark連接數(shù)據(jù)庫(kù)使用SparkSQL對(duì)總數(shù)據(jù)進(jìn)行分析及拆分,最后用ECHarts將分析后的數(shù)據(jù)進(jìn)行可視化,為我校師生提供全面、準(zhǔn)確、及時(shí)的出行建議與預(yù)警。系統(tǒng)設(shè)計(jì)流程圖如圖1所示。
圖1 系統(tǒng)設(shè)計(jì)流程
首先,在我校選取測(cè)試點(diǎn)部署無(wú)線傳感器并接入精訊云平臺(tái)http://www.sennor.net。精訊云是一款通用性的物聯(lián)網(wǎng)云平臺(tái),部署在騰訊云服務(wù)器上,可以實(shí)現(xiàn)全球任意地方都快速訪問(wèn)與接入[1]。在Python中使用Selenium庫(kù)模擬人為使用瀏覽器的行為,訪問(wèn)與傳感器關(guān)聯(lián)的精訊云賬戶(hù),再通過(guò)模擬點(diǎn)擊的方式在云平臺(tái)中找到所需的數(shù)據(jù)進(jìn)行爬取。并將爬取到的數(shù)據(jù)存入Mysql數(shù)據(jù)庫(kù)中的“jilu”表中。
表1 jilu表結(jié)構(gòu)
空氣質(zhì)量監(jiān)測(cè)的傳感器收集的是各個(gè)空氣污染物在空氣中的含量,并不能很直觀客觀地表現(xiàn)出空氣質(zhì)量的好壞,根據(jù)國(guó)家制定的《環(huán)境空氣質(zhì)量標(biāo)準(zhǔn)》(AQI分級(jí)計(jì)算參考標(biāo)準(zhǔn)GB 3095—2012),構(gòu)造AQI計(jì)算函數(shù)計(jì)算出各污染物的AQI并與爬取到的原始數(shù)據(jù)一同存入數(shù)據(jù)庫(kù)中的總數(shù)據(jù)表中,即表1中的字段aqi。與污染物濃度相對(duì)應(yīng)的空氣質(zhì)量分指數(shù)計(jì)算公式見(jiàn)(1)式。
(1)
由于計(jì)算每個(gè)污染物對(duì)應(yīng)的IAQI時(shí)所用到的計(jì)算公式是同一個(gè),所以本系統(tǒng)為簡(jiǎn)化計(jì)算復(fù)雜度故將計(jì)算公式封裝為一個(gè)計(jì)算函數(shù)。主要代碼如下:
def js_iaqi(iaqi_lo, iaqi_hi, bp_lo, bp_hi, cp):
iaqi = (iaqi_hi - iaqi_lo) * (cp - bp_lo) / (bp_hi - bp_lo) + iaqi_lo
#代碼化計(jì)算公式
return iaqi
由于計(jì)算單個(gè)污染物對(duì)應(yīng)的AQI時(shí)所用到的濃度對(duì)應(yīng)標(biāo)準(zhǔn)是不同的,所以對(duì)每個(gè)污染物的計(jì)算IAQI函數(shù)均為單獨(dú)制作并引入相同的計(jì)算公式,無(wú)法同計(jì)算公式一樣封裝至同一個(gè)函數(shù)中。主要代碼如下:
def js_pm10_iaqi_24h(pm10):
if 0 <= pm10 < 50:
pm10_iaqi = js_iaqi(0,50,0,50,pm10) #各濃度對(duì)應(yīng)參數(shù)
elif 50 <= pm10 <100:
pm10_iaqi = js_iaqi(50,100,50,150,pm10)
elif 100 <= pm10 <150:
pm10_iaqi = js_iaqi(100,150,150,250,pm10)
elif 150 <= pm10 <200:
pm10_iaqi = js_iaqi(150,200,250,350,pm10)
elif 200 <= pm10 <300:
pm10_iaqi = js_iaqi(200,300,350,420,pm10)
elif 300 <= pm10 <400:
pm10_iaqi = js_iaqi(300,400,420,500,pm10)
elif 400 <= pm10 <500:
pm10_iaqi = js_iaqi(400,500,500,600,pm10)
elif pm10 < 0:
return -1
else:
pass
return pm10_iaqi #返回該污染物對(duì)應(yīng)IAQI
由于空氣質(zhì)量監(jiān)測(cè)系統(tǒng)的傳感器每五分鐘就會(huì)收集并傳輸一次數(shù)據(jù),數(shù)據(jù)增長(zhǎng)速度非??欤笃跀?shù)據(jù)量會(huì)越來(lái)越大,如果每次都從數(shù)據(jù)庫(kù)中的總數(shù)據(jù)表來(lái)進(jìn)行數(shù)據(jù)分析的話分析效率會(huì)非常低,因此將總的數(shù)量龐大的數(shù)據(jù)進(jìn)行提前拆分,依據(jù)需求分割成不同的表。
1) 時(shí)AQI表(表2:hour_aqi)——該表為數(shù)據(jù)總表里提取出每小時(shí)AQI的最高值,并記錄該時(shí)間與AQI值存入時(shí)AQI表中;
2) 日AQI表(表3:day_aqi)——該表為數(shù)據(jù)總表中提取出每天AQI的最大值,作為當(dāng)天的AQI的值,并記錄下來(lái)存入日AQI表中;
3) 月AQI表(表4:month_aqi)——該表為數(shù)據(jù)總表中提取出每月AQI的最大值,作為該月的AQI的值,并記錄下來(lái)存入月AQI表中。
表2 hour_aqi表結(jié)構(gòu)
表3 day_aqi表結(jié)構(gòu)
表4 month_aqi表結(jié)構(gòu)
將計(jì)算得到的各個(gè)IAQI計(jì)算出最大值作為AQI并返回[2]。主要代碼如下:
def aqi(l):
pm10_iaqi_24h = js_pm10_iaqi_24h(float(l[6]))
pm2_iaqi_24h = js_pm2_iaqi_24h(float(l[5]))
no2_iaqi_24h = js_no2_iaqi_24h(float(l[7]))
so2_iaqi_24h = js_so2_iaqi_24h(float(l[9]))
o3_iaqi_8h = js_o3_iaqi_8h(float(l[8]))
co_iaqi_24h =js_co_iaqi_24h(float(l[10]))
AQI = max(so2_iaqi_24h, no2_iaqi_24h, pm10_iaqi_24h, co_iaqi_24h, o3_iaqi_8h, pm2_iaqi_24h)
return AQI
本系統(tǒng)的數(shù)據(jù)提取是通過(guò)在Python文件中引入os庫(kù)、sys庫(kù)、pyspark庫(kù)來(lái)實(shí)現(xiàn)的。通過(guò)pyspark連接MySQL數(shù)據(jù)庫(kù)中的某個(gè)表[3],建立一個(gè)臨時(shí)表,在這個(gè)臨時(shí)表中通過(guò)sparkSQL來(lái)運(yùn)行編寫(xiě)好的sql語(yǔ)句。主要代碼如下:
df=sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/air?user=root&password=123456789",dbtable="jilu").load()
df.registerTempTable("jilu") #連接jilu表創(chuàng)建臨時(shí)表
df_day=sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/air?user=root&password=123456789",dbtable="day_aqi").load()
df_day.registerTempTable("day_aqi") #連接day_aqi表創(chuàng)建臨時(shí)表
sql_day = "insert into day_aqi select * from (select riqi,shebei ,max(aqi) from jilu where riqi not in (select riqi from day_aqi) group by riqi,shebei) rq"
ls_day = sqlContext.sql(sql_day).collect() #執(zhí)行sql語(yǔ)句
空氣質(zhì)量監(jiān)測(cè)系統(tǒng)可視化采用了flask框架,app.route(‘/’)設(shè)置默認(rèn)路徑所執(zhí)行的函數(shù),本系統(tǒng)中通過(guò)render_template("main.html")將執(zhí)行函數(shù)后的頁(yè)面跳轉(zhuǎn)至本系統(tǒng)的主頁(yè)。利用Ajax技術(shù)設(shè)置了可視化頁(yè)面的異步更新,以保證數(shù)據(jù)的時(shí)效性和準(zhǔn)確性。運(yùn)用ECHarts技術(shù)繪制各種可視化圖表,方便進(jìn)行各種數(shù)據(jù)的可視化顯示??梢暬笃寥鐖D2所示。
圖2 空氣質(zhì)量監(jiān)測(cè)可視化大屏
本系統(tǒng)綜合運(yùn)用了網(wǎng)絡(luò)爬蟲(chóng)、MySQL、Spark、Ajax、ECharts等技術(shù)[4],將山西工程技術(shù)學(xué)院的空氣質(zhì)量狀況直觀地展示給我校師生。實(shí)現(xiàn)了我校實(shí)時(shí)AQI值顯示、24小時(shí)AQI值變化趨勢(shì),每月AQI等級(jí)天數(shù)的統(tǒng)計(jì)、歷年AQI值變化趨勢(shì)等的可視化展示,從而獲得一個(gè)良好的出行建議。還可以通過(guò)觀測(cè)主要污染物的實(shí)時(shí)監(jiān)測(cè)圖,得出影響AQI的主要因素,進(jìn)而采取有針對(duì)性的環(huán)保措施。