@@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.AbstractSequentialIterator;
import com.google.common.collect.ImmutableMap;
@@ -66,8 +67,10 @@
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableMap;

public class TableStatisticsReader
public final class TableStatisticsReader
{
private TableStatisticsReader() {}

private static final Logger log = Logger.get(TableStatisticsReader.class);

// TODO (https://github.com/trinodb/trino/issues/15397): remove support for Trino-specific statistics properties
@@ -85,35 +88,39 @@ public class TableStatisticsReader

public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";

private final TypeManager typeManager;
private final ConnectorSession session;
private final Table icebergTable;

private TableStatisticsReader(TypeManager typeManager, ConnectorSession session, Table icebergTable)
{
this.typeManager = typeManager;
this.session = session;
this.icebergTable = icebergTable;
}

public static TableStatistics getTableStatistics(TypeManager typeManager, ConnectorSession session, IcebergTableHandle tableHandle, Table icebergTable)
{
return new TableStatisticsReader(typeManager, session, icebergTable).makeTableStatistics(tableHandle);
return makeTableStatistics(
typeManager,
icebergTable,
tableHandle.getSnapshotId(),
tableHandle.getEnforcedPredicate(),
tableHandle.getUnenforcedPredicate(),
isExtendedStatisticsEnabled(session));
}

private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)
@VisibleForTesting
public static TableStatistics makeTableStatistics(
TypeManager typeManager,
Table icebergTable,
Optional<Long> snapshot,
TupleDomain<IcebergColumnHandle> enforcedConstraint,
TupleDomain<IcebergColumnHandle> unenforcedConstraint,
boolean extendedStatisticsEnabled)
{
if (tableHandle.getSnapshotId().isEmpty()) {
if (snapshot.isEmpty()) {
// No snapshot, so no data.
return TableStatistics.builder()
.setRowCount(Estimate.of(0))
.build();
}
long snapshotId = tableHandle.getSnapshotId().get();

TupleDomain<IcebergColumnHandle> enforcedPredicate = tableHandle.getEnforcedPredicate();
long snapshotId = snapshot.get();

if (enforcedPredicate.isNone()) {
// Including both enforced and unenforced constraint matches how Splits will eventually be generated and allows
// us to provide more accurate estimates. Stats will be estimated again by FilterStatsCalculator based on the
// unenforced constraint.
TupleDomain<IcebergColumnHandle> effectivePredicate = enforcedConstraint.intersect(unenforcedConstraint);
Comment thread:

Member:
let's capture some info from #16244 (comment) conversation
like that this matches split gen

Member Author:
I tried to sum it up in the commit message, let me know if there's anything missing.

Member:
> I tried to sum it up in the commit message

code comments are easier to notice (and serve a slightly different purpose)

Member Author:
Added a code comment as well

if (effectivePredicate.isNone()) {
return TableStatistics.builder()
.setRowCount(Estimate.of(0))
.build();
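To make the new code comment above concrete, here is a minimal sketch (not part of this PR) of how an enforced partition predicate and an unenforced predicate combine via TupleDomain.intersect. The column ids, names, and values are hypothetical; the IcebergColumnHandle construction mirrors the test added later in this change.

package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;

import java.util.Optional;

import static io.trino.spi.type.IntegerType.INTEGER;

final class EffectivePredicateSketch
{
    private EffectivePredicateSketch() {}

    static TupleDomain<IcebergColumnHandle> effectivePredicateExample()
    {
        // Hypothetical columns: "b" is a partition column, "a" is a regular data column.
        IcebergColumnHandle partitionColumn = new IcebergColumnHandle(
                ColumnIdentity.primitiveColumnIdentity(1, "b"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty());
        IcebergColumnHandle dataColumn = new IcebergColumnHandle(
                ColumnIdentity.primitiveColumnIdentity(0, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty());

        // Enforced: a predicate that partition pruning fully guarantees (b = 10).
        TupleDomain<IcebergColumnHandle> enforced = TupleDomain.withColumnDomains(
                ImmutableMap.of(partitionColumn, Domain.singleValue(INTEGER, 10L)));

        // Unenforced: a predicate the engine still has to apply itself (a = 5).
        TupleDomain<IcebergColumnHandle> unenforced = TupleDomain.withColumnDomains(
                ImmutableMap.of(dataColumn, Domain.singleValue(INTEGER, 5L)));

        // The intersection constrains both columns, matching what split generation will eventually scan;
        // it is isNone() only when the two constraints contradict each other.
        return enforced.intersect(unenforced);
    }
}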
@@ -130,7 +137,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)
.collect(toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(enforcedPredicate))
.filter(toIcebergExpression(effectivePredicate))
.useSnapshot(snapshotId)
.includeColumnStats();

@@ -151,10 +158,12 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)
}

Map<Integer, Long> ndvs = readNdvs(
icebergTable,
snapshotId,
// TODO We don't need NDV information for columns not involved in filters/joins. Engine should provide set of columns
Comment thread:

Member:
Regarding this TODO (not related to the current PR), the engine does provide the columns required for the query (various connectors save this in their ConnectorTableHandle after applyProjection). Do we try to parse all columns here, or only the columns accessed by the query?

(A hedged sketch related to this question appears at the end of this file's diff below.)

// it makes sense to find NDV information for.
idToColumnHandle.keySet());
idToColumnHandle.keySet(),
extendedStatisticsEnabled);

ImmutableMap.Builder<ColumnHandle, ColumnStatistics> columnHandleBuilder = ImmutableMap.builder();
double recordCount = summary.getRecordCount();
@@ -210,9 +219,9 @@ else if (columnHandle.getBaseType() == VARBINARY) {
return new TableStatistics(Estimate.of(recordCount), columnHandleBuilder.buildOrThrow());
}

private Map<Integer, Long> readNdvs(long snapshotId, Set<Integer> columnIds)
private static Map<Integer, Long> readNdvs(Table icebergTable, long snapshotId, Set<Integer> columnIds, boolean extendedStatisticsEnabled)
{
if (!isExtendedStatisticsEnabled(session)) {
Comment thread (outdated): findepi marked this conversation as resolved.
if (!extendedStatisticsEnabled) {
return ImmutableMap.of();
}

@@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.filesystem.TrinoFileSystem;
@@ -27,9 +28,16 @@
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.TestingTypeManager;
import io.trino.spi.type.TypeManager;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionField;
@@ -67,6 +75,7 @@
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tpch.TpchTable.NATION;
@@ -473,6 +482,43 @@ public void testDeletingEntirePartitionedTable()
assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(0);
}

@Test
public void testStatsFilePruning()
{
try (TestTable testTable = new TestTable(getQueryRunner()::execute, "test_stats_file_pruning_", "(a INT, b INT) WITH (partitioning = ARRAY['b'])")) {
assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (1, 10), (10, 10)", 2);
assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (200, 10), (300, 20)", 2);

Optional<Long> snapshotId = Optional.of((long) computeScalar("SELECT snapshot_id FROM \"" + testTable.getName() + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"));
TypeManager typeManager = new TestingTypeManager();
Table table = loadTable(testTable.getName());
TableStatistics withNoFilter = TableStatisticsReader.makeTableStatistics(typeManager, table, snapshotId, TupleDomain.all(), TupleDomain.all(), true);
assertEquals(withNoFilter.getRowCount().getValue(), 4.0);

TableStatistics withPartitionFilter = TableStatisticsReader.makeTableStatistics(
typeManager,
table,
snapshotId,
TupleDomain.withColumnDomains(ImmutableMap.of(
new IcebergColumnHandle(ColumnIdentity.primitiveColumnIdentity(1, "b"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()),
Domain.singleValue(INTEGER, 10L))),
TupleDomain.all(),
true);
assertEquals(withPartitionFilter.getRowCount().getValue(), 3.0);

TableStatistics withUnenforcedFilter = TableStatisticsReader.makeTableStatistics(
typeManager,
table,
snapshotId,
TupleDomain.all(),
TupleDomain.withColumnDomains(ImmutableMap.of(
new IcebergColumnHandle(ColumnIdentity.primitiveColumnIdentity(0, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()),
Domain.create(ValueSet.ofRanges(Range.greaterThan(INTEGER, 100L)), true))),
true);
assertEquals(withUnenforcedFilter.getRowCount().getValue(), 2.0);
}
}

private void writeEqualityDeleteToNationTable(Table icebergTable)
throws Exception
{
@@ -12,6 +12,11 @@ cross join:
cross join:
cross join:
cross join:
final aggregation over ()
Comment thread:

Member Author:
@findepi @raunaqmorarka is there a good way to tell if these changes are good or not?

Member:
Sure, just run the benchmarks :)

Member:
@alexjo2144 do you have the results?

local exchange (GATHER, SINGLE, [])
remote exchange (GATHER, SINGLE, [])
partial aggregation over ()
scan store_sales
cross join:
final aggregation over ()
local exchange (GATHER, SINGLE, [])
@@ -21,11 +26,6 @@
local exchange (GATHER, SINGLE, [])
remote exchange (GATHER, SINGLE, [])
scan reason
final aggregation over ()
local exchange (GATHER, SINGLE, [])
remote exchange (GATHER, SINGLE, [])
partial aggregation over ()
scan store_sales
final aggregation over ()
local exchange (GATHER, SINGLE, [])
remote exchange (GATHER, SINGLE, [])
@@ -14,19 +14,19 @@ local exchange (GATHER, SINGLE, [])
scan lineitem
local exchange (GATHER, SINGLE, [])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["orderkey_4"])
scan orders
remote exchange (REPARTITION, HASH, ["orderkey"])
join (INNER, REPLICATED):
scan lineitem
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
join (INNER, REPLICATED):
scan supplier
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan nation
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["orderkey"])
join (INNER, REPLICATED):
scan lineitem
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
join (INNER, REPLICATED):
scan supplier
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan nation
remote exchange (REPARTITION, HASH, ["orderkey_4"])
scan orders
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["orderkey_32"])
scan lineitem