[SPARK-20568][SS] Provide option to clean up completed files in streaming query #22952
Conversation
I feel the patch is simple enough to skip manual verification against HDFS, but I'll try to spin up an HDFS cluster and test it manually. EDIT: also verified with an HDFS cluster.
Test build #98491 has finished for PR 22952 at commit
Test build #98493 has finished for PR 22952 at commit
cc. @zsxwing
Hi, @HeartSaVioR.
Renaming is expensive in S3, isn't it? I don't worry about HDFS, but do you know if there are potential side effects, such as performance degradation, in cloud environments, especially with continuous processing mode?
Hi @dongjoon-hyun, thanks for raising a good point! I was only concerned about the filesystem/HDFS case and am not familiar with cloud environments.
I guess we have some possible options here:
1. Rename in a background thread (a rough sketch follows after this list).
For option 1, we may want to restrict the maximum number of files to enqueue, and when it reaches the max we may handle some of them synchronously. We also may need to postpone JVM shutdown until all enqueued files are renamed.
2. Provide an additional option: delete (the two options - 'rename' / 'delete' - would be mutually exclusive).
Actually the actions end users are expected to take are 1. moving files to an archive directory (compressed or not), 2. deleting them periodically. If moving/renaming carries a non-trivial cost, end users may want to just delete files directly without backing them up.
3. Document the overhead in the option's description.
While we cannot say clearly how large the cost is, we can explain that the cleanup operation may affect the processing of a batch.
The provided options are not mutually exclusive.
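A rough sketch of option 1 (hypothetical helper; the class and names below are illustrative, not part of this patch): completed files are queued and a daemon thread performs the renames, so the rename cost stays off the micro-batch's critical path and the queue capacity bounds the pending work.

import java.util.concurrent.LinkedBlockingQueue
import org.apache.hadoop.fs.{FileSystem, Path}

class BackgroundFileArchiver(fs: FileSystem, archiveDir: Path, maxPending: Int = 1000) {
  private val pending = new LinkedBlockingQueue[Path](maxPending)

  // Returns false when the queue is full; the caller can then archive synchronously
  // or drop the request and log it, depending on which guarantee we decide to provide.
  def submit(file: Path): Boolean = pending.offer(file)

  private val worker = new Thread(() => {
    try {
      while (true) {
        val file = pending.take()
        val target = new Path(archiveDir, file.getName)
        if (!fs.rename(file, target)) {
          // best effort: log and continue rather than failing the query
          System.err.println(s"Failed to archive $file to $target")
        }
      }
    } catch {
      case _: InterruptedException => // stop on shutdown
    }
  }, "file-source-archiver")
  worker.setDaemon(true)
  worker.start()
}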
cc. @steveloughran - I think you're an expert on cloud storage: could you share your thoughts on this?
also cc. @zsxwing in case this was missed.
@HeartSaVioR Does Flink or Storm have this feature, or are there JIRA issues for it? I'm wondering whether this is common among streaming engines and how they handle it in cloud environments.
@dongjoon-hyun
For Storm, it renames the input file twice: 1. when it is in process, 2. when it is completed (actually it is not a rename but a move to an archive directory). The HDFS spout was created in 2015, so I don't expect there was deep consideration of cloud storage.
For Flink I have no idea; I'll explore how they handle it.
I think the feature is essential in ETL situations: a comment in the JIRA clearly shows why it is needed.
https://issues.apache.org/jira/browse/SPARK-20568?focusedCommentId=16356929&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16356929
An essential feature should not be slow. If we don't have a clear, written warning, users will complain again and again about the performance regression. Frequently, users don't say they changed this kind of setting; instead, they say Spark suddenly shows regressions in their environment.
For example, http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
Since schema merging is a relatively expensive operation,
and is not a necessity in most cases, we turned it off
by default starting from 1.5.0.
Totally agreed, and that matches option 3 I've proposed. Option 1 would not affect the critical path of a batch much, since rename operations would be enqueued and a background thread would take care of them.
For option 1, providing a guarantee makes things complicated. If we are OK with NOT guaranteeing that all processed files are renamed, we can do the renaming in the background (as in option 1) without handling backpressure, and simply drop queued requests with logging if the queue size is beyond the threshold or the JVM is shutting down.
S3 rename is O(data), whereas for real filesystems it is O(1). Azure is usually O(1) unless some cross-shard move takes place, then it drops to O(data)...much rarer though.
+1 for this approach. The file listing cost is huge when the directory has a lot of files. I think one of the goals of this feature is reducing the file listing cost, so it's better not to rename the files into the same directory. Either deleting the files or moving them to a different directory should be fine. Also could you try to make one simple option for In addition, it would be great if we could document that whenever this option is used, the same directory should not be used by multiple queries.
I'd drop s3n & s3 refs as they have gone from deprecated to deceased
This looks beyond the scope of this PR; we can address it in a separate PR. Could you raise another one?
@zsxwing @dongjoon-hyun @steveloughran While I covered the new feature with new UTs, I have yet to test this manually with HDFS. I'll find time to do a manual test next week. For cloud storage, TBH, it's not easy for me to test manually against it, so I'd like to lean on reviewers' eyes and experience.
Test build #98917 has finished for PR 22952 at commit
Test build #98919 has finished for PR 22952 at commit
Test build #98918 has finished for PR 22952 at commit
gaborgsomogyi left a comment
Good feature in general, thanks for the efforts!
My general idea is that the whole logic can happen in a different thread which could bring the following advantages:
- The delete/move time will not count in the processing time
- If files temporarily cannot be deleted/moved for some reason, an instant retry will take place
- The load can be split into smaller chunks (like max 50 files per round or something) -> this one is more of a theoretical option
This can be simplified to something like:
matchedModeOpt match {
  case None =>
    throw new IllegalArgumentException(s"Invalid mode for clean source option $modeStrOption." +
      s" Must be one of ${CleanSourceMode.values.mkString(",")}")
  case Some(matchedMode) =>
    if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) {
      throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " +
        "option.")
    }
    matchedMode
}
Will address.
These fs operations can also throw exceptions. Why not cover these with the try as well?
Nice finding. Will address.
This can be enforced.
Yeah, I guess you're right. I'll add logic to check this during initialization of FileStreamSource.
@gaborgsomogyi
@gaborgsomogyi
Test build #99180 has finished for PR 22952 at commit
@HeartSaVioR I'm fine with this; on the other hand, if you're focusing on different things I'm happy to create a JIRA + PR for the separate-thread work to speed up processing.
@gaborgsomogyi
@HeartSaVioR OK, feel free to ping me if a review is needed.
zsxwing left a comment
Thanks for your PR. This is awesome. I did one round of review, but I'm still thinking about how to detect path overlap and how to derive the new path.
nit: could you create a method on CleanSourceMode to convert a string to CleanSourceMode.Value?
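For illustration only, one possible shape of such a helper, assuming the enumeration values implied by the option values in this PR (the actual implementation may differ):

object CleanSourceMode extends Enumeration {
  val ARCHIVE, DELETE, OFF = Value

  // Maps the user-provided option string to an enum value; absent means OFF.
  def fromString(option: Option[String]): Value = option.map(_.toLowerCase) match {
    case Some("archive") => ARCHIVE
    case Some("delete") => DELETE
    case Some("off") | None => OFF
    case Some(other) =>
      throw new IllegalArgumentException(s"Invalid mode for clean source option $other." +
        s" Must be one of ${values.mkString(",")}")
  }
}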
OK will address.
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option, because source files will be moved or deleted, which may impact the other sources and queries.
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
Nice finding. I missed the case where multiple sources in the same query refer to the same file directory. Will address.
Could you do this check only when CleanSourceMode is ARCHIVE?
Ah yes missed it. Will address.
It's better to also check the return value of rename. A user may reuse a source archive dir and cause path conflicts. We should also log this.
Yeah, I guess the patch prevents that case if it works as I expect, but I'm also in favor of defensive programming, and logging would be better for end users. Will address.
rename()'s return of true/false is pretty meaningless, as in "it says that it fails, but doesn't provide any explanation as to why". See HADOOP-11452 for discussion on making rename/3 public - this one does throw useful exceptions on failures. Happy for anyone to take up work on that...
you can use val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) to use the underlying cache in FileStreamSourceLog.
Ah, I didn't know about that. Thanks for letting me know! Will address.
Use val curPath = new Path(new URI(entry.path)) so the path is escaped/unescaped properly; entry.path was created from Path.toUri.toString. Could you also add a unit test for special paths such as /a/b/a b%.txt?
Yeah... actually I was somewhat confused about whether I had to escape/unescape the path. Thanks for the suggestion. Will address and add a new unit test for it.
I just modified the existing UT to include a space and % in the directory name as well as the file name.
ditto
Will address.
We need to use fs.makeQualified to turn all user-provided paths into absolute paths, as the user may pass a relative path.
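As an illustration (the object and variable names are hypothetical, not this PR's code), qualifying a possibly relative user-provided path could look like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyPathExample {
  def main(args: Array[String]): Unit = {
    val rawArchiveDir = "archived/here" // may be relative, exactly as supplied by the user
    val fs = new Path(rawArchiveDir).getFileSystem(new Configuration())
    // makeQualified resolves the path against the filesystem's default URI and
    // working directory, yielding a scheme, authority and absolute path.
    val qualified = fs.makeQualified(new Path(rawArchiveDir))
    println(qualified) // e.g. file:/home/someone/archived/here on the local filesystem
  }
}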
Nice finding. Will address.
@zsxwing Thanks for the detailed review! Addressed review comments.
@zsxwing Btw, what do you think about addressing background move/deletion (which I had considered, and @gaborgsomogyi also suggested) in a separate issue? I guess adding more features would make you spend more time on review.
Test build #99450 has finished for PR 22952 at commit
Yeah, this can be done in a separate ticket. I was playing with The worst case, we can check the overlap when parsing the options for a normal path. For a glob path, we can use
@zsxwing For example, suppose the source path is This is not an elegant approach, and it has false positives, ending up restricting archive paths which don't actually overlap (too restrictive), but it would guarantee the two paths never overlap (so there is no need to re-check when renaming a file). I guess the approach might be reasonable because, in practice, end users would rather not think about complicated overlap cases and would just isolate the two paths. What do you think about this approach? cc. @gaborgsomogyi Could you also help validate my approach?
Test build #99567 has finished for PR 22952 at commit
@HeartSaVioR
This has thrown
Checking the glob part...
@HeartSaVioR This should throw
Hadoop FS glob filtering is pathologically bad on object stores. I think there you'd want a threshold as to how many path elements up you'd switch from an ls dir + match into the full deep listFiles(recursive) scan. Haven't looked at it for ages. If someone does want to play there, welcome to take it up.
That's an old patch; I don't know of any active dev there.
retest this, please
Test build #112711 has finished for PR 22952 at commit
vanzin left a comment
Looks ok, a few nits remaining.
| "s3a://a/b/c/dataset.txt"<br/> | ||
| <code>cleanSource</code>: option to clean up completed files after processing.<br/> | ||
| Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/> | ||
| When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here This will ensure archived files are never included as new source files.<br/> |
<code></code> around directory path, period before This.
| <code>cleanSource</code>: option to clean up completed files after processing.<br/> | ||
| Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/> | ||
| When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here This will ensure archived files are never included as new source files.<br/> | ||
| Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"<br/> |
s/its/their
require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)

baseArchiveFileSystem.foreach { fs =>
  require(fileSystem.getUri == fs.getUri, "Base archive path is located to the different" +
"Base archive path is located on a different file system than the source files."
Test build #112793 has finished for PR 22952 at commit
retest this, please
Test build #112800 has finished for PR 22952 at commit
retest this, please
Test build #112824 has finished for PR 22952 at commit
retest this please
Test build #112855 has finished for PR 22952 at commit
require(baseArchivePath.isDefined)

val curPath = new Path(new URI(entry.path))
val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))
Can we try to not use this API? It has a known issue. See https://issues.apache.org/jira/browse/SPARK-28841 and https://issues.apache.org/jira/browse/HDFS-14762
I'm revisiting the two issues and am not sure there's a viable workaround. It looks like the issue pointed out is that ":" isn't a valid character for HDFS but might be valid for other filesystems, so the Path API doesn't restrict it, which leads to the problem. Even HDFS-14762 is closed as "Won't Fix".
Does this only occur with Path(parent, child), and is Path(pathStr) safe? Would it work if we manually concatenated the two paths as a string and passed that to Path's constructor?
It only occurs in Path(parent, child). I think we can manually concat two paths.
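A rough sketch of that workaround (helper and variable names are assumed, not the PR's actual code): build the destination as a single string and hand it to the one-argument Path constructor, so a ':' in a file name is not mis-parsed the way Path(parent, child) does.

import java.net.URI
import org.apache.hadoop.fs.Path

object ArchivePathExample {
  // entryPath is the escaped string form produced by Path.toUri.toString
  def archiveDestination(baseArchivePath: Path, entryPath: String): Path = {
    val curPath = new Path(new URI(entryPath))
    // Concatenate manually instead of using new Path(parent, child), which can throw
    // when the child contains ':' (see HDFS-14762).
    new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)
  }
}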
Thanks for the quick feedback! I'll apply it.
If it makes people any happier, know that "." isn't allowed as the last character in an ABFS filename. Every store has surprises.
Test build #112867 has finished for PR 22952 at commit
Could we go through the next round of review? I guess it's close to being ready to merge. Thanks in advance!
 * than 2, as neither file nor parent directory of destination path can be matched with
 * source path.
 */
require(path.depth() > 2, "Base archive path must have a depth of at least 2 " +
The check says "greater than 2" but the error says "at least 2". Which one is right?
So the explanation talks about 2 "subdirectories", not a "depth" of 2; / counts as its own depth. I don't think depth is a term end users are familiar with - I'll remove the "a depth of" part.
}

baseArchivePath.foreach { path =>
remove
fs.makeQualified(new Path(path)) // can contain glob patterns
}

private val sourceCleaner: FileStreamSourceCleaner = {
I wasn't going to ask, but since I have more comments... I think it's better if this were an Option and set to None if the cleaner is off.
Similarly, below, you'll resolve the sourceArchiveDir even if the cleaner is not set to archive, which is not necessary.
(I'm almost suggesting that there should be a separate implementation for delete and for archive to make this, and the code calling it, a bit cleaner.)
Yeah, that's a good suggestion. Will address.
Test build #113055 has finished for PR 22952 at commit
Test build #113069 has finished for PR 22952 at commit
Merging to master.
Thanks for reviewing and merging! I'll file the follow-up issue on cleaning from a background thread.
Great news, awesome job! Now that we can delete files after processing them, it would be good to have an option to re-process a file if it was re-uploaded to the same folder with the same name. It was proposed in #23782 and I think it's a good complementary feature to this one. Any chance it gets a round of review, @vanzin @HeartSaVioR @gaborgsomogyi?
zsxwing left a comment
@HeartSaVioR Thanks for adding this great feature. Sorry that I was busy and didn't have time to finish my review. I just did a post-hoc review and found one issue: https://issues.apache.org/jira/browse/SPARK-29953. Could you help fix it? Thanks! I also left one question regarding the depth check.
/**
 * FileStreamSource reads the files which one of below conditions is met:
 * 1) file itself is matched with source path
 * 2) parent directory is matched with source path
@HeartSaVioR Could you clarify this? I think there are some cases where we still read files but they don't meet these conditions:
- Partitioned paths, such as "/foo/bar/p1=a/p2=b" when the source path is "/foo/bar".
- recursiveFileLookup is set to true (see https://issues.apache.org/jira/browse/SPARK-27990).
@zsxwing
Thanks for taking the time to revisit this! The condition is based on the test suite for FileStreamSource, but for partitioned paths, yes, that case was missed. Nice finding. I need to update the condition, or just remove the documented condition entirely.
For recursiveFileLookup, it came later than this patch and I missed it. The condition was written early this year, and recursiveFileLookup seems to have come in the middle of the year.
Adding these two cases, FileStreamSource can read any files under the source path, which invalidates the depth check. There are three options to deal with this:
- No pattern check; just try to rename, and log it if the rename fails. (Caution: this doesn't prevent an archived file from being picked up as a source file again in a different directory.)
- Disallow any path from being used as the base archive path if it matches the source path (glob) - here "disallow" means fail the query. After that we don't need to check the pattern. If end users provide a complicated glob path as the source path, they may be puzzled about how to avoid a match, but I'm not sure they would really want such a complicated path in production.
- Do the pattern check before renaming, though that requires checking the pattern per file. We may optimize this a bit by grouping files per directory and checking the pattern against the directory instead of individual files. It doesn't fail the query, so end users need to check whether files were left uncleaned due to the pattern check.
Which one (or which combination) would be the preferred approach?
FYI, I just filed https://issues.apache.org/jira/browse/SPARK-30281 and raised a patch picking option 2: #26920
What changes were proposed in this pull request?
This patch adds an option to clean up files which were completed in the previous batch.
cleanSource -> "archive" / "delete" / "off". The default value is "off", with which Spark does nothing.
If "delete" is specified, Spark will simply delete input files. If "archive" is specified, Spark requires the additional config sourceArchiveDir, which input files will be moved into.
When archiving (via move), the path of the input file is retained in the archived path as a sub-path. Note that this only applies to "micro-batch", since for batch all input files must be kept to get the same result across multiple query executions.
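A hedged usage sketch of the new options (paths and application name are illustrative): reading a directory as a text stream and archiving completed files under a separate directory.

import org.apache.spark.sql.SparkSession

object CleanSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("clean-source-example").getOrCreate()

    // Completed input files are later moved under /archived/here, keeping their own path,
    // e.g. /data/incoming/part-0.txt ends up at /archived/here/data/incoming/part-0.txt.
    val input = spark.readStream
      .format("text")
      .option("cleanSource", "archive")
      .option("sourceArchiveDir", "/archived/here")
      .load("/data/incoming")

    input.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/clean-source-checkpoint") // illustrative
      .start()
      .awaitTermination()
  }
}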
How was this patch tested?
Added UTs. Manual tests against local disk as well as HDFS.