171 changes: 126 additions & 45 deletions build.sbt
@@ -34,8 +34,13 @@ val all_scala_versions = Seq(scala212, scala213)
val default_scala_version = settingKey[String]("Default Scala version")
Global / default_scala_version := scala212

val LATEST_RELEASED_SPARK_VERSION = "3.5.0"
val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT"
val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()

// Dependent library versions
val sparkVersion = "3.5.0"
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
val flinkVersion = "1.16.1"
val hadoopVersion = "3.3.4"
val scalaTestVersion = "3.2.15"
@@ -62,6 +67,37 @@ crossScalaVersions := Nil
val targetJvm = settingKey[String]("Target JVM version")
Global / targetJvm := "1.8"

/**
* Returns the current Spark version, which is the same value as `sparkVersion.value`.
*
* This logic lives in a separate method because some call sites cannot access `sparkVersion.value`,
* e.g. callers that are not inside a task or setting macro.
*/
def getSparkVersion(): String = {
val latestReleasedSparkVersionShort = getMajorMinorPatch(LATEST_RELEASED_SPARK_VERSION) match {
case (maj, min, _) => s"$maj.$min"
}
val allValidSparkVersionInputs = Seq(
"master",
"latest",
SPARK_MASTER_VERSION,
LATEST_RELEASED_SPARK_VERSION,
latestReleasedSparkVersionShort
)

// e.g. build/sbt -DsparkVersion=master, build/sbt -DsparkVersion=4.0.0-SNAPSHOT
val input = sys.props.getOrElse("sparkVersion", LATEST_RELEASED_SPARK_VERSION)
input match {
case LATEST_RELEASED_SPARK_VERSION | "latest" | `latestReleasedSparkVersionShort` =>
LATEST_RELEASED_SPARK_VERSION
case SPARK_MASTER_VERSION | "master" =>
SPARK_MASTER_VERSION
case _ =>
throw new IllegalArgumentException(s"Invalid sparkVersion: $input. Must be one of " +
s"${allValidSparkVersionInputs.mkString("{", ",", "}")}")
}
}
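
Aside: `getMajorMinorPatch` is defined elsewhere in this build file (it is also used for `icebergSparkRuntimeArtifactName` further down). A minimal sketch of its assumed behavior, for illustration only; the real helper may differ in signature and return types:

    // Split e.g. "3.5.0" or "4.0.0-SNAPSHOT" into (major, minor, patch),
    // dropping any "-SNAPSHOT"-style suffix first.
    def getMajorMinorPatch(version: String): (String, String, String) = {
      version.split('-').head.split('.') match {
        case Array(major, minor, patch) => (major, minor, patch)
        case _ => throw new IllegalArgumentException(s"Could not parse version: $version")
      }
    }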

lazy val commonSettings = Seq(
organization := "io.delta",
scalaVersion := default_scala_version.value,
@@ -90,6 +126,58 @@ lazy val commonSettings = Seq(
unidocSourceFilePatterns := Nil,
)

/**
* Note: we cannot access sparkVersion.value here, since that can only be used within a task or
* setting macro.
*/
def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
case LATEST_RELEASED_SPARK_VERSION => Seq(
scalaVersion := default_scala_version.value,
crossScalaVersions := all_scala_versions,
targetJvm := "1.8",
// For adding staged Spark RC versions, e.g.:
// resolvers += "Apache Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/",
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
Antlr4 / antlr4Version := "4.9.3",

// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

case SPARK_MASTER_VERSION => Seq(
scalaVersion := scala213,
crossScalaVersions := Seq(scala213),
targetJvm := "17",
resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-master",
Antlr4 / antlr4Version := "4.13.1",
Test / javaOptions ++= Seq(
// Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153)
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.base/java.io=ALL-UNNAMED",
"--add-opens=java.base/java.net=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED",
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
)

// Java-/Scala-/Uni-Doc Settings
// This doesn't work yet against Spark Master:
// 1) delta-spark on Spark Master uses JDK 17, while delta-iceberg uses JDK 8 or 11. For some
//    reason, generating delta-spark unidoc also compiles delta-iceberg.
// 2) delta-spark unidoc fails to compile: Spark 3.5 ends up on its classpath, likely due to
//    the delta-iceberg issue above.
)
}
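
The settings above take effect only where `crossSparkSettings()` is mixed in, as the `spark` project does below. A hypothetical sketch (`exampleModule` is not a real project in this build) of how a module would opt in and resolve Spark against whichever version `-DsparkVersion=...` selects, 3.5.0 by default:

    lazy val exampleModule = (project in file("example-module"))
      .settings(
        commonSettings,
        crossSparkSettings(),
        // Mirror `spark / sparkVersion` above so `.value` lookups work inside this module.
        sparkVersion := getSparkVersion(),
        libraryDependencies ++= Seq(
          "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided"
        )
      )

With this in place, `build/sbt -DsparkVersion=master exampleModule/Test/compile` would pick up the Scala 2.13, JVM 17, and `scala-spark-master` settings above, while the default invocation builds against Spark 3.5.0.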

lazy val spark = (project in file("spark"))
.dependsOn(storage)
.enablePlugins(Antlr4Plugin)
@@ -99,29 +187,26 @@ lazy val spark = (project in file("spark"))
scalaStyleSettings,
sparkMimaSettings,
releaseSettings,
crossSparkSettings(),
libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",

// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
"junit" % "junit" % "4.12" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
),
// For adding staged Spark RC versions, e.g.:
// resolvers += "Apache Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/",
Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++
listPythonFiles(baseDirectory.value.getParentFile / "python"),

Antlr4 / antlr4Version := "4.9.3",
Antlr4 / antlr4PackageName := Some("io.delta.sql.parser"),
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
@@ -132,10 +217,6 @@ lazy val spark = (project in file("spark"))
// Don't execute in parallel since we can't have multiple Sparks in the same JVM
Test / parallelExecution := false,

scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),

javaOptions += "-Xmx1024m",

// Configurations to speed up tests and reduce memory footprint
@@ -172,11 +253,11 @@ lazy val spark = (project in file("spark"))
Seq(file)
},
TestParallelization.settings,

// Unidoc settings
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/")),
)
.configureUnidoc(generateScalaDoc = true)
.configureUnidoc(
generatedJavaDoc = getSparkVersion() == LATEST_RELEASED_SPARK_VERSION,
generateScalaDoc = getSparkVersion() == LATEST_RELEASED_SPARK_VERSION
)

lazy val contribs = (project in file("contribs"))
.dependsOn(spark % "compile->compile;test->test;provided->provided")
@@ -225,7 +306,7 @@ lazy val sharing = (project in file("sharing"))
releaseSettings,
Test / javaOptions ++= Seq("-ea"),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "provided",

"io.delta" %% "delta-sharing-client" % "1.0.4",

@@ -234,10 +315,10 @@
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
"junit" % "junit" % "4.12" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests",
)
).configureUnidoc()

@@ -290,10 +371,10 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.openjdk.jmh" % "jmh-core" % "1.37" % "test",
"org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test",

"org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
@@ -346,7 +427,7 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
).configureUnidoc()

val icebergSparkRuntimeArtifactName = {
val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion)
val (expMaj, expMin, _) = getMajorMinorPatch(defaultSparkVersion)
s"iceberg-spark-runtime-$expMaj.$expMin"
}

@@ -362,7 +443,7 @@ lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar"))
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.apache.spark" %% "spark-core" % sparkVersion % "test"
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test"
)
)

@@ -496,7 +577,7 @@ lazy val hudi = (project in file("hudi"))
ExclusionRule(organization = "org.apache.hadoop"),
ExclusionRule(organization = "org.apache.zookeeper"),
),
"org.apache.spark" %% "spark-avro" % sparkVersion % "test" excludeAll ExclusionRule(organization = "org.apache.hadoop"),
"org.apache.spark" %% "spark-avro" % defaultSparkVersion % "test" excludeAll ExclusionRule(organization = "org.apache.hadoop"),
"org.apache.parquet" % "parquet-avro" % "1.12.3" % "compile"
),
assembly / assemblyJarName := s"${name.value}-assembly_${scalaBinaryVersion.value}-${version.value}.jar",
@@ -974,10 +1055,10 @@ lazy val compatibility = (project in file("connectors/oss-compatibility-tests"))
"io.netty" % "netty-buffer" % "4.1.63.Final" % "test",
"org.scalatest" %% "scalatest" % "3.1.0" % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
)
)
*/
@@ -992,10 +1073,10 @@ lazy val goldenTables = (project in file("connectors/golden-tables"))
// Test Dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests"
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests"
)
)

@@ -1018,13 +1099,13 @@ lazy val sqlDeltaImport = (project in file("connectors/sql-delta-import"))
Test / publishArtifact := false,
libraryDependencies ++= Seq(
"io.netty" % "netty-buffer" % "4.1.63.Final" % "test",
"org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "provided",
"org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "provided",
"org.rogach" %% "scallop" % "3.5.1",
"org.scalatest" %% "scalatest" % scalaTestVersionForConnectors % "test",
"com.h2database" % "h2" % "1.4.200" % "test",
"org.apache.spark" % ("spark-catalyst_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "test",
"org.apache.spark" % ("spark-core_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "test",
"org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % sparkVersion % "test"
"org.apache.spark" % ("spark-catalyst_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "test",
"org.apache.spark" % ("spark-core_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "test",
"org.apache.spark" % ("spark-sql_" + sqlDeltaImportScalaVersion(scalaBinaryVersion.value)) % defaultSparkVersion % "test"
)
)

2 changes: 2 additions & 0 deletions project/Unidoc.scala
@@ -45,8 +45,10 @@ object Unidoc {
implicit class UnidocHelper(val projectToUpdate: Project) {
def configureUnidoc(
docTitle: String = null,
generatedJavaDoc: Boolean = true,
generateScalaDoc: Boolean = false
): Project = {
if (!generatedJavaDoc && !generateScalaDoc) return projectToUpdate

var updatedProject: Project = projectToUpdate
if (generateScalaDoc) {
33 changes: 33 additions & 0 deletions spark/src/main/scala-spark-3.5/shims/ColumnDefinitionShim.scala
@@ -0,0 +1,33 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.shims

import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.types.{StructField, StructType}

object ColumnDefinitionShim {

/**
* Helps handle a breaking change in [[org.apache.spark.sql.catalyst.plans.logical.CreateTable]]
* between Spark 3.5 and Spark 4.0:
* - In 3.5, `CreateTable` accepts a `tableSchema: StructType`.
* - In 4.0, `CreateTable` accepts a `columns: Seq[ColumnDefinition]`.
*/
def parseColumns(columns: Seq[StructField], sqlParser: ParserInterface): StructType = {
StructType(columns.toSeq)
}
}
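
Only one of the version-specific shim source trees is compiled at a time: `crossSparkSettings()` adds either `scala-spark-3.5` or `scala-spark-master` to `unmanagedSourceDirectories`, so exactly one `ColumnDefinitionShim` is on the classpath. A hypothetical call-site sketch, for illustration only (not taken from Delta's code; assumes a `SparkSession` named `spark` is in scope). On Spark 3.5 the parser argument is ignored; presumably it exists so the signature lines up with the Spark-master variant of this shim:

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val exampleFields = Seq(StructField("id", IntegerType), StructField("name", StringType))
    // On Spark 3.5 this simply rebuilds a StructType, suitable for CreateTable's `tableSchema`.
    val exampleSchema: StructType =
      ColumnDefinitionShim.parseColumns(exampleFields, spark.sessionState.sqlParser)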
@@ -0,0 +1,46 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.shims

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.streaming.IncrementalExecution

object IncrementalExecutionShim {

/**
* Handles a breaking change in the [[IncrementalExecution]] constructor between Spark 3.5 and
* 4.0:
* - Spark 3.5: no `isFirstBatch: Boolean` param
* - Spark 4.0: adds `isFirstBatch: Boolean` param
*/
def newInstance(
sparkSession: SparkSession,
logicalPlan: LogicalPlan,
incrementalExecution: IncrementalExecution): IncrementalExecution = new IncrementalExecution(
sparkSession,
logicalPlan,
incrementalExecution.outputMode,
incrementalExecution.checkpointLocation,
incrementalExecution.queryId,
incrementalExecution.runId,
incrementalExecution.currentBatchId,
incrementalExecution.prevOffsetSeqMetadata,
incrementalExecution.offsetSeqMetadata,
incrementalExecution.watermarkPropagator
)
}
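
Since this implementation does not pass an `isFirstBatch` argument, it is the Spark 3.5 side of the shim; a `scala-spark-master` counterpart would supply the extra parameter. A hypothetical call-site sketch of the intent, for illustration only (`spark`, `newPlan`, and `existingExec` are assumed names, not taken from Delta's code):

    // Re-plan a streaming batch with a fresh logical plan without referencing the
    // version-specific IncrementalExecution constructor directly.
    val replanned: IncrementalExecution =
      IncrementalExecutionShim.newInstance(spark, newPlan, existingExec)
    // `replanned` reuses existingExec's output mode, checkpoint location, query/run ids,
    // batch id, offset metadata, and watermark propagator, but plans `newPlan` instead.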