簡介
- SparkStreaming消費Kafka實作精確一次性消費. 保證訊息不丟失、不重復消費.
訊息處理的語意
At Least Once (至少處理一次):
- 訊息至少被處理一次
- 可以保證 資料不丟失, 但有可能存在資料重復問題,
At Most Once (最多處理一次)
- 訊息最多被處理一次
- 可以保證 資料不重復, 但有可能存在資料丟失問題.
Exactly Once (剛好處理一次) :
- 訊息剛好被處理一次
- 實際上并不是真的做到只對訊息處理一次, 而是能夠實作
訊息的可靠性和訊息的冪等性, 即對于上下游系統來說不存在資料重復和資料丟失的問題 - 實際上是通過
At Least Once+冪等性處理去實作Exactly Once 語意
消費資料程序及存在的問題

默認消費Kafka后是自動提交偏移量的(默認5秒自動提交一次), 那么就有可能有兩種情況發生
tip: 偏移量就是記錄每個消費者對每個磁區(佇列)消費到哪, 一般保存在 kafka 的consumer_offsets主題中
情況1、先提交了偏移量再處理訊息
- 如果先提交了偏移量后, 處理資料后準備落盤的程序中行程掛了. 但是提交了偏移量, 那么下次會從最新的偏移量位置開始消費, 所以之前沒有落盤的資料就丟失了.

情況2、處理訊息后, 再提交偏移量
- 如果再處理完訊息后, 行程掛了, 無法提交最新消費的偏移量, 那么下次還是會繼續從舊的偏移量位置開始消費, 那么就有可能導致資料的重復消費

可以發現消費一條訊息有兩個步驟處理訊息和提交偏移量, 而我們又無法保證這兩個步驟的原子性, 即同時成功或者同時失敗那么就有可能導致資料的丟失或者重復消費
實作Exactly Once
方法一: 使用事務
- 實作Exactly Once語意的關鍵是保證
處理訊息和提交偏移量的原子性. - 所以只要把這兩個操作放到一個事務里, 不管是先處理訊息和還是先提交偏移量都可以保證訊息不丟失和不重復
實作
- 比如手動維護消費訊息的偏移量, 并把偏移量放到MySQL中, 然后資料的落盤也放到MySQL中, 而MySQL是支持事務的, 那么我們就可以保證著兩個操作的原子性了.
缺點:
- 對存盤層有依賴, 只能使用支持事務的存盤層
- 事務性能不高
- 并且一個存盤層容易存在單點故障壓力過大, 如果做分布式又需要做分布式事務增加了復雜性
方法二: 手動提交偏移量 + 冪等性
- 先確保真正處理完資料后再提交偏移量, 但是可能提交偏移量失敗, 導致重復消費了, 這時就要做資料的冪等性保存了, 即資料無論被保存多少次效果都是一樣的, 不會存在重復資料.
冪等性保存實作
- 有些存盤層本身支持冪等性操作的, 比如MySQL的主鍵盤, 和唯一索引, 相同id插入一次和插入一百次都是一樣的(相同會插入失敗). 還有Eleaticsearch的主鍵id也同樣天然支持冪等操作(相同會覆寫). 還有Redis也是, 支持冪等操作的存盤層遠比支持事務的存盤層多, 并且性能也比事務好
- 如果使用的存盤層本身不支持冪等操作, 可能就需要自己手動實作保證冪等性了或者去重了.
偽代碼實作:
object Test {
case class UserLog(id:Int, name:String){}
def main(args: Array[String]): Unit = {
/**
* 1、初始化SparkStreaming、5秒采集一次資料
*/
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
/**
2、從Redis中讀取消費者組groupId消費主題topic的偏移量offset
*/
val topic = "topic-log"
val groupId = "consumer-007"
/**
3、 對businessProcessing業務處理使用精準一次消費
*/
ExactOneUtil.Builder().streamingContext(ssc).topicGroup(topic, groupId).build(businessProcessing)
ssc.start()
ssc.awaitTermination()
}
/** 業務處理 */
def businessProcessing(offsetDStream: DStream[ConsumerRecord[String, String]], builder: Builder): Unit = {
// 拿到此次Dstream
val jsonObjectDStream: DStream[JSONObject]= offsetDStream.map(msg => {
val jsonObj: JSONObject = JSON.parseObject(msg.value())
// .....
jsonObj
})
jsonObjectDStream.foreachRDD(rdd => {
rdd.foreachPartition(jsonObjList => {
// 假設id是主鍵, 天然支持冪等, 無論保存多少次都是一樣
val resultData: Iterator[UserLog] = jsonObjList.map { obj => {
UserLog(obj.getIntValue("id"), obj.getString("nama"))
}}
// 對處理后的結果reasultDat進行落盤
resultData.toList
// ..... save to MySQL or Es or Redis
})
// 處理完一批rdd資料并確保落盤后提交offset
builder.saveOffsetrange()
})
}
ExactOneUtil
object ExactOneUtil {
var builder: Builder =_
def Builder(): Builder = {
this.builder = new Builder()
this.builder
}
def stop(): Unit ={
this.builder.saveOffsetrange()
}
class Builder {
var topic: String =_
var groupId: String =_
var ssc: StreamingContext =_
var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] // key-磁區id, value-偏移量
def topicGroup(topic: String,groupId: String): Builder ={
this.topic = topic
this.groupId = groupId
this
}
def streamingContext(ssc: StreamingContext): Builder = {
this.ssc = ssc
this
}
def build(fun: (DStream[ConsumerRecord[String, String]], Builder) => Unit ): Unit ={
var baseInputDStream: InputDStream[ConsumerRecord[String, String]] = null
var offsetMap: Map[TopicPartition, Long] = RedisOffsetUtil.getOffset(topic, groupId)
if(offsetMap != null && offsetMap.nonEmpty){
baseInputDStream = OffsetKafkaUtil.getKafkaStream(topic, ssc, offsetMap, groupId)
}else{
baseInputDStream = OffsetKafkaUtil.getKafkaStream(topic, ssc, groupId)
}
val offsetDStream = filterOffsetRange(baseInputDStream);
// 業務處理
fun(offsetDStream, this)
}
private def filterOffsetRange(dStream: InputDStream[ConsumerRecord[String, String]]): DStream[ConsumerRecord[String, String]] = {
val offsetDStream: DStream[ConsumerRecord[String, String]] = dStream.transform(rdd => {
// KafkaRDD
this.offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
})
offsetDStream
}
def saveOffsetrange(): Unit ={
RedisOffsetUtil.saveOffset(this.topic, this.groupId, this.offsetRanges)
}
}
}
OffsetUtil
trait OffsetUtil {
// 獲取偏移量
def getOffset(topicName: String, groupId: String): Map[TopicPartition, Long]
// 保存偏移量
def saveOffset(topicName: String, groupId: String, offsetArray: Array[OffsetRange])
}
object OffsetRedisUtil {
/** 1-在Redis 存盤消費者組對某個主題消費的偏移量
*
* Reids 存盤格式設計
* key: 關鍵字 + 主題 + 消費者組
* value: 用Hash存盤
* hash key: 磁區
* hash value: 偏移量
* Key Hash Value
* offset:xx_topic:xx_groupId 磁區id_01 偏移量值
* offset:xx_topic:xx_groupId 磁區id_02 偏移量值
* offset:xx_topic:xx_groupId 磁區id_03 偏移量值
*
* @param topicName 主題名稱
* @param groupId 消費者組
*/
override def saveOffset(topicName: String, groupId: String, offsetArray: Array[OffsetRange]): Unit = {
val keyName: String = createKeyName(topicName, groupId)
// 1-取出每個磁區的最新偏移量到map
val map = new util.HashMap[String, String]()
for (elem <- offsetArray) {
map.put(elem.partition.toString, elem.untilOffset.toString)
}
//
if (map.size() > 0){
JedisUtil.hmset(keyName, map)
}
}
/**
* 2- 從Redis 獲取某個主題的某個消費者組消費的偏移量
*/
override def getOffset(topicName: String, groupId: String): Map[TopicPartition, Long] = {
val keyName: String = createKeyName(topicName, groupId)
val map: util.HashMap[String, String] = JedisUtil.hgetAll(keyName)
//將HashMap[String, String]轉換成Map[TopicPartition, Long] 回傳
import scala.collection.JavaConverters._
map.asScala.map{
case (partitionId, offset) => {
val partition = new TopicPartition(topicName, partitionId.toInt)
(partition, offset.toLong)
}
}.toMap
}
def createKeyName(topicName: String, groupId: String): String = {
"offset" + ":" + topicName + ":" + groupId
}
}
OffsetKafkaUtil
object OffsetKafkaUtil {
var param = collection.mutable.Map(
"bootstrap.servers" -> "192.168.2.102:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest", //latest: 表示自動重置偏移量為最新的偏移量
"enable.auto.commit" -> (false: java.lang.Boolean) // 是否自動提交偏移量
)
//從最新的偏移量位置讀取資料
def getKafkaStream(topic: String,ssc:StreamingContext,groupId:String): InputDStream[ConsumerRecord[String,String]]={
param("group.id")=groupId
KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Array(topic),param)
)
}
//從指定的偏移量位置讀取資料
def getKafkaStream(topic: String,ssc:StreamingContext,offsetMap[TopicPartition,Long],groupId:String): InputDStream[ConsumerRecord[String,String]]={
param("group.id")=groupId
KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Array(topic),param,offsetMap)
)
}
}
其他
pom.xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/280652.html
標籤:其他
上一篇:泰迪杯 A 題(更新中)
