diff --git a/api/src/main/java/org/apache/iceberg/PartitionField.java b/api/src/main/java/org/apache/iceberg/PartitionField.java
index 3964207c893a..4bc5ce5d0075 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionField.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionField.java
@@ -28,11 +28,13 @@
  */
 public class PartitionField implements Serializable {
   private final int sourceId;
+  private final int fieldId;
   private final String name;
   private final Transform<?, ?> transform;
 
-  PartitionField(int sourceId, String name, Transform<?, ?> transform) {
+  PartitionField(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
     this.sourceId = sourceId;
+    this.fieldId = fieldId;
     this.name = name;
     this.transform = transform;
   }
@@ -44,6 +46,13 @@ public int sourceId() {
     return sourceId;
   }
 
+  /**
+   * @return the id of this partition field, which is unique across all partition specs in the table metadata
+   */
+  public int fieldId() {
+    return fieldId;
+  }
+
   /**
    * @return the name of this partition field
    */
@@ -60,7 +69,7 @@ public String name() {
 
   @Override
   public String toString() {
-    return name + ": " + transform + "(" + sourceId + ")";
+    return fieldId + ": " + name + ": " + transform + "(" + sourceId + ")";
   }
 
   @Override
@@ -73,12 +82,13 @@ public boolean equals(Object other) {
 
     PartitionField that = (PartitionField) other;
     return sourceId == that.sourceId &&
+        fieldId == that.fieldId &&
        name.equals(that.name) &&
        transform.equals(that.transform);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(sourceId, name, transform);
+    return Objects.hashCode(sourceId, fieldId, name, transform);
   }
 }
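
Note: with fieldId now participating in toString, equals, and hashCode, two fields that agree on source column, name, and transform but carry different ids no longer compare equal. A minimal sketch of the new behavior (illustrative values; the constructor is package-private, so this only compiles inside org.apache.iceberg):

    // hypothetical field over source column 4 (a timestamp), using the day transform
    Transform<Long, Integer> day = Transforms.day(Types.TimestampType.withZone());
    PartitionField a = new PartitionField(4, 1000, "ts_day", day);
    PartitionField b = new PartitionField(4, 1001, "ts_day", day);
    a.toString();  // "1000: ts_day: day(4)"
    a.equals(b);   // false: same source, name, and transform, but different field ids
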
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 9031a7ec7de0..3742f7d73b1b 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -33,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.transforms.UnknownTransform;
@@ -47,7 +48,7 @@
  * represented by a named {@link PartitionField}.
  */
 public class PartitionSpec implements Serializable {
-  // start assigning IDs for partition fields at 1000
+  // IDs for partition fields start at 1000
   private static final int PARTITION_DATA_ID_START = 1000;
 
   private final Schema schema;
@@ -58,14 +59,16 @@ public class PartitionSpec implements Serializable {
   private transient volatile ListMultimap<Integer, PartitionField> fieldsBySourceId = null;
   private transient volatile Class<?>[] lazyJavaClasses = null;
   private transient volatile List<PartitionField> fieldList = null;
+  private final int lastAssignedFieldId;
 
-  private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
+  private PartitionSpec(Schema schema, int specId, List<PartitionField> fields, int lastAssignedFieldId) {
     this.schema = schema;
     this.specId = specId;
     this.fields = new PartitionField[fields.size()];
     for (int i = 0; i < this.fields.length; i += 1) {
       this.fields[i] = fields.get(i);
     }
+    this.lastAssignedFieldId = lastAssignedFieldId;
   }
 
   /**
@@ -89,6 +92,10 @@ public List<PartitionField> fields() {
     return lazyFieldList();
   }
 
+  int lastAssignedFieldId() {
+    return lastAssignedFieldId;
+  }
+
   /**
    * @param fieldId a field id from the source schema
    * @return the {@link PartitionField field} that partitions the given source field
@@ -107,9 +114,8 @@ public StructType partitionType() {
       PartitionField field = fields[i];
       Type sourceType = schema.findType(field.sourceId());
       Type resultType = field.transform().getResultType(sourceType);
-      // assign ids for partition fields starting at PARTITION_DATA_ID_START to leave room for data file's other fields
       structFields.add(
-          Types.NestedField.optional(PARTITION_DATA_ID_START + i, field.name(), resultType));
+          Types.NestedField.optional(field.fieldId(), field.name(), resultType));
     }
 
     return Types.StructType.of(structFields);
@@ -168,8 +174,8 @@ public String partitionToPath(StructLike data) {
   }
 
   /**
-   * Returns true if this spec is equivalent to the other, with field names ignored. That is, if
-   * both specs have the same number of fields, field order, source columns, and transforms.
+   * Returns true if this spec is equivalent to the other, with field names and partition field ids ignored.
+   * That is, if both specs have the same number of fields, field order, source columns, and transforms.
    *
    * @param other another PartitionSpec
    * @return true if the specs have the same fields, source columns, and transforms.
@@ -275,7 +281,7 @@ public String toString() {
   }
 
   private static final PartitionSpec UNPARTITIONED_SPEC =
-      new PartitionSpec(new Schema(), 0, ImmutableList.of());
+      new PartitionSpec(new Schema(), 0, ImmutableList.of(), PARTITION_DATA_ID_START - 1);
 
   /**
    * Returns a spec for unpartitioned tables.
@@ -307,11 +313,16 @@ public static class Builder {
     private final Set<String> partitionNames = Sets.newHashSet();
     private Map<Integer, PartitionField> timeFields = Maps.newHashMap();
     private int specId = 0;
+    private final AtomicInteger lastAssignedFieldId = new AtomicInteger(PARTITION_DATA_ID_START - 1);
 
     private Builder(Schema schema) {
       this.schema = schema;
     }
 
+    private int nextFieldId() {
+      return lastAssignedFieldId.incrementAndGet();
+    }
+
     private void checkAndAddPartitionName(String name) {
       checkAndAddPartitionName(name, null);
     }
@@ -357,7 +368,7 @@ Builder identity(String sourceName, String targetName) {
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
       checkAndAddPartitionName(targetName, sourceColumn.fieldId());
       fields.add(new PartitionField(
-          sourceColumn.fieldId(), targetName, Transforms.identity(sourceColumn.type())));
+          sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.identity(sourceColumn.type())));
       return this;
     }
 
@@ -369,7 +380,7 @@ public Builder year(String sourceName, String targetName) {
       checkAndAddPartitionName(targetName);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
       PartitionField field = new PartitionField(
-          sourceColumn.fieldId(), targetName, Transforms.year(sourceColumn.type()));
+          sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.year(sourceColumn.type()));
       checkForRedundantPartitions(field);
       fields.add(field);
       return this;
@@ -383,7 +394,7 @@ public Builder month(String sourceName, String targetName) {
       checkAndAddPartitionName(targetName);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
       PartitionField field = new PartitionField(
-          sourceColumn.fieldId(), targetName, Transforms.month(sourceColumn.type()));
+          sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.month(sourceColumn.type()));
       checkForRedundantPartitions(field);
       fields.add(field);
       return this;
@@ -397,7 +408,7 @@ public Builder day(String sourceName, String targetName) {
       checkAndAddPartitionName(targetName);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
       PartitionField field = new PartitionField(
-          sourceColumn.fieldId(), targetName, Transforms.day(sourceColumn.type()));
+          sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.day(sourceColumn.type()));
       checkForRedundantPartitions(field);
       fields.add(field);
       return this;
@@ -411,7 +422,7 @@ public Builder hour(String sourceName, String targetName) {
       checkAndAddPartitionName(targetName);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
       PartitionField field = new PartitionField(
-          sourceColumn.fieldId(), targetName, Transforms.hour(sourceColumn.type()));
+          sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.hour(sourceColumn.type()));
       checkForRedundantPartitions(field);
       fields.add(field);
       return this;
@@ -425,7 +436,7 @@ public Builder bucket(String sourceName, int numBuckets, String targetName) {
       checkAndAddPartitionName(targetName);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
       fields.add(new PartitionField(
-          sourceColumn.fieldId(), targetName, Transforms.bucket(sourceColumn.type(), numBuckets)));
+          sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.bucket(sourceColumn.type(), numBuckets)));
       return this;
     }
 
@@ -437,7 +448,7 @@ public Builder truncate(String sourceName, int width, String targetName) {
       checkAndAddPartitionName(targetName);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
       fields.add(new PartitionField(
-          sourceColumn.fieldId(), targetName, Transforms.truncate(sourceColumn.type(), width)));
+          sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.truncate(sourceColumn.type(), width)));
       return this;
     }
 
@@ -445,16 +456,22 @@ public Builder truncate(String sourceName, int width) {
       return truncate(sourceName, width, sourceName + "_trunc");
     }
 
+    // add a partition field with an auto-incremented partition field id, starting from PARTITION_DATA_ID_START
     Builder add(int sourceId, String name, String transform) {
+      return add(sourceId, nextFieldId(), name, transform);
+    }
+
+    Builder add(int sourceId, int fieldId, String name, String transform) {
       Types.NestedField column = schema.findField(sourceId);
       checkAndAddPartitionName(name, column.fieldId());
       Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId);
-      fields.add(new PartitionField(sourceId, name, Transforms.fromString(column.type(), transform)));
+      fields.add(new PartitionField(sourceId, fieldId, name, Transforms.fromString(column.type(), transform)));
+      lastAssignedFieldId.getAndAccumulate(fieldId, Math::max);
       return this;
     }
 
     public PartitionSpec build() {
-      PartitionSpec spec = new PartitionSpec(schema, specId, fields);
+      PartitionSpec spec = new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get());
       checkCompatibility(spec, schema);
       return spec;
     }
@@ -473,4 +490,13 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) {
           sourceType, field.transform());
     }
   }
+
+  static boolean hasSequentialIds(PartitionSpec spec) {
+    for (int i = 0; i < spec.fields.length; i += 1) {
+      if (spec.fields[i].fieldId() != PARTITION_DATA_ID_START + i) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
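
The builder now assigns ids in two ways: the transform methods (identity, year, ..., truncate) always take the next auto-incremented id, while the package-private add(sourceId, fieldId, name, transform) pins an explicit id and advances lastAssignedFieldId to the maximum seen so far. A short sketch of the combined behavior, assuming a schema whose column 1 is "id" and which has a string column "s" (this mirrors the validation tests below):

    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .add(1, "id_partition2", "bucket[5]")        // auto-assigned: 1000
        .add(1, 1005, "id_partition1", "bucket[4]")  // explicit: 1005
        .truncate("s", 1, "custom_truncate")         // auto continues past the max: 1006
        .build();
    // spec.lastAssignedFieldId() == 1006
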
diff --git a/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java
index 48e2d755d61c..b22bc7b6c401 100644
--- a/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java
+++ b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java
@@ -123,7 +123,6 @@ public void testMultipleDatePartitionsWithDifferentSourceColumns() {
     PartitionSpec.builderFor(SCHEMA).hour("d").hour("another_d").build();
   }
 
-
   @Test
   public void testSettingPartitionTransformsWithCustomTargetNames() {
     Assert.assertEquals(PartitionSpec.builderFor(SCHEMA).year("ts", "custom_year")
@@ -205,4 +204,48 @@ public void testMissingSourceColumn() {
         IllegalArgumentException.class, "Cannot find source column",
         () -> PartitionSpec.builderFor(SCHEMA).identity("missing").build());
   }
+
+  @Test
+  public void testAutoSettingPartitionFieldIds() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .year("ts", "custom_year")
+        .bucket("ts", 4, "custom_bucket")
+        .add(1, "id_partition2", "bucket[4]")
+        .truncate("s", 1, "custom_truncate")
+        .build();
+
+    Assert.assertEquals(1000, spec.fields().get(0).fieldId());
+    Assert.assertEquals(1001, spec.fields().get(1).fieldId());
+    Assert.assertEquals(1002, spec.fields().get(2).fieldId());
+    Assert.assertEquals(1003, spec.fields().get(3).fieldId());
+    Assert.assertEquals(1003, spec.lastAssignedFieldId());
+  }
+
+  @Test
+  public void testAddPartitionFieldsWithFieldIds() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .add(1, 1005, "id_partition1", "bucket[4]")
+        .add(1, 1006, "id_partition2", "bucket[5]")
+        .add(1, 1002, "id_partition3", "bucket[6]")
+        .build();
+
+    Assert.assertEquals(1005, spec.fields().get(0).fieldId());
+    Assert.assertEquals(1006, spec.fields().get(1).fieldId());
+    Assert.assertEquals(1002, spec.fields().get(2).fieldId());
+    Assert.assertEquals(1006, spec.lastAssignedFieldId());
+  }
+
+  @Test
+  public void testAddPartitionFieldsWithAndWithoutFieldIds() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .add(1, "id_partition2", "bucket[5]")
+        .add(1, 1005, "id_partition1", "bucket[4]")
+        .truncate("s", 1, "custom_truncate")
+        .build();
+
+    Assert.assertEquals(1000, spec.fields().get(0).fieldId());
+    Assert.assertEquals(1005, spec.fields().get(1).fieldId());
+    Assert.assertEquals(1006, spec.fields().get(2).fieldId());
+    Assert.assertEquals(1006, spec.lastAssignedFieldId());
+  }
 }
diff --git a/api/src/test/java/org/apache/iceberg/TestTransformSerialization.java b/api/src/test/java/org/apache/iceberg/TestTransformSerialization.java
index a0f1eab40b48..cb4616c12449 100644
--- a/api/src/test/java/org/apache/iceberg/TestTransformSerialization.java
+++ b/api/src/test/java/org/apache/iceberg/TestTransformSerialization.java
@@ -73,6 +73,7 @@ public void testTransforms() throws Exception {
         PartitionSpec.builderFor(schema).truncate("dec", 10).build(),
         PartitionSpec.builderFor(schema).truncate("s", 10).build(),
         PartitionSpec.builderFor(schema).add(6, "dec_unsupported", "unsupported").build(),
+        PartitionSpec.builderFor(schema).add(6, 1111, "dec_unsupported", "unsupported").build(),
     };
 
     for (PartitionSpec spec : specs) {
diff --git a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
index 1f2c69ca3005..6ae9dfd959c9 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
@@ -39,6 +39,7 @@ private PartitionSpecParser() {
 
   private static final String SPEC_ID = "spec-id";
   private static final String FIELDS = "fields";
   private static final String SOURCE_ID = "source-id";
+  private static final String FIELD_ID = "field-id";
   private static final String TRANSFORM = "transform";
   private static final String NAME = "name";
 
@@ -101,6 +102,7 @@ static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
       generator.writeStringField(NAME, field.name());
       generator.writeStringField(TRANSFORM, field.transform().toString());
       generator.writeNumberField(SOURCE_ID, field.sourceId());
+      generator.writeNumberField(FIELD_ID, field.fieldId());
       generator.writeEndObject();
     }
     generator.writeEndArray();
@@ -138,6 +140,7 @@ private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
         "Cannot parse partition spec fields, not an array: %s", json);
 
     Iterator<JsonNode> elements = json.elements();
+    int fieldIdCount = 0;
     while (elements.hasNext()) {
       JsonNode element = elements.next();
       Preconditions.checkArgument(element.isObject(),
@@ -147,7 +150,17 @@ private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
       String transform = JsonUtil.getString(TRANSFORM, element);
       int sourceId = JsonUtil.getInt(SOURCE_ID, element);
 
-      builder.add(sourceId, name, transform);
+      // partition field ids are missing in old partition specs; in that case, ids auto-increment from PARTITION_DATA_ID_START
+      if (element.has(FIELD_ID)) {
+        builder.add(sourceId, JsonUtil.getInt(FIELD_ID, element), name, transform);
+        fieldIdCount++;
+      } else {
+        builder.add(sourceId, name, transform);
+      }
     }
+
+    Preconditions.checkArgument(fieldIdCount == 0 || fieldIdCount == json.size(),
+        "Cannot parse spec with missing field IDs: %s missing of %s fields.",
+        json.size() - fieldIdCount, json.size());
   }
 }
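
Round-tripping a spec through the parser keeps explicit ids and back-fills missing ones. A sketch of the expected behavior (schema here stands for any Schema whose column 2 is a column named "data"):

    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
    String json = PartitionSpecParser.toJson(spec, true);
    // the pretty JSON now includes "field-id" : 1000 for data_bucket

    PartitionSpec roundTripped = PartitionSpecParser.fromJson(schema, json);
    // roundTripped.fields().get(0).fieldId() == 1000

Old metadata without any "field-id" entries still parses, with ids defaulting to 1000, 1001, and so on, while a spec that mixes fields with and without "field-id" now fails the precondition added above.
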
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 4d12cf5ce84c..1d239fde0e38 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -61,6 +61,7 @@ public static TableMetadata newTableMetadata(Schema schema,
     for (PartitionField field : spec.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
       String sourceName = schema.findColumnName(field.sourceId());
+      // reassign all partition fields with fresh partition field ids to ensure consistency
       specBuilder.add(
           freshSchema.findField(sourceName).fieldId(),
           field.name(),
@@ -195,6 +196,7 @@ public String toString() {
                 List<Snapshot> snapshots,
                 List<SnapshotLogEntry> snapshotLog,
                 List<MetadataLogEntry> previousFiles) {
+    Preconditions.checkArgument(specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
     Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
         "Unsupported format version: v%s", formatVersion);
     if (formatVersion > 1) {
@@ -352,8 +354,11 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
+  // The caller is responsible for passing a newPartitionSpec with correct partition field ids
   public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
     PartitionSpec.checkCompatibility(newPartitionSpec, schema);
+    ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(newPartitionSpec),
+        "Spec does not use sequential IDs that are required in v1: %s", newPartitionSpec);
 
     // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
     int newDefaultSpecId = INITIAL_SPEC_ID;
@@ -487,8 +492,12 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
         currentSnapshotId, snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
+  // The caller is responsible for passing an updatedPartitionSpec with correct partition field ids
   public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
-                                        Map<String, String> updatedProperties) {
+                                         Map<String, String> updatedProperties) {
+    ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec),
+        "Spec does not use sequential IDs that are required in v1: %s", updatedPartitionSpec);
+
     AtomicInteger nextLastColumnId = new AtomicInteger(0);
     Schema freshSchema = TypeUtil.assignFreshIds(updatedSchema, nextLastColumnId::incrementAndGet);
 
@@ -580,7 +589,7 @@ private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
     // add all of the fields to the builder. IDs should not change.
     for (PartitionField field : partitionSpec.fields()) {
-      specBuilder.add(field.sourceId(), field.name(), field.transform().toString());
+      specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform().toString());
     }
 
     return specBuilder.build();
@@ -595,6 +604,7 @@ private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
       String sourceName = partitionSpec.schema().findColumnName(field.sourceId());
       specBuilder.add(
           schema.findField(sourceName).fieldId(),
+          field.fieldId(),
           field.name(),
           field.transform().toString());
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index c10ffe63c769..d6d4c0c71b64 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -21,7 +21,9 @@
 
 import com.google.common.collect.Iterables;
 import java.io.IOException;
+import java.util.List;
 import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,4 +50,45 @@ public void testInvalidUsage() throws IOException {
           () -> Iterables.getOnlyElement(reader.entries()));
     }
   }
+
+  @Test
+  public void testManifestReaderWithPartitionMetadata() throws IOException {
+    ManifestFile manifest = writeManifest("manifest.avro", manifestEntry(Status.EXISTING, 123L, FILE_A));
+    try (ManifestReader reader = ManifestReader.read(FILE_IO.newInputFile(manifest.path()))) {
+      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      Assert.assertEquals(123L, (long) entry.snapshotId());
+
+      List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
+      Assert.assertEquals(1, fields.size());
+      Assert.assertEquals(1000, fields.get(0).fieldId());
+      Assert.assertEquals("data_bucket", fields.get(0).name());
+      Assert.assertEquals(Types.IntegerType.get(), fields.get(0).type());
+    }
+  }
+
+  @Test
+  public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IOException {
+    PartitionSpec spec = PartitionSpec.builderFor(table.schema())
+        .bucket("id", 8)
+        .bucket("data", 16)
+        .build();
+    table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(spec));
+
+    ManifestFile manifest = writeManifest("manifest.avro", manifestEntry(Status.EXISTING, 123L, FILE_A));
+    try (ManifestReader reader = ManifestReader.read(FILE_IO.newInputFile(manifest.path()))) {
+      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      Assert.assertEquals(123L, (long) entry.snapshotId());
+
+      List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
+      Assert.assertEquals(2, fields.size());
+      Assert.assertEquals(1000, fields.get(0).fieldId());
+      Assert.assertEquals("id_bucket", fields.get(0).name());
+      Assert.assertEquals(Types.IntegerType.get(), fields.get(0).type());
+
+      Assert.assertEquals(1001, fields.get(1).fieldId());
+      Assert.assertEquals("data_bucket", fields.get(1).name());
+      Assert.assertEquals(Types.IntegerType.get(), fields.get(1).type());
+    }
+  }
 }
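
For v1 tables, both updatePartitionSpec and buildReplacement now reject specs whose field ids are not exactly 1000, 1001, ... in field order, since readers of the v1 format that predate field ids re-derive them sequentially and would disagree with anything else. A sketch of the failure mode, assuming metadata is the TableMetadata of a v1 table and schema has a long column "x" with id 1 (this mirrors the TestTableMetadata case at the end of the patch):

    PartitionSpec nonSequential = PartitionSpec.builderFor(schema)
        .add(1, 1005, "x_partition", "bucket[4]")  // 1005 != 1000, so not sequential
        .build();
    metadata.updatePartitionSpec(nonSequential);
    // throws ValidationException:
    //   Spec does not use sequential IDs that are required in v1: ...
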
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 21690e43383d..d7b3a48b002a 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -27,6 +27,7 @@
 import java.util.Set;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -648,4 +649,98 @@ public void testInvalidAppendManifest() throws IOException {
             .appendManifest(manifestWithDeletedFiles)
             .commit());
   }
+
+  @Test
+  public void testUpdatePartitionSpecFieldIdsForV1Table() {
+    TableMetadata base = readMetadata();
+
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
+        .bucket("id", 16)
+        .identity("data")
+        .bucket("data", 4)
+        .bucket("data", 16, "data_partition") // reuse field id although different target name
+        .build();
+
+    // commit the new partition spec to the table manually
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
+
+    List<PartitionSpec> partitionSpecs = table.ops().current().specs();
+    PartitionSpec partitionSpec = partitionSpecs.get(0);
+    Assert.assertEquals(1000, partitionSpec.lastAssignedFieldId());
+
+    Types.StructType structType = partitionSpec.partitionType();
+    List<Types.NestedField> fields = structType.fields();
+    Assert.assertEquals(1, fields.size());
+    Assert.assertEquals("data_bucket", fields.get(0).name());
+    Assert.assertEquals(1000, fields.get(0).fieldId());
+
+    partitionSpec = partitionSpecs.get(1);
+    Assert.assertEquals(1003, partitionSpec.lastAssignedFieldId());
+
+    structType = partitionSpec.partitionType();
+    fields = structType.fields();
+    Assert.assertEquals(4, fields.size());
+    Assert.assertEquals("id_bucket", fields.get(0).name());
+    Assert.assertEquals(1000, fields.get(0).fieldId());
+    Assert.assertEquals("data", fields.get(1).name());
+    Assert.assertEquals(1001, fields.get(1).fieldId());
+    Assert.assertEquals("data_bucket", fields.get(2).name());
+    Assert.assertEquals(1002, fields.get(2).fieldId());
+    Assert.assertEquals("data_partition", fields.get(3).name());
+    Assert.assertEquals(1003, fields.get(3).fieldId());
+  }
+
+  @Test
+  public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    TableMetadata base = readMetadata();
+    Assert.assertEquals("Should create 1 manifest for initial write",
+        1, base.currentSnapshot().manifests().size());
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
+
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
+        .bucket("id", 8)
+        .bucket("data", 8)
+        .build();
+
+    // commit the new partition spec to the table manually
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
+
+    DataFile newFile = DataFiles.builder(table.spec())
+        .copy(FILE_B)
+        .build();
+
+    Snapshot pending = table.newAppend()
+        .appendFile(newFile)
+        .apply();
+
+    Assert.assertEquals("Should use 2 manifest files",
+        2, pending.manifests().size());
+
+    // new manifest comes first
+    validateManifest(pending.manifests().get(0), ids(pending.snapshotId()), files(newFile));
+
+    Assert.assertEquals("Second manifest should be the initial manifest with the old spec",
+        initialManifest, pending.manifests().get(1));
+
+    // partition field ids for the same source field should differ between manifests written with different specs
+    ManifestEntry entry = ManifestReader.read(pending.manifests().get(0), FILE_IO).entries().iterator().next();
+    Types.NestedField field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0);
+    Assert.assertEquals(1000, field.fieldId());
+    Assert.assertEquals("id_bucket", field.name());
+    field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(1);
+    Assert.assertEquals(1001, field.fieldId());
+    Assert.assertEquals("data_bucket", field.name());
+
+    entry = ManifestReader.read(pending.manifests().get(1), FILE_IO).entries().iterator().next();
+    field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0);
+    Assert.assertEquals(1000, field.fieldId());
+    Assert.assertEquals("data_bucket", field.name());
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java b/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java
index 1f26bd49aa32..4c7d7c4f2730 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java
@@ -57,6 +57,7 @@ public void testSpecInfoUnpartitionedTable() {
     TestTables.TestTable table = TestTables.create(tableDir, "test", schema, spec);
 
     Assert.assertEquals(spec, table.spec());
+    Assert.assertEquals(spec.lastAssignedFieldId(), table.spec().lastAssignedFieldId());
     Assert.assertEquals(ImmutableMap.of(spec.specId(), spec), table.specs());
     Assert.assertNull(table.specs().get(Integer.MAX_VALUE));
   }
@@ -67,12 +68,13 @@ public void testSpecInfoPartitionedTable() {
     TestTables.TestTable table = TestTables.create(tableDir, "test", schema, spec);
 
     Assert.assertEquals(spec, table.spec());
+    Assert.assertEquals(spec.lastAssignedFieldId(), table.spec().lastAssignedFieldId());
     Assert.assertEquals(ImmutableMap.of(spec.specId(), spec), table.specs());
     Assert.assertNull(table.specs().get(Integer.MAX_VALUE));
   }
 
   @Test
-  public void testSpecInfoPartitionSpecEvolution() {
+  public void testSpecInfoPartitionSpecEvolutionForV1Table() {
     PartitionSpec spec = PartitionSpec.builderFor(schema)
         .bucket("data", 4)
         .build();
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java b/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java
new file mode 100644
index 000000000000..dea0d378ad43
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPartitionSpecParser extends TableTestBase {
+
+  @Test
+  public void testToJsonForV1Table() {
+    String expected = "{\n" +
+        "  \"spec-id\" : 0,\n" +
+        "  \"fields\" : [ {\n" +
+        "    \"name\" : \"data_bucket\",\n" +
+        "    \"transform\" : \"bucket[16]\",\n" +
+        "    \"source-id\" : 2,\n" +
+        "    \"field-id\" : 1000\n" +
+        "  } ]\n" +
+        "}";
+    Assert.assertEquals(expected, PartitionSpecParser.toJson(table.spec(), true));
+
+    PartitionSpec spec = PartitionSpec.builderFor(table.schema())
+        .bucket("id", 8)
+        .bucket("data", 16)
+        .build();
+
+    table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(spec));
+
+    expected = "{\n" +
+        "  \"spec-id\" : 1,\n" +
+        "  \"fields\" : [ {\n" +
+        "    \"name\" : \"id_bucket\",\n" +
+        "    \"transform\" : \"bucket[8]\",\n" +
+        "    \"source-id\" : 1,\n" +
+        "    \"field-id\" : 1000\n" +
+        "  }, {\n" +
+        "    \"name\" : \"data_bucket\",\n" +
+        "    \"transform\" : \"bucket[16]\",\n" +
+        "    \"source-id\" : 2,\n" +
+        "    \"field-id\" : 1001\n" +
+        "  } ]\n" +
+        "}";
+    Assert.assertEquals(expected, PartitionSpecParser.toJson(table.spec(), true));
+  }
+
+  @Test
+  public void testFromJsonWithFieldId() {
+    String specString = "{\n" +
+        "  \"spec-id\" : 1,\n" +
+        "  \"fields\" : [ {\n" +
+        "    \"name\" : \"id_bucket\",\n" +
+        "    \"transform\" : \"bucket[8]\",\n" +
+        "    \"source-id\" : 1,\n" +
+        "    \"field-id\" : 1001\n" +
+        "  }, {\n" +
+        "    \"name\" : \"data_bucket\",\n" +
+        "    \"transform\" : \"bucket[16]\",\n" +
+        "    \"source-id\" : 2,\n" +
+        "    \"field-id\" : 1000\n" +
+        "  } ]\n" +
+        "}";
+
+    PartitionSpec spec = PartitionSpecParser.fromJson(table.schema(), specString);
+
+    Assert.assertEquals(2, spec.fields().size());
+    // should be the field ids in the JSON
+    Assert.assertEquals(1001, spec.fields().get(0).fieldId());
+    Assert.assertEquals(1000, spec.fields().get(1).fieldId());
+  }
+
+  @Test
+  public void testFromJsonWithoutFieldId() {
+    String specString = "{\n" +
+        "  \"spec-id\" : 1,\n" +
+        "  \"fields\" : [ {\n" +
+        "    \"name\" : \"id_bucket\",\n" +
+        "    \"transform\" : \"bucket[8]\",\n" +
+        "    \"source-id\" : 1\n" +
+        "  }, {\n" +
+        "    \"name\" : \"data_bucket\",\n" +
+        "    \"transform\" : \"bucket[16]\",\n" +
+        "    \"source-id\" : 2\n" +
+        "  } ]\n" +
+        "}";
+
+    PartitionSpec spec = PartitionSpecParser.fromJson(table.schema(), specString);
+
+    Assert.assertEquals(2, spec.fields().size());
+    // should be the default assignment
+    Assert.assertEquals(1000, spec.fields().get(0).fieldId());
+    Assert.assertEquals(1001, spec.fields().get(1).fieldId());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index a77530bba8ed..41cec5d272b2 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.TableMetadata.MetadataLogEntry;
 import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.JsonUtil;
 import org.junit.Assert;
@@ -526,4 +527,45 @@ public static String toJsonWithVersion(TableMetadata metadata, int version) {
     }
     return writer.toString();
   }
+
+  @Test
+  public void testNewTableMetadataReassignmentAllIds() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(3, "x", Types.LongType.get()),
+        Types.NestedField.required(4, "y", Types.LongType.get()),
+        Types.NestedField.required(5, "z", Types.LongType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5)
+        .add(3, 1005, "x_partition", "bucket[4]")
+        .add(5, 1005, "z_partition", "bucket[8]")
+        .build();
+    String location = "file://tmp/db/table";
+    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of());
+
+    // newTableMetadata should reassign column ids and partition field ids.
+    PartitionSpec expected = PartitionSpec.builderFor(metadata.schema()).withSpecId(0)
+        .add(1, 1000, "x_partition", "bucket[4]")
+        .add(3, 1001, "z_partition", "bucket[8]")
+        .build();
+
+    Assert.assertEquals(expected, metadata.spec());
+  }
+
+  @Test
+  public void testInvalidUpdatePartitionSpecForV1Table() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5)
+        .add(1, 1005, "x_partition", "bucket[4]")
+        .build();
+    String location = "file://tmp/db/table";
+    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of());
+
+    AssertHelpers.assertThrows("Should fail to update an invalid partition spec",
+        ValidationException.class, "Spec does not use sequential IDs that are required in v1",
+        () -> metadata.updatePartitionSpec(spec));
+  }
 }
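
Taken together: after a spec change on a v1 table, each spec keeps its own field ids, and partitionType() (see the PartitionSpec.java hunk above) now derives the partition struct's ids from them, so manifests written under an older spec retain the ids they were written with. A condensed sketch of the flow that the TestMergeAppend and TestManifestReader cases verify, assuming table is a v1 test table whose original spec bucketed "data":

    PartitionSpec newSpec = PartitionSpec.builderFor(table.schema())
        .bucket("id", 8)
        .bucket("data", 8)
        .build();
    table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(newSpec));

    // old spec: data_bucket had field id 1000
    // new spec: id_bucket is 1000 and data_bucket is now 1001
    Types.StructType partitionType = table.spec().partitionType();
    // partitionType.fields().get(0).fieldId() == 1000  ("id_bucket")
    // partitionType.fields().get(1).fieldId() == 1001  ("data_bucket")
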