@turboFei turboFei commented Feb 22, 2024

What changes were proposed in this pull request?

Refer: SPARK-28160 / apache/spark#24964
ByteBuffer.allocate may throw OutOfMemoryError when the response is large and not enough memory is available. When this happens, TransportClient.sendRpcSync will hang forever if the timeout is set to unlimited.

Why are the changes needed?

To catch exceptions thrown by ByteBuffer.allocate in this corner case.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Quoting the local test from apache/spark#24964:

I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, the call does not finish until the timeout expires (and may hang forever if the timeout is set to MAX_INT); with this patch, the expected IllegalArgumentException is caught.

      @Override
      public void onSuccess(ByteBuffer response) {
        try {
          int size = response.remaining();
          ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug
          copy.put(response);
          // flip "copy" to make it readable
          copy.flip();
          result.set(copy);
        } catch (Throwable t) {
          result.setException(t);
        }
      }
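
The hang described above can be reproduced outside Celeborn. The following is a minimal sketch under stated assumptions, not the project's code: it substitutes the JDK's `CompletableFuture` for the `SettableFuture` used in the patch, a plain thread stands in for the network I/O thread, and the names `CallbackCopyDemo`, `copyUnguarded`, `copyGuarded`, and `waitOutcome` are hypothetical. As in the local test, a negative size stands in for the allocation failure.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallbackCopyDemo {

  // Unguarded callback: if allocate throws, the future is never completed.
  static void copyUnguarded(ByteBuffer response, int size, CompletableFuture<ByteBuffer> result) {
    ByteBuffer copy = ByteBuffer.allocate(size);
    copy.put(response);
    copy.flip();
    result.complete(copy);
  }

  // Guarded callback: any Throwable is forwarded to the waiting caller.
  static void copyGuarded(ByteBuffer response, int size, CompletableFuture<ByteBuffer> result) {
    try {
      copyUnguarded(response, size, result);
    } catch (Throwable t) {
      result.completeExceptionally(t);
    }
  }

  // Runs the callback on a separate thread, then waits on the future with a short timeout.
  static String waitOutcome(boolean guarded, int size) {
    ByteBuffer response = ByteBuffer.wrap(new byte[] {1, 2, 3});
    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
    Thread io = new Thread(() -> {
      try {
        if (guarded) {
          copyGuarded(response, size, result);
        } else {
          copyUnguarded(response, size, result);
        }
      } catch (Throwable ignored) {
        // Assumption for this sketch: the failure stays on the I/O side
        // and never reaches the thread that is waiting on the future.
      }
    });
    io.start();
    try {
      io.join();
      ByteBuffer copy = result.get(200, TimeUnit.MILLISECONDS);
      return "ok:" + copy.remaining();
    } catch (TimeoutException e) {
      return "timeout";
    } catch (ExecutionException e) {
      return "failed:" + e.getCause().getClass().getSimpleName();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(waitOutcome(false, -1)); // waiter only returns when the timeout expires
    System.out.println(waitOutcome(true, -1));  // waiter sees the failure immediately
    System.out.println(waitOutcome(true, 3));   // normal path
  }
}
```

With the unguarded callback the waiter only learns of the failure through its timeout, so an unbounded timeout would block indefinitely. With the guarded callback the waiter receives the original exception as the cause of an `ExecutionException`.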

@turboFei turboFei force-pushed the fix_transport_client_onsucess branch from b82be88 to bf9d308 on February 22, 2024 07:13
codecov bot commented Feb 22, 2024

Codecov Report

Attention: 3 lines in your changes are missing coverage. Please review.

Comparison is base (2523769) 48.69% compared to head (bf9d308) 48.68%.
Report is 1 commits behind head on main.

| Files | Patch % | Lines |
|---|---|---|
| ...eleborn/common/network/client/TransportClient.java | 62.50% | 3 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2316      +/-   ##
==========================================
- Coverage   48.69%   48.68%   -0.00%     
==========================================
  Files         209      209              
  Lines       12940    12944       +4     
  Branches     1119     1119              
==========================================
+ Hits         6300     6301       +1     
- Misses       6233     6236       +3     
  Partials      407      407              

@AngersZhuuuu AngersZhuuuu left a comment

LGTM

@cfmcgrady
Contributor

> TransportClient.sendRpcSync will just hang forever if the timeout set to unlimited.

  1. After conducting a search of the codebase, I have found that all calls to the TransportClient.sendRpcSync function have included a timeout, rather than using the value of Int.MaxValue.

  2. Should all blocks that call ByteBuffer.allocate() within the Callback handle exceptions? Could you please review the codebase to ensure that similar issues are handled appropriately?

https://github.com/apache/incubator-celeborn/blob/e804798088237c3d2ca6dca25454c117199c4977/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala#L296-L331

@cfmcgrady
Contributor

cc @mridulm

@mridulm mridulm left a comment

Looks good to me.
For reference, this was fixed in Spark in SPARK-28160.

@turboFei
Member Author

> After conducting a search of the codebase, I have found that all calls to the TransportClient.sendRpcSync function have included a timeout, rather than using the value of Int.MaxValue.

  1. The fix still helps in case the timeout is set too long.
  2. Not sure. For the case in this PR, a SettableFuture is used: we catch the exception and set it as the result's exception to complete the future.
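
One detail of the handler is worth illustrating: the patch catches `Throwable`, and `OutOfMemoryError` is an `Error`, so a narrower `catch (Exception e)` would not intercept it. The sketch below is only an illustration under assumptions: it uses the JDK's `CompletableFuture` in place of `SettableFuture`, simulates the failure by throwing the error directly, and the names `CatchScopeDemo` and `failingAllocation` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class CatchScopeDemo {

  // Hypothetical stand-in for an allocation that fails. The Error is thrown
  // directly rather than by actually exhausting the heap.
  static void failingAllocation() {
    throw new OutOfMemoryError("simulated");
  }

  // catch (Throwable) intercepts the Error and completes the future exceptionally.
  static boolean completesWithCatchThrowable() {
    CompletableFuture<byte[]> result = new CompletableFuture<>();
    try {
      failingAllocation();
      result.complete(new byte[0]);
    } catch (Throwable t) {
      result.completeExceptionally(t);
    }
    return result.isCompletedExceptionally();
  }

  // catch (Exception) does not match an Error, so the future stays pending.
  static boolean completesWithCatchException() {
    CompletableFuture<byte[]> result = new CompletableFuture<>();
    try {
      try {
        failingAllocation();
        result.complete(new byte[0]);
      } catch (Exception e) {
        result.completeExceptionally(e); // not reached for an Error
      }
    } catch (OutOfMemoryError escaped) {
      // The Error bypassed the inner handler; nothing completed the future.
    }
    return result.isDone();
  }

  public static void main(String[] args) {
    System.out.println(completesWithCatchThrowable());
    System.out.println(completesWithCatchException());
  }
}
```

Only the `Throwable` handler leaves the future in a completed state, which is what lets a blocked caller return.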

@cfmcgrady
Contributor

> 2. not sure, for the case in the PR, SettableFuture is used, we catch the exception and set the result exception to complete the future.

You are correct, I had a misunderstanding previously.

@cfmcgrady
Contributor

Thanks, merging to main (v0.5.0), branch-0.4 (v0.4.1), and branch-0.3 (v0.3.3).

@cfmcgrady cfmcgrady closed this in 387bffc Feb 23, 2024
cfmcgrady pushed a commit that referenced this pull request Feb 23, 2024
…d exception missed

### What changes were proposed in this pull request?
Refer: [SPARK-28160](https://issues.apache.org/jira/browse/SPARK-28160) / apache/spark#24964
ByteBuffer.allocate may throw OutOfMemoryError when the response is large and not enough memory is available. When this happens, TransportClient.sendRpcSync will hang forever if the timeout is set to unlimited.

### Why are the changes needed?
To catch exceptions thrown by `ByteBuffer.allocate` in this corner case.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Quoting the local test from apache/spark#24964:
```
I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, the call does not finish until the timeout expires (and may hang forever if the timeout is set to MAX_INT); with this patch, the expected IllegalArgumentException is caught.

      @Override
      public void onSuccess(ByteBuffer response) {
        try {
          int size = response.remaining();
          ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug
          copy.put(response);
          // flip "copy" to make it readable
          copy.flip();
          result.set(copy);
        } catch (Throwable t) {
          result.setException(t);
        }
      }
```

Closes #2316 from turboFei/fix_transport_client_onsucess.

Authored-by: Fei Wang <[email protected]>
Signed-off-by: chenfu <[email protected]>
(cherry picked from commit 387bffc)
Signed-off-by: chenfu <[email protected]>
cfmcgrady pushed a commit that referenced this pull request Feb 23, 2024
…d exception missed

@turboFei turboFei deleted the fix_transport_client_onsucess branch February 23, 2024 02:28