From 6b1d11b879413ddd21eb788ed481ded222b3f1f9 Mon Sep 17 00:00:00 2001
From: George Jahad
Date: Wed, 5 Feb 2025 15:09:56 -0800
Subject: [PATCH 1/5] first attempt

---
 examples/spark-pi-driver.yaml                 |  2 +-
 resource-managers/armada/core/pom.xml         |  6 ++
 .../cluster/armada/ArmadaClusterManager.scala |  5 +-
 .../armada/ArmadaClusterManagerBackend.scala  | 90 +++++++++++++++++--
 .../k8s/KubernetesExecutorBackend.scala       |  1 +
 5 files changed, 95 insertions(+), 9 deletions(-)

diff --git a/examples/spark-pi-driver.yaml b/examples/spark-pi-driver.yaml
index 7d7772b907543..edb204033878e 100644
--- a/examples/spark-pi-driver.yaml
+++ b/examples/spark-pi-driver.yaml
@@ -23,7 +23,7 @@
     - --class
     - org.apache.spark.examples.SparkPi
     - --master
-    - armada://192.168.1.167:50051
+    - armada://armada-server.armada.svc.cluster.local:50051
    - --conf
     - "spark.driver.port=7078"
     - --conf
diff --git a/resource-managers/armada/core/pom.xml b/resource-managers/armada/core/pom.xml
index 66fd938f150d0..b602c093227ed 100644
--- a/resource-managers/armada/core/pom.xml
+++ b/resource-managers/armada/core/pom.xml
@@ -93,6 +93,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.armadaproject.armada</groupId>
+      <artifactId>scala-armada-client_2.13</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
index 090a2f2672668..b4e695c3aa954 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
@@ -43,6 +43,7 @@ private[spark] class ArmadaClusterManager extends ExternalClusterManager with Lo
 
   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
     val maxTaskFailures = 1
+    logInfo("gbj20 cluster manager")
     new TaskSchedulerImpl(sc, maxTaskFailures, isLocal(sc.conf))
   }
 
@@ -106,8 +107,8 @@ private[spark] class ArmadaClusterManager extends ExternalClusterManager with Lo
     new ArmadaClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
       sc,
-      new ArmadaClient, // FIXME
-      schedulerExecutorService)
+      schedulerExecutorService,
+      masterURL)
 //      snapshotsStore,
 //      executorPodsAllocator,
 //      executorPodsLifecycleEventHandler,
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
index 9cf707ddea111..605541bbb8319 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
@@ -20,20 +20,24 @@
 import java.util.concurrent.{ScheduledExecutorService}
 
 import scala.collection.mutable.HashMap
 
+import io.armadaproject.armada.ArmadaClient
+import io.grpc.ManagedChannelBuilder
+import k8s.io.api.core.v1.generated.{Container, PodSpec, ResourceRequirements}
+import k8s.io.api.core.v1.generated.{EnvVar, EnvVarSource, ObjectFieldSelector}
+import k8s.io.apimachinery.pkg.api.resource.generated.Quantity
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
 import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
-// FIXME: Actually import ArmadaClient
-class ArmadaClient {}
 
 // TODO: Implement for Armada
 private[spark] class ArmadaClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    armadaClient: ArmadaClient,
-    executorService: ScheduledExecutorService)
+    executorService: ScheduledExecutorService,
+    masterURL: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
 
   // FIXME
@@ -45,7 +49,80 @@ private[spark] class ArmadaClusterSchedulerBackend(
     conf.getOption("spark.app.id").getOrElse(appId)
   }
 
-  override def start(): Unit = {}
+
+  def submitJob(): Unit = {
+
+    var urlArray = masterURL.split(":")
+    // Remove leading "/"'s
+    val host = if (urlArray(1).startsWith("/")) urlArray(1).substring(2) else urlArray(1)
+    val port = urlArray(2).toInt
+
+    val driverAddr = sys.env("SPARK_DRIVER_BIND_ADDRESS")
+
+
+    val driverURL = s"spark://CoarseGrainedScheduler@${driverAddr}:7078"
+    val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
+      .withApiVersion("v1").withFieldPath("status.podIP"))
+    val envVars = Seq(
+      new EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
+      new EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
+      new EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
+      new EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
+      new EnvVar().withName("SPARK_EXECUTOR_CORES").withValue("1"),
+      new EnvVar().withName("SPARK_EXECUTOR_MEMORY").withValue("512m"),
+      new EnvVar().withName("SPARK_DRIVER_URL").withValue(driverURL),
+      new EnvVar().withName("SPARK_EXECUTOR_POD_IP").withValueFrom(source)
+    )
+    val executorContainer = Container()
+      .withName("spark-executor")
+      .withImagePullPolicy("IfNotPresent")
+      .withImage("spark:testing")
+      .withEnv(envVars)
+      .withCommand(Seq("/opt/entrypoint.sh"))
+      .withArgs(
+        Seq(
+          "executor"
+        )
+      )
+      .withResources(
+        ResourceRequirements(
+          limits = Map(
+            "memory" -> Quantity(Option("1000Mi")),
+            "cpu" -> Quantity(Option("100m"))
+          ),
+          requests = Map(
+            "memory" -> Quantity(Option("1000Mi")),
+            "cpu" -> Quantity(Option("100m"))
+          )
+        )
+      )
+
+    val podSpec = PodSpec()
+      .withTerminationGracePeriodSeconds(0)
+      .withRestartPolicy("Never")
+      .withContainers(Seq(executorContainer))
+
+    val testJob = api.submit
+      .JobSubmitRequestItem()
+      .withPriority(0)
+      .withNamespace("default")
+      .withPodSpec(podSpec)
+
+    val channel =
+      ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
+
+    val jobSubmitResponse = new ArmadaClient(channel).SubmitJobs("test", "executor", Seq(testJob))
+
+    logInfo(s"Job Submit Response")
+    for (respItem <- jobSubmitResponse.jobResponseItems) {
+      logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error} ")
+
+    }
+  }
+  override def start(): Unit = {
+    submitJob()
+  }
+
   override def stop(): Unit = {}
 
   /*
@@ -65,6 +142,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(): DriverEndpoint = {
+    logInfo("gbj20 driver endpoint")
     new ArmadaDriverEndpoint()
   }
 
@@ -84,7 +162,7 @@
       executorsPendingDecommission.get(id) match {
         case Some(host) =>
           // We don't pass through the host because by convention the
-          // host is only populated if the entire host is going away
+          // host is only populated if the entire host is going away 
           // and we don't know if that's the case or just one container.
           removeExecutor(id, ExecutorDecommission(None))
         case _ =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
index e44d7e29ef606..13e60bd90a193 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
@@ -193,6 +193,7 @@ private[spark] object KubernetesExecutorBackend extends Logging {
         printUsageAndExit(classNameForEntry)
       }
     }
+    log.info("gbj20 executor")
 
     if (hostname == null) {
       hostname = Utils.localHostName()
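A note on the master-URL handling introduced in patch 1: masterURL arrives in the form armada://host:port, so split(":") yields Array("armada", "//host", "port") and the leading slashes must be stripped before the host can be handed to the gRPC channel builder. Below is a minimal standalone sketch of that parsing; the java.net.URI variant is an alternative shown for comparison only, not what the patch uses.

    // Sketch only: mirrors the parsing in submitJob() from patch 1.
    object MasterUrlParse {
      // "armada://host:port".split(":") => Array("armada", "//host", "port")
      def parse(masterURL: String): (String, Int) = {
        val parts = masterURL.split(":")
        val host = if (parts(1).startsWith("/")) parts(1).substring(2) else parts(1)
        (host, parts(2).toInt)
      }

      // Alternative via java.net.URI, which also validates the scheme.
      def parseViaUri(masterURL: String): (String, Int) = {
        val uri = new java.net.URI(masterURL)
        require(uri.getScheme == "armada", s"unexpected scheme: ${uri.getScheme}")
        (uri.getHost, uri.getPort)
      }

      def main(args: Array[String]): Unit = {
        // Both print (armada-server.armada.svc.cluster.local,50051).
        println(parse("armada://armada-server.armada.svc.cluster.local:50051"))
        println(parseViaUri("armada://armada-server.armada.svc.cluster.local:50051"))
      }
    }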
From 01cab78dd61c8ec92dff5c9b638062e4d0840f51 Mon Sep 17 00:00:00 2001
From: George Jahad
Date: Wed, 5 Feb 2025 15:16:53 -0800
Subject: [PATCH 2/5] updated for latest client

---
 .../cluster/armada/ArmadaClusterManagerBackend.scala | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
index 605541bbb8319..3294ba6f5cd28 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ScheduledExecutorService}
 import scala.collection.mutable.HashMap
 
 import io.armadaproject.armada.ArmadaClient
-import io.grpc.ManagedChannelBuilder
 import k8s.io.api.core.v1.generated.{Container, PodSpec, ResourceRequirements}
 import k8s.io.api.core.v1.generated.{EnvVar, EnvVarSource, ObjectFieldSelector}
 import k8s.io.apimachinery.pkg.api.resource.generated.Quantity
@@ -108,10 +107,8 @@ private[spark] class ArmadaClusterSchedulerBackend(
       .withNamespace("default")
       .withPodSpec(podSpec)
 
-    val channel =
-      ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
-
-    val jobSubmitResponse = new ArmadaClient(channel).SubmitJobs("test", "executor", Seq(testJob))
+    val jobSubmitResponse = ArmadaClient(host, port)
+      .SubmitJobs("test", "executor", Seq(testJob))
 
     logInfo(s"Job Submit Response")
     for (respItem <- jobSubmitResponse.jobResponseItems) {
@@ -162,7 +159,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
       executorsPendingDecommission.get(id) match {
         case Some(host) =>
           // We don't pass through the host because by convention the
-          // host is only populated if the entire host is going away 
+          // host is only populated if the entire host is going away
           // and we don't know if that's the case or just one container.
           removeExecutor(id, ExecutorDecommission(None))
         case _ =>

From e0f00b24be3b7eae4619949e38ed481c9b11194b Mon Sep 17 00:00:00 2001
From: George Jahad
Date: Wed, 5 Feb 2025 15:23:43 -0800
Subject: [PATCH 3/5] cleanup

---
 .../armada/ArmadaClusterManagerBackend.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
index 3294ba6f5cd28..beed755535c01 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
@@ -63,14 +63,14 @@ private[spark] class ArmadaClusterSchedulerBackend(
     val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
       .withApiVersion("v1").withFieldPath("status.podIP"))
     val envVars = Seq(
-      new EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
-      new EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
-      new EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
-      new EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
-      new EnvVar().withName("SPARK_EXECUTOR_CORES").withValue("1"),
-      new EnvVar().withName("SPARK_EXECUTOR_MEMORY").withValue("512m"),
-      new EnvVar().withName("SPARK_DRIVER_URL").withValue(driverURL),
-      new EnvVar().withName("SPARK_EXECUTOR_POD_IP").withValueFrom(source)
+      EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
+      EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
+      EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
+      EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
+      EnvVar().withName("SPARK_EXECUTOR_CORES").withValue("1"),
+      EnvVar().withName("SPARK_EXECUTOR_MEMORY").withValue("512m"),
+      EnvVar().withName("SPARK_DRIVER_URL").withValue(driverURL),
+      EnvVar().withName("SPARK_EXECUTOR_POD_IP").withValueFrom(source)
     )
     val executorContainer = Container()
       .withName("spark-executor")

From e24ff418197f113382b074410bd83216cacff4fb Mon Sep 17 00:00:00 2001
From: George Jahad
Date: Wed, 5 Feb 2025 15:25:28 -0800
Subject: [PATCH 4/5] cleanup

---
 .../spark/scheduler/cluster/armada/ArmadaClusterManager.scala   | 1 -
 .../scheduler/cluster/armada/ArmadaClusterManagerBackend.scala  | 1 -
 .../spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala | 1 -
 3 files changed, 3 deletions(-)

diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
index b4e695c3aa954..a34bc47d31d9e 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
@@ -43,7 +43,6 @@ private[spark] class ArmadaClusterManager extends ExternalClusterManager with Lo
 
   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
     val maxTaskFailures = 1
-    logInfo("gbj20 cluster manager")
     new TaskSchedulerImpl(sc, maxTaskFailures, isLocal(sc.conf))
   }
 
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
index beed755535c01..ac181a96f0b9b 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
@@ -139,7 +139,6 @@ private[spark] class ArmadaClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(): DriverEndpoint = {
-    logInfo("gbj20 driver endpoint")
     new ArmadaDriverEndpoint()
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
index 13e60bd90a193..e44d7e29ef606 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
@@ -193,7 +193,6 @@ private[spark] object KubernetesExecutorBackend extends Logging {
         printUsageAndExit(classNameForEntry)
       }
     }
-    log.info("gbj20 executor")
 
     if (hostname == null) {
       hostname = Utils.localHostName()
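A note on patch 3's switch from new EnvVar() to EnvVar(): the k8s.io.* types used here are ScalaPB-generated case classes, so the companion object's apply() supplies default field values and each withX call returns an updated copy rather than mutating in place. A minimal sketch of that immutable-builder pattern, using a hand-written stand-in (Env below is hypothetical, not the generated type):

    // Env is a hypothetical stand-in for a ScalaPB-generated message class.
    final case class Env(name: String = "", value: String = "") {
      def withName(n: String): Env = copy(name = n)   // returns a new copy
      def withValue(v: String): Env = copy(value = v) // returns a new copy
    }

    object WithPatternDemo {
      def main(args: Array[String]): Unit = {
        // No "new" needed: the case-class companion provides apply().
        val e = Env().withName("SPARK_EXECUTOR_CORES").withValue("1")
        println(e) // prints Env(SPARK_EXECUTOR_CORES,1)
      }
    }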
From a1b86f926cdce0c553f14751b1ffbb4a32790bd3 Mon Sep 17 00:00:00 2001
From: George Jahad
Date: Thu, 6 Feb 2025 07:38:11 -0800
Subject: [PATCH 5/5] cleanup

---
 .../cluster/armada/ArmadaClusterManagerBackend.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
index ac181a96f0b9b..6f815b0898b43 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.scheduler.cluster.armada
 
-import java.util.concurrent.{ScheduledExecutorService}
+import java.util.concurrent.ScheduledExecutorService
 
 import scala.collection.mutable.HashMap
 
@@ -49,9 +49,9 @@ private[spark] class ArmadaClusterSchedulerBackend(
   }
 
 
-  def submitJob(): Unit = {
+  private def submitJob(): Unit = {
 
-    var urlArray = masterURL.split(":")
+    val urlArray = masterURL.split(":")
     // Remove leading "/"'s
     val host = if (urlArray(1).startsWith("/")) urlArray(1).substring(2) else urlArray(1)
     val port = urlArray(2).toInt
@@ -59,7 +59,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
     val driverAddr = sys.env("SPARK_DRIVER_BIND_ADDRESS")
 
 
-    val driverURL = s"spark://CoarseGrainedScheduler@${driverAddr}:7078"
+    val driverURL = s"spark://CoarseGrainedScheduler@$driverAddr:7078"
     val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
       .withApiVersion("v1").withFieldPath("status.podIP"))
     val envVars = Seq(
@@ -110,7 +110,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
     val jobSubmitResponse = ArmadaClient(host, port)
       .SubmitJobs("test", "executor", Seq(testJob))
 
-    logInfo(s"Job Submit Response")
+    logInfo(s"Driver Job Submit Response")
     for (respItem <- jobSubmitResponse.jobResponseItems) {
       logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error} ")
 
@@ -174,7 +174,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
       execIDRequester -= rpcAddress
       // Expected, executors re-establish a connection with an ID
     case _ =>
-      logDebug(s"No executor found for ${rpcAddress}")
+      logDebug(s"No executor found for $rpcAddress")
     }
   }
 }
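For reference, the shape of the executor submission after patch 5, pulled out as a standalone sketch. It assumes the scala-armada-client 0.1.0-SNAPSHOT API exactly as it appears above (ArmadaClient(host, port) and SubmitJobs(queue, jobSetId, items)); the queue "test", job set "executor", server address, and pod spec mirror the hard-coded values in submitJob() and the example YAML, and would presumably become configurable later.

    import io.armadaproject.armada.ArmadaClient
    import api.submit.JobSubmitRequestItem
    import k8s.io.api.core.v1.generated.PodSpec

    object SubmitSketch {
      def main(args: Array[String]): Unit = {
        // Minimal job item; submitJob() above attaches the full executor PodSpec.
        val item = JobSubmitRequestItem()
          .withPriority(0)
          .withNamespace("default")
          .withPodSpec(PodSpec().withRestartPolicy("Never"))
        val client = ArmadaClient("armada-server.armada.svc.cluster.local", 50051)
        val resp = client.SubmitJobs("test", "executor", Seq(item))
        resp.jobResponseItems.foreach { r =>
          println(s"JobID: ${r.jobId} Error: ${r.error}")
        }
      }
    }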