Dedup files list generated in BaseSparkAction #2452
```diff
@@ -159,7 +159,7 @@ protected Dataset<Row> buildValidDataFileDF(Table table) {
         .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
         .as(Encoders.bean(ManifestFileBean.class));

-    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");
+    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path").distinct();
   }

   protected Dataset<Row> buildManifestFileDF(Table table) {
@@ -173,7 +173,7 @@ protected Dataset<Row> buildManifestListDF(Table table) {

   protected Dataset<Row> buildOtherMetadataFileDF(TableOperations ops) {
     List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
-    return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
+    return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path").distinct();
```
**Contributor (@aokolnychyi):** Instead of doing a shuffle here, I think we should refine the approach we use to build the list of JSON files. I think what happens now is that we take the previous 100 version files from every version file and add them to the list, even though each new version file has only one different entry. Will tables with 2000 snapshots and 100 previous metadata files produce a list with 200,000 elements?

**Contributor (Author):** @aokolnychyi This current code does not do a recursive listing of all the version metadata JSON files.
```diff
   }

   protected Dataset<Row> buildValidMetadataFileDF(Table table, TableOperations ops) {
```
Deduplicating in this place probably makes sense to me. We can consider exposing an argument to make the dedup step optional (I am not sure it is a good idea, but I want to think this through together). The dedup step we are adding is going to trigger a shuffle. Technically, we are fine in the existing expire snapshots action as it does the dedup in `except`. The question is what kind of performance impact deduplicating here will have. We only have duplicates if multiple manifests reference the same files. In the `rewrite_manifests` procedure, we rewrite all manifests, meaning we produce a snapshot with new manifests whose entries are old data files. Also, there are updates and deletes that may rewrite manifests. I think deduplicating here makes sense in all cases.

Thoughts, @rdblue @flyrain @RussellSpitzer @karuppayya?
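For reference, here is a minimal sketch (not the actual action code; the class, method, and parameter names are illustrative) of why `except` already gives the expire-snapshots path a deduplicated result: Spark's `except` is a distinct set difference, so its output contains no duplicates even when both inputs do.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ExceptDedupSketch {
  // Hypothetical helper: except() behaves like EXCEPT DISTINCT in SQL, so the
  // files-to-delete result is already duplicate-free without any explicit
  // distinct()/dropDuplicates() call upstream.
  static Dataset<Row> filesToDelete(Dataset<Row> filesBeforeExpiration,
                                    Dataset<Row> filesAfterExpiration) {
    return filesBeforeExpiration.except(filesAfterExpiration);
  }
}
```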
I left my thoughts on the other thread, but I think deduping shouldn't happen here, because we would end up with multiple shuffle stages in expire snapshots instead of just the one. An API that wants distinct results can always call `distinct()` on the return value.
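One way to see the extra shuffle stages being discussed is to compare physical plans. This is a hedged sketch (the class and method are made up for illustration); each additional Exchange operator in the output corresponds to an additional shuffle:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ShuffleCountSketch {
  // Print both physical plans and count the Exchange operators: the
  // builder-side distinct() adds one on top of the shuffle that the
  // downstream except()/join in the action already needs.
  static void comparePlans(Dataset<Row> validFileDF) {
    validFileDF.explain();             // plan as built without the dedup
    validFileDF.distinct().explain();  // same plan plus an extra Exchange
  }
}
```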
@aokolnychyi When building the data files, we do a `dropDuplicates`, which takes care of the deduping currently. The duplicates come from collecting manifest files: I think different snapshots reference the same manifest files, and hence we get duplicates for the manifests. We could do a `dropDuplicates`/`distinct` while collecting manifest files in `BaseSparkAction`, but like @RussellSpitzer suggested, this would affect all actions with an additional shuffle. We could leave it to the caller to decide the behaviour.
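A sketch of the "dedup while collecting manifest files" option, reusing the identifiers from the diff above (`allManifests`, `ReadManifest`, `ioBroadcast`) and assuming `ManifestFileBean` exposes a `path` column; this is only an illustration of the idea, not the proposed patch:

```java
// Drop duplicate manifest entries by path before the flatMap so each manifest
// file is opened and read only once; the cost is a shuffle over the relatively
// small manifest list rather than over every data file path it produces.
Dataset<Row> dataFiles = allManifests
    .dropDuplicates("path")
    .flatMap(new ReadManifest(ioBroadcast), Encoders.STRING())
    .toDF("file_path");
```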
What's the downside of an additional shuffle? It should be OK here if its overhead is acceptable.
After thinking more about this, I think I agree with @RussellSpitzer. Even if, say, we loaded 4 million files of which 1 million are unique and 40,000 manifests of which only 1,000 are unique, doing two extra shuffles to find the unique data files and manifests is probably going to be more expensive than doing one larger shuffle.

It is important that we dedup manifests before reading valid data files (which the existing code does), so we should probably handle this on the caller side. Make sure we do a projection before deduplicating.

That being said, I think we should refine the logic in `getOtherMetadataFilePaths`.
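A sketch of the "projection before deduplicating" point for a caller-side dedup, assuming the caller starts from a DF that carries more columns than it needs (the class and column names are illustrative):

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class CallerSideDedupSketch {
  // Hypothetical caller-side dedup: select only the file_path column before
  // distinct(), so the shuffle moves a single string column instead of the
  // full rows that were loaded.
  static Dataset<Row> uniqueFilePaths(Dataset<Row> fileDF) {
    return fileDF.select(col("file_path")).distinct();
  }
}
```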
@karuppayya, could you check if my assumption here is valid?
Yes, the version metadata files will require deduping.
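Given that confirmation, one possible shape for the `getOtherMetadataFilePaths` refinement is to dedup the driver-side list before the Dataset is created, which removes the duplicates without adding any Spark shuffle. This is a hedged sketch of an alternative method body, not the agreed-upon fix:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Hypothetical alternative body for buildOtherMetadataFileDF: the version
// metadata paths are already materialized as a List<String> on the driver,
// so a LinkedHashSet removes duplicates (preserving order) before the Dataset
// is built, and no distinct() shuffle is needed for this DF at all.
protected Dataset<Row> buildOtherMetadataFileDF(TableOperations ops) {
  List<String> otherMetadataFiles =
      new ArrayList<>(new LinkedHashSet<>(getOtherMetadataFilePaths(ops)));
  return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
}
```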