我正在嘗試使用作為 Spark 一部分的 spark thrift 服務器查詢冰山表(S3 中的資料和 Hivemetastore 中的元資料的外部表)。我可以查詢非冰山表,但是當我查詢冰山表時,我遇到了錯誤。我們不能通過 spark thrift 服務器查詢冰山表嗎?
版本詳情
- 火花 - 3.2.1
- 斯卡拉 - 2.12.15
- 冰山火花庫 - iceberg-spark-runtime-3.2_2.12
- 我從 maven 添加了其他 S3 、 AWS 依賴 jar 并添加到 spark jars 檔案夾中。
我已經使用以下命令啟動了 thrift 服務器
start-thriftserver.sh \
--hiveconf hive.metastore.uris=thrift://$ip:$port \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.access.key=$key \
--conf spark.hadoop.fs.s3a.secret.key=$secret \
--conf spark.sql.catalog.iceberg_catalog.uri=thrift://$ip:$port \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.iceberg_catalog.type=hive \
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf iceberg.engine.hive.enabled=true \
查詢冰山表時直線錯誤select count(*) from $table_name
Error: org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:44)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:325)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:191)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:316)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:300)
... 16 more
Caused by: java.lang.InstantiationException
at java.base/jdk.internal.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135)
... 75 more (state=,code=0)
uj5u.com熱心網友回復:
根據您的配置,您似乎正在嘗試使用名為 的目錄iceberg_catalog,但它被配置為 Iceberg 的SparkSessionCatalog.
但是,SparkSessionCatalog它是為 Spark 使用的默認目錄保留的,它允許該目錄與 Iceberg 表和其他格式一起使用。
會話目錄必須命名為spark_catalog。這是 Spark 提出的要求。
因此,您需要使用org.apache.iceberg.SparkCatalog當前配置的名為的單獨目錄iceberg_catalog(名稱由您決定),或者如果您想覆寫默認目錄,以便 Iceberg 表和非 Iceberg 表可以存在于一個目錄中,您'd 需要將目錄名稱更改為spark_catalog并保留當前配置。
請參閱有關添加目錄的檔案。在使用的配置中,會話目錄 ,spark_catalog被覆寫,然后還有一個名為的目錄local,它是不同的,只能有 Iceberg 表。
uj5u.com熱心網友回復:
如果您通過 Spark/Beeline 創建表,并且可以看到該表,但看不到 Hive 中存在的表,這通常意味著 Spark 未配置為使用 Hive 元存盤。
在 Spark 1.6 中,默認情況下 Thrift 服務器以多會話模式運行。這意味著每個 JDBC/ODBC 連接都擁有自己的 SQL 配置和臨時函式注冊表的副本。快取表仍然是共享的。您正在注冊一個臨時表,因此為了查看臨時表,您需要以單會話模式運行 Thrift 服務器。在 spark-default.conf 中將 spark.sql.hive.thriftServer.singleSession 設定為 true。當您在代碼中呼叫 Thrift 服務器的實體時,它應該以單會話模式啟動。當您初始化和注冊臨時表時,它應該在您連接并發出 show tables 命令時顯示。您可以創建一個永久表,在這種情況下,它應該以多會話模式和 Hive 顯示(您有執行此操作的代碼,但它已被注釋掉)。
[root@sandbox conf]# cat hive-site.xml
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://sandbox.hortonworks.com:9083</value>
</property>
</configuration>
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/491787.html
上一篇:如何使用Scala為apache.spark.sql中的RelationalGroupedDataset類應用過濾器?
