Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public DirectTrinoClient(
this.queryManager = requireNonNull(queryManager, "queryManager is null");
this.directExchangeClientSupplier = requireNonNull(directExchangeClientSupplier, "directExchangeClientSupplier is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.heartBeatIntervalMillis = requireNonNull(queryManagerConfig, "queryManagerConfig is null").getClientTimeout().toMillis() / 2;
this.heartBeatIntervalMillis = queryManagerConfig.getClientTimeout().toMillis() / 2;
}

public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") String sql, QueryResultsListener queryResultsListener)
Expand Down Expand Up @@ -117,23 +117,7 @@ public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") Str
ListenableFuture<Object> anyCompleteFuture = whenAnyComplete(ImmutableList.of(
queryManager.getStateChange(queryId, state),
exchangeClient.isBlocked()));
while (!anyCompleteFuture.isDone()) {
try {
anyCompleteFuture.get(heartBeatIntervalMillis, TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
// continue waiting until the query state changes or the exchange client is blocked.
// we need to periodically record the heartbeat to prevent the query from being canceled
dispatchQuery.recordHeartbeat();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Thread interrupted", e);
}
catch (ExecutionException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error processing query", e.getCause());
}
}
getQueryFutureWithHeartbeats(anyCompleteFuture, dispatchQuery);
}
}

Expand All @@ -157,6 +141,27 @@ private DirectExchangeClient createExchangeClient(DispatchQuery dispatchQuery)
getRetryPolicy(dispatchQuery.getSession()));
}

private void getQueryFutureWithHeartbeats(ListenableFuture<Object> anyCompleteFuture, DispatchQuery dispatchQuery)
{
while (!anyCompleteFuture.isDone()) {
try {
anyCompleteFuture.get(heartBeatIntervalMillis, TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
// continue waiting until the query state changes or the exchange client is blocked.
// we need to periodically record the heartbeat to prevent the query from being canceled
dispatchQuery.recordHeartbeat();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Thread interrupted", e);
}
catch (ExecutionException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error processing query", e.getCause());
}
}
}

private static <T> void getQueryFuture(ListenableFuture<T> future)
{
try {
Expand Down