Flink 用戶電商行為分析
文章目錄
- Flink 用戶電商行為分析
- 1. 實時統計分析
- 1. 1 熱門商品統計
- 1. 2 熱門頁面統計
- 1. 3 網站uv統計
- 2. 業務流程以及風險控制
- 2. 1 頁面廣告黑名單過濾
- 2. 2 惡意登陸監控
- 2. 3 訂單支付失效監控
- 2. 4 支付實時對賬
- 3. 專案地址
1. 實時統計分析
1. 1 熱門商品統計
-
需求描述:每隔5分鐘 實時展示1小時內的該網站的熱門商品的TopN
-
展示的資料形式:
時間視窗資訊:
NO 1:商品ID+瀏覽次數1
NO 2:商品ID+瀏覽次數2
NO 3:商品ID+瀏覽次數3
-
實作思路:
-
- 因為最終要輸出視窗資訊+商品ID 所以keyBy后需要全視窗函式 這樣才能拿到視窗時間+key
-
- 而且需要瀏覽次數 所以需要增量聚合函式 keyBy聚合后來一條資料增量聚合一條 拿到瀏覽次數
-
- 以上1 2步驟后只能拿到 單個商品的瀏覽次數 所以為了拿到1小時內所有商品的排名 要再根據視窗截止時間keyBy 使用processFunction 把視窗內的商品保存到ListState中 定時器到達視窗截止時間時 排序輸出ListState的資料
-
-
代碼
/**
 * Hot items: every 5 minutes, print the top-N most viewed items of the last hour.
 *
 * <p>How it works:
 * <ol>
 *   <li>Keep only "pv" (page view) events and key them by item id.</li>
 *   <li>Open a 1-hour event-time window sliding every 5 minutes. An incremental aggregate counts
 *       the views; a window function attaches the item id and the window end, so every window
 *       produces one {@code ItemViewCount(itemId, windowEnd, count)} per item.</li>
 *   <li>Key those results by window end and buffer them in list state. An event-time timer at
 *       {@code windowEnd + 1} fires once the watermark has passed the window end; the buffer is
 *       then sorted by count, the top N are printed and the buffer is released.</li>
 * </ol>
 *
 * <p>Sample output:
 * <pre>
 * 視窗結束時間:2017-11-26 12:20:00.0
 * NO 1: 商品ID = 2338453 熱門度 = 27
 * NO 2: 商品ID = 812879 熱門度 = 18
 * NO 3: 商品ID = 4443059 熱門度 = 18
 * NO 4: 商品ID = 3810981 熱門度 = 14
 * NO 5: 商品ID = 2364679 熱門度 = 14
 * </pre>
 */
public class HotItemsPractise {
    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");

        // 2. Parse CSV lines into ItemBean and keep page-view events only
        DataStream<ItemBean> filterStream = inputStream.map(line -> {
            String[] split = line.split(",");
            return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
        }).filter(item -> "pv".equals(item.getBehavior()));

        // 3. Per item: view count of every 1h window (sliding 5 min), tagged with the window end.
        //    The timestamp column is in seconds, Flink expects milliseconds.
        DataStream<ItemViewCount> windowsResult = filterStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
            @Override
            public long extractAscendingTimestamp(ItemBean element) {
                return element.getTimestamp() * 1000L;
            }
        }).keyBy("itemId")
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new MyAggreateCount(), new MyAllWindowsView());

        // 4. Per window end: collect all items' counts and print the top 5
        SingleOutputStreamOperator<String> windowEnd = windowsResult
                .keyBy("windowEnd")
                .process(new ItemHotTopN(5));
        windowEnd.print();
        env.execute("HotItemsPractise");
    }

    /**
     * Incremental aggregate: counts the events of one item in one window.
     */
    public static class MyAggreateCount implements AggregateFunction<ItemBean, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ItemBean value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial counts (used by merging windows); must return their sum, never null.
         */
        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /**
     * Window function: receives the pre-aggregated count (a single value) plus the key and the
     * window, and emits {@code ItemViewCount(itemId, windowEnd, count)}.
     */
    public static class MyAllWindowsView implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
        /**
         * @param tuple  the key; field 0 is the item id
         * @param window the window being emitted
         * @param input  the aggregate result; contains exactly one element
         * @param out    collector for the tagged count
         */
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            long windowEnd = window.getEnd();
            long count = input.iterator().next();
            long itemId = tuple.getField(0);
            out.collect(new ItemViewCount(itemId, windowEnd, count));
        }
    }

    /**
     * Buffers all item counts of one window end and, when the watermark passes that window end,
     * emits the top N items as a formatted string.
     */
    public static class ItemHotTopN extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
        ListState<ItemViewCount> itemViewCountListState;
        private int topN;

        /**
         * @param topN number of items to print per window
         */
        public ItemHotTopN(int topN) {
            this.topN = topN;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemViewCount", ItemViewCount.class));
        }

        @Override
        public void processElement(ItemViewCount itemViewCount, Context ctx, Collector<String> out) throws Exception {
            itemViewCountListState.add(itemViewCount);
            // Fire just after the window end; registering the same timestamp repeatedly is de-duplicated.
            ctx.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the buffered counts, then release the state: this window's results are complete.
            ArrayList<ItemViewCount> arraylist = Lists.newArrayList(itemViewCountListState.get().iterator());
            itemViewCountListState.clear();

            // Descending by count; Long.compare avoids the overflow of subtracting counts.
            arraylist.sort((o1, o2) -> Long.compare(o2.getCount(), o1.getCount()));

            StringBuilder resultStringBuilder = new StringBuilder();
            resultStringBuilder.append("===================================" + "\n");
            // The timer was registered at windowEnd + 1, so subtract 1 to print the actual window end.
            resultStringBuilder.append("視窗結束時間:").append(new Timestamp(timestamp - 1).toString()).append("\n");
            for (int i = 0; i < Math.min(topN, arraylist.size()); i++) {
                resultStringBuilder
                        .append("NO ")
                        .append(i + 1)
                        .append(": 商品ID = ")
                        .append(arraylist.get(i).getItemId())
                        .append(" 熱門度 = ")
                        .append(arraylist.get(i).getCount())
                        .append("\n");
            }
            resultStringBuilder.append("===================================\n");
            out.collect(resultStringBuilder.toString());
            // Demo-only throttling of console output; this blocks the operator.
            Thread.sleep(1000L);
        }
    }
}
1. 2 熱門頁面統計
- 需求 :每隔5分鐘輸出一小時內瀏覽的熱門頁面
- 輸出結果展示:
視窗結束時間:2015-05-18 13:08:50.0
NO 1: 頁面URL = /blog/tags/puppet?flav=rss20 熱門度 = 11
NO 2: 頁面URL = /projects/xdotool/xdotool.xhtml 熱門度 = 5
NO 3: 頁面URL = /projects/xdotool/ 熱門度 = 4
NO 4: 頁面URL = /?flav=rss20 熱門度 = 4
NO 5: 頁面URL = /robots.txt 熱門度 = 4
-
實作思路:和上一個不同的是 該資料源中的資料的時間非增量
- 怎么保證亂序資料不丟
- 1.所以要設定watermark與資料源之間的亂序程度
- 2.設定一定的視窗延遲關閉時間 在初始的時間視窗到了 先聚合資料 后續再來屬于該視窗的資料 來一條計算一條輸出一條
- 3.再有遲到的資料 則直接扔到側輸出流中
- 怎么保證后續遲到的資料 來一條覆寫前面的資料
- 1 先開窗增量聚合 再全視窗聚合 再根據視窗截止時間分組
- 2 根據時間的截止視窗開窗key by 收集視窗截止時間內的所有資料 排序輸出
- 3 如果后續再來了延遲資料 需要更新之前的結果,所以把之前的資料存在MapState中 key為 頁面url value為該頁面的瀏覽次數 相同url的新結果會覆寫舊結果
- 怎么保證亂序資料不丟
-
代碼
/** * * - _ooOoo_ * - o8888888o * - 88" . "88 * - (| -_- |) * - O\ = /O * - ____/`---'\____ * - . ' \\| |// `. * - / \\||| : |||// \ * - / _||||| -:- |||||- \ * - | | \\\ - /// | | * - | \_| ''\---/'' | | * - \ .-\__ `-` ___/-. / * - ___`. .' /--.--\ `. . __ * - ."" '< `.___\_<|>_/___.' >'"". * - | | : `- \`.;`\ _ /`;.`/ - ` : | | * - \ \ `-. \_ __\ /__ _/ .-` / / * ======`-.____`-.___\_____/___.-`____.-'====== * `=---=' * ............................................. * 佛祖保佑 永無BUG * <p> * 需求 * 每5分鐘輸出一次1小時之內排名前5的頁面 * 小時統計一次結果 ,即開窗是一小時 收集1小時內的統計結果,按照視窗結束時間輸出視窗內的結果,視窗的滑動步長設定為5min */ public class HotPages { public static void main(String[] args) throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); executionEnvironment.setParallelism(1); DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/apache.log"); SimpleDateFormat simpleFormatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss"); OutputTag<ApacheLogEvent> lateTag = new OutputTag<ApacheLogEvent>("late_date") { }; DataStream<PageViewCount> streamPageViewCount = stringDataStreamSource.map(line -> { String[] s = line.split(" "); // 日期轉時間戳 Long timestamp = simpleFormatter.parse(s[3]).getTime(); return new ApacheLogEvent(s[0], s[1], timestamp, s[5], s[6]); }).filter(date -> "GET".equals(date.getMethod())) .filter(data -> { // 過濾處css js png ico 結尾的 String regex = "((?!\\.(css|js|png|ico|jpg)$).)*$"; return Pattern.matches(regex, data.getUrl()); }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) { @Override public long extractTimestamp(ApacheLogEvent apacheLogEvent) { return apacheLogEvent.getTimestamp(); } }).keyBy("url") .timeWindow(Time.minutes(10), Time.seconds(5)) .allowedLateness(Time.minutes(1)) .sideOutputLateData(lateTag) 
.aggregate(new HotPageIncreaseAgg(), new HotPageAllAgg()); SingleOutputStreamOperator<String> windowEnd = streamPageViewCount .keyBy("windowEnd") .process(new MyProcessFunction(5)); // 控制臺輸出 windowEnd.print("data"); windowEnd.getSideOutput(lateTag).print("late_date"); executionEnvironment.execute(); } public static class HotPageIncreaseAgg implements AggregateFunction<ApacheLogEvent, Long, Long> { @Override public Long createAccumulator() { return 0L; } @Override public Long add(ApacheLogEvent value, Long accumulator) { return accumulator + 1; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return a + b; } } public static class HotPageAllAgg implements WindowFunction<Long, PageViewCount, Tuple, TimeWindow> { @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception { String url = tuple.getField(0); Long count = input.iterator().next(); long windowEnd = window.getEnd(); out.collect(new PageViewCount(url, windowEnd, count)); } } public static class MyProcessFunction extends KeyedProcessFunction<Tuple, PageViewCount, String> { private Integer topSize; MapState<String, Long> hotPageCount; public MyProcessFunction(Integer topSize) { this.topSize = topSize; } @Override public void open(Configuration parameters) throws Exception { hotPageCount = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("hot_page_count", String.class, Long.class)); } /** * 如果有遲到資料 需要覆寫就的資料 * 那么定義一個map 加入相同的key 會被覆寫 * 如果時間超過1分鐘 那就清除狀態 */ @Override public void processElement(PageViewCount pageViewCount, Context ctx, Collector<String> out) throws Exception { // map 型別 如果key相同就更新 hotPageCount.put(pageViewCount.getUrl(),pageViewCount.getCount()); ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd()+1); } /** * 輸出map中的結果 * 定時器觸發的時間: watermark >= 定時時間 * @param timestamp * @param ctx * @param out * @throws Exception */ 
@Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { Long currentKey = ctx.getCurrentKey().getField(0); // 判斷是否到了視窗關閉清理的時間, 如果是 直接清空狀態 if (timestamp == currentKey + 60 * 1000L) { hotPageCount.clear(); return; } ArrayList<Map.Entry<String, Long>> pageViewCounts = Lists.newArrayList(hotPageCount.entries()); pageViewCounts.sort(new Comparator<Map.Entry<String, Long>>() { @Override public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) { if(o1.getValue() > o2.getValue()) return -1; else if(o1.getValue() < o2.getValue()) return 1; else return 0; } }); StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("===================================\n"); stringBuilder.append("視窗結束時間:").append(new Timestamp(timestamp - 1)).append("\n"); for (int i = 0; i < Math.min(topSize, pageViewCounts.size()); i++) { Map.Entry<String, Long> stringLongEntry = pageViewCounts.get(i); stringBuilder.append("NO ").append(i + 1).append(":") .append(" 頁面URL = ").append(stringLongEntry.getKey()) .append(" 熱門度 = ").append(stringLongEntry.getValue()) .append("\n"); } stringBuilder.append("===============================\n\n"); // 控制輸出頻率 Thread.sleep(1000L); out.collect(stringBuilder.toString()); } } }
1. 3 網站uv統計
-
需求:實時輸出每小時內網站的uv
-
輸出格式: 視窗的截止時間+視窗的獨立訪問人數
-
實作思路:
-
1.設定滾動視窗為1小時 每來一條資料就要觸發計算 那么就需要自定義觸發器,
-
2 . 自定義觸發器讓每條資料都觸發后續的統計邏輯,按userId去重:每條資料根據userId去redis的位圖中查詢,如果已存在則丟棄,如果不存在則count+1,
-
- 每條資料來了 先解析出userId,根據自定義的hash函式算出它在位圖中的位置:如果該位置的值為1 說明該用戶已統計過 直接丟棄;如果為0 則設定為1,取出視窗截止時間對應的訪問數+1后輸出 并將更新后的count數存儲到redis中
存盤格式:
count :redis哈希結構 存儲格式 “uv_page_count”,<視窗的截止時間,訪問數>
userId :位圖 (key為視窗的截止時間 offset為userId的hash值)
hash函式:逐個字元計算 上一步的結果*seed+當前字元的Ascii,最后對位圖容量(2的冪次方)取余
-
-
代碼
/**
 * Hourly UV (unique visitors) emitted in real time, de-duplicating users with a Redis bitmap
 * addressed by a single hash of the user id (a one-hash Bloom filter).
 *
 * <p>Approach:
 * <ol>
 *   <li>Tumbling 1-hour event-time window over all "pv" events.</li>
 *   <li>A custom trigger fires and purges on every element, so the window function sees one
 *       event at a time instead of buffering the whole hour.</li>
 *   <li>For each event: bitmap key = window end, offset = hash(userId). If the bit was 0, set it
 *       and increment the window's UV count in the Redis hash {@code uv_page_count}
 *       (field = window end), then emit the new count. If the bit was already 1, the user was
 *       (probably) seen before and nothing is emitted.</li>
 * </ol>
 *
 * @author :LiangFangWei
 * @date: 2021-12-21 15:55
 */
public class HotUVWithBloomFilter {
    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);

        // 2. Parse events (timestamp column is in seconds) and keep page views only
        DataStreamSource<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
        SingleOutputStreamOperator<ItemBean> filterData = inputStream.map(line -> {
            String[] split = line.split(",");
            return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
            @Override
            public long extractAscendingTimestamp(ItemBean element) {
                return element.getTimestamp() * 1000L;
            }
        }).filter(itemBean -> "pv".equals(itemBean.getBehavior()));

        // 3. 1-hour tumbling window, evaluated per element, de-duplicated through Redis
        SingleOutputStreamOperator<PageViewCount> streamOperator = filterData
                .timeWindowAll(Time.hours(1))
                .trigger(new UVTriigger())
                .process(new UVProcessFunction());
        streamOperator.print();
        executionEnvironment.execute();
    }

    /**
     * Trigger that fires and purges on every element, so the window never buffers events and the
     * window function runs once per event. Timers are ignored.
     */
    public static class UVTriigger extends Trigger<ItemBean, TimeWindow> {
        @Override
        public TriggerResult onElement(ItemBean element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        }
    }

    /**
     * Checks each user against the window's Redis bitmap and, for first-time users, increments
     * and emits the window's UV count.
     */
    public static class UVProcessFunction extends ProcessAllWindowFunction<ItemBean, PageViewCount, TimeWindow> {
        private Jedis jedis;
        /** Redis hash holding window end -> UV count. */
        private String pageCountKey = "uv_page_count";
        private BloomFilter bloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("localhost", 6379);
            // 2^29 bits = 64 MB bitmap per window
            bloomFilter = new BloomFilter(1 << 29);
        }

        /** Releases the Redis connection opened in {@link #open}. */
        @Override
        public void close() throws Exception {
            if (jedis != null) {
                jedis.close();
            }
        }

        /**
         * @param context  gives access to the current window (its end is the bitmap key)
         * @param elements exactly one event, because the trigger fires and purges per element
         * @param out      receives the updated UV count when the user is new
         */
        @Override
        public void process(Context context, Iterable<ItemBean> elements, Collector<PageViewCount> out) throws Exception {
            Long windowEnd1 = context.window().getEnd();
            String windowEnd = windowEnd1.toString();
            ItemBean itemBean = elements.iterator().next();
            long offset = bloomFilter.hash(itemBean.getUserId().toString(), 61);

            // SETBIT returns the previous bit: check-and-set in one atomic round trip.
            Boolean wasSet = jedis.setbit(windowEnd, offset, true);
            if (!wasSet) {
                // HINCRBY is atomic and treats a missing field as 0.
                Long uvCount = jedis.hincrBy(pageCountKey, windowEnd, 1L);
                out.collect(new PageViewCount("uv", windowEnd1, uvCount));
            }
        }
    }

    /**
     * Maps a user id onto a bit offset in {@code [0, capacity)} with a polynomial string hash.
     */
    public static class BloomFilter {
        /** Number of bits; must be a power of two so {@code hash & (capacity - 1)} is a modulo. */
        private final long capacity;

        /**
         * @param capacity number of bits in the bitmap, a positive power of two
         * @throws IllegalArgumentException if capacity is not a positive power of two
         */
        public BloomFilter(long capacity) {
            if (capacity <= 0 || (capacity & (capacity - 1)) != 0) {
                throw new IllegalArgumentException("capacity must be a positive power of two: " + capacity);
            }
            this.capacity = capacity;
        }

        /**
         * Computes {@code result = result * seed + char} over the characters of userId, reduced
         * modulo capacity. Overflow is harmless: the mask keeps the result non-negative.
         */
        public long hash(String userId, int seed) {
            long result = 0L;
            for (int i = 0; i < userId.length(); i++) {
                result = result * seed + userId.charAt(i);
            }
            return result & (capacity - 1);
        }
    }
}
2. 業務流程以及風險控制
2. 1 頁面廣告黑名單過濾
-
需求: 輸出每個省份每個廣告的點擊數,統計周期是一個小時 輸出間隔是5分鐘,要求如果當天某人對某個廣告的點擊次數超過3次 則將該用戶輸出到側輸出流中 ,如果當天內用戶再次點擊該廣告 則計為無效 不做統計
-
輸出格式:
-
blacklist-user> BlackAdUerInfo(uerId=937166, adId=1715, count=click over 3times.)
blacklist-user> BlackAdUerInfo(uerId=161501, adId=36156, count=click over 3times.)
—>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
—>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:25:00.0, count=5)
—>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
—>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:30:00.0, count=2)
—>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:30:00.0, count=5)
—>> AdOutputInfo(province=shanghai, windowEnd=2017-11-26 09:30:00.0, count=2)
-
-
統計邏輯:
- 怎么過濾例外資料:根據userId+adId keyBy 分組 再使用process,該分區每來一條資料 判斷是否到達設定的點擊數:如果沒有則+1,并且輸出該條記錄;如果到達 則將該用戶的userId加入到黑名單(側輸出流中) 之后的點擊不再統計。首次點擊時注冊第二天凌晨的定時器 定時器觸發時清空該用戶的點擊次數和黑名單狀態
- 后面廣告數的統計 就和前面統計方式如出一轍了
-
代碼
/**
 * Ad clicks per province (1-hour window sliding every 5 minutes), with a per-day click limit:
 * once a user has clicked the same ad {@code bound} times in a day, the user is reported once on
 * the {@code blacklist} side output and further clicks on that ad are not counted until the
 * next day.
 *
 * <p>Output: (province, window end, click count). An incremental aggregate produces the count;
 * a window function adds the key (province) and the window end.
 *
 * @author :LiangFangWei
 * @date: 2021-12-23 18:58
 */
public class AdStatisticsByProvince {
    /** Side output for users that exceeded the daily click limit on an ad. */
    private static final OutputTag<BlackAdUerInfo> BLACKLIST_TAG = new OutputTag<BlackAdUerInfo>("blacklist") {
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        DataStream<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/AdClickLog.csv");
        // Parse clicks; the timestamp column (index 4) is in seconds
        DataStream<AdvertInfo> processStream1 = inputStream.map(line -> {
            String[] split = line.split(",");
            return new AdvertInfo(split[0], split[1], split[2], Long.parseLong(split[4]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdvertInfo>() {
            @Override
            public long extractAscendingTimestamp(AdvertInfo element) {
                return element.getTimeStramp() * 1000L;
            }
        });

        // Drop clicks beyond the daily limit per (user, ad)
        SingleOutputStreamOperator<AdvertInfo> fliterBlackStream = processStream1
                .keyBy("userId", "adId")
                .process(new BlackUserProcess(3));

        DataStream<AdOutputInfo> resultStream = fliterBlackStream
                .keyBy("province")
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new IncreaseAggreateEle(), new AllAggreateCount());

        fliterBlackStream.getSideOutput(BLACKLIST_TAG).print("blacklist-user");
        resultStream.print("--->");
        executionEnvironment.execute();
    }

    /**
     * Keyed by (userId, adId): forwards clicks until {@code bound} clicks have been counted today,
     * then reports the user once on the blacklist side output and swallows further clicks. The
     * state is reset at the next local (UTC+8) midnight in processing time.
     */
    public static class BlackUserProcess extends KeyedProcessFunction<Tuple, AdvertInfo, AdvertInfo> {
        /** Milliseconds in one day. */
        private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;
        /** UTC+8 offset, so the daily reset happens at local midnight in China. */
        private static final long UTC_PLUS_8_MS = 8 * 60 * 60 * 1000L;

        ValueState<Long> adClickCount;
        ValueState<Boolean> isBlackUser;
        private int bound;

        /**
         * @param bound maximum number of counted clicks per user and ad per day
         */
        public BlackUserProcess(int bound) {
            this.bound = bound;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            adClickCount = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ad_click_count", Long.class, 0l));
            isBlackUser = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is_black_user", Boolean.class, false));
        }

        @Override
        public void processElement(AdvertInfo value, Context ctx, Collector<AdvertInfo> out) throws Exception {
            Long userIdClickCount = adClickCount.value();

            // First click of the day: schedule the reset at the next local midnight.
            // Shift into UTC+8 before truncating to whole days, then shift back to epoch millis.
            if (userIdClickCount == 0) {
                long now = ctx.timerService().currentProcessingTime();
                long nextMidnight = ((now + UTC_PLUS_8_MS) / ONE_DAY_MS + 1) * ONE_DAY_MS - UTC_PLUS_8_MS;
                // Processing-time timer, matching the processing-time clock used to compute it.
                ctx.timerService().registerProcessingTimeTimer(nextMidnight);
            }

            // Limit reached: report the user once, then ignore further clicks today
            if (userIdClickCount >= bound) {
                if (!isBlackUser.value()) {
                    isBlackUser.update(true);
                    ctx.output(BLACKLIST_TAG,
                            new BlackAdUerInfo(value.getUserId(), value.getAdId(), "click over " + userIdClickCount + "times."));
                }
                return;
            }

            // Below the limit: count the click and forward it
            adClickCount.update(userIdClickCount + 1);
            out.collect(value);
        }

        /** Daily reset of the click count and the blacklist flag. */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdvertInfo> out) throws Exception {
            adClickCount.clear();
            isBlackUser.clear();
        }
    }

    /** Incremental aggregate: counts the clicks of one province in one window. */
    public static class IncreaseAggreateEle implements AggregateFunction<AdvertInfo, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdvertInfo value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /** Window function: emits (province = key field 0, formatted window end, count). */
    public static class AllAggreateCount implements WindowFunction<Long, AdOutputInfo, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<AdOutputInfo> out) throws Exception {
            Timestamp formateDate = new Timestamp(window.getEnd());
            out.collect(new AdOutputInfo(tuple.getField(0).toString(), formateDate.toString(), input.iterator().next()));
        }
    }
}
2. 2 惡意登陸監控
-
需求:檢測出兩秒內連續登陸失敗2次的用戶(下面的示例代碼中設定的規則是5秒內連續失敗3次)
-
實作思路:CEP編程 定義2秒內連續登陸失敗兩次的規則,并將按用戶分組后的流應用到該規則上 篩選出匹配該規則的資料
/**
 * Malicious-login detection with Flink CEP: for each user, report runs of consecutive login
 * failures within a short time span.
 *
 * <p>The pattern matches 3 strictly consecutive "fail" events of the same user within 5 seconds.
 * NOTE(review): the section description asks for 2 failures within 2 seconds — adjust
 * {@code times(...)} / {@code within(...)} if that is the intended rule.
 *
 * @author :LiangFangWei
 * @date: 2021-12-26 17:11
 */
public class LoginCheck {
    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/LoginLog.csv");

        // 2. Parse into LoginInfo (timestamp in seconds), tolerate 3 s of disorder, and key by USER:
        //    the pattern must only see one user's events, otherwise failures of different users
        //    would be chained into one match.
        KeyedStream<LoginInfo, Tuple> keyedStream = stringDataStreamSource.map(line -> {
            String[] split = line.split(",");
            return new LoginInfo(split[0], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginInfo>(Time.seconds(3)) {
            @Override
            public long extractTimestamp(LoginInfo element) {
                return element.getTimeStamp() * 1000L;
            }
        }).keyBy("userId");

        // 3. Rule: 3 "fail" events, strictly consecutive (no other event of the user in between), within 5 s
        Pattern<LoginInfo, LoginInfo> failPattern = Pattern.<LoginInfo>begin("loginFailEvent")
                .where(new SimpleCondition<LoginInfo>() {
                    @Override
                    public boolean filter(LoginInfo value) throws Exception {
                        return "fail".equals(value.getStatus());
                    }
                })
                .times(3).consecutive()
                .within(Time.seconds(5));

        // 4. Apply the rule and turn every match into a LoginFailInfo
        PatternStream<LoginInfo> patternStream = CEP.pattern(keyedStream, failPattern);
        SingleOutputStreamOperator<LoginFailInfo> selectStream = patternStream.select(new PatternSelectFunction<LoginInfo, LoginFailInfo>() {
            /**
             * @param match pattern name -> matched events; "loginFailEvent" holds the failures in order
             */
            @Override
            public LoginFailInfo select(Map<String, List<LoginInfo>> match) throws Exception {
                List<LoginInfo> failures = match.get("loginFailEvent");
                LoginInfo firstFail = failures.get(0);
                LoginInfo lastFail = failures.get(failures.size() - 1);
                Timestamp firstFailTimeStamp = new Timestamp(firstFail.getTimeStamp() * 1000L);
                Timestamp lastFailTimeStamp = new Timestamp(lastFail.getTimeStamp() * 1000L);
                return new LoginFailInfo(firstFail.getUserId(), firstFailTimeStamp.toString(), lastFailTimeStamp.toString(),
                        "連續" + failures.size() + "登陸失敗");
            }
        });
        selectStream.print();
        executionEnvironment.execute();
    }
}
2. 3 訂單支付失效監控
-
需求:實時檢測下單后15分鐘內沒有支付的訂單
-
實作邏輯:
- 定義CEP規則: 15分鐘內有下單和支付的規則,
- 匹配到流上
- 從匹配的流上篩選資料,并從map中解析出超時資料和正常支付的資料
- 匹配的資料會封裝到map中
- 超時沒匹配上的資料同樣以map的形式交給超時處理函式,結果輸出到側輸出流中
-
代碼
/**
 * Order payment timeout detection with Flink CEP. An order that is created but not paid within
 * 15 minutes is emitted on the {@code timeoutStream} side output. Orders paid in time are
 * emitted on the main output.
 */
public class OrderCheck {
    private static final Logger logger = LoggerFactory.getLogger(OrderCheck.class);

    public static void main(String[] args) throws Exception {
        // 1. Environment and source (timestamp column in seconds)
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
        SingleOutputStreamOperator<OrderInfo> orderEvents = stringDataStreamSource.map(line -> {
            String[] split = line.split(",");
            return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {
            @Override
            public long extractAscendingTimestamp(OrderInfo element) {
                return element.getTimeStamp() * 1000L;
            }
        });

        // 2. Rule: "create" followed (not necessarily directly) by "pay" within 15 minutes
        Pattern<OrderInfo, OrderInfo> orderPayPattern = Pattern.<OrderInfo>begin("create")
                .where(new SimpleCondition<OrderInfo>() {
                    @Override
                    public boolean filter(OrderInfo value) throws Exception {
                        return "create".equals(value.getStatus());
                    }
                })
                .followedBy("pay")
                .where(new SimpleCondition<OrderInfo>() {
                    @Override
                    public boolean filter(OrderInfo value) throws Exception {
                        return "pay".equals(value.getStatus());
                    }
                })
                .within(Time.minutes(15));

        // 3. Apply the rule per order
        PatternStream<OrderInfo> orderStream = CEP.pattern(orderEvents.keyBy("orderId"), orderPayPattern);
        OutputTag<OrderTimeoutInfo> outputTag = new OutputTag<OrderTimeoutInfo>("timeoutStream") {
        };

        // 4. Completed matches -> main output; timed-out partial matches -> side output
        SingleOutputStreamOperator<OrderTimeoutInfo> resultStream = orderStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());
        resultStream.print("payed normally");
        resultStream.getSideOutput(outputTag).print("timeout");
        executionEnvironment.execute("order timeout detect job");
    }

    /**
     * Handles partial matches that did not complete within the 15-minute window ("create" seen,
     * no "pay"). Its results go to the side output.
     */
    public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderInfo, OrderTimeoutInfo> {
        @Override
        public OrderTimeoutInfo timeout(Map<String, List<OrderInfo>> pattern, long timeoutTimestamp) throws Exception {
            OrderInfo createEvent = pattern.get("create").get(0);
            // A timeout is an expected business outcome, not an error.
            logger.debug("order pay timeout: orderId={}, timeoutTimestamp={}", createEvent.getOrderId(), timeoutTimestamp);
            return new OrderTimeoutInfo(createEvent.getOrderId(), "timeout" + timeoutTimestamp);
        }
    }

    /** Handles complete matches: the order was paid in time. */
    public static class OrderPaySelect implements PatternSelectFunction<OrderInfo, OrderTimeoutInfo> {
        @Override
        public OrderTimeoutInfo select(Map<String, List<OrderInfo>> pattern) throws Exception {
            OrderInfo payEvent = pattern.get("pay").get(0);
            return new OrderTimeoutInfo(payEvent.getOrderId(), "pay");
        }
    }
}
2. 4 支付實時對賬
-
雙流join:支付流和到賬流按交易ID(payId)連接,一方到達后等待另一方5秒(事件時間),超時未匹配的一方輸出到側輸出流
-
代碼
/** * - _ooOoo_ * - o8888888o * - 88" . "88 * - (| -_- |) * - O\ = /O * - ____/`---'\____ * - . ' \\| |// `. * - / \\||| : |||// \ * - / _||||| -:- |||||- \ * - | | \\\ - /// | | * - | \_| ''\---/'' | | * - \ .-\__ `-` ___/-. / * - ___`. .' /--.--\ `. . __ * - ."" '< `.___\_<|>_/___.' >'"". * - | | : `- \`.;`\ _ /`;.`/ - ` : | | * - \ \ `-. \_ __\ /__ _/ .-` / / * ======`-.____`-.___\_____/___.-`____.-'====== * ............................................. * - 佛祖保佑 永無BUG * @author :LiangFangWei * @description: 檢測訂單是否到賬 */ public class OrderPay { private final static OutputTag<OrderInfo> unmatchedPays = new OutputTag<OrderInfo>("unmatchedPays") { }; private final static OutputTag<Receipt> unmatchedReceipts = new OutputTag<Receipt>("unmatchedReceipts") { }; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //1. 
支付資料 DataStreamSource<String> inputSteam1 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv"); SingleOutputStreamOperator<OrderInfo> orderStream = inputSteam1.map(line -> { String[] split = line.split(","); return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3])); }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() { @Override public long extractAscendingTimestamp(OrderInfo element) { return element.getTimeStamp() * 1000L; } }); // 2.入賬資料 DataStreamSource<String> inputStream2 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv"); SingleOutputStreamOperator<Receipt> payStream = inputStream2.map(line -> { String[] split = line.split(","); return new Receipt(split[0], split[1], Long.parseLong(split[2])); }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Receipt>() { @Override public long extractAscendingTimestamp(Receipt element) { return element.getTimeStamp() * 1000L; } }); // 3.雙里join SingleOutputStreamOperator<Tuple2<OrderInfo, Receipt>> resultStream = orderStream.keyBy("payId").connect(payStream.keyBy("payId")).process(new DoubleStreamJoinProcess()); // 4.如果join上回傳 resultStream.print("matched"); resultStream.getSideOutput(unmatchedPays).print("unmatchedPays"); } public static class DoubleStreamJoinProcess extends CoProcessFunction<OrderInfo, Receipt, Tuple2<OrderInfo, Receipt>> { ValueState<OrderInfo> payState; ValueState<Receipt> receiptState; @Override public void open(Configuration parameters) throws Exception { payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderInfo>("pay", OrderInfo.class)); receiptState = getRuntimeContext().getState(new ValueStateDescriptor<Receipt>("receipt", Receipt.class)); } @Override public void processElement1(OrderInfo orderInfo, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception { Receipt receipt = receiptState.value(); // 取出流2 if (receipt != null) 
{ out.collect(new Tuple2<>(orderInfo, receipt)); receiptState.clear(); } else { payState.update(orderInfo); ctx.timerService().registerEventTimeTimer(orderInfo.getTimeStamp() * 1000L + 5000L); } } @Override public void processElement2(Receipt receipt, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception { // 取出流1 OrderInfo orderInfo = payState.value(); if (orderInfo != null) { out.collect(new Tuple2<>(orderInfo, receipt)); payState.clear(); } else { receiptState.update(receipt); ctx.timerService().registerEventTimeTimer(receipt.getTimeStamp() * 1000L + 5000L); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception { if (payState.value() != null) { ctx.output(unmatchedPays, payState.value()); } if (receiptState.value() != null) { ctx.output(unmatchedReceipts, receiptState.value()); } payState.clear(); receiptState.clear(); super.onTimer(timestamp, ctx, out); } } }
3. 專案地址
專案地址歡迎大家來踩踩踩
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/401586.html
標籤:其他
