Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobException;
import com.google.cloud.bigquery.JobInfo;
Expand All @@ -32,6 +33,7 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.base.Suppliers;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
Expand All @@ -40,6 +42,7 @@
import io.trino.cache.EvictableCacheBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
Expand Down Expand Up @@ -408,6 +411,34 @@ private static String fullTableName(TableId remoteTableId)
return format("%s.%s.%s", remoteTableId.getProject(), remoteTableId.getDataset(), remoteTableId.getTable());
}

public Stream<RelationColumnsMetadata> listRelationColumnsMetadata(ConnectorSession session, BigQueryClient client, String projectId, String remoteSchemaName)
{
TableResult result = client.executeQuery(session, """
SELECT
table_catalog,
table_schema,
table_name,
array_agg(column_name order by ordinal_position),
array_agg(data_type order by ordinal_position),
FROM %s.INFORMATION_SCHEMA.COLUMNS
GROUP BY table_catalog, table_schema, table_name
""".formatted(quote(remoteSchemaName)));
String schemaName = client.toSchemaName(DatasetId.of(projectId, remoteSchemaName));
Copy link
Copy Markdown
Member

@ebyhr ebyhr May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to build DatasetId and call getDataset method (when is it different from remoteSchemaName)?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it for consistency. This connector has a local to remote names mapping.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #19860

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand it's for extension. Such change should go to the internal repository.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we'd have to revert #19860. It's out of the scope of this PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to revert the PR. Just removing this line is sufficient.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand, this is required for consistency. We either allow this local-to-remote name mapping, or not.

Copy link
Copy Markdown
Member

@ebyhr ebyhr May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take a look at listRelationCommentMetadata() method. Also, can you write a test that doesn't pass without this line if you still want to keep it? If not, please remove it.

this local-to-remote name mapping

Please explain when schemaName gets a different value from remoteSchemaName in this repository:

    String schemaName = client.toSchemaName(DatasetId.of(projectId, remoteSchemaName));
...
    protected String toSchemaName(DatasetId datasetId)
    {
        return datasetId.getDataset();
    }

return result.streamValues()
.map(row -> {
RemoteTableName remoteTableName = new RemoteTableName(
row.get(0).getStringValue(),
row.get(1).getStringValue(),
row.get(2).getStringValue());
List<String> names = row.get(3).getRepeatedValue().stream().map(FieldValue::getStringValue).collect(toImmutableList());
List<String> types = row.get(4).getRepeatedValue().stream().map(FieldValue::getStringValue).collect(toImmutableList());
verify(names.size() == types.size(), "Mismatched column names and types");
return RelationColumnsMetadata.forTable(
new SchemaTableName(schemaName, remoteTableName.tableName()),
typeManager.convertToTrinoType(names, types, Suppliers.memoize(() -> getTable(remoteTableName.toTableId()))));
});
}

public Stream<RelationCommentMetadata> listRelationCommentMetadata(ConnectorSession session, BigQueryClient client, String schemaName)
{
TableResult result = client.executeQuery(session, """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SaveMode;
Expand Down Expand Up @@ -263,6 +264,50 @@ private List<SchemaTableName> listTablesInRemoteSchema(BigQueryClient client, St
return tableNames.build();
}

@Override
public Iterator<RelationColumnsMetadata> streamRelationColumns(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
if (isLegacyMetadataListing) {
return ConnectorMetadata.super.streamRelationColumns(session, schemaName, relationFilter);
}
BigQueryClient client = bigQueryClientFactory.create(session);
String projectId;
List<String> schemaNames;
if (schemaName.isPresent()) {
DatasetId localDatasetId = client.toDatasetId(schemaName.get());
projectId = localDatasetId.getProject();
String remoteSchemaName = getRemoteSchemaName(client, localDatasetId.getProject(), localDatasetId.getDataset());
schemaNames = List.of(remoteSchemaName);
}
else {
projectId = client.getProjectId();
schemaNames = listRemoteSchemaNames(session);
}
Map<SchemaTableName, RelationColumnsMetadata> resultsByName = schemaNames.stream()
.flatMap(schema -> listRelationColumnsMetadata(session, client, schema, projectId))
.collect(toImmutableMap(RelationColumnsMetadata::name, Functions.identity(), (first, _) -> {
log.debug("Filtered out [%s] from list of tables due to ambiguous name", first.name());
return null;
}));
return relationFilter.apply(resultsByName.keySet()).stream()
.map(resultsByName::get)
.iterator();
}

private static Stream<RelationColumnsMetadata> listRelationColumnsMetadata(ConnectorSession session, BigQueryClient client, String schema, String projectId)
{
try {
return client.listRelationColumnsMetadata(session, client, projectId, schema);
}
catch (BigQueryException e) {
if (e.getCode() == 404) {
log.debug("Dataset disappeared during listing operation: %s", schema);
return Stream.empty();
}
throw new TrinoException(BIGQUERY_LISTING_TABLE_ERROR, "Failed to retrieve tables from BigQuery", e);
}
}

@Override
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
Expand All @@ -278,7 +323,7 @@ public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession
}).orElseGet(() -> listSchemaNames(session));
Map<SchemaTableName, RelationCommentMetadata> resultsByName = schemaNames.stream()
.flatMap(schema -> listRelationCommentMetadata(session, client, schema))
.collect(toImmutableMap(RelationCommentMetadata::name, Functions.identity(), (first, second) -> {
.collect(toImmutableMap(RelationCommentMetadata::name, Functions.identity(), (first, _) -> {
log.debug("Filtered out [%s] from list of tables due to ambiguous name", first.name());
return null;
}));
Expand Down
Loading