Windowing TVF
Windowing TVF（視窗表值函式）是 Flink 1.13 版本之後出現的，用來取代之前的 Group Window，官網描述它 more powerful and effective（更強大、更高效）。
//TVF 中的tumble滾動視窗
//tumble(table sensor,descriptor(et),interval '5' second ):作為一張表存在
//特別注意!!!!
//如果在 sql 中對 tumble 視窗做聚合,則一定需要 group by,而且 group by 後一定要包含 window_start、window_end 兩個欄位
sql 實作 TVF 的 tumble 滾動視窗
package net.cyan.FlinkSql.TVF;
?
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
?
import java.time.Duration;
?
import static org.apache.flink.table.api.Expressions.$;
?
public class Demo1_Window_TableAPI_Tumble {
public static void main(String[] args) {
//創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//創建表的運行環境
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStream<WaterSensor> waterSensorStream =
env.fromElements(
new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((ws, ts) -> ws.getTs())
?
);
//創建table
Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
//創建表
tabEnv.createTemporaryView("sensor",table);
//執行sql
//TVF 中的tumble滾動視窗
//tumble(table sensor,descriptor(et),interval '5' second ):作為一張表存在
//特別注意!!!!
//如果在sql中使用了tumble視窗,則一定需要group by,而且group by后一定有window_start,window_end兩個欄位
tabEnv.sqlQuery("select" +
" window_start,window_end,id," +
"sum(vc) sum_vc" +
" from table (tumble(table sensor,descriptor(et),interval '5' second ))" +
" group by window_start,window_end,id ")
.execute()
.print();
?
}
}
sql實作TVF的滑動視窗
//TVF 中的 hop 滑動視窗
//hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作為一張表存在
//first interval :滑動步長, second interval :視窗長度
//特別注意!!!!
// 1.TVF 中滑動視窗的視窗長度必須是滑動步長的整數倍,不然會報錯
// 例如:滑動步長為2,視窗長度就不能為5,可以為6
// 2.如果在 sql 中對 hop 視窗做聚合,則一定需要 group by,而且 group by 後一定要包含 window_start、window_end 兩個欄位
package net.cyan.FlinkSql.TVF;
?
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
?
import java.time.Duration;
?
import static org.apache.flink.table.api.Expressions.$;
?
public class Demo2_Window_TVF_Hop {
public static void main(String[] args) {
//創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//創建表的運行環境
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStream<WaterSensor> waterSensorStream =
env.fromElements(
new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((ws, ts) -> ws.getTs())
?
);
//創建table
Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
//創建表
tabEnv.createTemporaryView("sensor",table);
//執行sql
//TVF 中的hop滾動視窗
//hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作為一張表存在
//first interval :滑動步長, second interval :視窗長度
//特別注意!!!!
// 1.TVF 中滑動視窗的滑動步長與視窗長度必須是整數倍的關系,不然會報錯
// 例如:滑動步長為2,視窗長度就不能為5,可以為6
// 2.如果在sql中使用了hop視窗,則一定需要group by,而且group by后一定有window_start,window_end兩個欄位
tabEnv.sqlQuery("select" +
" window_start,window_end,id," +
"sum(vc) sum_vc" +
" from table (hop(table sensor,descriptor(et),interval '2' second,interval '6' second ))" +
" group by window_start,window_end,id ")
.execute()
.print();
?
?
?
}
}
sql實作TVF的累計視窗
累計視窗的應用:
需求:每天每隔一個小時統計一次當天的pv(瀏覽量)
流的方式如何解決:
1、用滾動視窗, 視窗長度設為1h
2、每天的第一個視窗清除狀態,后面的不清,進行狀態的累加
或者
用滾動視窗,長度設定為 1 天(1 day)
自定義觸發器,每隔1小時對窗內的元素計算一次,不關閉視窗
sql的方式如何解決?
直接使用累計視窗cumulate
//TVF 中的cumulate累計視窗
//cumulate(table tableName,descriptor(timecol),step,size):作為一張表存在
//tableName:表名
//timecol:時間屬性欄位
//step:累計步長,跟滑動步長類似
//size:視窗長度
//特別注意!!!!
//1.累計視窗的視窗長度同樣必須是步長的整數倍
// 2.如果在 sql 中對 cumulate 視窗做聚合,則一定需要 group by,而且 group by 後一定要包含 window_start、window_end 兩個欄位
package net.cyan.FlinkSql.TVF;
?
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
?
import java.time.Duration;
?
import static org.apache.flink.table.api.Expressions.$;
?
public class Demo3_Window_TVF_cumulate {
public static void main(String[] args) {
//創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//創建表的運行環境
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStream<WaterSensor> waterSensorStream =
env.fromElements(
new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((ws, ts) -> ws.getTs())
?
);
//創建table
Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
//創建表
tabEnv.createTemporaryView("sensor",table);
//執行sql
//TVF 中的cumulate累計視窗
//cumulate(table tableName,descriptor(timecol),step,size):作為一張表存在
//tableName:表名
//timecol:時間屬性欄位
//step:累計步長,跟滑動步長類似
//size:視窗長度
//特別注意!!!!
//1.累計視窗的步長與視窗長度同樣是需要整數倍關系
// 2.如果在sql中使用了cumulate視窗,則一定需要group by,而且group by后一定有window_start,window_end兩個欄位
tabEnv.sqlQuery("select" +
" window_start,window_end,id," +
" sum(vc) sum_vc" +
" from table (cumulate(table sensor,descriptor(et),interval '2' second,interval '6' second)) " +
"group by window_start,window_end,id")
.execute()
.print();
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/525029.html
標籤:其他
上一篇:主定理
