Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
this.schema = spec.schema();
this.nameToField = indexSpecByName(spec);
this.transformToField = indexSpecByTransform(spec);
this.lastAssignedPartitionId =
base.specs().stream().mapToInt(PartitionSpec::lastAssignedFieldId).max().orElse(999);
this.lastAssignedPartitionId = base.lastAssignedPartitionId();

spec.fields().stream()
.filter(field -> field.transform() instanceof UnknownTransform)
Expand Down
53 changes: 31 additions & 22 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ static TableMetadata newTableMetadata(Schema schema,
return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
freshSortOrderId, ImmutableList.of(freshSortOrder),
freshSpec.lastAssignedFieldId(), freshSortOrderId, ImmutableList.of(freshSortOrder),
ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
ImmutableList.of(), ImmutableList.of());
}
Expand Down Expand Up @@ -218,6 +218,7 @@ public String toString() {
private final Schema schema;
private final int defaultSpecId;
private final List<PartitionSpec> specs;
private final int lastAssignedPartitionId;
private final int defaultSortOrderId;
private final List<SortOrder> sortOrders;
private final Map<String, String> properties;
Expand All @@ -240,6 +241,7 @@ public String toString() {
Schema schema,
int defaultSpecId,
List<PartitionSpec> specs,
int lastAssignedPartitionId,
int defaultSortOrderId,
List<SortOrder> sortOrders,
Map<String, String> properties,
Expand Down Expand Up @@ -267,6 +269,7 @@ public String toString() {
this.schema = schema;
this.specs = specs;
this.defaultSpecId = defaultSpecId;
this.lastAssignedPartitionId = lastAssignedPartitionId;
this.defaultSortOrderId = defaultSortOrderId;
this.sortOrders = sortOrders;
this.properties = properties;
Expand Down Expand Up @@ -371,6 +374,10 @@ public Map<Integer, PartitionSpec> specsById() {
return specsById;
}

int lastAssignedPartitionId() {
return lastAssignedPartitionId;
}

public int defaultSpecId() {
return defaultSpecId;
}
Expand Down Expand Up @@ -440,7 +447,7 @@ public TableMetadata withUUID() {
return this;
} else {
return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
lastSequenceNumber, lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs,
lastSequenceNumber, lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, lastAssignedPartitionId,
defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
addPreviousFile(file, lastUpdatedMillis));
}
Expand All @@ -454,8 +461,8 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
List<SortOrder> updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order));
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs,
defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
addPreviousFile(file, lastUpdatedMillis));
lastAssignedPartitionId, defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId,
snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

// The caller is responsible to pass a newPartitionSpec with correct partition field IDs
Expand Down Expand Up @@ -489,7 +496,8 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {

return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
builder.build(), defaultSortOrderId, sortOrders, properties,
builder.build(), Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId()),
defaultSortOrderId, sortOrders, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

Expand Down Expand Up @@ -526,7 +534,7 @@ public TableMetadata replaceSortOrder(SortOrder newOrder) {

return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs,
newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog,
lastAssignedPartitionId, newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog,
addPreviousFile(file, lastUpdatedMillis));
}

Expand All @@ -542,8 +550,8 @@ public TableMetadata addStagedSnapshot(Snapshot snapshot) {

return new TableMetadata(null, formatVersion, uuid, location,
snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog,
addPreviousFile(file, lastUpdatedMillis));
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
Expand All @@ -567,8 +575,8 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {

return new TableMetadata(null, formatVersion, uuid, location,
snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog,
addPreviousFile(file, lastUpdatedMillis));
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(),
newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
Expand Down Expand Up @@ -599,7 +607,7 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {

return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered,
ImmutableList.copyOf(newSnapshotLog), addPreviousFile(file, lastUpdatedMillis));
}

Expand All @@ -622,17 +630,17 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
.build();

return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, nowMillis, lastColumnId, schema, defaultSpecId, specs, defaultSortOrderId,
sortOrders, properties, snapshot.snapshotId(), snapshots, newSnapshotLog,
lastSequenceNumber, nowMillis, lastColumnId, schema, defaultSpecId, specs, lastAssignedPartitionId,
defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), snapshots, newSnapshotLog,
addPreviousFile(file, lastUpdatedMillis));
}

public TableMetadata replaceProperties(Map<String, String> newProperties) {
ValidationException.check(newProperties != null, "Cannot set properties to null");
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots, snapshotLog,
addPreviousFile(file, lastUpdatedMillis, newProperties));
lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots,
snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));
}

public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
Expand All @@ -650,8 +658,8 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {

return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, newSnapshotLog,
addPreviousFile(file, lastUpdatedMillis));
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

// The caller is responsible to pass a updatedPartitionSpec with correct partition field IDs
Expand Down Expand Up @@ -708,15 +716,16 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update

return new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchema,
specId, specListBuilder.build(), orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties),
specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, freshSpec.lastAssignedFieldId()),
orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
}

public TableMetadata updateLocation(String newLocation) {
return new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
addPreviousFile(file, lastUpdatedMillis));
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
Expand All @@ -732,8 +741,8 @@ public TableMetadata upgradeToFormatVersion(int newFormatVersion) {

return new TableMetadata(null, newFormatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
addPreviousFile(file, lastUpdatedMillis));
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

private List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis) {
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/org/apache/iceberg/TableMetadataParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private TableMetadataParser() {
static final String PARTITION_SPEC = "partition-spec";
static final String PARTITION_SPECS = "partition-specs";
static final String DEFAULT_SPEC_ID = "default-spec-id";
static final String LAST_ASSIGNED_PARTITION_ID = "last-assigned-partition-id";
static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
static final String SORT_ORDERS = "sort-orders";
static final String PROPERTIES = "properties";
Expand Down Expand Up @@ -178,6 +179,8 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
}
generator.writeEndArray();

generator.writeNumberField(LAST_ASSIGNED_PARTITION_ID, metadata.lastAssignedPartitionId());

generator.writeNumberField(DEFAULT_SORT_ORDER_ID, metadata.defaultSortOrderId());
generator.writeArrayFieldStart(SORT_ORDERS);
for (SortOrder sortOrder : metadata.sortOrders()) {
Expand Down Expand Up @@ -288,6 +291,13 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
}

Integer lastAssignedPartitionId = JsonUtil.getIntOrNull(LAST_ASSIGNED_PARTITION_ID, node);
if (lastAssignedPartitionId == null) {
Preconditions.checkArgument(formatVersion == 1,
"%s must exist in format v%s", LAST_ASSIGNED_PARTITION_ID, formatVersion);
lastAssignedPartitionId = specs.stream().mapToInt(PartitionSpec::lastAssignedFieldId).max().orElse(999);
}

JsonNode sortOrderArray = node.get(SORT_ORDERS);
List<SortOrder> sortOrders;
int defaultSortOrderId;
Expand Down Expand Up @@ -342,7 +352,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {

return new TableMetadata(file, formatVersion, uuid, location,
lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs,
defaultSortOrderId, sortOrders, properties, currentVersionId, snapshots, entries.build(),
metadataEntries.build());
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentVersionId,
snapshots, entries.build(), metadataEntries.build());
}
}
31 changes: 23 additions & 8 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testJsonConversion() throws Exception {
.build();

TableMetadata expected = new TableMetadata(null, 2, UUID.randomUUID().toString(), TEST_LOCATION,
SEQ_NO, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
SEQ_NO, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());

Expand All @@ -127,6 +127,8 @@ public void testJsonConversion() throws Exception {
expected.defaultSpecId(), metadata.defaultSpecId());
Assert.assertEquals("PartitionSpec map should match",
expected.specs(), metadata.specs());
Assert.assertEquals("lastAssignedFieldId across all PartitionSpecs should match",
expected.spec().lastAssignedFieldId(), metadata.lastAssignedPartitionId());
Assert.assertEquals("Properties should match",
expected.properties(), metadata.properties());
Assert.assertEquals("Snapshot logs should match",
Expand Down Expand Up @@ -159,7 +161,7 @@ public void testBackwardCompat() throws Exception {
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));

TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
0, System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
0, System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec), spec.lastAssignedFieldId(),
TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(sortOrder), ImmutableMap.of("property", "value"),
currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());

Expand Down Expand Up @@ -188,6 +190,8 @@ public void testBackwardCompat() throws Exception {
metadata.specs().get(0).compatibleWith(spec));
Assert.assertEquals("PartitionSpec should have ID TableMetadata.INITIAL_SPEC_ID",
TableMetadata.INITIAL_SPEC_ID, metadata.specs().get(0).specId());
Assert.assertEquals("lastAssignedFieldId across all PartitionSpecs should match",
expected.spec().lastAssignedFieldId(), metadata.lastAssignedPartitionId());
Assert.assertEquals("Properties should match",
expected.properties(), metadata.properties());
Assert.assertEquals("Snapshot logs should match",
Expand Down Expand Up @@ -269,7 +273,7 @@ public void testJsonWithPreviousMetadataLog() throws Exception {
"/tmp/000001-" + UUID.randomUUID().toString() + ".metadata.json"));

TableMetadata base = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
Expand Down Expand Up @@ -305,6 +309,7 @@ public void testAddPreviousMetadataRemoveNone() {

TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
TEST_LOCATION, 0, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
SPEC_5.lastAssignedFieldId(),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
Expand Down Expand Up @@ -350,7 +355,7 @@ public void testAddPreviousMetadataRemoveOne() {

TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
ImmutableList.of(SPEC_5), 3, ImmutableList.of(SORT_ORDER_3),
ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
Expand Down Expand Up @@ -401,8 +406,8 @@ public void testAddPreviousMetadataRemoveMultiple() {

TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
ImmutableList.of(SPEC_5), TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(SortOrder.unsorted()),
ImmutableMap.of("property", "value"), currentSnapshotId,
ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), TableMetadata.INITIAL_SORT_ORDER_ID,
ImmutableList.of(SortOrder.unsorted()), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));

Expand All @@ -428,7 +433,7 @@ public void testV2UUIDValidation() {
IllegalArgumentException.class, "UUID is required in format v2",
() -> new TableMetadata(null, 2, null, TEST_LOCATION, SEQ_NO, System.currentTimeMillis(),
LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
);
}
Expand All @@ -440,7 +445,7 @@ public void testVersionValidation() {
IllegalArgumentException.class, "Unsupported format version: v" + unsupportedVersion,
() -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, SEQ_NO,
System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5),
3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L,
ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
);
}
Expand Down Expand Up @@ -476,6 +481,16 @@ public void testParserV2PartitionSpecsValidation() throws Exception {
);
}

@Test
public void testParserV2LastAssignedFieldIdValidation() throws Exception {
String unsupportedVersion = readTableMetadataInputFile("TableMetadataV2MissingLastAssignedPartitionId.json");
AssertHelpers.assertThrows("Should reject v2 metadata without last assigned partition id",
IllegalArgumentException.class, "last-assigned-partition-id must exist in format v2",
() -> TableMetadataParser.fromJson(
ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class))
);
}

@Test
public void testParserV2SortOrderValidation() throws Exception {
String unsupportedVersion = readTableMetadataInputFile("TableMetadataV2MissingSortOrder.json");
Expand Down
Loading