目錄
- 文章大綱
- InfluxDB 簡介
- 能耗趨勢圖分析
- 第一:先要清楚,資料是通過什么規則保存到InfluxDB資料庫
- 第二:解決InfluxDB時區問題
- 第三:GROUP BY time 自然月
- Spring 整合 InfluxDB
- 初始化配置
- 存盤和查詢資料
- 洗掉資料
- 查詢性能優化
文章大綱

InfluxDB 簡介
InfluxDB是GO語言撰寫的分布式時間序列化資料庫,非常適合對資料(跟隨時間變化而變化的資料)的跟蹤、監控和分析,在我們的專案中,主要是用來收集設備實時上傳的值,從而分析該設備值的趨勢圖和各個設備的能耗占比等一系列功能,InfluxDB的功能很強大,檔案也很詳細,可美中不足的是,它的單機性能并不是很理想,因為InfluxDB存盤的資料量本身是非常巨大的,在執行一些時間范圍比較大的sql陳述句,耗時會很長,甚至直接崩潰,而開源的InfluxDB目前已經不再支持集群,若要通過搭建集群提升性能問題,可以考慮企業版,當然,我們寫的程式也有很大的性能優化空間,
能耗趨勢圖分析
需求:統計指定設備、指定區域、指定分項或者指定能耗型別的能耗趨勢圖,如下圖所示,縱坐標是能耗值,橫坐標是時刻(每小時、每天、每周、每月),

分析:獲取某個區間時刻的值,可以用GROUP BY time 進行時間分組,再用聚合函式LAST或者SUM統計,但這個看似很簡單的需求卻暗藏殺機,SQL陳述句如下
SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC
第一:先要清楚,資料是通過什么規則保存到InfluxDB資料庫
為了記錄設備能耗的實時資料,我們會通過訂閱MQTT通道,當值發生變化后存盤到InfluxDB資料庫中,或者在指定時間范圍內沒有變化也會上傳,這樣做的好處可以避免一些冗余資料,同時也埋下了一個坑,
例如:一臺設備在InfluxDB資料庫中最后一次記錄的時間是15分鐘前,但是sql陳述句是從5分鐘前開始統計,這會導致該設備的其點值就是null,簡單來說:設備的存盤的值正好在分組統計的時間范圍外,解決方法有很多:比如用FILL(previous)函式填充;比如使用time(time_interval,offset_interval)進行時間推移等,但是我比較推薦下面的方法:
先獲取指定開始時間之前的最后值(lastValue),然后再根據回傳值是否為null,來決定是否替換或者更新lastValue,偽代碼如下,
## 獲取該設備的最后記錄值
val lastValue = https://www.cnblogs.com/itdragon/p/"SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <='$startTime'"
## 遍歷查詢結果,將currentValue為 null的值替換
"SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC".forEach {
lastValue = https://www.cnblogs.com/itdragon/p/currentValue?: lastValue
result[time] = currentValue?: lastValue
}
你以為這樣就結束了嗎?還不夠,回傳的time格式化后,你會發現有8小時的時區問題,
第二:解決InfluxDB時區問題
InfluxDB 默認以UTC時間存盤并回傳時間戳,查詢回傳的時間戳對應的也是UTC時間,我們需要通過tz()子句指定時區名稱,比如Asia/Shanghai,若InfluxDB安裝在Windows環境上,可能還會出現 error parsing query: unable to find time zone 錯誤,解決方法是安裝GO語言環境,文章也詳細介紹過,
SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC tz('Asia/Shanghai')
實用tz() 子句后,回傳的時間格式:"2019-11-18T13:50:00+08:00",需要通過 "yyyy-MM-dd'T'HH:mm:ss" 將其格式化,
第三:GROUP BY time 自然月
group by time 支持秒、分鐘、小時、天和周,卻唯獨不支持自然月,如果對資料的精準性要求不高,可以考慮使用30d實作,或者分12次統計,或者有更好的方法,請不吝賜教??!
Spring 整合 InfluxDB
初始化配置
整合分三步:導包、配置、初始化連接
compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP
influx.port=8086
influx.username=admin
influx.password=admin
influx.dbname=database
import org.influxdb.InfluxDB
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.influxdb.dto.Query
import org.influxdb.impl.InfluxDBResultMapper
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy
@Component
class InfluxDbConnector {
val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java)
@Value("\${influx.server}")
lateinit var serverUrl: String
@Value("\${influx.port}")
lateinit var serverPort: String
@Value("\${influx.db-name}")
lateinit var dbName: String
@Value("\${influx.user-name}")
lateinit var userName: String
@Value("\${influx.password}")
lateinit var password: String
lateinit var connection: InfluxDB
val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper()
@PostConstruct
fun initConnection() {
val connectionUrl = "$serverUrl:$serverPort"
connection = InfluxDBFactory.connect(connectionUrl, userName, password)
connection.setDatabase(dbName)
connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS)
}
@PreDestroy
fun closeConnection() {
connection.close()
}
fun <T> query(sql: String, type: Class<T>): List<T> {
logger.info("exec influx query: {}", sql)
val result = connection.query(Query(sql, dbName))
return resultMapper.toPOJO(result, type)
}
fun query(sql: String) {
logger.info("exec influx query: {}", sql)
connection.query(Query(sql, dbName))
}
fun save(points: List<Point>) {
points.forEach { connection.write(it) }
}
}
存盤和查詢資料
定義物體
import java.time.Instant;
@Measurement(name = "tableName")
public class StringVariableResultJ {
@Column(name = "currentValue")
public String value;
@Column(name = "time")
public Instant time;
// ......
}
批量保存資料
val points = equipmentEnergies.map {
Point.measurement(TABLE_NAME_EQUIPMENT)
.tag("equipmentId", it.equipmentId)
.tag("locationId", it.locationId)
.tag("subItemInstanceId", it.subItemInstanceId)
.tag("subItemId", it.subItemId)
.tag("projectId", it.projectId)
.time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS)
.addField("currentValue", it.value.toString().toBigDecimalOrNull()).build()
}
influxDbConnector.save(points)
查詢資料
influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }
專案是用kotlin寫的,可是在用InfluxDBResultMapper.toPOJO 時會出現資料轉換例外的問題,若換成Java的物體類就沒有問題,原因目前沒有找到,
洗掉資料
我在官網檔案上并沒有找到洗掉資料的內容,只有修改資料庫存盤策略,但實際上執行delete sql陳述句是生效的??,資料保留策略目的是讓InfluxDB能夠知道哪些資料是可以丟棄的,從而節省空間,更高效的處理資料,默認是不限制,以下是常見的命令,
# 查看庫存盤規則
> SHOW RETENTION POLICIES ON 資料庫名稱;
[out]:
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 720h0m0s 168h0m0s 1 true
# 修改存盤規則
> ALTER RETENTION POLICY autogen ON 資料庫名稱 DURATION 0;
# 設為默認
> ALTER RETENTION POLICY autogen ON 資料庫名稱 DEFAULT;
#創建規則
> CREATE RETENTION POLICY "規則名" ON 資料庫名稱 DURATION 360h REPLICATION 1;
# 洗掉規則
> DROP RETENTION POLICY 規則名 ON 資料庫名稱;
duration 表示在這個時間外的資料將不會被保留,0表示不限制,default 表示是否為默認規則,其它含義沒有深究,
實際場景中,不同表的資料需要保留的時間也不一樣,此時可以考慮用sql陳述句,用程式定時洗掉資料,
influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$時間' ")
查詢性能優化
對于免費版的InfluxDB是不支持集群,并且默認單次查詢結果最大不超過一萬條,考慮到性能問題,一般通道分頁查詢來減輕服務器壓力,但是對于聚合函式的操作,普通的limit 和 offset并不能滿足其需求,我采取的是分時間端查詢,減少每次查詢的時間范圍,獲取下一次查詢時間點方法,
/**
* 分時間端查詢,減輕Influxdb服務器壓力
*/
fun getInfluxNextEndTime(startTime: Instant, timeSpan: String, number: Long = 2): Instant {
val currentTime = Instant.now()
val localStartTime = LocalDateTime.ofInstant(startTime, ZoneId.systemDefault())
val span = timeSpan.substring(timeSpan.length - 1)
var nextEndTime = when (span) {
"s", "S" -> {
localStartTime.plusHours(number).atZone(ZoneOffset.systemDefault()).toInstant()
}
"m", "M" -> {
localStartTime.plusDays(number).atZone(ZoneOffset.systemDefault()).toInstant()
}
else -> {
currentTime
}
}
if (nextEndTime.isAfter(currentTime)) {
nextEndTime = currentTime
}
return nextEndTime
}
文章到這里就結束了,更多的聚合函式可以看官方檔案:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/2512.html
標籤:其它
下一篇:Redis主從復制
