diff --git a/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java index d36ad5194040..8611b6733653 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java +++ b/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java @@ -183,7 +183,9 @@ public AsyncIterator(Iterator dataIterator, StatementClient client) } } catch (InterruptedException e) { - interrupt(e); + client.close(); + rowQueue.clear(); + throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e)); } finally { semaphore.release(); @@ -196,18 +198,6 @@ public void cancel() { cancelled = true; future.cancel(true); - cleanup(); - } - - public void interrupt(InterruptedException e) - { - cleanup(); - Thread.currentThread().interrupt(); - throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e)); - } - - private void cleanup() - { // When thread interruption is mis-handled by underlying implementation of `client`, the thread which // is working for `future` may be blocked by `rowQueue.put` (`rowQueue` is full) and will never finish // its work. It is necessary to close `client` and drain `rowQueue` to avoid such leaks. @@ -234,7 +224,7 @@ protected T computeNext() semaphore.acquire(); } catch (InterruptedException e) { - interrupt(e); + handleInterrupt(e); } if (rowQueue.isEmpty()) { // If we got here and the queue is empty the thread fetching from the underlying iterator is done. @@ -243,7 +233,7 @@ protected T computeNext() future.get(); } catch (InterruptedException e) { - interrupt(e); + handleInterrupt(e); } catch (ExecutionException e) { throwIfUnchecked(e.getCause()); @@ -253,6 +243,13 @@ protected T computeNext() } return rowQueue.poll(); } + + private void handleInterrupt(InterruptedException e) + { + cancel(); + Thread.currentThread().interrupt(); + throw new RuntimeException(new SQLException("Interrupted", e)); + } } private static class ResultsPageIterator diff --git a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java index 0d4b08abd50e..ed4ccac49707 100644 --- a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java +++ b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java @@ -906,7 +906,7 @@ public void testQueryCancelByInterrupt() assertTrue(queryFinished.await(10, SECONDS)); assertThat(queryFailure.get()) .isInstanceOf(SQLException.class) - .hasMessage("ResultSet thread was interrupted"); + .hasMessage("Interrupted"); assertEquals(getQueryState(queryId.get()), FAILED); }