Merged
@@ -25,6 +25,7 @@ public void testIndexMissingBody() throws IOException {
assertResponseException(responseException, "request body is required");
}

@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
Contributor

I have this change in my draft of the HttpObjectAggregator removal. It should partially address the no-content case: for a known length I use the Content-Length header; for chunked transfer there is no way to tell, so it is always true.

https://github.com/elastic/elasticsearch/pull/112120/files#diff-b6d89d18f95a49d731741f926ee22d01f0dd8039de82382e68c1862e19f22b04R282-R297
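
A minimal sketch of that heuristic, assuming Netty's HttpUtil helpers (an illustration of the idea described above, not the code in the linked draft):

import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;

// Hypothetical helper: decide whether a request may carry a body.
// Known length: the Content-Length header decides. Chunked transfer:
// the length is unknown up front, so conservatively assume there is content.
static boolean mayHaveContent(HttpRequest request) {
    if (HttpUtil.isTransferEncodingChunked(request)) {
        return true;
    }
    return HttpUtil.getContentLength(request, 0L) > 0;
}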

public void testBulkMissingBody() throws IOException {
ResponseException responseException = expectThrows(
ResponseException.class,
@@ -20,6 +20,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;

@@ -51,6 +52,8 @@ protected boolean addMockHttpTransport() {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
.put(RestBulkAction.INCREMENTAL_BULK.getKey(), false)
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
.build();
}
@@ -35,10 +35,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
private boolean aggregating = true;
private boolean ignoreContentAfterContinueResponse = false;

public Netty4HttpAggregator(int maxContentLength) {
this(maxContentLength, IGNORE_TEST);
}

public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
super(maxContentLength);
this.decider = decider;
@@ -49,7 +45,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
assert msg instanceof HttpObject;
if (msg instanceof HttpRequest request) {
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
aggregating = decider.test(preReq);
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
@@ -36,6 +36,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.ThreadWatchdog;
@@ -96,6 +97,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final TLSConfig tlsConfig;
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;
private final ThreadWatchdog threadWatchdog;
private final int readTimeoutMillis;

@@ -134,6 +136,7 @@ public Netty4HttpServerTransport(
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.threadWatchdog = networkService.getThreadWatchdog();
this.enabled = new IncrementalBulkService.Enabled(clusterSettings);

this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

@@ -279,7 +282,7 @@ public void onException(HttpChannel channel, Exception cause) {
}

public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator);
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled);
}

static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
@@ -292,19 +295,22 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
private final TLSConfig tlsConfig;
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;

protected HttpChannelHandler(
final Netty4HttpServerTransport transport,
final HttpHandlingSettings handlingSettings,
final TLSConfig tlsConfig,
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
@Nullable final HttpValidator httpValidator
@Nullable final HttpValidator httpValidator,
IncrementalBulkService.Enabled enabled
) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.tlsConfig = tlsConfig;
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.enabled = enabled;
}

@Override
@@ -365,7 +371,13 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
);
}
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
handlingSettings.maxContentLength(),
httpPreRequest -> enabled.get() == false
Contributor

I am not entirely sure how important it is for this to be in sync with the enabled flag checked in the REST bulk handler. However, it seems they could flip at different times, since we register these independently and each adds its own cluster-settings listener. I wonder if we can fix that. I realize it is only an issue when turning the feature on or off, but it would be nice to fix anyway.

Contributor Author

I added a comment to the feature flag ticket and will address it as a follow-up. I agree with the concern.

https://elasticco.atlassian.net/browse/ES-9262

|| (httpPreRequest.uri().contains("_bulk") == false
|| httpPreRequest.uri().contains("_bulk_update")
|| httpPreRequest.uri().contains("/_xpack/monitoring/_bulk"))
);
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline()
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
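For readability: the predicate above aggregates every request unless incremental bulk is enabled and the URI targets the plain _bulk endpoint. A hedged restatement with hypothetical names (equivalent boolean logic, not the PR's code):

// Hypothetical restatement of the aggregation decision, for clarity only.
static boolean shouldAggregate(boolean incrementalBulkEnabled, String uri) {
    if (incrementalBulkEnabled == false) {
        return true; // feature disabled: keep the fully-aggregated path
    }
    // _bulk_update and the monitoring bulk endpoint are still aggregated;
    // only the plain _bulk endpoint switches to incremental handling.
    boolean plainBulk = uri.contains("_bulk")
        && uri.contains("_bulk_update") == false
        && uri.contains("/_xpack/monitoring/_bulk") == false;
    return plainBulk == false;
}

On the sync concern discussed above: both call sites observe the same cluster setting, but each registers its own update listener, so the two flags can briefly disagree while the setting flips; sharing a single IncrementalBulkService.Enabled instance is one way to close that window.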
@@ -45,6 +45,7 @@
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
@@ -418,7 +419,8 @@ public ChannelHandler configureServerChannelHandler() {
handlingSettings,
TLSConfig.noTLS(),
null,
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null),
new IncrementalBulkService.Enabled(clusterSettings)
) {
@Override
protected void initChannel(Channel ch) throws Exception {
@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.rest.RestStatus.OK;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {

@SuppressWarnings("unchecked")
public void testIncrementalBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"write.wait_for_active_shards": 2
}
}
}""");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

// index a couple of seed documents
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
+ "{\"field\":1}\n"
+ "\r\n";

firstBulkRequest.setJsonEntity(bulkBody);

final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request bulkRequest = new Request("POST", "/index_name/_bulk");

// build a large bulk body with a delete and occasional updates
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
for (int i = 0; i < 1000; i++) {
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":").append(i).append("}\n");
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
++updates;
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
}
}
bulk.append("\r\n");

bulkRequest.setJsonEntity(bulk.toString());

final Response bulkResponse = getRestClient().performRequest(bulkRequest);
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Map<String, Object> responseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
bulkResponse.getEntity().getContent(),
true
);

assertFalse((Boolean) responseMap.get("errors"));
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
}

public void testIncrementalMalformed() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"write.wait_for_active_shards": 2
}
}
}""");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request bulkRequest = new Request("POST", "/index_name/_bulk");

// build a malformed bulk body: an action line followed by an empty object
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":1}\n");
bulk.append("{}\n");
bulk.append("\r\n");

bulkRequest.setJsonEntity(bulk.toString());

expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
}
}
@@ -159,6 +159,7 @@
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.bulk.SimulateBulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
@@ -447,6 +448,7 @@ public class ActionModule extends AbstractModule {
private final List<ActionPlugin> actionPlugins;
private final Map<String, ActionHandler<?, ?>> actions;
private final ActionFilters actionFilters;
private final IncrementalBulkService bulkService;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
private final RestController restController;
@@ -475,7 +477,8 @@ public ActionModule(
ClusterService clusterService,
RerouteService rerouteService,
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
RestExtension restExtension
RestExtension restExtension,
IncrementalBulkService bulkService
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -487,6 +490,7 @@
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
this.bulkService = bulkService;
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
@@ -927,7 +931,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
registerHandler.accept(new RestCountAction());
registerHandler.accept(new RestTermVectorsAction());
registerHandler.accept(new RestMultiTermVectorsAction());
registerHandler.accept(new RestBulkAction(settings));
registerHandler.accept(new RestBulkAction(settings, bulkService));
registerHandler.accept(new RestUpdateAction());

registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature));
@@ -85,13 +85,13 @@ public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiVersion)
.withRestApiVersion(restApiVersion);
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
final int res = data.indexOf(marker, from);
if (res != -1) {
assert res >= 0;
return res;
}
if (from != data.length()) {
if (from != data.length() && isIncremental == false) {
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
}
return res;
@@ -136,18 +136,57 @@ public void parse(
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
XContent xContent = xContentType.xContent();
int line = 0;
int from = 0;
byte marker = xContent.bulkSeparator();
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
final Map<String, String> stringDeduplicator = new HashMap<>();

incrementalParse(
data,
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
indexRequestConsumer,
updateRequestConsumer,
deleteRequestConsumer,
false,
stringDeduplicator
);
}

public int incrementalParse(
BytesReference data,
String defaultIndex,
String defaultRouting,
FetchSourceContext defaultFetchSourceContext,
String defaultPipeline,
Boolean defaultRequireAlias,
Boolean defaultRequireDataStream,
Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer,
boolean isIncremental,
Map<String, String> stringDeduplicator
) throws IOException {
XContent xContent = xContentType.xContent();
byte marker = xContent.bulkSeparator();
boolean typesDeprecationLogged = false;

int line = 0;
int from = 0;
int consumed = 0;

while (true) {
int nextMarker = findNextMarker(marker, from, data);
int nextMarker = findNextMarker(marker, from, data, isIncremental);
if (nextMarker == -1) {
break;
}
@@ -332,8 +371,9 @@
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
);
consumed = from;
} else {
nextMarker = findNextMarker(marker, from, data);
nextMarker = findNextMarker(marker, from, data, isIncremental);
if (nextMarker == -1) {
break;
}
@@ -406,9 +446,11 @@
}
// move pointers
from = nextMarker + 1;
consumed = from;
}
}
}
return isIncremental ? consumed : from;
}

@UpdateForV9
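The int returned by incrementalParse is the number of bytes actually consumed: with isIncremental set, a trailing partial line no longer throws in findNextMarker and is simply left unconsumed for the next round. A minimal sketch of how a caller might drive this over a stream of chunks (hypothetical wiring; names such as onChunk, items, and stringDeduplicator are illustrative, and the real driver is the incremental bulk handler):

import java.io.IOException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.xcontent.XContentType;

// Hypothetical driver: accumulate chunks, let the parser consume only the
// complete marker-terminated operations, and carry the tail forward.
BytesReference pending = BytesArray.EMPTY;

void onChunk(BytesReference chunk, boolean isLast) throws IOException {
    pending = CompositeBytesReference.of(pending, chunk);
    int consumed = parser.incrementalParse(
        pending,
        "my-index", null, null, null, null, null, null, // defaults (hypothetical)
        true,                                           // allowExplicitIndex
        XContentType.JSON,
        (indexRequest, type) -> items.add(indexRequest),
        items::add,                                     // update requests
        items::add,                                     // delete requests
        isLast == false, // on the last chunk, enforce the trailing-newline rule
        stringDeduplicator
    );
    // keep only the unconsumed tail for the next network chunk
    pending = pending.slice(consumed, pending.length() - consumed);
}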