Spark: Merge new position deletes with old deletes during writing #11273
Conversation
new BasePositionDeltaWriter<>(
    newDataWriter(table, writerFactory, dataFileFactory, context),
    newDeleteWriter(table, writerFactory, deleteFileFactory, context));
    newDeleteWriter(
We may want to put this behind a Spark conf that defaults to loading the previous deletes and doing the merging in the write. Maybe it's better to handle such a conf when building the mapping: if the conf is false, we could return an empty map or skip the broadcast entirely.
That way, if users hit some unforeseen issue with merging, they can disable the behavior dynamically.
I didn't want to introduce more configuration knobs initially, to keep it simple, but I can see the argument that it's worth it just to have a lever for unforeseen issues with performing minor compactions as part of writes.
Some of the tests in TestRewritePositionDeleteFilesProcedure are failing as expected after this change: those tests assert on the delete files that exist after a set of delete operations, but now that maintenance happens as part of the write, the number of delete files is expectedly reduced.
If we decide to add a conf, the tests can set it accordingly; otherwise I'll go ahead and update the tests. At the moment I'm still leaning towards adding a conf for the reasons mentioned earlier, but I'll get others' input.
I went ahead with adding the configuration and table property similar to delete file granularity.
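For illustration, here is a minimal sketch of the precedence such a knob typically follows (write option over table property over default). This is not the PR's code; the option and property names are taken from the PR description at the bottom of this page and may not match the merged change.

import java.util.Map;
import java.util.Optional;

// Illustrative sketch of write-option > table-property > default resolution.
// Names come from the PR description and are assumptions, not the merged code.
final class MaintainDeletesFlag {
  static final String WRITE_OPTION = "maintain-position-deletes";
  static final String TABLE_PROPERTY = "write.delete.maintain-during-write";
  static final boolean DEFAULT_VALUE = true;

  static boolean resolve(Map<String, String> writeOptions, Map<String, String> tableProperties) {
    return Optional.ofNullable(writeOptions.get(WRITE_OPTION))
        .or(() -> Optional.ofNullable(tableProperties.get(TABLE_PROPERTY)))
        .map(Boolean::parseBoolean)
        .orElse(DEFAULT_VALUE);
  }
}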
aokolnychyi left a comment:
Looks really promising!
public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16;

public static final String MAINTAIN_POSITION_DELETES_DURING_WRITE =
I am not sure we need this configuration. It is pretty clear we want to always maintain position deletes. We will also not support this property in V3. Given that we want to switch to file-scoped position deletes in V2 tables by default, I think we should just always maintain deletes if the granularity is file.
In other words, switching write.delete.granularity to file should trigger maintenance.
If set to partition, skip it as we can't do it safely.
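As a concrete illustration of the toggle being discussed, switching an existing v2 table to file-scoped deletes is a single property change. This is a hedged sketch; the table name is a placeholder.

import org.apache.spark.sql.SparkSession;

public class SwitchToFileScopedDeletes {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("switch-delete-granularity").getOrCreate();

    // Per the discussion above, file granularity is the mode under which delete maintenance
    // during writes can be done safely; partition granularity would skip it.
    spark.sql(
        "ALTER TABLE catalog.db.events SET TBLPROPERTIES ("
            + "'write.delete.mode'='merge-on-read', "
            + "'write.delete.granularity'='file')");

    spark.stop();
  }
}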
Re: switching write.delete.granularity to file
[doubt] Are any other writers apart from Spark respecting this property? Are other writers also going to respect this property going forward, and if yes, how?
This is up to the engine in V2. We are currently discussing on the dev list whether to make it a requirement for V3.
  }
}

protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {
Can we put an estimate on the size of the hash map? If it grows very large it can fail the query; in that case, should we let the query fail?
This is a good point. We need to check what the actual limit on the object size is and how Spark would behave.
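A rough back-of-envelope calculation can frame the sizing question; every number below is an assumption chosen for illustration, not a measurement from this PR.

// Illustrative sizing only: all constants here are assumptions, not measurements.
public class BroadcastSizeEstimate {
  public static void main(String[] args) {
    long dataFilesWithDeletes = 1_000_000L; // data files that have file-scoped deletes
    long avgPathBytes = 200L;               // average UTF-8 length of a data file path (map key)
    long deletesPerDataFile = 2L;           // delete files referencing each data file
    long bytesPerDeleteEntry = 500L;        // serialized DeleteFile metadata + set/map overhead

    long approxBytes =
        dataFilesWithDeletes * (avgPathBytes + deletesPerDataFile * bytesPerDeleteEntry);
    System.out.printf("approx broadcast payload: %.2f GiB%n", approxBytes / (1024.0 * 1024 * 1024));
  }
}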
nastra left a comment:
Overall this LGTM, just left a few nits.
  return location != null ? location.toString() : null;
}

public static boolean isFileScopedDelete(DeleteFile deleteFile) {
nit: might be worth updating core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java, line 174 in c8fe01e:
private boolean isFileScoped(DeleteFile deleteFile) {
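For reference, a shared file-scoped check along the lines the nit suggests could look like the sketch below. It is illustrative only and assumes a ContentFileUtil.referencedDataFile-style helper rather than quoting the PR's actual utility.

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.util.ContentFileUtil;

// Sketch of a shared check: a position delete is "file scoped" when its metadata pins it
// to exactly one referenced data file. Not the PR's code; helper names are assumptions.
final class FileScopedDeletes {
  private FileScopedDeletes() {}

  static boolean isFileScoped(DeleteFile deleteFile) {
    return deleteFile.content() == FileContent.POSITION_DELETES
        && ContentFileUtil.referencedDataFile(deleteFile) != null;
  }
}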
| String partitionStmt = "PARTITIONED BY (id)"; | ||
| sql( | ||
| "CREATE TABLE %s (id int, data string) USING iceberg %s TBLPROPERTIES" | ||
| + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.delete.granularity'='file')", |
nit: in checkDeleteFileGranularity() we're using TableProperties rather than plain strings, so I think it would be good to do the same here too
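A sketch of what the nit suggests: build the TBLPROPERTIES clause from constants instead of raw strings. This assumes DeleteGranularity.FILE renders as "file" and is not the test's actual code.

import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;

// Sketch only: use TableProperties constants for the keys rather than hard-coded strings.
final class TestTableProps {
  private TestTableProps() {}

  static String fileGranularityProps() {
    return String.format(
        "'%s'='2', '%s'='merge-on-read', '%s'='%s'",
        TableProperties.FORMAT_VERSION,
        TableProperties.DELETE_MODE,
        TableProperties.DELETE_GRANULARITY,
        DeleteGranularity.FILE);
  }
}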
aokolnychyi left a comment:
This seems almost ready to me.
Like @singhpk234 mentioned, we need to verify Spark can handle this logic if the map gets large. The table state that we currently send to executors is larger than this map but we need to look into how this particular broadcast is handled. If it is split into chunks by the torrent broadcast, that should probably be OK.
@amogh-jahagirdar, could you check whether it is going to be safe?
@Override
public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
  Table table = tableBroadcast.value();
  Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
What about a helper method, used directly in the statements below?
private Map<String, DeleteFileSet> rewritableDeletes() {
  return rewritableDeletesBroadcast != null ? rewritableDeletesBroadcast.value() : null;
}
Good suggestion, done! Note that since rewritable deletes can now be null in the writer, I explicitly pass in a previous-delete loader that is a function path -> null. Previously I was passing in an empty map so the lookup would return null, but I think your suggestion is better because we can avoid doing a lookup in the map entirely.
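A minimal sketch of the resulting shape, with illustrative names (the real fields and methods live in SparkPositionDeltaWrite): the lookup falls back to a loader that always answers "no previous deletes" when there is nothing to rewrite.

import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.util.DeleteFileSet;

// Sketch only; field and method names are illustrative, not the PR's exact code.
class RewritableDeletes {
  private final Map<String, DeleteFileSet> rewritableDeletes; // null when maintenance is disabled

  RewritableDeletes(Map<String, DeleteFileSet> rewritableDeletes) {
    this.rewritableDeletes = rewritableDeletes;
  }

  // Loader handed to the delete writer: data file path -> previously written file-scoped deletes.
  Function<String, DeleteFileSet> previousDeleteLoader() {
    return rewritableDeletes != null ? rewritableDeletes::get : path -> null;
  }
}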
| sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget())); | ||
| } | ||
|
|
||
| private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) { |
I understand this refactors common logic, but it makes the tests harder to read. For instance, the reader has no indication that we add 4 batches in the init method.
aokolnychyi left a comment:
LGTM. I left some minor comments/suggestions. We will revisit and benchmark this prior to 1.8.
LGTM as well, thanks @amogh-jahagirdar!
I am assuming that since we went ahead with the broadcast approach, the data is sent chunk by chunk via torrent broadcast as @aokolnychyi mentioned, so OOMs are not a problem? I was mostly coming from the fact that for broadcast hash joins Spark enforces an 8 GB limit and fails the query if anything larger is observed.
I collected some data points on memory consumption of the broadcast here: https://docs.google.com/document/d/1yUObq45kBIwyofJYhurcQrpsdXQJC6NFIPBJugiZNWI/edit?tab=t.0. Torrent broadcast is performed in a chunked way, but that doesn't mean OOMs are impossible. The TL;DR is that in most environments we would have to be at pretty large scale (multiple millions of data files) with a very large multiple of deletes per data file for OOMs to be hit, and running position delete maintenance to shrink that ratio plus increasing memory should be a practical enough solution. As maintenance of position deletes runs, the ratio of data files to delete files becomes closer to 1:1. In V3, this will actually be a requirement.
I looked at more distributed approaches to compute this (changing the Spark APIs to pass historical deletes only for a particular executor), but there are limitations there. One thing I'm looking into further is how the Spark Delta DV integration handles this; we can perhaps take some inspiration from that, but I don't think there's any need to wait for all of that.
There are relatively simple things we can do to limit the size of the global map: one is removing any metadata that executors don't need, for example the referenced manifest locations per delete file (those are only needed in the driver for more efficient commits), and another is relativizing the paths in memory. That should shrink the total amount of memory used by the paths in the in-memory structure, and it has more of an impact the longer the file path before the actual data/delete file name.
Edit: One other aspect I looked into is using Spark's
My plan is to work towards having the "simple things we can do" in the 1.8 release, so that we further reduce the chance of OOMs in large-scale cases. The long-term plan is to look at how Spark + Delta DVs handle this and, if it makes sense for us, incorporate that strategy here.
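To make the "relativizing the paths" idea concrete, here is a rough sketch under the assumption that most data and delete file paths share the table location as a prefix. It is illustrative only, not code from this PR.

// Sketch: store paths relative to the table location on the driver, resolve them back on executors.
final class PathRelativizer {
  private final String tableLocation; // e.g. "s3://bucket/warehouse/db/tbl/" (placeholder)

  PathRelativizer(String tableLocation) {
    this.tableLocation = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
  }

  String relativize(String absolutePath) {
    return absolutePath.startsWith(tableLocation)
        ? absolutePath.substring(tableLocation.length())
        : absolutePath; // fall back to the absolute path if it lives outside the table location
  }

  String resolve(String maybeRelativePath) {
    // Crude heuristic for the sketch: anything with a scheme is treated as already absolute.
    return maybeRelativePath.contains("://") ? maybeRelativePath : tableLocation + maybeRelativePath;
  }
}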
That 8 GB limit is specific to broadcast joins; there's no such limit enforced by Spark itself for arbitrary broadcasts (though of course there are system limitations that can be hit).
I'll go ahead and merge, thanks for reviewing @singhpk234 @aokolnychyi. As discussed above, I will work towards shrinking the memory consumption of the broadcast before the 1.8 release!
This change consumes the updated fanout position delete writers from #11222 to maintain position deletes during writes (minor compaction). The mapping of data files to file-scoped deletes is broadcast to executors, where delete file writers merge the historical position deletes with the new position deletes. This behavior is behind a Spark conf, maintain-position-deletes, and can also be controlled via the write.delete.maintain-during-write table property. By default, maintenance during write is enabled.
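As a hedged usage example (property name taken from the description above; the table name is a placeholder and the knob may change during review), disabling the behavior for a single table would look like:

import org.apache.spark.sql.SparkSession;

public class DisableDeleteMaintenance {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("disable-delete-maintenance").getOrCreate();

    // Turn off maintenance-on-write for one table via its table properties.
    spark.sql(
        "ALTER TABLE catalog.db.events SET TBLPROPERTIES "
            + "('write.delete.maintain-during-write'='false')");

    spark.stop();
  }
}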
ToDo: UpdateProjectionBenchmark may need some changes to use file granularity for MoR cases, so that we can look at the impact of this change before/after. The benchmark should probably be run after #11131 gets in.