Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsHandler;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioSocketChannel;
Expand All @@ -50,21 +52,25 @@
import java.util.List;
import java.util.function.BiConsumer;

import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;

public class HttpReadWriteHandler implements ReadWriteHandler {

private final NettyAdaptor adaptor;
private final NioSocketChannel nioChannel;
private final NioHttpServerTransport transport;
private final HttpHandlingSettings settings;
private final NamedXContentRegistry xContentRegistry;
private final NioCorsConfig corsConfig;
private final ThreadContext threadContext;

HttpReadWriteHandler(NioSocketChannel nioChannel, NioHttpServerTransport transport, HttpHandlingSettings settings,
NamedXContentRegistry xContentRegistry, ThreadContext threadContext) {
NamedXContentRegistry xContentRegistry, NioCorsConfig corsConfig, ThreadContext threadContext) {
this.nioChannel = nioChannel;
this.transport = transport;
this.settings = settings;
this.xContentRegistry = xContentRegistry;
this.corsConfig = corsConfig;
this.threadContext = threadContext;

List<ChannelHandler> handlers = new ArrayList<>(5);
Expand All @@ -78,6 +84,9 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
if (settings.isCompression()) {
handlers.add(new HttpContentCompressor(settings.getCompressionLevel()));
}
if (settings.isCorsEnabled()) {
handlers.add(new NioCorsHandler(corsConfig));
}
handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));

adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
Expand Down Expand Up @@ -178,7 +187,7 @@ private void handleRequest(Object msg) {
int sequence = pipelinedRequest.getSequence();
BigArrays bigArrays = transport.getBigArrays();
try {
innerChannel = new NioHttpChannel(nioChannel, bigArrays, httpRequest, sequence, settings, threadContext);
innerChannel = new NioHttpChannel(nioChannel, bigArrays, httpRequest, sequence, settings, corsConfig, threadContext);
} catch (final IllegalArgumentException e) {
if (badRequestCause == null) {
badRequestCause = e;
Expand All @@ -191,7 +200,7 @@ private void handleRequest(Object msg) {
Collections.emptyMap(), // we are going to dispatch the request as a bad request, drop all parameters
copiedRequest.uri(),
copiedRequest);
innerChannel = new NioHttpChannel(nioChannel, bigArrays, innerRequest, sequence, settings, threadContext);
innerChannel = new NioHttpChannel(nioChannel, bigArrays, innerRequest, sequence, settings, corsConfig, threadContext);
}
channel = innerChannel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsHandler;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestResponse;
Expand All @@ -58,17 +60,19 @@ public class NioHttpChannel extends AbstractRestChannel {

private final BigArrays bigArrays;
private final int sequence;
private final NioCorsConfig corsConfig;
private final ThreadContext threadContext;
private final FullHttpRequest nettyRequest;
private final NioSocketChannel nioChannel;
private final boolean resetCookies;

NioHttpChannel(NioSocketChannel nioChannel, BigArrays bigArrays, NioHttpRequest request, int sequence,
HttpHandlingSettings settings, ThreadContext threadContext) {
HttpHandlingSettings settings, NioCorsConfig corsConfig, ThreadContext threadContext) {
super(request, settings.getDetailedErrorsEnabled());
this.nioChannel = nioChannel;
this.bigArrays = bigArrays;
this.sequence = sequence;
this.corsConfig = corsConfig;
this.threadContext = threadContext;
this.nettyRequest = request.getRequest();
this.resetCookies = settings.isResetCookies();
Expand All @@ -87,6 +91,8 @@ public void sendResponse(RestResponse response) {
}
resp.setStatus(getStatus(response.status()));

NioCorsHandler.setCorsResponseHeaders(nettyRequest, resp, corsConfig);

String opaque = nettyRequest.headers().get("X-Opaque-Id");
if (opaque != null) {
setHeaderField(resp, "X-Opaque-Id", opaque);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.http.nio;

import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.timeout.ReadTimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -28,6 +29,7 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -43,6 +45,8 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.AbstractHttpServerTransport;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.BytesChannelContext;
Expand All @@ -56,6 +60,7 @@
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.SocketEventHandler;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand All @@ -64,15 +69,23 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Pattern;

import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_MAX_AGE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION_LEVEL;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED;
Expand All @@ -86,6 +99,7 @@
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
import static org.elasticsearch.http.nio.cors.NioCorsHandler.ANY_ORIGIN;

public class NioHttpServerTransport extends AbstractHttpServerTransport {

Expand Down Expand Up @@ -115,6 +129,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private final Set<NioSocketChannel> socketChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private NioGroup nioGroup;
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;

public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, HttpServerTransport.Dispatcher dispatcher) {
Expand All @@ -136,6 +151,7 @@ public NioHttpServerTransport(Settings settings, NetworkService networkService,
SETTING_HTTP_COMPRESSION_LEVEL.get(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings),
pipeliningMaxEvents);
this.corsConfig = buildCorsConfig(settings);

this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
Expand Down Expand Up @@ -279,6 +295,38 @@ protected void nonChannelExceptionCaught(Exception ex) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), ex);
}

static NioCorsConfig buildCorsConfig(Settings settings) {
if (SETTING_CORS_ENABLED.get(settings) == false) {
return NioCorsConfigBuilder.forOrigins().disable().build();
}
String origin = SETTING_CORS_ALLOW_ORIGIN.get(settings);
final NioCorsConfigBuilder builder;
if (Strings.isNullOrEmpty(origin)) {
builder = NioCorsConfigBuilder.forOrigins();
} else if (origin.equals(ANY_ORIGIN)) {
builder = NioCorsConfigBuilder.forAnyOrigin();
} else {
Pattern p = RestUtils.checkCorsSettingForRegex(origin);
if (p == null) {
builder = NioCorsConfigBuilder.forOrigins(RestUtils.corsSettingAsArray(origin));
} else {
builder = NioCorsConfigBuilder.forPattern(p);
}
}
if (SETTING_CORS_ALLOW_CREDENTIALS.get(settings)) {
builder.allowCredentials();
}
String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ",");
HttpMethod[] methods = Arrays.stream(strMethods)
.map(HttpMethod::valueOf)
.toArray(HttpMethod[]::new);
return builder.allowedRequestMethods(methods)
.maxAge(SETTING_CORS_MAX_AGE.get(settings))
.allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ","))
.shortCircuit()
.build();
}

private void closeChannels(List<NioChannel> channels) {
List<ActionFuture<Void>> futures = new ArrayList<>(channels.size());

Expand Down Expand Up @@ -315,7 +363,7 @@ private HttpChannelFactory() {
public NioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException {
NioSocketChannel nioChannel = new NioSocketChannel(channel);
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this,
httpHandlingSettings, xContentRegistry, threadPool.getThreadContext());
httpHandlingSettings, xContentRegistry, corsConfig, threadPool.getThreadContext());
Consumer<Exception> exceptionHandler = (e) -> exceptionCaught(nioChannel, e);
SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline,
InboundChannelBuffer.allocatingInstance());
Expand Down
Loading