-
Notifications
You must be signed in to change notification settings - Fork 3k
[WIP] Add sequence number for supporting row level delete #588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@chenjunjiedada, thanks for working on this! I'll be interested to review whenever I have a bit of time. |
|
@chenjunjiedada, sorry for the delay. I hope to have more time next week. |
| * | ||
| * @param seqNum the sequential number to assign | ||
| */ | ||
| void setSeqNum(long seqNum); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This interface should not include a setter method. DataFile is effectively immutable.
| optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())) | ||
| // NEXT ID TO ASSIGN: 134 | ||
| optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())), | ||
| optional(134, "sequential_number", LongType.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the field should be named "sequence_number".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
| } | ||
|
|
||
| private ManifestFile writeManifest() throws IOException { | ||
| private ManifestFile writeManifest(long availSeqNum) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sequence numbers are not written into manifests during commit because the order of commits is not known until the commit succeeds. Retries require changing the sequence number.
To get around this, sequence numbers are written into the manifest list for each ManifestFile object, and are inherited when reading a manifest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense.
|
@chenjunjiedada, I think a good place to start would be to add |
|
@rdblue , sorry for the late response. I will update this weekend. |
705c9d6 to
aff8316
Compare
|
@rdblue, PR updated. Here I use |
| } | ||
| JsonNode pNode = node.get(property); | ||
| Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isNumber() && pNode.canConvertToLong(), | ||
| "Cannot parse %s from non-string value: %s", property, pNode); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should the error message say "non-string" or "non-numeric"?
| } | ||
|
|
||
| @Override | ||
| public Long sequenceNumber() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, does this mean that all existing snapshots written without sequence numbers will have sequenceNumber as 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's an initial implementation that needs some discussion. For example, we can generate sequence numbers from the timestamp for existing snapshots.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the timestamp-based approach would work for existing snapshots, but I am not sure how we will propagate this info down to manifests and files. For example, ManifestFile keeps track of the snapshot in which the manifest was added, but that snapshot can be already expired, so we won't be able to fetch its metadata. Maybe, defaulting to 0 won't be an issue, let me think more about this.
@rdblue, any thoughts on keeping backward compatibility?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we start using sequence numbers, anything without a sequence number should be considered older and should default to 0. That will work as long as there is some time at which we always use sequence numbers.
What wouldn't work is if we have some writers that use sequence numbers and some writers that do not because the ones that don't write sequence numbers would be sequenced before writes that do: if some writer creates seq 1 and an older writer commits later, the older writer's new commit would have seq 0. That new writer would probably also drop all sequence numbers from metadata -- the table sequence number and the one for each ManifestFile stored in the ManifestList.
One more thing to note: the older writer would reset sequence numbers, but metadata written by newer clients could still contain explicit sequence numbers. For example, a new writer that performs a compaction with older data will write the older data's sequence numbers into metadata. If the sequence number gets reset to 0 by an older writer after that, then sequence numbers get all mixed up. So we will need to ensure that older writers cannot operate on tables once they move to sequence numbers for correctness. That requires bumping the format version.
Does that reasoning sound correct, @aokolnychyi, @chenjunjiedada?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that makes sense. Bumping the format version can prevent an old writer from writing data to a table with a new version format. We may still need to consider how to handle the existing data.
So the question is do we want to allow writing from the old writer and new writer at the same time. Maybe a spark and a presto are allowed to write to the table at the same time? If we don't want to support the old writer and new writer at the same time, we can update the format version for existing data when committing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Ryan that old writers should not be allowed to write to tables with sequence numbers. It seems like the right time to bump the format version and include new manifest types and snapshot id inheritance. Then we can safely assign 0 sequence number to all files that were written before we start assigning new ones.
| String manifestListLocation(); | ||
|
|
||
| /** | ||
| * Return this snapshot's sequence number, or 0 if the table has no snapshot yet. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which case will this actually return 0? If there is no snapshot, then there is no Snapshot object, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would make sense to return 0 if the snapshot was written without a sequence number, and to use a primitive, right?
| public void testReadSequenceNumber() { | ||
| long curSeqNum; | ||
|
|
||
| if (table.currentSnapshot() == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TableTestBase always creates a fresh table for each test, so this if block seems redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the snapshot is produced in SnapshotProducer and the SnapshotProducer has three subtypes BaseRewriteManifests, FastAppend, MergeAppend, so only when commit is triggered inside these classes, for example, append one file and commit, a new snapshot is generated. Here we just create an empty table and no data has been written yet, so the snapshot is null. This is a bit weird though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is table.currentSnapshot() == null will be always true, so it looks like the else branch will never be executed.
| import org.junit.Test; | ||
|
|
||
|
|
||
| public class TestSequenceNumber extends TableTestBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be great to add more tests. A few possible scenarios:
- Concurrent conflicting operations
- Transactions with multiple operations
- Rollback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I will add more unit tests in the next commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it also help to add a test for picked snapshots (cherry pick feature with WAP)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @fbocse! Let me take a look for the cherry-picked snapshot feature first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there are changes to commit workflow (MergingSnapshotProducer), it would be good to test commit-fail-retry scenarios like TestTransaction does.
|
@chenjunjiedada @rdblue do we want to add sequence numbers to manifests/files and to modify |
|
@aokolnychyi, Thanks for your review, I will submit a commit this week to add more unit tests. For the sequence number of Manifests and Datafile, I think the next step to add a PR to add sequence number for manifests in manifestList as Ryan commented before, while I haven't started that yet due to an internal milestone. You can go ahead if you are interested:) |
|
@chenjunjiedada @rdblue, I spent some time prototyping how we can inherit snapshot ids and created #675 as a result. I think we simply need to make sure that logic there is flexible enough to inherit sequence numbers later on. It would be great to hear what you think. |
|
@aokolnychyi, @rdblue , I updated the code with the basic inheritance feature. Could you please help to have a look? Will rebase later. |
| OutputFile out = manifestPath(manifestCount.getAndIncrement()); | ||
|
|
||
| ManifestWriter writer = new ManifestWriter(spec, out, snapshotId()); | ||
| ManifestWriter writer = new ManifestWriter(spec, out, snapshotId(), sequenceNumber()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sequence number may changes when the commit fails, and a sequence number is not like snapshot id that can be used in the next commit. So this logic should be wrong. We may need to store sequence numbers in the manifest list file, it seems too much for manifest list file. @rdblue @aokolnychyi what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should misunderstand the logic, please ignore this.
| } | ||
|
|
||
| // reset sequence number to null so that the sequence number can be updated when retry | ||
| this.sequenceNumber = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the sequence number cannot be unique, so it has to reset to null for next use.
|
Hi @rdblue, @aokolnychyi, I rebased the code. Could you please help to have a look at your convenience? |
|
Thanks, @chenjunjiedada! Will take a look today. |
Conflicts: core/src/main/java/org/apache/iceberg/SnapshotProducer.java
| * @return The sequence number to identify the order in which data files and deletion files are to be processed. | ||
| * If the sequence number is not specified it is inherited from the manifest file struct in the manifest list file. | ||
| */ | ||
| Long sequenceNumber(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this ever return null? I would rather use a valid number and a primitive type to ensure that we always have a sequence number that is valid. For older data, the sequence number should be 0, indicating that it is older than when we started writing sequence numbers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use boxing here because I don't want to rewrite the manifest file. The sequence number is not determined when committing the update, leave it to null so that we can inherit the sequence number from the value in the manifest list file.
If we use the primitive type, we need to check where the sequence number of the manifest is null or not. It should be doable, let me update this.
| addedManifests.add(manifest); | ||
| } else { | ||
| // the manifest must be rewritten with this update's snapshot ID | ||
| // the manifest must be rewritten with this update's snapshot ID and sequence number |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rewrite cannot set the sequence number. If it did, then we would have to rewrite the manifest again if the commit fails and has to retry -- because the sequence number would be incremented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. When the commit fails, it just needs to update the sequence number in the manifest list file and doesn't have to keep sequence number in the manifest file.
| this.fromProjectionPos = null; | ||
| } | ||
|
|
||
| public GenericManifestFile(String path, long length, int specId, Long snapshotId, Long sequenceNumber, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an internal class, so it shouldn't be necessary to add a new constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, no place uses it actually.
|
@rdblue @aokolnychyi, Base on the comments, I assume the next steps are
What do you think? |
|
We already have the logic to check the reading format version so I just bump the version. @rdblue @aokolnychyi could you please help to take another look? Do you think this is ready? |
|
Hm, Looks like we need to update spec.md as well at least. |
|
@rdblue @aokolnychyi , Any more comments or suggestions? |
Conflicts: site/docs/spec.md
| return 16; | ||
| } | ||
|
|
||
| @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add the sequence_number inside the toString() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GenericDataFile(GenericDataFile toCopy, boolean fullCopy) also need to copy the sequence_number I guess.
openinx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall, left few comments. Ping @rdblue , @aokolnychyi, any other concerns ? Please help to confirm this spec again so that we can move the row-delete a bit faster, Thanks in advance.
| return 16; | ||
| } | ||
|
|
||
| @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GenericDataFile(GenericDataFile toCopy, boolean fullCopy) also need to copy the sequence_number I guess.
| private Integer deletedFilesCount = null; | ||
| private Long deletedRowsCount = null; | ||
| private List<PartitionFieldSummary> partitions = null; | ||
| private Long sequenceNumber = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also check the similar things in GenericDataFile, such as the constructor ? toString() ? etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
| private long deletedRows = 0L; | ||
| private Long sequenceNumber = null; | ||
|
|
||
| ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could write it as :
this(spec, file, snapshotId, null);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
| private final InputFile manifestList; | ||
| private final String operation; | ||
| private final Map<String, String> summary; | ||
| private Long sequenceNumber; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chenjunjiedada Can you comment on why the max seq number is not being tracked in Snapshot but instead Snapshot Seq Number is? the Merge-on-Read doc says we use max seq number in the snapshot. Maybe i missed the context. what's the reason for switching to snapshot sequence number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@prodeezy , you may want to check this: #588 (comment).
| * The data files' sequence number is optional, it should inherit the manifest's sequence number if the reader reads | ||
| * null from manifest file. | ||
| */ | ||
| Long sequenceNumber(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if a seq number is available at manifest file level then we assume all datafiles in that manifestfile have the same seq number. But if there are datafile level seq numbers then should we be tracking any metadata at ManifestFile level? Like Min/Max seq number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, the logic is just the first step that if a seq number is not found in data_file, then we try to find it from manifestFile. It doesn't contain the logic about computing the min/max value. I would like to use separated fields to track the min/max sequence number in the next step.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
|
@openinx @prodeezy Thanks for your review. This doesn't contain min/max sequence number computing logic right now. The plan is to add the sequence number at first since that would need some essential changes and may break compatibility like we already raised for table format. Once the sequence number is in, we will consider the optimization including min/max sequence number and etc. |
|
@chenjunjiedada Like you mentioned how the Manifestfile::sequenceNumber and DataFile::sequenceNumber are related, can you document (maybe in a field comment) what their relationship is with Snapshot::sequenceNumber? is it used for higher level ordering or default values in case Manifest doesn't have a seuqnceNumber? Will Snapshots be sorted by sequence number? |
|
Close this because it is obsoleted. |
This is an initial commit, need some discussion and review.