Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix netty leak #3127

Merged
merged 2 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,25 @@ public ServiceProfilerClient(URL hostUrl, String instrumentationKey, HttpPipelin
public Mono<BlobAccessPass> getUploadAccess(UUID profileId, String extension) {
URL requestUrl = uploadRequestUri(profileId, extension);

return executePostWithRedirect(requestUrl)
.map(
response -> {
if (response.getStatusCode() >= 300) {
throw new HttpResponseException(response);
}
String location = response.getHeaderValue("Location");
if (location == null || location.isEmpty()) {
throw new AssertionError("response did not have a location");
}
return new BlobAccessPass(null, location, null);
});
return executePostWithRedirect(requestUrl).map(ServiceProfilerClient::getUploadAccess);
}

private static BlobAccessPass getUploadAccess(HttpResponse response) {
try {
if (response.getStatusCode() >= 300) {
throw new HttpResponseException(response);
}
String location = response.getHeaderValue("Location");
if (location == null || location.isEmpty()) {
throw new AssertionError("response did not have a location");
}
return new BlobAccessPass(null, location, null);
} finally {
// need to consume the body or close the response, otherwise get netty ByteBuf leak warnings:
// io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before
// it's garbage-collected (see https://github.com/Azure/azure-sdk-for-java/issues/10467)
response.close();
}
}

public Mono<HttpResponse> executePostWithRedirect(URL requestUrl) {
Expand All @@ -94,21 +101,7 @@ public Mono<ArtifactAcceptedResponse> reportUploadFinish(
URL requestUrl = uploadFinishedRequestUrl(profileId, extension, etag);

return executePostWithRedirect(requestUrl)
.flatMap(
response -> {
if (response == null) {
// this shouldn't happen, the mono should complete with a response or a failure
return Mono.error(new AssertionError("http response mono returned empty"));
}

int statusCode = response.getStatusCode();
if (statusCode != 201 && statusCode != 202) {
logger.error("Trace upload failed: {}", statusCode);
return Mono.error(new AssertionError("http request failed"));
}

return response.getBodyAsString();
})
.flatMap(ServiceProfilerClient::reportUploadFinish)
.flatMap(
json -> {
if (json == null) {
Expand All @@ -128,33 +121,55 @@ public Mono<ArtifactAcceptedResponse> reportUploadFinish(
});
}

private static Mono<String> reportUploadFinish(HttpResponse response) {
if (response == null) {
// this shouldn't happen, the mono should complete with a response or a failure
return Mono.error(new AssertionError("http response mono returned empty"));
}
try {
int statusCode = response.getStatusCode();
if (statusCode != 201 && statusCode != 202) {
logger.error("Trace upload failed: {}", statusCode);
return Mono.error(new AssertionError("http request failed"));
}
return response.getBodyAsString();
} finally {
// need to consume the body or close the response, otherwise get netty ByteBuf leak warnings:
// io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before
// it's garbage-collected (see https://github.com/Azure/azure-sdk-for-java/issues/10467)
response.close();
}
}

/** Obtain current settings that have been configured within the UI. */
public Mono<ProfilerConfiguration> getSettings(Date oldTimeStamp) {

URL requestUrl = getSettingsPath(oldTimeStamp);

HttpRequest request = new HttpRequest(HttpMethod.GET, requestUrl);

return httpPipeline
.send(request)
return httpPipeline.send(request).flatMap(response -> handle(response, requestUrl));
}

private static Mono<ProfilerConfiguration> handle(HttpResponse response, URL requestUrl) {
if (response.getStatusCode() >= 300) {
// need to consume the body or close the response, otherwise get netty ByteBuf leak warnings:
// io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before
// it's garbage-collected (see https://github.com/Azure/azure-sdk-for-java/issues/10467)
response.close();
return Mono.error(
new HttpResponseException(
"Received error code " + response.getStatusCode() + " from " + requestUrl, response));
}
return response
.getBodyAsString()
.flatMap(
response -> {
if (response.getStatusCode() >= 300) {
return Mono.error(
new HttpResponseException(
"Received error code " + response.getStatusCode() + " from " + requestUrl,
response));
body -> {
try {
return Mono.just(mapper.readValue(body, ProfilerConfiguration.class));
} catch (IOException e) {
return Mono.error(e);
}
return response
.getBodyAsString()
.flatMap(
body -> {
try {
return Mono.just(mapper.readValue(body, ProfilerConfiguration.class));
} catch (IOException e) {
return Mono.error(e);
}
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ t, getQuickPulseEndpoint(), friendlyExceptionThrown, logger)) {
}
} finally {
if (response != null) {
// need to consume the body or close the response, otherwise get netty ByteBuf leak
// warnings:
// io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before
// it's garbage-collected (see https://github.com/Azure/azure-sdk-for-java/issues/10467)
response.close();
}
}
Expand Down