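I'm trying to send data to my data lake using a while loop.
Basically, the goal is to keep looping the code and send data to my data lake whenever data is received from my Azure Service Bus, using the following code:
This code receives messages from my Service Bus: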
import json
from azure.servicebus import ServiceBusClient

def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
        # Default is None; to receive forever.
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # complete the message so that the message is removed from the queue
                receiver.complete_message(msg)
                return themsg
This code assigns the message to a variable:
result = myfunc()
The following code sends the message to my data lake:
rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
.write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
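I'd like help looping the code so that it keeps checking for messages and sends the results to my data lake.
I believe the solution is a while loop, but I'm not sure.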
Answer:
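Just because you're using Spark doesn't mean you can't loop.
First, you're only ever returning the first message from your receiver, so it should look like this: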
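with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
    # keep the message object itself: complete_message needs it, not its str()
    # (raises StopIteration if nothing arrives within max_wait_time)
    msg = next(receiver)
    # print("Received: " + str(msg))
    themsg = json.loads(str(msg))
    # complete the message so that the message is removed from the queue
    receiver.complete_message(msg)
    return themsg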
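To see why the question's `for ... return` only ever hands back one message, here is a plain-Python illustration, assuming an ordinary list or iterator stands in for the Service Bus receiver (`first_only` and `drain_all` are hypothetical names). A `return` inside the loop body exits the function on the first iteration, and anything not yet consumed is left unread on the iterator.

```python
from typing import Iterable, List, Optional


def first_only(messages: Iterable[str]) -> Optional[str]:
    # Same shape as the question's code: `return` inside the `for` body
    # leaves the function on the very first iteration.
    for msg in messages:
        return msg
    return None  # reached only when there is nothing to iterate over


def drain_all(messages: Iterable[str]) -> List[str]:
    # Collects every message instead of stopping after the first one.
    received = []
    for msg in messages:
        received.append(msg)
    return received


it = iter(["a", "b", "c"])
print(first_only(it))  # a
print(list(it))        # ['b', 'c'] are still waiting on the iterator
print(drain_all(["a", "b", "c"]))  # ['a', 'b', 'c']
```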
To answer your question:
while True:
    result = json.dumps(myfunc())
    rdd = sc.parallelize([result])
    # You should use rdd.toDF().json here instead
    spark.read.json(rdd) \
        .write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
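The same loop can be sketched with the receive and write steps passed in as functions, which makes it easy to stop after a fixed number of polls and to skip polls that returned nothing. This is a sketch under assumptions: `poll_and_write`, `receive_batch` and `write_batch` are hypothetical names standing in for the Service Bus receive call and the Spark write, and `max_iterations=None` behaves like `while True`.

```python
import itertools
import json
from typing import Callable, List, Optional


def poll_and_write(
    receive_batch: Callable[[], List[dict]],
    write_batch: Callable[[List[str], int], None],
    max_iterations: Optional[int] = None,
) -> int:
    """Receive and write batches in a loop; returns how many batches were written.

    receive_batch / write_batch are hypothetical stand-ins for the Service Bus
    receive call and the Spark write. max_iterations=None loops forever.
    """
    polls = itertools.count() if max_iterations is None else range(max_iterations)
    written = 0
    for _ in polls:
        records = receive_batch()
        if not records:
            # Nothing arrived during this poll: try again rather than write an empty batch.
            continue
        # One JSON document per element, the shape spark.read.json accepts for an RDD of strings.
        lines = [json.dumps(r) for r in records]
        write_batch(lines, written)
        written += 1
    return written


# Usage with in-memory fakes instead of Service Bus and Spark:
incoming = [[{"id": 1}], [], [{"id": 2}, {"id": 3}]]
written_batches = []
n = poll_and_write(
    lambda: incoming.pop(0) if incoming else [],
    lambda lines, batch_id: written_batches.append((batch_id, lines)),
    max_iterations=5,
)
print(n)  # 2
```

In the question's notebook, `write_batch` could, for example, wrap `spark.read.json(sc.parallelize(lines))` followed by a write. Because the real receive call waits up to `max_wait_time` seconds for messages, an empty poll should not turn the loop into a busy spin.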
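Keep in mind that Spark's output file names are not consistent from one write to the next, and with mode("overwrite") each iteration replaces whatever the previous one wrote to that directory, which you probably don't want.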
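One way to keep batches from overwriting each other is to give every write its own output directory. A minimal sketch, assuming a naming scheme of UTC timestamp plus batch counter (the `batch_output_path` helper and the `batch_<timestamp>_<counter>` layout are hypothetical, not something Spark requires):

```python
from datetime import datetime, timezone


def batch_output_path(base: str, batch_id: int, now: datetime) -> str:
    # Hypothetical naming scheme: one sub-directory per batch, keyed by UTC
    # timestamp and a zero-padded batch counter, so writes never collide.
    stamp = now.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{base.rstrip('/')}/batch_{stamp}_{batch_id:06d}/"


when = datetime(2022, 3, 1, 12, 30, 0, tzinfo=timezone.utc)
print(batch_output_path("/mnt/lake/RAW/FormulaClassification/F1Area/", 7, when))
# /mnt/lake/RAW/FormulaClassification/F1Area/batch_20220301T123000Z_000007/
```

Alternatively, writing with `.write.mode("append")` to the single base directory makes Spark add new part files next to the existing ones instead of replacing them.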
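Alternatively, consider writing your own Source/SparkDataStream class that defines a Spark SQL source; then your main method doesn't need a loop at all, and Spark handles the polling natively.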
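To make that alternative more concrete: a Spark streaming source reports offsets, and the engine repeatedly asks for the latest offset, reads the range since the last processed offset as a micro-batch, and then commits it. The toy model below imitates that contract in plain Python. The class and method names are hypothetical and only loosely echo Spark's `initialOffset`/`latestOffset`/`commit`; this is not the PySpark API. The point it illustrates is that the polling loop lives in the engine (`ToyEngine.trigger`), not in your main method.

```python
from typing import Callable, List


class ToyQueueSource:
    """Hypothetical in-memory source; offsets are integer positions in a log."""

    def __init__(self) -> None:
        self._log: List[str] = []  # messages appended by a producer
        self._committed = 0        # everything before this offset has been processed

    def append(self, msg: str) -> None:
        self._log.append(msg)

    def initial_offset(self) -> int:
        return 0

    def latest_offset(self) -> int:
        return len(self._log)

    def read(self, start: int, end: int) -> List[str]:
        return self._log[start:end]

    def commit(self, end: int) -> None:
        # A real source could now release data before `end`
        # (for Service Bus, e.g. complete those messages).
        self._committed = end


class ToyEngine:
    """Stand-in for the streaming engine: it, not user code, decides when to poll."""

    def __init__(self, source: ToyQueueSource,
                 sink: Callable[[int, List[str]], None]) -> None:
        self.source = source
        self.sink = sink
        self.start = source.initial_offset()
        self.batch_id = 0

    def trigger(self) -> None:
        end = self.source.latest_offset()
        if end == self.start:
            return  # no new data: no empty micro-batch is emitted
        self.sink(self.batch_id, self.source.read(self.start, end))
        self.source.commit(end)
        self.start = end
        self.batch_id += 1


src = ToyQueueSource()
out = []
engine = ToyEngine(src, lambda bid, rows: out.append((bid, rows)))
src.append("m1")
engine.trigger()
engine.trigger()  # nothing new, so no batch
src.append("m2")
src.append("m3")
engine.trigger()
print(out)  # [(0, ['m1']), (1, ['m2', 'm3'])]
```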
