Skip to content

Commit

Permalink
Merge pull request #144 from NiteshKant/master
Browse files Browse the repository at this point in the history
Fixes issue #143
  • Loading branch information
NiteshKant committed Jun 16, 2014
2 parents f6807c0 + 92044ff commit 2337be3
Showing 1 changed file with 163 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@
*/
package io.reactivex.netty.protocol.http.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -38,6 +45,7 @@ public class HttpServerResponse<T> extends DefaultChannelWriter<T> {
private final HttpResponseHeaders headers;
private final HttpResponse nettyResponse;
private final AtomicBoolean headerWritten = new AtomicBoolean();
private volatile boolean fullResponseWritten;
private ChannelFuture headerWriteFuture;

public HttpServerResponse(ChannelHandlerContext ctx) {
Expand Down Expand Up @@ -75,7 +83,7 @@ public Observable<Void> _close() {

writeHeadersIfNotWritten();

if (headers.isTransferEncodingChunked() || headers.isKeepAlive()) {
if (!fullResponseWritten && (headers.isTransferEncodingChunked() || headers.isKeepAlive())) {
writeOnChannel(new DefaultLastHttpContent()); // This indicates end of response for netty. If this is not
// sent for keep-alive connections, netty's HTTP codec will not know that the response has ended and hence
// will ignore the subsequent HTTP header writes. See issue: https://github.com/Netflix/RxNetty/issues/130
Expand All @@ -93,11 +101,51 @@ boolean isHeaderWritten() {

@Override
protected ChannelFuture writeOnChannel(Object msg) {
/**
* The following code either sends a single FullHttpResponse or assures that the headers are written before
* writing any content.
*
* A single FullHttpResponse will be written, if and only if,
* -- The passed object (to be written) is a ByteBuf instance and it's readable bytes are equal to the
* content-length header value set.
* -- There is no content ever to be written (content length header is set to zero).
*
* We resort to writing a FullHttpResponse in above scenarios to reduce the overhead of write (executing
* netty's pipeline)
*/
if (!HttpServerResponse.class.isAssignableFrom(msg.getClass())) {
if (msg instanceof ByteBuf) {
ByteBuf content = (ByteBuf) msg;
long contentLength = headers.getContentLength(-1);
if (-1 != contentLength && contentLength == content.readableBytes()) {
if (headerWritten.compareAndSet(false, true)) {
// The passed object (to be written) is a ByteBuf instance and it's readable bytes are equal to the
// content-length header value set.
// So write full response instead of header, content & last HTTP content.
return writeFullResponse((ByteBuf) msg);
}
}
}
writeHeadersIfNotWritten();
} else {
long contentLength = headers.getContentLength(-1);
if (0 == contentLength) {
if (headerWritten.compareAndSet(false, true)) {
// There is no content ever to be written (content length header is set to zero).
// So write full response instead of header & last HTTP content.
return writeFullResponse((ByteBuf) msg);
}
}
// There is no reason to call writeHeadersIfNotWritten() as this is the call to actually write the headers.
}

return super.writeOnChannel(msg);
return super.writeOnChannel(msg); // Write the message as is if we did not write FullHttpResponse.
}

private ChannelFuture writeFullResponse(ByteBuf content) {
fullResponseWritten = true;
FullHttpResponse fhr = new DelegatingFullHttpResponse(nettyResponse, content);
return super.writeOnChannel(fhr);
}

protected void writeHeadersIfNotWritten() {
Expand All @@ -122,4 +170,117 @@ protected void writeHeadersIfNotWritten() {
headerWriteFuture = super.writeOnChannel(this);
}
}

/**
* An implementation of {@link FullHttpResponse} which can be composed of already created headers and content
* separately. The implementation provided by netty does not provide a way to do this.
*/
private static class DelegatingFullHttpResponse implements FullHttpResponse {

private final HttpResponse headers;
private final ByteBuf content;
private final HttpHeaders trailingHeaders;

public DelegatingFullHttpResponse(HttpResponse headers, ByteBuf content) {
this.headers = headers;
this.content = content;
trailingHeaders = new DefaultHttpHeaders(false);
}

public static FullHttpResponse newWithNoContent(HttpResponse headers, ByteBufAllocator allocator) {
headers.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
return new DelegatingFullHttpResponse(headers, allocator.buffer(0));
}

@Override
public FullHttpResponse copy() {
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(getProtocolVersion(), getStatus(), content.copy());
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}

@Override
public HttpContent duplicate() {
DefaultFullHttpResponse dup = new DefaultFullHttpResponse(getProtocolVersion(), getStatus(),
content.duplicate());
dup.headers().set(headers());
dup.trailingHeaders().set(trailingHeaders());
return dup;
}

@Override
public FullHttpResponse retain(int increment) {
content.retain(increment);
return this;
}

@Override
public FullHttpResponse retain() {
content.retain();
return this;
}

@Override
public FullHttpResponse setProtocolVersion(HttpVersion version) {
headers.setProtocolVersion(version);
return this;
}

@Override
public FullHttpResponse setStatus(HttpResponseStatus status) {
headers.setStatus(status);
return this;
}

@Override
public ByteBuf content() {
return content;
}

@Override
public HttpResponseStatus getStatus() {
return headers.getStatus();
}

@Override
public HttpVersion getProtocolVersion() {
return headers.getProtocolVersion();
}

@Override
public HttpHeaders headers() {
return headers.headers();
}

@Override
public HttpHeaders trailingHeaders() {
return trailingHeaders;
}

@Override
public DecoderResult getDecoderResult() {
return DecoderResult.SUCCESS;
}

@Override
public void setDecoderResult(DecoderResult result) {
// No op as we use this only for write.
}

@Override
public int refCnt() {
return content.refCnt();
}

@Override
public boolean release() {
return content.release();
}

@Override
public boolean release(int decrement) {
return content.release(decrement);
}
}
}

0 comments on commit 2337be3

Please sign in to comment.