From 993e6ba953a320c7f096663cdeed7506d5b0c403 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Sat, 17 Sep 2022 21:01:12 +0200 Subject: [PATCH 1/2] Mark internal method as private and give better name --- .../src/main/java/io/trino/jdbc/TrinoResultSet.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java index d36ad5194040..20d0be7524f3 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java +++ b/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java @@ -183,7 +183,7 @@ public AsyncIterator(Iterator dataIterator, StatementClient client) } } catch (InterruptedException e) { - interrupt(e); + handleInterrupt(e); } finally { semaphore.release(); @@ -199,7 +199,7 @@ public void cancel() cleanup(); } - public void interrupt(InterruptedException e) + private void handleInterrupt(InterruptedException e) { cleanup(); Thread.currentThread().interrupt(); @@ -234,7 +234,7 @@ protected T computeNext() semaphore.acquire(); } catch (InterruptedException e) { - interrupt(e); + handleInterrupt(e); } if (rowQueue.isEmpty()) { // If we got here and the queue is empty the thread fetching from the underlying iterator is done. @@ -243,7 +243,7 @@ protected T computeNext() future.get(); } catch (InterruptedException e) { - interrupt(e); + handleInterrupt(e); } catch (ExecutionException e) { throwIfUnchecked(e.getCause()); From 3e01a3e51686dd472af59b180fdf91f49fb5979c Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 19 Sep 2022 12:29:48 +0200 Subject: [PATCH 2/2] Improve handling of interrupts in TrinoResultSet Differentiate between interrupts happening in the calling thread and interrupts of the background thread. - use different message - the interrupts happening in the calling thread should result in full cancellation of the background thread. Previously they would close client, but wouldn't set `cancelled` flag. --- .../java/io/trino/jdbc/TrinoResultSet.java | 23 ++++++++----------- .../java/io/trino/jdbc/TestTrinoDriver.java | 2 +- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java index 20d0be7524f3..8611b6733653 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java +++ b/client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java @@ -183,7 +183,9 @@ public AsyncIterator(Iterator dataIterator, StatementClient client) } } catch (InterruptedException e) { - handleInterrupt(e); + client.close(); + rowQueue.clear(); + throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e)); } finally { semaphore.release(); @@ -196,18 +198,6 @@ public void cancel() { cancelled = true; future.cancel(true); - cleanup(); - } - - private void handleInterrupt(InterruptedException e) - { - cleanup(); - Thread.currentThread().interrupt(); - throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e)); - } - - private void cleanup() - { // When thread interruption is mis-handled by underlying implementation of `client`, the thread which // is working for `future` may be blocked by `rowQueue.put` (`rowQueue` is full) and will never finish // its work. It is necessary to close `client` and drain `rowQueue` to avoid such leaks. @@ -253,6 +243,13 @@ protected T computeNext() } return rowQueue.poll(); } + + private void handleInterrupt(InterruptedException e) + { + cancel(); + Thread.currentThread().interrupt(); + throw new RuntimeException(new SQLException("Interrupted", e)); + } } private static class ResultsPageIterator diff --git a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java index 0d4b08abd50e..ed4ccac49707 100644 --- a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java +++ b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDriver.java @@ -906,7 +906,7 @@ public void testQueryCancelByInterrupt() assertTrue(queryFinished.await(10, SECONDS)); assertThat(queryFailure.get()) .isInstanceOf(SQLException.class) - .hasMessage("ResultSet thread was interrupted"); + .hasMessage("Interrupted"); assertEquals(getQueryState(queryId.get()), FAILED); }