Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix lag metric for pull-based ingestion when streaming source is empty ([#19393](https://github.com/opensearch-project/OpenSearch/pull/19393))
- Fix ingestion state xcontent serialization in IndexMetadata and fail fast on mapping errors([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))
- Fix updated keyword field params leading to stale responses from request cache ([#19385](https://github.com/opensearch-project/OpenSearch/pull/19385))
- Implement SslHandler retrieval logic for transport-reactor-netty4 plugin ([#19458](https://github.com/opensearch-project/OpenSearch/pull/19458))

### Dependencies
- Bump `com.gradleup.shadow:shadow-gradle-plugin` from 8.3.5 to 8.3.9 ([#19400](https://github.com/opensearch-project/OpenSearch/pull/19400))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import javax.net.ssl.SSLEngine;

import java.util.Optional;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.handler.ssl.SslHandler;
import reactor.netty.NettyPipeline;
import reactor.netty.http.server.HttpServerRequest;

final class ReactorNetty4BaseHttpChannel {
private static final String CHANNEL_PROPERTY = "channel";
private static final String SSL_HANDLER_PROPERTY = "ssl_http";
private static final String SSL_ENGINE_PROPERTY = "ssl_engine";

private ReactorNetty4BaseHttpChannel() {}

@SuppressWarnings("unchecked")
static <T> Optional<T> get(HttpServerRequest request, String name, Class<T> clazz) {
if (CHANNEL_PROPERTY.equalsIgnoreCase(name) == true && clazz.isAssignableFrom(Channel.class) == true) {
final Channel[] channels = new Channel[1];
request.withConnection(connection -> { channels[0] = connection.channel(); });
return Optional.of((T) channels[0]);
} else if (SSL_HANDLER_PROPERTY.equalsIgnoreCase(name) == true || SSL_ENGINE_PROPERTY.equalsIgnoreCase(name) == true) {
final ChannelHandler[] channels = new ChannelHandler[1];
request.withConnection(connection -> {
final Channel channel = connection.channel();
if (channel.parent() != null) {
channels[0] = channel.parent().pipeline().get(NettyPipeline.SslHandler);
} else {
channels[0] = channel.pipeline().get(NettyPipeline.SslHandler);
}
});
if (channels[0] != null) {
if (SSL_HANDLER_PROPERTY.equalsIgnoreCase(name) == true && clazz.isInstance(channels[0]) == true) {
return Optional.of((T) channels[0]);
} else if (SSL_ENGINE_PROPERTY.equalsIgnoreCase(name) == true
&& clazz.isAssignableFrom(SSLEngine.class)
&& channels[0] instanceof SslHandler h) {
return Optional.of((T) h.engine());
}
}
} else {
final ChannelHandler[] channels = new ChannelHandler[1];
request.withConnection(connection -> { channels[0] = connection.channel().pipeline().get(name); });
if (channels[0] != null && clazz.isInstance(channels[0]) == true) {
return Optional.of((T) channels[0]);
}
}

return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
Expand Down Expand Up @@ -317,6 +318,8 @@ private HttpServer configure(final HttpServer server) throws Exception {
parameters.flatMap(SecureHttpTransportParameters::trustManagerFactory).ifPresent(sslContextBuilder::trustManager);
parameters.map(SecureHttpTransportParameters::cipherSuites)
.ifPresent(ciphers -> sslContextBuilder.ciphers(ciphers, SupportedCipherSuiteFilter.INSTANCE));
parameters.flatMap(SecureHttpTransportParameters::clientAuth)
.ifPresent(clientAuth -> sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth)));

final SslContext sslContext = sslContextBuilder.protocols(
parameters.map(SecureHttpTransportParameters::protocols).orElseGet(() -> Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.handler.codec.http.FullHttpResponse;
Expand Down Expand Up @@ -75,6 +76,11 @@ public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) response.hostAddress();
}

@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
return ReactorNetty4BaseHttpChannel.get(request, name, clazz);
}

FullHttpResponse createResponse(HttpResponse response) {
return (FullHttpResponse) response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
Expand Down Expand Up @@ -123,6 +124,11 @@ public void subscribe(Subscriber<? super HttpChunk> subscriber) {
receiver.subscribe(subscriber);
}

@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
return ReactorNetty4BaseHttpChannel.get(request, name, clazz);
}

private static HttpContent createContent(HttpResponse response) {
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response;
return new DefaultHttpContent(fullHttpResponse.content());
Expand Down
Loading