diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst index b8f22c6a243b..6f8d4ca182f3 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -49,6 +49,7 @@ depending on the desired :ref:`retry policy `. * Fault tolerant execution of :ref:`write operations ` is supported by the following connectors: + * :doc:`/connector/bigquery` * :doc:`/connector/delta-lake` * :doc:`/connector/hive` * :doc:`/connector/iceberg` diff --git a/plugin/trino-bigquery/pom.xml b/plugin/trino-bigquery/pom.xml index 5b923291b20f..840a7d6860ed 100644 --- a/plugin/trino-bigquery/pom.xml +++ b/plugin/trino-bigquery/pom.xml @@ -295,6 +295,12 @@ test + + io.trino + trino-exchange-filesystem + test + + io.trino trino-main @@ -430,6 +436,7 @@ **/TestBigQueryMetadata.java **/TestBigQueryInstanceCleaner.java **/TestBigQueryCaseInsensitiveMapping.java + **/TestBigQuery*FailureRecoveryTest.java @@ -455,6 +462,7 @@ **/TestBigQueryTypeMapping.java **/TestBigQueryMetadata.java **/TestBigQueryInstanceCleaner.java + **/TestBigQuery*FailureRecoveryTest.java diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryInsertTableHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryInsertTableHandle.java index 697b357a1c0d..bcd66ec29aeb 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryInsertTableHandle.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryInsertTableHandle.java @@ -30,17 +30,23 @@ public class BigQueryInsertTableHandle private final RemoteTableName remoteTableName; private final List columnNames; private final List columnTypes; + private final String temporaryTableName; + private final String pageSinkIdColumnName; @JsonCreator public BigQueryInsertTableHandle( @JsonProperty("remoteTableName") RemoteTableName remoteTableName, 
@JsonProperty("columnNames") List columnNames, - @JsonProperty("columnTypes") List columnTypes) + @JsonProperty("columnTypes") List columnTypes, + @JsonProperty("temporaryTableName") String temporaryTableName, + @JsonProperty("pageSinkIdColumnName") String pageSinkIdColumnName) { this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null"); this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null")); checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must have the same size"); + this.temporaryTableName = requireNonNull(temporaryTableName, "temporaryTableName is null"); + this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null"); } @JsonProperty @@ -60,4 +66,21 @@ public List getColumnTypes() { return columnTypes; } + + @JsonProperty + public String getTemporaryTableName() + { + return temporaryTableName; + } + + public RemoteTableName getTemporaryRemoteTableName() + { + return new RemoteTableName(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), temporaryTableName); + } + + @JsonProperty + public String getPageSinkIdColumnName() + { + return pageSinkIdColumnName; + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index a920f8132a15..0a2253d5327f 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -17,6 +17,7 @@ import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.QueryJobConfiguration; import 
com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; @@ -30,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Streams; +import com.google.common.io.Closer; import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.trino.plugin.bigquery.BigQueryClient.RemoteDatabaseObject; @@ -67,17 +69,21 @@ import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; +import io.trino.spi.type.BigintType; import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; import javax.inject.Inject; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL; @@ -85,9 +91,11 @@ import static com.google.cloud.bigquery.TableDefinition.Type.TABLE; import static com.google.cloud.bigquery.TableDefinition.Type.VIEW; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.plugin.bigquery.BigQueryClient.buildColumnHandles; +import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_UNSUPPORTED_OPERATION; import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE; @@ -97,7 +105,7 @@ import static io.trino.plugin.bigquery.BigQueryType.toField; import 
static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable; import static io.trino.plugin.bigquery.BigQueryUtil.quote; -import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.plugin.bigquery.BigQueryUtil.quoted; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -107,6 +115,7 @@ public class BigQueryMetadata implements ConnectorMetadata { private static final Logger log = Logger.get(BigQueryMetadata.class); + private static final Type TRINO_PAGE_SINK_ID_COLUMN_TYPE = BigintType.BIGINT; static final int DEFAULT_NUMERIC_TYPE_PRECISION = 38; static final int DEFAULT_NUMERIC_TYPE_SCALE = 9; @@ -395,7 +404,7 @@ public void dropSchema(ConnectorSession session, String schemaName) public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) { try { - createTable(session, tableMetadata); + createTable(session, tableMetadata, Optional.empty()); } catch (BigQueryException e) { if (ignoreExisting && e.getCode() == 409) { @@ -408,13 +417,13 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe @Override public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) { - if (retryMode != RetryMode.NO_RETRIES) { - throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries"); - } - return createTable(session, tableMetadata); + ColumnMetadata pageSinkIdColumn = buildPageSinkIdColumn(tableMetadata.getColumns().stream() + .map(ColumnMetadata::getName) + .collect(toImmutableList())); + return createTable(session, tableMetadata, Optional.of(pageSinkIdColumn)); } - private BigQueryOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + private BigQueryOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, 
Optional pageSinkIdColumn) { SchemaTableName schemaTableName = tableMetadata.getTable(); String schemaName = schemaTableName.getSchemaName(); @@ -426,10 +435,13 @@ private BigQueryOutputTableHandle createTable(ConnectorSession session, Connecto int columnSize = tableMetadata.getColumns().size(); ImmutableList.Builder fields = ImmutableList.builderWithExpectedSize(columnSize); + // Note: this list is only actually used when pageSinkIdColumn isPresent + ImmutableList.Builder tempFields = ImmutableList.builderWithExpectedSize(columnSize + 1); ImmutableList.Builder columnsNames = ImmutableList.builderWithExpectedSize(columnSize); ImmutableList.Builder columnsTypes = ImmutableList.builderWithExpectedSize(columnSize); for (ColumnMetadata column : tableMetadata.getColumns()) { fields.add(toField(column.getName(), column.getType(), column.getComment())); + tempFields.add(toField(column.getName(), column.getType(), column.getComment())); columnsNames.add(column.getName()); columnsTypes.add(column.getType()); } @@ -438,20 +450,41 @@ private BigQueryOutputTableHandle createTable(ConnectorSession session, Connecto String projectId = client.getProjectId(); String remoteSchemaName = getRemoteSchemaName(client, projectId, schemaName); - TableId tableId = TableId.of(projectId, remoteSchemaName, tableName); - TableDefinition tableDefinition = StandardTableDefinition.of(Schema.of(fields.build())); + TableId tableId = createTable(client, projectId, remoteSchemaName, tableName, fields.build(), tableMetadata.getComment()); + + Optional temporaryTableName = pageSinkIdColumn.map(column -> { + tempFields.add(toField(column.getName(), column.getType(), column.getComment())); + String tempTableName = generateTemporaryTableName(); + createTable(client, projectId, remoteSchemaName, tempTableName, tempFields.build(), tableMetadata.getComment()); + return tempTableName; + }); + + return new BigQueryOutputTableHandle( + new RemoteTableName(tableId), + columnsNames.build(), + 
columnsTypes.build(), + temporaryTableName, + pageSinkIdColumn.map(ColumnMetadata::getName)); + } + + private TableId createTable(BigQueryClient client, String projectId, String datasetName, String tableName, List fields, Optional tableComment) + { + TableId tableId = TableId.of(projectId, datasetName, tableName); + TableDefinition tableDefinition = StandardTableDefinition.of(Schema.of(fields)); TableInfo.Builder tableInfo = TableInfo.newBuilder(tableId, tableDefinition); - tableMetadata.getComment().ifPresent(tableInfo::setDescription); + tableComment.ifPresent(tableInfo::setDescription); client.createTable(tableInfo.build()); - return new BigQueryOutputTableHandle(new RemoteTableName(tableId), columnsNames.build(), columnsTypes.build()); + return tableId; } @Override public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) { - return Optional.empty(); + BigQueryOutputTableHandle handle = (BigQueryOutputTableHandle) tableHandle; + checkState(handle.getTemporaryTableName().isPresent(), "Unexpected use of finishCreateTable without a temporaryTableName present"); + return finishInsert(session, handle.getRemoteTableName(), handle.getTemporaryRemoteTableName().orElseThrow(), handle.getPageSinkIdColumnName().orElseThrow(), handle.getColumnNames(), fragments); } @Override @@ -484,27 +517,98 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa @Override public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns, RetryMode retryMode) { - if (retryMode != RetryMode.NO_RETRIES) { - throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries"); - } BigQueryTableHandle table = (BigQueryTableHandle) tableHandle; if (isWildcardTable(TableDefinition.Type.valueOf(table.asPlainTable().getType()), table.asPlainTable().getRemoteTableName().getTableName())) { throw new 
TrinoException(BIGQUERY_UNSUPPORTED_OPERATION, "This connector does not support inserting into wildcard tables"); } ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(columns.size()); ImmutableList.Builder columnTypes = ImmutableList.builderWithExpectedSize(columns.size()); + ImmutableList.Builder tempFields = ImmutableList.builderWithExpectedSize(columns.size() + 1); + for (ColumnHandle columnHandle : columns) { BigQueryColumnHandle column = (BigQueryColumnHandle) columnHandle; + tempFields.add(toField(column.getName(), column.getTrinoType(), column.getColumnMetadata().getComment())); columnNames.add(column.getName()); columnTypes.add(column.getTrinoType()); } - return new BigQueryInsertTableHandle(table.asPlainTable().getRemoteTableName(), columnNames.build(), columnTypes.build()); + ColumnMetadata pageSinkIdColumn = buildPageSinkIdColumn(columnNames.build()); + tempFields.add(toField(pageSinkIdColumn.getName(), pageSinkIdColumn.getType(), pageSinkIdColumn.getComment())); + + BigQueryClient client = bigQueryClientFactory.create(session); + String projectId = table.asPlainTable().getRemoteTableName().getProjectId(); + String schemaName = table.asPlainTable().getRemoteTableName().getDatasetName(); + + if (!schemaExists(session, schemaName)) { + throw new SchemaNotFoundException(schemaName); + } + + String temporaryTableName = generateTemporaryTableName(); + createTable(client, projectId, schemaName, temporaryTableName, tempFields.build(), Optional.empty()); + + return new BigQueryInsertTableHandle( + table.asPlainTable().getRemoteTableName(), + columnNames.build(), + columnTypes.build(), + temporaryTableName, + pageSinkIdColumn.getName()); + } + + private Optional finishInsert( + ConnectorSession session, + RemoteTableName targetTable, + RemoteTableName tempTable, + String pageSinkIdColumnName, + List columnNames, + Collection fragments) + { + Closer closer = Closer.create(); + closer.register(() -> 
bigQueryClientFactory.create(session).dropTable(tempTable.toTableId())); + + try { + BigQueryClient client = bigQueryClientFactory.create(session); + + RemoteTableName pageSinkTable = new RemoteTableName( + targetTable.getProjectId(), + targetTable.getDatasetName(), + generateTemporaryTableName()); + createTable(client, pageSinkTable.getProjectId(), pageSinkTable.getDatasetName(), pageSinkTable.getTableName(), ImmutableList.of(toField(pageSinkIdColumnName, TRINO_PAGE_SINK_ID_COLUMN_TYPE, null)), Optional.empty()); + closer.register(() -> bigQueryClientFactory.create(session).dropTable(pageSinkTable.toTableId())); + + InsertAllRequest.Builder batch = InsertAllRequest.newBuilder(pageSinkTable.toTableId()); + fragments.forEach(slice -> batch.addRow(ImmutableMap.of(pageSinkIdColumnName, slice.getLong(0)))); + client.insert(batch.build()); + + String columns = columnNames.stream().map(BigQueryUtil::quote).collect(Collectors.joining(", ")); + + String insertSql = format("INSERT INTO %s (%s) SELECT %s FROM %s temp_table " + + "WHERE EXISTS (SELECT 1 FROM %s page_sink_table WHERE page_sink_table.%s = temp_table.%s)", + quoted(targetTable), + columns, + columns, + quoted(tempTable), + quoted(pageSinkTable), + quote(pageSinkIdColumnName), + quote(pageSinkIdColumnName)); + + client.executeUpdate(QueryJobConfiguration.of(insertSql)); + } + finally { + try { + closer.close(); + } + catch (IOException e) { + throw new TrinoException(BIGQUERY_FAILED_TO_EXECUTE_QUERY, e); + } + } + + return Optional.empty(); } @Override public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) { - return Optional.empty(); + BigQueryInsertTableHandle handle = (BigQueryInsertTableHandle) insertHandle; + return finishInsert(session, handle.getRemoteTableName(), handle.getTemporaryRemoteTableName(), handle.getPageSinkIdColumnName(), handle.getColumnNames(), fragments); } @Override @@ -677,4 +781,23 @@ public 
RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect } }; } + + private static ColumnMetadata buildPageSinkIdColumn(List otherColumnNames) + { + // While it's unlikely this column name will collide with client table columns, + // guarantee it will not by appending a deterministic suffix to it. + String baseColumnName = "trino_page_sink_id"; + String columnName = baseColumnName; + int suffix = 1; + while (otherColumnNames.contains(columnName)) { + columnName = baseColumnName + "_" + suffix; + suffix++; + } + return new ColumnMetadata(columnName, TRINO_PAGE_SINK_ID_COLUMN_TYPE); + } + + private static String generateTemporaryTableName() + { + return "tmp_trino_" + UUID.randomUUID().toString().replace("-", ""); + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOutputTableHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOutputTableHandle.java index 5e13c87dc548..cd5b448b34d2 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOutputTableHandle.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOutputTableHandle.java @@ -20,6 +20,7 @@ import io.trino.spi.type.Type; import java.util.List; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -30,17 +31,25 @@ public class BigQueryOutputTableHandle private final RemoteTableName remoteTableName; private final List columnNames; private final List columnTypes; + private final Optional temporaryTableName; + private final Optional pageSinkIdColumnName; @JsonCreator public BigQueryOutputTableHandle( @JsonProperty("remoteTableName") RemoteTableName remoteTableName, @JsonProperty("columnNames") List columnNames, - @JsonProperty("columnTypes") List columnTypes) + @JsonProperty("columnTypes") List columnTypes, + @JsonProperty("temporaryTableName") Optional temporaryTableName, + 
@JsonProperty("pageSinkIdColumnName") Optional pageSinkIdColumnName) { this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null"); this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null")); checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must have the same size"); + this.temporaryTableName = requireNonNull(temporaryTableName, "temporaryTableName is null"); + this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null"); + checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(), + "temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent"); } @JsonProperty @@ -60,4 +69,21 @@ public List getColumnTypes() { return columnTypes; } + + @JsonProperty + public Optional getTemporaryTableName() + { + return temporaryTableName; + } + + public Optional getTemporaryRemoteTableName() + { + return temporaryTableName.map(tableName -> new RemoteTableName(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), tableName)); + } + + @JsonProperty + public Optional getPageSinkIdColumnName() + { + return pageSinkIdColumnName; + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java index 550c624badd2..4b9f9c126d66 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java @@ -17,14 +17,17 @@ import com.google.cloud.bigquery.TableId; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; +import io.airlift.slice.Slices; import io.trino.spi.Page; import io.trino.spi.connector.ConnectorPageSink; +import io.trino.spi.connector.ConnectorPageSinkId; 
import io.trino.spi.type.Type; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static com.google.common.base.Preconditions.checkArgument; @@ -39,15 +42,31 @@ public class BigQueryPageSink private final TableId tableId; private final List columnNames; private final List columnTypes; + private final ConnectorPageSinkId pageSinkId; + private final Optional pageSinkIdColumnName; - public BigQueryPageSink(BigQueryClient client, RemoteTableName remoteTableName, List columnNames, List columnTypes) + public BigQueryPageSink( + BigQueryClient client, + RemoteTableName remoteTableName, + List columnNames, + List columnTypes, + ConnectorPageSinkId pageSinkId, + Optional temporaryTableName, + Optional pageSinkIdColumnName) { this.client = requireNonNull(client, "client is null"); requireNonNull(remoteTableName, "remoteTableName is null"); this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null")); checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must have the same size"); - this.tableId = remoteTableName.toTableId(); + this.pageSinkId = requireNonNull(pageSinkId, "pageSinkId is null"); + requireNonNull(temporaryTableName, "temporaryTableName is null"); + this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null"); + checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(), + "temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent"); + this.tableId = temporaryTableName + .map(tableName -> TableId.of(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), tableName)) + .orElseGet(remoteTableName::toTableId); } @Override @@ -56,6 +75,7 @@ public CompletableFuture appendPage(Page page) 
InsertAllRequest.Builder batch = InsertAllRequest.newBuilder(tableId); for (int position = 0; position < page.getPositionCount(); position++) { Map row = new HashMap<>(); + pageSinkIdColumnName.ifPresent(column -> row.put(column, pageSinkId.getId())); for (int channel = 0; channel < page.getChannelCount(); channel++) { row.put(columnNames.get(channel), readNativeValue(columnTypes.get(channel), page.getBlock(channel), position)); } @@ -69,7 +89,7 @@ public CompletableFuture appendPage(Page page) @Override public CompletableFuture> finish() { - return completedFuture(ImmutableList.of()); + return completedFuture(ImmutableList.of(Slices.wrappedLongArray(pageSinkId.getId()))); } @Override diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java index f8e15d2f03db..8d7546f83636 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java @@ -23,6 +23,8 @@ import javax.inject.Inject; +import java.util.Optional; + import static java.util.Objects.requireNonNull; public class BigQueryPageSinkProvider @@ -40,13 +42,27 @@ public BigQueryPageSinkProvider(BigQueryClientFactory clientFactory) public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId) { BigQueryOutputTableHandle handle = (BigQueryOutputTableHandle) outputTableHandle; - return new BigQueryPageSink(clientFactory.createBigQueryClient(session), handle.getRemoteTableName(), handle.getColumnNames(), handle.getColumnTypes()); + return new BigQueryPageSink( + clientFactory.createBigQueryClient(session), + handle.getRemoteTableName(), + handle.getColumnNames(), + handle.getColumnTypes(), + pageSinkId, + 
handle.getTemporaryTableName(), + handle.getPageSinkIdColumnName()); } @Override public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId) { BigQueryInsertTableHandle handle = (BigQueryInsertTableHandle) insertTableHandle; - return new BigQueryPageSink(clientFactory.createBigQueryClient(session), handle.getRemoteTableName(), handle.getColumnNames(), handle.getColumnTypes()); + return new BigQueryPageSink( + clientFactory.createBigQueryClient(session), + handle.getRemoteTableName(), + handle.getColumnNames(), + handle.getColumnTypes(), + pageSinkId, + Optional.of(handle.getTemporaryTableName()), + Optional.of(handle.getPageSinkIdColumnName())); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryUtil.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryUtil.java index 7b22e06514ef..73260da9649c 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryUtil.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryUtil.java @@ -27,6 +27,7 @@ import static com.google.cloud.bigquery.TableDefinition.Type.TABLE; import static com.google.cloud.http.BaseHttpServiceException.UNKNOWN_CODE; import static com.google.common.base.Throwables.getCausalChain; +import static java.lang.String.format; public final class BigQueryUtil { @@ -82,4 +83,9 @@ public static String quote(String name) { return QUOTE + name.replace(QUOTE, ESCAPED_QUOTE) + QUOTE; } + + public static String quoted(RemoteTableName table) + { + return format("%s.%s.%s", quote(table.getProjectId()), quote(table.getDatasetName()), quote(table.getTableName())); + } } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryFailureRecoveryTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryFailureRecoveryTest.java new file mode 
100644 index 000000000000..6f078afe32c2 --- /dev/null +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryFailureRecoveryTest.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.bigquery; + +import com.google.common.collect.ImmutableMap; +import io.trino.operator.RetryPolicy; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.testing.BaseFailureRecoveryTest; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testng.SkipException; + +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public abstract class BaseBigQueryFailureRecoveryTest + extends BaseFailureRecoveryTest +{ + public BaseBigQueryFailureRecoveryTest(RetryPolicy retryPolicy) + { + super(retryPolicy); + } + + @Override + protected QueryRunner createQueryRunner( + List> requiredTpchTables, + Map configProperties, + Map coordinatorProperties) + throws Exception + { + return BigQueryQueryRunner.createQueryRunner( + configProperties, + coordinatorProperties, + ImmutableMap.of(), + requiredTpchTables, + runner -> { + runner.installPlugin(new FileSystemExchangePlugin()); + runner.loadExchangeManager("filesystem", ImmutableMap.builder() + .put("exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager") + .buildOrThrow()); + }); + } + + @Override + 
protected boolean areWriteRetriesSupported() + { + return true; + } + + @Override + protected void testAnalyzeTable() + { + assertThatThrownBy(super::testAnalyzeTable).hasMessageMatching("This connector does not support analyze"); + throw new SkipException("skipped"); + } + + @Override + protected void testDelete() + { + assertThatThrownBy(super::testDelete).hasMessageContaining("This connector does not support modifying table rows"); + throw new SkipException("skipped"); + } + + @Override + protected void testDeleteWithSubquery() + { + assertThatThrownBy(super::testDeleteWithSubquery).hasMessageContaining("This connector does not support modifying table rows"); + throw new SkipException("skipped"); + } + + @Override + protected void testMerge() + { + assertThatThrownBy(super::testMerge).hasMessageContaining("This connector does not support modifying table rows"); + throw new SkipException("skipped"); + } + + @Override + protected void testRefreshMaterializedView() + { + assertThatThrownBy(super::testRefreshMaterializedView) + .hasMessageContaining("This connector does not support creating materialized views"); + throw new SkipException("skipped"); + } + + @Override + protected void testUpdate() + { + assertThatThrownBy(super::testUpdate).hasMessageContaining("This connector does not support modifying table rows"); + throw new SkipException("skipped"); + } + + @Override + protected void testUpdateWithSubquery() + { + assertThatThrownBy(super::testUpdateWithSubquery).hasMessageContaining("This connector does not support modifying table rows"); + throw new SkipException("skipped"); + } +} diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryQueryRunner.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryQueryRunner.java index e490db5d78cc..32a2e4b72d29 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryQueryRunner.java +++ 
b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryQueryRunner.java @@ -30,6 +30,7 @@ import io.trino.Session; import io.trino.plugin.tpch.TpchPlugin; import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; import io.trino.testing.sql.SqlExecutor; import io.trino.tpch.TpchTable; import org.intellij.lang.annotations.Language; @@ -42,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import static com.google.cloud.bigquery.BigQuery.DatasetDeleteOption.deleteContents; import static com.google.cloud.bigquery.BigQuery.DatasetListOption.labelFilter; @@ -65,11 +67,24 @@ public static DistributedQueryRunner createQueryRunner( Map connectorProperties, Iterable> tables) throws Exception + { + return createQueryRunner(extraProperties, ImmutableMap.of(), connectorProperties, tables, runner -> {}); + } + + public static DistributedQueryRunner createQueryRunner( + Map extraProperties, + Map coordinatorProperties, + Map connectorProperties, + Iterable> tables, + Consumer moreSetup) + throws Exception { DistributedQueryRunner queryRunner = null; try { queryRunner = DistributedQueryRunner.builder(createSession()) .setExtraProperties(extraProperties) + .setCoordinatorProperties(coordinatorProperties) + .setAdditionalSetup(moreSetup) .build(); queryRunner.installPlugin(new TpchPlugin()); diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryQueryFailureRecoveryTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryQueryFailureRecoveryTest.java new file mode 100644 index 000000000000..e1a05be4c71d --- /dev/null +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryQueryFailureRecoveryTest.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.bigquery; + +import io.trino.operator.RetryPolicy; + +public class TestBigQueryQueryFailureRecoveryTest + extends BaseBigQueryFailureRecoveryTest +{ + public TestBigQueryQueryFailureRecoveryTest() + { + super(RetryPolicy.QUERY); + } +} diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryTaskFailureRecoveryTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryTaskFailureRecoveryTest.java new file mode 100644 index 000000000000..4456ccb6a6da --- /dev/null +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryTaskFailureRecoveryTest.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.bigquery; + +import io.trino.operator.RetryPolicy; + +public class TestBigQueryTaskFailureRecoveryTest + extends BaseBigQueryFailureRecoveryTest +{ + public TestBigQueryTaskFailureRecoveryTest() + { + super(RetryPolicy.TASK); + } +}