diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java
index 25822c967852..1a7376fffbf9 100644
--- a/api/src/main/java/org/apache/iceberg/Snapshot.java
+++ b/api/src/main/java/org/apache/iceberg/Snapshot.java
@@ -31,6 +31,15 @@
* Snapshots are created by table operations, like {@link AppendFiles} and {@link RewriteFiles}.
*/
public interface Snapshot {
+ /**
+ * Return this snapshot's sequence number.
+ * <p>
+ * Sequence numbers are assigned when a snapshot is committed.
+ *
+ * @return a long sequence number
+ */
+ long sequenceNumber();
+
/**
* Return this snapshot's ID.
*
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 2c82d8b0051f..6054d6fde771 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -35,9 +35,12 @@
import org.apache.iceberg.io.InputFile;
class BaseSnapshot implements Snapshot {
+ private static final long INITIAL_SEQUENCE_NUMBER = 0;
+
private final FileIO io;
private final long snapshotId;
private final Long parentId;
+ private final long sequenceNumber;
private final long timestampMillis;
private final InputFile manifestList;
private final String operation;
@@ -60,6 +63,7 @@ class BaseSnapshot implements Snapshot {
}
BaseSnapshot(FileIO io,
+ long sequenceNumber,
long snapshotId,
Long parentId,
long timestampMillis,
@@ -67,6 +71,7 @@ class BaseSnapshot implements Snapshot {
Map summary,
InputFile manifestList) {
this.io = io;
+ this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
this.timestampMillis = timestampMillis;
@@ -82,10 +87,15 @@ class BaseSnapshot implements Snapshot {
String operation,
Map summary,
List manifests) {
- this(io, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
+ this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
this.manifests = manifests;
}
+ @Override
+ public long sequenceNumber() {
+ // final field, fixed when the snapshot is constructed
+ return this.sequenceNumber;
+ }
+
@Override
public long snapshotId() {
return snapshotId;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index e81d496075a4..6c999b899b10 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Iterator;
@@ -31,7 +32,15 @@
abstract class ManifestListWriter implements FileAppender {
static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
long snapshotId, Long parentSnapshotId) {
+ // no sequence number is supplied here, so only v1 manifest lists can be written; reject other versions up front
+ Preconditions.checkArgument(formatVersion == 1, "Sequence number is required for format v%s", formatVersion);
+ // v1 always uses the initial sequence number; let the sequence-aware overload pick the writer
+ return write(formatVersion, manifestListFile, snapshotId, parentSnapshotId,
+ TableMetadata.INITIAL_SEQUENCE_NUMBER);
+ }
+
+ static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
+ long snapshotId, Long parentSnapshotId, long sequenceNumber) {
if (formatVersion == 1) {
+ Preconditions.checkArgument(sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER,
+ "Invalid sequence number for v1 manifest list: %s", sequenceNumber);
return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
}
throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
index 17b8083cdef8..db81cded3b30 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
@@ -37,6 +37,7 @@ public class SnapshotParser {
private SnapshotParser() {}
+ private static final String SEQUENCE_NUMBER = "sequence-number";
private static final String SNAPSHOT_ID = "snapshot-id";
private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
private static final String TIMESTAMP_MS = "timestamp-ms";
@@ -48,6 +49,9 @@ private SnapshotParser() {}
static void toJson(Snapshot snapshot, JsonGenerator generator)
throws IOException {
generator.writeStartObject();
+ if (snapshot.sequenceNumber() > TableMetadata.INITIAL_SEQUENCE_NUMBER) {
+ generator.writeNumberField(SEQUENCE_NUMBER, snapshot.sequenceNumber());
+ }
generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
if (snapshot.parentId() != null) {
generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
@@ -103,7 +107,11 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
Preconditions.checkArgument(node.isObject(),
"Cannot parse table version from a non-object: %s", node);
- long versionId = JsonUtil.getLong(SNAPSHOT_ID, node);
+ long sequenceNumber = TableMetadata.INITIAL_SEQUENCE_NUMBER;
+ if (node.has(SEQUENCE_NUMBER)) {
+ sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, node);
+ }
+ long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
Long parentId = null;
if (node.has(PARENT_SNAPSHOT_ID)) {
parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
@@ -134,7 +142,7 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
return new BaseSnapshot(
- io, versionId, parentId, timestamp, operation, summary,
+ io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary,
io.newInputFile(manifestList));
} else {
@@ -142,7 +150,7 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
// loaded lazily, if it is needed
List manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
location -> new GenericManifestFile(io.newInputFile(location), 0));
- return new BaseSnapshot(io, versionId, parentId, timestamp, operation, summary, manifests);
+ return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 0d3239fd29d1..fb9509eb7065 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -79,7 +79,7 @@ public void accept(String file) {
private final AtomicInteger attempt = new AtomicInteger(0);
private final List manifestLists = Lists.newArrayList();
private volatile Long snapshotId = null;
- private TableMetadata base = null;
+ private TableMetadata base;
private boolean stageOnly = false;
private Consumer deleteFunc = defaultDelete;
@@ -143,14 +143,15 @@ public Snapshot apply() {
this.base = refresh();
Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
+ long sequenceNumber = base.nextSequenceNumber();
List manifests = apply(base);
- if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
+ if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
OutputFile manifestList = manifestListPath();
try (ManifestListWriter writer = ManifestListWriter.write(
- ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId)) {
+ ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {
// keep track of the manifest lists created
manifestLists.add(manifestList.location());
@@ -170,7 +171,7 @@ public Snapshot apply() {
}
return new BaseSnapshot(ops.io(),
- snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
+ sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
ops.io().newInputFile(manifestList.location()));
} else {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 4d12cf5ce84c..f3bd7b226f9a 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -43,6 +43,7 @@
* Metadata for a table.
*/
public class TableMetadata {
+ static final long INITIAL_SEQUENCE_NUMBER = 0;
static final int DEFAULT_TABLE_FORMAT_VERSION = 1;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
static final int INITIAL_SPEC_ID = 0;
@@ -69,7 +70,7 @@ public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec freshSpec = specBuilder.build();
return new TableMetadata(null, DEFAULT_TABLE_FORMAT_VERSION, UUID.randomUUID().toString(), location,
- System.currentTimeMillis(),
+ INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
ImmutableList.of(), ImmutableList.of());
@@ -168,6 +169,7 @@ public String toString() {
private final int formatVersion;
private final String uuid;
private final String location;
+ private final long lastSequenceNumber;
private final long lastUpdatedMillis;
private final int lastColumnId;
private final Schema schema;
@@ -185,6 +187,7 @@ public String toString() {
int formatVersion,
String uuid,
String location,
+ long lastSequenceNumber,
long lastUpdatedMillis,
int lastColumnId,
Schema schema,
@@ -197,14 +200,16 @@ public String toString() {
List previousFiles) {
Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
"Unsupported format version: v%s", formatVersion);
- if (formatVersion > 1) {
- Preconditions.checkArgument(uuid != null, "UUID is required in format v%s", formatVersion);
- }
+ Preconditions.checkArgument(formatVersion == 1 || uuid != null,
+ "UUID is required in format v%s", formatVersion);
+ Preconditions.checkArgument(formatVersion > 1 || lastSequenceNumber == 0,
+ "Sequence number must be 0 in v1: %s", lastSequenceNumber);
this.formatVersion = formatVersion;
this.file = file;
this.uuid = uuid;
this.location = location;
+ this.lastSequenceNumber = lastSequenceNumber;
this.lastUpdatedMillis = lastUpdatedMillis;
this.lastColumnId = lastColumnId;
this.schema = schema;
@@ -216,7 +221,7 @@ public String toString() {
this.snapshotLog = snapshotLog;
this.previousFiles = previousFiles;
- this.snapshotsById = indexSnapshots(snapshots);
+ this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
this.specsById = indexSpecs(specs);
HistoryEntry last = null;
@@ -256,6 +261,14 @@ public String uuid() {
return uuid;
}
+ /**
+ * Return the last sequence number recorded in this table metadata.
+ * <p>
+ * No snapshot tracked by this metadata has a greater sequence number, and the value is always 0 for format v1.
+ *
+ * @return the last sequence number
+ */
+ public long lastSequenceNumber() {
+ return this.lastSequenceNumber;
+ }
+
+ /**
+ * Return the sequence number to use for the next snapshot produced from this metadata.
+ * <p>
+ * Format v1 metadata always keeps the initial sequence number, so it is returned unchanged for v1.
+ *
+ * @return the last sequence number plus one for format versions above 1, otherwise the initial sequence number
+ */
+ public long nextSequenceNumber() {
+ if (formatVersion <= 1) {
+ return INITIAL_SEQUENCE_NUMBER;
+ }
+ return lastSequenceNumber + 1;
+ }
+
public long lastUpdatedMillis() {
return lastUpdatedMillis;
}
@@ -337,7 +350,7 @@ public TableMetadata withUUID() {
return this;
} else {
return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
- lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+ lastSequenceNumber, lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
}
@@ -348,8 +361,8 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
List updatedSpecs = Lists.transform(specs,
spec -> updateSpecSchema(newSchema, spec));
return new TableMetadata(null, formatVersion, uuid, location,
- System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs, properties,
- currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ lastSequenceNumber, System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs,
+ properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
@@ -377,18 +390,23 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
}
return new TableMetadata(null, formatVersion, uuid, location,
- System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
+ lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
builder.build(), properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata addStagedSnapshot(Snapshot snapshot) {
+ ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
+ "Cannot add snapshot with sequence number %s older than last sequence number %s",
+ snapshot.sequenceNumber(), lastSequenceNumber);
+
List newSnapshots = ImmutableList.builder()
.addAll(snapshots)
.add(snapshot)
.build();
+
return new TableMetadata(null, formatVersion, uuid, location,
- snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -398,6 +416,10 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
return setCurrentSnapshotTo(snapshot);
}
+ ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
+ "Cannot add snapshot with sequence number %s older than last sequence number %s",
+ snapshot.sequenceNumber(), lastSequenceNumber);
+
List newSnapshots = ImmutableList.builder()
.addAll(snapshots)
.add(snapshot)
@@ -406,8 +428,9 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
.addAll(snapshotLog)
.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
.build();
+
return new TableMetadata(null, formatVersion, uuid, location,
- snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -438,7 +461,7 @@ public TableMetadata removeSnapshotsIf(Predicate removeIf) {
}
return new TableMetadata(null, formatVersion, uuid, location,
- System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog),
addPreviousFile(file, lastUpdatedMillis));
}
@@ -446,6 +469,9 @@ public TableMetadata removeSnapshotsIf(Predicate removeIf) {
private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
ValidationException.check(snapshotsById.containsKey(snapshot.snapshotId()),
"Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
+ ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() <= lastSequenceNumber,
+ "Last sequence number %s is less than existing snapshot sequence number %s",
+ lastSequenceNumber, snapshot.sequenceNumber());
if (currentSnapshotId == snapshot.snapshotId()) {
// change is a noop
@@ -459,14 +485,14 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
.build();
return new TableMetadata(null, formatVersion, uuid, location,
- nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+ lastSequenceNumber, nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata replaceProperties(Map newProperties) {
ValidationException.check(newProperties != null, "Cannot set properties to null");
return new TableMetadata(null, formatVersion, uuid, location,
- System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
+ lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));
}
@@ -482,8 +508,9 @@ public TableMetadata removeSnapshotLogEntries(Set snapshotIds) {
ValidationException.check(currentSnapshotId < 0 || // not set
Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
"Cannot set invalid snapshot log: latest entry is not the current snapshot");
+
return new TableMetadata(null, formatVersion, uuid, location,
- System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -522,14 +549,14 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
newProperties.putAll(updatedProperties);
return new TableMetadata(null, formatVersion, uuid, location,
- System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
+ lastSequenceNumber, System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
specId, builder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
}
public TableMetadata updateLocation(String newLocation) {
return new TableMetadata(null, formatVersion, uuid, newLocation,
- System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -545,7 +572,7 @@ public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
}
return new TableMetadata(null, newFormatVersion, uuid, location,
- System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -562,7 +589,7 @@ private List addPreviousFile(InputFile previousFile, long time
int maxSize = Math.max(1, PropertyUtil.propertyAsInt(updatedProperties,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
- List newMetadataLog = null;
+ List newMetadataLog;
if (previousFiles.size() >= maxSize) {
int removeIndex = previousFiles.size() - maxSize + 1;
newMetadataLog = Lists.newArrayList(previousFiles.subList(removeIndex, previousFiles.size()));
@@ -602,10 +629,13 @@ private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec
return specBuilder.build();
}
- private static Map indexSnapshots(List snapshots) {
+ /**
+ * Index snapshots by snapshot ID, checking each one against the table's last sequence number.
+ * <p>
+ * A snapshot whose sequence number is greater than {@code lastSequenceNumber} fails the validation check.
+ *
+ * @param snapshots snapshots tracked by the table metadata
+ * @param lastSequenceNumber the table's last sequence number
+ * @return a map from snapshot ID to snapshot
+ */
+ private static Map indexAndValidateSnapshots(List snapshots, long lastSequenceNumber) {
ImmutableMap.Builder builder = ImmutableMap.builder();
- for (Snapshot version : snapshots) {
- builder.put(version.snapshotId(), version);
+ for (Snapshot snapshot : snapshots) {
+ long sequenceNumber = snapshot.sequenceNumber();
+ ValidationException.check(sequenceNumber <= lastSequenceNumber,
+ "Invalid snapshot with sequence number %s greater than last sequence number %s",
+ sequenceNumber, lastSequenceNumber);
+ builder.put(snapshot.snapshotId(), snapshot);
}
return builder.build();
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 6fca73b2bb44..f746860057f4 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -86,6 +86,7 @@ private TableMetadataParser() {}
static final String FORMAT_VERSION = "format-version";
static final String TABLE_UUID = "table-uuid";
static final String LOCATION = "location";
+ static final String LAST_SEQUENCE_NUMBER = "last-sequence-number";
static final String LAST_UPDATED_MILLIS = "last-updated-ms";
static final String LAST_COLUMN_ID = "last-column-id";
static final String SCHEMA = "schema";
@@ -154,6 +155,9 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion());
generator.writeStringField(TABLE_UUID, metadata.uuid());
generator.writeStringField(LOCATION, metadata.location());
+ if (metadata.formatVersion() > 1) {
+ generator.writeNumberField(LAST_SEQUENCE_NUMBER, metadata.lastSequenceNumber());
+ }
generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
@@ -233,6 +237,12 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node);
String location = JsonUtil.getString(LOCATION, node);
+ long lastSequenceNumber;
+ if (formatVersion > 1) {
+ lastSequenceNumber = JsonUtil.getLong(LAST_SEQUENCE_NUMBER, node);
+ } else {
+ lastSequenceNumber = TableMetadata.INITIAL_SEQUENCE_NUMBER;
+ }
int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
@@ -298,7 +308,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
}
return new TableMetadata(file, formatVersion, uuid, location,
- lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
+ lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()),
ImmutableList.copyOf(metadataEntries.iterator()));
}
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 9d46584687dc..e70351fcb99d 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -63,7 +63,7 @@ public static Integer getIntOrNull(String property, JsonNode node) {
}
public static long getLong(String property, JsonNode node) {
- Preconditions.checkArgument(node.has(property), "Cannot parse missing int %s", property);
+ Preconditions.checkArgument(node.has(property), "Cannot parse missing long %s", property);
JsonNode pNode = node.get(property);
Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isNumber(),
"Cannot parse %s from non-numeric value: %s", property, pNode);
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 5e55d66ccd40..67c1c04fe089 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -67,6 +67,8 @@ public void testJsonConversionWithOperation() {
String json = SnapshotParser.toJson(expected);
Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
+ Assert.assertEquals("Sequence number should default to 0 for v1",
+ 0, snapshot.sequenceNumber());
Assert.assertEquals("Snapshot ID should match",
expected.snapshotId(), snapshot.snapshotId());
Assert.assertEquals("Timestamp should match",
@@ -100,7 +102,7 @@ public void testJsonConversionWithManifestList() throws IOException {
}
+ // BaseSnapshot takes the sequence number (34) before the snapshot ID
Snapshot expected = new BaseSnapshot(
- ops.io(), id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList));
+ ops.io(), 34, id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList));
Snapshot inMemory = new BaseSnapshot(
ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests);
@@ -110,6 +112,8 @@ public void testJsonConversionWithManifestList() throws IOException {
String json = SnapshotParser.toJson(expected);
Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
+ Assert.assertEquals("Sequence number should match",
+ expected.sequenceNumber(), snapshot.sequenceNumber());
Assert.assertEquals("Snapshot ID should match",
expected.snapshotId(), snapshot.snapshotId());
Assert.assertEquals("Timestamp should match",
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index a77530bba8ed..52e7f6882c52 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -66,6 +66,7 @@ public class TestTableMetadata {
Types.NestedField.required(3, "z", Types.LongType.get())
);
+ private static final long SEQ_NO = 34;
private static final int LAST_ASSIGNED_COLUMN_ID = 3;
private static final PartitionSpec SPEC_5 = PartitionSpec.builderFor(TEST_SCHEMA).withSpecId(5).build();
@@ -91,8 +92,8 @@ public void testJsonConversion() throws Exception {
.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
.build();
- TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
- System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+ TableMetadata expected = new TableMetadata(null, 2, UUID.randomUUID().toString(), TEST_LOCATION,
+ SEQ_NO, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());
@@ -106,6 +107,8 @@ public void testJsonConversion() throws Exception {
expected.uuid(), metadata.uuid());
Assert.assertEquals("Table location should match",
expected.location(), metadata.location());
+ Assert.assertEquals("Last sequence number should match",
+ expected.lastSequenceNumber(), metadata.lastSequenceNumber());
Assert.assertEquals("Last column ID should match",
expected.lastColumnId(), metadata.lastColumnId());
Assert.assertEquals("Schema should match",
@@ -147,7 +150,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
List reversedSnapshotLog = Lists.newArrayList();
TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
- System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+ 0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.of());
@@ -184,7 +187,7 @@ public void testBackwardCompat() throws Exception {
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
- System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
+ 0, System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());
@@ -197,6 +200,8 @@ public void testBackwardCompat() throws Exception {
Assert.assertNull("Table UUID should not be assigned", metadata.uuid());
Assert.assertEquals("Table location should match",
expected.location(), metadata.location());
+ Assert.assertEquals("Last sequence number should default to 0",
+ expected.lastSequenceNumber(), metadata.lastSequenceNumber());
Assert.assertEquals("Last column ID should match",
expected.lastColumnId(), metadata.lastColumnId());
Assert.assertEquals("Schema should match",
@@ -292,7 +297,7 @@ public void testJsonWithPreviousMetadataLog() throws Exception {
"/tmp/000001-" + UUID.randomUUID().toString() + ".metadata.json"));
TableMetadata base = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
- System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+ 0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -327,7 +332,7 @@ public void testAddPreviousMetadataRemoveNone() {
"/tmp/000003-" + UUID.randomUUID().toString() + ".metadata.json");
TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
- TEST_LOCATION, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
+ TEST_LOCATION, 0, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -372,7 +377,7 @@ public void testAddPreviousMetadataRemoveOne() {
"/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
- TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
+ TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -422,7 +427,7 @@ public void testAddPreviousMetadataRemoveMultiple() {
"/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
- TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
+ TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -447,9 +452,9 @@ public void testAddPreviousMetadataRemoveMultiple() {
public void testV2UUIDValidation() {
AssertHelpers.assertThrows("Should reject v2 metadata without a UUID",
IllegalArgumentException.class, "UUID is required in format v2",
- () -> new TableMetadata(null, 2, null, TEST_LOCATION, System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID,
- TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L, ImmutableList.of(),
- ImmutableList.of(), ImmutableList.of())
+ () -> new TableMetadata(null, 2, null, TEST_LOCATION, SEQ_NO, System.currentTimeMillis(),
+ LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L,
+ ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
);
}
@@ -458,9 +463,9 @@ public void testVersionValidation() {
int unsupportedVersion = TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1;
AssertHelpers.assertThrows("Should reject unsupported metadata",
IllegalArgumentException.class, "Unsupported format version: v" + unsupportedVersion,
- () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, System.currentTimeMillis(),
- LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L,
- ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+ () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, SEQ_NO,
+ System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5),
+ ImmutableMap.of(), -1L, ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
);
}
@@ -493,6 +498,9 @@ public static String toJsonWithVersion(TableMetadata metadata, int version) {
generator.writeStringField(TABLE_UUID, metadata.uuid());
generator.writeStringField(LOCATION, metadata.location());
generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+ if (version > 1) {
+ generator.writeNumberField(TableMetadataParser.LAST_SEQUENCE_NUMBER, metadata.lastSequenceNumber());
+ }
generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
generator.writeFieldName(SCHEMA);
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 332e247e027a..f05f2ee339af 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -260,6 +260,7 @@ A snapshot consists of the following fields:
* **`snapshot-id`** -- A unique long ID.
* **`parent-snapshot-id`** -- (Optional) The snapshot ID of the snapshot’s parent. This field is not present for snapshots that have no parent snapshot, such as snapshots created before this field was added or the first snapshot of a table.
+* **`sequence-number`** -- A monotonically increasing long that tracks the order of snapshots in a table. (**v2 only**)
* **`timestamp-ms`** -- A timestamp when the snapshot was created. This is used when garbage collecting snapshots.
* **`manifests`** -- A list of manifest file locations. The data files in a snapshot are the union of all data files listed in these manifests. (Deprecated in favor of `manifest-list`)
* **`manifest-list`** -- (Optional) The location of a manifest list file for this snapshot, which contains a list of manifest files with additional metadata. If present, the manifests field must be omitted.
@@ -369,21 +370,22 @@ When two commits happen at the same time and are based on the same version, only
Table metadata consists of the following fields:
-| Field | Description |
-| ----- | ----------- |
-| **`format-version`** | An integer version number for the format. Currently, this is always 1. Implementations must throw an exception if a table's version is higher than the supported version. |
-| **`table-uuid`** | A UUID that identifies the table, generated when the table is created. Implementations must throw an exception if a table's UUID does not match the expected UUID after refreshing metadata. |
-| **`location`**| The table’s base location. This is used by writers to determine where to store data files, manifest files, and table metadata files. |
-| **`last-updated-ms`**| Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing. |
-| **`last-column-id`**| An integer; the highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. |
-| **`schema`**| The table’s current schema. |
-| **`partition-spec`**| The table’s current partition spec, stored as only fields. Note that this is used by writers to partition data, but is not used when reading because reads use the specs stored in manifest files. (**Deprecated**: use `partition-specs` and `default-spec-id`instead ) |
-| **`partition-specs`**| A list of partition specs, stored as full partition spec objects. |
-| **`default-spec-id`**| ID of the “current” spec that writers should use by default. |
-| **`properties`**| A string to string map of table properties. This is used to control settings that affect reading and writing and is not intended to be used for arbitrary metadata. For example, `commit.retry.num-retries` is used to control the number of commit retries. |
-| **`current-snapshot-id`**| `long` ID of the current table snapshot. |
-| **`snapshots`**| A list of valid snapshots. Valid snapshots are snapshots for which all data files exist in the file system. A data file must not be deleted from the file system until the last snapshot in which it was listed is garbage collected. |
-| **`snapshot-log`**| A list (optional) of timestamp and snapshot ID pairs that encodes changes to the current snapshot for the table. Each time the current-snapshot-id is changed, a new entry should be added with the last-updated-ms and the new current-snapshot-id. When snapshots are expired from the list of valid snapshots, all entries before a snapshot that has expired should be removed. |
+| Format v1 | Format v2 | Field | Description |
+| ---------- | ---------- | ----- | ----------- |
+| _required_ | _required_ | **`format-version`** | An integer version number for the format. Currently, this can be 1 or 2. Implementations must throw an exception if a table's version is higher than the supported version. |
+| _optional_ | _required_ | **`table-uuid`** | A UUID that identifies the table, generated when the table is created. Implementations must throw an exception if a table's UUID does not match the expected UUID after refreshing metadata. |
+| _required_ | _required_ | **`location`**| The table's base location. This is used by writers to determine where to store data files, manifest files, and table metadata files. |
+| _omitted_ | _required_ | **`last-sequence-number`**| The table's highest assigned sequence number, a monotonically increasing long that tracks the order of snapshots in a table. |
+| _required_ | _required_ | **`last-updated-ms`**| Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing. |
+| _required_ | _required_ | **`last-column-id`**| An integer; the highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. |
+| _required_ | _required_ | **`schema`**| The table’s current schema. |
+| _required_ | _omitted_ | **`partition-spec`**| The table’s current partition spec, stored as only fields. Note that this is used by writers to partition data, but is not used when reading because reads use the specs stored in manifest files. (**Deprecated**: use `partition-specs` and `default-spec-id` instead) |
+| _optional_ | _required_ | **`partition-specs`**| A list of partition specs, stored as full partition spec objects. |
+| _optional_ | _required_ | **`default-spec-id`**| ID of the “current” spec that writers should use by default. |
+| _optional_ | _optional_ | **`properties`**| A string to string map of table properties. This is used to control settings that affect reading and writing and is not intended to be used for arbitrary metadata. For example, `commit.retry.num-retries` is used to control the number of commit retries. |
+| _optional_ | _optional_ | **`current-snapshot-id`**| `long` ID of the current table snapshot. |
+| _optional_ | _optional_ | **`snapshots`**| A list of valid snapshots. Valid snapshots are snapshots for which all data files exist in the file system. A data file must not be deleted from the file system until the last snapshot in which it was listed is garbage collected. |
+| _optional_ | _optional_ | **`snapshot-log`**| A list (optional) of timestamp and snapshot ID pairs that encodes changes to the current snapshot for the table. Each time the current-snapshot-id is changed, a new entry should be added with the last-updated-ms and the new current-snapshot-id. When snapshots are expired from the list of valid snapshots, all entries before a snapshot that has expired should be removed. |
For serialization details, see Appendix C.
@@ -681,13 +683,22 @@ This serialization scheme is for storing single values as individual binary valu
### Version 2
-Writing metadata:
-* Table metadata field `sequence-number` is required.
-* Table metadata field `table-uuid` is required.
-* Table metadata field `partition-specs` is required.
-* Table metadata field `default-spec-id` is required.
+Writing v1 metadata:
+* Table metadata field `last-sequence-number` should not be written.
+* Snapshot field `sequence-number` should not be written.
+
+Reading v1 metadata:
+* Table metadata field `last-sequence-number` must default to 0.
+* Snapshot field `sequence-number` must default to 0.
+
+Writing v2 metadata:
+* Table metadata added required field `last-sequence-number`.
+* Table metadata now requires field `table-uuid`.
+* Table metadata now requires field `partition-specs`.
+* Table metadata now requires field `default-spec-id`.
* Table metadata field `partition-spec` is no longer required and may be omitted.
-* Snapshot field `manifest-list` is required.
-* Snapshot field `manifests` is not allowed.
+* Snapshot added required field `sequence-number`.
+* Snapshot now requires field `manifest-list`.
+* Snapshot field `manifests` is no longer allowed.
Note that these requirements apply when writing data to a v2 table. Tables that are upgraded from v1 may contain metadata that does not follow these requirements. Implementations should remain backward-compatible with v1 metadata requirements.
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
index 5c6640efe08d..23d6dcb42d16 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
@@ -213,6 +213,11 @@ private static class FakeSnapshot implements Snapshot {
this.manifest = manifest;
}
+ @Override
+ public long sequenceNumber() {
+ return 0; // constant stub value; this fake does not derive it from any state
+ }
+
@Override
public long snapshotId() {
return 1;