diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 174030dcd0d84..375743ba13fff 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -37,6 +37,7 @@ import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -74,6 +75,9 @@ class KubernetesSuite extends SparkFunSuite protected override def logForFailedTest(): Unit = { logInfo("\n\n===== EXTRA LOGS FOR THE FAILED TEST\n") + logInfo("BEGIN DESCRIBE PODS for application\n" + + Minikube.describePods(s"spark-app-locator=$appLocator").mkString("\n")) + logInfo("END DESCRIBE PODS for the application") val driverPodOption = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala index cc05990893e36..e259979ad0329 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -33,6 +33,7 @@ object ProcessUtils extends Logging { def executeProcess( fullCommand: Array[String], timeout: Long, + dumpOutput: Boolean = true, dumpErrors: Boolean = true, env: Map[String, String] = Map.empty[String, String]): Seq[String] = { val pb = new ProcessBuilder().command(fullCommand: _*) @@ -42,7 +43,9 @@ object ProcessUtils extends Logging { val outputLines = new ArrayBuffer[String] Utils.tryWithResource(proc.getInputStream)( Source.fromInputStream(_, StandardCharsets.UTF_8.name()).getLines().foreach { line => - logInfo(line) + if (dumpOutput) { + logInfo(line) + } outputLines += line }) assert(proc.waitFor(timeout, TimeUnit.SECONDS), diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index c33875243c598..1854bb838c4ac 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -36,18 +36,18 @@ private[spark] object Minikube extends Logging { private val MINIKUBE_PATH = ".minikube" def logVersion(): Unit = { - logInfo(executeMinikube("version").mkString("\n")) + logInfo(executeMinikube(true, "version").mkString("\n")) } def getMinikubeIp: String = { - val outputs = executeMinikube("ip") + val outputs = executeMinikube(true, "ip") .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) assert(outputs.size == 1, "Unexpected amount of output from minikube ip") outputs.head } def getMinikubeStatus: MinikubeStatus.Value = { - val statusString = executeMinikube("status") + val statusString = executeMinikube(true, "status") logInfo(s"Minikube status command output:\n$statusString") // up to minikube version v0.30.0 use this to check for minikube status val oldMinikube = statusString @@ -74,7 +74,7 @@ private[spark] object Minikube extends Logging { "" } else { // For Minikube >=1.9 - Paths.get("profiles", executeMinikube("profile")(0)).toString + Paths.get("profiles", executeMinikube(true, "profile")(0)).toString } val apiServerCertPath = Paths.get(minikubeBasePath, profileDir, "apiserver.crt") val apiServerKeyPath = Paths.get(minikubeBasePath, profileDir, "apiserver.key") @@ -126,18 +126,22 @@ private[spark] object Minikube extends Logging { } } - def executeMinikube(action: String, args: String*): Seq[String] = { + def executeMinikube(logOutput: Boolean, action: String, args: String*): Seq[String] = { ProcessUtils.executeProcess( Array("bash", "-c", s"MINIKUBE_IN_STYLE=true minikube $action ${args.mkString(" ")}"), - MINIKUBE_STARTUP_TIMEOUT_SECONDS).filter{x => + MINIKUBE_STARTUP_TIMEOUT_SECONDS, dumpOutput = logOutput).filter{x => !x.contains("There is a newer version of minikube") && !x.contains("https://github.com/kubernetes") } } def minikubeServiceAction(args: String*): String = { - executeMinikube("service", args: _*).head + executeMinikube(true, "service", args: _*).head } + + def describePods(labels: String): Seq[String] = + Minikube.executeMinikube(false, "kubectl", "--", "describe", "pods", "--all-namespaces", + "-l", labels) } private[spark] object MinikubeStatus extends Enumeration {