Fix incorrect results when writing deletion vectors in Delta Lake #23231
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -327,7 +327,7 @@ public CompletableFuture<Collection<Slice>> finish() | |
| insertPageSink.finish().join().stream() | ||
| .map(Slice::getBytes) | ||
| .map(dataFileInfoCodec::fromJson) | ||
| - .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info))) | ||
| + .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info))) | ||
| .map(mergeResultJsonCodec::toJsonBytes) | ||
| .map(Slices::wrappedBuffer) | ||
| .forEach(fragments::add); | ||
|
|
@@ -345,7 +345,7 @@ public CompletableFuture<Collection<Slice>> finish() | |
| MoreFutures.getDone(cdfPageSink.finish()).stream() | ||
| .map(Slice::getBytes) | ||
| .map(dataFileInfoCodec::fromJson) | ||
| - .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info))) | ||
| + .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info))) | ||
| .map(mergeResultJsonCodec::toJsonBytes) | ||
| .map(Slices::wrappedBuffer) | ||
| .forEach(fragments::add); | ||
|
|
@@ -365,7 +365,7 @@ private Slice writeMergeResult(Slice path, FileDeletion deletion) | |
| ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); | ||
| long rowCount = parquetMetadata.getBlocks().stream().map(BlockMetadata::rowCount).mapToLong(Long::longValue).sum(); | ||
| RoaringBitmapArray rowsRetained = new RoaringBitmapArray(); | ||
| - rowsRetained.addRange(0, rowCount); | ||
| + rowsRetained.addRange(0, rowCount - 1); | ||
|
Member
Do we have a test that detects this issue?
Member
Author
The following assertion throws an exception without this change:
assertThat(getEntriesFromJson(3, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(1).getRemove().deletionVector().orElseThrow())
.isEqualTo(deletionVector);
(Not an ideal test, but I assume it's enough.)
Contributor
What happened before as a consequence of the range being too long?
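One way to picture the off-by-one discussed in this thread is the sketch below. It assumes that `addRange` treats both endpoints as inclusive, which the `rowCount - 1` fix suggests but this page does not confirm. It uses `java.util.BitSet` as a stand-in rather than the `RoaringBitmapArray` from the PR, and the class and method names (`RetainedRowsSketch`, `addRangeInclusive`, `retained`) are hypothetical.

```java
import java.util.BitSet;

public class RetainedRowsSketch
{
    // Hypothetical stand-in for a bitmap range insert that includes BOTH endpoints.
    // BitSet.set(from, to) excludes `to`, hence the + 1.
    static void addRangeInclusive(BitSet bits, int start, int end)
    {
        bits.set(start, end + 1);
    }

    // Returns the row positions still marked as retained after removing the deleted rows.
    // fixed == false mimics addRange(0, rowCount); fixed == true mimics addRange(0, rowCount - 1).
    static BitSet retained(int rowCount, BitSet deletedRows, boolean fixed)
    {
        BitSet rowsRetained = new BitSet();
        addRangeInclusive(rowsRetained, 0, fixed ? rowCount - 1 : rowCount);
        rowsRetained.andNot(deletedRows);
        return rowsRetained;
    }

    public static void main(String[] args)
    {
        int rowCount = 3;
        BitSet deletedRows = new BitSet();
        deletedRows.set(0, rowCount); // delete every real row: positions 0, 1, 2

        // With the too-long range, a position past the end of the file survives.
        System.out.println(retained(rowCount, deletedRows, false)); // prints {3}

        // With the corrected range, nothing is retained.
        System.out.println(retained(rowCount, deletedRows, true).isEmpty()); // prints true
    }
}
```

Under that assumption, a file whose rows are all deleted would still report one retained position with the old range, so an `isEmpty()` check like the one in the hunk above would not fire. Whether this is what actually happened in the connector is not stated in the thread.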
||
| rowsRetained.andNot(deletedRows); | ||
| if (rowsRetained.isEmpty()) { | ||
| // No rows are retained in the file, so we don't need to write deletion vectors. | ||
|
|
@@ -407,7 +407,7 @@ private Slice writeDeletionVector( | |
| deletion.partitionValues, | ||
| readStatistics(parquetMetadata, dataColumns, rowCount), | ||
| Optional.of(deletionVectorEntry)); | ||
| - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.of(newFileInfo)); | ||
| + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.empty(), Optional.of(newFileInfo)); | ||
| return utf8Slice(mergeResultJsonCodec.toJson(result)); | ||
| } | ||
| catch (Throwable e) { | ||
|
|
@@ -426,7 +426,8 @@ private Slice writeDeletionVector( | |
| private Slice onlySourceFile(String sourcePath, FileDeletion deletion) | ||
| { | ||
| String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath); | ||
| - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty()); | ||
| + DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath); | ||
|
Contributor
Follow-up: do we need any special handling for deletion vectors from shallowly cloned tables?
||
| + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty()); | ||
| return utf8Slice(mergeResultJsonCodec.toJson(result)); | ||
| } | ||
|
|
||
|
|
@@ -453,7 +454,7 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion) | |
|
|
||
| Optional<DataFileInfo> newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer); | ||
|
|
||
| - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), newFileInfo); | ||
| + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo); | ||
| return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result))); | ||
| } | ||
| catch (IOException e) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,17 +16,20 @@ | |
| import jakarta.annotation.Nullable; | ||
|
|
||
| import java.util.Map; | ||
| + import java.util.Optional; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public record RemoveFileEntry( | ||
| String path, | ||
| @Nullable Map<String, String> partitionValues, | ||
| long deletionTimestamp, | ||
| - boolean dataChange) | ||
| + boolean dataChange, | ||
| + Optional<DeletionVectorEntry> deletionVector) | ||
|
Contributor
Where does Delta Lake need
Contributor
Just noticed
||
| { | ||
| public RemoveFileEntry | ||
| { | ||
| requireNonNull(path, "path is null"); | ||
| + requireNonNull(deletionVector, "deletionVector is null"); | ||
| } | ||
| } | ||
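The two changes above (the `deletionVectors.get(sourceRelativePath)` lookup in `onlySourceFile` and the new `Optional` component on `RemoveFileEntry`) can be sketched together as follows. This is a simplified illustration, not the PR's code: `DeletionVector`, `RemoveEntry`, and `removeEntryFor` are hypothetical stand-ins with fewer fields than the real types.

```java
import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class RemoveEntrySketch
{
    // Hypothetical, reduced stand-in for the PR's DeletionVectorEntry
    record DeletionVector(String id, long cardinality) {}

    // Hypothetical, reduced stand-in for RemoveFileEntry with the new Optional component
    record RemoveEntry(String path, Optional<DeletionVector> deletionVector)
    {
        RemoveEntry
        {
            requireNonNull(path, "path is null");
            // The Optional itself must be non-null; absence is expressed as Optional.empty()
            requireNonNull(deletionVector, "deletionVector is null");
        }
    }

    // Builds the remove entry for a source file, carrying over the deletion vector
    // already attached to that file if one is known, and Optional.empty() otherwise.
    static RemoveEntry removeEntryFor(String relativePath, Map<String, DeletionVector> existingVectors)
    {
        return new RemoveEntry(relativePath, Optional.ofNullable(existingVectors.get(relativePath)));
    }

    public static void main(String[] args)
    {
        Map<String, DeletionVector> existingVectors = Map.of("part-0.parquet", new DeletionVector("dv-1", 2));

        System.out.println(removeEntryFor("part-0.parquet", existingVectors).deletionVector().isPresent()); // prints true
        System.out.println(removeEntryFor("part-1.parquet", existingVectors).deletionVector().isPresent()); // prints false
    }
}
```

`Optional.ofNullable` keeps the map's "missing key returns null" behaviour from leaking into the record, which is why the compact constructor can insist on a non-null `Optional`.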