From a5b0fe0bd9900535d44113f1074fd6f311fee368 Mon Sep 17 00:00:00 2001 From: James Roper Date: Fri, 26 Jun 2015 14:10:33 +1000 Subject: [PATCH 1/2] Upgraded to Netty 4 In addition, made the following changes to the way the handler works: * Messages are buffered in a hash map by sequence ID if it's not the right time for the sequence. * Removed subsequence ordering - writes for a given request are required to be sequential without the pipelining handler, so with it we can assume the same. * Replaced buffer limit with back pressure - if the number of in flight requests exceeds a high water mark, the handler now exerts back pressure on the client by blocking read completed events, and unblocks them, and requests a new read, when a low watermark is reached. --- .gitignore | 21 - .travis.yml | 8 + README.md | 9 +- pom.xml | 12 +- .../pipelining/HttpPipeliningHandler.java | 269 +++++++--- .../OrderedDownstreamChannelEvent.java | 77 --- .../OrderedUpstreamMessageEvent.java | 25 - .../http/pipelining/SequencedHttpRequest.java | 50 ++ .../pipelining/SequencedOutboundMessage.java | 55 +++ .../pipelining/HttpPipeliningHandlerTest.java | 462 ++++++++++++------ 10 files changed, 620 insertions(+), 368 deletions(-) create mode 100644 .travis.yml delete mode 100644 src/main/java/com/typesafe/netty/http/pipelining/OrderedDownstreamChannelEvent.java delete mode 100644 src/main/java/com/typesafe/netty/http/pipelining/OrderedUpstreamMessageEvent.java create mode 100644 src/main/java/com/typesafe/netty/http/pipelining/SequencedHttpRequest.java create mode 100644 src/main/java/com/typesafe/netty/http/pipelining/SequencedOutboundMessage.java diff --git a/.gitignore b/.gitignore index 663eb67..eb5a316 100644 --- a/.gitignore +++ b/.gitignore @@ -1,22 +1 @@ -# Extracted from https://github.com/ulrich/macaron-factory/blob/master/.gitignore -# Ignore all dotfiles... -.* -# except for .gitignore -!.gitignore - -# Ignore Play! working directory # -db -eclipse -lib -log -logs -modules -precompiled -project/project -project/target target -tmp -test-result -server.pid -*.iml -*.eml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..7f420d7 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,8 @@ +language: java +jdk: + - oraclejdk8 + - openjdk7 +cache: + directories: + - $HOME/.m2 + diff --git a/README.md b/README.md index 7d07e22..dc34002 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,7 @@ netty-http-pipelining This library adds http pipelining capability to Netty. Inserting the HttpPipeliningHandler into a pipeline will cause message events containing an HttpRequest to become transformed -into OrderedUpstreamMessageEvents. The OrderedUpstreamMessageEvent retains context such that a handler further upstream can -compose it and reply with an OrderedDownstreamChannelEvent in any order and in parallel. The HttpPipeliningHandler will +into SequencedHttpRequest. The SequencedHttpRequest retains context such that a handler further upstream can +compose it and reply with an SequencedOutboundMessage in any order and in parallel. The HttpPipeliningHandler will ensure that http replies are sent back in the order that the http pipelining specification requires i.e. the order in which replies are returned must correlate to the order in which requests are made. - -The chunking of http replies is handled. Limits are also available within the handler to cap the buffering of replies -in order to avoid memory exhaustion. - -Please refer to the HttpPipeliningHandlerTest for a comprehensive illustration of usage. \ No newline at end of file diff --git a/pom.xml b/pom.xml index b7797ac..26f88e1 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.typesafe.netty netty-http-pipelining - 1.1.5-SNAPSHOT + 2.0-SNAPSHOT ${project.groupId}:${project.artifactId} This library provides a handler and some new message types that enable http pipelining in Netty @@ -31,6 +31,12 @@ Typesafe http://typesafe.com + + James Roper + james@typesafe.com + Typesafe + http://typesafe.com + @@ -41,8 +47,8 @@ io.netty - netty - 3.10.1.Final + netty-codec-http + 4.0.29.Final junit diff --git a/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java b/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java index 30aee01..17879b3 100644 --- a/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java +++ b/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java @@ -1,110 +1,231 @@ package com.typesafe.netty.http.pipelining; -import org.jboss.netty.channel.*; -import org.jboss.netty.handler.codec.http.HttpRequest; + +import io.netty.channel.*; +import io.netty.handler.codec.http.*; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.IntObjectMap; import java.util.*; /** * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their - * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will - * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely - * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects. + * corresponding requests. + * + * Each incoming request is assigned a sequence number, which is provided in the wrapping {@link SequencedHttpRequest}. + * When a handler writes a response and a response body, it must wrap each of those messages in a + * {@link SequencedOutboundMessage} with the corresponding sequence from that request. It must send at least one + * message in response to the request, and the last message in the sequence must implement LastHttpContent. + * + * If messages are sent after the last message is sent, the behaviour is undefined. + * + * There is no limit to the amount of messages that this handler will buffer for a particular sequence number. It is + * the responsibility of the handler sending the outbound messages to handle back pressure via promises associated + * with each write event - if this is done, the buffering will be inherently bounded by back pressure. * - * @author Christopher Hunt + * This handler does however put a bound on the maximum number of in flight requests that it will handle, configured by + * inFlightRequestsLowWatermark and inFlightRequestsHighWatermark. When the high watermark is exceeded, the handler will + * push back on the client. When the low watermark is reached, the handler will start reading again. This back pressure + * mechanism only works if ChannelOptions.AUTO_READ is false. + * + * This back pressure is implemented by blocking channelReadComplete events, so assumes that the following handlers + * will not request reading unless they receive this event. Note that the handler does nothing to actually block + * incoming requests when the high watermark is reached, it only pushes back on the TCP connection. If there are more + * requests in the TCP buffer before this back pressure takes effect, these requests will still be sent to the following + * handlers. + * + * If channelReadComplete is invoked while the high watermark is reached, then when the low watermark is reached, this + * will be fired again, to signal demand. */ -public class HttpPipeliningHandler extends SimpleChannelHandler { +public class HttpPipeliningHandler extends ChannelDuplexHandler { + + /** + * The sequence of received HTTP requests. + */ + private int receiveSequence = 0; + + /** + * The currently sending sequence of HTTP requests. + */ + private int currentlySendingSequence = 1; - public static final int INITIAL_EVENTS_HELD = 3; - public static final int MAX_EVENTS_HELD = 10000; + /** + * Whether the high watermark has been exceeded. + */ + private boolean highWatermarkReached = false; + + /** + * If the high watermark has been exceeded, whether a channel read complete has occurred. + */ + private boolean channelReadCompleteOccurred = false; - private final int maxEventsHeld; + /** + * A write message with a promise of when it's written. + */ + private static class WriteMessage { + /** + * The written message. + */ + final SequencedOutboundMessage message; + + /** + * The future that is redeemed once the message is written. + */ + final ChannelPromise promise; + + public WriteMessage(SequencedOutboundMessage message, ChannelPromise promise) { + this.message = message; + this.promise = promise; + } + } - private int sequence; - private int nextRequiredSequence; - private int nextRequiredSubsequence; + /** + * The buffered events, by sequence + */ + private final IntObjectMap> bufferedEvents = new IntObjectHashMap<>(); - private final Queue holdingQueue; + private final int inFlightRequestsLowWatermark; + private final int inFlightRequestsHighWatermark; + /** + * Create the pipelining handler with low and high watermarks of 2 and 4. + */ public HttpPipeliningHandler() { - this(MAX_EVENTS_HELD); + this(2, 4); } /** - * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel - * connection. This is required as events cannot queue up indefintely; we would run out of - * memory if this was the case. + * Create the pipelining handler. + * + * @param inFlightRequestsLowWatermark The low watermark for in flight requests, where, if the high watermark has + * been exceeded, the handler will start reading again. Must be at least 0. + * @param inFlightRequestsHighWatermark The high watermark, once in flight requests has exceeded this, the handler + * will stop reading, pushing back on the client. Must be greater than the + * low watermark. */ - public HttpPipeliningHandler(final int maxEventsHeld) { - this.maxEventsHeld = maxEventsHeld; - - holdingQueue = new PriorityQueue(INITIAL_EVENTS_HELD, new Comparator() { - @Override - public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) { - final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence(); - if (delta == 0) { - return o1.getSubsequence() - o2.getSubsequence(); - } else { - return delta; - } - } - }); + public HttpPipeliningHandler(int inFlightRequestsLowWatermark, int inFlightRequestsHighWatermark) { + if (inFlightRequestsLowWatermark < 0) { + throw new IllegalArgumentException("inFlightRequestsLowWatermark must be an least 0, was " + inFlightRequestsLowWatermark); + } + if (inFlightRequestsHighWatermark <= inFlightRequestsLowWatermark) { + throw new IllegalArgumentException("inFlightRequestsHighWatermark must be greater than inFlightRequestsLowWatermark, but was " + inFlightRequestsHighWatermark); + } + this.inFlightRequestsLowWatermark = inFlightRequestsLowWatermark; + this.inFlightRequestsHighWatermark = inFlightRequestsHighWatermark; } - public int getMaxEventsHeld() { - return maxEventsHeld; + private int inFlight() { + return receiveSequence - currentlySendingSequence + 1; } @Override - public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { - final Object msg = e.getMessage(); - if (msg instanceof HttpRequest) { - ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress())); + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // Only forward read complete if we haven't exceeded the high watermark + if (!highWatermarkReached) { + ctx.fireChannelReadComplete(); } else { - ctx.sendUpstream(e); + // Store the fact that read complete has been requested. + channelReadCompleteOccurred = true; + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + Object toSend = msg; + + // Wrap message in sequenced if it needs to be + if (msg instanceof HttpRequest) { + receiveSequence++; + HttpRequest request = (HttpRequest) msg; + toSend = new SequencedHttpRequest(receiveSequence, request); + } + + // If we've reached the end of an http request, and we're at or over the high watermark, + // set it to reached. + if (msg instanceof LastHttpContent && inFlight() >= inFlightRequestsHighWatermark) { + highWatermarkReached = true; } + + ctx.fireChannelRead(toSend); } @Override - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - if (e instanceof OrderedDownstreamChannelEvent) { - - boolean channelShouldClose = false; - - synchronized (holdingQueue) { - if (holdingQueue.size() < maxEventsHeld) { - - final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e; - holdingQueue.add(currentEvent); - - while (!holdingQueue.isEmpty()) { - final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek(); - if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence | - nextEvent.getSubsequence() != nextRequiredSubsequence) { - break; - } - holdingQueue.remove(); - ctx.sendDownstream(nextEvent.getChannelEvent()); - if (nextEvent.isLast()) { - ++nextRequiredSequence; - nextRequiredSubsequence = 0; - } else { - ++nextRequiredSubsequence; - } - } - - } else { - channelShouldClose = true; - } + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof SequencedOutboundMessage) { + SequencedOutboundMessage sequenced = (SequencedOutboundMessage) msg; + + writeInSequence(ctx, sequenced, promise); + } else { + ctx.write(msg, promise); + } + } + + private void progressToNextSendingSequence(ChannelHandlerContext ctx) { + currentlySendingSequence++; + + int inFlight = this.inFlight(); + // If we're now at the low water mark, set it to false + if (highWatermarkReached && inFlight == inFlightRequestsLowWatermark) { + highWatermarkReached = false; + + // Check if, while we were over the high watermark, a channel read had occurred + // that we blocked + if (channelReadCompleteOccurred) { + // Send it on + ctx.fireChannelReadComplete(); + channelReadCompleteOccurred = false; + } + } + } + + /** + * Write the next sequences, if buffered. + */ + private void flushNextSequences(ChannelHandlerContext ctx) { + progressToNextSendingSequence(ctx); + + List toFlush = bufferedEvents.get(currentlySendingSequence); + + // Loop while we still have a sequence to flush + while (toFlush != null) { + + bufferedEvents.remove(currentlySendingSequence); + + WriteMessage lastWritten = null; + + // Flush each event + for (WriteMessage message: toFlush) { + ctx.write(message.message.getMessage(), message.promise); + lastWritten = message; } - if (channelShouldClose) { - Channels.close(e.getChannel()); + // If the last message that we wrote was the last message for that sequence, + // then increment the sequence and maybe get the next sequence from the buffer. + if (lastWritten != null && lastWritten.message.getMessage() instanceof LastHttpContent) { + progressToNextSendingSequence(ctx); + toFlush = bufferedEvents.get(currentlySendingSequence); + } else { + toFlush = null; } - } else { - super.handleDownstream(ctx, e); } } + /** + * Write the message in sequence. + */ + private void writeInSequence(ChannelHandlerContext ctx, SequencedOutboundMessage sequenced, ChannelPromise promise) { + if (sequenced.getSequence() == currentlySendingSequence) { + ctx.write(sequenced.getMessage(), promise); + if (sequenced.getMessage() instanceof LastHttpContent) { + flushNextSequences(ctx); + } + } else { + List sequenceBuffer = bufferedEvents.get(sequenced.getSequence()); + if (sequenceBuffer == null) { + sequenceBuffer = new ArrayList<>(); + bufferedEvents.put(sequenced.getSequence(), sequenceBuffer); + } + sequenceBuffer.add(new WriteMessage(sequenced, promise)); + } + } } diff --git a/src/main/java/com/typesafe/netty/http/pipelining/OrderedDownstreamChannelEvent.java b/src/main/java/com/typesafe/netty/http/pipelining/OrderedDownstreamChannelEvent.java deleted file mode 100644 index 979bf5a..0000000 --- a/src/main/java/com/typesafe/netty/http/pipelining/OrderedDownstreamChannelEvent.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.typesafe.netty.http.pipelining; - -import org.jboss.netty.channel.*; - -/** - * Permits downstream channel events to be ordered and signalled as to whether more are to come for a given sequence. - * - * @author Christopher Hunt - */ -public class OrderedDownstreamChannelEvent implements ChannelEvent { - - final ChannelEvent ce; - final OrderedUpstreamMessageEvent oue; - final int subsequence; - final boolean last; - - /** - * Construct a downstream channel event for all types of events. - * - * @param oue the OrderedUpstreamMessageEvent that this response is associated with - * @param subsequence the sequence within the sequence - * @param last when set to true this indicates that there are no more responses to be received for the - * original OrderedUpstreamMessageEvent - */ - public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last, - final ChannelEvent ce) { - this.oue = oue; - this.ce = ce; - this.subsequence = subsequence; - this.last = last; - } - - /** - * Convenience constructor signifying that this downstream message event is the last one for the given sequence, - * and that there is only one response. - */ - public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oe, - final Object message) { - this(oe, 0, true, message); - } - - /** - * Convenience constructor for passing message events. - */ - public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last, - final Object message) { - this(oue, subsequence, last, new DownstreamMessageEvent(oue.getChannel(), Channels.future(oue.getChannel()), - message, oue.getRemoteAddress())); - - } - - public OrderedUpstreamMessageEvent getOrderedUpstreamMessageEvent() { - return oue; - } - - public int getSubsequence() { - return subsequence; - } - - public boolean isLast() { - return last; - } - - @Override - public Channel getChannel() { - return ce.getChannel(); - } - - @Override - public ChannelFuture getFuture() { - return ce.getFuture(); - } - - public ChannelEvent getChannelEvent() { - return ce; - } -} diff --git a/src/main/java/com/typesafe/netty/http/pipelining/OrderedUpstreamMessageEvent.java b/src/main/java/com/typesafe/netty/http/pipelining/OrderedUpstreamMessageEvent.java deleted file mode 100644 index af272f1..0000000 --- a/src/main/java/com/typesafe/netty/http/pipelining/OrderedUpstreamMessageEvent.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.typesafe.netty.http.pipelining; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.UpstreamMessageEvent; - -import java.net.SocketAddress; - -/** - * Permits upstream message events to be ordered. - * - * @author Christopher Hunt - */ -public class OrderedUpstreamMessageEvent extends UpstreamMessageEvent { - final int sequence; - - public OrderedUpstreamMessageEvent(final int sequence, final Channel channel, final Object msg, final SocketAddress remoteAddress) { - super(channel, msg, remoteAddress); - this.sequence = sequence; - } - - public int getSequence() { - return sequence; - } - -} diff --git a/src/main/java/com/typesafe/netty/http/pipelining/SequencedHttpRequest.java b/src/main/java/com/typesafe/netty/http/pipelining/SequencedHttpRequest.java new file mode 100644 index 0000000..0b1270a --- /dev/null +++ b/src/main/java/com/typesafe/netty/http/pipelining/SequencedHttpRequest.java @@ -0,0 +1,50 @@ +package com.typesafe.netty.http.pipelining; + +import io.netty.handler.codec.http.HttpRequest; + +/** + * A sequenced HTTP request. + * + * The sequence number should be used to send responses (and their data) wrapped in a SequencedOutboundMessage. + */ +public class SequencedHttpRequest { + private final int sequence; + private final HttpRequest httpRequest; + + /** + * Create a sequenced HTTP request. + * + * @param sequence The sequence. + * @param httpRequest The http request. + */ + public SequencedHttpRequest(int sequence, HttpRequest httpRequest) { + this.sequence = sequence; + this.httpRequest = httpRequest; + } + + /** + * Get the sequence for this HTTP request. + * + * @return The sequence. + */ + public int getSequence() { + return sequence; + } + + /** + * Get the HTTP request. + * + * @return The HTTP request. + */ + public HttpRequest getHttpRequest() { + return httpRequest; + } + + @Override + public String toString() { + return "SequencedHttpRequest{" + + "sequence=" + sequence + + ", httpRequest=" + httpRequest + + '}'; + } +} diff --git a/src/main/java/com/typesafe/netty/http/pipelining/SequencedOutboundMessage.java b/src/main/java/com/typesafe/netty/http/pipelining/SequencedOutboundMessage.java new file mode 100644 index 0000000..060ec60 --- /dev/null +++ b/src/main/java/com/typesafe/netty/http/pipelining/SequencedOutboundMessage.java @@ -0,0 +1,55 @@ +package com.typesafe.netty.http.pipelining; + +import io.netty.handler.codec.http.HttpObject; + +/** + * A sequenced outbound message. + * + * This allows a handler to send one to many outbound messages associated with a sequence number from a + * {@link SequencedHttpRequest}, such that all the outbound messages for a particular sequence number are sent before + * any of the messages for the next sequence number are sent. + * + * For each SequencedHttpRequest received, downstream handlers are required to send zero to many messages that don't + * extend LastHttpContent, followed by exactly one message that does extend LastHttpContent. + */ +public class SequencedOutboundMessage { + private final int sequence; + private final HttpObject message; + + /** + * Create a sequenced outbound message. + * + * @param sequence The sequence of the {@link SequencedHttpRequest} that this outbound message is associated with. + * @param message The message. + */ + public SequencedOutboundMessage(int sequence, HttpObject message) { + this.sequence = sequence; + this.message = message; + } + + /** + * Get the sequence. + * + * @return The sequence. + */ + public int getSequence() { + return sequence; + } + + /** + * Get the message. + * + * @return The message. + */ + public HttpObject getMessage() { + return message; + } + + @Override + public String toString() { + return "SequencedOutboundMessage{" + + "sequence=" + sequence + + ", message=" + message + + '}'; + } +} diff --git a/src/test/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandlerTest.java b/src/test/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandlerTest.java index 8c4fe26..1aa8a79 100644 --- a/src/test/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandlerTest.java +++ b/src/test/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandlerTest.java @@ -1,200 +1,340 @@ package com.typesafe.netty.http.pipelining; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.handler.codec.http.*; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.jboss.netty.buffer.ChannelBuffers.EMPTY_BUFFER; -import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CHUNKED; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.jboss.netty.util.CharsetUtil.*; -import static org.junit.Assert.assertTrue; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; +import org.junit.*; + +import java.net.SocketAddress; +import java.nio.charset.Charset; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; public class HttpPipeliningHandlerTest { - private static final long RESPONSE_TIMEOUT = 10000L; - private static final long CONNECTION_TIMEOUT = 10000L; - private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8"; - private static final InetSocketAddress HOST_ADDR = new InetSocketAddress("127.0.0.1", 9080); - private static final String PATH1 = "/1"; - private static final String PATH2 = "/2"; - private static final String SOME_RESPONSE_TEXT = "some response for "; - - private ClientBootstrap clientBootstrap; - private ServerBootstrap serverBootstrap; + private static NioEventLoopGroup group; + private static Channel serverBindChannel; + private static ServerHandler serverHandler; + + @BeforeClass + public static void startServer() throws Exception { + group = new NioEventLoopGroup(); + ServerBootstrap serverBootstrap = new ServerBootstrap() + .group(group) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.AUTO_READ, false) + .localAddress("127.0.0.1", 0) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast( + new HttpRequestDecoder(), + new HttpResponseEncoder(), + new HttpPipeliningHandler(2, 4), + serverHandler + ); + } + }); - private CountDownLatch responsesIn; - private final List responses = new ArrayList(2); + serverBindChannel = await(serverBootstrap.bind()).channel(); + } - private HashedWheelTimer timer; + private Channel clientChannel; + private Channel serverChannel; + private BlockingQueue serverMessagesReceived; + private BlockingQueue clientMessagesReceived; @Before - public void setUp() { - clientBootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newSingleThreadExecutor(), - Executors.newSingleThreadExecutor())); - - clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new HttpClientCodec(), - new ClientHandler() - ); - } - }); - - serverBootstrap = new ServerBootstrap( - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), - Executors.newCachedThreadPool())); - - serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new HttpRequestDecoder(), - new HttpResponseEncoder(), - new HttpPipeliningHandler(), - new ServerHandler() - ); - } - }); - - serverBootstrap.bind(HOST_ADDR); - - timer = new HashedWheelTimer(); + public void setUpConnection() throws Exception { + serverMessagesReceived = new LinkedBlockingQueue<>(); + clientMessagesReceived = new LinkedBlockingQueue<>(); + final Promise serverChannelPromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); + + serverHandler = new ServerHandler(serverMessagesReceived, serverChannelPromise); + + Bootstrap clientBootstrap = new Bootstrap() + .group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast( + new HttpClientCodec(), + new HttpObjectAggregator(1048576), + new ClientHandler(clientMessagesReceived) + ); + } + }); + + SocketAddress bindAddress = serverBindChannel.localAddress(); + + clientChannel = await(clientBootstrap.connect(bindAddress)).channel(); + serverChannel = await(serverChannelPromise).get(); } @After - public void shutDown() { - timer.stop(); + public void closeConnection() throws Exception { + await(clientChannel.close()); + } - serverBootstrap.shutdown(); - serverBootstrap.releaseExternalResources(); - clientBootstrap.shutdown(); - clientBootstrap.releaseExternalResources(); + @AfterClass + public static void shutDownServer() throws Exception { + await(serverBindChannel.close()); + group.shutdownGracefully(); } @Test - public void shouldReturnMessagesInOrder() throws InterruptedException { - responsesIn = new CountDownLatch(1); - responses.clear(); + public void noPipelining() throws Exception { + // Request with simple response + clientMakeRequest("/"); + SequencedHttpRequest request1 = serverReceiveRequest(); + serverWrite(new SequencedOutboundMessage(request1.getSequence(), createResponse("")), true); + clientReceiveResponse(); + + // Request with chunked response + clientMakeRequest("/"); + SequencedHttpRequest request2 = serverReceiveRequest(); + serverSendChunkedResponse(request2, true, "hello", " ", "world"); + assertEquals("hello world", clientReceiveResponseBody()); + + // Another request with chunked response, but don't wait for back pressure + clientMakeRequest("/"); + SequencedHttpRequest request3 = serverReceiveRequest(); + serverSendChunkedResponse(request3, false, "foo", " ", "bar"); + serverChannel.flush(); + assertEquals("foo bar", clientReceiveResponseBody()); + + // And another request with simple response + clientMakeRequest("/"); + SequencedHttpRequest request4 = serverReceiveRequest(); + serverWrite(new SequencedOutboundMessage(request4.getSequence(), createResponse("simple")), true); + assertEquals("simple", clientReceiveResponseBody()); + } + + @Test + public void withPipelining() throws Exception { + // Request with simple response + clientMakeRequest("/"); + SequencedHttpRequest request1 = serverReceiveRequest(); + + // Request with chunked response + clientMakeRequest("/"); + SequencedHttpRequest request2 = serverReceiveRequest(); + + // Another request with chunked response + clientMakeRequest("/"); + SequencedHttpRequest request3 = serverReceiveRequest(); + + // Another request with simple response + clientMakeRequest("/"); + SequencedHttpRequest request4 = serverReceiveRequest(); - final ChannelFuture connectionFuture = clientBootstrap.connect(HOST_ADDR); + // Write the responses to request 2 and 4 + serverSendChunkedResponse(request2, false, "this", " is", " request", " 2"); + serverSendChunkedResponse(request4, false, "this", " is", " request", " 4"); - assertTrue(connectionFuture.await(CONNECTION_TIMEOUT)); - final Channel clientChannel = connectionFuture.getChannel(); + // Write the first response + serverSendChunkedResponse(request1, true, "this", " is", " request", " 1"); - final HttpRequest request1 = new DefaultHttpRequest( - HTTP_1_1, HttpMethod.GET, PATH1); - request1.headers().set(HOST, HOST_ADDR.toString()); + assertEquals("this is request 1", clientReceiveResponseBody()); + assertEquals("this is request 2", clientReceiveResponseBody()); - final HttpRequest request2 = new DefaultHttpRequest( - HTTP_1_1, HttpMethod.GET, PATH2); - request2.headers().set(HOST, HOST_ADDR.toString()); + // Write the response to 3 + serverSendChunkedResponse(request3, true, "this", " is", " request", " 3"); - clientChannel.write(request1); - clientChannel.write(request2); - responsesIn.await(RESPONSE_TIMEOUT, MILLISECONDS); + assertEquals("this is request 3", clientReceiveResponseBody()); + assertEquals("this is request 4", clientReceiveResponseBody()); - assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH1)); - assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH2)); + // Send/receive one more to make sure no buffering is done + clientMakeRequest("/"); + SequencedHttpRequest request5 = serverReceiveRequest(); + serverSendChunkedResponse(request5, true, "this", " is", " request", " 5"); + assertEquals("this is request 5", clientReceiveResponseBody()); } - public class ClientHandler extends SimpleChannelUpstreamHandler { - @Override - public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { - final Object message = e.getMessage(); - if (message instanceof HttpChunk) { - final HttpChunk response = (HttpChunk) e.getMessage(); - if (!response.isLast()) { - final String content = response.getContent().toString(UTF_8); - responses.add(content); - if (content.equals(SOME_RESPONSE_TEXT + PATH2)) { - responsesIn.countDown(); - } - } - } + @Test + public void backPressure() throws Exception { + // Send + clientMakeRequest("/1"); + SequencedHttpRequest request1 = serverReceiveRequest(); // inFlight = 1 + clientMakeRequest("/2"); + SequencedHttpRequest request2 = serverReceiveRequest(); // inFlight = 2 + clientMakeRequest("/3"); + SequencedHttpRequest request3 = serverReceiveRequest(); // inFlight = 3 + // This write will cause the high water mark to be reached + clientMakeRequest("/4"); + SequencedHttpRequest request4 = serverReceiveRequest(); // inFlight = 4 + + // High watermark is exceeded, next write should never be received + clientMakeRequest("/5"); + assertNull(serverMessagesReceived.poll(200, TimeUnit.MILLISECONDS)); + + // Until we reach the low water mark (2), the server still should not be receiving the messages + serverSendChunkedResponse(request1, true, "1"); + assertEquals("1", clientReceiveResponseBody()); // inFlight = 3 + assertNull(serverMessagesReceived.poll(200, TimeUnit.MILLISECONDS)); + serverSendChunkedResponse(request2, true, "2"); + assertEquals("2", clientReceiveResponseBody()); // inFlight = 2 + + // Since inFlight dropped to 2, we should now be able to receive the request on the server + SequencedHttpRequest request5 = serverReceiveRequest(); // inFlight = 3 + + clientMakeRequest("/6"); + SequencedHttpRequest request6 = serverReceiveRequest(); // inFlight = 4 + + // High water mark exceeded again, write will be blocked + clientMakeRequest("/7"); + assertNull(serverMessagesReceived.poll(200, TimeUnit.MILLISECONDS)); + + serverSendChunkedResponse(request3, true, "3"); + assertEquals("3", clientReceiveResponseBody()); // inFlight = 3 + assertNull(serverMessagesReceived.poll(200, TimeUnit.MILLISECONDS)); + + serverSendChunkedResponse(request4, true, "4"); + assertEquals("4", clientReceiveResponseBody()); // inFlight = 2 + + SequencedHttpRequest request7 = serverReceiveRequest(); // inFlight = 3 + + serverSendChunkedResponse(request5, true, "5"); + assertEquals("5", clientReceiveResponseBody()); // inFlight = 2 + serverSendChunkedResponse(request6, true, "6"); + assertEquals("6", clientReceiveResponseBody()); // inFlight = 1 + serverSendChunkedResponse(request7, true, "7"); + assertEquals("7", clientReceiveResponseBody()); // inFlight = 0 + } + + private void serverSendChunkedResponse(SequencedHttpRequest request, boolean await, String... messages) throws Exception { + serverWrite(new SequencedOutboundMessage(request.getSequence(), createChunkedResponse()), await); + for (String message: messages) { + serverWrite(new SequencedOutboundMessage(request.getSequence(), createContent(message)), await); } + serverWrite(new SequencedOutboundMessage(request.getSequence(), createLastContent()), await); } - public class ServerHandler extends SimpleChannelUpstreamHandler { - private final AtomicBoolean sendFinalChunk = new AtomicBoolean(false); + private void serverWrite(Object message, boolean await) throws Exception { + ChannelFuture future = serverChannel.write(message); + if (await) { + serverChannel.flush(); + await(future); + } + } - @Override - public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws InterruptedException { - final HttpRequest request = (HttpRequest) e.getMessage(); + private void clientMakeRequest(String path) throws Exception { + ChannelFuture future = clientChannel.write(createRequest(path)); + clientChannel.flush(); + await(future); + } + + private FullHttpMessage createRequest(String path) { + FullHttpMessage request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path); + HttpHeaders.setContentLength(request, 0); + return request; + } + private FullHttpResponse createResponse(String content) { + ByteBuf buffer = Unpooled.copiedBuffer(content, Charset.defaultCharset()); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buffer); + HttpHeaders.setContentLength(response, buffer.readableBytes()); + return response; + } + private HttpResponse createChunkedResponse() { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + HttpHeaders.setTransferEncodingChunked(response); + return response; + } + private HttpContent createContent(String content) { + return new DefaultHttpContent(Unpooled.copiedBuffer(content, Charset.defaultCharset())); + } + private HttpContent createLastContent() { + return new DefaultLastHttpContent(); + } - final OrderedUpstreamMessageEvent oue = (OrderedUpstreamMessageEvent) e; - final String uri = request.getUri(); + private String clientReceiveResponseBody() throws Exception { + return clientReceiveResponse().content().toString(Charset.forName("utf-8")); + } - final HttpResponse initialChunk = new DefaultHttpResponse(HTTP_1_1, OK); - initialChunk.headers().set(CONTENT_TYPE, CONTENT_TYPE_TEXT); - initialChunk.headers().set(CONNECTION, KEEP_ALIVE); - initialChunk.headers().set(TRANSFER_ENCODING, CHUNKED); + private FullHttpResponse clientReceiveResponse() throws Exception { + return clientReceive(FullHttpResponse.class); + } - ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, false, initialChunk)); + private SequencedHttpRequest serverReceiveRequest() throws Exception { + SequencedHttpRequest request = serverReceive(SequencedHttpRequest.class); + HttpObject message = request.getHttpRequest(); + while (!(message instanceof LastHttpContent)) { + message = serverReceive(HttpContent.class); + } + return request; + } - timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, 1), 0, MILLISECONDS); + private T clientReceive(Class clazz) throws Exception { + Object message = clientMessagesReceived.poll(2, TimeUnit.SECONDS); + assertNotNull(message); + return clazz.cast(message); + } + + private T serverReceive(Class clazz) throws Exception { + Object message = serverMessagesReceived.poll(2, TimeUnit.SECONDS); + assertNotNull(message); + return clazz.cast(message); + } + + static class ClientHandler extends ChannelInboundHandlerAdapter { + private final Queue received; + + public ClientHandler(Queue received) { + this.received = received; } - private class ChunkWriter implements TimerTask { - private final ChannelHandlerContext ctx; - private final MessageEvent e; - private final String uri; - private final OrderedUpstreamMessageEvent oue; - private final int subSequence; - - public ChunkWriter(final ChannelHandlerContext ctx, final MessageEvent e, final String uri, - final OrderedUpstreamMessageEvent oue, final int subSequence) { - this.ctx = ctx; - this.e = e; - this.uri = uri; - this.oue = oue; - this.subSequence = subSequence; - } - - @Override - public void run(final Timeout timeout) { - if (sendFinalChunk.get() && subSequence > 1) { - final HttpChunk finalChunk = new DefaultHttpChunk(EMPTY_BUFFER); - ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, true, finalChunk)); - } else { - final HttpChunk chunk = new DefaultHttpChunk(copiedBuffer(SOME_RESPONSE_TEXT + uri, UTF_8)); - ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, false, chunk)); - - timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, subSequence + 1), 0, MILLISECONDS); - - if (uri.equals(PATH2)) { - sendFinalChunk.set(true); - } - } - } + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.add(msg); + } + } + + static class ServerHandler extends ChannelInboundHandlerAdapter { + private final Queue received; + private final Promise channelPromise; + public ServerHandler(Queue received, Promise channelPromise) { + this.received = received; + this.channelPromise = channelPromise; } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.add(msg); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.read(); + channelPromise.setSuccess(ctx.channel()); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.read(); + } + } + + private static , T> F await(F future) throws Exception { + assertTrue(future.await(2000)); + return future; } } From 84fae601f6554ce3744a681aba71b25d7a8df8da Mon Sep 17 00:00:00 2001 From: James Roper Date: Sun, 28 Jun 2015 09:13:51 +1000 Subject: [PATCH 2/2] Fixed potential reference count leak on close --- .../pipelining/HttpPipeliningHandler.java | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java b/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java index 17879b3..5e4ddd6 100644 --- a/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java +++ b/src/main/java/com/typesafe/netty/http/pipelining/HttpPipeliningHandler.java @@ -3,6 +3,7 @@ import io.netty.channel.*; import io.netty.handler.codec.http.*; +import io.netty.util.ReferenceCountUtil; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; @@ -59,6 +60,11 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler { */ private boolean channelReadCompleteOccurred = false; + /** + * Whether the channel is inactive - if it's inactive, we should not buffer events to prevent leaks. + */ + private boolean inactive = true; + /** * A write message with a promise of when it's written. */ @@ -220,12 +226,35 @@ private void writeInSequence(ChannelHandlerContext ctx, SequencedOutboundMessage flushNextSequences(ctx); } } else { - List sequenceBuffer = bufferedEvents.get(sequenced.getSequence()); - if (sequenceBuffer == null) { - sequenceBuffer = new ArrayList<>(); - bufferedEvents.put(sequenced.getSequence(), sequenceBuffer); + if (!inactive) { + List sequenceBuffer = bufferedEvents.get(sequenced.getSequence()); + if (sequenceBuffer == null) { + sequenceBuffer = new ArrayList<>(); + bufferedEvents.put(sequenced.getSequence(), sequenceBuffer); + } + sequenceBuffer.add(new WriteMessage(sequenced, promise)); + } + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + cleanup(); + inactive = true; + super.channelInactive(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + cleanup(); + super.handlerRemoved(ctx); + } + + private void cleanup() { + for (List messages: bufferedEvents.values()) { + for (WriteMessage message: messages) { + ReferenceCountUtil.release(message.message.getMessage()); } - sequenceBuffer.add(new WriteMessage(sequenced, promise)); } } }