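Using Spark to Detect User Fraud Rings in Risk Control
Introduction
In early 2020 the business was growing steadily, and risk control ran into new challenges. As a C2C trading platform (similar to Dewu or Xianyu), we inevitably see organized groups of users abusing platform promotions and trading with themselves. After consolidating users' historical data, the risk-control team wanted to pick out these rings, large and small, so that trades inside a ring and between rings could be blocked. The data volume is large and the dimensions are complex. After several weeks of trial and error, plus exchanges with some small-loan companies, we settled on a complete ring-detection system and eventually moved the data to real time. Spark GraphX and Spark Streaming play an important part in it. What follows is a brief description of how they were used in version 1.0.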
SparkGraphX
To be updated...
Neo4j
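Neo4j is an open-source graph database (the clustered edition is not open source and is licensed per node, which is very expensive; the single-node edition is free). After splitting the data within the same line of business, a single node meets our current data needs. We use it only for queries, not for computation. We did consider computing in Neo4j as well, but it was inefficient and memory-hungry, so in the end we chose Spark GraphX for computation.
To be updated...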
SparkStreaming
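Spark Streaming (use Flink if you have the option) does two things:
1. Second-level real-time relationship storage
Data source: binlog (Kafka)
Data storage: Neo4j, Kafka, Redis
Processing: (1) Plain data, such as feature a of user A, is written directly to Neo4j, Kafka, and Redis.
(2) Data that needs a multi-way join, where two topics have to be joined. We did not use join directly; we used a window function plus groupByKey.
The window length is 4 s and the slide interval is 2 s. These values follow from the latency of the business flow and of the data itself: after the action is triggered the app waits a few seconds, and the result may depend on several actions, so the business only makes its call once the risk-control data is in place. Otherwise the risk would be detected only after the abusers had already cashed out, which would be rather pointless. So when some actions feel slow, the cause is not necessarily data congestion; it may simply be a sleep(n) that a programmer wrote in there, haha. Take the relation between uid and payment id as an example. It needs two tables whose key fields are (order_id, uid) and (order_id, pay_id) respectively. First merge the two topics into one stream, then pull out the shared field: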
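unioned // hypothetical name: the two topics merged into one DStream of (table, order_id, uid or pay_id)
  .map(elem => (((elem._1, elem._2), elem._3), 1)) // (((withdrawal-table relation, order_id), uid/pay_id), 1)
  .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(15)) // window 30 s, slide 15 s in this snippet
  .map(elem => (elem._1._1, elem._1._2)) // ((table, order_id), uid/pay_id)
  .groupByKey() // (unified table name, order_id) => (uid + pay_id)
  .map(elem => {
    // reshape the record here; besides the withdrawal-table relation there are many other relations
    elem
  })
  .foreachRDD(rdd => rdd.foreach(store)) // `store` stands for the storage step (Neo4j, Kafka, Redis)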
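To make the union-then-group idea easier to follow without a cluster, here is a minimal sketch on plain Scala collections. It treats one window's worth of records as a `Seq`; the object name `WindowJoinSketch`, the record layout, and the `uid:` / `pay:` value prefixes are assumptions made for illustration, not part of the original job.

```scala
object WindowJoinSketch {
  // One record from either topic: (table name, order_id, value), where value
  // is a uid or a pay_id. This layout is assumed for illustration only.
  type Record = (String, String, String)

  // Join the records that fall into one window by (table, order_id),
  // mirroring the union -> dedupe -> groupByKey steps on plain collections.
  def joinWindow(uidTopic: Seq[Record], payTopic: Seq[Record]): Map[(String, String), Set[String]] =
    (uidTopic ++ payTopic) // union the two topics
      .map { case (table, orderId, value) => ((table, orderId), value) }
      .distinct // repeated records collapse into one
      .groupBy(_._1) // group on (table, order_id)
      .map { case (key, pairs) => key -> pairs.map(_._2).toSet }
      // keep only keys that collected more than one distinct value,
      // i.e. candidates for a (uid, pay_id) pair
      .filter { case (_, values) => values.size > 1 }

  def main(args: Array[String]): Unit = {
    val uids = Seq(("withdraw", "o1", "uid:1001"), ("withdraw", "o2", "uid:1002"))
    val pays = Seq(("withdraw", "o1", "pay:abc"), ("withdraw", "o1", "pay:abc"))
    println(joinWindow(uids, pays))
  }
}
```

In the example, order `o2` has no payment record inside the window, so it is dropped. With a sliding window, it would get another chance in the next overlapping window.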
neo4j
import org.neo4j.driver.v1._ // Neo4j Java driver 1.x API (Session, StatementResult, ...)

// Update a relationship: create the User -> flag edge if it does not exist yet
private def relationShip(session: Session, uid: Int, flag_label: String, flag_type: String, fuid: String): Unit = {
  // Check whether the relationship already exists
  val checkSql =
    s"""
       |MATCH (p1:User{userId:$uid})-[r:$flag_type]->(p2:$flag_label{flagId:"$fuid"}) RETURN p1.userId AS uid
     """.stripMargin
  val result: StatementResult = session.run(checkSql)
  if (!result.hasNext) {
    // Make sure both nodes exist
    val mergeNodesSql =
      s"""
         |MERGE (p1:User{userId:$uid})
         |MERGE (p2:$flag_label{flagId:"$fuid"})
       """.stripMargin
    session.run(mergeNodesSql)
    // Create the relationship with an initial score of 1
    val mergeRelSql =
      s"""
         |MATCH (p1:User{userId:$uid}),(p2:$flag_label{flagId:"$fuid"})
         |MERGE (p1)-[r:$flag_type{score:1}]->(p2)
         |RETURN p1.userId AS uid
       """.stripMargin
    val rel: StatementResult = session.run(mergeRelSql)
    if (rel.hasNext) {
      val record = rel.next()
      println(record.get("uid").toString)
    } else {
      System.err.println("error:" + uid + flag_label + flag_type + fuid)
    }
  }
}
import java.util.concurrent.TimeUnit
import org.neo4j.driver.v1.{AuthTokens, Config, Driver, GraphDatabase, Session}

/**
 * Get the Driver
 * @return a Neo4j driver connected over bolt
 */
def getDriver(): Driver = {
  val url = "bolt://neo4j01:8687"
  val user = "user"
  val password = "password"
  val driver = GraphDatabase.driver(url, AuthTokens.basic(user, password), Config.build()
    .withMaxIdleSessions(1000)
    .withConnectionLivenessCheckTimeout(10, TimeUnit.SECONDS)
    .toConfig)
  driver
}

/**
 * Get a Session
 * @param driver the driver returned by getDriver
 * @return a new session on that driver
 */
def getSession(driver: Driver): Session = {
  val session = driver.session()
  session
}
kafka
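import java.util.Properties
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// KafkaSink and GetPropKey are project-specific helpers that are not shown in this post
def resToKafka(ssc: StreamingContext, kafkaDStreamValue: DStream[(String, String, String, String)]): Unit = {
  // Broadcast the KafkaSink so each executor reuses one producer
  val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
    val kafkaProducerConfig = {
      val p = new Properties()
      p.put("group.id", "realtime")
      p.put("acks", "all")
      p.put("retries", "1") // the key must not contain a trailing space, or the setting is ignored
      p.setProperty("bootstrap.servers", GetPropKey.brokers)
      p.setProperty("key.serializer", classOf[StringSerializer].getName)
      p.setProperty("value.serializer", classOf[StringSerializer].getName)
      p
    }
    ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
  }
  // Write to Kafka
  kafkaDStreamValue.foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      rdd.foreachPartition(partition => {
        partition.foreach(elem => {
          val flag_label = elem._3
          // Skip records whose label is the literal string "null"
          if (!flag_label.equals("null")) {
            val auth_info = elem._1
            val uid = elem._2.toInt
            val flag_type = elem._4
            val value = dataJson(uid, auth_info, flag_label, flag_type)
            kafkaProducer.value.send("risk_user_auth_info", value)
          }
        })
      })
    }
  })
}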
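import scala.util.parsing.json.JSONObject // assumption: the JSONObject used here is the Scala standard-library one

/**
 * Format a record as JSON
 *
 * @param uid        user id
 * @param fid        flag id (the auth_info value)
 * @param flag_label label of the flag node
 * @param flag_type  relationship type
 * @return the record as a JSON string
 */
def dataJson(uid: Int, fid: String, flag_label: String, flag_type: String): String = {
  val map = Map(
    "user_id" -> uid,
    "flag_id" -> fid,
    "flag_label" -> flag_label,
    "flag_type" -> flag_type
  )
  JSONObject.apply(map).toString()
}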
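2. Dimension association + storage
Data source: risk_user_auth_info (the delay queue written to Kafka in step 1)
Data storage: Redis
Processing: take records from the delay queue, look up their relations in Redis, attribute them to a parent node (later iterations added an algorithm for merging and splitting graphs), then write the result back to Redis.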
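One common way to attribute related ids to a parent node is a union-find (disjoint-set) structure. The post does not say which structure was used, so the sketch below is an assumption: the class name `RingIndex` is hypothetical, and an in-memory map stands in for the parent pointers that the real job keeps in Redis.

```scala
import scala.collection.mutable

// Minimal union-find: every id points to a parent, and the root of that chain
// identifies the ring the id belongs to. The map stands in for Redis here.
class RingIndex {
  private val parent = mutable.Map.empty[String, String]

  // Find the root of id, registering unseen ids as their own root.
  def find(id: String): String = {
    var root = id
    while (parent.getOrElseUpdate(root, root) != root) root = parent(root)
    // Path compression: point every node on the walked path at the root.
    var cur = id
    while (cur != root) {
      val next = parent(cur)
      parent(cur) = root
      cur = next
    }
    root
  }

  // Record that a and b are related, merging their rings.
  def union(a: String, b: String): Unit = {
    val ra = find(a)
    val rb = find(b)
    if (ra != rb) parent(rb) = ra
  }
}
```

For example, after `union("uid:1", "device:A")` and `union("uid:2", "device:A")`, both users resolve to the same root, so they land in the same ring. Plain union-find only merges; splitting a ring, which the later iterations mention, would need a different algorithm.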
Drools
To be updated...
