diff --git a/build.sbt b/build.sbt index 004f9fe972c..508cc4b5dfc 100644 --- a/build.sbt +++ b/build.sbt @@ -34,8 +34,13 @@ val all_scala_versions = Seq(scala212, scala213) val default_scala_version = settingKey[String]("Default Scala version") Global / default_scala_version := scala212 +val LATEST_RELEASED_SPARK_VERSION = "3.5.0" +val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT" +val sparkVersion = settingKey[String]("Spark version") +spark / sparkVersion := getSparkVersion() + // Dependent library versions -val sparkVersion = "3.5.0" +val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION val flinkVersion = "1.16.1" val hadoopVersion = "3.3.4" val scalaTestVersion = "3.2.15" @@ -62,6 +67,37 @@ crossScalaVersions := Nil val targetJvm = settingKey[String]("Target JVM version") Global / targetJvm := "1.8" +/** + * Returns the current spark version, which is the same value as `sparkVersion.value`. + * + * This logic exists in a separate method because some call sites cannot access `sparkVersion.value` + * e.g. callers that are not inside tasks or setting macros. + */ +def getSparkVersion(): String = { + val latestReleasedSparkVersionShort = getMajorMinorPatch(LATEST_RELEASED_SPARK_VERSION) match { + case (maj, min, _) => s"$maj.$min" + } + val allValidSparkVersionInputs = Seq( + "master", + "latest", + SPARK_MASTER_VERSION, + LATEST_RELEASED_SPARK_VERSION, + latestReleasedSparkVersionShort + ) + + // e.g. build/sbt -DsparkVersion=master, build/sbt -DsparkVersion=4.0.0-SNAPSHOT + val input = sys.props.getOrElse("sparkVersion", LATEST_RELEASED_SPARK_VERSION) + input match { + case LATEST_RELEASED_SPARK_VERSION | "latest" | `latestReleasedSparkVersionShort` => + LATEST_RELEASED_SPARK_VERSION + case SPARK_MASTER_VERSION | "master" => + SPARK_MASTER_VERSION + case _ => + throw new IllegalArgumentException(s"Invalid sparkVersion: $input. 
Must be one of " + + s"${allValidSparkVersionInputs.mkString("{", ",", "}")}") + } +} + lazy val commonSettings = Seq( organization := "io.delta", scalaVersion := default_scala_version.value, @@ -90,6 +126,58 @@ lazy val commonSettings = Seq( unidocSourceFilePatterns := Nil, ) +/** + * Note: we cannot access sparkVersion.value here, since that can only be used within a task or + * setting macro. + */ +def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match { + case LATEST_RELEASED_SPARK_VERSION => Seq( + scalaVersion := default_scala_version.value, + crossScalaVersions := all_scala_versions, + targetJvm := "1.8", + // For adding staged Spark RC versions, e.g.: + // resolvers += "Apache Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/", + Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5", + Antlr4 / antlr4Version := "4.9.3", + + // Java-/Scala-/Uni-Doc Settings + scalacOptions ++= Seq( + "-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc + ), + unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/")) + ) + + case SPARK_MASTER_VERSION => Seq( + scalaVersion := scala213, + crossScalaVersions := Seq(scala213), + targetJvm := "17", + resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/", + Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-master", + Antlr4 / antlr4Version := "4.13.1", + Test / javaOptions ++= Seq( + // Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153) + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + 
"--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", + "--add-opens=java.base/sun.security.action=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" + ) + + // Java-/Scala-/Uni-Doc Settings + // This isn't working yet against Spark Master. + // 1) delta-spark on Spark Master uses JDK 17. delta-iceberg uses JDK 8 or 11. For some reason, + // generating delta-spark unidoc compiles delta-iceberg + // 2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg + // issue above. + ) +} + lazy val spark = (project in file("spark")) .dependsOn(storage) .enablePlugins(Antlr4Plugin) @@ -99,29 +187,26 @@ lazy val spark = (project in file("spark")) scalaStyleSettings, sparkMimaSettings, releaseSettings, + crossSparkSettings(), libraryDependencies ++= Seq( // Adding test classifier seems to break transitive resolution of the core dependencies - "org.apache.spark" %% "spark-hive" % sparkVersion % "provided", - "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", - "org.apache.spark" %% "spark-core" % sparkVersion % "provided", - "org.apache.spark" %% "spark-catalyst" % sparkVersion % "provided", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", // Test deps "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", "junit" % "junit" % "4.12" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", - "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", 
- "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", ), - // For adding staged Spark RC versions, Ex: - // resolvers += "Apche Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/", Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++ listPythonFiles(baseDirectory.value.getParentFile / "python"), - - Antlr4 / antlr4Version:= "4.9.3", Antlr4 / antlr4PackageName := Some("io.delta.sql.parser"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, @@ -132,10 +217,6 @@ lazy val spark = (project in file("spark")) // Don't execute in parallel since we can't have multiple Sparks in the same JVM Test / parallelExecution := false, - scalacOptions ++= Seq( - "-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc - ), - javaOptions += "-Xmx1024m", // Configurations to speed up tests and reduce memory footprint @@ -172,11 +253,11 @@ lazy val spark = (project in file("spark")) Seq(file) }, TestParallelization.settings, - - // Unidoc settings - unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/")), ) - .configureUnidoc(generateScalaDoc = true) + .configureUnidoc( + generatedJavaDoc = getSparkVersion() == LATEST_RELEASED_SPARK_VERSION, + generateScalaDoc = getSparkVersion() == LATEST_RELEASED_SPARK_VERSION + ) lazy val contribs = (project in file("contribs")) .dependsOn(spark % "compile->compile;test->test;provided->provided") @@ -225,7 +306,7 @@ lazy val sharing 
= (project in file("sharing")) releaseSettings, Test / javaOptions ++= Seq("-ea"), libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", + "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "provided", "io.delta" %% "delta-sharing-client" % "1.0.4", @@ -234,10 +315,10 @@ lazy val sharing = (project in file("sharing")) "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", "junit" % "junit" % "4.12" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", - "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests", ) ).configureUnidoc() @@ -290,10 +371,10 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) "org.openjdk.jmh" % "jmh-core" % "1.37" % "test", "org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test", - "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % defaultSparkVersion % 
"test" classifier "tests", + "org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests", ), javaCheckstyleSettings("kernel/dev/checkstyle.xml"), // Unidoc settings @@ -346,7 +427,7 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb")) ).configureUnidoc() val icebergSparkRuntimeArtifactName = { - val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion) + val (expMaj, expMin, _) = getMajorMinorPatch(defaultSparkVersion) s"iceberg-spark-runtime-$expMaj.$expMin" } @@ -362,7 +443,7 @@ lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar")) libraryDependencies ++= Seq( "org.apache.hadoop" % "hadoop-client" % hadoopVersion, "org.scalatest" %% "scalatest" % scalaTestVersion % "test", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" + "org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" ) ) @@ -496,7 +577,7 @@ lazy val hudi = (project in file("hudi")) ExclusionRule(organization = "org.apache.hadoop"), ExclusionRule(organization = "org.apache.zookeeper"), ), - "org.apache.spark" %% "spark-avro" % sparkVersion % "test" excludeAll ExclusionRule(organization = "org.apache.hadoop"), + "org.apache.spark" %% "spark-avro" % defaultSparkVersion % "test" excludeAll ExclusionRule(organization = "org.apache.hadoop"), "org.apache.parquet" % "parquet-avro" % "1.12.3" % "compile" ), assembly / assemblyJarName := s"${name.value}-assembly_${scalaBinaryVersion.value}-${version.value}.jar", @@ -974,10 +1055,10 @@ lazy val compatibility = (project in file("connectors/oss-compatibility-tests")) "io.netty" % "netty-buffer" % "4.1.63.Final" % "test", "org.scalatest" %% "scalatest" % "3.1.0" % "test", "commons-io" % "commons-io" % "2.8.0" % "test", - "org.apache.spark" %% "spark-sql" % sparkVersion % "test", - "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-sql" 
% sparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test", + "org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests", ) ) */ @@ -992,10 +1073,10 @@ lazy val goldenTables = (project in file("connectors/golden-tables")) // Test Dependencies "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "commons-io" % "commons-io" % "2.8.0" % "test", - "org.apache.spark" %% "spark-sql" % sparkVersion % "test", - "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests" + "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test", + "org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests" ) ) @@ -1018,13 +1099,13 @@ lazy val sqlDeltaImport = (project in file("connectors/sql-delta-import")) Test / publishArtifact := false, libraryDependencies ++= Seq( "io.netty" % "netty-buffer" % "4.1.63.Final" % "test", - "org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "provided", + "org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "provided", "org.rogach" %% "scallop" % "3.5.1", "org.scalatest" %% "scalatest" % scalaTestVersionForConnectors % "test", "com.h2database" % "h2" % "1.4.200" % "test", - "org.apache.spark" % ("spark-catalyst_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "test", - 
"org.apache.spark" % ("spark-core_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "test", - "org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "test" + "org.apache.spark" % ("spark-catalyst_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "test", + "org.apache.spark" % ("spark-core_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "test", + "org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "test" ) ) diff --git a/project/Unidoc.scala b/project/Unidoc.scala index 3c30db1498e..ed050c30f95 100644 --- a/project/Unidoc.scala +++ b/project/Unidoc.scala @@ -45,8 +45,10 @@ object Unidoc { implicit class UnidocHelper(val projectToUpdate: Project) { def configureUnidoc( docTitle: String = null, + generatedJavaDoc: Boolean = true, generateScalaDoc: Boolean = false ): Project = { + if (!generatedJavaDoc && !generateScalaDoc) return projectToUpdate var updatedProject: Project = projectToUpdate if (generateScalaDoc) { diff --git a/spark/src/main/scala-spark-3.5/shims/ColumnDefinitionShim.scala b/spark/src/main/scala-spark-3.5/shims/ColumnDefinitionShim.scala new file mode 100644 index 00000000000..f9b99076a6b --- /dev/null +++ b/spark/src/main/scala-spark-3.5/shims/ColumnDefinitionShim.scala @@ -0,0 +1,33 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.shims + +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.types.{StructField, StructType} + +object ColumnDefinitionShim { + + /** + * Helps handle a breaking change in [[org.apache.spark.sql.catalyst.plans.logical.CreateTable]] + * between Spark 3.5 and Spark 4.0: + * - In 3.5, `CreateTable` accepts a `tableSchema: StructType`. + * - In 4.0, `CreateTable` accepts a `columns: Seq[ColumnDefinition]`. + */ + def parseColumns(columns: Seq[StructField], sqlParser: ParserInterface): StructType = { + StructType(columns.toSeq) + } +} diff --git a/spark/src/main/scala-spark-3.5/shims/IncrementalExecutionShim.scala b/spark/src/main/scala-spark-3.5/shims/IncrementalExecutionShim.scala new file mode 100644 index 00000000000..311f7afa6b9 --- /dev/null +++ b/spark/src/main/scala-spark-3.5/shims/IncrementalExecutionShim.scala @@ -0,0 +1,46 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.streaming.IncrementalExecution + +object IncrementalExecutionShim { + + /** + * Handles a breaking change in the [[IncrementalExecution]] constructor between Spark 3.5 and + * 4.0: + * - Spark 3.5: no `isFirstBatch: Boolean` param + * - Spark 4.0: adds `isFirstBatch: Boolean` param + */ + def newInstance( + sparkSession: SparkSession, + logicalPlan: LogicalPlan, + incrementalExecution: IncrementalExecution): IncrementalExecution = new IncrementalExecution( + sparkSession, + logicalPlan, + incrementalExecution.outputMode, + incrementalExecution.checkpointLocation, + incrementalExecution.queryId, + incrementalExecution.runId, + incrementalExecution.currentBatchId, + incrementalExecution.prevOffsetSeqMetadata, + incrementalExecution.offsetSeqMetadata, + incrementalExecution.watermarkPropagator + ) +} diff --git a/spark/src/main/scala-spark-3.5/shims/UnresolvedTableShim.scala b/spark/src/main/scala-spark-3.5/shims/UnresolvedTableShim.scala new file mode 100644 index 00000000000..a532ca4f68c --- /dev/null +++ b/spark/src/main/scala-spark-3.5/shims/UnresolvedTableShim.scala @@ -0,0 +1,35 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 + */ + +package org.apache.spark.sql.delta.shims + +import org.apache.spark.sql.catalyst.analysis.UnresolvedTable + +object UnresolvedTableImplicits { + + /** + * Handles a breaking change in [[UnresolvedTable]] constructor between Spark 3.5 and 4.0: + * - Spark 3.5: requires `relationTypeMismatchHint` param + * - Spark 4.0: gets rid of `relationTypeMismatchHint` param + */ + implicit class UnresolvedTableShim(self: UnresolvedTable.type) { + def apply( + tableNameParts: Seq[String], + commandName: String): UnresolvedTable = { + UnresolvedTable(tableNameParts, commandName, relationTypeMismatchHint = None) + } + } +} diff --git a/spark/src/main/scala-spark-master/shims/ColumnDefinitionShim.scala b/spark/src/main/scala-spark-master/shims/ColumnDefinitionShim.scala new file mode 100644 index 00000000000..084ee0d8c28 --- /dev/null +++ b/spark/src/main/scala-spark-master/shims/ColumnDefinitionShim.scala @@ -0,0 +1,34 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.shims + +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.ColumnDefinition +import org.apache.spark.sql.types.StructField + +object ColumnDefinitionShim { + + /** + * Helps handle a breaking change in [[org.apache.spark.sql.catalyst.plans.logical.CreateTable]] + * between Spark 3.5 and Spark 4.0: + * - In 3.5, `CreateTable` accepts a `tableSchema: StructType`. + * - In 4.0, `CreateTable` accepts a `columns: Seq[ColumnDefinition]`. + */ + def parseColumns(columns: Seq[StructField], sqlParser: ParserInterface): Seq[ColumnDefinition] = { + columns.map(ColumnDefinition.fromV1Column(_, sqlParser)).toSeq + } +} diff --git a/spark/src/main/scala-spark-master/shims/IncrementalExecutionShim.scala b/spark/src/main/scala-spark-master/shims/IncrementalExecutionShim.scala new file mode 100644 index 00000000000..912db76bde9 --- /dev/null +++ b/spark/src/main/scala-spark-master/shims/IncrementalExecutionShim.scala @@ -0,0 +1,47 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.streaming.IncrementalExecution + +object IncrementalExecutionShim { + + /** + * Handles a breaking change in the [[IncrementalExecution]] constructor between Spark 3.5 and + * 4.0: + * - Spark 3.5: no `isFirstBatch: Boolean` param + * - Spark 4.0: adds `isFirstBatch: Boolean` param + */ + def newInstance( + sparkSession: SparkSession, + logicalPlan: LogicalPlan, + incrementalExecution: IncrementalExecution): IncrementalExecution = new IncrementalExecution( + sparkSession, + logicalPlan, + incrementalExecution.outputMode, + incrementalExecution.checkpointLocation, + incrementalExecution.queryId, + incrementalExecution.runId, + incrementalExecution.currentBatchId, + incrementalExecution.prevOffsetSeqMetadata, + incrementalExecution.offsetSeqMetadata, + incrementalExecution.watermarkPropagator, + incrementalExecution.isFirstBatch // Spark 4.0 API + ) +} diff --git a/spark/src/main/scala-spark-master/shims/UnresolvedTableShim.scala b/spark/src/main/scala-spark-master/shims/UnresolvedTableShim.scala new file mode 100644 index 00000000000..e28e81ace47 --- /dev/null +++ b/spark/src/main/scala-spark-master/shims/UnresolvedTableShim.scala @@ -0,0 +1,35 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 + */ + +package org.apache.spark.sql.delta.shims + +import org.apache.spark.sql.catalyst.analysis.UnresolvedTable + +object UnresolvedTableImplicits { + + /** + * Handles a breaking change in [[UnresolvedTable]] constructor between Spark 3.5 and 4.0: + * - Spark 3.5: requires `relationTypeMismatchHint` param + * - Spark 4.0: gets rid of `relationTypeMismatchHint` param + */ + implicit class UnresolvedTableShim(self: UnresolvedTable.type) { + def apply( + tableNameParts: Seq[String], + commandName: String): UnresolvedTable = { + UnresolvedTable(tableNameParts, commandName) + } + } +} diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index d4ece43b8ce..8708e9de14f 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -44,6 +44,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.TimeTravel import org.apache.spark.sql.delta.skipping.clustering.temp.{AlterTableClusterBy, ClusterByParserUtils, ClusterByPlan, ClusterBySpec} +import org.apache.spark.sql.delta.shims.UnresolvedTableImplicits._ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.commands._ @@ -390,7 +391,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { val targetIdentifier = visitTableIdentifier(ctx.table) val tableNameParts = targetIdentifier.database.toSeq :+ targetIdentifier.table - val targetTable = createUnresolvedTable(tableNameParts, "REORG") + val targetTable = UnresolvedTable(tableNameParts, "REORG") val reorgTableSpec = if (ctx.PURGE != null) { DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) @@ -519,12 +520,6 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { builder.build()) } - private def createUnresolvedTable( - tableName: Seq[String], - commandName: String): UnresolvedTable = { - UnresolvedTable(tableName, commandName, 
relationTypeMismatchHint = None) - } - // Build the text of the CHECK constraint expression. The user-specified whitespace is in the // HIDDEN channel where we can't get to it, so we just paste together all the tokens with a single // space. This produces some strange spacing (e.g. `structCol . arr [ 0 ]`), but right now we @@ -546,7 +541,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { val checkConstraint = ctx.constraint().asInstanceOf[CheckConstraintContext] AlterTableAddConstraint( - createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, + UnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, "ALTER TABLE ... ADD CONSTRAINT"), ctx.name.getText, buildCheckConstraintText(checkConstraint.exprToken().asScala.toSeq)) @@ -555,7 +550,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { override def visitDropTableConstraint( ctx: DropTableConstraintContext): LogicalPlan = withOrigin(ctx) { AlterTableDropConstraint( - createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, + UnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, "ALTER TABLE ... DROP CONSTRAINT"), ctx.name.getText, ifExists = ctx.EXISTS != null) @@ -579,7 +574,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { override def visitAlterTableDropFeature(ctx: AlterTableDropFeatureContext): LogicalPlan = { val truncateHistory = ctx.TRUNCATE != null && ctx.HISTORY != null AlterTableDropFeature( - createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, + UnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, "ALTER TABLE ... 
DROP FEATURE"), visitFeatureNameValue(ctx.featureName), truncateHistory) @@ -590,7 +585,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { */ override def visitAlterTableClusterBy(ctx: AlterTableClusterByContext): LogicalPlan = { val table = - createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, + UnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq, "ALTER TABLE ... CLUSTER BY") if (ctx.NONE() != null) { AlterTableClusterBy(table, None) diff --git a/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala b/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala index 460049adf4a..82e1f3664b1 100644 --- a/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala +++ b/spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala @@ -20,13 +20,14 @@ import scala.collection.mutable import org.apache.spark.sql.delta.{DeltaErrors, DeltaTableUtils} import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession +import org.apache.spark.sql.delta.shims.ColumnDefinitionShim import org.apache.spark.sql.delta.sources.DeltaSQLConf import io.delta.tables.execution._ import org.apache.spark.annotation._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan, ReplaceTable} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, ReplaceTable} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.SQLExecution @@ -341,7 +342,8 @@ class DeltaTableBuilder private[tables]( val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table) CreateTable( unresolvedTable, - StructType(columns.toSeq), + // Callout: Spark 3.5 returns StructType, Spark 4.0 returns Seq[ColumnDefinition] + ColumnDefinitionShim.parseColumns(columns.toSeq, spark.sessionState.sqlParser), 
partitioning, tableSpec, ifNotExists) @@ -349,7 +351,8 @@ class DeltaTableBuilder private[tables]( val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table) ReplaceTable( unresolvedTable, - StructType(columns.toSeq), + // Callout: Spark 3.5 returns StructType, Spark 4.0 returns Seq[ColumnDefinition] + ColumnDefinitionShim.parseColumns(columns.toSeq, spark.sessionState.sqlParser), partitioning, tableSpec, orCreate) diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 6f12986c676..490a1238d7a 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, import org.apache.spark.sql.delta.commands.DeltaCommand import org.apache.spark.sql.delta.commands.VacuumCommand import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable +import org.apache.spark.sql.delta.shims.UnresolvedTableImplicits._ import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand} import org.apache.spark.sql.types.StringType @@ -76,8 +77,7 @@ object VacuumTableCommand { horizonHours: Option[Double], dryRun: Boolean): VacuumTableCommand = { val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM") - val unresolvedInventoryTable = inventoryTable.map(rt => - UnresolvedTable(rt.nameParts, "VACUUM", relationTypeMismatchHint = None)) + val unresolvedInventoryTable = inventoryTable.map(rt => UnresolvedTable(rt.nameParts, "VACUUM")) VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala index 
e47abb3d19e..6fd4c78b924 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.constraints.{Constraint, Constraints} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.sql.delta.shims.IncrementalExecutionShim import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder} @@ -216,18 +217,10 @@ object ColumnWithDefaultExprUtils extends DeltaLogging { df: DataFrame, cols: Column*): DataFrame = { val newMicroBatch = df.select(cols: _*) - val newIncrementalExecution = new IncrementalExecution( + val newIncrementalExecution = IncrementalExecutionShim.newInstance( newMicroBatch.sparkSession, newMicroBatch.queryExecution.logical, - incrementalExecution.outputMode, - incrementalExecution.checkpointLocation, - incrementalExecution.queryId, - incrementalExecution.runId, - incrementalExecution.currentBatchId, - incrementalExecution.prevOffsetSeqMetadata, - incrementalExecution.offsetSeqMetadata, - incrementalExecution.watermarkPropagator - ) + incrementalExecution) newIncrementalExecution.executedPlan // Force the lazy generation of execution plan diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala index e0616efbe18..2c35f23f5a7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala @@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try} import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging +import 
org.apache.spark.sql.delta.shims.UnresolvedTableImplicits._ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.hadoop.fs.{FileSystem, Path} @@ -586,8 +587,7 @@ object UnresolvedDeltaPathOrIdentifier { cmd: String): LogicalPlan = { (path, tableIdentifier) match { case (Some(p), None) => UnresolvedPathBasedDeltaTable(p, Map.empty, cmd) - case (None, Some(t)) => - UnresolvedTable(t.nameParts, cmd, None) + case (None, Some(t)) => UnresolvedTable(t.nameParts, cmd) case _ => throw new IllegalArgumentException( s"Exactly one of path or tableIdentifier must be provided to $cmd") } @@ -608,8 +608,7 @@ object UnresolvedPathOrIdentifier { tableIdentifier: Option[TableIdentifier], cmd: String): LogicalPlan = { (path, tableIdentifier) match { - case (_, Some(t)) => - UnresolvedTable(t.nameParts, cmd, None) + case (_, Some(t)) => UnresolvedTable(t.nameParts, cmd) case (Some(p), None) => UnresolvedPathBasedTable(p, Map.empty, cmd) case _ => throw new IllegalArgumentException( s"At least one of path or tableIdentifier must be provided to $cmd") diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index e364a65ae15..d007f1b5e8b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.commands.WriteIntoDelta import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.shims.UnresolvedTableImplicits._ import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.hadoop.fs.Path @@ -329,7 +330,7 @@ object DeltaTableV2 { /** Resolves a table identifier into a 
DeltaTableV2, leveraging standard v2 table resolution. */ def apply(spark: SparkSession, tableId: TableIdentifier, cmd: String): DeltaTableV2 = { - resolve(spark, UnresolvedTable(tableId.nameParts, cmd, None), cmd) + resolve(spark, UnresolvedTable(tableId.nameParts, cmd), cmd) } /** Applies standard v2 table resolution to an unresolved Delta table plan node */ diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 6d60358e6d7..94ae2c4192d 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.CloneTableSQLTestUtils import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN import org.apache.spark.sql.delta.{UnresolvedPathBasedDeltaTable, UnresolvedPathBasedTable} import org.apache.spark.sql.delta.commands.{DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable} +import org.apache.spark.sql.delta.shims.UnresolvedTableImplicits._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable} @@ -40,27 +41,23 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`. 
val parser = new DeltaSqlParser(null) assert(parser.parsePlan("vacuum 123_") === - VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, None, None, false)) + VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum 1a.123_") === - VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), - None, None, None, false)) + VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123A") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), - None, None, None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123E3_column") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123D_column") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM"), None, None, None, false)) assert(parser.parsePlan("vacuum a.123BD_column") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM"), None, None, None, false)) - assert(parser.parsePlan("vacuum delta.`/tmp/table`") === - VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None), + VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM"), None, None, None, false)) - assert(parser.parsePlan("vacuum \"/tmp/table\"") === VacuumTableCommand( UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, None, None, false)) @@ -91,19 +88,19 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("tbl")), 
Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("tbl"), "OPTIMIZE")) parsedCmd = parser.parsePlan("OPTIMIZE db.tbl") assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("tbl", "db")), Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("db", "tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("db", "tbl"), "OPTIMIZE")) parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl") assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("tbl", "db", "catalog_foo")), Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE")) assert(parser.parsePlan("OPTIMIZE tbl_${system:spark.testing}") === OptimizeTableCommand(None, Some(tblId("tbl_true")), Nil)(Nil)) @@ -135,7 +132,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { assert(parsedCmd === OptimizeTableCommand(None, Some(tblId("/path/to/tbl", "delta")), Nil)(Nil)) assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === - UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE", None)) + UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE")) assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1") === OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(Nil)) @@ -181,7 +178,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Desc detail on a table assert(parser.parsePlan("DESCRIBE DETAIL catalog_foo.db.tbl") === DescribeDeltaDetailCommand( - UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaDetailCommand.CMD_NAME, None), + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaDetailCommand.CMD_NAME), Map.empty)) // Desc detail on a raw path @@ -193,7 +190,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Desc detail on a delta raw 
path assert(parser.parsePlan("DESCRIBE DETAIL delta.`dummy_raw_path`") === DescribeDeltaDetailCommand( - UnresolvedTable(Seq("delta", "dummy_raw_path"), DescribeDeltaDetailCommand.CMD_NAME, None), + UnresolvedTable(Seq("delta", "dummy_raw_path"), DescribeDeltaDetailCommand.CMD_NAME), Map.empty)) } @@ -201,17 +198,17 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { val parser = new DeltaSqlParser(null) var parsedCmd = parser.parsePlan("DESCRIBE HISTORY catalog_foo.db.tbl") assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child === - UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaHistory.COMMAND_NAME, None)) + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaHistory.COMMAND_NAME)) parsedCmd = parser.parsePlan("DESCRIBE HISTORY delta.`/path/to/tbl`") assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child === - UnresolvedTable(Seq("delta", "/path/to/tbl"), DescribeDeltaHistory.COMMAND_NAME, None)) + UnresolvedTable(Seq("delta", "/path/to/tbl"), DescribeDeltaHistory.COMMAND_NAME)) parsedCmd = parser.parsePlan("DESCRIBE HISTORY '/path/to/tbl'") assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child === UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, DescribeDeltaHistory.COMMAND_NAME)) } private def targetPlanForTable(tableParts: String*): UnresolvedTable = - UnresolvedTable(tableParts.toSeq, "REORG", relationTypeMismatchHint = None) + UnresolvedTable(tableParts.toSeq, "REORG") test("REORG command is parsed as expected") { val parser = new DeltaSqlParser(null) @@ -362,7 +359,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { val parsedCmd = parser.parsePlan(sql) assert(parsedCmd === AlterTableDropFeature( - UnresolvedTable(Seq(table), "ALTER TABLE ... DROP FEATURE", None), + UnresolvedTable(Seq(table), "ALTER TABLE ... DROP FEATURE"), featureName, truncateHistory)) }