Changes from all commits
107 commits
b4d625c
Work in progress supporting statefulsets for Spark. TBD if we want st…
holdenk Jul 9, 2021
38676ab
It compiles, yaygit diff!
holdenk Jul 9, 2021
09eb220
Add applicationId to setTotalExpectedExecutors so that we can use thi…
holdenk Jul 9, 2021
922b61a
Put in a restart policy of always. Next TODO (likely we can look at t…
holdenk Jul 9, 2021
2316084
Add podname parsing logic based on https://github.com/spark-volcano-w…
holdenk Jul 9, 2021
1e004d3
Try and plumb through the SPARK_EXECUTOR_POD_NAME so we can process i…
holdenk Jul 9, 2021
10c2922
Move the restart policy change into basic exec feature step instead o…
holdenk Jul 9, 2021
9434bff
Move more of the hijinks into the featuresteps where they fit
holdenk Jul 9, 2021
39ad5f6
Add a parallel pod management property, lets hope this doesn't screw …
holdenk Jul 9, 2021
74e4a67
Fix typo )) -> )
holdenk Jul 13, 2021
5f3bf00
Get it to compile again
holdenk Jul 14, 2021
bb27e9f
Turns out we do want to track snapshots so know about dead pods earli…
holdenk Jul 14, 2021
4d343a4
Refactor the stateful allocator out from the base allocator (TODO: ac…
holdenk Jul 14, 2021
e6fc922
Use scale to update statefulset scale
holdenk Jul 14, 2021
d7a094f
Construct the pod allocator based on user configuration.
holdenk Jul 14, 2021
1c8556f
Start adding new tests (slowly) for statefulset allocator and update …
holdenk Jul 14, 2021
7522169
Initial statefulset mock test
holdenk Jul 14, 2021
64bb5a7
Add second resource profile and scaleup test to StatefulsetAllocatorS…
holdenk Jul 14, 2021
bc15209
Validate the deletions as well
holdenk Jul 14, 2021
37c49a9
Verify that we can allocate with statefulsets. Next up: clean up the …
holdenk Jul 21, 2021
dc602be
Start work to cleanup and validate removal of statefulset on driver exit
holdenk Jul 21, 2021
3c5fb3d
Fix addowner ref
holdenk Jul 21, 2021
57b58e8
Delegate the pod cleanup to the pod allocator so that the statefulset…
holdenk Jul 22, 2021
da8cc6c
Use eventually when checking for set delition because it depends on t…
holdenk Jul 22, 2021
a7da7b2
Make the KubernetesSuite pod log collection resilent to pending pods.
holdenk Jul 22, 2021
799e2ff
Add a minireadwrite test for use with PVCs and not proper DFS
holdenk Jul 23, 2021
ee176f0
Add some tests around the new allocator with PVs
holdenk Jul 23, 2021
a48beb6
maaaybe exec mount
holdenk Jul 27, 2021
500c080
Revert "maaaybe exec mount"
holdenk Jul 27, 2021
d626dc7
Update the mini-read-write test to handle the fact the exec PVCs are …
holdenk Jul 28, 2021
f6540e1
Switch the PV tests back tohaving pvTestTag and MiniKubeTag as needed…
holdenk Jul 28, 2021
5dc1bc4
Scala style cleanups
holdenk Jul 28, 2021
b1ba08c
Delete block when putting over an existing block incase our in-memory…
holdenk Jul 28, 2021
b151d8f
We do the deletion of the pods inside of the executorpodsallocator no…
holdenk Jul 29, 2021
efd2ae7
Handle empty pod specs
holdenk Jul 30, 2021
5e0e939
Update StatefulsetPodsAllocator.scala
holdenk Jul 30, 2021
d8503e7
code review feedback, cleanup the SPARK_LOCAL_DIRS when executing ent…
holdenk Aug 4, 2021
e8eece5
Expose the AbstractPodsAllocator as a @DeveloperApi as suggested/requ…
holdenk Aug 4, 2021
08a24d9
Move the getItems inside of the eventually otherwise we still could h…
holdenk Aug 4, 2021
5362f73
pvTestTag was removed upstream
holdenk Aug 4, 2021
a74598e
Update entrypoint.sh
holdenk Aug 5, 2021
ffa5d24
Fix up how we launch pods allocators
holdenk Aug 5, 2021
ec8bf09
Make a new entry point for executors on Kube so they can request the …
holdenk Aug 5, 2021
2d6dc1c
Add unit tests for dynamically fetching exec id and constructing the …
holdenk Aug 6, 2021
df601c2
Don't parse podnames anymore to get exec ids instead depend on the la…
holdenk Aug 6, 2021
df2af02
Remove the SparkException import we don't need anymore
holdenk Aug 6, 2021
6db2b9f
Add the KubernetesClusterManagerSuite
holdenk Aug 6, 2021
65d89a0
Work in progress supporting statefulsets for Spark. TBD if we want st…
holdenk Jul 9, 2021
88d345c
It compiles, yaygit diff!
holdenk Jul 9, 2021
908d085
Add applicationId to setTotalExpectedExecutors so that we can use thi…
holdenk Jul 9, 2021
7000ff5
Put in a restart policy of always. Next TODO (likely we can look at t…
holdenk Jul 9, 2021
f5375ed
Add podname parsing logic based on https://github.com/spark-volcano-w…
holdenk Jul 9, 2021
b1b04fc
Try and plumb through the SPARK_EXECUTOR_POD_NAME so we can process i…
holdenk Jul 9, 2021
29470f6
Move the restart policy change into basic exec feature step instead o…
holdenk Jul 9, 2021
773ae75
Move more of the hijinks into the featuresteps where they fit
holdenk Jul 9, 2021
315807b
Add a parallel pod management property, lets hope this doesn't screw …
holdenk Jul 9, 2021
f92057e
Fix typo )) -> )
holdenk Jul 13, 2021
e4392aa
Get it to compile again
holdenk Jul 14, 2021
740a16e
Turns out we do want to track snapshots so know about dead pods earli…
holdenk Jul 14, 2021
9ebee9a
Refactor the stateful allocator out from the base allocator (TODO: ac…
holdenk Jul 14, 2021
01a1a97
Use scale to update statefulset scale
holdenk Jul 14, 2021
35c939d
Construct the pod allocator based on user configuration.
holdenk Jul 14, 2021
2052685
Start adding new tests (slowly) for statefulset allocator and update …
holdenk Jul 14, 2021
97255ab
Initial statefulset mock test
holdenk Jul 14, 2021
5a8c298
Add second resource profile and scaleup test to StatefulsetAllocatorS…
holdenk Jul 14, 2021
58dec2c
Validate the deletions as well
holdenk Jul 14, 2021
596236a
Verify that we can allocate with statefulsets. Next up: clean up the …
holdenk Jul 21, 2021
d26e2d9
Start work to cleanup and validate removal of statefulset on driver exit
holdenk Jul 21, 2021
029c682
Fix addowner ref
holdenk Jul 21, 2021
2285340
Delegate the pod cleanup to the pod allocator so that the statefulset…
holdenk Jul 22, 2021
9a51151
Use eventually when checking for set delition because it depends on t…
holdenk Jul 22, 2021
a2d4183
Make the KubernetesSuite pod log collection resilent to pending pods.
holdenk Jul 22, 2021
cd09bc4
Add a minireadwrite test for use with PVCs and not proper DFS
holdenk Jul 23, 2021
d08dd7d
Add some tests around the new allocator with PVs
holdenk Jul 23, 2021
3633936
maaaybe exec mount
holdenk Jul 27, 2021
ce9299b
Revert "maaaybe exec mount"
holdenk Jul 27, 2021
71b9674
Update the mini-read-write test to handle the fact the exec PVCs are …
holdenk Jul 28, 2021
7c21bbc
Switch the PV tests back tohaving pvTestTag and MiniKubeTag as needed…
holdenk Jul 28, 2021
a3c5103
Scala style cleanups
holdenk Jul 28, 2021
3fee3bb
Delete block when putting over an existing block incase our in-memory…
holdenk Jul 28, 2021
f290aef
We do the deletion of the pods inside of the executorpodsallocator no…
holdenk Jul 29, 2021
993ff65
Handle empty pod specs
holdenk Jul 30, 2021
788005d
Update StatefulsetPodsAllocator.scala
holdenk Jul 30, 2021
40c2db3
code review feedback, cleanup the SPARK_LOCAL_DIRS when executing ent…
holdenk Aug 4, 2021
048ea6f
Expose the AbstractPodsAllocator as a @DeveloperApi as suggested/requ…
holdenk Aug 4, 2021
bd79229
Move the getItems inside of the eventually otherwise we still could h…
holdenk Aug 4, 2021
8c81112
pvTestTag was removed upstream
holdenk Aug 4, 2021
2364e4b
Update entrypoint.sh
holdenk Aug 5, 2021
108503e
Fix up how we launch pods allocators
holdenk Aug 5, 2021
ec27fb5
Make a new entry point for executors on Kube so they can request the …
holdenk Aug 5, 2021
7f331c0
Add unit tests for dynamically fetching exec id and constructing the …
holdenk Aug 6, 2021
d87cda2
Don't parse podnames anymore to get exec ids instead depend on the la…
holdenk Aug 6, 2021
9715697
Remove the SparkException import we don't need anymore
holdenk Aug 6, 2021
946d4a7
Add the KubernetesClusterManagerSuite
holdenk Aug 6, 2021
9a432c0
Merge branch 'master' into SPARK-36058-support-replicasets-or-job-api…
holdenk Aug 13, 2021
1ab835a
Merge branch 'SPARK-36058-support-replicasets-or-job-api-like-things'…
holdenk Aug 13, 2021
2e4cd93
Minimize changes by dropping appId from setTotalExpectedExecutors and…
holdenk Aug 13, 2021
7aa79f4
Merge branch 'master' into SPARK-36058-support-replicasets-or-job-api…
holdenk Aug 17, 2021
d1c172d
Merge branch 'master' into SPARK-36058-support-replicasets-or-job-api…
holdenk Aug 17, 2021
fae1cbd
Make sure we call start before setting the total expected execs
holdenk Aug 18, 2021
49ab072
Throw an exception if execs are set before start so it's clearer than…
holdenk Aug 18, 2021
362172c
Merge branch 'master' into SPARK-36058-support-replicasets-or-job-api…
holdenk Aug 20, 2021
0502e19
Merge branch 'master' into SPARK-36058-support-replicasets-or-job-api…
holdenk Aug 23, 2021
6e660a4
Update the Usage instructions for the MiniReadWriteTest
holdenk Aug 23, 2021
a638719
CR feedback from @kbendick - use classof, change config param name, a…
holdenk Aug 23, 2021
4f3c0cc
Merge branch 'master' into SPARK-36058-support-replicasets-or-job-api…
holdenk Aug 24, 2021
5be1942
Merge branch 'master' into SPARK-36058-support-replicasets-or-job-api…
holdenk Aug 25, 2021
@@ -61,7 +61,7 @@ private[spark] class CoarseGrainedExecutorBackend(

private implicit val formats = DefaultFormats

private[executor] val stopping = new AtomicBoolean(false)
private[spark] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -59,7 +59,14 @@ private[spark] class DiskStore(
*/
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
if (contains(blockId)) {
throw new IllegalStateException(s"Block $blockId is already present in the disk store")
logWarning(s"Block $blockId is already present in the disk store")
try {
diskManager.getFile(blockId).delete()
} catch {
case e: Exception =>
throw new IllegalStateException(
s"Block $blockId is already present in the disk store and could not delete it $e")
}
}
logDebug(s"Attempting to put block $blockId")
val startTimeNs = System.nanoTime()
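The hunk above makes DiskStore.put tolerate a block that is already on disk: it now logs a warning, deletes the stale file, and writes the block again instead of throwing. This matters once executor pods can be restarted in place (restartPolicy Always under the statefulset allocator) with persistent-volume-backed local directories, because the restarted executor may find block files left over from its previous incarnation. A rough sketch of the new semantics, assuming an already-constructed diskStore instance (hypothetical here) and the put signature shown above:

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.spark.storage.TestBlockId

// Helper for the sketch: wrap a string as a writeFunc for DiskStore.put.
def writeUtf8(text: String): WritableByteChannel => Unit = { channel =>
  channel.write(ByteBuffer.wrap(text.getBytes(UTF_8)))
}

val blockId = TestBlockId("restart-demo")
diskStore.put(blockId)(writeUtf8("written before the executor restarted"))
// Before this change the second put threw IllegalStateException; now the stale
// file is deleted (with a warning) and the block is rewritten.
diskStore.put(blockId)(writeUtf8("written after the executor restarted"))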
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples

import java.io.File
import java.io.PrintWriter

import scala.io.Source._

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.Utils

/**
* Simple test for reading and writing to a distributed
* file system. This example does the following:
*
* 1. Reads local file
* 2. Computes word count on local file
* 3. Writes local file to a local dir on each executor
* 4. Reads the file back from each exec
* 5. Computes word count on the file using Spark
* 6. Compares the word count results
*/
object MiniReadWriteTest {

private val NPARAMS = 1

private def readFile(filename: String): List[String] = {
Utils.tryWithResource(fromFile(filename))(_.getLines().toList)
}

private def printUsage(): Unit = {
val usage = """Mini Read-Write Test
|Usage: localFile
|localFile - (string) location of local file to distribute to executors.""".stripMargin

println(usage)
}

private def parseArgs(args: Array[String]): File = {
if (args.length != NPARAMS) {
printUsage()
System.exit(1)
}

var i = 0

val localFilePath = new File(args(i))
if (!localFilePath.exists) {
System.err.println(s"Given path (${args(i)}) does not exist")
printUsage()
System.exit(1)
}

if (!localFilePath.isFile) {
System.err.println(s"Given path (${args(i)}) is not a file")
printUsage()
System.exit(1)
}
localFilePath
}

def runLocalWordCount(fileContents: List[String]): Int = {
fileContents.flatMap(_.split(" "))
.flatMap(_.split("\t"))
.filter(_.nonEmpty)
.groupBy(w => w)
.mapValues(_.size)
.values
.sum
}

def main(args: Array[String]): Unit = {
val localFilePath = parseArgs(args)

println(s"Performing local word count from ${localFilePath}")
val fileContents = readFile(localFilePath.toString())
println(s"File contents are ${fileContents}")
val localWordCount = runLocalWordCount(fileContents)

println("Creating SparkSession")
val spark = SparkSession
.builder
.appName("Mini Read Write Test")
.getOrCreate()

println("Writing local file to executors")

// uses the fact default parallelism is greater than num execs
val misc = spark.sparkContext.parallelize(1.to(10))
misc.foreachPartition {
x =>
new PrintWriter(localFilePath) {
try {
write(fileContents.mkString("\n"))
} finally {
close()
}}
}

println("Reading file from execs and running Word Count")
val readFileRDD = spark.sparkContext.textFile(localFilePath.toString())

val dWordCount = readFileRDD
.flatMap(_.split(" "))
.flatMap(_.split("\t"))
.filter(_.nonEmpty)
.map(w => (w, 1))
.countByKey()
.values
.sum

spark.stop()
if (localWordCount == dWordCount) {
println(s"Success! Local Word Count $localWordCount and " +
s"D Word Count $dWordCount agree.")
} else {
println(s"Failure! Local Word Count $localWordCount " +
s"and D Word Count $dWordCount disagree.")
}
}
}
// scalastyle:on println
@@ -323,6 +323,16 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_ALLOCATION_PODS_ALLOCATOR =
ConfigBuilder("spark.kubernetes.allocation.pods.allocator")
.doc("Allocator to use for pods. Possible values are direct (the default) and statefulset " +
", or a full class name of a class implementing AbstractPodsAllocator. " +
"Future version may add Job or replicaset. This is a developer API and may change " +
"or be removed at anytime.")
.version("3.3.0")
.stringConf
.createWithDefault("direct")

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
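The new property would normally be supplied at submit time via --conf spark.kubernetes.allocation.pods.allocator=statefulset. A minimal sketch of setting it programmatically instead; only the property name and its accepted values come from the diff above, everything else is illustrative:

import org.apache.spark.SparkConf

// Sketch: opt into the statefulset-backed allocator added by this PR.
// "direct" keeps the existing per-pod allocation; a fully qualified class name
// selects a custom AbstractPodsAllocator implementation.
val conf = new SparkConf()
  .setAppName("statefulset-allocator-demo")
  .set("spark.kubernetes.allocation.pods.allocator", "statefulset")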
@@ -59,6 +59,7 @@ private[spark] object Constants {
val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_CLASSPATH = "SPARK_CLASSPATH"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
@@ -138,6 +138,13 @@ private[spark] class BasicExecutorFeatureStep(
.withNewFieldRef("v1", "status.podIP")
.build())
.build())
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_NAME)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "metadata.name")
.build())
.build())
} ++ {
if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
Option(secMgr.getSecretKey()).map { authSecret =>
@@ -260,16 +267,22 @@ private[spark] class BasicExecutorFeatureStep(
.withUid(pod.getMetadata.getUid)
.build()
}

val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
case "statefulset" => "Always"
case _ => "Never"
}
val executorPodBuilder = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
.addToLabels(kubernetesConf.labels.asJava)
.addToLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfile.id.toString)
.addToAnnotations(kubernetesConf.annotations.asJava)
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
.withHostname(hostname)
.withRestartPolicy("Never")
.withRestartPolicy(policy)
.addToNodeSelector(kubernetesConf.nodeSelector.asJava)
.addToNodeSelector(kubernetesConf.executorNodeSelector.asJava)
.addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*)
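Two things change in this feature step: the pod's own name is injected as SPARK_EXECUTOR_POD_NAME through the Kubernetes downward API (a fieldRef to metadata.name), and the restart policy becomes Always when the statefulset allocator is selected, so a failed executor container is restarted in place instead of being replaced by the driver. As a small illustrative sketch, executor-side code could read the injected variable like this (this is not how the PR resolves executor IDs, which come from the pod label instead):

// Illustrative only: SPARK_EXECUTOR_POD_NAME is populated by the downward-API
// wiring shown above.
val podName: Option[String] = sys.env.get("SPARK_EXECUTOR_POD_NAME")
podName.foreach { name =>
  println(s"Running inside executor pod $name")
}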
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.api.model.Pod

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.resource.ResourceProfile


/**
* :: DeveloperApi ::
* A abstract interface for allowing different types of pods allocation.
*
* The internal Spark implementations are [[StatefulsetPodsAllocator]]
* and [[ExecutorPodsAllocator]]. This may be useful for folks integrating with custom schedulers
* such as Volcano, Yunikorn, etc.
*
* This API may change or be removed at anytime.
*
* @since 3.3.0
*/
@DeveloperApi
abstract class AbstractPodsAllocator {
/*
* Set the total expected executors for an application
*/
def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit
/*
* Reference to driver pod.
*/
def driverPod: Option[Pod]
/*
* If the pod for a given exec id is deleted.
*/
def isDeleted(executorId: String): Boolean
/*
* Start hook.
*/
def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit
/*
* Stop hook
*/
def stop(applicationId: String): Unit
}
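Because this class is exposed as a DeveloperApi precisely so that custom schedulers can plug in their own allocation strategy, a skeleton implementation might look like the sketch below. It is a do-nothing outline written against the method signatures shown above; the constructor parameters mirror ExecutorPodsAllocator further down and are an assumption about what the cluster manager passes in, and the class sits in Spark's own k8s scheduler package purely so the package-private collaborators resolve:

package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.Clock

// Hypothetical skeleton, not part of this PR.
class NoopPodsAllocator(
    conf: SparkConf,
    secMgr: SecurityManager,
    executorBuilder: KubernetesExecutorBuilder,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    clock: Clock) extends AbstractPodsAllocator {

  // Translate the per-ResourceProfile totals into whatever the backing API scales on.
  override def setTotalExpectedExecutors(
      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {}

  // A real allocator would look up the driver pod so executors can be owner-referenced to it.
  override def driverPod: Option[Pod] = None

  override def isDeleted(executorId: String): Boolean = false

  override def start(
      applicationId: String,
      schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {}

  // Tear down anything created for this application.
  override def stop(applicationId: String): Unit = {}
}

Such a class would then be named through the spark.kubernetes.allocation.pods.allocator setting introduced above.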
@@ -37,13 +37,13 @@ import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.{Clock, Utils}

private[spark] class ExecutorPodsAllocator(
class ExecutorPodsAllocator(
conf: SparkConf,
secMgr: SecurityManager,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
clock: Clock) extends Logging {
clock: Clock) extends AbstractPodsAllocator() with Logging {

private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)

@@ -97,12 +97,15 @@ private[spark] class ExecutorPodsAllocator(

private var lastSnapshot = ExecutorPodsSnapshot()

private var appId: String = _

// Executors that have been deleted by this allocator but not yet detected as deleted in
// a snapshot from the API server. This is used to deny registration from these executors
// if they happen to come up before the deletion takes effect.
@volatile private var deletedExecutorIds = Set.empty[Long]

def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
appId = applicationId
driverPod.foreach { pod =>
// Wait until the driver pod is ready before starting executors, as the headless service won't
// be resolvable by DNS until the driver pod is ready.
@@ -461,6 +464,16 @@ private[spark] class ExecutorPodsAllocator(
true
}
}

override def stop(applicationId: String): Unit = {
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
}
}
}

private[spark] object ExecutorPodsAllocator {
@@ -471,5 +484,4 @@ private[spark] object ExecutorPodsAllocator {
val r = slots % consumers.size
consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d))
}

}
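The commit history ("Construct the pod allocator based on user configuration", "CR feedback from @kbendick - use classof, ...") indicates that KubernetesClusterManager now chooses between these implementations at startup. That wiring is not part of this excerpt, so the following is only a hedged sketch of what the dispatch could look like, reusing the constructor shape shown above; the exact constructor contract for custom classes is an assumption:

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_ALLOCATION_PODS_ALLOCATOR
import org.apache.spark.util.{Clock, Utils}

// Hedged sketch; conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore and clock
// are assumed to be in scope, matching the constructor shown above.
val allocator: AbstractPodsAllocator = conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
  case "direct" =>
    new ExecutorPodsAllocator(
      conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, clock)
  case "statefulset" =>
    new StatefulsetPodsAllocator(
      conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, clock)
  case fullClassName =>
    // A fully qualified class name is assumed to be loaded reflectively and to expose
    // the same constructor shape as the built-in allocators.
    Utils.classForName[AbstractPodsAllocator](fullClassName)
      .getConstructor(classOf[SparkConf], classOf[SecurityManager],
        classOf[KubernetesExecutorBuilder], classOf[KubernetesClient],
        classOf[ExecutorPodsSnapshotsStore], classOf[Clock])
      .newInstance(conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, clock)
}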
@@ -60,8 +60,15 @@ object ExecutorPodsSnapshot extends Logging {
}

private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = {
executorPods.map { pod =>
(pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod))
executorPods.flatMap { pod =>
pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) match {
case "EXECID" | null =>
// The exec label has not yet been assigned
None
case id =>
// We have a "real" id label
Some((id.toLong, toState(pod)))
}
}.toMap
}

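With the statefulset allocator every replica is stamped from a single pod template, so the executor-id label can only hold a placeholder until the executor actually registers; the match above now skips such pods instead of failing on .toLong. A small hedged sketch of what a not-yet-registered pod's metadata looks like (the "EXECID" placeholder comes from the match above; the label key is the value of SPARK_EXECUTOR_ID_LABEL, i.e. spark-exec-id, and the pod name is made up):

import io.fabric8.kubernetes.api.model.PodBuilder

// Sketch: a template-stamped executor pod before any executor ID has been assigned.
// toStatesByExecutorId now ignores it rather than crashing on "EXECID".toLong.
val pendingExecutorPod = new PodBuilder()
  .withNewMetadata()
    .withName("myapp-exec-0")                 // hypothetical statefulset-style pod name
    .addToLabels("spark-exec-id", "EXECID")   // placeholder until registration
  .endMetadata()
  .build()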