資料格式:以腳本模擬生成的:{“price”:18,“id”:123},實時資料,求一個以scala編程實作的flink實時計算demo,實作結果:1、以id分組,每件商品價格累計;2、以id分組,上一分鐘每件商品的銷售額累計;3、銷售總額。謝謝謝謝,萬分感謝。求助無門了。初學者,弄了一個星期了。
要求flink能實作如下spark一樣的計算結果:
object OrderConsumer {
//Redis配置
val dbIndex = 0
//每件商品總銷售額
val orderTotalKey = "app::order::total"
//每件商品上一分鐘銷售額
val oneMinTotalKey = "app::order::product"
//總銷售額
val totalKey = "app::order::all"
def main(args: Array[String]): Unit = {
// 創建 StreamingContext 時間片為1秒
val conf = new SparkConf().setMaster("local").setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(1))
// Kafka 配置
val topics = Set("order")
val brokers = "127.0.0.1:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers,
"serializer.class" -> "kafka.serializer.StringEncoder")
// 創建一個 direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//決議JSON
val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line._2)))
// 按ID分組統計個數與價格總合
val orders = events.map(x => (x.getString("id"), x.getLong("price"))).groupByKey().map(x => (x._1, x._2.size, x._2.reduceLeft(_ + _)))
//輸出
orders.foreachRDD(x =>
x.foreachPartition(partition =>
partition.foreach(x => {
println("id=" + x._1 + " count=" + x._2 + " price=" + x._3)
//保存到Redis中
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
//每個商品銷售額累加
jedis.hincrBy(orderTotalKey, x._1, x._3)
//上一分鐘第每個商品銷售額
jedis.hset(oneMinTotalKey, x._1.toString, x._3.toString)
//總銷售額累加
jedis.incrBy(totalKey, x._3)
RedisClient.pool.returnResource(jedis)
})
))
ssc.start()
ssc.awaitTermination()
}
}
謝謝
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/8009.html
標籤:Spark
