diff --git a/.github/workflows/armada.yml b/.github/workflows/armada.yml
new file mode 100644
index 000000000000..f45d3d50559c
--- /dev/null
+++ b/.github/workflows/armada.yml
@@ -0,0 +1,51 @@
+name: Armada
+
+on:
+  pull_request:
+
+jobs:
+  armada:
+    name: Armada integration
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          path: spark-armada
+      - uses: actions/checkout@v4
+        with:
+          repository: armadaproject/armada-operator
+          path: armada-operator
+      - run: |
+          cd spark-armada
+          ./build/sbt package -Parmada -Pkubernetes
+          ./bin/docker-image-tool.sh -t testing build
+          docker image save -o ../spark_testing.tar spark:testing
+          cd ..
+
+          cd armada-operator
+          make kind-all
+          ./bin/tooling/kind load image-archive ../spark_testing.tar --name armada
+
+          # sleep a bit, or we see: create queue request failed: rpc error: code = DeadlineExceeded
+          sleep 60
+
+          ./bin/app/armadactl create queue test
+
+          # sleep a bit, or we see: rpc error: code = PermissionDenied desc = could not find queue "test"
+          sleep 60
+
+          ./bin/app/armadactl submit ../spark-armada/examples/spark-driver-job.yaml
+          ./bin/app/armadactl submit ../spark-armada/examples/spark-executor-job.yaml
+
+          # wait for the jobs to start
+          sleep 60
+
+          # inspect jobs
+          kubectl get pods
+          for pod in $(kubectl get pods | grep armada | cut -d " " -f 1)
+          do
+            echo "$pod"
+            kubectl logs "pod/$pod"
+            echo
+          done
+
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 990ebdef50d3..d44333e3acab 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -237,6 +237,16 @@
+    <profile>
+      <id>armada</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-armada_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>hive</id>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 49b2c86e73cd..43db943a05fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -260,9 +260,10 @@ private[spark] class SparkSubmit extends Logging {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("k8s") => KUBERNETES
+ case m if m.startsWith("armada") => ARMADA
case m if m.startsWith("local") => LOCAL
case _ =>
- error("Master must either be yarn or start with spark, k8s, or local")
+ error("Master must either be yarn or start with spark, k8s, armada, or local")
-1
}
case None => LOCAL // default master or remote mode.
@@ -296,6 +297,15 @@ private[spark] class SparkSubmit extends Logging {
}
}
+ if (clusterManager == ARMADA) {
+      printMessage("Armada selected as cluster manager.")
+ if (!Utils.classIsLoadable(ARMADA_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
+ error(
+ s"Could not load ARMADA class \"${ARMADA_CLUSTER_SUBMIT_CLASS}\". " +
+ "This copy of Spark may not have been compiled with ARMADA support.")
+ }
+ }
+
// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
@@ -329,6 +339,8 @@ private[spark] class SparkSubmit extends Logging {
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
+ // TODO: does client/cluster mode matter here?
+ val isArmada = clusterManager == ARMADA
val isCustomClasspathInClusterModeDisallowed =
!sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
args.proxyUser != null &&
@@ -416,6 +428,7 @@ private[spark] class SparkSubmit extends Logging {
downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
+ // TODO: May have to do the same/similar for Armada
if (isKubernetesClusterModeDriver) {
// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
// in cluster mode, the archives should be available in the driver's current working
@@ -670,6 +683,7 @@ private[spark] class SparkSubmit extends Logging {
confKey = KEYTAB.key),
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
+ // TODO: Add Armada where appropriate.
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | KUBERNETES,
CLUSTER, confKey = JAR_PACKAGES.key),
@@ -864,6 +878,12 @@ private[spark] class SparkSubmit extends Logging {
}
}
+ if (isArmada) {
+ // FIXME: Make sure we populate what we need here!
+ childMainClass = ARMADA_CLUSTER_SUBMIT_CLASS
+ childArgs ++= Array("--class", args.mainClass)
+ }
+
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
@@ -1071,7 +1091,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val LOCAL = 8
private val KUBERNETES = 16
- private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES
+ private val ARMADA = 32
+ private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES | ARMADA
// Deploy modes
private val CLIENT = 1
@@ -1095,6 +1116,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
+ private[deploy] val ARMADA_CLUSTER_SUBMIT_CLASS =
+ "org.apache.spark.deploy.armada.submit.ArmadaClientApplication"
override def main(args: Array[String]): Unit = {
Option(System.getenv("SPARK_PREFER_IPV6"))
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 0b84c34ce947..3fc230d24744 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -515,7 +515,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
s"""
|Options:
| --master MASTER_URL spark://host:port, yarn,
- | k8s://https://host:port, or local (Default: local[*]).
+ | k8s://https://host:port, armada://host:port,
+ | or local (Default: local[*]).
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
| on one of the worker machines inside the cluster ("cluster")
| (Default: client).
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 4d75f5d7a1fc..7ac7a94d7e02 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -279,12 +279,10 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
executorStageSummaryWrappers.foreach { exec =>
// only the first executor is expected to be excluded
val expectedExcludedFlag = exec.executorId == execIds.head
- assert(exec.info.isBlacklistedForStage === expectedExcludedFlag)
assert(exec.info.isExcludedForStage === expectedExcludedFlag)
}
check[ExecutorSummaryWrapper](execIds.head) { exec =>
- assert(exec.info.blacklistedInStages === Set(stages.head.stageId))
assert(exec.info.excludedInStages === Set(stages.head.stageId))
}
@@ -306,7 +304,6 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
assert(executorStageSummaryWrappersForNode.nonEmpty)
executorStageSummaryWrappersForNode.foreach { exec =>
// both executor is expected to be excluded
- assert(exec.info.isBlacklistedForStage)
assert(exec.info.isExcludedForStage)
}
@@ -467,7 +464,6 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
}
check[ExecutorSummaryWrapper](execIds.head) { exec =>
- assert(exec.info.blacklistedInStages === Set())
assert(exec.info.excludedInStages === Set())
}
@@ -495,7 +491,6 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
stageAttemptId = stages.last.attemptNumber()))
check[ExecutorSummaryWrapper](execIds.head) { exec =>
- assert(exec.info.blacklistedInStages === Set(stages.last.stageId))
assert(exec.info.excludedInStages === Set(stages.last.stageId))
}
@@ -652,14 +647,12 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
time += 1
listener.onExecutorExcluded(SparkListenerExecutorExcluded(time, "1", 42))
check[ExecutorSummaryWrapper]("1") { exec =>
- assert(exec.info.isBlacklisted)
assert(exec.info.isExcluded)
}
time += 1
listener.onExecutorUnexcluded(SparkListenerExecutorUnexcluded(time, "1"))
check[ExecutorSummaryWrapper]("1") { exec =>
- assert(!exec.info.isBlacklisted)
assert(!exec.info.isExcluded)
}
@@ -667,14 +660,12 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
time += 1
listener.onNodeExcluded(SparkListenerNodeExcluded(time, "1.example.com", 2))
check[ExecutorSummaryWrapper]("1") { exec =>
- assert(exec.info.isBlacklisted)
assert(exec.info.isExcluded)
}
time += 1
listener.onNodeUnexcluded(SparkListenerNodeUnexcluded(time, "1.example.com"))
check[ExecutorSummaryWrapper]("1") { exec =>
- assert(!exec.info.isBlacklisted)
assert(!exec.info.isExcluded)
}
diff --git a/examples/runSparkPi.sh b/examples/runSparkPi.sh
new file mode 100755
index 000000000000..bef732a0fb31
--- /dev/null
+++ b/examples/runSparkPi.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+# Start up the driver, get its IP address, then start the executor with it
+set -e
+
+echo
+echo starting SparkPi driver
+armadactl submit examples/spark-pi-driver.yaml > /tmp/jobid.txt
+JOB_ID=$(awk '{print $5}' /tmp/jobid.txt)
+cat /tmp/jobid.txt
+echo
+
+
+echo waiting for SparkPi driver to start
+sleep 20
+
+echo
+echo SparkPi driver ip addr:
+IP_ADDR=$(kubectl get pod "armada-$JOB_ID-0" -o jsonpath='{.status.podIP}')
+echo "$IP_ADDR"
+echo
+
+echo "passing driver's IP addr to executor and starting it"
+IP_ADDR=$IP_ADDR envsubst < examples/spark-pi-executor.yaml > /tmp/ex.yaml
+armadactl submit /tmp/ex.yaml
+echo
+
+echo SparkPi driver/executor started
\ No newline at end of file
diff --git a/examples/spark-driver-job.yaml b/examples/spark-driver-job.yaml
new file mode 100644
index 000000000000..66a8a3b9d1c1
--- /dev/null
+++ b/examples/spark-driver-job.yaml
@@ -0,0 +1,31 @@
+queue: test
+jobSetId: job-set-1
+jobs:
+  - namespace: default
+    priority: 0
+    podSpec:
+      terminationGracePeriodSeconds: 0
+      restartPolicy: Never
+      containers:
+        - name: spark-driver
+          image: spark:testing
+          env:
+            - name: SPARK_DRIVER_BIND_ADDRESS
+              value: "0.0.0.0:1234"
+          command:
+            - /opt/entrypoint.sh
+          args:
+            - driver
+            - --verbose
+            - --class
+            - org.apache.spark.examples.LocalPi
+            - --master
+            - armada://192.168.1.167:50051
+            - submit
+          resources:
+            limits:
+              memory: 1Gi
+              cpu: 1
+            requests:
+              memory: 1Gi
+              cpu: 1
diff --git a/examples/spark-executor-job.yaml b/examples/spark-executor-job.yaml
new file mode 100644
index 000000000000..f243c3e54fb5
--- /dev/null
+++ b/examples/spark-executor-job.yaml
@@ -0,0 +1,39 @@
+queue: test
+jobSetId: job-set-1
+jobs:
+  - namespace: default
+    priority: 0
+    podSpec:
+      terminationGracePeriodSeconds: 0
+      restartPolicy: Never
+      containers:
+        - name: spark-executor
+          image: spark:testing
+          env:
+            - name: SPARK_EXECUTOR_MEMORY
+              value: "512m"
+            - name: SPARK_DRIVER_URL
+              value: "spark://localhost:1337"
+            - name: SPARK_EXECUTOR_ID
+              value: "1"
+            - name: SPARK_EXECUTOR_CORES
+              value: "1"
+            - name: SPARK_APPLICATION_ID
+              value: "test_spark_app_id"
+            - name: SPARK_EXECUTOR_POD_IP
+              value: "localhost"
+            - name: SPARK_RESOURCE_PROFILE_ID
+              value: "1"
+            - name: SPARK_EXECUTOR_POD_NAME
+              value: "test-pod-name"
+          command:
+            - /opt/entrypoint.sh
+          args:
+            - executor
+          resources:
+            limits:
+              memory: 1Gi
+              cpu: 1
+            requests:
+              memory: 1Gi
+              cpu: 1
diff --git a/examples/spark-pi-driver.yaml b/examples/spark-pi-driver.yaml
new file mode 100644
index 000000000000..edb204033878
--- /dev/null
+++ b/examples/spark-pi-driver.yaml
@@ -0,0 +1,49 @@
+queue: test
+jobSetId: job-set-1
+jobs:
+  - namespace: default
+    priority: 0
+    podSpec:
+      terminationGracePeriodSeconds: 0
+      restartPolicy: Never
+      containers:
+        - name: spark-driver
+          image: spark:testing
+          env:
+            - name: SPARK_DRIVER_BIND_ADDRESS
+              valueFrom:
+                fieldRef:
+                  apiVersion: v1
+                  fieldPath: status.podIP
+          command:
+            - /opt/entrypoint.sh
+          args:
+            - driver
+            - --verbose
+            - --class
+            - org.apache.spark.examples.SparkPi
+            - --master
+            - armada://armada-server.armada.svc.cluster.local:50051
+            - --conf
+            - "spark.driver.port=7078"
+            - --conf
+            - "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5005"
+            - local:///opt/spark/examples/jars/spark-examples.jar
+            - "100"
+          resources:
+            limits:
+              memory: 1Gi
+              cpu: 1
+            requests:
+              memory: 1Gi
+              cpu: 1
+          ports:
+            - containerPort: 7078
+              name: driver-rpc-port
+              protocol: TCP
+            - containerPort: 7079
+              name: blockmanager
+              protocol: TCP
+            - containerPort: 4040
+              name: spark-ui
+              protocol: TCP
diff --git a/examples/spark-pi-executor.yaml b/examples/spark-pi-executor.yaml
new file mode 100644
index 000000000000..995bd1900b72
--- /dev/null
+++ b/examples/spark-pi-executor.yaml
@@ -0,0 +1,44 @@
+queue: test
+jobSetId: job-set-1
+jobs:
+  - namespace: default
+    priority: 0
+    podSpec:
+      terminationGracePeriodSeconds: 0
+      restartPolicy: Never
+      containers:
+        - name: spark-executor
+          image: spark:testing
+          env:
+            - name: SPARK_EXECUTOR_MEMORY
+              value: "512m"
+            - name: SPARK_DRIVER_URL
+              value: "spark://CoarseGrainedScheduler@${IP_ADDR}:7078"
+            - name: SPARK_EXECUTOR_ID
+              value: "1"
+            - name: SPARK_EXECUTOR_CORES
+              value: "1"
+            - name: SPARK_APPLICATION_ID
+              value: "test_spark_app_id"
+            - name: SPARK_EXECUTOR_POD_IP
+              valueFrom:
+                fieldRef:
+                  apiVersion: v1
+                  fieldPath: status.podIP
+            - name: SPARK_RESOURCE_PROFILE_ID
+              value: "0"
+            - name: SPARK_EXECUTOR_POD_NAME
+              value: "test-pod-name"
+            - name: SPARK_JAVA_OPT_0
+              value: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
+          command:
+            - /opt/entrypoint.sh
+          args:
+            - executor
+          resources:
+            limits:
+              memory: 1Gi
+              cpu: 1
+            requests:
+              memory: 1Gi
+              cpu: 1
diff --git a/pom.xml b/pom.xml
index 769dad270af7..db9aa2cb31f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3427,6 +3427,14 @@
+    <profile>
+      <id>armada</id>
+      <modules>
+        <module>resource-managers/armada/core</module>
+      </modules>
+    </profile>
+
     <profile>
       <id>hive-thriftserver</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1d25215590af..214f3a6c7056 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -64,10 +64,10 @@ object BuildCommons {
"tags", "sketch", "kvstore", "common-utils", "variant"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects
- val optionallyEnabledProjects@Seq(kubernetes, yarn,
+ val optionallyEnabledProjects@Seq(armada, kubernetes, yarn,
sparkGangliaLgpl, streamingKinesisAsl, profiler,
dockerIntegrationTests, hadoopCloud, kubernetesIntegrationTests) =
- Seq("kubernetes", "yarn",
+ Seq("armada", "kubernetes", "yarn",
"ganglia-lgpl", "streaming-kinesis-asl", "profiler",
"docker-integration-tests", "hadoop-cloud", "kubernetes-integration-tests").map(ProjectRef(buildLocation, _))
diff --git a/resource-managers/armada/core/README.md b/resource-managers/armada/core/README.md
new file mode 100644
index 000000000000..ce065a1ba1b4
--- /dev/null
+++ b/resource-managers/armada/core/README.md
@@ -0,0 +1,18 @@
+Armada Cluster Manager for Spark
+---
+
+Build
+---
+From the spark repository root:
+```bash
+./build/sbt package -Pkubernetes -Parmada
+```
+You may have to adjust `JAVA_HOME` to suit your environment; Spark requires Java 17 or later. The
+`-Parmada` flag tells sbt to enable the Armada project.
+
+Test
+---
+```bash
+./build/sbt armada/testOnly -Parmada
+```
+Runs all tests!
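+
+Run
+---
+A minimal sketch of exercising the bundled examples against a running Armada cluster. It assumes
+`armadactl` is installed and configured for your cluster and that the `spark:testing` image built
+above has been loaded into it (this mirrors the CI workflow in `.github/workflows/armada.yml`):
+```bash
+armadactl create queue test
+armadactl submit examples/spark-driver-job.yaml
+armadactl submit examples/spark-executor-job.yaml
+```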
diff --git a/resource-managers/armada/core/pom.xml b/resource-managers/armada/core/pom.xml
new file mode 100644
index 000000000000..e334221f8ace
--- /dev/null
+++ b/resource-managers/armada/core/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.13</artifactId>
+    <version>4.1.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-armada_2.13</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Armada</name>
+  <properties>
+    <sbt.project.name>armada</sbt.project.name>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>volcano</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-volcano-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>volcano/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-volcano-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>volcano/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.armadaproject.armada</groupId>
+      <artifactId>scala-armada-client_${scala.binary.version}</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy-agent</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-junit5</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.junit.jupiter</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.junit.platform</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/resource-managers/armada/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation b/resource-managers/armada/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation
new file mode 100644
index 000000000000..af10cc12a3ac
--- /dev/null
+++ b/resource-managers/armada/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.deploy.armada.submit.ArmadaSparkSubmitOperation
diff --git a/resource-managers/armada/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/resource-managers/armada/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 000000000000..cdf3b501b3eb
--- /dev/null
+++ b/resource-managers/armada/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.scheduler.cluster.armada.ArmadaClusterManager
\ No newline at end of file
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala
new file mode 100644
index 000000000000..bfc6387ada84
--- /dev/null
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.armada.submit
+
+/*
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import scala.util.control.Breaks._
+import scala.util.control.NonFatal
+*/
+
+/*
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
+import io.fabric8.kubernetes.client.Watcher.Action
+*/
+import _root_.io.armadaproject.armada.ArmadaClient
+import k8s.io.api.core.v1.generated.{Container, EnvVar, PodSpec, ResourceRequirements}
+import k8s.io.apimachinery.pkg.api.resource.generated.Quantity
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+
+
+/* import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{APP_ID, APP_NAME, SUBMISSION_ID}
+import org.apache.spark.util.Utils
+*/
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+/*
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String],
+ proxyUser: Option[String])
+
+private[spark] object ClientArguments {
+
+ def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+ var mainAppResource: MainAppResource = JavaMainAppResource(None)
+ var mainClass: Option[String] = None
+ val driverArgs = mutable.ArrayBuffer.empty[String]
+ var proxyUser: Option[String] = None
+
+ args.sliding(2, 2).toList.foreach {
+ case Array("--primary-java-resource", primaryJavaResource: String) =>
+ mainAppResource = JavaMainAppResource(Some(primaryJavaResource))
+ case Array("--primary-py-file", primaryPythonResource: String) =>
+ mainAppResource = PythonMainAppResource(primaryPythonResource)
+ case Array("--primary-r-file", primaryRFile: String) =>
+ mainAppResource = RMainAppResource(primaryRFile)
+ case Array("--main-class", clazz: String) =>
+ mainClass = Some(clazz)
+ case Array("--arg", arg: String) =>
+ driverArgs += arg
+ case Array("--proxy-user", user: String) =>
+ proxyUser = Some(user)
+ case other =>
+ val invalid = other.mkString(" ")
+ throw new RuntimeException(s"Unknown arguments: $invalid")
+ }
+
+ require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ ClientArguments(
+ mainAppResource,
+ mainClass.get,
+ driverArgs.toArray,
+ proxyUser)
+ }
+}
+*/
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param conf The kubernetes driver config.
+ * @param builder Responsible for building the base driver pod based on a composition of
+ * implemented features.
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param watcher a watcher that monitors and logs the application status
+ */
+/* FIXME: Have an Armada Client instead.
+private[spark] class Client(
+ conf: KubernetesDriverConf,
+ builder: KubernetesDriverBuilder,
+ kubernetesClient: KubernetesClient,
+ watcher: LoggingPodStatusWatcher) extends Logging {
+
+ def run(): Unit = {
+ val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
+ val configMapName = KubernetesClientUtils.configMapNameDriver
+ val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,
+ conf.sparkConf, resolvedDriverSpec.systemProperties)
+ val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap +
+ (KUBERNETES_NAMESPACE.key -> conf.namespace))
+
+ // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
+ // Spark command builder to pickup on the Java Options present in the ConfigMap
+ val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
+ .addNewEnv()
+ .withName(ENV_SPARK_CONF_DIR)
+ .withValue(SPARK_CONF_DIR_INTERNAL)
+ .endEnv()
+ .addNewVolumeMount()
+ .withName(SPARK_CONF_VOLUME_DRIVER)
+ .withMountPath(SPARK_CONF_DIR_INTERNAL)
+ .endVolumeMount()
+ .build()
+ val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
+ .editSpec()
+ .addToContainers(resolvedDriverContainer)
+ .addNewVolume()
+ .withName(SPARK_CONF_VOLUME_DRIVER)
+ .withNewConfigMap()
+ .withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava)
+ .withName(configMapName)
+ .endConfigMap()
+ .endVolume()
+ .endSpec()
+ .build()
+ val driverPodName = resolvedDriverPod.getMetadata.getName
+
+ // setup resources before pod creation
+ val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
+ try {
+ kubernetesClient.resourceList(preKubernetesResources: _*).forceConflicts().serverSideApply()
+ } catch {
+ case NonFatal(e) =>
+ logError("Please check \"kubectl auth can-i create [resource]\" first." +
+ " It should be yes. And please also check your feature step implementation.")
+ kubernetesClient.resourceList(preKubernetesResources: _*).delete()
+ throw e
+ }
+
+ var watch: Watch = null
+ var createdDriverPod: Pod = null
+ try {
+ createdDriverPod =
+ kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
+ } catch {
+ case NonFatal(e) =>
+ kubernetesClient.resourceList(preKubernetesResources: _*).delete()
+ logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
+ throw e
+ }
+
+ // Refresh all pre-resources' owner references
+ try {
+ addOwnerReference(createdDriverPod, preKubernetesResources)
+ kubernetesClient.resourceList(preKubernetesResources: _*).forceConflicts().serverSideApply()
+ } catch {
+ case NonFatal(e) =>
+ kubernetesClient.pods().resource(createdDriverPod).delete()
+ kubernetesClient.resourceList(preKubernetesResources: _*).delete()
+ throw e
+ }
+
+ // setup resources after pod creation, and refresh all resources' owner references
+ try {
+ val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+ addOwnerReference(createdDriverPod, otherKubernetesResources)
+ kubernetesClient.resourceList(otherKubernetesResources: _*).forceConflicts().serverSideApply()
+ } catch {
+ case NonFatal(e) =>
+ kubernetesClient.pods().resource(createdDriverPod).delete()
+ throw e
+ }
+
+ val sId = Client.submissionId(conf.namespace, driverPodName)
+ if (conf.get(WAIT_FOR_APP_COMPLETION)) {
+ breakable {
+ while (true) {
+ val podWithName = kubernetesClient
+ .pods()
+ .inNamespace(conf.namespace)
+ .withName(driverPodName)
+ // Reset resource to old before we start the watch, this is important for race conditions
+ watcher.reset()
+ watch = podWithName.watch(watcher)
+
+ // Send the latest pod state we know to the watcher to make sure we didn't miss anything
+ watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+ // Break the while loop if the pod is completed or we don't want to wait
+ if (watcher.watchOrStop(sId)) {
+ watch.close()
+ break()
+ }
+ }
+ }
+ } else {
+ logInfo(log"Deployed Spark application ${MDC(APP_NAME, conf.appName)} with " +
+ log"application ID ${MDC(APP_ID, conf.appId)} and " +
+ log"submission ID ${MDC(SUBMISSION_ID, sId)} into Kubernetes")
+ }
+ }
+}
+*/
+
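+// Retained from the Kubernetes client: submission IDs take the form "<namespace>:<driver-pod-name>".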
+private[spark] object Client {
+ def submissionId(namespace: String, driverPodName: String): String = s"$namespace:$driverPodName"
+}
+
+/**
+ * Main class and entry point of application submission in Armada mode.
+ */
+private[spark] class ArmadaClientApplication extends SparkApplication {
+ // FIXME: Find the real way to log properly.
+ def log(msg: String): Unit = {
+ // scalastyle:off println
+ System.err.println(msg)
+ // scalastyle:on println
+ }
+
+ override def start(args: Array[String], conf: SparkConf): Unit = {
+ log("ArmadaClientApplication.start() called!")
+ run(conf)
+ }
+
+ private def run(sparkConf: SparkConf): Unit = {
+ val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master"))
+ log(s"host is $host, port is $port")
+    val armadaClient = new ArmadaClient(ArmadaClient.GetChannel(host, port))
+ if (armadaClient.SubmitHealth().isServing) {
+ log("Submit health good!")
+ } else {
+ log("Could not contact Armada!")
+ }
+
+    // FIXME: Need to check how this is launched - whether to submit a job or
+    // to turn into driver / cluster manager mode.
+ val jobId = submitDriverJob(armadaClient, sparkConf)
+ log(s"Got job ID: $jobId")
+ // For constructing the app ID, we can't use the Spark application name, as the app ID is going
+ // to be added as a label to group resources belonging to the same application. Label values are
+ // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
+ // a unique app ID (captured by spark.app.id) in the format below.
+ /*
+ val kubernetesAppId = KubernetesConf.getKubernetesAppId()
+ val kubernetesConf = KubernetesConf.createDriverConf(
+ sparkConf,
+ kubernetesAppId,
+ clientArguments.mainAppResource,
+ clientArguments.mainClass,
+ clientArguments.driverArgs,
+ clientArguments.proxyUser)
+ // The master URL has been checked for validity already in SparkSubmit.
+ // We just need to get rid of the "k8s://" prefix here.
+ val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
+ val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)
+
+ Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
+ master,
+ Some(kubernetesConf.namespace),
+ KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
+ SparkKubernetesClientFactory.ClientType.Submission,
+ sparkConf,
+ None)) { kubernetesClient =>
+ val client = new Client(
+ kubernetesConf,
+ new KubernetesDriverBuilder(),
+ kubernetesClient,
+ watcher)
+ client.run()
+ }
+ */
+ ()
+ }
+
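+  // Builds a driver pod spec whose container re-invokes spark-submit via /opt/entrypoint.sh,
+  // submits it to Armada as a single job, and returns the resulting job ID.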
+ private def submitDriverJob(armadaClient: ArmadaClient, conf: SparkConf): String = {
+ val envVars = Seq(
+ EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValue("0.0.0.0:1234")
+ )
+ val driverContainer = Container()
+ .withName("spark-driver")
+ .withImagePullPolicy("IfNotPresent")
+ .withImage("spark:testing")
+ .withEnv(envVars)
+ .withCommand(Seq("/opt/entrypoint.sh"))
+ .withArgs(
+ Seq(
+ "driver",
+ "--verbose",
+ "--class",
+ conf.get("spark.app.name"),
+ "--master",
+ "armada://armada-server.armada.svc.cluster.local:50051",
+ "submit"
+ )
+ )
+ .withResources( // FIXME: What are reasonable requests/limits for spark drivers?
+ ResourceRequirements(
+ limits = Map(
+ "memory" -> Quantity(Option("1Gi")),
+ "cpu" -> Quantity(Option("1"))
+ ),
+ requests = Map(
+ "memory" -> Quantity(Option("1Gi")),
+ "cpu" -> Quantity(Option("1"))
+ )
+ )
+ )
+
+ val podSpec = PodSpec()
+ .withTerminationGracePeriodSeconds(0)
+ .withRestartPolicy("Never")
+ .withContainers(Seq(driverContainer))
+
+ val driverJob = api.submit
+ .JobSubmitRequestItem()
+ .withPriority(0)
+ .withNamespace("personal-anonymous")
+ .withPodSpec(podSpec)
+
+ // FIXME: Plumb config for queue, job-set-id
+ val jobSubmitResponse = armadaClient.SubmitJobs("test", "spark-test-1", Seq(driverJob))
+
+ log(s"Job Submit Response $jobSubmitResponse")
+ for (respItem <- jobSubmitResponse.jobResponseItems) {
+ log(s"JobID: ${respItem.jobId} Error: ${respItem.error} ")
+ }
+ jobSubmitResponse.jobResponseItems(0).jobId
+ }
+}
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaSubmitOperations.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaSubmitOperations.scala
new file mode 100644
index 000000000000..8286dea7172c
--- /dev/null
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaSubmitOperations.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.armada.submit
+
+// import scala.jdk.CollectionConverters._
+
+/*
+import K8SSparkSubmitOperation.getGracePeriod
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+*/
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkSubmitOperation
+// import org.apache.spark.deploy.armada.Config.{ARMADA_SUBMIT_GRACE_PERIOD}
+// import org.apache.spark.deploy.k8s.Constants.{SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL}
+// import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
+import org.apache.spark.util.{CommandLineLoggingUtils/* , Utils */}
+
+private sealed trait ArmadaSubmitOperation extends CommandLineLoggingUtils {
+ // TODO: Tear this out
+}
+
+private class KillApplication extends ArmadaSubmitOperation {
+ // TODO: Fill in.
+}
+
+private class ListStatus extends ArmadaSubmitOperation {
+ // TODO: Fill in.
+}
+
+private[spark] class ArmadaSparkSubmitOperation extends SparkSubmitOperation
+ with CommandLineLoggingUtils {
+
+ private def isGlob(name: String): Boolean = {
+ name.last == '*'
+ }
+
+ def execute(submissionId: String, sparkConf: SparkConf, op: ArmadaSubmitOperation): Unit = {
+ /*
+ val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
+ submissionId.split(":", 2) match {
+ case Array(part1, part2@_*) =>
+ val namespace = if (part2.isEmpty) None else Some(part1)
+ val pName = if (part2.isEmpty) part1 else part2.headOption.get
+ Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
+ master,
+ namespace,
+ KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
+ SparkKubernetesClientFactory.ClientType.Submission,
+ sparkConf,
+ None)
+ ) { kubernetesClient =>
+ implicit val client: KubernetesClient = kubernetesClient
+ if (isGlob(pName)) {
+ val ops = namespace match {
+ case Some(ns) =>
+ kubernetesClient
+ .pods
+ .inNamespace(ns)
+ case None =>
+ kubernetesClient
+ .pods
+ }
+ val pods = ops
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+ .list()
+ .getItems
+ .asScala
+ .filter { pod =>
+ pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
+ }.toList
+ op.executeOnGlob(pods, namespace, sparkConf)
+ } else {
+ op.executeOnPod(pName, namespace, sparkConf)
+ }
+ }
+ case _ =>
+ printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
+ }
+ */
+ printMessage("TODO!! ArmadaSparkSubmitOperation:execute!")
+ ()
+ }
+
+ override def kill(submissionId: String, conf: SparkConf): Unit = {
+ printMessage(s"TODO!! IMPLEMENT!! Armada: Submitting a request to kill submission " +
+ s"${submissionId} in ${conf.get("spark.master")}. ")
+ execute(submissionId, conf, new KillApplication)
+ }
+
+ override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
+ printMessage(s"TODO!! IMPLEMENT!! Armada: Submitting a request for the status of submission" +
+ s" ${submissionId} in ${conf.get("spark.master")}.")
+ execute(submissionId, conf, new ListStatus)
+ }
+
+ override def supports(master: String): Boolean = {
+ master.startsWith("armada://")
+ }
+}
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaUtils.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaUtils.scala
new file mode 100644
index 000000000000..34264b480e08
--- /dev/null
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.armada.submit
+
+object ArmadaUtilsExceptions {
+ class MasterUrlParsingException extends RuntimeException
+}
+
+object ArmadaUtils {
+ import ArmadaUtilsExceptions._
+
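+  /**
+   * Parses a master URL of the form "armada://host:port" into a (host, port) pair,
+   * throwing MasterUrlParsingException when the URL is malformed or the port is not a number.
+   */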
+ def parseMasterUrl(masterUrl: String): (String, Int) = {
+ val tokens = masterUrl.substring("armada://".length).split(":")
+ if (tokens.length != 2) {
+ throw new MasterUrlParsingException
+ }
+ val host: String = tokens(0)
+ val port: Int = try {
+ tokens(1).toInt
+ } catch {
+      case _: NumberFormatException => -1
+ }
+ if (port < 0) {
+ throw new MasterUrlParsingException
+ }
+ (host, port)
+ }
+}
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
new file mode 100644
index 000000000000..a34bc47d31d9
--- /dev/null
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManager.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.armada
+
+// import java.io.File
+
+// import io.fabric8.kubernetes.client.Config
+// import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.{SparkConf, SparkContext}
+// import org.apache.spark.deploy.k8s.
+// {KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
+// import org.apache.spark.deploy.k8s.Config._
+// import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
+// import org.apache.spark.internal.LogKeys.MASTER_URL
+// import org.apache.spark.internal.config.TASK_MAX_FAILURES
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+// import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.util.{ThreadUtils} // {Clock, SystemClock, ThreadUtils, Utils}
+
+private[spark] class ArmadaClusterManager extends ExternalClusterManager with Logging {
+ // import SparkMasterRegex._
+
+ override def canCreate(masterURL: String): Boolean = masterURL.startsWith("armada")
+
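+  // Placeholder: report every deployment as local to TaskSchedulerImpl for now.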
+ private def isLocal(conf: SparkConf): Boolean =
+ true
+
+ override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
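+    // Hard-coded for now; this should presumably come from spark.task.maxFailures
+    // (see the commented-out TASK_MAX_FAILURES import above).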
+ val maxTaskFailures = 1
+ new TaskSchedulerImpl(sc, maxTaskFailures, isLocal(sc.conf))
+ }
+
+ override def createSchedulerBackend(
+ sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = {
+ // val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
+
+ // TODO: Create Armada client here.
+ /*
+ val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
+ apiServerUri,
+ Some(sc.conf.get(KUBERNETES_NAMESPACE)),
+ authConfPrefix,
+ SparkKubernetesClientFactory.ClientType.Driver,
+ sc.conf,
+ defaultServiceAccountCaCrt)
+
+ if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
+ KubernetesUtils.loadPodFromTemplate(
+ kubernetesClient,
+ sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get,
+ sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME),
+ sc.conf)
+ }
+ */
+
+    val schedulerExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+      "armada-executor-maintenance")
+
+ /*
+ ExecutorPodsSnapshot.setShouldCheckAllContainers(
+ sc.conf.get(KUBERNETES_EXECUTOR_CHECK_ALL_CONTAINERS))
+ val sparkContainerName = sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)
+ .getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)
+ ExecutorPodsSnapshot.setSparkContainerName(sparkContainerName)
+ val subscribersExecutor = ThreadUtils
+ .newDaemonThreadPoolScheduledExecutor(
+ "kubernetes-executor-snapshots-subscribers", 2)
+ val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor, conf = sc.conf)
+
+ val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
+ sc.conf,
+ kubernetesClient,
+ snapshotsStore)
+
+ val executorPodsAllocator = makeExecutorPodsAllocator(sc, kubernetesClient, snapshotsStore)
+
+ val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
+ snapshotsStore,
+ kubernetesClient,
+ sc.conf)
+
+ val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ "kubernetes-executor-pod-polling-sync")
+ val podsPollingEventSource = new ExecutorPodsPollingSnapshotSource(
+ sc.conf, kubernetesClient, snapshotsStore, eventsPollingExecutor)
+ */
+
+ new ArmadaClusterSchedulerBackend(
+ scheduler.asInstanceOf[TaskSchedulerImpl],
+ sc,
+ schedulerExecutorService,
+ masterURL)
+ // snapshotsStore,
+ // executorPodsAllocator,
+ // executorPodsLifecycleEventHandler,
+ // podsWatchEventSource,
+ // podsPollingEventSource)
+ }
+
+ /*
+ private[armada] def makeExecutorPodsAllocator(
+ sc: SparkContext, kubernetesClient: KubernetesClient,
+ snapshotsStore: ExecutorPodsSnapshotsStore) = {
+ val executorPodsAllocatorName = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
+ case "statefulset" =>
+ classOf[StatefulSetPodsAllocator].getName
+ case "direct" =>
+ classOf[ExecutorPodsAllocator].getName
+ case fullClass =>
+ fullClass
+ }
+
+ val cls = Utils.classForName[AbstractPodsAllocator](executorPodsAllocatorName)
+ val cstr = cls.getConstructor(
+ classOf[SparkConf], classOf[org.apache.spark.SecurityManager],
+ classOf[KubernetesExecutorBuilder], classOf[KubernetesClient],
+ classOf[ExecutorPodsSnapshotsStore], classOf[Clock])
+ cstr.newInstance(
+ sc.conf,
+ sc.env.securityManager,
+ new KubernetesExecutorBuilder(),
+ kubernetesClient,
+ snapshotsStore,
+ new SystemClock())
+ }
+ */
+
+ override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+ scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+ }
+}
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
new file mode 100644
index 000000000000..6d3f32ab364d
--- /dev/null
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.armada
+
+import java.util.concurrent.ScheduledExecutorService
+
+import scala.collection.mutable.HashMap
+
+import io.armadaproject.armada.ArmadaClient
+import k8s.io.api.core.v1.generated.{Container, PodSpec, ResourceRequirements}
+import k8s.io.api.core.v1.generated.{EnvVar, EnvVarSource, ObjectFieldSelector}
+import k8s.io.apimachinery.pkg.api.resource.generated.Quantity
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
+import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+
+
+// TODO: Implement for Armada
+private[spark] class ArmadaClusterSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ sc: SparkContext,
+ executorService: ScheduledExecutorService,
+ masterURL: String)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+
+ // FIXME
+ private val appId = "fake_app_id_FIXME"
+
+ private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+ override def applicationId(): String = {
+ conf.getOption("spark.app.id").getOrElse(appId)
+ }
+
+
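+  // Submits a single hard-coded executor job to Armada, wiring the driver's bind address
+  // into SPARK_DRIVER_URL so the executor can register back with this backend.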
+ private def submitJob(): Unit = {
+
+ val urlArray = masterURL.split(":")
+ // Remove leading "/"'s
+ val host = if (urlArray(1).startsWith("/")) urlArray(1).substring(2) else urlArray(1)
+ val port = urlArray(2).toInt
+
+ val driverAddr = sys.env("SPARK_DRIVER_BIND_ADDRESS")
+
+
+ val driverURL = s"spark://CoarseGrainedScheduler@$driverAddr:7078"
+ val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
+ .withApiVersion("v1").withFieldPath("status.podIP"))
+ val envVars = Seq(
+ EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
+ EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
+ EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
+ EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
+ EnvVar().withName("SPARK_EXECUTOR_CORES").withValue("1"),
+ EnvVar().withName("SPARK_EXECUTOR_MEMORY").withValue("512m"),
+ EnvVar().withName("SPARK_DRIVER_URL").withValue(driverURL),
+ EnvVar().withName("SPARK_EXECUTOR_POD_IP").withValueFrom(source)
+ )
+ val executorContainer = Container()
+ .withName("spark-executor")
+ .withImagePullPolicy("IfNotPresent")
+ .withImage("spark:testing")
+ .withEnv(envVars)
+ .withCommand(Seq("/opt/entrypoint.sh"))
+ .withArgs(
+ Seq(
+ "executor"
+ )
+ )
+ .withResources(
+ ResourceRequirements(
+ limits = Map(
+ "memory" -> Quantity(Option("1000Mi")),
+ "cpu" -> Quantity(Option("100m"))
+ ),
+ requests = Map(
+ "memory" -> Quantity(Option("1000Mi")),
+ "cpu" -> Quantity(Option("100m"))
+ )
+ )
+ )
+
+ val podSpec = PodSpec()
+ .withTerminationGracePeriodSeconds(0)
+ .withRestartPolicy("Never")
+ .withContainers(Seq(executorContainer))
+
+ val testJob = api.submit
+ .JobSubmitRequestItem()
+ .withPriority(0)
+ .withNamespace("default")
+ .withPodSpec(podSpec)
+
+ val client = new ArmadaClient(ArmadaClient.GetChannel(host, port))
+ val jobSubmitResponse = client.SubmitJobs("test", "executor", Seq(testJob))
+
+    logInfo("Executor job submit response")
+    for (respItem <- jobSubmitResponse.jobResponseItems) {
+      logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error}")
+    }
+  }
+
+  override def start(): Unit = {
+ submitJob()
+ }
+
+ override def stop(): Unit = {}
+
+ /*
+ override def doRequestTotalExecutors(
+ resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
+ //podAllocator.setTotalExpectedExecutors(resourceProfileToTotalExecs)
+ //Future.successful(true)
+ }
+ */
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
+ }
+
+ override def getExecutorIds(): Seq[String] = synchronized {
+ super.getExecutorIds()
+ }
+
+ override def createDriverEndpoint(): DriverEndpoint = {
+ new ArmadaDriverEndpoint()
+ }
+
+ private class ArmadaDriverEndpoint extends DriverEndpoint {
+ protected val execIDRequester = new HashMap[RpcAddress, String]
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] =
+ super.receiveAndReply(context)
+ /* generateExecID(context).orElse(
+ ignoreRegisterExecutorAtStoppedContext.orElse(
+ super.receiveAndReply(context))) */
+
+ override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+ val execId = addressToExecutorId.get(rpcAddress)
+ execId match {
+ case Some(id) =>
+ executorsPendingDecommission.get(id) match {
+ case Some(host) =>
+ // We don't pass through the host because by convention the
+ // host is only populated if the entire host is going away
+ // and we don't know if that's the case or just one container.
+ removeExecutor(id, ExecutorDecommission(None))
+ case _ =>
+ // Don't do anything besides disabling the executor - allow the K8s API events to
+ // drive the rest of the lifecycle decisions.
+ // If it's disconnected due to network issues eventually heartbeat will clear it up.
+ disableExecutor(id)
+ }
+ case _ =>
+ val newExecId = execIDRequester.get(rpcAddress)
+ newExecId match {
+ case Some(id) =>
+ execIDRequester -= rpcAddress
+ // Expected, executors re-establish a connection with an ID
+ case _ =>
+ logDebug(s"No executor found for $rpcAddress")
+ }
+ }
+ }
+ }
+}
diff --git a/resource-managers/armada/core/src/test/scala/org/apache/spark/scheduler/cluster/armada/ArmadaClusterManagerBackendSuite.scala b/resource-managers/armada/core/src/test/scala/org/apache/spark/scheduler/cluster/armada/ArmadaClusterManagerBackendSuite.scala
new file mode 100644
index 000000000000..ac002b4bd277
--- /dev/null
+++ b/resource-managers/armada/core/src/test/scala/org/apache/spark/scheduler/cluster/armada/ArmadaClusterManagerBackendSuite.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.armada
+
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+
+class ArmadaClusterManagerBackendSuite extends SparkFunSuite with BeforeAndAfter {
+ private val schedulerExecutorService = new DeterministicScheduler()
+
+ test("FIXME - Fill in!") {
+ assert(1 == 1)
+ }
+}
diff --git a/resource-managers/armada/core/src/test/scala/org/apache/spark/scheduler/cluster/armada/ArmadaUtilsSuite.scala b/resource-managers/armada/core/src/test/scala/org/apache/spark/scheduler/cluster/armada/ArmadaUtilsSuite.scala
new file mode 100644
index 000000000000..d5fc1a6185a1
--- /dev/null
+++ b/resource-managers/armada/core/src/test/scala/org/apache/spark/scheduler/cluster/armada/ArmadaUtilsSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.armada.submit
+
+import org.apache.spark.{/*SparkConf, SparkContext, SparkEnv,*/ SparkFunSuite}
+
+import ArmadaUtilsExceptions._
+
+class ArmadaUtilsSuite extends SparkFunSuite {
+ test("parseMasterUrl") {
+ case class TestCase(
+ testUrl: String,
+ expectedHost: String,
+ expectedPort: Int,
+ expectException: Boolean)
+
+ val testCases = List[TestCase](
+ TestCase("armada://localhost:50051", "localhost", 50051, false),
+ TestCase("armada://malformed:url:ohno", "", 0, true),
+ TestCase("armada://badurl", "", 0, true),
+ TestCase("armada://localhost:badport", "", 0, true)
+ )
+
+ for (tc <- testCases) {
+ if (tc.expectException) {
+      val caughtException = try {
+        ArmadaUtils.parseMasterUrl(tc.testUrl)
+        false
+      } catch {
+        case _: MasterUrlParsingException => true
+      }
+ assert(caughtException)
+ } else { // no exception expected.
+ val (host, port) = ArmadaUtils.parseMasterUrl(tc.testUrl)
+ assert(tc.expectedHost == host)
+ assert(tc.expectedPort == port)
+ }
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index f9561b9aa4ed..09b7c52b5b88 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -85,6 +85,7 @@ case "$1" in
"$SPARK_HOME/bin/spark-submit"
--conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
--conf "spark.executorEnv.SPARK_DRIVER_POD_IP=$SPARK_DRIVER_BIND_ADDRESS"
+ --conf "spark.driver.host=$SPARK_DRIVER_BIND_ADDRESS"
--deploy-mode client
"$@"
)