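1. Introduction
FlinkCEP is the complex event processing (CEP) library implemented on top of Flink. It lets you detect event patterns in an unbounded stream of events, so you can get hold of what matters in your data. It is commonly used for things such as risk-control rules over app user-activity logs. This article walks through one requirement in detail: raise an alert when a user fails to log in 3 times within 10 seconds.
1.1. Overall requirement and data diagram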

2. Official example
The example from the official documentation is as follows:
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
        }
    );
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });
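2.1. Summary of the official example
CEP programming steps
a) Define the pattern sequence
Pattern.<Class>begin("patternName").API...
Custom pattern rules are almost always built following this template.
The APIs that can follow are listed in the official documentation:
Event Processing (CEP) | Apache Flink
b) Apply the pattern sequence to the stream
CEP.pattern(inputDataStream, pattern)
CEP.pattern() has a fixed form.
The first argument is the stream the pattern is applied to;
the second argument is the custom pattern.
c) Extract the matched data and emit output
Call process on the PatternStream produced in b): extend PatternProcessFunction and override processMatch(Map<String, List<Event>> pattern, Context ctx, Collector<Alert> out).
The first argument holds the matched data: each Map key is a "patternName" defined in step a), and its value is the list of events matched by that named pattern.
The second argument is a context that exposes time information (the match timestamp and the current processing time) and lets you emit records to side outputs. It does not itself carry unmatched data; partial matches that time out are handled separately by also implementing TimedOutPartialMatchHandler.
The third argument is the collector for whatever the function emits downstream.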
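To make the shape of the first processMatch argument concrete, here is a minimal plain-Java sketch that does not use Flink at all. The class name MatchMapShapeSketch, the helper flatten and the string events "e1" to "e3" are made up for illustration, and the LinkedHashMap ordering is an assumption of the sketch, not a statement about Flink's map. It shows that a pattern with three named stages yields one event under each key, while a single stage with a quantifier such as times(3) yields three events under one key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MatchMapShapeSketch {

    /** Flattens a match map into the matched events, stage by stage in map order. */
    public static List<String> flatten(Map<String, List<String>> match) {
        List<String> events = new ArrayList<>();
        for (List<String> stageEvents : match.values()) {
            events.addAll(stageEvents);
        }
        return events;
    }

    public static void main(String[] args) {
        // Three named stages, e.g. begin("start").next("middle").followedBy("end"):
        // each key maps to a list holding a single event.
        Map<String, List<String>> threeStages = new LinkedHashMap<>();
        threeStages.put("start", Arrays.asList("e1"));
        threeStages.put("middle", Arrays.asList("e2"));
        threeStages.put("end", Arrays.asList("e3"));

        // One stage with a quantifier, e.g. begin("start").times(3):
        // all three events sit under the single key "start".
        Map<String, List<String>> oneLoopingStage = new LinkedHashMap<>();
        oneLoopingStage.put("start", Arrays.asList("e1", "e2", "e3"));

        System.out.println("three stages, start size: " + threeStages.get("start").size());
        System.out.println("looping stage, start size: " + oneLoopingStage.get("start").size());
        System.out.println("same events overall: "
                + flatten(threeStages).equals(flatten(oneLoopingStage)));
    }
}
```

This is why a process function that reads match.get("start").get(0) returns the first event of the match in both pattern styles used later in this article.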
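3. Walkthrough of the requirement
The following simulates reading user login log data from a socket and runs CEP matching on it.
The code below flattens each incoming line into a JavaBean. This section explains things snippet by snippet; the full demo code is listed in section 4.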
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
 * Parallelism is set to 1 so that the watermark can advance and trigger the computation
 */
env.setParallelism(1);
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
        );
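As a worked example of what forBoundedOutOfOrderness(Duration.ofSeconds(1)) means for this data, the plain-Java sketch below reproduces the arithmetic of a bounded-out-of-orderness watermark: the highest timestamp seen so far, minus the bound, minus 1 ms. It does not use Flink; the class BoundedWatermarkSketch and its methods are hypothetical names for illustration only.

```java
public class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedWatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low, but high enough that the first watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Records an event timestamp; only the maximum seen so far matters. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = highest timestamp seen - allowed lateness - 1 ms. */
    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(1000L);
        // loginTime values of the last three sample events (11121, 11122, 11123).
        long[] loginTimes = {1645177360000L, 1645177361000L, 1645177362000L};
        for (long t : loginTimes) {
            wm.onEvent(t);
            System.out.println(t + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

After the last sample event (loginTime 1645177362000) the watermark is only 1645177360999. Since a CEP operator running in event time buffers events until the watermark passes their timestamps, this suggests that the final events typed into the socket are evaluated only once a later event arrives or the input ends.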
3.1. Using begin.where.next.where.next
/**
 * Emit only when 3 consecutive login failures occur within 10 s; strict contiguity
 */
Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).next("second").where(new IterativeCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).next("third").where(new SimpleCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).within(Time.seconds(10));
With this pattern, a failed login starts a match; if the directly following second and third events are also failures, the match is emitted.
// With the log data below as input, the output is loginId: 11111, 11112, 11113, 11116, 11117, 11121 (MyPatternProcessFunction emits only the first event of each match)
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
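The expected output above can be checked by hand with a small plain-Java simulation of strict contiguity. This is only a sketch of the matching rule, not Flink code: the class StrictContiguitySketch and the method strictStarts are hypothetical, and the window check (last timestamp minus first timestamp strictly below the window) is an assumption about how within(...) behaves that does not affect this data set, where every candidate match spans 2 seconds.

```java
import java.util.ArrayList;
import java.util.List;

public class StrictContiguitySketch {

    /**
     * Returns the loginId of the first event of every run of three directly
     * adjacent failures (status 1) whose time span is below windowMillis.
     */
    public static List<Integer> strictStarts(int[] ids, long[] times, int[] status, long windowMillis) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i + 2 < ids.length; i++) {
            boolean threeFailures = status[i] == 1 && status[i + 1] == 1 && status[i + 2] == 1;
            boolean inWindow = times[i + 2] - times[i] < windowMillis;
            if (threeFailures && inWindow) {
                starts.add(ids[i]);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        int[] ids = {11111, 11112, 11113, 11116, 11117, 11118, 11119, 11120, 11121, 11122, 11123};
        int[] status = {1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1};
        long[] times = new long[ids.length];
        for (int i = 0; i < ids.length; i++) {
            times[i] = 1645177352000L + i * 1000L; // one event per second, as in the sample data
        }
        System.out.println(strictStarts(ids, times, status, 10_000L));
        // prints [11111, 11112, 11113, 11116, 11117, 11121]
    }
}
```

11118 and 11119 are missing because the successful login 11120 falls inside the three-event run that would start at either of them.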
3.1.1. Output diagram

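3.2. Using begin.times
/**
 * Emit only when 3 login failures occur within 10 s; contiguity is not enforced
 */
Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).times(3).within(Time.seconds(10));
With this pattern, a failed login starts a match, and the match is emitted as soon as a second and a third failure also occur within 10 seconds. In essence the failures do not have to be adjacent: non-matching events in between are skipped.
// With the log data below as input, the output is loginId: 11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121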
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
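The difference from 3.1 can be reproduced with a plain-Java simulation of relaxed contiguity, in which non-failing events between failures are skipped. Again this is a sketch of the rule and not Flink code: RelaxedContiguitySketch and relaxedStarts are hypothetical names, and the strict "less than the window" comparison is an assumption that does not matter here because the widest match spans 3 seconds.

```java
import java.util.ArrayList;
import java.util.List;

public class RelaxedContiguitySketch {

    /**
     * For every failure (status 1), takes the next failures while skipping
     * other events, and reports the starting loginId if `count` failures
     * are collected within windowMillis.
     */
    public static List<Integer> relaxedStarts(int[] ids, long[] times, int[] status,
                                              int count, long windowMillis) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            if (status[i] != 1) {
                continue; // a match can only start on a failure
            }
            int found = 1;
            int last = i;
            for (int j = i + 1; j < ids.length && found < count; j++) {
                if (status[j] == 1) { // successes in between are simply skipped
                    found++;
                    last = j;
                }
            }
            if (found == count && times[last] - times[i] < windowMillis) {
                starts.add(ids[i]);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        int[] ids = {11111, 11112, 11113, 11116, 11117, 11118, 11119, 11120, 11121, 11122, 11123};
        int[] status = {1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1};
        long[] times = new long[ids.length];
        for (int i = 0; i < ids.length; i++) {
            times[i] = 1645177352000L + i * 1000L; // one event per second, as in the sample data
        }
        System.out.println(relaxedStarts(ids, times, status, 3, 10_000L));
        // prints [11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121]
    }
}
```

Here 11118 and 11119 do produce matches (11118, 11119, 11121 and 11119, 11121, 11122), because the successful login 11120 is skipped instead of breaking the sequence.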
3.2.1. Diagram

3.3. Using begin.times.consecutive
/**
 * Emit only when 3 login failures occur within 10 s; adding consecutive() enforces strict contiguity
 */
Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).times(3).consecutive().within(Time.seconds(10));
Adding consecutive() on top of the pattern from 3.2 gives the same result as 3.1.
// With the log data below as input, the output is loginId: 11111, 11112, 11113, 11116, 11117, 11121
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
4. Full code of this demo
4.1. pom file
<properties>
    <flink.version>1.14.3</flink.version>
    <hadoop.version>2.7.5</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
    <kafka.version>2.4.0</kafka.version>
    <redis.version>3.3.0</redis.version>
    <lombok.version>1.18.6</lombok.version>
    <fastjson.verson>1.2.72</fastjson.verson>
    <jdk.version>1.8</jdk.version>
</properties>
<dependencyManagement>
    <dependencies>
        <!-- Hadoop dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.findbugs</groupId>
                    <artifactId>jsr305</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>force-shading</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Redis dependency -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.verson}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
4.2. UserLoginLog class
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Declared public so the class also satisfies Flink's POJO rules.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserLoginLog {
    /**
     * Login id
     */
    private int loginId;
    /**
     * Login time (epoch milliseconds)
     */
    private long loginTime;
    /**
     * Login status: 1 = login failed, 0 = login succeeded
     */
    private int loginStatus;
    /**
     * Login user name
     */
    private String userName;
}
4.3. MyFlatMapFunction class
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class MyFlatMapFunction implements FlatMapFunction<String, UserLoginLog> {
    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and
     * transforms it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    @Override
    public void flatMap(String value, Collector<UserLoginLog> out) throws Exception {
        if (StringUtils.isNotBlank(value)) {
            UserLoginLog userLoginLog = JSONObject.parseObject(value, UserLoginLog.class);
            out.collect(userLoginLog);
        }
    }
}
4.4. MyPatternProcessFunction class
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;

@Slf4j
public class MyPatternProcessFunction extends PatternProcessFunction<UserLoginLog, UserLoginLog> {
    /**
     * Generates resulting elements given a map of detected pattern events. The events are
     * identified by their specified names.
     *
     * <p>{@link Context#timestamp()} in this case returns the time of the
     * last element that was assigned to the match, resulting in this partial match being finished.
     *
     * @param match map containing the found pattern. Events are identified by their names.
     * @param ctx enables access to time features and emitting results through side outputs
     * @param out Collector used to output the generated elements
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    @Override
    public void processMatch(Map<String, List<UserLoginLog>> match, Context ctx, Collector<UserLoginLog> out) throws Exception {
        // Emit only the first event matched by the "start" stage of each match.
        List<UserLoginLog> start = match.get("start");
        out.collect(start.get(0));
    }
}
4.5. Main class
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;

@Slf4j
public class CepLearning {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /**
         * Parallelism is set to 1 so that the watermark can advance and trigger the computation
         */
        env.setParallelism(1);
        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
                );
        /**
         * Emit only when 3 consecutive login failures occur within 10 s; strict contiguity
         */
        Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("second").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("third").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).within(Time.seconds(10));
        /**
         * Emit only when 3 login failures occur within 10 s; contiguity is not enforced
         */
        Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).within(Time.seconds(10));
        /**
         * Emit only when 3 login failures occur within 10 s; adding consecutive() enforces strict contiguity
         */
        Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).consecutive().within(Time.seconds(10));
        PatternStream<UserLoginLog> patternStream = CEP.pattern(dataStream, wherePatternOne);
        PatternStream<UserLoginLog> patternStream1 = CEP.pattern(dataStream, wherePatternTwo);
        PatternStream<UserLoginLog> patternStream2 = CEP.pattern(dataStream, wherePatternThree);
        SingleOutputStreamOperator<UserLoginLog> process = patternStream.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process1 = patternStream1.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process2 = patternStream2.process(new MyPatternProcessFunction());
        process.print("resultOutPut");
        process1.print("resultOutPutTwo");
        process2.print("resultOutPutThree");
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
