In Spark 1.6,
I have the following data:
df=spark.createDataFrame([(1,2,3,4),(1,3,4,5),(4,5,6,7),(1,3,4,5),(2,3,4,5),(2,2,4,5)],["aa","bb","cc","dd"])
df.registerTempTable("record")
   aa  bb  cc  dd
0   1   2   3   4
1   1   3   4   5
2   4   5   6   7
3   1   3   4   5
4   2   3   4   5
5   2   2   4   5
I want to return every record whose aa value occurs more than once. The result should look like this:
   aa  bb  cc  dd
0   1   2   3   4
1   1   3   4   5
2   1   3   4   5
3   2   3   4   5
4   2   2   4   5
In MySQL I can run the following statement:
select * from record where aa in (select aa from record group by aa having COUNT(*)>1)
and it returns exactly the result I need.
When I run the same statement in Spark, it fails with this error:
Py4JJavaError: An error occurred while calling o30.sql.
: java.lang.RuntimeException: [1.42] failure: ``)'' expected but identifier aa found
select * from record where aa in (select aa from record group by aa having COUNT(*)>1)
^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
How can I solve this?
Reply from a uj5u.com user:
Spark SQL is not MySQL.
Reply from a uj5u.com user:
Pull the subquery out into its own DataFrame and combine the DataFrames directly, instead of using a subquery in the WHERE clause.
