-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed #24964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @zsxwing |
srowen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems OK to me; are there other callbacks that might have the same problem?
|
@srowen , I only find one place in public Future<Integer> removeBlocks(
String host,
int port,
String execId,
String[] blockIds) throws IOException, InterruptedException {
checkInit();
CompletableFuture<Integer> numRemovedBlocksFuture = new CompletableFuture<>();
ByteBuffer removeBlocksMessage = new RemoveBlocks(appId, execId, blockIds).toByteBuffer();
final TransportClient client = clientFactory.createClient(host, port);
client.sendRpc(removeBlocksMessage, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
numRemovedBlocksFuture.complete(((BlocksRemoved)msgObj).numRemovedBlocks);
client.close();
}I prefer to change to below code since @Override
public void onSuccess(ByteBuffer response) {
try {
BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
} catch (Exception e) {
logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
" via external shuffle service from executor: " + execId, e);
numRemovedBlocksFuture.complete(0);
} finally {
client.close();
}
} |
|
Should I fix above code if needed in this PR or file a new one? |
|
I think you can fix the similar issue here, and update the title/description. |
| try { | ||
| BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); | ||
| numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks); | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to catch Throwable here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven’t seen any error could occur here, so I use Exception. Throwable of course is fine.
| // flip "copy" to make it readable | ||
| copy.flip(); | ||
| result.set(copy); | ||
| } catch (Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to log the throwable here just for completeness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add a warning log.
|
Gentle ping @srowen |
|
Test build #4813 has finished for PR 24964 at commit
|
…checked exception missed This is very like #23590 . `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but no enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout set to unlimited. This PR catches `Throwable` and uses the error to complete `SettableFuture`. I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected `IllegalArgumentException` will be caught. ```java Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes #24964 from LantaoJin/SPARK-28160. Lead-authored-by: LantaoJin <[email protected]> Co-authored-by: lajin <[email protected]> Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 0e42100) Signed-off-by: Sean Owen <[email protected]>
…checked exception missed This is very like #23590 . `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but no enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout set to unlimited. This PR catches `Throwable` and uses the error to complete `SettableFuture`. I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected `IllegalArgumentException` will be caught. ```java Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes #24964 from LantaoJin/SPARK-28160. Lead-authored-by: LantaoJin <[email protected]> Co-authored-by: lajin <[email protected]> Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 0e42100) Signed-off-by: Sean Owen <[email protected]>
|
Merged to master/2.4/2.3 |
…checked exception missed ## What changes were proposed in this pull request? This is very like apache#23590 . `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but no enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout set to unlimited. This PR catches `Throwable` and uses the error to complete `SettableFuture`. ## How was this patch tested? I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected `IllegalArgumentException` will be caught. ```java Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes apache#24964 from LantaoJin/SPARK-28160. Lead-authored-by: LantaoJin <[email protected]> Co-authored-by: lajin <[email protected]> Signed-off-by: Sean Owen <[email protected]>
…checked exception missed This is very like apache#23590 . `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but no enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout set to unlimited. This PR catches `Throwable` and uses the error to complete `SettableFuture`. I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected `IllegalArgumentException` will be caught. ```java Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes apache#24964 from LantaoJin/SPARK-28160. Lead-authored-by: LantaoJin <[email protected]> Co-authored-by: lajin <[email protected]> Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 0e42100) Signed-off-by: Sean Owen <[email protected]>
…checked exception missed This is very like apache#23590 . `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but no enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout set to unlimited. This PR catches `Throwable` and uses the error to complete `SettableFuture`. I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected `IllegalArgumentException` will be caught. ```java Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes apache#24964 from LantaoJin/SPARK-28160. Lead-authored-by: LantaoJin <[email protected]> Co-authored-by: lajin <[email protected]> Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 0e42100) Signed-off-by: Sean Owen <[email protected]>
…d exception missed ### What changes were proposed in this pull request? Refer: [SPARK-28160](https://issues.apache.org/jira/browse/SPARK-28160) / apache/spark#24964 ByteBuffer.allocate may throw OutOfMemoryError when the response is large but no enough memory is available. However, when this happens, TransportClient.sendRpcSync will just hang forever if the timeout set to unlimited. ### Why are the changes needed? To catch the exception of `ByteBuffer.allocate` in corner case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Quote the local test in apache/spark#24964 ``` I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected IllegalArgumentException will be caught. Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes #2316 from turboFei/fix_transport_client_onsucess. Authored-by: Fei Wang <[email protected]> Signed-off-by: chenfu <[email protected]>
…d exception missed ### What changes were proposed in this pull request? Refer: [SPARK-28160](https://issues.apache.org/jira/browse/SPARK-28160) / apache/spark#24964 ByteBuffer.allocate may throw OutOfMemoryError when the response is large but no enough memory is available. However, when this happens, TransportClient.sendRpcSync will just hang forever if the timeout set to unlimited. ### Why are the changes needed? To catch the exception of `ByteBuffer.allocate` in corner case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Quote the local test in apache/spark#24964 ``` I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected IllegalArgumentException will be caught. Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes #2316 from turboFei/fix_transport_client_onsucess. Authored-by: Fei Wang <[email protected]> Signed-off-by: chenfu <[email protected]> (cherry picked from commit 387bffc) Signed-off-by: chenfu <[email protected]>
…d exception missed ### What changes were proposed in this pull request? Refer: [SPARK-28160](https://issues.apache.org/jira/browse/SPARK-28160) / apache/spark#24964 ByteBuffer.allocate may throw OutOfMemoryError when the response is large but no enough memory is available. However, when this happens, TransportClient.sendRpcSync will just hang forever if the timeout set to unlimited. ### Why are the changes needed? To catch the exception of `ByteBuffer.allocate` in corner case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Quote the local test in apache/spark#24964 ``` I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected IllegalArgumentException will be caught. Override public void onSuccess(ByteBuffer response) { try { int size = response.remaining(); ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug copy.put(response); // flip "copy" to make it readable copy.flip(); result.set(copy); } catch (Throwable t) { result.setException(t); } } ``` Closes #2316 from turboFei/fix_transport_client_onsucess. Authored-by: Fei Wang <[email protected]> Signed-off-by: chenfu <[email protected]> (cherry picked from commit 387bffc) Signed-off-by: chenfu <[email protected]>
What changes were proposed in this pull request?
This is very like #23590 .
ByteBuffer.allocatemay throwOutOfMemoryErrorwhen the response is large but no enough memory is available. However, when this happens,TransportClient.sendRpcSyncwill just hang forever if the timeout set to unlimited.This PR catches
Throwableand uses the error to completeSettableFuture.How was this patch tested?
I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected
IllegalArgumentExceptionwill be caught.