Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public class LocalDispatchQuery
private final SettableFuture<?> submitted = SettableFuture.create();
private final AtomicReference<Optional<ResourceGroupQueryLimits>> resourceGroupQueryLimits = new AtomicReference<>(Optional.empty());

// Tracks whether the query has been successfully submitted to SqlQueryManager
// for execution. Once true, SqlQueryManager's finalQueryInfoListener will fire
// queryCompletedEvent with real statistics, so LocalDispatchQuery must NOT fire
// queryImmediateFailureEvent (which emits zeroed stats and would produce a
// duplicate event that can overwrite the real data).
private volatile boolean sentForExecution;

private final boolean retry;

private final QueryPrerequisites queryPrerequisites;
Expand Down Expand Up @@ -208,6 +215,10 @@ private void startExecution(QueryExecution queryExecution, boolean isDispatching
try {
resourceGroupQueryLimits.get().ifPresent(queryExecution::setResourceGroupQueryLimits);
querySubmitter.accept(queryExecution);
// Mark only after successful submission. If querySubmitter throws,
// SqlQueryManager won't have the query, so we still need
// queryImmediateFailureEvent from fail() below.
sentForExecution = true;
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
}
catch (Throwable t) {
// this should never happen but be safe
Expand Down Expand Up @@ -344,18 +355,29 @@ public Session getSession()
public void fail(Throwable throwable)
{
if (stateMachine.transitionToFailed(throwable)) {
queryMonitor.queryImmediateFailureEvent(stateMachine.getBasicQueryInfo(Optional.empty()), toFailure(throwable));
// Only emit the immediate failure event if the query was never sent to
// SqlQueryManager. Post-submission, SqlQueryManager's finalQueryInfoListener
// will emit queryCompletedEvent with real execution statistics. Emitting
// queryImmediateFailureEvent here as well would produce a duplicate event
// with all-zero stats that can overwrite the real data.
if (!sentForExecution) {
queryMonitor.queryImmediateFailureEvent(stateMachine.getBasicQueryInfo(Optional.empty()), toFailure(throwable));
}
}
}

@Override
public void cancel()
{
if (stateMachine.transitionToCanceled()) {
BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty());
ExecutionFailureInfo failureInfo = queryInfo.getFailureInfo();
failureInfo = failureInfo != null ? failureInfo : toFailure(new PrestoException(USER_CANCELED, "Query was canceled"));
queryMonitor.queryImmediateFailureEvent(queryInfo, failureInfo);
// Same guard as fail(): avoid emitting zeroed-stats event when
// SqlQueryManager will handle the completion event with real stats.
if (!sentForExecution) {
BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty());
ExecutionFailureInfo failureInfo = queryInfo.getFailureInfo();
failureInfo = failureInfo != null ? failureInfo : toFailure(new PrestoException(USER_CANCELED, "Query was canceled"));
queryMonitor.queryImmediateFailureEvent(queryInfo, failureInfo);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_TASK;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED;
Expand Down Expand Up @@ -441,6 +442,108 @@ public void testQueryDispatched()
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
}

/**
* Verifies that once a query has been successfully submitted to SqlQueryManager,
* LocalDispatchQuery.fail() does NOT fire queryImmediateFailureEvent (which emits
* all-zero stats). This prevents the bug where DispatchManager's QueryTracker
* races with SqlQueryManager's QueryTracker on time limit enforcement, causing
* a zeroed-stats event to overwrite real execution statistics.
*/
@Test
public void testFailAfterDispatchDoesNotEmitImmediateFailureEvent()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();

LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(new MockQueryExecution()),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {}, // querySubmitter succeeds, so sentForExecution = true
false,
QUERY_PREREQUISITES);

// Dispatch the query — this calls querySubmitter.accept() and sets sentForExecution
query.startWaitingForResources();
assertEquals(query.getBasicQueryInfo().getState(), DISPATCHING);

// Simulate DispatchManager's QueryTracker firing a timeout on this already-dispatched query
query.fail(new PrestoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit"));

assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), EXCEEDED_TIME_LIMIT.toErrorCode());
// The key assertion: no queryImmediateFailureEvent should have been fired.
// SqlQueryManager's finalQueryInfoListener is responsible for emitting the
// completion event with real stats for dispatched queries.
assertFalse(eventListener.getQueryCompletedEvent().isPresent(),
"queryImmediateFailureEvent should not be fired after query has been sent for execution");
}

/**
* Verifies that cancellation after dispatch also does NOT fire
* queryImmediateFailureEvent, same as fail().
*/
@Test
public void testCancelAfterDispatchDoesNotEmitImmediateFailureEvent()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();

LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(new MockQueryExecution()),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
QUERY_PREREQUISITES);

query.startWaitingForResources();
assertEquals(query.getBasicQueryInfo().getState(), DISPATCHING);

query.cancel();

assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), USER_CANCELED.toErrorCode());
assertFalse(eventListener.getQueryCompletedEvent().isPresent(),
"queryImmediateFailureEvent should not be fired after query has been sent for execution");
}

/**
* Verifies that fail() before dispatch still correctly fires
* queryImmediateFailureEvent — the pre-dispatch path must remain functional.
*/
@Test
public void testFailBeforeDispatchStillEmitsImmediateFailureEvent()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();

LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(new MockQueryExecution()),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
QUERY_PREREQUISITES);

// Fail before dispatching — sentForExecution is still false
query.fail(new PrestoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum queued time"));

assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertTrue(eventListener.getQueryCompletedEvent().isPresent(),
"queryImmediateFailureEvent should be fired for pre-dispatch failures");
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), EXCEEDED_TIME_LIMIT.toErrorCode());
}

private ClusterSizeMonitor createClusterSizeMonitor(int minimumNodes)
{
return new ClusterSizeMonitor(new InMemoryNodeManager(), true, minimumNodes, minimumNodes, new Duration(10, MILLISECONDS), 1, 1, new Duration(1, SECONDS), new Duration(1, SECONDS), 0, false);
Expand Down
Loading