diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java index 1e964f597e52d..dd1e1b653521d 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java @@ -71,6 +71,13 @@ public class LocalDispatchQuery private final SettableFuture submitted = SettableFuture.create(); private final AtomicReference> resourceGroupQueryLimits = new AtomicReference<>(Optional.empty()); + // Tracks whether the query has been successfully submitted to SqlQueryManager + // for execution. Once true, SqlQueryManager's finalQueryInfoListener will fire + // queryCompletedEvent with real statistics, so LocalDispatchQuery must NOT fire + // queryImmediateFailureEvent (which emits zeroed stats and would produce a + // duplicate event that can overwrite the real data). + private volatile boolean sentForExecution; + private final boolean retry; private final QueryPrerequisites queryPrerequisites; @@ -208,6 +215,10 @@ private void startExecution(QueryExecution queryExecution, boolean isDispatching try { resourceGroupQueryLimits.get().ifPresent(queryExecution::setResourceGroupQueryLimits); querySubmitter.accept(queryExecution); + // Mark only after successful submission. If querySubmitter throws, + // SqlQueryManager won't have the query, so we still need + // queryImmediateFailureEvent from fail() below. + sentForExecution = true; } catch (Throwable t) { // this should never happen but be safe @@ -344,7 +355,14 @@ public Session getSession() public void fail(Throwable throwable) { if (stateMachine.transitionToFailed(throwable)) { - queryMonitor.queryImmediateFailureEvent(stateMachine.getBasicQueryInfo(Optional.empty()), toFailure(throwable)); + // Only emit the immediate failure event if the query was never sent to + // SqlQueryManager. Post-submission, SqlQueryManager's finalQueryInfoListener + // will emit queryCompletedEvent with real execution statistics. Emitting + // queryImmediateFailureEvent here as well would produce a duplicate event + // with all-zero stats that can overwrite the real data. + if (!sentForExecution) { + queryMonitor.queryImmediateFailureEvent(stateMachine.getBasicQueryInfo(Optional.empty()), toFailure(throwable)); + } } } @@ -352,10 +370,14 @@ public void fail(Throwable throwable) public void cancel() { if (stateMachine.transitionToCanceled()) { - BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty()); - ExecutionFailureInfo failureInfo = queryInfo.getFailureInfo(); - failureInfo = failureInfo != null ? failureInfo : toFailure(new PrestoException(USER_CANCELED, "Query was canceled")); - queryMonitor.queryImmediateFailureEvent(queryInfo, failureInfo); + // Same guard as fail(): avoid emitting zeroed-stats event when + // SqlQueryManager will handle the completion event with real stats. + if (!sentForExecution) { + BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty()); + ExecutionFailureInfo failureInfo = queryInfo.getFailureInfo(); + failureInfo = failureInfo != null ? failureInfo : toFailure(new PrestoException(USER_CANCELED, "Query was canceled")); + queryMonitor.queryImmediateFailureEvent(queryInfo, failureInfo); + } } } diff --git a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java index 70b85cbcf36e7..c57ed6fc96576 100644 --- a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java +++ b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java @@ -70,6 +70,7 @@ import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager; import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY; import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_TASK; +import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED; @@ -441,6 +442,108 @@ public void testQueryDispatched() assertFalse(eventListener.getQueryCompletedEvent().isPresent()); } + /** + * Verifies that once a query has been successfully submitted to SqlQueryManager, + * LocalDispatchQuery.fail() does NOT fire queryImmediateFailureEvent (which emits + * all-zero stats). This prevents the bug where DispatchManager's QueryTracker + * races with SqlQueryManager's QueryTracker on time limit enforcement, causing + * a zeroed-stats event to overwrite real execution statistics. + */ + @Test + public void testFailAfterDispatchDoesNotEmitImmediateFailureEvent() + { + QueryStateMachine stateMachine = createStateMachine(); + CountingEventListener eventListener = new CountingEventListener(); + + LocalDispatchQuery query = new LocalDispatchQuery( + stateMachine, + createQueryMonitor(eventListener), + immediateFuture(new MockQueryExecution()), + createClusterSizeMonitor(0), + directExecutor(), + dispatchQuery -> {}, + execution -> {}, // querySubmitter succeeds, so sentForExecution = true + false, + QUERY_PREREQUISITES); + + // Dispatch the query — this calls querySubmitter.accept() and sets sentForExecution + query.startWaitingForResources(); + assertEquals(query.getBasicQueryInfo().getState(), DISPATCHING); + + // Simulate DispatchManager's QueryTracker firing a timeout on this already-dispatched query + query.fail(new PrestoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit")); + + assertEquals(query.getBasicQueryInfo().getState(), FAILED); + assertEquals(query.getBasicQueryInfo().getErrorCode(), EXCEEDED_TIME_LIMIT.toErrorCode()); + // The key assertion: no queryImmediateFailureEvent should have been fired. + // SqlQueryManager's finalQueryInfoListener is responsible for emitting the + // completion event with real stats for dispatched queries. + assertFalse(eventListener.getQueryCompletedEvent().isPresent(), + "queryImmediateFailureEvent should not be fired after query has been sent for execution"); + } + + /** + * Verifies that cancellation after dispatch also does NOT fire + * queryImmediateFailureEvent, same as fail(). + */ + @Test + public void testCancelAfterDispatchDoesNotEmitImmediateFailureEvent() + { + QueryStateMachine stateMachine = createStateMachine(); + CountingEventListener eventListener = new CountingEventListener(); + + LocalDispatchQuery query = new LocalDispatchQuery( + stateMachine, + createQueryMonitor(eventListener), + immediateFuture(new MockQueryExecution()), + createClusterSizeMonitor(0), + directExecutor(), + dispatchQuery -> {}, + execution -> {}, + false, + QUERY_PREREQUISITES); + + query.startWaitingForResources(); + assertEquals(query.getBasicQueryInfo().getState(), DISPATCHING); + + query.cancel(); + + assertEquals(query.getBasicQueryInfo().getState(), FAILED); + assertEquals(query.getBasicQueryInfo().getErrorCode(), USER_CANCELED.toErrorCode()); + assertFalse(eventListener.getQueryCompletedEvent().isPresent(), + "queryImmediateFailureEvent should not be fired after query has been sent for execution"); + } + + /** + * Verifies that fail() before dispatch still correctly fires + * queryImmediateFailureEvent — the pre-dispatch path must remain functional. + */ + @Test + public void testFailBeforeDispatchStillEmitsImmediateFailureEvent() + { + QueryStateMachine stateMachine = createStateMachine(); + CountingEventListener eventListener = new CountingEventListener(); + + LocalDispatchQuery query = new LocalDispatchQuery( + stateMachine, + createQueryMonitor(eventListener), + immediateFuture(new MockQueryExecution()), + createClusterSizeMonitor(0), + directExecutor(), + dispatchQuery -> {}, + execution -> {}, + false, + QUERY_PREREQUISITES); + + // Fail before dispatching — sentForExecution is still false + query.fail(new PrestoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum queued time")); + + assertEquals(query.getBasicQueryInfo().getState(), FAILED); + assertTrue(eventListener.getQueryCompletedEvent().isPresent(), + "queryImmediateFailureEvent should be fired for pre-dispatch failures"); + assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), EXCEEDED_TIME_LIMIT.toErrorCode()); + } + private ClusterSizeMonitor createClusterSizeMonitor(int minimumNodes) { return new ClusterSizeMonitor(new InMemoryNodeManager(), true, minimumNodes, minimumNodes, new Duration(10, MILLISECONDS), 1, 1, new Duration(1, SECONDS), new Duration(1, SECONDS), 0, false);