From 7fb981425ac9a5a9996b23b511764b8c3984f3e1 Mon Sep 17 00:00:00 2001 From: Shrinidhi Joshi Date: Wed, 12 Jul 2023 12:12:00 -0700 Subject: [PATCH] [native pos] Propagate taskInfo updates to Spark metrics Currently, we update the Spark metrics accumulators only after the task is complete. This means that Spark sees no progress or activity on the task until the task is complete. This also means that we can't effectively use the stalled tasks killer in the Spark scheduler, as it relies on Spark accumulators being updated regularly to measure whether a task is making progress. This commit fixes this by ensuring that every time we receive TaskInfo from the CPP process, we immediately update the Spark accumulators to let Spark know that we are making progress. --- .../nativeprocess/HttpNativeExecutionTaskInfoFetcher.java | 7 +++++++ .../task/PrestoSparkNativeTaskExecutorFactory.java | 4 ---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java index 70b703dff463b..784ff8278b99b 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java @@ -18,6 +18,7 @@ import com.facebook.presto.server.RequestErrorTracker; import com.facebook.presto.server.smile.BaseResponse; import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient; +import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; import com.facebook.presto.spi.PrestoException; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -104,6 +105,12 @@ public void onSuccess(BaseResponse result) { 
log.debug("TaskInfoCallback success %s", result.getValue().getTaskId()); taskInfo.set(result.getValue()); + + // Update Spark Accumulators for spark internal metrics + // Note: Updating here also serves as a heartbeat to spark scheduler + // that the task is making progress + PrestoSparkStatsCollectionUtils.collectMetrics(taskInfo.get()); + if (result.getValue().getTaskStatus().getState().isDone()) { synchronized (taskFinished) { taskFinished.notifyAll(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java index 3f10fc5d985ad..105d08292ffdc 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java @@ -50,7 +50,6 @@ import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionProcessFactory; import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator; import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleWriteInfo; -import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.page.SerializedPage; import com.facebook.presto.spi.plan.OutputNode; @@ -282,9 +281,6 @@ private static void completeTask(CollectionAccumulator taskI } SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(serializeZstdCompressed(taskInfoCodec, taskInfoOptional.get())); taskInfoCollector.add(serializedTaskInfo); - - // Update Spark Accumulators for spark internal metrics - PrestoSparkStatsCollectionUtils.collectMetrics(taskInfoOptional.get()); } private static void processTaskInfoForErrorsOrCompletion(TaskInfo taskInfo)