Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/sphinx/admin/fault-tolerant-execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ depending on the desired :ref:`retry policy <fte-retry-policy>`.
* Fault tolerant execution of :ref:`write operations <sql-write-operations>`
is supported by the following connectors:

* :doc:`/connector/bigquery`
* :doc:`/connector/delta-lake`
* :doc:`/connector/hive`
* :doc:`/connector/iceberg`
Expand Down
8 changes: 8 additions & 0 deletions plugin/trino-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange-filesystem</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
Expand Down Expand Up @@ -430,6 +436,7 @@
<exclude>**/TestBigQueryMetadata.java</exclude>
<exclude>**/TestBigQueryInstanceCleaner.java</exclude>
<exclude>**/TestBigQueryCaseInsensitiveMapping.java</exclude>
<exclude>**/TestBigQuery*FailureRecoveryTest.java</exclude>
</excludes>
</configuration>
</plugin>
Expand All @@ -455,6 +462,7 @@
<include>**/TestBigQueryTypeMapping.java</include>
<include>**/TestBigQueryMetadata.java</include>
<include>**/TestBigQueryInstanceCleaner.java</include>
<include>**/TestBigQuery*FailureRecoveryTest.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,23 @@ public class BigQueryInsertTableHandle
private final RemoteTableName remoteTableName;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final String temporaryTableName;
private final String pageSinkIdColumnName;

@JsonCreator
public BigQueryInsertTableHandle(
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes)
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("temporaryTableName") String temporaryTableName,
@JsonProperty("pageSinkIdColumnName") String pageSinkIdColumnName)
{
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must have the same size");
this.temporaryTableName = requireNonNull(temporaryTableName, "temporaryTableName is null");
this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null");
}

@JsonProperty
Expand All @@ -60,4 +66,21 @@ public List<Type> getColumnTypes()
{
return columnTypes;
}

@JsonProperty
public String getTemporaryTableName()
{
return temporaryTableName;
}

public RemoteTableName getTemporaryRemoteTableName()
{
return new RemoteTableName(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), temporaryTableName);
}

@JsonProperty
public String getPageSinkIdColumnName()
{
return pageSinkIdColumnName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Schema;
Expand All @@ -30,6 +31,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.plugin.bigquery.BigQueryClient.RemoteDatabaseObject;
Expand Down Expand Up @@ -67,27 +69,33 @@
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;

import javax.inject.Inject;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.bigquery.BigQueryClient.buildColumnHandles;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_UNSUPPORTED_OPERATION;
import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE;
Expand All @@ -97,7 +105,7 @@
import static io.trino.plugin.bigquery.BigQueryType.toField;
import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
import static io.trino.plugin.bigquery.BigQueryUtil.quote;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.plugin.bigquery.BigQueryUtil.quoted;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand All @@ -107,6 +115,7 @@ public class BigQueryMetadata
implements ConnectorMetadata
{
private static final Logger log = Logger.get(BigQueryMetadata.class);
private static final Type TRINO_PAGE_SINK_ID_COLUMN_TYPE = BigintType.BIGINT;

static final int DEFAULT_NUMERIC_TYPE_PRECISION = 38;
static final int DEFAULT_NUMERIC_TYPE_SCALE = 9;
Expand Down Expand Up @@ -395,7 +404,7 @@ public void dropSchema(ConnectorSession session, String schemaName)
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
try {
createTable(session, tableMetadata);
createTable(session, tableMetadata, Optional.empty());
}
catch (BigQueryException e) {
if (ignoreExisting && e.getCode() == 409) {
Expand All @@ -408,13 +417,13 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
if (retryMode != RetryMode.NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}
return createTable(session, tableMetadata);
ColumnMetadata pageSinkIdColumn = buildPageSinkIdColumn(tableMetadata.getColumns().stream()
.map(ColumnMetadata::getName)
.collect(toImmutableList()));
return createTable(session, tableMetadata, Optional.of(pageSinkIdColumn));
}

private BigQueryOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
private BigQueryOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ColumnMetadata> pageSinkIdColumn)
{
SchemaTableName schemaTableName = tableMetadata.getTable();
String schemaName = schemaTableName.getSchemaName();
Expand All @@ -426,10 +435,13 @@ private BigQueryOutputTableHandle createTable(ConnectorSession session, Connecto

int columnSize = tableMetadata.getColumns().size();
ImmutableList.Builder<Field> fields = ImmutableList.builderWithExpectedSize(columnSize);
// Note: this list is only actually used when pageSinkIdColumn isPresent
ImmutableList.Builder<Field> tempFields = ImmutableList.builderWithExpectedSize(columnSize + 1);
ImmutableList.Builder<String> columnsNames = ImmutableList.builderWithExpectedSize(columnSize);
ImmutableList.Builder<Type> columnsTypes = ImmutableList.builderWithExpectedSize(columnSize);
for (ColumnMetadata column : tableMetadata.getColumns()) {
fields.add(toField(column.getName(), column.getType(), column.getComment()));
tempFields.add(toField(column.getName(), column.getType(), column.getComment()));
columnsNames.add(column.getName());
columnsTypes.add(column.getType());
}
Expand All @@ -438,20 +450,41 @@ private BigQueryOutputTableHandle createTable(ConnectorSession session, Connecto
String projectId = client.getProjectId();
String remoteSchemaName = getRemoteSchemaName(client, projectId, schemaName);

TableId tableId = TableId.of(projectId, remoteSchemaName, tableName);
TableDefinition tableDefinition = StandardTableDefinition.of(Schema.of(fields.build()));
TableId tableId = createTable(client, projectId, remoteSchemaName, tableName, fields.build(), tableMetadata.getComment());

Optional<String> temporaryTableName = pageSinkIdColumn.map(column -> {
tempFields.add(toField(column.getName(), column.getType(), column.getComment()));
String tempTableName = generateTemporaryTableName();
createTable(client, projectId, remoteSchemaName, tempTableName, tempFields.build(), tableMetadata.getComment());
return tempTableName;
});

return new BigQueryOutputTableHandle(
new RemoteTableName(tableId),
columnsNames.build(),
columnsTypes.build(),
temporaryTableName,
pageSinkIdColumn.map(ColumnMetadata::getName));
}

private TableId createTable(BigQueryClient client, String projectId, String datasetName, String tableName, List<Field> fields, Optional<String> tableComment)
{
TableId tableId = TableId.of(projectId, datasetName, tableName);
TableDefinition tableDefinition = StandardTableDefinition.of(Schema.of(fields));
TableInfo.Builder tableInfo = TableInfo.newBuilder(tableId, tableDefinition);
tableMetadata.getComment().ifPresent(tableInfo::setDescription);
tableComment.ifPresent(tableInfo::setDescription);

client.createTable(tableInfo.build());

return new BigQueryOutputTableHandle(new RemoteTableName(tableId), columnsNames.build(), columnsTypes.build());
return tableId;
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
BigQueryOutputTableHandle handle = (BigQueryOutputTableHandle) tableHandle;
checkState(handle.getTemporaryTableName().isPresent(), "Unexpected use of finishCreateTable without a temporaryTableName present");
return finishInsert(session, handle.getRemoteTableName(), handle.getTemporaryRemoteTableName().orElseThrow(), handle.getPageSinkIdColumnName().orElseThrow(), handle.getColumnNames(), fragments);
}

@Override
Expand Down Expand Up @@ -484,27 +517,98 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
if (retryMode != RetryMode.NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}
BigQueryTableHandle table = (BigQueryTableHandle) tableHandle;
if (isWildcardTable(TableDefinition.Type.valueOf(table.asPlainTable().getType()), table.asPlainTable().getRemoteTableName().getTableName())) {
throw new TrinoException(BIGQUERY_UNSUPPORTED_OPERATION, "This connector does not support inserting into wildcard tables");
}
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columns.size());
ImmutableList.Builder<Type> columnTypes = ImmutableList.builderWithExpectedSize(columns.size());
ImmutableList.Builder<Field> tempFields = ImmutableList.builderWithExpectedSize(columns.size() + 1);

for (ColumnHandle columnHandle : columns) {
BigQueryColumnHandle column = (BigQueryColumnHandle) columnHandle;
tempFields.add(toField(column.getName(), column.getTrinoType(), column.getColumnMetadata().getComment()));
columnNames.add(column.getName());
columnTypes.add(column.getTrinoType());
}
return new BigQueryInsertTableHandle(table.asPlainTable().getRemoteTableName(), columnNames.build(), columnTypes.build());
ColumnMetadata pageSinkIdColumn = buildPageSinkIdColumn(columnNames.build());
tempFields.add(toField(pageSinkIdColumn.getName(), pageSinkIdColumn.getType(), pageSinkIdColumn.getComment()));

BigQueryClient client = bigQueryClientFactory.create(session);
String projectId = table.asPlainTable().getRemoteTableName().getProjectId();
String schemaName = table.asPlainTable().getRemoteTableName().getDatasetName();

if (!schemaExists(session, schemaName)) {
throw new SchemaNotFoundException(schemaName);
}

String temporaryTableName = generateTemporaryTableName();
createTable(client, projectId, schemaName, temporaryTableName, tempFields.build(), Optional.empty());

return new BigQueryInsertTableHandle(
table.asPlainTable().getRemoteTableName(),
columnNames.build(),
columnTypes.build(),
temporaryTableName,
pageSinkIdColumn.getName());
}

private Optional<ConnectorOutputMetadata> finishInsert(
ConnectorSession session,
RemoteTableName targetTable,
RemoteTableName tempTable,
String pageSinkIdColumnName,
List<String> columnNames,
Collection<Slice> fragments)
{
Closer closer = Closer.create();
closer.register(() -> bigQueryClientFactory.create(session).dropTable(tempTable.toTableId()));

try {
BigQueryClient client = bigQueryClientFactory.create(session);

RemoteTableName pageSinkTable = new RemoteTableName(
targetTable.getProjectId(),
targetTable.getDatasetName(),
generateTemporaryTableName());
createTable(client, pageSinkTable.getProjectId(), pageSinkTable.getDatasetName(), pageSinkTable.getTableName(), ImmutableList.of(toField(pageSinkIdColumnName, TRINO_PAGE_SINK_ID_COLUMN_TYPE, null)), Optional.empty());
closer.register(() -> bigQueryClientFactory.create(session).dropTable(pageSinkTable.toTableId()));

InsertAllRequest.Builder batch = InsertAllRequest.newBuilder(pageSinkTable.toTableId());
fragments.forEach(slice -> batch.addRow(ImmutableMap.of(pageSinkIdColumnName, slice.getLong(0))));
client.insert(batch.build());

String columns = columnNames.stream().map(BigQueryUtil::quote).collect(Collectors.joining(", "));

String insertSql = format("INSERT INTO %s (%s) SELECT %s FROM %s temp_table " +
"WHERE EXISTS (SELECT 1 FROM %s page_sink_table WHERE page_sink_table.%s = temp_table.%s)",
quoted(targetTable),
columns,
columns,
quoted(tempTable),
quoted(pageSinkTable),
quote(pageSinkIdColumnName),
quote(pageSinkIdColumnName));

client.executeUpdate(QueryJobConfiguration.of(insertSql));
}
finally {
try {
closer.close();
}
catch (IOException e) {
throw new TrinoException(BIGQUERY_FAILED_TO_EXECUTE_QUERY, e);
}
}

return Optional.empty();
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
BigQueryInsertTableHandle handle = (BigQueryInsertTableHandle) insertHandle;
return finishInsert(session, handle.getRemoteTableName(), handle.getTemporaryRemoteTableName(), handle.getPageSinkIdColumnName(), handle.getColumnNames(), fragments);
}

@Override
Expand Down Expand Up @@ -677,4 +781,23 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
}
};
}

private static ColumnMetadata buildPageSinkIdColumn(List<String> otherColumnNames)
{
// While it's unlikely this column name will collide with client table columns,
// guarantee it will not by appending a deterministic suffix to it.
String baseColumnName = "trino_page_sink_id";
String columnName = baseColumnName;
int suffix = 1;
while (otherColumnNames.contains(columnName)) {
columnName = baseColumnName + "_" + suffix;
suffix++;
}
return new ColumnMetadata(columnName, TRINO_PAGE_SINK_ID_COLUMN_TYPE);
}

private static String generateTemporaryTableName()
{
return "tmp_trino_" + UUID.randomUUID().toString().replace("-", "");
}
}
Loading