diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index 037e0c70b07a..18c6c5099717 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -233,7 +233,6 @@ Commands: Options: -f file (Optional) Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark. - For Java 17, use `-f kubernetes/dockerfiles/spark/Dockerfile.java17` -p file (Optional) Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark. Skips building PySpark docker image if not specified. -R file (Optional) Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark. @@ -277,10 +276,6 @@ Examples: # Note: buildx, which does cross building, needs to do the push during build # So there is no separate push step with -X - - Build and push Java17-based image with tag "v3.3.0" to docker.io/myrepo - $0 -r docker.io/myrepo -t v3.3.0 -f kubernetes/dockerfiles/spark/Dockerfile.java17 build - $0 -r docker.io/myrepo -t v3.3.0 push - EOF } diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 53abf2e77090..ebdb5a447b11 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -64,7 +64,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { case proto.Relation.RelTypeCase.AGGREGATE => transformAggregate(rel.getAggregate) case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql) case proto.Relation.RelTypeCase.LOCAL_RELATION => - transformLocalRelation(rel.getLocalRelation) + transformLocalRelation(rel.getLocalRelation, common) case proto.Relation.RelTypeCase.SAMPLE => transformSample(rel.getSample) case proto.Relation.RelTypeCase.RELTYPE_NOT_SET => throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.") @@ -122,9 +122,16 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { } } - private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = { + private def transformLocalRelation( + rel: proto.LocalRelation, + common: Option[proto.RelationCommon]): LogicalPlan = { val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq - new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes) + val relation = new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes) + if (common.nonEmpty && common.get.getAlias.nonEmpty) { + logical.SubqueryAlias(identifier = common.get.getAlias, child = relation) + } else { + relation + } } private def transformAttribute(exp: proto.Expression.QualifiedAttribute): Attribute = { diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectDeduplicateSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectDeduplicateSuite.scala deleted file mode 100644 index 88af60581ba2..000000000000 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectDeduplicateSuite.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.connect.planner - -import org.apache.spark.sql.{Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} - -/** - * [[SparkConnectPlanTestWithSparkSession]] contains a SparkSession for the connect planner. - * - * It is not recommended to use Catalyst DSL along with this trait because `SharedSparkSession` - * has also defined implicits over Catalyst LogicalPlan which will cause ambiguity with the - * implicits defined in Catalyst DSL. - */ -trait SparkConnectPlanTestWithSparkSession extends SharedSparkSession with SparkConnectPlanTest { - override def getSession(): SparkSession = spark -} - -class SparkConnectDeduplicateSuite extends SparkConnectPlanTestWithSparkSession { - lazy val connectTestRelation = createLocalRelationProto( - Seq( - AttributeReference("id", IntegerType)(), - AttributeReference("key", StringType)(), - AttributeReference("value", StringType)())) - - lazy val sparkTestRelation = { - spark.createDataFrame( - new java.util.ArrayList[Row](), - StructType( - Seq( - StructField("id", IntegerType), - StructField("key", StringType), - StructField("value", StringType)))) - } - - test("Test basic deduplicate") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.plans._ - Dataset.ofRows(spark, transform(connectTestRelation.distinct())) - } - - val sparkPlan = sparkTestRelation.distinct() - comparePlans(connectPlan.queryExecution.analyzed, sparkPlan.queryExecution.analyzed, false) - - val connectPlan2 = { - import org.apache.spark.sql.connect.dsl.plans._ - Dataset.ofRows(spark, transform(connectTestRelation.deduplicate(Seq("key", "value")))) - } - val sparkPlan2 = sparkTestRelation.dropDuplicates(Seq("key", "value")) - comparePlans(connectPlan2.queryExecution.analyzed, sparkPlan2.queryExecution.analyzed, false) - } -} diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index 6fc47e07c598..49072982c00c 100644 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -22,20 +22,18 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.connect.proto import org.apache.spark.connect.proto.Expression.UnresolvedStar -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.test.SharedSparkSession /** * Testing trait for SparkConnect tests with some helper methods to make it 
easier to create new * test cases. */ -trait SparkConnectPlanTest { - - def getSession(): SparkSession = None.orNull +trait SparkConnectPlanTest extends SharedSparkSession { def transform(rel: proto.Relation): LogicalPlan = { - new SparkConnectPlanner(rel, getSession()).transform() + new SparkConnectPlanner(rel, spark).transform() } def readRel: proto.Relation = diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala index 0325b6573bd3..a38b1951eb23 100644 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala +++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala @@ -18,10 +18,16 @@ package org.apache.spark.sql.connect.planner import org.apache.spark.connect.proto import org.apache.spark.connect.proto.Join.JoinType -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter, UsingJoin} +import org.apache.spark.sql.{Column, DataFrame, Row} +import org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connect.dsl.expressions._ +import org.apache.spark.sql.connect.dsl.plans._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} /** * This suite is based on connect DSL and test that given same dataframe operations, whether @@ -30,81 +36,61 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation */ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest { - lazy val connectTestRelation = createLocalRelationProto(Seq($"id".int, $"name".string)) + lazy val connectTestRelation = + createLocalRelationProto( + Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)())) - lazy val connectTestRelation2 = createLocalRelationProto( - Seq($"key".int, $"value".int, $"name".string)) + lazy val connectTestRelation2 = + createLocalRelationProto( + Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)())) - lazy val sparkTestRelation: LocalRelation = LocalRelation($"id".int, $"name".string) + lazy val sparkTestRelation: DataFrame = + spark.createDataFrame( + new java.util.ArrayList[Row](), + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))) - lazy val sparkTestRelation2: LocalRelation = - LocalRelation($"key".int, $"value".int, $"name".string) + lazy val sparkTestRelation2: DataFrame = + spark.createDataFrame( + new java.util.ArrayList[Row](), + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))) test("Basic select") { - val connectPlan = { - // TODO: Scala only allows one implicit per scope so we keep proto implicit imports in - // this scope. Need to find a better way to make two implicits work in the same scope. 
- import org.apache.spark.sql.connect.dsl.expressions._ - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.select("id".protoAttr)) - } - val sparkPlan = sparkTestRelation.select($"id") - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + val connectPlan = connectTestRelation.select("id".protoAttr) + val sparkPlan = sparkTestRelation.select("id") + comparePlans(connectPlan, sparkPlan) } test("UnresolvedFunction resolution.") { - { - import org.apache.spark.sql.connect.dsl.expressions._ - import org.apache.spark.sql.connect.dsl.plans._ - assertThrows[IllegalArgumentException] { - transform(connectTestRelation.select(callFunction("default.hex", Seq("id".protoAttr)))) - } + assertThrows[IllegalArgumentException] { + transform(connectTestRelation.select(callFunction("default.hex", Seq("id".protoAttr)))) } - val connectPlan = { - import org.apache.spark.sql.connect.dsl.expressions._ - import org.apache.spark.sql.connect.dsl.plans._ - transform( - connectTestRelation.select(callFunction(Seq("default", "hex"), Seq("id".protoAttr)))) - } + val connectPlan = + connectTestRelation.select(callFunction(Seq("default", "hex"), Seq("id".protoAttr))) assertThrows[UnsupportedOperationException] { - connectPlan.analyze + analyzePlan(transform(connectPlan)) } - val validPlan = { - import org.apache.spark.sql.connect.dsl.expressions._ - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.select(callFunction(Seq("hex"), Seq("id".protoAttr)))) - } - assert(validPlan.analyze != null) + val validPlan = connectTestRelation.select(callFunction(Seq("hex"), Seq("id".protoAttr))) + assert(analyzePlan(transform(validPlan)) != null) } test("Basic filter") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.expressions._ - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.where("id".protoAttr < 0)) - } - - val sparkPlan = sparkTestRelation.where($"id" < 0).analyze - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + val connectPlan = connectTestRelation.where("id".protoAttr < 0) + val sparkPlan = sparkTestRelation.where(Column("id") < 0) + comparePlans(connectPlan, sparkPlan) } test("Basic joins with different join types") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.join(connectTestRelation2)) - } + val connectPlan = connectTestRelation.join(connectTestRelation2) val sparkPlan = sparkTestRelation.join(sparkTestRelation2) - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + comparePlans(connectPlan, sparkPlan) + + val connectPlan2 = connectTestRelation.join(connectTestRelation2) + val sparkPlan2 = sparkTestRelation.join(sparkTestRelation2) + comparePlans(connectPlan2, sparkPlan2) - val connectPlan2 = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.join(connectTestRelation2, condition = None)) - } - val sparkPlan2 = sparkTestRelation.join(sparkTestRelation2, condition = None) - comparePlans(connectPlan2.analyze, sparkPlan2.analyze, false) for ((t, y) <- Seq( (JoinType.JOIN_TYPE_LEFT_OUTER, LeftOuter), (JoinType.JOIN_TYPE_RIGHT_OUTER, RightOuter), @@ -112,99 +98,79 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest { (JoinType.JOIN_TYPE_LEFT_ANTI, LeftAnti), (JoinType.JOIN_TYPE_LEFT_SEMI, LeftSemi), (JoinType.JOIN_TYPE_INNER, Inner))) { - val connectPlan3 = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.join(connectTestRelation2, t)) - 
} - val sparkPlan3 = sparkTestRelation.join(sparkTestRelation2, y) - comparePlans(connectPlan3.analyze, sparkPlan3.analyze, false) - } - val connectPlan4 = { - import org.apache.spark.sql.connect.dsl.plans._ - transform( - connectTestRelation.join(connectTestRelation2, JoinType.JOIN_TYPE_INNER, Seq("name"))) + val connectPlan3 = connectTestRelation.join(connectTestRelation2, t, Seq("id")) + val sparkPlan3 = sparkTestRelation.join(sparkTestRelation2, Seq("id"), y.toString) + comparePlans(connectPlan3, sparkPlan3) } - val sparkPlan4 = sparkTestRelation.join(sparkTestRelation2, UsingJoin(Inner, Seq("name"))) - comparePlans(connectPlan4.analyze, sparkPlan4.analyze, false) + + val connectPlan4 = + connectTestRelation.join(connectTestRelation2, JoinType.JOIN_TYPE_INNER, Seq("name")) + val sparkPlan4 = sparkTestRelation.join(sparkTestRelation2, Seq("name"), Inner.toString) + comparePlans(connectPlan4, sparkPlan4) } test("Test sample") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.sample(0, 0.2, false, 1)) - } - val sparkPlan = sparkTestRelation.sample(0, 0.2, false, 1) - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + val connectPlan = connectTestRelation.sample(0, 0.2, false, 1) + val sparkPlan = sparkTestRelation.sample(false, 0.2 - 0, 1) + comparePlans(connectPlan, sparkPlan) } test("column alias") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.expressions._ - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.select("id".protoAttr.as("id2"))) - } - val sparkPlan = sparkTestRelation.select($"id".as("id2")) - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + val connectPlan = connectTestRelation.select("id".protoAttr.as("id2")) + val sparkPlan = sparkTestRelation.select(Column("id").alias("id2")) + comparePlans(connectPlan, sparkPlan) } test("Aggregate with more than 1 grouping expressions") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.expressions._ - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.groupBy("id".protoAttr, "name".protoAttr)()) + withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") { + val connectPlan = + connectTestRelation.groupBy("id".protoAttr, "name".protoAttr)() + val sparkPlan = + sparkTestRelation.groupBy(Column("id"), Column("name")).agg(Map.empty[String, String]) + comparePlans(connectPlan, sparkPlan) } - val sparkPlan = sparkTestRelation.groupBy($"id", $"name")() - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) } test("Test as(alias: String)") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.as("target_table")) - } - + val connectPlan = connectTestRelation.as("target_table") val sparkPlan = sparkTestRelation.as("target_table") - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + comparePlans(connectPlan, sparkPlan) } test("Test StructType in LocalRelation") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.expressions._ - transform(createLocalRelationProtoByQualifiedAttributes(Seq("a".struct("id".int)))) - } - val sparkPlan = LocalRelation($"a".struct($"id".int)) - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + val connectPlan = createLocalRelationProtoByQualifiedAttributes(Seq("a".struct("id".int))) + val sparkPlan = + LocalRelation(AttributeReference("a", StructType(Seq(StructField("id", IntegerType))))()) + comparePlans(connectPlan, sparkPlan) } test("Test 
limit offset") { - val connectPlan = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.limit(10)) - } + val connectPlan = connectTestRelation.limit(10) val sparkPlan = sparkTestRelation.limit(10) - comparePlans(connectPlan.analyze, sparkPlan.analyze, false) + comparePlans(connectPlan, sparkPlan) - val connectPlan2 = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.offset(2)) - } + val connectPlan2 = connectTestRelation.offset(2) val sparkPlan2 = sparkTestRelation.offset(2) - comparePlans(connectPlan2.analyze, sparkPlan2.analyze, false) + comparePlans(connectPlan2, sparkPlan2) - val connectPlan3 = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.limit(10).offset(2)) - } + val connectPlan3 = connectTestRelation.limit(10).offset(2) val sparkPlan3 = sparkTestRelation.limit(10).offset(2) - comparePlans(connectPlan3.analyze, sparkPlan3.analyze, false) + comparePlans(connectPlan3, sparkPlan3) - val connectPlan4 = { - import org.apache.spark.sql.connect.dsl.plans._ - transform(connectTestRelation.offset(2).limit(10)) - } + val connectPlan4 = connectTestRelation.offset(2).limit(10) val sparkPlan4 = sparkTestRelation.offset(2).limit(10) - comparePlans(connectPlan4.analyze, sparkPlan4.analyze, false) + comparePlans(connectPlan4, sparkPlan4) + } + + test("Test basic deduplicate") { + val connectPlan = connectTestRelation.distinct() + val sparkPlan = sparkTestRelation.distinct() + comparePlans(connectPlan, sparkPlan) + + val connectPlan2 = connectTestRelation.deduplicate(Seq("id", "name")) + val sparkPlan2 = sparkTestRelation.dropDuplicates(Seq("id", "name")) + comparePlans(connectPlan2, sparkPlan2) } private def createLocalRelationProtoByQualifiedAttributes( @@ -215,4 +181,17 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest { } proto.Relation.newBuilder().setLocalRelation(localRelationBuilder.build()).build() } + + // This is a function for testing only. This is used when the plan is ready and it only waits + // analyzer to analyze attribute references within the plan. + private def analyzePlan(plan: LogicalPlan): LogicalPlan = { + val connectAnalyzed = analysis.SimpleAnalyzer.execute(plan) + analysis.SimpleAnalyzer.checkAnalysis(connectAnalyzed) + connectAnalyzed + } + + private def comparePlans(connectPlan: proto.Relation, sparkPlan: DataFrame): Unit = { + val connectAnalyzed = analyzePlan(transform(connectPlan)) + comparePlans(connectAnalyzed, sparkPlan.queryExecution.analyzed, false) + } } diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 16347f89463e..4797ee0d0d0b 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -300,14 +300,14 @@ "The must be between (current value = )" ] }, - "WRONG_NUM_ENDPOINTS" : { + "WRONG_NUM_ARGS" : { "message" : [ - "The number of endpoints must be >= 2 to construct intervals but the actual number is ." + "The requires parameters but the actual number is ." ] }, - "WRONG_NUM_PARAMS" : { + "WRONG_NUM_ENDPOINTS" : { "message" : [ - "The requires parameters but the actual number is ." + "The number of endpoints must be >= 2 to construct intervals but the actual number is ." ] } } @@ -422,12 +422,6 @@ "Fail to recognize pattern in the DateTimeFormatter. 1) You can set to \"LEGACY\" to restore the behavior before Spark 3.0. 
2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html" ] }, - "FORMAT_DATETIME_BY_NEW_PARSER" : { - "message" : [ - "Spark >= 3.0:", - "Fail to format it to in the new formatter. You can set to \"LEGACY\" to restore the behavior before Spark 3.0, or set to \"CORRECTED\" and treat it as an invalid datetime string." - ] - }, "PARSE_DATETIME_BY_NEW_PARSER" : { "message" : [ "Spark >= 3.0:", @@ -595,28 +589,6 @@ "More than one row returned by a subquery used as an expression." ] }, - "NAMESPACE_ALREADY_EXISTS" : { - "message" : [ - "Cannot create namespace because it already exists.", - "Choose a different name, drop the existing namespace, or add the IF NOT EXISTS clause to tolerate pre-existing namespace." - ], - "sqlState" : "42000" - }, - "NAMESPACE_NOT_EMPTY" : { - "message" : [ - "Cannot drop a namespace because it contains objects.", - "Use DROP NAMESPACE ... CASCADE to drop the namespace and all its objects." - ], - "sqlState" : "42000" - }, - "NAMESPACE_NOT_FOUND" : { - "message" : [ - "The namespace cannot be found. Verify the spelling and correctness of the namespace.", - "If you did not qualify the name with, verify the current_schema() output, or qualify the name with the correctly.", - "To tolerate the error on drop use DROP NAMESPACE IF EXISTS." - ], - "sqlState" : "42000" - }, "NON_LITERAL_PIVOT_VALUES" : { "message" : [ "Literal expressions required for pivot values, found ." diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 2422b003d6d1..d4106c9045dc 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -17,10 +17,10 @@ api-asn1-api/1.0.0-M20//api-asn1-api-1.0.0-M20.jar api-util/1.0.0-M20//api-util-1.0.0-M20.jar arpack/3.0.2//arpack-3.0.2.jar arpack_combined_all/0.1//arpack_combined_all-0.1.jar -arrow-format/9.0.0//arrow-format-9.0.0.jar -arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar -arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar -arrow-vector/9.0.0//arrow-vector-9.0.0.jar +arrow-format/10.0.0//arrow-format-10.0.0.jar +arrow-memory-core/10.0.0//arrow-memory-core-10.0.0.jar +arrow-memory-netty/10.0.0//arrow-memory-netty-10.0.0.jar +arrow-vector/10.0.0//arrow-vector-10.0.0.jar audience-annotations/0.5.0//audience-annotations-0.5.0.jar avro-ipc/1.11.1//avro-ipc-1.11.1.jar avro-mapred/1.11.1//avro-mapred-1.11.1.jar @@ -271,4 +271,4 @@ xz/1.8//xz-1.8.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper-jute/3.6.2//zookeeper-jute-3.6.2.jar zookeeper/3.6.2//zookeeper-3.6.2.jar -zstd-jni/1.5.2-4//zstd-jni-1.5.2-4.jar +zstd-jni/1.5.2-5//zstd-jni-1.5.2-5.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index ecaf4293f247..2d10ad96104e 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -16,10 +16,10 @@ antlr4-runtime/4.9.3//antlr4-runtime-4.9.3.jar aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar arpack/3.0.2//arpack-3.0.2.jar arpack_combined_all/0.1//arpack_combined_all-0.1.jar -arrow-format/9.0.0//arrow-format-9.0.0.jar -arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar -arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar -arrow-vector/9.0.0//arrow-vector-9.0.0.jar +arrow-format/10.0.0//arrow-format-10.0.0.jar +arrow-memory-core/10.0.0//arrow-memory-core-10.0.0.jar +arrow-memory-netty/10.0.0//arrow-memory-netty-10.0.0.jar +arrow-vector/10.0.0//arrow-vector-10.0.0.jar 
audience-annotations/0.5.0//audience-annotations-0.5.0.jar avro-ipc/1.11.1//avro-ipc-1.11.1.jar avro-mapred/1.11.1//avro-mapred-1.11.1.jar @@ -256,4 +256,4 @@ xz/1.8//xz-1.8.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper-jute/3.6.2//zookeeper-jute-3.6.2.jar zookeeper/3.6.2//zookeeper-3.6.2.jar -zstd-jni/1.5.2-4//zstd-jni-1.5.2-4.jar +zstd-jni/1.5.2-5//zstd-jni-1.5.2-5.jar diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile index 2a70bd3f98f4..24bad4db4080 100644 --- a/dev/infra/Dockerfile +++ b/dev/infra/Dockerfile @@ -32,7 +32,7 @@ RUN $APT_INSTALL software-properties-common git libxml2-dev pkg-config curl wget RUN update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.9 -RUN python3.9 -m pip install numpy pyarrow 'pandas<=1.4.4' scipy unittest-xml-reporting plotly>=4.8 sklearn 'mlflow>=1.0' coverage matplotlib openpyxl +RUN python3.9 -m pip install numpy pyarrow 'pandas<=1.5.0' scipy unittest-xml-reporting plotly>=4.8 sklearn 'mlflow>=1.0' coverage matplotlib openpyxl RUN add-apt-repository ppa:pypy/ppa RUN apt update @@ -45,7 +45,7 @@ RUN mkdir -p /usr/local/pypy/pypy3.7 && \ ln -sf /usr/local/pypy/pypy3.7/bin/pypy /usr/local/bin/pypy3 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | pypy3 -RUN pypy3 -m pip install numpy 'pandas<=1.4.4' scipy coverage matplotlib +RUN pypy3 -m pip install numpy 'pandas<=1.5.0' scipy coverage matplotlib RUN $APT_INSTALL gnupg ca-certificates pandoc RUN echo 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/' >> /etc/apt/sources.list diff --git a/pom.xml b/pom.xml index 707aed043796..41a197f2031b 100644 --- a/pom.xml +++ b/pom.xml @@ -216,7 +216,7 @@ If you are changing Arrow version specification, please check ./python/pyspark/sql/pandas/utils.py, and ./python/setup.py too. 
--> - 9.0.0 + 10.0.0 org.fusesource.leveldbjni 6.2.0 @@ -802,7 +802,7 @@ com.github.luben zstd-jni - 1.5.2-4 + 1.5.2-5 com.clearspring.analytics diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 20c537e0e672..03baa77090e2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -813,7 +813,7 @@ object KubernetesIntegrationTests { val bindingsDir = s"$sparkHome/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings" val javaImageTag = sys.props.get("spark.kubernetes.test.javaImageTag") val dockerFile = sys.props.getOrElse("spark.kubernetes.test.dockerFile", - s"$sparkHome/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile.java17") + s"$sparkHome/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile") val pyDockerFile = sys.props.getOrElse("spark.kubernetes.test.pyDockerFile", s"$bindingsDir/python/Dockerfile") val rDockerFile = sys.props.getOrElse("spark.kubernetes.test.rDockerFile", diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py index 755e111bff8f..350674e5e52d 100644 --- a/python/pyspark/pandas/base.py +++ b/python/pyspark/pandas/base.py @@ -866,8 +866,8 @@ def isin(self: IndexOpsLike, values: Sequence[Any]) -> IndexOpsLike: 5 False Name: animal, dtype: bool - >>> s.rename("a").to_frame().set_index("a").index.isin(['lama']) - Index([True, False, True, False, True, False], dtype='object', name='a') + >>> s.rename("a").to_frame().set_index("a").index.isin(['lama']) # doctest: +SKIP + Index([True, False, True, False, True, False], dtype='bool', name='a') """ if not is_list_like(values): raise TypeError( @@ -910,8 +910,8 @@ def isnull(self: IndexOpsLike) -> IndexOpsLike: 2 True dtype: bool - >>> ser.rename("a").to_frame().set_index("a").index.isna() - Index([False, False, True], dtype='object', name='a') + >>> ser.rename("a").to_frame().set_index("a").index.isna() # doctest: +SKIP + Index([False, False, True], dtype='bool', name='a') """ from pyspark.pandas.indexes import MultiIndex @@ -953,8 +953,8 @@ def notnull(self: IndexOpsLike) -> IndexOpsLike: 2 False dtype: bool - >>> ser.rename("a").to_frame().set_index("a").index.notna() - Index([True, True, False], dtype='object', name='a') + >>> ser.rename("a").to_frame().set_index("a").index.notna() # doctest: +SKIP + Index([True, True, False], dtype='bool', name='a') """ from pyspark.pandas.indexes import MultiIndex diff --git a/python/pyspark/pandas/indexes/datetimes.py b/python/pyspark/pandas/indexes/datetimes.py index b4a7c1e8356a..3343014c6f8b 100644 --- a/python/pyspark/pandas/indexes/datetimes.py +++ b/python/pyspark/pandas/indexes/datetimes.py @@ -284,8 +284,8 @@ def is_month_start(self) -> Index: Examples -------- >>> idx = ps.date_range("2018-02-27", periods=3) - >>> idx.is_month_start - Index([False, False, True], dtype='object') + >>> idx.is_month_start # doctest: +SKIP + Index([False, False, True], dtype='bool') """ return Index(self.to_series().dt.is_month_start) @@ -307,8 +307,8 @@ def is_month_end(self) -> Index: Examples -------- >>> idx = ps.date_range("2018-02-27", periods=3) - >>> idx.is_month_end - Index([False, True, False], dtype='object') + >>> idx.is_month_end # doctest: +SKIP + Index([False, True, False], dtype='bool') """ return Index(self.to_series().dt.is_month_end) @@ -330,8 +330,8 @@ def is_quarter_start(self) -> Index: Examples -------- >>> idx = ps.date_range('2017-03-30', periods=4) - >>> idx.is_quarter_start - Index([False, False, True, False], dtype='object') + >>> idx.is_quarter_start # 
doctest: +SKIP + Index([False, False, True, False], dtype='bool') """ return Index(self.to_series().dt.is_quarter_start) @@ -353,8 +353,8 @@ def is_quarter_end(self) -> Index: Examples -------- >>> idx = ps.date_range('2017-03-30', periods=4) - >>> idx.is_quarter_end - Index([False, True, False, False], dtype='object') + >>> idx.is_quarter_end # doctest: +SKIP + Index([False, True, False, False], dtype='bool') """ return Index(self.to_series().dt.is_quarter_end) @@ -375,8 +375,8 @@ def is_year_start(self) -> Index: Examples -------- >>> idx = ps.date_range("2017-12-30", periods=3) - >>> idx.is_year_start - Index([False, False, True], dtype='object') + >>> idx.is_year_start # doctest: +SKIP + Index([False, False, True], dtype='bool') """ return Index(self.to_series().dt.is_year_start) @@ -397,8 +397,8 @@ def is_year_end(self) -> Index: Examples -------- >>> idx = ps.date_range("2017-12-30", periods=3) - >>> idx.is_year_end - Index([False, True, False], dtype='object') + >>> idx.is_year_end # doctest: +SKIP + Index([False, True, False], dtype='bool') """ return Index(self.to_series().dt.is_year_end) @@ -420,8 +420,8 @@ def is_leap_year(self) -> Index: Examples -------- >>> idx = ps.date_range("2012-01-01", "2015-01-01", freq="Y") - >>> idx.is_leap_year - Index([True, False, False], dtype='object') + >>> idx.is_leap_year # doctest: +SKIP + Index([True, False, False], dtype='bool') """ return Index(self.to_series().dt.is_leap_year) diff --git a/python/pyspark/pandas/strings.py b/python/pyspark/pandas/strings.py index 774fd6c7ca0b..7d541cbcb6ea 100644 --- a/python/pyspark/pandas/strings.py +++ b/python/pyspark/pandas/strings.py @@ -2315,8 +2315,8 @@ def zfill(self, width: int) -> "ps.Series": added to the left of it (:func:`str.zfill` would have moved it to the left). 1000 remains unchanged as it is longer than width. - >>> s.str.zfill(3) - 0 0-1 + >>> s.str.zfill(3) # doctest: +SKIP + 0 -01 1 001 2 1000 3 None diff --git a/python/pyspark/pandas/supported_api_gen.py b/python/pyspark/pandas/supported_api_gen.py index de072b549277..2af35923afb2 100644 --- a/python/pyspark/pandas/supported_api_gen.py +++ b/python/pyspark/pandas/supported_api_gen.py @@ -98,7 +98,7 @@ def generate_supported_api(output_rst_file_path: str) -> None: Write supported APIs documentation. """ - pandas_latest_version = "1.4.4" + pandas_latest_version = "1.5.0" if LooseVersion(pd.__version__) != LooseVersion(pandas_latest_version): msg = ( "Warning: Latest version of pandas (%s) is required to generate the documentation; " diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 30338b6f91c7..53026016ee26 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -ARG java_image_tag=11-jre-focal +ARG java_image_tag=17-jre FROM eclipse-temurin:${java_image_tag} diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile.java17 b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile.java17 deleted file mode 100644 index 194242996ca7..000000000000 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile.java17 +++ /dev/null @@ -1,62 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# We need to build from debian:bullseye-slim because openjdk switches its underlying OS -# from debian to oraclelinux from openjdk:12 -FROM debian:bullseye-slim - -ARG spark_uid=185 - -# Before building the docker image, first build and make a Spark distribution following -# the instructions in https://spark.apache.org/docs/latest/building-spark.html. -# If this docker file is being used in the context of building your images from a Spark -# distribution, the docker build command should be invoked from the top level directory -# of the Spark distribution. E.g.: -# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile . - -RUN set -ex && \ - apt-get update && \ - ln -s /lib /lib64 && \ - apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps openjdk-17-jre net-tools && \ - mkdir -p /opt/spark && \ - mkdir -p /opt/spark/examples && \ - mkdir -p /opt/spark/work-dir && \ - touch /opt/spark/RELEASE && \ - rm /bin/sh && \ - ln -sv /bin/bash /bin/sh && \ - echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \ - chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \ - rm -rf /var/cache/apt/* && rm -rf /var/lib/apt/lists/* - -COPY jars /opt/spark/jars -COPY bin /opt/spark/bin -COPY sbin /opt/spark/sbin -COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/ -COPY kubernetes/dockerfiles/spark/decom.sh /opt/ -COPY examples /opt/spark/examples -COPY kubernetes/tests /opt/spark/tests -COPY data /opt/spark/data - -ENV SPARK_HOME /opt/spark - -WORKDIR /opt/spark/work-dir -RUN chmod g+w /opt/spark/work-dir -RUN chmod a+x /opt/decom.sh - -ENTRYPOINT [ "/opt/entrypoint.sh" ] - -# Specify the User that the actual main process will run as -USER ${spark_uid} diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md index af0b1ec3dc76..5e784980ab55 100644 --- a/resource-managers/kubernetes/integration-tests/README.md +++ b/resource-managers/kubernetes/integration-tests/README.md @@ -20,9 +20,8 @@ To run tests with Java 11 instead of Java 8, use `--java-image-tag` to specify t To run tests with a custom docker image, use `--docker-file` to specify the Dockerfile. 
Note that if both `--docker-file` and `--java-image-tag` are used, `--docker-file` is preferred, and the custom Dockerfile need to include a Java installation by itself. -Dockerfile.java17 is an example of custom Dockerfile, and you can specify it to run tests with Java 17. - ./dev/dev-run-integration-tests.sh --docker-file ../docker/src/main/dockerfiles/spark/Dockerfile.java17 + ./dev/dev-run-integration-tests.sh --docker-file ../docker/src/main/dockerfiles/spark/Dockerfile To run tests with Hadoop 2.x instead of Hadoop 3.x, use `--hadoop-profile`. diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 7580982c2330..1857ed911794 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -43,7 +43,7 @@ - Dockerfile.java17 + Dockerfile org.apache.spark.deploy.k8s.integrationtest.YuniKornTag diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index 50050d391592..a047b187dbf7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -127,5 +127,5 @@ class FunctionAlreadyExistsException(errorClass: String, messageParameters: Map[ } class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = None) - extends AnalysisException(errorClass = "INDEX_NOT_FOUND", + extends AnalysisException(errorClass = "INDEX_ALREADY_EXISTS", Map("message" -> message), cause) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index fd5da3ff13d8..df6b1c400bb7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] { val windowTimeExpressions = p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet - if (windowTimeExpressions.size == 1 && - windowTimeExpressions.head.windowColumn.resolved && - windowTimeExpressions.head.checkInputDataTypes().isSuccess) { + val allWindowTimeExprsResolved = windowTimeExpressions.forall { w => + w.windowColumn.resolved && w.checkInputDataTypes().isSuccess + } - val windowTime = windowTimeExpressions.head + if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) { + val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime => + val metadata = windowTime.windowColumn match { + case a: Attribute => a.metadata + case _ => Metadata.empty + } - val metadata = windowTime.windowColumn match { - case a: Attribute => a.metadata - case _ => Metadata.empty - } + if (!metadata.contains(TimeWindow.marker) && + !metadata.contains(SessionWindow.marker)) { + // FIXME: error framework? + throw new AnalysisException( + "The input is not a correct window column: $windowTime", plan = Some(p)) + } - if (!metadata.contains(TimeWindow.marker) && - !metadata.contains(SessionWindow.marker)) { - // FIXME: error framework? 
- throw new AnalysisException( - "The input is not a correct window column: $windowTime", plan = Some(p)) - } + val newMetadata = new MetadataBuilder() + .withMetadata(metadata) + .remove(TimeWindow.marker) + .remove(SessionWindow.marker) + .build() - val newMetadata = new MetadataBuilder() - .withMetadata(metadata) - .remove(TimeWindow.marker) - .remove(SessionWindow.marker) - .build() + val colName = windowTime.sql + + val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)() - val attr = AttributeReference( - "window_time", windowTime.dataType, metadata = newMetadata)() + // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as + // it is, it is going to be bound to the different window even if we apply the same window + // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the + // correct window range. + val subtractExpr = + PreciseTimestampConversion( + Subtract(PreciseTimestampConversion( + GetStructField(windowTime.windowColumn, 1), + windowTime.dataType, LongType), Literal(1L)), + LongType, + windowTime.dataType) - // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as - // it is, it is going to be bound to the different window even if we apply the same window - // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the - // correct window range. - val subtractExpr = - PreciseTimestampConversion( - Subtract(PreciseTimestampConversion( - GetStructField(windowTime.windowColumn, 1), - windowTime.dataType, LongType), Literal(1L)), - LongType, - windowTime.dataType) + val newColumn = Alias(subtractExpr, colName)( + exprId = attr.exprId, explicitMetadata = Some(newMetadata)) - val newColumn = Alias(subtractExpr, "window_time")( - exprId = attr.exprId, explicitMetadata = Some(newMetadata)) + windowTime -> (attr, newColumn) + }.toMap val replacedPlan = p transformExpressions { - case w: WindowTime => attr + case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1 } - replacedPlan.withNewChildren(Project(newColumn +: child.output, child) :: Nil) + val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2) + replacedPlan.withNewChildren( + Project(newColumnsToAdd ++: child.output, child) :: Nil) } else { p // Return unchanged. 
Analyzer will throw exception later } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala index fa52e6cd8517..3d01ae1b7811 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala @@ -65,7 +65,7 @@ case class CallMethodViaReflection(children: Seq[Expression]) override def checkInputDataTypes(): TypeCheckResult = { if (children.size < 2) { DataTypeMismatch( - errorSubClass = "WRONG_NUM_PARAMS", + errorSubClass = "WRONG_NUM_ARGS", messageParameters = Map( "functionName" -> toSQLId(prettyName), "expectedNum" -> "> 1", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index 4d99c3b02a07..116227224fdd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -1209,7 +1209,7 @@ case class Least(children: Seq[Expression]) extends ComplexTypeMergingExpression override def checkInputDataTypes(): TypeCheckResult = { if (children.length <= 1) { DataTypeMismatch( - errorSubClass = "WRONG_NUM_PARAMS", + errorSubClass = "WRONG_NUM_ARGS", messageParameters = Map( "functionName" -> toSQLId(prettyName), "expectedNum" -> "> 1", @@ -1300,7 +1300,7 @@ case class Greatest(children: Seq[Expression]) extends ComplexTypeMergingExpress override def checkInputDataTypes(): TypeCheckResult = { if (children.length <= 1) { DataTypeMismatch( - errorSubClass = "WRONG_NUM_PARAMS", + errorSubClass = "WRONG_NUM_ARGS", messageParameters = Map( "functionName" -> toSQLId(prettyName), "expectedNum" -> "> 1", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index 4f8ed1953f40..3cdf7b3b0d0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -271,7 +271,7 @@ abstract class HashExpression[E] extends Expression { override def checkInputDataTypes(): TypeCheckResult = { if (children.length < 1) { DataTypeMismatch( - errorSubClass = "WRONG_NUM_PARAMS", + errorSubClass = "WRONG_NUM_ARGS", messageParameters = Map( "functionName" -> toSQLId(prettyName), "expectedNum" -> "> 0", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 3529644aeeac..aab9b0a13c30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -396,7 +396,7 @@ case class JsonTuple(children: Seq[Expression]) override def checkInputDataTypes(): TypeCheckResult = { if (children.length < 2) { DataTypeMismatch( - errorSubClass = "WRONG_NUM_PARAMS", + errorSubClass = "WRONG_NUM_ARGS", messageParameters = Map( "functionName" -> toSQLId(prettyName), "expectedNum" -> "> 1", diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 8ae4bb9c29c0..cc47d739d71a 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -276,7 +276,7 @@ case class Elt( override def checkInputDataTypes(): TypeCheckResult = { if (children.size < 2) { DataTypeMismatch( - errorSubClass = "WRONG_NUM_PARAMS", + errorSubClass = "WRONG_NUM_ARGS", messageParameters = Map( "functionName" -> toSQLId(prettyName), "expectedNum" -> "> 1", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 6665d885554f..3c995573d53d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -509,19 +509,21 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe /** * Split the plan for a scalar subquery into the parts above the innermost query block * (first part of returned value), the HAVING clause of the innermost query block - * (optional second part) and the parts below the HAVING CLAUSE (third part). + * (optional second part) and the Aggregate below the HAVING CLAUSE (optional third part). + * When the third part is empty, it means the subquery is a non-aggregated single-row subquery. */ - private def splitSubquery(plan: LogicalPlan) : (Seq[LogicalPlan], Option[Filter], Aggregate) = { + private def splitSubquery( + plan: LogicalPlan): (Seq[LogicalPlan], Option[Filter], Option[Aggregate]) = { val topPart = ArrayBuffer.empty[LogicalPlan] var bottomPart: LogicalPlan = plan while (true) { bottomPart match { case havingPart @ Filter(_, aggPart: Aggregate) => - return (topPart.toSeq, Option(havingPart), aggPart) + return (topPart.toSeq, Option(havingPart), Some(aggPart)) case aggPart: Aggregate => // No HAVING clause - return (topPart.toSeq, None, aggPart) + return (topPart.toSeq, None, Some(aggPart)) case p @ Project(_, child) => topPart += p @@ -531,6 +533,10 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe topPart += s bottomPart = child + case p: LogicalPlan if p.maxRows.exists(_ <= 1) => + // Non-aggregated one row subquery. + return (topPart.toSeq, None, None) + case Filter(_, op) => throw QueryExecutionErrors.unexpectedOperatorInCorrelatedSubquery(op, " below filter") @@ -561,72 +567,80 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe val origOutput = query.output.head val resultWithZeroTups = evalSubqueryOnZeroTups(query) + lazy val planWithoutCountBug = Project( + currentChild.output :+ origOutput, + Join(currentChild, query, LeftOuter, conditions.reduceOption(And), JoinHint.NONE)) + if (resultWithZeroTups.isEmpty) { // CASE 1: Subquery guaranteed not to have the COUNT bug - Project( - currentChild.output :+ origOutput, - Join(currentChild, query, LeftOuter, conditions.reduceOption(And), JoinHint.NONE)) + planWithoutCountBug } else { - // Subquery might have the COUNT bug. Add appropriate corrections. 
val (topPart, havingNode, aggNode) = splitSubquery(query) - - // The next two cases add a leading column to the outer join input to make it - // possible to distinguish between the case when no tuples join and the case - // when the tuple that joins contains null values. - // The leading column always has the value TRUE. - val alwaysTrueExprId = NamedExpression.newExprId - val alwaysTrueExpr = Alias(Literal.TrueLiteral, - ALWAYS_TRUE_COLNAME)(exprId = alwaysTrueExprId) - val alwaysTrueRef = AttributeReference(ALWAYS_TRUE_COLNAME, - BooleanType)(exprId = alwaysTrueExprId) - - val aggValRef = query.output.head - - if (havingNode.isEmpty) { - // CASE 2: Subquery with no HAVING clause - val subqueryResultExpr = - Alias(If(IsNull(alwaysTrueRef), - resultWithZeroTups.get, - aggValRef), origOutput.name)() - subqueryAttrMapping += ((origOutput, subqueryResultExpr.toAttribute)) - Project( - currentChild.output :+ subqueryResultExpr, - Join(currentChild, - Project(query.output :+ alwaysTrueExpr, query), - LeftOuter, conditions.reduceOption(And), JoinHint.NONE)) - + if (aggNode.isEmpty) { + // SPARK-40862: When the aggregate node is empty, it means the subquery produces + // at most one row and it is not subject to the COUNT bug. + planWithoutCountBug } else { - // CASE 3: Subquery with HAVING clause. Pull the HAVING clause above the join. - // Need to modify any operators below the join to pass through all columns - // referenced in the HAVING clause. - var subqueryRoot: UnaryNode = aggNode - val havingInputs: Seq[NamedExpression] = aggNode.output - - topPart.reverse.foreach { - case Project(projList, _) => - subqueryRoot = Project(projList ++ havingInputs, subqueryRoot) - case s @ SubqueryAlias(alias, _) => - subqueryRoot = SubqueryAlias(alias, subqueryRoot) - case op => throw QueryExecutionErrors.unexpectedOperatorInCorrelatedSubquery(op) + // Subquery might have the COUNT bug. Add appropriate corrections. + val aggregate = aggNode.get + + // The next two cases add a leading column to the outer join input to make it + // possible to distinguish between the case when no tuples join and the case + // when the tuple that joins contains null values. + // The leading column always has the value TRUE. + val alwaysTrueExprId = NamedExpression.newExprId + val alwaysTrueExpr = Alias(Literal.TrueLiteral, + ALWAYS_TRUE_COLNAME)(exprId = alwaysTrueExprId) + val alwaysTrueRef = AttributeReference(ALWAYS_TRUE_COLNAME, + BooleanType)(exprId = alwaysTrueExprId) + + val aggValRef = query.output.head + + if (havingNode.isEmpty) { + // CASE 2: Subquery with no HAVING clause + val subqueryResultExpr = + Alias(If(IsNull(alwaysTrueRef), + resultWithZeroTups.get, + aggValRef), origOutput.name)() + subqueryAttrMapping += ((origOutput, subqueryResultExpr.toAttribute)) + Project( + currentChild.output :+ subqueryResultExpr, + Join(currentChild, + Project(query.output :+ alwaysTrueExpr, query), + LeftOuter, conditions.reduceOption(And), JoinHint.NONE)) + + } else { + // CASE 3: Subquery with HAVING clause. Pull the HAVING clause above the join. + // Need to modify any operators below the join to pass through all columns + // referenced in the HAVING clause. 
+ var subqueryRoot: UnaryNode = aggregate + val havingInputs: Seq[NamedExpression] = aggregate.output + + topPart.reverse.foreach { + case Project(projList, _) => + subqueryRoot = Project(projList ++ havingInputs, subqueryRoot) + case s@SubqueryAlias(alias, _) => + subqueryRoot = SubqueryAlias(alias, subqueryRoot) + case op => throw QueryExecutionErrors.unexpectedOperatorInCorrelatedSubquery(op) + } + + // CASE WHEN alwaysTrue IS NULL THEN resultOnZeroTups + // WHEN NOT (original HAVING clause expr) THEN CAST(null AS ) + // ELSE (aggregate value) END AS (original column name) + val caseExpr = Alias(CaseWhen(Seq( + (IsNull(alwaysTrueRef), resultWithZeroTups.get), + (Not(havingNode.get.condition), Literal.create(null, aggValRef.dataType))), + aggValRef), + origOutput.name)() + + subqueryAttrMapping += ((origOutput, caseExpr.toAttribute)) + + Project( + currentChild.output :+ caseExpr, + Join(currentChild, + Project(subqueryRoot.output :+ alwaysTrueExpr, subqueryRoot), + LeftOuter, conditions.reduceOption(And), JoinHint.NONE)) } - - // CASE WHEN alwaysTrue IS NULL THEN resultOnZeroTups - // WHEN NOT (original HAVING clause expr) THEN CAST(null AS ) - // ELSE (aggregate value) END AS (original column name) - val caseExpr = Alias(CaseWhen(Seq( - (IsNull(alwaysTrueRef), resultWithZeroTups.get), - (Not(havingNode.get.condition), Literal.create(null, aggValRef.dataType))), - aggValRef), - origOutput.name)() - - subqueryAttrMapping += ((origOutput, caseExpr.toAttribute)) - - Project( - currentChild.output :+ caseExpr, - Join(currentChild, - Project(subqueryRoot.output :+ alwaysTrueExpr, subqueryRoot), - LeftOuter, conditions.reduceOption(And), JoinHint.NONE)) - } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index a006687c6ddb..cf7e3524d5bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -978,7 +978,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { } def corruptedViewReferredTempFunctionsInCatalogError(e: Exception): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1088", messageParameters = Map.empty, cause = Some(e)) @@ -1309,19 +1309,19 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { } def tableIsNotRowLevelOperationTableError(table: Table): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1122", messageParameters = Map("table" -> table.name())) } def cannotRenameTableWithAlterViewError(): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1123", messageParameters = Map.empty) } private def notSupportedForV2TablesError(cmd: String): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1124", messageParameters = Map("cmd" -> cmd)) } @@ -1355,25 +1355,25 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { } def databaseFromV1SessionCatalogNotSpecifiedError(): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1125", messageParameters = Map.empty) } def nestedDatabaseUnsupportedByV1SessionCatalogError(catalog: String): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1126", 
messageParameters = Map("catalog" -> catalog)) } def invalidRepartitionExpressionsError(sortOrders: Seq[Any]): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1127", messageParameters = Map("sortOrders" -> sortOrders.toString())) } def partitionColumnNotSpecifiedError(format: String, partitionColumn: String): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1128", messageParameters = Map( "format" -> format, @@ -2145,7 +2145,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { def invalidPatternError(pattern: String, message: String): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1216", - messageParameters = Map("pattern" -> pattern, "message" -> message)) + messageParameters = Map( + "pattern" -> toSQLValue(pattern, StringType), + "message" -> message)) } def tableIdentifierExistsError(tableIdentifier: TableIdentifier): Throwable = { @@ -2305,7 +2307,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { } def analyzeTableNotSupportedOnViewsError(): Throwable = { - new AnalysisException( + new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1236", messageParameters = Map.empty) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index ba78858debc0..41190d3f2f49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1395,7 +1395,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { def failToRecognizePatternError(pattern: String, e: Throwable): SparkRuntimeException = { new SparkRuntimeException( errorClass = "_LEGACY_ERROR_TEMP_2130", - messageParameters = Map("pattern" -> pattern), + messageParameters = Map("pattern" -> toSQLValue(pattern, StringType)), cause = e) } @@ -2686,7 +2686,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map( "parameter" -> "regexp", "functionName" -> toSQLId(funcName), - "expected" -> pattern)) + "expected" -> toSQLValue(pattern, StringType))) } def tooManyArrayElementsError( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala index e3829311e2dc..a7cdd589606c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala @@ -90,7 +90,7 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer exception = intercept[AnalysisException] { assertSuccess(expr) }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", parameters = messageParameters) } @@ -447,7 +447,7 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer exception = intercept[AnalysisException] { assertSuccess(murmur3Hash) }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", parameters = Map( "sqlExpr" -> "\"hash()\"", "functionName" -> toSQLId(murmur3Hash.prettyName), @@ -459,7 +459,7 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite 
with SQLHelper with Quer exception = intercept[AnalysisException] { assertSuccess(xxHash64) }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", parameters = Map( "sqlExpr" -> "\"xxhash64()\"", "functionName" -> toSQLId(xxHash64.prettyName), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RegexpExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RegexpExpressionsSuite.scala index 98a6a9bc19c4..095c2736ae07 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RegexpExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RegexpExpressionsSuite.scala @@ -523,15 +523,15 @@ class RegexpExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkExceptionInExpression[SparkRuntimeException]( RegExpExtract(s, p, r), create_row("1a 2b 14m", "(?l)", 0), - s"$prefix `regexp_extract` is invalid: (?l)") + s"$prefix `regexp_extract` is invalid: '(?l)'") checkExceptionInExpression[SparkRuntimeException]( RegExpExtractAll(s, p, r), create_row("abc", "] [", 0), - s"$prefix `regexp_extract_all` is invalid: ] [") + s"$prefix `regexp_extract_all` is invalid: '] ['") checkExceptionInExpression[SparkRuntimeException]( RegExpInStr(s, p, r), create_row("abc", ", (", 0), - s"$prefix `regexp_instr` is invalid: , (") + s"$prefix `regexp_instr` is invalid: ', ('") } test("RegExpReplace: fails analysis if pos is not a constant") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index 15513037fe1b..f9726c4a6dd5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -1593,7 +1593,7 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val expr1 = Elt(Seq(indexExpr1)) assert(expr1.checkInputDataTypes() == DataTypeMismatch( - errorSubClass = "WRONG_NUM_PARAMS", + errorSubClass = "WRONG_NUM_ARGS", messageParameters = Map( "functionName" -> "`elt`", "expectedNum" -> "> 1", diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 6f111b777a6d..482c72679bb8 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -345,7 +345,7 @@ | org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT weekday('2009-07-30') | struct | | org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT weekofyear('2008-02-20') | struct | | org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct | -| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct | +| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, 
window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct | | org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT xxhash64('Spark', array(123), 2) | struct | | org.apache.spark.sql.catalyst.expressions.Year | year | SELECT year('2016-07-30') | struct | | org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | struct>> | @@ -413,4 +413,4 @@ | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('b1b2b3c1c2','a/b/text()') | structb1b2b3c1c2, a/b/text()):array> | | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('12', 'sum(a/b)') | struct12, sum(a/b)):bigint> | | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('12', 'sum(a/b)') | struct12, sum(a/b)):smallint> | -| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('bcc','a/c') | structbcc, a/c):string> | \ No newline at end of file +| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('bcc','a/c') | structbcc, a/c):string> | diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out index b3c1e94314d3..27ec604cb450 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out @@ -447,7 +447,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'a'", - "pattern" : "m%aca" + "pattern" : "'m%aca'" } } @@ -462,7 +462,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'a'", - "pattern" : "m%aca" + "pattern" : "'m%aca'" } } @@ -477,7 +477,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'a'", - "pattern" : "m%a%%a" + "pattern" : "'m%a%%a'" } } @@ -492,7 +492,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'a'", - "pattern" : "m%a%%a" + "pattern" : "'m%a%%a'" } } @@ -507,7 +507,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'e'", - "pattern" : "b_ear" + "pattern" : "'b_ear'" } } @@ -522,7 +522,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'e'", - "pattern" : "b_ear" + "pattern" : "'b_ear'" } } @@ -537,7 +537,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'e'", - "pattern" : "b_e__r" + "pattern" : "'b_e__r'" } } @@ -552,7 +552,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : 
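The strings.sql.out updates here show the same quoting applied to LIKE ... ESCAPE patterns reported under _LEGACY_ERROR_TEMP_1216. A rough spark-shell style repro (literal values are illustrative, and a SparkSession named spark is assumed):

    // With '%' declared as the escape character, it may not precede a regular
    // character such as 'a'; the "pattern" parameter now carries the quoted form.
    try {
      spark.sql("SELECT 'maca' LIKE 'm%aca' ESCAPE '%'").collect()
    } catch {
      case e: org.apache.spark.sql.AnalysisException =>
        println(e.getMessage)  // message references the quoted pattern 'm%aca'
    }
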
"_LEGACY_ERROR_TEMP_1216", "messageParameters" : { "message" : "the escape character is not allowed to precede 'e'", - "pattern" : "b_e__r" + "pattern" : "'b_e__r'" } } diff --git a/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out index 60094af7a991..3474aba39110 100644 --- a/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out @@ -163,7 +163,7 @@ org.apache.spark.SparkRuntimeException "errorClass" : "INVALID_PARAMETER_VALUE", "sqlState" : "22023", "messageParameters" : { - "expected" : "(?l)", + "expected" : "'(?l)'", "functionName" : "`regexp_extract`", "parameter" : "regexp" } @@ -334,7 +334,7 @@ org.apache.spark.SparkRuntimeException "errorClass" : "INVALID_PARAMETER_VALUE", "sqlState" : "22023", "messageParameters" : { - "expected" : "], [", + "expected" : "'], ['", "functionName" : "`regexp_extract_all`", "parameter" : "regexp" } @@ -671,7 +671,7 @@ org.apache.spark.SparkRuntimeException "errorClass" : "INVALID_PARAMETER_VALUE", "sqlState" : "22023", "messageParameters" : { - "expected" : ") ?", + "expected" : "') ?'", "functionName" : "`regexp_instr`", "parameter" : "regexp" } diff --git a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out index 5e04562a648d..a326e009af4d 100644 --- a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out @@ -894,7 +894,7 @@ org.apache.spark.SparkRuntimeException { "errorClass" : "_LEGACY_ERROR_TEMP_2130", "messageParameters" : { - "pattern" : "yyyy-MM-dd GGGGG" + "pattern" : "'yyyy-MM-dd GGGGG'" } } @@ -908,7 +908,7 @@ org.apache.spark.SparkRuntimeException { "errorClass" : "_LEGACY_ERROR_TEMP_2130", "messageParameters" : { - "pattern" : "dd MM yyyy EEEEEE" + "pattern" : "'dd MM yyyy EEEEEE'" } } @@ -922,7 +922,7 @@ org.apache.spark.SparkRuntimeException { "errorClass" : "_LEGACY_ERROR_TEMP_2130", "messageParameters" : { - "pattern" : "dd MM yyyy EEEEE" + "pattern" : "'dd MM yyyy EEEEE'" } } diff --git a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp.sql.out b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp.sql.out index 8d98209e6254..24273560001d 100644 --- a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp.sql.out @@ -871,7 +871,7 @@ org.apache.spark.SparkRuntimeException { "errorClass" : "_LEGACY_ERROR_TEMP_2130", "messageParameters" : { - "pattern" : "yyyy-MM-dd GGGGG" + "pattern" : "'yyyy-MM-dd GGGGG'" } } @@ -885,7 +885,7 @@ org.apache.spark.SparkRuntimeException { "errorClass" : "_LEGACY_ERROR_TEMP_2130", "messageParameters" : { - "pattern" : "dd MM yyyy EEEEEE" + "pattern" : "'dd MM yyyy EEEEEE'" } } @@ -899,7 +899,7 @@ org.apache.spark.SparkRuntimeException { "errorClass" : "_LEGACY_ERROR_TEMP_2130", "messageParameters" : { - "pattern" : "dd MM yyyy EEEEE" + "pattern" : "'dd MM yyyy EEEEE'" } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 3f02429fe629..3adf17518182 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -4282,7 +4282,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.select(hash()) }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"hash()\"", @@ -4294,7 +4294,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.selectExpr("hash()") }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"hash()\"", @@ -4310,7 +4310,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.select(xxhash64()) }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"xxhash64()\"", @@ -4322,7 +4322,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.selectExpr("xxhash64()") }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"xxhash64()\"", @@ -4338,7 +4338,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.select(greatest()) }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"greatest()\"", @@ -4351,7 +4351,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.selectExpr("greatest()") }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"greatest()\"", @@ -4368,7 +4368,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.select(least()) }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"least()\"", @@ -4381,7 +4381,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { df.selectExpr("least()") }, - errorClass = "DATATYPE_MISMATCH.WRONG_NUM_PARAMS", + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", sqlState = None, parameters = Map( "sqlExpr" -> "\"least()\"", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index f775eb9ecfc0..a878e0ffa51f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -599,23 +599,37 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession { ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25") ).toDF("time") - val e = intercept[AnalysisException] { - df - .withColumn("time2", expr("time - INTERVAL 5 minutes")) - .select( - window($"time", "10 seconds").as("window1"), - window($"time2", "10 seconds").as("window2") - ) - .select( - $"window1.end".cast("string"), - 
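The DataFrameFunctionsSuite changes above follow the rename of the data-type-mismatch subclass from WRONG_NUM_PARAMS to WRONG_NUM_ARGS. One way to observe the new class outside the test helpers (illustrative only; assumes a SparkSession named spark):

    // hash() with zero arguments should now report DATATYPE_MISMATCH.WRONG_NUM_ARGS.
    import org.apache.spark.sql.functions.hash
    try {
      spark.range(1).select(hash()).collect()
    } catch {
      case e: org.apache.spark.sql.AnalysisException =>
        println(e.getErrorClass)  // expected: DATATYPE_MISMATCH.WRONG_NUM_ARGS
    }
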
window_time($"window1").cast("string"), - $"window2.end".cast("string"), - window_time($"window2").cast("string") - ) - } - assert(e.getMessage.contains( - "Multiple time/session window expressions would result in a cartesian product of rows, " + - "therefore they are currently not supported")) + val df2 = df + .withColumn("time2", expr("time - INTERVAL 15 minutes")) + .select(window($"time", "10 seconds").as("window1"), $"time2") + .select($"window1", window($"time2", "10 seconds").as("window2")) + + checkAnswer( + df2.select( + $"window1.end".cast("string"), + window_time($"window1").cast("string"), + $"window2.end".cast("string"), + window_time($"window2").cast("string")), + Seq( + Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999", + "2016-03-27 19:23:20", "2016-03-27 19:23:19.999999"), + Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999", + "2016-03-27 19:24:30", "2016-03-27 19:24:29.999999")) + ) + + // check column names + val df3 = df2 + .select( + window_time($"window1").cast("string"), + window_time($"window2").cast("string"), + window_time($"window2").as("wt2_aliased").cast("string") + ) + + val schema = df3.schema + + assert(schema.fields.exists(_.name == "window_time(window1)")) + assert(schema.fields.exists(_.name == "window_time(window2)")) + assert(schema.fields.exists(_.name == "wt2_aliased")) } test("window_time function on agg output") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index dd3ad0f4d6bd..e9aeba9c8206 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3709,7 +3709,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark }, errorClass = "_LEGACY_ERROR_TEMP_1216", parameters = Map( - "pattern" -> "m%@ca", + "pattern" -> "'m%@ca'", "message" -> "the escape character is not allowed to precede '@'")) checkAnswer(sql("SELECT s LIKE 'm@@ca' ESCAPE '@' FROM df"), Row(true)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 4b5863563677..7b67648d4752 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2491,4 +2491,21 @@ class SubquerySuite extends QueryTest Row("a")) } } + + test("SPARK-40862: correlated one-row subquery with non-deterministic expressions") { + import org.apache.spark.sql.functions.udf + withTempView("t1") { + sql("CREATE TEMP VIEW t1 AS SELECT ARRAY('a', 'b') a") + val func = udf(() => "a") + spark.udf.register("func", func.asNondeterministic()) + checkAnswer(sql( + """ + |SELECT ( + | SELECT array_sort(a, (i, j) -> rank[i] - rank[j])[0] || str AS sorted + | FROM (SELECT MAP('a', 1, 'b', 2) rank, func() AS str) + |) FROM t1 + |""".stripMargin), + Row("aa")) + } + } }