[SPARK-29953][SS] Don't clean up source files for FileStreamSource if the files belong to the output of FileStreamSink #26590
Conversation
cc. @zsxwing Please also review the assumption here;
I can address it if we would like to consider the case where the metadata log is placed under a subdirectory of the glob path (like
    val fileSystem: FileSystem,
    val sourcePath: Path) extends Logging {

  private val srcPathToContainFileStreamSinkMetadata = new mutable.HashMap[Path, Boolean]
This is a cache storing the result of checking whether the dir contains the metadata dir or not, as we may not want to do the check per batch. It is based on the assumption that a directory won't change from having metadata to not having metadata, or vice versa; please let me know if the assumption doesn't sound safe, and I'll remove the cache and check per batch.
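For reference, here is a minimal sketch of what the cached check boils down to; the class wrapper and method name are illustrative (not the PR's actual code), and it relies on FileStreamSink.metadataDir, the "_spark_metadata" directory name:

```scala
import scala.collection.mutable

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.execution.streaming.FileStreamSink

// Illustrative helper: remember per source directory whether it contains a
// _spark_metadata dir, so the filesystem is probed at most once per directory
// instead of on every batch. Assumes the answer never flips for a directory.
class SinkMetadataCheckCache(fileSystem: FileSystem) {
  private val srcPathToContainFileStreamSinkMetadata = new mutable.HashMap[Path, Boolean]

  def containsSinkMetadata(srcPath: Path): Boolean =
    srcPathToContainFileStreamSinkMetadata.getOrElseUpdate(srcPath,
      fileSystem.exists(new Path(srcPath, FileStreamSink.metadataDir)))
}
```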
Test build #114057 has finished for PR 26590 at commit
Test build #114065 has finished for PR 26590 at commit
Retest this, please
Test build #114092 has finished for PR 26590 at commit
As a user I may have a directory structured where
  srcPathToEntries.filterKeys { srcPath =>
    srcPathToContainFileStreamSinkMetadata.get(srcPath) match {
      case Some(v) => !v
+1 on caching since we spare quite a bit of time.
Not yet sure whether it's forbidden to set an existing directory as a sink (I haven't found any explicit statement)?
If it's allowed, the cache would contain false because the /a/b/c/d file was found but no _spark_metadata. All of a sudden a sink query is started on /a/b/c, which makes the cached value invalid.
Yeah, that's the reason I asked for more voices - the value of the cache can become invalid at any time, but without the cache we have to check every time, which is resource-inefficient. Maybe I'd even be OK with not using a cache given we'll do it in the background, but I want to check whether it's only me.
Yeah, that's ideal, though then we'd have to check all subdirectories, and given that their status of having metadata or not could change, we would end up checking all subdirectories of source files per batch. We might optimize the logic to only check each directory "once" per batch (regardless of the number of source files), but I'm still not 100% sure it's lightweight enough.
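To make the "check each directory once per batch" idea concrete, here is a rough sketch; the function and parameter names are hypothetical and not part of the PR:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.execution.streaming.FileStreamSink

// Deduplicate the parent directories of this batch's source files and probe each
// directory for a _spark_metadata dir exactly once, instead of once per file.
def dirsSafeToClean(fs: FileSystem, batchFiles: Seq[Path]): Set[Path] = {
  val distinctDirs = batchFiles.map(_.getParent).distinct
  distinctDirs.filterNot { dir =>
    // a directory carrying sink metadata must not have its files cleaned up
    fs.exists(new Path(dir, FileStreamSink.metadataDir))
  }.toSet
}
```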
@HeartSaVioR I think we can simply detect whether we are using MetadataLogFileIndex (spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala, line 205 in ba2bc4b).
We don't need to do such a complicated check because, for the cases you are checking, we won't go through
@zsxwing Btw, actually @gaborgsomogyi and I considered edge-cases where the query reads
So I feel we still have to make a decision with consideration of the possible side-effects: 1) try our best to prevent all known cases with (high?) costs, or 2) consider these edge-cases as bad input and not care at all (maybe we could document it instead). What do you think?
@HeartSaVioR Checking all the files in all the directories in each micro-batch is definitely overkill.
Only the last one is questionable as to what to do. Considering the possible solution complexity (globbing through the whole tree to find metadata), we can document this as
+1. I think the fundamental issue is that the FileIndex interface doesn't work for complicated things. There are multiple issues here. Another example: if a user is using a glob path in
Ideally, the defending code should be added when doing the file listing if we would like to prevent such cases, because it can also prevent reading incorrect files. However, I think that's a pretty large change and probably not worth it (I have not yet figured out how to make Hadoop's glob pattern codes understand
Hence I suggest we just block the
…f the source path refers to the output dir of FileStreamSink (force-pushed from 82b6c18 to f9dc1a4)
  validFileEntities.foreach(cleaner.clean)

  case _ =>
    logWarning("Ignoring 'cleanSource' option since Spark hasn't figured out whether " +
I just put logWarning here - I was about to throw IllegalStateException, since it doesn't sound feasible to have some files from commit() while FileStreamSource still cannot decide, but there might be some edge-case, so I avoided being aggressive here.
How about throwing an UnsupportedOperationException here:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
Line 205 in ba2bc4b
  new MetadataLogFileIndex(sparkSession, qualifiedBasePath,
The only "odd" case I can imagine to reach here is,
- the query executed which wrote the commit log of the last batch and stopped before writing the offset for next batch.
- the query is restarted, and
constructNextBatchis called. - somehow the source files are all deleted between 1) and 2), hence FileStreamSource doesn't see any file and cannot decide when
fetchAllFilesis called. constructNextBatchwill callcommitfor previous batch the query executed before.
It's obviously very odd case as the content of source directory are modified (maybe) manually which we don't support the case (so throwing exception would be OK), but I'm not fully sure there's no another edge-cases.
Btw, where do you recommend to add the exception? L287, or L205? If you're suggesting to add the exception in L205, I'm not sure I follow. If I'm understanding correctly, the case if the logic reaches case _ won't reach L205.
Also, I don't yet see which place the suggestion refers to.
"L205, I'm not sure I follow" - +1
L287: As I see it, this is more or less the "should never happen" case. The question is whether we can come up with edge cases which may hit this. If we miss a valid case and throw an exception here, we may block a query from starting.
Regarding "3. somehow the source files are all deleted between 1) and 2)" - this should be a user error.
My general point is that we should make sure the data files and the metadata in _spark_metadata are consistent, and we should prevent cleaning up data files that are still tracked. Logging a warning without actually deleting files is a solution; however, most users won't notice this warning in their logs. Hence we should detect this earlier. There is already a variable sourceHasMetadata tracking whether the source is reading from a file stream sink or not. We can check the options and throw an exception when flipping it. What do you think?
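A sketch of that fail-fast idea, under the assumption that FileStreamSource keeps a sourceHasMetadata: Option[Boolean] field and an Option-typed cleaner that is defined when 'cleanSource' is enabled; the helper name and exact wording are illustrative, not necessarily the merged code:

```scala
// Assumed surrounding state in FileStreamSource (names illustrative):
//   var sourceHasMetadata: Option[Boolean]
//   val cleaner: Option[_]  // defined when 'cleanSource' is not "off"
private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match {
  case Some(true) if cleaner.isDefined =>
    // The source turns out to read a FileStreamSink output: refuse to run with
    // cleanup enabled instead of silently breaking the _spark_metadata log.
    throw new UnsupportedOperationException("Clean up source files is not supported when " +
      "reading from the output directory of FileStreamSink.")
  case _ =>
    sourceHasMetadata = newValue
}
```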
Ah OK I guess I got your point now. I'm also in favor of being "fail-fast" and the suggestion fits it. Thanks! Just updated.
Thanks for the feedback. Changed the logic to check whether the source is leveraging metadata or not. Please take a look again.
Test build #114262 has finished for PR 26590 at commit
Test build #114263 has finished for PR 26590 at commit
| logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") | ||
| validFileEntities.foreach(cleaner.clean) | ||
| sourceHasMetadata match { | ||
| case Some(true) if !warnedIgnoringCleanSourceOption => |
Is it possible that it's called more than once? In such a case, case _ => will win.
Ah yes missed that. Nice finding.
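For clarity, here is one way the one-time warning could be restructured so that a second call does not fall through to case _ and clean anyway; this is a hedged sketch based on the diff context above, not the exact final code, and it assumes the surrounding FileStreamSource members (sourceHasMetadata, warnedIgnoringCleanSourceOption, validFileEntities, cleaner) from that context:

```scala
// Branch only on sourceHasMetadata; keep the "warn once" flag inside the branch
// instead of in the pattern guard, so repeated calls with Some(true) never clean.
sourceHasMetadata match {
  case Some(true) =>
    if (!warnedIgnoringCleanSourceOption) {
      logWarning("Ignoring 'cleanSource' option since the source path is the output " +
        "directory of FileStreamSink.")
      warnedIgnoringCleanSourceOption = true
    }
  case _ =>
    validFileEntities.foreach(cleaner.clean)
}
```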
Test build #114406 has finished for PR 26590 at commit
@zsxwing @gaborgsomogyi I guess I addressed all review comments. Please take the next round of reviews. Thanks in advance!
Test build #114741 has finished for PR 26590 at commit
Test build #114742 has finished for PR 26590 at commit
Bump.
LGTM. retest this please. Triggering another test since the last run was 3 days ago.
retest this, please
Test build #114918 has finished for PR 26590 at commit
Thanks! Merging to master.
Thanks all for reviewing and merging!
@zsxwing
… the files belong to the output of FileStreamSink

Closes apache#26590 from HeartSaVioR/SPARK-29953.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
What changes were proposed in this pull request?
This patch prevents the cleanup operation in FileStreamSource if the source files belong to the output of FileStreamSink. This is needed because the output of FileStreamSink can be read by multiple Spark queries, and those queries read the files based on the metadata log, which won't reflect the cleanup.
To simplify the logic, the patch only takes care of the case when the source path (without a glob pattern) refers to the output directory of FileStreamSink, by checking whether FileStreamSource leverages the metadata directory to list the source files.
Why are the changes needed?
Without this patch, if end users turn on the cleanup option with a path which is the output of FileStreamSink, the metadata and the available files may get out of sync, which may break other queries reading the path.
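To illustrate the scenario (the paths, schema, and DataFrame names below are made up): query A writes to a directory through FileStreamSink, and query B reads the same directory as a file stream source with the cleanup option turned on. Without this patch B could remove files that A's _spark_metadata log still lists; with this patch the combination is rejected.

```scala
// Query A: a streaming writer whose output directory gets a _spark_metadata log.
val writerQuery = inputDf.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint-writer")
  .start("/data/sink-output")

// Query B: reads the sink output as a source and asks Spark to clean consumed files.
// With this patch, enabling 'cleanSource' on a FileStreamSink output directory fails
// fast instead of silently diverging from the metadata log's view of the files.
val readerDf = spark.readStream
  .schema(outputSchema)
  .format("parquet")
  .option("cleanSource", "delete")
  .load("/data/sink-output")
```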
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added UT.