BigQueryClient.java
@@ -19,8 +19,6 @@
 import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.DatasetId;
 import com.google.cloud.bigquery.DatasetInfo;
-import com.google.cloud.bigquery.InsertAllRequest;
-import com.google.cloud.bigquery.InsertAllResponse;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.JobStatistics;
@@ -68,7 +66,6 @@
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static com.google.common.collect.Streams.stream;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
-import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_TABLE_ERROR;
@@ -384,15 +381,6 @@ public String selectSql(TableId table, String formattedColumns)
         return format("SELECT %s FROM `%s`", formattedColumns, tableName);
     }
 
-    public void insert(InsertAllRequest insertAllRequest)
-    {
-        InsertAllResponse response = bigQuery.insertAll(insertAllRequest);
-        if (response.hasErrors()) {
-            // Note that BigQuery doesn't rollback inserted rows
-            throw new TrinoException(BIGQUERY_FAILED_TO_EXECUTE_QUERY, format("Failed to insert rows: %s", response.getInsertErrors()));
-        }
-    }
-
     private static String fullTableName(TableId remoteTableId)
     {
         String remoteSchemaName = remoteTableId.getDataset();
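
Context for the removal above: BigQuery.insertAll is the legacy streaming-insert API. Failures come back per row, and rows that already landed are never rolled back, which is why the deleted method could only throw after the fact. For contrast with the Storage Write API code in the next file, here is a minimal sketch of the retired pattern; the dataset, table, and column names are placeholders, not values from this PR:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableMap;

// Hedged sketch of the retired pattern: legacy streaming insert via tabledata.insertAll.
public class LegacyInsertAllSketch
{
    public static void main(String[] args)
    {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        InsertAllRequest request = InsertAllRequest.newBuilder(TableId.of("my_dataset", "my_table"))
                .addRow(ImmutableMap.of("page_sink_id", 42L))
                .build();
        InsertAllResponse response = bigQuery.insertAll(request);
        if (response.hasErrors()) {
            // Failures are reported per row index; rows that succeeded stay committed,
            // so there is no rollback to fall back on.
            response.getInsertErrors().forEach((row, errors) -> System.err.println(row + ": " + errors));
        }
    }
}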

BigQueryMetadata.java
@@ -13,11 +13,11 @@
  */
 package io.trino.plugin.bigquery;
 
+import com.google.api.core.ApiFuture;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.DatasetId;
 import com.google.cloud.bigquery.DatasetInfo;
 import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.InsertAllRequest;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.QueryParameterValue;
 import com.google.cloud.bigquery.Schema;
@@ -26,6 +26,12 @@
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.ViewDefinition;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -77,6 +83,7 @@
 import io.trino.spi.type.BigintType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VarcharType;
+import org.json.JSONArray;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -95,12 +102,14 @@
 import java.util.stream.Stream;
 
 import static com.google.cloud.bigquery.StandardSQLTypeName.INT64;
+import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.util.concurrent.Futures.allAsList;
 import static io.trino.plugin.base.TemporaryTables.generateTemporaryTableName;
+import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_TABLE_ERROR;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_UNSUPPORTED_OPERATION;
@@ -129,18 +138,21 @@ public class BigQueryMetadata
     private static final String VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX = "$view_definition";
 
     private final BigQueryClientFactory bigQueryClientFactory;
+    private final BigQueryWriteClientFactory writeClientFactory;
     private final BigQueryTypeManager typeManager;
     private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();
     private final ListeningExecutorService executorService;
     private final boolean isLegacyMetadataListing;
 
     public BigQueryMetadata(
             BigQueryClientFactory bigQueryClientFactory,
+            BigQueryWriteClientFactory writeClientFactory,
             BigQueryTypeManager typeManager,
             ListeningExecutorService executorService,
             boolean isLegacyMetadataListing)
     {
         this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory is null");
+        this.writeClientFactory = requireNonNull(writeClientFactory, "writeClientFactory is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.executorService = requireNonNull(executorService, "executorService is null");
         this.isLegacyMetadataListing = isLegacyMetadataListing;
@@ -674,9 +686,7 @@ private Optional<ConnectorOutputMetadata> finishInsert(
         createTable(client, pageSinkTable.projectId(), pageSinkTable.datasetName(), pageSinkTable.tableName(), ImmutableList.of(typeManager.toField(pageSinkIdColumnName, TRINO_PAGE_SINK_ID_COLUMN_TYPE, null)), Optional.empty());
         closer.register(() -> bigQueryClientFactory.create(session).dropTable(pageSinkTable.toTableId()));
 
-        InsertAllRequest.Builder batch = InsertAllRequest.newBuilder(pageSinkTable.toTableId());
-        fragments.forEach(slice -> batch.addRow(ImmutableMap.of(pageSinkIdColumnName, slice.getLong(0))));
-        client.insert(batch.build());
+        insertIntoSinkTable(session, pageSinkTable, pageSinkIdColumnName, fragments);
 
         String columns = columnNames.stream().map(BigQueryUtil::quote).collect(Collectors.joining(", "));
 
@@ -704,6 +714,29 @@ private Optional<ConnectorOutputMetadata> finishInsert(
         return Optional.empty();
     }
 
+    private void insertIntoSinkTable(ConnectorSession session, RemoteTableName pageSinkTable, String pageSinkIdColumnName, Collection<Slice> fragments)
+    {
+        try (BigQueryWriteClient writeClient = writeClientFactory.create(session)) {
+            CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
+                    .setParent(TableName.of(pageSinkTable.projectId(), pageSinkTable.datasetName(), pageSinkTable.tableName()).toString())
+                    .setWriteStream(WriteStream.newBuilder().setType(COMMITTED).build())
+                    .build();
+            WriteStream stream = writeClient.createWriteStream(createWriteStreamRequest);
+            JSONArray batch = new JSONArray();
+            fragments.forEach(slice -> batch.put(ImmutableMap.of(pageSinkIdColumnName, slice.getLong(0))));
+            try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema(), writeClient).build()) {
+                ApiFuture<AppendRowsResponse> future = writer.append(batch);
+                AppendRowsResponse response = future.get();
+                if (response.hasError()) {
+                    throw new TrinoException(BIGQUERY_BAD_WRITE, format("Response has error: %s", response.getError().getMessage()));
+                }
+            }
+            catch (Exception e) {
+                throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to insert rows", e);
+            }
+        }
+    }
+
     @Override
     public Optional<ConnectorOutputMetadata> finishInsert(
             ConnectorSession session,
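
For readers less familiar with the Storage Write API adopted above, the following is a minimal, self-contained sketch of the same COMMITTED-stream pattern outside of Trino. The project, dataset, table, and column names are placeholders, and error handling is reduced to the essentials:

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.json.JSONArray;
import org.json.JSONObject;

// Minimal sketch of the COMMITTED-stream write pattern used by insertIntoSinkTable above.
public class StorageWriteApiSketch
{
    public static void main(String[] args)
            throws Exception
    {
        try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
            // A COMMITTED stream makes rows visible as soon as each append succeeds;
            // there is no pending/finalize phase and, as with insertAll, no rollback.
            WriteStream stream = client.createWriteStream(CreateWriteStreamRequest.newBuilder()
                    .setParent(TableName.of("my-project", "my_dataset", "my_table").toString())
                    .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
                    .build());
            try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema(), client).build()) {
                JSONArray batch = new JSONArray();
                batch.put(new JSONObject().put("page_sink_id", 42L));
                ApiFuture<AppendRowsResponse> future = writer.append(batch);
                // Block for the result; a failed append surfaces either as an error
                // payload on the response or as an ExecutionException from get().
                AppendRowsResponse response = future.get();
                if (response.hasError()) {
                    throw new IllegalStateException(response.getError().getMessage());
                }
            }
        }
    }
}

The COMMITTED stream type is a reasonable match for the semantics being replaced: rows become visible immediately and are never rolled back, which is acceptable here because the page-sink table being written is temporary and is dropped by the closer registered in finishInsert.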

DefaultBigQueryMetadataFactory.java
@@ -22,18 +22,21 @@ public class DefaultBigQueryMetadataFactory
         implements BigQueryMetadataFactory
 {
     private final BigQueryClientFactory bigQueryClient;
+    private final BigQueryWriteClientFactory writeClientFactory;
     private final ListeningExecutorService executorService;
     private final BigQueryTypeManager typeManager;
     private final boolean isLegacyMetadataListing;
 
     @Inject
     public DefaultBigQueryMetadataFactory(
             BigQueryClientFactory bigQueryClient,
+            BigQueryWriteClientFactory writeClientFactory,
             BigQueryTypeManager typeManager,
             ListeningExecutorService executorService,
             BigQueryConfig config)
     {
         this.bigQueryClient = requireNonNull(bigQueryClient, "bigQueryClient is null");
+        this.writeClientFactory = requireNonNull(writeClientFactory, "writeClientFactory is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.executorService = requireNonNull(executorService, "executorService is null");
         this.isLegacyMetadataListing = config.isLegacyMetadataListing();
@@ -42,6 +45,6 @@ public DefaultBigQueryMetadataFactory(
     @Override
     public BigQueryMetadata create(BigQueryTransactionHandle transaction)
     {
-        return new BigQueryMetadata(bigQueryClient, typeManager, executorService, isLegacyMetadataListing);
+        return new BigQueryMetadata(bigQueryClient, writeClientFactory, typeManager, executorService, isLegacyMetadataListing);
     }
 }
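
One piece this diff references but never shows is BigQueryWriteClientFactory. Purely as a hedged sketch of the shape such a factory presumably has: the class name suffix, method body, and error handling below are assumptions rather than the PR's code, and the real factory likely derives credentials and endpoint settings from the session and BigQueryConfig.

import java.io.IOException;
import java.io.UncheckedIOException;

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import io.trino.spi.connector.ConnectorSession;

// Hypothetical sketch only; the PR's actual BigQueryWriteClientFactory is not part of this diff.
public class BigQueryWriteClientFactorySketch
{
    public BigQueryWriteClient create(ConnectorSession session)
    {
        try {
            // The real implementation presumably applies session-scoped credentials
            // and connector configuration before building the client.
            return BigQueryWriteClient.create();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to create BigQuery write client", e);
        }
    }
}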