import java.util
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import redis.clients.jedis.{Jedis, JedisPool}
import redis.clients.util.Pool
import scala.collection.mutable.ArrayBuffer
/**
* 創建時間:2022-02-23
* 創建人: xiaotao
* 資料流向:hiveToRedis
* 需求:好書精講專欄頁(二期)
* 專欄頁,分類區,版單串列,服務端回傳,10個講書專輯型別的專輯,按專輯14天內的播放次數,由多到少倒敘排序,
*/
object GoodBookSpeakWellDateToRedis {
private[this] var jedisPool: Pool[Jedis] = _
/**
* 創建redis連接池
* @param host 地址
* @param port 埠號
* @param timeout 超時
* @param password 密碼
*/
def init(host: String, port: Int, timeout: Int, password: String): Unit = {
jedisPool = new JedisPool(new GenericObjectPoolConfig, host, port, timeout, password)
}
/**
* 添加資料到zset中
* @param key zset 的key
* @param story_14d_play_cts score (14天內播放次數)
* @param story_id value 專輯id
*/
def zadd(key: String, story_14d_play_cts: Int, story_id: String): Unit = {
val jedis = jedisPool.getResource
jedis.zadd(key,story_14d_play_cts,story_id)
jedis.close()
}
/**
* 洗掉key中指定元素
*/
def zrem(key: String,member :String): Unit ={
val jedis = jedisPool.getResource
jedis.zrem(key,member)
jedis.close()
}
// 資料存入redis
def getResultToRedis(spark: SparkSession): Unit = {
val resultDataNew: DataFrame = spark.sql(
"""
|select story_id
| ,cast(story_14d_play_cts as int) as story_14d_play_cts
|from ads.ads_ks_log_story_play_hsjj_14h_sum_a_d
|order by story_14d_play_cts desc
|""".stripMargin)
resultDataNew.show(20)
val mapRDD: RDD[(String, Int)] = resultDataNew.rdd.map(row => (row.getString(0), row.getInt(1)))
//今天的10條資料
val nowDate: Array[(String,Int)] = mapRDD.collect()
val jedis = jedisPool.getResource
import scala.collection.JavaConversions._
//昨天的redis中的10條資料
val beforeDate: util.Set[String] = jedis.zrevrange("EXPLAIN_BOOK_ZS", 0, -1)
jedis.close()
//昨天和今天相同的album_id
val sameDate = new ArrayBuffer[String]
//今天相對于昨天不相同的album_id
val nowNoSameDate = new ArrayBuffer[(String, Int)]
//昨天相對于今天不相同的album_id
val beforeNoSameDate: ArrayBuffer[String] = new ArrayBuffer[String]
for(elem <- nowDate){
if (beforeDate.contains(elem._1)){
sameDate += elem._1
zadd("EXPLAIN_BOOK_ZS",elem._2,elem._1)
} else{
nowNoSameDate += elem
}
}
if(sameDate.length >0 & sameDate.length < 10) {
for (elem <- beforeDate) {
if (!sameDate.contains(elem)) {
beforeNoSameDate += elem
}
}
}else if(sameDate.length == 0){
for(elem <- beforeDate){
beforeNoSameDate += elem
}
}
var i:Int = 0
if(sameDate.length!=10){
//洗掉一個添加一個
while (i<=nowNoSameDate.length-1){
zrem("EXPLAIN_BOOK_ZS",beforeNoSameDate(i))
zadd("EXPLAIN_BOOK_ZS",nowNoSameDate(i)._2,nowNoSameDate(i)._1)
i+=1
}
}
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("GoodBookSpeakWellDateToRedis")
// .master("local[*]")
.config("spark.driver.allowMultipleContexts", true)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val password = ""
val host = ""
val port = 6379
val timeout = 1000
init(host, port, timeout, password)
//資料存入redis
getResultToRedis(spark)
spark.stop()
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/432162.html
標籤:其他
上一篇:我的朋友去國外出差回不來了
