diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java index 9558daa2e7b6..21fb6e6d3040 100644 --- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; @@ -30,6 +31,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructProjection; @@ -118,6 +120,7 @@ protected CloseableIterable planFiles( boolean ignoreResiduals, boolean caseSensitive, boolean colStats) { String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); + Map specs = Maps.newHashMap(table().specs()); return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> { if (snap.manifestListLocation() != null) { @@ -128,14 +131,14 @@ protected CloseableIterable planFiles( .withRecordCount(1) .withFormat(FileFormat.AVRO) .build(); - return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask( + return new ManifestListReadTask(ops.io(), schema(), specs, new BaseFileScanTask( manifestListAsDataFile, null, schemaString, specString, residuals)); } else { return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(), - manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest) + manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest) ); } })); @@ -145,13 +148,13 @@ MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(), static class ManifestListReadTask implements DataTask { private final FileIO io; private final Schema schema; - private final PartitionSpec spec; + private final Map specs; private final FileScanTask manifestListTask; - ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) { + ManifestListReadTask(FileIO io, Schema schema, Map specs, FileScanTask manifestListTask) { this.io = io; this.schema = schema; - this.spec = spec; + this.specs = specs; this.manifestListTask = manifestListTask; } @@ -173,7 +176,7 @@ public CloseableIterable rows() { .build()) { CloseableIterable rowIterable = CloseableIterable.transform(manifests, - manifest -> ManifestsTable.manifestFileToRow(spec, manifest)); + manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest)); StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema); return CloseableIterable.transform(rowIterable, projection::wrap); diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java index a818840c8a7e..330bcfa0d9f1 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java @@ -20,7 +20,9 @@ package org.apache.iceberg; import java.util.List; +import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; @@ -44,15 +46,12 @@ public class ManifestsTable extends BaseMetadataTable { ))) ); - private final PartitionSpec spec; - ManifestsTable(TableOperations ops, Table table) { this(ops, table, table.name() + ".manifests"); } ManifestsTable(TableOperations ops, Table table, String name) { super(ops, table, name); - this.spec = table.spec(); } @Override @@ -73,10 +72,15 @@ MetadataTableType metadataTableType() { protected DataTask task(TableScan scan) { TableOperations ops = operations(); String location = scan.snapshot().manifestListLocation(); + Map specs = Maps.newHashMap(table().specs()); + return StaticDataTask.of( ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()), schema(), scan.schema(), scan.snapshot().allManifests(), - manifest -> ManifestsTable.manifestFileToRow(spec, manifest) + manifest -> { + PartitionSpec spec = specs.get(manifest.partitionSpecId()); + return ManifestsTable.manifestFileToRow(spec, manifest); + } ); } diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java index 6233fbfc4356..2bf0b800db32 100644 --- a/core/src/main/java/org/apache/iceberg/Partitioning.java +++ b/core/src/main/java/org/apache/iceberg/Partitioning.java @@ -32,6 +32,7 @@ import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; import org.apache.iceberg.transforms.UnknownTransform; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; @@ -210,7 +211,8 @@ public static StructType partitionType(Table table) { } Map fieldMap = Maps.newHashMap(); - List structFields = Lists.newArrayList(); + Map typeMap = Maps.newHashMap(); + Map nameMap = Maps.newHashMap(); // sort the spec IDs in descending order to pick up the most recent field names List specIds = table.specs().keySet().stream() @@ -222,27 +224,40 @@ public static StructType partitionType(Table table) { for (PartitionField field : spec.fields()) { int fieldId = field.fieldId(); + NestedField structField = spec.partitionType().field(fieldId); PartitionField existingField = fieldMap.get(fieldId); if (existingField == null) { fieldMap.put(fieldId, field); - NestedField structField = spec.partitionType().field(fieldId); - structFields.add(structField); + typeMap.put(fieldId, structField.type()); + nameMap.put(fieldId, structField.name()); + } else { // verify the fields are compatible as they may conflict in v1 tables ValidationException.check(equivalentIgnoringNames(field, existingField), "Conflicting partition fields: ['%s', '%s']", field, existingField); + + // use the correct type for dropped partitions in v1 tables + if (isVoidTransform(existingField) && !isVoidTransform(field)) { + fieldMap.put(fieldId, field); + typeMap.put(fieldId, structField.type()); + } } } } - List sortedStructFields = structFields.stream() - .sorted(Comparator.comparingInt(NestedField::fieldId)) + List sortedStructFields = fieldMap.keySet().stream() + .sorted(Comparator.naturalOrder()) + .map(fieldId -> NestedField.optional(fieldId, nameMap.get(fieldId), typeMap.get(fieldId))) .collect(Collectors.toList()); return StructType.of(sortedStructFields); } + private static boolean isVoidTransform(PartitionField field) { + return field.transform().equals(Transforms.alwaysNull()); + } + private static List> collectUnknownTransforms(Table table) { List> unknownTransforms = Lists.newArrayList(); diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index db0dab49b29d..4846ad43579d 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -21,14 +21,18 @@ import java.io.File; import java.io.IOException; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -82,6 +86,32 @@ private void preparePartitionedTable() { } } + @Test + public void testManifestsTableWithDroppedPartition() throws IOException { + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.truncate("data", 2)).commit(); + + Table manifestsTable = new ManifestsTable(table.ops(), table); + TableScan scan = manifestsTable.newScan(); + + try (CloseableIterable tasks = scan.planFiles()) { + Assert.assertEquals("Should have one task", 1, Iterables.size(tasks)); + } + } + @Test public void testManifestsTableAlwaysIgnoresResiduals() throws IOException { table.newFastAppend() @@ -102,6 +132,32 @@ public void testManifestsTableAlwaysIgnoresResiduals() throws IOException { } } + @Test + public void testDataFilesTableWithDroppedPartition() throws IOException { + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.truncate("data", 2)).commit(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + TableScan scan = dataFilesTable.newScan(); + + try (CloseableIterable tasks = scan.planFiles()) { + Assert.assertEquals("Should have one task", 1, Iterables.size(tasks)); + } + } + @Test public void testDataFilesTableHonorsIgnoreResiduals() throws IOException { table.newFastAppend() @@ -140,6 +196,32 @@ public void testManifestEntriesTableHonorsIgnoreResiduals() throws IOException { validateTaskScanResiduals(scan2, true); } + @Test + public void testManifestEntriesTableWithDroppedPartition() throws IOException { + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.truncate("data", 2)).commit(); + + Table manifestEntriesTable = new ManifestEntriesTable(table.ops(), table); + TableScan scan = manifestEntriesTable.newScan(); + + try (CloseableIterable tasks = scan.planFiles()) { + Assert.assertEquals("Should have one task", 1, Iterables.size(tasks)); + } + } + @Test public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException { table.newFastAppend() @@ -159,6 +241,32 @@ public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException { validateTaskScanResiduals(scan2, true); } + @Test + public void testAllDataFilesTableWithDroppedPartition() throws IOException { + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.truncate("data", 2)).commit(); + + Table allDataFilesTable = new AllDataFilesTable(table.ops(), table); + TableScan scan = allDataFilesTable.newScan(); + + try (CloseableIterable tasks = scan.planFiles()) { + Assert.assertEquals("Should have one task", 1, Iterables.size(tasks)); + } + } + @Test public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException { table.newFastAppend() @@ -178,6 +286,59 @@ public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException { validateTaskScanResiduals(scan2, true); } + @Test + public void testAllEntriesTableWithDroppedPartition() throws IOException { + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.truncate("data", 2)).commit(); + + Table allEntriesTable = new AllEntriesTable(table.ops(), table); + TableScan scan = allEntriesTable.newScan(); + + try (CloseableIterable tasks = scan.planFiles()) { + Assert.assertEquals("Should have one task", 1, Iterables.size(tasks)); + } + } + + @Test + public void testAllManifestsTableWithDroppedPartition() throws IOException { + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.truncate("data", 2)).commit(); + + Table allManifestsTable = new AllManifestsTable(table.ops(), table); + + TableScan scan = allManifestsTable.newScan(); + + try (CloseableIterable tasks = scan.planFiles()) { + Assert.assertEquals("Should have one task", 1, Iterables.size(tasks)); + } + } + @Test public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException { table.newFastAppend() @@ -326,6 +487,42 @@ public void testPartitionsTableScanNotNullFilter() { validateIncludesPartitionScan(tasksUnary, 3); } + @Test + public void testFilesTableScanWithDroppedPartition() throws IOException { + preparePartitionedTable(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + table.refresh(); + + table.updateSpec().addField(Expressions.truncate("data", 2)).commit(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + TableScan scan = dataFilesTable.newScan(); + + Schema schema = dataFilesTable.schema(); + Types.StructType actualType = schema.findField(DataFile.PARTITION_ID).type().asStructType(); + Types.StructType expectedType = Types.StructType.of( + Types.NestedField.optional(1000, "data_bucket", Types.IntegerType.get()), + Types.NestedField.optional(1001, "data_bucket_16", Types.IntegerType.get()), + Types.NestedField.optional(1002, "data_trunc_2", Types.StringType.get()) + ); + Assert.assertEquals("Partition type must match", expectedType, actualType); + Accessor accessor = schema.accessorForField(1000); + + try (CloseableIterable tasks = scan.planFiles()) { + Set results = StreamSupport.stream(tasks.spliterator(), false) + .flatMap(fileScanTask -> Streams.stream(fileScanTask.asDataTask().rows())) + .map(accessor::get).map(i -> (Integer) i).collect(Collectors.toSet()); + Assert.assertEquals("Partition value must match", Sets.newHashSet(0, 1, 2, 3), results); + } + } + @Test public void testDeleteFilesTableSelection() throws IOException { Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);