我在 docker 中創建了一個本地 spark 環境。我打算將其用作 CICD 管道的一部分,用于在 spark 環境中執行單元測驗代碼。我有兩個要使用的腳本:1 將創建一組持久的 spark 資料庫和表,另一個將讀取這些表。盡管這些表應該是持久的,但它們只在特定的 spark 會話中持久存在。如果創建新的 spark 會話,則無法訪問這些表,即使它在檔案系統中可見。代碼示例如下:
創建資料庫和表
創建腳本.py
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder.appName('Example').getOrCreate()
columns = ["language", "users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
spark.sql("create database if not exists schema1")
df.write.mode("ignore").saveAsTable('schema1.table1')
加載資料
加載資料.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.sql("select * from schema1.table1")
我知道當我運行這個命令時有問題: print(spark.catalog.listDatabases()) 它只能找到資料庫默認值。但是如果我匯入 Create_script.py 那么它會找到 schema1 db。
如何在所有 spark 會話中創建持久表?

uj5u.com熱心網友回復:
這些檔案/repo/test/spark-warehouse只是表的資料,沒有資料庫/表/列的元資訊。
如果您不啟用 Hive,Spark 將使用一個InMemoryCatalog,它是短暫的,僅用于測驗,僅在相同的Spark背景關系中可用。這InMemoryCatalog不提供從檔案系統加載 db/table 的任何功能。
所以有兩種方法:
柱狀格式
spark.write.orc(), 在Create_script.py腳本中將資料寫入 orc/parquet 格式。orc/parquet 格式將列資訊與資料一起存盤。val df = spark.read.orc(),那么createOrReplaceTempView如果你需要在sql中使用它。
使用嵌入 Hive
不需要安裝 Hive,Spark 可以和 embed hive 一起作業,只需兩步。
- 添加 spark-hive 依賴項。(我使用的是使用 pom.xml 管理依賴項的 Java,我不知道如何在 pyspark 中執行此操作)
SparkSession.builder().enableHiveSupport()
然后 data 將是
/repo/test/spark-warehouse/schema1.db,元資訊將是/repo/test/metastore_db,其中包含 Derby db 的檔案。您可以在所有 spark 會話中讀取或寫入表格。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/364797.html
標籤:Python 阿帕奇火花 火花 apache-spark-sql
上一篇:組合月份和年份列以創建日期列
下一篇:PySpark:迭代資料幀串列
