HADOOP-16570. S3A committers encounter scale issues #1442

Conversation
tested s3 ireland, ddb, auth
Force-pushed 5e8d7ce to 618e24c
Tested, s3a Ireland. FWIW, I'm planning to backport the thread cleanup and extra asserts to branch-3.1 and branch-3.2; I'm not so sure about the thread checks if we need AssertJ to be added, but well, it's an extra test-time dependency so not that troublesome.
Note that the thread tests go beyond just asking the committers if they have a thread pool; they verify that there aren't any left around. If we like this, it could be expanded to other places, such as verifying that the other FS clients are also being good citizens, especially for Hive, which creates and destroys many instances.
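A minimal sketch of what such a check might look like. This is hypothetical: hasThreadPool() stands in for whatever probe the patch actually adds, and createCommitter()/commitJob() are placeholders for the test suite's own lifecycle helpers.

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Test;

public class ITestThreadPoolCleanup extends AbstractITCommitProtocol {

  @Test
  public void testNoThreadPoolAfterJobCommit() throws Exception {
    // run the commit lifecycle under test, then probe for a leaked pool
    AbstractS3ACommitter committer = createCommitter();
    commitJob(committer);
    assertThat(committer.hasThreadPool())
        .as("thread pool still present after job commit in %s", committer)
        .isFalse();
  }
}
```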
Force-pushed 618e24c to 1729122
nit: why final?
No real reason
made non-final
This patch is about to get bigger and more complicated: when you get really big with the scale tests, you also discover that trying to keep a list of all pending commits in the job committer is just a way to see OOM exception traces. I'm going to have to be more incremental about loading and committing files.
Force-pushed 25dde8f to ddc3d8b
tested: s3a ireland w/ ddb. not yet tested: all the way through spark
Force-pushed 05f2e8f to 2246490
Latest test run: s3 ireland. There's a new unit test which, with the current values, takes 1 min; I plan to cut the numbers back, just leaving them as-is for now to be confident that there are no scale problems with these values. I think I'll declare many more blocks per file. The slow parts of the test are actually the list operations.
As that list process is the one for the staging committers, it is only listing the consistent cluster FS (i.e. HDFS), so s3 perf won't matter. In real jobs the time to POST commits will dominate, and with that patch every pendingset file is loaded and processed in parallel.
🎊 +1 overall
This message was automatically generated.
checkstyle warnings
Force-pushed c671212 to 24ba90a
🎊 +1 overall
This message was automatically generated.
If this is not going to create the _SUCCESS marker because the file list is too large, why get all committed file names here? I think this should be inside whatever check maybeCreateSuccessMarker has to avoid the memory consumption.
We stop adding files to that list of pending/committed objects once we reach an arbitrary threshold (SUCCESS_MARKER_FILE_LIMIT == 100), so we only add that subset of entries. I'll clarify that in the comments.
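A rough illustration of the capping being described. Only SUCCESS_MARKER_FILE_LIMIT comes from the comment above; the class and method names are invented for the example, not the actual patch code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class CommittedObjectTracker {
  // matches the threshold named in the comment above
  static final int SUCCESS_MARKER_FILE_LIMIT = 100;

  private final List<String> committedObjects = new ArrayList<>();

  /** Record a committed object key, but only up to the marker limit. */
  void noteCommittedObject(String key) {
    if (committedObjects.size() < SUCCESS_MARKER_FILE_LIMIT) {
      committedObjects.add(key);
    }
  }

  /** The (possibly truncated) list written to the _SUCCESS marker. */
  List<String> getCommittedObjects() {
    return Collections.unmodifiableList(committedObjects);
  }
}
```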
This seems unrelated. Why is it necessary to create the output path here?
aah, because sometimes we've had our terasort tests fail saying there's a tombstone at the far end, and I suspect it means that sometimes we somehow aren't getting that directory created. So I'm doing it preemptively
Why does CommitContext no longer require close to be called? Because the Tasks call now handles all of the failure and abort cases?
No; it's because it's the second of the two closeables. It's still in the () section, following the duration one after the semicolon.
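For readers unfamiliar with the pattern being discussed, a sketch of the two closeables inside one try-with-resources. The method names and in-scope variables (LOG, jobId, commitOperations, outputPath) are assumptions, not the exact code under review.

```java
// Both resources sit inside the try's parentheses, separated by a semicolon;
// both are closed automatically when the block exits, in reverse order.
try (DurationInfo d = new DurationInfo(LOG, "committing job %s", jobId);
     CommitOperations.CommitContext commitContext =
         commitOperations.initiateCommitOperation(outputPath)) {
  commitPendingUploads(commitContext);
}
```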
Should this explicitly call throwFailureWhenFinished()? I typically call either that one or suppressFailures() so that it is obvious to the reader how the error is handled.
This implementation doesn't have that throwFailureWhenFinished() call. I'll be explicit with suppressFailures() here and elsewhere.
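For illustration, a sketch of making the failure policy explicit in the Tasks builder. A later commit message in this PR names the call suppressExceptions(); the surrounding identifiers (pendingSetFiles, threadPool, loadAndAbort) are assumptions.

```java
// failures are caught and logged; iteration continues over the remaining items
Tasks.foreach(pendingSetFiles)
    .executeWith(threadPool)
    .suppressExceptions()
    .run(pendingSetPath -> loadAndAbort(commitContext, pendingSetPath));
```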
Failures for each failed pending set are handled in loadAndCommit so I don't think you need to handle failures here, just reverts and aborts.
Overall, looks good. The only problem I saw was loading all of the committed files for the _SUCCESS marker. Otherwise, I think everything should work, even if you end up aborting some failed files twice.
let me review that. thanks for checking this over.
Force-pushed 24ba90a to c34f3fe
Just pushed up a rebased update; it tries to address all of Ryan's comments, plus I use Tasks to parallelise the partition deletion; if a job writes to multiple partitions, this way there's a speedup. Tested, s3 ireland with -Dparallel-tests -DtestsThreadCount=12 -Ds3guard -Ddynamo
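A plausible shape for that parallel partition deletion, assuming partitionsToReplace, threadPool and fs exist in scope; this is a sketch, not the committed code.

```java
// delete each partition directory in parallel; one task per partition path
Tasks.foreach(partitionsToReplace)
    .executeWith(threadPool)
    .suppressExceptions()
    .run(partitionPath -> fs.delete(partitionPath, true));
```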
🎊 +1 overall
This message was automatically generated.
ehiggs left a comment
LGTM. Only nit was using a time unit on the config default.
+1
```diff
- public static final int THREAD_POOL_SHUTDOWN_DELAY = 30;
+ public static final int THREAD_POOL_SHUTDOWN_DELAY_SECS = 30;
```
will do. thanks
This patch explicitly shuts down the thread pool in job cleanup and after task commit, task abort, job abort and job commit.

The alternative strategy would be to always destroy the threads in the same method they were used, but as two operations are normally parallelized back-to-back (listing the pending files, then committing or aborting them), retaining the pool is useful. And there isn't any close() method or similar in the OutputCommitter interface to place it.

To test this, a probe for the committer having a thread pool was added, and the AbstractITCommitProtocol test extended to verify that there was no thread pool after the various commit and abort lifecycle operations.

To verify that the tests themselves were valid, destroyThreadPool() initially *did not* actually destroy the pool; the fact that the modified tests then all failed provided evidence that all paths followed in those tests successfully cleaned up. Once the method did close the thread pool, all these failing tests passed.

Note: I also switched to the HadoopExecutors thread pool factory; I considered moving to one of the caching thread pools but decided to keep this change simpler for ease of backport. For a trunk-only fix I'd consider asking the target S3A FS for its store context and creating a thread pool from it, which would just be a restricted fraction of the store's own pool.

Change-Id: Ib2765d70aae2658535e07da268899d72824094f4
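A simplified sketch of the lazily created, explicitly destroyed pool this commit describes. Field and method names are illustrative, the 30-second timeout mirrors the THREAD_POOL_SHUTDOWN_DELAY constant discussed above, and the HadoopExecutors call assumes its factory mirrors java.util.concurrent.Executors.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.util.concurrent.HadoopExecutors;

class CommitterPool {
  private ExecutorService threadPool;

  /** Demand-create the pool via the HadoopExecutors factory. */
  synchronized ExecutorService getThreadPool(int threads) {
    if (threadPool == null) {
      threadPool = HadoopExecutors.newFixedThreadPool(threads);
    }
    return threadPool;
  }

  /** Destroy the pool if present; safe to call repeatedly. */
  synchronized void destroyThreadPool() {
    if (threadPool != null) {
      threadPool.shutdown();
      try {
        threadPool.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        threadPool = null;
      }
    }
  }
}
```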
Based on some hints from Kevin Risden, two test suites now verify that after the test run there are no unexpected threads, once we strip out a set of known-but-unstoppable threads we can expect to see. This is used in ITestS3AClosedFS to verify that an FS instance doesn't leak any threads, which is useful for future regression testing. For the S3A committer tests we only scan for outstanding committer pool threads; the rest are unimportant.

Also: some comments/diagnostics in the instrumentation class. The s3a FS close() also explicitly shuts down the thread pools. With a min size of 0 they'll eventually stop anyway; this just guarantees it happens faster.

Change-Id: I2081e327ac8fb57eb38a3d119f02efce6232bad2
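An illustrative version of that check: snapshot the live threads, strip a whitelist of known-but-unstoppable ones, and assert nothing else remains. The whitelist predicate here is a placeholder, not the patch's actual set.

```java
import java.util.Set;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

public final class ThreadLeakCheck {
  private ThreadLeakCheck() { }

  /** Fail if any thread outside the known set is still alive. */
  public static void assertNoLeakedThreads() {
    Set<String> unexpected = Thread.getAllStackTraces().keySet().stream()
        .map(Thread::getName)
        .filter(name -> !isKnownThread(name))
        .collect(Collectors.toSet());
    assertThat(unexpected)
        .describedAs("threads leaked by the filesystem")
        .isEmpty();
  }

  /** Placeholder whitelist: JVM housekeeping threads and the test runner. */
  private static boolean isKnownThread(String name) {
    return name.equals("main")
        || name.startsWith("Finalizer")
        || name.startsWith("Reference Handler");
  }
}
```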
This avoids needing to store the entire list of files and their commit information during job commit.

Consequences:
* Each task's .pendingset file is loaded in its own thread, but the files to commit listed inside the file are sequentially committed in that thread (see the sketch after this message). I don't know what the performance consequences will be.
* It's harder to abort things. Rather than abort all commits we know about by way of the files, I'm just going to abort all uploads under the destination path. We do that anyway as a fail-safe.
* The partitioned committer is going to have to load the files twice in "REPLACE" mode: the first time to identify the partitions being written to, and then to delete their contents.

I haven't written the change for the partitioned committer yet; I'll make sure the core commit process is working first. When I do implement it, I might spread the actual delete operations across the thread pool, as we know how long delete can take -don't we?

Change-Id: Ife66eb020f6dc9c08ce9e1ea001e94ea91b28f86
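A sketch of that structure, assuming the code builds on the existing PendingSet and SinglePendingCommit classes plus a commitContext helper; identifiers beyond those are illustrative.

```java
// each .pendingset file gets its own task; the commits listed inside it
// are then applied sequentially within that task's thread
Tasks.foreach(pendingSetFiles)
    .executeWith(threadPool)
    .run(pendingSetPath -> {
      PendingSet pendingSet = PendingSet.load(fs, pendingSetPath);
      for (SinglePendingCommit commit : pendingSet.getCommits()) {
        commitContext.commitOrFail(commit);
      }
    });
```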
* Incomplete reimplementation of revert semantics for staging committers
* Partitioned committer implements replace as a load-and-apply sequence

Change-Id: If78520bfa1f7c12ed4c1a5be4d330bc923659224
…dingset files

* There are pendingset-level commit/abort/revert operations to manage committing work and the (best-effort) rolling back from failures.
* The Tasks API is used within these operations to choreograph commit/abort/revert actions. However, no thread pool is currently created for that work. I didn't want to use the pool which schedules the file loading, as deadlock would have been inevitable. A separate thread pool could be created; however, unless it was actually bigger than the current pool, there would be no extra parallelisation. One special case: there were only a few tasks but they generated many, many files. I'm not worrying about that.
* The mock tests of the committers have been reworked for this world, including explicitly creating and saving multiple .pendingset files to better stress the commit process.
* I've also moved to AssertJ assertions while trying to debug mismatches between the expected and actual values. I'd split the blame for those failing tests equally between setting up the mock state and me getting revert and abort to work as the existing test cases expected.
* Some more detail in the Javadocs to explain what is going on.

Regarding the state of the patch: the tests are all happy; now I want to see what Yetus says. I also want to make PartitionedStagingCommitter.replacePartitions() do its read of all .pendingset files in parallel, so it can build up the list of partitions to replace a bit faster when there is the output of many tasks to process.

I'm also actually wondering what it would take to use the MockS3AClient here across more tests. Currently it is good (and with this patch better) at simulating incomplete multipart uploads, including tracking of active uploads. We could probably expand this to model more of the final state of the store, for example actually simulating the persisted state of the store. Worth a thought, though it is probably a moving target.

Change-Id: Idfa8198a920664f2fefe441d317b8e0fb681d368
This is a unit test, using mocking as a substitute for talking to S3; we are testing client-side scale, not that of the store communications.

Change-Id: Ide6dab0b5b08a845a88553b9085d0cf06426a7cb
Change-Id: I85a35d84f306e5f369eaacac0cff38febe1ccac0
…tify slow points

Listing files is surprisingly slow. Theories:
* the listFiles() call is the wrong scan for local (and HDFS?)
* overuse of Java 8 streams/maps, etc.

Explore #2 and then worry about #1. We must stay with listFiles() for the magic committer's scans of S3, but for the staging committers, we just need to flat-list the source dir with a filter (a sketch follows).

Change-Id: I7e29b6004e71b146500a95c9822c5eed17390fb4
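A sketch of that flat listing for the staging committers: one shallow list of the source directory with a suffix filter, rather than a recursive listFiles() treewalk. The directory variable and the exact suffix are assumptions.

```java
import org.apache.hadoop.fs.FileStatus;

// one shallow LIST with a filter, instead of a recursive treewalk
FileStatus[] pendingSets = fs.listStatus(jobAttemptDir,
    path -> path.getName().endsWith(".pendingset"));
```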
…t checks.

The partitioned staging committer will do this while identifying parent directories if it needs to replace those partitions.

Change-Id: I4f83eaafc244e92d5d937d3edb55c9dcc8b0e254
* Explicitly call suppressExceptions()
* Remove onFailure handler in commitPendingUploads
* Explain why the active commit list of pending files doesn't overload the _SUCCESS file

Also, the partitioned committer deletes partition paths in parallel for a bit more speed; minimum one LIST/POST per directory, plus on S3Guard some extra IO to DDB.

Change-Id: I750f421e826f7df738149afeb04afd35a0d44d9b
Change-Id: I32b7475b16e1d5cec5bbd29932d4d70e3bf47d73
Force-pushed 60d0679 to ac41d9e
🎊 +1 overall
This message was automatically generated.
Rebasing against trunk (HADOOP-16207), I'm getting some failures which I'm blaming on test setup code, such as the fact that temp dirs on forked runs are coming in wrong; I'll address that here.
Also filed: https://issues.apache.org/jira/browse/HADOOP-16632
The failed assertion was caused by a speculative task writing its .pending output file to its attempt directory after the job had completed. This is my first full trace of what happens during a partition, and I am pleased the actual output of the job was correct. We just can't prevent partitioned MR tasks from writing to the attempt directories after the job completes, and as there is a risk that pending uploads may be outstanding, document the need to have a lifecycle rule to clean these up. Which people should have anyway.
Change-Id: Iffef642be1f24b08c5e6369f2200327e8ad256e4
🎊 +1 overall
This message was automatically generated.
ok, merged in. thank you for the reviews.
This patch addresses two scale issues:
Thread pool leakage
The patch explicitly shuts down the thread pool in job cleanup and after task commit, task abort, job abort and job commit.
The alternative strategy would be to always destroy the threads in the same method they were used, but as two operations are normally parallelized back-to-back (listing the pending files, then committing or aborting them), retaining the pool is useful. And there isn't any close() method or similar in the OutputCommitter interface to place it.
To test this, a probe for the committer having a thread pool was added, and the AbstractITCommitProtocol test extended to verify that there was no thread pool after the various commit and abort lifecycle operations.
To verify that the tests themselves were valid, destroyThreadPool() initially did not actually destroy the pool; the fact that the modified tests then all failed provided evidence that all paths followed in those tests successfully cleaned up. Once the method did close the thread pool, all these failing tests passed.
Note: I also switched to the HadoopExecutors thread pool factory; I considered moving to one of the caching thread pools but decided to keep this change simpler for ease of backport. For a trunk-only fix I'd consider asking the target S3A FS for its store context and creating a thread pool from it, which would just be a restricted fraction of the store's own pool.
OOM on job commit for jobs with many thousands of tasks, each generating tens of files.
Instead of loading all pending commits into memory as a single list, the list of files to load is the sole list which is passed around; .pendingset files are loaded and processed in isolation, and reloaded if necessary for any abort/rollback operation.
The parallel commit/abort/revert operations now work at the .pendingset level, rather than that of individual pending commit files. The existing parallelized Tasks API is still used to commit those files, but with a null thread pool, so as to serialize the operations.
This could slow down the commit operation in the following situations:
* The job will be blocked waiting for the largest tasks to complete.
I am not going to worry about these.