Skip to content

Commit

Permalink
Added test for client mode pyspark shell into PythonTestsSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
AzureQ committed Nov 19, 2018
1 parent c2f782b commit 4bf6bc6
Showing 1 changed file with 89 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import scala.collection.JavaConverters._
import org.scalatest.concurrent.Eventually

import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}

private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

import PythonTestsSuite._
Expand Down Expand Up @@ -89,6 +94,90 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

test("Run bin/pyspark in client mode", k8sTestTag) {
val labels = Map("spark-app-selector" -> driverPodName)
val driverPort = 7077
val blockManagerPort = 10000
val driverService = testBackend
.getKubernetesClient
.services()
.inNamespace(kubernetesTestComponents.namespace)
.createNew()
.withNewMetadata()
.withName(s"$driverPodName-svc")
.endMetadata()
.withNewSpec()
.withClusterIP("None")
.withSelector(labels.asJava)
.addNewPort()
.withName("driver-port")
.withPort(driverPort)
.withNewTargetPort(driverPort)
.endPort()
.addNewPort()
.withName("block-manager")
.withPort(blockManagerPort)
.withNewTargetPort(blockManagerPort)
.endPort()
.endSpec()
.done()
try {
val driverPod = testBackend
.getKubernetesClient
.pods()
.inNamespace(kubernetesTestComponents.namespace)
.createNew()
.withNewMetadata()
.withName(driverPodName)
.withLabels(labels.asJava)
.endMetadata()
.withNewSpec()
.withServiceAccountName(kubernetesTestComponents.serviceAccountName)
.addNewContainer()
.withName("pyspark-example")
.withImage(image)
.withImagePullPolicy("IfNotPresent")
.withCommand("/opt/spark/bin/pyspark")
.addToArgs("--master", s"k8s://https://kubernetes.default.svc")
.addToArgs("--deploy-mode", "client")
.addToArgs("--conf", s"spark.kubernetes.container.image="+pyImage)
.addToArgs(
"--conf",
s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}")
.addToArgs("--conf", "spark.kubernetes.authenticate.oauthTokenFile=" +
"/var/run/secrets/kubernetes.io/serviceaccount/token")
.addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" +
"/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
.addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName")
.addToArgs("--conf", "spark.executor.memory=500m")
.addToArgs("--conf", "spark.executor.cores=1")
.addToArgs("--conf", "spark.executor.instances=1")
.addToArgs("--conf",
s"spark.driver.host=" +
s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
.addToArgs("--conf", s"spark.driver.port=$driverPort")
.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort")
.endContainer()
.endSpec()
.done()
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
.withName(driverPodName)
.getLog
.contains("SparkSession available"), "The application did not complete.")
}
} finally {
// Have to delete the service manually since it doesn't have an owner reference
kubernetesTestComponents
.kubernetesClient
.services()
.inNamespace(kubernetesTestComponents.namespace)
.delete(driverService)
}
}

}

private[spark] object PythonTestsSuite {
Expand Down

0 comments on commit 4bf6bc6

Please sign in to comment.