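Table of Contents
- Preface
- Project overview
- Main project modules
- Data source analysis
- Real-time hot item statistics
- Basic requirements
- Approach
- Partition by item id
- Set the time window
- Window aggregation
- Real-time traffic statistics: hot pages
- Basic requirements
- Approach
- Marketing analysis: app market promotion statistics
- Basic requirements
- Approach
- Marketing analysis: page ad statistics
- Basic requirements
- Approach
- Project implementation
- Hot item statistics
- Data analysis
- Defining the input and output data structures
- Watermarks
- AggregateFunction: custom aggregation rules
- WindowFunction: custom rules for processing window elements
- Process functions (ProcessFunction)
- Complete code
- Switching the data source to Kafka
- Custom Kafka producer
- Real-time traffic statistics
- Page view statistics
- Total site page views (PV)
- Unique visitors (UV)
- Method 1
- Method 2
- Method 3: Bloom filter
- Marketing business metrics
- App market promotion statistics
- Page ad analysis
- Blacklist filtering
- Malicious login monitoring
- Method 1
- FlinkCEP - Complex event processing for Flink
- Method 2: CEP
- Real-time order payment monitoring
- Method 1: CEP
- Method 2: state programming directly
- Matching order transactions from two streams
- Method 1: connect + CoProcess
- Method 2: join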
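Preface
In recent years, as the demand for real-time data has grown, there has been a wave of interest in learning Flink. This article is based on the 尚硅谷 big data course "E-commerce User Behavior Analysis (hands-on project)". The original project uses Scala; this article attempts to rewrite it in Java and, with reference to the official documentation, explains what some of the APIs are for. Without further ado, let's get started:
Project overview
Main project modules
Based on a basic classification of e-commerce user behavior data, there are three main directions for analysis:
1. Hot statistics
Use the users' click and browse behavior to compute traffic statistics, recently hot items, and so on.
2. Preference statistics
Use the users' preference behavior, such as favorites, likes, and ratings, to build user profiles and produce personalized item recommendation lists.
3. Risk control
Use the users' regular business behavior, such as login, ordering, and payment, to analyze the data and raise alerts on abnormal situations.
Limited by the available data, this project implements only part of the hot statistics and risk control directions. It covers five modules: real-time hot item statistics, real-time traffic statistics, marketing business metrics, malicious login monitoring, and order payment timeout monitoring, which break down into nine specific metrics.
Because the real-time requirements are high, we use Flink as the data processing framework. The project combines several of Flink's APIs, handles the basic business requirements on event time, and uses the low-level ProcessFunction, state programming, and CEP for the more complex cases.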
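Data source analysis
Behavior data: UserBehavior
| Field | Type | Description |
|---|---|---|
| userId | Long | Encrypted user ID |
| itemId | Long | Encrypted item ID |
| categoryId | Int | Encrypted ID of the category the item belongs to |
| behavior | String | Type of user behavior, one of ('pv', 'buy', 'cart', 'fav') |
| timestamp | Long | Timestamp at which the behavior occurred, in seconds |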
Web log data
| Field | Type | Description |
|---|---|---|
| ip | String | IP of the visitor |
| userId | Long | User ID of the visitor |
| eventTime | Long | Time of the visit |
| method | String | HTTP method: GET/POST/PUT/DELETE |
| url | String | URL that was visited |
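Real-time hot item statistics
Basic requirements
Compute the hot items of the last hour, updated every five minutes
Popularity is measured by page views (pv)
Approach
Filter the pv events out of the user behavior data
Build a sliding window
Partition by item id
.keyBy("itemId")
Set the time window
.timeWindow(Time.minutes(60), Time.minutes(5)) (sliding window)
Time windows are left-closed and right-open, and the same element is assigned to every window whose range contains it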
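To make the "one element, many windows" point concrete, here is a minimal plain-Java sketch of how a timestamp can be mapped to sliding windows. It does not use Flink; the class and method names are hypothetical, and it assumes a window offset of zero. With a one-hour window sliding every five minutes, each element lands in 12 windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the sliding windows that contain a timestamp.
class SlidingWindowSketch {

    /** Returns {start, end} pairs (start inclusive, end exclusive) of every window containing the timestamp. */
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // start of the latest window that still contains the timestamp (offset assumed to be 0)
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 1511658000L * 1000; // sample timestamp from the data set, in milliseconds
        for (long[] w : assignWindows(ts, 60 * 60 * 1000L, 5 * 60 * 1000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Because the end is exclusive, an element whose timestamp equals a window's end belongs to the next window, not to that one.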
Window aggregation
.aggregate(new CountAgg(),new WindowResultFunction())
new CountAgg(): defines the aggregation rule
new WindowResultFunction(): defines the structure of the output data
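Real-time traffic statistics: hot pages
Basic requirements
From the web server logs, compute the hot pages in real time
Count the per-minute visits by ip, take the five most visited addresses, and update every five seconds
Approach
Convert the time in the log to a timestamp
Build a sliding window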
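The first step of this approach, turning the log time into a timestamp, could look like the sketch below. The log layout is not shown in this article, so the pattern dd/MM/yyyy:HH:mm:ss and the UTC zone are assumptions; adjust both to the actual log. The class name is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: parses a log time string into epoch milliseconds.
class LogTimeSketch {
    // Assumed log time layout, e.g. "17/05/2015:10:05:03"
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("dd/MM/yyyy:HH:mm:ss");

    /** Interprets the log time as UTC (an assumption) and returns epoch milliseconds. */
    static long toEpochMillis(String logTime) {
        return LocalDateTime.parse(logTime, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("17/05/2015:10:05:03"));
    }
}
```

The resulting value is already in milliseconds, so it can be returned directly from a timestamp assigner without the `* 1000` used for the second-based UserBehavior data.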
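Marketing analysis: app market promotion statistics
Basic requirements
Compute the metrics for app market promotion
Compute them separately for each promotion channel
Approach
Filter the data, then count per channel
Use a custom ProcessFunction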
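Marketing analysis: page ad statistics
Basic requirements
Per province, count the page views of the last hour, refreshed every five seconds
Filter out abnormally frequent click behavior and put the offending users on a blacklist
Approach
Sliding window
Use a ProcessFunction for the blacklist filtering
There are many more details to the requirements; they are covered in the code implementation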
Project implementation
Hot item statistics
Data analysis
// userID,itemId,categoryId,mode,timeStamp
543462,1715,1464116,pv,1511658000
Defining the input and output data structures
// input structure
class UserBehavior {
public long userId;
public long itemId;
public int categoryId;
public String behavior;
public long timeStamp;
public UserBehavior() {
}
public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timeStamp = timeStamp;
}
@Override
public String toString() {
return "UserBehavior{" +
"userId=" + userId +
", itemId=" + itemId +
", categoryId=" + categoryId +
", behavior='" + behavior + '\'' +
", timeStamp=" + timeStamp +
'}';
}
}
// output structure
class ItemViewCount{
public long itemID;
public long windowEnd;
public long count;
public ItemViewCount() {
}
public ItemViewCount(long itemID, long windowEnd, long count) {
this.itemID = itemID;
this.windowEnd = windowEnd;
this.count = count;
}
@Override
public String toString() {
return "ItemViewCount{" +
"itemID=" + itemID +
", windowEnd=" + windowEnd +
", count=" + count +
'}';
}
}
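Watermarks
To work with event-time semantics, a Flink application needs to know which field holds the event timestamp, which means every element in the stream needs an assignable event timestamp. This is usually done by using the TimestampAssigner API to access/extract the timestamp from a field of the element.
Timestamp assignment goes hand in hand with watermark generation, which tells the Flink application how far event time has progressed. How watermarks are generated is configured by specifying a WatermarkGenerator.
The Flink API expects a WatermarkStrategy that contains both a TimestampAssigner and a WatermarkGenerator. The WatermarkStrategy utility class provides a number of common watermark strategies out of the box, and users can build their own when a scenario requires it.
// Assign watermarks to handle out-of-order data
// Watermark strategy: bounded out-of-orderness with a fixed maximum delay (5 seconds here)
// Event time is taken from the timeStamp field of our object (seconds, converted to milliseconds)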
SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));
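As a rough illustration of what the bounded-out-of-orderness strategy does, the plain-Java sketch below tracks the largest timestamp seen so far and reports a watermark that trails it by the allowed delay. It is a simplified stand-in, not Flink's class; the names are hypothetical, and the extra "- 1" reflects the convention that a watermark of t means no more elements with a timestamp of t or earlier are expected.

```java
// Hypothetical stand-in for a bounded-out-of-orderness watermark generator.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // initial value chosen so that the first watermark computation cannot underflow
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Called for every element: remember the largest event timestamp seen so far. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** The watermark trails the largest timestamp by the allowed delay. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5_000L);
        wm.onEvent(10_000L);
        wm.onEvent(7_000L); // out of order, does not move the watermark
        System.out.println(wm.currentWatermark());
    }
}
```

With a 5-second bound, a window ending at time T only fires once an element with a timestamp later than T + 5 s has arrived, which is what gives late elements a chance to be counted.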
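AggregateFunction: custom aggregation rules
AggregateFunction is more general than ReduceFunction. It has three type parameters: the input type (IN), the accumulator type (ACC), and the output type (OUT).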
class CountAgg implements AggregateFunction<UserBehavior,Long,Long> {
    // initial value of the accumulator
    @Override
    public Long createAccumulator() {
        return 0L;
    }
    // rule within a group: add one for every incoming element
    @Override
    public Long add(UserBehavior userBehavior, Long aLong) {
        return aLong+1;
    }
    // the value returned as the result
    @Override
    public Long getResult(Long aLong) {
        return aLong;
    }
    // rule across groups: merge two partial counts
    @Override
    public Long merge(Long aLong, Long acc1) {
        return aLong+acc1;
    }
}
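In the counting case all three types happen to be Long, which hides why the accumulator type is separate. The sketch below uses an average, where IN, ACC, and OUT all differ. The interface SimpleAggregate is a hypothetical stand-in that only mirrors the four method names of Flink's AggregateFunction so the example runs without Flink.

```java
import java.util.Arrays;
import java.util.List;

class AggregateContractSketch {
    /** Hypothetical stand-in mirroring the four methods of an aggregate function. */
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** IN = Long, ACC = long[]{sum, count}, OUT = Double. */
    static class Average implements SimpleAggregate<Long, long[], Double> {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }
        public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
        public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
    }

    /** Feeds every value of one partial group into a fresh accumulator. */
    static <IN, ACC, OUT> ACC fold(SimpleAggregate<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN v : values) {
            acc = fn.add(v, acc);
        }
        return acc;
    }

    /** Aggregates two partial groups separately, merges them, and extracts the result. */
    static double averageOf(List<Long> left, List<Long> right) {
        Average avg = new Average();
        return avg.getResult(avg.merge(fold(avg, left), fold(avg, right)));
    }

    public static void main(String[] args) {
        System.out.println(averageOf(Arrays.asList(1L, 2L, 3L), Arrays.asList(10L)));
    }
}
```

The accumulator carries both the sum and the count, so partial results can be merged correctly; merging two already-computed averages would not give the right answer.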
WindowFunction: custom rules for processing window elements
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
void apply(KEY var1, W var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
}
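Process functions (ProcessFunction)
ProcessFunction combines event processing with timers and state, which makes it a powerful building block for stream processing applications. It is the basis for building event-driven applications with Flink. It is very similar to RichFlatMapFunction, with timers added.
Shown here is one kind of ProcessFunction, KeyedProcessFunction: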
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public KeyedProcessFunction() {
}
public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
......
class TopNHotItems extends KeyedProcessFunction<Long,ItemViewCount,String> {
    public int n;
    // list state that holds every ItemViewCount of the current window
    public ListState<ItemViewCount> itemState;
    public TopNHotItems(int n) {
        this.n = n;
    }
    // open() runs once, before the first call to processElement
    @Override
    public void open(Configuration parameters) throws Exception {
        itemState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemState",ItemViewCount.class));
    }
    @Override
    public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
        itemState.add(itemViewCount);
        // register an event-time timer at windowEnd + 1 ms; when it fires, all item counts
        // of the window ending at windowEnd have arrived and can be sorted together
        context.timerService().registerEventTimeTimer(itemViewCount.windowEnd+1);
    }
    // called when the timer fires
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // all data has been collected; copy everything into a List first
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // clear the state
        itemState.clear();
        // sort by count in descending order
        allItems.sort((o1, o2) -> Long.compare(o2.count, o1.count));
        StringBuilder result = new StringBuilder();
        result.append("======================================================\n");
        // the timer was registered 1 ms after windowEnd, so subtract 1 to recover the window end time
        result.append("time: ").append(new Timestamp(timestamp - 1)).append("\n");
        // only the first n items after sorting are the top N
        for (int i = 0; i < Math.min(n, allItems.size()); i++) {
            result.append(allItems.get(i).toString());
        }
        result.append("\n");
        result.append("======================================================\n");
        out.collect(result.toString());
    }
}
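Sorting the whole list is fine for a modest number of items. When a window holds many items and n is small, a bounded min-heap is a common alternative that keeps only the n largest counts. The sketch below is plain Java with hypothetical names; each entry is a {itemId, count} pair rather than an ItemViewCount.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper: top-N selection with a bounded min-heap.
class TopNSketch {

    /** Returns the n entries with the largest count, in descending order of count. */
    static List<long[]> topN(List<long[]> itemCounts, int n) {
        Comparator<long[]> byCount = Comparator.comparingLong(e -> e[1]);
        // min-heap: the smallest of the current top n sits at the head
        PriorityQueue<long[]> heap = new PriorityQueue<>(byCount);
        for (long[] entry : itemCounts) {
            heap.offer(entry);
            if (heap.size() > n) {
                heap.poll(); // drop the smallest so at most n entries remain
            }
        }
        List<long[]> result = new ArrayList<>(heap);
        result.sort(byCount.reversed());
        return result;
    }

    public static void main(String[] args) {
        List<long[]> counts = new ArrayList<>();
        counts.add(new long[] {1715L, 5L});
        counts.add(new long[] {2244L, 9L});
        counts.add(new long[] {3611L, 1L});
        for (long[] e : topN(counts, 2)) {
            System.out.println("itemId=" + e[0] + ", count=" + e[1]);
        }
    }
}
```

Either way, the selection has to happen after all counts of the window are known, which is why it belongs in onTimer rather than in processElement.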
Complete code
The comments are fairly detailed and build up step by step
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;
/**
* @Description: real-time hot item statistics
*/
// input structure
class UserBehavior {
public long userId;
public long itemId;
public int categoryId;
public String behavior;
public long timeStamp;
public UserBehavior() {
}
public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timeStamp = timeStamp;
}
@Override
public String toString() {
return "UserBehavior{" +
"userId=" + userId +
", itemId=" + itemId +
", categoryId=" + categoryId +
", behavior='" + behavior + '\'' +
", timeStamp=" + timeStamp +
'}';
}
}
// output structure
class ItemViewCount{
public long itemID;
public long windowEnd;
public long count;
public ItemViewCount() {
}
public ItemViewCount(long itemID, long windowEnd, long count) {
this.itemID = itemID;
this.windowEnd = windowEnd;
this.count = count;
}
@Override
public String toString() {
return "ItemViewCount{" +
"itemID=" + itemID +
", windowEnd=" + windowEnd +
", count=" + count +
'}';
}
}
public class HotItems {
    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);
        // use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // source
        // input data
        DataStreamSource<String> stringDataStreamSource = env.readTextFile("/UserBehavior.csv");
        // transform
        // convert the raw lines into UserBehavior objects
        SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
            }
        });
        // Assign watermarks to handle out-of-order data
        // Watermark strategy: bounded out-of-orderness with a fixed maximum delay (5 seconds here)
        // Event time is taken from the timeStamp field of our object (seconds, converted to milliseconds)
        SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));
        // keep only the pv events
        userBehaviorWatermark.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior userBehavior) throws Exception {
                return userBehavior.behavior.equals("pv");
            }
        })
                // key by itemId
                .keyBy(value -> value.itemId)
                // Windows can be defined on already partitioned KeyedStreams
                // define the sliding window
                .timeWindow(Time.hours(1), Time.minutes(5))
                // count the views per item, with a custom aggregation rule and output structure
                .aggregate(new CountAgg(), new WindowResult())
                // key by the end time of each window
                .keyBy(value->value.windowEnd)
                // emit the top N items by view count for each window
                .process(new TopNHotItems(3))
                .print("HotItems");
        env.execute();
    }
}
class CountAgg implements AggregateFunction<UserBehavior,Long,Long> {
    // initial value of the accumulator
    @Override
    public Long createAccumulator() {
        return 0L;
    }
    // rule within a group: add one for every incoming element
    @Override
    public Long add(UserBehavior userBehavior, Long aLong) {
        return aLong+1;
    }
    // the value returned as the result
    @Override
    public Long getResult(Long aLong) {
        return aLong;
    }
    // rule across groups: merge two partial counts
    @Override
    public Long merge(Long aLong, Long acc1) {
        return aLong+acc1;
    }
}
class WindowResult implements WindowFunction<Long,ItemViewCount,Long, TimeWindow> {
@Override
public void apply(Long aLong, TimeWindow timeWindow, java.lang.Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
collector.collect(new ItemViewCount(aLong,timeWindow.getEnd(),iterable.iterator().next()));
}
}
class TopNHotItems extends KeyedProcessFunction<Long,ItemViewCount,String> {
    public int n;
    // list state that holds every ItemViewCount of the current window
    public ListState<ItemViewCount> itemState;
    public TopNHotItems(int n) {
        this.n = n;
    }
    // open() runs once, before the first call to processElement
    @Override
    public void open(Configuration parameters) throws Exception {
        itemState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemState",ItemViewCount.class));
    }
    @Override
    public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
        itemState.add(itemViewCount);
        // register an event-time timer at windowEnd + 1 ms; when it fires, all item counts
        // of the window ending at windowEnd have arrived and can be sorted together
        context.timerService().registerEventTimeTimer(itemViewCount.windowEnd+1);
    }
    // called when the timer fires
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // all data has been collected; copy everything into a List first
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // clear the state
        itemState.clear();
        // sort by count in descending order
        allItems.sort((o1, o2) -> Long.compare(o2.count, o1.count));
        StringBuilder result = new StringBuilder();
        result.append("======================================================\n");
        // the timer was registered 1 ms after windowEnd, so subtract 1 to recover the window end time
        result.append("time: ").append(new Timestamp(timestamp - 1)).append("\n");
        // only the first n items after sorting are the top N
        for (int i = 0; i < Math.min(n, allItems.size()); i++) {
            result.append(allItems.get(i).toString());
        }
        result.append("\n");
        result.append("======================================================\n");
        out.collect(result.toString());
    }
}
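The code for the first module is written out in detail; for the later modules the article only shows the core parts.
Switching the data source to Kafka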
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stringDataStreamSource = env.addSource(new FlinkKafkaConsumer<>("hotItem", new SimpleStringSchema(), properties));
Custom Kafka producer
It reads records from a file and sends them to Kafka one by one, which is convenient for testing
public class myKafkaProducer {
    public static void main(String[] args) throws IOException {
        write2kafka("hotItem");
    }
    public static void write2kafka(String topic) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","192.168.166.200:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources closes the reader and the producer once the file is exhausted
        try (Producer<String, String> producer = new KafkaProducer<String,String>(properties);
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("data/UserBehavior.csv")))) {
            String s;
            // stop at end of file instead of spinning forever on null lines
            while ((s = bufferedReader.readLine()) != null) {
                producer.send(new ProducerRecord<String, String>(topic,s));
            }
        }
    }
}
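Real-time traffic statistics
Page view statistics
Every 5 seconds, output the top N most visited URLs of the last 10 minutes.
The pattern is the same as above
Total site page views (PV)
Count the pv per hour
This is essentially a word count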
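To show why hourly PV is "just a word count", here is a small plain-Java sketch: each timestamp is mapped to the end of its one-hour tumbling window, and that window end plays the role of the "word" being counted. It does not use Flink; the class name is hypothetical and timestamps are in seconds, as in UserBehavior.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: counts page views per one-hour tumbling window.
class HourlyPvSketch {
    private static final long HOUR_SEC = 3600L;

    /** Maps each window end (epoch seconds) to the number of events inside that window. */
    static Map<Long, Long> pvPerHour(long[] timestampsSec) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsSec) {
            // windows are [start, end), so a timestamp exactly on the hour opens a new window
            long windowEnd = (ts / HOUR_SEC + 1) * HOUR_SEC;
            counts.merge(windowEnd, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] sample = {1511658000L, 1511658001L, 1511661600L};
        System.out.println(pvPerHour(sample));
    }
}
```

In the Flink job the same effect comes from filtering the pv events, mapping each one to a count of 1, and summing inside a one-hour window.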
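Unique visitors (UV)
This involves deduplication. Somewhat surprisingly, Flink's DataStream API has no built-in distinct operator, so in this scenario there are several ways to deduplicate.
Before showing the deduplication methods, one API detail is worth pointing out: WindowedStream and AllWindowedStream both ultimately produce a DataStream, and apply can be used on keyed as well as non-keyed windows. It is not as powerful as process, but it is still quite useful.
Method 1
Deduplicate with a Set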
public class UniqueVisitor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/UserBehavior.csv");
SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String s) throws Exception {
String[] split = s.split(",");
return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
}
});
SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));
userBehaviorWatermark.filter(value -> value.behavior.equals("pv"))
.timeWindowAll(Time.minutes(60))
.apply(new UvCountByWindow())
.print();
env.execute();
}
}
class UvCountByWindow implements AllWindowFunction<UserBehavior,UVCount, TimeWindow> {
@Override
public void apply(TimeWindow window, Iterable<UserBehavior> input, Collector<UVCount> out) {
Set<Long> longs = new HashSet<>();
for (UserBehavior next : input) {
longs.add(next.userId);
}
out.collect(new UVCount(window.getEnd(),longs.size()));
}
}
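Method 2
Use MapState; the idea is much the same as with the Set
Note that this process function has no onTimer method. KeyedProcessFunction is an extension of ProcessFunction that gives access to the timer's key inside onTimer (through ctx.getCurrentKey), whereas the function used here is not a KeyedProcessFunction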
public class UniqueVisitor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/UserBehavior.csv");
SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String s) throws Exception {
String[] split = s.split(",");
return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
}
});
SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));
userBehaviorWatermark.filter(value -> value.behavior.equals("pv"))
.timeWindowAll(Time.minutes(60))
// .apply(new UvCountByWindow())
.process(new UvCountByProcess())
.print();
env.execute();
}
}
class UvCountByProcess extends ProcessAllWindowFunction<UserBehavior, UVCount, TimeWindow> {
    public MapState<Long, Long> mapState;
    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapState",Long.class , Long.class));
    }
    @Override
    public void process(Context context, Iterable<UserBehavior> iterable, Collector<UVCount> collector) throws Exception {
        // per-window counter: kept local so the total does not carry over into the next window
        int cnt = 0;
        for(UserBehavior elem:iterable) {
            if(!mapState.contains(elem.userId)) {
                mapState.put(elem.userId,1L);
                cnt++;
            }
        }
        collector.collect(new UVCount(context.window().getEnd(),cnt));
        mapState.clear();
    }
}
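Method 3: Bloom filter
Both methods above keep the elements in memory, so with a large data volume they can run out of resources. Here we use a Bloom filter instead.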
public class UvWithBloomFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/UserBehavior.csv");
SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String s) throws Exception {
String[] split = s.split(",");
return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
}
});
SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));
userBehaviorWatermark.filter(value -> value.behavior.equals("pv"))
.map(new MapFunction<UserBehavior, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(UserBehavior userBehavior) throws Exception {
return new Tuple2<>("dummyKey",userBehavior.userId);
}
})
.keyBy(value->value.f0)
.timeWindow(Time.minutes(60))
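// We should not wait for the window to close before talking to Redis: the data volume can be large and may not fit in the window's memory
// so a trigger is used here, the API that controls when the window computation fires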
.trigger(new MyTrigger())
.process(new UvCountWithBloom())
.print();
env.execute();
}
}
class MyTrigger extends Trigger<Tuple2<String, Long>, TimeWindow> {
@Override
public TriggerResult onElement(Tuple2<String, Long> stringLongTuple2, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
}
}
// define a Bloom filter
class Bloom {
public Long cap;
public Bloom() {
}
public Bloom(Long cap) {
this.cap = cap;
}
public Long hash(String value,int seed) {
long result = 0;
for(int i=0;i<value.length();i++) {
result = result * seed + value.charAt(i);
}
return result & (cap-1);
}
@Override
public String toString() {
return "Bloom{" +
"cap=" + cap +
'}';
}
}
class UvCountWithBloom extends ProcessWindowFunction<Tuple2<String, Long>, UVCount, String, TimeWindow> {
    // A Redis connection could be set up here in advance; it is left out in this version.
    // A plain map stands in for the Redis bitmap that should really be used; for this part, the Scala code in the original repository is the better reference.
    public Map<Long,Integer> map = new HashMap<>();
    // the Bloom filter
    public Bloom bloom;
    @Override
    public void open(Configuration parameters) throws Exception {
        // hash() masks with (cap - 1), so cap has to be a power of two
        bloom = new Bloom(128L);
    }
    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<UVCount> out) {
        // the trigger fires and purges on every element, so the window holds exactly one element here
        String userId = elements.iterator().next().f1.toString();
        // compute the hash
        long hash = bloom.hash(userId, 61);
        // stand-in for testing the bit in the Redis bitmap: count how often this position has been seen (1 on first sight)
        map.merge(hash, 1, Integer::sum);
        out.collect(new UVCount(Long.parseLong(userId),map.get(hash)));
    }
}
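Since the version above only approximates the idea with a map, the following self-contained sketch shows what the Bloom-filter step could look like with an in-memory java.util.BitSet in place of the Redis bitmap. The class name, the choice of three seeds, and the capacity are all assumptions made for illustration; the hash function follows the same multiply-and-add scheme as the Bloom class above.

```java
import java.util.BitSet;

// Hypothetical in-memory Bloom filter; a BitSet stands in for the Redis bitmap.
class BloomFilterSketch {
    private final BitSet bits;
    private final int capacity;                 // number of bits, must be a power of two
    private final int[] seeds = {31, 61, 131};  // one hash function per seed (assumed values)

    BloomFilterSketch(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        this.capacity = capacity;
        this.bits = new BitSet(capacity);
    }

    // same multiply-and-add hash as the Bloom class, masked down to the bit range
    private int hash(String value, int seed) {
        long result = 0;
        for (int i = 0; i < value.length(); i++) {
            result = result * seed + value.charAt(i);
        }
        return (int) (result & (capacity - 1));
    }

    /** Records the value; returns true if it was definitely not seen before. */
    boolean addIfAbsent(String value) {
        boolean absent = false;
        for (int seed : seeds) {
            int index = hash(value, seed);
            if (!bits.get(index)) {
                absent = true;
                bits.set(index);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        BloomFilterSketch filter = new BloomFilterSketch(1 << 20);
        long uv = 0;
        for (String userId : new String[] {"1001", "1002", "1001"}) {
            if (filter.addIfAbsent(userId)) {
                uv++; // only first sightings increase the UV count
            }
        }
        System.out.println("uv=" + uv);
    }
}
```

A Bloom filter can report a new user as already seen (a false positive), so the UV count it produces is a slight underestimate in exchange for a fixed memory footprint.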
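Marketing business metrics
App market promotion statistics
There are two main points here. The first is a custom data source, which is a very handy tool for testing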
class SimulateEventSource extends RichParallelSourceFunction<MarketingUserBehavior> {
// flag that indicates whether the source is still running
Boolean running = true;
// the set of channels
String[] channelSet = {"AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba"};
// the set of user behaviors
String[] behaviorTypes = {"BROWSE", "CLICK", "PURCHASE", "UNINSTALL"};
// random number generator
Random rand = new Random();
@Override
public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
long count = 0;
long max = Long.MAX_VALUE;
while (count<max && running) {
String userId = String.valueOf(rand.nextLong());
String behaviorType = behaviorTypes[rand.nextInt(behaviorTypes.length)];
String channel = channelSet[rand.nextInt(channelSet.length)];
long timestamp = System.currentTimeMillis();
ctx.collect(new MarketingUserBehavior(userId,behaviorType,channel,timestamp));
count++;
}
}
@Override
public void cancel() {
running = false;
}
}
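The second: in Java, when the key returned in keyBy is a fairly complex type and you do not want to define a POJO for it, you still need to use a KeySelector, otherwise you may run into errors.
Complete code
// input data class that holds the e-commerce user behavior (a case class in the Scala original)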
// case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)
class MarketingUserBehavior {
public String userId;
public String behavior;
public String channel;
public Long timestamp;
public MarketingUserBehavior(String userId, String behavior, String channel, Long timestamp) {
this.userId = userId;
this.behavior = behavior;
this.channel = channel;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "MarketingUserBehavior{" +
"userId='" + userId + '\'' +
", behavior='" + behavior + '\'' +
", channel='" + channel + '\'' +
", timestamp=" + timestamp +
'}';
}
}
// output result class that holds the user click counts per channel (a case class in the Scala original)
// case class MarketingViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)
class MarketingViewCount {
public String windowStart;
public String windowEnd;
public String channel;
public String behavior;
public Long count;
public MarketingViewCount(String windowStart, String windowEnd, String channel, String behavior, Long count) {
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.channel = channel;
this.behavior = behavior;
this.count = count;
}
@Override
public String toString() {
return "MarketingViewCount{" +
"windowStart='" + windowStart + '\'' +
", windowEnd='" + windowEnd + '\'' +
", channel='" + channel + '\'' +
", behavior='" + behavior + '\'' +
", count=" + count +
'}';
}
}
public class AppMarketingByChannel {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// custom data source
DataStreamSource<MarketingUserBehavior> marketingUserBehaviorDataStreamSource = env.addSource(new SimulateEventSource());
marketingUserBehaviorDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<MarketingUserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event,timeStamp)->event.timestamp))
.filter(value-> !value.behavior.equals("UNINSTALL"))
.map(new MapFunction<MarketingUserBehavior, Tuple2<Tuple2<String,String>,Long>>() {
@Override
public Tuple2<Tuple2<String,String>, Long> map(MarketingUserBehavior marketingUserBehavior) throws Exception {
return new Tuple2<>(new Tuple2<>(marketingUserBehavior.channel, marketingUserBehavior.behavior), 1L);
}
})
.keyBy(new KeySelector<Tuple2<Tuple2<String, String>, Long>, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(Tuple2<Tuple2<String, String>, Long> tuple2LongTuple2) throws Exception {
return tuple2LongTuple2.f0;
}
})
.timeWindow(Time.minutes(60),Time.seconds(10))
.process(new MarketingCountByChannel())
.print();
env.execute();
}
}
class SimulateEventSource extends RichParallelSourceFunction<MarketingUserBehavior> {
// flag that indicates whether the source is still running
Boolean running = true;
// the set of channels
String[] channelSet = {"AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba"};
// the set of user behaviors
String[] behaviorTypes = {"BROWSE", "CLICK", "PURCHASE", "UNINSTALL"};
// random number generator
Random rand = new Random();
@Override
public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
long count = 0;
long max = Long.MAX_VALUE;
while (count<max && running) {
String userId = String.valueOf(rand.nextLong());
String behaviorType = behaviorTypes[rand.nextInt(behaviorTypes.length)];
String channel = channelSet[rand.nextInt(channelSet.length)];
long timestamp = System.currentTimeMillis();
ctx.collect(new MarketingUserBehavior(userId,behaviorType,channel,timestamp));
count++;
}
}
@Override
public void cancel() {
running = false;
}
}
class MarketingCountByChannel extends ProcessWindowFunction<Tuple2<Tuple2<String,String>,Long>,MarketingViewCount,Tuple2<String,String>, TimeWindow> {
@Override
public void process(Tuple2<String, String> stringStringTuple2, Context context, Iterable<Tuple2<Tuple2<String, String>, Long>> elements, Collector<MarketingViewCount> out) throws Exception {
// get the window start and end times (long values) from the context object
// context.window().getStart() returns a long, so convert it to a String
String startTs = String.valueOf(context.window().getStart());
String endTs = String.valueOf(context.window().getEnd());
// the channel
String channel = stringStringTuple2.f0;
// the behavior
String behaviorType = stringStringTuple2.f1;
// the count
long count = 0;
for (Tuple2<Tuple2<String, String>, Long> element : elements) {
count++;
}
// emit the result
out.collect(new MarketingViewCount(startTs, endTs, channel, behaviorType, count));
}
}
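Page ad analysis
Click counts broken down by province
Blacklist filtering
Compared with the previous feature, this one adds a filtering step; the exact filtering rule depends on the requirements
It uses a side output (getSideOutput). Working through a real use case of it here was very instructive
// warning class emitted to the side output (a case class in the Scala original)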
// case class BlackListWarning(userId:Long,adId:Long,msg:String)
class BlackListWarning {
public long userId;
public long adId;
public String msg;
public BlackListWarning(long userId, long adId, String msg) {
this.userId = userId;
this.adId = adId;
this.msg = msg;
}
@Override
public String toString() {
return "BlackListWarning{" +
"userId=" + userId +
", adId=" + adId +
", msg='" + msg + '\'' +
'}';
}
}
public class AdAnalysisByProvinceBlack {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/AdClickLog.csv");
SingleOutputStreamOperator<AdClickEvent> adLogStream = stringDataStreamSource.map(new MapFunction<String, AdClickEvent>() {
@Override
public AdClickEvent map(String s) throws Exception {
String[] s1 = s.split(",");
return new AdClickEvent(Long.parseLong(s1[0]), Long.parseLong(s1[1]), s1[2], s1[3], Long.parseLong(s1[4]));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<AdClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.timestamp * 1000));
SingleOutputStreamOperator<AdClickEvent> filterBlackListStream = adLogStream.keyBy(new KeySelector<AdClickEvent, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> getKey(AdClickEvent adClickEvent) throws Exception {
return new Tuple2<>(adClickEvent.userId, adClickEvent.adId);
}
})
.process(new FilterBlackList(100L));
SingleOutputStreamOperator<String> process = filterBlackListStream
.keyBy(value -> value.province)
.timeWindow(Time.minutes(60), Time.seconds(5))
.process(new AdCount());
// process.print();
filterBlackListStream.getSideOutput(new OutputTag<BlackListWarning>("BlackListOutputTag"){}).print();
env.execute();
}
}
class FilterBlackList extends KeyedProcessFunction<Tuple2<Long, Long>, AdClickEvent, AdClickEvent> {
public long cnt;
public FilterBlackList(long cnt) {
this.cnt = cnt;
}
ValueState<Long> count;
ValueState<Boolean> state;
// state that holds the current user's click count for the current ad
// lazy val countState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
// flag that records whether the user is already on the blacklist
// lazy val isSendState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-sent",classOf[Boolean]))
@Override
public void open(Configuration parameters) throws Exception {
count = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
state = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-sent", Boolean.class));
}
@Override
public void processElement(AdClickEvent value, Context ctx, Collector<AdClickEvent> out) throws Exception {
// read the current count from state
Long value1 = count.value();
// for the first element, register a timer for 00:00 of the next day that clears the state
if(value1==null || value1==0L) {
count.update(0L);
state.update(false);
long ts = (ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24);
ctx.timerService().registerProcessingTimeTimer(ts);
}
// check whether count has reached the limit; if it has and no warning was emitted before, emit one
if(count.value()>cnt) {
if(!state.value()) {
// emit to the side output
ctx.output(new OutputTag<BlackListWarning>("BlackListOutputTag"){},new BlackListWarning(value.userId,value.adId,"click over "+cnt+" times today"));
// update the blacklist flag
state.update(true);
}
return;
}
count.update(count.value()+1);
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdClickEvent> out) throws Exception {
count.clear();
state.clear();
}
}
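The timer in processElement is computed with integer division on epoch milliseconds, which rounds up to the next midnight in UTC. The sketch below isolates that arithmetic and, as an assumption about what a deployment might want, adds a variant for midnight in a specific time zone. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper: computes the timestamp at which the daily state should be cleared.
class NextMidnightSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Same arithmetic as in processElement: the next 00:00 in UTC, in epoch milliseconds. */
    static long nextUtcMidnight(long nowMs) {
        return (nowMs / DAY_MS + 1) * DAY_MS;
    }

    /** Variant for the next 00:00 in a given time zone. */
    static long nextLocalMidnight(long nowMs, ZoneId zone) {
        return Instant.ofEpochMilli(nowMs)
                .atZone(zone)
                .toLocalDate()
                .plusDays(1)
                .atStartOfDay(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(nextUtcMidnight(now));
        System.out.println(nextLocalMidnight(now, ZoneOffset.ofHours(8)));
    }
}
```

For a job running in UTC+8, the UTC version fires at 08:00 local time, so the zone-aware variant may be closer to the "reset at midnight" intent.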
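Malicious login monitoring
Method 1
The most straightforward approach
// input login event class (a case class in the Scala original)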
// case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
class LoginEvent {
public long userId;
public String ip;
public String eventType;
public long eventTime;
public LoginEvent(long userId, String ip, String eventType, long eventTime) {
this.userId = userId;
this.ip = ip;
this.eventType = eventType;
this.eventTime = eventTime;
}
@Override
public String toString() {
return "LoginEvent{" +
"userId=" + userId +
", ip='" + ip + '\'' +
", eventType='" + eventType + '\'' +
", eventTime=" + eventTime +
'}';
}
}
// Output: warning record (Scala original kept for reference)
// case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)
class Warning {
public long userId;
public long firstFailTime;
public long lastFailTime;
public String warningMsg;
public Warning(long userId, long firstFailTime, long lastFailTime, String warningMsg) {
this.userId = userId;
this.firstFailTime = firstFailTime;
this.lastFailTime = lastFailTime;
this.warningMsg = warningMsg;
}
@Override
public String toString() {
return "Warning{" +
"userId=" + userId +
", firstFailTime=" + firstFailTime +
", lastFailTime=" + lastFailTime +
", warningMsg='" + warningMsg + '\'' +
'}';
}
}
public class LoginFailTwo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/LoginLog.csv");
stringDataStreamSource.map(new MapFunction<String, LoginEvent>() {
@Override
public LoginEvent map(String s) throws Exception {
String[] split = s.split(",");
return new LoginEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.eventTime * 1000))
.keyBy(value -> value.userId)
.process(new LoginWarning())
.print();
env.execute();
}
}
class LoginWarning extends KeyedProcessFunction<Long, LoginEvent, Warning> {
ListState<LoginEvent> log;
@Override
public void open(Configuration parameters) throws Exception {
log = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("log", LoginEvent.class));
}
@Override
public void processElement(LoginEvent value, Context ctx, Collector<Warning> out) throws Exception {
if(value.eventType.equals("fail")) {
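// Look up the previously stored failure event
Iterator<LoginEvent> iterator = log.get().iterator();
if(iterator.hasNext()) {
// A failure was already recorded: compare timestamps (in seconds) and warn if the two failures are less than 2 seconds apart
LoginEvent next = iterator.next();
if (value.eventTime < next.eventTime + 2){
out.collect(new Warning( value.userId,next.eventTime,value.eventTime,"two consecutive login failures within 2 seconds"));
}
// Clear the state so that only the most recent failure is kept
log.clear();
}
// Store the current failure (whether or not it is the first one) as the most recent failure
log.add(value);
} else {
// A non-failure event (for example a successful login) resets the state
log.clear();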
}
}
}
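This approach has two significant problems. The two-second window and the threshold of two failures are hard-coded into the logic and cannot be changed easily, and out-of-order events are not handled, because each event is processed in arrival order rather than in event-time order. This is where Flink's CEP library comes in.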
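To make the out-of-order problem concrete, here is a small plain-Java sketch (no Flink involved; all names are hypothetical) that applies the same check as LoginWarning to a list of events, with the window passed in as a parameter instead of being hard-coded. Feeding it the events in arrival order and then in event-time order shows how a late "success" event can produce a false alarm.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-alone model of the naive check; not part of the project code.
final class NaiveLoginCheck {
    // Minimal event: only the fields the check needs.
    static final class Event {
        final String type;
        final long timeSec;
        Event(String type, long timeSec) {
            this.type = type;
            this.timeSec = timeSec;
        }
    }

    // Counts warnings the way LoginWarning does: a failure that follows a stored
    // failure by less than windowSec seconds raises a warning; any other event
    // type clears the stored failure.
    static int countWarnings(List<Event> events, long windowSec) {
        Event lastFail = null;
        int warnings = 0;
        for (Event e : events) {
            if ("fail".equals(e.type)) {
                if (lastFail != null && e.timeSec < lastFail.timeSec + windowSec) {
                    warnings++;
                }
                lastFail = e;
            } else {
                lastFail = null;
            }
        }
        return warnings;
    }

    // Returns a copy of the events ordered by event time.
    static List<Event> sortedByEventTime(List<Event> events) {
        List<Event> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparingLong(e -> e.timeSec));
        return copy;
    }
}
```

With the arrival order fail@40, fail@42, success@41 and a 5-second window, the check reports a warning even though the failures were separated by a successful login in event time; after sorting by event time the warning disappears.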
FLINKCEP - COMPLEX EVENT PROCESSING FOR FLINK
FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink.
It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.
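One or more streams of simple events are matched against simple rules, and the output is the data the user is interested in: the complex events that satisfy those rules.
Approach 2: CEP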
public class LoginFailWithCep {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/LoginLog.csv");
KeyedStream<LoginEvent, Long> loginEventStream = stringDataStreamSource.map(new MapFunction<String, LoginEvent>() {
@Override
public LoginEvent map(String s) throws Exception {
String[] split = s.split(",");
return new LoginEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.eventTime * 1000))
.keyBy(value -> value.userId);
// Define the pattern to match: two consecutive "fail" events within 2 seconds
Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("begin").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("next")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.within(Time.seconds(2));
// Apply the pattern to the input stream to obtain a PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream, pattern);
// Use select to extract the event sequences that match the pattern
SingleOutputStreamOperator<Warning> select = patternStream.select(new LoginFailMatch());
select.print();
env.execute();
}
}
class LoginFailMatch implements PatternSelectFunction<LoginEvent,Warning> {
@Override
public Warning select(Map<String, List<LoginEvent>> map) throws Exception {
LoginEvent begin = map.get("begin").iterator().next();
LoginEvent next = map.get("next").iterator().next();
return new Warning(begin.userId,begin.eventTime,next.eventTime,"two consecutive login failures within 2 seconds");
}
}
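This is a very simple CEP example, meant only as an introduction; there are many more details to watch out for in practice.
Real-Time Order Payment Monitoring
On an e-commerce site, order payment is directly tied to revenue and is therefore a critical step in the business flow. To keep the flow under control, and to encourage users to pay, sites usually set a payment expiry: an order that is not paid within a certain period is cancelled. In addition, the correctness of each payment should be verified, which can be done by reconciling in real time against the transaction data of the third-party payment platform. The following sections implement both requirements.
On an e-commerce platform, revenue and profit are ultimately created when users place orders, or more precisely when they actually complete payment. Placing an order shows that the user wants the product, but in practice not every order is paid immediately, and the longer the delay, the less likely the user is to pay. To create a sense of urgency and raise the payment conversion rate, and also to guard against security risks in the payment step, e-commerce sites typically monitor order status and set an expiry time (for example 15 minutes): if the order is still unpaid after that time, it is cancelled.
Approach 1: CEP
The key point here is the select method of PatternStream in CEP. Its source code is as follows: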
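One of those details is contiguity. The login pattern uses next(), which requires the second event to follow the first directly, while followedBy() allows non-matching events in between. The sketch below is a simplified plain-Java model of that difference for a two-step "fail then fail" pattern. It is only an illustration under stated assumptions: it ignores the within() time limit and after-match skip strategies, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of strict vs. relaxed contiguity; not the Flink CEP implementation.
final class ContiguityDemo {
    // Returns "i-j" index pairs matched by a two-step pattern fail -> fail.
    static List<String> matches(List<String> types, boolean strict) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < types.size(); i++) {
            if (!"fail".equals(types.get(i))) {
                continue;
            }
            if (strict) {
                // Models next(): the very next event must match.
                if (i + 1 < types.size() && "fail".equals(types.get(i + 1))) {
                    out.add(i + "-" + (i + 1));
                }
            } else {
                // Models followedBy(): skip non-matching events until the first match.
                for (int j = i + 1; j < types.size(); j++) {
                    if ("fail".equals(types.get(j))) {
                        out.add(i + "-" + j);
                        break;
                    }
                }
            }
        }
        return out;
    }
}
```

For the sequence fail, success, fail, the strict variant finds nothing, while the relaxed variant pairs the first and third events. This is why next() suits "consecutive failures", where an intervening success should break the match.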
/**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
* provided {@link PatternSelectFunction} is called. The pattern select function can produce
* exactly one resulting element.
*
* <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
* partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern
* timeout function can produce exactly one resulting element.
*
* <p>You can get the stream of timed-out data resulting from the
* {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
* {@link SingleOutputStreamOperator} resulting from the select operation
* with the same {@link OutputTag}.
*
* @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
* @param patternTimeoutFunction The pattern timeout function which is called for each partial
* pattern sequence which has timed out.
* @param patternSelectFunction The pattern select function which is called for each detected
* pattern sequence.
* @param <L> Type of the resulting timeout elements
* @param <R> Type of the resulting elements
* @return {@link DataStream} which contains the resulting elements with the resulting timeout
* elements in a side output.
*/
public <L, R> SingleOutputStreamOperator<R> select(
// The three parameters below are the important ones
final OutputTag<L> timedOutPartialMatchesTag,
final PatternTimeoutFunction<T, L> patternTimeoutFunction,
final PatternSelectFunction<T, R> patternSelectFunction) {
final TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternSelectFunction,
PatternSelectFunction.class,
0,
1,
TypeExtractor.NO_INDEX,
builder.getInputType(),
null,
false);
return select(
timedOutPartialMatchesTag,
patternTimeoutFunction,
rightTypeInfo,
patternSelectFunction);
}
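As the signature shows, select can also return the stream of timed-out partial matches through a side output, which makes it more powerful than it first appears.
// Input: order event (Scala original kept for reference)
// case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
// Output: order check result (Scala original kept for reference)
// case class OrderResult(orderId: Long, resultMsg: String)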
class OrderResult {
public Long orderId;
public String resultMsg;
public OrderResult(Long orderId, String resultMsg) {
this.orderId = orderId;
this.resultMsg = resultMsg;
}
public OrderResult() {
}
@Override
public String toString() {
return "OrderResult{" +
"orderId=" + orderId +
", resultMsg='" + resultMsg + '\'' +
'}';
}
}
class OrderEvent {
public Long orderId;
public String eventType;
public Long eventTime;
public OrderEvent() {
}
public OrderEvent(Long orderId, String eventType, Long eventTime) {
this.orderId = orderId;
this.eventType = eventType;
this.eventTime = eventTime;
}
@Override
public String toString() {
return "OrderEvent{" +
"orderId=" + orderId +
", eventType='" + eventType + '\'' +
", eventTime=" + eventTime +
'}';
}
}
public class OrderTimeout {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/OrderLog.csv");
SingleOutputStreamOperator<OrderEvent> orderEventStream = stringDataStreamSource.map(new MapFunction<String, OrderEvent>() {
@Override
public OrderEvent map(String s) throws Exception {
String[] split = s.split(",");
return new OrderEvent(Long.parseLong(split[0].trim()), split[1].trim(), Long.parseLong(split[3].trim()));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.eventTime * 1000));
KeyedStream<OrderEvent, Long> keyedStream = orderEventStream.keyBy(value -> value.orderId);
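// Define the pattern: a "create" event followed (not necessarily directly) by a "pay" event within 15 minutes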
Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern.<OrderEvent>begin("begin").where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return orderEvent.eventType.equals("create");
}
}).followedBy("follow").where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return orderEvent.eventType.equals("pay");
}
}).within(Time.minutes(15));
// Apply the pattern to the keyed stream
PatternStream<OrderEvent> pattern = CEP.pattern(keyedStream, orderPayPattern);
// Use select to extract matched sequences; timed-out partial matches go to a side output for alerting
OutputTag<OrderResult> orderTimeout = new OutputTag<OrderResult>("orderTimeout"){};
SingleOutputStreamOperator<OrderResult> select = pattern.select(orderTimeout, new OrderTimeoutSelect(), new OrderPaySelect());
select.print();
select.getSideOutput(orderTimeout).print();
env.execute();
}
}
class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent,OrderResult> {
@Override
public OrderResult timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
Long begin = pattern.get("begin").iterator().next().orderId;
return new OrderResult(begin,"timeout");
}
}
class OrderPaySelect implements PatternSelectFunction<OrderEvent, OrderResult> {
@Override
public OrderResult select(Map<String, List<OrderEvent>> pattern) throws Exception {
Long follow = pattern.get("follow").iterator().next().orderId;
return new OrderResult(follow,"paid successfully");
}
}
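Approach 2: Using State Programming Directly
The same check can be written with keyed state and timers once the logic is clear; the implementation is omitted here.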
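As a rough idea of what the state-based logic could look like, here is a plain-Java simulation. It is not Flink code, and every name in it is hypothetical. A map stands in for per-order keyed state, and advanceWatermark plays the role that onTimer would play with event-time timers. Treating a payment as valid only when it arrives strictly less than 15 minutes after creation is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the state-plus-timer approach; not the project's implementation.
final class OrderTimeoutSketch {
    static final long TIMEOUT_SEC = 15 * 60;

    // Stand-in for keyed state: orderId -> event time (seconds) of the pending create event.
    private final Map<Long, Long> pendingCreate = new HashMap<>();

    // Handles one event and returns any results it produces.
    List<String> onEvent(long orderId, String type, long timeSec) {
        List<String> out = new ArrayList<>();
        if ("create".equals(type)) {
            pendingCreate.put(orderId, timeSec);
        } else if ("pay".equals(type)) {
            Long created = pendingCreate.remove(orderId);
            if (created != null) {
                boolean inTime = timeSec < created + TIMEOUT_SEC;
                out.add(orderId + (inTime ? ":paid successfully" : ":timeout"));
            }
            // A pay event without a stored create event is ignored in this sketch.
        }
        return out;
    }

    // Stand-in for onTimer: expires every pending order whose deadline has been reached.
    List<String> advanceWatermark(long watermarkSec) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = pendingCreate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getValue() + TIMEOUT_SEC <= watermarkSec) {
                out.add(entry.getKey() + ":timeout");
                it.remove();
            }
        }
        return out;
    }
}
```

In a real job the map would become a ValueState inside a KeyedProcessFunction keyed by orderId, and the deadline would be registered as an event-time timer when the create event arrives.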
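Matching Order Transactions from Two Streams
For an order payment event, the user completing the payment is not the end of the story: we still need to confirm that the money has actually arrived in the platform's account. That information usually comes from a different log, so two streams must be read and processed together. Here we use connect to join the two streams and a custom CoProcessFunction to process them.
Approach 1: CONNECT + COPROCESS
This approach uses connect to combine the two streams.
// Input and output classes (Scala originals kept for reference)
// case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long)
// case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)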
class ReceiptEvent {
public String txId;
public String payChannel;
public Long timestamp;
public ReceiptEvent(String txId, String payChannel, Long timestamp) {
this.txId = txId;
this.payChannel = payChannel;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "ReceiptEvent{" +
"txId='" + txId + '\'' +
", payChannel='" + payChannel + '\'' +
", timestamp=" + timestamp +
'}';
}
}
class OrderEvent2 {
public Long orderId;
public String eventType;
public String txId;
public Long eventTime;
public OrderEvent2(Long orderId, String eventType, String txId, Long eventTime) {
this.orderId = orderId;
this.eventType = eventType;
this.txId = txId;
this.eventTime = eventTime;
}
@Override
public String toString() {
return "OrderEvent2{" +
"orderId=" + orderId +
", eventType='" + eventType + '\'' +
", txId='" + txId + '\'' +
", eventTime=" + eventTime +
'}';
}
}
public class OrderPayTxMatch {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/OrderLog.csv");
SingleOutputStreamOperator<OrderEvent2> orderEventStream = stringDataStreamSource.map(new MapFunction<String, OrderEvent2>() {
@Override
public OrderEvent2 map(String s) throws Exception {
String[] split = s.split(",");
return new OrderEvent2(Long.parseLong(split[0].trim()), split[1].trim(), split[2].trim(), Long.parseLong(split[3].trim()));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent2>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.eventTime * 1000));
KeyedStream<OrderEvent2, String> keyedStream = orderEventStream.keyBy(value -> value.txId);
DataStreamSource<String> stringDataStreamSource2 = env.readTextFile("data/ReceiptLog.csv");
SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = stringDataStreamSource2.map(new MapFunction<String, ReceiptEvent>() {
@Override
public ReceiptEvent map(String s) throws Exception {
String[] split = s.split(",");
return new ReceiptEvent(split[0].trim(), split[1].trim(), Long.parseLong(split[2].trim()));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<ReceiptEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.timestamp * 1000));
KeyedStream<ReceiptEvent, String> keyedStream2 = receiptEventStream.keyBy(value -> value.txId);
// Define the side outputs
// OutputTag<OrderEvent2> outputTag = new OutputTag<OrderEvent2>("unmatchedPay") {};
// OutputTag<ReceiptEvent> outputTag2 = new OutputTag<ReceiptEvent>("unmatchedRec") {};
// Connect the two streams and process matching events
SingleOutputStreamOperator<Tuple2<OrderEvent2, ReceiptEvent>> process = keyedStream.connect(keyedStream2).process(new OrderPayTxDetect());
// Print the output
process.print();
process.getSideOutput(new OutputTag<OrderEvent2>("unmatchedPay") {}).print();
process.getSideOutput(new OutputTag<ReceiptEvent>("unmatchedRec") {}).print();
env.execute();
}
}
class OrderPayTxDetect extends CoProcessFunction<OrderEvent2, ReceiptEvent, Tuple2<OrderEvent2, ReceiptEvent>> {
public ValueState<OrderEvent2> pay;
public ValueState<ReceiptEvent> receipt;
@Override
public void open(Configuration parameters) throws Exception {
pay = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent2>("pay", OrderEvent2.class));
receipt = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt", ReceiptEvent.class));
}
@Override
public void processElement1(OrderEvent2 value, Context ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
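// A pay event arrived: check whether the matching receipt has already been seen
ReceiptEvent value1 = receipt.value();
if(value1!=null) {
// The receipt is already there: emit the matched pair to the main stream
out.collect(new Tuple2<>(value,value1));
receipt.clear();
} else {
// The receipt has not arrived yet: store the pay event and register an event-time timer to wait 5 seconds.
// The deadline is derived from the event timestamp, so it must be an event-time timer;
// a processing-time timer set to a past timestamp would fire immediately.
pay.update(value);
ctx.timerService().registerEventTimeTimer(value.eventTime*1000L + 5000L);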
}
}
@Override
public void processElement2(ReceiptEvent value, Context ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
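// A receipt arrived: check whether the matching pay event has already been seen
OrderEvent2 value1 = pay.value();
if(value1!=null) {
// The pay event is already there: emit the matched pair to the main stream
out.collect(new Tuple2<>(value1,value));
pay.clear();
} else {
// The pay event has not arrived yet: store the receipt and register an event-time timer to wait 3 seconds
receipt.update(value);
ctx.timerService().registerEventTimeTimer(value.timestamp*1000L + 3000L);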
}
}
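// The timer can fire in two situations, so check whether a pay event or a receipt is still pending
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
// pay is still set: its receipt never arrived, so emit it to the unmatchedPay side output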
if (pay.value() != null){
ctx.output(new OutputTag<OrderEvent2>("unmatchedPay") {},pay.value());
}
if (receipt.value() != null){
ctx.output(new OutputTag<ReceiptEvent>("unmatchedRec") {},receipt.value());
}
// Clear the state
pay.clear();
receipt.clear();
}
}
Approach 2: JOIN
public class OrderPayTxMatchWithJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/OrderLog.csv");
SingleOutputStreamOperator<OrderEvent2> orderEventStream = stringDataStreamSource.map(new MapFunction<String, OrderEvent2>() {
@Override
public OrderEvent2 map(String s) throws Exception {
String[] split = s.split(",");
return new OrderEvent2(Long.parseLong(split[0].trim()), split[1].trim(), split[2].trim(), Long.parseLong(split[3].trim()));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent2>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.eventTime * 1000));
KeyedStream<OrderEvent2, String> keyedStream = orderEventStream.keyBy(value -> value.txId);
DataStreamSource<String> stringDataStreamSource2 = env.readTextFile("data/ReceiptLog.csv");
SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = stringDataStreamSource2.map(new MapFunction<String, ReceiptEvent>() {
@Override
public ReceiptEvent map(String s) throws Exception {
String[] split = s.split(",");
return new ReceiptEvent(split[0].trim(), split[1].trim(), Long.parseLong(split[2].trim()));
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<ReceiptEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.timestamp * 1000));
KeyedStream<ReceiptEvent, String> keyedStream2 = receiptEventStream.keyBy(value -> value.txId);
keyedStream.intervalJoin(keyedStream2)
.between(Time.seconds(-5),Time.seconds(3))
.process(new OrderPayTxDetectWithJoin()).print();
env.execute();
}
}
class OrderPayTxDetectWithJoin extends ProcessJoinFunction<OrderEvent2, ReceiptEvent, Tuple2<OrderEvent2, ReceiptEvent>> {
@Override
public void processElement(OrderEvent2 left, ReceiptEvent right, Context ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
out.collect(new Tuple2<>(left,right));
}
}
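The call between(Time.seconds(-5), Time.seconds(3)) means that an order event is joined with every receipt that has the same txId and whose timestamp lies between 5 seconds before and 3 seconds after the order's timestamp, with both bounds inclusive by default. The predicate can be sketched in plain Java as follows; the class and method names are hypothetical. Unlike the CoProcessFunction version, an interval join emits only matched pairs, so unmatched pay events and receipts are not reported.

```java
// Hypothetical helper expressing the interval join condition on timestamps in milliseconds.
final class IntervalJoinBounds {
    // True when rightTs lies in [leftTs + lowerMs, leftTs + upperMs], both ends inclusive.
    static boolean joins(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }
}
```

For an order event at 10 000 ms, receipts at 5 000 ms and 13 000 ms are still joined with bounds of -5 000 and 3 000, while a receipt at 13 001 ms is not.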
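This concludes the Java rewrite of the Flink e-commerce user behavior analysis project.
Thanks again to 尚硅谷 and Xiaoyuyu for their guidance and sharing. This article is for learning and discussion only, not for commercial use.
Please credit the source when reposting. Article link: https://www.uj5u.com/qita/294166.html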
