Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,14 @@ SQL support
-----------

The connector provides read and write access to data and metadata in the
BigQuery database, though write access is limited. In addition to the
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😃

BigQuery database. In addition to the
:ref:`globally available <sql-globally-available>` and
:ref:`read operation <sql-read-operations>` statements, the connector supports
the following features:

* :doc:`/sql/insert`
* :doc:`/sql/create-table`
* :doc:`/sql/create-table-as`
* :doc:`/sql/drop-table`
* :doc:`/sql/create-schema`
* :doc:`/sql/drop-schema`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
Expand Down Expand Up @@ -54,6 +56,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -281,6 +284,15 @@ public String selectSql(TableId table, String formattedColumns)
return format("SELECT %s FROM `%s`", formattedColumns, tableName);
}

public void insert(InsertAllRequest insertAllRequest)
{
InsertAllResponse response = bigQuery.insertAll(insertAllRequest);
if (response.hasErrors()) {
// Note that BigQuery doesn't rollback inserted rows
throw new TrinoException(BIGQUERY_FAILED_TO_EXECUTE_QUERY, format("Failed to insert rows: %s", response.getInsertErrors()));
Comment thread
hashhar marked this conversation as resolved.
Outdated
}
}

private static String fullTableName(TableId remoteTableId)
{
String remoteSchemaName = remoteTableId.getDataset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
Expand All @@ -43,6 +44,7 @@ public class BigQueryConnector
private final BigQueryMetadata metadata;
private final BigQuerySplitManager splitManager;
private final BigQueryPageSourceProvider pageSourceProvider;
private final BigQueryPageSinkProvider pageSinkProvider;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final List<PropertyMetadata<?>> sessionProperties;

Expand All @@ -51,12 +53,14 @@ public BigQueryConnector(
BigQueryMetadata metadata,
BigQuerySplitManager splitManager,
BigQueryPageSourceProvider pageSourceProvider,
BigQueryPageSinkProvider pageSinkProvider,
Set<ConnectorTableFunction> connectorTableFunctions,
Set<SessionPropertiesProvider> sessionPropertiesProviders)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
this.connectorTableFunctions = requireNonNull(connectorTableFunctions, "connectorTableFunctions is null");
this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
.flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
Expand Down Expand Up @@ -89,6 +93,12 @@ public ConnectorPageSourceProvider getPageSourceProvider()
return pageSourceProvider;
}

@Override
public ConnectorPageSinkProvider getPageSinkProvider()
{
// Page sink used for write paths (INSERT / CTAS); injected via the
// constructor and bound as a singleton in BigQueryConnectorModule.
return pageSinkProvider;
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected void setup(Binder binder)
binder.bind(BigQueryMetadata.class).in(Scopes.SINGLETON);
binder.bind(BigQuerySplitManager.class).in(Scopes.SINGLETON);
binder.bind(BigQueryPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(BigQueryPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(ViewMaterializationCache.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(BigQueryConfig.class);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.type.Type;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
 * Handle describing an in-progress INSERT into a BigQuery table.
 * Serialized with Jackson and shipped to workers, which use it to build
 * page sinks that stream rows into the target table.
 */
public class BigQueryInsertTableHandle
        implements ConnectorInsertTableHandle
{
    // Fully-qualified remote table that rows will be written into
    private final RemoteTableName remoteTableName;
    // Column names and their Trino types, aligned by index
    private final List<String> columnNames;
    private final List<Type> columnTypes;

    @JsonCreator
    public BigQueryInsertTableHandle(
            @JsonProperty("remoteTableName") RemoteTableName remoteTableName,
            @JsonProperty("columnNames") List<String> columnNames,
            @JsonProperty("columnTypes") List<Type> columnTypes)
    {
        this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
        this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
        this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
        checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must have the same size");
    }

    @JsonProperty
    public RemoteTableName getRemoteTableName()
    {
        return remoteTableName;
    }

    @JsonProperty
    public List<String> getColumnNames()
    {
        return columnNames;
    }

    @JsonProperty
    public List<Type> getColumnTypes()
    {
        return columnTypes;
    }

    // Handles surface in EXPLAIN and debug logs; identify the target table
    @Override
    public String toString()
    {
        return "bigquery:" + remoteTableName;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.plugin.bigquery.BigQueryClient.RemoteDatabaseObject;
import io.trino.plugin.bigquery.ptf.Query.QueryHandle;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnSchema;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableSchema;
Expand All @@ -48,6 +53,7 @@
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
Expand All @@ -58,11 +64,13 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;

import javax.inject.Inject;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -86,6 +94,7 @@
import static io.trino.plugin.bigquery.BigQueryTableHandle.getPartitionType;
import static io.trino.plugin.bigquery.BigQueryType.toField;
import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -388,9 +397,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
public void dropSchema(ConnectorSession session, String schemaName)
{
    // Resolve the Trino (lower-cased) schema name to the case-sensitive remote
    // dataset name before dropping; throws SchemaNotFoundException when absent.
    BigQueryClient client = bigQueryClientFactory.create(session);
    client.dropSchema(DatasetId.of(getRemoteSchemaName(client, getProjectId(client), schemaName)));
}

Expand All @@ -408,7 +415,13 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}
}

private void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
return createTable(session, tableMetadata);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also need a defensive check to verify query retries are not enabled?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. Thanks for catching that. Do you want to send the PR?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

private BigQueryOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
SchemaTableName schemaTableName = tableMetadata.getTable();
String schemaName = schemaTableName.getSchemaName();
Expand All @@ -418,16 +431,34 @@ private void createTable(ConnectorSession session, ConnectorTableMetadata tableM
throw new SchemaNotFoundException(schemaName);
}

List<Field> fields = tableMetadata.getColumns().stream()
.map(column -> toField(column.getName(), column.getType(), column.getComment()))
.collect(toImmutableList());
int columnSize = tableMetadata.getColumns().size();
ImmutableList.Builder<Field> fields = ImmutableList.builderWithExpectedSize(columnSize);
ImmutableList.Builder<String> columnsNames = ImmutableList.builderWithExpectedSize(columnSize);
ImmutableList.Builder<Type> columnsTypes = ImmutableList.builderWithExpectedSize(columnSize);
for (ColumnMetadata column : tableMetadata.getColumns()) {
fields.add(toField(column.getName(), column.getType(), column.getComment()));
columnsNames.add(column.getName());
columnsTypes.add(column.getType());
}

TableId tableId = TableId.of(schemaName, tableName);
TableDefinition tableDefinition = StandardTableDefinition.of(Schema.of(fields));
BigQueryClient client = bigQueryClientFactory.create(session);
String projectId = getProjectId(client);
String remoteSchemaName = getRemoteSchemaName(client, projectId, schemaName);

TableId tableId = TableId.of(projectId, remoteSchemaName, tableName);
TableDefinition tableDefinition = StandardTableDefinition.of(Schema.of(fields.build()));
TableInfo.Builder tableInfo = TableInfo.newBuilder(tableId, tableDefinition);
tableMetadata.getComment().ifPresent(tableInfo::setDescription);

bigQueryClientFactory.create(session).createTable(tableInfo.build());
client.createTable(tableInfo.build());

return new BigQueryOutputTableHandle(new RemoteTableName(tableId), columnsNames.build(), columnsTypes.build());
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
// Nothing to commit here: rows were already streamed to BigQuery by the
// page sinks, and the connector produces no output metadata.
return Optional.empty();
}

@Override
Expand All @@ -442,6 +473,32 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
client.dropTable(tableId);
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
    // Streaming inserts cannot be rolled back, so retried queries could
    // duplicate rows; reject retries up front.
    if (retryMode != RetryMode.NO_RETRIES) {
        throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
    }
    BigQueryTableHandle target = (BigQueryTableHandle) tableHandle;
    TableDefinition.Type tableType = TableDefinition.Type.valueOf(target.asPlainTable().getType());
    if (isWildcardTable(tableType, target.asPlainTable().getRemoteTableName().getTableName())) {
        throw new TrinoException(BIGQUERY_UNSUPPORTED_OPERATION, "This connector does not support inserting into wildcard tables");
    }
    // Capture names and Trino types of the inserted columns, aligned by index
    ImmutableList.Builder<String> insertColumnNames = ImmutableList.builderWithExpectedSize(columns.size());
    ImmutableList.Builder<Type> insertColumnTypes = ImmutableList.builderWithExpectedSize(columns.size());
    columns.forEach(columnHandle -> {
        BigQueryColumnHandle column = (BigQueryColumnHandle) columnHandle;
        insertColumnNames.add(column.getName());
        insertColumnTypes.add(column.getTrinoType());
    });
    return new BigQueryInsertTableHandle(target.asPlainTable().getRemoteTableName(), insertColumnNames.build(), insertColumnTypes.build());
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
// Rows were already streamed to BigQuery by the page sinks; no
// connector-specific commit step or output metadata is needed.
return Optional.empty();
}

@Override
public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection(
ConnectorSession session,
Expand Down Expand Up @@ -511,6 +568,13 @@ public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTable
return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles));
}

private String getRemoteSchemaName(BigQueryClient client, String projectId, String datasetName)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we should also call this in createSchema to avoid creating schemas in BigQuery which differ only in case. Separate and pre-existing issue however.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out this is not needed/the wrong thing to do. When creating schemas we already list them and the listing will include the lowercase name so Trino will see a schema already exists - I've added a test to verify this similar to what you added for DROP SCHEMA in #13812

{
return client.toRemoteDataset(projectId, datasetName)
.map(RemoteDatabaseObject::getOnlyRemoteName)
.orElseThrow(() -> new SchemaNotFoundException(datasetName));
}

private static boolean containSameElements(Iterable<? extends ColumnHandle> first, Iterable<? extends ColumnHandle> second)
{
return ImmutableSet.copyOf(first).equals(ImmutableSet.copyOf(second));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.type.Type;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
 * Handle describing an in-progress CREATE TABLE AS write to a BigQuery table.
 * Serialized with Jackson and shipped to workers, which use it to build
 * page sinks that stream rows into the newly created table.
 */
public class BigQueryOutputTableHandle
        implements ConnectorOutputTableHandle
{
    // Fully-qualified remote table that rows will be written into
    private final RemoteTableName remoteTableName;
    // Column names and their Trino types, aligned by index
    private final List<String> columnNames;
    private final List<Type> columnTypes;

    @JsonCreator
    public BigQueryOutputTableHandle(
            @JsonProperty("remoteTableName") RemoteTableName remoteTableName,
            @JsonProperty("columnNames") List<String> columnNames,
            @JsonProperty("columnTypes") List<Type> columnTypes)
    {
        this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
        this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
        this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
        checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must have the same size");
    }

    @JsonProperty
    public RemoteTableName getRemoteTableName()
    {
        return remoteTableName;
    }

    @JsonProperty
    public List<String> getColumnNames()
    {
        return columnNames;
    }

    @JsonProperty
    public List<Type> getColumnTypes()
    {
        return columnTypes;
    }

    // Handles surface in EXPLAIN and debug logs; identify the target table
    @Override
    public String toString()
    {
        return "bigquery:" + remoteTableName;
    }
}
Loading