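Goal:
Spark Streaming processes data in micro-batches.
So when joining two streams, how do we make sure records are joined both within a batch and across batches?
Problems with a two-stream join in Spark Streaming:
1. Late-arriving data fails to join.
2. How should the join be done at all?
The problem
A two-stream join in Spark can produce three outcomes:
1. left/right
2. none/right
3. left/none
Note:
The left and right positions stand for the two streams; a name in a position means that stream has data,
and "none" means that stream has no data.
For example:
left/none: the left stream has data and the right stream does not.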
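The three outcomes can be reproduced without Spark. The following is a minimal sketch in plain Scala that mimics, for a single batch, what a full outer join returns; the object name `FullOuterJoinCases` and the helpers `fullOuterJoin` and `describe` are made up for this illustration and are not part of Spark.

```scala
object FullOuterJoinCases {
  // One joined row: the key plus an optional value from each side.
  type Joined[K, A, B] = (K, (Option[A], Option[B]))

  // Full outer join of two keyed sequences (a stand-in for one micro-batch).
  def fullOuterJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[Joined[K, A, B]] = {
    val leftByKey = left.groupBy(_._1)
    val rightByKey = right.groupBy(_._1)
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.flatMap { k =>
      val ls = leftByKey.getOrElse(k, Seq.empty).map(_._2)
      val rs = rightByKey.getOrElse(k, Seq.empty).map(_._2)
      val rows: Seq[Joined[K, A, B]] =
        if (rs.isEmpty) ls.map(a => (k, (Some(a), None)))
        else if (ls.isEmpty) rs.map(b => (k, (None, Some(b))))
        else for (a <- ls; b <- rs) yield (k, (Some(a), Some(b)))
      rows
    }
  }

  // Name the outcome using the article's left/right/none vocabulary.
  def describe[K, A, B](row: Joined[K, A, B]): String = row match {
    case (k, (Some(_), Some(_))) => s"$k: left/right"
    case (k, (None, Some(_)))    => s"$k: none/right"
    case (k, (Some(_), None))    => s"$k: left/none"
    case (k, (None, None))       => s"$k: empty"
  }

  def main(args: Array[String]): Unit = {
    val left = Seq("1" -> "200", "2" -> "100")
    val right = Seq("1" -> "sxwang,spark,2", "3" -> "xmm,miao2,200")
    fullOuterJoin(left, right).map(r => describe(r)).foreach(println)
  }
}
```

Key 1 is present on both sides, key 2 only on the left, and key 3 only on the right, so each of the three outcomes appears once.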
Approach:
See the figure below.

Test
Demonstrating the Spark two-stream join problem
We won't use Kafka data for this test;
socket data is enough.
First check that the data flows end to end:
Code:
package code.com.hivemetabi.task

import java.io.Serializable

import code.com.common.util.ParserUtil
import code.com.hivemetabi.bean.TechBean.{Order, OrderInfo}
import com.common.util.ContextUtil
// Flink's ParameterTool is used here only to parse command-line arguments.
import org.apache.flink.api.java.utils.ParameterTool

/**
 * author : sxwang
 * date : 2020/11/13 9:36
 * version: 1.0
 */
class MultiStreamJoinTask(params: ParameterTool) extends Serializable {
  private val className = "MultiStreamJoinTask"
  private val jobName = s"bi_${className}"
  private val batch: Int = params.getRequired("batch").toInt
  private val host: String = params.getRequired("host")
  private val port1 = params.getRequired("port1").toInt
  private val port2 = params.getRequired("port2").toInt

  def execute(): Unit = {
    val ssc = ContextUtil.getStreamingContext(jobName, batch)
    // Stream 1: "orderId,orderNum" lines, keyed by orderId
    val s1 = ssc.socketTextStream(host, port1)
      .map(data => {
        val fields = data.split(",")
        (fields(0), ParserUtil.arrConvert2CaseClass[Order](classOf[Order], fields, 2))
      })
    // Stream 2: "orderId,userName,skuName,skuNum" lines, keyed by orderId
    val s2 = ssc.socketTextStream(host, port2)
      .map(data => {
        val fields = data.split(",")
        (fields(0), ParserUtil.arrConvert2CaseClass[OrderInfo](classOf[OrderInfo], fields, 4))
      })
    // Full outer join of the two streams within each micro-batch
    s1.fullOuterJoin(s2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

object MultiStreamJoinTask {
  def apply(params: ParameterTool): MultiStreamJoinTask = new MultiStreamJoinTask(params)

  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)
    MultiStreamJoinTask(parameterTool).execute()
  }
}
package code.com.hivemetabi.bean

/**
 * author : sxwang
 * date : 2020/11/13 9:48
 * version: 1.0
 */
object TechBean {
  case class Order(orderId: String, orderNum: String)

  case class OrderInfo(orderId: String, userName: String, skuName: String, skuNum: String)

  // Fields default to "" so that OrderDetail() can be created empty
  // and then filled in by the two builder methods below.
  case class OrderDetail(var orderId: String = "",
                         var orderNum: String = "",
                         var userName: String = "",
                         var skuName: String = "",
                         var skuNum: String = "") {
    // Copy the stream-1 (Order) fields into this record.
    def builderOrder(order: Order): OrderDetail = {
      if (null != order) {
        this.orderId = order.orderId
        this.orderNum = order.orderNum
      }
      this
    }

    // Copy the stream-2 (OrderInfo) fields into this record.
    def builderOrderInfo(orderInfo: OrderInfo): OrderDetail = {
      if (null != orderInfo) {
        this.userName = orderInfo.userName
        this.skuName = orderInfo.skuName
        this.skuNum = orderInfo.skuNum
      }
      this
    }
  }
}
Result 1:
Input data:
Ports: 8888 and 8889
8888:
1,200
2,100
3,300
8889 :
1,sxwang,spark,2
1,ygy,flink,3
2,mm,miao,100
3,xmm,miao2,200
Result:
The three join outcomes:
left/right:
(2,(Some(Order(2,100)),Some(OrderInfo(2,mm,miao,100))))
(3,(Some(Order(3,300)),Some(OrderInfo(3,xmm,miao2,200))))
(1,(Some(Order(1,200)),Some(OrderInfo(1,sxwang,spark,2))))
(1,(Some(Order(1,200)),Some(OrderInfo(1,ygy,flink,3))))
none/right:
(2,(None,Some(OrderInfo(2,mm,miao,100))))
(3,(None,Some(OrderInfo(3,xmm,miao2,200))))
(1,(None,Some(OrderInfo(1,sxwang,spark,2))))
(1,(None,Some(OrderInfo(1,ygy,flink,3))))
left/none:
(2,(Some(Order(2,100)),None))
(3,(Some(Order(3,300)),None))
(1,(Some(Order(1,200)),None))
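Conclusion:
The code above does nothing to handle the different join outcomes,
so the result can fall into any of the three cases.
This is the problem with two-stream joins in Spark!
Improvement:
With the join above, you cannot tell which side failed to match,
so let's improve it slightly.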
package code.com.hivemetabi.task

import java.io.Serializable

import code.com.common.util.ParserUtil
import code.com.hivemetabi.bean.TechBean.{Order, OrderDetail, OrderInfo}
import com.common.util.ContextUtil
import org.apache.flink.api.java.utils.ParameterTool

/**
 * author : sxwang
 * date : 2020/11/13 9:36
 * version: 1.0
 */
class MultiStreamJoinTask(params: ParameterTool) extends Serializable {
  private val className = "MultiStreamJoinTask"
  private val jobName = s"bi_${className}"
  private val batch: Int = params.getRequired("batch").toInt
  private val host: String = params.getRequired("host")
  private val port1 = params.getRequired("port1").toInt
  private val port2 = params.getRequired("port2").toInt

  def execute(): Unit = {
    val ssc = ContextUtil.getStreamingContext(jobName, batch)
    val s1 = ssc.socketTextStream(host, port1)
      .map(data => {
        val fields = data.split(",")
        (fields(0), ParserUtil.arrConvert2CaseClass[Order](classOf[Order], fields, 2))
      })
    val s2 = ssc.socketTextStream(host, port2)
      .map(data => {
        val fields = data.split(",")
        (fields(0), ParserUtil.arrConvert2CaseClass[OrderInfo](classOf[OrderInfo], fields, 4))
      })
    // Full outer join, then report which side (if any) failed to match
    s1.fullOuterJoin(s2).map({
      case (orderId, (Some(order), Some(orderInfo))) => {
        // Both sides present: build the joined record
        OrderDetail().builderOrder(order).builderOrderInfo(orderInfo)
      }
      case (orderId, (None, Some(orderInfo))) => {
        // "left side not matched"; println returns Unit, which print() shows as ()
        println("左邊沒有匹配上")
      }
      case (orderId, (Some(order), None)) => {
        // "right side not matched"; also shows up as () in the printed batch
        println("右邊沒有匹配上")
      }
      case _ => Nil
    }).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

object MultiStreamJoinTask {
  def apply(params: ParameterTool): MultiStreamJoinTask = new MultiStreamJoinTask(params)

  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)
    MultiStreamJoinTask(parameterTool).execute()
  }
}
Result:
左邊沒有匹配上
左邊沒有匹配上
左邊沒有匹配上
-------------------------------------------
Time: 1605236390000 ms
-------------------------------------------
()
()
()
右邊沒有匹配上
右邊沒有匹配上
-------------------------------------------
Time: 1605236400000 ms
-------------------------------------------
()
()
-------------------------------------------
Time: 1605236410000 ms
-------------------------------------------
OrderDetail(3,300,xmm,miao2,200)
-------------------------------------------
Time: 1605236420000 ms
-------------------------------------------
-------------------------------------------
Time: 1605236430000 ms
-------------------------------------------
-------------------------------------------
Time: 1605236440000 ms
-------------------------------------------
-------------------------------------------
Time: 1605236450000 ms
-------------------------------------------
-------------------------------------------
Time: 1605236460000 ms
-------------------------------------------
右邊沒有匹配上
右邊沒有匹配上
右邊沒有匹配上
-------------------------------------------
Time: 1605236470000 ms
-------------------------------------------
()
()
()
-------------------------------------------
Time: 1605236480000 ms
-------------------------------------------
左邊沒有匹配上
左邊沒有匹配上
左邊沒有匹配上
左邊沒有匹配上
-------------------------------------------
Time: 1605236490000 ms
-------------------------------------------
()
()
()
()
-------------------------------------------
Time: 1605236500000 ms
-------------------------------------------
-------------------------------------------
Time: 1605236510000 ms
-------------------------------------------
-------------------------------------------
Time: 1605236520000 ms
-------------------------------------------
OrderDetail(2,100,mm,miao,100)
OrderDetail(3,300,xmm,miao2,200)
OrderDetail(1,200,sxwang,spark,2)
OrderDetail(1,200,ygy,flink,3)
-------------------------------------------
Time: 1605236530000 ms
-------------------------------------------
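Analysis of the result:
左邊沒有匹配上 ("left side not matched"): the left stream has no data for the key, but the right stream does.
右邊沒有匹配上 ("right side not matched"): the right stream has no data for the key, but the left stream does.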
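The `()` lines in the output come from the two `println` branches, which return Unit, so the mapped stream mixes joined records with Unit values. As a hedged sketch of one way to keep the outcome visible in the data itself, every branch can return a value of a common type. The names `JoinResult`, `Matched`, `LeftMissing`, `RightMissing` and `classify` are hypothetical, and the case classes are redeclared only so the snippet stands alone.

```scala
object TypedJoinResult {
  final case class Order(orderId: String, orderNum: String)
  final case class OrderInfo(orderId: String, userName: String, skuName: String, skuNum: String)

  // One common result type instead of a mix of OrderDetail, Unit and Nil.
  sealed trait JoinResult
  final case class Matched(order: Order, info: OrderInfo) extends JoinResult
  final case class LeftMissing(info: OrderInfo) extends JoinResult
  final case class RightMissing(order: Order) extends JoinResult

  // Classify one row of a full outer join; a row with neither side yields None.
  def classify(row: (String, (Option[Order], Option[OrderInfo]))): Option[JoinResult] = row match {
    case (_, (Some(o), Some(i))) => Some(Matched(o, i))
    case (_, (None, Some(i)))    => Some(LeftMissing(i))
    case (_, (Some(o), None))    => Some(RightMissing(o))
    case _                       => None
  }
}
```

A function like this could take the place of the pattern match passed to `map`, so that the printed batch shows which side was missing for each key.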
Note:
Data I entered on the left: 3 records
1,200
2,100
3,300
Data I entered on the right: 4 records
1,sxwang,spark,2
1,ygy,flink,3
2,mm,miao,100
3,xmm,miao2,200
Look at one of the results:
左邊沒有匹配上
左邊沒有匹配上
左邊沒有匹配上
-------------------------------------------
Time: 1605236390000 ms
-------------------------------------------
()
()
()
右邊沒有匹配上
右邊沒有匹配上
-------------------------------------------
Time: 1605236400000 ms
-------------------------------------------
()
()
-------------------------------------------
Time: 1605236410000 ms
-------------------------------------------
OrderDetail(3,300,xmm,miao2,200)
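This shows:
of the 4 right-side records, 1 matched;
of the 3 left-side records, 1 matched.
So the net result is
a single match.
How to get every record matched is the focus of this article!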
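Solution:
The figure at the start outlined the idea; building on it, here is a summary:
1.
Unlike the join mechanism for Flink streams,
a Spark two-stream join performs a full outer join on the two streams.
Because of network latency and similar factors, there is no guarantee that every key in a window (batch) finds its match,
so three cases inevitably arise:
(Some, Some), (None, Some), (Some, None)
2. Each of the three cases in detail:
(Some, Some):
The key is present in both stream 1 and stream 2, so the join logic runs normally.
However, more stream-2 records for the same key may still arrive later,
so the stream-1 record must be saved to Redis to wait for them.
(None, Some):
No stream-1 record is found for the key.
Look up the stream-1 cache in Redis; if nothing is found there either,
cache the stream-2 record and wait for stream 1.
(Some, None):
No stream-2 record is found, so save the stream-1 record to Redis to wait for later data,
and also look up the stream-2 cache in Redis; if there is a hit, join and then delete the cached stream-2 records.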
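The three cases above can be sketched in plain Scala, with two in-memory maps standing in for the external cache. This is only an illustration under that assumption, not the ClickHouse-backed implementation; `JoinState`, `process` and the map names are hypothetical, and the case classes are redeclared so the snippet stands alone.

```scala
import scala.collection.mutable

object CachedStreamJoin {
  final case class Order(orderId: String, orderNum: String)
  final case class OrderInfo(orderId: String, userName: String, skuName: String, skuNum: String)
  final case class OrderDetail(orderId: String, orderNum: String,
                               userName: String, skuName: String, skuNum: String)

  final class JoinState {
    // In-memory stand-ins for the stream-1 and stream-2 cache tables.
    private val orderCache = mutable.Map.empty[String, Order]
    private val orderInfoCache = mutable.Map.empty[String, List[OrderInfo]]

    private def join(o: Order, i: OrderInfo): OrderDetail =
      OrderDetail(o.orderId, o.orderNum, i.userName, i.skuName, i.skuNum)

    // Cache the stream-1 record, then join it with cached stream-2 records
    // for the same key and remove those records from the cache.
    private def cacheLeftAndDrainRight(o: Order): List[OrderDetail] = {
      orderCache.update(o.orderId, o)
      orderInfoCache.remove(o.orderId).getOrElse(Nil).map(i => join(o, i))
    }

    // Handle one row of the per-batch full outer join.
    def process(row: (String, (Option[Order], Option[OrderInfo]))): List[OrderDetail] = row match {
      case (_, (Some(o), Some(i))) => join(o, i) :: cacheLeftAndDrainRight(o)
      case (_, (Some(o), None))    => cacheLeftAndDrainRight(o)
      case (k, (None, Some(i))) =>
        orderCache.get(k) match {
          case Some(o) => List(join(o, i))   // stream 1 arrived in an earlier batch
          case None =>                       // wait for stream 1
            orderInfoCache.update(k, orderInfoCache.getOrElse(k, Nil) :+ i)
            Nil
        }
      case _ => Nil
    }
  }
}
```

Feeding a stream-2 record first returns nothing; a later stream-1 record with the same key returns the joined record, and any further stream-2 record for that key joins against the cached stream-1 record.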
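The intermediate cache can be any of
Redis, HBase, Phoenix, ClickHouse, and many others.
Alluxio is not recommended: the maintenance cost is too high just for a two-stream join, and it is entirely unnecessary.
One more thing to consider:
1. How long should the two streams be cached?
As noted above, stream-2 records are deleted from the cache once they match.
But what about stream 1?
Answer: give it an expiry time, similar in spirit to a watermark.
ClickHouse and HBase let you set a TTL when creating the table; Redis sets an expiry per key.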
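To make the expiry idea concrete, here is a minimal in-memory sketch in which entries older than the TTL are evicted whenever a new entry is written, which is also how the cleanup in this article is triggered. `TtlCache` and its methods are hypothetical names for illustration; a real deployment would rely on the TTL support of the chosen store instead.

```scala
import scala.collection.mutable

object TtlCacheSketch {
  // Cache whose entries expire ttlMillis after they were inserted.
  final class TtlCache[K, V](ttlMillis: Long) {
    private val entries = mutable.Map.empty[K, (V, Long)]

    // Insert a value; expired entries are evicted lazily on every write.
    def put(key: K, value: V, nowMillis: Long): Unit = {
      evictExpired(nowMillis)
      entries.update(key, (value, nowMillis))
    }

    // Return the value only if it has not expired yet.
    def get(key: K, nowMillis: Long): Option[V] =
      entries.get(key).collect {
        case (v, insertedAt) if nowMillis - insertedAt <= ttlMillis => v
      }

    // Remove every entry older than the TTL.
    def evictExpired(nowMillis: Long): Unit = {
      val expired = entries.iterator.collect {
        case (k, (_, insertedAt)) if nowMillis - insertedAt > ttlMillis => k
      }.toList
      expired.foreach(k => entries.remove(k))
    }

    def size: Int = entries.size
  }
}
```

With a TTL of 60000 ms, an entry written at time 0 is no longer returned at time 70000, but it is only physically removed when the next `put` happens.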
Solution:
I use ClickHouse.
Solution code:
package code.com.hivemetabi.task

import java.io.Serializable

import scala.collection.JavaConverters._

import code.com.common.util.ParserUtil
import code.com.hivemetabi.bean.TechBean._
import com.clickhouse.stream.util.ClickHouseUtil
import com.common.util.ContextUtil
import org.apache.flink.api.java.utils.ParameterTool

/**
 * author : sxwang
 * date : 2020/11/13 9:36
 * version: 1.0
 */
class MultiStreamJoinTask(params: ParameterTool) extends Serializable {
  private val className = "MultiStreamJoinTask"
  private val jobName = s"bi_${className}"
  private val batch: Int = params.getRequired("batch").toInt
  private val host: String = params.getRequired("host")
  private val port1 = params.getRequired("port1").toInt
  private val port2 = params.getRequired("port2").toInt

  // Cache a stream-1 record in test.orderCatch, then run the 1-minute TTL cleanup.
  private def cacheOrder(ckCon: ClickHouseUtil, order: Order): Unit = {
    val insertSQL = ParserUtil.getFiledsInfo2InsertSQL[Order](classOf[Order], order, "test", "orderCatch").toString()
    ckCon.saveToClickHouse(insertSQL)
    val ttlSQL = ParserUtil.deleteSQLByTTL("test", "orderCatch", 1)
    if (!ttlSQL.equals("error")) {
      ckCon.saveToClickHouse(ttlSQL)
    }
  }

  // Join the order with the stream-2 records cached under the same key,
  // deleting each matched record from test.orderInfoCatch.
  private def joinCachedOrderInfos(ckCon: ClickHouseUtil, order: Order): List[OrderDetail] = {
    val querySQL = s"select * from test.orderInfoCatch where orderId='${order.orderId}'"
    val rs = ckCon.searchFromClickHouse(querySQL)
    val orderInfoQueryArray = ParserUtil.resultSetToList[OrderInfoQuery](rs, classOf[OrderInfoQuery]).asScala.toArray
    orderInfoQueryArray.map(orderInfoQuery => {
      val orderInfo = OrderInfo().buildOrderInfo(orderInfoQuery)
      // Delete the matched stream-2 record (the original (Some, Some) branch built this SQL but never ran it)
      val deleteSQL = ParserUtil.getFiledsInfo2DeleteSQLByKey[OrderInfo](classOf[OrderInfo], orderInfo, "test", "orderInfoCatch").toString()
      println(deleteSQL)
      ckCon.saveToClickHouse(deleteSQL)
      OrderDetail().builderOrder(order).builderOrderInfo(orderInfo)
    }).toList
  }

  def execute(): Unit = {
    val ssc = ContextUtil.getStreamingContext(jobName, batch)
    val s1 = ssc.socketTextStream(host, port1)
      .map(data => {
        val fields = data.split(",")
        (fields(0), ParserUtil.arrConvert2CaseClass[Order](classOf[Order], fields, 2))
      })
    val s2 = ssc.socketTextStream(host, port2)
      .map(data => {
        val fields = data.split(",")
        (fields(0), ParserUtil.arrConvert2CaseClass[OrderInfo](classOf[OrderInfo], fields, 4))
      })
    // Full outer join within the batch, with the cache covering cross-batch matches
    s1.fullOuterJoin(s2).map({
      case (orderId, (Some(order), Some(orderInfo))) => {
        val ckCon = new ClickHouseUtil()
        // Cache s1 so that later s2 records can still find it
        cacheOrder(ckCon, order)
        // Join the two records of the current batch
        val orderDetail = OrderDetail().builderOrder(order).builderOrderInfo(orderInfo)
        // Also join any s2 records cached earlier for this key
        orderDetail :: joinCachedOrderInfos(ckCon, order)
      }
      case (orderId, (None, Some(orderInfo))) => {
        println("左邊沒有匹配上") // "left side not matched"
        val ckCon = new ClickHouseUtil()
        // Look for a cached s1 record with the same key
        val querySQLS1UseS2Key = s"select * from test.orderCatch where orderId ='${orderInfo.orderId}'"
        val rs = ckCon.searchFromClickHouse(querySQLS1UseS2Key)
        val orderQueryArray = ParserUtil.resultSetToList[OrderQuery](rs, classOf[OrderQuery]).asScala.toArray
        if (orderQueryArray.isEmpty) {
          // No s1 record yet: cache s2 and wait
          val insertSQL = ParserUtil.getFiledsInfo2InsertSQL[OrderInfo](classOf[OrderInfo], orderInfo, "test", "orderInfoCatch").toString()
          println(insertSQL)
          ckCon.saveToClickHouse(insertSQL)
          Nil
        } else {
          // Join with the cached s1 record(s)
          orderQueryArray.map(orderQuery => {
            val order = Order().buildOrder(orderQuery)
            OrderDetail().builderOrder(order).builderOrderInfo(orderInfo)
          }).toList
        }
      }
      case (orderId, (Some(order), None)) => {
        println("右邊沒有匹配上") // "right side not matched"
        val ckCon = new ClickHouseUtil()
        // Cache s1, then join with any cached s2 records
        cacheOrder(ckCon, order)
        joinCachedOrderInfos(ckCon, order)
      }
      case _ => Nil
    }).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

object MultiStreamJoinTask {
  def apply(params: ParameterTool): MultiStreamJoinTask = new MultiStreamJoinTask(params)

  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)
    MultiStreamJoinTask(parameterTool).execute()
  }
}
Result walkthrough:
1. Start the program.
Check the output:
-------------------------------------------
Time: 1605600660000 ms
-------------------------------------------
-------------------------------------------
Time: 1605600670000 ms
-------------------------------------------
-------------------------------------------
Time: 1605600680000 ms
-------------------------------------------
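2. First feed data into the right stream:
Check that ClickHouse stored the right-stream data, which shows that
records with no match in the left stream are cached in the table.
Input on 8889: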
1,sxwang,spark,2
1,ygy,flink,3
2,mm,miao,100
3,xmm,miao2,200
Check the output:
-------------------------------------------
Time: 1605600900000 ms
-------------------------------------------
20/11/17 16:15:06 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/11/17 16:15:06 WARN storage.BlockManager: Block input-1-1605600906600 replicated to only 0 peer(s) instead of 1 peers
20/11/17 16:15:07 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/11/17 16:15:07 WARN storage.BlockManager: Block input-1-1605600907600 replicated to only 0 peer(s) instead of 1 peers
左邊沒有匹配上
insert into test.orderInfoCatch(orderId,userName,skuName,skuNum,dateline) values('2','mm','miao','100','2020-11-17 16:15:10')
左邊沒有匹配上
insert into test.orderInfoCatch(orderId,userName,skuName,skuNum,dateline) values('3','xmm','miao2','200','2020-11-17 16:15:10')
左邊沒有匹配上
insert into test.orderInfoCatch(orderId,userName,skuName,skuNum,dateline) values('1','sxwang','spark','2','2020-11-17 16:15:10')
左邊沒有匹配上
insert into test.orderInfoCatch(orderId,userName,skuName,skuNum,dateline) values('1','ygy','flink','3','2020-11-17 16:15:10')
-------------------------------------------
Time: 1605600910000 ms
-------------------------------------------
List()
List()
List()
List()
-------------------------------------------
Time: 1605600920000 ms
-------------------------------------------
Check ClickHouse:
Initially both cache tables were empty.

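3. Feed data into the left stream:
The expected result:
the orderCatch table caches the left-stream data, and the matched records are deleted from the right-stream table.
Input on 8888: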
1,200
2,100
3,300
Check the output:
-------------------------------------------
Time: 1605601190000 ms
-------------------------------------------
20/11/17 16:19:56 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/11/17 16:19:56 WARN storage.BlockManager: Block input-0-1605601196000 replicated to only 0 peer(s) instead of 1 peers
20/11/17 16:19:57 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/11/17 16:19:57 WARN storage.BlockManager: Block input-0-1605601197000 replicated to only 0 peer(s) instead of 1 peers
右邊沒有匹配上
currentDate: Tue Nov 17 16:20:00 CST 2020
lastUpdatedAt: Tue Nov 17 16:13:10 CST 2020
diff: 410061
alter table test.orderInfoCatch delete where orderId = '2'
右邊沒有匹配上
currentDate: Tue Nov 17 16:20:00 CST 2020
lastUpdatedAt: Tue Nov 17 16:20:00 CST 2020
diff: 280
alter table test.orderInfoCatch delete where orderId = '3'
右邊沒有匹配上
currentDate: Tue Nov 17 16:20:00 CST 2020
lastUpdatedAt: Tue Nov 17 16:20:00 CST 2020
diff: 534
alter table test.orderInfoCatch delete where orderId = '1'
alter table test.orderInfoCatch delete where orderId = '1'
-------------------------------------------
Time: 1605601200000 ms
-------------------------------------------
List(OrderDetail(2,100,'mm','miao','100'))
List(OrderDetail(3,300,'xmm','miao2','200'))
List(OrderDetail(1,200,'sxwang','spark','2'), OrderDetail(1,200,'ygy','flink','3'))
-------------------------------------------
Time: 1605601210000 ms
-------------------------------------------
Summary:
So far everything works.
Check the cached data in ClickHouse:


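Note:
In the code I set the TTL of the left table to 1 minute, and the cleanup is triggered only when a record is inserted into the left-stream cache table after more than 1 minute has passed.
Check the result after one minute: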

More than three minutes have passed by now.
So what happens if another record is cached from the left stream?
8888:
4,500
Check the output:
-------------------------------------------
Time: 1605601480000 ms
-------------------------------------------
20/11/17 16:24:43 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/11/17 16:24:43 WARN storage.BlockManager: Block input-0-1605601483400 replicated to only 0 peer(s) instead of 1 peers
右邊沒有匹配上
currentDate: Tue Nov 17 16:24:50 CST 2020
lastUpdatedAt: Tue Nov 17 16:20:00 CST 2020
diff: 290145
-------------------------------------------
Time: 1605601490000 ms
-------------------------------------------
List()
-------------------------------------------
Time: 1605601500000 ms
-------------------------------------------
Check ClickHouse:


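This shows the TTL works.
Next, test new data arriving on the right stream:
Input on 8889:
4,xmm,miao3,500
5,xmm,miao4,600
If the code above is correct,
what should the result be?
It should be:
In ClickHouse:
the left-stream cache still holds record 4;
the right-stream cache holds record 5, and record 4 is printed as a joined result.
Verify the result: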
-------------------------------------------
Time: 1605601750000 ms
-------------------------------------------
20/11/17 16:29:13 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/11/17 16:29:13 WARN storage.BlockManager: Block input-1-1605601753000 replicated to only 0 peer(s) instead of 1 peers
20/11/17 16:29:14 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/11/17 16:29:14 WARN storage.BlockManager: Block input-1-1605601754000 replicated to only 0 peer(s) instead of 1 peers
左邊沒有匹配上
左邊沒有匹配上
insert into test.orderInfoCatch(orderId,userName,skuName,skuNum,dateline) values('5','xmm','miao4','600','2020-11-17 16:29:20')
-------------------------------------------
Time: 1605601760000 ms
-------------------------------------------
List(OrderDetail('4','500',xmm,miao3,500))
List()
-------------------------------------------
Time: 1605601770000 ms
-------------------------------------------
Check the ClickHouse result:


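Note:
Why is 左邊沒有匹配上 ("left side not matched") printed twice?
Because two records arrived on the right stream, and the message is printed every time that branch is entered.
In other words, record 4 was not matched within the current batch but was matched in the cache.
(Strictly speaking, no matching against the current batch is done here; the join simply found no left-side data.)
That covers the approach to two-stream joins in Spark and its implementation.
Detailed code:
I trust everyone can complete it on their own.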
When reposting, please credit the source. Link to this article: https://www.uj5u.com/qita/224134.html
