diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 2796e145d5de..b8a672ee327a 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -26,7 +26,6 @@
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructLikeWrapper;
 
 /**
  * A {@link Table} implementation that exposes a table's partitions as rows.
@@ -72,7 +71,7 @@ MetadataTableType metadataTableType() {
 
   private DataTask task(StaticTableScan scan) {
     TableOperations ops = operations();
-    Iterable<Partition> partitions = partitions(scan);
+    Iterable<Partition> partitions = partitions(table(), scan);
     if (table().spec().fields().size() < 1) {
       // the table is unpartitioned, partitions contains only the root partition
       return StaticDataTask.of(
@@ -93,16 +92,57 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount, partition.specId);
   }
 
-  private static Iterable<Partition> partitions(StaticTableScan scan) {
+  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     CloseableIterable<FileScanTask> tasks = planFiles(scan);
+    Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
+    PartitionMap partitions = new PartitionMap();
+
+    // cache a position map needed by each partition spec to normalize partitions to final schema
+    Map<Integer, int[]> normalizedPositionsBySpec = Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    PartitionMap partitions = new PartitionMap(scan.table().spec().partitionType());
     for (FileScanTask task : tasks) {
-      partitions.get(task.file().partition()).update(task.file());
+      PartitionData original = (PartitionData) task.file().partition();
+      int[] normalizedPositions = normalizedPositionsBySpec.computeIfAbsent(
+          task.spec().specId(), specId -> normalizedPositions(table, specId, normalizedPartitionType));
+      PartitionData normalized = normalizePartition(original, normalizedPartitionType, normalizedPositions);
+      partitions.get(normalized).update(task.file());
     }
 
     return partitions.all();
   }
 
+  /**
+   * Builds an array of the field positions in the normalized partition type, indexed by field position
+   * in the original partition type.
+   */
+  private static int[] normalizedPositions(Table table, int specId, Types.StructType normalizedType) {
+    Types.StructType originalType = table.specs().get(specId).partitionType();
+    int[] normalizedPositions = new int[originalType.fields().size()];
+    for (int originalIndex = 0; originalIndex < originalType.fields().size(); originalIndex++) {
+      Types.NestedField normalizedField = normalizedType.field(originalType.fields().get(originalIndex).fieldId());
+      normalizedPositions[originalIndex] = normalizedType.fields().indexOf(normalizedField);
+    }
+    return normalizedPositions;
+  }
+
+  /**
+   * Converts partition data written under an old spec to the table's normalized partition type, which is a
+   * common partition type for all specs of the table.
+   *
+   * @param originalPartition un-normalized partition data
+   * @param normalizedPartitionType table's normalized partition type {@link Partitioning#partitionType(Table)}
+   * @param normalizedPositions field positions in the normalized partition type indexed by field position in
+   *                            the original partition type
+   * @return the normalized partition data
+   */
+  private static PartitionData normalizePartition(PartitionData originalPartition,
+                                                  Types.StructType normalizedPartitionType,
+                                                  int[] normalizedPositions) {
+    PartitionData normalizedPartition = new PartitionData(normalizedPartitionType);
+    for (int originalIndex = 0; originalIndex < originalPartition.size(); originalIndex++) {
+      normalizedPartition.put(normalizedPositions[originalIndex], originalPartition.get(originalIndex));
+    }
+    return normalizedPartition;
+  }
+
   @VisibleForTesting
   static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
     Table table = scan.table();
@@ -140,20 +180,13 @@ private class PartitionsScan extends StaticTableScan {
   }
 
   static class PartitionMap {
-    private final Map<StructLikeWrapper, Partition> partitions = Maps.newHashMap();
-    private final Types.StructType type;
-    private final StructLikeWrapper reused;
-
-    PartitionMap(Types.StructType type) {
-      this.type = type;
-      this.reused = StructLikeWrapper.forType(type);
-    }
+    private final Map<PartitionData, Partition> partitions = Maps.newHashMap();
 
-    Partition get(StructLike key) {
-      Partition partition = partitions.get(reused.set(key));
+    Partition get(PartitionData key) {
+      Partition partition = partitions.get(key);
       if (partition == null) {
         partition = new Partition(key);
-        partitions.put(StructLikeWrapper.forType(type).set(key), partition);
+        partitions.put(key, partition);
       }
       return partition;
     }
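The core change above groups files by a partition key that has been re-shaped to the table-wide partition type returned by Partitioning.partitionType(table), so files written under different specs end up with comparable keys. The idea is a per-spec position lookup: for each field in a spec's partition type, find that field id's position in the normalized type, then copy values across and leave the remaining positions null. Below is a minimal standalone sketch of that mapping using plain Java arrays and maps instead of Iceberg's PartitionData/StructType; the field ids and values are invented for illustration only.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class NormalizePartitionSketch {
  public static void main(String[] args) {
    // normalized (table-wide) partition type: field id -> position, roughly what
    // Partitioning.partitionType(table) provides, e.g. [data(1000), category(1001)]
    Map<Integer, Integer> normalizedPositionById = new LinkedHashMap<>();
    normalizedPositionById.put(1000, 0);
    normalizedPositionById.put(1001, 1);

    // an older spec that partitioned only by `data` (field id 1000)
    int[] specFieldIds = {1000};

    // per-spec position map, analogous to normalizedPositions(table, specId, normalizedType);
    // it is computed once per spec id and cached in normalizedPositionsBySpec
    int[] normalizedPositions = new int[specFieldIds.length];
    for (int pos = 0; pos < specFieldIds.length; pos++) {
      normalizedPositions[pos] = normalizedPositionById.get(specFieldIds[pos]);
    }

    // partition tuple written under the old spec: only a value for `data`
    Object[] original = {"d1"};

    // analogous to normalizePartition(original, normalizedPartitionType, normalizedPositions):
    // copy each value to its position in the normalized shape; fields the old spec did not
    // have stay null, which is what the partitions metadata table then reports
    Object[] normalized = new Object[normalizedPositionById.size()];
    for (int pos = 0; pos < original.length; pos++) {
      normalized[normalizedPositions[pos]] = original[pos];
    }

    System.out.println(Arrays.toString(normalized)); // prints [d1, null]
  }
}

With every file's partition re-mapped into the same shape this way, a plain HashMap keyed by the normalized tuple is enough to aggregate record and file counts, which is why PartitionMap no longer needs a StructLikeWrapper: PartitionData already provides value-based equals/hashCode.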
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index a93fef0badfd..156b53ace375 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -45,6 +45,7 @@
 import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -57,6 +58,7 @@
 import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
 import static org.apache.iceberg.MetadataTableType.ENTRIES;
 import static org.apache.iceberg.MetadataTableType.FILES;
+import static org.apache.iceberg.MetadataTableType.PARTITIONS;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
@@ -378,6 +380,171 @@ public void testEntriesMetadataTable() throws ParseException {
           tableType);
     }
   }
+
+  @Test
+  public void testPartitionsTableAddRemoveFields() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
+        "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+    initTable();
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    Dataset<Row> df = loadMetadataTable(PARTITIONS);
+    Assert.assertTrue("Partition must be skipped",
+        df.schema().getFieldIndex("partition").isEmpty());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    assertPartitions(
+        ImmutableList.of(row(new Object[]{null}), row("d1"), row("d2")),
+        "STRUCT<data:STRING>",
+        PARTITIONS);
+
+    table.updateSpec()
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    assertPartitions(ImmutableList.of(
+        row(null, null),
+        row("d1", null),
+        row("d1", "c1"),
+        row("d2", null),
+        row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS);
+
+    // verify the metadata tables after removing the first partition column
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(
+        ImmutableList.of(
+            row(null, null),
+            row(null, "c1"),
+            row(null, "c2"),
+            row("d1", null),
+            row("d1", "c1"),
+            row("d2", null),
+            row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS);
+  }
+
+  @Test
+  public void testPartitionsTableRenameFields() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
+        "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+    initTable();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(ImmutableList.of(
+        row("d1", "c1"),
+        row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS);
+
+    table.updateSpec()
+        .renameField("category", "category_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(
+        ImmutableList.of(
+            row("d1", "c1"),
+            row("d2", "c2")),
+        "STRUCT<data:STRING,category_another_name:STRING>",
+        PARTITIONS);
+  }
+
+  @Test
+  public void testPartitionsTableSwitchFields() throws Exception {
+    // Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
+    // In V1, dropped partition fields show separately when field is re-added
+    // In V2, re-added field currently conflicts with its deleted form
+    Assume.assumeTrue(formatVersion == 1);
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // verify the metadata tables after re-adding the first dropped column in the second location
+    table.updateSpec()
+        .addField("data")
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(ImmutableList.of(
+        row("d1", "c1"),
+        row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS);
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(
+        ImmutableList.of(
+            row(null, "c1"),
+            row(null, "c2"),
+            row("d1", "c1"),
+            row("d2", "c2")),
+        "STRUCT<data:STRING,category:STRING>",
+        PARTITIONS);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    assertPartitions(
+        ImmutableList.of(
+            row(null, "c1", null),
+            row(null, "c1", "d1"),
+            row(null, "c2", null),
+            row(null, "c2", "d2"),
+            row("d1", "c1", null),
+            row("d2", "c2", null)),
+        "STRUCT",
+        PARTITIONS);
+  }
 
   @Test
   public void testMetadataTablesWithUnknownTransforms() {
@@ -429,6 +596,7 @@ private void assertPartitions(List<Object[]> expectedPartitions, String expected
     DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString);
 
     switch (tableType) {
+      case PARTITIONS:
       case FILES:
       case ALL_DATA_FILES:
         DataType actualFilesType = df.schema().apply("partition").dataType();
@@ -447,6 +615,7 @@ private void assertPartitions(List<Object[]> expectedPartitions, String expected
     }
 
     switch (tableType) {
+      case PARTITIONS:
       case FILES:
       case ALL_DATA_FILES:
         List<Row> actualFilesPartitions = df.orderBy("partition")
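For context on what these assertions exercise end to end: the partitions metadata table is what users see when they query `<table>.partitions` from Spark, and after this change files written before a partition field existed are reported under a normalized partition tuple with null for that field. A rough usage sketch under assumed names (an Iceberg catalog `demo` and a table `demo.db.events` that went through a similar spec evolution); the column set (partition, record_count, file_count, spec_id) follows the StaticDataTask.Row built in convertPartition above:

import org.apache.spark.sql.SparkSession;

public class QueryPartitionsMetadataTable {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("partitions-metadata-table-example")
        .getOrCreate();

    // one row per distinct normalized partition tuple; files written before a
    // partition field existed contribute to rows where that field is null
    spark.sql("SELECT partition, record_count, file_count, spec_id "
            + "FROM demo.db.events.partitions "
            + "ORDER BY partition")
        .show(50, false);
  }
}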