From e73936b95fbbe42a85b65caee7de3f64ea0fcf08 Mon Sep 17 00:00:00 2001
From: stephwang
Date: Tue, 7 Jul 2020 13:39:58 -0400
Subject: [PATCH] add logic for DML and DDL queries

enable requestId

add integration tests for fast path multipages query, DML, and DDL queries

fix requestId logic

update QueryRequestInfo and add mock test

add mock test cases for SQL, DML, and DDL

clean up code

fix IT
---
Review notes (text between the "---" marker and the diff is not part of the
commit message):
 * QueryRequestInfo.REQUEST_ID is a static final field, so every QueryRequest
   built in the same JVM carries the same requestId. NOTE(review): requestId
   looks intended to identify a single request - confirm against the
   jobs.query reference. Generating it per QueryRequestInfo instance would
   require the new mock tests to stop stubbing on an exact QueryRequest.
 * In fastQuery, the multi-page branch looks the job up through the full job
   reference returned by the backend instead of the bare job id string.

 .../google/cloud/bigquery/BigQueryImpl.java   |  50 +++++---
 .../cloud/bigquery/QueryJobConfiguration.java |   2 +-
 .../cloud/bigquery/QueryRequestInfo.java      | 111 +++++++++++++----
 .../cloud/bigquery/BigQueryImplTest.java      | 117 +++++++++++++++++-
 .../cloud/bigquery/it/ITBigQueryTest.java     |  51 ++++++++
 5 files changed, 283 insertions(+), 48 deletions(-)

diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java
index f37b724077..84d46c8459 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java
@@ -1187,8 +1187,9 @@ public TableResult query(QueryJobConfiguration configuration, JobOption... optio
   private TableResult fastQuery(
       final String projectId, final QueryRequest content, JobOption... options)
       throws InterruptedException {
+    com.google.api.services.bigquery.model.QueryResponse results;
     try {
-      com.google.api.services.bigquery.model.QueryResponse queryResponse =
+      results =
           runWithRetries(
               new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
                 @Override
@@ -1199,26 +1200,37 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
               getOptions().getRetrySettings(),
               EXCEPTION_HANDLER,
               getOptions().getClock());
-
-      // Return result if there is only 1 page, otherwise use jobId returned from backend to return
-      // full results
-      if (queryResponse.getPageToken() == null) {
-        return new TableResult(
-            Schema.fromPb(queryResponse.getSchema()),
-            queryResponse.getTotalRows().longValue(),
-            new PageImpl<>(
-                new TableDataPageFetcher(null, getOptions(), null, optionMap(options)),
-                null,
-                transformTableData(queryResponse.getRows())));
-      } else {
-        String jobId = queryResponse.getJobReference().getJobId();
-        Job job = getJob(JobId.of(jobId));
-        job.waitFor();
-        return job.getQueryResults();
-      }
-    } catch (RetryHelper.RetryHelperException e) {
+    } catch (RetryHelperException e) {
       throw BigQueryException.translateAndThrow(e);
     }
+    long numRows;
+    if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
+      // DDL queries: the response carries neither row count
+      numRows = 0L;
+    } else if (results.getNumDmlAffectedRows() != null) {
+      // DML queries: use the number of affected rows
+      numRows = results.getNumDmlAffectedRows();
+    } else {
+      // SQL queries: use the total number of result rows
+      numRows = results.getTotalRows().longValue();
+    }
+
+    // Return result if there is only 1 page, otherwise use the job reference returned from the
+    // backend (keeps its project and location) to return full results
+    if (results.getPageToken() == null) {
+      return new TableResult(
+          Schema.fromPb(results.getSchema()),
+          numRows,
+          new PageImpl<>(
+              new TableDataPageFetcher(null, getOptions(), null, optionMap(options)),
+              null,
+              transformTableData(results.getRows())));
+    } else {
+      JobId jobId = JobId.fromPb(results.getJobReference());
+      Job job = getJob(jobId);
+      job.waitFor();
+      return job.getQueryResults();
+    }
   }
 
   @Override
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java
index c26d5cf0c8..eff8f605c2 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java
@@ -851,7 +851,7 @@ ToStringHelper toStringHelper() {
         .add("flattenResults", flattenResults)
         .add("priority", priority)
         .add("tableDefinitions", tableDefinitions)
-        .add("userQueryCache", useQueryCache)
+        .add("useQueryCache", useQueryCache)
         .add("userDefinedFunctions", userDefinedFunctions)
         .add("createDisposition", createDisposition)
         .add("writeDisposition", writeDisposition)
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryRequestInfo.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryRequestInfo.java
index d781c786b3..ef5ce0ace3 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryRequestInfo.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryRequestInfo.java
@@ -16,17 +16,40 @@
 
 package com.google.cloud.bigquery;
 
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.QueryParameter;
 import com.google.api.services.bigquery.model.QueryRequest;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 final class QueryRequestInfo {
 
+  private static final String REQUEST_ID = UUID.randomUUID().toString();
   private QueryJobConfiguration config;
+  private final List<ConnectionProperty> connectionProperties;
+  private final DatasetId defaultDataset;
+  private final Boolean dryRun;
+  private final Map<String, String> labels;
+  private final Long maximumBytesBilled;
+  private final String query;
+  private final List<QueryParameter> queryParameters;
+  private final Boolean useQueryCache;
+  private final Boolean useLegacySql;
 
   QueryRequestInfo(QueryJobConfiguration config) {
     this.config = config;
+    this.connectionProperties = config.getConnectionProperties();
+    this.defaultDataset = config.getDefaultDataset();
+    this.dryRun = config.dryRun();
+    this.labels = config.getLabels();
+    this.maximumBytesBilled = config.getMaximumBytesBilled();
+    this.query = config.getQuery();
+    this.queryParameters = config.toPb().getQuery().getQueryParameters();
+    this.useLegacySql = config.useLegacySql();
+    this.useQueryCache = config.useQueryCache();
   }
 
   boolean isFastQuerySupported() {
@@ -45,37 +68,73 @@ boolean isFastQuerySupported() {
   }
 
   QueryRequest toPb() {
-    QueryRequest query = new QueryRequest();
-    if (config.getConnectionProperties() != null) {
-      query.setConnectionProperties(
-          Lists.transform(config.getConnectionProperties(), ConnectionProperty.TO_PB_FUNCTION));
+    QueryRequest request = new QueryRequest();
+    if (connectionProperties != null) {
+      request.setConnectionProperties(
+          Lists.transform(connectionProperties, ConnectionProperty.TO_PB_FUNCTION));
     }
-    if (config.getDefaultDataset() != null) {
-      query.setDefaultDataset(config.getDefaultDataset().toPb());
+    if (defaultDataset != null) {
+      request.setDefaultDataset(defaultDataset.toPb());
     }
-    if (config.dryRun() != null) {
-      query.setDryRun(config.dryRun());
+    if (dryRun != null) {
+      request.setDryRun(dryRun);
     }
-    if (config.getLabels() != null) {
-      query.setLabels(config.getLabels());
+    if (labels != null) {
+      request.setLabels(labels);
     }
-    if (config.getMaximumBytesBilled() != null) {
-      query.setMaximumBytesBilled(config.getMaximumBytesBilled());
+    if (maximumBytesBilled != null) {
+      request.setMaximumBytesBilled(maximumBytesBilled);
     }
-    query.setQuery(config.getQuery());
-    // TODO: add back when supported
-    // query.setRequestId(UUID.randomUUID().toString());
-    JobConfiguration jobConfiguration = config.toPb();
-    JobConfigurationQuery configurationQuery = jobConfiguration.getQuery();
-    if (configurationQuery.getQueryParameters() != null) {
-      query.setQueryParameters(configurationQuery.getQueryParameters());
+    request.setQuery(query);
+    request.setRequestId(REQUEST_ID);
+    if (queryParameters != null) {
+      request.setQueryParameters(queryParameters);
     }
-    if (config.useLegacySql() != null) {
-      query.setUseLegacySql(config.useLegacySql());
+    if (useLegacySql != null) {
+      request.setUseLegacySql(useLegacySql);
     }
-    if (config.useQueryCache() != null) {
-      query.setUseQueryCache(config.useQueryCache());
+    if (useQueryCache != null) {
+      request.setUseQueryCache(useQueryCache);
     }
-    return query;
+    return request;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("connectionProperties", connectionProperties)
+        .add("defaultDataset", defaultDataset)
+        .add("dryRun", dryRun)
+        .add("labels", labels)
+        .add("maximumBytesBilled", maximumBytesBilled)
+        .add("query", query)
+        .add("requestId", REQUEST_ID)
+        .add("queryParameters", queryParameters)
+        .add("useQueryCache", useQueryCache)
+        .add("useLegacySql", useLegacySql)
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        connectionProperties,
+        defaultDataset,
+        dryRun,
+        labels,
+        maximumBytesBilled,
+        query,
+        queryParameters,
+        REQUEST_ID,
+        useQueryCache,
+        useLegacySql);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this
+        || obj != null
+            && obj.getClass().equals(QueryRequestInfo.class)
+            && java.util.Objects.equals(toPb(), ((QueryRequestInfo) obj).toPb());
   }
 }
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java
index 6a79edcb11..f31702bd60 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java
@@ -36,6 +36,7 @@
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.GetQueryResultsResponse;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.QueryRequest;
 import com.google.api.services.bigquery.model.TableCell;
 import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
@@ -212,6 +213,16 @@ public class BigQueryImplTest {
           .setDefaultDataset(DatasetId.of(PROJECT, DATASET))
           .setUseQueryCache(false)
           .build();
+  private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_FOR_DMLQUERY =
+      QueryJobConfiguration.newBuilder("DML")
+          .setDefaultDataset(DatasetId.of(PROJECT, DATASET))
+          .setUseQueryCache(false)
+          .build();
+  private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_FOR_DDLQUERY =
+      QueryJobConfiguration.newBuilder("DDL")
+          .setDefaultDataset(DatasetId.of(PROJECT, DATASET))
+          .setUseQueryCache(false)
+          .build();
   private static final JobInfo JOB_INFO =
       JobInfo.newBuilder(QUERY_JOB_CONFIGURATION_FOR_QUERY)
           .setJobId(JobId.of(PROJECT, JOB))
@@ -1815,12 +1826,10 @@ public void testQueryRequestCompleted() throws InterruptedException {
 
   @Test
   public void testFastQueryRequestCompleted() throws InterruptedException {
-    JobId queryJob = JobId.of(PROJECT, JOB);
     com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
         new com.google.api.services.bigquery.model.QueryResponse()
             .setCacheHit(false)
             .setJobComplete(true)
-            .setJobReference(queryJob.toPb())
             .setKind("bigquery#queryResponse")
             .setPageToken(null)
             .setRows(ImmutableList.of(TABLE_ROW))
@@ -2109,6 +2118,110 @@ public void testQueryDryRun() throws Exception {
     }
   }
 
+  @Test
+  public void testFastQuerySQLShouldRetry() throws Exception {
+    com.google.api.services.bigquery.model.QueryResponse responsePb =
+        new com.google.api.services.bigquery.model.QueryResponse()
+            .setCacheHit(false)
+            .setJobComplete(true)
+            .setRows(ImmutableList.of(TABLE_ROW))
+            .setPageToken(null)
+            .setTotalBytesProcessed(42L)
+            .setTotalRows(BigInteger.valueOf(1L))
+            .setSchema(TABLE_SCHEMA.toPb())
+            .setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));
+
+    QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
+    QueryRequest requestPb = requestInfo.toPb();
+
+    when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
+        .thenThrow(new BigQueryException(500, "InternalError"))
+        .thenThrow(new BigQueryException(502, "Bad Gateway"))
+        .thenThrow(new BigQueryException(503, "Service Unavailable"))
+        .thenThrow(new BigQueryException(504, "Gateway Timeout"))
+        .thenReturn(responsePb);
+
+    bigquery =
+        options
+            .toBuilder()
+            .setRetrySettings(ServiceOptions.getDefaultRetrySettings())
+            .build()
+            .getService();
+
+    TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
+    assertEquals(1, response.getTotalRows());
+    verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
+  }
+
+  @Test
+  public void testFastQueryDMLShouldRetry() throws Exception {
+    com.google.api.services.bigquery.model.QueryResponse responsePb =
+        new com.google.api.services.bigquery.model.QueryResponse()
+            .setCacheHit(false)
+            .setJobComplete(true)
+            .setRows(ImmutableList.of(TABLE_ROW))
+            .setPageToken(null)
+            .setTotalBytesProcessed(42L)
+            .setNumDmlAffectedRows(1L)
+            .setSchema(TABLE_SCHEMA.toPb())
+            .setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));
+
+    QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
+    QueryRequest requestPb = requestInfo.toPb();
+
+    when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
+        .thenThrow(new BigQueryException(500, "InternalError"))
+        .thenThrow(new BigQueryException(502, "Bad Gateway"))
+        .thenThrow(new BigQueryException(503, "Service Unavailable"))
+        .thenThrow(new BigQueryException(504, "Gateway Timeout"))
+        .thenReturn(responsePb);
+
+    bigquery =
+        options
+            .toBuilder()
+            .setRetrySettings(ServiceOptions.getDefaultRetrySettings())
+            .build()
+            .getService();
+
+    TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
+    assertEquals(1, response.getTotalRows());
+    verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
+  }
+
+  @Test
+  public void testFastQueryDDLShouldRetry() throws Exception {
+    com.google.api.services.bigquery.model.QueryResponse responsePb =
+        new com.google.api.services.bigquery.model.QueryResponse()
+            .setCacheHit(false)
+            .setJobComplete(true)
+            .setRows(ImmutableList.of(TABLE_ROW))
+            .setPageToken(null)
+            .setTotalBytesProcessed(42L)
+            .setSchema(TABLE_SCHEMA.toPb())
+            .setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));
+
+    QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
+    QueryRequest requestPb = requestInfo.toPb();
+
+    when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
+        .thenThrow(new BigQueryException(500, "InternalError"))
+        .thenThrow(new BigQueryException(502, "Bad Gateway"))
+        .thenThrow(new BigQueryException(503, "Service Unavailable"))
+        .thenThrow(new BigQueryException(504, "Gateway Timeout"))
+        .thenReturn(responsePb);
+
+    bigquery =
+        options
+            .toBuilder()
+            .setRetrySettings(ServiceOptions.getDefaultRetrySettings())
+            .build()
+            .getService();
+
+    TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
+    assertEquals(0, response.getTotalRows());
+    verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
+  }
+
   @Test
   public void testCreateRoutine() {
     RoutineInfo routineInfo = ROUTINE_INFO.setProjectId(OTHER_PROJECT);
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
index 1e3424b71b..921b74b2fa 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -81,6 +81,7 @@
 import com.google.cloud.bigquery.RoutineInfo;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.StandardSQLDataType;
+import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableDataWriteChannel;
@@ -257,6 +258,7 @@ public class ITBigQueryTest {
   private static final String EXTRACT_MODEL_FILE = "extract_model.csv";
   private static final String BUCKET = RemoteStorageHelper.generateBucketName();
   private static final TableId TABLE_ID = TableId.of(DATASET, "testing_table");
+  private static final TableId TABLE_ID_FASTQUERY = TableId.of(DATASET, "fastquery_testing_table");
   private static final String CSV_CONTENT = "StringValue1\nStringValue2\n";
   private static final String JSON_CONTENT =
       "{"
@@ -353,6 +355,17 @@ public static void beforeClass() throws InterruptedException, TimeoutException {
     assertNull(job.getStatus().getError());
     LoadJobConfiguration loadJobConfiguration = job.getConfiguration();
     assertEquals(labels, loadJobConfiguration.getLabels());
+
+    LoadJobConfiguration configurationFastQuery =
+        LoadJobConfiguration.newBuilder(
+                TABLE_ID_FASTQUERY, "gs://" + BUCKET + "/" + JSON_LOAD_FILE, FormatOptions.json())
+            .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
+            .setSchema(TABLE_SCHEMA)
+            .setLabels(labels)
+            .build();
+    Job jobFastQuery = bigquery.create(JobInfo.of(configurationFastQuery));
+    jobFastQuery = jobFastQuery.waitFor();
+    assertNull(jobFastQuery.getStatus().getError());
   }
 
   @AfterClass
@@ -1409,6 +1422,44 @@ public void testFastQuery() throws InterruptedException {
     assertEquals(2, rowCount);
   }
 
+  @Test
+  public void testFastQueryMultiPages() throws InterruptedException {
+    String query =
+        "SELECT date, state_name, confirmed_cases FROM `bigquery-public-data.covid19_nyt.us_counties`";
+    QueryJobConfiguration config =
+        QueryJobConfiguration.newBuilder(query).setDefaultDataset(DatasetId.of(DATASET)).build();
+    TableResult result = bigquery.query(config);
+    assertNotNull(result.getNextPage());
+    assertNotNull(result.getNextPageToken());
+    assertTrue(result.hasNextPage());
+  }
+
+  @Test
+  public void testFastDMLQuery() throws InterruptedException {
+    String tableName = TABLE_ID_FASTQUERY.getTable();
+    String dmlQuery =
+        String.format("UPDATE %s.%s SET StringField = 'hello' WHERE TRUE", DATASET, tableName);
+    QueryJobConfiguration config =
+        QueryJobConfiguration.newBuilder(dmlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
+    TableResult result = bigquery.query(config);
+    assertEquals(TABLE_SCHEMA, result.getSchema());
+    assertEquals(2, result.getTotalRows());
+  }
+
+  @Test
+  public void testFastDDLQuery() throws InterruptedException {
+    String tableName = "test_table_fast_query_ddl";
+    String ddlQuery =
+        String.format("CREATE OR REPLACE TABLE %s.%s ( StringField STRING )", DATASET, tableName);
+    QueryJobConfiguration config =
+        QueryJobConfiguration.newBuilder(ddlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
+    TableResult result = bigquery.query(config);
+    assertEquals(
+        Schema.of(Field.newBuilder("StringField", StandardSQLTypeName.STRING).build()),
+        result.getSchema());
+    assertEquals(0, result.getTotalRows());
+  }
+
   @Test
   public void testScriptStatistics() throws InterruptedException {
     String script =