From 340d1cf72b571643f69276b87c5f892dc98cb7e9 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Thu, 28 Oct 2021 20:56:38 +0800
Subject: [PATCH 1/8] fixed read metadata failed

---
 .../org/apache/iceberg/AllManifestsTable.java | 14 ++--
 .../org/apache/iceberg/ManifestsTable.java    | 11 ++-
 .../java/org/apache/iceberg/Partitioning.java | 12 +++
 .../iceberg/TestMetadataTableScans.java       | 75 +++++++++++++++++++
 4 files changed, 102 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 9558daa2e7b6..ad4287915d40 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
@@ -118,6 +119,7 @@ protected CloseableIterable<FileScanTask> planFiles(
       boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
     String schemaString = SchemaParser.toJson(schema());
     String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+    Map<Integer, PartitionSpec> specs = table().specs();
 
     return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
       if (snap.manifestListLocation() != null) {
@@ -128,14 +130,14 @@ protected CloseableIterable<FileScanTask> planFiles(
             .withRecordCount(1)
             .withFormat(FileFormat.AVRO)
             .build();
-        return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask(
+        return new ManifestListReadTask(ops.io(), schema(), specs, new BaseFileScanTask(
             manifestListAsDataFile, null, schemaString, specString, residuals));
       } else {
         return StaticDataTask.of(
             ops.io().newInputFile(ops.current().metadataFileLocation()),
             MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
-            manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)
+            manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest)
         );
       }
     }));
@@ -145,13 +147,13 @@ MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
   static class ManifestListReadTask implements DataTask {
     private final FileIO io;
     private final Schema schema;
-    private final PartitionSpec spec;
+    private final Map<Integer, PartitionSpec> specs;
    private final FileScanTask manifestListTask;
 
-    ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) {
+    ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask) {
       this.io = io;
       this.schema = schema;
-      this.spec = spec;
+      this.specs = specs;
       this.manifestListTask = manifestListTask;
     }
 
@@ -173,7 +175,7 @@ public CloseableIterable<StructLike> rows() {
           .build()) {
 
       CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
-          manifest -> ManifestsTable.manifestFileToRow(spec, manifest));
+          manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest));
 
       StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
       return CloseableIterable.transform(rowIterable, projection::wrap);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index a818840c8a7e..ddf15686d18c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
@@ -44,15 +45,12 @@ public class ManifestsTable extends BaseMetadataTable {
       )))
   );
 
-  private final PartitionSpec spec;
-
   ManifestsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".manifests");
   }
 
   ManifestsTable(TableOperations ops, Table table, String name) {
     super(ops, table, name);
-    this.spec = table.spec();
   }
 
   @Override
@@ -73,10 +71,15 @@ MetadataTableType metadataTableType() {
   protected DataTask task(TableScan scan) {
     TableOperations ops = operations();
     String location = scan.snapshot().manifestListLocation();
+    Map<Integer, PartitionSpec> specs = table().specs();
+
     return StaticDataTask.of(
         ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
         schema(), scan.schema(), scan.snapshot().allManifests(),
-        manifest -> ManifestsTable.manifestFileToRow(spec, manifest)
+        manifest -> {
+          PartitionSpec partitionSpec = specs.get(manifest.partitionSpecId());
+          return ManifestsTable.manifestFileToRow(partitionSpec, manifest);
+        }
     );
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 6233fbfc4356..646270b82e9b 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -211,6 +211,7 @@ public static StructType partitionType(Table table) {
 
     Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
     List<NestedField> structFields = Lists.newArrayList();
+    Map<Integer, Integer> fieldToIndex = Maps.newHashMap();
 
     // sort the spec IDs in descending order to pick up the most recent field names
     List<Integer> specIds = table.specs().keySet().stream()
@@ -228,11 +229,22 @@ public static StructType partitionType(Table table) {
           fieldMap.put(fieldId, field);
           NestedField structField = spec.partitionType().field(fieldId);
           structFields.add(structField);
+
+          if (Transforms.alwaysNull().equals(field.transform())) {
+            fieldToIndex.put(fieldId, structFields.size() - 1);
+          }
         } else {
           // verify the fields are compatible as they may conflict in v1 tables
           ValidationException.check(equivalentIgnoringNames(field, existingField),
               "Conflicting partition fields: ['%s', '%s']",
               field, existingField);
+
+          if (Transforms.alwaysNull().equals(existingField.transform()) &&
+              !Transforms.alwaysNull().equals(field.transform())) {
+            fieldMap.put(fieldId, field);
+            NestedField structField = spec.partitionType().field(fieldId);
+            structFields.set(fieldToIndex.get(fieldId), structField);
+          }
         }
       }
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index db0dab49b29d..e95e22704ebd 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -21,14 +21,18 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
@@ -82,6 +86,28 @@ private void preparePartitionedTable() {
     }
   }
 
+  @Test
+  public void testManifestsTableWithDroppedPartition() throws IOException {
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .appendFile(FILE_B)
+      .commit();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    Table manifestsTable = new ManifestsTable(table.ops(), table);
+    TableScan scan = manifestsTable.newScan()
+      .filter(Expressions.lessThan("length", 10000L));
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (FileScanTask task : tasks) {
+        Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
+      }
+    }
+  }
+
   @Test
   public void testManifestsTableAlwaysIgnoresResiduals() throws IOException {
     table.newFastAppend()
@@ -178,6 +204,29 @@ public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException {
     validateTaskScanResiduals(scan2, true);
   }
 
+  @Test
+  public void testAllManifestsTableWithDroppedPartition() throws IOException {
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .appendFile(FILE_B)
+      .commit();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    Table allManifestsTable = new AllManifestsTable(table.ops(), table);
+
+    TableScan scan = allManifestsTable.newScan()
+      .filter(Expressions.lessThan("length", 10000L));
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (FileScanTask task : tasks) {
+        Assert.assertTrue("Rows should not be empty", Iterables.size(task.asDataTask().rows()) > 0);
+      }
+    }
+  }
+
   @Test
   public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException {
     table.newFastAppend()
@@ -326,6 +375,32 @@ public void testPartitionsTableScanNotNullFilter() {
     validateIncludesPartitionScan(tasksUnary, 3);
   }
 
+  @Test
+  public void testFilesTableScanWithDroppedPartition() throws IOException {
+    preparePartitionedTable();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    TableScan scan = dataFilesTable.newScan();
+
+    Schema schema = dataFilesTable.schema();
+    Types.StructType actualType = schema.findField(DataFile.PARTITION_ID).type().asStructType();
+    Types.StructType expectedType = Types.StructType.of(
+        Types.NestedField.optional(1000, "data_bucket", Types.IntegerType.get())
+    );
+    Assert.assertEquals("Partition type must match", expectedType, actualType);
+    Accessor<StructLike> accessor = schema.accessorForField(1000);
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Set<Integer> results = StreamSupport.stream(tasks.spliterator(), false)
+          .flatMap(fileScanTask -> Streams.stream(fileScanTask.asDataTask().rows()))
+          .map(accessor::get).map(i -> (Integer) i).collect(Collectors.toSet());
+      Assert.assertEquals("Partition value must match", Sets.newHashSet(0, 1, 2, 3), results);
+    }
+  }
+
   @Test
   public void testDeleteFilesTableSelection() throws IOException {
     Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
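
The core idea of PATCH 1 is that a manifest must be rendered with the partition spec it was written under, not with the table's current spec, which is why the metadata tables now resolve the spec through specs.get(manifest.partitionSpecId()). The standalone sketch below illustrates that difference; it is not Iceberg code — the Spec and Manifest records are invented stand-ins used only to show why the old lookup breaks once a partition field is dropped.

import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for Iceberg's ManifestFile / PartitionSpec,
// used only to illustrate the lookup pattern that PATCH 1 introduces.
public class SpecLookupSketch {

  record Spec(int specId, List<String> fieldNames) {}
  record Manifest(String path, int partitionSpecId) {}

  // Before: every manifest row was built against the table's *current* spec.
  static List<String> partitionSummaryOld(Spec currentSpec, Manifest manifest) {
    return currentSpec.fieldNames(); // wrong for manifests written under an older spec
  }

  // After: each manifest row is built against the spec it was written with,
  // looked up by manifest.partitionSpecId() in the table's specs-by-id map.
  static List<String> partitionSummaryNew(Map<Integer, Spec> specsById, Manifest manifest) {
    return specsById.get(manifest.partitionSpecId()).fieldNames();
  }

  public static void main(String[] args) {
    Spec v0 = new Spec(0, List.of("data_bucket"));
    Spec v1 = new Spec(1, List.of());                  // partition field dropped
    Map<Integer, Spec> specsById = Map.of(0, v0, 1, v1);
    Manifest oldManifest = new Manifest("m0.avro", 0); // written under spec 0

    System.out.println(partitionSummaryOld(v1, oldManifest));        // [] -- loses data_bucket
    System.out.println(partitionSummaryNew(specsById, oldManifest)); // [data_bucket]
  }
}
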
From b38e747436b96d9f101e406e275df31960132a19 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Thu, 28 Oct 2021 21:28:32 +0800
Subject: [PATCH 2/8] fixes

---
 core/src/test/java/org/apache/iceberg/TestPartitioning.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
index 2610ad5c01cd..94e1386c34a6 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -140,9 +140,10 @@ public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
         .addField("data")
         .commit();
 
-    // in v1, we use void transforms instead of dropping partition fields
+    // in v1, we use void transforms instead of dropping partition fields.
+    // We restored the void transforms with the original dropped partition fields.
     StructType expectedType = StructType.of(
-        NestedField.optional(1000, "data_1000", Types.StringType.get()),
+        NestedField.optional(1000, "data", Types.StringType.get()),
         NestedField.optional(1001, "data", Types.StringType.get())
     );
     StructType actualType = Partitioning.partitionType(table);

From 0aa180b7ba07378dc613dcc74ae3ab2177edec4c Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Fri, 29 Oct 2021 14:49:10 +0800
Subject: [PATCH 3/8] fixes indent

---
 .../org/apache/iceberg/TestMetadataTableScans.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index e95e22704ebd..428e0779f201 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -89,9 +89,9 @@ private void preparePartitionedTable() {
   @Test
   public void testManifestsTableWithDroppedPartition() throws IOException {
     table.newFastAppend()
-      .appendFile(FILE_A)
-      .appendFile(FILE_B)
-      .commit();
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
 
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
     table.refresh();
@@ -207,9 +207,9 @@ public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException {
   @Test
   public void testAllManifestsTableWithDroppedPartition() throws IOException {
     table.newFastAppend()
-      .appendFile(FILE_A)
-      .appendFile(FILE_B)
-      .commit();
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
 
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
     table.refresh();
@@ -217,7 +217,7 @@ public void testAllManifestsTableWithDroppedPartition() throws IOException {
     Table allManifestsTable = new AllManifestsTable(table.ops(), table);
 
     TableScan scan = allManifestsTable.newScan()
-      .filter(Expressions.lessThan("length", 10000L));
+        .filter(Expressions.lessThan("length", 10000L));
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
       Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);

From ac228c05a44754894bfb3f4e7e2144acf3833cc2 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Fri, 25 Feb 2022 17:37:11 +0800
Subject: [PATCH 4/8] address comments

---
 core/src/main/java/org/apache/iceberg/Partitioning.java      | 3 +++
 .../test/java/org/apache/iceberg/TestMetadataTableScans.java | 2 +-
 core/src/test/java/org/apache/iceberg/TestPartitioning.java  | 1 -
 3 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 646270b82e9b..adef15fdf5f0 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -239,6 +239,9 @@ public static StructType partitionType(Table table) {
               "Conflicting partition fields: ['%s', '%s']",
               field, existingField);
 
+          // Void Transforms always revert back to the Type of the original column being transformed. This will lead
+          // to type errors when analyzing partition fields written for earlier files with a different type. To avoid
+          // this we always restore NullTransforms to their previous non-null state.
           if (Transforms.alwaysNull().equals(existingField.transform()) &&
               !Transforms.alwaysNull().equals(field.transform())) {
             fieldMap.put(fieldId, field);
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 428e0779f201..585021b82e82 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -98,7 +98,7 @@ public void testManifestsTableWithDroppedPartition() throws IOException {
 
     Table manifestsTable = new ManifestsTable(table.ops(), table);
     TableScan scan = manifestsTable.newScan()
-      .filter(Expressions.lessThan("length", 10000L));
+        .filter(Expressions.lessThan("length", 10000L));
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
       Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
index 94e1386c34a6..8dda9551065b 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -141,7 +141,6 @@ public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
         .commit();
 
     // in v1, we use void transforms instead of dropping partition fields.
-    // We restored the void transforms with the original dropped partition fields.
     StructType expectedType = StructType.of(
         NestedField.optional(1000, "data", Types.StringType.get()),
         NestedField.optional(1001, "data", Types.StringType.get())
     );
     StructType actualType = Partitioning.partitionType(table);
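
The comment added in PATCH 4 carries the key reasoning: in a v1 table a dropped partition field stays in the spec as a void transform, and that void version no longer matches the type of the partition values already written, so the merged partition type has to keep the older, non-void definition. The following is a simplified model of that merge rule in plain Java — not the Iceberg implementation; the Field record and the "void" marker string are assumptions made purely for illustration.

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model (not Iceberg code) of the merge rule behind PATCH 1/4:
// when the same partition field id appears in several specs, prefer the
// non-void definition so the reconstructed partition type keeps the type
// the data files were actually written with.
public class VoidTransformMergeSketch {

  record Field(int fieldId, String name, String transform, String resultType) {}

  static boolean isVoid(Field field) {
    return "void".equals(field.transform());
  }

  // specs are visited from newest to oldest, mirroring the descending spec-id sort
  static Map<Integer, Field> mergeSpecs(List<List<Field>> specsNewestFirst) {
    Map<Integer, Field> byId = new TreeMap<>();
    for (List<Field> spec : specsNewestFirst) {
      for (Field field : spec) {
        Field existing = byId.get(field.fieldId());
        if (existing == null || (isVoid(existing) && !isVoid(field))) {
          byId.put(field.fieldId(), field);
        }
      }
    }
    return byId;
  }

  public static void main(String[] args) {
    // current spec of a v1 table: the dropped field became a void transform typed like the source column
    List<Field> spec1 = List.of(new Field(1000, "data_bucket", "void", "string"));
    // original spec: the field was bucket(16) and wrote int partition values
    List<Field> spec0 = List.of(new Field(1000, "data_bucket", "bucket[16]", "int"));

    System.out.println(mergeSpecs(List.of(spec1, spec0)));
    // {1000=Field[fieldId=1000, name=data_bucket, transform=bucket[16], resultType=int]}
  }
}
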

From 07688e6906cf72cee7790e3cc548baf0e3196d54 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Wed, 2 Mar 2022 17:41:17 +0800
Subject: [PATCH 5/8] address comments

---
 .../java/org/apache/iceberg/Partitioning.java |   7 +-
 .../iceberg/TestMetadataTableScans.java       | 148 ++++++++++++++++--
 2 files changed, 139 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index adef15fdf5f0..003ed7eef2c2 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -211,7 +211,8 @@ public static StructType partitionType(Table table) {
 
     Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
     List<NestedField> structFields = Lists.newArrayList();
-    Map<Integer, Integer> fieldToIndex = Maps.newHashMap();
+    // Mapping from a PartitionField with void transform to the index in structFields.
+    Map<Integer, Integer> voidTransformFieldToIndex = Maps.newHashMap();
 
     // sort the spec IDs in descending order to pick up the most recent field names
     List<Integer> specIds = table.specs().keySet().stream()
@@ -231,7 +232,7 @@ public static StructType partitionType(Table table) {
           structFields.add(structField);
 
           if (Transforms.alwaysNull().equals(field.transform())) {
-            fieldToIndex.put(fieldId, structFields.size() - 1);
+            voidTransformFieldToIndex.put(fieldId, structFields.size() - 1);
           }
         } else {
           // verify the fields are compatible as they may conflict in v1 tables
@@ -246,7 +247,7 @@ public static StructType partitionType(Table table) {
               !Transforms.alwaysNull().equals(field.transform())) {
             fieldMap.put(fieldId, field);
             NestedField structField = spec.partitionType().field(fieldId);
-            structFields.set(fieldToIndex.get(fieldId), structField);
+            structFields.set(voidTransformFieldToIndex.get(fieldId), structField);
           }
         }
       }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 585021b82e82..4846ad43579d 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -96,15 +96,19 @@ public void testManifestsTableWithDroppedPartition() throws IOException {
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
     table.refresh();
 
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.truncate("data", 2)).commit();
+
     Table manifestsTable = new ManifestsTable(table.ops(), table);
-    TableScan scan = manifestsTable.newScan()
-        .filter(Expressions.lessThan("length", 10000L));
+    TableScan scan = manifestsTable.newScan();
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
-      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
-      for (FileScanTask task : tasks) {
-        Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
-      }
+      Assert.assertEquals("Should have one task", 1, Iterables.size(tasks));
     }
   }
 
@@ -128,6 +132,32 @@ public void testManifestsTableAlwaysIgnoresResiduals() throws IOException {
     }
   }
 
+  @Test
+  public void testDataFilesTableWithDroppedPartition() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.truncate("data", 2)).commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    TableScan scan = dataFilesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assert.assertEquals("Should have one task", 1, Iterables.size(tasks));
+    }
+  }
+
   @Test
   public void testDataFilesTableHonorsIgnoreResiduals() throws IOException {
     table.newFastAppend()
@@ -166,6 +196,32 @@ public void testManifestEntriesTableHonorsIgnoreResiduals() throws IOException {
     validateTaskScanResiduals(scan2, true);
   }
 
+  @Test
+  public void testManifestEntriesTableWithDroppedPartition() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.truncate("data", 2)).commit();
+
+    Table manifestEntriesTable = new ManifestEntriesTable(table.ops(), table);
+    TableScan scan = manifestEntriesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assert.assertEquals("Should have one task", 1, Iterables.size(tasks));
+    }
+  }
+
   @Test
   public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
     table.newFastAppend()
@@ -185,6 +241,32 @@ public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
     validateTaskScanResiduals(scan2, true);
   }
 
+  @Test
+  public void testAllDataFilesTableWithDroppedPartition() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.truncate("data", 2)).commit();
+
+    Table allDataFilesTable = new AllDataFilesTable(table.ops(), table);
+    TableScan scan = allDataFilesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assert.assertEquals("Should have one task", 1, Iterables.size(tasks));
+    }
+  }
+
   @Test
   public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException {
     table.newFastAppend()
@@ -204,6 +286,32 @@ public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException {
     validateTaskScanResiduals(scan2, true);
   }
 
+  @Test
+  public void testAllEntriesTableWithDroppedPartition() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.truncate("data", 2)).commit();
+
+    Table allEntriesTable = new AllEntriesTable(table.ops(), table);
+    TableScan scan = allEntriesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assert.assertEquals("Should have one task", 1, Iterables.size(tasks));
+    }
+  }
+
   @Test
   public void testAllManifestsTableWithDroppedPartition() throws IOException {
     table.newFastAppend()
@@ -214,16 +322,20 @@ public void testAllManifestsTableWithDroppedPartition() throws IOException {
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
     table.refresh();
 
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.truncate("data", 2)).commit();
+
     Table allManifestsTable = new AllManifestsTable(table.ops(), table);
 
-    TableScan scan = allManifestsTable.newScan()
-        .filter(Expressions.lessThan("length", 10000L));
+    TableScan scan = allManifestsTable.newScan();
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
-      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
-      for (FileScanTask task : tasks) {
-        Assert.assertTrue("Rows should not be empty", Iterables.size(task.asDataTask().rows()) > 0);
-      }
+      Assert.assertEquals("Should have one task", 1, Iterables.size(tasks));
     }
   }
 
@@ -382,13 +494,23 @@ public void testFilesTableScanWithDroppedPartition() throws IOException {
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
     table.refresh();
 
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.refresh();
+
+    table.updateSpec().addField(Expressions.truncate("data", 2)).commit();
+
     Table dataFilesTable = new DataFilesTable(table.ops(), table);
     TableScan scan = dataFilesTable.newScan();
 
     Schema schema = dataFilesTable.schema();
     Types.StructType actualType = schema.findField(DataFile.PARTITION_ID).type().asStructType();
     Types.StructType expectedType = Types.StructType.of(
-        Types.NestedField.optional(1000, "data_bucket", Types.IntegerType.get())
+        Types.NestedField.optional(1000, "data_bucket", Types.IntegerType.get()),
+        Types.NestedField.optional(1001, "data_bucket_16", Types.IntegerType.get()),
+        Types.NestedField.optional(1002, "data_trunc_2", Types.StringType.get())
     );
     Assert.assertEquals("Partition type must match", expectedType, actualType);
     Accessor<StructLike> accessor = schema.accessorForField(1000);

From 4bd2c51202adfc9fff275bd6062a7460e1eaf1b3 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Tue, 8 Mar 2022 19:22:06 +0800
Subject: [PATCH 6/8] address comments

---
 .../java/org/apache/iceberg/Partitioning.java | 34 +++++++++----------
 .../org/apache/iceberg/TestPartitioning.java  |  2 +-
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 003ed7eef2c2..b4f3e188463c 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StructType;
 
@@ -210,9 +211,8 @@ public static StructType partitionType(Table table) {
     }
 
     Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
-    List<NestedField> structFields = Lists.newArrayList();
-    // Mapping from a PartitionField with void transform to the index in structFields.
-    Map<Integer, Integer> voidTransformFieldToIndex = Maps.newHashMap();
+    Map<Integer, Type> typeMap = Maps.newHashMap();
+    Map<Integer, String> nameMap = Maps.newHashMap();
 
     // sort the spec IDs in descending order to pick up the most recent field names
     List<Integer> specIds = table.specs().keySet().stream()
@@ -224,41 +224,41 @@ public static StructType partitionType(Table table) {
       for (PartitionField field : spec.fields()) {
         int fieldId = field.fieldId();
+        NestedField structField = spec.partitionType().field(fieldId);
+
         PartitionField existingField = fieldMap.get(fieldId);
         if (existingField == null) {
           fieldMap.put(fieldId, field);
-          NestedField structField = spec.partitionType().field(fieldId);
-          structFields.add(structField);
+          typeMap.put(fieldId, structField.type());
+          nameMap.put(fieldId, structField.name());
 
-          if (Transforms.alwaysNull().equals(field.transform())) {
-            voidTransformFieldToIndex.put(fieldId, structFields.size() - 1);
-          }
         } else {
           // verify the fields are compatible as they may conflict in v1 tables
           ValidationException.check(equivalentIgnoringNames(field, existingField),
               "Conflicting partition fields: ['%s', '%s']",
               field, existingField);
 
-          // Void Transforms always revert back to the Type of the original column being transformed. This will lead
-          // to type errors when analyzing partition fields written for earlier files with a different type. To avoid
-          // this we always restore NullTransforms to their previous non-null state.
-          if (Transforms.alwaysNull().equals(existingField.transform()) &&
-              !Transforms.alwaysNull().equals(field.transform())) {
+          // use the correct type for dropped partitions in v1 tables
+          if (isVoidTransform(existingField) && !isVoidTransform(field)) {
             fieldMap.put(fieldId, field);
-            NestedField structField = spec.partitionType().field(fieldId);
-            structFields.set(voidTransformFieldToIndex.get(fieldId), structField);
+            typeMap.put(fieldId, structField.type());
           }
         }
       }
     }
 
-    List<NestedField> sortedStructFields = structFields.stream()
-        .sorted(Comparator.comparingInt(NestedField::fieldId))
+    List<NestedField> sortedStructFields = fieldMap.keySet().stream()
+        .sorted(Comparator.naturalOrder())
+        .map(fieldId -> NestedField.optional(fieldId, nameMap.get(fieldId), typeMap.get(fieldId)))
         .collect(Collectors.toList());
 
     return StructType.of(sortedStructFields);
   }
 
+  private static boolean isVoidTransform(PartitionField field) {
+    return field.transform().equals(Transforms.alwaysNull());
+  }
+
   private static List<Transform<?, ?>> collectUnknownTransforms(Table table) {
     List<Transform<?, ?>> unknownTransforms = Lists.newArrayList();
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
index 8dda9551065b..9885499fe81e 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -142,7 +142,7 @@ public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
 
     // in v1, we use void transforms instead of dropping partition fields.
     StructType expectedType = StructType.of(
-        NestedField.optional(1000, "data", Types.StringType.get()),
+        NestedField.optional(1000, "data_1000", Types.StringType.get()),
         NestedField.optional(1001, "data", Types.StringType.get())
     );
     StructType actualType = Partitioning.partitionType(table);
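
PATCH 5 and 6 also pin down what the merged partition type should look like after repeated spec evolution: each newly added partition field gets a fresh field id, while dropped fields keep theirs so older files stay readable. The toy model below is not Iceberg code; the ids and column names simply mirror the expected values in testFilesTableScanWithDroppedPartition to spell out why that test now expects three partition columns.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of the partition-type evolution exercised by the updated tests.
public class PartitionTypeEvolutionSketch {
  public static void main(String[] args) {
    Map<Integer, String> partitionType = new LinkedHashMap<>();

    partitionType.put(1000, "data_bucket: int");     // original bucket(data, 16)
    // dropping bucket(data, 16) keeps id 1000 in the merged type
    partitionType.put(1001, "data_bucket_16: int");  // re-adding the transform allocates a new id
    // dropping it again, then adding truncate(data, 2)
    partitionType.put(1002, "data_trunc_2: string");

    partitionType.forEach((id, col) -> System.out.println(id + " -> " + col));
  }
}
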

From 574ddbc1a58a505108326c91a3f29e027947c164 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Mon, 14 Mar 2022 09:09:29 +0800
Subject: [PATCH 7/8] address comments

---
 core/src/main/java/org/apache/iceberg/AllManifestsTable.java | 3 ++-
 core/src/main/java/org/apache/iceberg/ManifestsTable.java    | 3 ++-
 core/src/main/java/org/apache/iceberg/Partitioning.java      | 1 -
 core/src/test/java/org/apache/iceberg/TestPartitioning.java  | 2 +-
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index ad4287915d40..21fb6e6d3040 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -31,6 +31,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructProjection;
 
@@ -119,7 +120,7 @@ protected CloseableIterable<FileScanTask> planFiles(
       boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
     String schemaString = SchemaParser.toJson(schema());
     String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-    Map<Integer, PartitionSpec> specs = table().specs();
+    Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());
 
     return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
       if (snap.manifestListLocation() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index ddf15686d18c..7ea6eff352ac 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 
@@ -71,7 +72,7 @@ MetadataTableType metadataTableType() {
   protected DataTask task(TableScan scan) {
     TableOperations ops = operations();
     String location = scan.snapshot().manifestListLocation();
-    Map<Integer, PartitionSpec> specs = table().specs();
+    Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());
 
     return StaticDataTask.of(
         ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index b4f3e188463c..2bf0b800db32 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -225,7 +225,6 @@ public static StructType partitionType(Table table) {
       for (PartitionField field : spec.fields()) {
         int fieldId = field.fieldId();
         NestedField structField = spec.partitionType().field(fieldId);
-
         PartitionField existingField = fieldMap.get(fieldId);
         if (existingField == null) {
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
index 9885499fe81e..2610ad5c01cd 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitioning.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -140,7 +140,7 @@ public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
         .addField("data")
         .commit();
 
-    // in v1, we use void transforms instead of dropping partition fields.
+    // in v1, we use void transforms instead of dropping partition fields
     StructType expectedType = StructType.of(
         NestedField.optional(1000, "data_1000", Types.StringType.get()),
         NestedField.optional(1001, "data", Types.StringType.get())
     );
     StructType actualType = Partitioning.partitionType(table);

From 159a0d5a0e94f384c11a7eff6b2ff366b2d195c4 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Mon, 28 Mar 2022 17:51:31 +0800
Subject: [PATCH 8/8] address comments

---
 core/src/main/java/org/apache/iceberg/ManifestsTable.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index 7ea6eff352ac..330bcfa0d9f1 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -78,8 +78,8 @@ protected DataTask task(TableScan scan) {
         ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
         schema(), scan.schema(), scan.snapshot().allManifests(),
         manifest -> {
-          PartitionSpec partitionSpec = specs.get(manifest.partitionSpecId());
-          return ManifestsTable.manifestFileToRow(partitionSpec, manifest);
+          PartitionSpec spec = specs.get(manifest.partitionSpecId());
+          return ManifestsTable.manifestFileToRow(spec, manifest);
         }
     );
   }
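
Beyond renames, the notable change in PATCH 7 is that both metadata tables now capture Maps.newHashMap(table().specs()) rather than the map returned by table().specs() directly. A plausible reading — not stated in the patch, so treat it as an assumption — is that the copy keeps the task self-contained instead of holding a view backed by the table metadata object. The snippet below is a hypothetical illustration of that defensive-copy pattern; ReadTask is an invented stand-in for ManifestListReadTask.

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the defensive copy made in PATCH 7: the task stores
// its own small HashMap of specs instead of the caller's (possibly unmodifiable,
// metadata-backed) map, so the captured state is independent of the table object.
public class DefensiveCopySketch {

  static final class ReadTask {
    private final Map<Integer, String> specsById;

    ReadTask(Map<Integer, String> specsById) {
      // copy instead of keeping a reference to the caller's map
      this.specsById = new HashMap<>(specsById);
    }

    String specFor(int specId) {
      return specsById.get(specId);
    }
  }

  public static void main(String[] args) {
    Map<Integer, String> specs = Map.of(0, "[data_bucket]", 1, "[]");
    ReadTask task = new ReadTask(specs);
    System.out.println(task.specFor(0)); // [data_bucket]
  }
}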