From 909ef14979195fe8bfde66c942eecd73c303b781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Thu, 25 Mar 2021 16:59:00 +0100 Subject: [PATCH 1/3] Initial upload --- .../k8s/integrationtest/KubernetesSuite.scala | 4 ++++ .../k8s/integrationtest/ProcessUtils.scala | 5 ++++- .../backend/minikube/Minikube.scala | 18 +++++++++++------- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 174030dcd0d84..ec97ec0df2be5 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -37,6 +37,7 @@ import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -74,6 +75,9 @@ class KubernetesSuite extends SparkFunSuite protected override def logForFailedTest(): Unit = { logInfo("\n\n===== EXTRA LOGS FOR THE FAILED TEST\n") + logInfo("BEGIN driver DESCRIBE POD\n" + + Minikube.describePods(s"spark-app-locator=$appLocator,spark-role=driver").mkString("\n")) + logInfo("END driver DESCRIBE POD") val driverPodOption = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala index cc05990893e36..e259979ad0329 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -33,6 +33,7 @@ object ProcessUtils extends Logging { def executeProcess( fullCommand: Array[String], timeout: Long, + dumpOutput: Boolean = true, dumpErrors: Boolean = true, env: Map[String, String] = Map.empty[String, String]): Seq[String] = { val pb = new ProcessBuilder().command(fullCommand: _*) @@ -42,7 +43,9 @@ object ProcessUtils extends Logging { val outputLines = new ArrayBuffer[String] Utils.tryWithResource(proc.getInputStream)( Source.fromInputStream(_, StandardCharsets.UTF_8.name()).getLines().foreach { line => - logInfo(line) + if (dumpOutput) { + logInfo(line) + } outputLines += line }) assert(proc.waitFor(timeout, TimeUnit.SECONDS), diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index c33875243c598..f3289014610fe 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -36,18 +36,18 @@ private[spark] object Minikube extends Logging { private val MINIKUBE_PATH = ".minikube" def logVersion(): Unit = { - logInfo(executeMinikube("version").mkString("\n")) + logInfo(executeMinikube(true, "version").mkString("\n")) } def getMinikubeIp: String = { - val outputs = executeMinikube("ip") + val outputs = executeMinikube(true, "ip") .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) assert(outputs.size == 1, "Unexpected amount of output from minikube ip") outputs.head } def getMinikubeStatus: MinikubeStatus.Value = { - val statusString = executeMinikube("status") + val statusString = executeMinikube(true, "status") logInfo(s"Minikube status command output:\n$statusString") // up to minikube version v0.30.0 use this to check for minikube status val oldMinikube = statusString @@ -74,7 +74,7 @@ private[spark] object Minikube extends Logging { "" } else { // For Minikube >=1.9 - Paths.get("profiles", executeMinikube("profile")(0)).toString + Paths.get("profiles", executeMinikube(true, "profile")(0)).toString } val apiServerCertPath = Paths.get(minikubeBasePath, profileDir, "apiserver.crt") val apiServerKeyPath = Paths.get(minikubeBasePath, profileDir, "apiserver.key") @@ -126,17 +126,21 @@ private[spark] object Minikube extends Logging { } } - def executeMinikube(action: String, args: String*): Seq[String] = { + def describePods(labels:String): Seq[String] = + Minikube.executeMinikube(false, "kubectl", "--", "describe", "pods", "--all-namespaces", + "-l", labels) + + def executeMinikube(logOutput: Boolean, action: String, args: String*): Seq[String] = { ProcessUtils.executeProcess( Array("bash", "-c", s"MINIKUBE_IN_STYLE=true minikube $action ${args.mkString(" ")}"), - MINIKUBE_STARTUP_TIMEOUT_SECONDS).filter{x => + MINIKUBE_STARTUP_TIMEOUT_SECONDS, dumpOutput = logOutput).filter{x => !x.contains("There is a newer version of minikube") && !x.contains("https://github.com/kubernetes") } } def minikubeServiceAction(args: String*): String = { - executeMinikube("service", args: _*).head + executeMinikube(true, "service", args: _*).head } } From a92cca8a02b69a9401b794569126b78549901bc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Thu, 25 Mar 2021 17:04:45 +0100 Subject: [PATCH 2/3] fix typo --- .../deploy/k8s/integrationtest/backend/minikube/Minikube.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index f3289014610fe..7f2ac15bef5c7 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -126,7 +126,7 @@ private[spark] object Minikube extends Logging { } } - def describePods(labels:String): Seq[String] = + def describePods(labels: String): Seq[String] = Minikube.executeMinikube(false, "kubectl", "--", "describe", "pods", "--all-namespaces", "-l", labels) From 5ca3309af67f1dfb7e2e0d2cce491a6a53fe07c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Fri, 26 Mar 2021 08:35:31 +0100 Subject: [PATCH 3/3] applying review comments --- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 6 +++--- .../k8s/integrationtest/backend/minikube/Minikube.scala | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index ec97ec0df2be5..375743ba13fff 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -75,9 +75,9 @@ class KubernetesSuite extends SparkFunSuite protected override def logForFailedTest(): Unit = { logInfo("\n\n===== EXTRA LOGS FOR THE FAILED TEST\n") - logInfo("BEGIN driver DESCRIBE POD\n" + - Minikube.describePods(s"spark-app-locator=$appLocator,spark-role=driver").mkString("\n")) - logInfo("END driver DESCRIBE POD") + logInfo("BEGIN DESCRIBE PODS for application\n" + + Minikube.describePods(s"spark-app-locator=$appLocator").mkString("\n")) + logInfo("END DESCRIBE PODS for the application") val driverPodOption = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index 7f2ac15bef5c7..1854bb838c4ac 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -126,10 +126,6 @@ private[spark] object Minikube extends Logging { } } - def describePods(labels: String): Seq[String] = - Minikube.executeMinikube(false, "kubectl", "--", "describe", "pods", "--all-namespaces", - "-l", labels) - def executeMinikube(logOutput: Boolean, action: String, args: String*): Seq[String] = { ProcessUtils.executeProcess( Array("bash", "-c", s"MINIKUBE_IN_STYLE=true minikube $action ${args.mkString(" ")}"), @@ -142,6 +138,10 @@ private[spark] object Minikube extends Logging { def minikubeServiceAction(args: String*): String = { executeMinikube(true, "service", args: _*).head } + + def describePods(labels: String): Seq[String] = + Minikube.executeMinikube(false, "kubectl", "--", "describe", "pods", "--all-namespaces", + "-l", labels) } private[spark] object MinikubeStatus extends Enumeration {