From b5a40549b67a56a0549795e650b0318df43c79e0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 23 Aug 2024 11:00:14 -0400 Subject: [PATCH 01/22] Incremental bulk integration with rest layer WIP --- .../netty4/Netty4HttpRequestBodyStream.java | 2 +- .../netty4/Netty4HttpServerTransport.java | 5 +- .../action/bulk/BulkRequestParser.java | 45 ++++- .../action/bulk/IncrementalBulkService.java | 7 +- .../rest/action/document/RestBulkAction.java | 164 ++++++++++++++---- 5 files changed, 187 insertions(+), 36 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 8497e3ee8a40d..e43d29d7da2ee 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -28,7 +28,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final Channel channel; private final Queue chunkQueue = new ArrayDeque<>(); - private boolean requested = false; + private boolean requested = true; private boolean hasLast = false; private HttpBody.ChunkHandler handler; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 024391af46b62..5fa73147429c6 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -365,7 +365,10 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { ); } // combines the HTTP message pieces into a single full HTTP request (with headers and body) - final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength()); + final HttpObjectAggregator aggregator = new Netty4HttpAggregator( + handlingSettings.maxContentLength(), + httpPreRequest -> httpPreRequest.uri().startsWith("/_bulk") == false + ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() .addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index 898bfd0e1652c..25d760b707d20 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -136,16 +136,52 @@ public void parse( Consumer updateRequestConsumer, Consumer deleteRequestConsumer ) throws IOException { - XContent xContent = xContentType.xContent(); - int line = 0; - int from = 0; - byte marker = xContent.bulkSeparator(); // Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to // deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request. final Map stringDeduplicator = new HashMap<>(); + + incrementalParse( + data, + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + xContentType, + indexRequestConsumer, + updateRequestConsumer, + deleteRequestConsumer, + stringDeduplicator + ); + } + + public int incrementalParse( + BytesReference data, + String defaultIndex, + String defaultRouting, + FetchSourceContext defaultFetchSourceContext, + String defaultPipeline, + Boolean defaultRequireAlias, + Boolean defaultRequireDataStream, + Boolean defaultListExecutedPipelines, + boolean allowExplicitIndex, + XContentType xContentType, + BiConsumer indexRequestConsumer, + Consumer updateRequestConsumer, + Consumer deleteRequestConsumer, + Map stringDeduplicator + ) throws IOException { + XContent xContent = xContentType.xContent(); + byte marker = xContent.bulkSeparator(); boolean typesDeprecationLogged = false; + int line = 0; + int from = 0; + while (true) { int nextMarker = findNextMarker(marker, from, data); if (nextMarker == -1) { @@ -409,6 +445,7 @@ public void parse( } } } + return from; } @UpdateForV9 diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index ce91586cc4426..26514feda6029 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -36,7 +36,7 @@ public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable Ti return new Handler(client, waitForActiveShards, timeout, refresh); } - public static class Handler { + public static class Handler implements Releasable { private final Client client; private final ActiveShardCount waitForActiveShards; @@ -204,5 +204,10 @@ private BulkResponse combineResponses() { return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis); } + + @Override + public void close() { + // TODO: Implement + } } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index d213d4410c07c..7429dd689ae37 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -9,13 +9,17 @@ package org.elasticsearch.rest.action.document; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkRequestParser; import org.elasticsearch.action.bulk.BulkShardRequest; -import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; @@ -23,7 +27,12 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -42,6 +51,7 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; private final boolean allowExplicitIndex; + private volatile IncrementalBulkService bulkHandler; public RestBulkAction(Settings settings) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); @@ -66,38 +76,133 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + // TODO: Move this to CTOR and hook everything up + if (bulkHandler == null) { + bulkHandler = new IncrementalBulkService(client); + } + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { request.param("type"); } - BulkRequest bulkRequest = new BulkRequest(); - String defaultIndex = request.param("index"); - String defaultRouting = request.param("routing"); - FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); - String defaultPipeline = request.param("pipeline"); - boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); - String waitForActiveShards = request.param("wait_for_active_shards"); - if (waitForActiveShards != null) { - bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + + return new ChunkHandler(request); + } + + private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { + + private final RestRequest request; + + private final Map stringDeduplicator = new HashMap<>(); + private final String defaultIndex; + private final String defaultRouting; + private final FetchSourceContext defaultFetchSourceContext; + private final String defaultPipeline; + private final boolean defaultListExecutedPipelines; + private final Boolean defaultRequireAlias; + private final boolean defaultRequireDataStream; + private final BulkRequestParser parser; + private final IncrementalBulkService.Handler handler; + + private volatile RestChannel restChannel; + private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); + private final ArrayList> items = new ArrayList<>(4); + + private ChunkHandler(RestRequest request) { + this.request = request; + this.defaultIndex = request.param("index"); + this.defaultRouting = request.param("routing"); + this.defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); + this.defaultPipeline = request.param("pipeline"); + this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); + this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); + this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); + this.parser = new BulkRequestParser(true, request.getRestApiVersion()); + handler = bulkHandler.newBulkRequest( + request.param("wait_for_active_shards"), + request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), + request.param("refresh") + ); + } + + @Override + public void accept(RestChannel restChannel) throws Exception { + this.restChannel = restChannel; } - Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); - boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); - bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); - bulkRequest.setRefreshPolicy(request.param("refresh")); - bulkRequest.add( - request.requiredContent(), - defaultIndex, - defaultRouting, - defaultFetchSourceContext, - defaultPipeline, - defaultRequireAlias, - defaultRequireDataStream, - defaultListExecutedPipelines, - allowExplicitIndex, - request.getXContentType(), - request.getRestApiVersion() - ); - return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel)); + @Override + public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { + assert channel == restChannel; + + final ReleasableBytesReference data; + try { + // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in + // BulkRequest#add is fine + + unParsedChunks.add(chunk); + + if (unParsedChunks.size() > 1) { + ReleasableBytesReference[] bytesReferences = unParsedChunks.toArray(new ReleasableBytesReference[0]); + data = new ReleasableBytesReference( + CompositeBytesReference.of(bytesReferences), + () -> Releasables.close(bytesReferences) + ); + } else { + data = chunk; + } + + int bytesConsumed = parser.incrementalParse( + data, + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + request.getXContentType(), + (request, type) -> items.add(request), + items::add, + items::add, + stringDeduplicator + ); + + accountParsing(bytesConsumed); + + } catch (IOException e) { + // TODO: Exception Handling + throw new UncheckedIOException(e); + } + + if (isLast) { + assert unParsedChunks.isEmpty(); + assert channel != null; + handler.lastItems(new ArrayList<>(items), data, new RestRefCountedChunkedToXContentListener<>(channel)); + items.clear(); + } else if (items.isEmpty() == false) { + handler.addItems(new ArrayList<>(items), data, () -> request.contentStream().next()); + items.clear(); + } + } + + @Override + public void close() { + Releasables.close(handler); + Releasables.close(unParsedChunks); + RequestBodyChunkConsumer.super.close(); + } + + private void accountParsing(int bytesConsumed) { + while (bytesConsumed > 0) { + ReleasableBytesReference reference = unParsedChunks.removeFirst(); + if (bytesConsumed >= reference.length()) { + bytesConsumed -= reference.length(); + } else { + unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed)); + bytesConsumed = 0; + } + } + } } @Override @@ -107,6 +212,7 @@ public boolean supportsBulkContent() { @Override public boolean allowsUnsafeBuffers() { + // TODO: Does this change with the chunking? return true; } } From a75b31d47127e6477e9b1db22c50017f5350f5f8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 11:33:34 -0600 Subject: [PATCH 02/22] Change --- .../elasticsearch/http/netty4/Netty4HttpServerTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 5fa73147429c6..66d36a8b4f835 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -367,7 +367,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { // combines the HTTP message pieces into a single full HTTP request (with headers and body) final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), - httpPreRequest -> httpPreRequest.uri().startsWith("/_bulk") == false + httpPreRequest -> httpPreRequest.uri().contains("/_bulk") == false ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() From e26df4aebdc31f912890fb536aab13783fd5352f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 12:25:43 -0600 Subject: [PATCH 03/22] Header --- .../elasticsearch/action/ActionModule.java | 2 +- .../action/bulk/IncrementalBulkService.java | 19 ++++++++++++++++--- .../elasticsearch/node/NodeConstruction.java | 2 +- .../rest/action/document/RestBulkAction.java | 9 ++++++++- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 37a33eab4e4e8..57d39b17a44cd 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -927,7 +927,7 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< registerHandler.accept(new RestCountAction()); registerHandler.accept(new RestTermVectorsAction()); registerHandler.accept(new RestMultiTermVectorsAction()); - registerHandler.accept(new RestBulkAction(settings)); + registerHandler.accept(new RestBulkAction(settings, threadPool.getThreadContext())); registerHandler.accept(new RestUpdateAction()); registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature)); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 26514feda6029..32a41a40f995b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -23,9 +24,11 @@ public class IncrementalBulkService { private final Client client; + private final ThreadContext threadContext; - public IncrementalBulkService(Client client) { + public IncrementalBulkService(Client client, ThreadContext threadContext) { this.client = client; + this.threadContext = threadContext; } public Handler newBulkRequest() { @@ -33,12 +36,13 @@ public Handler newBulkRequest() { } public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { - return new Handler(client, waitForActiveShards, timeout, refresh); + return new Handler(client, threadContext, waitForActiveShards, timeout, refresh); } public static class Handler implements Releasable { private final Client client; + private final ThreadContext threadContext; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; private final String refresh; @@ -50,8 +54,15 @@ public static class Handler implements Releasable { private Exception bulkActionLevelFailure = null; private BulkRequest bulkRequest = null; - private Handler(Client client, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { + private Handler( + Client client, + ThreadContext threadContext, + @Nullable String waitForActiveShards, + @Nullable TimeValue timeout, + @Nullable String refresh + ) { this.client = client; + this.threadContext = threadContext; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; @@ -103,6 +114,8 @@ public void lastItems(List> items, Releasable releasable, Act assert bulkRequest != null; internalAddItems(items, releasable); + threadContext.addResponseHeader("X-elastic-product", "Elasticsearch"); + client.bulk(bulkRequest, new ActionListener<>() { private final boolean isFirstRequest = incrementalRequestSubmitted == false; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 2f76daa7e42ed..b59d1d23fea92 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -965,7 +965,7 @@ record PluginServiceInstances( ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); SnapshotsService snapshotsService = new SnapshotsService( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 7429dd689ae37..65634aca6f387 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; @@ -51,10 +52,16 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; private final boolean allowExplicitIndex; + private final ThreadContext threadContext; private volatile IncrementalBulkService bulkHandler; public RestBulkAction(Settings settings) { + this(settings, new ThreadContext(settings)); + } + + public RestBulkAction(Settings settings, ThreadContext threadContext) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); + this.threadContext = threadContext; } @Override @@ -78,7 +85,7 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { // TODO: Move this to CTOR and hook everything up if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client); + bulkHandler = new IncrementalBulkService(client, threadContext); } if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { From fbfcbb577baced276ed228e0d5d4487473176ab9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 14:41:44 -0600 Subject: [PATCH 04/22] Change --- .../http/IncrementalBulkRestIT.java | 55 +++++++++++++++++++ .../action/bulk/BulkRequestParser.java | 10 ++-- .../action/bulk/IncrementalBulkService.java | 3 +- .../rest/action/document/RestBulkAction.java | 7 ++- 4 files changed, 68 insertions(+), 7 deletions(-) create mode 100644 qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java new file mode 100644 index 0000000000000..ebe35689d93f9 --- /dev/null +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.rest.RestStatus.OK; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) +public class IncrementalBulkRestIT extends HttpSmokeTestCase { + + @SuppressWarnings("unchecked") + public void testIndexingPressureStats() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); + successfulIndexingRequest.setJsonEntity(""" + { "index" : { "_index" : "index_name" } } + { "field1" : "value1" } + """); + final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + Map responseMap = XContentHelper.convertToMap( + JsonXContent.jsonXContent, + indexSuccessFul.getEntity().getContent(), + true + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index 25d760b707d20..baecc6e2bde33 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -85,13 +85,13 @@ public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiV .withRestApiVersion(restApiVersion); } - private static int findNextMarker(byte marker, int from, BytesReference data) { + private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) { final int res = data.indexOf(marker, from); if (res != -1) { assert res >= 0; return res; } - if (from != data.length()) { + if (from != data.length() && isIncremental == false) { throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]"); } return res; @@ -155,6 +155,7 @@ public void parse( indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer, + false, stringDeduplicator ); } @@ -173,6 +174,7 @@ public int incrementalParse( BiConsumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer, + boolean isIncremental, Map stringDeduplicator ) throws IOException { XContent xContent = xContentType.xContent(); @@ -183,7 +185,7 @@ public int incrementalParse( int from = 0; while (true) { - int nextMarker = findNextMarker(marker, from, data); + int nextMarker = findNextMarker(marker, from, data, isIncremental); if (nextMarker == -1) { break; } @@ -369,7 +371,7 @@ public int incrementalParse( .setIfPrimaryTerm(ifPrimaryTerm) ); } else { - nextMarker = findNextMarker(marker, from, data); + nextMarker = findNextMarker(marker, from, data, isIncremental); if (nextMarker == -1) { break; } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 32a41a40f995b..ea9cffc472202 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -124,7 +124,8 @@ public void lastItems(List> items, Releasable releasable, Act public void onResponse(BulkResponse bulkResponse) { responses.add(bulkResponse); releaseCurrentReferences(); - listener.onResponse(combineResponses()); + BulkResponse response = combineResponses(); + listener.onResponse(response); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 65634aca6f387..6ea8d1abced43 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -84,8 +84,10 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { // TODO: Move this to CTOR and hook everything up - if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client, threadContext); + synchronized (this) { + if (bulkHandler == null) { + bulkHandler = new IncrementalBulkService(client, threadContext); + } } if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { @@ -171,6 +173,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo (request, type) -> items.add(request), items::add, items::add, + isLast == false, stringDeduplicator ); From 3c8d8c058548506d45206dced8b0319c0b93def2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 15:58:13 -0600 Subject: [PATCH 05/22] Try_no_split --- .../http/IncrementalBulkRestIT.java | 49 ++++++++++++++++++- .../action/bulk/IncrementalBulkService.java | 2 +- .../rest/action/document/RestBulkAction.java | 45 ++++++++++------- 3 files changed, 76 insertions(+), 20 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index ebe35689d93f9..52d114224450c 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -40,9 +40,56 @@ public void testIndexingPressureStats() throws IOException { assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); + successfulIndexingRequest.setJsonEntity(""" { "index" : { "_index" : "index_name" } } - { "field1" : "value1" } + { "field" : "value1" } + { "index" : { "_index" : "index_name" } } + { "field" : "value2" } + { "index" : { "_index" : "index_name" } } + { "field" : "value3" } + { "index" : { "_index" : "index_name" } } + { "field" : "value4" } + { "index" : { "_index" : "index_name" } } + { "field" : "value5" } + { "index" : { "_index" : "index_name" } } + { "field" : "value6" } + { "index" : { "_index" : "index_name" } } + { "field" : "value7" } + { "index" : { "_index" : "index_name" } } + { "field" : "value8" } + { "index" : { "_index" : "index_name" } } + { "field" : "value9" } + { "index" : { "_index" : "index_name" } } + { "field" : "value10" } + { "index" : { "_index" : "index_name" } } + { "field" : "value11" } + { "index" : { "_index" : "index_name" } } + { "field" : "value12" } + { "index" : { "_index" : "index_name" } } + { "field" : "value13" } + { "index" : { "_index" : "index_name" } } + { "field" : "value14" } + { "index" : { "_index" : "index_name" } } + { "field" : "value15" } + { "index" : { "_index" : "index_name" } } + { "field" : "value16" } + { "index" : { "_index" : "index_name" } } + { "field" : "value17" } + { "index" : { "_index" : "index_name" } } + { "field" : "value18" } + { "index" : { "_index" : "index_name" } } + { "field" : "value19" } + { "index" : { "_index" : "index_name" } } + { "field" : "value20" } + { "index" : { "_index" : "index_name" } } + { "field" : "value21" } + { "index" : { "_index" : "index_name" } } + { "field" : "value22" } + { "index" : { "_index" : "index_name" } } + { "field" : "value23" } + { "index" : { "_index" : "index_name" } } + { "field" : "value24" } """); final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index ea9cffc472202..d2c64e62e4e00 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -103,7 +103,7 @@ public void onFailure(Exception e) { private boolean shouldBackOff() { // TODO: Implement Real Memory Logic - return bulkRequest.requests().size() >= 16; + return false; } public void lastItems(List> items, Releasable releasable, ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 6ea8d1abced43..055a23a5fa546 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -13,10 +13,12 @@ import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; @@ -94,11 +96,20 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC request.param("type"); } - return new ChunkHandler(request); + return new ChunkHandler( + allowExplicitIndex, + request, + bulkHandler.newBulkRequest( + request.param("wait_for_active_shards"), + request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), + request.param("refresh") + ) + ); } - private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { + private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { + private final boolean allowExplicitIndex; private final RestRequest request; private final Map stringDeduplicator = new HashMap<>(); @@ -116,7 +127,8 @@ private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); private final ArrayList> items = new ArrayList<>(4); - private ChunkHandler(RestRequest request) { + private ChunkHandler(boolean allowExplicitIndex, RestRequest request, IncrementalBulkService.Handler handler) { + this.allowExplicitIndex = allowExplicitIndex; this.request = request; this.defaultIndex = request.param("index"); this.defaultRouting = request.param("routing"); @@ -126,11 +138,7 @@ private ChunkHandler(RestRequest request) { this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); this.parser = new BulkRequestParser(true, request.getRestApiVersion()); - handler = bulkHandler.newBulkRequest( - request.param("wait_for_active_shards"), - request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), - request.param("refresh") - ); + this.handler = handler; } @Override @@ -142,7 +150,8 @@ public void accept(RestChannel restChannel) throws Exception { public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { assert channel == restChannel; - final ReleasableBytesReference data; + final BytesReference data; + final Releasable releasable; try { // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in // BulkRequest#add is fine @@ -150,11 +159,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo unParsedChunks.add(chunk); if (unParsedChunks.size() > 1) { - ReleasableBytesReference[] bytesReferences = unParsedChunks.toArray(new ReleasableBytesReference[0]); - data = new ReleasableBytesReference( - CompositeBytesReference.of(bytesReferences), - () -> Releasables.close(bytesReferences) - ); + data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0])); } else { data = chunk; } @@ -177,7 +182,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo stringDeduplicator ); - accountParsing(bytesConsumed); + releasable = accountParsing(bytesConsumed); } catch (IOException e) { // TODO: Exception Handling @@ -187,10 +192,10 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo if (isLast) { assert unParsedChunks.isEmpty(); assert channel != null; - handler.lastItems(new ArrayList<>(items), data, new RestRefCountedChunkedToXContentListener<>(channel)); + handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel)); items.clear(); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), data, () -> request.contentStream().next()); + handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next()); items.clear(); } } @@ -202,16 +207,20 @@ public void close() { RequestBodyChunkConsumer.super.close(); } - private void accountParsing(int bytesConsumed) { + private Releasable accountParsing(int bytesConsumed) { + ArrayList releasables = new ArrayList<>(unParsedChunks.size()); while (bytesConsumed > 0) { ReleasableBytesReference reference = unParsedChunks.removeFirst(); + releasables.add(reference); if (bytesConsumed >= reference.length()) { bytesConsumed -= reference.length(); + releasables.add(reference); } else { unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed)); bytesConsumed = 0; } } + return () -> Releasables.close(releasables); } } From 2b3bcd51d337af9d64de6239b04e1a430bc90cd3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 16:49:42 -0600 Subject: [PATCH 06/22] Change --- .../common/settings/ClusterSettings.java | 2 + .../rest/action/document/RestBulkAction.java | 82 +++++++++--- .../action/document/RestBulkActionTests.java | 118 +++++++++--------- 3 files changed, 126 insertions(+), 76 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index c023b00ec820f..127e967742e9b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -111,6 +111,7 @@ import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchService; @@ -243,6 +244,7 @@ public void apply(Settings value, Settings current, Settings previous) { Metadata.SETTING_READ_ONLY_SETTING, Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING, ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE, + RestBulkAction.INCREMENTAL_BULK, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 055a23a5fa546..fc0dda1b45fe1 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -9,13 +9,16 @@ package org.elasticsearch.rest.action.document; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestParser; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.IncrementalBulkService; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; @@ -37,6 +40,7 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -51,9 +55,12 @@ */ @ServerlessScope(Scope.PUBLIC) public class RestBulkAction extends BaseRestHandler { + public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; + public static final Setting INCREMENTAL_BULK = boolSetting("rest.incremental_bulk", true, Setting.Property.NodeScope); private final boolean allowExplicitIndex; + private final boolean incrementalBulk; private final ThreadContext threadContext; private volatile IncrementalBulkService bulkHandler; @@ -64,6 +71,7 @@ public RestBulkAction(Settings settings) { public RestBulkAction(Settings settings, ThreadContext threadContext) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.threadContext = threadContext; + this.incrementalBulk = INCREMENTAL_BULK.get(settings); } @Override @@ -85,26 +93,61 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - // TODO: Move this to CTOR and hook everything up - synchronized (this) { - if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client, threadContext); + if (incrementalBulk == false) { + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { + request.param("type"); } - } + BulkRequest bulkRequest = new BulkRequest(); + String defaultIndex = request.param("index"); + String defaultRouting = request.param("routing"); + FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); + String defaultPipeline = request.param("pipeline"); + boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + } + Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); + boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); + bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); + bulkRequest.setRefreshPolicy(request.param("refresh")); + bulkRequest.add( + request.requiredContent(), + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + request.getXContentType(), + request.getRestApiVersion() + ); - if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { - request.param("type"); - } + return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel)); + } else { + // TODO: Move this to CTOR and hook everything up + synchronized (this) { + if (bulkHandler == null) { + bulkHandler = new IncrementalBulkService(client, threadContext); + } + } - return new ChunkHandler( - allowExplicitIndex, - request, - bulkHandler.newBulkRequest( - request.param("wait_for_active_shards"), - request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), - request.param("refresh") - ) - ); + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { + request.param("type"); + } + + return new ChunkHandler( + allowExplicitIndex, + request, + bulkHandler.newBulkRequest( + request.param("wait_for_active_shards"), + request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), + request.param("refresh") + ) + ); + } } private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { @@ -195,8 +238,10 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel)); items.clear(); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next()); + handler.addItems(new ArrayList<>(items), releasable, () -> { request.contentStream().next(); }); items.clear(); + } else { + releasable.close(); } } @@ -214,7 +259,6 @@ private Releasable accountParsing(int bytesConsumed) { releasables.add(reference); if (bytesConsumed >= reference.length()) { bytesConsumed -= reference.length(); - releasables.add(reference); } else { unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed)); bytesConsumed = 0; diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index caeb0f36a1000..838f96b25e099 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -50,7 +50,7 @@ public void bulk(BulkRequest request, ActionListener listener) { }; final Map params = new HashMap<>(); params.put("pipeline", "timestamps"); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} {"field1":"val1"} @@ -82,20 +82,21 @@ public void bulk(BulkRequest request, ActionListener listener) { }; Map params = new HashMap<>(); { - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(false)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(false)); @@ -103,40 +104,42 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.put("list_executed_pipelines", "true"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(true)); } { bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1", "list_executed_pipelines": "false"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1", "list_executed_pipelines": "false"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(false)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(true)); @@ -144,20 +147,21 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.remove("list_executed_pipelines"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1", "list_executed_pipelines": "true"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1", "list_executed_pipelines": "true"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(false)); From 101108676a5be32677833cd07128fca2936fa383 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 16:59:13 -0600 Subject: [PATCH 07/22] Changes --- .../action/bulk/IncrementalBulkService.java | 25 +++++++++++-------- .../elasticsearch/node/NodeConstruction.java | 2 +- .../rest/action/document/RestBulkAction.java | 3 ++- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index d2c64e62e4e00..28c92e5bcd5c4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -24,25 +24,28 @@ public class IncrementalBulkService { private final Client client; - private final ThreadContext threadContext; - public IncrementalBulkService(Client client, ThreadContext threadContext) { + public IncrementalBulkService(Client client) { this.client = client; - this.threadContext = threadContext; } public Handler newBulkRequest() { - return newBulkRequest(null, null, null); + return newBulkRequest(() -> {}, null, null, null); } - public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { - return new Handler(client, threadContext, waitForActiveShards, timeout, refresh); + public Handler newBulkRequest( + ThreadContext.StoredContext storedContext, + @Nullable String waitForActiveShards, + @Nullable TimeValue timeout, + @Nullable String refresh + ) { + return new Handler(client, storedContext, waitForActiveShards, timeout, refresh); } public static class Handler implements Releasable { private final Client client; - private final ThreadContext threadContext; + private final ThreadContext.StoredContext storedContext; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; private final String refresh; @@ -56,13 +59,13 @@ public static class Handler implements Releasable { private Handler( Client client, - ThreadContext threadContext, + ThreadContext.StoredContext storedContext, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh ) { this.client = client; - this.threadContext = threadContext; + this.storedContext = storedContext; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; @@ -81,6 +84,7 @@ public void addItems(List> items, Releasable releasable, Runn final boolean isFirstRequest = incrementalRequestSubmitted == false; incrementalRequestSubmitted = true; + storedContext.restore(); client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { @Override @@ -114,8 +118,7 @@ public void lastItems(List> items, Releasable releasable, Act assert bulkRequest != null; internalAddItems(items, releasable); - threadContext.addResponseHeader("X-elastic-product", "Elasticsearch"); - + storedContext.restore(); client.bulk(bulkRequest, new ActionListener<>() { private final boolean isFirstRequest = incrementalRequestSubmitted == false; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index b59d1d23fea92..2f76daa7e42ed 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -965,7 +965,7 @@ record PluginServiceInstances( ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client); SnapshotsService snapshotsService = new SnapshotsService( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index fc0dda1b45fe1..ee923daa6b877 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -130,7 +130,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // TODO: Move this to CTOR and hook everything up synchronized (this) { if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client, threadContext); + bulkHandler = new IncrementalBulkService(client); } } @@ -142,6 +142,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC allowExplicitIndex, request, bulkHandler.newBulkRequest( + threadContext.newStoredContext(), request.param("wait_for_active_shards"), request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), request.param("refresh") From 247efaf548788e07ee9ae97fade8daed443fe27a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 17:25:37 -0600 Subject: [PATCH 08/22] Change --- .../http/IncrementalBulkRestIT.java | 63 +++----------- .../action/bulk/BulkRequestParser.java | 4 +- .../action/bulk/IncrementalBulkService.java | 83 ++++++++++--------- .../elasticsearch/node/NodeConstruction.java | 2 +- .../rest/action/document/RestBulkAction.java | 2 +- 5 files changed, 63 insertions(+), 91 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 52d114224450c..d4c23d211511e 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -24,7 +24,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase { @SuppressWarnings("unchecked") - public void testIndexingPressureStats() throws IOException { + public void testIncrementalBulk() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" { @@ -41,56 +41,17 @@ public void testIndexingPressureStats() throws IOException { Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); - successfulIndexingRequest.setJsonEntity(""" - { "index" : { "_index" : "index_name" } } - { "field" : "value1" } - { "index" : { "_index" : "index_name" } } - { "field" : "value2" } - { "index" : { "_index" : "index_name" } } - { "field" : "value3" } - { "index" : { "_index" : "index_name" } } - { "field" : "value4" } - { "index" : { "_index" : "index_name" } } - { "field" : "value5" } - { "index" : { "_index" : "index_name" } } - { "field" : "value6" } - { "index" : { "_index" : "index_name" } } - { "field" : "value7" } - { "index" : { "_index" : "index_name" } } - { "field" : "value8" } - { "index" : { "_index" : "index_name" } } - { "field" : "value9" } - { "index" : { "_index" : "index_name" } } - { "field" : "value10" } - { "index" : { "_index" : "index_name" } } - { "field" : "value11" } - { "index" : { "_index" : "index_name" } } - { "field" : "value12" } - { "index" : { "_index" : "index_name" } } - { "field" : "value13" } - { "index" : { "_index" : "index_name" } } - { "field" : "value14" } - { "index" : { "_index" : "index_name" } } - { "field" : "value15" } - { "index" : { "_index" : "index_name" } } - { "field" : "value16" } - { "index" : { "_index" : "index_name" } } - { "field" : "value17" } - { "index" : { "_index" : "index_name" } } - { "field" : "value18" } - { "index" : { "_index" : "index_name" } } - { "field" : "value19" } - { "index" : { "_index" : "index_name" } } - { "field" : "value20" } - { "index" : { "_index" : "index_name" } } - { "field" : "value21" } - { "index" : { "_index" : "index_name" } } - { "field" : "value22" } - { "index" : { "_index" : "index_name" } } - { "field" : "value23" } - { "index" : { "_index" : "index_name" } } - { "field" : "value24" } - """); + + // index documents for the rollup job + final StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); + bulk.append("{\"field\":").append(i).append("}\n"); + } + bulk.append("\r\n"); + + successfulIndexingRequest.setJsonEntity(bulk.toString()); + final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); Map responseMap = XContentHelper.convertToMap( diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index baecc6e2bde33..f1d8cfc8436da 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -183,6 +183,7 @@ public int incrementalParse( int line = 0; int from = 0; + int consumed = 0; while (true) { int nextMarker = findNextMarker(marker, from, data, isIncremental); @@ -444,10 +445,11 @@ public int incrementalParse( } // move pointers from = nextMarker + 1; + consumed = from; } } } - return from; + return isIncremental ? consumed : from; } @UpdateForV9 diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 28c92e5bcd5c4..96402ed9c415d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -24,9 +24,11 @@ public class IncrementalBulkService { private final Client client; + private final ThreadContext threadContext; - public IncrementalBulkService(Client client) { + public IncrementalBulkService(Client client, ThreadContext threadContext) { this.client = client; + this.threadContext = threadContext; } public Handler newBulkRequest() { @@ -39,13 +41,14 @@ public Handler newBulkRequest( @Nullable TimeValue timeout, @Nullable String refresh ) { - return new Handler(client, storedContext, waitForActiveShards, timeout, refresh); + return new Handler(client, threadContext, storedContext, waitForActiveShards, timeout, refresh); } public static class Handler implements Releasable { private final Client client; - private final ThreadContext.StoredContext storedContext; + private final ThreadContext threadContext; + private final ThreadContext.StoredContext requestContext; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; private final String refresh; @@ -59,13 +62,15 @@ public static class Handler implements Releasable { private Handler( Client client, - ThreadContext.StoredContext storedContext, + ThreadContext threadContext, + ThreadContext.StoredContext requestContext, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh ) { this.client = client; - this.storedContext = storedContext; + this.threadContext = threadContext; + this.requestContext = requestContext; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; @@ -84,21 +89,23 @@ public void addItems(List> items, Releasable releasable, Runn final boolean isFirstRequest = incrementalRequestSubmitted == false; incrementalRequestSubmitted = true; - storedContext.restore(); - client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { - - @Override - public void onResponse(BulkResponse bulkResponse) { - responses.add(bulkResponse); - releaseCurrentReferences(); - createNewBulkRequest(bulkResponse.getIncrementalState()); - } - - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - } - }, nextItems::run)); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + requestContext.restore(); + client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { + + @Override + public void onResponse(BulkResponse bulkResponse) { + responses.add(bulkResponse); + releaseCurrentReferences(); + createNewBulkRequest(bulkResponse.getIncrementalState()); + } + + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + } + }, nextItems)); + } } else { nextItems.run(); } @@ -107,7 +114,7 @@ public void onFailure(Exception e) { private boolean shouldBackOff() { // TODO: Implement Real Memory Logic - return false; + return bulkRequest.requests().size() >= 16; } public void lastItems(List> items, Releasable releasable, ActionListener listener) { @@ -118,25 +125,27 @@ public void lastItems(List> items, Releasable releasable, Act assert bulkRequest != null; internalAddItems(items, releasable); - storedContext.restore(); - client.bulk(bulkRequest, new ActionListener<>() { + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + requestContext.restore(); + client.bulk(bulkRequest, new ActionListener<>() { - private final boolean isFirstRequest = incrementalRequestSubmitted == false; + private final boolean isFirstRequest = incrementalRequestSubmitted == false; - @Override - public void onResponse(BulkResponse bulkResponse) { - responses.add(bulkResponse); - releaseCurrentReferences(); - BulkResponse response = combineResponses(); - listener.onResponse(response); - } + @Override + public void onResponse(BulkResponse bulkResponse) { + responses.add(bulkResponse); + releaseCurrentReferences(); + BulkResponse response = combineResponses(); + listener.onResponse(response); + } - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - errorResponse(listener); - } - }); + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + errorResponse(listener); + } + }); + } } } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 2f76daa7e42ed..b59d1d23fea92 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -965,7 +965,7 @@ record PluginServiceInstances( ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); SnapshotsService snapshotsService = new SnapshotsService( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index ee923daa6b877..5639e1270a3f6 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -130,7 +130,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // TODO: Move this to CTOR and hook everything up synchronized (this) { if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client); + bulkHandler = new IncrementalBulkService(client, threadContext); } } From 861c27fd4f2fe2537d0ab61671c1a9cb8245de16 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 17:58:46 -0600 Subject: [PATCH 09/22] Changes --- .../http/netty4/Netty4HttpServerTransport.java | 2 +- .../http/IncrementalBulkRestIT.java | 1 - .../org/elasticsearch/action/ActionModule.java | 4 +++- .../action/bulk/BulkRequestParser.java | 1 + .../action/bulk/IncrementalBulkService.java | 11 +++-------- .../org/elasticsearch/node/NodeConstruction.java | 8 +++++--- .../rest/action/document/RestBulkAction.java | 1 - .../elasticsearch/action/ActionModuleTests.java | 16 +++++++++++----- .../http/AbstractHttpServerTransportTests.java | 4 +++- .../xpack/security/SecurityTests.java | 4 +++- 10 files changed, 30 insertions(+), 22 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 66d36a8b4f835..bca6f224d9682 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -367,7 +367,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { // combines the HTTP message pieces into a single full HTTP request (with headers and body) final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), - httpPreRequest -> httpPreRequest.uri().contains("/_bulk") == false + httpPreRequest -> httpPreRequest.uri().contains("_bulk") == false ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index d4c23d211511e..07c9c933630a9 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -41,7 +41,6 @@ public void testIncrementalBulk() throws IOException { Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); - // index documents for the rollup job final StringBuilder bulk = new StringBuilder(); for (int i = 0; i < 1000; i++) { diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 57d39b17a44cd..c9b89c7ac0e7d 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -159,6 +159,7 @@ import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction; import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.bulk.SimulateBulkAction; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; @@ -475,7 +476,8 @@ public ActionModule( ClusterService clusterService, RerouteService rerouteService, List> reservedStateHandlers, - RestExtension restExtension + RestExtension restExtension, + IncrementalBulkService bulkService ) { this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index f1d8cfc8436da..0835a43d94ccc 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -371,6 +371,7 @@ public int incrementalParse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) ); + consumed = from; } else { nextMarker = findNextMarker(marker, from, data, isIncremental); if (nextMarker == -1) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 96402ed9c415d..7c2dd3c4f9a76 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -32,16 +32,11 @@ public IncrementalBulkService(Client client, ThreadContext threadContext) { } public Handler newBulkRequest() { - return newBulkRequest(() -> {}, null, null, null); + return newBulkRequest(null, null, null); } - public Handler newBulkRequest( - ThreadContext.StoredContext storedContext, - @Nullable String waitForActiveShards, - @Nullable TimeValue timeout, - @Nullable String refresh - ) { - return new Handler(client, threadContext, storedContext, waitForActiveShards, timeout, refresh); + public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { + return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh); } public static class Handler implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index b59d1d23fea92..b9e49c8373f58 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -876,6 +876,9 @@ record PluginServiceInstances( .map(TerminationHandlerProvider::handler); terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null); + final IndexingPressure indexingLimits = new IndexingPressure(settings); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); + ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -901,7 +904,8 @@ record PluginServiceInstances( metadataCreateIndexService, dataStreamGlobalRetentionSettings ), - pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll) + pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll), + incrementalBulkService ); modules.add(actionModule); @@ -964,8 +968,6 @@ record PluginServiceInstances( SearchExecutionStatsCollector.makeWrapper(responseCollectorService) ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); - final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); SnapshotsService snapshotsService = new SnapshotsService( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 5639e1270a3f6..fc0dda1b45fe1 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -142,7 +142,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC allowExplicitIndex, request, bulkHandler.newBulkRequest( - threadContext.newStoredContext(), request.param("wait_for_active_shards"), request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), request.param("refresh") diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index c015dc6177cad..21c863481aa99 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.action; import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.internal.node.NodeClient; @@ -128,7 +129,8 @@ public void testSetupRestHandlerContainsKnownBuiltin() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, new ThreadContext(Settings.EMPTY)) ); actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -192,7 +194,8 @@ public String getName() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, new ThreadContext(Settings.EMPTY)) ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null)); assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET")); @@ -249,7 +252,8 @@ public List getRestHandlers( mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, new ThreadContext(Settings.EMPTY)) ); actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -299,7 +303,8 @@ public void test3rdPartyHandlerIsNotInstalled() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, new ThreadContext(Settings.EMPTY)) ) ); assertThat( @@ -340,7 +345,8 @@ public void test3rdPartyRestControllerIsNotInstalled() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, new ThreadContext(Settings.EMPTY)) ) ); assertThat( diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index 26087ce5f1f0b..5e20dba152cff 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -14,6 +14,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionModule; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -1176,7 +1177,8 @@ public Collection getRestHeaders() { mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, new ThreadContext(Settings.EMPTY)) ); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index a07a7a3a5dd27..13d40660b72f8 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -822,7 +823,8 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc mock(ClusterService.class), null, List.of(), - RestExtension.allowAll() + RestExtension.allowAll(), + new IncrementalBulkService(null, new ThreadContext(Settings.EMPTY)) ); actionModule.initRestHandlers(null, null); From d3bbac1cb2e99ea38cbd6a4e25545629d3898241 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 18:34:34 -0600 Subject: [PATCH 10/22] Changes --- .../http/IncrementalBulkRestIT.java | 35 ++++- .../elasticsearch/action/ActionModule.java | 4 +- .../rest/action/document/RestBulkAction.java | 35 ++--- .../action/document/RestBulkActionTests.java | 137 ++++++++++-------- 4 files changed, 127 insertions(+), 84 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 07c9c933630a9..663964223840c 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.json.JsonXContent; @@ -23,7 +24,6 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) public class IncrementalBulkRestIT extends HttpSmokeTestCase { - @SuppressWarnings("unchecked") public void testIncrementalBulk() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" @@ -59,4 +59,37 @@ public void testIncrementalBulk() throws IOException { true ); } + + public void testIncrementalMalformed() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); + + // index documents for the rollup job + final StringBuilder bulk = new StringBuilder(); + bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); + bulk.append("{\"field\":1}\n"); + bulk.append("{}\n"); + bulk.append("\r\n"); + + successfulIndexingRequest.setJsonEntity(bulk.toString()); + + ResponseException responseException = expectThrows( + ResponseException.class, + () -> getRestClient().performRequest(successfulIndexingRequest) + ); + + } } diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index c9b89c7ac0e7d..c4bc4ebbfae52 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -448,6 +448,7 @@ public class ActionModule extends AbstractModule { private final List actionPlugins; private final Map> actions; private final ActionFilters actionFilters; + private final IncrementalBulkService bulkService; private final AutoCreateIndex autoCreateIndex; private final DestructiveOperations destructiveOperations; private final RestController restController; @@ -489,6 +490,7 @@ public ActionModule( this.threadPool = threadPool; actions = setupActions(actionPlugins); actionFilters = setupActionFilters(actionPlugins); + this.bulkService = bulkService; autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices); destructiveOperations = new DestructiveOperations(settings, clusterSettings); Set headers = Stream.concat( @@ -929,7 +931,7 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< registerHandler.accept(new RestCountAction()); registerHandler.accept(new RestTermVectorsAction()); registerHandler.accept(new RestMultiTermVectorsAction()); - registerHandler.accept(new RestBulkAction(settings, threadPool.getThreadContext())); + registerHandler.accept(new RestBulkAction(settings, bulkService)); registerHandler.accept(new RestUpdateAction()); registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature)); diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index fc0dda1b45fe1..cd6d6f283f892 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; @@ -30,10 +29,10 @@ import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; +import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; @@ -61,16 +60,11 @@ public class RestBulkAction extends BaseRestHandler { private final boolean allowExplicitIndex; private final boolean incrementalBulk; - private final ThreadContext threadContext; - private volatile IncrementalBulkService bulkHandler; + private final IncrementalBulkService bulkHandler; - public RestBulkAction(Settings settings) { - this(settings, new ThreadContext(settings)); - } - - public RestBulkAction(Settings settings, ThreadContext threadContext) { + public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); - this.threadContext = threadContext; + this.bulkHandler = bulkHandler; this.incrementalBulk = INCREMENTAL_BULK.get(settings); } @@ -127,13 +121,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel)); } else { - // TODO: Move this to CTOR and hook everything up - synchronized (this) { - if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client, threadContext); - } - } - if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { request.param("type"); } @@ -167,6 +154,7 @@ private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkCon private final IncrementalBulkService.Handler handler; private volatile RestChannel restChannel; + private boolean isException; private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); private final ArrayList> items = new ArrayList<>(4); @@ -185,13 +173,17 @@ private ChunkHandler(boolean allowExplicitIndex, RestRequest request, Incrementa } @Override - public void accept(RestChannel restChannel) throws Exception { + public void accept(RestChannel restChannel) { this.restChannel = restChannel; } @Override public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { assert channel == restChannel; + if (isException) { + chunk.close(); + return; + } final BytesReference data; final Releasable releasable; @@ -227,9 +219,10 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo releasable = accountParsing(bytesConsumed); - } catch (IOException e) { - // TODO: Exception Handling - throw new UncheckedIOException(e); + } catch (Exception e) { + new RestToXContentListener<>(channel).onFailure(e); + isException = true; + return; } if (isLast) { diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index 838f96b25e099..c2d3414a77c40 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -12,9 +12,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; @@ -50,7 +54,10 @@ public void bulk(BulkRequest request, ActionListener listener) { }; final Map params = new HashMap<>(); params.put("pipeline", "timestamps"); - new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()).handleRequest( + new RestBulkAction( + settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} {"field1":"val1"} @@ -82,21 +89,23 @@ public void bulk(BulkRequest request, ActionListener listener) { }; Map params = new HashMap<>(); { - new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) - .handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction( + settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + ).handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(false)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(false)); @@ -104,42 +113,46 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.put("list_executed_pipelines", "true"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) - .handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction( + settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + ).handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(true)); } { bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) - .handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1", "list_executed_pipelines": "false"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction( + settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + ).handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1", "list_executed_pipelines": "false"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(false)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(true)); @@ -147,21 +160,23 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.remove("list_executed_pipelines"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) - .handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1", "list_executed_pipelines": "true"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction( + settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + ).handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1", "list_executed_pipelines": "true"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(false)); From 1bdb92d9078f3a1d05c558ba6f5952a7917f6e21 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 18:57:48 -0600 Subject: [PATCH 11/22] Change --- .../http/IncrementalBulkRestIT.java | 10 +++----- .../action/bulk/IncrementalBulkService.java | 3 +-- .../rest/action/document/RestBulkAction.java | 23 +++++++++---------- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 663964223840c..94db419f5a7f9 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -75,7 +75,7 @@ public void testIncrementalMalformed() throws IOException { final Response indexCreatedResponse = getRestClient().performRequest(createRequest); assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); - Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); + Request bulkRequest = new Request("POST", "/index_name/_bulk"); // index documents for the rollup job final StringBuilder bulk = new StringBuilder(); @@ -84,12 +84,8 @@ public void testIncrementalMalformed() throws IOException { bulk.append("{}\n"); bulk.append("\r\n"); - successfulIndexingRequest.setJsonEntity(bulk.toString()); - - ResponseException responseException = expectThrows( - ResponseException.class, - () -> getRestClient().performRequest(successfulIndexingRequest) - ); + bulkRequest.setJsonEntity(bulk.toString()); + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 7c2dd3c4f9a76..a7ba5980260d3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -130,8 +130,7 @@ public void lastItems(List> items, Releasable releasable, Act public void onResponse(BulkResponse bulkResponse) { responses.add(bulkResponse); releaseCurrentReferences(); - BulkResponse response = combineResponses(); - listener.onResponse(response); + listener.onResponse(combineResponses()); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index cd6d6f283f892..be2acbaefdc3b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -186,11 +186,8 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo } final BytesReference data; - final Releasable releasable; + int bytesConsumed; try { - // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in - // BulkRequest#add is fine - unParsedChunks.add(chunk); if (unParsedChunks.size() > 1) { @@ -199,7 +196,9 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo data = chunk; } - int bytesConsumed = parser.incrementalParse( + // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in + // BulkRequest#add is fine + bytesConsumed = parser.incrementalParse( data, defaultIndex, defaultRouting, @@ -217,21 +216,24 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo stringDeduplicator ); - releasable = accountParsing(bytesConsumed); - } catch (Exception e) { + // TODO: This needs to be better + Releasables.close(handler); + Releasables.close(unParsedChunks); + unParsedChunks.clear(); new RestToXContentListener<>(channel).onFailure(e); isException = true; return; } + final Releasable releasable = accountParsing(bytesConsumed); if (isLast) { assert unParsedChunks.isEmpty(); assert channel != null; handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel)); items.clear(); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), releasable, () -> { request.contentStream().next(); }); + handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next()); items.clear(); } else { releasable.close(); @@ -240,8 +242,6 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo @Override public void close() { - Releasables.close(handler); - Releasables.close(unParsedChunks); RequestBodyChunkConsumer.super.close(); } @@ -268,7 +268,6 @@ public boolean supportsBulkContent() { @Override public boolean allowsUnsafeBuffers() { - // TODO: Does this change with the chunking? - return true; + return false; } } From 6fa810937f9b12939d22a6046ccc3a34f480f152 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 19:36:03 -0600 Subject: [PATCH 12/22] More changes --- .../action/bulk/IncrementalBulkService.java | 2 +- .../rest/action/document/RestBulkAction.java | 21 +++--- .../action/document/RestBulkActionTests.java | 64 +++++++++++++++++++ .../test/rest/FakeRestRequest.java | 5 ++ 4 files changed, 83 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index a7ba5980260d3..4a735884fd244 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -55,7 +55,7 @@ public static class Handler implements Releasable { private Exception bulkActionLevelFailure = null; private BulkRequest bulkRequest = null; - private Handler( + protected Handler( Client client, ThreadContext threadContext, ThreadContext.StoredContext requestContext, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index be2acbaefdc3b..b3766cd071c69 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -137,7 +137,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } } - private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { + static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { private final boolean allowExplicitIndex; private final RestRequest request; @@ -158,7 +158,7 @@ private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkCon private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); private final ArrayList> items = new ArrayList<>(4); - private ChunkHandler(boolean allowExplicitIndex, RestRequest request, IncrementalBulkService.Handler handler) { + ChunkHandler(boolean allowExplicitIndex, RestRequest request, IncrementalBulkService.Handler handler) { this.allowExplicitIndex = allowExplicitIndex; this.request = request; this.defaultIndex = request.param("index"); @@ -226,17 +226,22 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo return; } - final Releasable releasable = accountParsing(bytesConsumed); + final ArrayList releasables = accountParsing(bytesConsumed); if (isLast) { assert unParsedChunks.isEmpty(); assert channel != null; - handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel)); + handler.lastItems( + new ArrayList<>(items), + () -> Releasables.close(releasables), + new RestRefCountedChunkedToXContentListener<>(channel) + ); items.clear(); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next()); + handler.addItems(new ArrayList<>(items), () -> Releasables.close(releasables), () -> request.contentStream().next()); items.clear(); } else { - releasable.close(); + assert releasables.isEmpty(); + request.contentStream().next(); } } @@ -245,7 +250,7 @@ public void close() { RequestBodyChunkConsumer.super.close(); } - private Releasable accountParsing(int bytesConsumed) { + private ArrayList accountParsing(int bytesConsumed) { ArrayList releasables = new ArrayList<>(unParsedChunks.size()); while (bytesConsumed > 0) { ReleasableBytesReference reference = unParsedChunks.removeFirst(); @@ -257,7 +262,7 @@ private Releasable accountParsing(int bytesConsumed) { bytesConsumed = 0; } } - return () -> Releasables.close(releasables); + return releasables; } } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index c2d3414a77c40..f12a9d461cd4b 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.IncrementalBulkService; @@ -17,20 +18,28 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpNodeClient; +import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xcontent.XContentType; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; @@ -183,4 +192,59 @@ public void bulk(BulkRequest request, ActionListener listener) { } } } + + public void testIncrementalParsing() { + ArrayList> docs = new ArrayList<>(); + + FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withMethod(RestRequest.Method.POST) + .withBody(new HttpBody.Stream() { + @Override + public ChunkHandler handler() { + return null; + } + + @Override + public void setHandler(ChunkHandler chunkHandler) { + + } + + @Override + public void next() { + + } + }) + .withHeaders(Map.of("Content-Type", Collections.singletonList("application/json"))) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + + RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler( + true, + request, + new IncrementalBulkService.Handler(null, null, null, null, null, null) { + + @Override + public void addItems(List> items, Releasable releasable, Runnable nextItems) { + releasable.close(); + docs.addAll(items); + } + + @Override + public void lastItems(List> items, Releasable releasable, ActionListener listener) { + releasable.close(); + docs.addAll(items); + } + } + ); + + chunkHandler.accept(channel); + chunkHandler.handleChunk( + channel, + ReleasableBytesReference.wrap(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n")), + false + ); + assertThat(docs, empty()); + chunkHandler.handleChunk(channel, ReleasableBytesReference.wrap(new BytesArray("{\"field\":1}")), false); + assertThat(docs, empty()); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index bcea7f61fe158..88d3a0fd3b45e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -228,6 +228,11 @@ public Builder withContent(BytesReference contentBytes, XContentType xContentTyp return this; } + public Builder withBody(HttpBody body) { + this.content = body; + return this; + } + public Builder withPath(String path) { this.path = path; return this; From c007b5abb01c3267cbc32ea80bf4bd460c6d8e75 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 20:15:34 -0600 Subject: [PATCH 13/22] Change --- .../http/IncrementalBulkRestIT.java | 7 ++- .../action/document/RestBulkActionTests.java | 53 +++++++++++++++---- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 94db419f5a7f9..e530080863cc6 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; +import java.util.List; import java.util.Map; import static org.elasticsearch.rest.RestStatus.OK; @@ -24,6 +25,7 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) public class IncrementalBulkRestIT extends HttpSmokeTestCase { + @SuppressWarnings("unchecked") public void testIncrementalBulk() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" @@ -58,6 +60,9 @@ public void testIncrementalBulk() throws IOException { indexSuccessFul.getEntity().getContent(), true ); + + assertFalse((Boolean) responseMap.get("errors")); + assertThat(((List) responseMap.get("items")).size(), equalTo(1000)); } public void testIncrementalMalformed() throws IOException { @@ -86,6 +91,6 @@ public void testIncrementalMalformed() throws IOException { bulkRequest.setJsonEntity(bulk.toString()); - ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); + expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); } } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index f12a9d461cd4b..c6140cadfd375 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -195,6 +195,8 @@ public void bulk(BulkRequest request, ActionListener listener) { public void testIncrementalParsing() { ArrayList> docs = new ArrayList<>(); + AtomicBoolean isLast = new AtomicBoolean(false); + AtomicBoolean next = new AtomicBoolean(false); FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withMethod(RestRequest.Method.POST) @@ -205,13 +207,11 @@ public ChunkHandler handler() { } @Override - public void setHandler(ChunkHandler chunkHandler) { - - } + public void setHandler(ChunkHandler chunkHandler) {} @Override public void next() { - + next.set(true); } }) .withHeaders(Map.of("Content-Type", Collections.singletonList("application/json"))) @@ -233,18 +233,51 @@ public void addItems(List> items, Releasable releasable, Runn public void lastItems(List> items, Releasable releasable, ActionListener listener) { releasable.close(); docs.addAll(items); + isLast.set(true); } } ); chunkHandler.accept(channel); - chunkHandler.handleChunk( - channel, - ReleasableBytesReference.wrap(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n")), - false - ); + ReleasableBytesReference r1 = new ReleasableBytesReference(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n"), () -> {}); + chunkHandler.handleChunk(channel, r1, false); assertThat(docs, empty()); - chunkHandler.handleChunk(channel, ReleasableBytesReference.wrap(new BytesArray("{\"field\":1}")), false); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + + ReleasableBytesReference r2 = new ReleasableBytesReference(new BytesArray("{\"field\":1}"), () -> {}); + chunkHandler.handleChunk(channel, r2, false); assertThat(docs, empty()); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + assertTrue(r1.hasReferences()); + assertTrue(r2.hasReferences()); + + ReleasableBytesReference r3 = new ReleasableBytesReference(new BytesArray("\n{\"delete\":"), () -> {}); + chunkHandler.handleChunk(channel, r3, false); + assertThat(docs, hasSize(1)); + assertFalse(next.get()); + assertFalse(isLast.get()); + assertFalse(r1.hasReferences()); + assertFalse(r2.hasReferences()); + assertTrue(r3.hasReferences()); + + ReleasableBytesReference r4 = new ReleasableBytesReference(new BytesArray("{\"_index\":\"test\",\"_id\":\"2\"}}"), () -> {}); + chunkHandler.handleChunk(channel, r4, false); + assertThat(docs, hasSize(1)); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + + ReleasableBytesReference r5 = new ReleasableBytesReference(new BytesArray("\n"), () -> {}); + chunkHandler.handleChunk(channel, r5, true); + assertThat(docs, hasSize(2)); + assertFalse(next.get()); + assertTrue(isLast.get()); + assertFalse(r3.hasReferences()); + assertFalse(r4.hasReferences()); + assertFalse(r5.hasReferences()); } } From 74d69cfb8ecf5a10ceb478d02e9fdee188d8a1cf Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 23:08:04 -0600 Subject: [PATCH 14/22] More --- .../netty4/Netty4HttpServerTransport.java | 12 +++++-- .../Netty4HttpServerTransportTests.java | 4 ++- .../action/bulk/IncrementalBulkService.java | 31 +++++++++++++++++++ .../elasticsearch/node/NodeConstruction.java | 6 +++- .../rest/action/document/RestBulkAction.java | 4 +-- 5 files changed, 49 insertions(+), 8 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index bca6f224d9682..8dc0169fcf630 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.ThreadWatchdog; @@ -96,6 +97,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final TLSConfig tlsConfig; private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate; private final HttpValidator httpValidator; + private final IncrementalBulkService.Enabled enabled; private final ThreadWatchdog threadWatchdog; private final int readTimeoutMillis; @@ -134,6 +136,7 @@ public Netty4HttpServerTransport( this.acceptChannelPredicate = acceptChannelPredicate; this.httpValidator = httpValidator; this.threadWatchdog = networkService.getThreadWatchdog(); + this.enabled = new IncrementalBulkService.Enabled(clusterSettings); this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); @@ -279,7 +282,7 @@ public void onException(HttpChannel channel, Exception cause) { } public ChannelHandler configureServerChannelHandler() { - return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator); + return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled); } static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel"); @@ -292,19 +295,22 @@ protected static class HttpChannelHandler extends ChannelInitializer { private final TLSConfig tlsConfig; private final BiPredicate acceptChannelPredicate; private final HttpValidator httpValidator; + private final IncrementalBulkService.Enabled enabled; protected HttpChannelHandler( final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings, final TLSConfig tlsConfig, @Nullable final BiPredicate acceptChannelPredicate, - @Nullable final HttpValidator httpValidator + @Nullable final HttpValidator httpValidator, + IncrementalBulkService.Enabled enabled ) { this.transport = transport; this.handlingSettings = handlingSettings; this.tlsConfig = tlsConfig; this.acceptChannelPredicate = acceptChannelPredicate; this.httpValidator = httpValidator; + this.enabled = enabled; } @Override @@ -367,7 +373,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { // combines the HTTP message pieces into a single full HTTP request (with headers and body) final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), - httpPreRequest -> httpPreRequest.uri().contains("_bulk") == false + httpPreRequest -> enabled.incrementalBulkEnabled() && httpPreRequest.uri().contains("_bulk") == false ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 9e213e6468356..ad25c35283cac 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchWrapperException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.Request; @@ -418,7 +419,8 @@ public ChannelHandler configureServerChannelHandler() { handlingSettings, TLSConfig.noTLS(), null, - randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null) + randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null), + new IncrementalBulkService.Enabled(clusterSettings) ) { @Override protected void initChannel(Channel ch) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 4a735884fd244..fe686c47d96f6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -12,23 +12,38 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.action.document.RestBulkAction; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class IncrementalBulkService { private final Client client; private final ThreadContext threadContext; + private final Enabled enabled; public IncrementalBulkService(Client client, ThreadContext threadContext) { this.client = client; this.threadContext = threadContext; + this.enabled = new Enabled(); + } + + public IncrementalBulkService(Client client, ThreadContext threadContext, ClusterSettings clusterSettings) { + this.client = client; + this.threadContext = threadContext; + this.enabled = new Enabled(clusterSettings); + } + + public boolean incrementalBulkEnabled() { + return enabled.incrementalBulkEnabled(); } public Handler newBulkRequest() { @@ -39,6 +54,22 @@ public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable Ti return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh); } + public static class Enabled { + + private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true); + + public Enabled() {} + + public Enabled(ClusterSettings clusterSettings) { + incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK)); + clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set); + } + + public boolean incrementalBulkEnabled() { + return incrementalBulksEnabled.get(); + } + } + public static class Handler implements Releasable { private final Client client; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index b9e49c8373f58..c52633aae5b97 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -877,7 +877,11 @@ record PluginServiceInstances( terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService( + client, + threadPool.getThreadContext(), + clusterService.getClusterSettings() + ); ActionModule actionModule = new ActionModule( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index b3766cd071c69..042bebfc3eb57 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -59,13 +59,11 @@ public class RestBulkAction extends BaseRestHandler { public static final Setting INCREMENTAL_BULK = boolSetting("rest.incremental_bulk", true, Setting.Property.NodeScope); private final boolean allowExplicitIndex; - private final boolean incrementalBulk; private final IncrementalBulkService bulkHandler; public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.bulkHandler = bulkHandler; - this.incrementalBulk = INCREMENTAL_BULK.get(settings); } @Override @@ -87,7 +85,7 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - if (incrementalBulk == false) { + if (bulkHandler.incrementalBulkEnabled() == false) { if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { request.param("type"); } From 13ef3f51b64de4652be7fdb822861189cc7d93d5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 28 Aug 2024 11:34:29 -0600 Subject: [PATCH 15/22] Fix --- .../elasticsearch/rest/action/document/RestBulkAction.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 042bebfc3eb57..7601bc9ba1d60 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -56,7 +56,12 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; - public static final Setting INCREMENTAL_BULK = boolSetting("rest.incremental_bulk", true, Setting.Property.NodeScope); + public static final Setting INCREMENTAL_BULK = boolSetting( + "rest.incremental_bulk", + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; From b303cc72c359cdd40384ee6fa0c5f5da1bb01af4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 28 Aug 2024 12:18:10 -0600 Subject: [PATCH 16/22] Change --- .../elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java | 2 +- .../org/elasticsearch/rest/action/document/RestBulkAction.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index e43d29d7da2ee..8497e3ee8a40d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -28,7 +28,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final Channel channel; private final Queue chunkQueue = new ArrayDeque<>(); - private boolean requested = true; + private boolean requested = false; private boolean hasLast = false; private HttpBody.ChunkHandler handler; diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 7601bc9ba1d60..8afdc3d7ab3a5 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -178,6 +178,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { @Override public void accept(RestChannel restChannel) { this.restChannel = restChannel; + request.contentStream().next(); } @Override @@ -276,6 +277,6 @@ public boolean supportsBulkContent() { @Override public boolean allowsUnsafeBuffers() { - return false; + return true; } } From 4b3aab502d88eeb2745956801ebe9a9c787ac174 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 28 Aug 2024 15:30:07 -0600 Subject: [PATCH 17/22] Fix --- .../http/netty4/Netty4HttpAggregator.java | 6 +--- .../netty4/Netty4HttpServerTransport.java | 2 +- .../http/IncrementalBulkRestIT.java | 33 +++++++++++++++---- .../action/bulk/IncrementalBulkService.java | 16 ++++++--- .../rest/action/document/RestBulkAction.java | 10 +++--- .../action/document/RestBulkActionTests.java | 23 +++++++------ 6 files changed, 57 insertions(+), 33 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java index 031e803737ee8..d7c5fc0963224 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -35,10 +35,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator { private boolean aggregating = true; private boolean ignoreContentAfterContinueResponse = false; - public Netty4HttpAggregator(int maxContentLength) { - this(maxContentLength, IGNORE_TEST); - } - public Netty4HttpAggregator(int maxContentLength, Predicate decider) { super(maxContentLength); this.decider = decider; @@ -49,7 +45,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception assert msg instanceof HttpObject; if (msg instanceof HttpRequest request) { var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request); - aggregating = decider.test(preReq); + aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq); } if (aggregating || msg instanceof FullHttpRequest) { super.channelRead(ctx, msg); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 8dc0169fcf630..8d9f132028578 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -373,7 +373,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { // combines the HTTP message pieces into a single full HTTP request (with headers and body) final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), - httpPreRequest -> enabled.incrementalBulkEnabled() && httpPreRequest.uri().contains("_bulk") == false + httpPreRequest -> enabled.get() && httpPreRequest.uri().contains("_bulk") == false ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index e530080863cc6..fcb0c1044d22f 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -41,28 +41,49 @@ public void testIncrementalBulk() throws IOException { final Response indexCreatedResponse = getRestClient().performRequest(createRequest); assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); - Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); + Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); + + // index documents for the rollup job + String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + + "{\"field\":1}\n" + + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n" + + "{\"field\":1}\n" + + "\r\n"; + + firstBulkRequest.setJsonEntity(bulkBody); + + final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request bulkRequest = new Request("POST", "/index_name/_bulk"); // index documents for the rollup job final StringBuilder bulk = new StringBuilder(); + bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"); + int updates = 0; for (int i = 0; i < 1000; i++) { bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); bulk.append("{\"field\":").append(i).append("}\n"); + if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) { + ++updates; + bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"); + bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n"); + } } bulk.append("\r\n"); - successfulIndexingRequest.setJsonEntity(bulk.toString()); + bulkRequest.setJsonEntity(bulk.toString()); - final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); - assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + final Response bulkResponse = getRestClient().performRequest(bulkRequest); + assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); Map responseMap = XContentHelper.convertToMap( JsonXContent.jsonXContent, - indexSuccessFul.getEntity().getContent(), + bulkResponse.getEntity().getContent(), true ); assertFalse((Boolean) responseMap.get("errors")); - assertThat(((List) responseMap.get("items")).size(), equalTo(1000)); + assertThat(((List) responseMap.get("items")).size(), equalTo(1001 + updates)); } public void testIncrementalMalformed() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index fe686c47d96f6..2eb681f012576 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -23,12 +23,13 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; public class IncrementalBulkService { private final Client client; private final ThreadContext threadContext; - private final Enabled enabled; + private final Supplier enabled; public IncrementalBulkService(Client client, ThreadContext threadContext) { this.client = client; @@ -37,13 +38,17 @@ public IncrementalBulkService(Client client, ThreadContext threadContext) { } public IncrementalBulkService(Client client, ThreadContext threadContext, ClusterSettings clusterSettings) { + this(client, threadContext, new Enabled(clusterSettings)); + } + + public IncrementalBulkService(Client client, ThreadContext threadContext, Supplier enabled) { this.client = client; this.threadContext = threadContext; - this.enabled = new Enabled(clusterSettings); + this.enabled = enabled; } public boolean incrementalBulkEnabled() { - return enabled.incrementalBulkEnabled(); + return enabled.get(); } public Handler newBulkRequest() { @@ -54,7 +59,7 @@ public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable Ti return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh); } - public static class Enabled { + public static class Enabled implements Supplier { private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true); @@ -65,7 +70,8 @@ public Enabled(ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set); } - public boolean incrementalBulkEnabled() { + @Override + public Boolean get() { return incrementalBulksEnabled.get(); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 8afdc3d7ab3a5..cddd595996530 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -234,15 +234,13 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo if (isLast) { assert unParsedChunks.isEmpty(); assert channel != null; - handler.lastItems( - new ArrayList<>(items), - () -> Releasables.close(releasables), - new RestRefCountedChunkedToXContentListener<>(channel) - ); + ArrayList> toPass = new ArrayList<>(items); items.clear(); + handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), () -> Releasables.close(releasables), () -> request.contentStream().next()); + ArrayList> toPass = new ArrayList<>(items); items.clear(); + handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next()); } else { assert releasables.isEmpty(); request.contentStream().next(); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index c6140cadfd375..5a6629a2bb0ea 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -64,8 +64,8 @@ public void bulk(BulkRequest request, ActionListener listener) { final Map params = new HashMap<>(); params.put("pipeline", "timestamps"); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} @@ -99,8 +99,8 @@ public void bulk(BulkRequest request, ActionListener listener) { Map params = new HashMap<>(); { new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -123,8 +123,8 @@ public void bulk(BulkRequest request, ActionListener listener) { params.put("list_executed_pipelines", "true"); bulkCalled.set(false); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -146,8 +146,8 @@ public void bulk(BulkRequest request, ActionListener listener) { { bulkCalled.set(false); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -170,8 +170,8 @@ public void bulk(BulkRequest request, ActionListener listener) { params.remove("list_executed_pipelines"); bulkCalled.set(false); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -201,6 +201,9 @@ public void testIncrementalParsing() { FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withMethod(RestRequest.Method.POST) .withBody(new HttpBody.Stream() { + @Override + public void close() {} + @Override public ChunkHandler handler() { return null; From aba3eb3a02e622b11860ac4f5fd2d14a0d56b7ea Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 28 Aug 2024 16:49:11 -0600 Subject: [PATCH 18/22] Fixes --- .../http/netty4/Netty4HttpRequestSizeLimitIT.java | 3 +++ .../elasticsearch/http/netty4/Netty4HttpServerTransport.java | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java index 0dadc159c41c7..12699442e23bf 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; @@ -51,6 +52,8 @@ protected boolean addMockHttpTransport() { protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal, otherSettings)) + // TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies + .put(RestBulkAction.INCREMENTAL_BULK.getKey(), false) .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT) .build(); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 8d9f132028578..f9b83fd68988b 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -373,7 +373,8 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { // combines the HTTP message pieces into a single full HTTP request (with headers and body) final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), - httpPreRequest -> enabled.get() && httpPreRequest.uri().contains("_bulk") == false + httpPreRequest -> enabled.get() == false + || (httpPreRequest.uri().contains("_bulk") == false || httpPreRequest.uri().contains("_bulk_update")) ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() From 3dc16ce6b4efe53db9e8286ae277a21e44768ac5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 28 Aug 2024 19:21:02 -0600 Subject: [PATCH 19/22] Awaits fix --- .../org/elasticsearch/test/rest/RequestsWithoutContentIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java index ba305333f2fb9..6fea09d249168 100644 --- a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java +++ b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java @@ -25,6 +25,7 @@ public void testIndexMissingBody() throws IOException { assertResponseException(responseException, "request body is required"); } + @AwaitsFix(bugUrl = "need to decide how to handle this scenario") public void testBulkMissingBody() throws IOException { ResponseException responseException = expectThrows( ResponseException.class, From 4c1b20c6c4d7bf0002c8fb1728e189298fd5dfcf Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Aug 2024 13:51:41 -0600 Subject: [PATCH 20/22] Fix --- .../elasticsearch/http/netty4/Netty4HttpServerTransport.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index f9b83fd68988b..4bb6370af3478 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -374,7 +374,9 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), httpPreRequest -> enabled.get() == false - || (httpPreRequest.uri().contains("_bulk") == false || httpPreRequest.uri().contains("_bulk_update")) + || (httpPreRequest.uri().contains("_bulk") == false + || httpPreRequest.uri().contains("_bulk_update") + || httpPreRequest.uri().contains("/_xpack/monitoring/_bulk")) ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() From 321377ad8c381805380909bc2d9ac5c48c026093 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Aug 2024 17:33:54 -0600 Subject: [PATCH 21/22] Fixes --- .../action/bulk/IncrementalBulkService.java | 21 +++++---------- .../rest/action/document/RestBulkAction.java | 27 ++++++++++--------- .../action/document/RestBulkActionTests.java | 2 +- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 333350a13aced..27ad99b012829 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -70,15 +70,7 @@ public Handler newBulkRequest() { } public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { - return new Handler( - client, - threadContext, - threadContext.newStoredContext(), - indexingPressure, - waitForActiveShards, - timeout, - refresh - ); + return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh); } public static class Enabled implements Supplier { @@ -104,7 +96,6 @@ public static class Handler implements Releasable { private final Client client; private final ThreadContext threadContext; - private final ThreadContext.StoredContext requestContext; private final IndexingPressure indexingPressure; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; @@ -114,13 +105,13 @@ public static class Handler implements Releasable { private final ArrayList responses = new ArrayList<>(2); private boolean globalFailure = false; private boolean incrementalRequestSubmitted = false; + private ThreadContext.StoredContext requestContext; private Exception bulkActionLevelFailure = null; private BulkRequest bulkRequest = null; protected Handler( Client client, ThreadContext threadContext, - ThreadContext.StoredContext requestContext, IndexingPressure indexingPressure, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @@ -128,7 +119,7 @@ protected Handler( ) { this.client = client; this.threadContext = threadContext; - this.requestContext = requestContext; + this.requestContext = threadContext.newStoredContext(); this.indexingPressure = indexingPressure; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; @@ -163,7 +154,10 @@ public void onResponse(BulkResponse bulkResponse) { public void onFailure(Exception e) { handleBulkFailure(isFirstRequest, e); } - }, nextItems)); + }, () -> { + requestContext = threadContext.newStoredContext(); + nextItems.run(); + })); } } else { nextItems.run(); @@ -185,7 +179,6 @@ public void lastItems(List> items, Releasable releasable, Act } else { assert bulkRequest != null; if (internalAddItems(items, releasable)) { - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { requestContext.restore(); client.bulk(bulkRequest, new ActionListener<>() { diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index cddd595996530..27be8110919b0 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; @@ -38,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -128,15 +130,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC request.param("type"); } - return new ChunkHandler( - allowExplicitIndex, - request, - bulkHandler.newBulkRequest( - request.param("wait_for_active_shards"), - request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), - request.param("refresh") - ) - ); + String waitForActiveShards = request.param("wait_for_active_shards"); + TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); + String refresh = request.param("refresh"); + return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh)); } } @@ -154,14 +151,15 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { private final Boolean defaultRequireAlias; private final boolean defaultRequireDataStream; private final BulkRequestParser parser; - private final IncrementalBulkService.Handler handler; + private final Supplier handlerSupplier; + private IncrementalBulkService.Handler handler; private volatile RestChannel restChannel; private boolean isException; private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); private final ArrayList> items = new ArrayList<>(4); - ChunkHandler(boolean allowExplicitIndex, RestRequest request, IncrementalBulkService.Handler handler) { + ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier handlerSupplier) { this.allowExplicitIndex = allowExplicitIndex; this.request = request; this.defaultIndex = request.param("index"); @@ -171,18 +169,21 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); - this.parser = new BulkRequestParser(true, request.getRestApiVersion()); - this.handler = handler; + // TODO: Fix type deprecation logging + this.parser = new BulkRequestParser(false, request.getRestApiVersion()); + this.handlerSupplier = handlerSupplier; } @Override public void accept(RestChannel restChannel) { this.restChannel = restChannel; + this.handler = handlerSupplier.get(); request.contentStream().next(); } @Override public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { + assert handler != null; assert channel == restChannel; if (isException) { chunk.close(); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index 04d4f4c1d3c84..0060b306f1018 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -245,7 +245,7 @@ public void next() { RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler( true, request, - new IncrementalBulkService.Handler(null, null, null, null, null, null, null) { + () -> new IncrementalBulkService.Handler(null, null, null, null, null, null) { @Override public void addItems(List> items, Releasable releasable, Runnable nextItems) { From b5308c322c71a8fd13a39f686930fd4735c55cdd Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Aug 2024 18:32:46 -0600 Subject: [PATCH 22/22] Fix --- .../elasticsearch/rest/action/document/RestBulkActionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index 0060b306f1018..66c6660226ab0 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -245,7 +245,7 @@ public void next() { RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler( true, request, - () -> new IncrementalBulkService.Handler(null, null, null, null, null, null) { + () -> new IncrementalBulkService.Handler(null, new ThreadContext(Settings.EMPTY), null, null, null, null) { @Override public void addItems(List> items, Releasable releasable, Runnable nextItems) {