Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestStatus;

/**
* A http response that will be transferred via chunked encoding when handled by {@link Netty4HttpPipeliningHandler}.
*/
public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4RestResponse {
public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4HttpResponse, HttpResponse {

private final int sequence;

Expand All @@ -38,4 +39,14 @@ public ChunkedRestResponseBody body() {
public int getSequence() {
return sequence;
}

@Override
public void addHeader(String name, String value) {
headers().add(name, value);
}

@Override
public boolean containsHeader(String name) {
return headers().contains(name);
}
Comment on lines +43 to +51
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I feel it is natural for Netty4HttpResponse to extend HttpResponse so that these methods can be kept at interface level. Or we should have a comment somewhere to explain why future refactor should not make such change.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm currently working on a (possible) enhancement that would allow us to split responses over multiple such messages, for which it only makes sense for the first message in the sequence to extend HttpResponse and carry stuff like headers. If that lands then this'll explain why we've cut down this interface, and if it doesn't then we can pull these back up to the super-interface.

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;

public final class Netty4FullHttpResponse extends DefaultFullHttpResponse implements Netty4RestResponse {
public final class Netty4FullHttpResponse extends DefaultFullHttpResponse implements Netty4HttpResponse, HttpResponse {

private final int sequence;

Expand All @@ -29,4 +30,14 @@ public final class Netty4FullHttpResponse extends DefaultFullHttpResponse implem
public int getSequence() {
return sequence;
}

@Override
public void addHeader(String name, String value) {
headers().add(name, value);
}

@Override
public boolean containsHeader(String name) {
return headers().contains(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
private final Logger logger;

private final int maxEventsHeld;
private final PriorityQueue<Tuple<? extends Netty4RestResponse, ChannelPromise>> outboundHoldingQueue;
private final PriorityQueue<Tuple<? extends Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;

private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {}

Expand Down Expand Up @@ -136,10 +136,10 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws IOException {
assert msg instanceof Netty4RestResponse : "Invalid message type: " + msg.getClass();
assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();
boolean success = false;
try {
final Netty4RestResponse restResponse = (Netty4RestResponse) msg;
final Netty4HttpResponse restResponse = (Netty4HttpResponse) msg;
if (restResponse.getSequence() != writeSequence) {
assert restResponse.getSequence() > writeSequence
: "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]";
Expand Down Expand Up @@ -174,7 +174,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann

private void doWriteQueued(ChannelHandlerContext ctx) throws IOException {
while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) {
final Tuple<? extends Netty4RestResponse, ChannelPromise> top = outboundHoldingQueue.poll();
final Tuple<? extends Netty4HttpResponse, ChannelPromise> top = outboundHoldingQueue.poll();
assert top != null : "we know the outbound holding queue to not be empty at this point";
doWrite(ctx, top.v1(), top.v2());
}
Expand All @@ -191,7 +191,7 @@ private void doWriteQueued(ChannelHandlerContext ctx) throws IOException {
SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99);
}

private void doWrite(ChannelHandlerContext ctx, Netty4RestResponse readyResponse, ChannelPromise promise) throws IOException {
private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse, ChannelPromise promise) throws IOException {
assert currentChunkedWrite == null : "unexpected existing write [" + currentChunkedWrite + "]";
assert readyResponse != null : "cannot write null response";
assert readyResponse.getSequence() == writeSequence;
Expand Down Expand Up @@ -347,11 +347,11 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
safeFailPromise(currentChunkedWrite.onDone, new ClosedChannelException());
currentChunkedWrite = null;
}
List<Tuple<? extends Netty4RestResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();
List<Tuple<? extends Netty4HttpResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();

if (inflightResponses.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
for (Tuple<? extends Netty4RestResponse, ChannelPromise> inflightResponse : inflightResponses) {
for (Tuple<? extends Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
safeFailPromise(inflightResponse.v2(), closedChannelException);
}
}
Expand Down Expand Up @@ -396,8 +396,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}

private List<Tuple<? extends Netty4RestResponse, ChannelPromise>> removeAllInflightResponses() {
ArrayList<Tuple<? extends Netty4RestResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue);
private List<Tuple<? extends Netty4HttpResponse, ChannelPromise>> removeAllInflightResponses() {
ArrayList<Tuple<? extends Netty4HttpResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue);
outboundHoldingQueue.clear();
return responses;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

/**
* Super-interface for responses handled by the Netty4 HTTP transport.
*/
public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse {
/**
* @return The sequence number for the request which corresponds with this response, for making sure that we send responses to pipelined
* requests in the correct order.
*/
int getSequence();
}

This file was deleted.