diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java index 70b703dff463b..784ff8278b99b 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskInfoFetcher.java @@ -18,6 +18,7 @@ import com.facebook.presto.server.RequestErrorTracker; import com.facebook.presto.server.smile.BaseResponse; import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient; +import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; import com.facebook.presto.spi.PrestoException; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -104,6 +105,12 @@ public void onSuccess(BaseResponse result) { log.debug("TaskInfoCallback success %s", result.getValue().getTaskId()); taskInfo.set(result.getValue()); + + // Update Spark Accumulators for spark internal metrics + // Note: Updating here also serves as a heartbeat to spark scheduler + // that the task is making progress + PrestoSparkStatsCollectionUtils.collectMetrics(taskInfo.get()); + if (result.getValue().getTaskStatus().getState().isDone()) { synchronized (taskFinished) { taskFinished.notifyAll(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java index 3f10fc5d985ad..105d08292ffdc 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/task/PrestoSparkNativeTaskExecutorFactory.java @@ -50,7 +50,6 @@ import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionProcessFactory; import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator; import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleWriteInfo; -import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.page.SerializedPage; import com.facebook.presto.spi.plan.OutputNode; @@ -282,9 +281,6 @@ private static void completeTask(CollectionAccumulator taskI } SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(serializeZstdCompressed(taskInfoCodec, taskInfoOptional.get())); taskInfoCollector.add(serializedTaskInfo); - - // Update Spark Accumulators for spark internal metrics - PrestoSparkStatsCollectionUtils.collectMetrics(taskInfoOptional.get()); } private static void processTaskInfoForErrorsOrCompletion(TaskInfo taskInfo)