diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java
index b5ed9e369d1e..7c4195da2fb5 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.iceberg;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.AbstractSequentialIterator;
 import com.google.common.collect.ImmutableMap;
@@ -66,8 +67,10 @@
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toUnmodifiableMap;
 
-public class TableStatisticsReader
+public final class TableStatisticsReader
 {
+    private TableStatisticsReader() {}
+
     private static final Logger log = Logger.get(TableStatisticsReader.class);
 
     // TODO (https://github.com/trinodb/trino/issues/15397): remove support for Trino-specific statistics properties
@@ -85,35 +88,39 @@ public class TableStatisticsReader
 
     public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";
 
-    private final TypeManager typeManager;
-    private final ConnectorSession session;
-    private final Table icebergTable;
-
-    private TableStatisticsReader(TypeManager typeManager, ConnectorSession session, Table icebergTable)
-    {
-        this.typeManager = typeManager;
-        this.session = session;
-        this.icebergTable = icebergTable;
-    }
-
     public static TableStatistics getTableStatistics(TypeManager typeManager, ConnectorSession session, IcebergTableHandle tableHandle, Table icebergTable)
     {
-        return new TableStatisticsReader(typeManager, session, icebergTable).makeTableStatistics(tableHandle);
+        return makeTableStatistics(
+                typeManager,
+                icebergTable,
+                tableHandle.getSnapshotId(),
+                tableHandle.getEnforcedPredicate(),
+                tableHandle.getUnenforcedPredicate(),
+                isExtendedStatisticsEnabled(session));
     }
 
-    private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)
+    @VisibleForTesting
+    public static TableStatistics makeTableStatistics(
+            TypeManager typeManager,
+            Table icebergTable,
+            Optional<Long> snapshot,
+            TupleDomain<IcebergColumnHandle> enforcedConstraint,
+            TupleDomain<IcebergColumnHandle> unenforcedConstraint,
+            boolean extendedStatisticsEnabled)
     {
-        if (tableHandle.getSnapshotId().isEmpty()) {
+        if (snapshot.isEmpty()) {
             // No snapshot, so no data.
             return TableStatistics.builder()
                     .setRowCount(Estimate.of(0))
                     .build();
         }
-        long snapshotId = tableHandle.getSnapshotId().get();
-
-        TupleDomain<IcebergColumnHandle> enforcedPredicate = tableHandle.getEnforcedPredicate();
+        long snapshotId = snapshot.get();
 
-        if (enforcedPredicate.isNone()) {
+        // Including both enforced and unenforced constraint matches how Splits will eventually be generated and allows
+        // us to provide more accurate estimates. Stats will be estimated again by FilterStatsCalculator based on the
+        // unenforced constraint.
+        TupleDomain<IcebergColumnHandle> effectivePredicate = enforcedConstraint.intersect(unenforcedConstraint);
+        if (effectivePredicate.isNone()) {
             return TableStatistics.builder()
                     .setRowCount(Estimate.of(0))
                     .build();
@@ -130,7 +137,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)
                 .collect(toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
 
         TableScan tableScan = icebergTable.newScan()
-                .filter(toIcebergExpression(enforcedPredicate))
+                .filter(toIcebergExpression(effectivePredicate))
                 .useSnapshot(snapshotId)
                 .includeColumnStats();
 
@@ -151,10 +158,12 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)
         }
 
         Map<Integer, Long> ndvs = readNdvs(
+                icebergTable,
                 snapshotId,
                 // TODO We don't need NDV information for columns not involved in filters/joins. Engine should provide set of columns
                 // it makes sense to find NDV information for.
-                idToColumnHandle.keySet());
+                idToColumnHandle.keySet(),
+                extendedStatisticsEnabled);
 
         ImmutableMap.Builder<ColumnHandle, ColumnStatistics> columnHandleBuilder = ImmutableMap.builder();
         double recordCount = summary.getRecordCount();
@@ -210,9 +219,9 @@ else if (columnHandle.getBaseType() == VARBINARY) {
         return new TableStatistics(Estimate.of(recordCount), columnHandleBuilder.buildOrThrow());
     }
 
-    private Map<Integer, Long> readNdvs(long snapshotId, Set<Integer> columnIds)
+    private static Map<Integer, Long> readNdvs(Table icebergTable, long snapshotId, Set<Integer> columnIds, boolean extendedStatisticsEnabled)
     {
-        if (!isExtendedStatisticsEnabled(session)) {
+        if (!extendedStatisticsEnabled) {
             return ImmutableMap.of();
         }
 
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
index e331984eb0ab..49204ed39c5a 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.iceberg;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.trino.Session;
 import io.trino.filesystem.TrinoFileSystem;
@@ -27,9 +28,16 @@
 import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
 import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
 import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.Range;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.predicate.ValueSet;
+import io.trino.spi.statistics.TableStatistics;
 import io.trino.spi.type.TestingTypeManager;
+import io.trino.spi.type.TypeManager;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.QueryRunner;
+import io.trino.testing.sql.TestTable;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionField;
@@ -67,6 +75,7 @@
 import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
 import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
 import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
+import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.testing.TestingConnectorSession.SESSION;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static io.trino.tpch.TpchTable.NATION;
@@ -473,6 +482,43 @@ public void testDeletingEntirePartitionedTable()
         assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(0);
     }
 
+    @Test
+    public void testStatsFilePruning()
+    {
+        try (TestTable testTable = new TestTable(getQueryRunner()::execute, "test_stats_file_pruning_", "(a INT, b INT) WITH (partitioning = ARRAY['b'])")) {
+            assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (1, 10), (10, 10)", 2);
+            assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (200, 10), (300, 20)", 2);
+
+            Optional<Long> snapshotId = Optional.of((long) computeScalar("SELECT snapshot_id FROM \"" + testTable.getName() + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"));
+            TypeManager typeManager = new TestingTypeManager();
+            Table table = loadTable(testTable.getName());
+            TableStatistics withNoFilter = TableStatisticsReader.makeTableStatistics(typeManager, table, snapshotId, TupleDomain.all(), TupleDomain.all(), true);
+            assertEquals(withNoFilter.getRowCount().getValue(), 4.0);
+
+            TableStatistics withPartitionFilter = TableStatisticsReader.makeTableStatistics(
+                    typeManager,
+                    table,
+                    snapshotId,
+                    TupleDomain.withColumnDomains(ImmutableMap.of(
+                            new IcebergColumnHandle(ColumnIdentity.primitiveColumnIdentity(1, "b"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()),
+                            Domain.singleValue(INTEGER, 10L))),
+                    TupleDomain.all(),
+                    true);
+            assertEquals(withPartitionFilter.getRowCount().getValue(), 3.0);
+
+            TableStatistics withUnenforcedFilter = TableStatisticsReader.makeTableStatistics(
+                    typeManager,
+                    table,
+                    snapshotId,
+                    TupleDomain.all(),
+                    TupleDomain.withColumnDomains(ImmutableMap.of(
+                            new IcebergColumnHandle(ColumnIdentity.primitiveColumnIdentity(0, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()),
+                            Domain.create(ValueSet.ofRanges(Range.greaterThan(INTEGER, 100L)), true))),
+                    true);
+            assertEquals(withUnenforcedFilter.getRowCount().getValue(), 2.0);
+        }
+    }
+
     private void writeEqualityDeleteToNationTable(Table icebergTable)
             throws Exception
     {
diff --git a/testing/trino-tests/src/test/resources/sql/presto/tpcds/iceberg/parquet/unpartitioned/q09.plan.txt b/testing/trino-tests/src/test/resources/sql/presto/tpcds/iceberg/parquet/unpartitioned/q09.plan.txt
index 4466a2d88feb..5660bb85f04e 100644
--- a/testing/trino-tests/src/test/resources/sql/presto/tpcds/iceberg/parquet/unpartitioned/q09.plan.txt
+++ b/testing/trino-tests/src/test/resources/sql/presto/tpcds/iceberg/parquet/unpartitioned/q09.plan.txt
@@ -12,6 +12,11 @@ cross join:
     cross join:
         cross join:
             cross join:
+                final aggregation over ()
+                    local exchange (GATHER, SINGLE, [])
+                        remote exchange (GATHER, SINGLE, [])
+                            partial aggregation over ()
+                                scan store_sales
                 cross join:
                     final aggregation over ()
                         local exchange (GATHER, SINGLE, [])
@@ -21,11 +26,6 @@ cross join:
                             local exchange (GATHER, SINGLE, [])
                                 remote exchange (GATHER, SINGLE, [])
                                     scan reason
-                    final aggregation over ()
-                        local exchange (GATHER, SINGLE, [])
-                            remote exchange (GATHER, SINGLE, [])
-                                partial aggregation over ()
-                                    scan store_sales
                 final aggregation over ()
                     local exchange (GATHER, SINGLE, [])
                         remote exchange (GATHER, SINGLE, [])
diff --git a/testing/trino-tests/src/test/resources/sql/presto/tpch/iceberg/orc/partitioned/q21.plan.txt b/testing/trino-tests/src/test/resources/sql/presto/tpch/iceberg/orc/partitioned/q21.plan.txt
index 313eb576ce37..a5c8b20e6ec8 100644
--- a/testing/trino-tests/src/test/resources/sql/presto/tpch/iceberg/orc/partitioned/q21.plan.txt
+++ b/testing/trino-tests/src/test/resources/sql/presto/tpch/iceberg/orc/partitioned/q21.plan.txt
@@ -14,19 +14,19 @@ local exchange (GATHER, SINGLE, [])
                            scan lineitem
                    local exchange (GATHER, SINGLE, [])
                        join (INNER, PARTITIONED):
-                            remote exchange (REPARTITION, HASH, ["orderkey_4"])
-                                scan orders
+                            remote exchange (REPARTITION, HASH, ["orderkey"])
+                                join (INNER, REPLICATED):
+                                    scan lineitem
+                                    local exchange (GATHER, SINGLE, [])
+                                        remote exchange (REPLICATE, BROADCAST, [])
+                                            join (INNER, REPLICATED):
+                                                scan supplier
+                                                local exchange (GATHER, SINGLE, [])
+                                                    remote exchange (REPLICATE, BROADCAST, [])
+                                                        scan nation
                             local exchange (GATHER, SINGLE, [])
-                                remote exchange (REPARTITION, HASH, ["orderkey"])
-                                    join (INNER, REPLICATED):
-                                        scan lineitem
-                                        local exchange (GATHER, SINGLE, [])
-                                            remote exchange (REPLICATE, BROADCAST, [])
-                                                join (INNER, REPLICATED):
-                                                    scan supplier
-                                                    local exchange (GATHER, SINGLE, [])
-                                                        remote exchange (REPLICATE, BROADCAST, [])
-                                                            scan nation
+                                remote exchange (REPARTITION, HASH, ["orderkey_4"])
+                                    scan orders
                 local exchange (GATHER, SINGLE, [])
                     remote exchange (REPARTITION, HASH, ["orderkey_32"])
                         scan lineitem