我正在嘗試使用 writestream 方法將資料寫入 Kafka。源系統為我提供了以下屬性。
topic = 'mytopic'
host = "myhost.us-west-1.aws.confluent.cloud:9092"
userid = 'myuser'
password ='mypassword'
Cluster = 'cluster-numeric-test-03'
雖然我在 stackoverflow 上找到了許多使用主題、主機、用戶 ID、密碼的示例,但我幾乎找不到關于集群的任何檔案。我嘗試使用可以在檔案的這一部分中看到的以下引數:“spark.kafka.clusters.cluster.auth.bootstrap.servers” https://spark.apache.org/docs/latest/structured-streaming- kafka-integration.html#configuration
因此,我使用以下代碼進行連接:
(
df.select("key", "value","partition")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("topic", topic)
.trigger(availableNow=True)
.option("kafka.sasl.jaas.config",
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
.option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
.option('spark.kafka.clusters.cluster.auth.bootstrap.servers',Cluster)
.start()
)
我收到以下錯誤:
kafkashaded.org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元資料中不存在主題 mytopic。
uj5u.com熱心網友回復:
實際上我在那里遺漏了一些東西。
.option("kafka.security.protocol", "SASL_SSL") .option("kafka.sasl.mechanism", "PLAIN")
它可以在不添加集群詳細資訊的情況下作業。
因此,我使用以下代碼進行連接:
**(
df.select("key", "value","partition")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host)
.option("topic", topic)
.trigger(availableNow=True)
.option("kafka.sasl.jaas.config",
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password))
.option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.start()
)**
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/519593.html
