diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java index fd6910da34f1..24697d91c9c6 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.bigquery; +import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.trino.spi.PageBuilder; import io.trino.spi.TrinoException; @@ -82,18 +83,12 @@ public class BigQueryArrowToPageConverter private final BigQueryTypeManager typeManager; private final VectorSchemaRoot root; private final VectorLoader loader; - private final List columnTypes; - private final List columnNames; + private final List columns; public BigQueryArrowToPageConverter(BigQueryTypeManager typeManager, BufferAllocator allocator, Schema schema, List columns) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); - this.columnTypes = requireNonNull(columns, "columns is null").stream() - .map(BigQueryColumnHandle::trinoType) - .collect(toImmutableList()); - this.columnNames = columns.stream() - .map(BigQueryColumnHandle::name) - .collect(toImmutableList()); + this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); List vectors = schema.getFields().stream() .map(field -> field.createVector(allocator)) .collect(toImmutableList()); @@ -106,17 +101,34 @@ public void convert(PageBuilder pageBuilder, ArrowRecordBatch batch) loader.load(batch); pageBuilder.declarePositions(root.getRowCount()); - for (int column = 0; column < columnTypes.size(); column++) { + for (int column = 0; column < columns.size(); column++) { + BigQueryColumnHandle columnHandle = columns.get(column); + FieldVector fieldVector = getFieldVector(root, columnHandle); 
convertType(pageBuilder.getBlockBuilder(column), - columnTypes.get(column), - root.getVector(toBigQueryColumnName(columnNames.get(column))), + columnHandle.trinoType(), + fieldVector, 0, - root.getVector(toBigQueryColumnName(columnNames.get(column))).getValueCount()); + fieldVector.getValueCount()); } root.clear(); } + private static FieldVector getFieldVector(VectorSchemaRoot root, BigQueryColumnHandle columnHandle) + { + FieldVector fieldVector = root.getVector(toBigQueryColumnName(columnHandle.name())); + + for (String dereferenceName : columnHandle.dereferenceNames()) { + for (FieldVector child : fieldVector.getChildrenFromFields()) { + if (child.getField().getName().equals(dereferenceName)) { + fieldVector = child; + break; + } + } + } + return fieldVector; + } + private void convertType(BlockBuilder output, Type type, FieldVector vector, int offset, int length) { Class javaType = type.getJavaType(); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index a5a19c0169e0..3b6203e30188 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -33,6 +33,7 @@ import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.TableResult; import com.google.cloud.http.BaseHttpServiceException; +import com.google.common.base.Joiner; import com.google.common.cache.Cache; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -246,7 +247,7 @@ public Optional getTable(TableId remoteTableId) } } - public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List requiredColumns, Optional filter) + public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List requiredColumns, Optional filter) { String query = 
selectSql(remoteTableId.getTableId(), requiredColumns, filter); log.debug("query is %s", query); @@ -466,10 +467,19 @@ public TableId getDestinationTable(String sql) return requireNonNull(((QueryJobConfiguration) jobConfiguration).getDestinationTable(), "Cannot determine destination table for query"); } - public static String selectSql(TableId table, List requiredColumns, Optional filter) + public static String selectSql(TableId table, List requiredColumns, Optional filter) { - String columns = requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(",")); - return selectSql(table, columns, filter); + return selectSql(table, + requiredColumns.stream() + .map(column -> Joiner.on('.') + .join(ImmutableList.builder() + .add(format("`%s`", column.name())) + .addAll(column.dereferenceNames().stream() + .map(dereferenceName -> format("`%s`", dereferenceName)) + .collect(toImmutableList())) + .build())) + .collect(joining(",")), + filter); } public static String selectSql(TableId table, String formattedColumns, Optional filter) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java index e529e0581379..699aaf6b40e9 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; @@ -30,6 +31,7 @@ public record BigQueryColumnHandle( String name, + List dereferenceNames, Type trinoType, StandardSQLTypeName bigqueryType, boolean isPushdownSupported, @@ -44,6 
+46,7 @@ public record BigQueryColumnHandle( public BigQueryColumnHandle { requireNonNull(name, "name is null"); + dereferenceNames = ImmutableList.copyOf(requireNonNull(dereferenceNames, "dereferenceNames is null")); requireNonNull(trinoType, "trinoType is null"); requireNonNull(bigqueryType, "bigqueryType is null"); requireNonNull(mode, "mode is null"); @@ -62,6 +65,16 @@ public ColumnMetadata getColumnMetadata() .build(); } + @JsonIgnore + public String getQualifiedName() + { + return Joiner.on('.') + .join(ImmutableList.builder() + .add(name) + .addAll(dereferenceNames) + .build()); + } + @JsonIgnore public long getRetainedSizeInBytes() { diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java index f97bada6ec4a..2e7d34c3ad2e 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java @@ -63,6 +63,7 @@ public class BigQueryConfig private String queryLabelName; private String queryLabelFormat; private boolean proxyEnabled; + private boolean projectionPushDownEnabled = true; private int metadataParallelism = 2; public Optional getProjectId() @@ -342,6 +343,19 @@ public BigQueryConfig setProxyEnabled(boolean proxyEnabled) return this; } + public boolean isProjectionPushdownEnabled() + { + return projectionPushDownEnabled; + } + + @Config("bigquery.projection-pushdown-enabled") + @ConfigDescription("Dereference push down for ROW type") + public BigQueryConfig setProjectionPushdownEnabled(boolean projectionPushDownEnabled) + { + this.projectionPushDownEnabled = projectionPushDownEnabled; + return this; + } + @Min(1) @Max(32) public int getMetadataParallelism() diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java 
b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index b1fee28fc5ea..bb264e69a528 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -32,15 +32,18 @@ import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Ordering; import com.google.common.io.Closer; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import io.airlift.log.Logger; import io.airlift.slice.Slice; +import io.trino.plugin.base.projection.ApplyProjectionUtil; import io.trino.plugin.bigquery.BigQueryClient.RemoteDatabaseObject; import io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType; import io.trino.plugin.bigquery.ptf.Query.QueryHandle; @@ -78,18 +81,22 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.expression.Variable; import io.trino.spi.function.table.ConnectorTableFunctionHandle; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.type.BigintType; +import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; import org.json.JSONArray; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import 
java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -108,16 +115,22 @@ import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.Futures.allAsList; import static io.trino.plugin.base.TemporaryTables.generateTemporaryTableName; +import static io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation; +import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; +import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_TABLE_ERROR; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_UNSUPPORTED_OPERATION; import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE; import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_TIME; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.isProjectionPushdownEnabled; import static io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType.INGESTION; import static io.trino.plugin.bigquery.BigQueryTableHandle.getPartitionType; import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable; @@ -138,6 +151,8 @@ public class BigQueryMetadata { private static final Logger log = Logger.get(BigQueryMetadata.class); private static final 
Type TRINO_PAGE_SINK_ID_COLUMN_TYPE = BigintType.BIGINT; + private static final Ordering COLUMN_HANDLE_ORDERING = Ordering + .from(Comparator.comparingInt(columnHandle -> columnHandle.dereferenceNames().size())); static final int DEFAULT_NUMERIC_TYPE_PRECISION = 38; static final int DEFAULT_NUMERIC_TYPE_SCALE = 9; @@ -771,7 +786,7 @@ public Optional finishInsert( @Override public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) { - return new BigQueryColumnHandle("$merge_row_id", BIGINT, INT64, true, Field.Mode.REQUIRED, ImmutableList.of(), null, true); + return new BigQueryColumnHandle("$merge_row_id", ImmutableList.of(), BIGINT, INT64, true, Field.Mode.REQUIRED, ImmutableList.of(), null, true); } @Override @@ -882,24 +897,150 @@ public Optional> applyProjecti log.debug("applyProjection(session=%s, handle=%s, projections=%s, assignments=%s)", session, handle, projections, assignments); BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle; + if (!isProjectionPushdownEnabled(session)) { + List newColumns = ImmutableList.copyOf(assignments.values()); + if (bigQueryTableHandle.projectedColumns().isPresent() && containSameElements(newColumns, bigQueryTableHandle.projectedColumns().get())) { + return Optional.empty(); + } - List newColumns = ImmutableList.copyOf(assignments.values()); + ImmutableList.Builder projectedColumns = ImmutableList.builder(); + ImmutableList.Builder assignmentList = ImmutableList.builder(); + assignments.forEach((name, column) -> { + BigQueryColumnHandle columnHandle = (BigQueryColumnHandle) column; + projectedColumns.add(columnHandle); + assignmentList.add(new Assignment(name, column, columnHandle.trinoType())); + }); - if (bigQueryTableHandle.projectedColumns().isPresent() && containSameElements(newColumns, bigQueryTableHandle.projectedColumns().get())) { - return Optional.empty(); + bigQueryTableHandle = bigQueryTableHandle.withProjectedColumns(projectedColumns.build()); + + 
return Optional.of(new ProjectionApplicationResult<>(bigQueryTableHandle, projections, assignmentList.build(), false)); } - ImmutableList.Builder projectedColumns = ImmutableList.builder(); - ImmutableList.Builder assignmentList = ImmutableList.builder(); - assignments.forEach((name, column) -> { - BigQueryColumnHandle columnHandle = (BigQueryColumnHandle) column; - projectedColumns.add(columnHandle); - assignmentList.add(new Assignment(name, column, columnHandle.trinoType())); - }); + // Create projected column representations for supported sub expressions. Simple column references and chain of + // dereferences on a variable are supported right now. + Set projectedExpressions = projections.stream() + .flatMap(expression -> extractSupportedProjectedColumns(expression).stream()) + .collect(toImmutableSet()); + + Map columnProjections = projectedExpressions.stream() + .collect(toImmutableMap(identity(), ApplyProjectionUtil::createProjectedColumnRepresentation)); + + // all references are simple variables + if (columnProjections.values().stream().allMatch(ProjectedColumnRepresentation::isVariable)) { + Set projectedColumns = ImmutableSet.copyOf(projectParentColumns(assignments.values().stream() + .map(BigQueryColumnHandle.class::cast) + .collect(toImmutableList()))); + if (bigQueryTableHandle.projectedColumns().isPresent() && containSameElements(projectedColumns, bigQueryTableHandle.projectedColumns().get())) { + return Optional.empty(); + } + List assignmentsList = assignments.entrySet().stream() + .map(assignment -> new Assignment( + assignment.getKey(), + assignment.getValue(), + ((BigQueryColumnHandle) assignment.getValue()).trinoType())) + .collect(toImmutableList()); + + return Optional.of(new ProjectionApplicationResult<>( + bigQueryTableHandle.withProjectedColumns(ImmutableList.copyOf(projectedColumns)), + projections, + assignmentsList, + false)); + } + + Map newAssignments = new HashMap<>(); + ImmutableMap.Builder newVariablesBuilder = 
ImmutableMap.builder(); + ImmutableSet.Builder projectedColumnsBuilder = ImmutableSet.builder(); + + for (Map.Entry entry : columnProjections.entrySet()) { + ConnectorExpression expression = entry.getKey(); + ProjectedColumnRepresentation projectedColumn = entry.getValue(); - bigQueryTableHandle = bigQueryTableHandle.withProjectedColumns(projectedColumns.build()); + BigQueryColumnHandle baseColumnHandle = (BigQueryColumnHandle) assignments.get(projectedColumn.getVariable().getName()); + BigQueryColumnHandle projectedColumnHandle = createProjectedColumnHandle(baseColumnHandle, projectedColumn.getDereferenceIndices(), expression.getType()); + String projectedColumnName = projectedColumnHandle.getQualifiedName(); + + Variable projectedColumnVariable = new Variable(projectedColumnName, expression.getType()); + Assignment newAssignment = new Assignment(projectedColumnName, projectedColumnHandle, expression.getType()); + newAssignments.putIfAbsent(projectedColumnName, newAssignment); + + newVariablesBuilder.put(expression, projectedColumnVariable); + projectedColumnsBuilder.add(projectedColumnHandle); + } + + // Modify projections to refer to new variables + Map newVariables = newVariablesBuilder.buildOrThrow(); + List newProjections = projections.stream() + .map(expression -> replaceWithNewVariables(expression, newVariables)) + .collect(toImmutableList()); + + List outputAssignments = newAssignments.values().stream().collect(toImmutableList()); + return Optional.of(new ProjectionApplicationResult<>( + bigQueryTableHandle.withProjectedColumns(projectParentColumns(ImmutableList.copyOf(projectedColumnsBuilder.build()))), + newProjections, + outputAssignments, + false)); + } - return Optional.of(new ProjectionApplicationResult<>(bigQueryTableHandle, projections, assignmentList.build(), false)); + /** + * Creates a set of parent columns for the input projected columns. 
For example, + * if input {@param columns} include columns "a.b" and "a.b.c", then they will be projected from a single column "a.b". + */ + @VisibleForTesting + static List projectParentColumns(List columnHandles) + { + List sortedColumnHandles = COLUMN_HANDLE_ORDERING.sortedCopy(columnHandles); + List parentColumns = new ArrayList<>(); + for (BigQueryColumnHandle column : sortedColumnHandles) { + if (!parentColumnExists(parentColumns, column)) { + parentColumns.add(column); + } + } + return parentColumns; + } + + private static boolean parentColumnExists(List existingColumns, BigQueryColumnHandle column) + { + for (BigQueryColumnHandle existingColumn : existingColumns) { + List existingColumnDereferenceNames = existingColumn.dereferenceNames(); + verify( + column.dereferenceNames().size() >= existingColumnDereferenceNames.size(), + "Selected column's dereference size must be greater than or equal to the existing column's dereference size"); + if (existingColumn.name().equals(column.name()) + && column.dereferenceNames().subList(0, existingColumnDereferenceNames.size()).equals(existingColumnDereferenceNames)) { + return true; + } + } + return false; + } + + private BigQueryColumnHandle createProjectedColumnHandle(BigQueryColumnHandle baseColumn, List indices, Type projectedColumnType) + { + if (indices.isEmpty()) { + return baseColumn; + } + + ImmutableList.Builder dereferenceNamesBuilder = ImmutableList.builder(); + dereferenceNamesBuilder.addAll(baseColumn.dereferenceNames()); + + Type type = baseColumn.trinoType(); + for (int index : indices) { + checkArgument(type instanceof RowType, "type should be Row type"); + RowType rowType = (RowType) type; + RowType.Field field = rowType.getFields().get(index); + dereferenceNamesBuilder.add(field.getName() + .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "ROW type does not have field names declared: " + rowType))); + type = field.getType(); + } + return new BigQueryColumnHandle( + baseColumn.name(), + 
dereferenceNamesBuilder.build(), + projectedColumnType, + typeManager.toStandardSqlTypeName(projectedColumnType), + baseColumn.isPushdownSupported(), + baseColumn.mode(), + baseColumn.subColumns(), + baseColumn.description(), + baseColumn.hidden()); } @Override diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index 23aeac6bcea2..b4af27b1038f 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -68,10 +70,12 @@ public ConnectorPageSource createPageSource( log.debug("createPageSource(transaction=%s, session=%s, split=%s, table=%s, columns=%s)", transaction, session, split, table, columns); BigQuerySplit bigQuerySplit = (BigQuerySplit) split; - // We expect columns list requested here to match list passed to ConnectorMetadata.applyProjection. 
- checkArgument(bigQuerySplit.getColumns().isEmpty() || bigQuerySplit.getColumns().equals(columns), - "Requested columns %s do not match list in split %s", columns, bigQuerySplit.getColumns()); - + Set projectedColumnNames = bigQuerySplit.getColumns().stream().map(BigQueryColumnHandle::name).collect(Collectors.toSet()); + // because we apply logic (download only parent columns - BigQueryMetadata.projectParentColumns) + // columns and split columns could differ + columns.stream() + .map(BigQueryColumnHandle.class::cast) + .forEach(column -> checkArgument(projectedColumnNames.contains(column.name()), "projected columns should contain all reader columns")); if (bigQuerySplit.representsEmptyProjection()) { return new BigQueryEmptyProjectionPageSource(bigQuerySplit.getEmptyRowsToGenerate()); } @@ -121,8 +125,7 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQ typeManager, bigQueryClientFactory.create(session), table, - columnHandles.stream().map(BigQueryColumnHandle::name).collect(toImmutableList()), - columnHandles.stream().map(BigQueryColumnHandle::trinoType).collect(toImmutableList()), + columnHandles, filter); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java index db7078f59d31..e0d2bd2fc800 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java @@ -54,6 +54,7 @@ public BigQueryColumnHandle getColumnHandle() { return new BigQueryColumnHandle( trinoColumnName, + ImmutableList.of(), trinoType, bigqueryType, true, diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 2c0f6ef1233f..75bfbb50ef8b 100644 --- 
a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -45,8 +45,8 @@ import java.util.List; import java.util.Optional; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.bigquery.BigQueryClient.selectSql; import static io.trino.plugin.bigquery.BigQueryTypeManager.toTrinoTimestamp; @@ -75,9 +75,9 @@ public class BigQueryQueryPageSource .toFormatter(); private final BigQueryTypeManager typeManager; - private final List columnNames; - private final List columnTypes; + private final List columnHandles; private final PageBuilder pageBuilder; + private final boolean isQueryFunction; private final TableResult tableResult; private boolean finished; @@ -87,34 +87,35 @@ public BigQueryQueryPageSource( BigQueryTypeManager typeManager, BigQueryClient client, BigQueryTableHandle table, - List columnNames, - List columnTypes, + List columnHandles, Optional filter) { requireNonNull(client, "client is null"); requireNonNull(table, "table is null"); - requireNonNull(columnNames, "columnNames is null"); - requireNonNull(columnTypes, "columnTypes is null"); requireNonNull(filter, "filter is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); - checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes sizes don't match"); - this.columnNames = ImmutableList.copyOf(columnNames); - this.columnTypes = ImmutableList.copyOf(columnTypes); - this.pageBuilder = new PageBuilder(columnTypes); - String sql = buildSql(table, client.getProjectId(), ImmutableList.copyOf(columnNames), filter); + this.columnHandles = requireNonNull(columnHandles, "columnHandles is null"); + this.pageBuilder = new 
PageBuilder(columnHandles.stream().map(BigQueryColumnHandle::trinoType).collect(toImmutableList())); + this.isQueryFunction = table.relationHandle() instanceof BigQueryQueryRelationHandle; + String sql = buildSql( + table, + client.getProjectId(), + ImmutableList.copyOf(columnHandles), + filter); this.tableResult = client.executeQuery(session, sql); } - private static String buildSql(BigQueryTableHandle table, String projectId, List columnNames, Optional filter) + private String buildSql(BigQueryTableHandle table, String projectId, List columns, Optional filter) { - if (table.relationHandle() instanceof BigQueryQueryRelationHandle queryRelationHandle) { + if (isQueryFunction) { + BigQueryQueryRelationHandle queryRelationHandle = (BigQueryQueryRelationHandle) table.relationHandle(); if (filter.isEmpty()) { return queryRelationHandle.getQuery(); } return "SELECT * FROM (" + queryRelationHandle.getQuery() + " ) WHERE " + filter.get(); } TableId tableId = TableId.of(projectId, table.asPlainTable().getRemoteTableName().datasetName(), table.asPlainTable().getRemoteTableName().tableName()); - return selectSql(tableId, ImmutableList.copyOf(columnNames), filter); + return selectSql(tableId, ImmutableList.copyOf(columns), filter); } @Override @@ -147,9 +148,11 @@ public Page getNextPage() verify(pageBuilder.isEmpty()); for (FieldValueList record : tableResult.iterateAll()) { pageBuilder.declarePosition(); - for (int column = 0; column < columnTypes.size(); column++) { + for (int column = 0; column < columnHandles.size(); column++) { + BigQueryColumnHandle columnHandle = columnHandles.get(column); BlockBuilder output = pageBuilder.getBlockBuilder(column); - appendTo(columnTypes.get(column), record.get(columnNames.get(column)), output); + FieldValue fieldValue = isQueryFunction ? 
record.get(columnHandle.name()) : record.get(column); + appendTo(columnHandle.trinoType(), fieldValue, output); } } finished = true; diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java index 009ea5ea9237..894b371739a8 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java @@ -32,6 +32,7 @@ public final class BigQuerySessionProperties private static final String VIEW_MATERIALIZATION_WITH_FILTER = "view_materialization_with_filter"; private static final String QUERY_RESULTS_CACHE_ENABLED = "query_results_cache_enabled"; private static final String CREATE_DISPOSITION_TYPE = "create_disposition_type"; + private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; private final List> sessionProperties; @@ -60,6 +61,11 @@ public BigQuerySessionProperties(BigQueryConfig config) CreateDisposition.class, CreateDisposition.CREATE_IF_NEEDED, // https://cloud.google.com/bigquery/docs/cached-results true)) + .add(booleanProperty( + PROJECTION_PUSHDOWN_ENABLED, + "Dereference push down for STRUCT type", + config.isProjectionPushdownEnabled(), + false)) .build(); } @@ -88,4 +94,9 @@ public static CreateDisposition createDisposition(ConnectorSession session) { return session.getProperty(CREATE_DISPOSITION_TYPE, CreateDisposition.class); } + + public static boolean isProjectionPushdownEnabled(ConnectorSession session) + { + return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class); + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java index 6aa7bec96afb..712f437d51be 100644 --- 
a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java @@ -42,7 +42,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.OptionalInt; @@ -109,7 +108,6 @@ public ConnectorSplitSource getSplits( BigQueryQueryRelationHandle bigQueryQueryRelationHandle = bigQueryTableHandle.getRequiredQueryRelation(); List columns = bigQueryTableHandle.projectedColumns().orElse(ImmutableList.of()); boolean useStorageApi = bigQueryQueryRelationHandle.isUseStorageApi(); - List projectedColumnsNames = getProjectedColumnNames(columns); // projectedColumnsNames can not be used for generating select sql because the query fails if it does not // include a column name. eg: query => 'SELECT 1' @@ -132,7 +130,7 @@ public ConnectorSplitSource getSplits( log.debug("Using Storage API for running query: %s", query); // filter is already used while constructing the select query - ReadSession readSession = createReadSession(session, tableInfo.getTableId(), ImmutableList.copyOf(projectedColumnsNames), Optional.empty()); + ReadSession readSession = createReadSession(session, tableInfo.getTableId(), ImmutableList.copyOf(columns), Optional.empty()); return new FixedSplitSource(readSession.getStreamsList().stream() .map(stream -> BigQuerySplit.forStream(stream.getName(), getSchemaAsString(readSession), columns, OptionalInt.of(stream.getSerializedSize()))) .collect(toImmutableList())); @@ -163,7 +161,9 @@ private List readFromBigQuery( log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, filter=[%s])", remoteTableId, projectedColumns, filter); List columns = projectedColumns.get(); - List projectedColumnsNames = new ArrayList<>(getProjectedColumnNames(columns)); + List projectedColumnsNames = getProjectedColumnNames(columns); + ImmutableList.Builder 
projectedColumnHandles = ImmutableList.builder(); + projectedColumnHandles.addAll(columns); if (isWildcardTable(type, remoteTableId.getTable())) { // Storage API doesn't support reading wildcard tables @@ -178,11 +178,11 @@ private List readFromBigQuery( return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter)); } tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream() - .map(column -> ((BigQueryColumnHandle) column).name()) - .filter(columnName -> !projectedColumnsNames.contains(columnName)) - .forEach(projectedColumnsNames::add)); + .map(BigQueryColumnHandle.class::cast) + .filter(column -> !projectedColumnsNames.contains(column.name())) + .forEach(projectedColumnHandles::add)); } - ReadSession readSession = createReadSession(session, remoteTableId, ImmutableList.copyOf(projectedColumnsNames), filter); + ReadSession readSession = createReadSession(session, remoteTableId, ImmutableList.copyOf(projectedColumnHandles.build()), filter); String schemaString = getSchemaAsString(readSession); return readSession.getStreamsList().stream() @@ -191,10 +191,10 @@ private List readFromBigQuery( } @VisibleForTesting - ReadSession createReadSession(ConnectorSession session, TableId remoteTableId, List projectedColumnsNames, Optional filter) + ReadSession createReadSession(ConnectorSession session, TableId remoteTableId, List columns, Optional filter) { ReadSessionCreator readSessionCreator = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, arrowSerializationEnabled, viewExpiration, maxReadRowsRetries); - return readSessionCreator.create(session, remoteTableId, projectedColumnsNames, filter, nodeManager.getRequiredWorkerNodes().size()); + return readSessionCreator.create(session, remoteTableId, columns, filter, nodeManager.getRequiredWorkerNodes().size()); } private static List getProjectedColumnNames(List columns) diff --git 
a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java index e71ed3f835b5..48c6147cd247 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java @@ -87,8 +87,7 @@ public class BigQueryStorageAvroPageSource private final BigQueryReadClient bigQueryReadClient; private final BigQueryTypeManager typeManager; private final BigQuerySplit split; - private final List columnNames; - private final List columnTypes; + private final List columns; private final AtomicLong readBytes; private final PageBuilder pageBuilder; private final Iterator responses; @@ -104,14 +103,10 @@ public BigQueryStorageAvroPageSource( this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.split = requireNonNull(split, "split is null"); this.readBytes = new AtomicLong(); - requireNonNull(columns, "columns is null"); - this.columnNames = columns.stream() - .map(BigQueryColumnHandle::name) - .collect(toImmutableList()); - this.columnTypes = columns.stream() + this.columns = requireNonNull(columns, "columns is null"); + this.pageBuilder = new PageBuilder(columns.stream() .map(BigQueryColumnHandle::trinoType) - .collect(toImmutableList()); - this.pageBuilder = new PageBuilder(columnTypes); + .collect(toImmutableList())); log.debug("Starting to read from %s", split.getStreamName()); responses = new ReadRowsHelper(bigQueryReadClient, split.getStreamName(), maxReadRowsRetries).readRows(); @@ -146,9 +141,10 @@ public Page getNextPage() Iterable records = parse(response); for (GenericRecord record : records) { pageBuilder.declarePosition(); - for (int column = 0; column < columnTypes.size(); column++) { + for (int column = 0; column < columns.size(); column++) { BlockBuilder output = 
pageBuilder.getBlockBuilder(column); - appendTo(columnTypes.get(column), record.get(toBigQueryColumnName(columnNames.get(column))), output); + BigQueryColumnHandle columnHandle = columns.get(column); + appendTo(columnHandle.trinoType(), getValueRecord(record, columnHandle), output); } } @@ -157,6 +153,23 @@ public Page getNextPage() return page; } + private static Object getValueRecord(GenericRecord record, BigQueryColumnHandle columnHandle) + { + Object valueRecord = record.get(toBigQueryColumnName(columnHandle.name())); + for (String dereferenceName : columnHandle.dereferenceNames()) { + if (valueRecord == null) { + break; + } + if (valueRecord instanceof GenericRecord genericRecord) { + valueRecord = genericRecord.get(dereferenceName); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to extract dereference value from record"); + } + } + return valueRecord; + } + private void appendTo(Type type, Object value, BlockBuilder output) { if (value == null) { diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java index 28d2a3f9eb3a..5785e0ec7b20 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java @@ -248,7 +248,7 @@ private FieldList toFieldList(RowType rowType) return FieldList.of(fields.build()); } - private StandardSQLTypeName toStandardSqlTypeName(Type type) + StandardSQLTypeName toStandardSqlTypeName(Type type) { if (type == BooleanType.BOOLEAN) { return StandardSQLTypeName.BOOL; @@ -392,6 +392,7 @@ public BigQueryColumnHandle toColumnHandle(Field field) ColumnMapping columnMapping = toTrinoType(field).orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + field)); return new BigQueryColumnHandle( field.getName(), + ImmutableList.of(), columnMapping.type(), 
field.getType().getStandardType(), columnMapping.isPushdownSupported(), diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java index b0c8e9711e47..8a28d3a99a20 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java @@ -75,7 +75,7 @@ public ReadSessionCreator( this.maxCreateReadSessionRetries = maxCreateReadSessionRetries; } - public ReadSession create(ConnectorSession session, TableId remoteTable, List selectedFields, Optional filter, int currentWorkerCount) + public ReadSession create(ConnectorSession session, TableId remoteTable, List selectedFields, Optional filter, int currentWorkerCount) { BigQueryClient client = bigQueryClientFactory.create(session); TableInfo tableDetails = client.getTable(remoteTable) @@ -84,6 +84,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List filteredSelectedFields = selectedFields.stream() + .map(BigQueryColumnHandle::getQualifiedName) .map(BigQueryUtil::toBigQueryColumnName) .collect(toList()); @@ -134,7 +135,7 @@ String toTableResourceName(TableId tableId) private TableInfo getActualTable( BigQueryClient client, TableInfo remoteTable, - List requiredColumns, + List requiredColumns, Optional filter) { TableDefinition tableDefinition = remoteTable.getDefinition(); diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java index fc4bf46b9eea..58545bdc4d32 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java @@ -82,7 +82,6 @@ protected boolean 
hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_ADD_COLUMN, SUPPORTS_CREATE_MATERIALIZED_VIEW, SUPPORTS_CREATE_VIEW, - SUPPORTS_DEREFERENCE_PUSHDOWN, SUPPORTS_MERGE, SUPPORTS_NEGATIVE_DATE, SUPPORTS_NOT_NULL_CONSTRAINT, diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java index ec9875b77779..5df1f7c6a436 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java @@ -13,14 +13,21 @@ */ package io.trino.plugin.bigquery; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; +import io.trino.Session; +import io.trino.sql.planner.plan.ProjectNode; +import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; +import io.trino.testing.sql.TestTable; import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.Set; +import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; @@ -83,4 +90,183 @@ public void testSelectFailsForColumnName() } } } + + @Override + @Test + public void testProjectionPushdown() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_", + "(id BIGINT, root ROW(f1 BIGINT, f2 BIGINT))", + ImmutableList.of("(1, ROW(1, 2))", "(2, NULl)", "(3, ROW(NULL, 4))"))) { + String selectQuery = "SELECT id, root.f1 FROM " + testTable.getName(); + String expectedResult = "VALUES (BIGINT '1', BIGINT '1'), (BIGINT '2', NULL), (BIGINT '3', NULL)"; + + // With Projection Pushdown enabled + 
assertThat(query(selectQuery)) + .matches(expectedResult) + .isFullyPushedDown(); + + // With Projection Pushdown disabled + Session sessionWithoutPushdown = sessionWithProjectionPushdownDisabled(getSession()); + assertThat(query(sessionWithoutPushdown, selectQuery)) + .matches(expectedResult) + .isNotFullyPushedDown(ProjectNode.class); + } + } + + @Override + @Test + public void testProjectionWithCaseSensitiveField() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_with_case_sensitive_field_", + "(id INT, a ROW(\"UPPER_CASE\" INT, \"lower_case\" INT, \"MiXeD_cAsE\" INT))", + ImmutableList.of("(1, ROW(2, 3, 4))", "(5, ROW(6, 7, 8))"))) { + // INT fields are read back as BIGINT (not INTEGER) by the BigQuery connector, so the expected values are BIGINT + String expected = "VALUES (BIGINT '2', BIGINT '3', BIGINT '4'), (BIGINT '6', BIGINT '7', BIGINT '8')"; + assertThat(query("SELECT \"a\".\"UPPER_CASE\", \"a\".\"lower_case\", \"a\".\"MiXeD_cAsE\" FROM " + testTable.getName())) + .matches(expected) + .isFullyPushedDown(); + assertThat(query("SELECT \"a\".\"upper_case\", \"a\".\"lower_case\", \"a\".\"mixed_case\" FROM " + testTable.getName())) + .matches(expected) + .isFullyPushedDown(); + assertThat(query("SELECT \"a\".\"UPPER_CASE\", \"a\".\"LOWER_CASE\", \"a\".\"MIXED_CASE\" FROM " + testTable.getName())) + .matches(expected) + .isFullyPushedDown(); + } + } + + @Override + @Test + public void testProjectionPushdownMultipleRows() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_multiple_rows_", + "(id INT, nested1 ROW(child1 INT, child2 VARCHAR, child3 INT), nested2 ROW(child1 DOUBLE, child2 BOOLEAN, child3 DATE))", + ImmutableList.of( + "(1, ROW(10, 'a', 100), ROW(10.10, true, DATE '2023-04-19'))", + "(2, ROW(20, 'b', 200), ROW(20.20, false, DATE '1990-04-20'))", + "(4, ROW(40, NULL, 400), NULL)", + "(5, NULL, ROW(NULL, true, NULL))"))) { + // Select one field from one row field + assertThat(query("SELECT id, 
nested1.child1 FROM " + testTable.getName())) + .matches("VALUES (BIGINT '1', BIGINT '10'), (BIGINT '2', BIGINT '20'), (BIGINT '4', BIGINT '40'), (BIGINT '5', NULL)") + .isFullyPushedDown(); + assertThat(query("SELECT nested2.child3, id FROM " + testTable.getName())) + .matches("VALUES (DATE '2023-04-19', BIGINT '1'), (DATE '1990-04-20', BIGINT '2'), (NULL, BIGINT '4'), (NULL, BIGINT '5')") + .isFullyPushedDown(); + + // Select one field each from multiple row fields + assertThat(query("SELECT nested2.child1, id, nested1.child2 FROM " + testTable.getName())) + .skippingTypesCheck() + .matches("VALUES (DOUBLE '10.10', BIGINT '1', 'a'), (DOUBLE '20.20', BIGINT '2', 'b'), (NULL, BIGINT '4', NULL), (NULL, BIGINT '5', NULL)") + .isFullyPushedDown(); + + // Select multiple fields from one row field + assertThat(query("SELECT nested1.child3, id, nested1.child2 FROM " + testTable.getName())) + .skippingTypesCheck() + .matches("VALUES (BIGINT '100', BIGINT '1', 'a'), (BIGINT '200', BIGINT '2', 'b'), (BIGINT '400', BIGINT '4', NULL), (NULL, BIGINT '5', NULL)") + .isFullyPushedDown(); + assertThat(query("SELECT nested2.child2, nested2.child3, id FROM " + testTable.getName())) + .matches("VALUES (true, DATE '2023-04-19' , BIGINT '1'), (false, DATE '1990-04-20', BIGINT '2'), (NULL, NULL, BIGINT '4'), (true, NULL, BIGINT '5')") + .isFullyPushedDown(); + + // Select multiple fields from multiple row fields + assertThat(query("SELECT id, nested2.child1, nested1.child3, nested2.child2, nested1.child1 FROM " + testTable.getName())) + .matches("VALUES (BIGINT '1', DOUBLE '10.10', BIGINT '100', true, BIGINT '10'), (BIGINT '2', DOUBLE '20.20', BIGINT '200', false, BIGINT '20'), (BIGINT '4', NULL, BIGINT '400', NULL, BIGINT '40'), (BIGINT '5', NULL, NULL, true, NULL)") + .isFullyPushedDown(); + + // Select only nested fields + assertThat(query("SELECT nested2.child2, nested1.child3 FROM " + testTable.getName())) + .matches("VALUES (true, BIGINT '100'), (false, BIGINT '200'), (NULL, 
BIGINT '400'), (true, NULL)") + .isFullyPushedDown(); + } + } + + @Override + @Test + public void testProjectionPushdownReadsLessData() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_reads_less_data_", + "AS SELECT val AS id, CAST(ROW(val + 1, val + 2) AS ROW(leaf1 BIGINT, leaf2 BIGINT)) AS root FROM UNNEST(SEQUENCE(1, 10)) AS t(val)")) { + MaterializedResult expectedResult = computeActual("SELECT val + 2 FROM UNNEST(SEQUENCE(1, 10)) AS t(val)"); + String selectQuery = "SELECT root.leaf2 FROM " + testTable.getName(); + Session sessionWithoutPushdown = sessionWithProjectionPushdownDisabled(getSession()); + + assertQueryStats( + getSession(), + selectQuery, + statsWithPushdown -> { + DataSize physicalInputDataSizeWithPushdown = statsWithPushdown.getPhysicalInputDataSize(); + DataSize processedDataSizeWithPushdown = statsWithPushdown.getProcessedInputDataSize(); + assertQueryStats( + sessionWithoutPushdown, + selectQuery, + statsWithoutPushdown -> { + if (supportsPhysicalPushdown()) { + assertThat(statsWithoutPushdown.getPhysicalInputDataSize()).isGreaterThan(physicalInputDataSizeWithPushdown); + } + else { + // TODO https://github.com/trinodb/trino/issues/17201 + assertThat(statsWithoutPushdown.getPhysicalInputDataSize()).isEqualTo(physicalInputDataSizeWithPushdown); + } + assertThat(statsWithoutPushdown.getProcessedInputDataSize()).isGreaterThan(processedDataSizeWithPushdown); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(expectedResult.getOnlyColumnAsSet())); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(expectedResult.getOnlyColumnAsSet())); + } + } + + @Override + @Test + public void testProjectionPushdownPhysicalInputSize() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_physical_input_size_", + "AS SELECT val AS id, CAST(ROW(val + 1, val + 2) AS ROW(leaf1 BIGINT, leaf2 BIGINT)) AS root FROM 
UNNEST(SEQUENCE(1, 10)) AS t(val)")) { + // Verify that the physical input size is smaller when reading the root.leaf1 field compared to reading the root field + assertQueryStats( + getSession(), + "SELECT root FROM " + testTable.getName(), + statsWithSelectRootField -> { + assertQueryStats( + getSession(), + "SELECT root.leaf1 FROM " + testTable.getName(), + statsWithSelectLeafField -> { + if (supportsPhysicalPushdown()) { + assertThat(statsWithSelectLeafField.getPhysicalInputDataSize()).isLessThan(statsWithSelectRootField.getPhysicalInputDataSize()); + } + else { + // TODO https://github.com/trinodb/trino/issues/17201 + assertThat(statsWithSelectLeafField.getPhysicalInputDataSize()).isEqualTo(statsWithSelectRootField.getPhysicalInputDataSize()); + } + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(computeActual("SELECT val + 1 FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getOnlyColumnAsSet())); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(computeActual("SELECT ROW(val + 1, val + 2) FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getOnlyColumnAsSet())); + + // Verify that the physical input size is the same when reading the root field compared to reading both the root and root.leaf1 fields + assertQueryStats( + getSession(), + "SELECT root FROM " + testTable.getName(), + statsWithSelectRootField -> { + assertQueryStats( + getSession(), + "SELECT root, root.leaf1 FROM " + testTable.getName(), + statsWithSelectRootAndLeafField -> { + assertThat(statsWithSelectRootAndLeafField.getPhysicalInputDataSize()).isEqualTo(statsWithSelectRootField.getPhysicalInputDataSize()); + }, + results -> assertEqualsIgnoreOrder(results.getMaterializedRows(), computeActual("SELECT ROW(val + 1, val + 2), val + 1 FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getMaterializedRows())); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(computeActual("SELECT ROW(val + 1, val + 2) FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getOnlyColumnAsSet())); + } 
+ } } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java index 51b8a6dc1a7f..8ba85db57dea 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java @@ -54,6 +54,7 @@ public void testDefaults() .setQueryLabelName(null) .setQueryLabelFormat(null) .setProxyEnabled(false) + .setProjectionPushdownEnabled(true) .setMetadataParallelism(2)); } @@ -82,6 +83,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .put("bigquery.job.label-format", "$TRACE_TOKEN") .put("bigquery.rpc-proxy.enabled", "true") .put("bigquery.metadata.parallelism", "31") + .put("bigquery.projection-pushdown-enabled", "false") .buildOrThrow(); BigQueryConfig expected = new BigQueryConfig() @@ -105,6 +107,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .setQueryLabelName("trino_job_name") .setQueryLabelFormat("$TRACE_TOKEN") .setProxyEnabled(true) + .setProjectionPushdownEnabled(false) .setMetadataParallelism(31); assertFullMapping(properties, expected); diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java index 770b665508ab..2af3bd99fbf9 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java @@ -15,9 +15,17 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; +import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; +import java.util.List; + +import static com.google.cloud.bigquery.Field.Mode.NULLABLE; +import static com.google.cloud.bigquery.StandardSQLTypeName.BIGNUMERIC; 
+import static io.trino.plugin.bigquery.BigQueryMetadata.projectParentColumns; import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor; +import static io.trino.spi.type.BigintType.BIGINT; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; public class TestBigQueryMetadata @@ -31,4 +39,89 @@ public void testDatasetNotFoundMessage() .isThrownBy(() -> bigQuery.listTables("test_dataset_not_found")) .matches(e -> e.getCode() == 404 && e.getMessage().contains("Not found: Dataset")); } + + @Test + public void testProjectParentColumnsSingleParent() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + testingColumn("a", ImmutableList.of("b")), + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("b", "c"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(1); + assertThat(parentColumns.getFirst()).isEqualTo(parentColumn); + } + + @Test + public void testProjectParentColumnsSingleParentDifferentOrder() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + testingColumn("a", ImmutableList.of("b")), + testingColumn("a", ImmutableList.of("d", "c", "b")), + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("b", "c"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(1); + assertThat(parentColumns.getFirst()).isEqualTo(parentColumn); + } + + @Test + public void testProjectParentColumnsNoParentDifferentOrder() + { + List columns = ImmutableList.of( + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("d", "c", "b"))); + + List parentColumns = projectParentColumns(columns); + 
assertThat(parentColumns).size().isEqualTo(2); + } + + @Test + public void testProjectParentColumnsSingleParentSuddenJump() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + testingColumn("a", ImmutableList.of("d", "c", "b"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(1); + assertThat(parentColumns.getFirst()).isEqualTo(parentColumn); + } + + @Test + public void testProjectParentColumnsMultipleParent() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + BigQueryColumnHandle anotherParentColumn = testingColumn("a1", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + anotherParentColumn, + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("b", "c"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(2); + assertThat(parentColumns).containsExactlyInAnyOrder(parentColumn, anotherParentColumn); + } + + private static BigQueryColumnHandle testingColumn(String name, List dereferenceNames) + { + return new BigQueryColumnHandle( + name, + dereferenceNames, + BIGINT, + BIGNUMERIC, + false, + NULLABLE, + ImmutableList.of(), + "description", + false); + } } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java index d1641759ed37..2e0175d79aef 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java @@ -35,7 +35,6 @@ import static com.google.cloud.bigquery.Field.Mode.REQUIRED; import static com.google.cloud.bigquery.StandardSQLTypeName.INT64; -import static 
com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.bigquery.BigQueryFilterQueryBuilder.buildFilter; import static io.trino.plugin.bigquery.ViewMaterializationCache.TEMP_TABLE_PREFIX; import static io.trino.spi.transaction.IsolationLevel.READ_UNCOMMITTED; @@ -81,7 +80,7 @@ void testBigQueryMaterializedView() assertThat(readSession.getTable()).contains(TEMP_TABLE_PREFIX); // Ignore constraints when creating temporary tables by default (view_materialization_with_filter is false) - BigQueryColumnHandle column = new BigQueryColumnHandle("cnt", BIGINT, INT64, true, REQUIRED, ImmutableList.of(), null, false); + BigQueryColumnHandle column = new BigQueryColumnHandle("cnt", ImmutableList.of(), BIGINT, INT64, true, REQUIRED, ImmutableList.of(), null, false); BigQueryTableHandle tableDifferentFilter = new BigQueryTableHandle(table.relationHandle(), TupleDomain.fromFixedValues(ImmutableMap.of(column, new NullableValue(BIGINT, 0L))), table.projectedColumns()); assertThat(createReadSession(session, tableDifferentFilter).getTable()) .isEqualTo(readSession.getTable()); @@ -110,9 +109,7 @@ private ReadSession createReadSession(ConnectorSession session, BigQueryTableHan return splitManager.createReadSession( session, table.asPlainTable().getRemoteTableName().toTableId(), - table.projectedColumns().orElseThrow().stream() - .map(BigQueryColumnHandle::name) - .collect(toImmutableList()), + table.projectedColumns().orElseThrow(), buildFilter(table.constraint())); }