Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public BigQueryClient(
this.configProjectId = requireNonNull(configProjectId, "projectId is null");
}

/**
 * Resolves the remote (BigQuery-side) dataset for the given {@link DatasetId}.
 * Convenience overload that unpacks the id's project and dataset components and
 * delegates to {@code toRemoteDataset(String, String)}.
 *
 * @param datasetId composite key (project + dataset) identifying the dataset
 * @return the remote dataset, or {@code Optional.empty()} if it does not exist
 */
public Optional<RemoteDatabaseObject> toRemoteDataset(DatasetId datasetId)
{
    return toRemoteDataset(datasetId.getProject(), datasetId.getDataset());
}

public Optional<RemoteDatabaseObject> toRemoteDataset(String projectId, String datasetName)
{
requireNonNull(projectId, "projectId is null");
Expand Down Expand Up @@ -221,6 +226,16 @@ public String getProjectId()
return projectId;
}

/**
 * Translates a Trino schema name into a BigQuery {@link DatasetId} within this
 * client's configured project (see {@code getProjectId()}).
 *
 * @param schemaName Trino schema name
 * @return dataset id scoped to the client's project
 */
protected DatasetId toDatasetId(String schemaName)
{
    return DatasetId.of(getProjectId(), schemaName);
}

/**
 * Translates a BigQuery {@link DatasetId} back into a Trino schema name.
 * Only the dataset component is used; the project component is intentionally
 * dropped (the reverse of {@code toDatasetId(String)}).
 *
 * @param datasetId dataset id to translate
 * @return the Trino schema name corresponding to the dataset
 */
protected String toSchemaName(DatasetId datasetId)
{
    return datasetId.getDataset();
}
Comment on lines 234 to 237
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering a bit why we need this method. There are still many DatasetId.getDataset usages.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a single use of this method when we translate BigQuery datasets into Trino schemas. In all other places, we receive a Trino schema and do a reverse translation.


public Iterable<Dataset> listDatasets(String projectId)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ private List<String> listRemoteSchemaNames(ConnectorSession session)
public boolean schemaExists(ConnectorSession session, String schemaName)
{
BigQueryClient client = bigQueryClientFactory.create(session);
DatasetId localDatasetId = client.toDatasetId(schemaName);

// Overridden to make sure an error message is returned in case of an ambiguous schema name
log.debug("schemaExists(session=%s)", session);
String projectId = client.getProjectId();
return client.toRemoteDataset(projectId, schemaName)
return client.toRemoteDataset(localDatasetId)
.map(RemoteDatabaseObject::getOnlyRemoteName)
.filter(remoteSchema -> client.getDataset(DatasetId.of(projectId, remoteSchema)) != null)
.filter(remoteSchema -> client.getDataset(DatasetId.of(localDatasetId.getProject(), remoteSchema)) != null)
Copy link
Copy Markdown
Member

@ebyhr ebyhr Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The project-id is retrieved from client and passed to client.getDataset method. How about changing getDataset's method argument for taking only remoteSchema? Same for toRemoteDataset method.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DatasetId uniquely identifies a dataset, so if client.getDataset receive it and use a different project ID (uses only part of the composite key) it would be an incorrect behavior which only coincidentally works because we always use the same project ID here.

.isPresent();
}

Expand All @@ -185,15 +185,23 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
BigQueryClient client = bigQueryClientFactory.create(session);

log.debug("listTables(session=%s, schemaName=%s)", session, schemaName);
String projectId = client.getProjectId();

// filter ambiguous schemas
Optional<String> remoteSchema = schemaName.flatMap(schema -> client.toRemoteDataset(projectId, schema)
.filter(dataset -> !dataset.isAmbiguous())
.map(RemoteDatabaseObject::getOnlyRemoteName));

Set<String> remoteSchemaNames = remoteSchema.map(ImmutableSet::of)
.orElseGet(() -> ImmutableSet.copyOf(listRemoteSchemaNames(session)));
String projectId;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

projectId is always the same between if and else, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't have to be


Set<String> remoteSchemaNames;
if (schemaName.isPresent()) {
DatasetId localDatasetId = client.toDatasetId(schemaName.get());
projectId = localDatasetId.getProject();
// filter ambiguous schemas
Optional<String> remoteSchema = client.toRemoteDataset(localDatasetId)
.filter(dataset -> !dataset.isAmbiguous())
.map(RemoteDatabaseObject::getOnlyRemoteName);

remoteSchemaNames = remoteSchema.map(ImmutableSet::of).orElse(ImmutableSet.of());
}
else {
projectId = client.getProjectId();
remoteSchemaNames = ImmutableSet.copyOf(listRemoteSchemaNames(session));
}

return processInParallel(remoteSchemaNames.stream().toList(), remoteSchemaName -> listTablesInRemoteSchema(client, projectId, remoteSchemaName))
.flatMap(Collection::stream)
Expand All @@ -211,7 +219,7 @@ private List<SchemaTableName> listTablesInRemoteSchema(BigQueryClient client, St
.filter(RemoteDatabaseObject::isAmbiguous)
.ifPresentOrElse(
remoteTable -> log.debug("Filtered out [%s.%s] from list of tables due to ambiguous name", remoteSchemaName, table.getTableId().getTable()),
() -> tableNames.add(new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable())));
() -> tableNames.add(new SchemaTableName(client.toSchemaName(DatasetId.of(projectId, table.getTableId().getDataset())), table.getTableId().getTable())));
}
}
catch (BigQueryException e) {
Expand All @@ -230,15 +238,15 @@ private List<SchemaTableName> listTablesInRemoteSchema(BigQueryClient client, St
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
{
BigQueryClient client = bigQueryClientFactory.create(session);
String projectId = client.getProjectId();
log.debug("getTableHandle(session=%s, schemaTableName=%s)", session, schemaTableName);
String remoteSchemaName = client.toRemoteDataset(projectId, schemaTableName.getSchemaName())
DatasetId localDatasetId = client.toDatasetId(schemaTableName.getSchemaName());
String remoteSchemaName = client.toRemoteDataset(localDatasetId)
.map(RemoteDatabaseObject::getOnlyRemoteName)
.orElse(schemaTableName.getSchemaName());
String remoteTableName = client.toRemoteTable(projectId, remoteSchemaName, schemaTableName.getTableName())
.orElse(localDatasetId.getDataset());
String remoteTableName = client.toRemoteTable(localDatasetId.getProject(), remoteSchemaName, schemaTableName.getTableName())
.map(RemoteDatabaseObject::getOnlyRemoteName)
.orElse(schemaTableName.getTableName());
Optional<TableInfo> tableInfo = client.getTable(TableId.of(projectId, remoteSchemaName, remoteTableName));
Optional<TableInfo> tableInfo = client.getTable(TableId.of(localDatasetId.getProject(), remoteSchemaName, remoteTableName));
if (tableInfo.isEmpty()) {
log.debug("Table [%s.%s] was not found", schemaTableName.getSchemaName(), schemaTableName.getTableName());
return null;
Expand All @@ -263,14 +271,14 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
private ConnectorTableHandle getTableHandleIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
{
BigQueryClient client = bigQueryClientFactory.create(session);
String projectId = client.getProjectId();
String remoteSchemaName = client.toRemoteDataset(projectId, schemaTableName.getSchemaName())
DatasetId localDatasetId = client.toDatasetId(schemaTableName.getSchemaName());
String remoteSchemaName = client.toRemoteDataset(localDatasetId)
.map(RemoteDatabaseObject::getAnyRemoteName)
.orElse(schemaTableName.getSchemaName());
String remoteTableName = client.toRemoteTable(projectId, remoteSchemaName, schemaTableName.getTableName())
.orElse(localDatasetId.getDataset());
String remoteTableName = client.toRemoteTable(localDatasetId.getProject(), remoteSchemaName, schemaTableName.getTableName())
.map(RemoteDatabaseObject::getAnyRemoteName)
.orElse(schemaTableName.getTableName());
Optional<TableInfo> tableInfo = client.getTable(TableId.of(projectId, remoteSchemaName, remoteTableName));
Optional<TableInfo> tableInfo = client.getTable(TableId.of(localDatasetId.getProject(), remoteSchemaName, remoteTableName));
if (tableInfo.isEmpty()) {
log.debug("Table [%s.%s] was not found", schemaTableName.getSchemaName(), schemaTableName.getTableName());
return null;
Expand Down Expand Up @@ -309,14 +317,14 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTabl
private Optional<SystemTable> getViewDefinitionSystemTable(ConnectorSession session, SchemaTableName viewDefinitionTableName, SchemaTableName sourceTableName)
{
BigQueryClient client = bigQueryClientFactory.create(session);
String projectId = client.getProjectId();
String remoteSchemaName = client.toRemoteDataset(projectId, sourceTableName.getSchemaName())
DatasetId localDatasetId = client.toDatasetId(sourceTableName.getSchemaName());
String remoteSchemaName = client.toRemoteDataset(localDatasetId)
.map(RemoteDatabaseObject::getOnlyRemoteName)
.orElseThrow(() -> new TableNotFoundException(viewDefinitionTableName));
String remoteTableName = client.toRemoteTable(projectId, remoteSchemaName, sourceTableName.getTableName())
String remoteTableName = client.toRemoteTable(localDatasetId.getProject(), remoteSchemaName, sourceTableName.getTableName())
.map(RemoteDatabaseObject::getOnlyRemoteName)
.orElseThrow(() -> new TableNotFoundException(viewDefinitionTableName));
TableInfo tableInfo = client.getTable(TableId.of(projectId, remoteSchemaName, remoteTableName))
TableInfo tableInfo = client.getTable(TableId.of(localDatasetId.getProject(), remoteSchemaName, remoteTableName))
.orElseThrow(() -> new TableNotFoundException(viewDefinitionTableName));
if (!(tableInfo.getDefinition() instanceof ViewDefinition)) {
throw new TableNotFoundException(viewDefinitionTableName);
Expand Down Expand Up @@ -422,17 +430,17 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
{
BigQueryClient client = bigQueryClientFactory.create(session);
checkArgument(properties.isEmpty(), "Can't have properties for schema creation");
DatasetInfo datasetInfo = DatasetInfo.newBuilder(client.getProjectId(), schemaName).build();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(client.toDatasetId(schemaName)).build();
client.createSchema(datasetInfo);
}

@Override
public void dropSchema(ConnectorSession session, String schemaName, boolean cascade)
{
BigQueryClient client = bigQueryClientFactory.create(session);
String projectId = client.getProjectId();
String remoteSchemaName = getRemoteSchemaName(client, projectId, schemaName);
client.dropSchema(DatasetId.of(projectId, remoteSchemaName), cascade);
DatasetId localDatasetId = client.toDatasetId(schemaName);
String remoteSchemaName = getRemoteSchemaName(client, localDatasetId.getProject(), localDatasetId.getDataset());
client.dropSchema(DatasetId.of(localDatasetId.getProject(), remoteSchemaName), cascade);
}

private void setRollback(Runnable action)
Expand Down Expand Up @@ -492,8 +500,8 @@ private BigQueryOutputTableHandle createTable(ConnectorSession session, Connecto
}

BigQueryClient client = bigQueryClientFactory.create(session);
String projectId = client.getProjectId();
String remoteSchemaName = getRemoteSchemaName(client, projectId, schemaName);
DatasetId localDatasetId = client.toDatasetId(schemaName);
String remoteSchemaName = getRemoteSchemaName(client, localDatasetId.getProject(), localDatasetId.getDataset());

Closer closer = Closer.create();
setRollback(() -> {
Expand All @@ -505,13 +513,13 @@ private BigQueryOutputTableHandle createTable(ConnectorSession session, Connecto
}
});

TableId tableId = createTable(client, projectId, remoteSchemaName, tableName, fields.build(), tableMetadata.getComment());
TableId tableId = createTable(client, localDatasetId.getProject(), remoteSchemaName, tableName, fields.build(), tableMetadata.getComment());
closer.register(() -> bigQueryClientFactory.create(session).dropTable(tableId));

Optional<String> temporaryTableName = pageSinkIdColumn.map(column -> {
tempFields.add(typeManager.toField(column.getName(), column.getType(), column.getComment()));
String tempTableName = generateTemporaryTableName(session);
TableId tempTableId = createTable(client, projectId, remoteSchemaName, tempTableName, tempFields.build(), tableMetadata.getComment());
TableId tempTableId = createTable(client, localDatasetId.getProject(), remoteSchemaName, tempTableName, tempFields.build(), tableMetadata.getComment());
closer.register(() -> bigQueryClientFactory.create(session).dropTable(tempTableId));
return tempTableName;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import io.trino.spi.connector.ConnectorSession;

interface BigQueryOptionsConfigurer
public interface BigQueryOptionsConfigurer
{
BigQueryOptions.Builder configure(BigQueryOptions.Builder builder, ConnectorSession session);

Expand Down