我想每天通過 Azure Databricks 向 Kafka 發送一次訊息。我希望將訊息作為批處理作業接收。
我需要將它們發送到 kafka 服務器,但我們不想讓集群整天為這項作業運行。
我看到了 databricks writeStream 方法(我還不能讓它作業,但這不是我問題的目的)。看來我需要日以繼夜地進行流式傳輸才能使其運行。
有沒有辦法將它用作批處理作業?我可以將訊息發送到 Kafka 服務器,并在收到訊息后關閉我的集群嗎?
df = spark \
.readStream \
.format("delta") \
.option("numPartitions", 5) \
.option("rowsPerSecond", 5) \
.load('/mnt/sales/marketing/numbers/DELTA/')
(df.select("Sales", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "rferferfeez.eu-west-1.aws.confluent.cloud:9092")
.option("topic", "bingofr")
.option("kafka.sasl.username", "jakich")
.option("kafka.sasl.password", 'ozifjoijfziaihufzihufazhufhzuhfzuoehza')
.option("checkpointLocation", "/mnt/sales/marketing/numbers/temp/")
.option("spark.kafka.clusters.cluster.sasl.token.mechanism", "cluster-buyit")
.option("request.timeout.ms",30) \
.option("includeHeaders", "true") \
.start()
)
kafkashaded.org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元資料中不存在主題 bingofr。

值得注意的是,我們還有活動中心。我會更好地將訊息發送到我們的事件中心,并實作一個寫入 kafka 的觸發函式嗎?
uj5u.com熱心網友回復:
通常,KAFKA 是一種持續的服務/能力。至少,我去過的地方。
我會考慮像 AZURE 這樣的云服務,其中基于每條訊息使用事件中心并使用 KAFKA API。永遠在線,按條訊息付費。
否則,您將需要一個批處理作業來啟動 KAFKA,執行您的操作,然后停止 KAFKA。但是,您沒有說明是否全部在 Databricks 上。
uj5u.com熱心網友回復:
只是想詳細說明@Alex Ott 的評論,因為它似乎有效。
通過添加“.trigger(availableNow=True)”,您可以
“定期啟動集群,處理自上一個周期以來可用的所有內容,然后關閉集群。在某些情況下,這可能會顯著節省成本。”
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
**(
df.select("key", "value","partition")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("topic", topic)
.trigger(availableNow=True)
.option("kafka.sasl.jaas.config",
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
.option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
.option("kafka.security.protocol", "SASL_SSL")
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/519596.html
上一篇:基于聚合結果的透視列
