Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Fixed
- Add task cancellation checks in aggregators ([#18426](https://github.com/opensearch-project/OpenSearch/pull/18426))
- Fix concurrent timings in profiler ([#18540](https://github.com/opensearch-project/OpenSearch/pull/18540))
- Cannot communicate with HTTP/2 when reactor-netty is enabled ([#18599](https://github.com/opensearch-project/OpenSearch/pull/18599))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.http.reactor.netty4;

import org.opensearch.OpenSearchException;
import org.opensearch.common.Nullable;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
Expand All @@ -27,35 +28,35 @@
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider.SecureHttpTransportParameters;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.reactor.SharedGroupFactory;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSessionContext;
import javax.net.ssl.KeyManagerFactory;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.ssl.ApplicationProtocolNegotiator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
Expand Down Expand Up @@ -306,59 +307,33 @@

// Configure SSL context if available
if (secureHttpTransportSettingsProvider != null) {
final SSLEngine engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, this)
.orElseGet(SslUtils::createDefaultServerSSLEngine);

try {
final List<String> cipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
final List<String> applicationProtocols = Arrays.asList(engine.getSSLParameters().getApplicationProtocols());

configured = configured.secure(spec -> spec.sslContext(new SslContext() {
@Override
public SSLSessionContext sessionContext() {
throw new UnsupportedOperationException(); /* server only, should never be called */
}

@Override
public SSLEngine newEngine(ByteBufAllocator alloc, String peerHost, int peerPort) {
throw new UnsupportedOperationException(); /* server only, should never be called */
}

@Override
public SSLEngine newEngine(ByteBufAllocator alloc) {
try {
return secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(
settings,
ReactorNetty4HttpServerTransport.this
).orElseGet(SslUtils::createDefaultServerSSLEngine);
} catch (final SSLException ex) {
throw new UnsupportedOperationException("Unable to create SSLEngine", ex);
}
}

@Override
public boolean isClient() {
return false; /* server only */
}

@Override
public List<String> cipherSuites() {
return cipherSuites;
}
final Optional<SecureHttpTransportParameters> parameters = secureHttpTransportSettingsProvider.parameters(settings);

final KeyManagerFactory keyManagerFactory = parameters.flatMap(SecureHttpTransportParameters::keyManagerFactory)
.orElseThrow(() -> new OpenSearchException("The KeyManagerFactory instance is not provided"));

final SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManagerFactory);
parameters.flatMap(SecureHttpTransportParameters::trustManagerFactory).ifPresent(sslContextBuilder::trustManager);
parameters.map(SecureHttpTransportParameters::cipherSuites)
.ifPresent(ciphers -> sslContextBuilder.ciphers(ciphers, SupportedCipherSuiteFilter.INSTANCE));

final SslContext sslContext = sslContextBuilder.protocols(
parameters.map(SecureHttpTransportParameters::protocols).orElseGet(() -> Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS))
)
.applicationProtocolConfig(
new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
// NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers.
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
// ACCEPT is currently the only mode supported by both OpenSsl and JDK providers.
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2,
ApplicationProtocolNames.HTTP_1_1
)
)
.build();

@Override
public ApplicationProtocolNegotiator applicationProtocolNegotiator() {
return new ApplicationProtocolNegotiator() {
@Override
public List<String> protocols() {
return applicationProtocols;
}
};
}
}).build()).protocol(HttpProtocol.HTTP11, HttpProtocol.H2);
} finally {
ReferenceCountUtil.release(engine);
}
configured = configured.secure(spec -> spec.sslContext(sslContext)).protocol(HttpProtocol.HTTP11, HttpProtocol.H2);
} else {
configured = configured.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);
}
Expand All @@ -373,6 +348,12 @@
* @return response publisher
*/
protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
// At least now, Reactor Netty does not respect maxInitialLineLength setting for HTTP/2 (but
// does respect it for H2C and HTTP/1.1)
if (request.uri().length() > maxInitialLineLength.bytesAsInt()) {
return response.status(HttpResponseStatus.REQUEST_URI_TOO_LONG).send();

Check warning on line 354 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L354

Added line #L354 was not covered by tests
}

final Method method = HttpConversionUtil.convertMethod(request.method());
final Optional<RestHandler> dispatchHandlerOpt = dispatcher.dispatchHandler(
request.uri(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
* Helper class for creating default SSL engines
*/
public class SslUtils {
private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };
/**
* Default support TLS protocols
*/
public static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };

private SslUtils() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import reactor.core.publisher.ParallelFlux;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;

import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
Expand All @@ -65,6 +66,7 @@
public class ReactorHttpClient implements Closeable {
private final boolean compression;
private final boolean secure;
private final boolean useHttp11Only;

static Collection<String> returnHttpResponseBodies(Collection<FullHttpResponse> responses) {
List<String> list = new ArrayList<>(responses.size());
Expand All @@ -85,6 +87,7 @@ static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses
public ReactorHttpClient(boolean compression, boolean secure) {
this.compression = compression;
this.secure = secure;
this.useHttp11Only = OpenSearchTestCase.randomBoolean();
}

public static ReactorHttpClient create() {
Expand Down Expand Up @@ -265,25 +268,36 @@ private HttpClient createClient(final InetSocketAddress remoteAddress, final Nio
.runOn(eventLoopGroup)
.host(remoteAddress.getHostString())
.port(remoteAddress.getPort())
.compress(compression);
.compress(compression)
.protocol(HttpProtocol.H2, HttpProtocol.HTTP11);

if (secure) {
return client.secure(
spec -> spec.sslContext(
OpenSearchTestCase.randomBoolean()
/* switch between HTTP 1.1/HTTP 2 randomly, both are supported */ ? Http11SslContextSpec.forClient()
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
: Http2SslContextSpec.forClient()
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
)
return client.protocol(
useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 }
)
.secure(
spec -> spec.sslContext(
useHttp11Only
/* switch between HTTP 1.1/HTTP 2 randomly, both are supported */
? Http11SslContextSpec.forClient()
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
: Http2SslContextSpec.forClient()
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
)
);
} else {
return client.protocol(
useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C }
);
}

return client;
}

@Override
public void close() {

}

public boolean useHttp11only() {
return useHttp11Only;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,13 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
final HttpContent continuationRequest = new DefaultHttpContent(Unpooled.EMPTY_BUFFER);
final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), request, continuationRequest);
try {
assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done"));
if (expectedStatus == HttpResponseStatus.EXPECTATION_FAILED && client.useHttp11only() == false) {
assertThat(continuationResponse.status(), is(HttpResponseStatus.EXPECTATION_FAILED));
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is(""));
} else {
assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done"));
}
} finally {
continuationResponse.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -80,6 +83,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

Expand Down Expand Up @@ -108,7 +112,45 @@ public void setup() throws Exception {
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

var keyManagerFactory = KeyManagerFactory.getInstance("PKIX");
keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(), KEYSTORE_PASSWORD);

secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() {
@Override
public Optional<SecureHttpTransportParameters> parameters(Settings settings) {
return Optional.of(new SecureHttpTransportParameters() {
@Override
public Optional<KeyManagerFactory> keyManagerFactory() {
return Optional.of(keyManagerFactory);
}

@Override
public Optional<String> sslProvider() {
return Optional.empty();
}

@Override
public Optional<String> clientAuth() {
return Optional.empty();
}

@Override
public Collection<String> protocols() {
return Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS);
}

@Override
public Collection<String> cipherSuites() {
return Http2SecurityUtil.CIPHERS;
}

@Override
public Optional<TrustManagerFactory> trustManagerFactory() {
return Optional.of(InsecureTrustManagerFactory.INSTANCE);
}
});
}

@Override
public Optional<TransportExceptionHandler> buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) {
return Optional.empty();
Expand All @@ -117,8 +159,6 @@ public Optional<TransportExceptionHandler> buildHttpServerExceptionHandler(Setti
@Override
public Optional<SSLEngine> buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException {
try {
var keyManagerFactory = KeyManagerFactory.getInstance("PKIX");
keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(), KEYSTORE_PASSWORD);
SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build()
Expand Down
Loading