diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index 88950b9fb..d0a77d767 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -328,7 +328,7 @@ private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption waitForJob(RetryOption.mergeToSettings(DEFAULT_QUERY_JOB_WAIT_SETTINGS, waitOptions)); } - return completedJobResponse == null ? null : reload(); + return completedJobResponse == null ? null : bigquery.getJob(getJobId()); } finally { if (waitFor != null) { waitFor.end(); @@ -462,7 +462,30 @@ private QueryResponse waitForQueryResults( new Callable() { @Override public QueryResponse call() { - return bigquery.getQueryResults(getJobId(), resultsOptions); + try { + return bigquery.getQueryResults(getJobId(), resultsOptions); + } catch (BigQueryException e) { + // Intercept the exception to check the job's terminal status. + Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS)); + + if (job != null + && job.getStatus() != null + && JobStatus.State.DONE.equals(job.getStatus().getState())) { + // To stop the retry loop, we return a synthetic "completed" response with all + // required fields. The caller (waitForInternal) will then proceed to reload the + // job's actual final (failed) status. + return QueryResponse.newBuilder() + .setCompleted(true) + .setTotalRows(0L) // Required by the builder + .setSchema(Schema.of()) // Required by the builder + .setErrors(ImmutableList.of()) // Required by the builder + .build(); + } + + // If the job is not done, re-throw the original exception to allow + // the configured retry policy to handle it. + throw e; + } } }, retrySettings, diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java index f12d9fcaf..72e671c14 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java @@ -553,6 +553,60 @@ public void testWaitForWithBigQueryRetryConfigErrorShouldNotRetry() throws Inter verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); } + @Test + public void testWaitForReturnsFailedJobOnRateLimitError() throws InterruptedException { + // Setup: A running query job and a config to retry on rate limit errors. + QueryJobConfiguration queryConfig = + QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); + Job queryJob = + new Job( + bigquery, + new JobInfo.BuilderImpl( + JobInfo.newBuilder(queryConfig) + .setJobId(JOB_ID) + .setStatus(new JobStatus(State.RUNNING)) + .build())); + BigQueryRetryConfig retryConfig = + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .build(); + + // The getQueryResults call always fails with a rate limit error. + BigQueryError rateLimitError = + new BigQueryError("rateLimitExceeded", "US", BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG); + BigQueryException rateLimitException = new BigQueryException(ImmutableList.of(rateLimitError)); + when(bigquery.getOptions()).thenReturn(mockOptions); + when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); + when(bigquery.getQueryResults(eq(JOB_ID), any(BigQuery.QueryResultsOption[].class))) + .thenAnswer( + invocation -> { + throw rateLimitException; + }); + + // The underlying job has already failed for a different reason. + BigQueryError jobError = new BigQueryError("backendError", "US", "Backend error"); + Job failedJob = + queryJob.toBuilder().setStatus(new JobStatus(State.DONE, jobError, null)).build(); + when(bigquery.getJob(eq(JOB_ID), any(BigQuery.JobOption[].class))) + .thenAnswer( + invocation -> { + return failedJob; + }); + + Job completedJob = + queryJob.waitFor( + retryConfig, + RetryOption.totalTimeoutDuration(Duration.ofMillis(500L)), + RetryOption.initialRetryDelayDuration(Duration.ofMillis(100L))); + + assertNotNull("The completed job should not be null.", completedJob); + assertEquals("Job should be in a DONE state.", State.DONE, completedJob.getStatus().getState()); + assertEquals( + "The job's error should be the backendError.", + jobError, + completedJob.getStatus().getError()); + } + @Test public void testReload() { JobInfo updatedInfo = JOB_INFO.toBuilder().setEtag("etag").build(); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index c9f6296cc..d0515d8d7 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -3393,16 +3393,17 @@ public void testAuthorizeDataset() { public void testSingleStatementsQueryException() throws InterruptedException { String invalidQuery = String.format("INSERT %s.%s VALUES('3', 10);", DATASET, TABLE_ID.getTable()); - try { - bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor(); - fail("BigQueryException was expected"); - } catch (BigQueryException ex) { - assertEquals("invalidQuery", ex.getReason()); - assertNotNull(ex.getMessage()); - BigQueryError error = ex.getError(); - assertEquals("invalidQuery", error.getReason()); - assertNotNull(error.getMessage()); - } + + Job completedJob = + bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor(); + + assertNotNull(completedJob); + assertNotNull(completedJob.getStatus()); + + BigQueryError error = completedJob.getStatus().getError(); + assertNotNull(error); + assertEquals("invalidQuery", error.getReason()); + assertNotNull(error.getMessage()); } /* TODO(prasmish): replicate the entire test case for executeSelect */ @@ -3412,16 +3413,17 @@ public void testMultipleStatementsQueryException() throws InterruptedException { String.format( "INSERT %s.%s VALUES('3', 10); DELETE %s.%s where c2=3;", DATASET, TABLE_ID.getTable(), DATASET, TABLE_ID.getTable()); - try { - bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor(); - fail("BigQueryException was expected"); - } catch (BigQueryException ex) { - assertEquals("invalidQuery", ex.getReason()); - assertNotNull(ex.getMessage()); - BigQueryError error = ex.getError(); - assertEquals("invalidQuery", error.getReason()); - assertNotNull(error.getMessage()); - } + + Job completedJob = + bigquery.create(JobInfo.of(QueryJobConfiguration.of(invalidQuery))).waitFor(); + + assertNotNull(completedJob); + assertNotNull(completedJob.getStatus()); + + BigQueryError error = completedJob.getStatus().getError(); + assertNotNull(error); + assertEquals("invalidQuery", error.getReason()); + assertNotNull(error.getMessage()); } @Test