From 22f4b483f7a76472136569e57e93cd78ff05a573 Mon Sep 17 00:00:00 2001 From: Ashhar Hasan Date: Thu, 14 Aug 2025 23:21:39 +0530 Subject: [PATCH 1/2] Remove redundant requireNonNull --- .../src/main/java/io/trino/client/direct/DirectTrinoClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java b/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java index ba6ca20edeba..119aa1166a7f 100644 --- a/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java +++ b/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java @@ -72,7 +72,7 @@ public DirectTrinoClient( this.queryManager = requireNonNull(queryManager, "queryManager is null"); this.directExchangeClientSupplier = requireNonNull(directExchangeClientSupplier, "directExchangeClientSupplier is null"); this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); - this.heartBeatIntervalMillis = requireNonNull(queryManagerConfig, "queryManagerConfig is null").getClientTimeout().toMillis() / 2; + this.heartBeatIntervalMillis = queryManagerConfig.getClientTimeout().toMillis() / 2; } public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") String sql, QueryResultsListener queryResultsListener) From 7f9138f7b0db7633159e932beda73a5dbfd4d548 Mon Sep 17 00:00:00 2001 From: Ashhar Hasan Date: Thu, 14 Aug 2025 23:21:58 +0530 Subject: [PATCH 2/2] Extract heartbeat handling into a separate method for clarity --- .../client/direct/DirectTrinoClient.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java b/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java index 119aa1166a7f..85804519461e 100644 --- a/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java +++ b/core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java @@ -117,23 +117,7 @@ public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") Str ListenableFuture anyCompleteFuture = whenAnyComplete(ImmutableList.of( queryManager.getStateChange(queryId, state), exchangeClient.isBlocked())); - while (!anyCompleteFuture.isDone()) { - try { - anyCompleteFuture.get(heartBeatIntervalMillis, TimeUnit.MILLISECONDS); - } - catch (TimeoutException e) { - // continue waiting until the query state changes or the exchange client is blocked. - // we need to periodically record the heartbeat to prevent the query from being canceled - dispatchQuery.recordHeartbeat(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new TrinoException(GENERIC_INTERNAL_ERROR, "Thread interrupted", e); - } - catch (ExecutionException e) { - throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error processing query", e.getCause()); - } - } + getQueryFutureWithHeartbeats(anyCompleteFuture, dispatchQuery); } } @@ -157,6 +141,27 @@ private DirectExchangeClient createExchangeClient(DispatchQuery dispatchQuery) getRetryPolicy(dispatchQuery.getSession())); } + private void getQueryFutureWithHeartbeats(ListenableFuture anyCompleteFuture, DispatchQuery dispatchQuery) + { + while (!anyCompleteFuture.isDone()) { + try { + anyCompleteFuture.get(heartBeatIntervalMillis, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + // continue waiting until the query state changes or the exchange client is blocked. + // we need to periodically record the heartbeat to prevent the query from being canceled + dispatchQuery.recordHeartbeat(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Thread interrupted", e); + } + catch (ExecutionException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error processing query", e.getCause()); + } + } + } + private static void getQueryFuture(ListenableFuture future) { try {