Skip to content

Commit 708d11f

Browse files
authored
Ensure that releasing listener is called
When sending a response to a client, we attach a releasing listener to the channel promise. If the client disappears before the response is sent, the releasing listener was never notified. The reason the listeners were never notified was due to a mistaken invocation of write and flush on the channel which has two overrides: one that takes an existing promise, and one that does not and instead creates a new promise. When the client disappears, it is this latter promise that is notified, which does not contain the releasing listener. This commit addreses this issue by invoking the override that passes our channel promise through. Relates #23310
1 parent 6f1ed8a commit 708d11f

File tree

8 files changed

+58
-31
lines changed

8 files changed

+58
-31
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,19 +127,20 @@ public void sendResponse(RestResponse response) {
127127

128128
if (release) {
129129
promise.addListener(f -> ((Releasable)content).close());
130-
release = false;
131130
}
132131

133132
if (isCloseConnection()) {
134133
promise.addListener(ChannelFutureListener.CLOSE);
135134
}
136135

136+
final Object msg;
137137
if (pipelinedRequest != null) {
138-
channel.writeAndFlush(pipelinedRequest.createHttpResponse(resp, promise));
138+
msg = pipelinedRequest.createHttpResponse(resp);
139139
} else {
140-
channel.writeAndFlush(resp, promise);
140+
msg = resp;
141141
}
142-
142+
channel.writeAndFlush(msg, promise);
143+
release = false;
143144
} finally {
144145
if (release) {
145146
((Releasable) content).close();

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
7070
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
7171
import org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler;
72-
import org.elasticsearch.monitor.jvm.JvmInfo;
7372
import org.elasticsearch.rest.RestChannel;
7473
import org.elasticsearch.rest.RestRequest;
7574
import org.elasticsearch.rest.RestUtils;

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipelinedRequest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919

2020
package org.elasticsearch.http.netty4.pipelining;
2121

22-
import io.netty.channel.ChannelPromise;
23-
import io.netty.handler.codec.http.FullHttpRequest;
2422
import io.netty.handler.codec.http.FullHttpResponse;
25-
import io.netty.handler.codec.http.HttpResponse;
2623
import io.netty.handler.codec.http.LastHttpContent;
2724
import io.netty.util.ReferenceCounted;
2825

@@ -36,7 +33,7 @@ public class HttpPipelinedRequest implements ReferenceCounted {
3633
private final int sequence;
3734

3835

39-
HttpPipelinedRequest(final LastHttpContent last, final int sequence) {
36+
public HttpPipelinedRequest(final LastHttpContent last, final int sequence) {
4037
this.last = last;
4138
this.sequence = sequence;
4239
}
@@ -45,8 +42,8 @@ public LastHttpContent last() {
4542
return last;
4643
}
4744

48-
public HttpPipelinedResponse createHttpResponse(final FullHttpResponse response, final ChannelPromise promise) {
49-
return new HttpPipelinedResponse(response, promise, sequence);
45+
public HttpPipelinedResponse createHttpResponse(final FullHttpResponse response) {
46+
return new HttpPipelinedResponse(response, sequence);
5047
}
5148

5249
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipelinedResponse.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,23 @@
1919
* under the License.
2020
*/
2121

22-
import io.netty.channel.ChannelPromise;
2322
import io.netty.handler.codec.http.FullHttpResponse;
24-
import io.netty.handler.codec.http.HttpResponse;
2523
import io.netty.util.ReferenceCounted;
2624

2725
class HttpPipelinedResponse implements Comparable<HttpPipelinedResponse>, ReferenceCounted {
2826

2927
private final FullHttpResponse response;
30-
private final ChannelPromise promise;
3128
private final int sequence;
3229

33-
HttpPipelinedResponse(FullHttpResponse response, ChannelPromise promise, int sequence) {
30+
HttpPipelinedResponse(FullHttpResponse response, int sequence) {
3431
this.response = response;
35-
this.promise = promise;
3632
this.sequence = sequence;
3733
}
3834

3935
public FullHttpResponse response() {
4036
return response;
4137
}
4238

43-
public ChannelPromise promise() {
44-
return promise;
45-
}
46-
4739
public int sequence() {
4840
return sequence;
4941
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
import io.netty.channel.ChannelHandlerContext;
2525
import io.netty.channel.ChannelPromise;
2626
import io.netty.handler.codec.http.LastHttpContent;
27-
import io.netty.util.ReferenceCountUtil;
28-
import org.elasticsearch.action.termvectors.TermVectorsFilter;
29-
import org.elasticsearch.common.SuppressForbidden;
3027
import org.elasticsearch.transport.netty4.Netty4Utils;
3128

3229
import java.util.Collections;
@@ -84,7 +81,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
8481
break;
8582
}
8683
holdingQueue.remove();
87-
ctx.write(response.response(), response.promise());
84+
ctx.write(response.response(), promise);
8885
writeSequence++;
8986
}
9087
} else {

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,19 @@
4141
import io.netty.handler.codec.http.HttpVersion;
4242
import io.netty.util.Attribute;
4343
import io.netty.util.AttributeKey;
44-
4544
import org.elasticsearch.common.bytes.BytesReference;
45+
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
46+
import org.elasticsearch.common.lease.Releasable;
4647
import org.elasticsearch.common.network.NetworkService;
4748
import org.elasticsearch.common.settings.Settings;
49+
import org.elasticsearch.common.util.BigArrays;
50+
import org.elasticsearch.common.util.ByteArray;
4851
import org.elasticsearch.common.util.MockBigArrays;
52+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4953
import org.elasticsearch.http.HttpTransportSettings;
5054
import org.elasticsearch.http.NullDispatcher;
5155
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
56+
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
5257
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
5358
import org.elasticsearch.rest.RestResponse;
5459
import org.elasticsearch.rest.RestStatus;
@@ -59,6 +64,7 @@
5964
import org.junit.After;
6065
import org.junit.Before;
6166

67+
import java.io.UnsupportedEncodingException;
6268
import java.net.SocketAddress;
6369
import java.nio.charset.StandardCharsets;
6470
import java.util.ArrayList;
@@ -70,6 +76,7 @@
7076
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN;
7177
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
7278
import static org.hamcrest.Matchers.equalTo;
79+
import static org.hamcrest.Matchers.instanceOf;
7380
import static org.hamcrest.Matchers.is;
7481
import static org.hamcrest.Matchers.notNullValue;
7582
import static org.hamcrest.Matchers.nullValue;
@@ -217,6 +224,25 @@ public void testHeadersSet() {
217224
}
218225
}
219226

227+
public void testReleaseOnSendToClosedChannel() {
228+
final Settings settings = Settings.builder().build();
229+
final NamedXContentRegistry registry = xContentRegistry();
230+
try (Netty4HttpServerTransport httpServerTransport =
231+
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, registry, new NullDispatcher())) {
232+
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
233+
final EmbeddedChannel embeddedChannel = new EmbeddedChannel();
234+
final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel);
235+
final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null;
236+
final Netty4HttpChannel channel =
237+
new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, randomBoolean(), threadPool.getThreadContext());
238+
final TestResponse response = new TestResponse(bigArrays);
239+
assertThat(response.content(), instanceOf(Releasable.class));
240+
embeddedChannel.close();
241+
channel.sendResponse(response);
242+
// ESTestCase#after will invoke ensureAllArraysAreReleased which will fail if the response content was not released
243+
}
244+
}
245+
220246
public void testConnectionClose() throws Exception {
221247
final Settings settings = Settings.builder().build();
222248
try (Netty4HttpServerTransport httpServerTransport =
@@ -508,14 +534,32 @@ List<Object> getWrittenObjects() {
508534

509535
private static class TestResponse extends RestResponse {
510536

537+
private final BytesReference reference;
538+
539+
TestResponse() {
540+
reference = Netty4Utils.toBytesReference(Unpooled.copiedBuffer("content", StandardCharsets.UTF_8));
541+
}
542+
543+
TestResponse(final BigArrays bigArrays) {
544+
final byte[] bytes;
545+
try {
546+
bytes = "content".getBytes("UTF-8");
547+
} catch (final UnsupportedEncodingException e) {
548+
throw new AssertionError(e);
549+
}
550+
final ByteArray bigArray = bigArrays.newByteArray(bytes.length);
551+
bigArray.set(0, bytes, 0, bytes.length);
552+
reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length);
553+
}
554+
511555
@Override
512556
public String contentType() {
513557
return "text";
514558
}
515559

516560
@Override
517561
public BytesReference content() {
518-
return Netty4Utils.toBytesReference(Unpooled.copiedBuffer("content", StandardCharsets.UTF_8));
562+
return reference;
519563
}
520564

521565
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public void run() {
256256
}
257257

258258
if (pipelinedRequest != null) {
259-
ctx.writeAndFlush(pipelinedRequest.createHttpResponse(httpResponse, ctx.channel().newPromise()));
259+
ctx.writeAndFlush(pipelinedRequest.createHttpResponse(httpResponse));
260260
} else {
261261
ctx.writeAndFlush(httpResponse);
262262
}

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/pipelining/Netty4HttpPipeliningHandlerTests.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.netty.buffer.ByteBufUtil;
2424
import io.netty.buffer.Unpooled;
2525
import io.netty.channel.ChannelHandlerContext;
26-
import io.netty.channel.ChannelPromise;
2726
import io.netty.channel.SimpleChannelInboundHandler;
2827
import io.netty.channel.embedded.EmbeddedChannel;
2928
import io.netty.handler.codec.http.DefaultFullHttpRequest;
@@ -37,7 +36,6 @@
3736
import io.netty.handler.codec.http.LastHttpContent;
3837
import io.netty.handler.codec.http.QueryStringDecoder;
3938
import org.elasticsearch.common.Randomness;
40-
import org.elasticsearch.common.util.concurrent.CountDown;
4139
import org.elasticsearch.test.ESTestCase;
4240
import org.junit.After;
4341

@@ -248,8 +246,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpPipelined
248246
executorService.submit(() -> {
249247
try {
250248
waitingLatch.await(1000, TimeUnit.SECONDS);
251-
final ChannelPromise promise = ctx.newPromise();
252-
ctx.write(pipelinedRequest.createHttpResponse(httpResponse, promise), promise);
249+
ctx.write(pipelinedRequest.createHttpResponse(httpResponse), ctx.newPromise());
253250
finishingLatch.countDown();
254251
} catch (InterruptedException e) {
255252
fail(e.toString());

0 commit comments

Comments
 (0)