Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,39 @@ the following features:
* :doc:`/sql/create-schema`
* :doc:`/sql/drop-schema`

Table functions
---------------

The connector provides specific :doc:`table functions </functions/table>` to
access BigQuery.

.. _bigquery-query-function:

``query(varchar) -> table``
^^^^^^^^^^^^^^^^^^^^^^^^^^^

The ``query`` function allows you to query the underlying BigQuery directly. It
requires syntax native to BigQuery, because the full query is pushed down and
processed by BigQuery. This can be useful for accessing native features that are
not available in Trino, or for improving query performance in situations where
running the query natively is faster.

For example, group and concatenate all employee IDs by manager ID::

SELECT
*
FROM
TABLE(
bigquery.system.query(
query => 'SELECT
manager_id, STRING_AGG(employee_id)
FROM
company.employees
GROUP BY
manager_id'
)
);

FAQ
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatistics.QueryStatistics;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
Expand All @@ -43,13 +45,16 @@
import java.util.Set;
import java.util.function.Supplier;

import static com.google.cloud.bigquery.JobStatistics.QueryStatistics.StatementType.SELECT;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -211,6 +216,7 @@ Job create(JobInfo jobInfo)

public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposition createDisposition)
{
log.debug("Execute query: %s", sql);
try {
return bigQuery.query(QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(useQueryResultsCache)
Expand All @@ -223,29 +229,34 @@ public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposi
}
}

public TableResult query(TableId table, List<String> requiredColumns, Optional<String> filter, boolean useQueryResultsCache, CreateDisposition createDisposition)
public Schema getSchema(String sql)
{
String sql = selectSql(table, requiredColumns, filter);
log.debug("Execute query: %s", sql);
log.debug("Get schema from query: %s", sql);
JobInfo jobInfo = JobInfo.of(QueryJobConfiguration.newBuilder(sql).setDryRun(true).build());

JobStatistics statistics;
try {
return bigQuery.query(QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(useQueryResultsCache)
.setCreateDisposition(createDisposition)
.build());
statistics = bigQuery.create(jobInfo).getStatistics();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e);
catch (BigQueryException e) {
throw new TrinoException(BIGQUERY_INVALID_STATEMENT, "Failed to get schema for query: " + sql, e);
}

QueryStatistics queryStatistics = (QueryStatistics) statistics;
if (!queryStatistics.getStatementType().equals(SELECT)) {
throw new TrinoException(BIGQUERY_INVALID_STATEMENT, "Unsupported statement type: " + queryStatistics.getStatementType());
Comment thread
ebyhr marked this conversation as resolved.
}

return requireNonNull(queryStatistics.getSchema(), "Cannot determine schema for query");
}

private String selectSql(TableId table, List<String> requiredColumns, Optional<String> filter)
public static String selectSql(TableId table, List<String> requiredColumns, Optional<String> filter)
{
String columns = requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));
return selectSql(table, columns, filter);
}

private String selectSql(TableId table, String formattedColumns, Optional<String> filter)
private static String selectSql(TableId table, String formattedColumns, Optional<String> filter)
{
String tableName = fullTableName(table);
String query = format("SELECT %s FROM `%s`", formattedColumns, tableName);
Expand All @@ -270,7 +281,7 @@ public String selectSql(TableId table, String formattedColumns)
return format("SELECT %s FROM `%s`", formattedColumns, tableName);
}

private String fullTableName(TableId remoteTableId)
private static String fullTableName(TableId remoteTableId)
{
String remoteSchemaName = remoteTableId.getDataset();
String remoteTableName = remoteTableId.getTable();
Expand All @@ -280,13 +291,20 @@ private String fullTableName(TableId remoteTableId)

public List<BigQueryColumnHandle> getColumns(BigQueryTableHandle tableHandle)
{
TableInfo tableInfo = getTable(tableHandle.getRemoteTableName().toTableId())
.orElseThrow(() -> new TableNotFoundException(tableHandle.getSchemaTableName()));
if (tableHandle.getProjectedColumns().isPresent()) {
return tableHandle.getProjectedColumns().get().stream()
.map(column -> (BigQueryColumnHandle) column)
.collect(toImmutableList());
}
checkArgument(tableHandle.isNamedRelation(), "Cannot get columns for %s", tableHandle);

TableInfo tableInfo = getTable(tableHandle.asPlainTable().getRemoteTableName().toTableId())
.orElseThrow(() -> new TableNotFoundException(tableHandle.asPlainTable().getSchemaTableName()));
Schema schema = tableInfo.getDefinition().getSchema();
if (schema == null) {
throw new TableNotFoundException(
tableHandle.getSchemaTableName(),
format("Table '%s' has no schema", tableHandle.getSchemaTableName()));
tableHandle.asPlainTable().getSchemaTableName(),
format("Table '%s' has no schema", tableHandle.asPlainTable().getSchemaTableName()));
}
return schema.getFields()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;

Expand All @@ -42,18 +43,21 @@ public class BigQueryConnector
private final BigQueryMetadata metadata;
private final BigQuerySplitManager splitManager;
private final BigQueryPageSourceProvider pageSourceProvider;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public BigQueryConnector(
BigQueryMetadata metadata,
BigQuerySplitManager splitManager,
BigQueryPageSourceProvider pageSourceProvider,
Set<ConnectorTableFunction> connectorTableFunctions,
Set<SessionPropertiesProvider> sessionPropertiesProviders)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.connectorTableFunctions = requireNonNull(connectorTableFunctions, "connectorTableFunctions is null");
this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
.flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
.collect(toImmutableList());
Expand Down Expand Up @@ -85,6 +89,12 @@ public ConnectorPageSourceProvider getPageSourceProvider()
return pageSourceProvider;
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
    // Table functions contributed via the Guice multibinding in the connector module
    // (e.g. the system.query pass-through function)
    return connectorTableFunctions;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.bigquery.ptf.Query;
import io.trino.spi.NodeManager;
import io.trino.spi.ptf.ConnectorTableFunction;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
Expand Down Expand Up @@ -54,6 +56,7 @@ protected void setup(Binder binder)
binder.bind(BigQueryPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(ViewMaterializationCache.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(BigQueryConfig.class);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(BigQuerySessionProperties.class).in(Scopes.SINGLETON);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum BigQueryErrorCode
BIGQUERY_AMBIGUOUS_OBJECT_NAME(3, EXTERNAL),
BIGQUERY_LISTING_DATASET_ERROR(4, EXTERNAL),
BIGQUERY_UNSUPPORTED_OPERATION(5, USER_ERROR),
BIGQUERY_INVALID_STATEMENT(6, USER_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
import com.google.common.collect.Streams;
import io.airlift.log.Logger;
import io.trino.plugin.bigquery.BigQueryClient.RemoteDatabaseObject;
import io.trino.plugin.bigquery.ptf.Query.QueryHandle;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnSchema;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableSchema;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
Expand All @@ -49,9 +52,11 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
Expand All @@ -78,6 +83,7 @@
import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE;
import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_TIME;
import static io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType.INGESTION;
import static io.trino.plugin.bigquery.BigQueryTableHandle.getPartitionType;
import static io.trino.plugin.bigquery.BigQueryType.toField;
import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -221,7 +227,12 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
return null;
}

return new BigQueryTableHandle(schemaTableName, new RemoteTableName(tableInfo.get().getTableId()), tableInfo.get());
return new BigQueryTableHandle(new BigQueryNamedRelationHandle(
schemaTableName,
new RemoteTableName(tableInfo.get().getTableId()),
tableInfo.get().getDefinition().getType().toString(),
getPartitionType(tableInfo.get().getDefinition()),
Optional.ofNullable(tableInfo.get().getDescription())));
Comment thread
ebyhr marked this conversation as resolved.
}

private ConnectorTableHandle getTableHandleIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
Expand All @@ -240,7 +251,12 @@ private ConnectorTableHandle getTableHandleIgnoringConflicts(ConnectorSession se
return null;
}

return new BigQueryTableHandle(schemaTableName, new RemoteTableName(tableInfo.get().getTableId()), tableInfo.get());
return new BigQueryTableHandle(new BigQueryNamedRelationHandle(
schemaTableName,
new RemoteTableName(tableInfo.get().getTableId()),
tableInfo.get().getDefinition().getType().toString(),
getPartitionType(tableInfo.get().getDefinition()),
Optional.ofNullable(tableInfo.get().getDescription())));
}

@Override
Expand All @@ -254,11 +270,13 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
for (BigQueryColumnHandle column : client.getColumns(handle)) {
columnMetadata.add(column.getColumnMetadata());
}
if (handle.getPartitionType().isPresent() && handle.getPartitionType().get() == INGESTION) {
columnMetadata.add(PARTITION_DATE.getColumnMetadata());
columnMetadata.add(PARTITION_TIME.getColumnMetadata());
if (handle.isNamedRelation()) {
if (handle.asPlainTable().getPartitionType().isPresent() && handle.asPlainTable().getPartitionType().get() == INGESTION) {
columnMetadata.add(PARTITION_DATE.getColumnMetadata());
columnMetadata.add(PARTITION_TIME.getColumnMetadata());
}
}
return new ConnectorTableMetadata(handle.getSchemaTableName(), columnMetadata.build(), ImmutableMap.of(), handle.getComment());
return new ConnectorTableMetadata(getSchemaTableName(handle), columnMetadata.build(), ImmutableMap.of(), getTableComment(handle));
}

@Override
Expand Down Expand Up @@ -303,9 +321,16 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
log.debug("getColumnHandles(session=%s, tableHandle=%s)", session, tableHandle);

BigQueryTableHandle table = (BigQueryTableHandle) tableHandle;
if (table.getProjectedColumns().isPresent()) {
return table.getProjectedColumns().get().stream()
.collect(toImmutableMap(columnHandle -> ((BigQueryColumnHandle) columnHandle).getName(), identity()));
}

checkArgument(table.isNamedRelation(), "Cannot get columns for %s", tableHandle);

ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
columns.addAll(client.getColumns(table));
if (table.getPartitionType().isPresent() && table.getPartitionType().get() == INGESTION) {
if (table.asPlainTable().getPartitionType().isPresent() && table.asPlainTable().getPartitionType().get() == INGESTION) {
columns.add(PARTITION_DATE.getColumnHandle());
columns.add(PARTITION_TIME.getColumnHandle());
}
Expand Down Expand Up @@ -410,10 +435,10 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
{
BigQueryClient client = bigQueryClientFactory.create(session);
BigQueryTableHandle bigQueryTable = (BigQueryTableHandle) tableHandle;
if (isWildcardTable(TableDefinition.Type.valueOf(bigQueryTable.getType()), bigQueryTable.getRemoteTableName().getTableName())) {
if (isWildcardTable(TableDefinition.Type.valueOf(bigQueryTable.asPlainTable().getType()), bigQueryTable.asPlainTable().getRemoteTableName().getTableName())) {
throw new TrinoException(BIGQUERY_UNSUPPORTED_OPERATION, "This connector does not support dropping wildcard tables");
}
TableId tableId = bigQueryTable.getRemoteTableName().toTableId();
TableId tableId = bigQueryTable.asPlainTable().getRemoteTableName().toTableId();
client.dropTable(tableId);
}

Expand Down Expand Up @@ -468,6 +493,24 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
return Optional.of(new ConstraintApplicationResult<>(updatedHandle, constraint.getSummary(), false));
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
Comment thread
ebyhr marked this conversation as resolved.
{
if (!(handle instanceof QueryHandle)) {
return Optional.empty();
}

ConnectorTableHandle tableHandle = ((QueryHandle) handle).getTableHandle();
ConnectorTableSchema tableSchema = getTableSchema(session, tableHandle);
Map<String, ColumnHandle> columnHandlesByName = getColumnHandles(session, tableHandle);
List<ColumnHandle> columnHandles = tableSchema.getColumns().stream()
.map(ColumnSchema::getName)
.map(columnHandlesByName::get)
.collect(toImmutableList());

return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles));
}

private static boolean containSameElements(Iterable<? extends ColumnHandle> first, Iterable<? extends ColumnHandle> second)
{
return ImmutableSet.copyOf(first).equals(ImmutableSet.copyOf(second));
Expand All @@ -486,6 +529,21 @@ private static SchemaTableName getViewDefinitionSourceTableName(SchemaTableName
table.getTableName().substring(0, table.getTableName().length() - VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX.length()));
}

/**
 * Returns the schema-qualified name for the handle, or a placeholder name
 * for synthetic (query-based) relations.
 */
private static SchemaTableName getSchemaTableName(BigQueryTableHandle handle)
{
    if (handle.isNamedRelation()) {
        return handle.getRequiredNamedRelation().getSchemaTableName();
    }
    // TODO (https://github.com/trinodb/trino/issues/6694) SchemaTableName should not be required for synthetic ConnectorTableHandle
    return new SchemaTableName("_generated", "_generated_query");
}

/**
 * Returns the table comment for named relations; synthetic (query-based)
 * relations carry no comment.
 */
private static Optional<String> getTableComment(BigQueryTableHandle handle)
{
    if (!handle.isNamedRelation()) {
        return Optional.empty();
    }
    return handle.getRequiredNamedRelation().getComment();
}

private static SystemTable createSystemTable(ConnectorTableMetadata metadata, Function<TupleDomain<Integer>, RecordCursor> cursor)
{
return new SystemTable()
Expand Down
Loading