diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java index 12699442e23bf..a8993be982b99 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java @@ -13,6 +13,7 @@ import io.netty.util.ReferenceCounted; import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -20,7 +21,6 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; -import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; @@ -53,7 +53,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal, otherSettings)) // TODO: We do not currently support in flight circuit breaker limits for bulk. 
However, IndexingPressure applies - .put(RestBulkAction.INCREMENTAL_BULK.getKey(), false) + .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false) .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT) .build(); } diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index fcb0c1044d22f..4b3b6fbb2a719 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -8,9 +8,11 @@ package org.elasticsearch.http; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.json.JsonXContent; @@ -25,7 +27,6 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) public class IncrementalBulkRestIT extends HttpSmokeTestCase { - @SuppressWarnings("unchecked") public void testIncrementalBulk() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" @@ -55,35 +56,52 @@ public void testIncrementalBulk() throws IOException { final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest); assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); - Request bulkRequest = new Request("POST", "/index_name/_bulk"); + sendLargeBulk(); + } + + public void testBulkWithIncrementalDisabled() throws IOException { + Request createRequest = new 
Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); // index documents for the rollup job - final StringBuilder bulk = new StringBuilder(); - bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"); - int updates = 0; - for (int i = 0; i < 1000; i++) { - bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); - bulk.append("{\"field\":").append(i).append("}\n"); - if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) { - ++updates; - bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"); - bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n"); - } - } - bulk.append("\r\n"); + String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + + "{\"field\":1}\n" + + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n" + + "{\"field\":1}\n" + + "\r\n"; - bulkRequest.setJsonEntity(bulk.toString()); + firstBulkRequest.setJsonEntity(bulkBody); - final Response bulkResponse = getRestClient().performRequest(bulkRequest); - assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); - Map responseMap = XContentHelper.convertToMap( - JsonXContent.jsonXContent, - bulkResponse.getEntity().getContent(), - true - ); + final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); - assertFalse((Boolean) responseMap.get("errors")); - assertThat(((List) responseMap.get("items")).size(), equalTo(1001 + updates)); + clusterAdmin().prepareUpdateSettings() + 
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false).build()) + .get(); + + internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false)); + + try { + sendLargeBulk(); + } finally { + internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true)); + clusterAdmin().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null).build()) + .get(); + } } public void testIncrementalMalformed() throws IOException { @@ -114,4 +132,37 @@ public void testIncrementalMalformed() throws IOException { expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); } + + @SuppressWarnings("unchecked") + private static void sendLargeBulk() throws IOException { + Request bulkRequest = new Request("POST", "/index_name/_bulk"); + + // index documents for the rollup job + final StringBuilder bulk = new StringBuilder(); + bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"); + int updates = 0; + for (int i = 0; i < 1000; i++) { + bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); + bulk.append("{\"field\":").append(i).append("}\n"); + if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) { + ++updates; + bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"); + bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n"); + } + } + bulk.append("\r\n"); + + bulkRequest.setJsonEntity(bulk.toString()); + + final Response bulkResponse = getRestClient().performRequest(bulkRequest); + assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + Map responseMap = XContentHelper.convertToMap( + JsonXContent.jsonXContent, + bulkResponse.getEntity().getContent(), + true + ); + + assertFalse((Boolean) responseMap.get("errors")); + assertThat(((List) responseMap.get("items")).size(), 
equalTo(1001 + updates)); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 27ad99b012829..3e006bc960f84 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; @@ -21,7 +22,6 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexingPressure; -import org.elasticsearch.rest.action.document.RestBulkAction; import java.util.ArrayList; import java.util.Collections; @@ -29,50 +29,48 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import static org.elasticsearch.common.settings.Setting.boolSetting; + public class IncrementalBulkService { + public static final Setting INCREMENTAL_BULK = boolSetting( + "rest.incremental_bulk", + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); private final Client client; + private final AtomicBoolean enabledForTests = new AtomicBoolean(true); private final IndexingPressure indexingPressure; private final ThreadContext threadContext; - private final Supplier enabled; public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) { - this(client, indexingPressure, threadContext, new Enabled()); - } - - public IncrementalBulkService( - Client client, - IndexingPressure indexingPressure, - ThreadContext 
threadContext, -        ClusterSettings clusterSettings -    ) { -        this(client, indexingPressure, threadContext, new Enabled(clusterSettings)); -    } - -    public IncrementalBulkService( -        Client client, -        IndexingPressure indexingPressure, -        ThreadContext threadContext, -        Supplier enabled -    ) { this.client = client; this.indexingPressure = indexingPressure; this.threadContext = threadContext; -        this.enabled = enabled; -    } - -    public boolean incrementalBulkEnabled() { -        return enabled.get(); } public Handler newBulkRequest() { +        ensureEnabled(); return newBulkRequest(null, null, null); } public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { +        ensureEnabled(); return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh); } +    private void ensureEnabled() { +        if (enabledForTests.get() == false) { +            throw new AssertionError("Unexpected incremental bulk request"); +        } +    } + +    // This method only exists to test that the feature flag works. Remove once we no longer need the flag. 
+ public void setForTests(boolean value) { + enabledForTests.set(value); + } + public static class Enabled implements Supplier { private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true); @@ -80,8 +78,8 @@ public static class Enabled implements Supplier { public Enabled() {} public Enabled(ClusterSettings clusterSettings) { - incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK)); - clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set); + incrementalBulksEnabled.set(clusterSettings.get(INCREMENTAL_BULK)); + clusterSettings.addSettingsUpdateConsumer(INCREMENTAL_BULK, incrementalBulksEnabled::set); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index f878507411294..4869dfffa772d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.bulk.WriteAckDelay; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.ingest.SimulatePipelineTransportAction; @@ -112,7 +113,6 @@ import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchService; @@ -242,7 +242,7 
@@ public void apply(Settings value, Settings current, Settings previous) { Metadata.SETTING_READ_ONLY_SETTING, Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING, ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE, - RestBulkAction.INCREMENTAL_BULK, + IncrementalBulkService.INCREMENTAL_BULK, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index d755586e23fdc..442cf42d0ef3a 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -893,8 +893,7 @@ private void construct( final IncrementalBulkService incrementalBulkService = new IncrementalBulkService( client, indexingLimits, - threadPool.getThreadContext(), - clusterService.getClusterSettings() + threadPool.getThreadContext() ); ActionModule actionModule = new ActionModule( diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 27be8110919b0..a2b5cf47a6efb 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -41,7 +40,6 @@ import java.util.Map; import java.util.function.Supplier; -import static 
org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -58,12 +56,6 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; - public static final Setting INCREMENTAL_BULK = boolSetting( - "rest.incremental_bulk", - true, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; @@ -92,7 +84,7 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - if (bulkHandler.incrementalBulkEnabled() == false) { + if (request.isStreamedContent() == false) { if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { request.param("type"); } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index 66c6660226ab0..a9f1c3becb099 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -66,7 +66,7 @@ public void bulk(BulkRequest request, ActionListener listener) { params.put("pipeline", "timestamps"); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY), () -> false) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" 
{"index":{"_id":"1"}} @@ -101,12 +101,7 @@ public void bulk(BulkRequest request, ActionListener listener) { { new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService( - mock(Client.class), - mock(IndexingPressure.class), - new ThreadContext(Settings.EMPTY), - () -> false - ) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -130,12 +125,7 @@ public void bulk(BulkRequest request, ActionListener listener) { bulkCalled.set(false); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService( - mock(Client.class), - mock(IndexingPressure.class), - new ThreadContext(Settings.EMPTY), - () -> false - ) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -158,12 +148,7 @@ public void bulk(BulkRequest request, ActionListener listener) { bulkCalled.set(false); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService( - mock(Client.class), - mock(IndexingPressure.class), - new ThreadContext(Settings.EMPTY), - () -> false - ) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -187,12 +172,7 @@ public void bulk(BulkRequest request, ActionListener listener) { bulkCalled.set(false); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService( - mock(Client.class), - mock(IndexingPressure.class), - new ThreadContext(Settings.EMPTY), - () -> false - ) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), 
new ThreadContext(Settings.EMPTY)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params)