Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx);
assert state == QUEUEING_DATA;
assert ctx.channel().config().isAutoRead() == false;
break;
case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject));
Expand All @@ -76,14 +77,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
// fall-through
ReferenceCountUtil.release(httpObject);
break;
case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing
ctx.channel().config().setAutoRead(false);
break;
}

setAutoReadForState(ctx, state);
}

private void requestStart(ChannelHandlerContext ctx) {
Expand All @@ -104,6 +105,7 @@ private void requestStart(ChannelHandlerContext ctx) {
}

state = QUEUEING_DATA;
ctx.channel().config().setAutoRead(false);

if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
Expand Down Expand Up @@ -149,6 +151,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;

ctx.channel().config().setAutoRead(true);
boolean fullRequestForwarded = forwardData(ctx, pending);

assert fullRequestForwarded || pending.isEmpty();
Expand All @@ -160,7 +163,6 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
Expand All @@ -176,6 +178,8 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
}
messageToForward.setDecoderResult(DecoderResult.failure(e));

ctx.channel().config().setAutoRead(true);
ctx.fireChannelRead(messageToForward);

assert fullRequestDropped || pending.isEmpty();
Expand All @@ -187,7 +191,6 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

@Override
Expand Down Expand Up @@ -243,10 +246,6 @@ private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject>
}
}

private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
}

enum State {
WAITING_TO_START,
QUEUEING_DATA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,36 @@ public void testValidationPausesAndResumesData() {
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
}

public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));

final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(request);
channel.writeInbound(content);

assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());

// channel is resumed
listener.get().onResponse(null);
channel.runPendingTasks();

assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
assertThat(channel.readInbound(), sameInstance(request));
assertThat(channel.readInbound(), sameInstance(content));
assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(1));
channel.config().setAutoRead(false);

channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4)));
assertFalse(channel.config().isAutoRead());
}

public void testContentForwardedAfterValidation() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
Expand Down