diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java index cbcb807f335dc..f5f32bf333779 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java @@ -12,13 +12,14 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.ChunkedRestResponseBody; import org.elasticsearch.rest.RestStatus; /** * A http response that will be transferred via chunked encoding when handled by {@link Netty4HttpPipeliningHandler}. */ -public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4RestResponse { +public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4HttpResponse, HttpResponse { private final int sequence; @@ -38,4 +39,14 @@ public ChunkedRestResponseBody body() { public int getSequence() { return sequence; } + + @Override + public void addHeader(String name, String value) { + headers().add(name, value); + } + + @Override + public boolean containsHeader(String name) { + return headers().contains(name); + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpResponse.java index 2a5a1fb6e05d8..a350427c75ec5 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpResponse.java @@ -13,10 +13,11 @@ import io.netty.handler.codec.http.HttpVersion; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.netty4.Netty4Utils; -public final class Netty4FullHttpResponse extends DefaultFullHttpResponse implements Netty4RestResponse { +public final class Netty4FullHttpResponse extends DefaultFullHttpResponse implements Netty4HttpResponse, HttpResponse { private final int sequence; @@ -29,4 +30,14 @@ public final class Netty4FullHttpResponse extends DefaultFullHttpResponse implem public int getSequence() { return sequence; } + + @Override + public void addHeader(String name, String value) { + headers().add(name, value); + } + + @Override + public boolean containsHeader(String name) { + return headers().contains(name); + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 669c9714dd8a1..a7ecb85bda47c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -55,7 +55,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { private final Logger logger; private final int maxEventsHeld; - private final PriorityQueue> outboundHoldingQueue; + private final PriorityQueue> outboundHoldingQueue; private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {} @@ -136,10 +136,10 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws IOException { - assert msg instanceof Netty4RestResponse : "Invalid message type: " + msg.getClass(); + assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass(); boolean success = false; try { - final Netty4RestResponse restResponse = (Netty4RestResponse) msg; + final Netty4HttpResponse restResponse = (Netty4HttpResponse) msg; if (restResponse.getSequence() != writeSequence) { assert restResponse.getSequence() > writeSequence : "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]"; @@ -174,7 +174,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann private void doWriteQueued(ChannelHandlerContext ctx) throws IOException { while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) { - final Tuple top = outboundHoldingQueue.poll(); + final Tuple top = outboundHoldingQueue.poll(); assert top != null : "we know the outbound holding queue to not be empty at this point"; doWrite(ctx, top.v1(), top.v2()); } @@ -191,7 +191,7 @@ private void doWriteQueued(ChannelHandlerContext ctx) throws IOException { SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99); } - private void doWrite(ChannelHandlerContext ctx, Netty4RestResponse readyResponse, ChannelPromise promise) throws IOException { + private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse, ChannelPromise promise) throws IOException { assert currentChunkedWrite == null : "unexpected existing write [" + currentChunkedWrite + "]"; assert readyResponse != null : "cannot write null response"; assert readyResponse.getSequence() == writeSequence; @@ -347,11 +347,11 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) { safeFailPromise(currentChunkedWrite.onDone, new ClosedChannelException()); currentChunkedWrite = null; } - List> inflightResponses = removeAllInflightResponses(); + List> inflightResponses = removeAllInflightResponses(); if (inflightResponses.isEmpty() == false) { ClosedChannelException closedChannelException = new ClosedChannelException(); - for (Tuple inflightResponse : inflightResponses) { + for (Tuple inflightResponse : inflightResponses) { safeFailPromise(inflightResponse.v2(), closedChannelException); } } @@ -396,8 +396,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } - private List> removeAllInflightResponses() { - ArrayList> responses = new ArrayList<>(outboundHoldingQueue); + private List> removeAllInflightResponses() { + ArrayList> responses = new ArrayList<>(outboundHoldingQueue); outboundHoldingQueue.clear(); return responses; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java new file mode 100644 index 0000000000000..3396b13cdab0f --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +/** + * Super-interface for responses handled by the Netty4 HTTP transport. + */ +public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse { + /** + * @return The sequence number for the request which corresponds with this response, for making sure that we send responses to pipelined + * requests in the correct order. + */ + int getSequence(); +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java deleted file mode 100644 index eacdaa765add4..0000000000000 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.http.netty4; - -import io.netty.handler.codec.http.HttpMessage; - -import org.elasticsearch.http.HttpResponse; - -public sealed interface Netty4RestResponse extends HttpResponse, HttpMessage permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse { - - int getSequence(); - - @Override - default void addHeader(String name, String value) { - headers().add(name, value); - } - - @Override - default boolean containsHeader(String name) { - return headers().contains(name); - } -}