
Commit 041cd5d

yangwwei authored and dongjoon-hyun committed
[SPARK-37049][K8S] executorIdleTimeout should check creationTimestamp instead of startTime
SPARK-33099 added support to respect `spark.dynamicAllocation.executorIdleTimeout` in `ExecutorPodsAllocator`. However, when it checks whether a pending executor pod has timed out, it checks against the pod's [startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667), see the code [here](https://github.com/apache/spark/blob/c2ba498ff678ddda034cedf45cc17fbeefe922fd/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala#L459). A pending pod's `startTime` is empty, which causes `isExecutorIdleTimedOut()` to always return true for pending pods (a minimal sketch of this failure mode follows the commit message below).

This can be reproduced locally by running the following job:

```
${SPARK_HOME}/bin/spark-submit --master k8s://http://localhost:8001 --deploy-mode cluster --name spark-group-example \
  --master k8s://http://localhost:8001 --deploy-mode cluster \
  --class org.apache.spark.examples.GroupByTest \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.namespace=spark-test \
  --conf spark.kubernetes.executor.request.cores=1 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.kubernetes.container.image=local/spark:3.3.0 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar \
  1000 1000 100 1000
```

The local cluster doesn't have enough resources to run more than 4 executors, so the rest of the executor pods stay pending. The job builds up a task backlog and triggers requests for more executors from K8s:

```
21/10/19 22:51:45 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1 running: 0.
21/10/19 22:51:51 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
21/10/19 22:51:52 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 4 running: 2.
21/10/19 22:51:53 INFO ExecutorPodsAllocator: Going to request 4 executors from Kubernetes for ResourceProfile Id: 0, target: 8 running: 4.
...
21/10/19 22:52:14 INFO ExecutorPodsAllocator: Deleting 39 excess pod requests (23,59,32,41,50,68,35,44,17,8,53,62,26,71,11,56,29,38,47,20,65,5,14,46,64,73,55,49,40,67,58,13,22,31,7,16,52,70,43).
21/10/19 22:52:18 INFO ExecutorPodsAllocator: Deleting 28 excess pod requests (25,34,61,37,10,19,28,60,69,63,45,54,72,36,18,9,27,21,57,12,48,30,39,66,15,42,24,33).
```

At `22:51:45` it starts to request executors, and at `22:52:14` it starts to delete excess executor pods. That is only 29s, yet `spark.dynamicAllocation.executorIdleTimeout` is set to 60s: the config was not honored.

### What changes were proposed in this pull request?

Change the check from the pod's `startTime` to its `creationTimestamp`. [creationTimestamp](https://github.com/kubernetes/apimachinery/blob/e6c90c4366be1504309a6aafe0d816856450f36a/pkg/apis/meta/v1/types.go#L193-L201) is the timestamp when the pod object is created on K8s:

```
// CreationTimestamp is a timestamp representing the server time when this object was
// created. It is not guaranteed to be set in happens-before order across separate operations.
// Clients may not set this value. It is represented in RFC3339 form and is in UTC.
```

[startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667) is the timestamp when the pod gets started:

```
// RFC 3339 date and time at which the object was acknowledged by the Kubelet.
// This is before the Kubelet pulled the container image(s) for the pod.
// +optional
```

A pending pod's `startTime` is empty. Here is an example of a pending pod:

```
NAMESPACE   NAME                  READY   STATUS    RESTARTS   AGE
default     pending-pod-example   0/1     Pending   0          2s

kubectl get pod pending-pod-example -o yaml | grep creationTimestamp
---> creationTimestamp: "2021-10-19T16:17:52Z"

// pending pod has no startTime
kubectl get pod pending-pod-example -o yaml | grep startTime
---> // empty

// running pod has startTime set to the timestamp when the pod gets started
kubectl get pod coredns-558bd4d5db-6qrtx -n kube-system -o yaml | grep startTime
  f:startTime: {}
---> startTime: "2021-08-04T23:44:44Z"
```

### Why are the changes needed?

This fixes the issue that `spark.dynamicAllocation.executorIdleTimeout` is currently not honored for pending executor pods.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The PR includes UT changes that provide test coverage for this issue.

Closes apache#34319 from yangwwei/SPARK-37049.

Authored-by: Weiwei Yang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
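
To make the failure mode concrete, here is a minimal Scala sketch of why the old `startTime`-based check always reports a pending pod as timed out. The helper below is hypothetical and simplified for illustration (it is not the actual Spark method); it can be pasted into a Scala REPL.

```scala
import java.time.Instant

// Simplified stand-in for the old check: parse status.startTime and compare against the timeout.
// For a pending pod, startTime is null/empty, Instant.parse throws, and the catch branch
// unconditionally answers "timed out".
def isIdleTimedOutOld(startTime: String, currentTime: Long, idleTimeoutMs: Long): Boolean = {
  try {
    currentTime - Instant.parse(startTime).toEpochMilli > idleTimeoutMs
  } catch {
    case _: Exception => true
  }
}

// A pending pod (no startTime) is reported as timed out immediately, even with a 60s timeout:
println(isIdleTimedOutOld(null, System.currentTimeMillis(), 60000L))  // true
```

With `startTime` replaced by `creationTimestamp`, the parse succeeds for pending pods as well, so the comparison against the configured idle timeout becomes meaningful.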
1 parent b07dd1a commit 041cd5d

File tree

2 files changed (+7, -5 lines)


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -456,11 +456,11 @@ class ExecutorPodsAllocator(
 
   private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
     try {
-      val startTime = Instant.parse(state.pod.getStatus.getStartTime).toEpochMilli()
-      currentTime - startTime > executorIdleTimeout
+      val creationTime = Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli()
+      currentTime - creationTime > executorIdleTimeout
     } catch {
-      case _: Exception =>
-        logDebug(s"Cannot get startTime of pod ${state.pod}")
+      case e: Exception =>
+        logError(s"Cannot get the creationTimestamp of the pod: ${state.pod}", e)
         true
     }
   }
```
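
For context, the patched method reads roughly as follows when lifted out of its class. This is a sketch assembled from the diff above, not the literal Spark source: `ExecutorPodState` and the configured timeout are replaced by plain parameters, and `logError` is replaced by `println` so the snippet stands alone with only the fabric8 model on the classpath.

```scala
import java.time.Instant
import io.fabric8.kubernetes.api.model.Pod

// Sketch of the patched check with plain parameters instead of ExecutorPodState / config.
def isExecutorIdleTimedOut(pod: Pod, currentTime: Long, executorIdleTimeoutMs: Long): Boolean = {
  try {
    // metadata.creationTimestamp is set by the API server for every pod, pending or running,
    // so a pending executor pod is measured from when it was actually requested.
    val creationTime = Instant.parse(pod.getMetadata.getCreationTimestamp).toEpochMilli()
    currentTime - creationTime > executorIdleTimeoutMs
  } catch {
    case e: Exception =>
      // Same conservative fallback as the real code: treat the pod as timed out.
      println(s"Cannot get the creationTimestamp of the pod: $pod ($e)")
      true
  }
}
```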

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -64,9 +64,11 @@ object ExecutorLifecycleTestUtils {
 
   def pendingExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
     new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
+      .editOrNewMetadata()
+        .withCreationTimestamp(Instant.now.toString)
+      .endMetadata()
       .editOrNewStatus()
         .withPhase("pending")
-        .withStartTime(Instant.now.toString)
       .endStatus()
       .build()
   }
```
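
As a quick sanity check of the fixture change, a pod built the way `pendingExecutor` now builds it carries a parseable `creationTimestamp` while its status still has no `startTime`. This is a hypothetical standalone snippet (REPL-style, with the fabric8 model on the classpath), not the actual suite's tests; the pod name and timeout value are placeholders.

```scala
import java.time.Instant
import io.fabric8.kubernetes.api.model.PodBuilder

// Build a pod the way the updated pendingExecutor fixture does: creationTimestamp in
// metadata, phase "pending" in status, and no startTime.
val pod = new PodBuilder()
  .editOrNewMetadata()
    .withName("spark-exec-1")                      // hypothetical name for illustration
    .withCreationTimestamp(Instant.now.toString)
  .endMetadata()
  .editOrNewStatus()
    .withPhase("pending")
  .endStatus()
  .build()

val idleTimeoutMs = 60000L
val creationTime = Instant.parse(pod.getMetadata.getCreationTimestamp).toEpochMilli()

// Just after creation the pod must not be reported as idle-timed-out...
assert(System.currentTimeMillis() - creationTime <= idleTimeoutMs)
// ...and the old field would have been unusable: status.startTime is null for a pending pod.
assert(pod.getStatus.getStartTime == null)
```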
