diff --git a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java index ba305333f2fb9..6fea09d249168 100644 --- a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java +++ b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java @@ -25,6 +25,7 @@ public void testIndexMissingBody() throws IOException { assertResponseException(responseException, "request body is required"); } + @AwaitsFix(bugUrl = "need to decide how to handle this scenario") public void testBulkMissingBody() throws IOException { ResponseException responseException = expectThrows( ResponseException.class, diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java index 0dadc159c41c7..12699442e23bf 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; @@ -51,6 +52,8 @@ protected boolean addMockHttpTransport() { protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal, otherSettings)) + // TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies + .put(RestBulkAction.INCREMENTAL_BULK.getKey(), false) .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT) .build(); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java index 031e803737ee8..d7c5fc0963224 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -35,10 +35,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator { private boolean aggregating = true; private boolean ignoreContentAfterContinueResponse = false; - public Netty4HttpAggregator(int maxContentLength) { - this(maxContentLength, IGNORE_TEST); - } - public Netty4HttpAggregator(int maxContentLength, Predicate decider) { super(maxContentLength); this.decider = decider; @@ -49,7 +45,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception assert msg instanceof HttpObject; if (msg instanceof HttpRequest request) { var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request); - aggregating = decider.test(preReq); + aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq); } if (aggregating || msg instanceof FullHttpRequest) { super.channelRead(ctx, msg); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 024391af46b62..4bb6370af3478 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.ThreadWatchdog; @@ -96,6 +97,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final TLSConfig tlsConfig; private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate; private final HttpValidator httpValidator; + private final IncrementalBulkService.Enabled enabled; private final ThreadWatchdog threadWatchdog; private final int readTimeoutMillis; @@ -134,6 +136,7 @@ public Netty4HttpServerTransport( this.acceptChannelPredicate = acceptChannelPredicate; this.httpValidator = httpValidator; this.threadWatchdog = networkService.getThreadWatchdog(); + this.enabled = new IncrementalBulkService.Enabled(clusterSettings); this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); @@ -279,7 +282,7 @@ public void onException(HttpChannel channel, Exception cause) { } public ChannelHandler configureServerChannelHandler() { - return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator); + return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled); } static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel"); @@ -292,19 +295,22 @@ protected static class HttpChannelHandler extends ChannelInitializer { private final TLSConfig tlsConfig; private final BiPredicate acceptChannelPredicate; private final HttpValidator httpValidator; + private final IncrementalBulkService.Enabled enabled; protected HttpChannelHandler( final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings, final TLSConfig tlsConfig, @Nullable final BiPredicate acceptChannelPredicate, - @Nullable final HttpValidator httpValidator + @Nullable final HttpValidator httpValidator, + IncrementalBulkService.Enabled enabled ) { this.transport = transport; this.handlingSettings = handlingSettings; this.tlsConfig = tlsConfig; this.acceptChannelPredicate = acceptChannelPredicate; this.httpValidator = httpValidator; + this.enabled = enabled; } @Override @@ -365,7 +371,13 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { ); } // combines the HTTP message pieces into a single full HTTP request (with headers and body) - final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength()); + final HttpObjectAggregator aggregator = new Netty4HttpAggregator( + handlingSettings.maxContentLength(), + httpPreRequest -> enabled.get() == false + || (httpPreRequest.uri().contains("_bulk") == false + || httpPreRequest.uri().contains("_bulk_update") + || httpPreRequest.uri().contains("/_xpack/monitoring/_bulk")) + ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() .addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 9e213e6468356..ad25c35283cac 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchWrapperException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.Request; @@ -418,7 +419,8 @@ public ChannelHandler configureServerChannelHandler() { handlingSettings, TLSConfig.noTLS(), null, - randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null) + randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null), + new IncrementalBulkService.Enabled(clusterSettings) ) { @Override protected void initChannel(Channel ch) throws Exception { diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java new file mode 100644 index 0000000000000..fcb0c1044d22f --- /dev/null +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.rest.RestStatus.OK; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) +public class IncrementalBulkRestIT extends HttpSmokeTestCase { + + @SuppressWarnings("unchecked") + public void testIncrementalBulk() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); + + // index documents for the rollup job + String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + + "{\"field\":1}\n" + + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n" + + "{\"field\":1}\n" + + "\r\n"; + + firstBulkRequest.setJsonEntity(bulkBody); + + final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request bulkRequest = new Request("POST", "/index_name/_bulk"); + + // index documents for the rollup job + final StringBuilder bulk = new StringBuilder(); + bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"); + int updates = 0; + for (int i = 0; i < 1000; i++) { + bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); + bulk.append("{\"field\":").append(i).append("}\n"); + if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) { + ++updates; + bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"); + bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n"); + } + } + bulk.append("\r\n"); + + bulkRequest.setJsonEntity(bulk.toString()); + + final Response bulkResponse = getRestClient().performRequest(bulkRequest); + assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + Map responseMap = XContentHelper.convertToMap( + JsonXContent.jsonXContent, + bulkResponse.getEntity().getContent(), + true + ); + + assertFalse((Boolean) responseMap.get("errors")); + assertThat(((List) responseMap.get("items")).size(), equalTo(1001 + updates)); + } + + public void testIncrementalMalformed() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request bulkRequest = new Request("POST", "/index_name/_bulk"); + + // index documents for the rollup job + final StringBuilder bulk = new StringBuilder(); + bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); + bulk.append("{\"field\":1}\n"); + bulk.append("{}\n"); + bulk.append("\r\n"); + + bulkRequest.setJsonEntity(bulk.toString()); + + expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 37a33eab4e4e8..c4bc4ebbfae52 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -159,6 +159,7 @@ import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction; import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.bulk.SimulateBulkAction; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; @@ -447,6 +448,7 @@ public class ActionModule extends AbstractModule { private final List actionPlugins; private final Map> actions; private final ActionFilters actionFilters; + private final IncrementalBulkService bulkService; private final AutoCreateIndex autoCreateIndex; private final DestructiveOperations destructiveOperations; private final RestController restController; @@ -475,7 +477,8 @@ public ActionModule( ClusterService clusterService, RerouteService rerouteService, List> reservedStateHandlers, - RestExtension restExtension + RestExtension restExtension, + IncrementalBulkService bulkService ) { this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -487,6 +490,7 @@ public ActionModule( this.threadPool = threadPool; actions = setupActions(actionPlugins); actionFilters = setupActionFilters(actionPlugins); + this.bulkService = bulkService; autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices); destructiveOperations = new DestructiveOperations(settings, clusterSettings); Set headers = Stream.concat( @@ -927,7 +931,7 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< registerHandler.accept(new RestCountAction()); registerHandler.accept(new RestTermVectorsAction()); registerHandler.accept(new RestMultiTermVectorsAction()); - registerHandler.accept(new RestBulkAction(settings)); + registerHandler.accept(new RestBulkAction(settings, bulkService)); registerHandler.accept(new RestUpdateAction()); registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature)); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index 898bfd0e1652c..0835a43d94ccc 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -85,13 +85,13 @@ public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiV .withRestApiVersion(restApiVersion); } - private static int findNextMarker(byte marker, int from, BytesReference data) { + private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) { final int res = data.indexOf(marker, from); if (res != -1) { assert res >= 0; return res; } - if (from != data.length()) { + if (from != data.length() && isIncremental == false) { throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]"); } return res; @@ -136,18 +136,57 @@ public void parse( Consumer updateRequestConsumer, Consumer deleteRequestConsumer ) throws IOException { - XContent xContent = xContentType.xContent(); - int line = 0; - int from = 0; - byte marker = xContent.bulkSeparator(); // Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to // deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request. final Map stringDeduplicator = new HashMap<>(); + + incrementalParse( + data, + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + xContentType, + indexRequestConsumer, + updateRequestConsumer, + deleteRequestConsumer, + false, + stringDeduplicator + ); + } + + public int incrementalParse( + BytesReference data, + String defaultIndex, + String defaultRouting, + FetchSourceContext defaultFetchSourceContext, + String defaultPipeline, + Boolean defaultRequireAlias, + Boolean defaultRequireDataStream, + Boolean defaultListExecutedPipelines, + boolean allowExplicitIndex, + XContentType xContentType, + BiConsumer indexRequestConsumer, + Consumer updateRequestConsumer, + Consumer deleteRequestConsumer, + boolean isIncremental, + Map stringDeduplicator + ) throws IOException { + XContent xContent = xContentType.xContent(); + byte marker = xContent.bulkSeparator(); boolean typesDeprecationLogged = false; + int line = 0; + int from = 0; + int consumed = 0; + while (true) { - int nextMarker = findNextMarker(marker, from, data); + int nextMarker = findNextMarker(marker, from, data, isIncremental); if (nextMarker == -1) { break; } @@ -332,8 +371,9 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) ); + consumed = from; } else { - nextMarker = findNextMarker(marker, from, data); + nextMarker = findNextMarker(marker, from, data, isIncremental); if (nextMarker == -1) { break; } @@ -406,9 +446,11 @@ public void parse( } // move pointers from = nextMarker + 1; + consumed = from; } } } + return isIncremental ? consumed : from; } @UpdateForV9 diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index c3ea345398c96..27ad99b012829 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -13,25 +13,56 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.rest.action.document.RestBulkAction; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; public class IncrementalBulkService { private final Client client; private final IndexingPressure indexingPressure; + private final ThreadContext threadContext; + private final Supplier enabled; - public IncrementalBulkService(Client client, IndexingPressure indexingPressure) { + public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) { + this(client, indexingPressure, threadContext, new Enabled()); + } + + public IncrementalBulkService( + Client client, + IndexingPressure indexingPressure, + ThreadContext threadContext, + ClusterSettings clusterSettings + ) { + this(client, indexingPressure, threadContext, new Enabled(clusterSettings)); + } + + public IncrementalBulkService( + Client client, + IndexingPressure indexingPressure, + ThreadContext threadContext, + Supplier enabled + ) { this.client = client; this.indexingPressure = indexingPressure; + this.threadContext = threadContext; + this.enabled = enabled; + } + + public boolean incrementalBulkEnabled() { + return enabled.get(); } public Handler newBulkRequest() { @@ -39,14 +70,32 @@ public Handler newBulkRequest() { } public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { - return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh); + return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh); } - public static class Handler { + public static class Enabled implements Supplier { + + private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true); + + public Enabled() {} + + public Enabled(ClusterSettings clusterSettings) { + incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK)); + clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set); + } + + @Override + public Boolean get() { + return incrementalBulksEnabled.get(); + } + } + + public static class Handler implements Releasable { public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true); private final Client client; + private final ThreadContext threadContext; private final IndexingPressure indexingPressure; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; @@ -56,17 +105,21 @@ public static class Handler { private final ArrayList responses = new ArrayList<>(2); private boolean globalFailure = false; private boolean incrementalRequestSubmitted = false; + private ThreadContext.StoredContext requestContext; private Exception bulkActionLevelFailure = null; private BulkRequest bulkRequest = null; - private Handler( + protected Handler( Client client, + ThreadContext threadContext, IndexingPressure indexingPressure, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh ) { this.client = client; + this.threadContext = threadContext; + this.requestContext = threadContext.newStoredContext(); this.indexingPressure = indexingPressure; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; @@ -84,30 +137,34 @@ public void addItems(List> items, Releasable releasable, Runn if (shouldBackOff()) { final boolean isFirstRequest = incrementalRequestSubmitted == false; incrementalRequestSubmitted = true; - - client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { - - @Override - public void onResponse(BulkResponse bulkResponse) { - responses.add(bulkResponse); - releaseCurrentReferences(); - createNewBulkRequest( - new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true) - ); - } - - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - } - }, nextItems)); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + requestContext.restore(); + client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { + + @Override + public void onResponse(BulkResponse bulkResponse) { + responses.add(bulkResponse); + releaseCurrentReferences(); + createNewBulkRequest( + new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true) + ); + } + + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + } + }, () -> { + requestContext = threadContext.newStoredContext(); + nextItems.run(); + })); + } } else { nextItems.run(); } } else { nextItems.run(); } - } } @@ -122,23 +179,26 @@ public void lastItems(List> items, Releasable releasable, Act } else { assert bulkRequest != null; if (internalAddItems(items, releasable)) { - client.bulk(bulkRequest, new ActionListener<>() { + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + requestContext.restore(); + client.bulk(bulkRequest, new ActionListener<>() { - private final boolean isFirstRequest = incrementalRequestSubmitted == false; + private final boolean isFirstRequest = incrementalRequestSubmitted == false; - @Override - public void onResponse(BulkResponse bulkResponse) { - responses.add(bulkResponse); - releaseCurrentReferences(); - listener.onResponse(combineResponses()); - } + @Override + public void onResponse(BulkResponse bulkResponse) { + responses.add(bulkResponse); + releaseCurrentReferences(); + listener.onResponse(combineResponses()); + } - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - errorResponse(listener); - } - }); + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + errorResponse(listener); + } + }); + } } else { errorResponse(listener); } @@ -239,5 +299,10 @@ private BulkResponse combineResponses() { return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis); } + + @Override + public void close() { + // TODO: Implement + } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index d2b731c3ab9ce..d8c162e20eff0 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -112,6 +112,7 @@ import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchService; @@ -241,6 +242,7 @@ public void apply(Settings value, Settings current, Settings previous) { Metadata.SETTING_READ_ONLY_SETTING, Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING, ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE, + RestBulkAction.INCREMENTAL_BULK, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index fdfc1d2698243..d755586e23fdc 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -889,6 +889,14 @@ private void construct( .map(TerminationHandlerProvider::handler); terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null); + final IndexingPressure indexingLimits = new IndexingPressure(settings); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService( + client, + indexingLimits, + threadPool.getThreadContext(), + clusterService.getClusterSettings() + ); + ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -914,7 +922,8 @@ private void construct( metadataCreateIndexService, dataStreamGlobalRetentionSettings ), - pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll) + pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll), + incrementalBulkService ); modules.add(actionModule); @@ -977,8 +986,6 @@ private void construct( SearchExecutionStatsCollector.makeWrapper(responseCollectorService) ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); - final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits); SnapshotsService snapshotsService = new SnapshotsService( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index d213d4410c07c..27be8110919b0 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -10,21 +10,38 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkRequestParser; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; +import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -39,12 +56,21 @@ */ @ServerlessScope(Scope.PUBLIC) public class RestBulkAction extends BaseRestHandler { + public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; + public static final Setting INCREMENTAL_BULK = boolSetting( + "rest.incremental_bulk", + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); private final boolean allowExplicitIndex; + private final IncrementalBulkService bulkHandler; - public RestBulkAction(Settings settings) { + public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); + this.bulkHandler = bulkHandler; } @Override @@ -66,38 +92,181 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { - request.param("type"); + if (bulkHandler.incrementalBulkEnabled() == false) { + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { + request.param("type"); + } + BulkRequest bulkRequest = new BulkRequest(); + String defaultIndex = request.param("index"); + String defaultRouting = request.param("routing"); + FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); + String defaultPipeline = request.param("pipeline"); + boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + } + Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); + boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); + bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); + bulkRequest.setRefreshPolicy(request.param("refresh")); + bulkRequest.add( + request.requiredContent(), + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + request.getXContentType(), + request.getRestApiVersion() + ); + + return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel)); + } else { + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { + request.param("type"); + } + + String waitForActiveShards = request.param("wait_for_active_shards"); + TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); + String refresh = request.param("refresh"); + return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh)); } - BulkRequest bulkRequest = new BulkRequest(); - String defaultIndex = request.param("index"); - String defaultRouting = request.param("routing"); - FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); - String defaultPipeline = request.param("pipeline"); - boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); - String waitForActiveShards = request.param("wait_for_active_shards"); - if (waitForActiveShards != null) { - bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + } + + static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { + + private final boolean allowExplicitIndex; + private final RestRequest request; + + private final Map stringDeduplicator = new HashMap<>(); + private final String defaultIndex; + private final String defaultRouting; + private final FetchSourceContext defaultFetchSourceContext; + private final String defaultPipeline; + private final boolean defaultListExecutedPipelines; + private final Boolean defaultRequireAlias; + private final boolean defaultRequireDataStream; + private final BulkRequestParser parser; + private final Supplier handlerSupplier; + private IncrementalBulkService.Handler handler; + + private volatile RestChannel restChannel; + private boolean isException; + private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); + private final ArrayList> items = new ArrayList<>(4); + + ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier handlerSupplier) { + this.allowExplicitIndex = allowExplicitIndex; + this.request = request; + this.defaultIndex = request.param("index"); + this.defaultRouting = request.param("routing"); + this.defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); + this.defaultPipeline = request.param("pipeline"); + this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); + this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); + this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); + // TODO: Fix type deprecation logging + this.parser = new BulkRequestParser(false, request.getRestApiVersion()); + this.handlerSupplier = handlerSupplier; } - Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); - boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); - bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); - bulkRequest.setRefreshPolicy(request.param("refresh")); - bulkRequest.add( - request.requiredContent(), - defaultIndex, - defaultRouting, - defaultFetchSourceContext, - defaultPipeline, - defaultRequireAlias, - defaultRequireDataStream, - defaultListExecutedPipelines, - allowExplicitIndex, - request.getXContentType(), - request.getRestApiVersion() - ); - return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel)); + @Override + public void accept(RestChannel restChannel) { + this.restChannel = restChannel; + this.handler = handlerSupplier.get(); + request.contentStream().next(); + } + + @Override + public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { + assert handler != null; + assert channel == restChannel; + if (isException) { + chunk.close(); + return; + } + + final BytesReference data; + int bytesConsumed; + try { + unParsedChunks.add(chunk); + + if (unParsedChunks.size() > 1) { + data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0])); + } else { + data = chunk; + } + + // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in + // BulkRequest#add is fine + bytesConsumed = parser.incrementalParse( + data, + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + request.getXContentType(), + (request, type) -> items.add(request), + items::add, + items::add, + isLast == false, + stringDeduplicator + ); + + } catch (Exception e) { + // TODO: This needs to be better + Releasables.close(handler); + Releasables.close(unParsedChunks); + unParsedChunks.clear(); + new RestToXContentListener<>(channel).onFailure(e); + isException = true; + return; + } + + final ArrayList releasables = accountParsing(bytesConsumed); + if (isLast) { + assert unParsedChunks.isEmpty(); + assert channel != null; + ArrayList> toPass = new ArrayList<>(items); + items.clear(); + handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); + } else if (items.isEmpty() == false) { + ArrayList> toPass = new ArrayList<>(items); + items.clear(); + handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next()); + } else { + assert releasables.isEmpty(); + request.contentStream().next(); + } + } + + @Override + public void close() { + RequestBodyChunkConsumer.super.close(); + } + + private ArrayList accountParsing(int bytesConsumed) { + ArrayList releasables = new ArrayList<>(unParsedChunks.size()); + while (bytesConsumed > 0) { + ReleasableBytesReference reference = unParsedChunks.removeFirst(); + releasables.add(reference); + if (bytesConsumed >= reference.length()) { + bytesConsumed -= reference.length(); + } else { + unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed)); + bytesConsumed = 0; + } + } + return releasables; + } } @Override diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index c015dc6177cad..d12fc0517c47c 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.action; import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.internal.node.NodeClient; @@ -128,7 +129,8 @@ public void testSetupRestHandlerContainsKnownBuiltin() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) ); actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -192,7 +194,8 @@ public String getName() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null)); assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET")); @@ -249,7 +252,8 @@ public List getRestHandlers( mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) ); actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -299,7 +303,8 @@ public void test3rdPartyHandlerIsNotInstalled() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) ) ); assertThat( @@ -340,7 +345,8 @@ public void test3rdPartyRestControllerIsNotInstalled() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) ) ); assertThat( diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index 26087ce5f1f0b..88aa6cff31411 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -14,6 +14,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionModule; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -1176,7 +1177,8 @@ public Collection getRestHeaders() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) ); } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index caeb0f36a1000..66c6660226ab0 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -10,23 +10,37 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpNodeClient; +import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xcontent.XContentType; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; @@ -50,7 +64,10 @@ public void bulk(BulkRequest request, ActionListener listener) { }; final Map params = new HashMap<>(); params.put("pipeline", "timestamps"); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( + new RestBulkAction( + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY), () -> false) + ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} {"field1":"val1"} @@ -82,7 +99,15 @@ public void bulk(BulkRequest request, ActionListener listener) { }; Map params = new HashMap<>(); { - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( + new RestBulkAction( + settings(IndexVersion.current()).build(), + new IncrementalBulkService( + mock(Client.class), + mock(IndexingPressure.class), + new ThreadContext(Settings.EMPTY), + () -> false + ) + ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) .withContent(new BytesArray(""" @@ -103,7 +128,15 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.put("list_executed_pipelines", "true"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( + new RestBulkAction( + settings(IndexVersion.current()).build(), + new IncrementalBulkService( + mock(Client.class), + mock(IndexingPressure.class), + new ThreadContext(Settings.EMPTY), + () -> false + ) + ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) .withContent(new BytesArray(""" @@ -123,7 +156,15 @@ public void bulk(BulkRequest request, ActionListener listener) { } { bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( + new RestBulkAction( + settings(IndexVersion.current()).build(), + new IncrementalBulkService( + mock(Client.class), + mock(IndexingPressure.class), + new ThreadContext(Settings.EMPTY), + () -> false + ) + ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) .withContent(new BytesArray(""" @@ -144,7 +185,15 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.remove("list_executed_pipelines"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( + new RestBulkAction( + settings(IndexVersion.current()).build(), + new IncrementalBulkService( + mock(Client.class), + mock(IndexingPressure.class), + new ThreadContext(Settings.EMPTY), + () -> false + ) + ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) .withContent(new BytesArray(""" @@ -164,4 +213,95 @@ public void bulk(BulkRequest request, ActionListener listener) { } } } + + public void testIncrementalParsing() { + ArrayList> docs = new ArrayList<>(); + AtomicBoolean isLast = new AtomicBoolean(false); + AtomicBoolean next = new AtomicBoolean(false); + + FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withMethod(RestRequest.Method.POST) + .withBody(new HttpBody.Stream() { + @Override + public void close() {} + + @Override + public ChunkHandler handler() { + return null; + } + + @Override + public void setHandler(ChunkHandler chunkHandler) {} + + @Override + public void next() { + next.set(true); + } + }) + .withHeaders(Map.of("Content-Type", Collections.singletonList("application/json"))) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + + RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler( + true, + request, + () -> new IncrementalBulkService.Handler(null, new ThreadContext(Settings.EMPTY), null, null, null, null) { + + @Override + public void addItems(List> items, Releasable releasable, Runnable nextItems) { + releasable.close(); + docs.addAll(items); + } + + @Override + public void lastItems(List> items, Releasable releasable, ActionListener listener) { + releasable.close(); + docs.addAll(items); + isLast.set(true); + } + } + ); + + chunkHandler.accept(channel); + ReleasableBytesReference r1 = new ReleasableBytesReference(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n"), () -> {}); + chunkHandler.handleChunk(channel, r1, false); + assertThat(docs, empty()); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + + ReleasableBytesReference r2 = new ReleasableBytesReference(new BytesArray("{\"field\":1}"), () -> {}); + chunkHandler.handleChunk(channel, r2, false); + assertThat(docs, empty()); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + assertTrue(r1.hasReferences()); + assertTrue(r2.hasReferences()); + + ReleasableBytesReference r3 = new ReleasableBytesReference(new BytesArray("\n{\"delete\":"), () -> {}); + chunkHandler.handleChunk(channel, r3, false); + assertThat(docs, hasSize(1)); + assertFalse(next.get()); + assertFalse(isLast.get()); + assertFalse(r1.hasReferences()); + assertFalse(r2.hasReferences()); + assertTrue(r3.hasReferences()); + + ReleasableBytesReference r4 = new ReleasableBytesReference(new BytesArray("{\"_index\":\"test\",\"_id\":\"2\"}}"), () -> {}); + chunkHandler.handleChunk(channel, r4, false); + assertThat(docs, hasSize(1)); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + + ReleasableBytesReference r5 = new ReleasableBytesReference(new BytesArray("\n"), () -> {}); + chunkHandler.handleChunk(channel, r5, true); + assertThat(docs, hasSize(2)); + assertFalse(next.get()); + assertTrue(isLast.get()); + assertFalse(r3.hasReferences()); + assertFalse(r4.hasReferences()); + assertFalse(r5.hasReferences()); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index bcea7f61fe158..88d3a0fd3b45e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -228,6 +228,11 @@ public Builder withContent(BytesReference contentBytes, XContentType xContentTyp return this; } + public Builder withBody(HttpBody body) { + this.content = body; + return this; + } + public Builder withPath(String path) { this.path = path; return this; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index a07a7a3a5dd27..8d580f10e5137 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -822,7 +823,8 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) ); actionModule.initRestHandlers(null, null);