diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index b8a672ee327a..32ae8c9c9459 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -19,9 +19,10 @@

 package org.apache.iceberg;

+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.util.Map;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -149,14 +150,15 @@ static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
     Snapshot snapshot = table.snapshot(scan.snapshot().snapshotId());
     boolean caseSensitive = scan.isCaseSensitive();

-    // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
-    Expression partitionFilter = Projections
-        .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
-        .project(scan.filter());
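+    // Evaluators are cached by spec ID: after partition evolution the table can contain manifests
+    // written under several specs, so the scan filter must be bound to each manifest's own spec
+    // before that manifest can be pruned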
+    LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
+      PartitionSpec spec = table.specs().get(specId);
+      PartitionSpec transformedSpec = transformSpec(scan.schema(), spec);
+      return ManifestEvaluator.forRowFilter(scan.filter(), transformedSpec, caseSensitive);
+    });

     ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
         .caseSensitive(caseSensitive)
-        .filterPartitions(partitionFilter)
+        .filterManifests(m -> evalCache.get(m.partitionSpecId()).eval(m))
         .select(scan.colStats() ? DataTableScan.SCAN_WITH_STATS_COLUMNS : DataTableScan.SCAN_COLUMNS)
         .specsById(scan.table().specs())
         .ignoreDeleted();
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 4846ad43579d..0d2316046fc2 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -550,6 +550,126 @@ public void testDeleteFilesTableSelection() throws IOException {
     Assert.assertEquals(expected, scan.schema().asStruct());
   }

+  @Test
+  public void testPartitionSpecEvolutionAdditive() {
+    preparePartitionedTable();
+
+    // Change the spec by adding a new partition field
+    table.updateSpec()
+        .addField("id")
+        .commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with the new spec
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(0, 0); // data=0
+    data10Key.set(1, 10); // id=10
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data11Key.set(0, 1); // data=1
+    data11Key.set(1, 11); // id=11
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new PartitionsTable(table.ops(), table);
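+    // 'id' exists only in the new spec, so manifests written under the old spec have no bounds
+    // for it and cannot be pruned by this filter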
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+
+    // Four data files of the old spec and one data file of the new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+
+    // One data file written by the old spec, plus one data file written by the new spec
+    Assert.assertEquals(2, Iterables.size(tasks));
+  }
+
+  @Test
+  public void testPartitionSpecEvolutionRemoval() {
+    preparePartitionedTable();
+
+    // Remove a partition field and add a new one
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField("id")
+        .commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with the new spec
+    // Removed partition fields are replaced with a void transform in V1 but actually dropped in V2
+    int partIndex = (formatVersion == 1) ? 1 : 0;
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(partIndex, 10);
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data11Key.set(partIndex, 11);
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new PartitionsTable(table.ops(), table);
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+
+    // Four data files of the original spec and one data file written by the new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    // Filter on a dropped partition field; only partitions written by the old spec should match
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+
+    if (formatVersion == 1) {
+      // One data file written by the old spec
+      Assert.assertEquals(1, Iterables.size(tasks));
+    } else {
+      // One data file written by the old spec, plus both new data files written by the new spec.
+      //
+      // Unlike V1, V2 does not write (data=null) into newer files' partition data, so these files
+      // cannot be filtered out early here in scan planning.
+      //
+      // However, these partition rows are filtered out later by Spark data filtering, as the newer
+      // partitions gain a 'data=null' field when they are normalized to the Partitions table's
+      // final schema, which is a union of the fields of all specs, including dropped fields.
+      Assert.assertEquals(3, Iterables.size(tasks));
+    }
+  }
+
   @Test
   public void testPartitionColumnNamedPartition() throws Exception {
     TestTables.clearTables();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 156b53ace375..7519486bc5c7 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -382,8 +382,7 @@ public void testEntriesMetadataTable() throws ParseException {
   }

   @Test
   public void testPartitionsTableAddRemoveFields() throws ParseException {
-    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
-        "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
     initTable();

     sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
     sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
@@ -447,8 +446,7 @@

   @Test
   public void testPartitionsTableRenameFields() throws ParseException {
-    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
-        "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
     initTable();

     Table table = validationCatalog.loadTable(tableIdent);
@@ -546,6 +544,154 @@ public void testPartitionsTableSwitchFields() throws Exception {
         PARTITIONS);
   }

+  @Test
+  public void testPartitionTableFilterAddRemoveFields() throws ParseException {
+    // Create an unpartitioned table
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // Partition the table with one partition column
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
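+    // Files written before the partition field was added have a null 'data' partition value, so
+    // the pushed-down filter should match only the file written under the new spec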
+    assertPartitions(
+        ImmutableList.of(row("d2")),
+        "STRUCT<data:STRING>",
+        PARTITIONS,
+        "partition.data = 'd2'");
+
+    // Partition the table with two partition columns
+    table.updateSpec()
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(
+        ImmutableList.of(row("d2", null), row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS,
+        "partition.data = 'd2'");
+    assertPartitions(
+        ImmutableList.of(row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS,
+        "partition.category = 'c2'");
+
+    // Remove the first partition column
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd2')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (4, 'c4', 'd2')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (5, 'c2', 'd5')", tableName);
+    assertPartitions(
+        ImmutableList.of(row("d2", null), row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS,
+        "partition.data = 'd2'");
+    assertPartitions(
+        ImmutableList.of(row(null, "c2"), row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS,
+        "partition.category = 'c2'");
+  }
+
+  @Test
+  public void testPartitionTableFilterSwitchFields() throws Exception {
+    // Re-added partition fields are currently not re-associated: https://github.com/apache/iceberg/issues/4292
+    // In V1, a dropped partition field shows up as a separate column when the field is re-added;
+    // in V2, a re-added field currently conflicts with its deleted form
+    Assume.assumeTrue(formatVersion == 1);
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // Two partition columns
+    table.updateSpec()
+        .addField("data")
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // Drop the first partition column
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // Re-add the first partition column at the end
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
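+    // In V1 the dropped 'data' field is retained as a void-transformed column (surfaced here as
+    // data_1000) alongside the re-added field, so the partition struct exposes three columns;
+    // each row populates only the columns that were active in the spec it was written under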
+    assertPartitions(
+        ImmutableList.of(
+            row(null, "c2", null),
+            row(null, "c2", "d2"),
+            row("d2", "c2", null)),
+        "STRUCT<data_1000:STRING,category:STRING,data:STRING>",
+        PARTITIONS,
+        "partition.category = 'c2'");
+
+    assertPartitions(
+        ImmutableList.of(row(null, "c1", "d1")),
+        "STRUCT<data_1000:STRING,category:STRING,data:STRING>",
+        PARTITIONS,
+        "partition.data = 'd1'");
+  }
+
+  @Test
+  public void testPartitionsTableFilterRenameFields() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    table.updateSpec()
+        .renameField("category", "category_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(
+        ImmutableList.of(row("d1", "c1")),
+        "STRUCT<data:STRING,category_another_name:STRING>",
+        PARTITIONS,
+        "partition.category_another_name = 'c1'");
+  }
+
   @Test
   public void testMetadataTablesWithUnknownTransforms() {
     sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);