Add MetadataUpdate.applyTo(TableMetadata.Builder) so each recorded change can apply itself to a
builder, and make TableMetadata.Builder remember the last-added schema / partition spec / sort
order so "set current/default" updates can refer to it through the LAST_ADDED (-1) sentinel.

NOTE(review): generic type arguments look stripped from this patch's context lines (for example
`Map updated`, `List changes`); they are left as found -- confirm against the target files.

diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
index 7650190448c8..f5c8e4a5545e 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -22,11 +22,14 @@
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 
 /**
  * Represents a change to table metadata.
  */
 public interface MetadataUpdate extends Serializable {
+  void applyTo(TableMetadata.Builder metadataBuilder);
+
   class AssignUUID implements MetadataUpdate {
     private final String uuid;
 
@@ -37,6 +40,11 @@ public AssignUUID(String uuid) {
     public String uuid() {
       return uuid;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      throw new UnsupportedOperationException("Not implemented");
+    }
   }
 
   class UpgradeFormatVersion implements MetadataUpdate {
@@ -49,6 +57,11 @@ public UpgradeFormatVersion(int formatVersion) {
     public int formatVersion() {
       return formatVersion;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.upgradeFormatVersion(formatVersion);
+    }
   }
 
   class AddSchema implements MetadataUpdate {
@@ -67,6 +80,11 @@ public Schema schema() {
     public int lastColumnId() {
       return lastColumnId;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.addSchema(schema, lastColumnId);
+    }
   }
 
   class SetCurrentSchema implements MetadataUpdate {
@@ -79,6 +97,11 @@ public SetCurrentSchema(int schemaId) {
     public int schemaId() {
       return schemaId;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.setCurrentSchema(schemaId);
+    }
   }
 
   class AddPartitionSpec implements MetadataUpdate {
@@ -91,6 +114,11 @@ public AddPartitionSpec(PartitionSpec spec) {
     public PartitionSpec spec() {
       return spec;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.addPartitionSpec(spec);
+    }
   }
 
   class SetDefaultPartitionSpec implements MetadataUpdate {
@@ -103,6 +131,11 @@ public SetDefaultPartitionSpec(int schemaId) {
     public int specId() {
       return specId;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.setDefaultPartitionSpec(specId);
+    }
   }
 
   class AddSortOrder implements MetadataUpdate {
@@ -112,9 +145,14 @@ public AddSortOrder(SortOrder sortOrder) {
       this.sortOrder = sortOrder;
     }
 
-    public SortOrder spec() {
+    public SortOrder sortOrder() {
       return sortOrder;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.addSortOrder(sortOrder);
+    }
   }
 
   class SetDefaultSortOrder implements MetadataUpdate {
@@ -127,6 +165,11 @@ public SetDefaultSortOrder(int sortOrderId) {
     public int sortOrderId() {
       return sortOrderId;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.setDefaultSortOrder(sortOrderId);
+    }
   }
 
   class AddSnapshot implements MetadataUpdate {
@@ -139,6 +182,11 @@ public AddSnapshot(Snapshot snapshot) {
     public Snapshot snapshot() {
       return snapshot;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.addSnapshot(snapshot);
+    }
   }
 
   class RemoveSnapshot implements MetadataUpdate {
@@ -151,6 +199,11 @@ public RemoveSnapshot(long snapshotId) {
     public long snapshotId() {
       return snapshotId;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.removeSnapshots(ImmutableSet.of(snapshotId));
+    }
   }
 
   class RemoveSnapshotRef implements MetadataUpdate {
@@ -163,6 +216,12 @@ public RemoveSnapshotRef(String name) {
     public String name() {
       return name;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      // TODO: this should be generalized when tagging is supported
+      metadataBuilder.removeBranch(name);
+    }
   }
 
   class SetSnapshotRef implements MetadataUpdate {
@@ -181,6 +240,11 @@ public String name() {
     public long snapshotId() {
       return snapshotId;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.setBranchSnapshot(snapshotId, name);
+    }
   }
 
   class SetProperties implements MetadataUpdate {
@@ -193,6 +257,11 @@ public SetProperties(Map updated) {
     public Map updated() {
       return updated;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.setProperties(updated);
+    }
   }
 
   class RemoveProperties implements MetadataUpdate {
@@ -205,6 +274,11 @@ public RemoveProperties(Set removed) {
     public Set removed() {
       return removed;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.removeProperties(removed);
+    }
   }
 
   class SetLocation implements MetadataUpdate {
@@ -217,5 +291,10 @@ public SetLocation(String location) {
     public String location() {
       return location;
     }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.setLocation(location);
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 2625daa7e55c..02af0ad5e80c 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +29,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
@@ -407,7 +409,7 @@ public Map specsById() {
     return specsById;
   }
 
-  int lastAssignedPartitionId() {
+  public int lastAssignedPartitionId() {
     return lastAssignedPartitionId;
   }
 
@@ -754,7 +756,10 @@ public static Builder buildFrom(TableMetadata base) {
   }
 
   public static class Builder {
+    private static final int LAST_ADDED = -1;
+
     private final TableMetadata base;
+    private String metadataLocation;
     private int formatVersion;
     private String uuid;
     private Long lastUpdatedMillis;
@@ -771,12 +776,15 @@ public static class Builder {
     private final Map properties;
     private long currentSnapshotId;
     private List snapshots;
-    private Map refs;
+    private final Map refs;
 
     // change tracking
     private final List changes;
     private final int startingChangeCount;
     private boolean discardChanges = false;
+    private Integer lastAddedSchemaId = null;
+    private Integer lastAddedSpecId = null;
+    private Integer lastAddedOrderId = null;
 
     // handled in build
     private final List snapshotLog;
@@ -821,6 +829,11 @@ private Builder(TableMetadata base) {
       this.sortOrdersById = Maps.newHashMap(base.sortOrdersById);
     }
 
+    public Builder withMetadataLocation(String newMetadataLocation) {
+      this.metadataLocation = newMetadataLocation;
+      return this;
+    }
+
     public Builder assignUUID() {
       if (uuid == null) {
         this.uuid = UUID.randomUUID().toString();
@@ -853,6 +866,12 @@ public Builder setCurrentSchema(Schema newSchema, int newLastColumnId) {
     }
 
     public Builder setCurrentSchema(int schemaId) {
+      if (schemaId == LAST_ADDED) { // sentinel: use the schema last added through this builder
+        ValidationException.check(lastAddedSchemaId != null,
+            "Cannot set last added schema: no schema has been added");
+        return setCurrentSchema(lastAddedSchemaId);
+      }
+
       if (currentSchemaId == schemaId) {
         return this;
       }
@@ -873,12 +892,17 @@ public Builder setCurrentSchema(int schemaId) {
 
       this.currentSchemaId = schemaId;
 
-      changes.add(new MetadataUpdate.SetCurrentSchema(schemaId));
+      if (lastAddedSchemaId != null && lastAddedSchemaId == schemaId) {
+        changes.add(new MetadataUpdate.SetCurrentSchema(LAST_ADDED));
+      } else {
+        changes.add(new MetadataUpdate.SetCurrentSchema(schemaId));
+      }
 
       return this;
     }
 
     public Builder addSchema(Schema schema, int newLastColumnId) {
+      // TODO: remove requirement for newLastColumnId
       addSchemaInternal(schema, newLastColumnId);
       return this;
     }
@@ -889,13 +913,22 @@ public Builder setDefaultPartitionSpec(PartitionSpec spec) {
     }
 
     public Builder setDefaultPartitionSpec(int specId) {
+      if (specId == LAST_ADDED) { // sentinel: use the spec last added through this builder
+        ValidationException.check(lastAddedSpecId != null, "Cannot set last added spec: no spec has been added");
+        return setDefaultPartitionSpec(lastAddedSpecId);
+      }
+
       if (defaultSpecId == specId) {
         // the new spec is already current and no change is needed
         return this;
       }
 
       this.defaultSpecId = specId;
-      changes.add(new MetadataUpdate.SetDefaultPartitionSpec(specId));
+      if (lastAddedSpecId != null && lastAddedSpecId == specId) {
+        changes.add(new MetadataUpdate.SetDefaultPartitionSpec(LAST_ADDED));
+      } else {
+        changes.add(new MetadataUpdate.SetDefaultPartitionSpec(specId));
+      }
 
       return this;
     }
@@ -911,12 +944,22 @@ public Builder setDefaultSortOrder(SortOrder order) {
     }
 
     public Builder setDefaultSortOrder(int sortOrderId) {
+      if (sortOrderId == LAST_ADDED) { // sentinel: use the sort order last added through this builder
+        ValidationException.check(lastAddedOrderId != null,
+            "Cannot set last added sort order: no sort order has been added");
+        return setDefaultSortOrder(lastAddedOrderId);
+      }
+
       if (sortOrderId == defaultSortOrderId) {
         return this;
       }
 
       this.defaultSortOrderId = sortOrderId;
-      changes.add(new MetadataUpdate.SetDefaultSortOrder(sortOrderId));
+      if (lastAddedOrderId != null && lastAddedOrderId == sortOrderId) {
+        changes.add(new MetadataUpdate.SetDefaultSortOrder(LAST_ADDED));
+      } else {
+        changes.add(new MetadataUpdate.SetDefaultSortOrder(sortOrderId));
+      }
 
       return this;
     }
@@ -947,7 +990,7 @@ public Builder addSnapshot(Snapshot snapshot) {
 
     public Builder setBranchSnapshot(Snapshot snapshot, String branch) {
       addSnapshot(snapshot);
-      setBranchSnapshot(snapshot, branch, null);
+      setBranchSnapshotInternal(snapshot, branch);
       return this;
     }
 
@@ -961,7 +1004,7 @@ public Builder setBranchSnapshot(long snapshotId, String branch) {
       Snapshot snapshot = snapshotsById.get(snapshotId);
       ValidationException.check(snapshot != null, "Cannot set %s to unknown snapshot: %s", branch, snapshotId);
 
-      setBranchSnapshot(snapshot, branch, System.currentTimeMillis());
+      setBranchSnapshotInternal(snapshot, branch);
 
       return this;
     }
@@ -983,7 +1026,10 @@ public Builder removeBranch(String branch) {
 
     public Builder removeSnapshots(List snapshotsToRemove) {
       Set idsToRemove = snapshotsToRemove.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+      return removeSnapshots(idsToRemove);
+    }
 
+    public Builder removeSnapshots(Collection<Long> idsToRemove) {
       List retainedSnapshots = Lists.newArrayListWithExpectedSize(snapshots.size() - idsToRemove.size());
       for (Snapshot snapshot : snapshots) {
         long snapshotId = snapshot.snapshotId();
@@ -1057,6 +1103,12 @@ public TableMetadata build() {
         this.lastUpdatedMillis = System.currentTimeMillis();
       }
 
+      // when associated with a metadata file, table metadata must have no changes so that the metadata matches exactly
+      // what is in the metadata file, which does not store changes. metadata location with changes is inconsistent.
+      Preconditions.checkArgument(
+          changes.isEmpty() || discardChanges || metadataLocation == null,
+          "Cannot set metadata location with changes to table metadata: %s changes", changes.size());
+
       Schema schema = schemasById.get(currentSchemaId);
       PartitionSpec.checkCompatibility(specsById.get(defaultSpecId), schema);
       SortOrder.checkCompatibility(sortOrdersById.get(defaultSortOrderId), schema);
@@ -1066,7 +1118,7 @@ public TableMetadata build() {
       List newSnapshotLog = updateSnapshotLog(snapshotLog, snapshotsById, currentSnapshotId, changes);
 
       return new TableMetadata(
-          null,
+          metadataLocation,
           formatVersion,
           uuid,
           location,
@@ -1098,6 +1150,7 @@ private int addSchemaInternal(Schema schema, int newLastColumnId) {
       boolean schemaFound = schemasById.containsKey(newSchemaId);
       if (schemaFound && newLastColumnId == lastColumnId) {
         // the new spec and last column id is already current and no change is needed
+        this.lastAddedSchemaId = newSchemaId;
         return newSchemaId;
       }
 
@@ -1117,6 +1170,8 @@ private int addSchemaInternal(Schema schema, int newLastColumnId) {
 
       changes.add(new MetadataUpdate.AddSchema(newSchema, lastColumnId));
 
+      this.lastAddedSchemaId = newSchemaId;
+
       return newSchemaId;
     }
 
@@ -1136,6 +1191,7 @@ private int reuseOrCreateNewSchemaId(Schema newSchema) {
     private int addPartitionSpecInternal(PartitionSpec spec) {
       int newSpecId = reuseOrCreateNewSpecId(spec);
       if (specsById.containsKey(newSpecId)) {
+        this.lastAddedSpecId = newSpecId;
         return newSpecId;
       }
 
@@ -1151,6 +1207,8 @@ private int addPartitionSpecInternal(PartitionSpec spec) {
 
       changes.add(new MetadataUpdate.AddPartitionSpec(newSpec));
 
+      this.lastAddedSpecId = newSpecId;
+
       return newSpecId;
     }
 
@@ -1171,6 +1229,7 @@ private int reuseOrCreateNewSpecId(PartitionSpec newSpec) {
     private int addSortOrderInternal(SortOrder order) {
       int newOrderId = reuseOrCreateNewSortOrderId(order);
       if (sortOrdersById.containsKey(newOrderId)) {
+        this.lastAddedOrderId = newOrderId;
         return newOrderId;
       }
 
@@ -1190,6 +1249,8 @@ private int addSortOrderInternal(SortOrder order) {
 
       changes.add(new MetadataUpdate.AddSortOrder(newOrder));
 
+      this.lastAddedOrderId = newOrderId;
+
       return newOrderId;
     }
 
@@ -1211,7 +1272,7 @@ private int reuseOrCreateNewSortOrderId(SortOrder newOrder) {
       return newOrderId;
     }
 
-    private void setBranchSnapshot(Snapshot snapshot, String branch, Long currentTimestampMillis) {
+    private void setBranchSnapshotInternal(Snapshot snapshot, String branch) {
       long replacementSnapshotId = snapshot.snapshotId();
       SnapshotRef ref = refs.get(branch);
       if (ref != null) {
@@ -1225,7 +1286,9 @@ private void setBranchSnapshot(Snapshot snapshot, String branch, Long currentTim
           "Last sequence number %s is less than existing snapshot sequence number %s",
           lastSequenceNumber, snapshot.sequenceNumber());
 
-      this.lastUpdatedMillis = currentTimestampMillis != null ? currentTimestampMillis : snapshot.timestampMillis();
+      // if the snapshot was added in this change set, use its timestamp
+      this.lastUpdatedMillis = isAddedSnapshot(snapshot.snapshotId()) ?
+          snapshot.timestampMillis() : System.currentTimeMillis();
 
       if (SnapshotRef.MAIN_BRANCH.equals(branch)) {
         this.currentSnapshotId = replacementSnapshotId;
@@ -1328,5 +1391,16 @@ private static List updateSnapshotLog(
 
       return newSnapshotLog;
     }
+
+    private boolean isAddedSnapshot(long snapshotId) {
+      return changes(MetadataUpdate.AddSnapshot.class)
+          .anyMatch(add -> add.snapshot().snapshotId() == snapshotId);
+    }
+
+    private <U extends MetadataUpdate> Stream<U> changes(Class<U> updateClass) {
+      return changes.stream()
+          .filter(updateClass::isInstance)
+          .map(updateClass::cast);
+    }
   }
 }