Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
* Snapshots are created by table operations, like {@link AppendFiles} and {@link RewriteFiles}.
*/
public interface Snapshot {
/**
* Return this snapshot's sequence number.
* <p>
* Sequence numbers are assigned when a snapshot is committed.
*
* @return a long sequence number
*/
long sequenceNumber();

/**
* Return this snapshot's ID.
*
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import org.apache.iceberg.io.InputFile;

class BaseSnapshot implements Snapshot {
private static final long INITIAL_SEQUENCE_NUMBER = 0;

private final FileIO io;
private final long snapshotId;
private final Long parentId;
private final long sequenceNumber;
private final long timestampMillis;
private final InputFile manifestList;
private final String operation;
Expand All @@ -60,13 +63,15 @@ class BaseSnapshot implements Snapshot {
}

BaseSnapshot(FileIO io,
long sequenceNumber,
long snapshotId,
Long parentId,
long timestampMillis,
String operation,
Map<String, String> summary,
InputFile manifestList) {
this.io = io;
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
this.timestampMillis = timestampMillis;
Expand All @@ -82,10 +87,15 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
List<ManifestFile> manifests) {
this(io, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
this.manifests = manifests;
}

@Override
public long sequenceNumber() {
return sequenceNumber;
}

@Override
public long snapshotId() {
return snapshotId;
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Iterator;
Expand All @@ -31,7 +32,15 @@
abstract class ManifestListWriter implements FileAppender<ManifestFile> {
static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
long snapshotId, Long parentSnapshotId) {
Preconditions.checkArgument(formatVersion == 1, "Sequence number is required for format v%s", formatVersion);
return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
}

static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
long snapshotId, Long parentSnapshotId, long sequenceNumber) {
if (formatVersion == 1) {
Preconditions.checkArgument(sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER,
"Invalid sequence number for v1 manifest list: %s", sequenceNumber);
return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
}
throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion);
Expand Down
14 changes: 11 additions & 3 deletions core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class SnapshotParser {

private SnapshotParser() {}

private static final String SEQUENCE_NUMBER = "sequence-number";
private static final String SNAPSHOT_ID = "snapshot-id";
private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
private static final String TIMESTAMP_MS = "timestamp-ms";
Expand All @@ -48,6 +49,9 @@ private SnapshotParser() {}
static void toJson(Snapshot snapshot, JsonGenerator generator)
throws IOException {
generator.writeStartObject();
if (snapshot.sequenceNumber() > TableMetadata.INITIAL_SEQUENCE_NUMBER) {
generator.writeNumberField(SEQUENCE_NUMBER, snapshot.sequenceNumber());
}
generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
if (snapshot.parentId() != null) {
generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
Expand Down Expand Up @@ -103,7 +107,11 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
Preconditions.checkArgument(node.isObject(),
"Cannot parse table version from a non-object: %s", node);

long versionId = JsonUtil.getLong(SNAPSHOT_ID, node);
long sequenceNumber = TableMetadata.INITIAL_SEQUENCE_NUMBER;
if (node.has(SEQUENCE_NUMBER)) {
sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, node);
}
long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
Long parentId = null;
if (node.has(PARENT_SNAPSHOT_ID)) {
parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
Expand Down Expand Up @@ -134,15 +142,15 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
return new BaseSnapshot(
io, versionId, parentId, timestamp, operation, summary,
io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary,
io.newInputFile(manifestList));

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
// loaded lazily, if it is needed
List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
location -> new GenericManifestFile(io.newInputFile(location), 0));
return new BaseSnapshot(io, versionId, parentId, timestamp, operation, summary, manifests);
return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests);
}
}

Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void accept(String file) {
private final AtomicInteger attempt = new AtomicInteger(0);
private final List<String> manifestLists = Lists.newArrayList();
private volatile Long snapshotId = null;
private TableMetadata base = null;
private TableMetadata base;
private boolean stageOnly = false;
private Consumer<String> deleteFunc = defaultDelete;

Expand Down Expand Up @@ -143,14 +143,15 @@ public Snapshot apply() {
this.base = refresh();
Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
long sequenceNumber = base.nextSequenceNumber();

List<ManifestFile> manifests = apply(base);

if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
OutputFile manifestList = manifestListPath();

try (ManifestListWriter writer = ManifestListWriter.write(
ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId)) {
ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {

// keep track of the manifest lists created
manifestLists.add(manifestList.location());
Expand All @@ -170,7 +171,7 @@ public Snapshot apply() {
}

return new BaseSnapshot(ops.io(),
snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
ops.io().newInputFile(manifestList.location()));

} else {
Expand Down
Loading