fix: Flaky connection close issue #2044

Merged · 6 commits · May 18, 2022
README.md · 2 changes: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies

```Groovy
implementation platform('com.google.cloud:libraries-bom:25.2.0')
implementation platform('com.google.cloud:libraries-bom:25.3.0')

implementation 'com.google.cloud:google-cloud-bigquery'
```
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -87,7 +87,11 @@ class ConnectionImpl implements Connection {
Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
private final Logger logger = Logger.getLogger(this.getClass().getName());
private BigQueryReadClient bqReadClient;
private static final long EXECUTOR_TIMEOUT_SEC = 5;
private static final long EXECUTOR_TIMEOUT_SEC = 10;
private BlockingQueue<AbstractList<FieldValue>>
bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
private BlockingQueue<BigQueryResultImpl.Row>
bufferRow; // initialized lazily iff we end up using Read API

ConnectionImpl(
ConnectionSettings connectionSettings,
@@ -107,6 +111,19 @@ class ConnectionImpl implements Connection {
: Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
}

/**
* This method returns the number of records to be stored in the buffer and ensures that it stays
* within a reasonable range
*
* @return The max number of records to be stored in the buffer
*/
private int getBufferSize() {
return (connectionSettings == null
|| connectionSettings.getNumBufferedRows() == null
|| connectionSettings.getNumBufferedRows() < 10000
? 20000
: Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
}
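The added getBufferSize() clamps the user's numBufferedRows hint: an absent hint (or a null connectionSettings) or a value under 10,000 falls back to a 20,000-row default, and anything larger is doubled but capped at 100,000. A standalone sketch of the rule; the bufferSize name and the sample values are illustrative, not part of the PR:

```java
// Sketch of the clamping rule in getBufferSize(); illustrative only.
static int bufferSize(Integer numBufferedRows) {
  if (numBufferedRows == null || numBufferedRows < 10000) {
    return 20000; // default when the hint is absent or below the floor
  }
  return Math.min(numBufferedRows * 2, 100000); // double, cap at 100000
}

// bufferSize(null)  -> 20000   bufferSize(500)   -> 20000
// bufferSize(30000) -> 60000   bufferSize(80000) -> 100000 (capped)
```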
/**
* Cancel method shuts down the pageFetcher and producerWorker threads gracefully using interrupt.
* The pageFetcher thread will not request any subsequent pages after being interrupted, and
@@ -119,12 +136,14 @@
@BetaApi
@Override
public synchronized boolean close() throws BigQuerySQLException {
flagEndOfStream(); // puts an End of Stream flag in the buffer so that `ResultSet.next()` stops
// advancing the cursor
queryTaskExecutor.shutdownNow();
try {
queryTaskExecutor.awaitTermination(
EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
return true;
} // else queryTaskExecutor.isShutdown() will be returned outside this try block
} catch (InterruptedException e) {
e.printStackTrace();
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
@@ -328,7 +347,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId);

// Keeps the deserialized records at the row level, which is consumed by BigQueryResult
BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
bufferFvl = new LinkedBlockingDeque<>(getBufferSize());

// Keeps the parsed FieldValueLists
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
@@ -350,11 +369,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
// throughput

populateBufferAsync(
rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer
rpcResponseQueue, pageCache, bufferFvl); // spawns a thread to populate the buffer

// This will work for pagination as well, as buffer is getting updated asynchronously
return new BigQueryResultImpl<AbstractList<FieldValue>>(
schema, numRows, buffer, bigQueryResultStats);
schema, numRows, bufferFvl, bigQueryResultStats);
}

@VisibleForTesting
@@ -382,7 +401,7 @@ BigQueryResult processQueryResponseResults(
BigQueryResultStats bigQueryResultStats =
new BigQueryResultStatsImpl(queryStatistics, sessionInfo);

BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(
getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
@@ -399,10 +418,10 @@ BigQueryResult processQueryResponseResults(
parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue);

// Thread to populate the buffer (a blocking queue) shared with the consumer
populateBufferAsync(rpcResponseQueue, pageCache, buffer);
populateBufferAsync(rpcResponseQueue, pageCache, bufferFvl);

return new BigQueryResultImpl<AbstractList<FieldValue>>(
schema, numRows, buffer, bigQueryResultStats);
schema, numRows, bufferFvl, bigQueryResultStats);
}

@VisibleForTesting
@@ -418,6 +437,11 @@ void runNextPageTaskAsync(
while (pageToken != null) { // paginate for non null token
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
logger.log(
Level.WARNING,
"\n"
+ Thread.currentThread().getName()
+ " Interrupted @ runNextPageTaskAsync");
break;
}
TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken);
@@ -430,12 +454,12 @@ void runNextPageTaskAsync(
}
rpcResponseQueue.put(
Tuple.of(
null,
false)); // this will stop the parseDataTask as well in case of interrupt or
// when the pagination completes
null, false)); // this will stop the parseDataTask as well when the pagination
// completes
} catch (Exception e) {
throw new BigQueryException(0, e.getMessage(), e);
}
} // We cannot do queryTaskExecutor.shutdownNow() here as the populate-buffer task may not
// have finished processing the records, and even that would get interrupted
};
queryTaskExecutor.execute(nextPageTask);
}
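The guard added at the top of the loop makes pagination cooperatively cancellable: the task checks the interrupt flag and the executor state before each RPC, so no further pages are requested once cancel() or close() has started. The loop's shape as a sketch, with fetchPage standing in for tableDataListRpc:

```java
import java.util.concurrent.ExecutorService;
import java.util.function.UnaryOperator;

// Shape of the cooperatively cancellable pagination loop; fetchPage stands in
// for tableDataListRpc and returns the next page token, or null when done.
static void fetchPages(
    String firstPageToken, ExecutorService executor, UnaryOperator<String> fetchPage) {
  String pageToken = firstPageToken;
  while (pageToken != null) { // paginate while a next page exists
    if (Thread.currentThread().isInterrupted() || executor.isShutdown()) {
      break; // cancel()/close() has begun: do not request further pages
    }
    pageToken = fetchPage.apply(pageToken);
  }
}
```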
@@ -458,7 +482,9 @@ void parseRpcDataAsync(
pageCache.put(
Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received.
} catch (InterruptedException e) {
throw new BigQueryException(0, e.getMessage(), e);
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
}

// rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use
@@ -468,6 +494,14 @@ void parseRpcDataAsync(
try {
boolean hasMorePages = true;
while (hasMorePages) {
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process further data and shutdown
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
break;
}
// no interrupt received till this point, continue processing
Tuple<TableDataList, Boolean> rpcResponse = rpcResponseQueue.take();
TableDataList tabledataList = rpcResponse.x();
hasMorePages = rpcResponse.y();
@@ -480,55 +514,24 @@ void parseRpcDataAsync(
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted",
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
}
try {
pageCache.put(Tuple.of(null, false)); // no further pages
pageCache.put(Tuple.of(null, false)); // no further pages, graceful exit scenario
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted",
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
}
} // We cannot do queryTaskExecutor.shutdownNow() here as the populate-buffer task may not
// have finished processing the records, and even that would get interrupted
};
queryTaskExecutor.execute(parseDataTask);
}
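Between the two background tasks, end-of-input is signalled with a sentinel rather than an interrupt: runNextPageTaskAsync puts a (null, false) tuple on rpcResponseQueue, and the take() loop in parseRpcDataAsync drains until it sees it. A generic sketch of that handoff, with Map.Entry standing in for the library's Tuple:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;

// Sketch of the (null, false) sentinel handoff; Map.Entry stands in for Tuple.
static void drainUntilSentinel(BlockingQueue<Map.Entry<String, Boolean>> queue)
    throws InterruptedException {
  boolean hasMorePages = true;
  while (hasMorePages) {
    Map.Entry<String, Boolean> next = queue.take(); // blocks until data arrives
    if (next.getKey() == null) {
      break; // sentinel: pagination completed upstream
    }
    hasMorePages = next.getValue();
    // ... parse next.getKey() here ...
  }
}
// Producer side, after the last page:
// queue.put(new java.util.AbstractMap.SimpleImmutableEntry<>(null, false));
```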

/**
* This method is called when the current thread is interrupted; it communicates this to the
* ResultSet by adding an EoS
*
* @param buffer
*/
@InternalApi
void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
try {
buffer.put(new EndOfFieldValueList()); // All the pages have been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

/**
* This method is called when the current thread is interrupted; it communicates this to the
* ResultSet by adding an isLast Row
*
* @param buffer
*/
@InternalApi
void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
try {
buffer.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages have been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

@VisibleForTesting
void populateBufferAsync(
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
@@ -549,25 +552,21 @@ void populateBufferAsync(
"\n" + Thread.currentThread().getName() + " Interrupted",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
break;
}

if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()
|| fieldValueLists
== null) { // do not process further pages and shutdown (outer loop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
break;
}

for (FieldValueList fieldValueList : fieldValueLists) {
try {
if (Thread.currentThread()
.isInterrupted()) { // do not process further pages and shutdown (inner loop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding
// EoS
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor
.isShutdown()) { // do not process further pages and shutdown (inner loop)
break;
}
buffer.put(fieldValueList);
@@ -576,24 +575,55 @@
}
}
}

try {
if (Thread.currentThread()
.isInterrupted()) { // clear the buffer for any outstanding records
rpcResponseQueue
.clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
// could trigger
buffer.clear();
}
markEoS(buffer); // All the pages have been processed, put this marker
buffer.put(
new EndOfFieldValueList()); // All the pages have been processed, put this marker
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
e);
} finally {
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
queryTaskExecutor
.shutdownNow(); // Shutdown the thread pool. All the records are now processed
}
};

queryTaskExecutor.execute(populateBufferRunnable);
}
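Note the ordering in the interrupt branch above: rpcResponseQueue.clear() runs first so a producer parked on a full queue can unblock, and buffer.clear() makes room so the End-of-Stream put() cannot block. That put() can still throw if this worker itself is already interrupted, which is exactly why close() now flags End of Stream from the caller's thread as well. A sketch of the tail, with hypothetical names:

```java
import java.util.concurrent.BlockingQueue;

// Sketch of populateBufferAsync's cleanup tail (hypothetical names). put()
// throws InterruptedException on an interrupted thread, so the marker may
// never land here; close() flags End of Stream separately to cover that.
static <T> void drainAndMark(
    BlockingQueue<?> upstream, BlockingQueue<T> buffer, T endOfStreamMarker) {
  try {
    if (Thread.currentThread().isInterrupted()) {
      upstream.clear(); // unblock a producer parked on a full queue
      buffer.clear();   // ensure the marker's put() below cannot block
    }
    buffer.put(endOfStreamMarker); // graceful path: ResultSet.next() stops here
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // expected during cancel()/close()
  }
}
```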

/**
* In an interrupt scenario, such as when the background threads are still working and the user
* calls `connection.close()`, we need to add an End of Stream flag in the buffer so that
* `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync`
* method to do this, as the `BlockingQueue.put()` call will error out after the interrupt is
* triggered
*/
@InternalApi
void flagEndOfStream() { // package-private
try {
if (bufferFvl != null) { // i.e. the tabledata.list endpoint is being used
bufferFvl.put(
new EndOfFieldValueList()); // All the pages have been processed, put this marker
} else if (bufferRow != null) {
bufferRow.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages have been processed, put this marker
} else {
logger.log(
Level.WARNING,
"\n"
+ Thread.currentThread().getName()
+ " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is close without executing a query");
}
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ flagEndOfStream",
e);
}
}
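What the flag buys on the consumer side: ResultSet.next() blocks on the shared buffer, and without a marker a closed connection would leave it waiting forever. A rough sketch of that cursor loop; this is an assumption about BigQueryResultImpl's behavior, not its actual code:

```java
import java.util.AbstractList;
import java.util.concurrent.BlockingQueue;

// Assumed shape of the cursor that the End-of-Stream marker unblocks;
// FieldValue and EndOfFieldValueList are the library types used above.
class CursorSketch {
  private final BlockingQueue<AbstractList<FieldValue>> buffer;
  private AbstractList<FieldValue> currentRow;

  CursorSketch(BlockingQueue<AbstractList<FieldValue>> buffer) {
    this.buffer = buffer;
  }

  boolean next() throws InterruptedException {
    AbstractList<FieldValue> row = buffer.take(); // blocks until row or marker
    if (row instanceof EndOfFieldValueList) {
      return false; // End of Stream: stop advancing the cursor
    }
    currentRow = row;
    return true;
  }
}
```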

/* Helper method that parse and populate a page with TableRows */
private static Iterable<FieldValueList> getIterableFieldValueList(
Iterable<TableRow> tableDataPb, final Schema schema) {
@@ -781,17 +811,17 @@ BigQueryResult highThroughPutRead(
;

ReadSession readSession = bqReadClient.createReadSession(builder.build());
BlockingQueue<BigQueryResultImpl.Row> buffer = new LinkedBlockingDeque<>(bufferSize);
bufferRow = new LinkedBlockingDeque<>(getBufferSize());
Map<String, Integer> arrowNameToIndex = new HashMap<>();
// deserialize and populate the buffer async, so that the client isn't blocked
processArrowStreamAsync(
readSession,
buffer,
bufferRow,
new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex),
schema);

logger.log(Level.INFO, "\n Using BigQuery Read API");
return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, buffer, stats);
return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, bufferRow, stats);

} catch (IOException e) {
throw BigQueryException.translateAndThrow(e);
@@ -825,8 +855,18 @@ private void processArrowStreamAsync(

} catch (Exception e) {
throw BigQueryException.translateAndThrow(e);
} finally {
markLast(buffer); // marking end of stream
} finally { // logic needed for graceful shutdown
// marking end of stream
try {
buffer.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages have been processed, put this marker
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
e);
}
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
};
@@ -888,7 +928,6 @@ private void processRows(

if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process and shutdown
markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
break; // exit the loop, root will be cleared in the finally block
}

@@ -979,9 +1018,6 @@ boolean isFastQuerySupported() {

@VisibleForTesting
boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {

// TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job
// is not complete
if ((totalRows == null || pageRows == null)
&& Boolean.TRUE.equals(
connectionSettings
return true;
}

// Schema schema = Schema.fromPb(tableSchema);
// Read API does not yet support Interval Type or QueryParameters
if (containsIntervalType(schema) || hasQueryParameters) {
logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI");
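The hunk above removes the stale TODO and keeps the decision order: an explicit Read API opt-in from ConnectionSettings wins even while the job is still running (totalRows/pageRows not yet known), and Interval types or query parameters force the fallback. Condensed as a sketch; the ConnectionSettings getter is elided in the hunk, and the trailing row-count threshold logic is stubbed out:

```java
// Condensed sketch of useReadAPI's decision order (threshold logic stubbed).
static boolean useReadApiSketch(
    Long totalRows,
    Long pageRows,
    boolean alwaysUseReadApi, // stand-in for the elided ConnectionSettings flag
    boolean schemaHasIntervalType,
    boolean hasQueryParameters) {
  if ((totalRows == null || pageRows == null) && alwaysUseReadApi) {
    return true; // job still running, but the caller opted in explicitly
  }
  if (schemaHasIntervalType || hasQueryParameters) {
    return false; // Read API does not yet support these
  }
  return false; // placeholder for the elided row-count threshold checks
}
```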