import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
/**
 * Streaming job that consumes JSON stock records from the Kafka topic `demo2`,
 * computes the min / max / average of `stock_price` over the stream, and appends
 * the aggregate produced by each micro-batch to the MySQL table `max_min_avg`.
 */
object SparkStreamingKafka1 {

  /**
   * Entry point: builds the local Spark session, wires the Kafka source to the
   * aggregation query and blocks until the streaming query terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    // Kafka source, read from the earliest available offsets.
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo2")
      .option("startingOffsets", "earliest")
      .load()

    // Layout of the JSON document carried in each Kafka message value.
    val recordSchema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    // Decode the message value as text, parse it as JSON and flatten the struct.
    val jsonText = kafkaStream.selectExpr("CAST(value AS STRING)")
    val parsedRecord = from_json(col("value"), recordSchema).as("data")
    val stockRecords = jsonText.select(parsedRecord).select("data.*")
    stockRecords.createOrReplaceTempView("persontab")

    val aggregateSql =
      """select min(stock_price) as min_stock_price,
        |max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from persontab""".stripMargin
    val aggregates = spark.sql(aggregateSql)

    // For a quick look without MySQL, the same query can be sent to the console sink
    // (.format("console") with outputMode("complete")) instead of foreachBatch.
    val query = aggregates.writeStream
      .outputMode("complete")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        batchDF.show()

        val jdbcWriter = batchDF.write
          .format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
        jdbcWriter.save()

        println("saved")
      }
      .start()

    query.awaitTermination()
  }
}
我要執行三個查詢。第一個是上面代碼中已經完成的聚合查詢,並且運作正常。另外兩個是帶 where 子句的查詢。要如何在這裡完成這兩個查詢?三個查詢的結果能否保存在同一張表中,還是需要分別保存在不同的表中?請告訴我這兩種方式各自該怎麼做。
uj5u.com熱心網友回復:
package mysql.kafka.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{min, max, avg}
/**
 * Streaming job that reads JSON stock records from the Kafka topic `demo3` and, for
 * every micro-batch, runs three SQL queries against the batch and appends each
 * result to its own MySQL table:
 *
 *  - min / max / avg of `stock_price`           -> `max_min_avg`
 *  - rows where `stock_name` is 'abc'           -> `current_stock_price`
 *  - rows whose `date` equals yesterday's date  -> `Yesterday_stock_price`
 *
 * The batch is registered as the temp view `stocktab` inside `foreachBatch`, so plain
 * SQL can be used for all three queries.
 */
object multiplequerieskafkastreaming {

  // Connection settings shared by all three JDBC writes.
  private val JdbcUrl = "jdbc:mysql://localhost:3306/gkstorespipelinedb"
  private val JdbcUser = "root"
  private val JdbcPassword = "root"

  // Format of the value the `date` column is compared against.
  private val DateFormat = DateTimeFormatter.ofPattern("dd-MM-yyyy")

  /** Returns yesterday's date (UTC) formatted as dd-MM-yyyy. */
  private def yesterdayDate(): String =
    DateFormat.format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))

  /**
   * Appends `df` to the MySQL table `table`.
   *
   * The CSV-only "header" option that the original writes carried is dropped here;
   * it is not one of the JDBC data source options.
   */
  private def appendToMysql(df: DataFrame, table: String): Unit =
    df.write
      .format("jdbc")
      .mode("append")
      .option("url", JdbcUrl)
      .option("dbtable", table)
      .option("user", JdbcUser)
      .option("password", JdbcPassword)
      .save()

  /**
   * Runs the three queries over one micro-batch and stores their results.
   *
   * The batch is persisted because it is scanned by several queries and writes; the
   * cache is released in `finally`, i.e. only after the last write has finished and
   * also when a query or write throws. Exceptions are not swallowed: they propagate
   * so the streaming query fails visibly instead of silently losing a batch.
   *
   * @param batchDF rows of the current micro-batch
   * @param batchId id of the micro-batch as supplied by Spark (not used)
   */
  private def processBatch(batchDF: DataFrame, batchId: Long): Unit = {
    println("inside the foreachbatch1")
    batchDF.persist()
    try {
      batchDF.createOrReplaceTempView("stocktab")
      // Use the batch's own session so the temp view registered above is visible.
      val session = batchDF.sparkSession

      // Evaluated per batch: a value captured once at startup would go stale as soon
      // as the job runs past midnight.
      val yesterdaydate = yesterdayDate()
      println(yesterdaydate)

      val currentStockSql =
        "select stock_name,stock_price from stocktab where stock_name='abc'"
      val currentStockPriceDF = session.sql(currentStockSql)
      println(currentStockSql)
      currentStockPriceDF.show()

      val minMaxDF = session.sql(
        """select min(stock_price) as min_stock_price,
          |max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from stocktab""".stripMargin)
      minMaxDF.show()

      // yesterdaydate is generated locally (never user input), so interpolating it
      // into the SQL text is safe.
      val yesterdayStockPriceDF = session.sql(
        s"select stock_name,stock_price,date from stocktab where date='$yesterdaydate'")
      yesterdayStockPriceDF.show()

      appendToMysql(minMaxDF, "max_min_avg")
      println("saved min_max_df")

      appendToMysql(currentStockPriceDF, "current_stock_price")
      println("saved Current_stock_pricedf")

      appendToMysql(yesterdayStockPriceDF, "Yesterday_stock_price")
      println("saved Yesterday_stock_pricedf")
    } finally {
      batchDF.unpersist()
    }
  }

  /**
   * Entry point: builds the local Spark session, parses the Kafka messages into
   * stock records and hands every micro-batch to `processBatch`. Blocks until the
   * streaming query terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")

    // Kafka source, read from the earliest available offsets.
    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo3")
      .option("startingOffsets", "earliest")
      .load()

    // Layout of the JSON document carried in each Kafka message value.
    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    val stockDF = kafkaDF
      .selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    stockDF.writeStream
      // Explicit parameter types and a Unit-returning body select the Scala
      // overload of foreachBatch.
      .foreachBatch { (batchDF: DataFrame, batchId: Long) => processBatch(batchDF, batchId) }
      .start()
      .awaitTermination()
  }
}
代碼運作正常。藉助 batchDF.sparkSession.sql(""),在 foreachBatch 中創建了一個臨時表並執行了多個查詢。供參考:「如何在 foreachBatch 中使用臨時表?」。還需要在代碼中加入一些例外處理,以避免出錯。除此之外,主要代碼運作正常。
謝謝@OneCricketeer的建議。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/322864.html
