+ *
+ * @throws BigQueryException upon failure
+ * @param connectionSettings the {@link ConnectionSettings} to use for query execution
+ */
+ @BetaApi
+ Connection createConnection(@NonNull ConnectionSettings connectionSettings);
+
+ /**
+ * Creates a new BigQuery query connection used for executing queries (not the same as BigQuery
+ * connection properties). It uses the BigQuery Storage Read API for high throughput queries by
+ * default. This overloaded method creates a Connection with default ConnectionSettings for query
+ * execution where default values are set for numBufferedRows (20000), useReadApi (true),
+ * useLegacySql (false).
+ *
+ *
+ *
+ * @throws BigQueryException upon failure
+ */
+ @BetaApi
+ Connection createConnection();
+
/**
* Returns the requested dataset or {@code null} if not found.
*
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryDryRunResult.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryDryRunResult.java
new file mode 100644
index 000000000..0494aa1a9
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryDryRunResult.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.api.core.BetaApi;
+import java.util.List;
+
+public interface BigQueryDryRunResult {
+
+ /** Returns the schema of the results. Null if the schema is not supplied. */
+ @BetaApi
+ Schema getSchema() throws BigQuerySQLException;
+
+ /**
+ * Returns query parameters for standard SQL queries by extracting undeclared query parameters from
+ * the dry run job. See more information:
+ * https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/JobStatistics2.html#getUndeclaredQueryParameters--
+ */
+ @BetaApi
+ List getQueryParameters() throws BigQuerySQLException;
+
+  /** Returns some processing statistics of the dry-run job. */
+ @BetaApi
+ BigQueryResultStats getStatistics() throws BigQuerySQLException;
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryDryRunResultImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryDryRunResultImpl.java
new file mode 100644
index 000000000..fabb2f2fc
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryDryRunResultImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import java.util.List;
+
+public class BigQueryDryRunResultImpl implements BigQueryDryRunResult {
+ private Schema schema;
+ private List queryParameters;
+ private BigQueryResultStats stats;
+
+ BigQueryDryRunResultImpl(
+ Schema schema,
+ List queryParameters,
+ BigQueryResultStats stats) { // Package-Private access
+ this.schema = schema;
+ this.queryParameters = queryParameters;
+ this.stats = stats;
+ }
+
+ @Override
+ public Schema getSchema() throws BigQuerySQLException {
+ return schema;
+ }
+
+ @Override
+ public List getQueryParameters() throws BigQuerySQLException {
+ return queryParameters;
+ }
+
+ @Override
+ public BigQueryResultStats getStatistics() throws BigQuerySQLException {
+ return stats;
+ }
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java
index 06cbf344c..c42ff6327 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java
@@ -134,4 +134,13 @@ static BaseServiceException translateAndThrow(ExecutionException ex) {
BaseServiceException.translate(ex);
throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
}
+
+ static BaseServiceException translateAndThrow(Exception ex) {
+ throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
+ }
+
+ static BaseServiceException translateAndThrowBigQuerySQLException(BigQueryException e)
+ throws BigQuerySQLException {
+ throw new BigQuerySQLException(e.getMessage(), e, e.getErrors());
+ }
}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java
index b2e939df0..3cfbfd652 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.paging.Page;
import com.google.api.services.bigquery.model.ErrorProto;
@@ -54,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.checkerframework.checker.nullness.qual.NonNull;
final class BigQueryImpl extends BaseService implements BigQuery {
@@ -351,6 +353,21 @@ public JobId get() {
return create(jobInfo, idProvider, options);
}
+ @Override
+ @BetaApi
+ public Connection createConnection(@NonNull ConnectionSettings connectionSettings)
+ throws BigQueryException {
+ return new ConnectionImpl(connectionSettings, getOptions(), bigQueryRpc, DEFAULT_RETRY_CONFIG);
+ }
+
+ @Override
+ @BetaApi
+ public Connection createConnection() throws BigQueryException {
+ ConnectionSettings defaultConnectionSettings = ConnectionSettings.newBuilder().build();
+ return new ConnectionImpl(
+ defaultConnectionSettings, getOptions(), bigQueryRpc, DEFAULT_RETRY_CONFIG);
+ }
+
@InternalApi("visible for testing")
Job create(JobInfo jobInfo, Supplier idProvider, JobOption... options) {
final boolean idRandom = (jobInfo.getJobId() == null);
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResult.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResult.java
new file mode 100644
index 000000000..6b0c35f67
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResult.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import java.sql.ResultSet;
+
+public interface BigQueryResult {
+
+ /** Returns the schema of the results. */
+ Schema getSchema();
+
+ /**
+ * Returns the total number of rows in the complete result set, which can be more than the number
+ * of rows in the first page of results. This might return -1 if the query is long running and the
+ * job is not complete at the time this object is returned.
+ */
+ long getTotalRows();
+
+ /* Returns the underlying ResultSet Implementation */
+ ResultSet getResultSet();
+
+ /* Returns the query statistics associated with this query. */
+ BigQueryResultStats getBigQueryResultStats();
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultImpl.java
new file mode 100644
index 000000000..7c24ca0dd
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultImpl.java
@@ -0,0 +1,610 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.arrow.vector.util.JsonStringArrayList;
+import org.apache.arrow.vector.util.Text;
+
+public class BigQueryResultImpl implements BigQueryResult {
+
+ private static final String NULL_CURSOR_MSG =
+ "Error occurred while reading the cursor. This could happen if getters are called after we are done reading all the records";
+
+ // This class represents a row of records, the columns are represented as a map
+ // (columnName:columnValue pair)
+ static class Row {
+ private Map value;
+ private boolean isLast;
+
+ public Row(Map value) {
+ this.value = value;
+ }
+
+ public Row(Map value, boolean isLast) {
+ this.value = value;
+ this.isLast = isLast;
+ }
+
+ public Map getValue() {
+ return value;
+ }
+
+ public boolean isLast() {
+ return isLast;
+ }
+
+ public boolean hasField(String fieldName) {
+ return this.value.containsKey(fieldName);
+ }
+
+ public Object get(String fieldName) {
+ return this.value.get(fieldName);
+ }
+ }
+
+ private final Schema schema;
+ private final long totalRows;
+ private final BlockingQueue buffer;
+ private T cursor;
+ private final BigQueryResultSet underlyingResultSet;
+ private final BigQueryResultStats bigQueryResultStats;
+ private final FieldList schemaFieldList;
+
+ public BigQueryResultImpl(
+ Schema schema,
+ long totalRows,
+ BlockingQueue buffer,
+ BigQueryResultStats bigQueryResultStats) {
+ this.schema = schema;
+ this.totalRows = totalRows;
+ this.buffer = buffer;
+ this.underlyingResultSet = new BigQueryResultSet();
+ this.bigQueryResultStats = bigQueryResultStats;
+ this.schemaFieldList = schema.getFields();
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public long getTotalRows() {
+ return totalRows;
+ }
+
+ @Override
+ public ResultSet getResultSet() {
+ return underlyingResultSet;
+ }
+
+ private class BigQueryResultSet extends AbstractJdbcResultSet {
+ @Override
+ /*Advances the result set to the next row, returning false if no such row exists. Potentially blocking operation*/
+ public boolean next() throws SQLException {
+ try {
+ cursor = buffer.take(); // advance the cursor,Potentially blocking operation
+ if (isEndOfStream(cursor)) { // check for end of stream
+ cursor = null;
+ return false;
+ } else if (cursor instanceof Row) {
+ Row curTup = (Row) cursor;
+ if (curTup.isLast()) { // last Tuple
+ cursor = null;
+ return false;
+ }
+ return true;
+ } else if (cursor instanceof FieldValueList) { // cursor is advanced, we can return true now
+ return true;
+        } else { // this case should never occur as the cursor will either be a Row or EoS
+ throw new BigQuerySQLException("Could not process the current row");
+ }
+ } catch (InterruptedException e) {
+ throw new SQLException(
+ "Error occurred while advancing the cursor. This could happen when connection is closed while we call the next method");
+ }
+ }
+
+ private boolean isEndOfStream(T cursor) {
+ return cursor instanceof ConnectionImpl.EndOfFieldValueList;
+ }
+
+ @Override
+ public Object getObject(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return (fieldValue == null || fieldValue.getValue() == null) ? null : fieldValue.getValue();
+ } else { // Data received from Read API (Arrow)
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ return curRow.get(fieldName);
+ }
+ }
+
+ @Override
+ public Object getObject(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ return null;
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null) ? null : fieldValue.getValue();
+ } else { // Data received from Read API (Arrow)
+ return getObject(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+ @Override
+ public String getString(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ if ((fieldValue == null || fieldValue.getValue() == null)) {
+ return null;
+ } else if (fieldValue
+ .getAttribute()
+ .equals(FieldValue.Attribute.REPEATED)) { // Case for Arrays
+ return fieldValue.getValue().toString();
+ } else {
+ return fieldValue.getStringValue();
+ }
+ } else { // Data received from Read API (Arrow)
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ Object currentVal = curRow.get(fieldName);
+ if (currentVal == null) {
+ return null;
+ } else if (currentVal instanceof JsonStringArrayList) { // arrays
+ JsonStringArrayList jsnAry = (JsonStringArrayList) currentVal;
+ return jsnAry.toString();
+ } else if (currentVal instanceof LocalDateTime) {
+ LocalDateTime dateTime = (LocalDateTime) currentVal;
+ return dateTime.toString();
+ } else {
+ Text textVal = (Text) currentVal;
+ return textVal.toString();
+ }
+ }
+ }
+
+ @Override
+ public String getString(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ return null;
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : fieldValue.getStringValue();
+ } else { // Data received from Read API (Arrow)
+ return getString(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+ @Override
+ public int getInt(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ return 0; // the column value; if the value is SQL NULL, the value returned is 0 as per
+ // java.sql.ResultSet definition
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? 0
+ : fieldValue.getNumericValue().intValue();
+ } else { // Data received from Read API (Arrow)
+
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ Object curVal = curRow.get(fieldName);
+ if (curVal == null) {
+ return 0;
+ }
+ if (curVal instanceof Text) { // parse from text to int
+ return Integer.parseInt(((Text) curVal).toString());
+ } else if (curVal
+ instanceof
+            Long) { // in case getInt is called for a Long value. Loss of precision might occur
+ return ((Long) curVal).intValue();
+ }
+ return ((BigDecimal) curVal).intValue();
+ }
+ }
+
+ @Override
+ public int getInt(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ return 0; // the column value; if the value is SQL NULL, the value returned is 0 as per
+ // java.sql.ResultSet definition
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? 0
+ : fieldValue.getNumericValue().intValue();
+ } else { // Data received from Read API (Arrow)
+ return getInt(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+ @Override
+ public long getLong(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? 0L
+ : fieldValue.getNumericValue().longValue();
+ } else { // Data received from Read API (Arrow)
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ Object curVal = curRow.get(fieldName);
+ if (curVal == null) {
+ return 0L;
+ } else { // value will be Long or BigDecimal, but are Number
+ return ((Number) curVal).longValue();
+ }
+ }
+ }
+
+  @Override
+  public long getLong(int columnIndex) throws SQLException {
+    if (cursor == null) {
+      return 0L; // the column value; if the value is SQL NULL, the value returned is 0 as per
+      // java.sql.ResultSet definition
+    } else if (cursor instanceof FieldValueList) {
+      FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+      return (fieldValue == null || fieldValue.getValue() == null)
+          ? 0L
+          : fieldValue.getNumericValue().longValue();
+    } else { // Data received from Read API (Arrow)
+      return getLong(schemaFieldList.get(columnIndex).getName()); // was getInt: truncated values outside Integer range
+    }
+  }
+
+ @Override
+ public double getDouble(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? 0d
+ : fieldValue.getNumericValue().doubleValue();
+ } else { // Data received from Read API (Arrow)
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ Object curVal = curRow.get(fieldName);
+ return curVal == null ? 0.0d : ((BigDecimal) curVal).doubleValue();
+ }
+ }
+
+ @Override
+ public double getDouble(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ return 0d; // the column value; if the value is SQL NULL, the value returned is 0 as per
+ // java.sql.ResultSet definition
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? 0d
+ : fieldValue.getNumericValue().doubleValue();
+ } else { // Data received from Read API (Arrow)
+ return getDouble(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : BigDecimal.valueOf(fieldValue.getNumericValue().doubleValue());
+ } else { // Data received from Read API (Arrow)
+ return BigDecimal.valueOf(getDouble(fieldName));
+ }
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : BigDecimal.valueOf(fieldValue.getNumericValue().doubleValue());
+ } else { // Data received from Read API (Arrow)
+ return getBigDecimal(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+ @Override
+ public boolean getBoolean(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return fieldValue.getValue() != null && fieldValue.getBooleanValue();
+ } else { // Data received from Read API (Arrow)
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ Object curVal = curRow.get(fieldName);
+ return curVal != null && (Boolean) curVal;
+ }
+ }
+
+ @Override
+ public boolean getBoolean(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return fieldValue.getValue() != null && fieldValue.getBooleanValue();
+ } else { // Data received from Read API (Arrow)
+ return getBoolean(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+ @Override
+ public byte[] getBytes(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : fieldValue.getBytesValue();
+ } else { // Data received from Read API (Arrow)
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ Object curVal = curRow.get(fieldName);
+ return curVal == null ? null : (byte[]) curVal;
+ }
+ }
+
+ @Override
+ public byte[] getBytes(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ return null; // if the value is SQL NULL, the value returned is null
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : fieldValue.getBytesValue();
+ } else { // Data received from Read API (Arrow)
+ return getBytes(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+  @Override
+  public Timestamp getTimestamp(String fieldName) throws SQLException {
+    if (fieldName == null) {
+      throw new SQLException("fieldName can't be null"); // was NULL_CURSOR_MSG, which describes an exhausted cursor
+    }
+    if (cursor == null) {
+      return null; // if the value is SQL NULL, the value returned is null
+    } else if (cursor instanceof FieldValueList) {
+      FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+      return (fieldValue == null || fieldValue.getValue() == null)
+          ? null
+          : new Timestamp(
+              fieldValue.getTimestampValue()
+                  / 1000); // getTimestampValue returns time in microseconds, and TimeStamp
+      // expects it in millis
+    } else {
+      Row curRow = (Row) cursor;
+      if (!curRow.hasField(fieldName)) {
+        throw new SQLException(String.format("Field %s not found", fieldName));
+      }
+      Object timeStampVal = curRow.get(fieldName);
+      return timeStampVal == null
+          ? null
+          : new Timestamp((Long) timeStampVal / 1000); // Timestamp is represented as a Long
+    }
+  }
+
+ @Override
+ public Timestamp getTimestamp(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : new Timestamp(
+ fieldValue.getTimestampValue()
+ / 1000); // getTimestampValue returns time in microseconds, and TimeStamp
+ // expects it in millis
+ } else { // Data received from Read API (Arrow)
+ return getTimestamp(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+  @Override
+  public Time getTime(String fieldName) throws SQLException {
+    if (fieldName == null) {
+      throw new SQLException("fieldName can't be null"); // was NULL_CURSOR_MSG, which describes an exhausted cursor
+    }
+    if (cursor == null) {
+      return null; // if the value is SQL NULL, the value returned is null
+    } else if (cursor instanceof FieldValueList) {
+      FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+      return getTimeFromFieldVal(fieldValue);
+    } else { // Data received from Read API (Arrow)
+      Row curRow = (Row) cursor;
+      if (!curRow.hasField(fieldName)) {
+        throw new SQLException(String.format("Field %s not found", fieldName));
+      }
+      Object timeStampObj = curRow.get(fieldName);
+      return timeStampObj == null
+          ? null
+          : new Time(
+              ((Long) timeStampObj)
+                  / 1000); // Time.toString() will return 12:11:35 in GMT as 17:41:35 in
+      // (GMT+5:30). This can be offset using getTimeZoneOffset
+    }
+  }
+
+ private int getTimeZoneOffset() {
+ TimeZone timeZone = TimeZone.getTimeZone(ZoneId.systemDefault());
+      return timeZone.getOffset(new java.util.Date().getTime()); // offset in milliseconds (TimeZone.getOffset returns millis)
+ }
+
+ @Override
+ public Time getTime(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return getTimeFromFieldVal(fieldValue);
+ } else { // Data received from Read API (Arrow)
+ return getTime(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+
+ private Time getTimeFromFieldVal(FieldValue fieldValue) throws SQLException {
+ if (fieldValue.getValue() != null) {
+ // Time ranges from 00:00:00 to 23:59:59.99999. in BigQuery. Parsing it to java.sql.Time
+ String strTime = fieldValue.getStringValue();
+ String[] timeSplt = strTime.split(":");
+ if (timeSplt.length != 3) {
+ throw new SQLException("Can not parse the value " + strTime + " to java.sql.Time");
+ }
+ int hr = Integer.parseInt(timeSplt[0]);
+ int min = Integer.parseInt(timeSplt[1]);
+ int sec = 0, nanoSec = 0;
+ if (timeSplt[2].contains(".")) {
+ String[] secSplt = timeSplt[2].split("\\.");
+ sec = Integer.parseInt(secSplt[0]);
+ nanoSec = Integer.parseInt(secSplt[1]);
+ } else {
+ sec = Integer.parseInt(timeSplt[2]);
+ }
+ return Time.valueOf(LocalTime.of(hr, min, sec, nanoSec));
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Date getDate(String fieldName) throws SQLException {
+ if (fieldName == null) {
+ throw new SQLException("fieldName can't be null");
+ }
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(fieldName);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : Date.valueOf(fieldValue.getStringValue());
+ } else { // Data received from Read API (Arrow)
+ Row curRow = (Row) cursor;
+ if (!curRow.hasField(fieldName)) {
+ throw new SQLException(String.format("Field %s not found", fieldName));
+ }
+ Object dateObj = curRow.get(fieldName);
+ if (dateObj == null) {
+ return null;
+ } else {
+ Integer dateInt = (Integer) dateObj;
+ long dateInMillis =
+ TimeUnit.DAYS.toMillis(
+ Long.valueOf(
+ dateInt)); // For example int 18993 represents 2022-01-01, converting time to
+ // milli seconds
+ return new Date(dateInMillis);
+ }
+ }
+ }
+
+ @Override
+ public Date getDate(int columnIndex) throws SQLException {
+ if (cursor == null) {
+ throw new BigQuerySQLException(NULL_CURSOR_MSG);
+ } else if (cursor instanceof FieldValueList) {
+ FieldValue fieldValue = ((FieldValueList) cursor).get(columnIndex);
+ return (fieldValue == null || fieldValue.getValue() == null)
+ ? null
+ : Date.valueOf(fieldValue.getStringValue());
+ } else { // Data received from Read API (Arrow)
+ return getDate(schemaFieldList.get(columnIndex).getName());
+ }
+ }
+ }
+
+ @Override
+ public BigQueryResultStats getBigQueryResultStats() {
+ return bigQueryResultStats;
+ }
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultStats.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultStats.java
new file mode 100644
index 000000000..a4c37a9b6
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultStats.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.api.core.BetaApi;
+import com.google.cloud.bigquery.JobStatistics.QueryStatistics;
+import com.google.cloud.bigquery.JobStatistics.SessionInfo;
+
+public interface BigQueryResultStats {
+
+ /** Returns query statistics of a query job */
+ @BetaApi
+ QueryStatistics getQueryStatistics();
+
+ /**
+   * Returns the SessionInfo, which contains information about the session if this job is part of one.
+ * JobStatistics2 model class does not allow setSessionInfo so this cannot be set as part of
+ * QueryStatistics when we use jobs.query API.
+ */
+ @BetaApi
+ SessionInfo getSessionInfo();
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultStatsImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultStatsImpl.java
new file mode 100644
index 000000000..53d67f8f3
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultStatsImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.cloud.bigquery.JobStatistics.QueryStatistics;
+import com.google.cloud.bigquery.JobStatistics.SessionInfo;
+
+public class BigQueryResultStatsImpl implements BigQueryResultStats {
+
+ private final QueryStatistics queryStatistics;
+ private final SessionInfo sessionInfo;
+
+ public BigQueryResultStatsImpl(QueryStatistics queryStatistics, SessionInfo sessionInfo) {
+ this.queryStatistics = queryStatistics;
+ this.sessionInfo = sessionInfo;
+ }
+
+ @Override
+ public QueryStatistics getQueryStatistics() {
+ return queryStatistics;
+ }
+
+ @Override
+ public SessionInfo getSessionInfo() {
+ return sessionInfo;
+ }
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuerySQLException.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuerySQLException.java
new file mode 100644
index 000000000..672c6ad3f
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuerySQLException.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * BigQuery service exception.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/error-messages">Google Cloud BigQuery
+ * error codes</a>
+ */
+public final class BigQuerySQLException extends SQLException {
+
+ private static final long serialVersionUID = -5006625989225438209L;
+ private final List errors;
+
+ public BigQuerySQLException() {
+ this.errors = null;
+ }
+
+ public BigQuerySQLException(
+ String msg) { // overloaded constructor with just message as an argument
+ super(msg);
+ this.errors = null;
+ }
+
+ public BigQuerySQLException(List errors) {
+ this.errors = errors;
+ }
+
+ public BigQuerySQLException(List errors, Throwable cause) {
+ super(cause != null ? cause.toString() : null);
+ this.errors = errors;
+ }
+
+ public BigQuerySQLException(String reason, List errors) {
+ super(reason);
+ this.errors = errors;
+ }
+
+ public BigQuerySQLException(String reason, Throwable cause, List errors) {
+ super(reason, cause);
+ this.errors = errors;
+ }
+
+ public BigQuerySQLException(String reason, String sqlState, List errors) {
+ super(reason, sqlState);
+ this.errors = errors;
+ }
+
+ public BigQuerySQLException(
+ String reason, String sqlState, int errorCode, List errors) {
+ super(reason, sqlState, errorCode);
+ this.errors = errors;
+ }
+
+ public BigQuerySQLException(
+ String reason, String sqlState, int errorCode, Throwable cause, List errors) {
+ super(reason, sqlState, errorCode, cause);
+ this.errors = errors;
+ }
+
+ /**
+ * Returns a list of {@link BigQueryError}s that caused this exception. Returns {@code null} if
+ * none exists.
+ */
+ public List getErrors() {
+ return errors;
+ }
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Connection.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Connection.java
new file mode 100644
index 000000000..109838d8b
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Connection.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.api.core.BetaApi;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Connection is a session between a Java application and BigQuery. SQL statements are executed
+ * and results are returned within the context of a connection.
+ */
+public interface Connection {
+
+ /** Sends a query cancel request. This call will return immediately */
+ @BetaApi
+ boolean close() throws BigQuerySQLException;
+
+ /**
+ * Execute a query dry run that returns information on the schema and query parameters of the
+ * query results.
+ *
+ * @param sql typically a static SQL SELECT statement
+ * @exception BigQuerySQLException if a database access error occurs
+ */
+ @BetaApi
+ BigQueryDryRunResult dryRun(String sql) throws BigQuerySQLException;
+
+ /**
+ * Execute a SQL statement that returns a single ResultSet.
+ *
+ *
+ *
+ * @param sql a static SQL SELECT statement
+ * @return a ResultSet that contains the data produced by the query
+ * @exception BigQuerySQLException if a database access error occurs
+ */
+ @BetaApi
+ BigQueryResult executeSelect(String sql) throws BigQuerySQLException;
+
+ /**
+ * This method executes a SQL SELECT query
+ *
+ * @param sql SQL SELECT query
+ * @param parameters named or positional parameters. The set of query parameters must either be
+ * all positional or all named parameters.
+ * @param labels (optional) the labels associated with this query. You can use these to organize
+ * and group your query jobs. Label keys and values can be no longer than 63 characters, can
+ * only contain lowercase letters, numeric characters, underscores and dashes. International
+ * characters are allowed. Label values are optional and Label is a Varargs. You should pass
+ * all the Labels in a single Map. Label keys must start with a letter and each label in the
+ * list must have a different key.
+ * @return BigQueryResult containing the output of the query
+ * @throws BigQuerySQLException
+ */
+ @BetaApi
+ BigQueryResult executeSelect(
+ String sql, List parameters, Map... labels)
+ throws BigQuerySQLException;
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
new file mode 100644
index 000000000..c24a00888
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -0,0 +1,1240 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import static com.google.cloud.RetryHelper.runWithRetries;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+
+import com.google.api.core.BetaApi;
+import com.google.api.services.bigquery.model.GetQueryResultsResponse;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.QueryParameter;
+import com.google.api.services.bigquery.model.QueryRequest;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.RetryHelper;
+import com.google.cloud.Tuple;
+import com.google.cloud.bigquery.JobStatistics.QueryStatistics;
+import com.google.cloud.bigquery.JobStatistics.SessionInfo;
+import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
+import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
+import com.google.cloud.bigquery.storage.v1.ArrowSchema;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+/** Implementation for {@link Connection}, the generic BigQuery connection API (not JDBC). */
+class ConnectionImpl implements Connection {
+
+ private final ConnectionSettings connectionSettings;
+ private final BigQueryOptions bigQueryOptions;
+ private final BigQueryRpc bigQueryRpc;
+ private final BigQueryRetryConfig retryConfig;
+ private final int bufferSize; // buffer size in Producer Thread
+ private final int MAX_PROCESS_QUERY_THREADS_CNT = 5;
+ private final ExecutorService queryTaskExecutor =
+ Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
+ private final Logger logger = Logger.getLogger(this.getClass().getName());
+ private BigQueryReadClient bqReadClient;
+ private static final long EXECUTOR_TIMEOUT_SEC = 5;
+
+ ConnectionImpl(
+ ConnectionSettings connectionSettings,
+ BigQueryOptions bigQueryOptions,
+ BigQueryRpc bigQueryRpc,
+ BigQueryRetryConfig retryConfig) {
+ this.connectionSettings = connectionSettings;
+ this.bigQueryOptions = bigQueryOptions;
+ this.bigQueryRpc = bigQueryRpc;
+ this.retryConfig = retryConfig;
+ // Sets a reasonable buffer size (a blocking queue) if user input is suboptimal
+ this.bufferSize =
+ (connectionSettings == null
+ || connectionSettings.getNumBufferedRows() == null
+ || connectionSettings.getNumBufferedRows() < 10000
+ ? 20000
+ : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
+ }
+
+ /**
+ * The close method shuts down the pageFetcher and producerWorker threads gracefully using an
+ * interrupt. The pageFetcher thread will not request any subsequent pages after the interrupt
+ * and shuts down as soon as any ongoing RPC call returns. The producerWorker stops populating
+ * the buffer with further records, clears the buffer, puts an EoF marker and shuts down.
+ *
+ * @return Boolean value true if the threads were interrupted
+ * @throws BigQuerySQLException
+ */
+ @BetaApi
+ @Override
+ public synchronized boolean close() throws BigQuerySQLException {
+ queryTaskExecutor.shutdownNow();
+ try {
+ queryTaskExecutor.awaitTermination(
+ EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
+ e); // Logging InterruptedException instead of throwing the exception back, close method
+ // will return queryTaskExecutor.isShutdown()
+ }
+ return queryTaskExecutor.isShutdown(); // check if the executor has been shutdown
+ }
+
+ /**
+ * This method runs a dry run query
+ *
+ * @param sql SQL SELECT statement
+ * @return BigQueryDryRunResult containing List and Schema
+ * @throws BigQuerySQLException
+ */
+ @BetaApi
+ @Override
+ public BigQueryDryRunResult dryRun(String sql) throws BigQuerySQLException {
+ com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
+ Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
+ List queryParametersPb =
+ dryRunJob.getStatistics().getQuery().getUndeclaredQueryParameters();
+ List queryParameters =
+ Lists.transform(queryParametersPb, QUERY_PARAMETER_FROM_PB_FUNCTION);
+ QueryStatistics queryStatistics = JobStatistics.fromPb(dryRunJob);
+ SessionInfo sessionInfo =
+ queryStatistics.getSessionInfo() == null ? null : queryStatistics.getSessionInfo();
+ BigQueryResultStats bigQueryResultStats =
+ new BigQueryResultStatsImpl(queryStatistics, sessionInfo);
+ return new BigQueryDryRunResultImpl(schema, queryParameters, bigQueryResultStats);
+ }
+
+ /**
+ * This method executes a SQL SELECT query
+ *
+ * @param sql SQL SELECT statement
+ * @return BigQueryResult containing the output of the query
+ * @throws BigQuerySQLException
+ */
+ @BetaApi
+ @Override
+ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
+ try {
+ // use jobs.query if all the properties of connectionSettings are supported
+ if (isFastQuerySupported()) {
+ String projectId = bigQueryOptions.getProjectId();
+ QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null);
+ return queryRpc(projectId, queryRequest, false);
+ }
+ // use jobs.insert otherwise
+ com.google.api.services.bigquery.model.Job queryJob =
+ createQueryJob(sql, connectionSettings, null, null);
+ JobId jobId = JobId.fromPb(queryJob.getJobReference());
+ GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
+ return getResultSet(firstPage, jobId, sql, false);
+ } catch (BigQueryException e) {
+ throw new BigQuerySQLException(e.getMessage(), e, e.getErrors());
+ }
+ }
+
+ /**
+ * This method executes a SQL SELECT query
+ *
+ * @param sql SQL SELECT query
+ * @param parameters named or positional parameters. The set of query parameters must either be
+ * all positional or all named parameters.
+ * @param labels the labels associated with this query. You can use these to organize and group
+ * your query jobs. Label keys and values can be no longer than 63 characters, can only
+ * contain lowercase letters, numeric characters, underscores and dashes. International
+ * characters are allowed. Label values are optional and Label is a Varargs. You should pass
+ * all the Labels in a single Map. Label keys must start with a letter and each label in the
+ * list must have a different key.
+ * @return BigQueryResult containing the output of the query
+ * @throws BigQuerySQLException
+ */
+ @BetaApi
+ @Override
+ public BigQueryResult executeSelect(
+ String sql, List parameters, Map... labels)
+ throws BigQuerySQLException {
+ Map labelMap = null;
+ if (labels != null
+ && labels.length == 1) { // We expect label as a key value pair in a single Map
+ labelMap = labels[0];
+ }
+ try {
+ // use jobs.query if possible
+ if (isFastQuerySupported()) {
+ final String projectId = bigQueryOptions.getProjectId();
+ final QueryRequest queryRequest =
+ createQueryRequest(connectionSettings, sql, parameters, labelMap);
+ return queryRpc(projectId, queryRequest, parameters != null);
+ }
+ // use jobs.insert otherwise
+ com.google.api.services.bigquery.model.Job queryJob =
+ createQueryJob(sql, connectionSettings, parameters, labelMap);
+ JobId jobId = JobId.fromPb(queryJob.getJobReference());
+ GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
+ return getResultSet(firstPage, jobId, sql, parameters != null);
+ } catch (BigQueryException e) {
+ throw new BigQuerySQLException(e.getMessage(), e, e.getErrors());
+ }
+ }
+
+ @VisibleForTesting
+ BigQueryResult getResultSet(
+ GetQueryResultsResponse firstPage, JobId jobId, String sql, Boolean hasQueryParameters) {
+ if (firstPage.getJobComplete()
+ && firstPage.getTotalRows()
+ != null) { // firstPage.getTotalRows() is null if job is not complete
+ return getSubsequentQueryResultsWithJob(
+ firstPage.getTotalRows().longValue(),
+ (long) firstPage.getRows().size(),
+ jobId,
+ firstPage,
+ hasQueryParameters);
+ } else { // job is still running, use dryrun to get Schema
+ com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
+ Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
+ // TODO: check how can we get totalRows and pageRows while the job is still running.
+ // `firstPage.getTotalRows()` returns null
+ return getSubsequentQueryResultsWithJob(
+ null, null, jobId, firstPage, schema, hasQueryParameters);
+ }
+ }
+
+ static class EndOfFieldValueList
+ extends AbstractList<
+ FieldValue> { // A reference of this class is used as a token to inform the thread
+ // consuming `buffer` BigQueryResultImpl that we have run out of records
+ @Override
+ public FieldValue get(int index) {
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+ }
+
+ private BigQueryResult queryRpc(
+ final String projectId, final QueryRequest queryRequest, Boolean hasQueryParameters) {
+ com.google.api.services.bigquery.model.QueryResponse results;
+ try {
+ results =
+ BigQueryRetryHelper.runWithRetries(
+ () -> bigQueryRpc.queryRpc(projectId, queryRequest),
+ bigQueryOptions.getRetrySettings(),
+ BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
+ bigQueryOptions.getClock(),
+ retryConfig);
+ } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
+ throw BigQueryException.translateAndThrow(e);
+ }
+
+ if (results.getErrors() != null) {
+ List bigQueryErrors =
+ results.getErrors().stream()
+ .map(BigQueryError.FROM_PB_FUNCTION)
+ .collect(Collectors.toList());
+ // Throwing BigQueryException since there may be no JobId, and we want to stay consistent
+ // with the case where there is an HTTP error
+ throw new BigQueryException(bigQueryErrors);
+ }
+
+ // Query finished running and we can paginate all the results
+ if (results.getJobComplete() && results.getSchema() != null) {
+ return processQueryResponseResults(results);
+ } else {
+ // Query is long-running (> 10s) and hasn't completed yet, or query completed but didn't
+ // return the schema, fallback to jobs.insert path. Some operations don't return the schema
+ // and can be optimized here, but this is left as future work.
+ Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue();
+ Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size());
+ JobId jobId = JobId.fromPb(results.getJobReference());
+ GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
+ return getSubsequentQueryResultsWithJob(
+ totalRows, pageRows, jobId, firstPage, hasQueryParameters);
+ }
+ }
+
+ @VisibleForTesting
+ BigQueryResultStats getBigQueryResultSetStats(JobId jobId) {
+ // Create GetQueryResultsResponse query statistics
+ Job queryJob = getQueryJobRpc(jobId);
+ QueryStatistics queryStatistics = queryJob.getStatistics();
+ SessionInfo sessionInfo =
+ queryStatistics.getSessionInfo() == null ? null : queryStatistics.getSessionInfo();
+ return new BigQueryResultStatsImpl(queryStatistics, sessionInfo);
+ }
+ /* This method processes the first page of GetQueryResultsResponse and then uses tabledata.list for subsequent pages */
+ @VisibleForTesting
+ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
+ Schema schema;
+ long numRows;
+ schema = Schema.fromPb(firstPage.getSchema());
+ numRows = firstPage.getTotalRows().longValue();
+
+ BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId);
+
+ // Keeps the deserialized records at the row level, which is consumed by BigQueryResult
+ BlockingQueue> buffer = new LinkedBlockingDeque<>(bufferSize);
+
+ // Keeps the parsed FieldValueLists
+ BlockingQueue, Boolean>> pageCache =
+ new LinkedBlockingDeque<>(
+ getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
+
+ // Keeps the raw RPC responses
+ BlockingQueue> rpcResponseQueue =
+ new LinkedBlockingDeque<>(
+ getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
+
+ runNextPageTaskAsync(firstPage.getPageToken(), getDestinationTable(jobId), rpcResponseQueue);
+
+ parseRpcDataAsync(
+ firstPage.getRows(),
+ schema,
+ pageCache,
+ rpcResponseQueue); // parses data on a separate thread, thus maximising processing
+ // throughput
+
+ populateBufferAsync(
+ rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer
+
+ // This will work for pagination as well, as buffer is getting updated asynchronously
+ return new BigQueryResultImpl>(
+ schema, numRows, buffer, bigQueryResultStats);
+ }
+
+ @VisibleForTesting
+ BigQueryResult processQueryResponseResults(
+ com.google.api.services.bigquery.model.QueryResponse results) {
+ Schema schema;
+ long numRows;
+ schema = Schema.fromPb(results.getSchema());
+ numRows =
+ results.getTotalRows() == null
+ ? 0
+ : results.getTotalRows().longValue(); // in case of DML or DDL
+ // QueryResponse only provides cache hits, dmlStats, and sessionInfo as query processing
+ // statistics
+ DmlStats dmlStats =
+ results.getDmlStats() == null ? null : DmlStats.fromPb(results.getDmlStats());
+ Boolean cacheHit = results.getCacheHit();
+ QueryStatistics queryStatistics =
+ QueryStatistics.newBuilder().setDmlStats(dmlStats).setCacheHit(cacheHit).build();
+ // We cannot directly set sessionInfo in QueryStatistics
+ SessionInfo sessionInfo =
+ results.getSessionInfo() == null
+ ? null
+ : JobStatistics.SessionInfo.fromPb(results.getSessionInfo());
+ BigQueryResultStats bigQueryResultStats =
+ new BigQueryResultStatsImpl(queryStatistics, sessionInfo);
+
+ BlockingQueue> buffer = new LinkedBlockingDeque<>(bufferSize);
+ BlockingQueue, Boolean>> pageCache =
+ new LinkedBlockingDeque<>(
+ getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
+ BlockingQueue> rpcResponseQueue =
+ new LinkedBlockingDeque<>(
+ getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
+
+ JobId jobId = JobId.fromPb(results.getJobReference());
+
+ // Thread to make rpc calls to fetch data from the server
+ runNextPageTaskAsync(results.getPageToken(), getDestinationTable(jobId), rpcResponseQueue);
+
+ // Thread to parse data received from the server to client library objects
+ parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue);
+
+ // Thread to populate the buffer (a blocking queue) shared with the consumer
+ populateBufferAsync(rpcResponseQueue, pageCache, buffer);
+
+ return new BigQueryResultImpl>(
+ schema, numRows, buffer, bigQueryResultStats);
+ }
+
+ @VisibleForTesting
+ void runNextPageTaskAsync(
+ String firstPageToken,
+ TableId destinationTable,
+ BlockingQueue> rpcResponseQueue) {
+ // This thread makes the RPC calls and paginates
+ Runnable nextPageTask =
+ () -> {
+ String pageToken = firstPageToken; // results.getPageToken();
+ try {
+ while (pageToken != null) { // paginate for non null token
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
+ break;
+ }
+ TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken);
+ pageToken = tabledataList.getPageToken();
+ rpcResponseQueue.put(
+ Tuple.of(
+ tabledataList,
+ true)); // this will be parsed asynchronously without blocking the current
+ // thread
+ }
+ rpcResponseQueue.put(
+ Tuple.of(
+ null,
+ false)); // this will stop the parseDataTask as well in case of interrupt or
+ // when the pagination completes
+ } catch (Exception e) {
+ throw new BigQueryException(0, e.getMessage(), e);
+ }
+ };
+ queryTaskExecutor.execute(nextPageTask);
+ }
+
+ /*
+ This method takes TableDataList from rpcResponseQueue and populates pageCache with FieldValueList
+ */
+ @VisibleForTesting
+ void parseRpcDataAsync(
+ // com.google.api.services.bigquery.model.QueryResponse results,
+ List tableRows,
+ Schema schema,
+ BlockingQueue, Boolean>> pageCache,
+ BlockingQueue> rpcResponseQueue) {
+
+ // parse and put the first page in the pageCache before the other pages are parsed from the RPC
+ // calls
+ Iterable firstFieldValueLists = getIterableFieldValueList(tableRows, schema);
+ try {
+ pageCache.put(
+ Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received.
+ } catch (InterruptedException e) {
+ throw new BigQueryException(0, e.getMessage(), e);
+ }
+
+ // rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use
+ // thread interrupt here
+ Runnable parseDataTask =
+ () -> {
+ try {
+ boolean hasMorePages = true;
+ while (hasMorePages) {
+ Tuple rpcResponse = rpcResponseQueue.take();
+ TableDataList tabledataList = rpcResponse.x();
+ hasMorePages = rpcResponse.y();
+ if (tabledataList != null) {
+ Iterable fieldValueLists =
+ getIterableFieldValueList(tabledataList.getRows(), schema); // Parse
+ pageCache.put(Tuple.of(fieldValueLists, true));
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted",
+ e); // Thread might get interrupted while calling the Cancel method, which is
+ // expected, so logging this instead of throwing the exception back
+ }
+ try {
+ pageCache.put(Tuple.of(null, false)); // no further pages
+ } catch (InterruptedException e) {
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted",
+ e); // Thread might get interrupted while calling the Cancel method, which is
+ // expected, so logging this instead of throwing the exception back
+ }
+ };
+ queryTaskExecutor.execute(parseDataTask);
+ }
+
+ @VisibleForTesting
+ void populateBufferAsync(
+ BlockingQueue> rpcResponseQueue,
+ BlockingQueue, Boolean>> pageCache,
+ BlockingQueue> buffer) {
+ Runnable populateBufferRunnable =
+ () -> { // producer thread populating the buffer
+ Iterable fieldValueLists = null;
+ boolean hasRows = true; // as we have to process the first page
+ while (hasRows) {
+ try {
+ Tuple, Boolean> nextPageTuple = pageCache.take();
+ hasRows = nextPageTuple.y();
+ fieldValueLists = nextPageTuple.x();
+ } catch (InterruptedException e) {
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted",
+ e); // Thread might get interrupted while calling the Cancel method, which is
+ // expected, so logging this instead of throwing the exception back
+ }
+
+ if (Thread.currentThread().isInterrupted()
+ || fieldValueLists
+ == null) { // do not process further pages and shutdown (outerloop)
+ break;
+ }
+
+ for (FieldValueList fieldValueList : fieldValueLists) {
+ try {
+ if (Thread.currentThread()
+ .isInterrupted()) { // do not process further pages and shutdown (inner loop)
+ break;
+ }
+ buffer.put(fieldValueList);
+ } catch (InterruptedException e) {
+ throw new BigQueryException(0, e.getMessage(), e);
+ }
+ }
+ }
+
+ if (Thread.currentThread()
+ .isInterrupted()) { // clear the buffer for any outstanding records
+ buffer.clear();
+ rpcResponseQueue
+ .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
+ // could trigger
+ }
+
+ try {
+ buffer.put(
+ new EndOfFieldValueList()); // All the pages has been processed, put this marker
+ } catch (InterruptedException e) {
+ throw new BigQueryException(0, e.getMessage(), e);
+ } finally {
+ queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
+ }
+ };
+
+ queryTaskExecutor.execute(populateBufferRunnable);
+ }
+
+ /* Helper method that parse and populate a page with TableRows */
+ private static Iterable getIterableFieldValueList(
+ Iterable tableDataPb, final Schema schema) {
+ return ImmutableList.copyOf(
+ Iterables.transform(
+ tableDataPb != null ? tableDataPb : ImmutableList.of(),
+ new Function() {
+ final FieldList fields = schema != null ? schema.getFields() : null;
+
+ @Override
+ public FieldValueList apply(TableRow rowPb) {
+ return FieldValueList.fromPb(rowPb.getF(), fields);
+ }
+ }));
+ }
+
+ /* Helper method that determines the optimal number of caches pages to improve read performance */
+ @VisibleForTesting
+ int getPageCacheSize(Integer numBufferedRows, Schema schema) {
+ final int MIN_CACHE_SIZE = 3; // Min number of pages to cache
+ final int MAX_CACHE_SIZE = 20; // Max number of pages to cache
+ int numColumns = schema.getFields().size();
+ int numCachedPages;
+ long numCachedRows = numBufferedRows == null ? 0 : numBufferedRows.longValue();
+
+ // TODO: Further enhance this logic depending on customer feedback on memory consumption
+ if (numCachedRows > 10000) {
+ numCachedPages =
+ 2; // numBufferedRows is quite large; per our tests, a small number of cached pages is
+ // enough to sustain throughput
+ } else if (numColumns > 15
+ && numCachedRows
+ > 5000) { // too many fields are being read, setting the page size on the lower end
+ numCachedPages = 3;
+ } else if (numCachedRows < 2000
+ && numColumns < 15) { // low pagesize with fewer number of columns, we can cache more pages
+ numCachedPages = 20;
+ } else { // default - under 10K numCachedRows with any number of columns
+ numCachedPages = 5;
+ }
+ return numCachedPages < MIN_CACHE_SIZE
+ ? MIN_CACHE_SIZE
+ : (Math.min(
+ numCachedPages,
+ MAX_CACHE_SIZE)); // numCachedPages should be between the defined min and max
+ }
+
+ /* Returns query results using either tabledata.list or the high throughput Read API */
+ @VisibleForTesting
+ BigQueryResult getSubsequentQueryResultsWithJob(
+ Long totalRows,
+ Long pageRows,
+ JobId jobId,
+ GetQueryResultsResponse firstPage,
+ Boolean hasQueryParameters) {
+ TableId destinationTable = getDestinationTable(jobId);
+ return useReadAPI(totalRows, pageRows, Schema.fromPb(firstPage.getSchema()), hasQueryParameters)
+ ? highThroughPutRead(
+ destinationTable,
+ firstPage.getTotalRows().longValue(),
+ Schema.fromPb(firstPage.getSchema()),
+ getBigQueryResultSetStats(
+ jobId)) // discard the first page and stream the entire BigQueryResult using
+ // the Read API
+ : tableDataList(firstPage, jobId);
+ }
+
+ /* Returns query results using either tabledata.list or the high throughput Read API */
+ // Overload taking an explicit Schema, used when the job may still be running and the first
+ // page therefore cannot supply totalRows (passed as -1 to the Read API in that case).
+ @VisibleForTesting
+ BigQueryResult getSubsequentQueryResultsWithJob(
+ Long totalRows,
+ Long pageRows,
+ JobId jobId,
+ GetQueryResultsResponse firstPage,
+ Schema schema,
+ Boolean hasQueryParameters) {
+ TableId destinationTable = getDestinationTable(jobId);
+ return useReadAPI(totalRows, pageRows, schema, hasQueryParameters)
+ ? highThroughPutRead(
+ destinationTable,
+ totalRows == null
+ ? -1L
+ : totalRows, // totalRows is null when the job is still running. TODO: Check if
+ // any workaround is possible
+ schema,
+ getBigQueryResultSetStats(
+ jobId)) // discard first page and stream the entire BigQueryResult using
+ // the Read API
+ : tableDataList(firstPage, jobId);
+ }
+
+ /* Returns Job from jobId by calling the jobs.get API */
+ // Fills in the project id (and, when unset, the location) from the client options before the
+ // RPC so that partially-specified JobIds still resolve. Throws BigQueryException when the job
+ // is not found and throwNotFound is enabled, or when retries are exhausted.
+ private Job getQueryJobRpc(JobId jobId) {
+ final JobId completeJobId =
+ jobId
+ .setProjectId(bigQueryOptions.getProjectId())
+ .setLocation(
+ jobId.getLocation() == null && bigQueryOptions.getLocation() != null
+ ? bigQueryOptions.getLocation()
+ : jobId.getLocation());
+ com.google.api.services.bigquery.model.Job jobPb;
+ try {
+ jobPb =
+ runWithRetries(
+ () ->
+ bigQueryRpc.getQueryJob(
+ completeJobId.getProject(),
+ completeJobId.getJob(),
+ completeJobId.getLocation()),
+ bigQueryOptions.getRetrySettings(),
+ BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
+ bigQueryOptions.getClock());
+ if (bigQueryOptions.getThrowNotFound() && jobPb == null) {
+ throw new BigQueryException(HTTP_NOT_FOUND, "Query job not found");
+ }
+ } catch (RetryHelper.RetryHelperException e) {
+ throw BigQueryException.translateAndThrow(e);
+ }
+ return Job.fromPb(bigQueryOptions.getService(), jobPb);
+ }
+
+ /* Looks up the query job via jobs.get and returns the destination table of its configuration. */
+ @VisibleForTesting
+ TableId getDestinationTable(JobId jobId) {
+ QueryJobConfiguration queryConfig =
+ (QueryJobConfiguration) getQueryJobRpc(jobId).getConfiguration();
+ return queryConfig.getDestinationTable();
+ }
+
+ // Fetches one page of rows (page size = connectionSettings.getMaxResultPerPage()) from the
+ // destination table via tabledata.list, with retries. The project id falls back to the
+ // client's project when not set on the TableId.
+ @VisibleForTesting
+ TableDataList tableDataListRpc(TableId destinationTable, String pageToken) {
+ try {
+ final TableId completeTableId =
+ destinationTable.setProjectId(
+ Strings.isNullOrEmpty(destinationTable.getProject())
+ ? bigQueryOptions.getProjectId()
+ : destinationTable.getProject());
+ TableDataList results =
+ runWithRetries(
+ () ->
+ bigQueryOptions
+ .getBigQueryRpcV2()
+ .listTableDataWithRowLimit(
+ completeTableId.getProject(),
+ completeTableId.getDataset(),
+ completeTableId.getTable(),
+ connectionSettings.getMaxResultPerPage(),
+ pageToken),
+ bigQueryOptions.getRetrySettings(),
+ BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
+ bigQueryOptions.getClock());
+
+ return results;
+ } catch (RetryHelper.RetryHelperException e) {
+ throw BigQueryException.translateAndThrow(e);
+ }
+ }
+
+ // Streams the entire query result over the BigQuery Storage Read API in Arrow format.
+ // A single-stream ReadSession is created on the destination (temp) table; deserialization
+ // happens asynchronously into a bounded blocking queue that backs the returned BigQueryResult,
+ // so the caller can start consuming rows immediately.
+ // NOTE(review): the lazy init of bqReadClient below is not thread safe (as the inline comment
+ // says) — confirm a Connection instance is never shared across threads.
+ @VisibleForTesting
+ BigQueryResult highThroughPutRead(
+ TableId destinationTable, long totalRows, Schema schema, BigQueryResultStats stats) {
+
+ try {
+ if (bqReadClient == null) { // if the read client isn't already initialized. Not thread safe.
+ bqReadClient = BigQueryReadClient.create();
+ }
+ String parent = String.format("projects/%s", destinationTable.getProject());
+ String srcTable =
+ String.format(
+ "projects/%s/datasets/%s/tables/%s",
+ destinationTable.getProject(),
+ destinationTable.getDataset(),
+ destinationTable.getTable());
+
+ // Read all the columns if the source table (temp table) and stream the data back in Arrow
+ // format
+ ReadSession.Builder sessionBuilder =
+ ReadSession.newBuilder().setTable(srcTable).setDataFormat(DataFormat.ARROW);
+
+ CreateReadSessionRequest.Builder builder =
+ CreateReadSessionRequest.newBuilder()
+ .setParent(parent)
+ .setReadSession(sessionBuilder)
+ .setMaxStreamCount(1) // Currently just one stream is allowed
+ // DO a regex check using order by and use multiple streams
+ ;
+
+ ReadSession readSession = bqReadClient.createReadSession(builder.build());
+ BlockingQueue buffer = new LinkedBlockingDeque<>(bufferSize);
+ Map arrowNameToIndex = new HashMap<>();
+ // deserialize and populate the buffer async, so that the client isn't blocked
+ processArrowStreamAsync(
+ readSession,
+ buffer,
+ new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex),
+ schema);
+
+ logger.log(Level.INFO, "\n Using BigQuery Read API");
+ return new BigQueryResultImpl(schema, totalRows, buffer, stats);
+
+ } catch (IOException e) {
+ throw BigQueryException.translateAndThrow(e);
+ }
+ }
+
+ /*
+  * Asynchronously reads the single Arrow stream of the ReadSession, decodes each record batch
+  * via the supplied reader and pushes rows into the shared buffer. An end-of-stream marker row
+  * is always enqueued and the executor is always shut down afterwards, even when enqueueing the
+  * marker is interrupted (fix: the interrupt status is restored and shutdownNow() is no longer
+  * skipped when buffer.put throws).
+  */
+ private void processArrowStreamAsync(
+ ReadSession readSession,
+ BlockingQueue buffer,
+ ArrowRowReader reader,
+ Schema schema) {
+
+ Runnable arrowStreamProcessor =
+ () -> {
+ try {
+ // Use the first stream to perform reading. Only one stream is requested currently.
+ String streamName = readSession.getStreams(0).getName();
+ ReadRowsRequest readRowsRequest =
+ ReadRowsRequest.newBuilder().setReadStream(streamName).build();
+
+ // Process each block of rows as they arrive and decode using our simple row reader.
+ com.google.api.gax.rpc.ServerStream stream =
+ bqReadClient.readRowsCallable().call(readRowsRequest);
+ for (ReadRowsResponse response : stream) {
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()) { // do not process and shutdown
+ break;
+ }
+ reader.processRows(response.getArrowRecordBatch(), buffer, schema);
+ }
+
+ } catch (Exception e) {
+ throw BigQueryException.translateAndThrow(e);
+ } finally {
+ try {
+ buffer.put(new BigQueryResultImpl.Row(null, true)); // marking end of stream
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, "\n Error occurred ", e);
+ Thread.currentThread().interrupt(); // restore the interrupt status
+ } finally {
+ queryTaskExecutor.shutdownNow(); // Shutdown the thread pool unconditionally
+ }
+ }
+ };
+
+ queryTaskExecutor.execute(arrowStreamProcessor);
+ }
+
+ /*
+  * Decodes serialized Arrow record batches into per-row name-to-value maps and feeds them to
+  * the shared buffer. One instance is created per read session; close() must be called to
+  * release Arrow memory.
+  */
+ private class ArrowRowReader
+ implements AutoCloseable { // TODO: Update to recent version of Arrow to avoid memoryleak
+
+ BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+
+ // Decoder object will be reused to avoid re-allocation and too much garbage collection.
+ private final VectorSchemaRoot root;
+ private final VectorLoader loader;
+
+ private ArrowRowReader(ArrowSchema arrowSchema, Map arrowNameToIndex)
+ throws IOException {
+ org.apache.arrow.vector.types.pojo.Schema schema =
+ MessageSerializer.deserializeSchema(
+ new org.apache.arrow.vector.ipc.ReadChannel(
+ new ByteArrayReadableSeekableByteChannel(
+ arrowSchema.getSerializedSchema().toByteArray())));
+ List vectors = new ArrayList<>();
+ List fields = schema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ vectors.add(fields.get(i).createVector(allocator));
+ arrowNameToIndex.put(
+ fields.get(i).getName(),
+ i); // mapping for getting against the field name in the result set
+ }
+ root = new VectorSchemaRoot(vectors);
+ loader = new VectorLoader(root);
+ }
+
+ /**
+  * Deserializes one Arrow record batch and enqueues each of its rows onto the buffer.
+  *
+  * @param batch object returned from the ReadRowsResponse.
+  */
+ private void processRows(
+ ArrowRecordBatch batch, BlockingQueue buffer, Schema schema)
+ throws IOException { // deserialize the values and consume the hash of the values
+ try {
+ org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
+ MessageSerializer.deserializeRecordBatch(
+ new ReadChannel(
+ new ByteArrayReadableSeekableByteChannel(
+ batch.getSerializedRecordBatch().toByteArray())),
+ allocator);
+
+ loader.load(deserializedBatch);
+ // Release buffers from batch (they are still held in the vectors in root).
+ deserializedBatch.close();
+
+ // Parse the vectors using the BigQuery schema; deserialize the data at the row level and
+ // add it to the buffer
+ FieldList fields = schema.getFields();
+ for (int rowNum = 0;
+ rowNum < root.getRowCount();
+ rowNum++) { // for the given number of rows in the batch
+
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()) { // do not process and shutdown
+ break; // exit the loop, root will be cleared in the finally block
+ }
+
+ Map curRow = new HashMap<>();
+ for (int col = 0; col < fields.size(); col++) { // iterate all the vectors for a given row
+ com.google.cloud.bigquery.Field field = fields.get(col);
+ FieldVector curFieldVec =
+ root.getVector(
+ field.getName()); // can be accessed using the index or Vector/column name
+ curRow.put(field.getName(), curFieldVec.getObject(rowNum)); // Added the raw value
+ }
+ buffer.put(new BigQueryResultImpl.Row(curRow));
+ }
+ // root is cleared once, in the finally block (the redundant clear that previously ran
+ // right before finally has been removed)
+
+ } catch (RuntimeException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt(); // restore the interrupt status before translating
+ }
+ throw BigQueryException.translateAndThrow(e);
+ } finally {
+ try {
+ root.clear();
+ } catch (RuntimeException e) {
+ logger.log(Level.WARNING, "\n Error while clearing VectorSchemaRoot ", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ root.close();
+ allocator.close();
+ }
+ }
+ /*Returns just the first page of GetQueryResultsResponse using the jobId*/
+ // Resolves project/location from client options, calls jobs.getQueryResults with the
+ // configured per-page row limit, and surfaces any row-level errors as a BigQueryException.
+ @VisibleForTesting
+ GetQueryResultsResponse getQueryResultsFirstPage(JobId jobId) {
+ JobId completeJobId =
+ jobId
+ .setProjectId(bigQueryOptions.getProjectId())
+ .setLocation(
+ jobId.getLocation() == null && bigQueryOptions.getLocation() != null
+ ? bigQueryOptions.getLocation()
+ : jobId.getLocation());
+ try {
+ GetQueryResultsResponse results =
+ BigQueryRetryHelper.runWithRetries(
+ () ->
+ bigQueryRpc.getQueryResultsWithRowLimit(
+ completeJobId.getProject(),
+ completeJobId.getJob(),
+ completeJobId.getLocation(),
+ connectionSettings.getMaxResultPerPage()),
+ bigQueryOptions.getRetrySettings(),
+ BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
+ bigQueryOptions.getClock(),
+ retryConfig);
+
+ if (results.getErrors() != null) {
+ List bigQueryErrors =
+ results.getErrors().stream()
+ .map(BigQueryError.FROM_PB_FUNCTION)
+ .collect(Collectors.toList());
+ // Throwing BigQueryException since there may be no JobId and we want to stay consistent
+ // with the case where there is an HTTP error
+ throw new BigQueryException(bigQueryErrors);
+ }
+ return results;
+ } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
+ throw BigQueryException.translateAndThrow(e);
+ }
+ }
+
+ // The fast path (jobs.query) cannot express any job-level configuration; the presence of any
+ // one of these settings forces the slower jobs.insert path.
+ @VisibleForTesting
+ boolean isFastQuerySupported() {
+ // TODO: add regex logic to check for scripting
+ Object[] jobLevelSettings = {
+ connectionSettings.getClustering(),
+ connectionSettings.getCreateDisposition(),
+ connectionSettings.getDestinationEncryptionConfiguration(),
+ connectionSettings.getDestinationTable(),
+ connectionSettings.getJobTimeoutMs(),
+ connectionSettings.getMaximumBillingTier(),
+ connectionSettings.getPriority(),
+ connectionSettings.getRangePartitioning(),
+ connectionSettings.getSchemaUpdateOptions(),
+ connectionSettings.getTableDefinitions(),
+ connectionSettings.getTimePartitioning(),
+ connectionSettings.getUserDefinedFunctions(),
+ connectionSettings.getWriteDisposition()
+ };
+ for (Object setting : jobLevelSettings) {
+ if (setting != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /*
+  * Decides whether the BigQuery Storage Read API should be used to fetch the result.
+  * Returns false when the Read API is disabled in ConnectionSettings, or when the schema
+  * contains an INTERVAL field or query parameters are present (both unsupported by the Read
+  * API). Fixes in this revision: no NullPointerException when totalRows/pageRows are null with
+  * the Read API disabled (previously unboxed in the division below), no NPE when
+  * hasQueryParameters is null, no division by zero when pageRows is 0, and the
+  * interval/parameter check now also applies while the job is still running.
+  */
+ @VisibleForTesting
+ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {
+ if (!Boolean.TRUE.equals(connectionSettings.getUseReadAPI())) {
+ return false; // Read API disabled via ConnectionSettings
+ }
+
+ // Read API does not yet support Interval Type or QueryParameters
+ if (containsIntervalType(schema) || Boolean.TRUE.equals(hasQueryParameters)) {
+ logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI");
+ return false;
+ }
+
+ // totalRows and pageRows are returned null when the job is not complete; stream via Read API
+ if (totalRows == null || pageRows == null) {
+ return true;
+ }
+
+ if (pageRows == 0L) { // guard against division by zero below
+ return false;
+ }
+ long resultRatio = totalRows / pageRows;
+ return resultRatio >= connectionSettings.getTotalToPageRowCountRatio()
+ && totalRows > connectionSettings.getMinResultSize();
+ }
+
+ // Breadth-first walk over the schema fields: returns true if any (possibly nested) field is
+ // of INTERVAL type. Implementation to be used until the Read API supports IntervalType.
+ private boolean containsIntervalType(Schema schema) {
+ Queue<com.google.cloud.bigquery.Field> pending = new LinkedList<>(schema.getFields());
+ for (com.google.cloud.bigquery.Field field = pending.poll();
+ field != null;
+ field = pending.poll()) {
+ StandardSQLTypeName type = field.getType().getStandardType();
+ if (type == StandardSQLTypeName.INTERVAL) {
+ return true;
+ }
+ if (type == StandardSQLTypeName.STRUCT || type == StandardSQLTypeName.ARRAY) {
+ pending.addAll(field.getSubFields()); // descend into nested fields
+ }
+ }
+ return false;
+ }
+
+ // Used for job.query API endpoint
+ /*
+  * Builds the jobs.query request body from the ConnectionSettings, SQL text, optional query
+  * parameters (POSITIONAL when the first parameter is unnamed, NAMED otherwise) and labels.
+  * Fix: an empty (non-null) parameter list no longer throws IndexOutOfBoundsException from
+  * queryParameters.get(0).
+  */
+ @VisibleForTesting
+ QueryRequest createQueryRequest(
+ ConnectionSettings connectionSettings,
+ String sql,
+ List queryParameters,
+ Map labels) {
+ QueryRequest content = new QueryRequest();
+ String requestId = UUID.randomUUID().toString();
+
+ if (connectionSettings.getConnectionProperties() != null) {
+ content.setConnectionProperties(
+ connectionSettings.getConnectionProperties().stream()
+ .map(ConnectionProperty.TO_PB_FUNCTION)
+ .collect(Collectors.toList()));
+ }
+ if (connectionSettings.getDefaultDataset() != null) {
+ content.setDefaultDataset(connectionSettings.getDefaultDataset().toPb());
+ }
+ if (connectionSettings.getMaximumBytesBilled() != null) {
+ content.setMaximumBytesBilled(connectionSettings.getMaximumBytesBilled());
+ }
+ if (connectionSettings.getMaxResults() != null) {
+ content.setMaxResults(connectionSettings.getMaxResults());
+ }
+ if (queryParameters != null && !queryParameters.isEmpty()) { // guard against empty list
+ if (queryParameters.get(0).getName() == null) {
+ // If query parameter name is unset, then assume mode is positional
+ content.setParameterMode("POSITIONAL");
+ // pass query parameters
+ List queryParametersPb =
+ Lists.transform(queryParameters, POSITIONAL_PARAMETER_TO_PB_FUNCTION);
+ content.setQueryParameters(queryParametersPb);
+ } else {
+ content.setParameterMode("NAMED");
+ // pass query parameters
+ List queryParametersPb =
+ Lists.transform(queryParameters, NAMED_PARAMETER_TO_PB_FUNCTION);
+ content.setQueryParameters(queryParametersPb);
+ }
+ }
+ if (connectionSettings.getCreateSession() != null) {
+ content.setCreateSession(connectionSettings.getCreateSession());
+ }
+ if (labels != null) {
+ content.setLabels(labels);
+ }
+ content.setQuery(sql);
+ content.setRequestId(requestId);
+ // The new Connection interface only supports StandardSQL dialect
+ content.setUseLegacySql(false);
+ return content;
+ }
+
+ // Used by jobs.getQueryResults API endpoint
+ /*
+  * Builds a full jobs.insert query job from the ConnectionSettings, SQL text, optional query
+  * parameters and labels, then submits it with retries. Parameter mode is POSITIONAL when the
+  * first parameter is unnamed, NAMED otherwise. Fix: an empty (non-null) parameter list no
+  * longer throws IndexOutOfBoundsException from queryParameters.get(0).
+  */
+ @VisibleForTesting
+ com.google.api.services.bigquery.model.Job createQueryJob(
+ String sql,
+ ConnectionSettings connectionSettings,
+ List queryParameters,
+ Map labels) {
+ com.google.api.services.bigquery.model.JobConfiguration configurationPb =
+ new com.google.api.services.bigquery.model.JobConfiguration();
+ JobConfigurationQuery queryConfigurationPb = new JobConfigurationQuery();
+ queryConfigurationPb.setQuery(sql);
+ if (queryParameters != null && !queryParameters.isEmpty()) { // guard against empty list
+ if (queryParameters.get(0).getName() == null) {
+ // If query parameter name is unset, then assume mode is positional
+ queryConfigurationPb.setParameterMode("POSITIONAL");
+ // pass query parameters
+ List queryParametersPb =
+ Lists.transform(queryParameters, POSITIONAL_PARAMETER_TO_PB_FUNCTION);
+ queryConfigurationPb.setQueryParameters(queryParametersPb);
+ } else {
+ queryConfigurationPb.setParameterMode("NAMED");
+ // pass query parameters
+ List queryParametersPb =
+ Lists.transform(queryParameters, NAMED_PARAMETER_TO_PB_FUNCTION);
+ queryConfigurationPb.setQueryParameters(queryParametersPb);
+ }
+ }
+ if (connectionSettings.getDestinationTable() != null) {
+ queryConfigurationPb.setDestinationTable(connectionSettings.getDestinationTable().toPb());
+ }
+ if (connectionSettings.getTableDefinitions() != null) {
+ queryConfigurationPb.setTableDefinitions(
+ Maps.transformValues(
+ connectionSettings.getTableDefinitions(),
+ ExternalTableDefinition.TO_EXTERNAL_DATA_FUNCTION));
+ }
+ if (connectionSettings.getUserDefinedFunctions() != null) {
+ queryConfigurationPb.setUserDefinedFunctionResources(
+ connectionSettings.getUserDefinedFunctions().stream()
+ .map(UserDefinedFunction.TO_PB_FUNCTION)
+ .collect(Collectors.toList()));
+ }
+ if (connectionSettings.getCreateDisposition() != null) {
+ queryConfigurationPb.setCreateDisposition(
+ connectionSettings.getCreateDisposition().toString());
+ }
+ if (connectionSettings.getWriteDisposition() != null) {
+ queryConfigurationPb.setWriteDisposition(connectionSettings.getWriteDisposition().toString());
+ }
+ if (connectionSettings.getDefaultDataset() != null) {
+ queryConfigurationPb.setDefaultDataset(connectionSettings.getDefaultDataset().toPb());
+ }
+ if (connectionSettings.getPriority() != null) {
+ queryConfigurationPb.setPriority(connectionSettings.getPriority().toString());
+ }
+ if (connectionSettings.getAllowLargeResults() != null) {
+ queryConfigurationPb.setAllowLargeResults(connectionSettings.getAllowLargeResults());
+ }
+ if (connectionSettings.getUseQueryCache() != null) {
+ queryConfigurationPb.setUseQueryCache(connectionSettings.getUseQueryCache());
+ }
+ if (connectionSettings.getFlattenResults() != null) {
+ queryConfigurationPb.setFlattenResults(connectionSettings.getFlattenResults());
+ }
+ if (connectionSettings.getMaximumBillingTier() != null) {
+ queryConfigurationPb.setMaximumBillingTier(connectionSettings.getMaximumBillingTier());
+ }
+ if (connectionSettings.getMaximumBytesBilled() != null) {
+ queryConfigurationPb.setMaximumBytesBilled(connectionSettings.getMaximumBytesBilled());
+ }
+ if (connectionSettings.getSchemaUpdateOptions() != null) {
+ ImmutableList.Builder schemaUpdateOptionsBuilder = new ImmutableList.Builder<>();
+ for (JobInfo.SchemaUpdateOption schemaUpdateOption :
+ connectionSettings.getSchemaUpdateOptions()) {
+ schemaUpdateOptionsBuilder.add(schemaUpdateOption.name());
+ }
+ queryConfigurationPb.setSchemaUpdateOptions(schemaUpdateOptionsBuilder.build());
+ }
+ if (connectionSettings.getDestinationEncryptionConfiguration() != null) {
+ queryConfigurationPb.setDestinationEncryptionConfiguration(
+ connectionSettings.getDestinationEncryptionConfiguration().toPb());
+ }
+ if (connectionSettings.getTimePartitioning() != null) {
+ queryConfigurationPb.setTimePartitioning(connectionSettings.getTimePartitioning().toPb());
+ }
+ if (connectionSettings.getClustering() != null) {
+ queryConfigurationPb.setClustering(connectionSettings.getClustering().toPb());
+ }
+ if (connectionSettings.getRangePartitioning() != null) {
+ queryConfigurationPb.setRangePartitioning(connectionSettings.getRangePartitioning().toPb());
+ }
+ if (connectionSettings.getConnectionProperties() != null) {
+ queryConfigurationPb.setConnectionProperties(
+ connectionSettings.getConnectionProperties().stream()
+ .map(ConnectionProperty.TO_PB_FUNCTION)
+ .collect(Collectors.toList()));
+ }
+ if (connectionSettings.getCreateSession() != null) {
+ queryConfigurationPb.setCreateSession(connectionSettings.getCreateSession());
+ }
+ if (connectionSettings.getJobTimeoutMs() != null) {
+ configurationPb.setJobTimeoutMs(connectionSettings.getJobTimeoutMs());
+ }
+ if (labels != null) {
+ configurationPb.setLabels(labels);
+ }
+ // The new Connection interface only supports StandardSQL dialect
+ queryConfigurationPb.setUseLegacySql(false);
+ configurationPb.setQuery(queryConfigurationPb);
+
+ com.google.api.services.bigquery.model.Job jobPb =
+ JobInfo.of(QueryJobConfiguration.fromPb(configurationPb)).toPb();
+ com.google.api.services.bigquery.model.Job queryJob;
+ try {
+ queryJob =
+ BigQueryRetryHelper.runWithRetries(
+ () -> bigQueryRpc.createJobForQuery(jobPb),
+ bigQueryOptions.getRetrySettings(),
+ BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
+ bigQueryOptions.getClock(),
+ retryConfig);
+ } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
+ throw BigQueryException.translateAndThrow(e);
+ }
+ return queryJob;
+ }
+
+ // Used by dryRun
+ // Builds and submits a dry-run query job (no data is processed) so that schema and undeclared
+ // query parameters can be inspected before actually running the query.
+ private com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
+ com.google.api.services.bigquery.model.JobConfiguration configurationPb =
+ new com.google.api.services.bigquery.model.JobConfiguration();
+ configurationPb.setDryRun(true);
+ JobConfigurationQuery queryConfigurationPb = new JobConfigurationQuery();
+ // NOTE(review): a literal '?' inside a quoted string would also select POSITIONAL mode —
+ // confirm this heuristic is acceptable for dry runs
+ String parameterMode = sql.contains("?") ? "POSITIONAL" : "NAMED";
+ queryConfigurationPb.setParameterMode(parameterMode);
+ queryConfigurationPb.setQuery(sql);
+ // UndeclaredQueryParameter is only supported in StandardSQL
+ queryConfigurationPb.setUseLegacySql(false);
+ if (connectionSettings.getDefaultDataset() != null) {
+ queryConfigurationPb.setDefaultDataset(connectionSettings.getDefaultDataset().toPb());
+ }
+ if (connectionSettings.getCreateSession() != null) {
+ queryConfigurationPb.setCreateSession(connectionSettings.getCreateSession());
+ }
+ configurationPb.setQuery(queryConfigurationPb);
+
+ com.google.api.services.bigquery.model.Job jobPb =
+ JobInfo.of(QueryJobConfiguration.fromPb(configurationPb)).toPb();
+
+ com.google.api.services.bigquery.model.Job dryRunJob;
+ try {
+ dryRunJob =
+ BigQueryRetryHelper.runWithRetries(
+ () -> bigQueryRpc.createJobForQuery(jobPb),
+ bigQueryOptions.getRetrySettings(),
+ BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
+ bigQueryOptions.getClock(),
+ retryConfig);
+ } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
+ throw BigQueryException.translateAndThrow(e);
+ }
+ return dryRunJob;
+ }
+
+ // Convert from Parameter wrapper class to positional QueryParameter generated class
+ // (no name is set; position in the list determines binding order)
+ private static final Function POSITIONAL_PARAMETER_TO_PB_FUNCTION =
+ value -> {
+ QueryParameter queryParameterPb = new QueryParameter();
+ queryParameterPb.setParameterValue(value.getValue().toValuePb());
+ queryParameterPb.setParameterType(value.getValue().toTypePb());
+ return queryParameterPb;
+ };
+
+ // Convert from Parameter wrapper class to named QueryParameter generated class
+ private static final Function NAMED_PARAMETER_TO_PB_FUNCTION =
+ value -> {
+ QueryParameter queryParameterPb = new QueryParameter();
+ queryParameterPb.setName(value.getName());
+ queryParameterPb.setParameterValue(value.getValue().toValuePb());
+ queryParameterPb.setParameterType(value.getValue().toTypePb());
+ return queryParameterPb;
+ };
+
+ // Convert from QueryParameter class to the Parameter wrapper class
+ // (an unnamed/positional parameter maps to an empty-string name)
+ private static final Function QUERY_PARAMETER_FROM_PB_FUNCTION =
+ pb ->
+ Parameter.newBuilder()
+ .setName(pb.getName() == null ? "" : pb.getName())
+ .setValue(QueryParameterValue.fromPb(pb.getParameterValue(), pb.getParameterType()))
+ .build();
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionSettings.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionSettings.java
new file mode 100644
index 000000000..ac3b1b1e0
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionSettings.java
@@ -0,0 +1,453 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.JobInfo.CreateDisposition;
+import com.google.cloud.bigquery.JobInfo.SchemaUpdateOption;
+import com.google.cloud.bigquery.JobInfo.WriteDisposition;
+import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** ConnectionSettings for setting up a BigQuery query connection. */
+@AutoValue
+public abstract class ConnectionSettings {
+ ConnectionSettings() {
+ // Package private so users can't subclass it but AutoValue can.
+ }
+
+ /**
+ * Returns useReadAPI flag, enabled by default. Read API will be used if the underlying conditions
+ * are satisfied and this flag is enabled
+ */
+ @Nullable
+ public abstract Boolean getUseReadAPI();
+
+ /** Returns the synchronous response timeoutMs associated with this query */
+ @Nullable
+ public abstract Long getRequestTimeout();
+
+ /** Returns the connection properties for connection string with this query */
+ @Nullable
+ public abstract List getConnectionProperties();
+
+ /** Returns the default dataset */
+ @Nullable
+ public abstract DatasetId getDefaultDataset();
+
+ /** Returns the limits the bytes billed for this job */
+ @Nullable
+ public abstract Long getMaximumBytesBilled();
+
+ /** Returns the maximum number of rows of data */
+ @Nullable
+ public abstract Long getMaxResults();
+
+ /** Returns the number of rows of data to pre-fetch */
+ @Nullable
+ public abstract Integer getNumBufferedRows();
+
+ @Nullable
+ public abstract Integer getTotalToPageRowCountRatio();
+
+ @Nullable
+ public abstract Integer getMinResultSize();
+
+ @Nullable
+ public abstract Integer getMaxResultPerPage();
+
+ /** Returns whether to look for the result in the query cache */
+ @Nullable
+ public abstract Boolean getUseQueryCache();
+
+ /**
+ * Returns whether nested and repeated fields should be flattened. If set to {@code false} {@link
+ * ConnectionSettings.Builder#setAllowLargeResults(Boolean)} must be {@code true}.
+ *
+ * @see Flatten
+ */
+ @Nullable
+ public abstract Boolean getFlattenResults();
+
+ /**
+ * Returns the BigQuery Storage read API configuration @Nullable public abstract
+ * ReadClientConnectionConfiguration getReadClientConnectionConfiguration();
+ */
+
+ /**
+ * Below properties are only supported by jobs.insert API and not yet supported by jobs.query API
+ * *
+ */
+
+ /** Returns the clustering specification for the destination table. */
+ @Nullable
+ public abstract Clustering getClustering();
+
+ /**
+ * Returns whether the job is allowed to create new tables.
+ *
+ * @see
+ * Create Disposition
+ */
+ @Nullable
+ public abstract CreateDisposition getCreateDisposition();
+
+ /** Returns the custom encryption configuration (e.g., Cloud KMS keys) */
+ @Nullable
+ public abstract EncryptionConfiguration getDestinationEncryptionConfiguration();
+
+ /**
+ * Returns the table where to put query results. If not provided a new table is created. This
+ * value is required if {@link # allowLargeResults()} is {@code true}.
+ */
+ @Nullable
+ public abstract TableId getDestinationTable();
+
+ /** Returns the timeout associated with this job */
+ @Nullable
+ public abstract Long getJobTimeoutMs();
+
+ /** Returns the optional billing tier limit for this job. */
+ @Nullable
+ public abstract Integer getMaximumBillingTier();
+
+ /** Returns the query priority. */
+ @Nullable
+ public abstract Priority getPriority();
+
+ /**
+ * Returns whether the job is enabled to create arbitrarily large results. If {@code true} the
+ * query is allowed to create large results at a slight cost in performance.
+ *
+ * @see Returning
+ * Large Query Results
+ */
+ @Nullable
+ public abstract Boolean getAllowLargeResults();
+
+ /**
+ * Returns whether to create a new session.
+ *
+ * @see Create Sessions
+ */
+ @Nullable
+ public abstract Boolean getCreateSession();
+
+ /** Returns the range partitioning specification for the table */
+ @Nullable
+ public abstract RangePartitioning getRangePartitioning();
+
+ /**
+ * [Experimental] Returns options allowing the schema of the destination table to be updated as a
+ * side effect of the query job. Schema update options are supported in two cases: when
+ * writeDisposition is WRITE_APPEND; when writeDisposition is WRITE_TRUNCATE and the destination
+ * table is a partition of a table, specified by partition decorators. For normal tables,
+ * WRITE_TRUNCATE will always overwrite the schema.
+ */
+ @Nullable
+ public abstract List getSchemaUpdateOptions();
+
+ /**
+ * Returns the external tables definitions. If querying external data sources outside of BigQuery,
+ * this value describes the data format, location and other properties of the data sources. By
+ * defining these properties, the data sources can be queried as if they were standard BigQuery
+ * tables.
+ */
+ @Nullable
+ public abstract Map getTableDefinitions();
+
+ /** Returns the time partitioning specification for the destination table. */
+ @Nullable
+ public abstract TimePartitioning getTimePartitioning();
+
+ /**
+ * Returns user defined function resources that can be used by this query. Function resources can
+ * either be defined inline ({@link UserDefinedFunction.Type#INLINE}) or loaded from a Google
+ * Cloud Storage URI ({@link UserDefinedFunction.Type#FROM_URI}.
+ */
+ @Nullable
+ public abstract List getUserDefinedFunctions();
+
+ /**
+ * Returns the action that should occur if the destination table already exists.
+ *
+ * @see
+ * Write Disposition
+ */
+ @Nullable
+ public abstract WriteDisposition getWriteDisposition();
+
+ /** Returns a builder pre-populated using the current values of this field. */
+ public abstract Builder toBuilder();
+
+ /** Returns a builder for a {@code ConnectionSettings} object. */
+ public static Builder newBuilder() {
+ return new AutoValue_ConnectionSettings.Builder().withDefaultValues();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ // Defaults applied to every new builder; individual setters may override them.
+ Builder withDefaultValues() {
+ return setUseReadAPI(true) // Read API is enabled by default
+ .setNumBufferedRows(10000) // 10K records will be kept in the buffer (Blocking Queue)
+ .setMinResultSize(200000) // Read API will be enabled when there are at least 200K records
+ .setTotalToPageRowCountRatio(3) // there should be at least 3 pages of records
+ .setMaxResultPerPage(100000); // page size for pagination
+ }
+
+ /**
+ * Sets useReadAPI flag, enabled by default. Read API will be used if the underlying conditions
+ * are satisfied and this flag is enabled
+ *
+ * @param useReadAPI {@code true} to enable the Read API (the default), {@code false} to disable
+ */
+ @Nullable
+ public abstract Builder setUseReadAPI(Boolean useReadAPI);
+
+ /**
+ * Sets how long to wait for the query to complete, in milliseconds, before the request times
+ * out and returns. Note that this is only a timeout for the request, not the query. If the
+ * query takes longer to run than the timeout value, the call returns without any results and
+ * with the 'jobComplete' flag set to false. You can call GetQueryResults() to wait for the
+ * query to complete and read the results. The default value is 10000 milliseconds (10 seconds).
+ *
+ * @param timeoutMs or {@code null} for none
+ */
+ public abstract Builder setRequestTimeout(Long timeoutMs);
+
+ /**
+ * Sets a connection-level property to customize query behavior.
+ *
+ * @param connectionProperties connectionProperties or {@code null} for none
+ */
+ public abstract Builder setConnectionProperties(List connectionProperties);
+
+ /**
+ * Sets the default dataset. This dataset is used for all unqualified table names used in the
+ * query.
+ */
+ public abstract Builder setDefaultDataset(DatasetId datasetId);
+
+ /**
+ * Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit
+ * will fail (without incurring a charge). If unspecified, this will be set to your project
+ * default.
+ *
+ * @param maximumBytesBilled maximum bytes billed for this job
+ */
+ public abstract Builder setMaximumBytesBilled(Long maximumBytesBilled);
+
+ /**
+ * Sets the maximum number of rows of data to return per page of results. Setting this flag to a
+ * small value such as 1000 and then paging through results might improve reliability when the
+ * query result set is large. In addition to this limit, responses are also limited to 10 MB. By
+ * default, there is no maximum row count, and only the byte limit applies.
+ *
+ * @param maxResults maxResults or {@code null} for none
+ */
+ public abstract Builder setMaxResults(Long maxResults);
+
+ /**
+ * Sets the number of rows in the buffer (a blocking queue) that query results are consumed
+ * from.
+ *
+ * @param numBufferedRows numBufferedRows or {@code null} for none
+ */
+ public abstract Builder setNumBufferedRows(Integer numBufferedRows);
+
+ /**
+ * Sets a ratio of the total number of records and the records returned in the current page.
+ * This value is checked before calling the Read API
+ *
+ * @param totalToPageRowCountRatio totalToPageRowCountRatio
+ */
+ public abstract Builder setTotalToPageRowCountRatio(Integer totalToPageRowCountRatio);
+
+ /**
+ * Sets the minimum result size for which the Read API will be enabled
+ *
+ * @param minResultSize minResultSize
+ */
+ public abstract Builder setMinResultSize(Integer minResultSize);
+
+ /**
+ * Sets the maximum records per page to be used for pagination. This is used as an input for the
+ * tabledata.list and jobs.getQueryResults RPC calls
+ *
+ * @param maxResultPerPage the maximum number of records to return per page, or {@code null} for none
+ */
+ public abstract Builder setMaxResultPerPage(Integer maxResultPerPage);
+
+ /**
+ * Sets whether to look for the result in the query cache. The query cache is a best-effort
+ * cache that will be flushed whenever tables in the query are modified. Moreover, the query
+ * cache is only available when {@link ConnectionSettings.Builder#setDestinationTable(TableId)}
+ * is not set.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/cached-results">Query Caching</a>
+ */
+ public abstract Builder setUseQueryCache(Boolean useQueryCache);
+
+ /**
+ * Sets whether nested and repeated fields should be flattened. If set to {@code false} {@link
+ * ConnectionSettings.Builder#setAllowLargeResults(Boolean)} must be {@code true}. By default
+ * results are flattened.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/data#flatten">Flatten</a>
+ */
+ public abstract Builder setFlattenResults(Boolean flattenResults);
+
+ /* */
+ /**/
+ /**
+ * Sets the values necessary to determine whether table result will be read using the BigQuery
+ * Storage client Read API. The BigQuery Storage client Read API will be used to read the query
+ * result when the totalToFirstPageSizeRatio (default 3) and minimumTableSize (default 100 rows)
+ * conditions set are met. A ReadSession will be created using the Apache Arrow data format for
+ * serialization.
+ *
+ *
+ * <p>It also sets the maximum number of table rows allowed in buffer before streaming them to
+ * the BigQueryResult.
+ *
+ * @param readClientConnectionConfiguration or {@code null} for none
+ */
+ /*
+ public abstract Builder setReadClientConnectionConfiguration(
+ ReadClientConnectionConfiguration readClientConnectionConfiguration);*/
+
+ /** Sets the clustering specification for the destination table. */
+ public abstract Builder setClustering(Clustering clustering);
+
+ /**
+ * Sets whether the job is allowed to create tables.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition">
+ * Create Disposition</a>
+ */
+ public abstract Builder setCreateDisposition(CreateDisposition createDisposition);
+
+ /**
+ * Sets the custom encryption configuration (e.g., Cloud KMS keys).
+ *
+ * @param destinationEncryptionConfiguration destinationEncryptionConfiguration or {@code null}
+ * for none
+ */
+ public abstract Builder setDestinationEncryptionConfiguration(
+ EncryptionConfiguration destinationEncryptionConfiguration);
+
+ /**
+ * Sets the table where to put query results. If not provided a new table is created. This value
+ * is required if {@link ConnectionSettings.Builder#setAllowLargeResults(Boolean)} is set to
+ * {@code true}.
+ */
+ public abstract Builder setDestinationTable(TableId destinationTable);
+
+ /**
+ * [Optional] Job timeout in milliseconds. If this time limit is exceeded, BigQuery may attempt
+ * to terminate the job.
+ *
+ * @param jobTimeoutMs jobTimeoutMs or {@code null} for none
+ */
+ public abstract Builder setJobTimeoutMs(Long jobTimeoutMs);
+
+ /**
+ * Limits the billing tier for this job. Queries that have resource usage beyond this tier will
+ * fail (without incurring a charge). If unspecified, this will be set to your project default.
+ *
+ * @param maximumBillingTier maximum billing tier for this job
+ */
+ public abstract Builder setMaximumBillingTier(Integer maximumBillingTier);
+
+ /**
+ * Sets a priority for the query. If not specified the priority is assumed to be {@link
+ * Priority#INTERACTIVE}.
+ */
+ public abstract Builder setPriority(Priority priority);
+
+ /**
+ * Sets whether the job is enabled to create arbitrarily large results. If {@code true} the
+ * query is allowed to create large results at a slight cost in performance. If {@code true}
+ * {@link ConnectionSettings.Builder#setDestinationTable(TableId)} must be provided.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/writing-results#large-results">Returning
+ * Large Query Results</a>
+ */
+ public abstract Builder setAllowLargeResults(Boolean allowLargeResults);
+
+ /**
+ * Sets whether to create a new session. If {@code true} a random session id will be generated
+ * by BigQuery. If false, runs query with an existing session_id passed in ConnectionProperty,
+ * otherwise runs query in non-session mode.
+ */
+ public abstract Builder setCreateSession(Boolean createSession);
+
+ /**
+ * Range partitioning specification for this table. Only one of timePartitioning and
+ * rangePartitioning should be specified.
+ *
+ * @param rangePartitioning rangePartitioning or {@code null} for none
+ */
+ public abstract Builder setRangePartitioning(RangePartitioning rangePartitioning);
+
+ /**
+ * [Experimental] Sets options allowing the schema of the destination table to be updated as a
+ * side effect of the query job. Schema update options are supported in two cases: when
+ * writeDisposition is WRITE_APPEND; when writeDisposition is WRITE_TRUNCATE and the destination
+ * table is a partition of a table, specified by partition decorators. For normal tables,
+ * WRITE_TRUNCATE will always overwrite the schema.
+ */
+ public abstract Builder setSchemaUpdateOptions(List<JobInfo.SchemaUpdateOption> schemaUpdateOptions);
+
+ /**
+ * Sets the external tables definitions. If querying external data sources outside of BigQuery,
+ * this value describes the data format, location and other properties of the data sources. By
+ * defining these properties, the data sources can be queried as if they were standard BigQuery
+ * tables.
+ */
+ public abstract Builder setTableDefinitions(
+ Map<String, ExternalTableDefinition> tableDefinitions);
+
+ /** Sets the time partitioning specification for the destination table. */
+ public abstract Builder setTimePartitioning(TimePartitioning timePartitioning);
+
+ /**
+ * Sets user defined function resources that can be used by this query. Function resources can
+ * either be defined inline ({@link UserDefinedFunction#inline(String)}) or loaded from a Google
+ * Cloud Storage URI ({@link UserDefinedFunction#fromUri(String)}.
+ */
+ public abstract Builder setUserDefinedFunctions(List<UserDefinedFunction> userDefinedFunctions);
+
+ /**
+ * Sets the action that should occur if the destination table already exists.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition">
+ * Write Disposition</a>
+ */
+ public abstract Builder setWriteDisposition(WriteDisposition writeDisposition);
+
+ /** Creates a {@code ConnectionSettings} object. */
+ public abstract ConnectionSettings build();
+ }
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java
index ab9fdabb3..0ef1d1f94 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java
@@ -21,6 +21,7 @@
import com.google.api.services.bigquery.model.JobStatistics2;
import com.google.api.services.bigquery.model.JobStatistics3;
import com.google.api.services.bigquery.model.JobStatistics4;
+import com.google.api.services.bigquery.model.QueryParameter;
import com.google.cloud.StringEnumType;
import com.google.cloud.StringEnumValue;
import com.google.common.base.Function;
@@ -339,6 +340,7 @@ public static class QueryStatistics extends JobStatistics {
private final List<QueryStage> queryPlan;
private final List<TimelineSample> timeline;
private final Schema schema;
+ private final List<QueryParameter> queryParameters;
/**
* StatementType represents possible types of SQL statements reported as part of the
@@ -421,6 +423,7 @@ static final class Builder extends JobStatistics.Builder<QueryStatistics, Builder> {
private List<QueryStage> queryPlan;
private List<TimelineSample> timeline;
private Schema schema;
+ private List<QueryParameter> queryParameters;
private Builder() {}
@@ -569,6 +572,11 @@ Builder setSchema(Schema schema) {
return self();
}
+ Builder setQueryParameters(List<QueryParameter> queryParameters) {
+ this.queryParameters = queryParameters;
+ return self();
+ }
+
@Override
QueryStatistics build() {
return new QueryStatistics(this);
@@ -595,6 +603,7 @@ private QueryStatistics(Builder builder) {
this.queryPlan = builder.queryPlan;
this.timeline = builder.timeline;
this.schema = builder.schema;
+ this.queryParameters = builder.queryParameters;
}
/** Returns query statistics specific to the use of BI Engine. */
@@ -715,6 +724,14 @@ public Schema getSchema() {
return schema;
}
+ /**
+ * Standard SQL only: Returns a list of undeclared query parameters detected during a dry run
+ * validation.
+ */
+ public List getQueryParameters() {
+ return queryParameters;
+ }
+
@Override
ToStringHelper toStringHelper() {
return super.toStringHelper()
@@ -725,7 +742,8 @@ ToStringHelper toStringHelper() {
.add("totalBytesProcessed", totalBytesProcessed)
.add("queryPlan", queryPlan)
.add("timeline", timeline)
- .add("schema", schema);
+ .add("schema", schema)
+ .add("queryParameters", queryParameters);
}
@Override
@@ -746,7 +764,8 @@ public final int hashCode() {
totalBytesBilled,
totalBytesProcessed,
queryPlan,
- schema);
+ schema,
+ queryParameters);
}
@Override
@@ -788,6 +807,9 @@ com.google.api.services.bigquery.model.JobStatistics toPb() {
if (schema != null) {
queryStatisticsPb.setSchema(schema.toPb());
}
+ if (queryParameters != null) {
+ queryStatisticsPb.setUndeclaredQueryParameters(queryParameters);
+ }
return super.toPb().setQuery(queryStatisticsPb);
}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Parameter.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Parameter.java
new file mode 100644
index 000000000..9959feab9
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Parameter.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+
+/* Wrapper class for query parameters */
+@AutoValue
+public abstract class Parameter {
+ Parameter() {
+ // Package private so users can't subclass it but AutoValue can.
+ }
+
+ /**
+ * Returns the name of the query parameter. If unset, this is a positional parameter. Otherwise,
+ * should be unique within a query.
+ *
+ * @return value or {@code null} for none
+ */
+ @Nullable
+ public abstract String getName();
+
+ /** Returns the value for a query parameter along with its type. */
+ public abstract QueryParameterValue getValue();
+
+ /** Returns a builder pre-populated using the current values of this field. */
+ public abstract Builder toBuilder();
+
+ /** Returns a builder for a {@code Parameter} object. */
+ public static Builder newBuilder() {
+ return new AutoValue_Parameter.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /**
+ * [Optional] Sets the name of the query parameter. If unset, this is a positional parameter.
+ * Otherwise, should be unique within a query.
+ *
+ * @param name name or {@code null} for none
+ */
+ public abstract Builder setName(String name);
+
+ /**
+ * Sets the value for a query parameter along with its type.
+ *
+ * @param parameter parameter or {@code null} for none
+ */
+ public abstract Builder setValue(QueryParameterValue parameter);
+
+ /** Creates a {@code Parameter} object. */
+ public abstract Parameter build();
+ }
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java
index 48ec22caf..cc726bdd1 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java
@@ -166,6 +166,11 @@ private Builder(com.google.api.services.bigquery.model.JobConfiguration configur
this();
JobConfigurationQuery queryConfigurationPb = configurationPb.getQuery();
this.query = queryConfigurationPb.getQuery();
+ // Allows getting undeclared query parameters in JobStatistics2
+ if (queryConfigurationPb.getQueryParameters() == null
+ && queryConfigurationPb.getParameterMode() != null) {
+ parameterMode = queryConfigurationPb.getParameterMode();
+ }
if (queryConfigurationPb.getQueryParameters() != null
&& !queryConfigurationPb.getQueryParameters().isEmpty()) {
if (queryConfigurationPb.getQueryParameters().get(0).getName() == null) {
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ReadClientConnectionConfiguration.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ReadClientConnectionConfiguration.java
new file mode 100644
index 000000000..e0805a11e
--- /dev/null
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ReadClientConnectionConfiguration.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+
+/** Represents BigQueryStorage Read client connection information. */
+@AutoValue
+public abstract class ReadClientConnectionConfiguration implements Serializable {
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /**
+ * Sets the total row count to page row count ratio used to determine whether to use the
+ * BigQueryStorage Read client to fetch result sets after the first page.
+ */
+ @Nullable
+ public abstract Builder setTotalToPageRowCountRatio(Long ratio);
+
+ /**
+ * Sets the minimum number of table rows in the query results used to determine whether to use
+ * the BigQueryStorage Read client to fetch result sets after the first page.
+ */
+ @Nullable
+ public abstract Builder setMinResultSize(Long numRows);
+
+ /**
+ * Sets the maximum number of table rows allowed in buffer before streaming them to the
+ * BigQueryResult.
+ */
+ @Nullable
+ public abstract Builder setBufferSize(Long bufferSize);
+
+ /** Creates a {@code ReadClientConnectionConfiguration} object. */
+ public abstract ReadClientConnectionConfiguration build();
+ }
+
+ /** Returns the totalToPageRowCountRatio in this configuration. */
+ public abstract Long getTotalToPageRowCountRatio();
+
+ /** Returns the minResultSize in this configuration. */
+ public abstract Long getMinResultSize();
+
+ /** Returns the bufferSize in this configuration. */
+ public abstract Long getBufferSize();
+
+ public abstract Builder toBuilder();
+
+ /** Returns a builder for a {@code ReadClientConnectionConfiguration} object. */
+ public static Builder newBuilder() {
+ return new AutoValue_ReadClientConnectionConfiguration.Builder();
+ }
+}
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java
index 06488c5b4..871590ca4 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java
@@ -122,6 +122,13 @@ Boolean getBoolean(Map