diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java index 51bc851f8879..6ad90fde9238 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -499,10 +499,14 @@ Builder add(int sourceId, int fieldId, String name, Transform transform) { } public PartitionSpec build() { - PartitionSpec spec = new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get()); + PartitionSpec spec = buildUnchecked(); checkCompatibility(spec, schema); return spec; } + + PartitionSpec buildUnchecked() { + return new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get()); + } } static void checkCompatibility(PartitionSpec spec, Schema schema) { diff --git a/api/src/main/java/org/apache/iceberg/SortOrder.java b/api/src/main/java/org/apache/iceberg/SortOrder.java index a217e39e28d2..595af6d9f4bb 100644 --- a/api/src/main/java/org/apache/iceberg/SortOrder.java +++ b/api/src/main/java/org/apache/iceberg/SortOrder.java @@ -263,7 +263,18 @@ Builder addSortField(String transformAsString, int sourceId, SortDirection direc return this; } + Builder addSortField(Transform transform, int sourceId, SortDirection direction, NullOrder nullOrder) { + fields.add(new SortField(transform, sourceId, direction, nullOrder)); + return this; + } + public SortOrder build() { + SortOrder sortOrder = buildUnchecked(); + checkCompatibility(sortOrder, schema); + return sortOrder; + } + + SortOrder buildUnchecked() { if (fields.isEmpty()) { if (orderId != null && orderId != 0) { throw new IllegalArgumentException("Unsorted order ID must be 0"); @@ -277,9 +288,7 @@ public SortOrder build() { // default ID to 1 as 0 is reserved for unsorted order int actualOrderId = orderId != null ? orderId : 1; - SortOrder sortOrder = new SortOrder(schema, actualOrderId, fields); - checkCompatibility(sortOrder, schema); - return sortOrder; + return new SortOrder(schema, actualOrderId, fields); } private Transform toTransform(BoundTerm term) { diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 3d0b31e49901..7e779bc29457 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -240,13 +240,10 @@ public void commitTransaction() { } private void commitCreateTransaction() { - // fix up the snapshot log, which should not contain intermediate snapshots - TableMetadata createMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds); - // this operation creates the table. if the commit fails, this cannot retry because another // process has created the same table. try { - ops.commit(null, createMetadata); + ops.commit(null, current); } catch (RuntimeException e) { // the commit failed and no files were committed. clean up each update. @@ -271,9 +268,7 @@ private void commitCreateTransaction() { } private void commitReplaceTransaction(boolean orCreate) { - // fix up the snapshot log, which should not contain intermediate snapshots - TableMetadata replaceMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds); - Map props = base != null ? base.properties() : replaceMetadata.properties(); + Map props = base != null ? base.properties() : current.properties(); try { Tasks.foreach(ops) @@ -300,7 +295,7 @@ private void commitReplaceTransaction(boolean orCreate) { this.base = underlyingOps.current(); // just refreshed } - underlyingOps.commit(base, replaceMetadata); + underlyingOps.commit(base, current); }); } catch (RuntimeException e) { @@ -358,7 +353,7 @@ private void commitSimpleTransaction() { } // fix up the snapshot log, which should not contain intermediate snapshots - underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds)); + underlyingOps.commit(base, current); }); } catch (RuntimeException e) { diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java new file mode 100644 index 000000000000..25d015a48a0d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.Serializable; +import java.util.Map; +import java.util.Set; + +/** + * Represents a change to table metadata. + */ +public interface MetadataUpdate extends Serializable { + class AssignUUID implements MetadataUpdate { + private final String uuid; + + public AssignUUID(String uuid) { + this.uuid = uuid; + } + + public String uuid() { + return uuid; + } + } + + class UpgradeFormatVersion implements MetadataUpdate { + private final int formatVersion; + + public UpgradeFormatVersion(int formatVersion) { + this.formatVersion = formatVersion; + } + + public int formatVersion() { + return formatVersion; + } + } + + class AddSchema implements MetadataUpdate { + private final Schema schema; + private final int lastColumnId; + + public AddSchema(Schema schema, int lastColumnId) { + this.schema = schema; + this.lastColumnId = lastColumnId; + } + + public Schema schema() { + return schema; + } + + public int lastColumnId() { + return lastColumnId; + } + } + + class SetCurrentSchema implements MetadataUpdate { + private final int schemaId; + + public SetCurrentSchema(int schemaId) { + this.schemaId = schemaId; + } + + public int schemaId() { + return schemaId; + } + } + + class AddPartitionSpec implements MetadataUpdate { + private final PartitionSpec spec; + + public AddPartitionSpec(PartitionSpec spec) { + this.spec = spec; + } + + public PartitionSpec spec() { + return spec; + } + } + + class SetDefaultPartitionSpec implements MetadataUpdate { + private final int specId; + + public SetDefaultPartitionSpec(int schemaId) { + this.specId = schemaId; + } + + public int specId() { + return specId; + } + } + + class AddSortOrder implements MetadataUpdate { + private final SortOrder sortOrder; + + public AddSortOrder(SortOrder sortOrder) { + this.sortOrder = sortOrder; + } + + public SortOrder spec() { + return sortOrder; + } + } + + class SetDefaultSortOrder implements MetadataUpdate { + private final int sortOrderId; + + public SetDefaultSortOrder(int sortOrderId) { + this.sortOrderId = sortOrderId; + } + + public int sortOrderId() { + return sortOrderId; + } + } + + class AddSnapshot implements MetadataUpdate { + private final Snapshot snapshot; + + public AddSnapshot(Snapshot snapshot) { + this.snapshot = snapshot; + } + + public Snapshot snapshot() { + return snapshot; + } + } + + class RemoveSnapshot implements MetadataUpdate { + private final long snapshotId; + + public RemoveSnapshot(long snapshotId) { + this.snapshotId = snapshotId; + } + + public long snapshotId() { + return snapshotId; + } + } + + class SetCurrentSnapshot implements MetadataUpdate { + private final Long snapshotId; + + public SetCurrentSnapshot(Long snapshotId) { + this.snapshotId = snapshotId; + } + + public Long snapshotId() { + return snapshotId; + } + } + + class SetProperties implements MetadataUpdate { + private final Map updated; + + public SetProperties(Map updated) { + this.updated = updated; + } + + public Map updated() { + return updated; + } + } + + class RemoveProperties implements MetadataUpdate { + private final Set removed; + + public RemoveProperties(Set removed) { + this.removed = removed; + } + + public Set removed() { + return removed; + } + } + + class SetLocation implements MetadataUpdate { + private final String location; + + public SetLocation(String location) { + this.location = location; + } + + public String location() { + return location; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 59c71d7b55a8..164e29505c84 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -22,8 +22,6 @@ import java.io.Serializable; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.OptionalInt; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -126,7 +124,7 @@ static TableMetadata newTableMetadata(Schema schema, freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(), freshSortOrderId, ImmutableList.of(freshSortOrder), ImmutableMap.copyOf(properties), -1, ImmutableList.of(), - ImmutableList.of(), ImmutableList.of()); + ImmutableList.of(), ImmutableList.of(), ImmutableList.of()); } public static class SnapshotLogEntry implements HistoryEntry { @@ -240,6 +238,7 @@ public String toString() { private final Map sortOrdersById; private final List snapshotLog; private final List previousFiles; + private final List changes; @SuppressWarnings("checkstyle:CyclomaticComplexity") TableMetadata(String metadataFileLocation, @@ -260,7 +259,8 @@ public String toString() { long currentSnapshotId, List snapshots, List snapshotLog, - List previousFiles) { + List previousFiles, + List changes) { Preconditions.checkArgument(specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); Preconditions.checkArgument(sortOrders != null && !sortOrders.isEmpty(), "Sort orders cannot be null or empty"); Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION, @@ -269,6 +269,8 @@ public String toString() { "UUID is required in format v%s", formatVersion); Preconditions.checkArgument(formatVersion > 1 || lastSequenceNumber == 0, "Sequence number must be 0 in v1: %s", lastSequenceNumber); + Preconditions.checkArgument(metadataFileLocation == null || changes.isEmpty(), + "Cannot create TableMetadata with a metadata location and changes"); this.metadataFileLocation = metadataFileLocation; this.formatVersion = formatVersion; @@ -290,6 +292,9 @@ public String toString() { this.snapshotLog = snapshotLog; this.previousFiles = previousFiles; + // changes are carried through until metadata is read from a file + this.changes = changes; + this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber); this.schemasById = indexSchemas(); this.specsById = indexSpecs(specs); @@ -467,308 +472,61 @@ public List previousFiles() { return previousFiles; } + public List changes() { + return changes; + } + public TableMetadata withUUID() { - if (uuid != null) { - return this; - } else { - return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location, - lastSequenceNumber, lastUpdatedMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, - currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); - } + return new Builder(this).assignUUID().build(); } public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) { - PartitionSpec.checkCompatibility(spec(), newSchema); - SortOrder.checkCompatibility(sortOrder(), newSchema); - // rebuild all of the partition specs and sort orders for the new current schema - List updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec)); - List updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order)); - - int newSchemaId = reuseOrCreateNewSchemaId(newSchema); - if (currentSchemaId == newSchemaId && newLastColumnId == lastColumnId) { - // the new spec and last column Id is already current and no change is needed - return this; - } - - ImmutableList.Builder builder = ImmutableList.builder().addAll(schemas); - if (!schemasById.containsKey(newSchemaId)) { - builder.add(new Schema(newSchemaId, newSchema.columns(), newSchema.identifierFieldIds())); - } - - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), newLastColumnId, - newSchemaId, builder.build(), defaultSpecId, updatedSpecs, lastAssignedPartitionId, - defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + return new Builder(this).setCurrentSchema(newSchema, newLastColumnId).build(); } // The caller is responsible to pass a newPartitionSpec with correct partition field IDs public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) { - Schema schema = schema(); - - PartitionSpec.checkCompatibility(newPartitionSpec, schema); - ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(newPartitionSpec), - "Spec does not use sequential IDs that are required in v1: %s", newPartitionSpec); - - // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID. - int newDefaultSpecId = INITIAL_SPEC_ID; - for (PartitionSpec spec : specs) { - if (newPartitionSpec.compatibleWith(spec)) { - newDefaultSpecId = spec.specId(); - break; - } else if (newDefaultSpecId <= spec.specId()) { - newDefaultSpecId = spec.specId() + 1; - } - } - - if (defaultSpecId == newDefaultSpecId) { - // the new spec is already current and no change is needed - return this; - } - - ImmutableList.Builder builder = ImmutableList.builder() - .addAll(specs); - if (!specsById.containsKey(newDefaultSpecId)) { - // get a fresh spec to ensure the spec ID is set to the new default - builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec)); - } - - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, newDefaultSpecId, - builder.build(), Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId()), - defaultSortOrderId, sortOrders, properties, - currentSnapshotId, snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + return new Builder(this).setDefaultPartitionSpec(newPartitionSpec).build(); } public TableMetadata replaceSortOrder(SortOrder newOrder) { - Schema schema = schema(); - SortOrder.checkCompatibility(newOrder, schema); - - // determine the next order id - int newOrderId = INITIAL_SORT_ORDER_ID; - for (SortOrder order : sortOrders) { - if (order.sameOrder(newOrder)) { - newOrderId = order.orderId(); - break; - } else if (newOrderId <= order.orderId()) { - newOrderId = order.orderId() + 1; - } - } - - if (newOrderId == defaultSortOrderId) { - return this; - } - - ImmutableList.Builder builder = ImmutableList.builder(); - builder.addAll(sortOrders); - - if (!sortOrdersById.containsKey(newOrderId)) { - if (newOrder.isUnsorted()) { - newOrderId = SortOrder.unsorted().orderId(); - builder.add(SortOrder.unsorted()); - } else { - // rebuild the sort order using new column ids - builder.add(freshSortOrder(newOrderId, schema, newOrder)); - } - } - - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + return new Builder(this).setDefaultSortOrder(newOrder).build(); } public TableMetadata addStagedSnapshot(Snapshot snapshot) { - ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber, - "Cannot add snapshot with sequence number %s older than last sequence number %s", - snapshot.sequenceNumber(), lastSequenceNumber); - - List newSnapshots = ImmutableList.builder() - .addAll(snapshots) - .add(snapshot) - .build(); - - return new TableMetadata(null, formatVersion, uuid, location, - snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, - currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId, - defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog, - addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + return new Builder(this).addSnapshot(snapshot).build(); } public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { - // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current - if (snapshotsById.containsKey(snapshot.snapshotId())) { - return setCurrentSnapshotTo(snapshot); - } - - ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber, - "Cannot add snapshot with sequence number %s older than last sequence number %s", - snapshot.sequenceNumber(), lastSequenceNumber); - - List newSnapshots = ImmutableList.builder() - .addAll(snapshots) - .add(snapshot) - .build(); - List newSnapshotLog = ImmutableList.builder() - .addAll(snapshotLog) - .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId())) - .build(); - - return new TableMetadata(null, formatVersion, uuid, location, - snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, - currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId, - defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, - addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + return new Builder(this).setCurrentSnapshot(snapshot).build(); } public TableMetadata removeSnapshotsIf(Predicate removeIf) { - List filtered = Lists.newArrayListWithExpectedSize(snapshots.size()); - for (Snapshot snapshot : snapshots) { - // keep the current snapshot and any snapshots that do not match the removeIf condition - if (snapshot.snapshotId() == currentSnapshotId || !removeIf.test(snapshot)) { - filtered.add(snapshot); - } - } - - // update the snapshot log - Set validIds = Sets.newHashSet(Iterables.transform(filtered, Snapshot::snapshotId)); - List newSnapshotLog = Lists.newArrayList(); - for (HistoryEntry logEntry : snapshotLog) { - if (validIds.contains(logEntry.snapshotId())) { - // copy the log entries that are still valid - newSnapshotLog.add(logEntry); - } else { - // any invalid entry causes the history before it to be removed. otherwise, there could be - // history gaps that cause time-travel queries to produce incorrect results. for example, - // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be - // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2 - // and t3 when in fact s2 was the current snapshot. - newSnapshotLog.clear(); - } - } - - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered, - ImmutableList.copyOf(newSnapshotLog), addPreviousFile(metadataFileLocation, lastUpdatedMillis)); - } - - private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) { - ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()), - "Cannot set current snapshot to unknown: %s", snapshot.snapshotId()); - ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber, - "Last sequence number %s is less than existing snapshot sequence number %s", - lastSequenceNumber, snapshot.sequenceNumber()); - - if (currentSnapshotId == snapshot.snapshotId()) { - // change is a noop - return this; - } - - long nowMillis = System.currentTimeMillis(); - List newSnapshotLog = ImmutableList.builder() - .addAll(snapshotLog) - .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId())) - .build(); - - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, nowMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), snapshots, - newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + List toRemove = snapshots.stream().filter(removeIf).collect(Collectors.toList()); + return new Builder(this).removeSnapshots(toRemove).build(); } public TableMetadata replaceProperties(Map rawProperties) { ValidationException.check(rawProperties != null, "Cannot set properties to null"); Map newProperties = unreservedProperties(rawProperties); - TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots, - snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis, newProperties)); - int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); - if (formatVersion != newFormatVersion) { - metadata = metadata.upgradeToFormatVersion(newFormatVersion); - } - - return metadata; - } - - public TableMetadata removeSnapshotLogEntries(Set snapshotIds) { - List newSnapshotLog = Lists.newArrayList(); - for (HistoryEntry logEntry : snapshotLog) { - if (!snapshotIds.contains(logEntry.snapshotId())) { - // copy the log entries that are still valid - newSnapshotLog.add(logEntry); + Set removed = Sets.newHashSet(properties.keySet()); + Map updated = Maps.newHashMap(); + for (Map.Entry entry : newProperties.entrySet()) { + removed.remove(entry.getKey()); + String current = properties.get(entry.getKey()); + if (current == null || !current.equals(entry.getValue())) { + updated.put(entry.getKey(), entry.getValue()); } } - ValidationException.check(currentSnapshotId < 0 || // not set - Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId, - "Cannot set invalid snapshot log: latest entry is not the current snapshot"); - - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, - snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); - } - - /** - * Returns an updated {@link TableMetadata} with the current-snapshot-ID set to the given - * snapshot-ID and the snapshot-log reset to contain only the snapshot with the given snapshot-ID. - * - * @param snapshotId ID of a snapshot that must exist, or {@code -1L} to remove the current snapshot - * and return an empty snapshot log. - * @return {@link TableMetadata} with updated {@link #currentSnapshotId} and {@link #snapshotLog} - */ - public TableMetadata withCurrentSnapshotOnly(long snapshotId) { - if ((currentSnapshotId == -1L && snapshotId == -1L && snapshots.isEmpty()) || - (currentSnapshotId == snapshotId && snapshots.size() == 1)) { - return this; - } - List newSnapshotLog = Lists.newArrayList(); - if (snapshotId != -1L) { - Snapshot snapshot = snapshotsById.get(snapshotId); - Preconditions.checkArgument(snapshot != null, "Non-existent snapshot"); - newSnapshotLog.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshotId)); - } - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshotId, - snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); - } - - public TableMetadata withCurrentSchema(int schemaId) { - if (currentSchemaId == schemaId) { - return this; - } - Preconditions.checkArgument(schemasById.containsKey(schemaId), "Non-existent schema"); - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, - snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); - } - - public TableMetadata withDefaultSortOrder(int sortOrderId) { - if (defaultSortOrderId == sortOrderId) { - return this; - } - Preconditions.checkArgument(sortOrdersById.containsKey(sortOrderId), "Non-existent sort-order"); - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, sortOrderId, sortOrders, properties, currentSnapshotId, - snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); - } + int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); - public TableMetadata withDefaultSpec(int specId) { - if (defaultSpecId == specId) { - return this; - } - Preconditions.checkArgument(specsById.containsKey(specId), "Non-existent partition spec"); - return new TableMetadata(null, formatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, specId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, - snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + return new Builder(this) + .setProperties(updated) + .removeProperties(removed) + .upgradeFormatVersion(newFormatVersion) + .build(); } private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) { @@ -835,146 +593,62 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update AtomicInteger newLastColumnId = new AtomicInteger(lastColumnId); Schema freshSchema = TypeUtil.assignFreshIds(updatedSchema, schema(), newLastColumnId::incrementAndGet); - // determine the next spec id - OptionalInt maxSpecId = specs.stream().mapToInt(PartitionSpec::specId).max(); - int nextSpecId = maxSpecId.orElse(TableMetadata.INITIAL_SPEC_ID) + 1; - - // rebuild the partition spec using the new column ids - PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec); - - // reassign partition field ids with existing partition specs in the table - AtomicInteger lastPartitionId = new AtomicInteger(lastAssignedPartitionId); - PartitionSpec newSpec = reassignPartitionIds(freshSpec, lastPartitionId::incrementAndGet); - - // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID. - int specId = specs.stream() - .filter(newSpec::compatibleWith) - .findFirst() - .map(PartitionSpec::specId) - .orElse(nextSpecId); - - ImmutableList.Builder specListBuilder = ImmutableList.builder().addAll(specs); - if (!specsById.containsKey(specId)) { - specListBuilder.add(newSpec); - } - - // determine the next order id - OptionalInt maxOrderId = sortOrders.stream().mapToInt(SortOrder::orderId).max(); - int nextOrderId = maxOrderId.isPresent() ? maxOrderId.getAsInt() + 1 : INITIAL_SORT_ORDER_ID; + // rebuild the partition spec using the new column ids and reassign partition field ids to align with existing + // partition specs in the table + PartitionSpec freshSpec = reassignPartitionIds( + freshSpec(INITIAL_SPEC_ID, freshSchema, updatedPartitionSpec), + new AtomicInteger(lastAssignedPartitionId)::incrementAndGet); // rebuild the sort order using new column ids - int freshSortOrderId = updatedSortOrder.isUnsorted() ? updatedSortOrder.orderId() : nextOrderId; - SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, updatedSortOrder); - - // if the order already exists, use the same ID. otherwise, use the fresh order ID - Optional sameSortOrder = sortOrders.stream() - .filter(sortOrder -> sortOrder.sameOrder(freshSortOrder)) - .findAny(); - int orderId = sameSortOrder.map(SortOrder::orderId).orElse(freshSortOrderId); - - ImmutableList.Builder sortOrdersBuilder = ImmutableList.builder().addAll(sortOrders); - if (!sortOrdersById.containsKey(orderId)) { - sortOrdersBuilder.add(freshSortOrder); - } - - Map newProperties = Maps.newHashMap(); - newProperties.putAll(this.properties); - newProperties.putAll(unreservedProperties(updatedProperties)); + SortOrder freshSortOrder = freshSortOrder(INITIAL_SORT_ORDER_ID, freshSchema, updatedSortOrder); // check if there is format version override int newFormatVersion = PropertyUtil.propertyAsInt(updatedProperties, TableProperties.FORMAT_VERSION, formatVersion); - // determine the next schema id - int freshSchemaId = reuseOrCreateNewSchemaId(freshSchema); - ImmutableList.Builder schemasBuilder = ImmutableList.builder().addAll(schemas); - - if (!schemasById.containsKey(freshSchemaId)) { - schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), freshSchema.identifierFieldIds())); - } - - TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, newLocation, - lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(), - specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId()), - orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties), - -1, snapshots, ImmutableList.of(), addPreviousFile(metadataFileLocation, lastUpdatedMillis, newProperties)); - - if (formatVersion != newFormatVersion) { - metadata = metadata.upgradeToFormatVersion(newFormatVersion); - } - - return metadata; + return new Builder(this) + .upgradeFormatVersion(newFormatVersion) + .setCurrentSnapshot(null) + .setCurrentSchema(freshSchema, newLastColumnId.get()) + .setDefaultPartitionSpec(freshSpec) + .setDefaultSortOrder(freshSortOrder) + .setLocation(newLocation) + .setProperties(unreservedProperties(updatedProperties)) + .build(); } public TableMetadata updateLocation(String newLocation) { - return new TableMetadata(null, formatVersion, uuid, newLocation, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, - snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); + return new Builder(this).setLocation(newLocation).build(); } public TableMetadata upgradeToFormatVersion(int newFormatVersion) { - Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION, - "Cannot upgrade table to unsupported format version: v%s (supported: v%s)", - newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION); - Preconditions.checkArgument(newFormatVersion >= formatVersion, - "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion); - - if (newFormatVersion == formatVersion) { - return this; - } - - return new TableMetadata(null, newFormatVersion, uuid, location, - lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, - lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, - snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis)); - } - - private List addPreviousFile(String previousFileLocation, long timestampMillis) { - return addPreviousFile(previousFileLocation, timestampMillis, properties); - } - - private List addPreviousFile(String previousFileLocation, long timestampMillis, - Map updatedProperties) { - if (previousFileLocation == null) { - return previousFiles; - } - - int maxSize = Math.max(1, PropertyUtil.propertyAsInt(updatedProperties, - TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)); - - List newMetadataLog; - if (previousFiles.size() >= maxSize) { - int removeIndex = previousFiles.size() - maxSize + 1; - newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size())); - } else { - newMetadataLog = Lists.newArrayList(previousFiles); - } - newMetadataLog.add(new MetadataLogEntry(timestampMillis, previousFileLocation)); - - return newMetadataLog; + return new Builder(this).upgradeFormatVersion(newFormatVersion).build(); } private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) { PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema) .withSpecId(partitionSpec.specId()); - // add all of the fields to the builder. IDs should not change. + // add all the fields to the builder. IDs should not change. for (PartitionField field : partitionSpec.fields()) { specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform()); } - return specBuilder.build(); + // build without validation because the schema may have changed in a way that makes this spec invalid. the spec + // should still be preserved so that older metadata can be interpreted. + return specBuilder.buildUnchecked(); } private static SortOrder updateSortOrderSchema(Schema schema, SortOrder sortOrder) { SortOrder.Builder builder = SortOrder.builderFor(schema).withOrderId(sortOrder.orderId()); - // add all of the fields to the builder. IDs should not change. + // add all the fields to the builder. IDs should not change. for (SortField field : sortOrder.fields()) { - builder.addSortField(field.transform().toString(), field.sourceId(), field.direction(), field.nullOrder()); + builder.addSortField(field.transform(), field.sourceId(), field.direction(), field.nullOrder()); } - return builder.build(); + // build without validation because the schema may have changed in a way that makes this order invalid. the order + // should still be preserved so that older metadata can be interpreted. + return builder.buildUnchecked(); } private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) { @@ -995,7 +669,11 @@ private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec } private static SortOrder freshSortOrder(int orderId, Schema schema, SortOrder sortOrder) { - SortOrder.Builder builder = SortOrder.builderFor(schema).withOrderId(orderId); + SortOrder.Builder builder = SortOrder.builderFor(schema); + + if (sortOrder.isSorted()) { + builder.withOrderId(orderId); + } for (SortField field : sortOrder.fields()) { // look up the name of the source field in the old schema to get the new schema's id @@ -1047,17 +725,547 @@ private static Map indexSortOrders(List sortOrder return builder.build(); } - private int reuseOrCreateNewSchemaId(Schema newSchema) { - // if the schema already exists, use its id; otherwise use the highest id + 1 - int newSchemaId = currentSchemaId; - for (Schema schema : schemas) { - if (schema.sameSchema(newSchema)) { - newSchemaId = schema.schemaId(); - break; - } else if (schema.schemaId() >= newSchemaId) { - newSchemaId = schema.schemaId() + 1; + public static Builder buildFrom(TableMetadata base) { + return new Builder(base); + } + + public static class Builder { + private final TableMetadata base; + private int formatVersion; + private String uuid; + private Long lastUpdatedMillis; + private String location; + private long lastSequenceNumber; + private int lastColumnId; + private int currentSchemaId; + private final List schemas; + private int defaultSpecId; + private List specs; + private int lastAssignedPartitionId; + private int defaultSortOrderId; + private List sortOrders; + private final Map properties; + private long currentSnapshotId; + private List snapshots; + + // change tracking + private final List changes; + private final int startingChangeCount; + private boolean discardChanges = false; + + // handled in build + private final List snapshotLog; + private final String previousFileLocation; + private final List previousFiles; + + // indexes for convenience + private final Map snapshotsById; + private final Map schemasById; + private final Map specsById; + private final Map sortOrdersById; + + private Builder(TableMetadata base) { + this.base = base; + this.formatVersion = base.formatVersion; + this.uuid = base.uuid; + this.lastUpdatedMillis = null; + this.location = base.location; + this.lastSequenceNumber = base.lastSequenceNumber; + this.lastColumnId = base.lastColumnId; + this.currentSchemaId = base.currentSchemaId; + this.schemas = Lists.newArrayList(base.schemas); + this.defaultSpecId = base.defaultSpecId; + this.specs = Lists.newArrayList(base.specs); + this.lastAssignedPartitionId = base.lastAssignedPartitionId; + this.defaultSortOrderId = base.defaultSortOrderId; + this.sortOrders = Lists.newArrayList(base.sortOrders); + this.properties = Maps.newHashMap(base.properties); + this.currentSnapshotId = base.currentSnapshotId; + this.snapshots = Lists.newArrayList(base.snapshots); + this.changes = Lists.newArrayList(base.changes); + this.startingChangeCount = changes.size(); + + this.snapshotLog = Lists.newArrayList(base.snapshotLog); + this.previousFileLocation = base.metadataFileLocation; + this.previousFiles = base.previousFiles; + + this.snapshotsById = Maps.newHashMap(base.snapshotsById); + this.schemasById = Maps.newHashMap(base.schemasById); + this.specsById = Maps.newHashMap(base.specsById); + this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); + } + + public Builder assignUUID() { + if (uuid == null) { + this.uuid = UUID.randomUUID().toString(); + changes.add(new MetadataUpdate.AssignUUID(uuid)); } + + return this; + } + + public Builder upgradeFormatVersion(int newFormatVersion) { + Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION, + "Cannot upgrade table to unsupported format version: v%s (supported: v%s)", + newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION); + Preconditions.checkArgument(newFormatVersion >= formatVersion, + "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion); + + if (newFormatVersion == formatVersion) { + return this; + } + + this.formatVersion = newFormatVersion; + changes.add(new MetadataUpdate.UpgradeFormatVersion(newFormatVersion)); + + return this; + } + + public Builder setCurrentSchema(Schema newSchema, int newLastColumnId) { + setCurrentSchema(addSchemaInternal(newSchema, newLastColumnId)); + return this; + } + + public Builder setCurrentSchema(int schemaId) { + if (currentSchemaId == schemaId) { + return this; + } + + Schema schema = schemasById.get(schemaId); + Preconditions.checkArgument(schema != null, "Cannot set current schema to unknown schema: %s", schemaId); + + // rebuild all the partition specs and sort orders for the new current schema + this.specs = Lists.newArrayList(Iterables.transform(specs, + spec -> updateSpecSchema(schema, spec))); + specsById.clear(); + specsById.putAll(indexSpecs(specs)); + + this.sortOrders = Lists.newArrayList(Iterables.transform(sortOrders, + order -> updateSortOrderSchema(schema, order))); + sortOrdersById.clear(); + sortOrdersById.putAll(indexSortOrders(sortOrders)); + + this.currentSchemaId = schemaId; + + changes.add(new MetadataUpdate.SetCurrentSchema(schemaId)); + + return this; + } + + public Builder addSchema(Schema schema, int newLastColumnId) { + addSchemaInternal(schema, newLastColumnId); + return this; + } + + public Builder setDefaultPartitionSpec(PartitionSpec spec) { + setDefaultPartitionSpec(addPartitionSpecInternal(spec)); + return this; + } + + public Builder setDefaultPartitionSpec(int specId) { + if (defaultSpecId == specId) { + // the new spec is already current and no change is needed + return this; + } + + this.defaultSpecId = specId; + changes.add(new MetadataUpdate.SetDefaultPartitionSpec(specId)); + + return this; + } + + public Builder addPartitionSpec(PartitionSpec spec) { + addPartitionSpecInternal(spec); + return this; + } + + public Builder setDefaultSortOrder(SortOrder order) { + setDefaultSortOrder(addSortOrderInternal(order)); + return this; + } + + public Builder setDefaultSortOrder(int sortOrderId) { + if (sortOrderId == defaultSortOrderId) { + return this; + } + + this.defaultSortOrderId = sortOrderId; + changes.add(new MetadataUpdate.SetDefaultSortOrder(sortOrderId)); + + return this; + } + + public Builder addSortOrder(SortOrder order) { + addSortOrderInternal(order); + return this; + } + + public Builder addSnapshot(Snapshot snapshot) { + if (snapshot == null || snapshotsById.containsKey(snapshot.snapshotId())) { + // change is a noop + return this; + } + + ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber, + "Cannot add snapshot with sequence number %s older than last sequence number %s", + snapshot.sequenceNumber(), lastSequenceNumber); + + this.lastUpdatedMillis = snapshot.timestampMillis(); + this.lastSequenceNumber = snapshot.sequenceNumber(); + snapshots.add(snapshot); + snapshotsById.put(snapshot.snapshotId(), snapshot); + changes.add(new MetadataUpdate.AddSnapshot(snapshot)); + + return this; + } + + public Builder setCurrentSnapshot(Snapshot snapshot) { + addSnapshot(snapshot); + setCurrentSnapshot(snapshot, null); + return this; + } + + public Builder setCurrentSnapshot(long snapshotId) { + if (currentSnapshotId == snapshotId) { + // change is a noop + return this; + } + + Snapshot snapshot = snapshotsById.get(snapshotId); + ValidationException.check(snapshot != null, + "Cannot set current snapshot to unknown: %s", snapshotId); + + setCurrentSnapshot(snapshot, System.currentTimeMillis()); + + return this; + } + + public Builder removeSnapshots(List snapshotsToRemove) { + Set idsToRemove = snapshotsToRemove.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + + List retainedSnapshots = Lists.newArrayListWithExpectedSize(snapshots.size() - idsToRemove.size()); + for (Snapshot snapshot : snapshots) { + long snapshotId = snapshot.snapshotId(); + if (idsToRemove.contains(snapshotId)) { + snapshotsById.remove(snapshotId); + changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId)); + } else { + retainedSnapshots.add(snapshot); + } + } + + this.snapshots = retainedSnapshots; + if (!snapshotsById.containsKey(currentSnapshotId)) { + setCurrentSnapshot(null, System.currentTimeMillis()); + } + + return this; + } + + public Builder setProperties(Map updated) { + if (updated.isEmpty()) { + return this; + } + + properties.putAll(updated); + changes.add(new MetadataUpdate.SetProperties(updated)); + + return this; + } + + public Builder removeProperties(Set removed) { + if (removed.isEmpty()) { + return this; + } + + removed.forEach(properties::remove); + changes.add(new MetadataUpdate.RemoveProperties(removed)); + + return this; + } + + public Builder setLocation(String newLocation) { + if (location != null && location.equals(newLocation)) { + return this; + } + + this.location = newLocation; + changes.add(new MetadataUpdate.SetLocation(newLocation)); + + return this; + } + + public Builder discardChanges() { + this.discardChanges = true; + return this; + } + + public TableMetadata build() { + if (changes.size() == startingChangeCount && !(discardChanges && changes.size() > 0)) { + return base; + } + + if (lastUpdatedMillis == null) { + this.lastUpdatedMillis = System.currentTimeMillis(); + } + + Schema schema = schemasById.get(currentSchemaId); + PartitionSpec.checkCompatibility(specsById.get(defaultSpecId), schema); + SortOrder.checkCompatibility(sortOrdersById.get(defaultSortOrderId), schema); + + List metadataHistory = addPreviousFile( + previousFiles, previousFileLocation, base.lastUpdatedMillis(), properties); + List newSnapshotLog = updateSnapshotLog(snapshotLog, snapshotsById, currentSnapshotId, changes); + + return new TableMetadata( + null, + formatVersion, + uuid, + location, + lastSequenceNumber, + lastUpdatedMillis, + lastColumnId, + currentSchemaId, + ImmutableList.copyOf(schemas), + defaultSpecId, + ImmutableList.copyOf(specs), + lastAssignedPartitionId, + defaultSortOrderId, + ImmutableList.copyOf(sortOrders), + ImmutableMap.copyOf(properties), + currentSnapshotId, + ImmutableList.copyOf(snapshots), + ImmutableList.copyOf(newSnapshotLog), + ImmutableList.copyOf(metadataHistory), + discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes) + ); + } + + private int addSchemaInternal(Schema schema, int newLastColumnId) { + Preconditions.checkArgument(newLastColumnId >= lastColumnId, + "Invalid last column ID: %s < %s (previous last column ID)", newLastColumnId, lastColumnId); + + int newSchemaId = reuseOrCreateNewSchemaId(schema); + boolean schemaFound = schemasById.containsKey(newSchemaId); + if (schemaFound && newLastColumnId == lastColumnId) { + // the new spec and last column id is already current and no change is needed + return newSchemaId; + } + + this.lastColumnId = newLastColumnId; + + Schema newSchema; + if (newSchemaId != schema.schemaId()) { + newSchema = new Schema(newSchemaId, schema.columns(), schema.identifierFieldIds()); + } else { + newSchema = schema; + } + + if (!schemaFound) { + schemas.add(newSchema); + schemasById.put(newSchema.schemaId(), newSchema); + } + + changes.add(new MetadataUpdate.AddSchema(newSchema, lastColumnId)); + + return newSchemaId; + } + + private int reuseOrCreateNewSchemaId(Schema newSchema) { + // if the schema already exists, use its id; otherwise use the highest id + 1 + int newSchemaId = currentSchemaId; + for (Schema schema : schemas) { + if (schema.sameSchema(newSchema)) { + return schema.schemaId(); + } else if (schema.schemaId() >= newSchemaId) { + newSchemaId = schema.schemaId() + 1; + } + } + return newSchemaId; + } + + private int addPartitionSpecInternal(PartitionSpec spec) { + int newSpecId = reuseOrCreateNewSpecId(spec); + if (specsById.containsKey(newSpecId)) { + return newSpecId; + } + + Schema schema = schemasById.get(currentSchemaId); + PartitionSpec.checkCompatibility(spec, schema); + ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(spec), + "Spec does not use sequential IDs that are required in v1: %s", spec); + + PartitionSpec newSpec = freshSpec(newSpecId, schema, spec); + this.lastAssignedPartitionId = Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId()); + specs.add(newSpec); + specsById.put(newSpecId, newSpec); + + changes.add(new MetadataUpdate.AddPartitionSpec(newSpec)); + + return newSpecId; + } + + private int reuseOrCreateNewSpecId(PartitionSpec newSpec) { + // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID. + int newSpecId = INITIAL_SPEC_ID; + for (PartitionSpec spec : specs) { + if (newSpec.compatibleWith(spec)) { + return spec.specId(); + } else if (newSpecId <= spec.specId()) { + newSpecId = spec.specId() + 1; + } + } + + return newSpecId; + } + + private int addSortOrderInternal(SortOrder order) { + int newOrderId = reuseOrCreateNewSortOrderId(order); + if (sortOrdersById.containsKey(newOrderId)) { + return newOrderId; + } + + Schema schema = schemasById.get(currentSchemaId); + SortOrder.checkCompatibility(order, schema); + + SortOrder newOrder; + if (order.isUnsorted()) { + newOrder = SortOrder.unsorted(); + } else { + // rebuild the sort order using new column ids + newOrder = freshSortOrder(newOrderId, schema, order); + } + + sortOrders.add(newOrder); + sortOrdersById.put(newOrderId, newOrder); + + changes.add(new MetadataUpdate.AddSortOrder(newOrder)); + + return newOrderId; + } + + private int reuseOrCreateNewSortOrderId(SortOrder newOrder) { + if (newOrder.isUnsorted()) { + return SortOrder.unsorted().orderId(); + } + + // determine the next order id + int newOrderId = INITIAL_SORT_ORDER_ID; + for (SortOrder order : sortOrders) { + if (order.sameOrder(newOrder)) { + return order.orderId(); + } else if (newOrderId <= order.orderId()) { + newOrderId = order.orderId() + 1; + } + } + + return newOrderId; + } + + private void setCurrentSnapshot(Snapshot snapshot, Long currentTimestampMillis) { + if (snapshot == null) { + this.currentSnapshotId = -1; + snapshotLog.clear(); + changes.add(new MetadataUpdate.SetCurrentSnapshot(null)); + return; + } + + if (currentSnapshotId == snapshot.snapshotId()) { + return; + } + + ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber, + "Last sequence number %s is less than existing snapshot sequence number %s", + lastSequenceNumber, snapshot.sequenceNumber()); + + this.lastUpdatedMillis = currentTimestampMillis != null ? currentTimestampMillis : snapshot.timestampMillis(); + this.currentSnapshotId = snapshot.snapshotId(); + snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, snapshot.snapshotId())); + changes.add(new MetadataUpdate.SetCurrentSnapshot(snapshot.snapshotId())); + } + + private static List addPreviousFile( + List previousFiles, String previousFileLocation, long timestampMillis, + Map properties) { + if (previousFileLocation == null) { + return previousFiles; + } + + int maxSize = Math.max(1, PropertyUtil.propertyAsInt(properties, + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)); + + List newMetadataLog; + if (previousFiles.size() >= maxSize) { + int removeIndex = previousFiles.size() - maxSize + 1; + newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size())); + } else { + newMetadataLog = Lists.newArrayList(previousFiles); + } + newMetadataLog.add(new MetadataLogEntry(timestampMillis, previousFileLocation)); + + return newMetadataLog; + } + + /** + * Finds intermediate snapshots that have not been committed as the current snapshot. + * + * @return a set of snapshot ids for all added snapshots that were later replaced as the current snapshot in changes + */ + private static Set intermediateSnapshotIdSet(List changes, long currentSnapshotId) { + Set addedSnapshotIds = Sets.newHashSet(); + Set intermediateSnapshotIds = Sets.newHashSet(); + for (MetadataUpdate update : changes) { + if (update instanceof MetadataUpdate.AddSnapshot) { + // adds must always come before set current snapshot + MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) update; + addedSnapshotIds.add(addSnapshot.snapshot().snapshotId()); + } else if (update instanceof MetadataUpdate.SetCurrentSnapshot) { + Long snapshotId = ((MetadataUpdate.SetCurrentSnapshot) update).snapshotId(); + if (snapshotId != null && addedSnapshotIds.contains(snapshotId) && snapshotId != currentSnapshotId) { + intermediateSnapshotIds.add(snapshotId); + } + } + } + + return intermediateSnapshotIds; + } + + private static List updateSnapshotLog( + List snapshotLog, Map snapshotsById, long currentSnapshotId, + List changes) { + // find intermediate snapshots to suppress incorrect entries in the snapshot log. + // + // transactions can create snapshots that are never the current snapshot because several changes are combined + // by the transaction into one table metadata update. when each intermediate snapshot is added to table metadata, + // it is added to the snapshot log, assuming that it will be the current snapshot. when there are multiple + // snapshot updates, the log must be corrected by suppressing the intermediate snapshot entries. + // + // a snapshot is an intermediate snapshot if it was added but is not the current snapshot. + Set intermediateSnapshotIds = intermediateSnapshotIdSet(changes, currentSnapshotId); + + // update the snapshot log + List newSnapshotLog = Lists.newArrayList(); + for (HistoryEntry logEntry : snapshotLog) { + long snapshotId = logEntry.snapshotId(); + if (snapshotsById.containsKey(snapshotId) && !intermediateSnapshotIds.contains(snapshotId)) { + // copy the log entries that are still valid + newSnapshotLog.add(logEntry); + } else { + // any invalid entry causes the history before it to be removed. otherwise, there could be + // history gaps that cause time-travel queries to produce incorrect results. for example, + // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be + // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2 + // and t3 when in fact s2 was the current snapshot. + newSnapshotLog.clear(); + } + } + + if (snapshotsById.get(currentSnapshotId) != null) { + ValidationException.check(Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId, + "Cannot set invalid snapshot log: latest entry is not the current snapshot"); + } + + return newSnapshotLog; } - return newSchemaId; } } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index 1e26fbb9b11b..8810fc1cf8c4 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -433,6 +433,6 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node) return new TableMetadata(metadataLocation, formatVersion, uuid, location, lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentVersionId, - snapshots, entries.build(), metadataEntries.build()); + snapshots, entries.build(), metadataEntries.build(), ImmutableList.of() /* no changes from the file */); } } diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 6ddbeab1d1d4..97f2d98bac05 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -108,7 +108,7 @@ public void testJsonConversion() throws Exception { 7, ImmutableList.of(TEST_SCHEMA, schema), 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId, - Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of()); + Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of(), ImmutableList.of()); String asJson = TableMetadataParser.toJson(expected); TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson); @@ -180,7 +180,8 @@ public void testBackwardCompat() throws Exception { 0, System.currentTimeMillis(), 3, TableMetadata.INITIAL_SCHEMA_ID, ImmutableList.of(schema), 6, ImmutableList.of(spec), spec.lastAssignedFieldId(), TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(sortOrder), ImmutableMap.of("property", "value"), - currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of()); + currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of()); String asJson = toJsonWithoutSpecAndSchemaList(expected); TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson); @@ -302,7 +303,7 @@ public void testJsonWithPreviousMetadataLog() throws Exception { 7, ImmutableList.of(TEST_SCHEMA), 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, - ImmutableList.copyOf(previousMetadataLog)); + ImmutableList.copyOf(previousMetadataLog), ImmutableList.of()); String asJson = TableMetadataParser.toJson(base); TableMetadata metadataFromJson = TableMetadataParser.fromJson(ops.io(), asJson); @@ -322,6 +323,8 @@ public void testAddPreviousMetadataRemoveNone() { new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List reversedSnapshotLog = Lists.newArrayList(); + reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId)); + reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId)); long currentTimestamp = System.currentTimeMillis(); List previousMetadataLog = Lists.newArrayList(); previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100, @@ -337,7 +340,7 @@ public void testAddPreviousMetadataRemoveNone() { 7, ImmutableList.of(TEST_SCHEMA), 5, ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, - ImmutableList.copyOf(previousMetadataLog)); + ImmutableList.copyOf(previousMetadataLog), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -362,6 +365,8 @@ public void testAddPreviousMetadataRemoveOne() { new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List reversedSnapshotLog = Lists.newArrayList(); + reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId)); + reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId)); long currentTimestamp = System.currentTimeMillis(); List previousMetadataLog = Lists.newArrayList(); previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100, @@ -384,7 +389,7 @@ public void testAddPreviousMetadataRemoveOne() { ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, - ImmutableList.copyOf(previousMetadataLog)); + ImmutableList.copyOf(previousMetadataLog), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -414,6 +419,8 @@ public void testAddPreviousMetadataRemoveMultiple() { new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List reversedSnapshotLog = Lists.newArrayList(); + reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId)); + reversedSnapshotLog.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshotId)); long currentTimestamp = System.currentTimeMillis(); List previousMetadataLog = Lists.newArrayList(); previousMetadataLog.add(new MetadataLogEntry(currentTimestamp - 100, @@ -431,12 +438,12 @@ public void testAddPreviousMetadataRemoveMultiple() { "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json"); TableMetadata base = new TableMetadata(latestPreviousMetadata.file(), 1, UUID.randomUUID().toString(), - TEST_LOCATION, 0, currentTimestamp - 50, 3, 7, ImmutableList.of(TEST_SCHEMA), 2, + TEST_LOCATION, 0, currentTimestamp - 50, 3, 7, ImmutableList.of(TEST_SCHEMA), SPEC_5.specId(), ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), - TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(SortOrder.unsorted()), + SortOrder.unsorted().orderId(), ImmutableList.of(SortOrder.unsorted()), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, - ImmutableList.copyOf(previousMetadataLog)); + ImmutableList.copyOf(previousMetadataLog), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -462,7 +469,7 @@ public void testV2UUIDValidation() { LAST_ASSIGNED_COLUMN_ID, 7, ImmutableList.of(TEST_SCHEMA), SPEC_5.specId(), ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L, - ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) + ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) ); } @@ -475,7 +482,7 @@ public void testVersionValidation() { System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID, 7, ImmutableList.of(TEST_SCHEMA), SPEC_5.specId(), ImmutableList.of(SPEC_5), SPEC_5.lastAssignedFieldId(), 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L, - ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) + ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) ); } @@ -581,7 +588,8 @@ public void testInvalidUpdatePartitionSpecForV1Table() throws Exception { .add(1, 1005, "x_partition", "bucket[4]") .build(); String location = "file://tmp/db/table"; - TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of()); + TableMetadata metadata = TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), location, ImmutableMap.of()); AssertHelpers.assertThrows("Should fail to update an invalid partition spec", ValidationException.class, "Spec does not use sequential IDs that are required in v1", diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index 2e0ecd60359d..a24f94f48e8b 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -190,9 +190,10 @@ public void commit(TableMetadata base, TableMetadata updatedMetadata) { throw new CommitFailedException("Injected failure"); } Integer version = VERSIONS.get(tableName); + // remove changes from the committed metadata + this.current = TableMetadata.buildFrom(updatedMetadata).discardChanges().build(); VERSIONS.put(tableName, version == null ? 0 : version + 1); - METADATA.put(tableName, updatedMetadata); - this.current = updatedMetadata; + METADATA.put(tableName, current); } else { throw new CommitFailedException( "Commit failed: table was updated at %d", current.lastUpdatedMillis()); diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index 106cad40cab6..6b406626c88c 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -79,18 +79,19 @@ protected String tableName() { } @Override - protected void refreshFromMetadataLocation(String newLocation, Predicate shouldRetry, - int numRetries) { + protected void refreshFromMetadataLocation(String newLocation, Predicate shouldRetry, int numRetries) { super.refreshFromMetadataLocation(newLocation, shouldRetry, numRetries, this::loadTableMetadata); } private TableMetadata loadTableMetadata(String metadataLocation) { // Update the TableMetadata with the Content of NessieTableState. - return TableMetadataParser.read(io(), metadataLocation) - .withCurrentSnapshotOnly(table.getSnapshotId()) - .withCurrentSchema(table.getSchemaId()) - .withDefaultSortOrder(table.getSortOrderId()) - .withDefaultSpec(table.getSpecId()); + return TableMetadata.buildFrom(TableMetadataParser.read(io(), metadataLocation)) + .setCurrentSnapshot(table.getSnapshotId()) + .setCurrentSchema(table.getSchemaId()) + .setDefaultSortOrder(table.getSortOrderId()) + .setDefaultPartitionSpec(table.getSpecId()) + .discardChanges() + .build(); } @Override