@@ -22,7 +22,7 @@ import java.util.UUID
 import java.util.regex.Pattern
 
 import com.google.common.io.PatternFilenameFilter
-import io.fabric8.kubernetes.api.model.{Container, Pod}
+import io.fabric8.kubernetes.api.model.Pod
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
@@ -43,6 +43,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkAppConf: SparkAppConf = _
   private var image: String = _
+  private var pyImage: String = _
   private var containerLocalSparkDistroExamplesJar: String = _
   private var appLocator: String = _
   private var driverPodName: String = _
@@ -65,6 +66,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     val imageTag = getTestImageTag
     val imageRepo = getTestImageRepo
     image = s"$imageRepo/spark:$imageTag"
+    pyImage = s"$imageRepo/spark-py:$imageTag"
 
     val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
       .toFile
@@ -156,46 +158,140 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     })
   }
 
-  // TODO(ssuchter): Enable the below after debugging
-  // test("Run PageRank using remote data file") {
-  //   sparkAppConf
-  //     .set("spark.kubernetes.mountDependencies.filesDownloadDir",
-  //       CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
-  //     .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
-  //   runSparkPageRankAndVerifyCompletion(
-  //     appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
-  // }
+  test("Run extraJVMOptions check on driver") {
+    sparkAppConf
+      .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
+    runSparkJVMCheckAndVerifyCompletion(
+      expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
+  }
+
+  test("Run SparkRemoteFileTest using a remote data file") {
+    sparkAppConf
+      .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
+    runSparkRemoteCheckAndVerifyCompletion(
+      appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
+  }
+
+  test("Run PySpark on simple pi.py example") {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_PI,
+      mainClass = "",
+      expectedLogOnCompletion = Seq("Pi is roughly 3"),
+      appArgs = Array("5"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false)
+  }
+
+  test("Run PySpark with Python2 to test a pyfiles example") {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.pyspark.pythonversion", "2")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_FILES,
+      mainClass = "",
+      expectedLogOnCompletion = Seq(
+        "Python runtime version check is: True",
+        "Python environment version check is: True"),
+      appArgs = Array("python"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+  }
+
+  test("Run PySpark with Python3 to test a pyfiles example") {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.pyspark.pythonversion", "3")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_FILES,
+      mainClass = "",
+      expectedLogOnCompletion = Seq(
+        "Python runtime version check is: True",
+        "Python environment version check is: True"),
+      appArgs = Array("python3"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+  }
 
   private def runSparkPiAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
       appArgs: Array[String] = Array.empty[String],
-      appLocator: String = appLocator): Unit = {
+      appLocator: String = appLocator,
+      isJVM: Boolean = true): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_PI_MAIN_CLASS,
       Seq("Pi is roughly 3"),
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      appLocator)
+      appLocator,
+      isJVM)
   }
 
-  private def runSparkPageRankAndVerifyCompletion(
+  private def runSparkRemoteCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
       appArgs: Array[String],
       appLocator: String = appLocator): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
-      SPARK_PAGE_RANK_MAIN_CLASS,
-      Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
+      SPARK_REMOTE_MAIN_CLASS,
+      Seq(s"Mounting of ${appArgs.head} was true"),
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      appLocator)
+      appLocator,
+      true)
   }
 
+  private def runSparkJVMCheckAndVerifyCompletion(
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      mainClass: String = SPARK_DRIVER_MAIN_CLASS,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      appArgs: Array[String] = Array("5"),
+      expectedJVMValue: Seq[String]): Unit = {
+    val appArguments = SparkAppArguments(
+      mainAppResource = appResource,
+      mainClass = mainClass,
+      appArgs = appArgs)
+    SparkAppLauncher.launch(
+      appArguments,
+      sparkAppConf,
+      TIMEOUT.value.toSeconds.toInt,
+      sparkHomeDir,
+      true)
+
+    val driverPod = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "driver")
+      .list()
+      .getItems
+      .get(0)
+    doBasicDriverPodCheck(driverPod)
+
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      expectedJVMValue.foreach { e =>
+        assert(kubernetesTestComponents.kubernetesClient
+          .pods()
+          .withName(driverPod.getMetadata.getName)
+          .getLog
+          .contains(e), "The application did not complete.")
+      }
+    }
+  }
+
   private def runSparkApplicationAndVerifyCompletion(
@@ -205,12 +301,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       appArgs: Array[String],
       driverPodChecker: Pod => Unit,
       executorPodChecker: Pod => Unit,
-      appLocator: String): Unit = {
+      appLocator: String,
+      isJVM: Boolean,
+      pyFiles: Option[String] = None): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
       appArgs = appArgs)
-    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+    SparkAppLauncher.launch(
+      appArguments,
+      sparkAppConf,
+      TIMEOUT.value.toSeconds.toInt,
+      sparkHomeDir,
+      isJVM,
+      pyFiles)
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
@@ -248,11 +352,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
   }
 
+  private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
+    assert(driverPod.getMetadata.getName === driverPodName)
+    assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
   private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
   }
 
+  private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
   private def checkCustomSettings(pod: Pod): Unit = {
     assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
@@ -287,14 +402,22 @@ private[spark] object KubernetesSuite {
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
   val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+  val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
+  val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
   val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
+  val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
+  val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
+  val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
+  val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
 
-  // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
+  val TEST_SECRET_NAME_PREFIX = "test-secret-"
+  val TEST_SECRET_KEY = "test-key"
+  val TEST_SECRET_VALUE = "test-data"
+  val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
 
-  // val REMOTE_PAGE_RANK_DATA_FILE =
-  //   "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
-  // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
-  //   s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
+  val REMOTE_PAGE_RANK_DATA_FILE =
+    "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
+  val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
 
-  // case object ShuffleNotReadyException extends Exception
+  case object ShuffleNotReadyException extends Exception
 }
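For reference, the new Python pod checks combine the suite's label-based driver lookup with image assertions against the spark-py image. Below is a minimal, self-contained sketch of that pattern using the fabric8 client directly; it is not part of this diff, and the locator label value and image name are placeholders:

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Standalone sketch: find the driver pod the way the suite does, then apply
// the same assertions as doBasicDriverPyPodCheck. Placeholder values throughout.
object PyPodCheckSketch {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient() // uses the local kubeconfig
    try {
      val driverPod: Pod = client.pods()
        .withLabel("spark-app-locator", "placeholder-app-locator")
        .withLabel("spark-role", "driver")
        .list()
        .getItems
        .get(0)
      val container = driverPod.getSpec.getContainers.get(0)
      assert(container.getImage == "repo/spark-py:tag")
      assert(container.getName == "spark-kubernetes-driver")
    } finally {
      client.close()
    }
  }
}
```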
@@ -97,21 +97,33 @@ private[spark] case class SparkAppArguments(
     appArgs: Array[String])
 
 private[spark] object SparkAppLauncher extends Logging {
 
   def launch(
       appArguments: SparkAppArguments,
       appConf: SparkAppConf,
       timeoutSecs: Int,
-      sparkHomeDir: Path): Unit = {
+      sparkHomeDir: Path,
+      isJVM: Boolean,
+      pyFiles: Option[String] = None): Unit = {
     val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
-    logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
-    val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath,
+    val preCommandLine = if (isJVM) {
+      mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
       "--deploy-mode", "cluster",
       "--class", appArguments.mainClass,
-      "--master", appConf.get("spark.master")
-    ) ++ appConf.toStringArray :+
-      appArguments.mainAppResource) ++
-      appArguments.appArgs
-    ProcessUtils.executeProcess(commandLine, timeoutSecs)
+      "--master", appConf.get("spark.master"))
+    } else {
+      mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
+        "--deploy-mode", "cluster",
+        "--master", appConf.get("spark.master"))
+    }
+    val commandLine =
+      pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++
+      appConf.toStringArray :+ appArguments.mainAppResource
+
+    if (appArguments.appArgs.nonEmpty) {
+      commandLine += appArguments.appArgs.mkString(" ")
+    }
+    logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
+    ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
   }
 }
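The reworked launcher assembles the spark-submit command differently depending on the application type: `--class` is passed only when isJVM is true (spark-submit treats a `.py` main resource as a Python application), any `--py-files` entry is inserted before the conf arguments, and the main resource precedes the app arguments. Below is a minimal standalone sketch of that assembly order; the master URL, image, and paths are placeholders, and unlike the launcher above it keeps each app argument as its own token:

```scala
import scala.collection.mutable

// Standalone sketch of the argument ordering used by SparkAppLauncher.launch:
// base submit flags, then --py-files (if any), then conf args, then the main
// resource, then the application arguments.
object LaunchCommandSketch {
  def buildCommand(
      mainAppResource: String,
      mainClass: String,
      appArgs: Seq[String],
      confArgs: Seq[String],
      master: String,
      isJVM: Boolean,
      pyFiles: Option[String]): Seq[String] = {
    val cmd = mutable.ArrayBuffer("spark-submit", "--deploy-mode", "cluster")
    if (isJVM) {
      cmd ++= Seq("--class", mainClass) // Python apps are submitted without --class
    }
    cmd ++= Seq("--master", master)
    pyFiles.foreach(f => cmd ++= Seq("--py-files", f))
    cmd ++= confArgs
    cmd += mainAppResource
    cmd ++= appArgs
    cmd.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Roughly the invocation behind "Run PySpark on simple pi.py example",
    // with placeholder image and master values:
    val cmd = buildCommand(
      mainAppResource = "local:///opt/spark/examples/src/main/python/pi.py",
      mainClass = "",
      appArgs = Seq("5"),
      confArgs = Seq("--conf", "spark.kubernetes.container.image=repo/spark-py:tag"),
      master = "k8s://https://127.0.0.1:8443",
      isJVM = false,
      pyFiles = None)
    println(cmd.mkString(" "))
  }
}
```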