11 changes: 11 additions & 0 deletions plugin/trino-iceberg/pom.xml
@@ -40,6 +40,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
@@ -142,6 +148,11 @@
<artifactId>trino-hive-formats</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-matching</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-context</artifactId>
IcebergColumnHandle.java
@@ -32,6 +32,7 @@
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static io.trino.plugin.iceberg.aggregation.AggregateExpression.COUNT_AGGREGATE_COLUMN_ID;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.IS_DELETED;
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
@@ -299,4 +300,9 @@ public boolean isPathColumn()
{
return getColumnIdentity().getId() == FILE_PATH.getId();
}

public boolean isAggregateColumn()
{
return getColumnIdentity().getId() == COUNT_AGGREGATE_COLUMN_ID;
}
}
IcebergConfig.java
@@ -79,6 +79,7 @@ public class IcebergConfig
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;
private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2;
private boolean aggregationPushdownEnabled;
Member:
Generally, an improvement that is not enabled by default benefits only those who know about it (those who added it, plus a small group of other people). If something is preventing the feature from being enabled, it should be documented in the code as a // TODO comment, with a link to an issue that's focused solely on that (e.g., #10974 as it stands today is too broad).
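For example, something along these lines (the issue link is a placeholder):

// TODO: enable aggregation pushdown by default; blocked by <link to a focused issue>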

Member:
Even if this doesn't get enabled by default, it should be enabled in at least one test class extending BaseConnectorTest, to get reasonable test coverage.
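One hypothetical way to wire that up, using the catalog property added in this PR (assuming IcebergQueryRunner's builder exposes setIcebergProperties):

// Hypothetical test setup: enable the new flag for an entire test class.
QueryRunner queryRunner = IcebergQueryRunner.builder()
        .setIcebergProperties(ImmutableMap.of("iceberg.aggregation-pushdown.enabled", "true"))
        .build();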

Member:
It should be enabled by default, since it's only about counts, which according to the spec should be accurate in the metadata files.

Member:
Agreed. However, we need to understand what the approach would look like for supporting aggregations other than count, since the input information, and the guarantees, are so different.
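For illustration, other aggregations would presumably be registered as extra rules next to the count rule in IcebergMetadata; ImplementMinMax below is hypothetical and not part of this PR:

// Hypothetical sketch: registering further pushdown rules alongside count.
ImmutableSet.<AggregateFunctionRule<AggregateExpression, Void>>builder()
        .add(new ImplementCountAll())
        .add(new ImplementMinMax()) // hypothetical rule; would need min/max file metrics
        .build();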


public CatalogType getCatalogType()
{
@@ -435,4 +436,16 @@ public boolean isStorageSchemaSetWhenHidingIsEnabled()
{
return hideMaterializedViewStorageTable && materializedViewsStorageSchema.isPresent();
}

public boolean isAggregationPushdownEnabled()
{
return aggregationPushdownEnabled;
}

@Config("iceberg.aggregation-pushdown.enabled")
public IcebergConfig setAggregationPushdownEnabled(boolean aggregationPushdownEnabled)
{
this.aggregationPushdownEnabled = aggregationPushdownEnabled;
return this;
}
}
IcebergMetadata.java
@@ -32,14 +32,19 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.base.aggregation.AggregateFunctionRewriter;
import io.trino.plugin.base.aggregation.AggregateFunctionRule;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.base.expression.ConnectorExpressionRewriter;
import io.trino.plugin.base.filter.UtcConstraintExtractor;
import io.trino.plugin.base.projection.ApplyProjectionUtil;
import io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
import io.trino.plugin.hive.HiveWrittenPartitions;
import io.trino.plugin.hive.metastore.TableInfo;
import io.trino.plugin.iceberg.aggregation.AggregateExpression;
import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
import io.trino.plugin.iceberg.aggregation.ImplementCountAll;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle;
import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle;
@@ -51,6 +56,8 @@
import io.trino.spi.ErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.BeginTableExecuteResult;
import io.trino.spi.connector.CatalogHandle;
@@ -135,6 +142,7 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StatisticsFile;
@@ -220,6 +228,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
@@ -327,6 +336,7 @@ public class IcebergMetadata
private final TrinoCatalog catalog;
private final IcebergFileSystemFactory fileSystemFactory;
private final TableStatisticsWriter tableStatisticsWriter;
private final AggregateFunctionRewriter<AggregateExpression, Void> aggregateFunctionRewriter;

private final Map<IcebergTableHandle, TableStatistics> tableStatisticsCache = new ConcurrentHashMap<>();

@@ -346,6 +356,12 @@ public IcebergMetadata(
this.catalog = requireNonNull(catalog, "catalog is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");

this.aggregateFunctionRewriter = new AggregateFunctionRewriter<>(
new ConnectorExpressionRewriter<>(ImmutableSet.of()),
ImmutableSet.<AggregateFunctionRule<AggregateExpression, Void>>builder()
.add(new ImplementCountAll())
.build());
}

@Override
@@ -2653,6 +2669,101 @@ else if (isMetadataColumnId(columnHandle.getId())) {
false));
}

@Override
public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggregation(
ConnectorSession session,
ConnectorTableHandle handle,
List<AggregateFunction> aggregates,
Map<String, ColumnHandle> assignments,
List<List<ColumnHandle>> groupingSets)
{
IcebergTableHandle tableHandle = (IcebergTableHandle) handle;

// Iceberg's metadata cannot be used for this aggregation when equality deletes
// are present, because equality deletes are not reflected in metadata-level row counts.
if (hasEqualityDeletes(session, tableHandle)) {
return Optional.empty();
}

if (!isAggregationPushdownEnabled(session)) {
return Optional.empty();
}

// not supporting unenforced predicate
if (!tableHandle.getUnenforcedPredicate().isNone()
&& tableHandle.getUnenforcedPredicate().getDomains().isPresent()
&& !tableHandle.getUnenforcedPredicate().getDomains().get().isEmpty()) {
return Optional.empty();
}

// not supporting group by
Member:
This is pretty limited. I think it would make sense to support simple grouping sets in the first iteration, to future-proof the design.

if (!groupingSets.equals(List.of(List.of()))) {
return Optional.empty();
}

ImmutableList.Builder<ConnectorExpression> projections = ImmutableList.builder();
ImmutableList.Builder<Assignment> resultAssignments = ImmutableList.builder();
ImmutableList.Builder<IcebergColumnHandle> aggregateColumnsBuilder = ImmutableList.builder();

Set<IcebergColumnHandle> projectionsSet = new HashSet<>();

if (aggregates.size() != 1) {
// not handling multiple aggregations for now
return Optional.empty();
}

AggregateFunction aggregate = aggregates.get(0);

Optional<AggregateExpression> rewriteResult = aggregateFunctionRewriter.rewrite(session, aggregate, assignments);
if (rewriteResult.isEmpty()) {
return Optional.empty();
}
AggregateExpression aggregateExpression = rewriteResult.get();

if (aggregateExpression.getFunction().startsWith("count")) {
Contributor:
If it's not count, don't we want to exit and return Optional.empty()?
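A minimal sketch of that suggested guard (a hypothetical rearrangement of the code below):

// Hypothetical: return early when the rewritten aggregate is not a count,
// instead of falling through and building a result with no projections.
if (!aggregateExpression.getFunction().startsWith("count")) {
    return Optional.empty();
}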

IcebergColumnHandle aggregateIcebergColumnHandle = new IcebergColumnHandle(aggregateExpression.toColumnIdentity(AggregateExpression.COUNT_AGGREGATE_COLUMN_ID),
aggregate.getOutputType(), List.of(), aggregate.getOutputType(), false, Optional.empty());
aggregateColumnsBuilder.add(aggregateIcebergColumnHandle);
projections.add(new Variable(aggregateIcebergColumnHandle.getName(), aggregateIcebergColumnHandle.getType()));
projectionsSet.add(aggregateIcebergColumnHandle);
resultAssignments.add(new Assignment(aggregateIcebergColumnHandle.getName(), aggregateIcebergColumnHandle, aggregateIcebergColumnHandle.getType()));
}

IcebergTableHandle tableHandleTemp = new IcebergTableHandle(
tableHandle.getCatalog(),
tableHandle.getSchemaName(),
tableHandle.getTableName(),
tableHandle.getTableType(),
tableHandle.getSnapshotId(),
tableHandle.getTableSchemaJson(),
tableHandle.getPartitionSpecJson(),
tableHandle.getFormatVersion(),
tableHandle.getUnenforcedPredicate(),
tableHandle.getEnforcedPredicate(),
tableHandle.getLimit(),
projectionsSet,
tableHandle.getNameMappingJson(),
tableHandle.getTableLocation(),
tableHandle.getStorageProperties(),
tableHandle.isRecordScannedFiles(),
tableHandle.getMaxScannedFileSize(),
tableHandle.getConstraintColumns(),
tableHandle.getForAnalyze());

return Optional.of(new AggregationApplicationResult<>(tableHandleTemp, projections.build(), resultAssignments.build(), ImmutableMap.of(), false));
}

private boolean hasEqualityDeletes(ConnectorSession session, IcebergTableHandle tableHandle)
{
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

if (icebergTable.currentSnapshot().summary().containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP)) {
Member:
Is the summary guaranteed to contain total-equality-deletes if equality deletes exist? I can't find it in https://iceberg.apache.org/spec/#snapshots

Contributor Author:
You are correct; it looks like the snapshot summary is not covered by the spec.

Though it is being updated here.

It seems this metric is updated by the equality delete operation, so we can use it.

Added a test case to show that, with equality deletes and without aggregation pushdown, count queries are correct.

Contributor:
I don't think I'd use anything that's not spec'd out as a source of truth for determining whether equality deletes exist. The reason is that some other engine or process could technically write equality deletes without writing this summary property, and Trino would then pass the check when it should not (potentially leading to correctness issues).

Beyond that, there are other arguments against using this snapshot summary property, such as: what if none of the equality deletes even apply to a query with a given predicate? We'd still want to do the aggregation pushdown, because it's safe to do so. However, with just a property > 0 check, we'd say aggregation pushdown is not possible.

In the Spark Iceberg integration we actually do scan planning, go through the tasks, and if a task has any deletes, we determine that we can't move forward with aggregation pushdown. This addresses the concern, but I'm not sure yet what that would look like in the Trino model. I think the splits would need to plumb this information through?
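A rough sketch of that Spark-style check against Iceberg's scan-planning API (how this would plug into Trino's split model is the open question):

// Sketch, assuming imports of org.apache.iceberg.FileScanTask and
// org.apache.iceberg.io.CloseableIterable: refuse pushdown if any
// planned task carries delete files, instead of trusting the summary.
private static boolean anyTaskHasDeletes(Table icebergTable)
{
    try (CloseableIterable<FileScanTask> tasks = icebergTable.newScan().planFiles()) {
        for (FileScanTask task : tasks) {
            if (!task.deletes().isEmpty()) {
                return true;
            }
        }
    }
    catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    return false;
}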

return (Long.parseLong(icebergTable.currentSnapshot().summary().get(SnapshotSummary.TOTAL_EQ_DELETES_PROP)) > 0);
}

return false;
}

private static Set<Integer> identityPartitionColumnsInAllSpecs(Table table)
{
// Extract identity partition column source ids common to ALL specs
IcebergPageSourceProvider.java
@@ -57,6 +57,8 @@
import io.trino.plugin.hive.parquet.ParquetPageSource;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext;
import io.trino.plugin.iceberg.aggregation.AggregateIcebergSplit;
import io.trino.plugin.iceberg.aggregation.AggregatePageSource;
import io.trino.plugin.iceberg.delete.DeleteFile;
import io.trino.plugin.iceberg.delete.DeleteFilter;
import io.trino.plugin.iceberg.delete.EqualityDeleteFilter;
@@ -251,36 +253,43 @@ public ConnectorPageSource createPageSource(
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
IcebergSplit split = (IcebergSplit) connectorSplit;
List<IcebergColumnHandle> icebergColumns = columns.stream()
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList());
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTable;
Schema schema = SchemaParser.fromJson(tableHandle.getTableSchemaJson());
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, split.getPartitionSpecJson());
org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
.toArray(org.apache.iceberg.types.Type[]::new);

return createPageSource(
session,
icebergColumns,
schema,
partitionSpec,
PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes),
split.getDeletes(),
dynamicFilter,
tableHandle.getUnenforcedPredicate(),
split.getFileStatisticsDomain(),
split.getPath(),
split.getStart(),
split.getLength(),
split.getFileSize(),
split.getFileRecordCount(),
split.getPartitionDataJson(),
split.getFileFormat(),
split.getFileIoProperties(),
tableHandle.getNameMappingJson().map(NameMappingParser::fromJson));

if (shouldHandleAggregatePushDown(icebergColumns)) {
AggregateIcebergSplit aggregateIcebergSplit = (AggregateIcebergSplit) connectorSplit;
return new AggregatePageSource(icebergColumns, aggregateIcebergSplit.getTotalCount());
}
else {
Member:
Style: the else is redundant.

IcebergSplit split = (IcebergSplit) connectorSplit;
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTable;
Schema schema = SchemaParser.fromJson(tableHandle.getTableSchemaJson());
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, split.getPartitionSpecJson());
org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
.toArray(org.apache.iceberg.types.Type[]::new);

return createPageSource(
session,
icebergColumns,
schema,
partitionSpec,
PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes),
split.getDeletes(),
dynamicFilter,
tableHandle.getUnenforcedPredicate(),
split.getFileStatisticsDomain(),
split.getPath(),
split.getStart(),
split.getLength(),
split.getFileSize(),
split.getFileRecordCount(),
split.getPartitionDataJson(),
split.getFileFormat(),
split.getFileIoProperties(),
tableHandle.getNameMappingJson().map(NameMappingParser::fromJson));
}
}

public ConnectorPageSource createPageSource(
@@ -1542,6 +1551,11 @@ private static TrinoException handleException(ParquetDataSourceId dataSourceId,
return new TrinoException(ICEBERG_CURSOR_ERROR, format("Failed to read Parquet file: %s", dataSourceId), exception);
}

private static boolean shouldHandleAggregatePushDown(List<IcebergColumnHandle> columns)
{
return columns.size() == 1 && columns.get(0).isAggregateColumn();
}

public static final class ReaderPageSourceWithRowPositions
{
private final ReaderPageSource readerPageSource;
IcebergSessionProperties.java
@@ -97,6 +97,7 @@ public final class IcebergSessionProperties
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
public static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -348,6 +349,11 @@ public IcebergSessionProperties(
"Require filter on partition column",
icebergConfig.isQueryPartitionFilterRequired(),
false))
.add(booleanProperty(
AGGREGATION_PUSHDOWN_ENABLED,
"Enable Aggregation Pushdown",
icebergConfig.isAggregationPushdownEnabled(),
false))
.build();
}

@@ -568,4 +574,9 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

public static boolean isAggregationPushdownEnabled(ConnectorSession session)
{
return session.getProperty(AGGREGATION_PUSHDOWN_ENABLED, Boolean.class);
}
}