Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e3c6749
fix: retry ExceptionHandler not retrying on IOException due to except…
PhongChuong Feb 10, 2025
6fd24b6
fix: retry ExceptionHandler not retrying on IOException due to except…
PhongChuong Feb 10, 2025
b364224
fix: retry ExceptionHandler not retrying on IOException due to except…
PhongChuong Feb 10, 2025
ac66738
fix: retry ExceptionHandler not retrying on IOException due to except…
PhongChuong Feb 10, 2025
93cd9e3
fix: retry ExceptionHandler not retrying on IOException due to except…
PhongChuong Feb 10, 2025
eaf03dc
cast to HttpBigQueryRpc
PhongChuong Feb 14, 2025
48218b3
cast to HttpBigQueryRpc
PhongChuong Feb 14, 2025
a999f06
fix BigQueryRpc mocks in unit tests
PhongChuong Feb 25, 2025
c5c55ea
fix clirr check and format
PhongChuong Feb 25, 2025
2f6fe5a
fix clirr
PhongChuong Feb 25, 2025
9f2bc64
fix clirr
PhongChuong Feb 25, 2025
28505ad
refactor HttpBigQueryRpc.write
PhongChuong Feb 26, 2025
372cffb
refactor HttpBigQueryRpc.write
PhongChuong Feb 26, 2025
a634f64
refactor ConnectionImpl HttpBigQueryRpc call
PhongChuong Feb 26, 2025
0d0a961
refactor ConnectionImpl HttpBigQueryRpc call
PhongChuong Feb 26, 2025
c1c61ef
refactor create table
PhongChuong Feb 26, 2025
42964fe
refactor create table
PhongChuong Feb 26, 2025
3320628
refactor BigQueryImpl
PhongChuong Feb 26, 2025
d8e944d
refactor ConnectionImpl
PhongChuong Feb 26, 2025
e11525b
add missing unit test for BigQueryImpl deleteJob
PhongChuong Feb 26, 2025
f5f28fc
add tests to validate retry on BIGQUERY_EXCEPTION_HANDLER
PhongChuong Feb 26, 2025
04d443e
handle exception wrapping/unwrapping with new methods
PhongChuong Mar 5, 2025
bf2c4a2
update runWithRetries to use BigQuery ruWithRetries to correctly hand…
PhongChuong Mar 5, 2025
c46deaa
fix unit test
PhongChuong Mar 6, 2025
fec352d
add unit tests for retry in TableDataWriteChannel
PhongChuong Mar 7, 2025
d05ad30
remove unnecessary null check
PhongChuong Mar 11, 2025
88c29a8
fix unit test where null is returned in mocks causing errors
PhongChuong Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions google-cloud-bigquery/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
<method>com.google.api.services.bigquery.model.GetQueryResultsResponse getQueryResultsWithRowLimit(java.lang.String, java.lang.String, java.lang.String, java.lang.Integer)</method>
<justification>getQueryResultsWithRowLimit is just used by ConnectionImpl at the moment so it should be fine to update the signature instead of writing an overloaded method</justification>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigquery/BigQueryOptions*</className>
<method>*getBigQueryRpcV2(*)</method>
<to>com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc</to>
<justification>getBigQueryRpcV2 is protected and is only used within the BigQuery package</justification>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/ExternalTableDefinition*</className>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
import com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -132,8 +131,8 @@ protected Set<String> getScopes() {
return SCOPES;
}

protected BigQueryRpc getBigQueryRpcV2() {
return (BigQueryRpc) getRpc();
protected HttpBigQueryRpc getBigQueryRpcV2() {
return (HttpBigQueryRpc) getRpc();
}

public String getLocation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.RetryHelper;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
Expand Down Expand Up @@ -52,6 +53,11 @@ public static <V> V runWithRetries(
algorithm,
bigQueryRetryConfig);
} catch (Exception e) {
// Checks for IOException and translate it into BigQueryException. The BigQueryException
// constructor parses the IOException and translate it into internal code.
if (e.getCause() instanceof IOException) {
throw new BigQueryRetryHelperException(new BigQueryException((IOException) e.getCause()));
}
throw new BigQueryRetryHelperException(e.getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.bigquery;

import static com.google.cloud.RetryHelper.runWithRetries;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.google.api.core.BetaApi;
Expand All @@ -28,8 +27,8 @@
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.RetryHelper;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQueryRetryHelper.BigQueryRetryHelperException;
import com.google.cloud.bigquery.JobStatistics.QueryStatistics;
import com.google.cloud.bigquery.JobStatistics.SessionInfo;
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
Expand Down Expand Up @@ -102,6 +101,8 @@ class ConnectionImpl implements Connection {
bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
private BlockingQueue<BigQueryResultImpl.Row>
bufferRow; // initialized lazily iff we end up using Read API
private static final BigQueryRetryConfig EMPTY_RETRY_CONFIG =
BigQueryRetryConfig.newBuilder().build();

ConnectionImpl(
ConnectionSettings connectionSettings,
Expand Down Expand Up @@ -466,12 +467,15 @@ private BigQueryResult queryRpc(
try {
results =
BigQueryRetryHelper.runWithRetries(
() -> bigQueryRpc.queryRpc(projectId, queryRequest),
() ->
bigQueryOptions
.getBigQueryRpcV2()
.queryRpcSkipExceptionTranslation(projectId, queryRequest),
bigQueryOptions.getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
bigQueryOptions.getClock(),
retryConfig);
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
} catch (BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}

Expand Down Expand Up @@ -914,21 +918,30 @@ private Job getQueryJobRpc(JobId jobId) {
com.google.api.services.bigquery.model.Job jobPb;
try {
jobPb =
runWithRetries(
BigQueryRetryHelper.runWithRetries(
() ->
bigQueryRpc.getQueryJob(
completeJobId.getProject(),
completeJobId.getJob(),
completeJobId.getLocation()),
bigQueryOptions
.getBigQueryRpcV2()
.getQueryJobSkipExceptionTranslation(
completeJobId.getProject(),
completeJobId.getJob(),
completeJobId.getLocation()),
bigQueryOptions.getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
bigQueryOptions.getClock());
if (bigQueryOptions.getThrowNotFound() && jobPb == null) {
throw new BigQueryException(HTTP_NOT_FOUND, "Query job not found");
bigQueryOptions.getClock(),
EMPTY_RETRY_CONFIG);
} catch (BigQueryRetryHelperException e) {
if (e.getCause() instanceof BigQueryException) {
if (((BigQueryException) e.getCause()).getCode() == HTTP_NOT_FOUND) {
if (bigQueryOptions.getThrowNotFound()) {
throw new BigQueryException(HTTP_NOT_FOUND, "Query job not found");
}
return null;
}
}
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
// getQueryJobSkipExceptionTranslation will never return null so this is safe.
return Job.fromPb(bigQueryOptions.getService(), jobPb);
}

Expand All @@ -948,22 +961,23 @@ TableDataList tableDataListRpc(TableId destinationTable, String pageToken) {
? bigQueryOptions.getProjectId()
: destinationTable.getProject());
TableDataList results =
runWithRetries(
BigQueryRetryHelper.runWithRetries(
() ->
bigQueryOptions
.getBigQueryRpcV2()
.listTableDataWithRowLimit(
.listTableDataWithRowLimitSkipExceptionTranslation(
completeTableId.getProject(),
completeTableId.getDataset(),
completeTableId.getTable(),
connectionSettings.getMaxResultPerPage(),
pageToken),
bigQueryOptions.getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
bigQueryOptions.getClock());
bigQueryOptions.getClock(),
EMPTY_RETRY_CONFIG);

return results;
} catch (RetryHelper.RetryHelperException e) {
} catch (BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}
Expand Down Expand Up @@ -1177,12 +1191,14 @@ GetQueryResultsResponse getQueryResultsFirstPage(JobId jobId) {
results =
BigQueryRetryHelper.runWithRetries(
() ->
bigQueryRpc.getQueryResultsWithRowLimit(
completeJobId.getProject(),
completeJobId.getJob(),
completeJobId.getLocation(),
connectionSettings.getMaxResultPerPage(),
timeoutMs),
bigQueryOptions
.getBigQueryRpcV2()
.getQueryResultsWithRowLimitSkipExceptionTranslation(
completeJobId.getProject(),
completeJobId.getJob(),
completeJobId.getLocation(),
connectionSettings.getMaxResultPerPage(),
timeoutMs),
bigQueryOptions.getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
bigQueryOptions.getClock(),
Expand All @@ -1197,7 +1213,7 @@ GetQueryResultsResponse getQueryResultsFirstPage(JobId jobId) {
// with the case where there is a HTTP error
throw new BigQueryException(bigQueryErrors);
}
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
} catch (BigQueryRetryHelperException e) {
logger.log(Level.WARNING, "\n Error occurred while calling getQueryResultsWithRowLimit", e);
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -1442,7 +1458,10 @@ com.google.api.services.bigquery.model.Job createQueryJob(
try {
queryJob =
BigQueryRetryHelper.runWithRetries(
() -> bigQueryRpc.createJobForQuery(jobPb),
() ->
bigQueryOptions
.getBigQueryRpcV2()
.createJobForQuerySkipExceptionTranslation(jobPb),
bigQueryOptions.getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
bigQueryOptions.getClock(),
Expand Down Expand Up @@ -1482,7 +1501,10 @@ com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
try {
dryRunJob =
BigQueryRetryHelper.runWithRetries(
() -> bigQueryRpc.createJobForQuery(jobPb),
() ->
bigQueryOptions
.getBigQueryRpcV2()
.createJobForQuerySkipExceptionTranslation(jobPb),
bigQueryOptions.getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
bigQueryOptions.getClock(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package com.google.cloud.bigquery;

import static com.google.cloud.RetryHelper.runWithRetries;

import com.google.cloud.BaseWriteChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.bigquery.BigQueryRetryHelper.BigQueryRetryHelperException;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
Expand All @@ -34,6 +33,9 @@
public class TableDataWriteChannel
extends BaseWriteChannel<BigQueryOptions, WriteChannelConfiguration> {

private static final BigQueryRetryConfig EMPTY_RETRY_CONFIG =
BigQueryRetryConfig.newBuilder().build();

private Job job;

TableDataWriteChannel(
Expand All @@ -50,20 +52,22 @@ public class TableDataWriteChannel
protected void flushBuffer(final int length, final boolean last) {
try {
com.google.api.services.bigquery.model.Job jobPb =
runWithRetries(
BigQueryRetryHelper.runWithRetries(
new Callable<com.google.api.services.bigquery.model.Job>() {
@Override
public com.google.api.services.bigquery.model.Job call() {
public com.google.api.services.bigquery.model.Job call() throws IOException {
return getOptions()
.getBigQueryRpcV2()
.write(getUploadId(), getBuffer(), 0, getPosition(), length, last);
.writeSkipExceptionTranslation(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
}
},
getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock());
getOptions().getClock(),
EMPTY_RETRY_CONFIG);
job = jobPb != null ? Job.fromPb(getOptions().getService(), jobPb) : null;
} catch (RetryHelper.RetryHelperException e) {
} catch (BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}
Expand All @@ -78,22 +82,23 @@ private static String open(
final JobId jobId,
final WriteChannelConfiguration writeChannelConfiguration) {
try {
return runWithRetries(
return BigQueryRetryHelper.runWithRetries(
new Callable<String>() {
@Override
public String call() {
public String call() throws IOException {
return options
.getBigQueryRpcV2()
.open(
.openSkipExceptionTranslation(
new com.google.api.services.bigquery.model.Job()
.setConfiguration(writeChannelConfiguration.toPb())
.setJobReference(jobId.toPb()));
}
},
options.getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
options.getClock(),
EMPTY_RETRY_CONFIG);
} catch (BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}
Expand Down
Loading