兩者區別:
1.x 版本使用 influxQL 查詢語言,2.x 和 1.8+(beta) 使用 flux 查詢語法;相比V1 移除了database 和 RP,增加了bucket, V2具有以下幾個概念: timestamp、field key、field value、field set、tag key、tag value、tag set、measurement、series、point、bucket、bucket schema、organization 新增的概念: bucket:所有 InfluxDB 資料都存盤在一個存盤桶中,一個桶結合了資料庫的概念和存盤周期(時間每個資料點仍然存在持續時間),一個桶屬于一個組織 bucket schema:具有明確的schema-type的存盤桶需要為每個度量指定顯式架構,測量包含標簽、欄位和時間戳,顯式模式限制了可以寫入該度量的資料的形狀, organization:InfluxDB組織是一組用戶的作業區,所有儀表板、任務、存盤桶和用戶都屬于一個組織,本文所有代碼:https://github.com/Tom-shushu/InfluxDB1.xAnd2.x-SpringBoot 新得閱讀地址:http://www.zhouhong.icu/post/161
一、InfluxDB1.x Docker安裝以及與Boot整合
A、docker安裝InfluxDB1.x (influxdb1.8.4)
1、安裝:
docker run -d --name influxdb -p 8086:8086 influxdb:1.8.4
2、查看
docker ps -a
3、進入docker的influx中
docker exec -it daf88772adc9 /bin/bash4、直接輸入influx啟動
influx

5、修改賬戶密碼
# 顯示用戶 SHOW USERS # 創建用戶 CREATE USER "username" WITH PASSWORD 'password' # 賦予用戶管理員權限 GRANT ALL PRIVILEGES TO username # 創建管理員權限的用戶 CREATE USER <username> WITH PASSWORD '<password>' WITH ALL PRIVILEGES # 修改用戶密碼 SET PASSWORD FOR username = 'password' # 撤消權限 REVOKE ALL ON mydb FROM username # 查看權限 SHOW GRANTS FOR username # 洗掉用戶 DROP USER "username"
6、在組態檔啟用認證
默認情況下,influxdb的組態檔是禁用認證策略的,所以需要修改設定一下, 編輯組態檔vim /etc/influxdb/influxdb.conf,把 [http] 下的 auth-enabled 選項設定為 true7、設定保存策略(多長時間之前的資料需要洗掉)---默認為 autogen 永久不洗掉
a、查看資料庫的保存策略
show retention policies on 資料庫名
例子:
# 選擇使用telegraf資料庫 > use influx_test; Using database influx_test # 查詢資料保存策略 > show retention policies on influx_test name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 truename 策略名稱:默認autogen duration 持續時間: 0s 代表無限制 shardGroupDuration shardGroup資料存盤時間:shardGroup是InfluxDB的一個基本存盤結構, 應該大于這個時間的資料在查詢效率上應該有所降低, replicaN 副本個數:1 代表只有一個副本 default 是否默認策略:true 代表設定為該資料庫的默認策略
b、設定保存策略
# 新建一個策略 CREATE RETENTION POLICY "策略名稱" ON 資料庫名 DURATION 時長 REPLICATION 副本個數; # 新建一個策略并且直接設定為默認策略 CREATE RETENTION POLICY "策略名稱" ON 資料庫名 DURATION 時長 REPLICATION 副本個數 DEFAULT;
例子:
# 創建新的默認策略role_01保留資料時長1小時 > CREATE RETENTION POLICY "1hour" ON influx_test DURATION 1h REPLICATION 1 DEFAULT;
c、修改保存策略
ALTER RETENTION POLICY "策略名稱" ON "資料庫名" DURATION 時長 ALTER RETENTION POLICY "策略名稱" ON "資料庫名" DURATION 時長 DEFAULT
d、洗掉保存策略
drop retention POLICY "策略名" ON "資料庫名"
8、使用桌面可視化工具連接資料庫
工具鏈接:https://github.com/CymaticLabs/InfluxDBStudio/releases/download/v0.2.0-beta.1/InfluxDBStudio-0.2.0.zip
B、InfluxDB1.x與Spring整合(只列舉部分代碼,后面會放上整個專案的GitHub地址)
整個專案結構如下:
1、引入依賴 (其他依賴未顯示全,后面會放上整個專案的GitHub地址)
<dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.20</version> </dependency>
2、新建yml檔案
influx: url: 'http://xxx.xx.xxx.xx:8086' password: 'password' username: 'username'
3、連接配置 InfluxDBConfig
@Data @Configuration @ConfigurationProperties(prefix = "influx") public class InfluxDBConfig { private String url; private String username; private String password; /** * description: 用于查詢 * date: 2022/1/20 23:11 * author: zhouhong * @param * @param null * @return */ @Bean(destroyMethod = "close") public InfluxDB influxDBClient(){ return InfluxDBFactory.connect(this.url, this.username, this.password); } /** * description: 用于寫入 * date: 2022/1/20 23:12 * author: zhouhong * @param * @param null * @return */ @Bean(name = "influxDbWriteApi",destroyMethod = "close") public WriteApi influxDbWriteApi(){ InfluxDBClient influxDBClient = InfluxDBClientFactory.createV1(this.url, this.username, this.password.toCharArray(), "influx_test", "autogen"); return influxDBClient.getWriteApi(); } }
4、封裝用于查詢的方法
@Component public class InfluxUtil { /** * description: 通用查詢 * date: 2022/1/20 23:13 * author: zhouhong * @param * @param null * @return */ public QueryResult query(String command, String database, InfluxDB influxDB) { Query query = new Query(command, database); return influxDB.query(query); } }
5、新建需要寫入的資料的物體類、需要回傳的類(省略,具體參考github示例)InsertParams.java InfluxResult.java
6、新建server層和impl實作類
InfluxServiceImpl.java 如下:/** * description: 時序資料庫Impl * date: 2022/1/16 20:47 * author: zhouhong */ @Service @Slf4j public class InfluxServiceImpl implements InfluxService { @Resource(name = "influxDbWriteApi") private WriteApi influxDbWriteApi; @Resource(name = "influxDBClient") private InfluxDB influxDBClient; @Autowired private InfluxUtil influxUtil; @Override public void insert(InsertParams insertParams) { influxDbWriteApi.writeMeasurement(WritePrecision.MS, insertParams); } @Override public Object queryAll(InsertParams insertParams) { List<InfluxResult> list = new ArrayList<>(); InfluxResult influxResult = new InfluxResult(); String sql = "SELECT * FROM \"influx_test\" WHERE time > '2022-01-16' tz('Asia/Shanghai')"; QueryResult queryResult = influxUtil.query(sql, "influx_test", influxDBClient); queryResult.getResults().get(0).getSeries().get(0).getValues().forEach(item -> { influxResult.setTime(item.get(0).toString()); influxResult.setCurrent(item.get(1).toString()); influxResult.setEnergyUsed(item.get(2).toString()); influxResult.setPower(item.get(3).toString()); influxResult.setVoltage(item.get(4).toString()); list.add(influxResult); }); return list; } @Override public Object querySumByOneDay(InsertParams insertParams) { String sql = "SELECT SUM(voltage) FROM \"influx_test\" WHERE time > '2022-01-18' GROUP BY time(1d) tz('Asia/Shanghai')"; QueryResult queryResult = influxUtil.query(sql, "influx_test", influxDBClient); return queryResult.getResults().get(0).getSeries().get(0); } }
7、controller層 InfluxDbController.java(回傳結果是封裝過后的,詳情見github示例)
@RestController public class InfluxDbController { @Autowired private InfluxService influxService; /** * description: 時序資料庫插入測驗 * date: 2022/1/16 23:00 * author: zhouhong * @param * @param null * @return */ @PostMapping("/influxdb/insert") public ResponseData insert(@RequestBody InsertParams insertParams) { influxService.insert(insertParams); return new SuccessResponseData(); } /** * description: 時序資料庫查詢全部資料測驗 * date: 2022/1/16 23:00 * author: zhouhong * @param * @param null * @return */ @PostMapping("/influxdb/queryAll") public ResponseData query(@RequestBody InsertParams insertParams) { return new SuccessResponseData(influxService.queryAll(insertParams)); } /** * description: 時序資料庫按天查詢當前電壓總和測驗 * date: 2022/1/16 23:00 * author: zhouhong * @param * @param null * @return */ @PostMapping("/influxdb/queryByOneDay") public ResponseData queryByOneDay(@RequestBody InsertParams insertParams) { return new SuccessResponseData(influxService.querySumByOneDay(insertParams)); } }
8、PostMan測驗(注意需要先新建一個 資料庫---influx_test)
8.1 插入測驗 localhost:9998/influxdb/insert
入參:
{ "energyUsed":243.78, "power":54.50, "current":783.34, "voltage":44.09 }
回傳:
{ "success": true, "code": 200, "message": "請求成功", "localizedMsg": "請求成功", "data": null }
8.2、查詢全部(注意,這里回傳結果我封裝了一下)localhost:9998/influxdb/queryAll
入參:
{
}
回傳:
{ "success": true, "code": 200, "message": "請求成功", "localizedMsg": "請求成功", "data": [ { "energyUsed": "243.78", "power": "54.5", "current": "783.34", "voltage": "44.09", "time": "2022-01-20T23:44:00.626+08:00" }, { "energyUsed": "243.78", "power": "54.5", "current": "783.34", "voltage": "44.09", "time": "2022-01-20T23:44:00.626+08:00" } ] }
8.3聚合查詢(統計2022-01-18到現在,以天為單位每天的用電量之和) localhost:9998/influxdb/queryByOneDay 精度問題暫時沒處理
入參:
{ }
回傳:
{ "success": true, "code": 200, "message": "請求成功", "localizedMsg": "請求成功", "data": { "name": "influx_test", "tags": null, "columns": [ "time", "sum" ], "values": [ [ "2022-01-18T00:00:00+08:00", null ], [ "2022-01-19T00:00:00+08:00", null ], [ "2022-01-20T00:00:00+08:00", 481.07000000000005 ] ] } }
C、常見的查詢SQL 后面加上 tz('Asia/Shanghai') 解決時區差
1、查所指定時間之后的所有
SELECT * FROM "real_water_amount" where time > '2022-01-01' tz('Asia/Shanghai')
2、查詢平均值 mean()
SELECT mean(value) FROM "real_water_amount" where time > '2022-01-01' tz('Asia/Shanghai')
3、查詢最大最小值 max() min()
SELECT max(value) FROM "real_water_amount" where time > '2022-01-01' tz('Asia/Shanghai')
4、按年、月、天、周、小時、分鐘、秒統計
SELECT sum(value) FROM "real_water_amount" where time > '2022-01-01' group by time(1d) tz('Asia/Shanghai')
5、按照列過濾
SELECT * FROM "real_water_amount" where time > '2022-01-01' and iotId = '8ecJY59UJd1jwPLBmJA5000000'
二、InfluxDB2.x Docker安裝以及與Boot整合
A、Docker安裝InfluxDB2.x
1、安裝:默認拉取最新版本
docker run -d --name influxdb -p 8086:8086 influxdb
2、查看
docker ps -a
3、瀏覽器訪問 IP:8086 (注意:部署在遠程服務器上需要開啟8086埠安全組)設定賬號密碼
4、然后點擊 Data -- > Buucket 就可以看到我們剛才創建的 名字為 Tom 的 Buucket了
5、點擊 API Tokens 獲取當前用戶的 Token(整合時需要)
6、設定Bucket的保存策略
B、InfluxDB2.x與SpringBoot整合
1、依賴
<dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.20</version> </dependency>
2、yml組態檔
influx: influxUrl: 'http://XXX.XX.XXX.XX:8086' bucket: 'tom' org: 'my_influxdb' token: 'Rt23UemGI_cfS-lFDrurtjh46P1enfhrji-KrZYR04wUR1Yxw_oBCZPL6GmFYSDn20Q9gM_P9DIBhHc2RJjNkA=='
3、配置類
@Setter @Getter public class InfluxBean{ /** * 資料庫url地址 */ private String influxUrl; /** * 桶(表) */ private String bucket; /** * 組織 */ private String org; /** * token */ private String token; /** * 資料庫連接 */ private InfluxDBClient client; /** * 構造方法 */ public InfluxBean(String influxUrl, String bucket, String org, String token) { this.influxUrl = influxUrl; this.bucket = bucket; this.org = org; this.token = token; this.client = getClient(); } /** * 獲取連接 */ private InfluxDBClient getClient() { if (client == null) { client = InfluxDBClientFactory.create(influxUrl, token.toCharArray()); } return client; } /** * 寫入資料(以秒為時間單位) */ public void write(Object object){ try (WriteApi writeApi = client.getWriteApi()) { writeApi.writeMeasurement(bucket, org, WritePrecision.NS, object); } } /** * 讀取資料 */ public List<FluxTable> queryTable(String fluxQuery){ return client.getQueryApi().query(fluxQuery, org); } }
@Data @Configuration @ConfigurationProperties(prefix = "influx") public class InfluxConfig { /** * url地址 */ private String influxUrl; /** * 桶(表) */ private String bucket; /** * 組織 */ private String org; /** * token */ private String token; /** * 初始化bean */ @Bean(name = "influx") public InfluxBean InfluxBean() { return new InfluxBean(influxUrl, bucket, org, token); } }
4、實作類
@Service @Slf4j public class InfluxServiceImpl implements InfluxService { @Resource private InfluxBean influxBean; @Override public void insert(InsertParams insertParams) { insertParams.setTime(Instant.now()); influxBean.write(insertParams); } @Override public List<InfluxResult> queue(){ // 下面兩個 private 方法 賦值給 list 查詢對應的資料 List<FluxTable> list = queryInfluxAll(); List<InfluxResult> results = new ArrayList<>(); for (int i = 0; i < list.size(); i++) { for (int j = 0; j < list.get(i).getRecords().size(); j++) { InfluxResult influxResult = new InfluxResult(); influxResult.setCurrent(list.get(i).getRecords().get(j).getValues().get("current").toString()); influxResult.setEnergyUsed(list.get(i).getRecords().get(j).getValues().get("energyUsed").toString()); influxResult.setPower(list.get(i).getRecords().get(j).getValues().get("power").toString()); influxResult.setVoltage(list.get(i).getRecords().get(j).getValues().get("voltage").toString()); influxResult.setTime(list.get(i).getRecords().get(j).getValues().get("_time").toString()); System.err.println(list.get(i).getRecords().get(j).getValues().toString()); results.add(influxResult); } } return results; } /** * description: 查詢一小時內的InsertParams所有資料 * date: 2022/1/21 13:44 * author: zhouhong * @param * @param null * @return */ private List<FluxTable> queryInfluxAll(){ String query = " from(bucket: \"tom\")" + " |> range(start: -60m, stop: now())" + " |> filter(fn: (r) => r[\"_measurement\"] == \"influx_test\")" + " |> pivot( rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\" )"; return influxBean.queryTable(query); } /** * description: 根據某一個欄位的值過濾(查詢 用電量 energyUsed 為 322 的那條記錄) * date: 2022/1/21 12:44 * author: zhouhong * @param * @param null * @return */ public List<FluxTable> queryFilterByEnergyUsed(){ String query = " from(bucket: \"tom\")" + " |> range(start: -60m, stop: now())" + " |> filter(fn: (r) => r[\"_measurement\"] == \"influx_test\")" + " |> filter(fn: (r) => r[\"energyUsed\"] == \"322\")" + " |> pivot( rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\" )"; return influxBean.queryTable(query); } }
C、測驗
1、插入 localhost:9998/inlfuxdb/insert
入參:
{ "energyUsed":"23.12", "power":"321.60", "current":"782.72", "voltage":"67.43" }
回傳:
{ "success": true, "code": 200, "message": "請求成功", "localizedMsg": "請求成功", "data": null }
2、查詢所有
入參:
{}
回傳:
{ "success": true, "code": 200, "message": "請求成功", "localizedMsg": "請求成功", "data": [ { "energyUsed": "23.12", "power": "321.60", "current": "782.72", "voltage": "67.43", "time": "2022-01-20T17:51:01.819Z" }, { "energyUsed": "243.78", "power": "541.50", "current": "32.34", "voltage": "89.09", "time": "2022-01-20T17:33:47.246Z" } ] }
D、Flux常見查詢陳述句
1、指定資料源:from(bucket:"tom")
指定時間范圍: 使用管道轉發運算子 ( |>) 將資料從資料源通過管道傳輸到range() 函式,該函式指定查詢的時間范圍,它接受兩個引數:start和stop,范圍可以是使用相對負持續時間 或使用絕對時間//使用絕對時間 from(bucket:"tom") |> range(start: 2022-01-05T23:30:00Z, stop: 2022-01-21T00:00:00Z) //過去十五天的資料 from(bucket:"tom") |> range(start: -15d)
2、資料過濾
將范圍資料傳遞到filter()函式中,以根據資料屬性或列縮小結果范圍// 根據 _measurement 和 _field 過濾 from(bucket:"tom") |> range(start: -15d) |> filter(fn: (r) => r._measurement == "influx_test" and r._field == "power" and r.energyUsed == "23.12" )
3、資料轉換
使用函式,將資料聚合為平均值、下采樣資料等from(bucket:"tom") |> range(start: -15d) |> filter(fn: (r) => r._measurement == "influx_test" ) |> window(every: 10m) from(bucket:"tom") |> range(start: -15d) |> filter(fn: (r) => r._measurement == "influx_test" ) |> window(every: 10m) |> mean()其他查詢函式請查看官網:https://docs.influxdata.com/flux/v0.x/stdlib/universe/
輸了不可怕,大不了從頭再來,我們還年輕---周紅
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/418056.html
標籤:Java
上一篇:面試官太難伺候?一個try-catch問出這么多花樣
下一篇:返回列表