diff --git a/examples/spark-pi-driver.yaml b/examples/spark-pi-driver.yaml index 7d7772b907543..edb204033878e 100644 --- a/examples/spark-pi-driver.yaml +++ b/examples/spark-pi-driver.yaml @@ -23,7 +23,7 @@ - --class - org.apache.spark.examples.SparkPi - --master - - armada://192.168.1.167:50051 + - armada://armada-server.armada.svc.cluster.local:50051 - --conf - "spark.driver.port=7078" - --conf diff --git a/resource-managers/armada/core/pom.xml b/resource-managers/armada/core/pom.xml index 66fd938f150d0..b602c093227ed 100644 --- a/resource-managers/armada/core/pom.xml +++ b/resource-managers/armada/core/pom.xml @@ -93,6 +93,12 @@ ${project.version} + + io.armadaproject.armada + scala-armada-client_2.13 + 0.1.0-SNAPSHOT + + org.apache.spark spark-core_${scala.binary.version} diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala index 090a2f2672668..a34bc47d31d9e 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala @@ -106,8 +106,8 @@ private[spark] class ArmadaClusterManager extends ExternalClusterManager with Lo new ArmadaClusterSchedulerBackend( scheduler.asInstanceOf[TaskSchedulerImpl], sc, - new ArmadaClient, // FIXME - schedulerExecutorService) + schedulerExecutorService, + masterURL) // snapshotsStore, // executorPodsAllocator, // executorPodsLifecycleEventHandler, diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala index 
9cf707ddea111..6f815b0898b43 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala @@ -16,24 +16,27 @@ */ package org.apache.spark.scheduler.cluster.armada -import java.util.concurrent.{ScheduledExecutorService} +import java.util.concurrent.ScheduledExecutorService import scala.collection.mutable.HashMap +import io.armadaproject.armada.ArmadaClient +import k8s.io.api.core.v1.generated.{Container, PodSpec, ResourceRequirements} +import k8s.io.api.core.v1.generated.{EnvVar, EnvVarSource, ObjectFieldSelector} +import k8s.io.apimachinery.pkg.api.resource.generated.Quantity + import org.apache.spark.SparkContext import org.apache.spark.rpc.{RpcAddress, RpcCallContext} import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} -// FIXME: Actually import ArmadaClient -class ArmadaClient {} // TODO: Implement for Armada private[spark] class ArmadaClusterSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, - armadaClient: ArmadaClient, - executorService: ScheduledExecutorService) + executorService: ScheduledExecutorService, + masterURL: String) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { // FIXME @@ -45,7 +48,78 @@ private[spark] class ArmadaClusterSchedulerBackend( conf.getOption("spark.app.id").getOrElse(appId) } - override def start(): Unit = {} + + private def submitJob(): Unit = { + + val urlArray = masterURL.split(":") + // Remove the leading "//" from the host portion of the URL + val host = if (urlArray(1).startsWith("//")) urlArray(1).substring(2) else urlArray(1) + val port = urlArray(2).toInt + + val driverAddr = sys.env("SPARK_DRIVER_BIND_ADDRESS") + + + val driverURL = s"spark://CoarseGrainedScheduler@$driverAddr:7078" + val source =
EnvVarSource().withFieldRef(ObjectFieldSelector() + .withApiVersion("v1").withFieldPath("status.podIP")) + val envVars = Seq( + EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"), + EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"), + EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"), + EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"), + EnvVar().withName("SPARK_EXECUTOR_CORES").withValue("1"), + EnvVar().withName("SPARK_EXECUTOR_MEMORY").withValue("512m"), + EnvVar().withName("SPARK_DRIVER_URL").withValue(driverURL), + EnvVar().withName("SPARK_EXECUTOR_POD_IP").withValueFrom(source) + ) + val executorContainer = Container() + .withName("spark-executor") + .withImagePullPolicy("IfNotPresent") + .withImage("spark:testing") + .withEnv(envVars) + .withCommand(Seq("/opt/entrypoint.sh")) + .withArgs( + Seq( + "executor" + ) + ) + .withResources( + ResourceRequirements( + limits = Map( + "memory" -> Quantity(Option("1000Mi")), + "cpu" -> Quantity(Option("100m")) + ), + requests = Map( + "memory" -> Quantity(Option("1000Mi")), + "cpu" -> Quantity(Option("100m")) + ) + ) + ) + + val podSpec = PodSpec() + .withTerminationGracePeriodSeconds(0) + .withRestartPolicy("Never") + .withContainers(Seq(executorContainer)) + + val testJob = api.submit + .JobSubmitRequestItem() + .withPriority(0) + .withNamespace("default") + .withPodSpec(podSpec) + + val jobSubmitResponse = ArmadaClient(host, port) + .SubmitJobs("test", "executor", Seq(testJob)) + + logInfo(s"Driver Job Submit Response") + for (respItem <- jobSubmitResponse.jobResponseItems) { + logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error} ") + + } + } + override def start(): Unit = { + submitJob() + } + override def stop(): Unit = {} /* @@ -100,7 +174,7 @@ private[spark] class ArmadaClusterSchedulerBackend( execIDRequester -= rpcAddress // Expected, executors re-establish a connection with an ID case _ => - logDebug(s"No executor found for ${rpcAddress}") + 
logDebug(s"No executor found for $rpcAddress") } } }