Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/spark-pi-driver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
- --class
- org.apache.spark.examples.SparkPi
- --master
- armada://192.168.1.167:50051
- armada://armada-server.armada.svc.cluster.local:50051
- --conf
- "spark.driver.port=7078"
- --conf
Expand Down
6 changes: 6 additions & 0 deletions resource-managers/armada/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.armadaproject.armada</groupId>
<artifactId>scala-armada-client_2.13</artifactId>
<version>0.1.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ private[spark] class ArmadaClusterManager extends ExternalClusterManager with Lo
new ArmadaClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc,
new ArmadaClient, // FIXME
schedulerExecutorService)
schedulerExecutorService,
masterURL)
// snapshotsStore,
// executorPodsAllocator,
// executorPodsLifecycleEventHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,27 @@
*/
package org.apache.spark.scheduler.cluster.armada

import java.util.concurrent.{ScheduledExecutorService}
import java.util.concurrent.ScheduledExecutorService

import scala.collection.mutable.HashMap

import io.armadaproject.armada.ArmadaClient
import k8s.io.api.core.v1.generated.{Container, PodSpec, ResourceRequirements}
import k8s.io.api.core.v1.generated.{EnvVar, EnvVarSource, ObjectFieldSelector}
import k8s.io.apimachinery.pkg.api.resource.generated.Quantity

import org.apache.spark.SparkContext
import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}

// FIXME: Actually import ArmadaClient
class ArmadaClient {}

// TODO: Implement for Armada
private[spark] class ArmadaClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
armadaClient: ArmadaClient,
executorService: ScheduledExecutorService)
executorService: ScheduledExecutorService,
masterURL: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

// FIXME
Expand All @@ -45,7 +48,78 @@ private[spark] class ArmadaClusterSchedulerBackend(
conf.getOption("spark.app.id").getOrElse(appId)
}

override def start(): Unit = {}

private def submitJob(): Unit = {

val urlArray = masterURL.split(":")
// Remove leading "/"'s
val host = if (urlArray(1).startsWith("/")) urlArray(1).substring(2) else urlArray(1)
val port = urlArray(2).toInt

val driverAddr = sys.env("SPARK_DRIVER_BIND_ADDRESS")


val driverURL = s"spark://CoarseGrainedScheduler@$driverAddr:7078"
val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
.withApiVersion("v1").withFieldPath("status.podIP"))
val envVars = Seq(
EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
EnvVar().withName("SPARK_EXECUTOR_CORES").withValue("1"),
EnvVar().withName("SPARK_EXECUTOR_MEMORY").withValue("512m"),
EnvVar().withName("SPARK_DRIVER_URL").withValue(driverURL),
EnvVar().withName("SPARK_EXECUTOR_POD_IP").withValueFrom(source)
)
val executorContainer = Container()
.withName("spark-executor")
.withImagePullPolicy("IfNotPresent")
.withImage("spark:testing")
.withEnv(envVars)
.withCommand(Seq("/opt/entrypoint.sh"))
.withArgs(
Seq(
"executor"
)
)
.withResources(
ResourceRequirements(
limits = Map(
"memory" -> Quantity(Option("1000Mi")),
"cpu" -> Quantity(Option("100m"))
),
requests = Map(
"memory" -> Quantity(Option("1000Mi")),
"cpu" -> Quantity(Option("100m"))
)
)
)

val podSpec = PodSpec()
.withTerminationGracePeriodSeconds(0)
.withRestartPolicy("Never")
.withContainers(Seq(executorContainer))

val testJob = api.submit
.JobSubmitRequestItem()
.withPriority(0)
.withNamespace("default")
.withPodSpec(podSpec)

val jobSubmitResponse = ArmadaClient(host, port)
.SubmitJobs("test", "executor", Seq(testJob))

logInfo(s"Driver Job Submit Response")
for (respItem <- jobSubmitResponse.jobResponseItems) {
logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error} ")

}
}
  // Entry point called by Spark once the scheduler backend is created: kick
  // off the (currently single, hard-coded) executor job submission to Armada.
  // NOTE(review): this does NOT call super.start(), so the driver-endpoint
  // setup in CoarseGrainedSchedulerBackend.start() is skipped — confirm this
  // is intentional while the backend is a work in progress.
  override def start(): Unit = {
    submitJob()
  }

  // No-op: Armada-side teardown (cancelling submitted executor jobs) is not
  // implemented yet. NOTE(review): super.stop() is also skipped — verify that
  // leaving the RPC endpooints of the base class untouched is intended. TODO.
  override def stop(): Unit = {}

/*
Expand Down Expand Up @@ -100,7 +174,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
execIDRequester -= rpcAddress
// Expected, executors re-establish a connection with an ID
case _ =>
logDebug(s"No executor found for ${rpcAddress}")
logDebug(s"No executor found for $rpcAddress")
}
}
}
Expand Down
Loading