From d29358a96b8166f803bbc5076e0d4e31f84afe52 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Mon, 18 Nov 2019 11:27:51 +0800 Subject: [PATCH 01/11] Add sequence number for snapshot --- .../java/org/apache/iceberg/Snapshot.java | 7 +++ .../java/org/apache/iceberg/BaseSnapshot.java | 36 +++++++++++ .../org/apache/iceberg/SnapshotParser.java | 10 +++- .../org/apache/iceberg/SnapshotProducer.java | 5 +- .../org/apache/iceberg/util/JsonUtil.java | 10 ++++ .../apache/iceberg/TestSequenceNumber.java | 60 +++++++++++++++++++ .../source/TestForwardCompatibility.java | 5 ++ 7 files changed, 129 insertions(+), 4 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestSequenceNumber.java diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index 25822c967852..bc3e7cf629d9 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -104,4 +104,11 @@ public interface Snapshot { * @return the location of the manifest list for this Snapshot */ String manifestListLocation(); + + /** + * Return this snapshot's sequence number, or 0 if the table has no snapshot yet. 
+ * + * @return the sequence number of this Snapshot + */ + Long sequenceNumber(); } diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index c7b1559e42d3..937ca8fea867 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -42,6 +42,7 @@ class BaseSnapshot implements Snapshot { private final InputFile manifestList; private final String operation; private final Map summary; + private Long sequenceNumber; // lazily initialized private List manifests = null; @@ -76,6 +77,31 @@ class BaseSnapshot implements Snapshot { } BaseSnapshot(FileIO io, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + InputFile manifestList, + Long sequenceNumber) { + this(ops, snapshotId, parentId, timestampMillis, operation, summary, manifestList); + this.sequenceNumber = sequenceNumber; + } + + BaseSnapshot(TableOperations ops, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + List manifests, + Long sequenceNumber) { + this(ops, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null); + this.manifests = manifests; + this.sequenceNumber = sequenceNumber; + } + + BaseSnapshot(TableOperations ops, long snapshotId, Long parentId, long timestampMillis, @@ -186,6 +212,15 @@ private void cacheChanges() { this.cachedDeletes = deletes.build(); } + @Override + public Long sequenceNumber() { + if (sequenceNumber == null) { + return 0L; + } + + return sequenceNumber; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -194,6 +229,7 @@ public String toString() { .add("operation", operation) .add("summary", summary) .add("manifests", manifests()) + .add("sequence_number", sequenceNumber()) .toString(); } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java 
b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index 17b8083cdef8..f52b4cbf888e 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -44,6 +44,7 @@ private SnapshotParser() {} private static final String OPERATION = "operation"; private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; + private static final String SEQUENCE_NUMBER = "sequence-number"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { @@ -54,6 +55,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) } generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis()); + if (snapshot.sequenceNumber() != null) { + generator.writeNumberField(SEQUENCE_NUMBER, snapshot.sequenceNumber()); + } + // if there is an operation, write the summary map if (snapshot.operation() != null) { generator.writeObjectFieldStart(SUMMARY); @@ -109,6 +114,7 @@ static Snapshot fromJson(FileIO io, JsonNode node) { parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node); } long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node); + Long sequenceNumber = JsonUtil.getLongOrNull(SEQUENCE_NUMBER, node); Map summary = null; String operation = null; @@ -135,14 +141,14 @@ static Snapshot fromJson(FileIO io, JsonNode node) { String manifestList = JsonUtil.getString(MANIFEST_LIST, node); return new BaseSnapshot( io, versionId, parentId, timestamp, operation, summary, - io.newInputFile(manifestList)); + io.newInputFile(manifestList), sequenceNumber); } else { // fall back to an embedded manifest list. 
pass in the manifest's InputFile so length can be // loaded lazily, if it is needed List manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node), location -> new GenericManifestFile(io.newInputFile(location), 0)); - return new BaseSnapshot(io, versionId, parentId, timestamp, operation, summary, manifests); + return new BaseSnapshot(io, versionId, parentId, timestamp, operation, summary, manifests, sequenceNumber); } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 05802267a3c8..f8ddc3c27533 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -144,6 +144,7 @@ public Snapshot apply() { base.currentSnapshot().snapshotId() : null; List manifests = apply(base); + long newSequenceNumber = base.currentSnapshot() == null ? 1 : base.currentSnapshot().sequenceNumber() + 1; if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) { OutputFile manifestList = manifestListPath(); @@ -170,12 +171,12 @@ manifestList, snapshotId(), parentSnapshotId)) { return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - ops.io().newInputFile(manifestList.location())); + ops.io().newInputFile(manifestList.location()), newSequenceNumber); } else { return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - manifests); + manifests, newSequenceNumber); } } diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index 976a1cc3933c..cc9db08d6ece 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -70,6 +70,16 @@ public static long getLong(String property, JsonNode node) { return pNode.asLong(); } + 
public static Long getLongOrNull(String property, JsonNode node) { + if (!node.has(property)) { + return null; + } + JsonNode pNode = node.get(property); + Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isNumber() && pNode.canConvertToLong(), + "Cannot parse %s from non-string value: %s", property, pNode); + return pNode.asLong(); + } + public static boolean getBool(String property, JsonNode node) { Preconditions.checkArgument(node.has(property), "Cannot parse missing boolean %s", property); JsonNode pNode = node.get(property); diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java new file mode 100644 index 000000000000..2cd1202ff25f --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import org.junit.Assert; +import org.junit.Test; + + +public class TestSequenceNumber extends TableTestBase { + + @Test + public void testWriteSequenceNumber() { + table.newFastAppend().appendFile(FILE_A).commit(); + + Assert.assertEquals("sequence number should be 1", 1, + table.currentSnapshot().sequenceNumber().longValue()); + + table.newFastAppend().appendFile(FILE_B).commit(); + + Assert.assertEquals("sequence number should be 2", 2, + table.currentSnapshot().sequenceNumber().longValue()); + + } + + @Test + public void testReadSequenceNumber() { + long curSeqNum; + + if (table.currentSnapshot() == null) { + curSeqNum = 0; + } else { + curSeqNum = table.currentSnapshot().sequenceNumber(); + } + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + + Assert.assertEquals(curSeqNum + 2, + TestTables.load(tableDir, "test") + .currentSnapshot() + .sequenceNumber() + .longValue()); + } +} diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java index 5c6640efe08d..ff58522fbc4f 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java @@ -257,5 +257,10 @@ public Iterable deletedFiles() { public String manifestListLocation() { return null; } + + @Override + public Long sequenceNumber() { + return null; + } } } From ffc849e41e5e694a09d686a97ca74b049fc8ecaf Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Tue, 3 Dec 2019 20:22:01 +0800 Subject: [PATCH 02/11] Add more unit tests --- .../java/org/apache/iceberg/BaseSnapshot.java | 8 +- .../org/apache/iceberg/util/JsonUtil.java | 2 +- .../apache/iceberg/TestSequenceNumber.java | 145 +++++++++++++++--- 3 files changed, 131 insertions(+), 24 deletions(-) diff --git 
a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index 937ca8fea867..165e5e95c6df 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -84,11 +84,11 @@ class BaseSnapshot implements Snapshot { Map summary, InputFile manifestList, Long sequenceNumber) { - this(ops, snapshotId, parentId, timestampMillis, operation, summary, manifestList); + this(io, snapshotId, parentId, timestampMillis, operation, summary, manifestList); this.sequenceNumber = sequenceNumber; } - BaseSnapshot(TableOperations ops, + BaseSnapshot(FileIO io, long snapshotId, Long parentId, long timestampMillis, @@ -96,12 +96,12 @@ class BaseSnapshot implements Snapshot { Map summary, List manifests, Long sequenceNumber) { - this(ops, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null); + this(io, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null); this.manifests = manifests; this.sequenceNumber = sequenceNumber; } - BaseSnapshot(TableOperations ops, + BaseSnapshot(FileIO io, long snapshotId, Long parentId, long timestampMillis, diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index cc9db08d6ece..70fc10142b8f 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -76,7 +76,7 @@ public static Long getLongOrNull(String property, JsonNode node) { } JsonNode pNode = node.get(property); Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isNumber() && pNode.canConvertToLong(), - "Cannot parse %s from non-string value: %s", property, pNode); + "Cannot parse %s from non-numeric value: %s", property, pNode); return pNode.asLong(); } diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java 
b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java index 2cd1202ff25f..5608be6ecde4 100644 --- a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java @@ -19,42 +19,149 @@ package org.apache.iceberg; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.Assert; import org.junit.Test; - public class TestSequenceNumber extends TableTestBase { @Test - public void testWriteSequenceNumber() { + public void testReadWriteSequenceNumber() { table.newFastAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); + } - Assert.assertEquals("sequence number should be 1", 1, - table.currentSnapshot().sequenceNumber().longValue()); + @Test + public void testCommitConflict() { + Transaction txn = table.newTransaction(); + txn.newFastAppend().appendFile(FILE_A).apply(); table.newFastAppend().appendFile(FILE_B).commit(); - Assert.assertEquals("sequence number should be 2", 2, - table.currentSnapshot().sequenceNumber().longValue()); + AssertHelpers.assertThrows("Should failed due to conflict", + IllegalStateException.class, "last operation has not committed", txn::commitTransaction); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + } + + @Test + public void testConcurrentCommit() throws InterruptedException { + ExecutorService threadPool = Executors.newFixedThreadPool(4); + List> tasks = new ArrayList<>(); + + Callable write1 = () -> { + Transaction txn = table.newTransaction(); + txn.newFastAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); 
+ return null; + }; + + Callable write2 = () -> { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_B).commit(); + txn.commitTransaction(); + return null; + }; + + Callable write3 = () -> { + Transaction txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_A).commit(); + txn.commitTransaction(); + return null; + }; + + Callable write4 = () -> { + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).commit(); + txn.commitTransaction(); + return null; + }; + + tasks.add(write1); + tasks.add(write2); + tasks.add(write3); + tasks.add(write4); + threadPool.invokeAll(tasks); + threadPool.shutdown(); + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); } @Test - public void testReadSequenceNumber() { - long curSeqNum; - - if (table.currentSnapshot() == null) { - curSeqNum = 0; - } else { - curSeqNum = table.currentSnapshot().sequenceNumber(); - } + public void testRollBack() { table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); table.newFastAppend().appendFile(FILE_B).commit(); - Assert.assertEquals(curSeqNum + 2, - TestTables.load(tableDir, "test") - .currentSnapshot() - .sequenceNumber() - .longValue()); + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + table.rollback().toSnapshotId(snapshotId).commit(); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + } + + @Test + public void testMultipleTxnOperations() { + Snapshot snapshot; + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + Set toAddFiles = new HashSet<>(); + Set toDeleteFiles = new HashSet<>(); + 
toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.newReplacePartitions().addFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(3, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_C).commit(); + txn.commitTransaction(); + + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(5, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + snapshot = table.currentSnapshot(); + + txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).deleteFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); + + txn = table.newTransaction(); + txn.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() + .longValue()); } } From 09eb9a81e22e6898e5e7d06549211352f661321a Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Tue, 4 Feb 2020 12:18:22 +0800 Subject: [PATCH 03/11] Add sequence number for data file and manifest file --- .../java/org/apache/iceberg/DataFile.java | 12 ++- .../java/org/apache/iceberg/ManifestFile.java | 11 ++- .../java/org/apache/iceberg/TestHelpers.java | 10 ++ .../apache/iceberg/BaseRewriteManifests.java | 7 +- .../java/org/apache/iceberg/FastAppend.java | 20 ++-- .../org/apache/iceberg/GenericDataFile.java | 11 +++ 
.../apache/iceberg/GenericManifestFile.java | 37 ++++++++ .../iceberg/InheritableMetadataFactory.java | 9 +- .../org/apache/iceberg/ManifestEntry.java | 31 +++++- .../apache/iceberg/ManifestListWriter.java | 7 +- .../org/apache/iceberg/ManifestWriter.java | 25 +++-- .../iceberg/MergingSnapshotProducer.java | 19 +++- .../org/apache/iceberg/SnapshotProducer.java | 15 ++- .../org/apache/iceberg/TableOperations.java | 13 +++ .../org/apache/iceberg/TableTestBase.java | 6 +- .../apache/iceberg/TestSequenceNumber.java | 95 +++++++++++++++++++ .../org/apache/iceberg/TestSnapshotJson.java | 8 +- 17 files changed, 297 insertions(+), 39 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 456082ebcb56..cd416966f9df 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -59,8 +59,9 @@ static StructType getType(StructType partitionType) { optional(128, "upper_bounds", MapType.ofRequired(129, 130, IntegerType.get(), BinaryType.get())), optional(131, "key_metadata", BinaryType.get()), - optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())) - // NEXT ID TO ASSIGN: 134 + optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())), + optional(134, "sequence_number", LongType.get()) + // NEXT ID TO ASSIGN: 135 ); } @@ -152,4 +153,11 @@ static StructType getType(StructType partitionType) { * are determined by these offsets. The returned list must be sorted in ascending order. */ List splitOffsets(); + + /** + * @return The sequence number to identify the order in which data files and deletion files are to be processed. + * If the sequence number is not specified it is inherited from the manifest file struct in the manifest list file. 
+ */ + Long sequenceNumber(); + } diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java index 176bbbd3293c..8443e0af40b9 100644 --- a/api/src/main/java/org/apache/iceberg/ManifestFile.java +++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java @@ -45,7 +45,9 @@ public interface ManifestFile { ))), optional(512, "added_rows_count", Types.LongType.get()), optional(513, "existing_rows_count", Types.LongType.get()), - optional(514, "deleted_rows_count", Types.LongType.get())); + optional(514, "deleted_rows_count", Types.LongType.get()), + optional(515, "sequence_number", Types.LongType.get()) + ); static Schema schema() { return SCHEMA; @@ -128,6 +130,13 @@ default boolean hasDeletedFiles() { */ Long deletedRowsCount(); + /** + * @return the sequence number of this manifest. The sequence number of manifest stores in manifest list file. Since + * The data files' sequence number is optional, it should inherit the manifest's sequence number if the reader reads + * null from manifest file. + */ + Long sequenceNumber(); + /** * Returns a list of {@link PartitionFieldSummary partition field summaries}. *

diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index 5f5018110015..0ea8070d5f31 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -255,6 +255,11 @@ public Long deletedRowsCount() { return deletedRows; } + @Override + public Long sequenceNumber() { + return null; + } + @Override public List partitions() { return partitions; @@ -372,5 +377,10 @@ public DataFile copyWithoutStats() { public List splitOffsets() { return null; } + + @Override + public Long sequenceNumber() { + return null; + } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 39e7a7d1c695..c7a0cad903b6 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -146,7 +146,7 @@ public RewriteManifests addManifest(ManifestFile manifest) { if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { addedManifests.add(manifest); } else { - // the manifest must be rewritten with this update's snapshot ID + // the manifest must be rewritten with this update's snapshot ID and sequence number ManifestFile copiedManifest = copyManifest(manifest); rewrittenAddedManifests.add(copiedManifest); } @@ -178,10 +178,11 @@ public List apply(TableMetadata base) { validateFilesCounts(); - // TODO: add sequence numbers here Iterable newManifestsWithMetadata = Iterables.transform( Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests), - manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()) + .withSequenceNumber(sequenceNumber()) + .build()); // put new manifests at the beginning List apply = new ArrayList<>(); diff --git 
a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 246e10fa38c1..fbd3eb129c6b 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -100,7 +100,7 @@ public FastAppend appendManifest(ManifestFile manifest) { summaryBuilder.addedManifest(manifest); appendManifests.add(manifest); } else { - // the manifest must be rewritten with this update's snapshot ID + // the manifest must be rewritten with this update's snapshot ID and sequence number ManifestFile copiedManifest = copyManifest(manifest); rewrittenAppendManifests.add(copiedManifest); } @@ -130,17 +130,25 @@ public List apply(TableMetadata base) { throw new RuntimeIOException(e, "Failed to write manifest"); } - // TODO: add sequence numbers here + Iterable newManifestsWithMetadata = Iterables.transform(newManifests, + manifest -> GenericManifestFile.copyOf(manifest).withSequenceNumber(sequenceNumber()).build()); + Iterable appendManifestsWithMetadata = Iterables.transform( Iterables.concat(appendManifests, rewrittenAppendManifests), - manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); - Iterables.addAll(newManifests, appendManifestsWithMetadata); + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()) + .withSequenceNumber(sequenceNumber()) + .build()); + + List manifestsWithMetadata = Lists.newArrayList(); + Iterables.addAll(manifestsWithMetadata, newManifestsWithMetadata); + Iterables.addAll(manifestsWithMetadata, appendManifestsWithMetadata); + if (base.currentSnapshot() != null) { - newManifests.addAll(base.currentSnapshot().manifests()); + manifestsWithMetadata.addAll(base.currentSnapshot().manifests()); } - return newManifests; + return manifestsWithMetadata; } @Override diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 
03d1015fb921..ec03899573c6 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -65,6 +65,7 @@ public PartitionData copy() { private Map upperBounds = null; private List splitOffsets = null; private byte[] keyMetadata = null; + private Long sequenceNumber = null; // cached schema private transient org.apache.avro.Schema avroSchema = null; @@ -270,6 +271,11 @@ public List splitOffsets() { return splitOffsets; } + @Override + public Long sequenceNumber() { + return sequenceNumber; + } + @Override public org.apache.avro.Schema getSchema() { if (avroSchema == null) { @@ -332,6 +338,9 @@ public void put(int i, Object v) { case 14: this.splitOffsets = (List) v; return; + case 15: + this.sequenceNumber = (Long) v; + return; default: // ignore the object, it must be from a newer version of the format } @@ -382,6 +391,8 @@ public Object get(int i) { return keyMetadata(); case 14: return splitOffsets; + case 15: + return sequenceNumber; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index bb741198d794..dbb2b7d3fbd6 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -53,6 +53,7 @@ public class GenericManifestFile private Integer deletedFilesCount = null; private Long deletedRowsCount = null; private List partitions = null; + private Long sequenceNumber = null; /** * Used by Avro reflection to instantiate this class when reading manifest files. 
@@ -136,6 +137,26 @@ public GenericManifestFile(String path, long length, int specId, Long snapshotId this.fromProjectionPos = null; } + public GenericManifestFile(String path, long length, int specId, Long snapshotId, Long sequenceNumber, + int addedFilesCount, long addedRowsCount, int existingFilesCount, + long existingRowsCount, int deletedFilesCount, long deletedRowsCount, + List partitions) { + this.avroSchema = AVRO_SCHEMA; + this.manifestPath = path; + this.length = length; + this.specId = specId; + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; + this.addedFilesCount = addedFilesCount; + this.addedRowsCount = addedRowsCount; + this.existingFilesCount = existingFilesCount; + this.existingRowsCount = existingRowsCount; + this.deletedFilesCount = deletedFilesCount; + this.deletedRowsCount = deletedRowsCount; + this.partitions = partitions; + this.fromProjectionPos = null; + } + /** * Copy constructor. * @@ -155,6 +176,7 @@ private GenericManifestFile(GenericManifestFile toCopy) { this.deletedRowsCount = toCopy.deletedRowsCount; this.partitions = ImmutableList.copyOf(Iterables.transform(toCopy.partitions, PartitionFieldSummary::copy)); this.fromProjectionPos = toCopy.fromProjectionPos; + this.sequenceNumber = toCopy.sequenceNumber; } /** @@ -226,6 +248,11 @@ public Long deletedRowsCount() { return deletedRowsCount; } + @Override + public Long sequenceNumber() { + return sequenceNumber; + } + @Override public List partitions() { return partitions; @@ -271,6 +298,8 @@ public Object get(int i) { return existingRowsCount; case 10: return deletedRowsCount; + case 11: + return sequenceNumber; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -319,6 +348,9 @@ public void set(int i, T value) { case 10: this.deletedRowsCount = (Long) value; return; + case 11: + this.sequenceNumber = (Long) value; + return; default: // ignore the object, it must be from a newer version of the format } @@ -396,6 +428,11 @@ 
public CopyBuilder withSnapshotId(Long newSnapshotId) { return this; } + public CopyBuilder withSequenceNumber(Long newSequenceNumber) { + manifestFile.sequenceNumber = newSequenceNumber; + return this; + } + public ManifestFile build() { return manifestFile; } diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java index 14bf2ff176c7..265eba210d51 100644 --- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java @@ -30,21 +30,26 @@ static InheritableMetadata empty() { } static InheritableMetadata fromManifest(ManifestFile manifest) { - return new BaseInheritableMetadata(manifest.snapshotId()); + return new BaseInheritableMetadata(manifest.snapshotId(), manifest.sequenceNumber()); } static class BaseInheritableMetadata implements InheritableMetadata { private final Long snapshotId; + private final Long sequenceNumber; - private BaseInheritableMetadata(Long snapshotId) { + private BaseInheritableMetadata(Long snapshotId, Long sequenceNumber) { this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; } public ManifestEntry apply(ManifestEntry manifestEntry) { if (manifestEntry.snapshotId() == null) { manifestEntry.setSnapshotId(snapshotId); } + if (manifestEntry.sequenceNumber() == null) { + manifestEntry.setSequenceNumber(sequenceNumber); + } return manifestEntry; } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java index 728d923830b8..add89a823806 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -52,6 +52,7 @@ public int id() { private Status status = Status.EXISTING; private Long snapshotId = null; private DataFile file = null; + private Long sequenceNumber = null; 
ManifestEntry(org.apache.avro.Schema schema) { this.schema = schema; @@ -65,6 +66,7 @@ private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) { this.schema = toCopy.schema; this.status = toCopy.status; this.snapshotId = toCopy.snapshotId; + this.sequenceNumber = toCopy.sequenceNumber; if (fullCopy) { this.file = toCopy.file().copy(); } else { @@ -72,23 +74,26 @@ private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) { } } - ManifestEntry wrapExisting(Long newSnapshotId, DataFile newFile) { + ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { this.status = Status.EXISTING; this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; this.file = newFile; return this; } - ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { + ManifestEntry wrapAppend(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { this.status = Status.ADDED; this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; this.file = newFile; return this; } - ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { + ManifestEntry wrapDelete(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { this.status = Status.DELETED; this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; this.file = newFile; return this; } @@ -107,6 +112,13 @@ public Long snapshotId() { return snapshotId; } + /** + * @return sequence number of the snapshot in which the file was added to the table + */ + public Long sequenceNumber() { + return sequenceNumber; + } + /** * @return a file */ @@ -126,6 +138,10 @@ public void setSnapshotId(Long snapshotId) { this.snapshotId = snapshotId; } + public void setSequenceNumber(Long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + @Override public void put(int i, Object v) { switch (i) { @@ -138,6 +154,9 @@ public void put(int i, Object v) { case 2: this.file = (DataFile) v; return; + case 3: + this.sequenceNumber = (Long) v; + 
return; default: // ignore the object, it must be from a newer version of the format } @@ -152,6 +171,8 @@ public Object get(int i) { return snapshotId; case 2: return file; + case 3: + return sequenceNumber; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); } @@ -176,7 +197,8 @@ static Schema wrapFileSchema(StructType fileStruct) { return new Schema( required(0, "status", IntegerType.get()), optional(1, "snapshot_id", LongType.get()), - required(2, "data_file", fileStruct)); + required(2, "data_file", fileStruct), + optional(3, "sequence_number", LongType.get())); } @Override @@ -184,6 +206,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("status", status) .add("snapshot_id", snapshotId) + .add("sequence_number", sequenceNumber) .add("file", file) .toString(); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index 0271695d32b9..9cce6081a16e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -31,10 +31,13 @@ class ManifestListWriter implements FileAppender { private final FileAppender writer; - ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) { + ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, Long sequenceNumber) { this.writer = newAppender(snapshotFile, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), - "parent-snapshot-id", String.valueOf(parentSnapshotId))); + "parent-snapshot-id", String.valueOf(parentSnapshotId), + "sequence-number", String.valueOf(sequenceNumber) + ) + ); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index c99c491ee39b..a44526e22d20 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ 
b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -110,6 +110,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private long existingRows = 0L; private int deletedFiles = 0; private long deletedRows = 0L; + private Long sequenceNumber = null; ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { this.file = file; @@ -120,6 +121,16 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { this.stats = new PartitionSummary(spec); } + ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId, Long sequenceNumber) { + this.file = file; + this.specId = spec.specId(); + this.writer = newAppender(FileFormat.AVRO, spec, file); + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; + this.reused = new ManifestEntry(spec.partitionType()); + this.stats = new PartitionSummary(spec); + } + void addEntry(ManifestEntry entry) { switch (entry.status()) { case ADDED: @@ -150,11 +161,11 @@ void addEntry(ManifestEntry entry) { public void add(DataFile addedFile) { // TODO: this assumes that file is a GenericDataFile that can be written directly to Avro // Eventually, this should check in case there are other DataFile implementations. 
- addEntry(reused.wrapAppend(snapshotId, addedFile)); + addEntry(reused.wrapAppend(snapshotId, sequenceNumber, addedFile)); } public void add(ManifestEntry entry) { - addEntry(reused.wrapAppend(snapshotId, entry.file())); + addEntry(reused.wrapAppend(snapshotId, sequenceNumber, entry.file())); } /** @@ -163,12 +174,12 @@ public void add(ManifestEntry entry) { * @param existingFile a data file * @param fileSnapshotId snapshot ID when the data file was added to the table */ - public void existing(DataFile existingFile, long fileSnapshotId) { - addEntry(reused.wrapExisting(fileSnapshotId, existingFile)); + public void existing(DataFile existingFile, long fileSnapshotId, long fileSequenceNumber) { + addEntry(reused.wrapExisting(fileSnapshotId, fileSequenceNumber, existingFile)); } void existing(ManifestEntry entry) { - addEntry(reused.wrapExisting(entry.snapshotId(), entry.file())); + addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file())); } /** @@ -179,13 +190,13 @@ void existing(ManifestEntry entry) { * @param deletedFile a data file */ public void delete(DataFile deletedFile) { - addEntry(reused.wrapDelete(snapshotId, deletedFile)); + addEntry(reused.wrapDelete(snapshotId, sequenceNumber, deletedFile)); } void delete(ManifestEntry entry) { // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk // when this Snapshot has been removed or when there are no Snapshots older than this one. 
- addEntry(reused.wrapDelete(snapshotId, entry.file())); + addEntry(reused.wrapDelete(snapshotId, sequenceNumber, entry.file())); } @Override diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e08ae494787a..6202ab955edd 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -215,7 +215,7 @@ protected void add(ManifestFile manifest) { appendManifests.add(manifest); appendedManifest = manifest; } else { - // the manifest must be rewritten with this update's snapshot ID + // the manifest must be rewritten with this update's snapshot ID and sequence number ManifestFile copiedManifest = copyManifest(manifest); rewrittenAppendManifests.add(copiedManifest); appendedManifest = copiedManifest; @@ -274,10 +274,11 @@ public List apply(TableMetadata base) { newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests); } - // TODO: add sequence numbers here Iterable newManifestsWithMetadata = Iterables.transform( newManifests, - manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()) + .withSequenceNumber(sequenceNumber()) + .build()); // filter any existing manifests List filtered; @@ -667,11 +668,20 @@ private ManifestFile createManifest(int specId, List bin) throws I // should be added to the new manifest if (entry.snapshotId() == snapshotId()) { writer.addEntry(entry); + } else { + // since the original manifest will be cleanup, it needs to save the sequence number to the entry. 
+ if (entry.sequenceNumber() == null) { + entry.setSequenceNumber(manifest.sequenceNumber()); + } } } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) { // adds from this snapshot are still adds, otherwise they should be existing writer.addEntry(entry); } else { + // since the original manifest will be cleanup, it needs to save the sequence number to the entry. + if (entry.sequenceNumber() == null) { + entry.setSequenceNumber(manifest.sequenceNumber()); + } // add all files from the old manifest as existing files writer.existing(entry); } @@ -682,7 +692,8 @@ private ManifestFile createManifest(int specId, List bin) throws I writer.close(); } - ManifestFile manifest = writer.toManifestFile(); + ManifestFile manifest = GenericManifestFile.copyOf(writer.toManifestFile()) + .withSequenceNumber(sequenceNumber()).build(); // update the cache mergeManifests.put(bin, manifest); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index f8ddc3c27533..2c507a6741b7 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -78,6 +78,7 @@ public void accept(String file) { private final AtomicInteger attempt = new AtomicInteger(0); private final List manifestLists = Lists.newArrayList(); private Long snapshotId = null; + private Long sequenceNumber = null; private TableMetadata base = null; private boolean stageOnly = false; private Consumer deleteFunc = defaultDelete; @@ -144,13 +145,13 @@ public Snapshot apply() { base.currentSnapshot().snapshotId() : null; List manifests = apply(base); - long newSequenceNumber = base.currentSnapshot() == null ? 
1 : base.currentSnapshot().sequenceNumber() + 1; + long newSequenceNumber = sequenceNumber(); if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) { OutputFile manifestList = manifestListPath(); try (ManifestListWriter writer = new ManifestListWriter( - manifestList, snapshotId(), parentSnapshotId)) { + manifestList, snapshotId(), parentSnapshotId, newSequenceNumber)) { // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -245,6 +246,9 @@ public void commit() { updated = base.replaceCurrentSnapshot(newSnapshot); } + // reset sequence number to null so that the sequence number can be updated when retry + this.sequenceNumber = null; + // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries // to ensure that if a concurrent operation assigns the UUID, this operation will not fail. taskOps.commit(base, updated.withUUID()); @@ -308,6 +312,13 @@ protected long snapshotId() { return snapshotId; } + protected long sequenceNumber() { + if (sequenceNumber == null) { + this.sequenceNumber = ops.newSequenceNumber(); + } + return sequenceNumber; + } + private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java b/core/src/main/java/org/apache/iceberg/TableOperations.java index 35ea6b3c5690..2e1fa3724f81 100644 --- a/core/src/main/java/org/apache/iceberg/TableOperations.java +++ b/core/src/main/java/org/apache/iceberg/TableOperations.java @@ -114,4 +114,17 @@ default long newSnapshotId() { return Math.abs(mostSignificantBits ^ leastSignificantBits); } + /** + * Create a new sequence number for a snapshot + * + * @return a long sequence number + */ + default long 
newSequenceNumber() { + if (current().currentSnapshot() == null || current().currentSnapshot().sequenceNumber() == null) { + return 1L; + } else { + return current().currentSnapshot().sequenceNumber() + 1; + } + } + } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 012d177d90b6..2bdfd9921fe0 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -180,11 +180,11 @@ ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFi ManifestEntry entry = new ManifestEntry(table.spec().partitionType()); switch (status) { case ADDED: - return entry.wrapAppend(snapshotId, file); + return entry.wrapAppend(snapshotId, null, file); case EXISTING: - return entry.wrapExisting(snapshotId, file); + return entry.wrapExisting(snapshotId, null, file); case DELETED: - return entry.wrapDelete(snapshotId, file); + return entry.wrapDelete(snapshotId, null, file); default: throw new IllegalArgumentException("Unexpected entry status: " + status); } diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java index 5608be6ecde4..6606409042d6 100644 --- a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -26,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Test; @@ -39,6 +41,58 @@ public void testReadWriteSequenceNumber() { Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); } + @Test + public void 
testSequenceNumberForFastAppend() throws IOException { + ManifestFile manifestFile = writeManifest(FILE_A, FILE_B); + table.newFastAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); + + manifestFile = table.currentSnapshot().manifests().get(0); + + Assert.assertEquals(1, manifestFile.sequenceNumber().longValue()); + + for (ManifestEntry entry : ManifestReader.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path()) || entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals(1, entry.sequenceNumber().longValue()); + } + } + + table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit(); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + Assert.assertEquals("minimum sequence number should be 2", + 2, manifestFile.sequenceNumber().longValue()); + } + + @Test + public void testSequenceNumberForMergeAppend() { + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + Assert.assertEquals(1, table.currentSnapshot().manifests().size()); + table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + + Assert.assertEquals("sequence number should be 2", + 2, manifestFile.sequenceNumber().longValue()); + + for (ManifestEntry entry : ManifestReader.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path()) || entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals(1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + 
Assert.assertEquals(2, entry.sequenceNumber().longValue()); + } + } + } + @Test public void testCommitConflict() { Transaction txn = table.newTransaction(); @@ -51,6 +105,47 @@ public void testCommitConflict() { Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() .longValue()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + ManifestFile manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + for (ManifestEntry entry : ManifestReader.read(manifestFile, table.io(), + table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_C.path())) { + Assert.assertEquals(table.currentSnapshot().sequenceNumber(), entry.sequenceNumber()); + } + } + } + + @Test + public void testSequenceNumberForRewrite() throws IOException { + ManifestFile manifest = writeManifestWithName("manifest-file-1.avro", FILE_A); + + table.newFastAppend() + .appendManifest(manifest) + .commit(); + + table.rewriteManifests() + .clusterBy(file -> "") + .commit(); + + ManifestFile newManifest = table.currentSnapshot().manifests().get(0); + + Assert.assertEquals("the snapshot sequence number should be 1", + 2, table.currentSnapshot().sequenceNumber().longValue()); + + for (ManifestEntry entry : ManifestReader.read(newManifest, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals(1, entry.sequenceNumber().longValue()); + } + } } @Test diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index 16ad4ee16a75..335151769bb1 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ 
b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -87,6 +87,7 @@ public void testJsonConversionWithOperation() { public void testJsonConversionWithManifestList() throws IOException { long parentId = 1; long id = 2; + long seq = 3; List manifests = ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0), new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0)); @@ -96,14 +97,14 @@ public void testJsonConversionWithManifestList() throws IOException { manifestList.deleteOnExit(); try (ManifestListWriter writer = new ManifestListWriter( - Files.localOutput(manifestList), id, parentId)) { + Files.localOutput(manifestList), id, parentId, seq)) { writer.addAll(manifests); } Snapshot expected = new BaseSnapshot( - ops.io(), id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList)); + ops.io(), id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList), seq); Snapshot inMemory = new BaseSnapshot( - ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests); + ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests, seq); Assert.assertEquals("Files should match in memory list", inMemory.manifests(), expected.manifests()); @@ -123,5 +124,6 @@ public void testJsonConversionWithManifestList() throws IOException { expected.manifests(), snapshot.manifests()); Assert.assertNull("Operation should be null", snapshot.operation()); Assert.assertNull("Summary should be null", snapshot.summary()); + Assert.assertEquals("Sequence number should match", expected.sequenceNumber(), snapshot.sequenceNumber()); } } From 82420a2583aeebd235555ef312a84c4f60eae4b6 Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Thu, 6 Feb 2020 00:26:22 +0800 Subject: [PATCH 04/11] add unit test for cherry-pick feature --- .../apache/iceberg/TestSequenceNumber.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git 
a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java index 6606409042d6..bf58c25f4408 100644 --- a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java @@ -259,4 +259,42 @@ public void testMultipleTxnOperations() { Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() .longValue()); } + + @Test + public void testSequenceNumberForCherryPicking() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + // WAP commit + table.newAppend() + .appendFile(FILE_B) + .set("wap.id", "123456789") + .stageOnly() + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 1", + 1, table.currentSnapshot().sequenceNumber().longValue()); + + // pick the snapshot that's staged but not committed + Snapshot wapSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("the snapshot sequence number should be 2", + 2, wapSnapshot.sequenceNumber().longValue()); + + // table has new commit + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 2", + 2, wapSnapshot.sequenceNumber().longValue()); + + // cherry-pick snapshot + table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit(); + + Assert.assertEquals("the snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber().longValue()); + + } } From 9ac1143cc51d1681d9b178fb001615b5a1a3af6e Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Thu, 6 Feb 2020 18:07:34 +0800 Subject: [PATCH 05/11] update unit tests --- .../apache/iceberg/TestSequenceNumber.java | 118 ++++++++++-------- 1 file changed, 63 insertions(+), 55 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java index bf58c25f4408..c633cec07bde 100644 --- 
a/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumber.java @@ -34,61 +34,94 @@ public class TestSequenceNumber extends TableTestBase { @Test - public void testReadWriteSequenceNumber() { + public void testSequenceNumberForFastAppend() throws IOException { table.newFastAppend().appendFile(FILE_A).commit(); Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + Assert.assertEquals(1, manifestFile.sequenceNumber().longValue()); + table.newFastAppend().appendFile(FILE_B).commit(); Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); - } + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(2, manifestFile.sequenceNumber().longValue()); - @Test - public void testSequenceNumberForFastAppend() throws IOException { - ManifestFile manifestFile = writeManifest(FILE_A, FILE_B); + manifestFile = writeManifest(FILE_C, FILE_D); table.newFastAppend().appendManifest(manifestFile).commit(); - Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber().longValue()); - manifestFile = table.currentSnapshot().manifests().get(0); - - Assert.assertEquals(1, manifestFile.sequenceNumber().longValue()); + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(3, manifestFile.sequenceNumber().longValue()); for (ManifestEntry entry : ManifestReader.read(manifestFile, table.io(), table.ops().current().specsById()).entries()) { - if (entry.file().path().equals(FILE_A.path()) || 
entry.file().path().equals(FILE_B.path())) { - Assert.assertEquals(1, entry.sequenceNumber().longValue()); + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + Assert.assertEquals(3, entry.sequenceNumber().longValue()); } } - - table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit(); - - manifestFile = table.currentSnapshot().manifests().stream() - .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) - .collect(Collectors.toList()).get(0); - - Assert.assertEquals("minimum sequence number should be 2", - 2, manifestFile.sequenceNumber().longValue()); } @Test - public void testSequenceNumberForMergeAppend() { + public void testSequenceNumberForMergeAppend() throws IOException { table.updateProperties() .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") .commit(); - table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); - Assert.assertEquals(1, table.currentSnapshot().manifests().size()); - table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit(); - ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + table.newAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); + + table.newAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); + + ManifestFile manifestFile = writeManifest(FILE_C, FILE_D); + table.newAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber().longValue()); - Assert.assertEquals("sequence number should be 2", - 2, manifestFile.sequenceNumber().longValue()); + manifestFile = table.currentSnapshot().manifests().get(0); + + Assert.assertEquals("the sequence number of manifest should be 3", + 3, manifestFile.sequenceNumber().longValue()); for (ManifestEntry entry : ManifestReader.read(manifestFile, table.io(), 
table.ops().current().specsById()).entries()) { - if (entry.file().path().equals(FILE_A.path()) || entry.file().path().equals(FILE_B.path())) { - Assert.assertEquals(1, entry.sequenceNumber().longValue()); + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals("the sequence number of data file should be 2", 2, entry.sequenceNumber().longValue()); } if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { - Assert.assertEquals(2, entry.sequenceNumber().longValue()); + Assert.assertEquals("the sequence number of data file should be 3", 3, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testSequenceNumberForRewrite() throws IOException { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); + + table.rewriteManifests().clusterBy(file -> "").commit(); + Assert.assertEquals("the sequence number of snapshot should be 3", + 3, table.currentSnapshot().sequenceNumber().longValue()); + + ManifestFile newManifest = table.currentSnapshot().manifests().get(0); + Assert.assertEquals("the sequence number of manifest should be 3", + 3, newManifest.sequenceNumber().longValue()); + + for (ManifestEntry entry : ManifestReader.read(newManifest, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 2, entry.sequenceNumber().longValue()); } } } @@ -123,31 +156,6 @@ public void testCommitConflict() { } } - @Test - public void 
testSequenceNumberForRewrite() throws IOException { - ManifestFile manifest = writeManifestWithName("manifest-file-1.avro", FILE_A); - - table.newFastAppend() - .appendManifest(manifest) - .commit(); - - table.rewriteManifests() - .clusterBy(file -> "") - .commit(); - - ManifestFile newManifest = table.currentSnapshot().manifests().get(0); - - Assert.assertEquals("the snapshot sequence number should be 1", - 2, table.currentSnapshot().sequenceNumber().longValue()); - - for (ManifestEntry entry : ManifestReader.read(newManifest, - table.io(), table.ops().current().specsById()).entries()) { - if (entry.file().path().equals(FILE_A.path())) { - Assert.assertEquals(1, entry.sequenceNumber().longValue()); - } - } - } - @Test public void testConcurrentCommit() throws InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(4); From 7313947709eb121818d1fab336673093a5a1df89 Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Sat, 8 Feb 2020 14:00:15 +0800 Subject: [PATCH 06/11] some minor changes --- .../java/org/apache/iceberg/DataFile.java | 2 +- .../java/org/apache/iceberg/TestHelpers.java | 4 ++-- .../apache/iceberg/BaseRewriteManifests.java | 3 ++- .../java/org/apache/iceberg/FastAppend.java | 3 ++- .../org/apache/iceberg/GenericDataFile.java | 4 ++-- .../apache/iceberg/GenericManifestFile.java | 20 ------------------- .../iceberg/InheritableMetadataFactory.java | 2 +- .../java/org/apache/iceberg/MergeAppend.java | 1 + 8 files changed, 11 insertions(+), 28 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index cd416966f9df..ee380a399e21 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -158,6 +158,6 @@ static StructType getType(StructType partitionType) { * @return The sequence number to identify the order in which data files and deletion files are to be processed. 
* If the sequence number is not specified it is inherited from the manifest file struct in the manifest list file. */ - Long sequenceNumber(); + long sequenceNumber(); } diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index 0ea8070d5f31..7757bad9f4ce 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -379,8 +379,8 @@ public List splitOffsets() { } @Override - public Long sequenceNumber() { - return null; + public long sequenceNumber() { + return 0; } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index c7a0cad903b6..fc0eedc8bb1a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -142,11 +142,12 @@ public RewriteManifests addManifest(ManifestFile manifest) { Preconditions.checkArgument( manifest.snapshotId() == null || manifest.snapshotId() == -1, "Snapshot id must be assigned during commit"); + Preconditions.checkArgument(manifest.sequenceNumber() == null, "Sequence number must be assigned during commit"); if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { addedManifests.add(manifest); } else { - // the manifest must be rewritten with this update's snapshot ID and sequence number + // the manifest must be rewritten with this update's snapshot ID ManifestFile copiedManifest = copyManifest(manifest); rewrittenAddedManifests.add(copiedManifest); } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index fbd3eb129c6b..fef6bdd0aac5 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -95,12 +95,13 @@ public FastAppend appendManifest(ManifestFile manifest) { 
Preconditions.checkArgument( manifest.snapshotId() == null || manifest.snapshotId() == -1, "Snapshot id must be assigned during commit"); + Preconditions.checkArgument(manifest.sequenceNumber() == null, "Sequence number must be assigned during commit"); if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { summaryBuilder.addedManifest(manifest); appendManifests.add(manifest); } else { - // the manifest must be rewritten with this update's snapshot ID and sequence number + // the manifest must be rewritten with this update's snapshot ID ManifestFile copiedManifest = copyManifest(manifest); rewrittenAppendManifests.add(copiedManifest); } diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index ec03899573c6..7f86e3c69ada 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -272,8 +272,8 @@ public List splitOffsets() { } @Override - public Long sequenceNumber() { - return sequenceNumber; + public long sequenceNumber() { + return sequenceNumber == null ? 
0 : sequenceNumber; } @Override diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index edac605bb954..8aedcefab711 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -138,26 +138,6 @@ public GenericManifestFile(String path, long length, int specId, Long snapshotId this.fromProjectionPos = null; } - public GenericManifestFile(String path, long length, int specId, Long snapshotId, Long sequenceNumber, - int addedFilesCount, long addedRowsCount, int existingFilesCount, - long existingRowsCount, int deletedFilesCount, long deletedRowsCount, - List partitions) { - this.avroSchema = AVRO_SCHEMA; - this.manifestPath = path; - this.length = length; - this.specId = specId; - this.snapshotId = snapshotId; - this.sequenceNumber = sequenceNumber; - this.addedFilesCount = addedFilesCount; - this.addedRowsCount = addedRowsCount; - this.existingFilesCount = existingFilesCount; - this.existingRowsCount = existingRowsCount; - this.deletedFilesCount = deletedFilesCount; - this.deletedRowsCount = deletedRowsCount; - this.partitions = partitions; - this.fromProjectionPos = null; - } - /** * Copy constructor. 
* diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java index 265eba210d51..fcdcd4dbb5de 100644 --- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java @@ -47,7 +47,7 @@ public ManifestEntry apply(ManifestEntry manifestEntry) { if (manifestEntry.snapshotId() == null) { manifestEntry.setSnapshotId(snapshotId); } - if (manifestEntry.sequenceNumber() == null) { + if (manifestEntry.sequenceNumber() == null && this.sequenceNumber != null) { manifestEntry.setSequenceNumber(sequenceNumber); } return manifestEntry; diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java index 063315f68b8c..1eae9efc57a8 100644 --- a/core/src/main/java/org/apache/iceberg/MergeAppend.java +++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java @@ -55,6 +55,7 @@ public AppendFiles appendManifest(ManifestFile manifest) { Preconditions.checkArgument( manifest.snapshotId() == null || manifest.snapshotId() == -1, "Snapshot id must be assigned during commit"); + Preconditions.checkArgument(manifest.sequenceNumber() == null, "Sequence number must be assigned during commit"); add(manifest); return this; } From e72cc426f417d9316ebd4dc784c6784725e9900b Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Wed, 12 Feb 2020 23:00:09 +0800 Subject: [PATCH 07/11] fix sequence number generation for multi-thread situation --- .../main/java/org/apache/iceberg/SnapshotProducer.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 02cccdb5a4bc..b063fe174cbc 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ 
-78,7 +78,7 @@ public void accept(String file) { private final AtomicInteger attempt = new AtomicInteger(0); private final List manifestLists = Lists.newArrayList(); private Long snapshotId = null; - private Long sequenceNumber = null; + private volatile Long sequenceNumber = null; private TableMetadata base = null; private boolean stageOnly = false; private Consumer deleteFunc = defaultDelete; @@ -329,7 +329,11 @@ protected long snapshotId() { protected long sequenceNumber() { if (sequenceNumber == null) { - this.sequenceNumber = ops.newSequenceNumber(); + synchronized (this) { + if (sequenceNumber == null) { + this.sequenceNumber = ops.newSequenceNumber(); + } + } } return sequenceNumber; } From 75fa8a84a17683d915ff37e70bf19e3526aee532 Mon Sep 17 00:00:00 2001 From: jimmyjchen Date: Mon, 17 Feb 2020 16:06:56 +0800 Subject: [PATCH 08/11] bump the table format version --- core/src/main/java/org/apache/iceberg/TableMetadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index e44e002bdab4..5b2ba8d5cd10 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -43,7 +43,7 @@ * Metadata for a table. 
*/ public class TableMetadata { - static final int TABLE_FORMAT_VERSION = 1; + static final int TABLE_FORMAT_VERSION = 2; static final int INITIAL_SPEC_ID = 0; public static TableMetadata newTableMetadata(Schema schema, From 8ce5e753c7fa1236ab367b0dd5cfbd2ca2dd7d57 Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Mon, 17 Feb 2020 23:09:49 +0800 Subject: [PATCH 09/11] update spec --- site/docs/spec.md | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/site/docs/spec.md b/site/docs/spec.md index 27455cb81711..a8b7266f666d 100644 --- a/site/docs/spec.md +++ b/site/docs/spec.md @@ -206,11 +206,12 @@ The partition spec for a manifest and the current table schema must be stored in The schema of a manifest file is a struct called `manifest_entry` with the following fields: -| Field id, name | Type | Description | -|----------------------|-----------------------------------------------------------|-----------------------------------------------------------------| -| **`0 status`** | `int` with meaning: `0: EXISTING` `1: ADDED` `2: DELETED` | Used to track additions and deletions | -| **`1 snapshot_id`** | `long` | Snapshot id where the file was added, or deleted if status is 2 | -| **`2 data_file`** | `data_file` `struct` (see below) | File path, partition tuple, metrics, ... | +| Field id, name | Type | Description | +|-------------------------|-----------------------------------------------------------|-------------------------------------------------------------------------| +| **`0 status`** | `int` with meaning: `0: EXISTING` `1: ADDED` `2: DELETED` | Used to track additions and deletions | +| **`1 snapshot_id`** | `long` | Snapshot id where the file was added, or deleted if status is 2 | +| **`2 data_file`** | `data_file` `struct` (see below) | File path, partition tuple, metrics, ... 
| +| **`3 sequence_number`**| `optional long` | Sequence number of the snapshot in which the file was added | `data_file` is a struct with the following fields: @@ -232,6 +233,7 @@ The schema of a manifest file is a struct called `manifest_entry` with the follo | **`128 upper_bounds`** | `optional map<129: int, 130: binary>` | Map from column id to upper bound in the column serialized as binary [1]. Each value must be greater than or equal to all values in the column for the file. | | **`131 key_metadata`** | `optional binary` | Implementation-specific key metadata for encryption | | **`132 split_offsets`** | `optional list` | Split offsets for the data file. For example, all row group offsets in a Parquet file. Must be sorted ascending. | +| **`134 sequence_number`** | `optional long` | Sequence number of the snapshot in which the file was added | Notes: @@ -246,9 +248,9 @@ Each manifest file must store its partition spec and the current table schema in The manifest entry fields are used to keep track of the snapshot in which files were added or logically deleted. The `data_file` struct is nested inside of the manifest entry so that it can be easily passed to job planning without the manifest entry fields. -When a data file is added to the dataset, it’s manifest entry should store the snapshot ID in which the file was added and set status to 1 (added). +When a data file is added to the dataset, its manifest entry should store the snapshot ID and the sequence number in which the file was added and set status to 1 (added). -When a data file is replaced or deleted from the dataset, it’s manifest entry fields store the snapshot ID in which the file was deleted and status 2 (deleted). The file may be deleted from the file system when the snapshot in which it was deleted is garbage collected, assuming that older snapshots have also been garbage collected [1]. 
+When a data file is replaced or deleted from the dataset, its manifest entry fields store the snapshot ID and the sequence number in which the file was deleted and status 2 (deleted). The file may be deleted from the file system when the snapshot in which it was deleted is garbage collected, assuming that older snapshots have also been garbage collected [1]. Notes: @@ -259,6 +261,7 @@ Notes: A snapshot consists of the following fields: * **`snapshot-id`** -- A unique long ID. +* **`sequence-number`** -- A monotonically increasing value that identifies the order in which data files and deletion files are to be processed. * **`parent-snapshot-id`** -- (Optional) The snapshot ID of the snapshot’s parent. This field is not present for snapshots that have no parent snapshot, such as snapshots created before this field was added or the first snapshot of a table. * **`timestamp-ms`** -- A timestamp when the snapshot was created. This is used when garbage collecting snapshots. * **`manifests`** -- A list of manifest file locations. The data files in a snapshot are the union of all data files listed in these manifests. (Deprecated in favor of `manifest-list`) @@ -308,8 +311,12 @@ Manifest list files store `manifest_file`, a struct with the following fields: | **`503 added_snapshot_id`** | `long` | ID of the snapshot where the manifest file was added | | **`504 added_files_count`** | `int` | Number of entries in the manifest that have status `ADDED` (1) | | **`505 existing_files_count`** | `int` | Number of entries in the manifest that have status `EXISTING` (0) | -| **`506 deleted_files_count`** | `int` | Number of entries in the manifest that have status `DELETED` (2) | +| **`506 deleted_files_count`** | `int` | Number of entries in the manifest that have status `DELETED` (2) | | **`507 partitions`** | `list<508: field_summary>` (see below) | A list of field summaries for each partition field in the spec. 
Each field in the list corresponds to a field in the manifest file’s partition spec. | +| **`512 added_row_count`** | `long` | Number of rows in all of files in the manifest that have status `ADDED` | +| **`513 existing_row_count`** | `long` | Number of rows in all of files in the manifest that have status `EXISTING` | +| **`514 deleted_row_count`** | `long` | Number of rows in all of files in the manifest that have status `DELETED` | +| **`515 sequence_number`** | `long` | Sequence number of the snapshot where the manifest file was added | `field_summary` is a struct with the following fields From baeffece06cfc4eb13206846f2bcdccd78d61015 Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Tue, 18 Feb 2020 08:47:36 +0800 Subject: [PATCH 10/11] fix words --- site/docs/spec.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/site/docs/spec.md b/site/docs/spec.md index a8b7266f666d..a3244ad9e5cb 100644 --- a/site/docs/spec.md +++ b/site/docs/spec.md @@ -313,9 +313,9 @@ Manifest list files store `manifest_file`, a struct with the following fields: | **`505 existing_files_count`** | `int` | Number of entries in the manifest that have status `EXISTING` (0) | | **`506 deleted_files_count`** | `int` | Number of entries in the manifest that have status `DELETED` (2) | | **`507 partitions`** | `list<508: field_summary>` (see below) | A list of field summaries for each partition field in the spec. Each field in the list corresponds to a field in the manifest file’s partition spec. 
| -| **`512 added_row_count`** | `long` | Number of rows in all of files in the manifest that have status `ADDED` | -| **`513 existing_row_count`** | `long` | Number of rows in all of files in the manifest that have status `EXISTING` | -| **`514 deleted_row_count`** | `long` | Number of rows in all of files in the manifest that have status `DELETED` | +| **`512 added_rows_count`** | `long` | Number of rows in all of files in the manifest that have status `ADDED` | +| **`513 existing_rows_count`** | `long` | Number of rows in all of files in the manifest that have status `EXISTING` | +| **`514 deleted_rows_count`** | `long` | Number of rows in all of files in the manifest that have status `DELETED` | | **`515 sequence_number`** | `long` | Sequence number of the snapshot where the manifest file was added | `field_summary` is a struct with the following fields From e72ce7c87dcf310a6a06092046fe704f70722c52 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Tue, 7 Apr 2020 16:31:20 +0800 Subject: [PATCH 11/11] address comments --- core/src/main/java/org/apache/iceberg/GenericDataFile.java | 2 ++ .../main/java/org/apache/iceberg/GenericManifestFile.java | 1 + core/src/main/java/org/apache/iceberg/ManifestWriter.java | 7 +------ 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 27cca7b637c2..fe4c7e0cc347 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -176,6 +176,7 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) { this.fileSizeInBytes = toCopy.fileSizeInBytes; this.fileOrdinal = toCopy.fileOrdinal; this.sortColumns = copy(toCopy.sortColumns); + this.sequenceNumber = toCopy.sequenceNumber; if (fullCopy) { // TODO: support lazy conversion to/from map this.columnSizes = copy(toCopy.columnSizes); @@ -421,6 +422,7 @@ public String 
toString() { .add("file_path", filePath) .add("file_format", format) .add("partition", partitionData) + .add("sequence_number", sequenceNumber()) .add("record_count", recordCount) .add("file_size_in_bytes", fileSizeInBytes) .add("column_sizes", columnSizes) diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 8aedcefab711..15abc2beed65 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -373,6 +373,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("path", manifestPath) .add("length", length) + .add("sequence_number", sequenceNumber()) .add("partition_spec_id", specId) .add("added_snapshot_id", snapshotId) .add("added_data_files_count", addedFilesCount) diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index a44526e22d20..ce5b3bac828c 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -113,12 +113,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private Long sequenceNumber = null; ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { - this.file = file; - this.specId = spec.specId(); - this.writer = newAppender(FileFormat.AVRO, spec, file); - this.snapshotId = snapshotId; - this.reused = new ManifestEntry(spec.partitionType()); - this.stats = new PartitionSummary(spec); + this(spec, file, snapshotId, null); } ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId, Long sequenceNumber) {