Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {

private final Channel channel;
private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
private boolean requested = false;
private boolean requested = true;
Comment thread
mhl-b marked this conversation as resolved.
Outdated
private boolean hasLast = false;
private HttpBody.ChunkHandler handler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.ThreadWatchdog;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final TLSConfig tlsConfig;
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;
private final ThreadWatchdog threadWatchdog;
private final int readTimeoutMillis;

Expand Down Expand Up @@ -134,6 +136,7 @@ public Netty4HttpServerTransport(
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.threadWatchdog = networkService.getThreadWatchdog();
this.enabled = new IncrementalBulkService.Enabled(clusterSettings);

this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

Expand Down Expand Up @@ -279,7 +282,7 @@ public void onException(HttpChannel channel, Exception cause) {
}

public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator);
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled);
}

static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
Expand All @@ -292,19 +295,22 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
private final TLSConfig tlsConfig;
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;

protected HttpChannelHandler(
final Netty4HttpServerTransport transport,
final HttpHandlingSettings handlingSettings,
final TLSConfig tlsConfig,
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
@Nullable final HttpValidator httpValidator
@Nullable final HttpValidator httpValidator,
IncrementalBulkService.Enabled enabled
) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.tlsConfig = tlsConfig;
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.enabled = enabled;
}

@Override
Expand Down Expand Up @@ -365,7 +371,10 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
);
}
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
handlingSettings.maxContentLength(),
httpPreRequest -> enabled.incrementalBulkEnabled() && httpPreRequest.uri().contains("_bulk") == false
Comment thread
mhl-b marked this conversation as resolved.
Outdated
);
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline()
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
Expand Down Expand Up @@ -418,7 +419,8 @@ public ChannelHandler configureServerChannelHandler() {
handlingSettings,
TLSConfig.noTLS(),
null,
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null),
new IncrementalBulkService.Enabled(clusterSettings)
) {
@Override
protected void initChannel(Channel ch) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.rest.RestStatus.OK;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {

@SuppressWarnings("unchecked")
public void testIncrementalBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"write.wait_for_active_shards": 2
}
}
}""");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
for (int i = 0; i < 1000; i++) {
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":").append(i).append("}\n");
}
bulk.append("\r\n");

successfulIndexingRequest.setJsonEntity(bulk.toString());

final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Map<String, Object> responseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
indexSuccessFul.getEntity().getContent(),
true
);

assertFalse((Boolean) responseMap.get("errors"));
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1000));
}

public void testIncrementalMalformed() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"write.wait_for_active_shards": 2
}
}
}""");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request bulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":1}\n");
bulk.append("{}\n");
bulk.append("\r\n");

bulkRequest.setJsonEntity(bulk.toString());

expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.bulk.SimulateBulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
Expand Down Expand Up @@ -447,6 +448,7 @@ public class ActionModule extends AbstractModule {
private final List<ActionPlugin> actionPlugins;
private final Map<String, ActionHandler<?, ?>> actions;
private final ActionFilters actionFilters;
private final IncrementalBulkService bulkService;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
private final RestController restController;
Expand Down Expand Up @@ -475,7 +477,8 @@ public ActionModule(
ClusterService clusterService,
RerouteService rerouteService,
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
RestExtension restExtension
RestExtension restExtension,
IncrementalBulkService bulkService
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
Expand All @@ -487,6 +490,7 @@ public ActionModule(
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
this.bulkService = bulkService;
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
Expand Down Expand Up @@ -927,7 +931,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
registerHandler.accept(new RestCountAction());
registerHandler.accept(new RestTermVectorsAction());
registerHandler.accept(new RestMultiTermVectorsAction());
registerHandler.accept(new RestBulkAction(settings));
registerHandler.accept(new RestBulkAction(settings, bulkService));
registerHandler.accept(new RestUpdateAction());

registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiV
.withRestApiVersion(restApiVersion);
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
final int res = data.indexOf(marker, from);
if (res != -1) {
assert res >= 0;
return res;
}
if (from != data.length()) {
if (from != data.length() && isIncremental == false) {
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
}
return res;
Expand Down Expand Up @@ -136,18 +136,57 @@ public void parse(
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
XContent xContent = xContentType.xContent();
int line = 0;
int from = 0;
byte marker = xContent.bulkSeparator();
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
final Map<String, String> stringDeduplicator = new HashMap<>();

incrementalParse(
data,
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
indexRequestConsumer,
updateRequestConsumer,
deleteRequestConsumer,
false,
stringDeduplicator
);
}

public int incrementalParse(
BytesReference data,
String defaultIndex,
String defaultRouting,
FetchSourceContext defaultFetchSourceContext,
String defaultPipeline,
Boolean defaultRequireAlias,
Boolean defaultRequireDataStream,
Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer,
boolean isIncremental,
Map<String, String> stringDeduplicator
) throws IOException {
XContent xContent = xContentType.xContent();
byte marker = xContent.bulkSeparator();
boolean typesDeprecationLogged = false;

int line = 0;
int from = 0;
int consumed = 0;

while (true) {
int nextMarker = findNextMarker(marker, from, data);
int nextMarker = findNextMarker(marker, from, data, isIncremental);
if (nextMarker == -1) {
break;
}
Expand Down Expand Up @@ -332,8 +371,9 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
);
consumed = from;
} else {
nextMarker = findNextMarker(marker, from, data);
nextMarker = findNextMarker(marker, from, data, isIncremental);
if (nextMarker == -1) {
break;
}
Expand Down Expand Up @@ -406,9 +446,11 @@ public void parse(
}
// move pointers
from = nextMarker + 1;
consumed = from;
}
}
}
return isIncremental ? consumed : from;
}

@UpdateForV9
Expand Down
Loading