1. CEP
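CEP (complex event processing) takes one or more streams of simple events and matches them against a set of rules. It outputs the complex events that satisfy those rules, which is the data the user actually wants.
CEP supports pattern matching on streams. Depending on a pattern's conditions, the matched events may be required to be contiguous or allowed to be non-contiguous. A pattern can also carry a time constraint: if its conditions are not met within that time range, the match times out.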
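Put simply, CEP performs pattern matching over the events in a stream. For example, an alert could be raised when two consecutive failed-login records occur no more than 2 seconds apart.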
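The sketch below uses plain Scala with no Flink involved. It shows how strict contiguity (as with `Pattern.next`) differs from relaxed contiguity (as with `Pattern.followedBy`) when a time window applies. The names `ContiguitySketch`, `Ev`, `strictPairs` and `relaxedPairs` are hypothetical. Treating a gap exactly equal to the window as too late is an assumption about the boundary, not something stated above.

```scala
// Plain-Scala sketch of CEP contiguity semantics; not Flink's NFA implementation.
object ContiguitySketch {
  // Hypothetical event type: a kind label and a timestamp in milliseconds.
  final case class Ev(kind: String, ts: Long)

  // Strict contiguity (like Pattern.next): the second event must be the one
  // immediately after the first, and it must arrive within the window.
  def strictPairs(events: Seq[Ev], first: String, second: String, windowMs: Long): Seq[(Ev, Ev)] =
    events.zip(events.drop(1)).collect {
      case (a, b) if a.kind == first && b.kind == second && b.ts - a.ts < windowMs => (a, b)
    }

  // Relaxed contiguity (like Pattern.followedBy): non-matching events in between are
  // skipped, and the first later event of the second kind completes the match,
  // provided it arrives within the window (otherwise the partial match times out).
  def relaxedPairs(events: Seq[Ev], first: String, second: String, windowMs: Long): Seq[(Ev, Ev)] =
    events.zipWithIndex.collect { case (a, i) if a.kind == first => (a, i) }.flatMap { case (a, i) =>
      events.drop(i + 1).find(_.kind == second).filter(b => b.ts - a.ts < windowMs).map(b => (a, b)).toList
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(Ev("create", 0L), Ev("other", 1000L), Ev("pay", 3000L))
    // Empty: "pay" does not come directly after "create".
    println(strictPairs(events, "create", "pay", 15 * 60 * 1000L))
    // One pair: "other" is skipped and "pay" arrives 3 s after "create", inside 15 minutes.
    println(relaxedPairs(events, "create", "pay", 15 * 60 * 1000L))
    // Empty: with a 2 s window, the partial match times out before "pay" arrives.
    println(relaxedPairs(events, "create", "pay", 2000L))
  }
}
```

The same event sequence produces different results depending only on the contiguity mode and the window. These are the two knobs described above.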
2. CEP Workflow
2.1 Obtain the input stream
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val loginEventStream = env.fromCollection(List(
  LoginEvent("1", "192.168.0.1", "fail", "1558430842"),
  LoginEvent("1", "192.168.0.2", "fail", "1558430843"),
  LoginEvent("1", "192.168.0.3", "fail", "1558430844"),
  LoginEvent("2", "192.168.10.10", "success", "1558430845")
)).assignAscendingTimestamps(_.eventTime.toLong * 1000) // eventTime is in seconds; Flink timestamps are in milliseconds
2.2 Define the Pattern
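import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

val loginFailPattern = Pattern.begin[LoginEvent]("begin")
  .where(_.eventType.equals("fail")) // a failed login
  .next("next")                      // strict contiguity: the very next event
  .where(_.eventType.equals("fail")) // the next login event also failed
  .within(Time.seconds(2))           // the two events are less than 2 seconds apart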
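2.3 Apply the Pattern
Combine an input stream with a pattern to get a PatternStream. General form:
import org.apache.flink.cep.scala.CEP

val input = ...
val pattern = ...
val patternStream = CEP.pattern(input, pattern) // an input stream plus the Pattern to match
For the login stream, key by userId so that each user's events are matched separately:
val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)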
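Keying by `userId` matters because, without it, the pattern is matched over the whole interleaved stream. The plain-Scala sketch below is not Flink code, and all of its names are hypothetical. It compares "fail then fail within 2 seconds" on an unkeyed sequence with the same check run per user. On the unkeyed sequence, failures from two different users get paired by mistake.

```scala
// Plain-Scala sketch (not Flink code) of why the login stream is keyed by user.
object KeyedMatchingSketch {
  // Hypothetical login record; ts is in milliseconds.
  final case class Login(userId: String, eventType: String, ts: Long)

  // "fail" immediately followed by "fail" within windowMs, over one ordered sequence.
  def failPairs(events: Seq[Login], windowMs: Long): Seq[(Login, Login)] =
    events.zip(events.drop(1)).filter { case (a, b) =>
      a.eventType == "fail" && b.eventType == "fail" && b.ts - a.ts < windowMs
    }

  // Keyed variant, similar in spirit to keyBy(_.userId): each user's events are
  // matched on their own, so a pair can never mix two users.
  def keyedFailPairs(events: Seq[Login], windowMs: Long): Seq[(Login, Login)] =
    events.groupBy(_.userId).toSeq.sortBy(_._1).flatMap { case (_, userEvents) =>
      failPairs(userEvents, windowMs)
    }

  def main(args: Array[String]): Unit = {
    // Two users whose events interleave in the stream.
    val events = Seq(
      Login("1", "fail", 1000L),
      Login("2", "fail", 1500L),
      Login("1", "success", 1800L),
      Login("2", "success", 2000L)
    )
    println(failPairs(events, 2000L))      // one spurious pair: user 1's fail with user 2's fail
    println(keyedFailPairs(events, 2000L)) // empty: neither user failed twice in a row
  }
}
```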
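2.4 Extract matches with select or flatSelect
select:
import scala.collection.Map // the Scala select expects scala.collection.Map, not the default immutable Map

val loginFailDataStream = patternStream
  .select((pattern: Map[String, Iterable[LoginEvent]]) => {
    // Keys are pattern names; each value holds the events matched by that step
    val first = pattern("begin").iterator.next()
    val second = pattern("next").iterator.next()
    (second.userId, second.ip, second.eventType)
  })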
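select returns exactly one record for each match.
flatSelect: implement a PatternFlatSelectFunction to get functionality similar to select. The only difference is that flatSelect can emit any number of records (zero or more) per match, through a Collector.
Since Flink 1.8, both are the older style of API. The recommended approach is PatternStream.process with a PatternProcessFunction.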
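The difference between select and flatSelect is the same as the difference between map and flatMap: one output per match versus zero or more. The sketch below models this with plain Scala collections. The `Match` alias stands in for the `Map[String, Iterable[T]]` that the Scala select receives. `selectLike` and `flatSelectLike` are hypothetical helpers, not Flink API. The real flatSelect emits through a Collector rather than returning a Seq.

```scala
// Plain-Scala model of select vs flatSelect; not the Flink API.
object SelectVsFlatSelectSketch {
  // Stand-in for one detected match: pattern name -> events matched by that step.
  type Match = Map[String, Seq[String]]

  // select-style: exactly one output per match (like map).
  def selectLike[R](matches: Seq[Match])(f: Match => R): Seq[R] = matches.map(f)

  // flatSelect-style: zero or more outputs per match (like flatMap).
  // Flink's flatSelect emits through a Collector instead of returning a Seq.
  def flatSelectLike[R](matches: Seq[Match])(f: Match => Seq[R]): Seq[R] = matches.flatMap(f)

  def main(args: Array[String]): Unit = {
    // Two begin/next matches, holding the IPs of the failed logins.
    val matches: Seq[Match] = Seq(
      Map("begin" -> Seq("192.168.0.1"), "next" -> Seq("192.168.0.2")),
      Map("begin" -> Seq("192.168.0.3"), "next" -> Seq("192.168.0.3"))
    )
    // One record per match: the IP of the second failure.
    println(selectLike(matches)(m => m("next").head))
    // Distinct IPs per match: two records for the first match, one for the second.
    println(flatSelectLike(matches)(m => (m("begin") ++ m("next")).distinct))
  }
}
```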
3. Examples
3.1 Two consecutive login failures by the same user within 2 seconds
import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Not defined in the original snippet; types assumed from how the example uses them
// (userId and eventTime are Long here, eventTime in seconds)
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
// Assumed output type: user, time of first and second failure, message
case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

object LoginFailWithCep {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // Custom test data
    val loginStream = env.fromCollection(List(
      LoginEvent(1, "192.168.0.1", "fail", 1558430842),
      LoginEvent(1, "192.168.0.2", "success", 1558430843),
      LoginEvent(1, "192.168.0.3", "fail", 1558430844),
      LoginEvent(1, "192.168.0.3", "fail", 1558430847),
      LoginEvent(1, "192.168.0.3", "fail", 1558430848),
      LoginEvent(2, "192.168.10.10", "success", 1558430850)
    ))
      .assignAscendingTimestamps(_.eventTime * 1000) // seconds -> milliseconds
    // Define the pattern to match against the event stream
    val loginFailPattern = Pattern.begin[LoginEvent]("begin")
      .where(_.eventType == "fail")
      .next("next")
      .where(_.eventType == "fail")
      .within(Time.seconds(2))
    // Apply the pattern to the keyed input stream to get a PatternStream
    val patternStream = CEP.pattern(loginStream.keyBy(_.userId), loginFailPattern)
    // Use select to extract the output stream from the PatternStream.
    // Alternative with a Scala function (requires import scala.collection.Map):
    // val loginFailDataStream: DataStream[Warning] = patternStream.select((patternEvents: Map[String, Iterable[LoginEvent]]) => {
    //   // Take the failed-login events out of the Map and wrap them in a Warning
    //   val firstFailEvent = patternEvents("begin").iterator.next()
    //   val secondFailEvent = patternEvents("next").iterator.next()
    //   Warning(firstFailEvent.userId, firstFailEvent.eventTime, secondFailEvent.eventTime, "login fail warning")
    // })
    val loginFailDataStream = patternStream.select(new MySelectFunction())
    // Send the resulting warning stream to a sink (stdout here)
    loginFailDataStream.print("warning")
    env.execute("Login Fail Detect with CEP")
  }
}

class MySelectFunction extends PatternSelectFunction[LoginEvent, Warning] {
  override def select(patternEvents: util.Map[String, util.List[LoginEvent]]): Warning = {
    // Each pattern name maps to the events it matched; "begin" and "next" hold one event each
    val firstFailEvent = patternEvents.get("begin").get(0)
    val secondFailEvent = patternEvents.get("next").get(0)
    Warning(firstFailEvent.userId, firstFailEvent.eventTime, secondFailEvent.eventTime, "login fail warning")
  }
}
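The sketch below explains why this job should emit a single warning for user 1. It replays user 1's test events in plain Scala and reports what happens to each partial match started by a "fail". `LoginFailTrace` and `trace` are hypothetical names. The sketch assumes events arrive in timestamp order, ignores watermarks, and treats a gap equal to the window as a timeout (an assumed boundary).

```scala
// Plain-Scala replay of example 3.1 for one user; not Flink's NFA.
object LoginFailTrace {
  // Same shape as the example's LoginEvent; eventTime is in seconds.
  final case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)

  // For each "fail" event of a single user (events in time order), report what happens
  // to the partial match it starts under begin(fail).next(fail).within(windowSec seconds).
  def trace(userEvents: Seq[LoginEvent], windowSec: Long): Seq[(Long, String)] = {
    val following: Seq[Option[LoginEvent]] = userEvents.drop(1).map(e => Option(e)) :+ None
    userEvents.zip(following).collect {
      case (a, next) if a.eventType == "fail" =>
        val outcome = next match {
          case None => "no following event"
          case Some(b) if b.eventTime - a.eventTime >= windowSec => "timed out"
          case Some(b) if b.eventType != "fail" => "broken by non-fail event"
          case Some(_) => "matched"
        }
        (a.eventTime, outcome)
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      LoginEvent(1, "192.168.0.1", "fail", 1558430842),
      LoginEvent(1, "192.168.0.2", "success", 1558430843),
      LoginEvent(1, "192.168.0.3", "fail", 1558430844),
      LoginEvent(1, "192.168.0.3", "fail", 1558430847),
      LoginEvent(1, "192.168.0.3", "fail", 1558430848),
      LoginEvent(2, "192.168.10.10", "success", 1558430850)
    )
    // Only user 1 has failures; replay that user's events on their own.
    trace(events.filter(_.userId == 1L), 2L).foreach(println)
  }
}
```

Only the pair at 1558430847/1558430848 completes. The failure at ...842 is broken by the success that follows it, and the one at ...844 waits 3 seconds for the next failure, which exceeds the 2-second window.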
3.2 Order not paid within 15 minutes of creation
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Input: order events (eventTime is in seconds)
case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
// Output: result of processing an order
case class OrderResult(orderId: Long, resultMsg: String)

object OrderTimeout {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Read the order data
    val orderEventStream = env.fromCollection(List(
      OrderEvent(1, "create", 1558430842),
      OrderEvent(2, "create", 1558430843),
      OrderEvent(2, "other", 1558430845),
      OrderEvent(2, "pay", 1558430850),
      OrderEvent(1, "pay", 1558431920)
    ))
      .assignAscendingTimestamps(_.eventTime * 1000) // seconds -> milliseconds
    // Define a time-bounded pattern: an order is created and later paid
    val orderPayPattern = Pattern.begin[OrderEvent]("begin")
      .where(_.eventType == "create")
      .followedBy("follow") // relaxed contiguity: other events may occur in between
      .where(_.eventType == "pay")
      .within(Time.minutes(15))
    // Output tag that identifies the side output for timed-out orders
    val orderTimeoutOutputTag = OutputTag[OrderResult]("orderTimeout")
    // Apply the pattern to the keyed input stream to get a PatternStream
    val patternStream = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern)
    import scala.collection.Map
    // select with a timeout function and a select function: complete matches go to the
    // main output, timed-out partial matches go to the side output
    val complexResult: DataStream[OrderResult] = patternStream.select(orderTimeoutOutputTag)(
      // pattern timeout function: receives the partial match and the timeout timestamp
      (orderPayEvents: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
        val timeoutOrderId = orderPayEvents("begin").iterator.next().orderId
        OrderResult(timeoutOrderId, "order time out")
      }
    )(
      // pattern select function: receives a complete match
      (orderPayEvents: Map[String, Iterable[OrderEvent]]) => {
        val paidOrderId = orderPayEvents("follow").iterator.next().orderId
        OrderResult(paidOrderId, "order paid successfully")
      }
    )
    // Orders that were paid in time
    complexResult.print("paid")
    // Get the side output (timed-out orders) from the result stream
    val timeoutResult = complexResult.getSideOutput(orderTimeoutOutputTag)
    timeoutResult.print("timeout")
    env.execute("Order Timeout Detect")
  }
}
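The sketch below replays the same five events in plain Scala. It assumes eventTime is in seconds and that each order's events are ordered by time. It should show the expected split. Order 2 is paid 7 seconds after creation. Order 1 is paid 1078 seconds (about 18 minutes) after creation, past the 15-minute window, so the job above should report it on the timeout side output. `OrderTimeoutSketch` and `classify` are hypothetical. In this sketch an order with no pay event at all also counts as timed out. In the Flink job, the two kinds of result arrive on different outputs rather than in one sorted list.

```scala
// Plain-Scala replay of example 3.2; not Flink code and not how Flink evaluates timeouts.
object OrderTimeoutSketch {
  // Same shapes as the example; eventTime is in seconds.
  final case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
  final case class OrderResult(orderId: Long, resultMsg: String)

  // Per order: take the first "create", then the first later "pay" (relaxed contiguity,
  // like followedBy). A pay within windowSec counts as paid; a late or missing pay as timed out.
  def classify(events: Seq[OrderEvent], windowSec: Long): Seq[OrderResult] =
    events.groupBy(_.orderId).toSeq.sortBy(_._1).flatMap { case (orderId, orderEvents) =>
      val sorted = orderEvents.sortBy(_.eventTime)
      val createIdx = sorted.indexWhere(_.eventType == "create")
      if (createIdx < 0) Nil
      else {
        val create = sorted(createIdx)
        sorted.drop(createIdx + 1).find(_.eventType == "pay") match {
          case Some(pay) if pay.eventTime - create.eventTime < windowSec =>
            List(OrderResult(orderId, "order paid successfully"))
          case _ =>
            List(OrderResult(orderId, "order time out"))
        }
      }
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      OrderEvent(1, "create", 1558430842),
      OrderEvent(2, "create", 1558430843),
      OrderEvent(2, "other", 1558430845),
      OrderEvent(2, "pay", 1558430850),
      OrderEvent(1, "pay", 1558431920)
    )
    classify(events, 15 * 60L).foreach(println)
  }
}
```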
Reposted from https://blog.csdn.net/haibucuoba/article/details/97051972
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/12505.html
