diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 456082ebcb56..ee380a399e21 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -59,8 +59,9 @@ static StructType getType(StructType partitionType) { optional(128, "upper_bounds", MapType.ofRequired(129, 130, IntegerType.get(), BinaryType.get())), optional(131, "key_metadata", BinaryType.get()), - optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())) - // NEXT ID TO ASSIGN: 134 + optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())), + optional(134, "sequence_number", LongType.get()) + // NEXT ID TO ASSIGN: 135 ); } @@ -152,4 +153,11 @@ static StructType getType(StructType partitionType) { * are determined by these offsets. The returned list must be sorted in ascending order. */ List splitOffsets(); + + /** + * @return The sequence number to identify the order in which data files and deletion files are to be processed. + * If the sequence number is not specified it is inherited from the manifest file struct in the manifest list file. + */ + long sequenceNumber(); + } diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java index 176bbbd3293c..8443e0af40b9 100644 --- a/api/src/main/java/org/apache/iceberg/ManifestFile.java +++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java @@ -45,7 +45,9 @@ public interface ManifestFile { ))), optional(512, "added_rows_count", Types.LongType.get()), optional(513, "existing_rows_count", Types.LongType.get()), - optional(514, "deleted_rows_count", Types.LongType.get())); + optional(514, "deleted_rows_count", Types.LongType.get()), + optional(515, "sequence_number", Types.LongType.get()) + ); static Schema schema() { return SCHEMA; @@ -128,6 +130,13 @@ default boolean hasDeletedFiles() { */ Long deletedRowsCount(); + /** + * @return the sequence number of this manifest. The sequence number of manifest stores in manifest list file. Since + * The data files' sequence number is optional, it should inherit the manifest's sequence number if the reader reads + * null from manifest file. + */ + Long sequenceNumber(); + /** * Returns a list of {@link PartitionFieldSummary partition field summaries}. *

diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index 25822c967852..bc3e7cf629d9 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -104,4 +104,11 @@ public interface Snapshot { * @return the location of the manifest list for this Snapshot */ String manifestListLocation(); + + /** + * Return this snapshot's sequence number, or 0 if the table has no snapshot yet. + * + * @return the sequence number of this Snapshot + */ + Long sequenceNumber(); } diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index 5f5018110015..7757bad9f4ce 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -255,6 +255,11 @@ public Long deletedRowsCount() { return deletedRows; } + @Override + public Long sequenceNumber() { + return null; + } + @Override public List partitions() { return partitions; @@ -372,5 +377,10 @@ public DataFile copyWithoutStats() { public List splitOffsets() { return null; } + + @Override + public long sequenceNumber() { + return 0; + } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 5a9469ca1013..33e40bd78243 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -143,6 +143,7 @@ public RewriteManifests addManifest(ManifestFile manifest) { Preconditions.checkArgument( manifest.snapshotId() == null || manifest.snapshotId() == -1, "Snapshot id must be assigned during commit"); + Preconditions.checkArgument(manifest.sequenceNumber() == null, "Sequence number must be assigned during commit"); if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { addedManifests.add(manifest); @@ -179,10 +180,11 @@ public List apply(TableMetadata base) { validateFilesCounts(); - // TODO: add sequence numbers here Iterable newManifestsWithMetadata = Iterables.transform( Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests), - manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()) + .withSequenceNumber(sequenceNumber()) + .build()); // put new manifests at the beginning List apply = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index 6f28961fecc6..74d0c3f6d4cb 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -42,6 +42,7 @@ class BaseSnapshot implements Snapshot { private final InputFile manifestList; private final String operation; private final Map summary; + private Long sequenceNumber; // lazily initialized private List manifests = null; @@ -75,6 +76,31 @@ class BaseSnapshot implements Snapshot { this.manifestList = manifestList; } + BaseSnapshot(FileIO io, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + InputFile manifestList, + Long sequenceNumber) { + this(io, snapshotId, parentId, timestampMillis, operation, summary, manifestList); + this.sequenceNumber = sequenceNumber; + } + + BaseSnapshot(FileIO io, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + List manifests, + Long sequenceNumber) { + this(io, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null); + this.manifests = manifests; + this.sequenceNumber = sequenceNumber; + } + BaseSnapshot(FileIO io, long snapshotId, Long parentId, @@ -186,6 +212,15 @@ private void cacheChanges() { this.cachedDeletes = deletes.build(); } + @Override + public Long sequenceNumber() { + if (sequenceNumber == null) { + return 0L; + } + + return sequenceNumber; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -194,6 +229,7 @@ public String toString() { .add("operation", operation) .add("summary", summary) .add("manifests", manifests()) + .add("sequence_number", sequenceNumber()) .toString(); } } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 246e10fa38c1..fef6bdd0aac5 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -95,6 +95,7 @@ public FastAppend appendManifest(ManifestFile manifest) { Preconditions.checkArgument( manifest.snapshotId() == null || manifest.snapshotId() == -1, "Snapshot id must be assigned during commit"); + Preconditions.checkArgument(manifest.sequenceNumber() == null, "Sequence number must be assigned during commit"); if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { summaryBuilder.addedManifest(manifest); @@ -130,17 +131,25 @@ public List apply(TableMetadata base) { throw new RuntimeIOException(e, "Failed to write manifest"); } - // TODO: add sequence numbers here + Iterable newManifestsWithMetadata = Iterables.transform(newManifests, + manifest -> GenericManifestFile.copyOf(manifest).withSequenceNumber(sequenceNumber()).build()); + Iterable appendManifestsWithMetadata = Iterables.transform( Iterables.concat(appendManifests, rewrittenAppendManifests), - manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); - Iterables.addAll(newManifests, appendManifestsWithMetadata); + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()) + .withSequenceNumber(sequenceNumber()) + .build()); + + List manifestsWithMetadata = Lists.newArrayList(); + Iterables.addAll(manifestsWithMetadata, newManifestsWithMetadata); + Iterables.addAll(manifestsWithMetadata, appendManifestsWithMetadata); + if (base.currentSnapshot() != null) { - newManifests.addAll(base.currentSnapshot().manifests()); + manifestsWithMetadata.addAll(base.currentSnapshot().manifests()); } - return newManifests; + return manifestsWithMetadata; } @Override diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 03d1015fb921..fe4c7e0cc347 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -65,6 +65,7 @@ public PartitionData copy() { private Map upperBounds = null; private List splitOffsets = null; private byte[] keyMetadata = null; + private Long sequenceNumber = null; // cached schema private transient org.apache.avro.Schema avroSchema = null; @@ -175,6 +176,7 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) { this.fileSizeInBytes = toCopy.fileSizeInBytes; this.fileOrdinal = toCopy.fileOrdinal; this.sortColumns = copy(toCopy.sortColumns); + this.sequenceNumber = toCopy.sequenceNumber; if (fullCopy) { // TODO: support lazy conversion to/from map this.columnSizes = copy(toCopy.columnSizes); @@ -270,6 +272,11 @@ public List splitOffsets() { return splitOffsets; } + @Override + public long sequenceNumber() { + return sequenceNumber == null ? 0 : sequenceNumber; + } + @Override public org.apache.avro.Schema getSchema() { if (avroSchema == null) { @@ -332,6 +339,9 @@ public void put(int i, Object v) { case 14: this.splitOffsets = (List) v; return; + case 15: + this.sequenceNumber = (Long) v; + return; default: // ignore the object, it must be from a newer version of the format } @@ -382,6 +392,8 @@ public Object get(int i) { return keyMetadata(); case 14: return splitOffsets; + case 15: + return sequenceNumber; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -401,7 +413,7 @@ private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionTy @Override public int size() { - return 15; + return 16; } @Override @@ -410,6 +422,7 @@ public String toString() { .add("file_path", filePath) .add("file_format", format) .add("partition", partitionData) + .add("sequence_number", sequenceNumber()) .add("record_count", recordCount) .add("file_size_in_bytes", fileSizeInBytes) .add("column_sizes", columnSizes) diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 6b6010b19b7f..15abc2beed65 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -54,6 +54,7 @@ public class GenericManifestFile private Integer deletedFilesCount = null; private Long deletedRowsCount = null; private List partitions = null; + private Long sequenceNumber = null; /** * Used by Avro reflection to instantiate this class when reading manifest files. @@ -156,6 +157,7 @@ private GenericManifestFile(GenericManifestFile toCopy) { this.deletedRowsCount = toCopy.deletedRowsCount; this.partitions = copyList(toCopy.partitions, PartitionFieldSummary::copy); this.fromProjectionPos = toCopy.fromProjectionPos; + this.sequenceNumber = toCopy.sequenceNumber; } /** @@ -227,6 +229,11 @@ public Long deletedRowsCount() { return deletedRowsCount; } + @Override + public Long sequenceNumber() { + return sequenceNumber; + } + @Override public List partitions() { return partitions; @@ -272,6 +279,8 @@ public Object get(int i) { return existingRowsCount; case 10: return deletedRowsCount; + case 11: + return sequenceNumber; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -320,6 +329,9 @@ public void set(int i, T value) { case 10: this.deletedRowsCount = (Long) value; return; + case 11: + this.sequenceNumber = (Long) value; + return; default: // ignore the object, it must be from a newer version of the format } @@ -361,6 +373,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("path", manifestPath) .add("length", length) + .add("sequence_number", sequenceNumber()) .add("partition_spec_id", specId) .add("added_snapshot_id", snapshotId) .add("added_data_files_count", addedFilesCount) @@ -397,6 +410,11 @@ public CopyBuilder withSnapshotId(Long newSnapshotId) { return this; } + public CopyBuilder withSequenceNumber(Long newSequenceNumber) { + manifestFile.sequenceNumber = newSequenceNumber; + return this; + } + public ManifestFile build() { return manifestFile; } diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java index 384ea64baa4d..57e0e26b9d3d 100644 --- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java @@ -30,15 +30,17 @@ static InheritableMetadata empty() { } static InheritableMetadata fromManifest(ManifestFile manifest) { - return new BaseInheritableMetadata(manifest.snapshotId()); + return new BaseInheritableMetadata(manifest.snapshotId(), manifest.sequenceNumber()); } static class BaseInheritableMetadata implements InheritableMetadata { private final Long snapshotId; + private final Long sequenceNumber; - private BaseInheritableMetadata(Long snapshotId) { + private BaseInheritableMetadata(Long snapshotId, Long sequenceNumber) { this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; } @Override @@ -46,6 +48,9 @@ public ManifestEntry apply(ManifestEntry manifestEntry) { if (manifestEntry.snapshotId() == null) { manifestEntry.setSnapshotId(snapshotId); } + if (manifestEntry.sequenceNumber() == null && this.sequenceNumber != null) { + manifestEntry.setSequenceNumber(sequenceNumber); + } return manifestEntry; } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java index 728d923830b8..add89a823806 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -52,6 +52,7 @@ public int id() { private Status status = Status.EXISTING; private Long snapshotId = null; private DataFile file = null; + private Long sequenceNumber = null; ManifestEntry(org.apache.avro.Schema schema) { this.schema = schema; @@ -65,6 +66,7 @@ private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) { this.schema = toCopy.schema; this.status = toCopy.status; this.snapshotId = toCopy.snapshotId; + this.sequenceNumber = toCopy.sequenceNumber; if (fullCopy) { this.file = toCopy.file().copy(); } else { @@ -72,23 +74,26 @@ private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) { } } - ManifestEntry wrapExisting(Long newSnapshotId, DataFile newFile) { + ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { this.status = Status.EXISTING; this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; this.file = newFile; return this; } - ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { + ManifestEntry wrapAppend(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { this.status = Status.ADDED; this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; this.file = newFile; return this; } - ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { + ManifestEntry wrapDelete(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { this.status = Status.DELETED; this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; this.file = newFile; return this; } @@ -107,6 +112,13 @@ public Long snapshotId() { return snapshotId; } + /** + * @return sequence number of the snapshot in which the file was added to the table + */ + public Long sequenceNumber() { + return sequenceNumber; + } + /** * @return a file */ @@ -126,6 +138,10 @@ public void setSnapshotId(Long snapshotId) { this.snapshotId = snapshotId; } + public void setSequenceNumber(Long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + @Override public void put(int i, Object v) { switch (i) { @@ -138,6 +154,9 @@ public void put(int i, Object v) { case 2: this.file = (DataFile) v; return; + case 3: + this.sequenceNumber = (Long) v; + return; default: // ignore the object, it must be from a newer version of the format } @@ -152,6 +171,8 @@ public Object get(int i) { return snapshotId; case 2: return file; + case 3: + return sequenceNumber; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); } @@ -176,7 +197,8 @@ static Schema wrapFileSchema(StructType fileStruct) { return new Schema( required(0, "status", IntegerType.get()), optional(1, "snapshot_id", LongType.get()), - required(2, "data_file", fileStruct)); + required(2, "data_file", fileStruct), + optional(3, "sequence_number", LongType.get())); } @Override @@ -184,6 +206,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("status", status) .add("snapshot_id", snapshotId) + .add("sequence_number", sequenceNumber) .add("file", file) .toString(); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index 0271695d32b9..9cce6081a16e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -31,10 +31,13 @@ class ManifestListWriter implements FileAppender { private final FileAppender writer; - ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) { + ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, Long sequenceNumber) { this.writer = newAppender(snapshotFile, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), - "parent-snapshot-id", String.valueOf(parentSnapshotId))); + "parent-snapshot-id", String.valueOf(parentSnapshotId), + "sequence-number", String.valueOf(sequenceNumber) + ) + ); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index c99c491ee39b..ce5b3bac828c 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -110,12 +110,18 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private long existingRows = 0L; private int deletedFiles = 0; private long deletedRows = 0L; + private Long sequenceNumber = null; ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { + this(spec, file, snapshotId, null); + } + + ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId, Long sequenceNumber) { this.file = file; this.specId = spec.specId(); this.writer = newAppender(FileFormat.AVRO, spec, file); this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; this.reused = new ManifestEntry(spec.partitionType()); this.stats = new PartitionSummary(spec); } @@ -150,11 +156,11 @@ void addEntry(ManifestEntry entry) { public void add(DataFile addedFile) { // TODO: this assumes that file is a GenericDataFile that can be written directly to Avro // Eventually, this should check in case there are other DataFile implementations. - addEntry(reused.wrapAppend(snapshotId, addedFile)); + addEntry(reused.wrapAppend(snapshotId, sequenceNumber, addedFile)); } public void add(ManifestEntry entry) { - addEntry(reused.wrapAppend(snapshotId, entry.file())); + addEntry(reused.wrapAppend(snapshotId, sequenceNumber, entry.file())); } /** @@ -163,12 +169,12 @@ public void add(ManifestEntry entry) { * @param existingFile a data file * @param fileSnapshotId snapshot ID when the data file was added to the table */ - public void existing(DataFile existingFile, long fileSnapshotId) { - addEntry(reused.wrapExisting(fileSnapshotId, existingFile)); + public void existing(DataFile existingFile, long fileSnapshotId, long fileSequenceNumber) { + addEntry(reused.wrapExisting(fileSnapshotId, fileSequenceNumber, existingFile)); } void existing(ManifestEntry entry) { - addEntry(reused.wrapExisting(entry.snapshotId(), entry.file())); + addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file())); } /** @@ -179,13 +185,13 @@ void existing(ManifestEntry entry) { * @param deletedFile a data file */ public void delete(DataFile deletedFile) { - addEntry(reused.wrapDelete(snapshotId, deletedFile)); + addEntry(reused.wrapDelete(snapshotId, sequenceNumber, deletedFile)); } void delete(ManifestEntry entry) { // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk // when this Snapshot has been removed or when there are no Snapshots older than this one. - addEntry(reused.wrapDelete(snapshotId, entry.file())); + addEntry(reused.wrapDelete(snapshotId, sequenceNumber, entry.file())); } @Override diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java index 063315f68b8c..1eae9efc57a8 100644 --- a/core/src/main/java/org/apache/iceberg/MergeAppend.java +++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java @@ -55,6 +55,7 @@ public AppendFiles appendManifest(ManifestFile manifest) { Preconditions.checkArgument( manifest.snapshotId() == null || manifest.snapshotId() == -1, "Snapshot id must be assigned during commit"); + Preconditions.checkArgument(manifest.sequenceNumber() == null, "Sequence number must be assigned during commit"); add(manifest); return this; } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e08ae494787a..6202ab955edd 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -215,7 +215,7 @@ protected void add(ManifestFile manifest) { appendManifests.add(manifest); appendedManifest = manifest; } else { - // the manifest must be rewritten with this update's snapshot ID + // the manifest must be rewritten with this update's snapshot ID and sequence number ManifestFile copiedManifest = copyManifest(manifest); rewrittenAppendManifests.add(copiedManifest); appendedManifest = copiedManifest; @@ -274,10 +274,11 @@ public List apply(TableMetadata base) { newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests); } - // TODO: add sequence numbers here Iterable newManifestsWithMetadata = Iterables.transform( newManifests, - manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()) + .withSequenceNumber(sequenceNumber()) + .build()); // filter any existing manifests List filtered; @@ -667,11 +668,20 @@ private ManifestFile createManifest(int specId, List bin) throws I // should be added to the new manifest if (entry.snapshotId() == snapshotId()) { writer.addEntry(entry); + } else { + // since the original manifest will be cleanup, it needs to save the sequence number to the entry. + if (entry.sequenceNumber() == null) { + entry.setSequenceNumber(manifest.sequenceNumber()); + } } } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) { // adds from this snapshot are still adds, otherwise they should be existing writer.addEntry(entry); } else { + // since the original manifest will be cleanup, it needs to save the sequence number to the entry. + if (entry.sequenceNumber() == null) { + entry.setSequenceNumber(manifest.sequenceNumber()); + } // add all files from the old manifest as existing files writer.existing(entry); } @@ -682,7 +692,8 @@ private ManifestFile createManifest(int specId, List bin) throws I writer.close(); } - ManifestFile manifest = writer.toManifestFile(); + ManifestFile manifest = GenericManifestFile.copyOf(writer.toManifestFile()) + .withSequenceNumber(sequenceNumber()).build(); // update the cache mergeManifests.put(bin, manifest); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index 17b8083cdef8..f52b4cbf888e 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -44,6 +44,7 @@ private SnapshotParser() {} private static final String OPERATION = "operation"; private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; + private static final String SEQUENCE_NUMBER = "sequence-number"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { @@ -54,6 +55,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) } generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis()); + if (snapshot.sequenceNumber() != null) { + generator.writeNumberField(SEQUENCE_NUMBER, snapshot.sequenceNumber()); + } + // if there is an operation, write the summary map if (snapshot.operation() != null) { generator.writeObjectFieldStart(SUMMARY); @@ -109,6 +114,7 @@ static Snapshot fromJson(FileIO io, JsonNode node) { parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node); } long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node); + Long sequenceNumber = JsonUtil.getLongOrNull(SEQUENCE_NUMBER, node); Map summary = null; String operation = null; @@ -135,14 +141,14 @@ static Snapshot fromJson(FileIO io, JsonNode node) { String manifestList = JsonUtil.getString(MANIFEST_LIST, node); return new BaseSnapshot( io, versionId, parentId, timestamp, operation, summary, - io.newInputFile(manifestList)); + io.newInputFile(manifestList), sequenceNumber); } else { // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be // loaded lazily, if it is needed List manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node), location -> new GenericManifestFile(io.newInputFile(location), 0)); - return new BaseSnapshot(io, versionId, parentId, timestamp, operation, summary, manifests); + return new BaseSnapshot(io, versionId, parentId, timestamp, operation, summary, manifests, sequenceNumber); } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 4cf46007873f..58654b5a7798 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -78,6 +78,7 @@ public void accept(String file) { private final AtomicInteger attempt = new AtomicInteger(0); private final List manifestLists = Lists.newArrayList(); private volatile Long snapshotId = null; + private volatile Long sequenceNumber = null; private TableMetadata base = null; private boolean stageOnly = false; private Consumer deleteFunc = defaultDelete; @@ -144,12 +145,13 @@ public Snapshot apply() { base.currentSnapshot().snapshotId() : null; List manifests = apply(base); + long newSequenceNumber = sequenceNumber(); if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) { OutputFile manifestList = manifestListPath(); try (ManifestListWriter writer = new ManifestListWriter( - manifestList, snapshotId(), parentSnapshotId)) { + manifestList, snapshotId(), parentSnapshotId, newSequenceNumber)) { // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -170,12 +172,12 @@ manifestList, snapshotId(), parentSnapshotId)) { return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - ops.io().newInputFile(manifestList.location())); + ops.io().newInputFile(manifestList.location()), newSequenceNumber); } else { return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - manifests); + manifests, newSequenceNumber); } } @@ -259,6 +261,9 @@ public void commit() { return; } + // reset sequence number to null so that the sequence number can be updated when retry + this.sequenceNumber = null; + // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries // to ensure that if a concurrent operation assigns the UUID, this operation will not fail. taskOps.commit(base, updated.withUUID()); @@ -326,6 +331,17 @@ protected long snapshotId() { return snapshotId; } + protected long sequenceNumber() { + if (sequenceNumber == null) { + synchronized (this) { + if (sequenceNumber == null) { + this.sequenceNumber = ops.newSequenceNumber(); + } + } + } + return sequenceNumber; + } + private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index e44e002bdab4..5b2ba8d5cd10 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -43,7 +43,7 @@ * Metadata for a table. */ public class TableMetadata { - static final int TABLE_FORMAT_VERSION = 1; + static final int TABLE_FORMAT_VERSION = 2; static final int INITIAL_SPEC_ID = 0; public static TableMetadata newTableMetadata(Schema schema, diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java b/core/src/main/java/org/apache/iceberg/TableOperations.java index 35ea6b3c5690..2e1fa3724f81 100644 --- a/core/src/main/java/org/apache/iceberg/TableOperations.java +++ b/core/src/main/java/org/apache/iceberg/TableOperations.java @@ -114,4 +114,17 @@ default long newSnapshotId() { return Math.abs(mostSignificantBits ^ leastSignificantBits); } + /** + * Create a new sequence number for a snapshot + * + * @return a long sequence number + */ + default long newSequenceNumber() { + if (current().currentSnapshot() == null || current().currentSnapshot().sequenceNumber() == null) { + return 1L; + } else { + return current().currentSnapshot().sequenceNumber() + 1; + } + } + } diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index 9d46584687dc..e78a8f802d45 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -70,6 +70,16 @@ public static long getLong(String property, JsonNode node) { return pNode.asLong(); } + public static Long getLongOrNull(String property, JsonNode node) { + if (!node.has(property)) { + return null; + } + JsonNode pNode = node.get(property); + Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isNumber() && pNode.canConvertToLong(), + "Cannot parse %s from non-numeric value: %s", property, pNode); + return pNode.asLong(); + } + public static boolean getBool(String property, JsonNode node) { Preconditions.checkArgument(node.has(property), "Cannot parse missing boolean %s", property); JsonNode pNode = node.get(property); diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 012d177d90b6..2bdfd9921fe0 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -180,11 +180,11 @@ ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFi ManifestEntry entry = new ManifestEntry(table.spec().partitionType()); switch (status) { case ADDED: - return entry.wrapAppend(snapshotId, file); + return entry.wrapAppend(snapshotId, null, file); case EXISTING: - return entry.wrapExisting(snapshotId, file); + return entry.wrapExisting(snapshotId, null, file); case DELETED: - return entry.wrapDelete(snapshotId, file); + return entry.wrapDelete(snapshotId, null, file); default: throw new IllegalArgumentException("Unexpected entry status: " + status); } diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java new file mode 100644 index 000000000000..c633cec07bde --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumber extends TableTestBase { + + @Test + public void testSequenceNumberForFastAppend() throws IOException { + table.newFastAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + Assert.assertEquals(1, manifestFile.sequenceNumber().longValue()); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(2, manifestFile.sequenceNumber().longValue()); + + manifestFile = writeManifest(FILE_C, FILE_D); + table.newFastAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber().longValue()); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(3, manifestFile.sequenceNumber().longValue()); + + for (ManifestEntry entry : ManifestReader.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + Assert.assertEquals(3, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testSequenceNumberForMergeAppend() throws IOException { + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + table.newAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); + + table.newAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); + + ManifestFile manifestFile = writeManifest(FILE_C, FILE_D); + table.newAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber().longValue()); + + manifestFile = table.currentSnapshot().manifests().get(0); + + Assert.assertEquals("the sequence number of manifest should be 3", + 3, manifestFile.sequenceNumber().longValue()); + + for (ManifestEntry entry : ManifestReader.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals("the sequence number of data file should be 2", 2, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + Assert.assertEquals("the sequence number of data file should be 3", 3, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testSequenceNumberForRewrite() throws IOException { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); + + table.rewriteManifests().clusterBy(file -> "").commit(); + Assert.assertEquals("the sequence number of snapshot should be 3", + 3, table.currentSnapshot().sequenceNumber().longValue()); + + ManifestFile newManifest = table.currentSnapshot().manifests().get(0); + Assert.assertEquals("the sequence number of manifest should be 3", + 3, newManifest.sequenceNumber().longValue()); + + for (ManifestEntry entry : ManifestReader.read(newManifest, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 2, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testCommitConflict() { + Transaction txn = table.newTransaction(); + + txn.newFastAppend().appendFile(FILE_A).apply(); + table.newFastAppend().appendFile(FILE_B).commit(); + + AssertHelpers.assertThrows("Should failed due to conflict", + IllegalStateException.class, "last operation has not committed", txn::commitTransaction); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + ManifestFile manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + for (ManifestEntry entry : ManifestReader.read(manifestFile, table.io(), + table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_C.path())) { + Assert.assertEquals(table.currentSnapshot().sequenceNumber(), entry.sequenceNumber()); + } + } + } + + @Test + public void testConcurrentCommit() throws InterruptedException { + ExecutorService threadPool = Executors.newFixedThreadPool(4); + List> tasks = new ArrayList<>(); + + Callable write1 = () -> { + Transaction txn = table.newTransaction(); + txn.newFastAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + return null; + }; + + Callable write2 = () -> { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_B).commit(); + txn.commitTransaction(); + return null; + }; + + Callable write3 = () -> { + Transaction txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_A).commit(); + txn.commitTransaction(); + return null; + }; + + Callable write4 = () -> { + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).commit(); + txn.commitTransaction(); + return null; + }; + + tasks.add(write1); + tasks.add(write2); + tasks.add(write3); + tasks.add(write4); + threadPool.invokeAll(tasks); + threadPool.shutdown(); + + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.newFastAppend().appendFile(FILE_B).commit(); + + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + table.rollback().toSnapshotId(snapshotId).commit(); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + } + + @Test + public void testMultipleTxnOperations() { + Snapshot snapshot; + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + Set toAddFiles = new HashSet<>(); + Set toDeleteFiles = new HashSet<>(); + toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.newReplacePartitions().addFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(3, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_C).commit(); + txn.commitTransaction(); + + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(5, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + snapshot = table.currentSnapshot(); + + txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).deleteFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + } + + @Test + public void testSequenceNumberForCherryPicking() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + // WAP commit + table.newAppend() + .appendFile(FILE_B) + .set("wap.id", "123456789") + .stageOnly() + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 1", + 1, table.currentSnapshot().sequenceNumber().longValue()); + + // pick the snapshot that's staged but not committed + Snapshot wapSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("the snapshot sequence number should be 2", + 2, wapSnapshot.sequenceNumber().longValue()); + + // table has new commit + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 2", + 2, wapSnapshot.sequenceNumber().longValue()); + + // cherry-pick snapshot + table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit(); + + Assert.assertEquals("the snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber().longValue()); + + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index 16ad4ee16a75..335151769bb1 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -87,6 +87,7 @@ public void testJsonConversionWithOperation() { public void testJsonConversionWithManifestList() throws IOException { long parentId = 1; long id = 2; + long seq = 3; List manifests = ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0), new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0)); @@ -96,14 +97,14 @@ public void testJsonConversionWithManifestList() throws IOException { manifestList.deleteOnExit(); try (ManifestListWriter writer = new ManifestListWriter( - Files.localOutput(manifestList), id, parentId)) { + Files.localOutput(manifestList), id, parentId, seq)) { writer.addAll(manifests); } Snapshot expected = new BaseSnapshot( - ops.io(), id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList)); + ops.io(), id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList), seq); Snapshot inMemory = new BaseSnapshot( - ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests); + ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests, seq); Assert.assertEquals("Files should match in memory list", inMemory.manifests(), expected.manifests()); @@ -123,5 +124,6 @@ public void testJsonConversionWithManifestList() throws IOException { expected.manifests(), snapshot.manifests()); Assert.assertNull("Operation should be null", snapshot.operation()); Assert.assertNull("Summary should be null", snapshot.summary()); + Assert.assertEquals("Sequence number should match", expected.sequenceNumber(), snapshot.sequenceNumber()); } } diff --git a/site/docs/spec.md b/site/docs/spec.md index 8a7e7a0632ab..a3244ad9e5cb 100644 --- a/site/docs/spec.md +++ b/site/docs/spec.md @@ -206,11 +206,12 @@ The partition spec for a manifest and the current table schema must be stored in The schema of a manifest file is a struct called `manifest_entry` with the following fields: -| Field id, name | Type | Description | -|----------------------|-----------------------------------------------------------|-----------------------------------------------------------------| -| **`0 status`** | `int` with meaning: `0: EXISTING` `1: ADDED` `2: DELETED` | Used to track additions and deletions | -| **`1 snapshot_id`** | `long` | Snapshot id where the file was added, or deleted if status is 2 | -| **`2 data_file`** | `data_file` `struct` (see below) | File path, partition tuple, metrics, ... | +| Field id, name | Type | Description | +|-------------------------|-----------------------------------------------------------|-------------------------------------------------------------------------| +| **`0 status`** | `int` with meaning: `0: EXISTING` `1: ADDED` `2: DELETED` | Used to track additions and deletions | +| **`1 snapshot_id`** | `long` | Snapshot id where the file was added, or deleted if status is 2 | +| **`2 data_file`** | `data_file` `struct` (see below) | File path, partition tuple, metrics, ... | +| **`3 sequence_number`**| `optional long` | Sequence number of the snapshot in which the file was added | `data_file` is a struct with the following fields: @@ -232,6 +233,7 @@ The schema of a manifest file is a struct called `manifest_entry` with the follo | **`128 upper_bounds`** | `optional map<129: int, 130: binary>` | Map from column id to upper bound in the column serialized as binary [1]. Each value must be greater than or equal to all values in the column for the file. | | **`131 key_metadata`** | `optional binary` | Implementation-specific key metadata for encryption | | **`132 split_offsets`** | `optional list` | Split offsets for the data file. For example, all row group offsets in a Parquet file. Must be sorted ascending. | +| **`134 sequence_number`** | `optional long` | Sequence number of the snapshot in which the file was added | Notes: @@ -246,9 +248,9 @@ Each manifest file must store its partition spec and the current table schema in The manifest entry fields are used to keep track of the snapshot in which files were added or logically deleted. The `data_file` struct is nested inside of the manifest entry so that it can be easily passed to job planning without the manifest entry fields. -When a data file is added to the dataset, it’s manifest entry should store the snapshot ID in which the file was added and set status to 1 (added). +When a data file is added to the dataset, it’s manifest entry should store the snapshot ID and the sequence number in which the file was added and set status to 1 (added). -When a data file is replaced or deleted from the dataset, it’s manifest entry fields store the snapshot ID in which the file was deleted and status 2 (deleted). The file may be deleted from the file system when the snapshot in which it was deleted is garbage collected, assuming that older snapshots have also been garbage collected [1]. +When a data file is replaced or deleted from the dataset, it’s manifest entry fields store the snapshot ID and the sequence number in which the file was deleted and status 2 (deleted). The file may be deleted from the file system when the snapshot in which it was deleted is garbage collected, assuming that older snapshots have also been garbage collected [1]. Notes: @@ -259,6 +261,7 @@ Notes: A snapshot consists of the following fields: * **`snapshot-id`** -- A unique long ID. +* **`sequence-number`** -- A monotonically increasing value that identifies the order in which data files and deletion files are to be processed * **`parent-snapshot-id`** -- (Optional) The snapshot ID of the snapshot’s parent. This field is not present for snapshots that have no parent snapshot, such as snapshots created before this field was added or the first snapshot of a table. * **`timestamp-ms`** -- A timestamp when the snapshot was created. This is used when garbage collecting snapshots. * **`manifests`** -- A list of manifest file locations. The data files in a snapshot are the union of all data files listed in these manifests. (Deprecated in favor of `manifest-list`) @@ -313,6 +316,7 @@ Manifest list files store `manifest_file`, a struct with the following fields: | **`512 added_rows_count`** | `long` | Number of rows in all of files in the manifest that have status `ADDED` | | **`513 existing_rows_count`** | `long` | Number of rows in all of files in the manifest that have status `EXISTING` | | **`514 deleted_rows_count`** | `long` | Number of rows in all of files in the manifest that have status `DELETED` | +| **`515 sequence_number`** | `long` | Sequence number of the snapshot where the manifest file was added | `field_summary` is a struct with the following fields diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java index 5c6640efe08d..ff58522fbc4f 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java @@ -257,5 +257,10 @@ public Iterable deletedFiles() { public String manifestListLocation() { return null; } + + @Override + public Long sequenceNumber() { + return null; + } } }