Flink CEP
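- Overview and classification of Flink CEP
- Pattern API
- Pattern classification
- Pattern sequences
- Pattern detection
- Extracting matched events
- Extracting timed-out events
- How Flink CEP works: the nondeterministic finite automaton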
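Overview and classification of Flink CEP
- Overview of Flink CEP
CEP stands for Complex Event Processing. Flink CEP is the library that implements complex event processing in Flink. The rules used to process events are called Patterns. Flink CEP provides the Pattern API for defining complex-event rules over the input data and extracting the event sequences that match them.
- Three kinds of Pattern API: individual patterns, combining patterns, and groups of patterns
- Applications: real-time detection of malicious logins and fraudulent transactions, risk identification in the financial industry, marketing and advertising, and so on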
Pattern API
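Pattern classification
- Individual patterns: an individual pattern is either a singleton pattern or a looping pattern. A singleton pattern accepts exactly one event, whereas a looping pattern can accept multiple events.
(1) Quantifiers
(2) Conditions: .where(), .or() and .until(). .until() serves as a stop condition for a looping pattern (used with oneOrMore()), so that its state can be cleaned up.
start.times(3).where(_.behavior.startsWith("fav")) // exactly 3 events whose behavior starts with "fav"
start.times(1, 2) // occurs 1 or 2 times
start.timesOrMore(2).optional.greedy // occurs 0, 2 or more times, repeating as many times as possible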
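- Combining patterns: individual patterns chained into a pattern sequence, which always starts from an initial pattern
val start = Pattern.begin[Event]("start")
- Groups of patterns: a pattern sequence nested as a condition inside an individual pattern, forming a group of patterns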
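To make the quantifier semantics concrete, here is a plain-Java simulation (not Flink code) of the matches that start.times(3).where(behavior startsWith "fav") would produce when it is the whole pattern. QuantifierSketch and its methods are hypothetical names. The sketch makes three assumptions: behaviors are plain strings, time windows are ignored, and the default NO_SKIP after-match strategy applies. By default a looping pattern uses relaxed contiguity between its repetitions. Flink's .consecutive() switches that to strict contiguity, which is what timesConsecutive models.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: simulates the matches of a single looping pattern
// `times(n)` used as the whole pattern (no within(), NO_SKIP assumed).
final class QuantifierSketch {

    // Default (relaxed) contiguity: non-matching events are skipped, but every
    // matching event is taken, so the matches are sliding windows of size n
    // over the subsequence of matching events.
    static List<List<String>> timesRelaxed(List<String> events, int n, Predicate<String> cond) {
        List<String> accepted = new ArrayList<>();
        for (String e : events) {
            if (cond.test(e)) {
                accepted.add(e);
            }
        }
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i + n <= accepted.size(); i++) {
            matches.add(new ArrayList<>(accepted.subList(i, i + n)));
        }
        return matches;
    }

    // Strict contiguity (what .consecutive() adds): the n events must be
    // adjacent in the input and all of them must satisfy the condition.
    static List<List<String>> timesConsecutive(List<String> events, int n, Predicate<String> cond) {
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i + n <= events.size(); i++) {
            List<String> window = events.subList(i, i + n);
            if (window.stream().allMatch(cond)) {
                matches.add(new ArrayList<>(window));
            }
        }
        return matches;
    }
}
```

On the input fav1, cart, fav2, fav3, fav4, the relaxed version yields {fav1 fav2 fav3} and {fav2 fav3 fav4}. The consecutive version yields only {fav2 fav3 fav4}, because cart breaks the first run.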
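Pattern sequences
- Strict contiguity: all matching events appear strictly one after another, with no non-matching events in between; specified with .next()
- Relaxed contiguity: non-matching events may appear in between and are ignored; specified with .followedBy()
- Non-deterministic relaxed contiguity: relaxes contiguity further, so an event that has already been matched can be used again in additional matches; specified with .followedByAny()
- Negative relationships, for when a certain contiguity must NOT occur, e.g. .notNext() (the immediately following event must not match) and .notFollowedBy() (no matching event may occur in between)
- Every pattern sequence must start with .begin(); a pattern sequence cannot end with .notFollowedBy(); a not-type pattern cannot be marked as optional; and a time constraint can be attached with .within() to require that a match complete within the given time span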
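The three contiguity modes can be compared with a small plain-Java simulation of a two-step pattern: an event "a" followed by an event whose name starts with "b". This is not Flink's implementation. ContiguitySketch and its enum are hypothetical, and the sketch ignores within(), optional patterns and after-match skip strategies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the matches of the two-step pattern
// "a" -> (an event starting with "b") under the three contiguity modes.
final class ContiguitySketch {

    enum Contiguity { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    static List<String> match(List<String> events, Contiguity mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals("a")) {
                continue; // only an "a" can start a match
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("b");
                if (mode == Contiguity.STRICT) {
                    // next(): the very next event must be a "b", otherwise no match
                    if (isB) {
                        matches.add(events.get(i) + " " + events.get(j));
                    }
                    break;
                }
                if (isB) {
                    matches.add(events.get(i) + " " + events.get(j));
                    if (mode == Contiguity.RELAXED) {
                        break; // followedBy(): stop at the first "b"
                    }
                    // followedByAny(): keep looking, later "b"s form extra matches
                }
                // non-"b" events are simply ignored in the relaxed modes
            }
        }
        return matches;
    }
}
```

For the input a, c, b1, b2, next() finds nothing because c sits between a and b1. followedBy() finds {a b1}, and followedByAny() finds both {a b1} and {a b2}.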
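Pattern detection
After specifying the pattern sequence you are looking for, you can apply it to an input stream to detect potential matches. Calling CEP.pattern() with an input stream and a pattern returns a PatternStream. Each time a match succeeds, the PatternStream emits the matched events, organized by pattern name.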
val input:DataStream[Event] = ...
val pattern:Pattern[Event,_] = ...
val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)
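Extracting matched events
Once the PatternStream has been created, apply select() or flatSelect() to extract events from the detected sequences. select() takes a PatternSelectFunction as its argument and calls it once for every successfully matched event sequence. The matched sequence is passed in as a Map[String, Iterable[IN]]. Each key is the name of a pattern, and each value is an Iterable over all events accepted by that pattern.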
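def selectFn(pattern: Map[String, Iterable[IN]]): OUT = {
  val startEvent = pattern.get("start").get.head // first event accepted by "start"
  val endEvent = pattern.get("end").get.head     // first event accepted by "end"
  OUT(startEvent, endEvent)
}
Example: a complete job that detects two consecutive failed logins by the same user within 5 seconds.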
package com.lagou.mycep;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
public class LoginDemo {
    public static void main(String[] args) throws Exception {
        /*
         * 1. Data source
         * 2. Assign timestamps and watermarks on the source
         * 3. keyBy the user id on the watermarked stream
         * 4. Define the pattern
         * 5. Apply the pattern to the keyed stream
         * 6. Extract the matched data
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // 1. Data source: (user id, login state, event time in ms)
        DataStreamSource<LoginBean> data = env.fromElements(
                new LoginBean(1L, "fail", 1597905234000L),
                new LoginBean(1L, "success", 1597905235000L),
                new LoginBean(2L, "fail", 1597905236000L),
                new LoginBean(2L, "fail", 1597905237000L),
                new LoginBean(2L, "fail", 1597905238000L),
                new LoginBean(3L, "fail", 1597905239000L),
                new LoginBean(3L, "success", 1597905240000L)
        );
        // 2. Assign timestamps and watermarks (500 ms of allowed out-of-orderness)
        SingleOutputStreamOperator<LoginBean> watermarks = data.assignTimestampsAndWatermarks(new WatermarkStrategy<LoginBean>() {
            @Override
            public WatermarkGenerator<LoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<LoginBean>() {
                    final long maxOutOfOrderness = 500L;
                    // Start just above Long.MIN_VALUE: starting at Long.MIN_VALUE itself would make
                    // maxTimeStamp - maxOutOfOrderness underflow to a huge positive watermark
                    long maxTimeStamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

                    @Override
                    public void onEvent(LoginBean event, long eventTimestamp, WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.getTs());
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner((element, recordTimestamp) -> element.getTs()));
        // 3. keyBy the user id, so that each user is matched independently
        KeyedStream<LoginBean, Long> keyed = watermarks.keyBy(value -> value.getId());
        // 4. Pattern: a "fail" immediately followed by another "fail", within 5 seconds
        Pattern<LoginBean, LoginBean> pattern = Pattern.<LoginBean>begin("start")
                .where(new IterativeCondition<LoginBean>() {
                    @Override
                    public boolean filter(LoginBean value, Context<LoginBean> ctx) throws Exception {
                        return value.getState().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<LoginBean>() {
                    @Override
                    public boolean filter(LoginBean value, Context<LoginBean> ctx) throws Exception {
                        return value.getState().equals("fail");
                    }
                })
                .within(Time.seconds(5));
        // 5. Apply the pattern to the keyed stream
        PatternStream<LoginBean> patternStream = CEP.pattern(keyed, pattern);
        // 6. Extract the matched data: emit the id of the user behind each match
        SingleOutputStreamOperator<Long> result = patternStream.process(new PatternProcessFunction<LoginBean, Long>() {
            @Override
            public void processMatch(Map<String, List<LoginBean>> match, Context ctx, Collector<Long> out) throws Exception {
                // System.out.println(match);
                out.collect(match.get("start").get(0).getId());
            }
        });
        result.print();
        env.execute();
    }
}
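You can check the result of LoginDemo by hand with a plain-Java simulation of the same rule on the same data. This is not Flink code. LoginSketch and Login are hypothetical stand-ins for the job and for LoginBean. The sketch also assumes that within(Time.seconds(5)) means the second failure must arrive less than 5000 ms after the first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of LoginDemo (not Flink code): per user id,
// a "fail" immediately followed by another "fail" (next()), with the second
// event less than windowMillis after the first (within()).
final class LoginSketch {

    // Hypothetical stand-in for LoginBean: user id, login state, event time in ms
    static final class Login {
        final long id;
        final String state;
        final long ts;

        Login(long id, String state, long ts) {
            this.id = id;
            this.state = state;
            this.ts = ts;
        }
    }

    static List<Long> detect(List<Login> events, long windowMillis) {
        // keyBy(id): group each user's events, keeping the order of first appearance
        Map<Long, List<Login>> byId = new LinkedHashMap<>();
        for (Login e : events) {
            byId.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e);
        }
        List<Long> out = new ArrayList<>();
        for (List<Login> userEvents : byId.values()) {
            // in event time, each key's events are matched in timestamp order
            userEvents.sort((x, y) -> Long.compare(x.ts, y.ts));
            for (int i = 0; i + 1 < userEvents.size(); i++) {
                Login first = userEvents.get(i);
                Login second = userEvents.get(i + 1); // strict contiguity: the very next event
                boolean bothFail = first.state.equals("fail") && second.state.equals("fail");
                if (bothFail && second.ts - first.ts < windowMillis) {
                    out.add(first.id); // the value the PatternProcessFunction collects
                }
            }
        }
        return out;
    }
}
```

User 2's three failures form two overlapping matches (…236000 with …237000, and …237000 with …238000), so the simulation returns [2, 2]. That is what the job above is expected to print. Users 1 and 3 fail once and then succeed, so next() rejects them.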
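Extracting timed-out events
Flink CEP development workflow:
- Convert the data from the DataSource into a DataStream
- Define the Pattern, and combine the DataStream and the Pattern into a PatternStream
- Turn the PatternStream back into a DataStream with operators such as select and process
- Process the resulting DataStream further and sink it to the target store
A partial match that is not completed within the time set by .within() times out. To capture such matches, select() has an overload that takes an OutputTag and a PatternTimeoutFunction in addition to the PatternSelectFunction (flatSelect() has an analogous overload with a PatternFlatTimeoutFunction). Timed-out partial matches are sent to the side output identified by the OutputTag, and complete matches go to the PatternSelectFunction.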
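// Side-output tag for timed-out matches (tag name chosen for this example)
OutputTag<PayEvent> orderTimeoutOutput = new OutputTag<PayEvent>("orderTimeout") {};
SingleOutputStreamOperator<PayEvent> result = patternStream.select(
        orderTimeoutOutput,
        new PatternTimeoutFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent timeout(Map<String, List<PayEvent>> map, long timeoutTimestamp) throws Exception {
                // Timed-out partial match: return its "begin" event
                return map.get("begin").get(0);
            }
        },
        new PatternSelectFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
                // Complete match: return its "pay" event
                return map.get("pay").get(0);
            }
        });
// Get the timed-out (unmatched) events from the side output
DataStream<PayEvent> sideOutput = result.getSideOutput(orderTimeoutOutput);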
Full code
package com.lagou.mycep;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
public class PayDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // (order id, order state, event time in ms); order 3 is created but never paid
        DataStreamSource<PayBean> data = env.fromElements(
                new PayBean(1L, "create", 1597905234000L),
                new PayBean(1L, "pay", 1597905235000L),
                new PayBean(2L, "create", 1597905236000L),
                new PayBean(2L, "pay", 1597905237000L),
                new PayBean(3L, "create", 1597905239000L)
        );
        SingleOutputStreamOperator<PayBean> watermarks = data.assignTimestampsAndWatermarks(new WatermarkStrategy<PayBean>() {
            @Override
            public WatermarkGenerator<PayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<PayBean>() {
                    final long maxOutOfOrderness = 500L;
                    // Start just above Long.MIN_VALUE so that
                    // maxTimeStamp - maxOutOfOrderness cannot underflow
                    long maxTimeStamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

                    @Override
                    public void onEvent(PayBean event, long eventTimestamp, WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.getTs());
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner((element, recordTimestamp) -> element.getTs()));
        KeyedStream<PayBean, Long> keyed = watermarks.keyBy(value -> value.getId());
        // Pattern: "create" followed (relaxed contiguity) by "pay" within 600 seconds
        Pattern<PayBean, PayBean> pattern = Pattern.<PayBean>begin("start")
                .where(new IterativeCondition<PayBean>() {
                    @Override
                    public boolean filter(PayBean value, Context<PayBean> ctx) throws Exception {
                        return value.getState().equals("create");
                    }
                })
                .followedBy("next")
                .where(new IterativeCondition<PayBean>() {
                    @Override
                    public boolean filter(PayBean value, Context<PayBean> ctx) throws Exception {
                        return value.getState().equals("pay");
                    }
                })
                .within(Time.seconds(600));
        PatternStream<PayBean> patternStream = CEP.pattern(keyed, pattern);
        // select: timed-out partial matches go to the "outoftime" side output
        OutputTag<PayBean> outoftime = new OutputTag<PayBean>("outoftime") {};
        SingleOutputStreamOperator<PayBean> result = patternStream.select(outoftime, new PatternTimeoutFunction<PayBean, PayBean>() {
            @Override
            public PayBean timeout(Map<String, List<PayBean>> pattern, long timeoutTimestamp) throws Exception {
                // the "create" event of an order that was not paid in time
                return pattern.get("start").get(0);
            }
        }, new PatternSelectFunction<PayBean, PayBean>() {
            @Override
            public PayBean select(Map<String, List<PayBean>> pattern) throws Exception {
                // the "create" event of an order that was paid in time
                return pattern.get("start").get(0);
            }
        });
        // print only the orders that timed out
        DataStream<PayBean> sideOutput = result.getSideOutput(outoftime);
        sideOutput.print();
        env.execute();
    }
}
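A similar plain-Java simulation (not Flink code) shows which orders from PayDemo end up in the side output. PaySketch and Pay are hypothetical stand-ins for the job and for PayBean. The sketch assumes two things: a partial match times out once the watermark reaches create.ts + window, and a bounded source such as fromElements ends with a Long.MAX_VALUE watermark, which flushes all pending timeouts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of PayDemo (not Flink code):
// "create" followedBy "pay" within a time window, per order id.
final class PaySketch {

    // Hypothetical stand-in for PayBean: order id, order state, event time in ms
    static final class Pay {
        final long id;
        final String state;
        final long ts;

        Pay(long id, String state, long ts) {
            this.id = id;
            this.state = state;
            this.ts = ts;
        }
    }

    // Returns the ids of orders whose "create" timed out, given the current watermark.
    static List<Long> timedOut(List<Pay> events, long windowMillis, long watermark) {
        // keyBy(id): group each order's events, keeping arrival order
        Map<Long, List<Pay>> byId = new LinkedHashMap<>();
        for (Pay e : events) {
            byId.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e);
        }
        List<Long> out = new ArrayList<>();
        for (List<Pay> orderEvents : byId.values()) {
            for (int i = 0; i < orderEvents.size(); i++) {
                Pay create = orderEvents.get(i);
                if (!create.state.equals("create")) {
                    continue;
                }
                // followedBy(): a later "pay" inside the window completes the match
                boolean paid = false;
                for (int j = i + 1; j < orderEvents.size(); j++) {
                    Pay p = orderEvents.get(j);
                    if (p.state.equals("pay") && p.ts - create.ts < windowMillis) {
                        paid = true;
                        break;
                    }
                }
                // an unpaid "create" times out once the watermark passes its window
                if (!paid && watermark >= create.ts + windowMillis) {
                    out.add(create.id);
                }
            }
        }
        return out;
    }
}
```

With the final Long.MAX_VALUE watermark, only order 3 (created but never paid) times out. While the watermark is still at order 3's creation time, nothing has timed out yet.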
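How Flink CEP works: the nondeterministic finite automaton
Flink CEP compiles a pattern into a nondeterministic finite automaton (NFA). Starting from the start state, the automaton keeps transitioning between states according to each input event. Because one event can lead to several possible next states, several partial matches can be in progress at the same time.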
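The idea can be illustrated with a tiny NFA, written in plain Java for the pattern begin("start") on "create", followedBy("next") on "pay". NfaSketch is a hypothetical, heavily simplified illustration. It tracks only the set of states the automaton can currently be in. Its "take" edges consume an event and advance, and its "ignore" edge skips a non-matching event, as relaxed contiguity does. This loosely mirrors the TAKE and IGNORE transitions of Flink's NFA. Flink additionally keeps each partial match separate and stores its events in a shared buffer, which this sketch does not do.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified NFA for
// begin("start").where(create).followedBy("next").where(pay).
// A set of states merges partial matches that sit in the same state.
final class NfaSketch {

    enum State { START, CREATED, FINAL }

    // Transition function: one state + one event -> set of possible next states.
    static Set<State> step(State s, String event) {
        Set<State> next = EnumSet.noneOf(State.class);
        switch (s) {
            case START:
                next.add(State.START);       // always stay ready to begin a new match
                if (event.equals("create")) {
                    next.add(State.CREATED); // take: "start" accepts the event
                }
                break;
            case CREATED:
                if (event.equals("pay")) {
                    next.add(State.FINAL);   // take: "next" accepts the event
                } else {
                    next.add(State.CREATED); // ignore: relaxed contiguity skips it
                }
                break;
            default:
                break;                       // FINAL has no outgoing edges
        }
        return next;
    }

    // Returns the indices of the events at which the automaton reached FINAL.
    static List<Integer> run(List<String> events) {
        Set<State> current = EnumSet.of(State.START);
        List<Integer> acceptedAt = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Set<State> next = EnumSet.noneOf(State.class);
            for (State s : current) {
                next.addAll(step(s, events.get(i)));
            }
            if (next.remove(State.FINAL)) {
                acceptedAt.add(i);           // a complete match ends at event i
            }
            current = next;
        }
        return acceptedAt;
    }
}
```

For create, click, pay, pay, the automaton reaches FINAL at index 2. The second pay finds no pending CREATED state, so it completes nothing.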

Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/266636.html
