Dedup files list generated in BaseSparkAction #2452
Conversation
protected Dataset<Row> buildOtherMetadataFileDF(TableOperations ops) {
  List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
-  return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
+  return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path").distinct();
Instead of doing a shuffle here, I think we should refine the approach we use to build the list of JSON files. What happens now, I believe, is that we take the 100 previous metadata files listed in every version file and add all of them to the list, even though each new version file adds only one new entry. Will a table with 2000 snapshots and 100 previous metadata files per version produce a list with 200,000 elements?
@aokolnychyi The current code does not do a recursive listing of all the version metadata JSON files.
I have made changes to #2415 to do a recursive listing and also dedupe the files as part of that change.
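For reference, a minimal sketch of what a driver-side dedup could look like, assuming the list of metadata JSON paths is collected on the driver as in the snippet above (the class name is made up; getOtherMetadataFilePaths supplying the input list and the spark field are taken from the surrounding code):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class OtherMetadataFileSketch {
  private final SparkSession spark;

  OtherMetadataFileSketch(SparkSession spark) {
    this.spark = spark;
  }

  // Dedup the small, driver-side list of metadata JSON paths before handing
  // it to Spark, so no extra shuffle is needed for these files.
  Dataset<Row> buildOtherMetadataFileDF(List<String> otherMetadataFiles) {
    List<String> uniquePaths = new ArrayList<>(new LinkedHashSet<>(otherMetadataFiles));
    return spark.createDataset(uniquePaths, Encoders.STRING()).toDF("file_path");
  }
}

Because the previous-metadata list already lives on the driver, this keeps the dedup for these files off the cluster entirely.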
    .as(Encoders.bean(ManifestFileBean.class));

-  return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");
+  return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path").distinct();
Deduplicating in this place probably makes sense to me. We could consider exposing an argument to make the dedup step optional (I am not sure that is a good idea, but I want to think it through together). The dedup step we are adding is going to trigger a shuffle. Technically, we are fine in the existing expire snapshots action, as it does the dedup in except.
The question is what kind of performance impact deduplicating here will have. We only have duplicates if multiple manifests reference the same files. In the rewrite_manifests procedure we rewrite all manifests, meaning we produce a snapshot whose new manifests contain entries for old data files. Updates and deletes may also rewrite manifests.
I think deduplicating here makes sense in all cases.
Thoughts, @rdblue @flyrain @RussellSpitzer @karuppayya?
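If the optional-argument route were taken, it could be as small as a flag applied at the end of the builder; a rough sketch, with maybeDistinct and the flag name being hypothetical:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class DedupOption {
  private DedupOption() {
  }

  // distinct() triggers a shuffle, so only apply it when the caller asks for it.
  static Dataset<Row> maybeDistinct(Dataset<Row> filePathDF, boolean dedup) {
    return dedup ? filePathDF.distinct() : filePathDF;
  }
}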
I left my thoughts on the other PR, but I think deduping shouldn't happen here because we would end up with multiple shuffle stages in expire snapshots instead of just one. An API that wants distinct results can always call distinct on the return value.
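A minimal sketch of that caller-side option (the class and method names are made up; filePathDF stands for whatever a BaseSparkAction builder returns):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class CallerSideDedup {
  // The action keeps returning the raw listing; a caller that needs unique
  // paths opts into the extra shuffle itself.
  Dataset<Row> uniqueFilePaths(Dataset<Row> filePathDF) {
    return filePathDF.distinct();
  }
}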
@aokolnychyi When building the data files, we do a dropDuplicates, which takes care of the deduping currently.
We get duplicates from the following method:
protected Dataset<Row> buildManifestFileDF(Table table) {
  return loadMetadataTable(table, ALL_MANIFESTS).selectExpr("path as file_path");
}
I think different snapshots reference the same manifest files, hence the duplicate manifest entries.
We could do dropDuplicates/distinct while collecting manifest files in BaseSparkAction, but as @RussellSpitzer suggested, this would burden all actions with an additional shuffle. We could leave it to the caller to decide the behaviour.
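For what it's worth, the dedup-in-BaseSparkAction option mentioned above would look roughly like this (a sketch, not the actual change; it reuses the selectExpr projection from the method quoted above):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ManifestDedupSketch {
  // all_manifests lists a manifest once per snapshot that references it, so
  // project down to the path column and then drop the duplicate rows.
  Dataset<Row> uniqueManifestPaths(Dataset<Row> allManifestsDF) {
    return allManifestsDF.selectExpr("path as file_path").distinct();
  }
}

Projecting to the single path column first keeps the shuffle small; the cost concern is that every action built on BaseSparkAction would pay for it.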
What's the downside of the additional shuffle? It should be OK here if its overhead is acceptable.
After thinking more about this, I think I agree with @RussellSpitzer. Say we loaded 4 million files of which 1 million are unique, and 40,000 manifests of which only 1,000 are unique. Doing 2 extra shuffles to find the unique data files and manifests is probably going to be more expensive than doing one larger shuffle.
It is important that we dedup manifests before reading the valid data files (which the existing code does), so we should probably handle this on the caller side. Make sure we do a projection before deduplicating.
That being said, I think we should refine the logic in getOtherMetadataFilePaths.
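A loose illustration of the shuffle trade-off being described (hypothetical method names, not the actual action code): except() has set semantics, so the single set-difference the action already performs absorbs duplicates, while per-builder distinct() calls each add their own shuffle stage on top of it.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ExpireShuffleSketch {
  // Existing shape: one set-difference at the end; the output of except()
  // is already distinct, so duplicates in either input are absorbed here.
  Dataset<Row> expiredFiles(Dataset<Row> allFiles, Dataset<Row> validFiles) {
    return allFiles.except(validFiles);
  }

  // With per-builder dedup: each distinct() is an extra shuffle on top of
  // the except() above.
  Dataset<Row> expiredFilesWithEarlyDedup(Dataset<Row> allFiles, Dataset<Row> validFiles) {
    return allFiles.distinct().except(validFiles.distinct());
  }
}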
@karuppayya, could you check if my assumption here is valid?
Yes, the version metadata files will require deduping.
From the discussions above, pushing the responsibility for deduplicating the file names to the caller seems to be the better approach.