Flink Action 綜合案例
需求
在大資料的實時處理中,實時大屏展示已經成了一個很重要的展示項,比如最有名的雙十一
大屏實時銷售成交額展示,除了這個,還有一些其他場景的應用,比如在后臺系統實時的展示網
站當前的pv、uv等等,其實做法都是類似的,
做一個最簡單的模擬電商統計大屏的小例子,需求如下:
1.實時計算出當天零點截止到當前時間的銷售總額
2.計算出各個分類的銷售top3
3.每秒鐘更新一次統計結果
window: [2020-10-25 19:08:31]: 男裝 = 1051.2
window: [2020-10-25 19:08:31]: 女裝 = 1507.69
window: [2020-10-25 19:08:31]: 辦公 = 1022.95
window: [2020-10-25 19:08:31]: 家具 = 373.47
window: [2020-10-25 19:08:31]: 樂器 = 182.95
window: [2020-10-25 19:08:31]: 游戲 = 820.56
window: [2020-10-25 19:08:31]: 戶外 = 566.54
window: [2020-10-25 19:08:31]: 圖書 = 783.45
window: [2020-10-25 19:08:31]: 家電 = 870.21
window: [2020-10-25 19:08:31]: 洗護 = 1235.08
All>>>>[2020-10-25 19:08:31]: all = 8414.1
Top3>>>>2020-10-25 19:08:31, 男裝 = 1051.2, 洗護 = 1235.08, 女裝 = 1507.69
window: [2020-10-25 19:08:32]: 樂器 = 335.43
window: [2020-10-25 19:08:32]: 游戲 = 820.56
window: [2020-10-25 19:08:32]: 男裝 = 1051.2
window: [2020-10-25 19:08:32]: 美妝 = 519.95
window: [2020-10-25 19:08:32]: 家具 = 373.47
window: [2020-10-25 19:08:32]: 女裝 = 1761.21
window: [2020-10-25 19:08:32]: 辦公 = 1330.54
window: [2020-10-25 19:08:32]: 家電 = 1060.58
window: [2020-10-25 19:08:32]: 洗護 = 1235.08
window: [2020-10-25 19:08:32]: 圖書 = 783.45
window: [2020-10-25 19:08:32]: 戶外 = 660.51
window: [2020-10-25 19:08:32]: 運動 = 667.7
All>>>>[2020-10-25 19:08:32]: all = 10599.68
Top3>>>>2020-10-25 19:08:32, 洗護 = 1235.08, 辦公 = 1330.54, 女裝 = 1761.21
window: [2020-10-25 19:08:33]: 女裝 = 1761.21
window: [2020-10-25 19:08:33]: 辦公 = 1597.3
window: [2020-10-25 19:08:33]: 游戲 = 1020.5
window: [2020-10-25 19:08:33]: 樂器 = 335.43
window: [2020-10-25 19:08:33]: 家具 = 518.09
window: [2020-10-25 19:08:33]: 男裝 = 1461.53
window: [2020-10-25 19:08:33]: 美妝 = 662.53
window: [2020-10-25 19:08:33]: 戶外 = 762.15
window: [2020-10-25 19:08:33]: 運動 = 1152.23
window: [2020-10-25 19:08:33]: 洗護 = 1496.14
window: [2020-10-25 19:08:33]: 家電 = 1434.22
window: [2020-10-25 19:08:33]: 圖書 = 783.45
All>>>>[2020-10-25 19:08:33]: all = 12984.78
Top3>>>>2020-10-25 19:08:33, 洗護 = 1496.14, 女裝 = 1761.21, 辦公 = 1597.3
實作思路如下圖所示:

資料
首先通過自定義 source 模擬訂單的生成,交易訂單物體類:TmallOrder
package xx.xxxxxx.flink.tmall;
import lombok.*;
@Setter
@Getter
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class TmallOrder {
private String orderId ;
private Integer userId ;
private Double orderAmount ;
private String orderTime ;
private String category ; @Override
public String toString() {
return orderId + ", " + userId + ", " + orderAmount + ", " + category + ", " + orderTime;
}
}
自定義資料源,生成交易訂單資料,OrderSource代碼如下:
package xx.xxxxxx.flink.tmall;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 自定義資料源,實時產出訂單資料,封裝至TmallOrder實體物件
*/
public class OrderSource extends RichParallelSourceFunction<TmallOrder> {
private boolean isRunning = true ;
// 商品類別
String category[] = { "女裝", "男裝", "圖書", "家電", "洗護", "美妝", "運動", "游戲", "戶外", "家具", "樂器", "辦公"
};
@Override
public void run(SourceContext<TmallOrder> ctx) throws Exception {
Random random = new Random() ;
FastDateFormat dataFormat = FastDateFormat.getInstance("yyyyMMddHHmmssSSS") ;
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS") ;
while (isRunning){
// 創建訂單
long millis = System.currentTimeMillis();
String orderPrice = String.format((5 + random.nextInt(100)) + ".%2d", 10 + random.nextInt(90));
TmallOrder order = new TmallOrder(
String.format(dataFormat.format(millis) + "%5d", 10000 + random.nextInt(1000)), //
(10000 * (random.nextInt(5) + 1))+ random.nextInt(10000), //
Double.parseDouble(orderPrice), //
format.format(millis), //
category[random.nextInt(category.length)]
);
ctx.collect(order);
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));
} }@Override
public void cancel() {
isRunning = false ;
}
}
先統計各個類別銷售額,將其封裝在物體類CategoryAmount,代碼如下:
package xx.xxxxxx.flink.tmall;
import lombok.*;
@Setter
@Getter
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class CategoryAmount {
private String category ;
private Double totalAmount ;
private String computeDateTime ;
@Override
public String toString() {
return "[" + computeDateTime + "]: " + category + " = " + totalAmount;
}
public String toContent() {
return category + " = " + totalAmount;
}
}
Flink Window Trigger
Trigger(觸發器) 的作用
英文單詞 trigger 的意思是觸發,作為名詞是扳機的意思,例如槍支上的扳機就叫 trigger,
所以也有開火的意思,Flink中,window操作需要伴隨對視窗中的資料進行處理的邏輯,也就是窗
口函式,而 Trigger 的作用就是決定何時觸發視窗函式中的邏輯執行,
Trigger 抽象類
Flink中定義了Trigger抽象類,任何trigger必須繼承Trigger類,并實作其中的
onElement()、onProcessingTime()、onEventTime()、clear()等抽象方法,Flink官方提供了幾種常
用的trigger實作,同時,用戶可以根據需求自定義trigger,
Flink提供 Triggers
1.EventTimeTrigger:通過對比Watermark和視窗的Endtime確定是否觸發視窗計算,如果
Watermark大于Window EndTime則觸發,否則不觸發,視窗將繼續等待,
2.ProcessTimeTrigger:通過對比ProcessTime和視窗EndTme確定是否觸發視窗,如果
ProcessTime大于EndTime則觸發計算,否則視窗繼續等待,
3.ContinuousEventTimeTrigger:根據間隔時間,周期性觸發視窗或者Window的結束時間
小于當前EndTime觸發視窗計算,
4.ContinuousProcessingTimeTrigger:根據間隔時間周期性觸發視窗或者Window的結束
時間小于當前ProcessTime觸發視窗計算,
5.CountTrigger:根據接入資料量是否超過設定的闕值判斷是否觸發視窗計算,
6.DeltaTrigger:根據接入資料計算出來的Delta指標是否超過指定的Threshold去判斷是否
觸發視窗計算,
7.PurgingTrigger:可以將任意觸發器作為引數轉換為Purge型別的觸發器,計算完成后數
據將被清理,
函式:AggregateFunction
Flink 的AggregateFunction是一個基于中間計算結果狀態進行增量計算的函式,由于是迭代
計算方式,所以,在視窗處理程序中,不用快取整個視窗的資料,所以效率執行比較高,
該函式會將給定的聚合函式應用于每個視窗和鍵, 對每個元素呼叫聚合函式,以遞增方式聚
合值,并將每個鍵和視窗的狀態保持在一個累加器中,
引數型別:AggregateFunction介面,該介面的繼承關系和方法如下:

自定義聚合函式需要實作AggregateFunction介面類,它有四個介面實作方法:




代碼實作
使用Flink 編程實作每日消費額統計,每隔1秒執行一次,先按照類別統計銷售額,再進行總
金額統計和獲取Top類別,
package xx.xxxxxx.flink.tmall;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
/**
* Flink 實作:模擬簡易雙11實時統計大屏
* - 實時計算出當天零點截止到當前時間的銷售總額
* - 計算出銷售top3類別
* - 每秒鐘更新一次統計結果
*/
public class TmallBigScreen {
public static void main(String[] args) throws Exception {
// 1. 執行環境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 第一、設定時間語意:事件時間EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2. 資料源-source
DataStreamSource<TmallOrder> orderStream = env.addSource(new OrderSource());
//orderStream.printToErr();
// 3. 資料轉換-transformation
// 第二、設定事件時間欄位及watermark水位線
SingleOutputStreamOperator<TmallOrder> timeStream = orderStream.assignTimestampsAndWatermarks(
// 設定最大允許亂序或延遲資料為5秒
new BoundedOutOfOrdernessTimestampExtractor<TmallOrder>(Time.seconds(5)) {
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS");
@Override
public long extractTimestamp(TmallOrder order) {
long eventTime = System.currentTimeMillis() - 5 * 1000 ;
try{
eventTime = format.parse(order.getOrderTime()).getTime();
}catch (Exception e){ e.printStackTrace(); }
return eventTime;
} }
);
//timeStream.printToErr();
// TODO:step1. 每秒統計今日各個類別銷售額(window size:1d,trigger interval: 1s,keyBy:category)
SingleOutputStreamOperator<CategoryAmount> categoryWindowStream = timeStream
// a. 設定分組欄位:類別category
.keyBy("category")
// b. 事件時間視窗:1 day
.window(TumblingEventTimeWindows.of(Time.days(1)))
// c. 設定觸發器trigger:1 second
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
// d. 視窗聚合操作,實作AggregateFunction,其中金額轉換BigDecimal
.aggregate(
// TODO: 定義聚合函式, 對資料進行增量聚合操作
new AggregateFunction<TmallOrder, BigDecimal, Double>() { @Override
public BigDecimal createAccumulator() {
// 初始化中間臨時變數,此處創建BigDecimal物件
return new BigDecimal(0);
}@Override
public BigDecimal add(TmallOrder order, BigDecimal accumulator) {
// 獲取訂單金額
Double orderAmount = order.getOrderAmount();
// 累加操作
BigDecimal addBigDecimal = accumulator.add(new BigDecimal(orderAmount));
// 回傳值
return addBigDecimal;
}@Override
public BigDecimal merge(BigDecimal a, BigDecimal b) {
return a.add(b);
}@Override
public Double getResult(BigDecimal accumulator) {
return accumulator.setScale(2, RoundingMode.HALF_UP).doubleValue();
}
}, // TODO: 定義視窗函式, 對視窗資料進行計算并輸出
new WindowFunction<Double, CategoryAmount, Tuple, TimeWindow>() {
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
@Override
public void apply(Tuple tuple,TimeWindow window,Iterable<Double> input,
Collector<CategoryAmount> out) throws Exception {
// 類別category
String category = ((Tuple1<String>)tuple).f0 ;
// 視窗中消費金額
Double windowAmount = input.iterator().next();
// 視窗結束時間
String computeDataTime = format.format(System.currentTimeMillis());
// 輸出結果
out.collect(new CategoryAmount(category, windowAmount, computeDataTime));
} }
);
/*
[2021-01-27 22:15:40]: 辦公 = 120.84
[2021-01-27 22:15:40]: 男裝 = 415.24
[2021-01-27 22:15:40]: 運動 = 319.08
[2021-01-27 22:15:40]: 樂器 = 165.33
[2021-01-27 22:15:40]: 戶外 = 54.12
[2021-01-27 22:15:40]: 家電 = 111.76
[2021-01-27 22:15:40]: 圖書 = 309.51
[2021-01-27 22:15:40]: 游戲 = 92.55
[2021-01-27 22:15:40]: 家具 = 199.87
[2021-01-27 22:15:40]: 美妝 = 164.76
*/
//categoryWindowStream.printToErr();
// TODO: step2. 每秒鐘統計消費額Top3類別和總銷售額(window:1s,processTime 處理時間)
/*
- 銷售總額 ; - Top3銷售的類別 ; - 每秒計算一次輸出
*/
SingleOutputStreamOperator<String> resultStream = categoryWindowStream
// 按照計算時間
.keyBy("computeDataTime")
// 設定視窗:1s
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
// 視窗內聚合,獲取總銷售額和Top類別
.apply(new WindowFunction<CategoryAmount, String, Tuple, TimeWindow>() { @Override
public void apply(Tuple tuple,
TimeWindow window,
Iterable<CategoryAmount> input,
Collector<String> out) throws Exception {
// TODO: a-1. 宣告金額累加
BigDecimal sumDecimal = new BigDecimal(0.0) ;
// TODO: a-2. 定義優先佇列,存盤CategoryAmount物件,按照金額降序排序
Queue<CategoryAmount> queue = new PriorityQueue<CategoryAmount>(
4, //
new Comparator<CategoryAmount>() { @Override
public int compare(CategoryAmount o1, CategoryAmount o2) {
int comp = 0;
if(o1.getTotalAmount() > o2.getTotalAmount()){
comp = 1; // 升序
}else if(o1.getTotalAmount() < o2.getTotalAmount()){
comp = -1 ; // 降序
}
return comp;
} } //
);
// TODO: b. 遍歷視窗資料,進行計算
for (CategoryAmount element : input) {
System.out.println("window: " + element);
// b-1. 加入佇列,獲取Top3
queue.add(element);
if (queue.size() > 3) queue.poll();
// b-2. 累加金額
sumDecimal = sumDecimal.add(new BigDecimal(element.getTotalAmount()));
}
// TODO: c. 輸出佇列資料
String computeDataTime = ((Tuple1<String>) tuple).f0 ;
// c-1. 輸出總金額
double sumAmount = sumDecimal.setScale(2, RoundingMode.HALF_UP).doubleValue();
CategoryAmount all = new CategoryAmount("all", sumAmount, computeDataTime);
out.collect("All>>>>" + all.toString());
// c.2. 輸出Top3
StringBuilder builder = new StringBuilder(computeDataTime).append(", ");
for (CategoryAmount item : queue) {
builder.append(item.toContent()).append(", ");
}
String output = builder.toString();
out.collect("Top3>>>>" + output.substring(0, output.length() - 2));
}
});
// 4. 資料終端-sink
resultStream.printToErr();
/*
window: [2021-01-27 22:31:07]: 家電 = 292.35
window: [2021-01-27 22:31:07]: 辦公 = 158.36
window: [2021-01-27 22:31:07]: 男裝 = 114.84
window: [2021-01-27 22:31:07]: 圖書 = 172.78
window: [2021-01-27 22:31:07]: 家具 = 157.0
window: [2021-01-27 22:31:07]: 樂器 = 304.14
window: [2021-01-27 22:31:07]: 洗護 = 101.39
window: [2021-01-27 22:31:07]: 運動 = 313.77
window: [2021-01-27 22:31:07]: 游戲 = 22.7
window: [2021-01-27 22:31:07]: 女裝 = 143.94
window: [2021-01-27 22:31:07]: 戶外 = 45.32
window: [2021-01-27 22:31:07]: 美妝 = 252.45
All>>>>[2021-01-27 22:31:07]: all = 2079.04
Top3>>>>2021-01-27 22:31:07, 家電 = 292.35, 樂器 = 304.14, 運動 = 313.77
*/
// 5. 觸發執行-execute
env.execute(TmallBigScreen.class.getSimpleName());
}
}
運行效果如下所示:

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/289514.html
標籤:其他
上一篇:實時數倉入門訓練營:實時數倉助力互聯網實時決策和精準營銷
下一篇:Hbase
