-
Notifications
You must be signed in to change notification settings - Fork 26.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix memory leak of tri protocol #13973
Conversation
@icodening LGTM |
Code Review Agent Run Status
Code Review Overview
>>See detailed code suggestions<< High-level FeedbackGeneral feedback for improvement includes ensuring comprehensive unit tests are added to cover the new code paths introduced, especially around the new exception handling logic and stream closure operations. Additionally, considering the integration of these changes across different components, verifying thread safety and potential race conditions around the state management (e.g., the closed flag in Http2ServerChannelObserver) would be crucial. |
public class CancelStreamException extends RuntimeException implements ErrorCodeHolder { | ||
|
||
private final boolean cancelByRemote; | ||
|
||
private final long errorCode; | ||
|
||
private CancelStreamException(boolean cancelByRemote, long errorCode) { | ||
this.cancelByRemote = cancelByRemote; | ||
this.errorCode = errorCode; | ||
} | ||
|
||
public boolean isCancelByRemote() { | ||
return cancelByRemote; | ||
} | ||
|
||
public static CancelStreamException fromRemote(long errorCode) { | ||
return new CancelStreamException(true, errorCode); | ||
} | ||
|
||
public static CancelStreamException fromLocal(long errorCode) { | ||
return new CancelStreamException(false, errorCode); | ||
} | ||
|
||
@Override | ||
public long getErrorCode() { | ||
return errorCode; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Consider making CancelStreamException a checked exception to enforce explicit handling of stream cancellation scenarios, promoting safer code practices.
Code Suggestion:
-public class CancelStreamException extends RuntimeException implements ErrorCodeHolder {
+public class CancelStreamException extends Exception implements ErrorCodeHolder {
private boolean closed = false; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Ensure thread safety for the "closed" flag access and updates, considering possible concurrent modifications.
Code Suggestion:
+import java.util.concurrent.atomic.AtomicBoolean;
-private boolean closed = false;
+private AtomicBoolean closed = new AtomicBoolean(false);
if (throwable instanceof CancelStreamException) { | ||
if (((CancelStreamException) throwable).isCancelByRemote()) { | ||
closed = true; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Refactor the condition inside the cancel method to reduce nesting and improve readability.
Code Suggestion:
+if (throwable instanceof CancelStreamException && ((CancelStreamException) throwable).isCancelByRemote()) {
- if (((CancelStreamException) throwable).isCancelByRemote()) {
- closed = true;
+ closed.set(true);
- }
Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel); | ||
ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed()); | ||
pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Add error handling for the closeFuture listener to gracefully manage potential exceptions during stream closure.
Code Suggestion:
+ctx.channel().closeFuture().addListener(future -> {
+ try {
+ http2TransportListener.onStreamClosed();
+ } catch (Exception e) {
+ // Log or handle the exception
+ }
+});
public void onStreamClosed() { | ||
// doing on event loop thread | ||
getStreamingDecoder().close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Validate the state of getStreamingDecoder() before invoking close() to avoid potential NullPointerException.
Code Suggestion:
+if (getStreamingDecoder() != null) {
+ getStreamingDecoder().close();
+}
Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel); | ||
ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed()); | ||
pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scalability Issue: The addition of a closeFuture listener to handle stream closure can potentially create a scalability issue. If there are many instances of Http2TransportListener being created and not properly removed, it could lead to a memory leak as each listener holds a reference to the Http2TransportListener instance. This is particularly concerning in a high-throughput environment where connections are frequently opened and closed.
Fix: Implement a mechanism to ensure that these listeners are removed or deregistered when no longer needed, or when the associated Channel is closed. This could involve keeping track of listeners and explicitly removing them, or using weak references.
Code Suggestion:
ctx.channel().closeFuture().addListener(future -> {
http2TransportListener.onStreamClosed();
listeners.remove(http2TransportListener);
});
if (throwable instanceof CancelStreamException) { | ||
if (((CancelStreamException) throwable).isCancelByRemote()) { | ||
closed = true; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimization Issue: The conditional block checks if the throwable is an instance of CancelStreamException and if it is cancelled by remote, then sets the 'closed' flag to true. However, there is no mechanism to ensure that resources associated with the channel are released or cleaned up, potentially leading to resource leaks.
Fix: Ensure that all resources associated with the channel are properly released or cleaned up when the channel is closed to prevent resource leaks.
Code Suggestion:
+ if (closed) {
+ releaseAssociatedResources(); // Implement this method to clean up resources
+ }
if (closed) { | ||
return; | ||
} | ||
super.onNext(data); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimization Issue: The onNext method checks if the channel is closed and returns early if it is. This could lead to missed data processing if the close flag is set prematurely. Additionally, there is no handling for the case where data arrives after the channel is marked as closed but before it is actually closed.
Fix: Implement a more robust mechanism for handling data that arrives after the channel is marked as closed but before it is actually closed. Consider queueing such data for processing or handling it in a way that ensures no data is lost.
Code Suggestion:
+ private final ConcurrentLinkedQueue<Object> pendingData = new ConcurrentLinkedQueue<>();
+ if (closed) {
+ pendingData.offer(data);
+ return;
+ }
+ while (!pendingData.isEmpty()) {
+ Object nextData = pendingData.poll();
+ super.onNext(nextData);
+ }
if (closed) { | ||
return; | ||
} | ||
super.onError(throwable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimization Issue: Similar to onNext, the onError method returns early if the channel is closed, potentially missing important error handling steps. This could lead to unhandled exceptions or errors that occur after the channel is marked as closed but before it is actually closed.
Fix: Ensure that errors occurring after the channel is marked as closed but before it is actually closed are properly logged or handled to prevent unhandled exceptions.
Code Suggestion:
+ if (closed) {
+ logError(throwable); // Implement this method to log or handle error
+ return;
+ }
Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel); | ||
ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed()); | ||
pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimization Issue: Registering a listener for every channel close event inside channelRead0 might lead to a memory leak if not properly handled. This is because every time a message is read, a new listener is added to the closeFuture of the channel, potentially creating a large number of listeners for the same event, especially under high load.
Fix: Consider maintaining a single global listener for the close event per channel, or ensure that listeners are properly removed after being triggered to avoid potential memory leaks.
Code Suggestion:
private static final ChannelFutureListener CLOSE_LISTENER = future -> {
// cleanup resources
};
// During initialization
channel.closeFuture().addListener(CLOSE_LISTENER);
Quality Gate passedIssues Measures |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## 3.3 #13973 +/- ##
==========================================
+ Coverage 38.55% 39.09% +0.54%
==========================================
Files 1895 1730 -165
Lines 79272 75284 -3988
Branches 11528 11093 -435
==========================================
- Hits 30560 29433 -1127
+ Misses 44439 41701 -2738
+ Partials 4273 4150 -123 ☔ View full report in Codecov by Sentry. |
Quality Gate passedIssues Measures |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What is the purpose of the change
fix memory leak of tri protocol
Brief changelog
add streamclose listener to release netty bytebuf
Verifying this change
Checklist