文章目錄
- 1、需求:
- 2、知識點:
- 3、方法1:
- 4、方法2:
- 4.1、伴生類創建KafkaProducer包裝器
- 4.2、SparkStreaming消費kafka并寫入kafka
1、需求:
- 使用sparkStreaming對kafka中某topic資料進行資料處理后再重新寫入kafka
2、知識點:
- 廣播變數
- SparkStreaming連接kafka進行消費
- rdd算子寫入kafka
- 懶加載
- 伴生類與伴生物件的使用
- producerRecord手動序列化
3、方法1:
- KafkaProducer不可序列化,只能在foreachPartition內部創建
- 對于每個KafkaProducer都創建一次連接,不靈活且低效,不建議使用
4、方法2:
4.1、伴生類創建KafkaProducer包裝器
- 使用伴生類創建KafkaProducer包裝器,構造引數為生產者物件(懶加載,遇到action算子才真正創建),方法為
生產者物件KafkaProducer發送一條生產記錄producerRecord;然后在伴生物件的apply方法中new對應伴生類的物件(因為該類構造器需要傳入引數,所以順便使用匿名函式創建一個生產者作為物件的引數),這樣在其他地方就可以函式式呼叫該類下的方法了(可以理解為呼叫時已經創建好該類的匿名物件了)
class KafkaSink[K, V](producer: () => KafkaProducer[K, V]) extends Serializable {
//懶漢模式,定義一個producer
lazy val prod: KafkaProducer[K, V] = producer()
def send(topic: String, key: K, value: V) = {
prod.send(new ProducerRecord[K, V](topic, key, value))
}
def send(topic: String, value: V) = {
prod.send(new ProducerRecord[K, V](topic, value))
}
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]) = {
val createKafkaProducer = () => {
val produ = new KafkaProducer[K, V](config)
// //銷毀資料時做檢查,刪了的話,出現垃圾時無法回收
// sys.addShutdownHook{
// produ.close()
// }
produ
}
new KafkaSink[K, V](createKafkaProducer)
}
def apply[K, V](config: Properties): KafkaSink[K, V] = apply(config.toMap)
}
4.2、SparkStreaming消費kafka并寫入kafka
- 配置消費者引數:SparkStreaming使用直連的方式連接kafkatopic(策略為:消費本地資料且按照topic訂閱)
- SparkStreaming在消費時,會同時進行資料處理(根據用戶需求),然后再將清洗后的資料寫入一個新的topic
- 處理后的資料,把它封裝后通過生產者寫入到對應磁區內即可,這里我們采用廣播變數的方式,把封裝好的KafkaSink物件(傳入生產者的引數到伴生物件KafkaSink里,自動呼叫apply方法執行,apply方法回傳的是KafkaSink(producer)物件)傳到每個executor上,每個executor創建一次kafka消費者的連接,提高效率
- 在進行資料處理時,呼叫廣播變數的方法把結果資料封裝寫入到topic中
- 最后,懶加載需要行動算子才會觸發執行,可以加個foreach(x=>x)觸發
- 廣播變數里連接consumer的變數
KafkaSink[String, String](producerParams)因為是伴生物件,直接省略apply方法用伴生型別代替,如果不好理解的話,可以換成KafkaSink.apply(producerParams)
object ReadKafkaTopic_event_attendees_raw {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]")
.setAppName(this.getClass.getSimpleName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.streaming.kafka.consumer.poll.ms", "10000") //解決:Exception:after polling for 512,服務器不穩定
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoint")
//Kafka消費者引數
val kafkaParams = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "single:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "ea1",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "500" //一次拉取條數
)
val ea: InputDStream[ConsumerRecord[String, String]] = KafkaUtils
.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("event_attendees_raw")
, kafkaParams)
)
//kafka生產者引數
val producerParams = Map (
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG->"single:9092",
ProducerConfig.ACKS_CONFIG->"1",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG->classOf[StringSerializer],
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG->classOf[StringSerializer]
)
//用kafkaSink作為廣播的物件
val ks = ssc.sparkContext.broadcast(KafkaSink[String,String](producerParams))
ea.map(x => {
val info = x.value().split(",", -1)
Array(
(info(0), info(1).split(" "), "yes"),
(info(0), info(2).split(" "), "maybe"),
(info(0), info(3).split(" "), "invited"),
(info(0), info(4).split(" "), "no")
)
}).flatMap(x=>x).flatMap(x => x._2.map(y => (x._1, y, x._3)))
.filter(_._2!="").foreachRDD(rdd=>rdd.foreachPartition(iter=>{
iter.map(msg=>{
ks.value.send("event_attendees_ss",msg.productIterator.mkString(","))
}).foreach(x=>x)//懶執行需要行動算子,否則會一直卡著不動
}))
//.toStream.foreach(_.get())迭代器轉化為流,僅觸發行動操作,foreach也是行動算子,就夠了
ssc.start()
ssc.awaitTermination()
}
}
輸出結果:

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/271952.html
標籤:其他
上一篇:gitee相關指令
下一篇:hive sql系列(七)
