diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java index 881ae57098acc..5b5beab338a88 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java @@ -23,6 +23,8 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.page.SerializedPage; +import javax.annotation.concurrent.GuardedBy; + import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -67,10 +69,9 @@ public class HttpNativeExecutionTaskResultFetcher private final Object taskHasResult; private final AtomicReference lastException = new AtomicReference<>(); + @GuardedBy("this") private ScheduledFuture scheduledFuture; - private volatile boolean completed; - private long token; public HttpNativeExecutionTaskResultFetcher( @@ -86,7 +87,7 @@ public HttpNativeExecutionTaskResultFetcher( this.taskHasResult = requireNonNull(taskHasResult, "taskHasResult is null"); } - public void start() + public synchronized void start() { scheduledFuture = scheduler.scheduleAtFixedRate(this::doGetResults, 0, @@ -94,7 +95,7 @@ public void start() FETCH_INTERVAL.getUnit()); } - public void stop(boolean success) + public synchronized void stop(boolean success) { if (scheduledFuture != null) { scheduledFuture.cancel(false); @@ -129,7 +130,7 @@ public boolean hasPage() return !pageBuffer.isEmpty(); } - private void throwIfFailed() + private synchronized void throwIfFailed() { if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) { Throwable failure = lastException.get(); @@ -140,11 +141,6 @@ private void throwIfFailed() private void doGetResults() { - if (completed && scheduledFuture != null) { - scheduledFuture.cancel(false); - return; - } - if (bufferMemoryBytes.longValue() >= MAX_BUFFER_SIZE.toBytes()) { return; } @@ -159,7 +155,7 @@ private void doGetResults() } } - private void onSuccess(PageBufferClient.PagesResponse pagesResponse) + private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse) { List pages = pagesResponse.getPages(); long bytes = 0; @@ -185,7 +181,6 @@ private void onSuccess(PageBufferClient.PagesResponse pagesResponse) } token = nextToken; if (pagesResponse.isClientComplete()) { - completed = true; workerClient.abortResultsAsync(taskId); if (scheduledFuture != null) { scheduledFuture.cancel(false);