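I'm trying to run the data generator function provided by Microsoft so that I can test streaming data into Event Hubs.
Unfortunately, I keep getting the error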
Processing failure: No such file or directory
when I try to run the function:
%scala
DummyDataGenerator.start(15)
Could someone look at the code and help me work out why I'm getting this error?
class DummyDataGenerator:
    streamDirectory = "/FileStore/tables/flight"
None # suppress output
I'm not sure how to make the DummyDataGenerator function use the cell above. Here is the generator code:
%scala
import scala.util.Random
import java.io._
import java.time._
// Notebook #2 has to set this to 8, we are setting
// it to 200 to "restore" the default behavior.
spark.conf.set("spark.sql.shuffle.partitions", 200)
// Make the username available to all other languages.
// WARNING: use of the "current" username is unpredictable
// when multiple users are collaborating and should be replaced
// with the notebook ID instead.
val username = com.databricks.logging.AttributionContext.current.tags(com.databricks.logging.BaseTagDefinitions.TAG_USER);
spark.conf.set("com.databricks.training.username", username)
object DummyDataGenerator extends Runnable {
  var runner : Thread = null
  val className = getClass().getName()
  val streamDirectory = s"dbfs:/tmp/$username/new-flights"
  val airlines = Array( ("American", 0.17), ("Delta", 0.12), ("Frontier", 0.14), ("Hawaiian", 0.13), ("JetBlue", 0.15), ("United", 0.11), ("Southwest", 0.18) )
  val reasons = Array("Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft")
  val rand = new Random(System.currentTimeMillis())
  var maxDuration = 3 * 60 * 1000 // default to three minutes

  // Delete and recreate the output directory (addressed through the dbfs:/ scheme).
  def clean(): Unit = {
    System.out.println("Removing old files for dummy data generator.")
    dbutils.fs.rm(streamDirectory, true)
    if (dbutils.fs.mkdirs(streamDirectory) == false) {
      throw new RuntimeException("Unable to create temp directory.")
    }
  }

  // Write one CSV file of fake flights per iteration until maxDuration elapses.
  def run(): Unit = {
    val date = LocalDate.now()
    val start = System.currentTimeMillis()
    while (System.currentTimeMillis() - start < maxDuration) {
      try {
        // Local-filesystem view of streamDirectory; this path only exists where the /dbfs mount is available.
        val dir = s"/dbfs/tmp/$username/new-flights"
        val tempFile = File.createTempFile("flights-", "", new File(dir)).getAbsolutePath() + ".csv"
        val writer = new PrintWriter(tempFile)
        for (airline <- airlines) {
          val flightNumber = rand.nextInt(1000) + 1000
          val deptTime = rand.nextInt(10) + 10
          val departureTime = LocalDateTime.now().plusHours(-deptTime)
          val (name, odds) = airline
          val reason = Random.shuffle(reasons.toList).head
          val test = rand.nextDouble()
          // Delayed with probability `odds`; otherwise roughly on time (-5..4 minutes).
          val delay = if (test < odds)
            rand.nextInt(60) + (30*odds)
          else rand.nextInt(10)-5
          println(s"- Flight #$flightNumber by $name at $departureTime delayed $delay minutes due to $reason")
          writer.println(s""" "$flightNumber","$departureTime","$delay","$reason","$name" """.trim)
        }
        writer.close()
        // wait a couple of seconds
        //Thread.sleep(rand.nextInt(5000))
      } catch {
        case e: Exception => {
          printf("* Processing failure: %s%n", e.getMessage())
          return
        }
      }
    }
    println("No more flights!")
  }

  def start(minutes:Int = 5): Unit = {
    maxDuration = minutes * 60 * 1000
    if (runner != null) {
      println("Stopping dummy data generator.")
      runner.interrupt()
      runner.join()
    }
    println(s"Running dummy data generator for $minutes minutes.")
    runner = new Thread(this)
    // Thread.run() executes on the calling thread, so the cell blocks until maxDuration elapses;
    // runner.start() would run the generator in the background instead.
    runner.run()
  }

  def stop(): Unit = {
    start(0)
  }
}
DummyDataGenerator.clean()
displayHTML("Imported streaming logic...") // suppress output
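The "Processing failure" text is printed by the catch block in run(), so the underlying exception comes from one of the calls inside the try. One plausible trigger is File.createTempFile, which throws an IOException when the directory passed to it does not exist. The minimal sketch below reproduces that with the JDK alone. The object name CreateTempFileRepro, the helper tryCreate and the directory names are hypothetical stand-ins for the /dbfs/tmp/... path, and the exact message text depends on the operating system.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

object CreateTempFileRepro {
  // Hypothetical helper: returns the IOException message, or None if the temp file was created.
  def tryCreate(dir: File): Option[String] =
    try {
      val f = File.createTempFile("flights-", "", dir)
      f.delete() // clean up; only success or failure matters here
      None
    } catch {
      case e: IOException => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    // Stand-in for /dbfs/tmp/<user>/new-flights on a cluster without the /dbfs mount.
    val missing = new File(Files.createTempDirectory("repro").toFile, "no-such-dir/new-flights")
    println(tryCreate(missing)) // Some(<OS error message>) because the directory is missing

    // Once the directory exists, the same call succeeds.
    missing.mkdirs()
    println(tryCreate(missing)) // None
  }
}
```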
Answer:
This code won't work on Community Edition because of this line:
val dir = s"/dbfs/tmp/$username/new-flights"
Databricks Community Edition doesn't have the DBFS FUSE mount (it is only supported on full Databricks). You can make it work by:
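- changing that directory to a local directory, e.g. /tmp or something similar
- adding code (after writer.close()) that lists the flights-* files in that local directory and uses dbutils.fs.mv to move them to streamDirectory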
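The two steps above can be sketched in plain Scala as follows. This is a minimal sketch under stated assumptions: LocalThenMove, writeBatch, moveFlightFiles and the directory names are hypothetical. Because dbutils only exists inside a Databricks notebook, the sketch moves files with java.nio.file.Files.move. In the notebook, that call would be replaced by dbutils.fs.mv, with a file:/ source path pointing at the local file and streamDirectory as the destination.

```scala
import java.io.{File, PrintWriter}
import java.nio.file.{Files, StandardCopyOption}

object LocalThenMove {
  // Step 1 (hypothetical helper): write one batch to a local directory instead of /dbfs/...
  def writeBatch(localDir: File, lines: Seq[String]): File = {
    localDir.mkdirs()
    val placeholder = File.createTempFile("flights-", "", localDir)
    val csv = new File(placeholder.getAbsolutePath + ".csv")
    placeholder.delete() // createTempFile leaves an empty file; keep only the .csv
    val writer = new PrintWriter(csv)
    try lines.foreach(line => writer.println(line)) finally writer.close()
    csv
  }

  // Step 2: move every flights-* file from the local directory into the target directory.
  // In a notebook this would be dbutils.fs.mv(s"file:${f.getAbsolutePath}", s"$streamDirectory/${f.getName}").
  def moveFlightFiles(localDir: File, targetDir: File): Int = {
    targetDir.mkdirs()
    val files = Option(localDir.listFiles()).getOrElse(Array.empty[File]).filter(f => f.isFile && f.getName.startsWith("flights-"))
    files.foreach { f =>
      Files.move(f.toPath, new File(targetDir, f.getName).toPath, StandardCopyOption.REPLACE_EXISTING)
    }
    files.length
  }

  def main(args: Array[String]): Unit = {
    val root = Files.createTempDirectory("dummy-gen").toFile
    val local = new File(root, "local")   // stand-in for a local directory such as /tmp/...
    val target = new File(root, "target") // stand-in for streamDirectory
    // Illustrative row in the generator's column order: flight, departure, delay, reason, airline.
    writeBatch(local, Seq("\"1234\",\"2024-01-01T10:00\",\"5\",\"Security\",\"Delta\""))
    val moved = moveFlightFiles(local, target)
    println(s"Moved $moved file(s): ${target.list().mkString(", ")}")
  }
}
```

Writing to a local directory first means the generator no longer depends on the /dbfs mount; only the move step touches DBFS, through dbutils rather than the local filesystem.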
