Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 45 additions & 8 deletions client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.jdbc;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -29,9 +30,9 @@
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.stream.Stream;
Expand Down Expand Up @@ -144,27 +145,39 @@ private static <T> Iterator<T> flatten(Iterator<Iterable<T>> iterator, long maxR
return stream.iterator();
}

private static class AsyncIterator<T>
@VisibleForTesting
static class AsyncIterator<T>
extends AbstractIterator<T>
{
private static final int MAX_QUEUED_ROWS = 50_000;
private static final ExecutorService executorService = newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("Trino JDBC worker-%s").setDaemon(true).build());

private final StatementClient client;
private final BlockingQueue<T> rowQueue = new ArrayBlockingQueue<>(MAX_QUEUED_ROWS);
private final BlockingQueue<T> rowQueue;
// Semaphore to indicate that some data is ready.
// Each permit represents a row of data (or that the underlying iterator is exhausted).
private final Semaphore semaphore = new Semaphore(0);
private final CompletableFuture<Void> future;
private final Future<?> future;
private volatile boolean cancelled;
private volatile boolean finished;
Comment thread
xiacongling marked this conversation as resolved.
Outdated
Comment thread
findepi marked this conversation as resolved.
Outdated

public AsyncIterator(Iterator<T> dataIterator, StatementClient client)
{
this(dataIterator, client, Optional.empty());
}

@VisibleForTesting
AsyncIterator(Iterator<T> dataIterator, StatementClient client, Optional<BlockingQueue<T>> queue)
Comment thread
xiacongling marked this conversation as resolved.
Outdated
{
requireNonNull(dataIterator, "dataIterator is null");
this.client = client;
this.future = CompletableFuture.runAsync(() -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.
Now I wonder why we have used a CompletableFuture in the first place.

this.rowQueue = queue.orElseGet(() -> new ArrayBlockingQueue<>(MAX_QUEUED_ROWS));
this.cancelled = false;
this.finished = false;
this.future = executorService.submit(() -> {
Comment thread
xiacongling marked this conversation as resolved.
Outdated
try {
while (dataIterator.hasNext()) {
while (!cancelled && dataIterator.hasNext()) {
rowQueue.put(dataIterator.next());
semaphore.release();
}
Expand All @@ -174,22 +187,46 @@ public AsyncIterator(Iterator<T> dataIterator, StatementClient client)
}
finally {
semaphore.release();
finished = true;
}
}, executorService);
});
}

public void cancel()
{
cancelled = true;
Comment thread
findepi marked this conversation as resolved.
Outdated
future.cancel(true);
cleanup();
}

public void interrupt(InterruptedException e)
{
client.close();
cleanup();
Thread.currentThread().interrupt();
throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
}

private void cleanup()
{
// When thread interruption is mis-handled by underlying implementation of `client`, the thread which
// is working for `future` may be blocked by `rowQueue.put` (`rowQueue` is full) and will never finish
Comment thread
findepi marked this conversation as resolved.
Outdated
// its work. It is necessary to close `client` and drain `rowQueue` to avoid such leaks.
client.close();
rowQueue.clear();
Comment thread
xiacongling marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the order of these actions (client.close, draining) matter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter for most cases. I am just worried that client may fill rowQueue after it has been drained.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it may matter. Let's document the reasoning as a code comment

}

@VisibleForTesting
Future<?> getFuture()
{
return future;
}

@VisibleForTesting
boolean isBackgroundThreadFinished()
{
return finished;
}

@Override
protected T computeNext()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import static java.lang.String.format;

/**
* An integration test for JDBC client interacting with Trino server.
*/
public class TestJdbcResultSet
extends BaseTestJdbcResultSet
{
Expand Down
272 changes: 272 additions & 0 deletions client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoResultSet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.jdbc;

import com.google.common.collect.ImmutableList;
import io.trino.client.ClientSelectedRole;
import io.trino.client.QueryData;
import io.trino.client.QueryStatusInfo;
import io.trino.client.StatementClient;
import io.trino.client.StatementStats;
import org.testng.annotations.Test;

import java.time.ZoneId;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.testng.Assert.assertTrue;

/**
* A unit test for {@link TrinoResultSet}.
*
* @see TestJdbcResultSet for an integration test.
*/
public class TestTrinoResultSet
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have TestJdbcResultSet to test the ResultSet implementation

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I notice that. But TestJdbcResultSet is a integration test with test trino server set up. The test cases here are all unit tests. Is it better to keep them separate?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a javadoc explaining what it is and also @see tag -- in both these tests.

like

/**
 * A unit test for {@link TrinoResultSet}.
 * 
 * @see TestJdbcResultSet an integration test.
 */

{
@Test(timeOut = 10000)
public void testIteratorCancelWhenQueueNotFull()
throws Exception
{
AtomicReference<Thread> thread = new AtomicReference<>();
CountDownLatch interruptedButSwallowedLatch = new CountDownLatch(1);
MockAsyncIterator<Iterable<List<Object>>> iterator = new MockAsyncIterator<>(
new Iterator<Iterable<List<Object>>>()
{
@Override
public boolean hasNext()
{
return true;
}

@Override
public Iterable<List<Object>> next()
{
thread.compareAndSet(null, Thread.currentThread());
try {
TimeUnit.MILLISECONDS.sleep(1000);
}
catch (InterruptedException e) {
interruptedButSwallowedLatch.countDown();
}
return ImmutableList.of((ImmutableList.of(new Object())));
}
},
new ArrayBlockingQueue<>(100));

while (thread.get() == null || thread.get().getState() != Thread.State.TIMED_WAITING) {
// wait for thread being waiting
}
iterator.cancel();
while (!iterator.getFuture().isDone() || !iterator.isBackgroundThreadFinished()) {
TimeUnit.MILLISECONDS.sleep(10);
}
boolean interruptedButSwallowed = interruptedButSwallowedLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue(interruptedButSwallowed);
}

@Test(timeOut = 10000)
public void testIteratorCancelWhenQueueIsFull()
throws Exception
{
BlockingQueue<Iterable<List<Object>>> queue = new ArrayBlockingQueue<>(1);
queue.put(ImmutableList.of());
// queue is full at the beginning
AtomicReference<Thread> thread = new AtomicReference<>();
MockAsyncIterator<Iterable<List<Object>>> iterator = new MockAsyncIterator<>(
new Iterator<Iterable<List<Object>>>()
{
@Override
public boolean hasNext()
{
return true;
}

@Override
public Iterable<List<Object>> next()
{
thread.compareAndSet(null, Thread.currentThread());
return ImmutableList.of((ImmutableList.of(new Object())));
}
},
queue);

while (thread.get() == null || thread.get().getState() != Thread.State.WAITING) {
// wait for thread being waiting (for queue being not full)
TimeUnit.MILLISECONDS.sleep(10);
}
iterator.cancel();
while (!iterator.isBackgroundThreadFinished()) {
TimeUnit.MILLISECONDS.sleep(10);
}
}

private static class MockAsyncIterator<T>
extends TrinoResultSet.AsyncIterator<T>
{
public MockAsyncIterator(Iterator<T> dataIterator, BlockingQueue<T> queue)
{
super(
dataIterator,
new StatementClient()
{
@Override
public String getQuery()
{
throw new UnsupportedOperationException();
}

@Override
public ZoneId getTimeZone()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isRunning()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isClientAborted()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isClientError()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isFinished()
{
throw new UnsupportedOperationException();
}

@Override
public StatementStats getStats()
{
throw new UnsupportedOperationException();
}

@Override
public QueryStatusInfo currentStatusInfo()
{
throw new UnsupportedOperationException();
}

@Override
public QueryData currentData()
{
throw new UnsupportedOperationException();
}

@Override
public QueryStatusInfo finalStatusInfo()
{
throw new UnsupportedOperationException();
}

@Override
public Optional<String> getSetCatalog()
{
throw new UnsupportedOperationException();
}

@Override
public Optional<String> getSetSchema()
{
throw new UnsupportedOperationException();
}

@Override
public Optional<String> getSetPath()
{
throw new UnsupportedOperationException();
}

@Override
public Map<String, String> getSetSessionProperties()
{
throw new UnsupportedOperationException();
}

@Override
public Set<String> getResetSessionProperties()
{
throw new UnsupportedOperationException();
}

@Override
public Map<String, ClientSelectedRole> getSetRoles()
{
throw new UnsupportedOperationException();
}

@Override
public Map<String, String> getAddedPreparedStatements()
{
throw new UnsupportedOperationException();
}

@Override
public Set<String> getDeallocatedPreparedStatements()
{
throw new UnsupportedOperationException();
}

@Override
public String getStartedTransactionId()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isClearTransactionId()
{
throw new UnsupportedOperationException();
}

@Override
public boolean advance()
{
throw new UnsupportedOperationException();
}

@Override
public void cancelLeafStage()
{
throw new UnsupportedOperationException();
}

@Override
public void close()
{
// do nothing
}
},
Optional.of(queue));
}
}
}