Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInAllSpecs;
import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
Expand Down Expand Up @@ -1771,6 +1771,11 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
else {
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

Long snapshotId = table.getSnapshotId().orElseThrow(() -> new IllegalStateException("Snapshot id must be present"));
Set<Integer> partitionSpecIds = icebergTable.snapshot(snapshotId).allManifests().stream()
.map(ManifestFile::partitionSpecId)
.collect(toImmutableSet());

Map<IcebergColumnHandle, Domain> unsupported = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newEnforced = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newUnenforced = new LinkedHashMap<>();
Expand All @@ -1783,7 +1788,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
(columnHandle.getType() == UUID && !(domain.isOnlyNull() || domain.getValues().isAll()))) {
unsupported.put(columnHandle, domain);
}
else if (canEnforceColumnConstraintInAllSpecs(typeOperators, icebergTable, columnHandle, domain)) {
else if (canEnforceColumnConstraintInSpecs(typeOperators, icebergTable, partitionSpecIds, columnHandle, domain)) {
newEnforced.put(columnHandle, domain);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,15 @@ private static String quotedName(String name)
return '"' + name.replace("\"", "\"\"") + '"';
}

public static boolean canEnforceColumnConstraintInAllSpecs(TypeOperators typeOperators, Table table, IcebergColumnHandle columnHandle, Domain domain)
public static boolean canEnforceColumnConstraintInSpecs(
TypeOperators typeOperators,
Table table,
Set<Integer> partitionSpecIds,
IcebergColumnHandle columnHandle,
Domain domain)
{
return table.specs().values().stream()
.filter(partitionSpec -> partitionSpecIds.contains(partitionSpec.specId()))
.allMatch(spec -> canEnforceConstraintWithinPartitioningSpec(typeOperators, spec, columnHandle, domain));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,44 @@ else if (format == AVRO) {
dropTable("test_hour_transform");
}

@Test
public void testPartitionPredicatePushdownWithHistoricalPartitionSpecs()
{
// Start with a bucket transform, which cannot be used for predicate pushdown
String tableName = "test_partition_predicate_pushdown_with_historical_partition_specs";
assertUpdate("CREATE TABLE " + tableName + " (d TIMESTAMP(6), b INTEGER) WITH (partitioning = ARRAY['bucket(b, 3)'])");
@Language("SQL") String selectQuery = "SELECT b FROM " + tableName + " WHERE CAST(d AS date) < DATE '2015-01-02'";

@Language("SQL") String initialValues =
"(TIMESTAMP '1969-12-31 22:22:22.222222', 8)," +
"(TIMESTAMP '1969-12-31 23:33:11.456789', 9)," +
"(TIMESTAMP '1969-12-31 23:44:55.567890', 10)";
assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3);
assertThat(query(selectQuery))
.containsAll("VALUES 8, 9, 10")
.isNotFullyPushedDown(FilterNode.class);

@Language("SQL") String hourTransformValues =
"(TIMESTAMP '2015-01-01 10:01:23.123456', 1)," +
"(TIMESTAMP '2015-01-02 10:10:02.987654', 2)," +
"(TIMESTAMP '2015-01-03 10:55:00.456789', 3)";
// While the bucket transform is still used the hour transform still cannot be used for pushdown
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['hour(d)']");
assertUpdate("INSERT INTO " + tableName + " VALUES " + hourTransformValues, 3);
assertThat(query(selectQuery))
.containsAll("VALUES 1, 8, 9, 10")
.isNotFullyPushedDown(FilterNode.class);

// The old partition scheme is no longer used so pushdown using the hour transform is allowed
assertUpdate("DELETE FROM " + tableName + " WHERE year(d) = 1969", 3);
assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3);
assertThat(query(selectQuery))
.containsAll("VALUES 1, 8, 9, 10")
.isFullyPushedDown();

dropTable(tableName);
}

@Test
public void testDayTransformDate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void testDereferencePushdown()

getQueryRunner().execute(format(
"CREATE TABLE %s (col0, col1) WITH (partitioning = ARRAY['col1']) AS" +
" SELECT CAST(row(5, 6) AS row(x bigint, y bigint)) AS col0, 5 AS col1 WHERE false",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an implicit change here that we can now enforce all predicates on empty tables. That broke this test because the table is empty but was not expecting a predicate to be enforced.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But doesn't it mean that now iceberg will fail on valid sql ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spoke with Konrad offline about this, not a problem

" SELECT CAST(row(5, 6) AS row(x bigint, y bigint)) AS col0, 5 AS col1",
testTable));

Session session = getQueryRunner().getDefaultSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,15 @@ public void testPredicatePushdown()
{
String tableName = "predicate_test";
tester().getQueryRunner().execute(format("CREATE TABLE %s (a, b) AS SELECT 5, 6", tableName));
Long snapshotId = (Long) tester().getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" LIMIT 1", tableName)).getOnlyValue();

PushPredicateIntoTableScan pushPredicateIntoTableScan = new PushPredicateIntoTableScan(tester().getPlannerContext(), tester().getTypeAnalyzer());

IcebergTableHandle icebergTable = new IcebergTableHandle(
SCHEMA_NAME,
tableName,
DATA,
Optional.of(1L),
Optional.of(snapshotId),
"",
"",
1,
Expand Down