Flink實作訂單自動好評
???????需求


在電商領域會有這么一個場景,如果用戶買了商品,在訂單完成之后,一定時間之內沒有做出評價,系統自動給與五星好評,我們今天主要使用Flink的定時器來簡單實作這一功能,
???????資料
自定義source模擬生成一些訂單資料.
在這里,我們生了一個最簡單的二元組Tuple3,包含用戶id,訂單id和訂單完成時間三個欄位.
/**
* 自定義source實時產生訂單資料Tuple3<用戶id,訂單id, 訂單生成時間>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
Random random = new Random();
while (flag) {
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
}
}
@Override
public void cancel() {
flag = false;
}
}
???????編碼步驟
1.env
2.source
3.transformation
設定經過interval毫秒用戶未對訂單做出評價,自動給與好評.為了演示方便,設定5s的時間
long interval = 5000L;
分組后使用自定義KeyedProcessFunction完成定時判斷超時訂單并自動好評
dataStream.keyBy(0).process(new TimerProcessFuntion(interval));
3.1定義MapState型別的狀態,key是訂單號,value是訂單完成時間
3.2創建MapState
MapStateDescriptor<String, Long> mapStateDesc =
new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDesc);
3.3注冊定時器
mapState.put(value.f0, value.f1);
ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
3.4定時器被觸發時執行并輸出結果
4.sink
5.execute
???????代碼實作
package cn.itcast.action;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
* Author itcast
* Desc
*/
public class OrderAutomaticFavorableComments {
public static void main(String[] args) throws Exception {
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//TODO 2.source
//Tuple3<用戶id,訂單id,訂單生成時間>
DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());
//TODO 3.transformation
//設定經過interval毫秒用戶未對訂單做出評價,自動給與好評.為了演示方便,設定5s的時間
long interval = 5000L;//5s
//分組后使用自定義KeyedProcessFunction完成定時判斷超時訂單并自動好評
orderDS.keyBy(t -> t.f0)
.process(new TimerProcessFunction(interval));
//TODO 4.sink
//TODO 5.execute
env.execute();
}
/**
* 自定義source實時產生訂單資料Tuple3<用戶id,訂單id, 訂單生成時間>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
Random random = new Random();
while (flag) {
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
}
}
@Override
public void cancel() {
flag = false;
}
}
/**
* 自定義ProcessFunction完成訂單自動好評
* 進來一條資料應該在interval時間后進行判斷該訂單是否超時是否需要自動好評
* abstract class KeyedProcessFunction<K, I, O>
*/
private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {
private long interval;//訂單超時時間 傳進來的是5000ms/5s
public TimerProcessFunction(long interval) {
this.interval = interval;
}
//-0.準備一個State來存盤訂單id和訂單生成時間
private MapState<String, Long> mapState = null;
//-1.初始化
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDescriptor);
}
//-2.處理每一條資料并存入狀態并注冊定時器
@Override
public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
//Tuple3<用戶id,訂單id, 訂單生成時間> value里面是當前進來的資料里面有訂單生成時間
//把訂單資料保存到狀態中
mapState.put(value.f1, value.f2);//xxx,2020-11-11 00:00:00 || xx,2020-11-11 00:00:01
//該訂單在value.f2 + interval時過期/到期,這時如果沒有評價的話需要系統給與默認好評
//注冊一個定時器在value.f2 + interval時檢查是否需要默認好評
ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);//2020-11-11 00:00:05 || 2020-11-11 00:00:06
}
//-3.執行定時任務
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
//檢查歷史訂單資料(在狀態中存盤著)
//遍歷取出狀態中的訂單資料
Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> map = iterator.next();
String orderId = map.getKey();
Long orderTime = map.getValue();
//先判斷是否好評--實際中應該去呼叫訂單評價系統看是否好評了,我們這里寫個方法模擬一下
if (!isFavorable(orderId)) {//該訂單沒有給好評
//判斷是否超時--不用考慮進來的資料是否過期,統一判斷是否超時更保險!
if (System.currentTimeMillis() - orderTime >= interval) {
System.out.println("orderId:" + orderId + "該訂單已經超時未評價,系統自動給與好評!....");
//移除狀態中的資料,避免后續重復判斷
iterator.remove();
mapState.remove(orderId);
}
} else {
System.out.println("orderId:" + orderId + "該訂單已經評價....");
//移除狀態中的資料,避免后續重復判斷
iterator.remove();
mapState.remove(orderId);
}
}
}
//自定義一個方法模擬訂單系統回傳該訂單是否已經好評
public boolean isFavorable(String orderId) {
return orderId.hashCode() % 2 == 0;
}
}
}
???????效果

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/282887.html
標籤:其他
上一篇:docker 安裝 lnmp環境
