diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java index f97e7c9c3eae2..7ecfa9b3d5d8c 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java @@ -67,6 +67,8 @@ public class HttpNativeExecutionTaskResultFetcher private ScheduledFuture scheduledFuture; + private volatile boolean completed; + private long token; public HttpNativeExecutionTaskResultFetcher( @@ -134,6 +136,11 @@ private void throwIfFailed() private void doGetResults() { + if (completed && scheduledFuture != null) { + scheduledFuture.cancel(false); + return; + } + if (bufferMemoryBytes.longValue() >= MAX_BUFFER_SIZE.toBytes()) { return; } @@ -172,8 +179,11 @@ private void onSuccess(PageBufferClient.PagesResponse pagesResponse) } token = nextToken; if (pagesResponse.isClientComplete()) { + completed = true; workerClient.abortResultsAsync(); - scheduledFuture.cancel(false); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } } if (!pages.isEmpty()) { synchronized (taskHasResult) {