我收到錯誤訊息
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:251)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:115)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:115)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:404)
at io.sekai.core.streaming.KafkaDataGenerator.readFromCSVFile(KafkaDataGenerator.scala:38)
當我加載 csv 檔案時
spark2
.readStream
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
//.schema(schema)
.option("delimiter", ",")
.option("maxFilesPerTrigger", 1)
.csv(path)
我嘗試了另一種格式的選項,例如
spark2
.readStream
.format("csv")
.option("inferSchema", value = true)
.option("header", value = true)
//.schema(schema)
.option("delimiter", ",")
.option("maxFilesPerTrigger", 1)
.csv(path)
我想推斷架構并注釋掉顯式架構用法。
csv 檔案示例如下:
id,Energy Data,Distance,Humidity,Ambient Temperature,Cold Water Temperature,Vibration Value 1,Vibration Value 2,Handle Movement
1,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
2,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
3,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
4,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
5,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
6,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
7,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
8,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
9,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
10,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
這里有什么問題,因為我嚴格按照選項說明進行操作,但是推斷是如何發生的?
uj5u.com熱心網友回復:
您在這里有 2 個選擇:
- 在運行流查詢之前,將資料樣本寫入目的地一次。當您再次運行流式查詢時,將推斷架構。
- 設定
spark.sql.streaming.schemaInference為true:
spark.sql("set spark.sql.streaming.schemaInference=true")
從檔案:
默認情況下,來自基于檔案的源的結構化流需要您指定架構,而不是依賴 Spark 自動推斷它。此限制確保一致的架構將用于流式查詢,即使在失敗的情況下也是如此。對于臨時用例,您可以通過將 spark.sql.streaming.schemaInference 設定為 true 來重新啟用模式推斷。
uj5u.com熱心網友回復:
在創建流式源 DataFrame 時,我們必須指定架構。
從檔案:
默認情況下,來自基于檔案的源的結構化流要求您指定架構,而不是依賴 Spark 自動推斷它。此限制確保一致的架構將用于流式查詢,即使在失敗的情況下也是如此。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/322620.html
