|  | 
| 1 | 1 | package com.typesafe.netty.http.pipelining; | 
| 2 | 2 | 
 | 
| 3 |  | -import org.jboss.netty.channel.*; | 
| 4 |  | -import org.jboss.netty.handler.codec.http.HttpRequest; | 
|  | 3 | + | 
|  | 4 | +import io.netty.channel.*; | 
|  | 5 | +import io.netty.handler.codec.http.*; | 
|  | 6 | +import io.netty.util.collection.IntObjectHashMap; | 
|  | 7 | +import io.netty.util.collection.IntObjectMap; | 
| 5 | 8 | 
 | 
| 6 | 9 | import java.util.*; | 
| 7 | 10 | 
 | 
| 8 | 11 | /** | 
| 9 | 12 |  * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their | 
| 10 |  | - * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will | 
| 11 |  | - * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely | 
| 12 |  | - * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects. | 
|  | 13 | + * corresponding requests. | 
|  | 14 | + * | 
|  | 15 | + * Each incoming request is assigned a sequence number, which is provided in the wrapping {@link SequencedHttpRequest}. | 
|  | 16 | + * When a handler writes a response and a response body, it must wrap each of those messages in a | 
|  | 17 | + * {@link SequencedOutboundMessage} with the corresponding sequence from that request.  It must send at least one | 
|  | 18 | + * message in response to the request, and the last message in the sequence must implement LastHttpContent. | 
|  | 19 | + * | 
|  | 20 | + * If messages are sent after the last message is sent, the behaviour is undefined. | 
|  | 21 | + * | 
|  | 22 | + * There is no limit to the amount of messages that this handler will buffer for a particular sequence number.  It is | 
|  | 23 | + * the responsibility of the handler sending the outbound messages to handle back pressure via promises associated | 
|  | 24 | + * with each write event - if this is done, the buffering will be inherently bounded by back pressure. | 
| 13 | 25 |  * | 
| 14 |  | - * @author Christopher Hunt | 
|  | 26 | + * This handler does however put a bound on the maximum number of in flight requests that it will handle, configured by | 
|  | 27 | + * inFlightRequestsLowWatermark and inFlightRequestsHighWatermark. When the high watermark is exceeded, the handler will | 
|  | 28 | + * push back on the client. When the low watermark is reached, the handler will start reading again.  This back pressure | 
|  | 29 | + * mechanism only works if ChannelOptions.AUTO_READ is false. | 
|  | 30 | + * | 
|  | 31 | + * This back pressure is implemented by blocking channelReadComplete events, so assumes that the following handlers | 
|  | 32 | + * will not request reading unless they receive this event.  Note that the handler does nothing to actually block | 
|  | 33 | + * incoming requests when the high watermark is reached, it only pushes back on the TCP connection.  If there are more | 
|  | 34 | + * requests in the TCP buffer before this back pressure takes effect, these requests will still be sent to the following | 
|  | 35 | + * handlers. | 
|  | 36 | + * | 
|  | 37 | + * If channelReadComplete is invoked while the high watermark is reached, then when the low watermark is reached, this | 
|  | 38 | + * will be fired again, to signal demand. | 
| 15 | 39 |  */ | 
| 16 |  | -public class HttpPipeliningHandler extends SimpleChannelHandler { | 
|  | 40 | +public class HttpPipeliningHandler extends ChannelDuplexHandler { | 
|  | 41 | + | 
|  | 42 | +    /** | 
|  | 43 | +     * The sequence of received HTTP requests. | 
|  | 44 | +     */ | 
|  | 45 | +    private int receiveSequence = 0; | 
|  | 46 | + | 
|  | 47 | +    /** | 
|  | 48 | +     * The currently sending sequence of HTTP requests. | 
|  | 49 | +     */ | 
|  | 50 | +    private int currentlySendingSequence = 1; | 
| 17 | 51 | 
 | 
| 18 |  | -    public static final int INITIAL_EVENTS_HELD = 3; | 
| 19 |  | -    public static final int MAX_EVENTS_HELD = 10000; | 
|  | 52 | +    /** | 
|  | 53 | +     * Whether the high watermark has been exceeded. | 
|  | 54 | +     */ | 
|  | 55 | +    private boolean highWatermarkReached = false; | 
|  | 56 | + | 
|  | 57 | +    /** | 
|  | 58 | +     * If the high watermark has been exceeded, whether a channel read complete has occurred. | 
|  | 59 | +     */ | 
|  | 60 | +    private boolean channelReadCompleteOccurred = false; | 
| 20 | 61 | 
 | 
| 21 |  | -    private final int maxEventsHeld; | 
|  | 62 | +    /** | 
|  | 63 | +     * A write message with a promise of when it's written. | 
|  | 64 | +     */ | 
|  | 65 | +    private static class WriteMessage { | 
|  | 66 | +        /** | 
|  | 67 | +         * The written message. | 
|  | 68 | +         */ | 
|  | 69 | +        final SequencedOutboundMessage message; | 
|  | 70 | + | 
|  | 71 | +        /** | 
|  | 72 | +         * The future that is redeemed once the message is written. | 
|  | 73 | +         */ | 
|  | 74 | +        final ChannelPromise promise; | 
|  | 75 | + | 
|  | 76 | +        public WriteMessage(SequencedOutboundMessage message, ChannelPromise promise) { | 
|  | 77 | +            this.message = message; | 
|  | 78 | +            this.promise = promise; | 
|  | 79 | +        } | 
|  | 80 | +    } | 
| 22 | 81 | 
 | 
| 23 |  | -    private int sequence; | 
| 24 |  | -    private int nextRequiredSequence; | 
| 25 |  | -    private int nextRequiredSubsequence; | 
|  | 82 | +    /** | 
|  | 83 | +     * The buffered events, by sequence | 
|  | 84 | +     */ | 
|  | 85 | +    private final IntObjectMap<List<WriteMessage>> bufferedEvents = new IntObjectHashMap<>(); | 
| 26 | 86 | 
 | 
| 27 |  | -    private final Queue<OrderedDownstreamChannelEvent> holdingQueue; | 
|  | 87 | +    private final int inFlightRequestsLowWatermark; | 
|  | 88 | +    private final int inFlightRequestsHighWatermark; | 
| 28 | 89 | 
 | 
|  | 90 | +    /** | 
|  | 91 | +     * Create the pipelining handler with low and high watermarks of 2 and 4. | 
|  | 92 | +     */ | 
| 29 | 93 |     public HttpPipeliningHandler() { | 
| 30 |  | -        this(MAX_EVENTS_HELD); | 
|  | 94 | +        this(2, 4); | 
| 31 | 95 |     } | 
| 32 | 96 | 
 | 
| 33 | 97 |     /** | 
| 34 |  | -     * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel | 
| 35 |  | -     *                      connection. This is required as events cannot queue up indefintely; we would run out of | 
| 36 |  | -     *                      memory if this was the case. | 
|  | 98 | +     * Create the pipelining handler. | 
|  | 99 | +     * | 
|  | 100 | +     * @param inFlightRequestsLowWatermark The low watermark for in flight requests, where, if the high watermark has | 
|  | 101 | +     *                                     been exceeded, the handler will start reading again. Must be at least 0. | 
|  | 102 | +     * @param inFlightRequestsHighWatermark The high watermark, once in flight requests has exceeded this, the handler | 
|  | 103 | +     *                                      will stop reading, pushing back on the client.  Must be greater than the | 
|  | 104 | +     *                                      low watermark. | 
| 37 | 105 |      */ | 
| 38 |  | -    public HttpPipeliningHandler(final int maxEventsHeld) { | 
| 39 |  | -        this.maxEventsHeld = maxEventsHeld; | 
| 40 |  | - | 
| 41 |  | -        holdingQueue = new PriorityQueue<OrderedDownstreamChannelEvent>(INITIAL_EVENTS_HELD, new Comparator<OrderedDownstreamChannelEvent>() { | 
| 42 |  | -            @Override | 
| 43 |  | -            public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) { | 
| 44 |  | -                final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence(); | 
| 45 |  | -                if (delta == 0) { | 
| 46 |  | -                    return o1.getSubsequence() - o2.getSubsequence(); | 
| 47 |  | -                } else { | 
| 48 |  | -                    return delta; | 
| 49 |  | -                } | 
| 50 |  | -            } | 
| 51 |  | -        }); | 
|  | 106 | +    public HttpPipeliningHandler(int inFlightRequestsLowWatermark, int inFlightRequestsHighWatermark) { | 
|  | 107 | +        if (inFlightRequestsLowWatermark < 0) { | 
|  | 108 | +            throw new IllegalArgumentException("inFlightRequestsLowWatermark must be an least 0, was " + inFlightRequestsLowWatermark); | 
|  | 109 | +        } | 
|  | 110 | +        if (inFlightRequestsHighWatermark <= inFlightRequestsLowWatermark) { | 
|  | 111 | +            throw new IllegalArgumentException("inFlightRequestsHighWatermark must be greater than inFlightRequestsLowWatermark, but was " + inFlightRequestsHighWatermark); | 
|  | 112 | +        } | 
|  | 113 | +        this.inFlightRequestsLowWatermark = inFlightRequestsLowWatermark; | 
|  | 114 | +        this.inFlightRequestsHighWatermark = inFlightRequestsHighWatermark; | 
| 52 | 115 |     } | 
| 53 | 116 | 
 | 
| 54 |  | -    public int getMaxEventsHeld() { | 
| 55 |  | -        return maxEventsHeld; | 
|  | 117 | +    private int inFlight() { | 
|  | 118 | +        return receiveSequence - currentlySendingSequence + 1; | 
| 56 | 119 |     } | 
| 57 | 120 | 
 | 
| 58 | 121 |     @Override | 
| 59 |  | -    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { | 
| 60 |  | -        final Object msg = e.getMessage(); | 
| 61 |  | -        if (msg instanceof HttpRequest) { | 
| 62 |  | -            ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress())); | 
|  | 122 | +    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { | 
|  | 123 | +        // Only forward read complete if we haven't exceeded the high watermark | 
|  | 124 | +        if (!highWatermarkReached) { | 
|  | 125 | +            ctx.fireChannelReadComplete(); | 
| 63 | 126 |         } else { | 
| 64 |  | -            ctx.sendUpstream(e); | 
|  | 127 | +            // Store the fact that read complete has been requested. | 
|  | 128 | +            channelReadCompleteOccurred = true; | 
|  | 129 | +        } | 
|  | 130 | +    } | 
|  | 131 | + | 
|  | 132 | +    @Override | 
|  | 133 | +    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | 
|  | 134 | +        Object toSend = msg; | 
|  | 135 | + | 
|  | 136 | +        // Wrap message in sequenced if it needs to be | 
|  | 137 | +        if (msg instanceof HttpRequest) { | 
|  | 138 | +            receiveSequence++; | 
|  | 139 | +            HttpRequest request = (HttpRequest) msg; | 
|  | 140 | +            toSend = new SequencedHttpRequest(receiveSequence, request); | 
|  | 141 | +        } | 
|  | 142 | + | 
|  | 143 | +        // If we've reached the end of an http request, and we're at or over the high watermark, | 
|  | 144 | +        // set it to reached. | 
|  | 145 | +        if (msg instanceof LastHttpContent && inFlight() >= inFlightRequestsHighWatermark) { | 
|  | 146 | +            highWatermarkReached = true; | 
| 65 | 147 |         } | 
|  | 148 | + | 
|  | 149 | +        ctx.fireChannelRead(toSend); | 
| 66 | 150 |     } | 
| 67 | 151 | 
 | 
| 68 | 152 |     @Override | 
| 69 |  | -    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) | 
| 70 |  | -            throws Exception { | 
| 71 |  | -        if (e instanceof OrderedDownstreamChannelEvent) { | 
| 72 |  | - | 
| 73 |  | -            boolean channelShouldClose = false; | 
| 74 |  | - | 
| 75 |  | -            synchronized (holdingQueue) { | 
| 76 |  | -                if (holdingQueue.size() < maxEventsHeld) { | 
| 77 |  | - | 
| 78 |  | -                    final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e; | 
| 79 |  | -                    holdingQueue.add(currentEvent); | 
| 80 |  | - | 
| 81 |  | -                    while (!holdingQueue.isEmpty()) { | 
| 82 |  | -                        final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek(); | 
| 83 |  | -                        if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence | | 
| 84 |  | -                                nextEvent.getSubsequence() != nextRequiredSubsequence) { | 
| 85 |  | -                            break; | 
| 86 |  | -                        } | 
| 87 |  | -                        holdingQueue.remove(); | 
| 88 |  | -                        ctx.sendDownstream(nextEvent.getChannelEvent()); | 
| 89 |  | -                        if (nextEvent.isLast()) { | 
| 90 |  | -                            ++nextRequiredSequence; | 
| 91 |  | -                            nextRequiredSubsequence = 0; | 
| 92 |  | -                        } else { | 
| 93 |  | -                            ++nextRequiredSubsequence; | 
| 94 |  | -                        } | 
| 95 |  | -                    } | 
| 96 |  | - | 
| 97 |  | -                } else { | 
| 98 |  | -                    channelShouldClose = true; | 
| 99 |  | -                } | 
|  | 153 | +    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { | 
|  | 154 | +        if (msg instanceof SequencedOutboundMessage) { | 
|  | 155 | +            SequencedOutboundMessage sequenced = (SequencedOutboundMessage) msg; | 
|  | 156 | + | 
|  | 157 | +            writeInSequence(ctx, sequenced, promise); | 
|  | 158 | +        } else { | 
|  | 159 | +            ctx.write(msg, promise); | 
|  | 160 | +        } | 
|  | 161 | +    } | 
|  | 162 | + | 
|  | 163 | +    private void progressToNextSendingSequence(ChannelHandlerContext ctx) { | 
|  | 164 | +        currentlySendingSequence++; | 
|  | 165 | + | 
|  | 166 | +        int inFlight = this.inFlight(); | 
|  | 167 | +        // If we're now at the low water mark, set it to false | 
|  | 168 | +        if (highWatermarkReached && inFlight == inFlightRequestsLowWatermark) { | 
|  | 169 | +            highWatermarkReached = false; | 
|  | 170 | + | 
|  | 171 | +            // Check if, while we were over the high watermark, a channel read had occurred | 
|  | 172 | +            // that we blocked | 
|  | 173 | +            if (channelReadCompleteOccurred) { | 
|  | 174 | +                // Send it on | 
|  | 175 | +                ctx.fireChannelReadComplete(); | 
|  | 176 | +                channelReadCompleteOccurred = false; | 
|  | 177 | +            } | 
|  | 178 | +        } | 
|  | 179 | +    } | 
|  | 180 | + | 
|  | 181 | +    /** | 
|  | 182 | +     * Write the next sequences, if buffered. | 
|  | 183 | +     */ | 
|  | 184 | +    private void flushNextSequences(ChannelHandlerContext ctx) { | 
|  | 185 | +        progressToNextSendingSequence(ctx); | 
|  | 186 | + | 
|  | 187 | +        List<WriteMessage> toFlush = bufferedEvents.get(currentlySendingSequence); | 
|  | 188 | + | 
|  | 189 | +        // Loop while we still have a sequence to flush | 
|  | 190 | +        while (toFlush != null) { | 
|  | 191 | + | 
|  | 192 | +            bufferedEvents.remove(currentlySendingSequence); | 
|  | 193 | + | 
|  | 194 | +            WriteMessage lastWritten = null; | 
|  | 195 | + | 
|  | 196 | +            // Flush each event | 
|  | 197 | +            for (WriteMessage message: toFlush) { | 
|  | 198 | +                ctx.write(message.message.getMessage(), message.promise); | 
|  | 199 | +                lastWritten = message; | 
| 100 | 200 |             } | 
| 101 | 201 | 
 | 
| 102 |  | -            if (channelShouldClose) { | 
| 103 |  | -                Channels.close(e.getChannel()); | 
|  | 202 | +            // If the last message that we wrote was the last message for that sequence, | 
|  | 203 | +            // then increment the sequence and maybe get the next sequence from the buffer. | 
|  | 204 | +            if (lastWritten != null && lastWritten.message.getMessage() instanceof LastHttpContent) { | 
|  | 205 | +                progressToNextSendingSequence(ctx); | 
|  | 206 | +                toFlush = bufferedEvents.get(currentlySendingSequence); | 
|  | 207 | +            } else { | 
|  | 208 | +                toFlush = null; | 
| 104 | 209 |             } | 
| 105 |  | -        } else { | 
| 106 |  | -            super.handleDownstream(ctx, e); | 
| 107 | 210 |         } | 
| 108 | 211 |     } | 
| 109 | 212 | 
 | 
|  | 213 | +    /** | 
|  | 214 | +     * Write the message in sequence. | 
|  | 215 | +     */ | 
|  | 216 | +    private void writeInSequence(ChannelHandlerContext ctx, SequencedOutboundMessage sequenced, ChannelPromise promise) { | 
|  | 217 | +        if (sequenced.getSequence() == currentlySendingSequence) { | 
|  | 218 | +            ctx.write(sequenced.getMessage(), promise); | 
|  | 219 | +            if (sequenced.getMessage() instanceof LastHttpContent) { | 
|  | 220 | +                flushNextSequences(ctx); | 
|  | 221 | +            } | 
|  | 222 | +        } else { | 
|  | 223 | +            List<WriteMessage> sequenceBuffer = bufferedEvents.get(sequenced.getSequence()); | 
|  | 224 | +            if (sequenceBuffer == null) { | 
|  | 225 | +                sequenceBuffer = new ArrayList<>(); | 
|  | 226 | +                bufferedEvents.put(sequenced.getSequence(), sequenceBuffer); | 
|  | 227 | +            } | 
|  | 228 | +            sequenceBuffer.add(new WriteMessage(sequenced, promise)); | 
|  | 229 | +        } | 
|  | 230 | +    } | 
| 110 | 231 | } | 
0 commit comments