From 719398d84fdbd620580b913d071cc8b8b3938896 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Wed, 26 Nov 2025 17:31:31 +0100 Subject: [PATCH 01/12] Close RestClient after MSearch API calls to reliably close HttpChannels Signed-off-by: Sergei Ustimenko --- .../http/DetailedErrorsDisabledIT.java | 23 ++++++++++--------- .../http/DetailedErrorsEnabledIT.java | 9 ++++---- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java index cb8951c860a39..bbaba1017d2a5 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java @@ -32,6 +32,10 @@ package org.opensearch.http; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + import java.io.IOException; import org.apache.hc.core5.http.ContentType; @@ -51,10 +55,6 @@ import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; - /** * Tests that when disabling detailed errors, a request with the error_trace parameter returns an HTTP 400 response. */ @@ -65,9 +65,9 @@ public class DetailedErrorsDisabledIT extends HttpSmokeTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED.getKey(), false) - .build(); + .put(super.nodeSettings(nodeOrdinal)) + .put(HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED.getKey(), false) + .build(); } public void testThatErrorTraceParamReturns400() throws IOException, ParseException { @@ -79,7 +79,7 @@ public void testThatErrorTraceParamReturns400() throws IOException, ParseExcepti Response response = e.getResponse(); assertThat(response.getHeader("Content-Type"), is("application/json; charset=UTF-8")); assertThat(EntityUtils.toString(e.getResponse().getEntity()), - containsString("\"error\":\"error traces in responses are disabled.\"")); + containsString("\"error\":\"error traces in responses are disabled.\"")); assertThat(response.getStatusLine().getStatusCode(), is(400)); } @@ -92,9 +92,10 @@ public void testDetailedStackTracesAreNotIncludedWhenErrorTraceIsDisabledForBulk byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); request.setEntity(new ByteArrayEntity(requestBody, ContentType.APPLICATION_JSON)); - Response response = getRestClient().performRequest(request); - - assertThat(EntityUtils.toString(response.getEntity()), not(containsString("stack_trace"))); + try (var restClient = getRestClient()) { + Response response = restClient.performRequest(request); + assertThat(EntityUtils.toString(response.getEntity()), not(containsString("stack_trace"))); + } } } diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java index b105ddb097c94..11bf4454e59de 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java @@ -92,10 +92,11 @@ public void testDetailedStackTracesAreIncludedWhenErrorTraceIsExplicitlyEnabledF byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); request.setEntity(new ByteArrayEntity(requestBody, ContentType.APPLICATION_JSON)); - Response response = getRestClient().performRequest(request); - - assertThat(EntityUtils.toString(response.getEntity()), - containsString("\"stack_trace\":\"[missing_index] IndexNotFoundException[no such index [missing_index]]")); + try (var restClient = getRestClient()) { + Response response = restClient.performRequest(request); + assertThat(EntityUtils.toString(response.getEntity()), + containsString("\"stack_trace\":\"[missing_index] IndexNotFoundException[no such index [missing_index]]")); + } } } From a76f224a61b632b907a24173862fe83ef58e3170 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Wed, 26 Nov 2025 17:42:25 +0100 Subject: [PATCH 02/12] Add the CHANGELOG entry Signed-off-by: Sergei Ustimenko --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1838db443bd06..d715cb10b0e6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix ClassCastException in FlightClientChannel for requests larger than 16KB ([#20010](https://github.com/opensearch-project/OpenSearch/pull/20010)) - Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937)) - Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948)) +- Fix flaky DetailedErrorsDisabledIT and DetailedErrorsEnabledIT ([#20106](https://github.com/opensearch-project/OpenSearch/pull/20106)) ### Dependencies - Bump Apache Lucene from 10.3.1 to 10.3.2 ([#20026](https://github.com/opensearch-project/OpenSearch/pull/20026)) From b975d8e55b04c4a15ca173a41fdcfeb18c9da3b4 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Wed, 26 Nov 2025 23:43:15 +0100 Subject: [PATCH 03/12] Keep track of Reactor Netty 4 Transport accepted Http Channels Signed-off-by: Sergei Ustimenko --- .../Netty4HttpChannelsReleaseIntegTests.java | 92 +++++++++++++++++++ ...orNetty4HttpChannelsReleaseIntegTests.java | 92 +++++++++++++++++++ .../ReactorNetty4HttpServerChannel.java | 2 +- .../ReactorNetty4HttpServerTransport.java | 74 +++++++++++++++ .../transport/reactor/netty4/Netty4Utils.java | 28 ++++++ .../http/DetailedErrorsDisabledIT.java | 7 +- .../http/DetailedErrorsEnabledIT.java | 9 +- 7 files changed, 294 insertions(+), 10 deletions(-) create mode 100644 modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java create mode 100644 plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java new file mode 100644 index 0000000000000..df29b8cfe2b53 --- /dev/null +++ b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.netty4; + +import org.opensearch.OpenSearchNetty4IntegTestCase; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.rest.action.RestCancellableNodeClient; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) +public class Netty4HttpChannelsReleaseIntegTests extends OpenSearchNetty4IntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + private ThreadPool threadPool; + + @Before + public void createThreadPool() { + threadPool = new TestThreadPool(getClass().getName()); + } + + @After + public void stopThreadPool() { + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + } + + public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws InterruptedException { + String testIndex = "test_idx"; + assertAcked(client().admin().indices().prepareCreate(testIndex)); + + int initialHttpChannels = RestCancellableNodeClient.getNumChannels(); + int numChannels = randomIntBetween(50, 100); + CountDownLatch countDownLatch = new CountDownLatch(numChannels); + for (int i = 0; i < numChannels; i++) { + threadPool.generic().execute(() -> { + executeRequest(testIndex); + countDownLatch.countDown(); + }); + } + countDownLatch.await(); + + // no channels get closed in this test, hence we expect as many channels as we created in the map + assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels()); + assertEquals(0, RestCancellableNodeClient.getNumTasks()); + } + + /** + * Execute a Search request against the given index. The Search requests are tracked + * by the RestCancellableNodeClient to verify that channels are released properly. + * + * @param index the index to search against + */ + private static void executeRequest(String index) { + try { + Request request = new Request("GET", "/" + index + "/_search"); + SearchSourceBuilder searchSource = new SearchSourceBuilder().query(new MatchAllQueryBuilder()); + request.setJsonEntity(Strings.toString(MediaTypeRegistry.JSON, searchSource)); + Response response = getRestClient().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } catch (IOException e) { + throw new IllegalStateException("Failed to execute the request", e); + } + } + +} diff --git a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java new file mode 100644 index 0000000000000..92fb3a5ced9d1 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.OpenSearchReactorNetty4IntegTestCase; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.rest.action.RestCancellableNodeClient; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) +public class ReactorNetty4HttpChannelsReleaseIntegTests extends OpenSearchReactorNetty4IntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + private ThreadPool threadPool; + + @Before + public void createThreadPool() { + threadPool = new TestThreadPool(getClass().getName()); + } + + @After + public void stopThreadPool() { + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + } + + public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws InterruptedException { + String testIndex = "test_idx"; + assertAcked(client().admin().indices().prepareCreate(testIndex)); + + int initialHttpChannels = RestCancellableNodeClient.getNumChannels(); + int numChannels = randomIntBetween(50, 100); + CountDownLatch countDownLatch = new CountDownLatch(numChannels); + for (int i = 0; i < numChannels; i++) { + threadPool.generic().execute(() -> { + executeRequest(testIndex); + countDownLatch.countDown(); + }); + } + countDownLatch.await(); + + // no channels get closed in this test, hence we expect as many channels as we created in the map + assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels()); + assertEquals(0, RestCancellableNodeClient.getNumTasks()); + } + + /** + * Execute a Search request against the given index. The Search requests are tracked + * by the RestCancellableNodeClient to verify that channels are released properly. + * + * @param index the index to search against + */ + private static void executeRequest(String index) { + try { + Request request = new Request("GET", "/" + index + "/_search"); + SearchSourceBuilder searchSource = new SearchSourceBuilder().query(new MatchAllQueryBuilder()); + request.setJsonEntity(Strings.toString(MediaTypeRegistry.JSON, searchSource)); + Response response = getRestClient().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } catch (IOException e) { + throw new IllegalStateException("Failed to execute the request", e); + } + } + +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java index 84360bf028ba9..a9e86e52f60a1 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java @@ -48,6 +48,6 @@ public void close() { @Override public String toString() { - return "ReactorNetty4HttpChannel{localAddress=" + getLocalAddress() + "}"; + return "ReactorNetty4HttpServerChannel{localAddress=" + getLocalAddress() + "}"; } } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index 6f2a6b206608d..610facb6df3a0 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -10,6 +10,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.common.Nullable; +import org.opensearch.common.concurrent.CompletableContext; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -19,12 +20,14 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.io.IOUtils; import org.opensearch.common.util.net.NetUtils; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.AbstractHttpServerTransport; import org.opensearch.http.HttpChannel; import org.opensearch.http.HttpReadTimeoutException; +import org.opensearch.http.HttpResponse; import org.opensearch.http.HttpServerChannel; import org.opensearch.http.reactor.netty4.ssl.SslUtils; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; @@ -39,12 +42,14 @@ import javax.net.ssl.KeyManagerFactory; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.SocketOption; import java.time.Duration; import java.util.Arrays; import java.util.Optional; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.http.DefaultLastHttpContent; @@ -62,6 +67,8 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.netty.ChannelPipelineConfigurer; +import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; import reactor.netty.http.HttpProtocol; import reactor.netty.http.server.HttpServer; @@ -157,6 +164,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor /** * Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}). + * * @param settings settings * @param networkService network service * @param bigArrays big array allocator @@ -194,6 +202,7 @@ public ReactorNetty4HttpServerTransport( /** * Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}). + * * @param settings settings * @param networkService network service * @param bigArrays big array allocator @@ -232,6 +241,7 @@ public ReactorNetty4HttpServerTransport( /** * Binds the transport engine to the socket address + * * @param socketAddress socket address to bind to */ @Override @@ -261,6 +271,7 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti private HttpServer configure(final HttpServer server) throws Exception { HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)) + .doOnChannelInit(new ChannelInitializer(this)) .childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) { @@ -347,6 +358,7 @@ private HttpServer configure(final HttpServer server) throws Exception { /** * Handles incoming Reactor Netty request + * * @param request request instance * @param response response instances * @return response publisher @@ -457,4 +469,66 @@ public void onException(HttpChannel channel, Exception cause) { super.onException(channel, cause); } } + + /** + * A pipeline configurer to be added as a part of channel initialization process + * to register accepted channels with the transport. This allows the transport + * to properly track open channels. + */ + private static class ChannelInitializer implements ChannelPipelineConfigurer { + private final ReactorNetty4HttpServerTransport transport; + + private ChannelInitializer(ReactorNetty4HttpServerTransport transport) { + this.transport = transport; + } + + @Override + public void onChannelInit(ConnectionObserver connectionObserver, Channel channel, SocketAddress remoteAddress) { + transport.serverAcceptedChannel(new ClosableHttpChannelWrapper(channel)); + } + } + + /** + * A wrapper to close the underlying Netty channel. + */ + private static class ClosableHttpChannelWrapper implements HttpChannel { + private final Channel channel; + private final CompletableContext closeContext = new CompletableContext<>(); + + private ClosableHttpChannelWrapper(Channel channel) { + this.channel = channel; + Netty4Utils.addListener(this.channel.closeFuture(), closeContext); + } + + @Override + public void sendResponse(HttpResponse response, ActionListener listener) { + channel.writeAndFlush(response, Netty4Utils.addPromise(listener, channel)); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) channel.remoteAddress(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + } + } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java index 8ec432b7dd5cd..3d40a1477933a 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java @@ -12,6 +12,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.common.Booleans; import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; @@ -25,7 +26,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; import io.netty.util.NettyRuntime; /** @@ -139,4 +142,29 @@ public static void addListener(ChannelFuture channelFuture, CompletableContext listener, Channel channel) { + ChannelPromise writePromise = channel.newPromise(); + writePromise.addListener(f -> { + if (f.isSuccess()) { + listener.onResponse(null); + } else { + final Throwable cause = f.cause(); + ExceptionsHelper.maybeDieOnAnotherThread(cause); + if (cause instanceof Error) { + listener.onFailure(new Exception(cause)); + } else { + listener.onFailure((Exception) cause); + } + } + }); + return writePromise; + } } diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java index bbaba1017d2a5..3d73fa319f761 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsDisabledIT.java @@ -92,10 +92,9 @@ public void testDetailedStackTracesAreNotIncludedWhenErrorTraceIsDisabledForBulk byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); request.setEntity(new ByteArrayEntity(requestBody, ContentType.APPLICATION_JSON)); - try (var restClient = getRestClient()) { - Response response = restClient.performRequest(request); - assertThat(EntityUtils.toString(response.getEntity()), not(containsString("stack_trace"))); - } + Response response = getRestClient().performRequest(request); + + assertThat(EntityUtils.toString(response.getEntity()), not(containsString("stack_trace"))); } } diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java index 11bf4454e59de..b105ddb097c94 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/DetailedErrorsEnabledIT.java @@ -92,11 +92,10 @@ public void testDetailedStackTracesAreIncludedWhenErrorTraceIsExplicitlyEnabledF byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); request.setEntity(new ByteArrayEntity(requestBody, ContentType.APPLICATION_JSON)); - try (var restClient = getRestClient()) { - Response response = restClient.performRequest(request); - assertThat(EntityUtils.toString(response.getEntity()), - containsString("\"stack_trace\":\"[missing_index] IndexNotFoundException[no such index [missing_index]]")); - } + Response response = getRestClient().performRequest(request); + + assertThat(EntityUtils.toString(response.getEntity()), + containsString("\"stack_trace\":\"[missing_index] IndexNotFoundException[no such index [missing_index]]")); } } From 4abed820aed074f57233c3e26d2a4ce867e15dc8 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Thu, 27 Nov 2025 13:32:04 +0100 Subject: [PATCH 04/12] Update CHANGELOG Signed-off-by: Sergei Ustimenko --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2256833a62f62..aa0eda9ff1340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,7 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix ClassCastException in FlightClientChannel for requests larger than 16KB ([#20010](https://github.com/opensearch-project/OpenSearch/pull/20010)) - Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937)) - Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948)) -- Fix flaky DetailedErrorsDisabledIT and DetailedErrorsEnabledIT ([#20106](https://github.com/opensearch-project/OpenSearch/pull/20106)) +- Keep track and release Reactor Netty 4 Transport accepted Http Channels during the Node shutdown ([#20106](https://github.com/opensearch-project/OpenSearch/pull/20106)) - Fix deletion failure/error of unused index template; case when an index template matches a data stream but has a lower priority. ([#20102](https://github.com/opensearch-project/OpenSearch/pull/20102)) - Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker([20105](https://github.com/opensearch-project/OpenSearch/pull/20105)) From 6fa89ac4d096a311aa90225ed501d02680899a55 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Thu, 27 Nov 2025 14:01:07 +0100 Subject: [PATCH 05/12] Add missing visibility modifier to RestCancellableNodeClient#getNumTasks Signed-off-by: Sergei Ustimenko --- .../org/opensearch/rest/action/RestCancellableNodeClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java b/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java index f7705d69cf75a..18d822686db5c 100644 --- a/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java +++ b/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java @@ -83,7 +83,7 @@ public static int getNumChannels() { /** * Returns the number of tasks tracked globally. */ - static int getNumTasks() { + public static int getNumTasks() { return httpChannels.values().stream().mapToInt(CloseListener::getNumTasks).sum(); } From 4e56c55d543e673342c9ab3918c053e672b4e709 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Thu, 27 Nov 2025 14:50:30 +0100 Subject: [PATCH 06/12] Extract channel wrapper into a separate class and extend the get() for proper context access Signed-off-by: Sergei Ustimenko --- .../netty4/ReactorNetty4BaseHttpChannel.java | 3 +- .../netty4/ReactorNetty4HttpChannel.java | 96 +++++++++++++++++++ .../ReactorNetty4HttpServerTransport.java | 48 +--------- 3 files changed, 99 insertions(+), 48 deletions(-) create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java index c0a354e71d481..276e0d8ce38af 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java @@ -19,7 +19,8 @@ import reactor.netty.http.server.HttpServerRequest; final class ReactorNetty4BaseHttpChannel { - private static final String CHANNEL_PROPERTY = "channel"; + static final String CHANNEL_PROPERTY = "channel"; + private static final String SSL_HANDLER_PROPERTY = "ssl_http"; private static final String SSL_ENGINE_PROPERTY = "ssl_engine"; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java new file mode 100644 index 0000000000000..48be5ae3e3d35 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.http.HttpChannel; +import org.opensearch.http.HttpResponse; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; +import java.util.Optional; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; + +import static org.opensearch.http.reactor.netty4.ReactorNetty4BaseHttpChannel.CHANNEL_PROPERTY; + +/** + * A Reactor Netty 4 implementation of an {@link HttpChannel}. + */ +public class ReactorNetty4HttpChannel implements HttpChannel { + private final Channel channel; + private final ChannelPipeline inboundPipeline; + private final CompletableContext closeContext = new CompletableContext<>(); + + ReactorNetty4HttpChannel(Channel channel) { + this.channel = channel; + this.inboundPipeline = channel.pipeline(); + Netty4Utils.addListener(this.channel.closeFuture(), closeContext); + } + + @Override + public void sendResponse(HttpResponse response, ActionListener listener) { + channel.writeAndFlush(response, Netty4Utils.addPromise(listener, channel)); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) channel.remoteAddress(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + public ChannelPipeline inboundPipeline() { + return inboundPipeline; + } + + public Channel getNettyChannel() { + return channel; + } + + @SuppressWarnings("unchecked") + @Override + public Optional get(String name, Class clazz) { + if (CHANNEL_PROPERTY.equalsIgnoreCase(name) && clazz.isAssignableFrom(Channel.class)) { + return (Optional) Optional.of(getNettyChannel()); + } + + Object handler = getNettyChannel().pipeline().get(name); + + if (handler == null && inboundPipeline() != null) { + handler = inboundPipeline().get(name); + } + + if (handler != null && clazz.isInstance(handler) == true) { + return Optional.of((T) handler); + } + + return Optional.empty(); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index 610facb6df3a0..1596be7470035 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -10,7 +10,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.common.Nullable; -import org.opensearch.common.concurrent.CompletableContext; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -20,14 +19,12 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.io.IOUtils; import org.opensearch.common.util.net.NetUtils; -import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.AbstractHttpServerTransport; import org.opensearch.http.HttpChannel; import org.opensearch.http.HttpReadTimeoutException; -import org.opensearch.http.HttpResponse; import org.opensearch.http.HttpServerChannel; import org.opensearch.http.reactor.netty4.ssl.SslUtils; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; @@ -484,50 +481,7 @@ private ChannelInitializer(ReactorNetty4HttpServerTransport transport) { @Override public void onChannelInit(ConnectionObserver connectionObserver, Channel channel, SocketAddress remoteAddress) { - transport.serverAcceptedChannel(new ClosableHttpChannelWrapper(channel)); - } - } - - /** - * A wrapper to close the underlying Netty channel. - */ - private static class ClosableHttpChannelWrapper implements HttpChannel { - private final Channel channel; - private final CompletableContext closeContext = new CompletableContext<>(); - - private ClosableHttpChannelWrapper(Channel channel) { - this.channel = channel; - Netty4Utils.addListener(this.channel.closeFuture(), closeContext); - } - - @Override - public void sendResponse(HttpResponse response, ActionListener listener) { - channel.writeAndFlush(response, Netty4Utils.addPromise(listener, channel)); - } - - @Override - public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) channel.localAddress(); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) channel.remoteAddress(); - } - - @Override - public void close() { - channel.close(); - } - - @Override - public void addCloseListener(ActionListener listener) { - closeContext.addListener(ActionListener.toBiConsumer(listener)); - } - - @Override - public boolean isOpen() { - return channel.isOpen(); + transport.serverAcceptedChannel(new ReactorNetty4HttpChannel(channel)); } } From e8f44c5cb600b15ca45535eb57a77803c9de128c Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Thu, 27 Nov 2025 14:55:24 +0100 Subject: [PATCH 07/12] Add missing javadocs Signed-off-by: Sergei Ustimenko --- .../reactor/netty4/ReactorNetty4HttpChannel.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java index 48be5ae3e3d35..21c75d18756ca 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java @@ -30,7 +30,11 @@ public class ReactorNetty4HttpChannel implements HttpChannel { private final ChannelPipeline inboundPipeline; private final CompletableContext closeContext = new CompletableContext<>(); - ReactorNetty4HttpChannel(Channel channel) { + /** + * Construct a new {@link ReactorNetty4HttpChannel} + * @param channel the underlying Netty channel + */ + public ReactorNetty4HttpChannel(Channel channel) { this.channel = channel; this.inboundPipeline = channel.pipeline(); Netty4Utils.addListener(this.channel.closeFuture(), closeContext); @@ -66,10 +70,18 @@ public boolean isOpen() { return channel.isOpen(); } + /** + * Get the inbound pipeline associated with this channel + * @return the inbound pipeline + */ public ChannelPipeline inboundPipeline() { return inboundPipeline; } + /** + * Get the underlying Netty channel + * @return the Netty channel + */ public Channel getNettyChannel() { return channel; } From 77606c7b1046bfa0c3634927be243e20da1b3943 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Thu, 27 Nov 2025 18:15:19 +0100 Subject: [PATCH 08/12] Hook into AbstractHttpServerTransport directly from Reactor Request Consumers Signed-off-by: Sergei Ustimenko --- .../netty4/ReactorNetty4HttpChannel.java | 108 ------------------ .../ReactorNetty4HttpServerTransport.java | 35 ++---- ...ctorNetty4NonStreamingRequestConsumer.java | 6 +- ...ReactorNetty4StreamingRequestConsumer.java | 7 +- 4 files changed, 21 insertions(+), 135 deletions(-) delete mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java deleted file mode 100644 index 21c75d18756ca..0000000000000 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannel.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.http.reactor.netty4; - -import org.opensearch.common.concurrent.CompletableContext; -import org.opensearch.core.action.ActionListener; -import org.opensearch.http.HttpChannel; -import org.opensearch.http.HttpResponse; -import org.opensearch.transport.reactor.netty4.Netty4Utils; - -import java.net.InetSocketAddress; -import java.util.Optional; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelPipeline; - -import static org.opensearch.http.reactor.netty4.ReactorNetty4BaseHttpChannel.CHANNEL_PROPERTY; - -/** - * A Reactor Netty 4 implementation of an {@link HttpChannel}. - */ -public class ReactorNetty4HttpChannel implements HttpChannel { - private final Channel channel; - private final ChannelPipeline inboundPipeline; - private final CompletableContext closeContext = new CompletableContext<>(); - - /** - * Construct a new {@link ReactorNetty4HttpChannel} - * @param channel the underlying Netty channel - */ - public ReactorNetty4HttpChannel(Channel channel) { - this.channel = channel; - this.inboundPipeline = channel.pipeline(); - Netty4Utils.addListener(this.channel.closeFuture(), closeContext); - } - - @Override - public void sendResponse(HttpResponse response, ActionListener listener) { - channel.writeAndFlush(response, Netty4Utils.addPromise(listener, channel)); - } - - @Override - public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) channel.localAddress(); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) channel.remoteAddress(); - } - - @Override - public void close() { - channel.close(); - } - - @Override - public void addCloseListener(ActionListener listener) { - closeContext.addListener(ActionListener.toBiConsumer(listener)); - } - - @Override - public boolean isOpen() { - return channel.isOpen(); - } - - /** - * Get the inbound pipeline associated with this channel - * @return the inbound pipeline - */ - public ChannelPipeline inboundPipeline() { - return inboundPipeline; - } - - /** - * Get the underlying Netty channel - * @return the Netty channel - */ - public Channel getNettyChannel() { - return channel; - } - - @SuppressWarnings("unchecked") - @Override - public Optional get(String name, Class clazz) { - if (CHANNEL_PROPERTY.equalsIgnoreCase(name) && clazz.isAssignableFrom(Channel.class)) { - return (Optional) Optional.of(getNettyChannel()); - } - - Object handler = getNettyChannel().pipeline().get(name); - - if (handler == null && inboundPipeline() != null) { - handler = inboundPipeline().get(name); - } - - if (handler != null && clazz.isInstance(handler) == true) { - return Optional.of((T) handler); - } - - return Optional.empty(); - } -} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index 1596be7470035..78e5edf48cc57 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -39,14 +39,12 @@ import javax.net.ssl.KeyManagerFactory; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.SocketOption; import java.time.Duration; import java.util.Arrays; import java.util.Optional; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.http.DefaultLastHttpContent; @@ -64,8 +62,6 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.netty.ChannelPipelineConfigurer; -import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; import reactor.netty.http.HttpProtocol; import reactor.netty.http.server.HttpServer; @@ -268,7 +264,6 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti private HttpServer configure(final HttpServer server) throws Exception { HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)) - .doOnChannelInit(new ChannelInitializer(this)) .childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) { @@ -353,6 +348,17 @@ private HttpServer configure(final HttpServer server) throws Exception { return configured; } + /** + * An override to be able to keep track of accepted channels by the + * {@link ReactorNetty4NonStreamingRequestConsumer} and {@link ReactorNetty4StreamingRequestConsumer} + * + * @param httpChannel the accepted channel + */ + @Override + public void serverAcceptedChannel(HttpChannel httpChannel) { + super.serverAcceptedChannel(httpChannel); + } + /** * Handles incoming Reactor Netty request * @@ -376,6 +382,7 @@ protected Publisher incomingRequest(HttpServerRequest request, HttpServerR ); if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) { final ReactorNetty4StreamingRequestConsumer consumer = new ReactorNetty4StreamingRequestConsumer<>( + this, request, response ); @@ -467,22 +474,4 @@ public void onException(HttpChannel channel, Exception cause) { } } - /** - * A pipeline configurer to be added as a part of channel initialization process - * to register accepted channels with the transport. This allows the transport - * to properly track open channels. - */ - private static class ChannelInitializer implements ChannelPipelineConfigurer { - private final ReactorNetty4HttpServerTransport transport; - - private ChannelInitializer(ReactorNetty4HttpServerTransport transport) { - this.transport = transport; - } - - @Override - public void onChannelInit(ConnectionObserver connectionObserver, Channel channel, SocketAddress remoteAddress) { - transport.serverAcceptedChannel(new ReactorNetty4HttpChannel(channel)); - } - } - } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java index c09e7755b1670..177567dc7f660 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java @@ -8,7 +8,6 @@ package org.opensearch.http.reactor.netty4; -import org.opensearch.http.AbstractHttpServerTransport; import org.opensearch.http.HttpRequest; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,12 +29,12 @@ class ReactorNetty4NonStreamingRequestConsumer implements private final HttpServerResponse response; private final CompositeByteBuf content; private final Publisher publisher; - private final AbstractHttpServerTransport transport; + private final ReactorNetty4HttpServerTransport transport; private final AtomicBoolean disposed = new AtomicBoolean(false); private volatile FluxSink emitter; ReactorNetty4NonStreamingRequestConsumer( - AbstractHttpServerTransport transport, + ReactorNetty4HttpServerTransport transport, HttpServerRequest request, HttpServerResponse response, int maxCompositeBufferComponents @@ -73,6 +72,7 @@ void process(HttpContent in, FluxSink emitter) { final HttpRequest r = createRequest(request, content); try { + transport.serverAcceptedChannel(channel); transport.incomingRequest(r, channel); } catch (Exception ex) { emitter.error(ex); diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java index 0559f89478838..1bba5d6fa9041 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -24,9 +24,14 @@ class ReactorNetty4StreamingRequestConsumer implements Co private final ReactorNetty4StreamingResponseProducer sender; private final StreamingHttpChannel httpChannel; - ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) { + ReactorNetty4StreamingRequestConsumer( + ReactorNetty4HttpServerTransport transport, + HttpServerRequest request, + HttpServerResponse response + ) { this.sender = new ReactorNetty4StreamingResponseProducer(); this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender); + transport.serverAcceptedChannel(httpChannel); } @Override From d8ed05b38388429bf578c9a15805b8c95577acbd Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 27 Nov 2025 15:07:29 -0500 Subject: [PATCH 09/12] Make sure the CloseContext is always called even if the channel is disposed already Signed-off-by: Andriy Redko --- .../netty4/ReactorNetty4NonStreamingHttpChannel.java | 8 +++++++- .../reactor/netty4/ReactorNetty4StreamingHttpChannel.java | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java index 6be38899ca71a..35e827374f3a8 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java @@ -46,7 +46,13 @@ public boolean isOpen() { @Override public void close() { - request.withConnection(connection -> connection.channel().close()); + request.withConnection(connection -> { + if (closeContext.isDone() == false) { + Netty4Utils.addListener(connection.channel().close(), closeContext); + } else { + connection.channel().close(); + } + }); } @Override diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java index 44e765c5d0bcf..efdfa15492a9a 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -56,7 +56,13 @@ public boolean isOpen() { @Override public void close() { - request.withConnection(connection -> connection.channel().close()); + request.withConnection(connection -> { + if (closeContext.isDone() == false) { + Netty4Utils.addListener(connection.channel().close(), closeContext); + } else { + connection.channel().close(); + } + }); } @Override From 84ee3325550b0f701a3a68eb812c82e05d43bd53 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Thu, 27 Nov 2025 22:33:03 +0100 Subject: [PATCH 10/12] Remove extra code Signed-off-by: Sergei Ustimenko --- .../netty4/ReactorNetty4BaseHttpChannel.java | 3 +- .../transport/reactor/netty4/Netty4Utils.java | 28 ------------------- 2 files changed, 1 insertion(+), 30 deletions(-) diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java index 276e0d8ce38af..c0a354e71d481 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4BaseHttpChannel.java @@ -19,8 +19,7 @@ import reactor.netty.http.server.HttpServerRequest; final class ReactorNetty4BaseHttpChannel { - static final String CHANNEL_PROPERTY = "channel"; - + private static final String CHANNEL_PROPERTY = "channel"; private static final String SSL_HANDLER_PROPERTY = "ssl_http"; private static final String SSL_ENGINE_PROPERTY = "ssl_engine"; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java index 3d40a1477933a..8ec432b7dd5cd 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java @@ -12,7 +12,6 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.common.Booleans; import org.opensearch.common.concurrent.CompletableContext; -import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; @@ -26,9 +25,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPromise; import io.netty.util.NettyRuntime; /** @@ -142,29 +139,4 @@ public static void addListener(ChannelFuture channelFuture, CompletableContext listener, Channel channel) { - ChannelPromise writePromise = channel.newPromise(); - writePromise.addListener(f -> { - if (f.isSuccess()) { - listener.onResponse(null); - } else { - final Throwable cause = f.cause(); - ExceptionsHelper.maybeDieOnAnotherThread(cause); - if (cause instanceof Error) { - listener.onFailure(new Exception(cause)); - } else { - listener.onFailure((Exception) cause); - } - } - }); - return writePromise; - } } From d91400a1b14aab6ee05cb662cfe234e01530eeea Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Fri, 28 Nov 2025 01:13:34 +0100 Subject: [PATCH 11/12] Add more coverage for streaming and non-streaming netty connections Signed-off-by: Sergei Ustimenko --- ...tty4HttpServerTransportStreamingTests.java | 117 ++++++++++++------ ...ReactorNetty4HttpServerTransportTests.java | 61 +++++++++ 2 files changed, 141 insertions(+), 37 deletions(-) diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java index 0e9b3caad9aac..715f0191fd851 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java @@ -101,7 +101,86 @@ public void testRequestResponseStreaming() throws InterruptedException { final String url = "/stream/"; final ToXContent[] chunks = newChunks(responseString); - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(url, responseString); + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + Settings.EMPTY, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (ReactorHttpClient client = ReactorHttpClient.create(false)) { + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + byte[] bytes = new byte[response.content().readableBytes()]; + response.content().readBytes(bytes); + assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(Arrays.stream(newChunks(responseString)).map(s -> { + try (XContentBuilder builder = XContentType.JSON.contentBuilder()) { + return s.toXContent(builder, ToXContent.EMPTY_PARAMS).toString(); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }).collect(Collectors.joining("\r\n", "", "\r\n")))); + } finally { + response.release(); + } + } + } + } + + public void testConnectionsGettingClosedForStreamingRequests() throws InterruptedException { + final String responseString = randomAlphaOfLength(4 * 1024); + final String url = "/stream/"; + + final ToXContent[] chunks = newChunks(responseString); + final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(url, responseString); + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + Settings.EMPTY, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + NoopTracer.INSTANCE + ); + ReactorHttpClient client = ReactorHttpClient.create(false) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + long numRequests = randomLongBetween(5L, 15L); + for (int i = 0; i < numRequests; i++) { + logger.info("Sending request {}/{}", i + 1, numRequests); + final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + } finally { + response.release(); + } + } + assertThat(transport.stats().getServerOpen(), equalTo(0L)); + assertThat(transport.stats().getTotalOpen(), equalTo(numRequests)); + } + } + + private HttpServerTransport.Dispatcher createStreamingDispatcher(String url, String responseString) { + return new HttpServerTransport.Dispatcher() { @Override public Optional dispatchHandler(String uri, String rawPath, Method method, Map params) { return Optional.of(new RestHandler() { @@ -161,42 +240,6 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } }; - - try ( - ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( - Settings.EMPTY, - networkService, - bigArrays, - threadPool, - xContentRegistry(), - dispatcher, - clusterSettings, - new SharedGroupFactory(Settings.EMPTY), - NoopTracer.INSTANCE - ) - ) { - transport.start(); - final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - - try (ReactorHttpClient client = ReactorHttpClient.create(false)) { - HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); - final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); - try { - assertThat(response.status(), equalTo(HttpResponseStatus.OK)); - byte[] bytes = new byte[response.content().readableBytes()]; - response.content().readBytes(bytes); - assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(Arrays.stream(newChunks(responseString)).map(s -> { - try (XContentBuilder builder = XContentType.JSON.contentBuilder()) { - return s.toXContent(builder, ToXContent.EMPTY_PARAMS).toString(); - } catch (final IOException ex) { - throw new UncheckedIOException(ex); - } - }).collect(Collectors.joining("\r\n", "", "\r\n")))); - } finally { - response.release(); - } - } - } } private static ToXContent[] newChunks(final String responseString) { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java index c6aa98da07a45..72d645aaf8022 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java @@ -132,6 +132,7 @@ public void shutdown() throws Exception { /** * Test that {@link ReactorNetty4HttpServerTransport} supports the "Expect: 100-continue" HTTP header + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectContinueHeader() throws InterruptedException { @@ -144,6 +145,7 @@ public void testExpectContinueHeader() throws InterruptedException { * Test that {@link ReactorNetty4HttpServerTransport} responds to a * 100-continue expectation with too large a content-length * with a 413 status. + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException { @@ -156,6 +158,7 @@ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedExc /** * Test that {@link ReactorNetty4HttpServerTransport} responds to an unsupported expectation with a 417 status. + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectUnsupportedExpectation() throws InterruptedException { @@ -436,6 +439,64 @@ private long getHugeAllocationCount() { return numOfHugAllocations; } + public void testConnectionsGettingClosed() throws InterruptedException { + final String responseString = "ok"; + final String url = "/thing/"; + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + if (url.equals(request.uri())) { + channel.sendResponse(new BytesRestResponse(OK, responseString)); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); + throw new AssertionError(); + } + } + + @Override + public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(cause); + } + }; + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + Settings.EMPTY, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + NoopTracer.INSTANCE + ); + ReactorHttpClient client = ReactorHttpClient.create() + ) { + transport.start(); + TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + long numRequests = randomLongBetween(5L, 15L); + + for (int i = 0; i < numRequests; i++) { + DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + logger.info("Sending request {}/{}", i + 1, numRequests); + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + } finally { + response.release(); + } + } + + assertThat(transport.stats().getServerOpen(), equalTo(0L)); + assertThat(transport.stats().getTotalOpen(), equalTo(numRequests)); + } + } + public void testCorsRequest() throws InterruptedException { final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { From 93a3457c47fa03f22150f2d584310521c96b25e4 Mon Sep 17 00:00:00 2001 From: Sergei Ustimenko Date: Fri, 28 Nov 2025 15:58:16 +0100 Subject: [PATCH 12/12] Revert visibility modifier of the RestCancellableNodeClient#getNumTasks Signed-off-by: Sergei Ustimenko --- .../transport/netty4/Netty4HttpChannelsReleaseIntegTests.java | 1 - .../netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java | 1 - .../org/opensearch/rest/action/RestCancellableNodeClient.java | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java index df29b8cfe2b53..dc23338e48720 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/netty4/Netty4HttpChannelsReleaseIntegTests.java @@ -68,7 +68,6 @@ public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws Interrupt // no channels get closed in this test, hence we expect as many channels as we created in the map assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels()); - assertEquals(0, RestCancellableNodeClient.getNumTasks()); } /** diff --git a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java index 92fb3a5ced9d1..5ccc1eae15a91 100644 --- a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java +++ b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChannelsReleaseIntegTests.java @@ -68,7 +68,6 @@ public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws Interrupt // no channels get closed in this test, hence we expect as many channels as we created in the map assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels()); - assertEquals(0, RestCancellableNodeClient.getNumTasks()); } /** diff --git a/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java b/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java index 18d822686db5c..f7705d69cf75a 100644 --- a/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java +++ b/server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java @@ -83,7 +83,7 @@ public static int getNumChannels() { /** * Returns the number of tasks tracked globally. */ - public static int getNumTasks() { + static int getNumTasks() { return httpChannels.values().stream().mapToInt(CloseListener::getNumTasks).sum(); }