[SPARK-30462][SS] Streamline the logic on file stream source and sink metadata log to avoid memory issue #28904
Conversation
 * Returns all files except the deleted ones.
 */
-def allFiles(): Array[T] = {
+def allFiles(predicate: T => Boolean = _ => true): Array[T] = {
We could also have a streamlined version of this method to avoid materializing all entries when initializing FileStreamSource, though I think there's another problem we should solve first (the file stream source log shouldn't contain that many entries - think about other data sources), and once we fix that issue this won't matter at all.
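For illustration, here is a rough, self-contained sketch (hypothetical types and names, not the actual Spark code) of why a predicate parameter like the one above helps: entries that fail the predicate are dropped while the underlying lines are iterated, instead of every entry being materialized first.

```scala
object AllFilesSketch {
  final case class Entry(path: String, size: Long)

  private def parse(line: String): Entry = {
    val Array(path, size) = line.split(",")
    Entry(path, size.toLong)
  }

  // The predicate is applied per entry during iteration, so filtered-out entries
  // are never collected into the result.
  def allFiles(lines: Iterator[String], predicate: Entry => Boolean = _ => true): Array[Entry] =
    lines.map(parse).filter(predicate).toArray

  def main(args: Array[String]): Unit = {
    val lines = Iterator("/a/b/0,100", "/a/b/8,800")
    println(allFiles(lines, _.size > 500).toList) // List(Entry(/a/b/8,800))
  }
}
```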
 val logs =
   getAllValidBatches(latestId, compactInterval).flatMap { id =>
-    super.get(id).getOrElse {
+    filterInBatch(id)(shouldRetain).getOrElse {
This would help if we introduce a new condition for excluding entries.
 // Otherwise, this is a real IO issue and we should throw it.
-latestId = nextCompactionBatchId(latestId, compactInterval)
-super.get(latestId).getOrElse {
+val expectedMinLatestId = nextCompactionBatchId(latestId, compactInterval)
The new approach avoids reading the next compact metadata log file, which would materialize all entries of that file in memory. It's an extreme case, so I'm also OK with keeping this as it is if someone strongly thinks the previous approach is better.
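To make the idea concrete, here is a minimal, self-contained sketch (simplified file layout and helper names, not the actual Spark code) showing that knowing whether a later compact batch exists only needs a cheap existence check on the expected file name, with no deserialization:

```scala
import java.nio.file.{Files, Path}

object NextCompactCheckSketch {
  // Assuming compact batches land on ids 9, 19, 29, ... for a compact interval of 10.
  def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long =
    (batchId + compactInterval + 1) / compactInterval * compactInterval - 1

  def laterCompactBatchExists(logDir: Path, latestId: Long, compactInterval: Long): Boolean = {
    val expectedMinLatestId = nextCompactionBatchId(latestId, compactInterval)
    // Existence check only - the file's entries are never read or materialized.
    Files.exists(logDir.resolve(s"$expectedMinLatestId.compact"))
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("metadata-log")
    Files.createFile(dir.resolve("19.compact"))
    println(laterCompactBatchExists(dir, latestId = 9L, compactInterval = 10L)) // true
  }
}
```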
-    logs.filter(f => !deletedFiles.contains(f.path))
-  }
+  override def shouldRetain(log: SinkFileStatus): Boolean = {
+    log.action != FileStreamSinkLog.DELETE_ACTION
While I keep this for now, I think we should just remove it. As the TODO below notes, it has never been used; it only exists hypothetically.
Just thinking out loud: DELETE_ACTION isn't needed even if we consider data compaction, as long as we do data compaction in the compact batch. Nevertheless, let's focus on the current state for now. DELETE_ACTION can be dropped.
I'll just remove this condition. I believe we shouldn't be concerned about DELETE_ACTION, but even if we were, it would work better to leave it to the reader, since FileStreamSinkLog wouldn't be able to match pairs of ADD_ACTION and DELETE_ACTION and remove the entries.
Again, DELETE_ACTION should simply be dropped unless we have a plan to implement it properly.
 object FileStreamSinkLog {
   val VERSION = 1
   // TODO: This action hasn't been used from the introduction. We should just remove this.
   // TODO: We can remove the field "action" as well, ignoring "action" in existing metadata log.
Note that this would also help reduce the size of each entry. It's OK to leave only ADD_ACTION if the JSON serializer/deserializer complains.
Let's consider that only when we are open to introducing another version. I don't want to deal with compatibility concerns without bumping the version.
|
also cc. @xuanyuanking @uncleGen as they have reviewed sibling PRs.
|
Test build #124408 has finished for PR 28904 at commit
|
|
Test build #124403 has finished for PR 28904 at commit
|
|
Test build #124410 has finished for PR 28904 at commit
|
|
Just to be sure: the test app with the patch now writes version 1589, for which the log file size is 2.9G, with RES at 1.025g.
|
Combining the efforts together: a 10x speedup, reduced log file size (I missed batch 919, but 1369.compact only took 678M), and consistent memory usage (under 1.2G).
|
I've been running sustained tests (still running) and here are some observations:
|
|
Very impressive! I'll review this in 2 days. |
|
UPDATE: SPARK-30946 + SPARK-30462 now writes batch 11879, and RES is still less than 2G (around 1.7G). I'll stop the sustained test with sufficient heap and run another sustained test with a smaller heap (1.5G).
|
UPDATE: SPARK-30946 + SPARK-30462, with driver memory lowered to 1.5G, now writes batch 9039, and RES is around 1.3g. I guess the process uses up the available memory when it can, but it doesn't lead to an OOME.
 * NOTE: This no longer fails early on corruption. The caller should handle the exception
 * properly and make sure the logic is not affected by failing in the middle.
 */
def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = {
nit: duplicated code with get.
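As a side note for readers, here is a minimal sketch of the loan pattern a method like applyFnToBatchByStream suggests (names and signatures are illustrative, not the actual Spark code): open the batch file, hand the raw stream to the caller's function, and always close it, so entries can be processed incrementally instead of being returned as a fully materialized Array.

```scala
import java.io.InputStream
import java.nio.file.{Files, Path}

import scala.io.Source

object BatchStreamSketch {
  // Open the batch file, pass the raw stream to the caller's function, and always close it.
  def applyFnToBatchByStream[RET](batchFile: Path)(fn: InputStream => RET): RET = {
    val in = Files.newInputStream(batchFile)
    try fn(in) finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val f = Files.createTempFile("batch-", ".log")
    Files.write(f, "v1\nentry-1\nentry-2\n".getBytes("UTF-8"))
    // Count entries (skipping the version header) without keeping them all in memory.
    val count = applyFnToBatchByStream(f) { in =>
      Source.fromInputStream(in, "UTF-8").getLines().drop(1).size
    }
    println(count) // 2
  }
}
```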
|
Test build #124716 has finished for PR 28904 at commit
|
|
org.apache.spark.sql.hive.thriftserver.HiveSessionImplSuite.(It is not a test it is a sbt.testing.SuiteSelector) - this seems to be failing frequently. I'll check the other build result being run via 124719.
|
Test build #124719 has finished for PR 28904 at commit
|
|
retest this, please |
|
Test build #124870 has finished for PR 28904 at commit
|
|
retest this, please |
|
Test build #124877 has finished for PR 28904 at commit
|
|
retest this, please |
|
Test build #124945 has finished for PR 28904 at commit
|
|
retest this, please |
|
Test build #124973 has finished for PR 28904 at commit
|
@@ -1,3 +1,2 @@
 v1
 {"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
-{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
This should be removed - it gave the false impression that we previously supported DELETE_ACTION, when in reality it was never implemented.
|
Yep, that's right. I'm also looking at the code in detail and trying to find a way to both keep this API and have the improvement. If that's hard to achieve, the improvement of course has higher priority. I'll take a closer look today.
|
Technically it's a private API, not even tagged as a developer API - that said, it doesn't break anything from Spark's perspective. If we have confusion about the availability of
|
Test build #125933 has finished for PR 28904 at commit
|
|
retest this, please |
|
Test build #125956 has finished for PR 28904 at commit
|
|
cc. @tdas @zsxwing @jose-torres as it's technically not possible to merge this based only on @xuanyuanking's review.
|
@HeartSaVioR - This is a much-needed fix. Thanks for it. I have an orthogonal question: why do we need to worry about the file sink metadata files? I can think of the following reasons.
If the compacted metadata file size is running into GBs, the number of valid files would be in the millions. In practice, the end user will treat this sink path as a staging location and have another job compact these small files into a final destination. For exactly-once semantics, we can make changes in ManifestFileCommitter to delete files in the abort function, or we can come up with some other alternative. In short, if we provide an option to keep only the last few commits in the sink metadata (to ensure SS is not impacted) and change the various readers not to read through the metadata log files, won't that help ensure the reliability of the streaming job?
I already provided that change (in Spark 3.0.0 if I remember correctly), but it's only best-effort; it cannot deal with crash scenarios. Given that we write the file directly to the final path, it's worth noting that reading output files from the directory directly doesn't only mean you may read duplicated outputs (at-least-once). It also means there's a chance you may read incomplete or corrupted files, if a crash happens during a write.

Stepping back to explain the rationale and the goal: providing a holistic solution is not the goal. Lots of effort is already being made to provide holistic solutions, though that effort is happening outside the Spark codebase - Delta Lake, Apache Iceberg, Apache Hudi, and probably more. I'd just be reinventing the wheel if I tried to address the entire problem, and I can't persuade anyone to give me enough time to work on that. My goal for the overall improvements in the file stream source/sink is to enable end users to run their queries a bit longer without weird issues (like OOM). I just had to take an easier (and limited) approach to solve the issue. E.g., regarding the growing-entries issue: while the alternatives support data compaction to reduce the overall number of files without losing anything, I proposed retention of output files so that older files can be excluded from the metadata log, but they are no longer accessible. End users need to pick up one of the alternatives once they cannot live with these limitations.
|
cc. @tdas @zsxwing @jose-torres - a kind reminder
|
Retest this please. |
For the following notable change, I also agree with @HeartSaVioR's assessment (#28904 (comment)), but we didn't receive any strong feedback here.
- def compactLogs(logs: Seq[T]): Seq[T]
+ def shouldRetain(log: T): Boolean
Please let us know if you disagree, @tdas, @zsxwing, @jose-torres. If there is no written feedback, this is considered OK in the community, because two committers (@HeartSaVioR and me) already agree in the written discussion.
For the remaining part, we will continue the discussion.
|
Test build #127241 has finished for PR 28904 at commit
|
|
|
 object FileStreamSinkLog {
   val VERSION = 1
   // TODO: This action hasn't been used from the introduction. We should just remove this.
Could you file a JIRA and make it an ID'd TODO, @HeartSaVioR?
That will improve the visibility of DELETE_ACTION-related issues and lead to a follow-up discussion on DELETE_ACTION after this PR.
Add a JIRA number in the comment?
Thanks for the suggestion. I will file an issue and add the JIRA number to the code comment.
def foreachInBatch(batchId: Long)(fn: T => Unit): Unit = applyFnInBatch(batchId)(_.foreach(fn))

/**
 * Apply filter on all entries in the specific batch.
ditto. Shall we mention IllegalStateException?
Ditto. Do we want to mention all possible exceptions for each method? IllegalStateException is the explicit one, but there are also many other possible exceptions, starting with IOException.
 */
-def compactLogs(logs: Seq[T]): Seq[T]
+/** Determine whether the log should be retained or not. */
+def shouldRetain(log: T): Boolean
@HeartSaVioR, can we have def shouldRetain(log: T): Boolean = true to simplify this?
true is a reasonable default value for a log, isn't it? And in this PR, all derived classes override it with true only. I believe the special derived classes will override it in any case according to their semantics.
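A small sketch of this suggestion, with simplified, hypothetical types (not the actual Spark classes): the base trait carries the default of retaining everything, and only a log type that really needs to exclude entries overrides it.

```scala
object ShouldRetainDefaultSketch {
  final case class SinkFileStatus(path: String, action: String)

  trait CompactibleLogLike[T] {
    /** Determine whether the log entry should be retained or not. Defaults to true. */
    def shouldRetain(log: T): Boolean = true
  }

  // A derived log with special semantics can still opt out of the default.
  class SinkLogLike extends CompactibleLogLike[SinkFileStatus] {
    override def shouldRetain(log: SinkFileStatus): Boolean = log.action != "delete"
  }

  def main(args: Array[String]): Unit = {
    val sinkLog = new SinkLogLike
    println(sinkLog.shouldRetain(SinkFileStatus("/a/b/8", "add")))    // true
    println(sinkLog.shouldRetain(SinkFileStatus("/a/b/0", "delete"))) // false
  }
}
```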
dongjoon-hyun
left a comment
Thank you for this improvement. I left a few minor comments, @HeartSaVioR .
 * FileNotFoundException if the metadata log file doesn't exist.
 *
 * NOTE: This doesn't fail early on corruption. The caller should handle the exception
 * properly and make sure the logic is not affected by failing in the middle.
If the caller does not handle the exception properly, what is the consequence? Do we have a test case to cover it?
How do we ensure all the callers handle it properly? Do we need to introduce a test to cover that?
I'm not sure how this class could ensure that callers follow the guidance. Did you mean we'd like to test this behavior with the derived classes (the file stream source/sink logs), or with a test-purpose implementation of CompactibleFileStreamLog?
  }
}

def serializeEntry(entry: T, out: OutputStream): Unit = {
private?
  out.write(Serialization.write(entry).getBytes(UTF_8))
}

def deserializeEntry(line: String): T = Serialization.read[T](line)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private?
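For context, here is a simplified, self-contained sketch of the one-JSON-object-per-line entry (de)serialization shown above (the entry fields are trimmed down and the object name is illustrative):

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

object EntryCodecSketch {
  final case class Entry(path: String, size: Long, action: String)

  implicit val formats: org.json4s.Formats = Serialization.formats(NoTypeHints)

  // Each entry becomes a single UTF-8 encoded JSON line.
  def serializeEntry(entry: Entry, out: OutputStream): Unit =
    out.write(Serialization.write(entry).getBytes(UTF_8))

  def deserializeEntry(line: String): Entry = Serialization.read[Entry](line)

  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    serializeEntry(Entry("/a/b/8", 800L, "add"), buffer)
    val line = new String(buffer.toByteArray, UTF_8)
    println(line)                   // {"path":"/a/b/8","size":800,"action":"add"}
    println(deserializeEntry(line)) // Entry(/a/b/8,800,add)
  }
}
```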
/**
 * Store the metadata for the specified batchId and return `true` if successful. This method
 * fills the content of metadata via executing function. If the function throws exception,
Nit: exception -> an exception
|
Thanks for the review comments. Please allow a few more days (probably next week) to address them, as I'm on vacation this week and have limited time to look at my laptop. :) Thanks again!
|
Sure. Have a nice vacation and take care, @HeartSaVioR . |
|
Test build #127530 has finished for PR 28904 at commit
|
dongjoon-hyun
left a comment
+1, LGTM. Thank you, @HeartSaVioR and all.
Merged to master for Apache Spark 3.1.0 in December 2020.
|
Thanks all for reviewing and merging! |
What changes were proposed in this pull request?
Many operations on CompactibleFileStreamLog read a metadata log file and materialize all entries into memory. Due to the nature of the compact operation, CompactibleFileStreamLog may have a huge compact log file containing a large number of entries, and for now that number only increases monotonically, which means the amount of memory needed to materialize them also grows continually. This puts pressure on the GC.
This patch proposes to streamline the logic of the file stream source and sink wherever possible to avoid the memory issue. To make this possible, we have to break the existing behavior of excluding entries: currently the compactLogs method is called with all entries, which forces us to materialize all entries into memory. This hopefully has no effect on end users, because only the file stream sink has a condition to exclude entries, and that condition has never been true. (DELETE_ACTION has never been set.)

Based on that observation, this patch also changes an existing UT which simulates the situation where file "A" is added and a later batch marks file "A" as deleted. That situation simply doesn't work with this change, but as mentioned earlier it has never been used. (I'm not sure the UT came from an actual run. I guess not.)
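To illustrate the direction, here is a minimal sketch under simplified assumptions (entries are treated as opaque lines, and the file layout and helper names are hypothetical, not the actual Spark implementation): compaction can stream entries from the previous compact file and the new batch files through a retain predicate and write them out one by one, so the full entry set is never held in memory at once.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

import scala.io.Source

object CompactByStreamingSketch {
  def compact(previousCompact: Option[Path], newBatches: Seq[Path], output: Path)
             (shouldRetain: String => Boolean): Unit = {
    val out = Files.newBufferedWriter(output, UTF_8)
    try {
      out.write("v1"); out.newLine() // version header, mirroring the metadata log format
      (previousCompact.toSeq ++ newBatches).foreach { file =>
        val source = Source.fromFile(file.toFile, "UTF-8")
        try {
          // Skip each file's version header, then stream entries through the predicate.
          source.getLines().drop(1).filter(shouldRetain).foreach { line =>
            out.write(line); out.newLine()
          }
        } finally source.close()
      }
    } finally out.close()
  }
}
```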
Why are the changes needed?
The memory issue (OOME) has been reported both in the JIRA issue and on the user mailing list.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
The manual test leverages simple apps which continuously write to the file stream sink metadata log.
HeartSaVioR/spark-delegation-token-experiment@bea7680
The test is configured so that a batch metadata log file is around 1.9M (10,000 entries), while other Spark configurations are left at their defaults (compact interval = 10). The app runs as the driver, and the driver heap memory is set to 3g.
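For reference, the described setup could be expressed roughly as below; the compact-interval key is the file sink metadata log option (shown at its default of 10 only to make it explicit, as an assumption), and the 3g driver heap would be supplied outside the app, e.g. via spark-submit --driver-memory 3g.

```scala
import org.apache.spark.sql.SparkSession

object StressTestSetupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("file-stream-sink-metadata-log-stress")
      .config("spark.sql.streaming.fileSink.log.compactInterval", "10")
      .getOrCreate()
    // ... run the streaming query that keeps appending ~10,000 entries per batch ...
    spark.stop()
  }
}
```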
It only ran for 40 minutes, with the latest compact batch file at 1.3G. The process struggled with GC and, after some struggling, threw an OOME.
It sustained a 2-hour run (manually stopped, as it was expected to keep running), with the latest compact batch file at 2.2G. The actual memory usage didn't even reach 1.2G, and memory was cleaned up quickly without notable GC activity.