diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index ad322503b0d06..a07f1cc9e3386 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -60,6 +60,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception pending.add(ReferenceCountUtil.retain(httpObject)); requestStart(ctx); assert state == QUEUEING_DATA; + assert ctx.channel().config().isAutoRead() == false; break; case QUEUEING_DATA: pending.add(ReferenceCountUtil.retain(httpObject)); @@ -76,14 +77,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (httpObject instanceof LastHttpContent) { state = WAITING_TO_START; } - // fall-through + ReferenceCountUtil.release(httpObject); + break; case DROPPING_DATA_PERMANENTLY: assert pending.isEmpty(); ReferenceCountUtil.release(httpObject); // consume without enqueuing + ctx.channel().config().setAutoRead(false); break; } - - setAutoReadForState(ctx, state); } private void requestStart(ChannelHandlerContext ctx) { @@ -104,6 +105,7 @@ private void requestStart(ChannelHandlerContext ctx) { } state = QUEUEING_DATA; + ctx.channel().config().setAutoRead(false); if (httpRequest == null) { // this looks like a malformed request and will forward without validation @@ -149,6 +151,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) { assert ctx.channel().config().isAutoRead() == false; assert state == QUEUEING_DATA; + ctx.channel().config().setAutoRead(true); boolean fullRequestForwarded = forwardData(ctx, pending); assert fullRequestForwarded || pending.isEmpty(); @@ -160,7 +163,6 @@ private void forwardFullRequest(ChannelHandlerContext ctx) { } assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST; - setAutoReadForState(ctx, state); } private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) { @@ -176,6 +178,8 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER); } messageToForward.setDecoderResult(DecoderResult.failure(e)); + + ctx.channel().config().setAutoRead(true); ctx.fireChannelRead(messageToForward); assert fullRequestDropped || pending.isEmpty(); @@ -187,7 +191,6 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex } assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST; - setAutoReadForState(ctx, state); } @Override @@ -243,10 +246,6 @@ private static void maybeResizePendingDown(int largeSize, ArrayDeque } } - private static void setAutoReadForState(ChannelHandlerContext ctx, State state) { - ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false); - } - enum State { WAITING_TO_START, QUEUEING_DATA, diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java index e8622f2c95c2c..d395a3d18e2fd 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java @@ -116,6 +116,36 @@ public void testValidationPausesAndResumesData() { assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); } + public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request); + channel.writeInbound(content); + + assertThat(header.get(), sameInstance(request)); + // channel is paused + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + // channel is resumed + listener.get().onResponse(null); + channel.runPendingTasks(); + + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); + assertThat(channel.readInbound(), sameInstance(request)); + assertThat(channel.readInbound(), sameInstance(content)); + assertThat(channel.readInbound(), nullValue()); + assertThat(content.refCnt(), equalTo(1)); + channel.config().setAutoRead(false); + + channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4))); + assertFalse(channel.config().isAutoRead()); + } + public void testContentForwardedAfterValidation() { assertTrue(channel.config().isAutoRead()); assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));