-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Add delete marker metadata column #2538
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Add delete marker metadata column #2538
Conversation
|
This is a separate part from #2372. cc @openinx @rdblue @aokolnychyi @RussellSpitzer @yyanyy |
|
To be clear, this PR is to enable reading of data while optionally marking rows as deleted rather than actually filtering them out so the delete information can be used in other utilities to rewrite delete files into more compact forms? |
|
@RussellSpitzer you are right! With a delete marker column, we can also simplify the current filtering logic a bit. |
|
|
||
| public static void assertTableRecords(Table table, List<Record> expected) throws IOException { | ||
| table.refresh(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this change? is it because two sets don't match after we add the additional metadata column? but wouldn't _pos and _file also have this issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the metadata column projection logic produces additional columns even when the requestedSchema doesn't contain them and that is why we use StructLikeSet. The _pos column shows up only when positional deletes exist, the _deleted marker shows up when any of the deletes exist.
The failed unit test contains only equality delete which produces only _deleted column, so it failed with HashMultiSet comparison. But when the unit test, for example TestIcebergFilesCommitter.TestCommitTwoCheckpointsInSingleTxn, contains a positional delete, the unit test fails as well due to it has the additional column _pos. The following patch for the unit test could test it.
- DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete3));
+ DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory,
+ "pos-delete-file-1",
+ ImmutableList.of(Pair.of(dataFile1.path(), 3L)));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also find this suspicious. Is the extra column in the expected records or the table? I don't think that this PR should change the data produced by IcebergGenerics.read(table).build().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in expected records. The extra column is added in DeleteFileter#fileProjection.
private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
...
// We add it to requiredIds, so that it exists in missingIds when requestedSchema doesn't contain it.
if (!posDeletes.isEmpty()) {
requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
}
....
// We append it at the end anyway.
if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
columns.add(MetadataColumns.ROW_POSITION);
}
return new Schema(columns);
}
83356a5 to
3e284b2
Compare
|
cc @jackye1995 as well. I added a couple of unit tests for this, more unit tests could be added in the deletes reader PR. @RussellSpitzer @yyanyy @rdblue @openinx, could you please help to take a look when you have time? |
|
overall looks good to me, I left some discussion comments in the original PR. |
|
Thanks @jackye1995 ! |
| Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored"); | ||
| public static final NestedField ROW_POSITION = NestedField.required( | ||
| Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file"); | ||
| public static final NestedField DELETE_MARK = NestedField.required( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of DELETE_MARK, how about IS_DELETED? I don't think that "mark" is clear enough to describe what this is. Similarly, I think the docs should be "Whether the row has been deleted". There's no need to include "delete mark" because that's identifying something that is not defined (this column is _deleted and "mark" is not introduced), and "or not" is unnecessary because it is implied by "whether".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| } | ||
|
|
||
| protected StructLikeSet rowSetWitIds(int... idsToRetain) { | ||
| protected StructLikeSet rowSetWithIds(int... idsToRetain) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you fix this in a separate PR? This file doesn't need to change and it could cause commit conflicts.
| } | ||
| } | ||
|
|
||
| static class DeleteMarkerReader implements ParquetValueReader<Boolean> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't there a constant reader that we can reuse?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could use the existing constant reader.
| private final long batchOffsetInFile; | ||
|
|
||
| RowPostitionColumnVector(long batchOffsetInFile) { | ||
| RowPositionColumnVector(long batchOffsetInFile) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we fix typos in a separate PR? This is already touching quite a few files and this file doesn't need to change.
| } | ||
| } | ||
|
|
||
| private static class DeleteMarkReader implements OrcValueReader<Boolean> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a constant reader that could be reused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use the existing constant reader from parquet and orc readers. I also created a new constant reader for Avro reader.
ef12b5f to
4f76b4a
Compare
4f76b4a to
bf7d7ad
Compare
|
Thanks @rdblue for reviewing! I addressed your comments. |
|
@rdblue Is this ready to merge? The delete rewrites may depend on this. |
|
Thanks, @chenjunjiedada. I'll take a look. |
| // track where the _pos field is located for setRowPositionSupplier | ||
| this.posField = pos; | ||
| } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { | ||
| this.readers[pos] = new ConstantReader<>(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Constants are already handled by this class, see the first branch of this if/else logic. I think that it would make more sense to reuse that rather than create a new constant reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
@chenjunjiedada, the constant handling in Avro appears to be the only remaining issue. Once you update that, I'll merge this. Thanks for working on it! |
| } | ||
| } | ||
|
|
||
| static class ConstantReader<C> implements ValueReader<C> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chenjunjiedada, since this should no longer be needed, can you please remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure,I should remove this first to track all its usage. I believe now no one is using it.
| // track where the _pos field is located for setRowPositionSupplier | ||
| this.posField = pos; | ||
| } else if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.IS_DELETED.fieldId()) { | ||
| this.readers[pos] = new ConstantReader<>(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you convert this to use positions and constants as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, my brain was not working well this early morning... Let me take a coffee first.
|
@rdblue , The last comment was addressed, could we merge this? |
|
Thanks for pinging me, @chenjunjiedada! This looks good now, I'll merge it. |
|
Thanks for the reviewing and merging! @rdblue @jackye1995 @yyanyy ! |
This adds a metadata column to indicate whether a row is deleted or not. A delete marker column can be used when finding the deleted rows as we discussed in #2372, it can also be used to simplify the overall merge on read process. In order to avoid overhead, the delete marker metadata column only projected when the delete files exist.