Spark 版本：2.3.0。
Kafka 版本：kafka_2.10-0.9.0.2.4.0.0-169（即 Kafka 0.9.0）。
需求是手動維護 offsets，只要能消費到資料即可。希望大神們幫幫忙，真的很急。
uj5u.com熱心網友回復:
消費端代碼：
import java.io
import com.unicom.tools.MyUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming job that reads several Kafka topics with the 0.8 direct API.
 *
 * Each partition starts from the offset returned by `MyUtils.getOffsets`. After a
 * non-empty batch has been processed, the batch's end offsets are written back to
 * Redis via `MyUtils.saveOffsetsToRedis`.
 */
object KafkaMaintainOffset {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
    val con = new SparkContext(conf)
    // 5-second micro-batches.
    val ssc = new StreamingContext(con, Seconds(5))
    // ssc.checkpoint("D:\\html")
    val brokerList = "132.194.94.195:6667,132.194.94.197:6667,132.194.94.198:6667,132.194.94.199:6667,132.194.94.200:6667,132.194.94.201:6667,132.194.94.202:6667,132.194.94.203:6667"
    // val brokerList = "master:9092,slave1:9092,slave2:9092"
    val kafkaParam: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false"
    )
    val topics: Set[String] = Set("DLTE1", "DLTE4", "DLTE3", "DLTE2", "DMC", "DLTE5")
    // Print only every `sampleEvery`-th record of each partition, to keep executor logs small.
    val sampleEvery = 100

    // Explicit starting offset for every partition of `topics`.
    val fromOffset: Map[TopicAndPartition, Long] = MyUtils.getOffsets(topics)
    // Keep only the message payload of each Kafka record.
    val messageHandler: MessageAndMetadata[String, String] => String =
      (x: MessageAndMetadata[String, String]) => x.message()
    val value2: InputDStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
        ssc, kafkaParam, fromOffset, messageHandler)

    value2.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // The HasOffsetRanges cast only succeeds on the RDD produced directly by the
        // stream (not on a transformed RDD), so capture the ranges before anything else.
        val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Process the batch: print a sample of the records (this runs on the executors).
        rdd.foreachPartition { records =>
          records.zipWithIndex.foreach { case (record, idx) =>
            if (idx % sampleEvery == 0) println(record)
          }
        }

        // Persist offsets only after processing finished. If the job dies before this
        // point, the batch is read again on restart (at-least-once) instead of being skipped.
        MyUtils.saveOffsetsToRedis(ranges)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
uj5u.com熱心網友回復:
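維護 offsets 的工具類。這個工具類會報錯，可能是由於 Spark 2.3.0 只支持 Kafka 0.10 的集成？
（註：Spark 2.3.0 仍提供 spark-streaming-kafka-0-8 集成（自 2.3.0 起標記為棄用），它兼容 0.9 的 broker，KafkaUtils、HasOffsetRanges、KafkaCluster 等類都在其中；而 0-10 集成不兼容 0.10 以前的 broker。因此應引入 spark-streaming-kafka-0-8_2.11:2.3.0 依賴，而不是 0-10。）
import kafka.common.TopicAndPartition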
import org.apache.spark.streaming.kafka.{KafkaCluster, OffsetRange}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
/**
 * Helpers for manually managing Kafka consumer offsets in Redis, for use with the
 * Spark Streaming Kafka 0.8 direct stream.
 *
 * Offsets are stored as one Redis hash per topic (and optional group id), under the key
 * `kafka:topic:<topic>:<groupId>`. Each hash maps partition number -> untilOffset of the
 * last saved batch.
 */
object MyUtils {
  private val config = new JedisPoolConfig
  //private val redisHost = "192.168.252.99"
  private val redisHost = "132.194.43.199"
  private val redisPort = 6379
  // Maximum number of connections in the pool.
  config.setMaxTotal(30)
  // Maximum number of idle connections kept in the pool.
  config.setMaxIdle(10)
  // Last argument is the Redis timeout (10000; milliseconds per Jedis convention).
  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private val topicPrefix = "kafka:topic"
  // Broker list used to query partition metadata and earliest/latest offsets.
  private val defaultBrokerList = "132.194.94.195:6667,132.194.94.197:6667,132.194.94.198:6667,132.194.94.199:6667,132.194.94.200:6667,132.194.94.201:6667,132.194.94.202:6667,132.194.94.203:6667"
  // private val defaultBrokerList = "master:9092,slave1:9092,slave2:9092"

  private def getKey(topic: String, groupId: String = "", prefix: String = topicPrefix): String =
    s"$prefix:$topic:$groupId"

  private def getRedisConnection: Jedis = pool.getResource

  /** Borrows a pooled connection, runs `f`, and always returns the connection, even on failure. */
  private def withRedis[A](f: Jedis => A): A = {
    val jedis = getRedisConnection
    try f(jedis) finally jedis.close()
  }

  /** Reads the saved offsets of every partition of `topics` from Redis. */
  private def getOffsetFromRedis(topics: Set[String], groupId: String = ""): Map[TopicAndPartition, Long] = {
    import scala.collection.JavaConverters._
    withRedis { jedis =>
      topics.toSeq.flatMap { topic =>
        jedis.hgetAll(getKey(topic, groupId)).asScala.map { case (partition, offset) =>
          TopicAndPartition(topic, partition.toInt) -> offset.toLong
        }
      }.toMap
    }
  }

  /**
   * Saves the end offset (`untilOffset`) of every range to Redis, one hash per topic.
   *
   * @param range   offset ranges of a processed batch
   * @param groupId optional consumer-group suffix of the Redis key
   */
  def saveOffsetsToRedis(range: Array[OffsetRange], groupId: String = ""): Unit = {
    import scala.collection.JavaConverters._
    if (range.nonEmpty) {
      withRedis { jedis =>
        range.groupBy(_.topic).foreach { case (topic, topicRanges) =>
          val partitionToOffset: Map[String, String] =
            topicRanges.map(r => r.partition.toString -> r.untilOffset.toString).toMap
          jedis.hmset(getKey(topic, groupId), partitionToOffset.asJava)
        }
      }
    }
  }

  /** Unwraps a KafkaCluster result, failing with the underlying Kafka errors instead of a bare `.get`. */
  private def orFail[A](result: Either[Seq[Throwable], A], action: String): A =
    result.fold(
      errs => throw new IllegalStateException(s"Failed to $action: ${errs.mkString("; ")}", errs.headOption.orNull),
      identity
    )

  /**
   * Queries Kafka for the latest (max) and earliest (min) offsets of every partition of `topics`.
   *
   * @return (latest offsets, earliest offsets)
   * @throws IllegalStateException if Kafka metadata or offsets cannot be fetched
   */
  private def getMaxMinOffsets(
      topics: Set[String],
      brokerList: String = defaultBrokerList): (Map[TopicAndPartition, Long], Map[TopicAndPartition, Long]) = {
    val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerList))
    val tps = orFail(kc.getPartitions(topics), s"fetch partitions of $topics")
    // Strict maps (not mapValues views), so the values are computed once and the maps are serializable.
    val maxOffsets: Map[TopicAndPartition, Long] =
      orFail(kc.getLatestLeaderOffsets(tps), "fetch latest offsets").map { case (tp, lo) => tp -> lo.offset }
    val minOffsets: Map[TopicAndPartition, Long] =
      orFail(kc.getEarliestLeaderOffsets(tps), "fetch earliest offsets").map { case (tp, lo) => tp -> lo.offset }
    (maxOffsets, minOffsets)
  }

  /**
   * Returns the starting offset for every partition of `topics`.
   *
   * The offset saved in Redis is used, clamped into Kafka's current [earliest, latest]
   * range. A partition with no saved offset starts at the earliest offset Kafka still retains.
   *
   * @param topics     topics to consume
   * @param groupId    optional consumer-group suffix of the Redis key (same as in `saveOffsetsToRedis`)
   * @param brokerList Kafka brokers to query for earliest/latest offsets
   */
  def getOffsets(
      topics: Set[String],
      groupId: String = "",
      brokerList: String = defaultBrokerList): Map[TopicAndPartition, Long] = {
    val (maxOffsets, minOffsets) = getMaxMinOffsets(topics, brokerList)
    assert(maxOffsets.keySet == minOffsets.keySet)
    val saved: Map[TopicAndPartition, Long] = getOffsetFromRedis(topics, groupId)
    maxOffsets.map { case (tp, maxOffset) =>
      val minOffset = minOffsets(tp)
      // Default to the earliest offset so that no retained message is skipped.
      val curOffset = saved.getOrElse(tp, minOffset)
      val newOffset =
        if (curOffset > maxOffset) maxOffset
        else if (curOffset < minOffset) minOffset
        else curOffset
      tp -> newOffset
    }
  }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/10306.html
標籤:數據倉庫
