我想將自定義資料框寫入 eventhub。
val customDf = spark.read.json("path/to/json")
EventHub ConnectionString
val connectionString = new com.microsoft.azure.eventhubs.ConnectionStringBuilder("Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxx=").setEventHubName("test")
val ehConf = EventHubsConf(connectionString.toString).setConsumerGroup("testing")
val eventhubSchema = spark.readStream.format("eventhubs").options(ehConf.toMap).option("eventhubs.partition.count", "4").load()
eventhubSchema.printSchema
將顯示 eventhub 主體的默認架構
現在我想把上面的customDf寫到eventhub
Method1:
ds = customDf \
.selectExpr("partitionKey", "body") \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "///output.txt") \
.start()
方法二:
ds = customDf \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "///output.txt") \
.start()
如何將 customDf 寫入 eventhub。我什至做了 select(get_json_object(cast to strong type) 但我得到了
org.apache.spark.sql.AnalysisException: cannot resolve 'body' given input columns
如何將customDf寫入eventhub
uj5u.com熱心網友回復:
您需要將資料框中的資料轉換為單列物件 - 二進制或字串 - 這實際上取決于您的消費者。最簡單的方法是使用to_json struct函式的組合將所有資料打包為 JSON :
import pyspark.sql.functions as F
stream = customDf \
.select(F.to_json(F.struct("*")).alias("body")) \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "...") \
.start()
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/387354.html
標籤:天蓝色 斯卡拉 阿帕奇火花 apache-spark-sql azure-eventhub
上一篇:Scala-覆寫抽象方法的型別類
