@@ -328,7 +328,7 @@ private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption
waitForJob(RetryOption.mergeToSettings(DEFAULT_QUERY_JOB_WAIT_SETTINGS, waitOptions));
}

return completedJobResponse == null ? null : reload();
return completedJobResponse == null ? null : bigquery.getJob(getJobId());
} finally {
if (waitFor != null) {
waitFor.end();
@@ -462,7 +462,30 @@ private QueryResponse waitForQueryResults(
new Callable<QueryResponse>() {
@Override
public QueryResponse call() {
return bigquery.getQueryResults(getJobId(), resultsOptions);
try {
return bigquery.getQueryResults(getJobId(), resultsOptions);
@lqiu96 (Member), Oct 21, 2025:

I think code-wise this will probably work, but I'm not sure I'm the best person to review the logic (yet). For this PR, someone more familiar with BigQuery's internals would be better, since this impacts all the RPCs.

I don't fully understand jobs vs. queries yet, and to me this seems odd. Shouldn't the queryResults check the job status before even getting the query results? How does the query get results if the job hasn't finished? It seems odd that if the server returns a rate limit exception, we bypass the response, poll for the job result directly, and return a dummy value.

If there is a RateLimitException from the server, I think the first step should be stronger backoff requirements so we ease quota/load. I think the default 12-hour timeout is too long (unless BQ has long-running jobs; I'm not familiar enough to know).

I think if we do support user configurations, then perhaps the user should shorten the total timeout and increase the backoff so they're not sitting waiting 12+ hours.

Perhaps Phong could give us more insight?
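
To make the timeout/backoff trade-off above concrete, here is a minimal, self-contained sketch in plain Java (no BigQuery dependency) that counts how many polls a capped exponential backoff schedule would issue within a total timeout. `BackoffSketch`, its parameters, and the sample values are illustrative assumptions. The model ignores jitter and RPC latency, and it is not the client's actual retry algorithm or its default settings.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: models a capped exponential backoff schedule without jitter. */
final class BackoffSketch {

  /** Returns the delays slept between polls that fit inside the total timeout. */
  static List<Duration> delays(
      Duration initialDelay, double multiplier, Duration maxDelay, Duration totalTimeout) {
    // Guard against schedules that would never advance (zero delay or shrinking delays).
    if (initialDelay.toMillis() < 1 || maxDelay.toMillis() < 1 || multiplier < 1.0) {
      throw new IllegalArgumentException("need delays >= 1 ms and multiplier >= 1");
    }
    List<Duration> result = new ArrayList<>();
    Duration elapsed = Duration.ZERO;
    Duration next = initialDelay;
    // Keep scheduling polls while the next sleep still fits in the timeout budget.
    while (elapsed.plus(next).compareTo(totalTimeout) <= 0) {
      result.add(next);
      elapsed = elapsed.plus(next);
      long scaledMillis = (long) (next.toMillis() * multiplier);
      next = Duration.ofMillis(Math.min(scaledMillis, maxDelay.toMillis()));
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumed values: a 12-hour budget with a short cap vs. a 30-minute budget with a longer cap.
    int longBudget =
        delays(Duration.ofSeconds(1), 2.0, Duration.ofMinutes(1), Duration.ofHours(12)).size();
    int shortBudget =
        delays(Duration.ofSeconds(5), 2.0, Duration.ofMinutes(5), Duration.ofMinutes(30)).size();
    System.out.println("polls within 12 h budget: " + longBudget);
    System.out.println("polls within 30 min budget: " + shortBudget);
  }
}
```

Under these assumed numbers, the shorter timeout with a larger delay cap issues far fewer polls, which is the quota relief the comment is asking for. In the client itself, the equivalent knobs would be passed as `RetryOption` values to `waitFor`, as the new unit test in this PR does with `totalTimeoutDuration` and `initialRetryDelayDuration`.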

Contributor Author:

Thank you Lawrence for the feedback. @PhongChuong, could you please take a look?

@PhongChuong (Contributor), Nov 13, 2025:

I believe this should work, but I think we should be reluctant to add even more logic to our already complicated retry logic.

Can we verify a few things first before proceeding? I'm thinking out loud:

  1. It seems weird to me that the shouldRetry on 494 should be true when, semantically, we expect prevResponse.getCompleted() to be true since the Job == DONE. The value is set here based on GetQueryResultsResponse. Can you verify whether the server responds with the correct JobStatus, matching that of getJob?
  2. I wonder if we should add the getJob + jobStatus check logic into the shouldRetry section instead.

Regarding the timeout, it is currently possible to set that value. However, our default is, as @lqiu96 said, extremely long at 12 hours. IIRC, we had a brief discussion about changing this value, but there was no consensus on how to move forward. It might be useful to bring this up again during the next BigQuery meeting.
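
Point 2 above could look roughly like the following self-contained sketch, in which the retry decision itself consults the job state instead of the callable fabricating a completed response. All names here (`ShouldRetrySketch`, `State`, `Outcome`, `shouldRetry`, `poll`) are hypothetical stand-ins, not the client library's retry API. The real check would go through `bigquery.getJob(...)` and `JobStatus`, and backoff sleeps are omitted for brevity.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

/** Hypothetical model of a retry loop whose shouldRetry step checks the job state. */
final class ShouldRetrySketch {
  enum State { PENDING, RUNNING, DONE }

  /** Result of polling: the response, or null if polling stopped because the job was DONE. */
  static final class Outcome<T> {
    final T response;
    final int attempts;

    Outcome(T response, int attempts) {
      this.response = response;
      this.attempts = attempts;
    }
  }

  /** Retry a failed call only while the job has not reached a terminal state. */
  static boolean shouldRetry(Exception error, Supplier<State> jobState) {
    return error != null && jobState.get() != State.DONE;
  }

  static <T> Outcome<T> poll(Callable<T> call, Supplier<State> jobState, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return new Outcome<>(call.call(), attempt);
      } catch (Exception e) {
        last = e;
        if (!shouldRetry(e, jobState)) {
          // The job is already DONE: stop polling so the caller can load its final status.
          return new Outcome<>(null, attempt);
        }
      }
    }
    throw new IllegalStateException("retries exhausted", last);
  }

  public static void main(String[] args) {
    // The call always fails (as with a persistent rate limit error), but the job is DONE.
    Outcome<String> outcome =
        poll(() -> { throw new IllegalStateException("rateLimitExceeded"); }, () -> State.DONE, 5);
    System.out.println("attempts=" + outcome.attempts + ", response=" + outcome.response);
  }
}
```

One design consequence of this shape is that the callable stays a plain pass-through to the results call, and the "job is already terminal" condition lives in one place alongside the other retry conditions.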

} catch (BigQueryException e) {
// Intercept the exception to check the job's terminal status.
Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS));

if (job != null
&& job.getStatus() != null
&& JobStatus.State.DONE.equals(job.getStatus().getState())) {
// To stop the retry loop, we return a synthetic "completed" response with all
// required fields. The caller (waitForInternal) will then proceed to reload the
// job's actual final (failed) status.
return QueryResponse.newBuilder()
.setCompleted(true)
.setTotalRows(0L) // Required by the builder
.setSchema(Schema.of()) // Required by the builder
.setErrors(ImmutableList.<BigQueryError>of()) // Required by the builder
.build();
}

// If the job is not done, re-throw the original exception to allow
// the configured retry policy to handle it.
throw e;
}
}
},
retrySettings,
@@ -553,6 +553,60 @@ public void testWaitForWithBigQueryRetryConfigErrorShouldNotRetry() throws Inter
verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS);
}

@Test
public void testWaitForReturnsFailedJobOnRateLimitError() throws InterruptedException {
// Setup: A running query job and a config to retry on rate limit errors.
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build();
Job queryJob =
new Job(
bigquery,
new JobInfo.BuilderImpl(
JobInfo.newBuilder(queryConfig)
.setJobId(JOB_ID)
.setStatus(new JobStatus(State.RUNNING))
.build()));
BigQueryRetryConfig retryConfig =
BigQueryRetryConfig.newBuilder()
.retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
.build();

// The getQueryResults call always fails with a rate limit error.
BigQueryError rateLimitError =
new BigQueryError("rateLimitExceeded", "US", BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG);
BigQueryException rateLimitException = new BigQueryException(ImmutableList.of(rateLimitError));
when(bigquery.getOptions()).thenReturn(mockOptions);
when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock());
when(bigquery.getQueryResults(eq(JOB_ID), any(BigQuery.QueryResultsOption[].class)))
.thenAnswer(
invocation -> {
throw rateLimitException;
});

// The underlying job has already failed for a different reason.
BigQueryError jobError = new BigQueryError("backendError", "US", "Backend error");
Job failedJob =
queryJob.toBuilder().setStatus(new JobStatus(State.DONE, jobError, null)).build();
when(bigquery.getJob(eq(JOB_ID), any(BigQuery.JobOption[].class)))
.thenAnswer(
invocation -> {
return failedJob;
});

Job completedJob =
queryJob.waitFor(
retryConfig,
RetryOption.totalTimeoutDuration(Duration.ofMillis(500L)),
RetryOption.initialRetryDelayDuration(Duration.ofMillis(100L)));

assertNotNull("The completed job should not be null.", completedJob);
assertEquals("Job should be in a DONE state.", State.DONE, completedJob.getStatus().getState());
assertEquals(
"The job's error should be the backendError.",
jobError,
completedJob.getStatus().getError());
}

@Test
public void testReload() {
JobInfo updatedInfo = JOB_INFO.toBuilder().setEtag("etag").build();
@@ -3393,16 +3393,17 @@ public void testAuthorizeDataset() {
public void testSingleStatementsQueryException() throws InterruptedException {
String invalidQuery =
String.format("INSERT %s.%s VALUES('3', 10);", DATASET, TABLE_ID.getTable());
try {
bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor();
fail("BigQueryException was expected");
} catch (BigQueryException ex) {
assertEquals("invalidQuery", ex.getReason());
assertNotNull(ex.getMessage());
BigQueryError error = ex.getError();
assertEquals("invalidQuery", error.getReason());
assertNotNull(error.getMessage());
}

Job completedJob =
bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor();

assertNotNull(completedJob);
assertNotNull(completedJob.getStatus());

BigQueryError error = completedJob.getStatus().getError();
assertNotNull(error);
assertEquals("invalidQuery", error.getReason());
assertNotNull(error.getMessage());
}

/* TODO(prasmish): replicate the entire test case for executeSelect */
@@ -3412,16 +3413,17 @@ public void testMultipleStatementsQueryException() throws InterruptedException {
String.format(
"INSERT %s.%s VALUES('3', 10); DELETE %s.%s where c2=3;",
DATASET, TABLE_ID.getTable(), DATASET, TABLE_ID.getTable());
try {
bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor();
fail("BigQueryException was expected");
} catch (BigQueryException ex) {
assertEquals("invalidQuery", ex.getReason());
assertNotNull(ex.getMessage());
BigQueryError error = ex.getError();
assertEquals("invalidQuery", error.getReason());
assertNotNull(error.getMessage());
}

Job completedJob =
bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor();

assertNotNull(completedJob);
assertNotNull(completedJob.getStatus());

BigQueryError error = completedJob.getStatus().getError();
assertNotNull(error);
assertEquals("invalidQuery", error.getReason());
assertNotNull(error.getMessage());
}

@Test