From 1040340c4cad22bbd9b5c52af370a851c11f5019 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 13 Apr 2022 13:14:49 -0700 Subject: [PATCH 1/2] Core: Fix filter pushdown for metadata tables with evolved specs (#4520) --- .../org/apache/iceberg/BaseMetadataTable.java | 10 +- .../org/apache/iceberg/DataFilesTable.java | 22 +- .../org/apache/iceberg/PartitionsTable.java | 2 +- .../iceberg/TestMetadataTableScans.java | 197 ++++++++++++++++++ ...tMetadataTablesWithPartitionEvolution.java | 148 ++++++++++++- 5 files changed, 351 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index d9b2f797a447..f4966352ecd6 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -35,7 +35,6 @@ * deserialization. */ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable { - protected static final String PARTITION_FIELD_PREFIX = "partition."; private final PartitionSpec spec = PartitionSpec.unpartitioned(); private final SortOrder sortOrder = SortOrder.unsorted(); private final TableOperations ops; @@ -52,18 +51,17 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) { * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter * expression against the given metadata table. *
<p>
- * The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform. + * The resulting partition spec maps partition.X fields to partition X using an identity partition transform. * When this spec is used to project an expression for the given metadata table, the projection will remove - * predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields. + * predicates for non-partition fields (not in the spec) and will remove the "partition." prefix from fields. * * @param metadataTableSchema schema of the metadata table * @param spec spec on which the metadata table schema is based - * @param partitionPrefix prefix to remove from each field in the partition spec * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection */ - static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) { + static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) { PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false); - spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name())); + spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity")); return identitySpecBuilder.build(); } diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index 31c0bea319e7..c20332b7703f 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -19,10 +19,12 @@ package org.apache.iceberg; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.util.Map; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ManifestEvaluator; -import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; @@ -108,16 +110,16 @@ protected CloseableIterable planFiles( Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); - // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions - Expression partitionFilter = Projections - .inclusive( - transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX), - caseSensitive) - .project(rowFilter); + Map specsById = table().specs(); - ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter( - partitionFilter, table().spec(), caseSensitive); - CloseableIterable filtered = CloseableIterable.filter(manifests, manifestEval::eval); + LoadingCache evalCache = Caffeine.newBuilder().build(specId -> { + PartitionSpec spec = specsById.get(specId); + PartitionSpec transformedSpec = transformSpec(fileSchema, spec); + return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive); + }); + + CloseableIterable filtered = CloseableIterable.filter(manifests, + manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); // Data tasks produce the table schema, not the projection schema and projection is done by processing engines. 
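// A condensed sketch of the per-spec evaluation pattern added above: each manifest records the
// partition spec id it was written under, so the metadata-table row filter is rewritten against
// that spec rather than the table's current spec. The wrapper class below is hypothetical (not part
// of this patch), assumes the org.apache.iceberg package because transformSpec is package-private,
// and elides the surrounding scan plumbing.

package org.apache.iceberg;

import java.util.Map;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;

class PerSpecManifestFilter {
  static CloseableIterable<ManifestFile> filter(CloseableIterable<ManifestFile> manifests,
                                                Map<Integer, PartitionSpec> specsById,
                                                Schema metadataTableSchema,
                                                Expression rowFilter,
                                                boolean caseSensitive) {
    // Build one ManifestEvaluator per partition spec id, lazily, and reuse it across manifests.
    LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
      PartitionSpec spec = specsById.get(specId);
      // Rewrite partition.X references in the metadata-table filter against this manifest's spec.
      PartitionSpec transformedSpec = BaseMetadataTable.transformSpec(metadataTableSchema, spec);
      return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
    });
    // Evaluate each manifest with the evaluator built for the spec it was written under.
    return CloseableIterable.filter(manifests,
        manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
  }
}

// With the cache, the filter-projection cost grows with the number of specs, not the number of manifests.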
// This data task needs to use the table schema, which may not include a partition schema to avoid having an diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index cc9f716a4ddf..dbf1d39d11be 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -111,7 +111,7 @@ static CloseableIterable planFiles(StaticTableScan scan) { // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions Expression partitionFilter = Projections - .inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive) + .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive) .project(scan.filter()); ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests()) diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index af99014df6f2..c54ba16ece6e 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -34,6 +34,7 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -713,6 +714,202 @@ public void testPartitionColumnNamedPartition() throws Exception { validateIncludesPartitionScan(tasksAndEq, 0); } + @Test + public void testFilesTablePartitionFieldRemovalV1() { + Assume.assumeTrue(formatVersion == 1); + preparePartitionedTable(); + + // Change spec and add two data files + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField("id") + .commit(); + PartitionSpec newSpec = table.spec(); + + // Add two data files with new spec + PartitionKey data10Key = new PartitionKey(newSpec, table.schema()); + data10Key.set(1, 10); + DataFile data10 = DataFiles.builder(newSpec) + .withPath("/path/to/data-10.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartition(data10Key) + .build(); + PartitionKey data11Key = new PartitionKey(newSpec, table.schema()); + data10Key.set(1, 11); + DataFile data11 = DataFiles.builder(newSpec) + .withPath("/path/to/data-11.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartition(data11Key) + .build(); + + table.newFastAppend().appendFile(data10).commit(); + table.newFastAppend().appendFile(data11).commit(); + + Table metadataTable = new DataFilesTable(table.ops(), table); + Expression filter = Expressions.and( + Expressions.equal("partition.id", 10), + Expressions.greaterThan("record_count", 0)); + TableScan scan = metadataTable.newScan().filter(filter); + CloseableIterable tasks = scan.planFiles(); + + // All 4 original data files written by old spec, plus one data file written by new spec + Assert.assertEquals(5, Iterables.size(tasks)); + + // We cannot query old partition key in V1 due to https://github.com/apache/iceberg/pull/3411/ + } + + @Test + public void testFilesTablePartitionFieldRemovalV2() { + Assume.assumeTrue(formatVersion == 2); + preparePartitionedTable(); + + // Change spec and add two data and delete files each + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField("id").commit(); + PartitionSpec newSpec = table.spec(); + + // 
Add two data files and two delete files with new spec + DataFile data10 = DataFiles.builder(newSpec) + .withPath("/path/to/data-10.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartitionPath("id=10") + .build(); + DataFile data11 = DataFiles.builder(newSpec) + .withPath("/path/to/data-11.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartitionPath("id=11") + .build(); + + table.newFastAppend().appendFile(data10).commit(); + table.newFastAppend().appendFile(data11).commit(); + + Table metadataTable = new DataFilesTable(table.ops(), table); + Expression filter = Expressions.and( + Expressions.equal("partition.id", 10), + Expressions.greaterThan("record_count", 0)); + TableScan scan = metadataTable.newScan().filter(filter); + CloseableIterable tasks = scan.planFiles(); + + // All 4 original data files written by old spec, plus one new data file written by new spec + Assert.assertEquals(5, Iterables.size(tasks)); + + filter = Expressions.and( + Expressions.equal("partition.data_bucket", 0), + Expressions.greaterThan("record_count", 0)); + scan = metadataTable.newScan().filter(filter); + tasks = scan.planFiles(); + + // 1 original data files written by old spec, plus both of new data file written by new spec + Assert.assertEquals(3, Iterables.size(tasks)); + } + + @Test + public void testFilesTablePartitionFieldAddV1() { + Assume.assumeTrue(formatVersion == 1); + preparePartitionedTable(); + + // Change spec and add two data files + table.updateSpec() + .addField("id") + .commit(); + PartitionSpec newSpec = table.spec(); + + // Add two data files with new spec + PartitionKey data10Key = new PartitionKey(newSpec, table.schema()); + data10Key.set(0, 0); // data=0 + data10Key.set(1, 10); // id=10 + DataFile data10 = DataFiles.builder(newSpec) + .withPath("/path/to/data-10.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartition(data10Key) + .build(); + PartitionKey data11Key = new PartitionKey(newSpec, table.schema()); + data11Key.set(0, 1); // data=0 + data10Key.set(1, 11); // id=11 + DataFile data11 = DataFiles.builder(newSpec) + .withPath("/path/to/data-11.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartition(data11Key) + .build(); + + table.newFastAppend().appendFile(data10).commit(); + table.newFastAppend().appendFile(data11).commit(); + + Table metadataTable = new DataFilesTable(table.ops(), table); + Expression filter = Expressions.and( + Expressions.equal("partition.id", 10), + Expressions.greaterThan("record_count", 0)); + TableScan scan = metadataTable.newScan().filter(filter); + CloseableIterable tasks = scan.planFiles(); + + // All 4 original data files written by old spec, plus one new data file written by new spec + Assert.assertEquals(5, Iterables.size(tasks)); + + filter = Expressions.and( + Expressions.equal("partition.data_bucket", 0), + Expressions.greaterThan("record_count", 0)); + scan = metadataTable.newScan().filter(filter); + tasks = scan.planFiles(); + + // 1 original data file written by old spec, plus 1 new data file written by new spec + Assert.assertEquals(2, Iterables.size(tasks)); + } + + @Test + public void testPartitionSpecEvolutionAdditiveV2() { + Assume.assumeTrue(formatVersion == 2); + preparePartitionedTable(); + + // Change spec and add two data and delete files each + table.updateSpec() + .addField("id") + .commit(); + PartitionSpec newSpec = table.spec(); + + // Add two data files and two delete files with new spec + DataFile data10 = DataFiles.builder(newSpec) + 
.withPath("/path/to/data-10.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0/id=10") + .build(); + DataFile data11 = DataFiles.builder(newSpec) + .withPath("/path/to/data-11.parquet") + .withRecordCount(10) + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=1/id=11") + .build(); + + table.newFastAppend().appendFile(data10).commit(); + table.newFastAppend().appendFile(data11).commit(); + + Table metadataTable = new DataFilesTable(table.ops(), table); + Expression filter = Expressions.and( + Expressions.equal("partition.id", 10), + Expressions.greaterThan("record_count", 0)); + TableScan scan = metadataTable.newScan().filter(filter); + CloseableIterable tasks = scan.planFiles(); + + // All 4 original data files written by old spec, plus one new data file written by new spec + Assert.assertEquals(5, Iterables.size(tasks)); + + filter = Expressions.and( + Expressions.equal("partition.data_bucket", 0), + Expressions.greaterThan("record_count", 0)); + scan = metadataTable.newScan().filter(filter); + tasks = scan.planFiles(); + + // 1 original data files written by old spec, plus 1 of new data file written by new spec + Assert.assertEquals(2, Iterables.size(tasks)); + } + private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException { try (CloseableIterable tasks = scan.planTasks()) { Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java index 0e690bd27d1a..a93fef0badfd 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java @@ -22,8 +22,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; @@ -74,14 +72,40 @@ public static Object[][] parameters() { "default-namespace", "default" ), ORC, - formatVersion() + 1 + }, + { "testhive", SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default" + ), + ORC, + 2 + }, + { "testhadoop", SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hadoop" + ), + PARQUET, + 1 }, { "testhadoop", SparkCatalog.class.getName(), ImmutableMap.of( "type", "hadoop" ), PARQUET, - formatVersion() + 2 + }, + { "spark_catalog", SparkSessionCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "clients", "1", + "parquet-enabled", "false", + "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync + ), + AVRO, + 1 }, { "spark_catalog", SparkSessionCatalog.class.getName(), ImmutableMap.of( @@ -92,17 +116,11 @@ public static Object[][] parameters() { "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync ), AVRO, - formatVersion() + 2 } }; } - private static int formatVersion() { - return RANDOM.nextInt(2) + 1; - } - - private static final Random RANDOM = ThreadLocalRandom.current(); - private final FileFormat fileFormat; private final int formatVersion; @@ -189,6 +207,106 @@ public 
void testFilesMetadataTable() throws ParseException { } } + @Test + public void testFilesMetadataTableFilter() throws ParseException { + sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " + + "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName); + initTable(); + + sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName); + + // verify the metadata tables while the current spec is still unpartitioned + for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) { + Dataset df = loadMetadataTable(tableType); + Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty()); + } + + Table table = validationCatalog.loadTable(tableIdent); + + table.updateSpec() + .addField("data") + .commit(); + sql("REFRESH TABLE %s", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName); + + // verify the metadata tables after adding the first partition column + for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) { + assertPartitions( + ImmutableList.of(row("d2")), + "STRUCT", + tableType, + "partition.data = 'd2'"); + } + + table.updateSpec() + .addField("category") + .commit(); + sql("REFRESH TABLE %s", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName); + + // verify the metadata tables after adding the second partition column + for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) { + assertPartitions(ImmutableList.of(row("d2", null), row("d2", "c2")), + "STRUCT", + tableType, + "partition.data = 'd2'"); + } + for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) { + assertPartitions( + ImmutableList.of(row("d2", "c2")), + "STRUCT", + tableType, + "partition.category = 'c2'"); + } + + table.updateSpec() + .removeField("data") + .commit(); + sql("REFRESH TABLE %s", tableName); + + // Verify new partitions do not show up for removed 'partition.data=d2' query + sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd2')", tableName); + sql("INSERT INTO TABLE %s VALUES (4, 'c4', 'd2')", tableName); + + // Verify new partitions do show up for 'partition.category=c2' query + sql("INSERT INTO TABLE %s VALUES (5, 'c2', 'd5')", tableName); + + // no new partition should show up for 'data' partition query as partition field has been removed + for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) { + assertPartitions( + ImmutableList.of(row("d2", null), row("d2", "c2")), + "STRUCT", + tableType, + "partition.data = 'd2'"); + } + // new partition shows up from 'category' partition field query + for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) { + assertPartitions( + ImmutableList.of(row(null, "c2"), row("d2", "c2")), + "STRUCT", + tableType, + "partition.category = 'c2'"); + } + + table.updateSpec() + .renameField("category", "category_another_name") + .commit(); + sql("REFRESH TABLE %s", tableName); + + // Verify new partitions do show up for 'category=c2' query + sql("INSERT INTO TABLE %s VALUES (6, 'c2', 'd6')", tableName); + for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) { + assertPartitions( + ImmutableList.of(row(null, "c2"), row(null, "c2"), row("d2", "c2")), + "STRUCT", + tableType, + "partition.category_another_name = 'c2'"); + } + } + @Test public void 
testEntriesMetadataTable() throws ParseException { sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName); @@ -299,7 +417,15 @@ public void testPartitionColumnNamedPartition() { private void assertPartitions(List expectedPartitions, String expectedTypeAsString, MetadataTableType tableType) throws ParseException { + assertPartitions(expectedPartitions, expectedTypeAsString, tableType, null); + } + + private void assertPartitions(List expectedPartitions, String expectedTypeAsString, + MetadataTableType tableType, String filter) throws ParseException { Dataset df = loadMetadataTable(tableType); + if (filter != null) { + df = df.filter(filter); + } DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString); switch (tableType) { From 0ad1a2998fd7911e83d7bf2d45354f11b1d7d359 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 18 Apr 2022 15:22:46 -0700 Subject: [PATCH 2/2] Enable test after rebasing fix #4572 --- .../java/org/apache/iceberg/TestMetadataTableScans.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index c54ba16ece6e..3bc4c438609e 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -757,7 +757,14 @@ public void testFilesTablePartitionFieldRemovalV1() { // All 4 original data files written by old spec, plus one data file written by new spec Assert.assertEquals(5, Iterables.size(tasks)); - // We cannot query old partition key in V1 due to https://github.com/apache/iceberg/pull/3411/ + filter = Expressions.and( + Expressions.equal("partition.data_bucket", 0), + Expressions.greaterThan("record_count", 0)); + scan = metadataTable.newScan().filter(filter); + tasks = scan.planFiles(); + + // 1 original data file written by old spec (V1 filters out new specs which don't have this value) + Assert.assertEquals(1, Iterables.size(tasks)); } @Test
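Both changes hinge on the reworked transformSpec: the transformed spec declares an identity partition field for each partition field, keyed off the partition field id, which is also the id of the matching nested field under the metadata table's partition struct. An inclusive projection against that spec therefore strips the "partition." prefix from partition predicates and drops predicates on non-partition columns. A minimal sketch of that projection step, assuming the org.apache.iceberg package (transformSpec is package-private) and a hypothetical helper name:

package org.apache.iceberg;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Projections;

class MetadataFilterSketch {
  // Rewrites a files/partitions metadata table filter such as
  //   partition.data_bucket = 0 AND record_count > 0
  // into the partition filter data_bucket = 0 for the given spec.
  static Expression toPartitionFilter(Schema metadataTableSchema, PartitionSpec spec, Expression rowFilter) {
    PartitionSpec transformed = BaseMetadataTable.transformSpec(metadataTableSchema, spec);
    // The inclusive projection keeps partition predicates (prefix stripped) and projects
    // predicates on non-partition columns, like record_count, to alwaysTrue().
    return Projections.inclusive(transformed, true /* case sensitive */).project(rowFilter);
  }
}

PartitionsTable still applies this rewrite against the current spec only, while DataFilesTable now builds one rewrite per manifest spec id, which is what lets the partition filters in the tests above match files written under evolved specs.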