[WIP] Add sequence number for supporting row level delete #588
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,4 +104,11 @@ public interface Snapshot { | |
| * @return the location of the manifest list for this Snapshot | ||
| */ | ||
| String manifestListLocation(); | ||
|
|
||
| /** | ||
| * Return this snapshot's sequence number, or 0 if the table has no snapshot yet. | ||
|
Contributor
In which case will this actually return 0? If there is no snapshot, then there is no
Contributor
It would make sense to return 0 if the snapshot was written without a sequence number, and to use a primitive, right? |
||
| * | ||
| * @return the sequence number of this Snapshot | ||
| */ | ||
| Long sequenceNumber(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ class BaseSnapshot implements Snapshot { | |
| private final InputFile manifestList; | ||
| private final String operation; | ||
| private final Map<String, String> summary; | ||
| private Long sequenceNumber; | ||
|
Contributor
@chenjunjiedada Can you comment on why the snapshot tracks its own sequence number instead of the max sequence number? The Merge-on-Read doc says we use the max sequence number in the snapshot. Maybe I missed the context. What's the reason for switching to the snapshot sequence number?
Collaborator
Author
@prodeezy, you may want to check this: #588 (comment). |
||
|
|
||
| // lazily initialized | ||
| private List<ManifestFile> manifests = null; | ||
|
|
@@ -75,6 +76,31 @@ class BaseSnapshot implements Snapshot { | |
| this.manifestList = manifestList; | ||
| } | ||
|
|
||
| BaseSnapshot(FileIO io, | ||
| long snapshotId, | ||
| Long parentId, | ||
| long timestampMillis, | ||
| String operation, | ||
| Map<String, String> summary, | ||
| InputFile manifestList, | ||
| Long sequenceNumber) { | ||
| this(io, snapshotId, parentId, timestampMillis, operation, summary, manifestList); | ||
| this.sequenceNumber = sequenceNumber; | ||
| } | ||
|
|
||
| BaseSnapshot(FileIO io, | ||
| long snapshotId, | ||
| Long parentId, | ||
| long timestampMillis, | ||
| String operation, | ||
| Map<String, String> summary, | ||
| List<ManifestFile> manifests, | ||
| Long sequenceNumber) { | ||
| this(io, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null); | ||
| this.manifests = manifests; | ||
| this.sequenceNumber = sequenceNumber; | ||
| } | ||
|
|
||
| BaseSnapshot(FileIO io, | ||
| long snapshotId, | ||
| Long parentId, | ||
|
|
@@ -186,6 +212,15 @@ private void cacheChanges() { | |
| this.cachedDeletes = deletes.build(); | ||
| } | ||
|
|
||
| @Override | ||
| public Long sequenceNumber() { | ||
|
Contributor
Hm, does this mean that all existing snapshots written without sequence numbers will have
Collaborator
Author
That's an initial implementation that needs some discussion. For example, we can generate sequence numbers from the timestamp for existing snapshots.
Contributor
I think the timestamp-based approach would work for existing snapshots, but I am not sure how we will propagate this info down to manifests and files. For example, @rdblue, any thoughts on keeping backward compatibility?
Contributor
When we start using sequence numbers, anything without a sequence number should be considered older and should default to 0. That will work as long as there is some point after which we always use sequence numbers.

What wouldn't work is having some writers that use sequence numbers and some that do not, because writes without sequence numbers would be sequenced before writes with them: if some writer creates seq 1 and an older writer commits later, the older writer's new commit would have seq 0. That older writer would probably also drop all sequence numbers from metadata: the table sequence number and the one for each ManifestFile stored in the ManifestList.

One more thing to note: the older writer would reset sequence numbers, but metadata written by newer clients could still contain explicit sequence numbers. For example, a new writer that performs a compaction with older data will write the older data's sequence numbers into metadata. If the sequence number gets reset to 0 by an older writer after that, then sequence numbers get all mixed up.

So, for correctness, we will need to ensure that older writers cannot operate on tables once they move to sequence numbers. That requires bumping the format version.

Does that reasoning sound correct, @aokolnychyi, @chenjunjiedada?
Collaborator
Author
Yes, that makes sense. Bumping the format version can prevent an old writer from writing data to a table with the new format version. We may still need to consider how to handle the existing data. So the question is whether we want to allow old and new writers to write at the same time. For example, could a Spark writer and a Presto writer be allowed to write to the table at the same time? If we don't want to support old and new writers at the same time, we can update the format version for existing data when committing.
Contributor
I agree with Ryan that old writers should not be allowed to write to tables with sequence numbers. It seems like the right time to bump the format version and include new manifest types and snapshot id inheritance. Then we can safely assign sequence number 0 to all files that were written before we start assigning new ones. |
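To make the reasoning in this thread concrete, here is a minimal sketch of the two rules being discussed: missing sequence numbers default to 0, and a writer that only understands an older format version is refused. `SequenceNumberGate`, its method names, and the integer version comparison are all hypothetical and are not part of this PR or of Iceberg; the actual version check would live wherever table metadata is committed.

```java
// Illustrative sketch only: SequenceNumberGate and its methods are hypothetical
// names, not Iceberg APIs.
final class SequenceNumberGate {
  private SequenceNumberGate() {}

  // Metadata written before sequence numbers existed has no stored value.
  // Treat it as sequence number 0 so it sorts before every numbered commit.
  static long effectiveSequenceNumber(Long stored) {
    return stored == null ? 0L : stored;
  }

  // A new commit is sequenced one past the table's last sequence number.
  static long nextSequenceNumber(Long lastSequenceNumber) {
    return effectiveSequenceNumber(lastSequenceNumber) + 1;
  }

  // Refuse writers that only understand an older format version. Such a writer
  // would commit without a sequence number (read back as 0) and could drop the
  // sequence numbers already stored in metadata.
  static void checkWriterCanCommit(int writerFormatVersion, int tableFormatVersion) {
    if (writerFormatVersion < tableFormatVersion) {
      throw new IllegalStateException(
          "Writer supports format version " + writerFormatVersion
              + " but the table requires " + tableFormatVersion);
    }
  }
}
```

Under these assumptions, a commit from an old writer fails fast instead of producing a snapshot that reads back as sequence number 0 after a seq 1 commit already exists.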
||
| if (sequenceNumber == null) { | ||
| return 0L; | ||
| } | ||
|
|
||
| return sequenceNumber; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return MoreObjects.toStringHelper(this) | ||
|
|
@@ -194,6 +229,7 @@ public String toString() { | |
| .add("operation", operation) | ||
| .add("summary", summary) | ||
| .add("manifests", manifests()) | ||
| .add("sequence_number", sequenceNumber()) | ||
| .toString(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,7 @@ public PartitionData copy() { | |
| private Map<Integer, ByteBuffer> upperBounds = null; | ||
| private List<Long> splitOffsets = null; | ||
| private byte[] keyMetadata = null; | ||
| private Long sequenceNumber = null; | ||
|
|
||
| // cached schema | ||
| private transient org.apache.avro.Schema avroSchema = null; | ||
|
|
@@ -175,6 +176,7 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) { | |
| this.fileSizeInBytes = toCopy.fileSizeInBytes; | ||
| this.fileOrdinal = toCopy.fileOrdinal; | ||
| this.sortColumns = copy(toCopy.sortColumns); | ||
| this.sequenceNumber = toCopy.sequenceNumber; | ||
| if (fullCopy) { | ||
| // TODO: support lazy conversion to/from map | ||
| this.columnSizes = copy(toCopy.columnSizes); | ||
|
|
@@ -270,6 +272,11 @@ public List<Long> splitOffsets() { | |
| return splitOffsets; | ||
| } | ||
|
|
||
| @Override | ||
| public long sequenceNumber() { | ||
| return sequenceNumber == null ? 0 : sequenceNumber; | ||
| } | ||
|
|
||
| @Override | ||
| public org.apache.avro.Schema getSchema() { | ||
| if (avroSchema == null) { | ||
|
|
@@ -332,6 +339,9 @@ public void put(int i, Object v) { | |
| case 14: | ||
| this.splitOffsets = (List<Long>) v; | ||
| return; | ||
| case 15: | ||
| this.sequenceNumber = (Long) v; | ||
| return; | ||
| default: | ||
| // ignore the object, it must be from a newer version of the format | ||
| } | ||
|
|
@@ -382,6 +392,8 @@ public Object get(int i) { | |
| return keyMetadata(); | ||
| case 14: | ||
| return splitOffsets; | ||
| case 15: | ||
| return sequenceNumber; | ||
| default: | ||
| throw new UnsupportedOperationException("Unknown field ordinal: " + pos); | ||
| } | ||
|
|
@@ -401,7 +413,7 @@ private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionTy | |
|
|
||
| @Override | ||
| public int size() { | ||
| - return 15; | ||
| + return 16; | ||
| } | ||
|
|
||
| @Override | ||
|
Member
Please add the sequence_number to toString().
Member
The |
||
|
|
@@ -410,6 +422,7 @@ public String toString() { | |
| .add("file_path", filePath) | ||
| .add("file_format", format) | ||
| .add("partition", partitionData) | ||
| .add("sequence_number", sequenceNumber()) | ||
| .add("record_count", recordCount) | ||
| .add("file_size_in_bytes", fileSizeInBytes) | ||
| .add("column_sizes", columnSizes) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,7 @@ public class GenericManifestFile | |
| private Integer deletedFilesCount = null; | ||
| private Long deletedRowsCount = null; | ||
| private List<PartitionFieldSummary> partitions = null; | ||
| private Long sequenceNumber = null; | ||
|
Member
Please also check the similar places in GenericDataFile, such as the constructor, toString(), etc.
Collaborator
Author
OK |
||
|
|
||
| /** | ||
| * Used by Avro reflection to instantiate this class when reading manifest files. | ||
|
|
@@ -156,6 +157,7 @@ private GenericManifestFile(GenericManifestFile toCopy) { | |
| this.deletedRowsCount = toCopy.deletedRowsCount; | ||
| this.partitions = copyList(toCopy.partitions, PartitionFieldSummary::copy); | ||
| this.fromProjectionPos = toCopy.fromProjectionPos; | ||
| this.sequenceNumber = toCopy.sequenceNumber; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -227,6 +229,11 @@ public Long deletedRowsCount() { | |
| return deletedRowsCount; | ||
| } | ||
|
|
||
| @Override | ||
| public Long sequenceNumber() { | ||
| return sequenceNumber; | ||
| } | ||
|
|
||
| @Override | ||
| public List<PartitionFieldSummary> partitions() { | ||
| return partitions; | ||
|
|
@@ -272,6 +279,8 @@ public Object get(int i) { | |
| return existingRowsCount; | ||
| case 10: | ||
| return deletedRowsCount; | ||
| case 11: | ||
| return sequenceNumber; | ||
| default: | ||
| throw new UnsupportedOperationException("Unknown field ordinal: " + pos); | ||
| } | ||
|
|
@@ -320,6 +329,9 @@ public <T> void set(int i, T value) { | |
| case 10: | ||
| this.deletedRowsCount = (Long) value; | ||
| return; | ||
| case 11: | ||
| this.sequenceNumber = (Long) value; | ||
| return; | ||
| default: | ||
| // ignore the object, it must be from a newer version of the format | ||
| } | ||
|
|
@@ -361,6 +373,7 @@ public String toString() { | |
| return MoreObjects.toStringHelper(this) | ||
| .add("path", manifestPath) | ||
| .add("length", length) | ||
| .add("sequence_number", sequenceNumber()) | ||
| .add("partition_spec_id", specId) | ||
| .add("added_snapshot_id", snapshotId) | ||
| .add("added_data_files_count", addedFilesCount) | ||
|
|
@@ -397,6 +410,11 @@ public CopyBuilder withSnapshotId(Long newSnapshotId) { | |
| return this; | ||
| } | ||
|
|
||
| public CopyBuilder withSequenceNumber(Long newSequenceNumber) { | ||
| manifestFile.sequenceNumber = newSequenceNumber; | ||
| return this; | ||
| } | ||
|
|
||
| public ManifestFile build() { | ||
| return manifestFile; | ||
| } | ||
|
|
||
So if a seq number is available at the manifest file level, then we assume all data files in that manifest file have the same seq number. But if there are data-file-level seq numbers, should we be tracking any metadata at the ManifestFile level, like a min/max seq number?
Currently, the logic is just the first step: if a seq number is not found in data_file, then we try to find it from manifestFile. It doesn't contain the logic for computing the min/max value. I would like to use separate fields to track the min/max sequence number in the next step.
ok
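The fallback described in this thread could be sketched roughly as follows. This is a hedged illustration, not code from the PR: `SequenceNumberFallback` and `resolve` are hypothetical names, both inputs are assumed to be the nullable values stored in metadata, and the final default of 0 follows the earlier suggestion that anything written without a sequence number is treated as oldest.

```java
// Illustrative sketch only: SequenceNumberFallback is a hypothetical helper,
// not part of this PR.
final class SequenceNumberFallback {
  private SequenceNumberFallback() {}

  // Resolve the sequence number for one data file entry:
  // 1. use the value stored on the data file, if any;
  // 2. otherwise inherit the value stored on the enclosing manifest file;
  // 3. otherwise fall back to 0 (metadata written before sequence numbers).
  static long resolve(Long dataFileSequenceNumber, Long manifestSequenceNumber) {
    if (dataFileSequenceNumber != null) {
      return dataFileSequenceNumber;
    }
    if (manifestSequenceNumber != null) {
      return manifestSequenceNumber;
    }
    return 0L;
  }
}
```

Min/max tracking at the ManifestFile level is deliberately left out here, since the author defers it to separate fields in a later step.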