1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix ClassCastException in FlightClientChannel for requests larger than 16KB ([#20010](https://github.com/opensearch-project/OpenSearch/pull/20010))
- Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937))
- Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948))
- Keep track of and release Reactor Netty 4 transport accepted HTTP channels during node shutdown ([#20106](https://github.com/opensearch-project/OpenSearch/pull/20106))
- Fix deletion failure/error of unused index template; case when an index template matches a data stream but has a lower priority. ([#20102](https://github.com/opensearch-project/OpenSearch/pull/20102))
- Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker([20105](https://github.com/opensearch-project/OpenSearch/pull/20105))

Netty4HttpChannelsReleaseIntegTests.java
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.netty4;

import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.rest.action.RestCancellableNodeClient;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;

@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4HttpChannelsReleaseIntegTests extends OpenSearchNetty4IntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

private ThreadPool threadPool;

@Before
public void createThreadPool() {
threadPool = new TestThreadPool(getClass().getName());
}

@After
public void stopThreadPool() {
ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS);
}

public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws InterruptedException {
String testIndex = "test_idx";
assertAcked(client().admin().indices().prepareCreate(testIndex));

int initialHttpChannels = RestCancellableNodeClient.getNumChannels();
int numChannels = randomIntBetween(50, 100);
CountDownLatch countDownLatch = new CountDownLatch(numChannels);
for (int i = 0; i < numChannels; i++) {
threadPool.generic().execute(() -> {
executeRequest(testIndex);
countDownLatch.countDown();
});
}
countDownLatch.await();

// no channels get closed in this test, hence we expect as many channels as we created in the map
assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels());
assertEquals(0, RestCancellableNodeClient.getNumTasks());
}

/**
* Execute a Search request against the given index. The Search requests are tracked
* by the RestCancellableNodeClient to verify that channels are released properly.
*
* @param index the index to search against
*/
private static void executeRequest(String index) {
try {
Request request = new Request("GET", "/" + index + "/_search");
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(new MatchAllQueryBuilder());
request.setJsonEntity(Strings.toString(MediaTypeRegistry.JSON, searchSource));
Response response = getRestClient().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
} catch (IOException e) {
throw new IllegalStateException("Failed to execute the request", e);
}
}

}
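The two assertions at the end of the test read counters exposed by RestCancellableNodeClient, which associates every HTTP channel it has seen with the tasks still running on it. The sketch below is a hypothetical, much-simplified illustration of that kind of bookkeeping — a concurrent map from channels to task ids that a close listener cleans up — not the actual RestCancellableNodeClient implementation; all names in it are invented for the example.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the HTTP channel abstraction.
interface TrackedHttpChannel {
    void addCloseListener(Runnable listener);
}

final class ChannelTaskTracker {
    // One entry per tracked channel; each entry holds the ids of tasks still running on it.
    private final Map<TrackedHttpChannel, Set<Long>> tasksByChannel = new ConcurrentHashMap<>();

    void registerTask(TrackedHttpChannel channel, long taskId) {
        tasksByChannel.computeIfAbsent(channel, ch -> {
            // First task on this channel: start tracking it and arrange cleanup on close.
            ch.addCloseListener(() -> tasksByChannel.remove(ch));
            return ConcurrentHashMap.<Long>newKeySet();
        }).add(taskId);
    }

    void completeTask(TrackedHttpChannel channel, long taskId) {
        Set<Long> tasks = tasksByChannel.get(channel);
        if (tasks != null) {
            tasks.remove(taskId);
        }
    }

    // Counters analogous to what the test reads via getNumChannels() / getNumTasks().
    int numChannels() {
        return tasksByChannel.size();
    }

    int numTasks() {
        return tasksByChannel.values().stream().mapToInt(Set::size).sum();
    }
}

With this kind of accounting, a channel that is never closed (as in the test above) keeps its map entry, which is exactly what the channel-count assertion checks.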
ReactorNetty4HttpChannelsReleaseIntegTests.java
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.OpenSearchReactorNetty4IntegTestCase;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.rest.action.RestCancellableNodeClient;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;

@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class ReactorNetty4HttpChannelsReleaseIntegTests extends OpenSearchReactorNetty4IntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

private ThreadPool threadPool;

@Before
public void createThreadPool() {
threadPool = new TestThreadPool(getClass().getName());
}

@After
public void stopThreadPool() {
ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS);
}

public void testAcceptedChannelsGetCleanedUpOnTheNodeShutdown() throws InterruptedException {
String testIndex = "test_idx";
assertAcked(client().admin().indices().prepareCreate(testIndex));

int initialHttpChannels = RestCancellableNodeClient.getNumChannels();
int numChannels = randomIntBetween(50, 100);
CountDownLatch countDownLatch = new CountDownLatch(numChannels);
for (int i = 0; i < numChannels; i++) {
threadPool.generic().execute(() -> {
executeRequest(testIndex);
countDownLatch.countDown();
});
}
countDownLatch.await();

// no channels get closed in this test, hence we expect as many channels as we created in the map
assertEquals("All channels remain open", initialHttpChannels + numChannels, RestCancellableNodeClient.getNumChannels());
assertEquals(0, RestCancellableNodeClient.getNumTasks());
}

/**
* Execute a Search request against the given index. The Search requests are tracked
* by the RestCancellableNodeClient to verify that channels are released properly.
*
* @param index the index to search against
*/
private static void executeRequest(String index) {
try {
Request request = new Request("GET", "/" + index + "/_search");
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(new MatchAllQueryBuilder());
request.setJsonEntity(Strings.toString(MediaTypeRegistry.JSON, searchSource));
Response response = getRestClient().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
} catch (IOException e) {
throw new IllegalStateException("Failed to execute the request", e);
}
}

}
ReactorNetty4HttpServerChannel.java
@@ -48,6 +48,6 @@ public void close() {

@Override
public String toString() {
return "ReactorNetty4HttpChannel{localAddress=" + getLocalAddress() + "}";
return "ReactorNetty4HttpServerChannel{localAddress=" + getLocalAddress() + "}";
}
}
ReactorNetty4HttpServerTransport.java
@@ -157,6 +157,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTransport

/**
* Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
*
* @param settings settings
* @param networkService network service
* @param bigArrays big array allocator
@@ -194,6 +195,7 @@ public ReactorNetty4HttpServerTransport(

/**
* Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
*
* @param settings settings
* @param networkService network service
* @param bigArrays big array allocator
@@ -232,6 +234,7 @@ public ReactorNetty4HttpServerTransport(

/**
* Binds the transport engine to the socket address
*
* @param socketAddress socket address to bind to
*/
@Override
@@ -345,8 +348,20 @@ private HttpServer configure(final HttpServer server) throws Exception {
return configured;
}

/**
* Overridden so that {@link ReactorNetty4NonStreamingRequestConsumer} and
* {@link ReactorNetty4StreamingRequestConsumer} can register the channels they accept
* with this transport.
*
* @param httpChannel the accepted channel
*/
@Override
public void serverAcceptedChannel(HttpChannel httpChannel) {
super.serverAcceptedChannel(httpChannel);
}

/**
* Handles incoming Reactor Netty request
*
* @param request request instance
* @param response response instances
* @return response publisher
@@ -367,6 +382,7 @@ protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
);
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(
this,
request,
response
);
@@ -457,4 +473,5 @@ public void onException(HttpChannel channel, Exception cause) {
super.onException(channel, cause);
}
}

}
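For readers unfamiliar with the base class: AbstractHttpServerTransport in core keeps a collection of accepted channels and closes whatever is still tracked when the transport stops, which is what releases the channels during node shutdown. The serverAcceptedChannel override above simply exposes that registration point to this plugin's request consumers. The code below is a minimal, hypothetical sketch of the pattern (invented names, not the real core class) showing why a channel that is never registered can never be released.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down sketch of the accepted-channel bookkeeping in an HTTP transport.
class HttpServerTransportSketch {

    interface Channel {
        void close();
    }

    private final Set<Channel> acceptedChannels = ConcurrentHashMap.newKeySet();

    // Request consumers call this as soon as they wrap an accepted connection in a channel.
    public void serverAcceptedChannel(Channel channel) {
        acceptedChannels.add(channel);
    }

    // Runs during node shutdown: every channel that is still tracked is closed and released.
    // A channel that was never registered here would survive the shutdown untouched.
    protected void doStop() {
        for (Channel channel : acceptedChannels) {
            channel.close();
        }
        acceptedChannels.clear();
    }
}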
@@ -46,7 +46,13 @@ public boolean isOpen() {

@Override
public void close() {
request.withConnection(connection -> connection.channel().close());
request.withConnection(connection -> {
if (closeContext.isDone() == false) {
Netty4Utils.addListener(connection.channel().close(), closeContext);
} else {
connection.channel().close();
}
});
}

@Override
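In the close() override above, the Netty close future is now chained into the channel's close context (when that context has not already completed), so listeners registered through addCloseListener observe the close and the transport can drop the channel from its accounting. The snippet below demonstrates the underlying Netty idiom — completing a future from a ChannelFuture listener — using an EmbeddedChannel; it is only an illustration of what a helper like Netty4Utils.addListener is assumed to do, not the plugin's actual code.

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.embedded.EmbeddedChannel;

import java.util.concurrent.CompletableFuture;

public final class CloseListenerDemo {
    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel();

        // Stand-in for the channel's close context: close listeners hang off this future.
        CompletableFuture<Void> closeContext = new CompletableFuture<>();
        closeContext.whenComplete((ignored, error) -> System.out.println("close listener fired"));

        // Bridge the Netty close future into the close context, mirroring what the updated close() does.
        ChannelFuture closeFuture = channel.close();
        closeFuture.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                closeContext.complete(null);
            } else {
                closeContext.completeExceptionally(future.cause());
            }
        });

        closeFuture.sync(); // the listener fires once the channel has actually closed
    }
}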
ReactorNetty4NonStreamingRequestConsumer.java
@@ -8,7 +8,6 @@

package org.opensearch.http.reactor.netty4;

import org.opensearch.http.AbstractHttpServerTransport;
import org.opensearch.http.HttpRequest;

import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,12 +29,12 @@ class ReactorNetty4NonStreamingRequestConsumer<T extends HttpContent> implements
private final HttpServerResponse response;
private final CompositeByteBuf content;
private final Publisher<HttpContent> publisher;
private final AbstractHttpServerTransport transport;
private final ReactorNetty4HttpServerTransport transport;
private final AtomicBoolean disposed = new AtomicBoolean(false);
private volatile FluxSink<HttpContent> emitter;

ReactorNetty4NonStreamingRequestConsumer(
AbstractHttpServerTransport transport,
ReactorNetty4HttpServerTransport transport,
HttpServerRequest request,
HttpServerResponse response,
int maxCompositeBufferComponents
@@ -73,6 +72,7 @@ void process(HttpContent in, FluxSink<HttpContent> emitter) {
final HttpRequest r = createRequest(request, content);

try {
transport.serverAcceptedChannel(channel);
transport.incomingRequest(r, channel);
} catch (Exception ex) {
emitter.error(ex);
@@ -56,7 +56,13 @@ public boolean isOpen() {

@Override
public void close() {
request.withConnection(connection -> connection.channel().close());
request.withConnection(connection -> {
if (closeContext.isDone() == false) {
Netty4Utils.addListener(connection.channel().close(), closeContext);
} else {
connection.channel().close();
}
});
}

@Override
ReactorNetty4StreamingRequestConsumer.java
@@ -24,9 +24,14 @@ class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Co
private final ReactorNetty4StreamingResponseProducer sender;
private final StreamingHttpChannel httpChannel;

ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) {
ReactorNetty4StreamingRequestConsumer(
ReactorNetty4HttpServerTransport transport,
HttpServerRequest request,
HttpServerResponse response
) {
this.sender = new ReactorNetty4StreamingResponseProducer();
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
transport.serverAcceptedChannel(httpChannel);
}

@Override
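Both request consumers now receive the transport: the streaming consumer registers its channel in the constructor shown above, and the non-streaming consumer registers just before dispatching the request. A hypothetical sketch of that register-on-construction pattern follows (all names invented for the example, not the plugin's actual classes).

// Hypothetical sketch: the channel is reported to the transport before any request
// content is processed, so a shutdown that happens mid-request still finds it.
final class RequestConsumerSketch {

    interface Transport {
        void serverAcceptedChannel(AutoCloseable channel);
    }

    private final AutoCloseable channel;

    RequestConsumerSketch(Transport transport, AutoCloseable channel) {
        this.channel = channel;
        transport.serverAcceptedChannel(channel); // registered up front, mirroring the constructor change above
    }

    AutoCloseable channel() {
        return channel;
    }
}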