diff --git a/CHANGELOG.md b/CHANGELOG.md index 00917a56bbb5c..9e349b3ab01db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601)) - Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963)) - Cache the `StoredFieldsReader` for scroll query optimization ([#20112](https://github.com/opensearch-project/OpenSearch/pull/20112)) +- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)] +- Support for HTTP/3 (server side) ([#20017](https://github.com/opensearch-project/OpenSearch/pull/20017)) ### Changed - Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573)) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a4835c9f29ccc..4513de17b0631 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -149,6 +149,8 @@ netty-codec-dns = { group = "io.netty", name = "netty-codec-dns", version.ref = netty-codec-http = { group = "io.netty", name = "netty-codec-http", version.ref = "netty" } netty-codec-http2 = { group = "io.netty", name = "netty-codec-http2", version.ref = "netty" } netty-codec-http3 = { group = "io.netty", name = "netty-codec-http3", version.ref = "netty" } +netty-codec-native-quic = { group = "io.netty", name = "netty-codec-native-quic", version.ref = "netty" } +netty-codec-classes-quic = { group = "io.netty", name = "netty-codec-classes-quic", version.ref = "netty" } netty-common = { group = "io.netty", name = "netty-common", version.ref = "netty" } netty-handler = { group = "io.netty", name = "netty-handler", version.ref = "netty" } netty-resolver-dns = { group = "io.netty", name = "netty-resolver-dns", version.ref = "netty" } diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index ac918a23791f1..0ab3fe86b701f 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -63,14 +63,25 @@ dependencies { api "io.netty:netty-codec:${versions.netty}" api "io.netty:netty-codec-http:${versions.netty}" api "io.netty:netty-codec-http2:${versions.netty}" + api "io.netty:netty-codec-http3:${versions.netty}" + api "io.netty:netty-codec-classes-quic:${versions.netty}" + api "io.netty:netty-codec-native-quic:${versions.netty}" api "io.netty:netty-common:${versions.netty}" api "io.netty:netty-handler:${versions.netty}" api "io.netty:netty-resolver:${versions.netty}" api "io.netty:netty-transport:${versions.netty}" api "io.netty:netty-transport-native-unix-common:${versions.netty}" + testFipsRuntimeOnly "org.bouncycastle:bc-fips:${versions.bouncycastle_jce}" testFipsRuntimeOnly "org.bouncycastle:bctls-fips:${versions.bouncycastle_tls}" testFipsRuntimeOnly "org.bouncycastle:bcutil-fips:${versions.bouncycastle_util}" + + // Bundle all supported OSes and Archs + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:windows-x86_64" } restResources { @@ -205,7 +216,9 @@ thirdPartyAudit { 'io.netty.pkitesting.CertificateBuilder', 'io.netty.pkitesting.CertificateBuilder$Algorithm', - 'io.netty.pkitesting.X509Bundle' + 'io.netty.pkitesting.X509Bundle', + + 'io.netty.channel.epoll.SegmentedDatagramPacket' ) ignoreViolations( diff --git a/modules/transport-netty4/licenses/netty-codec-classes-quic-4.2.7.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-classes-quic-4.2.7.Final.jar.sha1 new file mode 100644 index 0000000000000..2758c952ac630 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-classes-quic-4.2.7.Final.jar.sha1 @@ -0,0 +1 @@ +98e80e8a575aa5cacd0db278a91fc4b34e4721fe \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-http3-4.2.7.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-http3-4.2.7.Final.jar.sha1 new file mode 100644 index 0000000000000..da9276cbfdd65 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-http3-4.2.7.Final.jar.sha1 @@ -0,0 +1 @@ +c9b6155713d6017a21e71dea3e041c4143facf5c \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-aarch_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..28c795d9fe9e5 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-aarch_64.jar.sha1 @@ -0,0 +1 @@ +7ecd8d1de6b3eb7eeb3dcdb1034780826e141f1c \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-x86_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-x86_64.jar.sha1 new file mode 100644 index 0000000000000..ac96bb86be54f --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-x86_64.jar.sha1 @@ -0,0 +1 @@ +6a3e4398852f926c4b22f381d89dae2388446ae8 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-aarch_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..4cf4316e6fe70 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-aarch_64.jar.sha1 @@ -0,0 +1 @@ +c30d746bd588c07bac4ac04abde576c05a3c0a28 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-x86_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-x86_64.jar.sha1 new file mode 100644 index 0000000000000..5abd9e612dd18 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-x86_64.jar.sha1 @@ -0,0 +1 @@ +f3679a6cc103292931098b1dc835ffb831e27368 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-windows-x86_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-windows-x86_64.jar.sha1 new file mode 100644 index 0000000000000..2fdbde31dcdd6 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final-windows-x86_64.jar.sha1 @@ -0,0 +1 @@ +c4f177d2a99668c209acc31b8b85df58e9166218 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final.jar.sha1 new file mode 100644 index 0000000000000..6525658f1e2a0 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.7.Final.jar.sha1 @@ -0,0 +1 @@ +d6a7141d5fab45c8d2f6c646965f2dd53a43b41e \ No newline at end of file diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java index b0bbe4b2fdf1a..c32ac37f903bf 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java @@ -219,6 +219,8 @@ public HttpVersion protocolVersion() { return HttpRequest.HttpVersion.HTTP_1_1; } else if (request.protocolVersion().equals("HTTP/2.0")) { return HttpRequest.HttpVersion.HTTP_2_0; + } else if (request.protocolVersion().equals("HTTP/3.0")) { + return HttpRequest.HttpVersion.HTTP_3_0; } else { throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion()); } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java index 1f7aaf17d2191..715426a4ac303 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java @@ -33,6 +33,7 @@ package org.opensearch.http.netty4; import org.opensearch.ExceptionsHelper; +import org.opensearch.http.AbstractHttpServerTransport; import org.opensearch.http.HttpPipelinedRequest; import io.netty.channel.ChannelHandler; @@ -42,9 +43,9 @@ @ChannelHandler.Sharable class Netty4HttpRequestHandler extends SimpleChannelInboundHandler { - private final Netty4HttpServerTransport serverTransport; + private final AbstractHttpServerTransport serverTransport; - Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport) { + Netty4HttpRequestHandler(AbstractHttpServerTransport serverTransport) { this.serverTransport = serverTransport; } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 7e2f3496e5c01..47d8e5ce8bc33 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -197,7 +197,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { /** * Creates new HTTP transport implementations based on Netty 4 - * @param settings seetings + * @param settings settings * @param networkService network service * @param bigArrays big array allocator * @param threadPool thread pool instance diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4QuicServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4QuicServerTransport.java new file mode 100644 index 0000000000000..3a8fcce54c803 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4QuicServerTransport.java @@ -0,0 +1,449 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.http.AbstractHttpServerTransport; +import org.opensearch.http.HttpChannel; +import org.opensearch.http.HttpHandlingSettings; +import org.opensearch.http.HttpReadTimeoutException; +import org.opensearch.http.HttpServerChannel; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider.SecureHttpTransportParameters; +import org.opensearch.plugins.TransportExceptionHandler; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.NettyAllocator; +import org.opensearch.transport.NettyByteBufSizer; +import org.opensearch.transport.SharedGroupFactory; +import org.opensearch.transport.netty4.Netty4Utils; + +import javax.net.ssl.KeyManagerFactory; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.handler.codec.compression.Brotli; +import io.netty.handler.codec.compression.CompressionOptions; +import io.netty.handler.codec.compression.DeflateOptions; +import io.netty.handler.codec.compression.GzipOptions; +import io.netty.handler.codec.compression.StandardCompressionOptions; +import io.netty.handler.codec.compression.ZstdEncoder; +import io.netty.handler.codec.http.HttpContentCompressor; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http3.Http3; +import io.netty.handler.codec.http3.Http3FrameToHttpObjectCodec; +import io.netty.handler.codec.http3.Http3ServerConnectionHandler; +import io.netty.handler.codec.quic.InsecureQuicTokenHandler; +import io.netty.handler.codec.quic.QuicChannel; +import io.netty.handler.codec.quic.QuicSslContext; +import io.netty.handler.codec.quic.QuicSslContextBuilder; +import io.netty.handler.codec.quic.QuicStreamChannel; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.AttributeKey; + +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; +import static org.opensearch.http.netty4.Netty4HttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS; +import static org.opensearch.http.netty4.Netty4HttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE; + +/** + * The HTTP/3 transport implementations based on Netty 4. + */ +public class Netty4QuicServerTransport extends AbstractHttpServerTransport { + private static final Logger logger = LogManager.getLogger(Netty4QuicServerTransport.class); + + /** + * Set the initial maximum data limit for local bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_LOCAL_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_local_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum data limit for remote bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_REMOTE_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_remote_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum stream limit for bidirectional streams. + * + * The HTTP/3 standard expects that each end configures at least 100 + * concurrent bidirectional streams at a time, to avoid reducing performance + * by reducing parallelism. + */ + public static final Setting SETTING_H3_MAX_STREAMS = Setting.longSetting("h3.max_streams", 100L, Property.NodeScope); + + private final ByteSizeValue maxInitialLineLength; + private final ByteSizeValue maxHeaderSize; + private final ByteSizeValue maxChunkSize; + + private final SharedGroupFactory sharedGroupFactory; + private final RecvByteBufAllocator recvByteBufAllocator; + private final int readTimeoutMillis; + + private final int maxCompositeBufferComponents; + + private volatile Bootstrap bootstrap; + private volatile SharedGroupFactory.SharedGroup sharedGroup; + private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + private final TransportExceptionHandler exceptionHandler; + + /** + * Creates new HTTP transport implementations based on Netty 4 + * @param settings settings + * @param networkService network service + * @param bigArrays big array allocator + * @param threadPool thread pool instance + * @param xContentRegistry XContent registry instance + * @param dispatcher dispatcher instance + * @param clusterSettings cluster settings + * @param sharedGroupFactory shared group factory + */ + public Netty4QuicServerTransport( + final Settings settings, + final NetworkService networkService, + final BigArrays bigArrays, + final ThreadPool threadPool, + final NamedXContentRegistry xContentRegistry, + final Dispatcher dispatcher, + final ClusterSettings clusterSettings, + final SharedGroupFactory sharedGroupFactory, + final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider, + final Tracer tracer + ) { + super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer); + Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings)); + NettyAllocator.logAllocatorDescriptionIfNeeded(); + this.sharedGroupFactory = sharedGroupFactory; + this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider; + this.exceptionHandler = secureHttpTransportSettingsProvider.buildHttpServerExceptionHandler(settings, this) + .orElse(TransportExceptionHandler.NOOP); + + this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); + this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); + this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings); + + this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); + + this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); + + ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings); + recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt()); + + logger.debug( + "using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " + + "receive_predictor[{}], max_composite_buffer_components[{}]", + maxChunkSize, + maxHeaderSize, + maxInitialLineLength, + maxContentLength, + receivePredictor, + maxCompositeBufferComponents + ); + } + + public Settings settings() { + return this.settings; + } + + @Override + protected void doStart() { + boolean success = false; + try { + sharedGroup = sharedGroupFactory.getHttpGroup(); + bootstrap = new Bootstrap(); + + bootstrap.group(sharedGroup.getLowLevelGroup()); + + // NettyAllocator will return the channel type designed to work with the configuredAllocator + bootstrap.channel(NioDatagramChannel.class); + + // Set the allocators for both the server channel and the child channels created + bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator(true)); + bootstrap.handler(configureServerChannelHandler()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); + + bindServer(); + success = true; + } finally { + if (success == false) { + doStop(); // otherwise we leak threads since we never moved to started + } + } + } + + @Override + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { + ChannelFuture future = bootstrap.bind(socketAddress).sync(); + Channel channel = future.channel(); + Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel); + channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel); + return httpServerChannel; + } + + @Override + protected void stopInternal() { + if (sharedGroup != null) { + sharedGroup.shutdown(); + sharedGroup = null; + } + } + + @Override + public void onException(HttpChannel channel, Exception cause) { + if (cause instanceof ReadTimeoutException) { + super.onException(channel, new HttpReadTimeoutException(readTimeoutMillis, cause)); + } else { + super.onException(channel, cause); + } + } + + public ChannelHandler configureServerChannelHandler() { + return new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + final Optional parameters = secureHttpTransportSettingsProvider.parameters(settings); + + final KeyManagerFactory keyManagerFactory = parameters.flatMap(SecureHttpTransportParameters::keyManagerFactory) + .orElseThrow(() -> new OpenSearchException("The KeyManagerFactory instance is not provided")); + + final QuicSslContextBuilder sslContextBuilder = QuicSslContextBuilder.forServer(keyManagerFactory, null); + + parameters.flatMap(SecureHttpTransportParameters::trustManagerFactory).ifPresent(sslContextBuilder::trustManager); + parameters.flatMap(SecureHttpTransportParameters::clientAuth) + .ifPresent(clientAuth -> sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth))); + + final QuicSslContext sslContext = sslContextBuilder.applicationProtocols( + io.netty.handler.codec.http3.Http3.supportedApplicationProtocols() + ).build(); + + ch.pipeline() + .addLast( + Http3.newQuicServerCodecBuilder() + .sslContext(sslContext) + .initialMaxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).getBytes()) + .initialMaxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(settings).getBytes()) + .initialMaxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(settings).getBytes()) + .initialMaxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(settings).longValue()) + .tokenHandler(InsecureQuicTokenHandler.INSTANCE) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(QuicChannel ch) { + // Called for each connection + ch.pipeline() + .addLast( + new Http3ServerConnectionHandler( + new HttpChannelHandler(Netty4QuicServerTransport.this, handlingSettings) + ) + ); + } + }) + .build() + ); + } + }; + } + + public static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-quic-channel"); + protected static final AttributeKey HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance( + "opensearch-quic-server-channel" + ); + + protected static class HttpChannelHandler extends ChannelInitializer { + + private final Netty4QuicServerTransport transport; + private final NettyByteBufSizer byteBufSizer; + private final Netty4HttpRequestCreator requestCreator; + private final Netty4HttpRequestHandler requestHandler; + private final Netty4HttpResponseCreator responseCreator; + private final HttpHandlingSettings handlingSettings; + + protected HttpChannelHandler(final Netty4QuicServerTransport transport, final HttpHandlingSettings handlingSettings) { + this.transport = transport; + this.handlingSettings = handlingSettings; + this.byteBufSizer = new NettyByteBufSizer(); + this.requestCreator = new Netty4HttpRequestCreator(); + this.requestHandler = new Netty4HttpRequestHandler(transport); + this.responseCreator = new Netty4HttpResponseCreator(); + } + + public ChannelHandler getRequestHandler() { + return requestHandler; + } + + @Override + protected void initChannel(QuicStreamChannel ch) throws Exception { + Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch); + ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel); + ch.pipeline().addLast("byte_buf_sizer", byteBufSizer); + ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS)); + + configurePipeline(ch); + transport.serverAcceptedChannel(nettyHttpChannel); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + super.exceptionCaught(ctx, cause); + } + + protected void configurePipeline(Channel ch) { + ch.pipeline().addLast(new Http3FrameToHttpObjectCodec(true)); + ch.pipeline().addLast("header_verifier", transport.createHeaderVerifier()); + ch.pipeline().addLast("decoder_compress", transport.createDecompressor()); + final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength()); + aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); + ch.pipeline().addLast("aggregator", aggregator); + if (handlingSettings.isCompression()) { + ch.pipeline() + .addLast( + "encoder_compress", + new HttpContentCompressor(defaultCompressionOptions(handlingSettings.getCompressionLevel())) + ); + } + ch.pipeline().addLast("request_creator", requestCreator); + ch.pipeline().addLast("response_creator", responseCreator); + ch.pipeline().addLast("handler", requestHandler); + } + } + + @ChannelHandler.Sharable + private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter { + + private final Netty4QuicServerTransport transport; + + private ServerChannelExceptionHandler(Netty4QuicServerTransport transport) { + this.transport = transport; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + transport.onServerException(httpServerChannel, new Exception(cause)); + } else { + transport.onServerException(httpServerChannel, (Exception) cause); + } + } + } + + /** + * Extension point that allows a NetworkPlugin to extend the netty pipeline and inspect headers after request decoding + */ + protected ChannelInboundHandlerAdapter createHeaderVerifier() { + // pass-through + return new ChannelInboundHandlerAdapter(); + } + + /** + * Extension point that allows a NetworkPlugin to override the default netty HttpContentDecompressor and supply a custom decompressor. + * + * Used in instances to conditionally decompress depending on the outcome from header verification + */ + protected ChannelInboundHandlerAdapter createDecompressor() { + return new HttpContentDecompressor(); + } + + /** + * Copy of {@link HttpContentCompressor} default compression options with ZSTD excluded: + * although zstd-jni is on the classpath, {@link ZstdEncoder} requires direct buffers support + * which by default {@link NettyAllocator} does not provide. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @return default compression options + */ + private static CompressionOptions[] defaultCompressionOptions(int compressionLevel) { + return defaultCompressionOptions(compressionLevel, 15, 8); + } + + /** + * Copy of {@link HttpContentCompressor} default compression options with ZSTD excluded: + * although zstd-jni is on the classpath, {@link ZstdEncoder} requires direct buffers support + * which by default {@link NettyAllocator} does not provide. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * @param windowBits + * The base two logarithm of the size of the history buffer. The + * value should be in the range {@code 9} to {@code 15} inclusive. + * Larger values result in better compression at the expense of + * memory usage. The default value is {@code 15}. + * @param memLevel + * How much memory should be allocated for the internal compression + * state. {@code 1} uses minimum memory and {@code 9} uses maximum + * memory. Larger values result in better and faster compression + * at the expense of memory usage. The default value is {@code 8} + * + * @return default compression options + */ + private static CompressionOptions[] defaultCompressionOptions(int compressionLevel, int windowBits, int memLevel) { + final List options = new ArrayList(4); + final GzipOptions gzipOptions = StandardCompressionOptions.gzip(compressionLevel, windowBits, memLevel); + final DeflateOptions deflateOptions = StandardCompressionOptions.deflate(compressionLevel, windowBits, memLevel); + + options.add(gzipOptions); + options.add(deflateOptions); + options.add(StandardCompressionOptions.snappy()); + + if (Brotli.isAvailable()) { + options.add(StandardCompressionOptions.brotli()); + } + + return options.toArray(new CompressionOptions[0]); + } + +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java index ff901476c162d..8789c77811fd0 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java @@ -157,6 +157,14 @@ public static void logAllocatorDescriptionIfNeeded() { } } + public static ByteBufAllocator getAllocator(boolean directBuffers) { + if (directBuffers == true && ALLOCATOR instanceof NoDirectBuffers ndb) { + return ndb.delegate; /* Http3/Quic only supports direct buffers */ + } else { + return ALLOCATOR; + } + } + public static ByteBufAllocator getAllocator() { return ALLOCATOR; } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java index 8b8223da70c08..7d260821c3fd4 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java @@ -24,7 +24,7 @@ * @see TLSUtil */ public class SslUtils { - private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" }; + public static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" }; private static final int SSL_CONTENT_TYPE_CHANGE_CIPHER_SPEC = 20; private static final int SSL_CONTENT_TYPE_ALERT = 21; diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java index cf841f2e24b1e..98da06e0a00bf 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java @@ -34,6 +34,7 @@ import org.opensearch.common.TriFunction; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.tasks.Task; @@ -52,7 +53,9 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -61,7 +64,9 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -86,12 +91,24 @@ import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler; import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder; import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder; +import io.netty.handler.codec.http3.Http3; +import io.netty.handler.codec.http3.Http3ClientConnectionHandler; +import io.netty.handler.codec.http3.Http3FrameToHttpObjectCodec; +import io.netty.handler.codec.http3.Http3RequestStreamInitializer; +import io.netty.handler.codec.quic.QuicChannel; +import io.netty.handler.codec.quic.QuicSslContext; +import io.netty.handler.codec.quic.QuicSslContextBuilder; +import io.netty.handler.codec.quic.QuicStreamChannel; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.util.AttributeKey; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.opensearch.http.netty4.Netty4QuicServerTransport.SETTING_H3_MAX_STREAMS; +import static org.opensearch.http.netty4.Netty4QuicServerTransport.SETTING_H3_MAX_STREAM_LOCAL_LENGTH; +import static org.opensearch.http.netty4.Netty4QuicServerTransport.SETTING_H3_MAX_STREAM_REMOTE_LENGTH; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static org.junit.Assert.fail; @@ -118,12 +135,12 @@ static Collection returnOpaqueIds(Collection responses } private final Bootstrap clientBootstrap; - private final TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory; + private final TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory; private final boolean secure; Netty4HttpClient( Bootstrap clientBootstrap, - TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory, + TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory, boolean secure ) { this.clientBootstrap = clientBootstrap; @@ -161,13 +178,23 @@ public static Netty4HttpClient http2() { ); } + public static Netty4HttpClient http3() { + return new Netty4HttpClient( + new Bootstrap().channel(NioDatagramChannel.class) + .option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator(true)) + .group(new NioEventLoopGroup(1)), + CountDownLatchHandlerHttp3::new, + false + ); + } + public List get(SocketAddress remoteAddress, String... uris) throws InterruptedException { List requests = new ArrayList<>(uris.length); for (int i = 0; i < uris.length; i++) { final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); httpRequest.headers().add(HOST, "localhost"); httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); - httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "http" : "https"); + httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); requests.add(httpRequest); } return sendRequests(remoteAddress, requests); @@ -179,6 +206,8 @@ public final Collection post(SocketAddress remoteAddress, List } public final FullHttpResponse send(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException { + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); List responses = sendRequests(remoteAddress, Collections.singleton(httpRequest)); assert responses.size() == 1 : "expected 1 and only 1 http response"; return responses.get(0); @@ -202,7 +231,7 @@ private List processRequestsWithBody( request.headers().add(HttpHeaderNames.HOST, "localhost"); request.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json"); - request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http"); + request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); request.headers().add("X-Opaque-ID", String.valueOf(i)); requests.add(request); } @@ -214,7 +243,7 @@ private synchronized List sendRequests(final SocketAddress rem final CountDownLatch latch = new CountDownLatch(requests.size()); final List content = Collections.synchronizedList(new ArrayList<>(requests.size())); - final AwaitableChannelInitializer handler = handlerFactory.apply(latch, content, secure); + final AwaitableChannelInitializer handler = handlerFactory.apply(latch, content, secure); clientBootstrap.handler(handler); ChannelFuture channelFuture = null; @@ -224,7 +253,8 @@ private synchronized List sendRequests(final SocketAddress rem handler.await(); for (HttpRequest request : requests) { - channelFuture.channel().writeAndFlush(request); + final Channel channel = handler.prepare(clientBootstrap, channelFuture.channel()); + channel.writeAndFlush(request); } if (latch.await(30L, TimeUnit.SECONDS) == false) { fail("Failed to get all expected responses."); @@ -247,7 +277,7 @@ public void close() { /** * helper factory which adds returned data to a list and uses a count down latch to decide when done */ - private static class CountDownLatchHandlerHttp extends AwaitableChannelInitializer { + private static class CountDownLatchHandlerHttp extends AwaitableChannelInitializer { private final CountDownLatch latch; private final Collection content; @@ -302,16 +332,20 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E * The channel initializer with the ability to await for initialization to be completed * */ - private static abstract class AwaitableChannelInitializer extends ChannelInitializer { + private static abstract class AwaitableChannelInitializer extends ChannelInitializer { void await() { // do nothing } + + Channel prepare(Bootstrap clientBootstrap, Channel channel) throws InterruptedException { + return channel; + } } /** * helper factory which adds returned data to a list and uses a count down latch to decide when done */ - private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitializer { + private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitializer { private final CountDownLatch latch; private final Collection content; @@ -439,4 +473,73 @@ protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws } } + /** + * helper factory which adds returned data to a list and uses a count down latch to decide when done + */ + private static class CountDownLatchHandlerHttp3 extends AwaitableChannelInitializer { + private final CountDownLatch latch; + private final Collection content; + + CountDownLatchHandlerHttp3(final CountDownLatch latch, final Collection content, final boolean secure) { + this.latch = latch; + this.content = content; + } + + @Override + protected void initChannel(DatagramChannel ch) { + final QuicSslContext context = QuicSslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .applicationProtocols(Http3.supportedApplicationProtocols()) + .build(); + + final ChannelHandler quicClientCodec = Http3.newQuicClientCodecBuilder() + .sslContext(context) + .maxIdleTimeout(5000, TimeUnit.MILLISECONDS) + .initialMaxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(Settings.EMPTY).getBytes()) + .initialMaxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(Settings.EMPTY).getBytes()) + .initialMaxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(Settings.EMPTY).getBytes()) + .initialMaxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(Settings.EMPTY).longValue()) + .build(); + + ch.pipeline().addLast(quicClientCodec); + } + + @Override + Channel prepare(Bootstrap clientBootstrap, Channel channel) throws InterruptedException { + final QuicChannel quicChannel = QuicChannel.newBootstrap(channel) + .handler(new Http3ClientConnectionHandler()) + .remoteAddress(channel.remoteAddress()) + .connect() + .sync() + .getNow(); + + return Http3.newRequestStream(quicChannel, new Http3RequestStreamInitializer() { + @Override + protected void initRequestStream(QuicStreamChannel ch) { + final int maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt(); + ch.pipeline().addLast(new Http3FrameToHttpObjectCodec(false)); + ch.pipeline().addLast(new HttpContentDecompressor()); + ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength)); + ch.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { + if (msg instanceof FullHttpResponse ht) { + // We copy the buffer manually to avoid a huge allocation on a pooled allocator. We have + // a test that tracks huge allocations, so we want to avoid them in this test code. + ByteBuf newContent = Unpooled.copiedBuffer(ht.content()); + content.add(ht.replace(newContent)); + } + latch.countDown(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + latch.countDown(); + } + }); + } + }).sync().getNow(); + } + } } diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4QuicServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4QuicServerTransportTests.java new file mode 100644 index 0000000000000..a0d67e078e546 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4QuicServerTransportTests.java @@ -0,0 +1,606 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.common.network.NetworkAddress; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.MockBigArrays; +import org.opensearch.common.util.MockPageCacheRecycler; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.http.BindHttpException; +import org.opensearch.http.CorsHandler; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpTransportSettings; +import org.opensearch.http.NullDispatcher; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.TransportExceptionHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.KeyStoreUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.NettyAllocator; +import org.opensearch.transport.SharedGroupFactory; +import org.opensearch.transport.netty4.ssl.SslUtils; +import org.junit.After; +import org.junit.Before; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; +import javax.net.ssl.TrustManagerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.pkitesting.CertificateBuilder.Algorithm; + +import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; +import static org.opensearch.test.KeyStoreUtils.KEYSTORE_PASSWORD; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +/** + * Tests for the {@link Netty4QuicServerTransport} class. + */ +public class Netty4QuicServerTransportTests extends OpenSearchTestCase { + + private NetworkService networkService; + private ThreadPool threadPool; + private MockBigArrays bigArrays; + private ClusterSettings clusterSettings; + private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + + @Before + public void setup() throws Exception { + networkService = new NetworkService(Collections.emptyList()); + threadPool = new TestThreadPool("test"); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("PKIX"); + keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(Algorithm.ecp384), KEYSTORE_PASSWORD); + + secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() { + @Override + public Optional parameters(Settings settings) { + return Optional.of(new SecureHttpTransportParameters() { + @Override + public Optional keyManagerFactory() { + return Optional.of(keyManagerFactory); + } + + @Override + public Optional sslProvider() { + return Optional.empty(); + } + + @Override + public Optional clientAuth() { + return Optional.empty(); + } + + @Override + public Collection protocols() { + return Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS); + } + + @Override + public Collection cipherSuites() { + return Http2SecurityUtil.CIPHERS; + } + + @Override + public Optional trustManagerFactory() { + return Optional.of(InsecureTrustManagerFactory.INSTANCE); + } + }); + } + + @Override + public Optional buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) { + return Optional.empty(); + } + + @Override + public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException { + final SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build() + .newEngine(NettyAllocator.getAllocator()); + return Optional.of(engine); + } + }; + } + + @After + public void shutdown() throws Exception { + if (threadPool != null) { + threadPool.shutdownNow(); + } + threadPool = null; + networkService = null; + bigArrays = null; + clusterSettings = null; + } + + /** + * Test that {@link Netty4QuicServerTransportTests} supports the "Expect: 100-continue" HTTP header + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeader() throws InterruptedException { + final Settings settings = createSettings(); + final int contentLength = randomIntBetween(1, HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).bytesAsInt()); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.CONTINUE); + } + + /** + * Test that {@link Netty4QuicServerTransportTests} responds to a + * 100-continue expectation with too large a content-length + * with a 413 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException { + final String key = HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(); + final int maxContentLength = randomIntBetween(1, 104857600); + final Settings settings = createBuilderWithPort().put(key, maxContentLength + "b").build(); + final int contentLength = randomIntBetween(maxContentLength + 1, Integer.MAX_VALUE); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); + } + + /** + * Test that {@link Netty4QuicServerTransportTests} responds to an unsupported expectation with a 417 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectUnsupportedExpectation() throws InterruptedException { + Settings settings = createSettings(); + runExpectHeaderTest(settings, "chocolate=yummy", 0, HttpResponseStatus.EXPECTATION_FAILED); + } + + private void runExpectHeaderTest( + final Settings settings, + final String expectation, + final int contentLength, + final HttpResponseStatus expectedStatus + ) throws InterruptedException { + + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { + channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))); + } + + @Override + public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + }; + try ( + Netty4QuicServerTransport transport = new Netty4QuicServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); + request.headers().set(HttpHeaderNames.EXPECT, expectation); + HttpUtil.setContentLength(request, contentLength); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(expectedStatus)); + if (expectedStatus.equals(HttpResponseStatus.CONTINUE)) { + final FullHttpRequest continuationRequest = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.POST, + "/", + Unpooled.EMPTY_BUFFER + ); + final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), continuationRequest); + try { + assertThat(continuationResponse.status(), is(HttpResponseStatus.OK)); + assertThat( + new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), + is("done") + ); + } finally { + continuationResponse.release(); + } + } + } finally { + response.release(); + } + } + } + } + + public void testBindUnavailableAddress() { + Settings initialSettings = createSettings(); + try ( + Netty4QuicServerTransport transport = new Netty4QuicServerTransport( + initialSettings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + new NullDispatcher(), + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + Settings settings = Settings.builder() + .put("http.port", remoteAddress.getPort()) + .put("network.host", remoteAddress.getAddress()) + .build(); + try ( + Netty4QuicServerTransport otherTransport = new Netty4QuicServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + new NullDispatcher(), + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start); + assertEquals("Failed to bind to " + NetworkAddress.format(remoteAddress.address()), bindHttpException.getMessage()); + } + } + } + + public void testBadRequest() throws InterruptedException { + final AtomicReference causeReference = new AtomicReference<>(); + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + causeReference.set(cause); + try { + final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad"); + channel.sendResponse(new BytesRestResponse(channel, BAD_REQUEST, e)); + } catch (final IOException e) { + throw new AssertionError(e); + } + } + + }; + + final Settings settings; + final int maxInitialLineLength; + final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; + if (randomBoolean()) { + maxInitialLineLength = httpMaxInitialLineLengthSetting.getDefault(Settings.EMPTY).bytesAsInt(); + settings = createSettings(); + } else { + maxInitialLineLength = randomIntBetween(1, 8192); + settings = createBuilderWithPort().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); + } + + try ( + Netty4QuicServerTransport transport = new Netty4QuicServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8")); + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.BAD_REQUEST)); + assertThat( + new String(response.content().array(), Charset.forName("UTF-8")), + containsString("you sent a bad request and you should feel bad") + ); + } finally { + response.release(); + } + } + } + + assertNotNull(causeReference.get()); + assertThat(causeReference.get(), instanceOf(TooLongFrameException.class)); + } + + public void testLargeCompressedResponse() throws InterruptedException { + final String responseString = randomAlphaOfLength(4 * 1024 * 1024); + final String url = "/thing"; + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + if (url.equals(request.uri())) { + channel.sendResponse(new BytesRestResponse(OK, responseString)); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); + throw new AssertionError(); + } + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + + }; + + try ( + Netty4QuicServerTransport transport = new Netty4QuicServerTransport( + Settings.EMPTY, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + byte[] bytes = new byte[response.content().readableBytes()]; + response.content().readBytes(bytes); + assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString)); + } finally { + response.release(); + } + } + } + } + + public void testCorsRequest() throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + + }; + + final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) + .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") + .build(); + + try ( + Netty4QuicServerTransport transport = new Netty4QuicServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + // Test pre-flight request + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); + request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); + request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("test-cors.org")); + assertThat(response.headers().get(CorsHandler.VARY), equalTo(CorsHandler.ORIGIN)); + assertTrue(response.headers().contains(CorsHandler.DATE)); + } finally { + response.release(); + } + } + + // Test short-circuited request + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + request.headers().add(CorsHandler.ORIGIN, "google.com"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN)); + } finally { + response.release(); + } + } + } + } + + public void testReadTimeout() throws Exception { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError("Should not have received a dispatched request"); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError("Should not have received a dispatched request"); + } + + }; + + Settings settings = createBuilderWithPort().put( + HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), + new TimeValue(randomIntBetween(100, 300)) + ).build(); + + NioEventLoopGroup group = new NioEventLoopGroup(); + try ( + Netty4QuicServerTransport transport = new Netty4QuicServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + CountDownLatch channelClosedLatch = new CountDownLatch(1); + + Bootstrap clientBootstrap = new Bootstrap().option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator(true)) + .channel(NioDatagramChannel.class) + .handler(new ChannelInitializer() { + + @Override + protected void initChannel(NioDatagramChannel ch) { + ch.pipeline().addLast(new ChannelHandlerAdapter() { + }); + + } + }) + .group(group); + ChannelFuture connect = clientBootstrap.connect(remoteAddress.address()); + connect.channel().closeFuture().addListener(future -> channelClosedLatch.countDown()); + + assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES)); + + } finally { + group.shutdownGracefully().await(); + } + } + + private Settings createSettings() { + return createBuilderWithPort().build(); + } + + private Settings.Builder createBuilderWithPort() { + return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()); + } +} diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index ab78de7497bfa..00f84af68e815 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -146,23 +146,8 @@ thirdPartyAudit { 'io.netty.channel.uring.IoUringIoHandler', 'io.netty.channel.uring.IoUringServerSocketChannel', 'io.netty.channel.uring.IoUringSocketChannel', - 'io.netty.handler.codec.quic.QuicChannel', - 'io.netty.handler.codec.quic.QuicChannelBootstrap', - 'io.netty.handler.codec.quic.QuicClientCodecBuilder', - 'io.netty.handler.codec.quic.QuicServerCodecBuilder', - 'io.netty.handler.codec.quic.QuicSslContext', - 'io.netty.handler.codec.quic.QuicSslContextBuilder', - 'io.netty.handler.codec.quic.QuicSslEngine', - 'io.netty.handler.codec.quic.QuicStreamChannel', - 'io.netty.handler.codec.quic.QuicStreamChannelBootstrap', - 'io.netty.handler.codec.quic.QuicTokenHandler', 'io.netty.handler.codec.haproxy.HAProxyMessage', 'io.netty.handler.codec.haproxy.HAProxyMessageDecoder', - 'io.netty.handler.codec.http3.Http3', - 'io.netty.handler.codec.http3.Http3ClientConnectionHandler', - 'io.netty.handler.codec.http3.Http3DataFrame', - 'io.netty.handler.codec.http3.Http3Headers', - 'io.netty.handler.codec.http3.Http3HeadersFrame', 'javax.activation.DataHandler', 'javax.activation.DataSource', 'javax.xml.bind.JAXBElement', diff --git a/plugins/transport-reactor-netty4/build.gradle b/plugins/transport-reactor-netty4/build.gradle index b3142f0def72a..ceb9faee43e1c 100644 --- a/plugins/transport-reactor-netty4/build.gradle +++ b/plugins/transport-reactor-netty4/build.gradle @@ -35,6 +35,15 @@ dependencies { api libs.bundles.reactornetty api libs.netty.codec.http3 + api libs.netty.codec.classes.quic + api libs.netty.codec.native.quic + + // Bundle all supported OSes and Archs + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:windows-x86_64" testImplementation libs.log4jslf4jimpl javaRestTestImplementation libs.reactor.test @@ -150,6 +159,7 @@ thirdPartyAudit { 'io.netty.channel.epoll.EpollDatagramChannel', 'io.netty.channel.epoll.EpollServerSocketChannel', 'io.netty.channel.epoll.EpollSocketChannel', + 'io.netty.channel.epoll.SegmentedDatagramPacket', 'io.netty.channel.kqueue.KQueue', 'io.netty.channel.kqueue.KQueueDatagramChannel', 'io.netty.channel.kqueue.KQueueServerSocketChannel', @@ -167,20 +177,6 @@ thirdPartyAudit { 'io.netty.channel.uring.IoUringIoHandler', 'io.netty.channel.uring.IoUringServerSocketChannel', 'io.netty.channel.uring.IoUringSocketChannel', - 'io.netty.handler.codec.quic.QuicChannel', - 'io.netty.handler.codec.quic.QuicChannelBootstrap', - 'io.netty.handler.codec.quic.QuicClientCodecBuilder', - 'io.netty.handler.codec.quic.QuicCodecBuilder', - 'io.netty.handler.codec.quic.QuicException', - 'io.netty.handler.codec.quic.QuicServerCodecBuilder', - 'io.netty.handler.codec.quic.QuicSslContext', - 'io.netty.handler.codec.quic.QuicSslContextBuilder', - 'io.netty.handler.codec.quic.QuicSslEngine', - 'io.netty.handler.codec.quic.QuicStreamChannel', - 'io.netty.handler.codec.quic.QuicStreamChannelBootstrap', - 'io.netty.handler.codec.quic.QuicStreamFrame', - 'io.netty.handler.codec.quic.QuicStreamType', - 'io.netty.handler.codec.quic.QuicTokenHandler', 'io.netty.pkitesting.CertificateBuilder', 'io.netty.pkitesting.CertificateBuilder$Algorithm', diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-classes-quic-4.2.7.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-classes-quic-4.2.7.Final.jar.sha1 new file mode 100644 index 0000000000000..2758c952ac630 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-classes-quic-4.2.7.Final.jar.sha1 @@ -0,0 +1 @@ +98e80e8a575aa5cacd0db278a91fc4b34e4721fe \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-aarch_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..28c795d9fe9e5 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-aarch_64.jar.sha1 @@ -0,0 +1 @@ +7ecd8d1de6b3eb7eeb3dcdb1034780826e141f1c \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-x86_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-x86_64.jar.sha1 new file mode 100644 index 0000000000000..ac96bb86be54f --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-linux-x86_64.jar.sha1 @@ -0,0 +1 @@ +6a3e4398852f926c4b22f381d89dae2388446ae8 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-aarch_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..4cf4316e6fe70 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-aarch_64.jar.sha1 @@ -0,0 +1 @@ +c30d746bd588c07bac4ac04abde576c05a3c0a28 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-x86_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-x86_64.jar.sha1 new file mode 100644 index 0000000000000..5abd9e612dd18 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-osx-x86_64.jar.sha1 @@ -0,0 +1 @@ +f3679a6cc103292931098b1dc835ffb831e27368 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-windows-x86_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-windows-x86_64.jar.sha1 new file mode 100644 index 0000000000000..2fdbde31dcdd6 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final-windows-x86_64.jar.sha1 @@ -0,0 +1 @@ +c4f177d2a99668c209acc31b8b85df58e9166218 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final.jar.sha1 new file mode 100644 index 0000000000000..6525658f1e2a0 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.7.Final.jar.sha1 @@ -0,0 +1 @@ +d6a7141d5fab45c8d2f6c646965f2dd53a43b41e \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java index 833d60375a2bd..5485da8581e35 100644 --- a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java +++ b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java @@ -101,7 +101,7 @@ public void testLimitsInFlightRequests() throws Exception { HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); - try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false)) { + try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false, Settings.EMPTY)) { final Collection singleResponse = nettyHttpClient.post(transportAddress.address(), requests.subList(0, 1)); try { assertThat(singleResponse, hasSize(1)); @@ -131,7 +131,7 @@ public void testDoesNotLimitExcludedRequests() throws Exception { HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); - try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false)) { + try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false, Settings.EMPTY)) { final Collection responses = nettyHttpClient.put(transportAddress.address(), requestUris); try { assertThat(responses, hasSize(requestUris.size())); diff --git a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java index c0e43de06f6ff..46c30557b3c5a 100644 --- a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java +++ b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java @@ -14,6 +14,7 @@ package org.opensearch.http.reactor.netty4; import org.opensearch.OpenSearchReactorNetty4IntegTestCase; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.http.HttpServerTransport; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; @@ -43,7 +44,7 @@ public void testThatNettyHttpServerSupportsPipelining() throws Exception { TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); TransportAddress transportAddress = randomFrom(boundAddresses); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(Settings.EMPTY)) { Collection responses = client.get(transportAddress.address(), true, requests); try { assertThat(responses, hasSize(5)); diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4CompositeHttpServerChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4CompositeHttpServerChannel.java new file mode 100644 index 0000000000000..e3f1b45a3f6e8 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4CompositeHttpServerChannel.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.http.HttpServerChannel; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; + +import io.netty.channel.Channel; + +class ReactorNetty4CompositeHttpServerChannel implements HttpServerChannel { + private final Channel[] channels; + private final CompletableContext[] closeContexts; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + ReactorNetty4CompositeHttpServerChannel(Channel... channels) { + this.channels = channels; + this.closeContexts = new CompletableContext[channels.length]; + for (int i = 0; i < channels.length; ++i) { + closeContexts[i] = new CompletableContext<>(); + Netty4Utils.addListener(this.channels[i].closeFuture(), closeContexts[i]); + } + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channels[0].localAddress(); + } + + @Override + public void addCloseListener(ActionListener listener) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + final CompletableFuture[] futures = new CompletableFuture[closeContexts.length]; + for (int i = 0; i < closeContexts.length; ++i) { + final CompletableFuture future = new CompletableFuture<>(); + closeContexts[i].addListener((v, t) -> { + if (t == null) { + future.complete(v); + } else { + future.completeExceptionally(t); + } + }); + futures[i] = future; + } + + // Wait for all contexts to be closed + CompletableFuture.allOf(futures).whenComplete((v, t) -> { + if (t == null) { + listener.onResponse(v); + } else { + listener.onFailure((Exception) t); + } + }); + } + + @Override + public boolean isOpen() { + for (int i = 0; i < channels.length; ++i) { + if (channels[i].isOpen() == false) { + return false; + } + } + return true; + } + + @Override + public void close() { + for (int i = 0; i < channels.length; ++i) { + channels[i].close(); + } + } + + @Override + public String toString() { + return "ReactorNetty4CompositeHttpServerChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java index 10f06ad91b78d..bf1bcd63b7138 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java @@ -158,6 +158,8 @@ public HttpVersion protocolVersion() { return HttpRequest.HttpVersion.HTTP_1_1; } else if (protocol.equals("HTTP/2.0")) { return HttpRequest.HttpVersion.HTTP_2_0; + } else if (protocol.equals("HTTP/3.0")) { + return HttpRequest.HttpVersion.HTTP_3_0; } else { throw new IllegalArgumentException("Unexpected http protocol version: " + protocol); } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index 78e5edf48cc57..314f3b4ef389b 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -45,12 +45,14 @@ import java.util.Optional; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.quic.QuicSslContextBuilder; import io.netty.handler.ssl.ApplicationProtocolConfig; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ClientAuth; @@ -62,13 +64,16 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.netty.DisposableChannel; import reactor.netty.DisposableServer; import reactor.netty.http.HttpProtocol; +import reactor.netty.http.internal.Http3; import reactor.netty.http.server.HttpServer; import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_CONNECT_TIMEOUT; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; @@ -103,6 +108,33 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor Property.NodeScope ); + /** + * Set the initial maximum data limit for local bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_LOCAL_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_local_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum data limit for remote bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_REMOTE_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_remote_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum stream limit for bidirectional streams. + * + * The HTTP/3 standard expects that each end configures at least 100 + * concurrent bidirectional streams at a time, to avoid reducing performance + * by reducing parallelism. + */ + public static final Setting SETTING_H3_MAX_STREAMS = Setting.longSetting("h3.max_streams", 100L, Property.NodeScope); + /** * The number of Reactor Netty HTTP workers */ @@ -239,7 +271,7 @@ public ReactorNetty4HttpServerTransport( */ @Override protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { - final HttpServer server = configure( + final HttpServer http11or2 = configureHttp11orHttp2( HttpServer.create() .httpFormDecoder(builder -> builder.scheduler(scheduler)) .idleTimeout(Duration.ofMillis(connectTimeoutMillis)) @@ -258,11 +290,85 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti .handle((req, res) -> incomingRequest(req, res)) ); - disposableServer = server.bindNow(); - return new ReactorNetty4HttpServerChannel(disposableServer.channel()); + // The HTTP/3 server binds to the same port as HTTP/2 or HTTP/1.1 since those are + // different protocols (UDP, TCP) + final Optional http3Opt = configureHttp3(socketAddress).map(HttpServer::bindNow); + if (http3Opt.isEmpty()) { + disposableServer = http11or2.bindNow(); + return new ReactorNetty4HttpServerChannel(disposableServer.channel()); + } else { + final DisposableServer http3Server = http3Opt.get(); + final DisposableServer http11or2Server = http11or2.bindNow(); + + disposableServer = new DisposableServer() { + @Override + public Channel channel() { + throw new UnsupportedOperationException("The channel() operation is not supported"); + } + + @Override + public void disposeNow() { + disposeQuietly(http3Server); + disposeQuietly(http11or2Server); + } + }; + + return new ReactorNetty4CompositeHttpServerChannel(http11or2Server.channel(), http3Server.channel()); + } } - private HttpServer configure(final HttpServer server) throws Exception { + private Optional configureHttp3(InetSocketAddress socketAddress) throws Exception { + // Configure SSL context if available + if (secureHttpTransportSettingsProvider != null) { + final Optional parameters = secureHttpTransportSettingsProvider.parameters(settings); + + final KeyManagerFactory keyManagerFactory = parameters.flatMap(SecureHttpTransportParameters::keyManagerFactory) + .orElseThrow(() -> new OpenSearchException("The KeyManagerFactory instance is not provided")); + + if (Http3.isHttp3Available() && SETTING_HTTP_HTTP3_ENABLED.get(settings).booleanValue() == true) { + final QuicSslContextBuilder sslContextBuilder = QuicSslContextBuilder.forServer(keyManagerFactory, null); + + parameters.flatMap(SecureHttpTransportParameters::trustManagerFactory).ifPresent(sslContextBuilder::trustManager); + parameters.flatMap(SecureHttpTransportParameters::clientAuth) + .ifPresent(clientAuth -> sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth))); + + final SslContext sslContext = sslContextBuilder.applicationProtocols( + io.netty.handler.codec.http3.Http3.supportedApplicationProtocols() + ).build(); + + return Optional.of( + HttpServer.create() + .httpFormDecoder(builder -> builder.scheduler(scheduler)) + .idleTimeout(Duration.ofMillis(connectTimeoutMillis)) + .readTimeout(Duration.ofMillis(readTimeoutMillis)) + .runOn(sharedGroup.getLowLevelGroup()) + .bindAddress(() -> socketAddress) + .compress(true) + .httpRequestDecoder( + spec -> spec.maxChunkSize(maxChunkSize.bytesAsInt()) + .h2cMaxContentLength(h2cMaxContentLength.bytesAsInt()) + .maxHeaderSize(maxHeaderSize.bytesAsInt()) + .maxInitialLineLength(maxInitialLineLength.bytesAsInt()) + .allowPartialChunks(false) + ) + .handle((req, res) -> incomingRequest(req, res)) + .http3Settings( + spec -> spec.idleTimeout(Duration.ofSeconds(5)) + .maxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(settings).getBytes()) + .maxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(settings).longValue()) + ) + .secure(spec -> spec.sslContext(sslContext)) + .protocol(HttpProtocol.HTTP3) + ); + } + } + + return Optional.empty(); + } + + private HttpServer configureHttp11orHttp2(final HttpServer server) throws Exception { HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)) .childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); @@ -433,7 +539,7 @@ protected void stopInternal() { } if (disposableServer != null) { - disposableServer.disposeNow(); + disposeQuietly(disposableServer); disposableServer = null; } } @@ -474,4 +580,16 @@ public void onException(HttpChannel channel, Exception cause) { } } + /** + * Quietly disposes the channel. + */ + private static void disposeQuietly(final DisposableChannel disposable) { + try { + if (disposable != null) { + disposable.disposeNow(); + } + } catch (final RuntimeException ex) { + // Do nothing + } + } } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java index 90ed1fe729d3a..d938b40b6abab 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java @@ -57,7 +57,12 @@ public ReactorNetty4Plugin() {} */ @Override public List> getSettings() { - return Arrays.asList(ReactorNetty4HttpServerTransport.SETTING_H2C_MAX_CONTENT_LENGTH); + return Arrays.asList( + ReactorNetty4HttpServerTransport.SETTING_H2C_MAX_CONTENT_LENGTH, + ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_LOCAL_LENGTH, + ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_REMOTE_LENGTH, + ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAMS + ); } /** diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java index edd5bc97368dd..245332580d9de 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java @@ -14,6 +14,7 @@ package org.opensearch.http.reactor.netty4; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; @@ -55,11 +56,18 @@ import reactor.core.publisher.ParallelFlux; import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.Http2SslContextSpec; +import reactor.netty.http.Http3SslContextSpec; import reactor.netty.http.HttpProtocol; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.PrematureCloseException; +import reactor.netty.http.internal.Http3; import reactor.util.retry.Retry; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAMS; +import static org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_LOCAL_LENGTH; +import static org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_REMOTE_LENGTH; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -69,7 +77,8 @@ public class ReactorHttpClient implements Closeable { private final boolean compression; private final boolean secure; - private final boolean useHttp11Only; + private final HttpProtocol protocol; + private final Settings settings; static Collection returnHttpResponseBodies(Collection responses) { List list = new ArrayList<>(responses.size()); @@ -87,22 +96,23 @@ static Collection returnOpaqueIds(Collection responses return list; } - public ReactorHttpClient(boolean compression, boolean secure) { + public ReactorHttpClient(boolean compression, boolean secure, Settings settings) { this.compression = compression; this.secure = secure; - this.useHttp11Only = OpenSearchTestCase.randomBoolean(); + this.protocol = randomProtocol(secure, settings); + this.settings = settings; } - public static ReactorHttpClient create() { - return create(true); + public static ReactorHttpClient create(Settings settings) { + return create(true, settings); } - public static ReactorHttpClient create(boolean compression) { - return new ReactorHttpClient(compression, false); + public static ReactorHttpClient create(boolean compression, Settings settings) { + return new ReactorHttpClient(compression, false, settings); } - public static ReactorHttpClient https() { - return new ReactorHttpClient(true, true); + public static ReactorHttpClient https(Settings settings) { + return new ReactorHttpClient(true, true, settings); } public List get(InetSocketAddress remoteAddress, String... uris) throws InterruptedException { @@ -272,27 +282,46 @@ private HttpClient createClient(final InetSocketAddress remoteAddress, final Nio .runOn(eventLoopGroup) .host(remoteAddress.getHostString()) .port(remoteAddress.getPort()) - .compress(compression) - .protocol(HttpProtocol.H2, HttpProtocol.HTTP11); + .compress(compression); if (secure) { - return client.protocol( - useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 } - ) - .secure( - spec -> spec.sslContext( - useHttp11Only - /* switch between HTTP 1.1/HTTP 2 randomly, both are supported */ - ? Http11SslContextSpec.forClient() + if (protocol == HttpProtocol.HTTP11) { + return client.protocol(protocol) + .secure( + spec -> spec.sslContext( + Http11SslContextSpec.forClient() .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE)) - : Http2SslContextSpec.forClient() + ).handshakeTimeout(Duration.ofSeconds(30)) + ); + } else if (protocol == HttpProtocol.H2) { + return client.protocol(new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 }) + .secure( + spec -> spec.sslContext( + Http2SslContextSpec.forClient() .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE)) - ).handshakeTimeout(Duration.ofSeconds(30)) - ); + ).handshakeTimeout(Duration.ofSeconds(30)) + ); + } else { + return client.protocol(protocol) + .secure( + spec -> spec.sslContext( + Http3SslContextSpec.forClient().configure(s -> s.trustManager(InsecureTrustManagerFactory.INSTANCE)) + ).handshakeTimeout(Duration.ofSeconds(30)) + ) + .http3Settings( + spec -> spec.idleTimeout(Duration.ofSeconds(5)) + .maxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(settings).getBytes()) + .maxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(settings).longValue()) + ); + } } else { - return client.protocol( - useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C } - ); + if (protocol == HttpProtocol.HTTP11) { + return client.protocol(protocol); + } else { + return client.protocol(new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C }); + } } } @@ -302,6 +331,23 @@ public void close() { } public boolean useHttp11only() { - return useHttp11Only; + return protocol == HttpProtocol.HTTP11; + } + + private static HttpProtocol randomProtocol(boolean secure, Settings settings) { + HttpProtocol[] values = null; + + if (secure) { + if (Http3.isHttp3Available() && SETTING_HTTP_HTTP3_ENABLED.get(settings).booleanValue() == true) { + values = new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2, HttpProtocol.HTTP3 }; + } else { + values = new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 }; + } + } else { + values = new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C }; + } + + return values[OpenSearchTestCase.randomInt(values.length - 1)]; } + } diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java index 00ca378a4e46b..91825505a39a3 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java @@ -97,7 +97,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, httpServerTransport.start(); final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); - try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create()) { + try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(settings)) { final List responses = nettyHttpClient.get(transportAddress.address(), "/_cluster/settings?pretty=%"); try { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java index 715f0191fd851..eaa13cf665c61 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java @@ -119,7 +119,7 @@ public void testRequestResponseStreaming() throws InterruptedException { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create(false)) { + try (ReactorHttpClient client = ReactorHttpClient.create(false, Settings.EMPTY)) { HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); try { @@ -159,7 +159,7 @@ public void testConnectionsGettingClosedForStreamingRequests() throws Interrupte new SharedGroupFactory(Settings.EMPTY), NoopTracer.INSTANCE ); - ReactorHttpClient client = ReactorHttpClient.create(false) + ReactorHttpClient client = ReactorHttpClient.create(false, Settings.EMPTY) ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -240,6 +240,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } }; + } private static ToXContent[] newChunks(final String responseString) { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java index 72d645aaf8022..be344361e8991 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java @@ -202,7 +202,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); request.headers().set(HttpHeaderNames.EXPECT, expectation); HttpUtil.setContentLength(request, contentLength); @@ -307,7 +307,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final String url = "/" + randomAlphaOfLength(maxInitialLineLength); final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); @@ -353,7 +353,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final FullHttpResponse response = client.send(remoteAddress.address(), request); @@ -409,7 +409,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(Settings.EMPTY)) { DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); long numOfHugeAllocations = getHugeAllocationCount(); @@ -475,7 +475,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, new SharedGroupFactory(Settings.EMPTY), NoopTracer.INSTANCE ); - ReactorHttpClient client = ReactorHttpClient.create() + ReactorHttpClient client = ReactorHttpClient.create(Settings.EMPTY) ) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -538,7 +538,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); // Test pre-flight request - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); @@ -555,7 +555,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } // Test short-circuited request - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); request.headers().add(CorsHandler.ORIGIN, "google.com"); diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java index c5f1e6215f098..2fa30ee485109 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java @@ -89,6 +89,7 @@ import io.netty.handler.codec.http2.Http2SecurityUtil; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.pkitesting.CertificateBuilder.Algorithm; import static org.opensearch.core.rest.RestStatus.OK; import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; @@ -117,7 +118,7 @@ public void setup() throws Exception { clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); var keyManagerFactory = KeyManagerFactory.getInstance("PKIX"); - keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(), KEYSTORE_PASSWORD); + keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(Algorithm.ecp384), KEYSTORE_PASSWORD); secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() { @Override @@ -162,15 +163,11 @@ public Optional buildHttpServerExceptionHandler(Setti @Override public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException { - try { - SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .build() - .newEngine(NettyAllocator.getAllocator()); - return Optional.of(engine); - } catch (final Exception ex) { - throw new SSLException(ex); - } + SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build() + .newEngine(NettyAllocator.getAllocator()); + return Optional.of(engine); } }; } @@ -256,7 +253,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); request.headers().set(HttpHeaderNames.EXPECT, expectation); HttpUtil.setContentLength(request, contentLength); @@ -295,6 +292,10 @@ public void testBindUnavailableAddress() { Settings settings = Settings.builder() .put("http.port", remoteAddress.getPort()) .put("network.host", remoteAddress.getAddress()) + .put( + HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.getKey(), + HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.get(initialSettings) + ) .build(); try ( ReactorNetty4HttpServerTransport otherTransport = new ReactorNetty4HttpServerTransport( @@ -359,7 +360,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final String url = "/" + randomAlphaOfLength(maxInitialLineLength); final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); @@ -406,7 +407,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final FullHttpResponse response = client.send(remoteAddress.address(), request); @@ -446,9 +447,13 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th }; + final Settings settings = Settings.builder() + .put(HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.getKey(), randomBoolean()) + .build(); + try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( - Settings.EMPTY, + settings, networkService, bigArrays, threadPool, @@ -463,7 +468,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); long numOfHugeAllocations = getHugeAllocationCount(); @@ -535,7 +540,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); // Test pre-flight request - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); @@ -552,7 +557,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } // Test short-circuited request - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); request.headers().add(CorsHandler.ORIGIN, "google.com"); @@ -638,6 +643,8 @@ private Settings createSettings() { } private Settings.Builder createBuilderWithPort() { - return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()); + return Settings.builder() + .put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()) + .put(HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.getKey(), randomBoolean()); } } diff --git a/server/src/main/java/org/opensearch/bootstrap/Security.java b/server/src/main/java/org/opensearch/bootstrap/Security.java index a8652dffa6910..90b623094fff1 100644 --- a/server/src/main/java/org/opensearch/bootstrap/Security.java +++ b/server/src/main/java/org/opensearch/bootstrap/Security.java @@ -129,7 +129,9 @@ */ @SuppressWarnings("removal") final class Security { - private static final Pattern CODEBASE_JAR_WITH_CLASSIFIER = Pattern.compile("^(.+)-\\d+\\.\\d+[^-]*.*?[-]?([^-]+)?\\.jar$"); + private static final Pattern CODEBASE_JAR_WITH_CLASSIFIER = Pattern.compile( + "^(.+)-\\d+\\.\\d+[^-]*.*?[-]?((?:linux-|windows-|osx-)?[^-]+)?\\.jar$" + ); /** no instantiation */ private Security() {} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ca8c43da99dbb..36efa870cf066 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -424,6 +424,7 @@ public void apply(Settings value, Settings current, Settings previous) { HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE, HttpTransportSettings.SETTING_HTTP_TRACE_LOG_INCLUDE, HttpTransportSettings.SETTING_HTTP_TRACE_LOG_EXCLUDE, + HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED, HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING, HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/http/HttpRequest.java b/server/src/main/java/org/opensearch/http/HttpRequest.java index 56e6276811377..4fefed54e8f27 100644 --- a/server/src/main/java/org/opensearch/http/HttpRequest.java +++ b/server/src/main/java/org/opensearch/http/HttpRequest.java @@ -60,7 +60,8 @@ public interface HttpRequest { enum HttpVersion { HTTP_1_0, HTTP_1_1, - HTTP_2_0 + HTTP_2_0, + HTTP_3_0 } /** diff --git a/server/src/main/java/org/opensearch/http/HttpTransportSettings.java b/server/src/main/java/org/opensearch/http/HttpTransportSettings.java index 05c0a28157cb4..6f627c9924793 100644 --- a/server/src/main/java/org/opensearch/http/HttpTransportSettings.java +++ b/server/src/main/java/org/opensearch/http/HttpTransportSettings.java @@ -259,5 +259,12 @@ public final class HttpTransportSettings { Setting.Property.NodeScope ); + // Enable HTTP/3 protocol if supported by the operating system and architecture + public static final Setting SETTING_HTTP_HTTP3_ENABLED = Setting.boolSetting( + "http.protocol.http3.enabled", + false, + Property.NodeScope + ); + private HttpTransportSettings() {} } diff --git a/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java b/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java index 4ca11d724468f..0ca4eb09c377d 100644 --- a/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java @@ -48,7 +48,11 @@ public static String inferStoreType(String filePath) { } public static KeyStore createServerKeyStore() throws Exception { - var serverCred = generateCert(); + return createServerKeyStore(Algorithm.rsa2048); + } + + public static KeyStore createServerKeyStore(Algorithm algorithm) throws Exception { + var serverCred = generateCert(algorithm); var keyStore = KeyStore.getInstance(FipsMode.CHECK.isFipsEnabled() ? "BCFKS" : "JKS"); keyStore.load(null, null); keyStore.setKeyEntry( @@ -60,7 +64,7 @@ public static KeyStore createServerKeyStore() throws Exception { return keyStore; } - private static X509Bundle generateCert() throws Exception { + private static X509Bundle generateCert(Algorithm algorithm) throws Exception { final Locale locale = Locale.getDefault(); try { Locale.setDefault(LocaleUtil.EN_Locale); @@ -68,7 +72,7 @@ private static X509Bundle generateCert() throws Exception { // reference: https://csrc.nist.gov/projects/cryptographic-module-validation-program/certificate/4943 return new CertificateBuilder().subject("CN=Test CA Certificate") .setIsCertificateAuthority(true) - .algorithm(Algorithm.rsa2048) + .algorithm(algorithm) .provider(new BouncyCastleFipsProvider()) .buildSelfSigned(); } finally {