From 469cf86118dadd20e0d64a6dc3280d20ccefa9e3 Mon Sep 17 00:00:00 2001
From: Prashant Mishra
Date: Fri, 13 May 2022 17:12:56 +0530
Subject: [PATCH 1/6] Added EoS mark in populate buffer. Changed log level to
 Fine. Minor refactor

---
 .../google/cloud/bigquery/ConnectionImpl.java | 38 ++++++++++++-------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
index b43615141..8841eb714 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -87,7 +87,7 @@ class ConnectionImpl implements Connection {
       Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
   private final Logger logger = Logger.getLogger(this.getClass().getName());
   private BigQueryReadClient bqReadClient;
-  private static final long EXECUTOR_TIMEOUT_SEC = 5;
+  private static final long EXECUTOR_TIMEOUT_SEC = 30;
 
   ConnectionImpl(
       ConnectionSettings connectionSettings,
@@ -121,12 +121,13 @@ class ConnectionImpl implements Connection {
   public synchronized boolean close() throws BigQuerySQLException {
     queryTaskExecutor.shutdownNow();
     try {
-      queryTaskExecutor.awaitTermination(
-          EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown
+      if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+        return true;
+      } // else queryTaskExecutor.isShutdown() will be returned outside this try block
     } catch (InterruptedException e) {
       e.printStackTrace();
       logger.log(
-          Level.WARNING,
+          Level.FINE,
           "\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
           e); // Logging InterruptedException instead of throwing the exception back, close method
       // will return queryTaskExecutor.isShutdown()
@@ -418,6 +419,11 @@ void runNextPageTaskAsync(
         while (pageToken != null) { // paginate for non null token
           if (Thread.currentThread().isInterrupted()
              || queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
+            logger.log(
+                Level.FINE,
+                "\n"
+                    + Thread.currentThread().getName()
+                    + " Interrupted @ runNextPageTaskAsync");
             break;
           }
           TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken);
@@ -431,10 +437,12 @@ void runNextPageTaskAsync(
         rpcResponseQueue.put(
             Tuple.of(
                 null,
-                false)); // this will stop the parseDataTask as well in case of interrupt or
-        // when the pagination completes
+                false)); // this will stop the parseDataTask as well when the pagination
+        // completes
       } catch (Exception e) {
         throw new BigQueryException(0, e.getMessage(), e);
+      } finally {
+        queryTaskExecutor.shutdownNow(); // for graceful shutdown scenarios
       }
     };
     queryTaskExecutor.execute(nextPageTask);
   }
@@ -479,8 +487,8 @@ void parseRpcDataAsync(
           }
         } catch (InterruptedException e) {
           logger.log(
-              Level.WARNING,
-              "\n" + Thread.currentThread().getName() + " Interrupted",
+              Level.FINE,
+              "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
              e); // Thread might get interrupted while calling the Cancel method, which is
           // expected, so logging this instead of throwing the exception back
         }
@@ -488,10 +496,12 @@ void parseRpcDataAsync(
         pageCache.put(Tuple.of(null, false)); // no further pages
       } catch (InterruptedException e) {
         logger.log(
-            Level.WARNING,
-            "\n" + Thread.currentThread().getName() + " Interrupted",
+            Level.FINE,
+            "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
+ " Interrupted @ parseRpcDataAsync", e); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back + } finally { + queryTaskExecutor.shutdownNow(); // for graceful shutdown scenarios } }; queryTaskExecutor.execute(parseDataTask); @@ -508,7 +518,7 @@ void markEoS(BlockingQueue> buffer) { // package-privat try { buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker } catch (InterruptedException e) { - logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + logger.log(Level.FINE, "\n" + Thread.currentThread().getName() + " Interrupted", e); } } @@ -525,7 +535,7 @@ void markLast(BlockingQueue buffer) { // package-private new BigQueryResultImpl.Row( null, true)); // All the pages has been processed, put this marker } catch (InterruptedException e) { - logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + logger.log(Level.FINE, "\n" + Thread.currentThread().getName() + " Interrupted", e); } } @@ -545,7 +555,7 @@ void populateBufferAsync( fieldValueLists = nextPageTuple.x(); } catch (InterruptedException e) { logger.log( - Level.WARNING, + Level.FINE, "\n" + Thread.currentThread().getName() + " Interrupted", e); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back @@ -584,6 +594,8 @@ void populateBufferAsync( .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic // could trigger buffer.clear(); + markEoS(buffer); // Thread has been interrupted, communicate to ResultSet by adding + // EoS } markEoS(buffer); // All the pages has been processed, put this marker } finally { From 0c48196972f307fcf36d711cfa1b0193967eb0e5 Mon Sep 17 00:00:00 2001 From: Prashant Mishra Date: Fri, 13 May 2022 17:15:24 +0530 Subject: [PATCH 2/6] Updated count assertion @ testConnectionClose --- .../test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 348749b46..6379f2aee 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -2656,8 +2656,7 @@ public void testConnectionClose() throws SQLException { assertTrue(connection.close()); // we should be able to cancel the connection } } - assertTrue( - cnt < 60000); // Few extra records are still read (generally ~10) even after canceling, as + assertTrue(cnt < 100000); // Extra records are still read even after canceling, as // the backgrounds threads are still active while the interrupt occurs and the // buffer and pageCache are cleared } From b2ad3700051189ff610595c1c2e96c35d7f9f8c3 Mon Sep 17 00:00:00 2001 From: Prashant Mishra Date: Fri, 13 May 2022 22:50:00 +0530 Subject: [PATCH 3/6] Updated condition to trigger `connection.close` at testConnectionClose --- .../test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 6379f2aee..dcfa5265f 100644 --- 
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -2652,7 +2652,7 @@ public void testConnectionClose() throws SQLException {
     int cnt = 0;
     while (rs.next()) {
       ++cnt;
-      if (cnt > 57000) { // breaking at 57K, query reads 300K
+      if (cnt == 57000) { // breaking at the 57000th record, query reads 300K
         assertTrue(connection.close()); // we should be able to cancel the connection
       }
     }

From 4d9ce3232c00b91d188b740bdf6d4e5384913bb2 Mon Sep 17 00:00:00 2001
From: Prashant Mishra
Date: Fri, 13 May 2022 22:52:59 +0530
Subject: [PATCH 4/6] Added and wired flagEndOfStream. Refactored and improved
 Thread interrupt logic

---
 .../google/cloud/bigquery/ConnectionImpl.java | 197 ++++++++++--------
 1 file changed, 110 insertions(+), 87 deletions(-)

diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
index 8841eb714..ba9acb1f0 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -87,7 +87,11 @@ class ConnectionImpl implements Connection {
       Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
   private final Logger logger = Logger.getLogger(this.getClass().getName());
   private BigQueryReadClient bqReadClient;
-  private static final long EXECUTOR_TIMEOUT_SEC = 30;
+  private static final long EXECUTOR_TIMEOUT_SEC = 10;
+  private BlockingQueue<AbstractList<FieldValue>>
+      bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
+  private BlockingQueue<BigQueryResultImpl.Row>
+      bufferRow; // initialized lazily iff we end up using Read API
 
   ConnectionImpl(
       ConnectionSettings connectionSettings,
@@ -107,6 +111,19 @@ class ConnectionImpl implements Connection {
             : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
   }
 
+  /**
+   * This method returns the number of records to be stored in the buffer and it ensures that it is
+   * within a reasonable range
+   *
+   * @return The max number of records to be stored in the buffer
+   */
+  private int getBufferSize() {
+    return (connectionSettings == null
+            || connectionSettings.getNumBufferedRows() == null
+            || connectionSettings.getNumBufferedRows() < 10000
+        ? 20000
+        : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
+  }
+
   /**
    * Cancel method shutdowns the pageFetcher and producerWorker threads gracefully using interrupt.
    * The pageFetcher threat will not request for any subsequent threads after interrupting and
@@ -119,15 +136,16 @@ class ConnectionImpl implements Connection {
   @BetaApi
   @Override
   public synchronized boolean close() throws BigQuerySQLException {
+    flagEndOfStream(); // adds an End of Stream flag in the buffer so that the `ResultSet.next()`
+    // stops advancing the cursor
     queryTaskExecutor.shutdownNow();
     try {
       if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
         return true;
       } // else queryTaskExecutor.isShutdown() will be returned outside this try block
     } catch (InterruptedException e) {
-      e.printStackTrace();
       logger.log(
-          Level.FINE,
+          Level.WARNING,
           "\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
           e); // Logging InterruptedException instead of throwing the exception back, close method
       // will return queryTaskExecutor.isShutdown()
@@ -329,7 +347,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
     BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId);
 
     // Keeps the deserialized records at the row level, which is consumed by BigQueryResult
-    BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
+    bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
 
     // Keeps the parsed FieldValueLists
     BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
@@ -351,11 +369,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
     // throughput
 
     populateBufferAsync(
-        rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer
+        rpcResponseQueue, pageCache, bufferFvl); // spawns a thread to populate the buffer
 
     // This will work for pagination as well, as buffer is getting updated asynchronously
     return new BigQueryResultImpl<AbstractList<FieldValue>>(
-        schema, numRows, buffer, bigQueryResultStats);
+        schema, numRows, bufferFvl, bigQueryResultStats);
   }
 
   @VisibleForTesting
@@ -383,7 +401,7 @@ BigQueryResult processQueryResponseResults(
     BigQueryResultStats bigQueryResultStats =
         new BigQueryResultStatsImpl(queryStatistics, sessionInfo);
 
-    BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
+    bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
     BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
         new LinkedBlockingDeque<>(
             getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
@@ -400,10 +418,10 @@ BigQueryResult processQueryResponseResults(
     parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue);
 
     // Thread to populate the buffer (a blocking queue) shared with the consumer
-    populateBufferAsync(rpcResponseQueue, pageCache, buffer);
+    populateBufferAsync(rpcResponseQueue, pageCache, bufferFvl);
 
     return new BigQueryResultImpl<AbstractList<FieldValue>>(
-        schema, numRows, buffer, bigQueryResultStats);
+        schema, numRows, bufferFvl, bigQueryResultStats);
   }
 
   @VisibleForTesting
@@ -420,7 +438,7 @@ void runNextPageTaskAsync(
           if (Thread.currentThread().isInterrupted()
              || queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
             logger.log(
-                Level.FINE,
+                Level.WARNING,
                "\n"
                    + Thread.currentThread().getName()
                    + " Interrupted @ runNextPageTaskAsync");
@@ -436,14 +454,12 @@ void runNextPageTaskAsync(
         }
         rpcResponseQueue.put(
             Tuple.of(
-                null,
-                false)); // this will stop the parseDataTask as well when the pagination
-        // completes
+                null, false)); // this will stop the parseDataTask as well when the pagination
+        // completes
       } catch (Exception e) {
         throw new BigQueryException(0, e.getMessage(), e);
-      } finally {
-        queryTaskExecutor.shutdownNow(); // for graceful shutdown scenarios
-      }
+      } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
+      // have finished processing the records and even that will be interrupted
     };
     queryTaskExecutor.execute(nextPageTask);
   }
@@ -466,7 +482,9 @@ void parseRpcDataAsync(
       pageCache.put(
           Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received.
     } catch (InterruptedException e) {
-      throw new BigQueryException(0, e.getMessage(), e);
+      logger.log(
+          Level.WARNING,
+          "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
     }
 
     // rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use
@@ -476,6 +494,14 @@ void parseRpcDataAsync(
       try {
         boolean hasMorePages = true;
         while (hasMorePages) {
+          if (Thread.currentThread().isInterrupted()
+              || queryTaskExecutor.isShutdown()) { // do not process further data and shutdown
+            logger.log(
+                Level.WARNING,
+                "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
+            break;
+          }
+          // no interrupt received till this point, continue processing
           Tuple<TableDataList, Boolean> rpcResponse = rpcResponseQueue.take();
           TableDataList tabledataList = rpcResponse.x();
           hasMorePages = rpcResponse.y();
@@ -487,58 +513,25 @@ void parseRpcDataAsync(
         }
       } catch (InterruptedException e) {
         logger.log(
-            Level.FINE,
+            Level.WARNING,
            "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
            e); // Thread might get interrupted while calling the Cancel method, which is
        // expected, so logging this instead of throwing the exception back
       }
       try {
-        pageCache.put(Tuple.of(null, false)); // no further pages
+        pageCache.put(Tuple.of(null, false)); // no further pages, graceful exit scenario
       } catch (InterruptedException e) {
         logger.log(
-            Level.FINE,
+            Level.WARNING,
            "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
            e); // Thread might get interrupted while calling the Cancel method, which is
        // expected, so logging this instead of throwing the exception back
-      } finally {
-        queryTaskExecutor.shutdownNow(); // for graceful shutdown scenarios
-      }
+      } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
+      // have finished processing the records and even that will be interrupted
     };
     queryTaskExecutor.execute(parseDataTask);
   }
 
-  /**
-   * This method is called when the current thread is interrupted, this communicates to ResultSet by
-   * adding a EoS
-   *
-   * @param buffer
-   */
-  @InternalApi
-  void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
-    try {
-      buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker
-    } catch (InterruptedException e) {
-      logger.log(Level.FINE, "\n" + Thread.currentThread().getName() + " Interrupted", e);
-    }
-  }
-
-  /**
-   * This method is called when the current thread is interrupted, this communicates to ResultSet by
-   * adding a isLast Row
-   *
-   * @param buffer
-   */
-  @InternalApi
-  void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
-    try {
-      buffer.put(
-          new BigQueryResultImpl.Row(
-              null, true)); // All the pages has been processed, put this marker
-    } catch (InterruptedException e) {
-      logger.log(Level.FINE, "\n" + Thread.currentThread().getName() + " Interrupted", e);
-    }
-  }
-
   @VisibleForTesting
   void populateBufferAsync(
       BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
       BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache,
@@ -555,29 +548,25 @@ void populateBufferAsync(
             fieldValueLists = nextPageTuple.x();
           } catch (InterruptedException e) {
             logger.log(
-                Level.FINE,
+                Level.WARNING,
                "\n" + Thread.currentThread().getName() + " Interrupted",
                e); // Thread might get interrupted while calling the Cancel method, which is
            // expected, so logging this instead of throwing the exception back
-            markEoS(
-                buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
+            break;
           }
 
           if (Thread.currentThread().isInterrupted()
+              || queryTaskExecutor.isShutdown()
              || fieldValueLists == null) { // do not process further pages and shutdown (outerloop)
-            markEoS(
-                buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
             break;
           }
 
           for (FieldValueList fieldValueList : fieldValueLists) {
             try {
-              if (Thread.currentThread()
-                  .isInterrupted()) { // do not process further pages and shutdown (inner loop)
-                markEoS(
-                    buffer); // Thread has been interrupted, communicate to ResultSet by adding
-                // EoS
+              if (Thread.currentThread().isInterrupted()
+                  || queryTaskExecutor
+                      .isShutdown()) { // do not process further pages and shutdown (inner loop)
                 break;
               }
               buffer.put(fieldValueList);
@@ -586,26 +575,55 @@ void populateBufferAsync(
             }
           }
         }
 
-        try {
-          if (Thread.currentThread()
-              .isInterrupted()) { // clear the buffer for any outstanding records
-            rpcResponseQueue
-                .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
-            // could trigger
-            buffer.clear();
-            markEoS(buffer); // Thread has been interrupted, communicate to ResultSet by adding
-            // EoS
-          }
-          markEoS(buffer); // All the pages has been processed, put this marker
+        buffer.put(
+            new EndOfFieldValueList()); // All the pages has been processed, put this marker
+      } catch (InterruptedException e) {
+        logger.log(
+            Level.WARNING,
+            "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
+            e);
       } finally {
-        queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
+        queryTaskExecutor
+            .shutdownNow(); // Shutdown the thread pool. All the records are now processed
       }
     };
     queryTaskExecutor.execute(populateBufferRunnable);
   }
 
+  /**
+   * In an interrupt scenario, like when the background threads are still working and the user calls
+   * `connection.close()`, then we need to add an End of Stream flag in the buffer so that the
+   * `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync`
+   * method to do this as the `BlockingQueue.put()` call will error out after the interrupt is
+   * triggered
+   */
+  @InternalApi
+  void flagEndOfStream() { // package-private
+    try {
+      if (bufferFvl != null) { // that is tabledata.list endpoint is used
+        bufferFvl.put(
+            new EndOfFieldValueList()); // All the pages has been processed, put this marker
+      } else if (bufferRow != null) {
+        bufferRow.put(
+            new BigQueryResultImpl.Row(
+                null, true)); // All the pages has been processed, put this marker
+      } else {
+        logger.log(
+            Level.WARNING,
+            "\n"
+                + Thread.currentThread().getName()
+                + " Could not flag End of Stream, both the buffer types are null. This might happen"
+                + " when the connection is closed without executing a query");
+      }
+    } catch (InterruptedException e) {
+      logger.log(
+          Level.WARNING,
+          "\n" + Thread.currentThread().getName() + " Interrupted @ flagEndOfStream",
+          e);
+    }
+  }
+
   /* Helper method that parse and populate a page with TableRows */
   private static Iterable<FieldValueList> getIterableFieldValueList(
       Iterable<TableRow> tableDataPb, final Schema schema) {
@@ -793,17 +811,17 @@ BigQueryResult highThroughPutRead(
          ;
       ReadSession readSession = bqReadClient.createReadSession(builder.build());
-      BlockingQueue<BigQueryResultImpl.Row> buffer = new LinkedBlockingDeque<>(bufferSize);
+      bufferRow = new LinkedBlockingDeque<>(getBufferSize());
       Map<String, Integer> arrowNameToIndex = new HashMap<>();
       // deserialize and populate the buffer async, so that the client isn't blocked
       processArrowStreamAsync(
           readSession,
-          buffer,
+          bufferRow,
           new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex),
           schema);
 
       logger.log(Level.INFO, "\n Using BigQuery Read API");
-      return new BigQueryResultImpl(schema, totalRows, buffer, stats);
+      return new BigQueryResultImpl(schema, totalRows, bufferRow, stats);
 
     } catch (IOException e) {
       throw BigQueryException.translateAndThrow(e);
@@ -837,8 +855,18 @@ private void processArrowStreamAsync(
           } catch (Exception e) {
             throw BigQueryException.translateAndThrow(e);
-          } finally {
-            markLast(buffer); // marking end of stream
+          } finally { // logic needed for graceful shutdown
+            // marking end of stream
+            try {
+              buffer.put(
+                  new BigQueryResultImpl.Row(
+                      null, true)); // All the pages has been processed, put this marker
+            } catch (InterruptedException e) {
+              logger.log(
+                  Level.WARNING,
+                  "\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
+                  e);
+            }
             queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
           }
         };
@@ -900,7 +928,6 @@ private void processRows(
           if (Thread.currentThread().isInterrupted()
              || queryTaskExecutor.isShutdown()) { // do not process and shutdown
-            markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
             break; // exit the loop, root will be cleared in the finally block
           }
 
@@ -991,9 +1018,6 @@ boolean isFastQuerySupported() {
 
   @VisibleForTesting
   boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {
-
-    // TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job
-    // is not complete
     if ((totalRows == null || pageRows == null)
         && Boolean.TRUE.equals(
            connectionSettings
@@ -1002,7 +1026,6 @@ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQuer
       return true;
     }
 
-    // Schema schema = Schema.fromPb(tableSchema);
     // Read API does not yet support Interval Type or QueryParameters
     if (containsIntervalType(schema) || hasQueryParameters) {
       logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI");
Disabling ReadAPI"); From 51d36bbb6d9289940bd3e6e4d0f51bfbb81d3d34 Mon Sep 17 00:00:00 2001 From: Prashant Mishra Date: Sat, 14 May 2022 10:10:35 +0530 Subject: [PATCH 5/6] Add testConnectionClose for checking connection close while using Read API --- .../bigquery/it/ITNightlyBigQueryTest.java | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java index d672967b1..deabba59f 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java @@ -332,7 +332,31 @@ public void testIterateAndOrderDefaultConnSettings() throws SQLException { ++cnt; } assertEquals(LIMIT_RECS, cnt); // all the records were retrieved - connection.close(); + assertTrue(connection.close()); + } + + /* + This tests interrupts the execution in between and checks if it has been interrupted successfully while using ReadAPI + */ + @Test + public void testConnectionClose() throws SQLException { + Connection connection = bigquery.createConnection(); + BigQueryResult bigQueryResult = connection.executeSelect(QUERY); + logger.log(Level.INFO, "Query used: {0}", QUERY); + ResultSet rs = bigQueryResult.getResultSet(); + int cnt = 0; + while (rs.next()) { + ++cnt; + if (cnt == 50000) { // interrupt at 50K + assertTrue(connection.close()); + } + } + assertTrue( + LIMIT_RECS + > cnt); // we stopped at 50K but still we can expect additional records (typically ~100) + // to be retrieved + // as a number of records should have been already buffered. less than + // LIMIT_RECS should be retrieved } @Test From 1396c15a7242db9d0978174637c3a3d86db9ae42 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 16 May 2022 15:34:24 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7afce7e97..621bcf84a 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies ```Groovy -implementation platform('com.google.cloud:libraries-bom:25.2.0') +implementation platform('com.google.cloud:libraries-bom:25.3.0') implementation 'com.google.cloud:google-cloud-bigquery' ```