diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java index b8ffe72f50fb5..f11525fc6bff8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java @@ -46,6 +46,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; public class IncrementalBulkIT extends ESIntegTestCase { @@ -55,6 +56,14 @@ protected Collection> nodePlugins() { return List.of(IngestClientIT.ExtendedIngestTestPlugin.class); } + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B") + .build(); + } + public void testSingleBulkRequest() { String index = "test"; createIndex(index); @@ -81,6 +90,71 @@ public void testSingleBulkRequest() { assertFalse(refCounted.hasReferences()); } + public void testIndexingPressureRejection() { + String index = "test"; + createIndex(index); + + String nodeName = internalCluster().getRandomNodeName(); + IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName); + IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName); + + try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) { + IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest(); + AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {}); + + if (randomBoolean()) { + AtomicBoolean nextPage = new AtomicBoolean(false); + refCounted.incRef(); + handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true)); + assertTrue(nextPage.get()); + } + + PlainActionFuture future = new PlainActionFuture<>(); + handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future); + + expectThrows(EsRejectedExecutionException.class, future::actionGet); + assertFalse(refCounted.hasReferences()); + } + } + + public void testIncrementalBulkRequestMemoryBackOff() throws Exception { + String index = "test"; + createIndex(index); + + String nodeName = internalCluster().getRandomNodeName(); + IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName); + IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName); + + IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest(); + + AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {}); + AtomicBoolean nextPage = new AtomicBoolean(false); + + IndexRequest indexRequest = indexRequest(index); + long total = indexRequest.ramBytesUsed(); + while (total < 512) { + refCounted.incRef(); + handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true)); + assertTrue(nextPage.get()); + nextPage.set(false); + indexRequest = indexRequest(index); + total += indexRequest.ramBytesUsed(); + } + + assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L)); + refCounted.incRef(); + handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true)); + + assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L))); + + PlainActionFuture future = new PlainActionFuture<>(); + handler.lastItems(List.of(indexRequest), refCounted::decRef, future); + + BulkResponse bulkResponse = future.actionGet(); + assertNoFailures(bulkResponse); + assertFalse(refCounted.hasReferences()); + } + public void testMultipleBulkPartsWithBackoff() { ExecutorService executorService = Executors.newFixedThreadPool(1); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index bee46d20dec08..e78eb9e0415d0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -399,7 +399,7 @@ private void completeBulkOperation() { responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos), BulkResponse.NO_INGEST_TOOK, - new BulkRequest.IncrementalState(shortCircuitShardFailures) + new BulkRequest.IncrementalState(shortCircuitShardFailures, bulkRequest.incrementalState().indexingPressureAccounted()) ) ); // Allow memory for bulk shard request items to be reclaimed before all items have been completed diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index dda28fb7da2f6..dd32245517924 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -496,12 +496,12 @@ public boolean isSimulated() { return false; // Always false, but may be overridden by a subclass } - record IncrementalState(Map shardLevelFailures) implements Writeable { + record IncrementalState(Map shardLevelFailures, boolean indexingPressureAccounted) implements Writeable { - static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap()); + static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap(), false); IncrementalState(StreamInput in) throws IOException { - this(in.readMap(ShardId::new, input -> input.readException())); + this(in.readMap(ShardId::new, input -> input.readException()), false); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index ce91586cc4426..c3ea345398c96 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -8,24 +8,30 @@ package org.elasticsearch.action.bulk; +import org.apache.lucene.util.Accountable; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexingPressure; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class IncrementalBulkService { private final Client client; + private final IndexingPressure indexingPressure; - public IncrementalBulkService(Client client) { + public IncrementalBulkService(Client client, IndexingPressure indexingPressure) { this.client = client; + this.indexingPressure = indexingPressure; } public Handler newBulkRequest() { @@ -33,12 +39,15 @@ public Handler newBulkRequest() { } public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { - return new Handler(client, waitForActiveShards, timeout, refresh); + return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh); } public static class Handler { + public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true); + private final Client client; + private final IndexingPressure indexingPressure; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; private final String refresh; @@ -50,12 +59,19 @@ public static class Handler { private Exception bulkActionLevelFailure = null; private BulkRequest bulkRequest = null; - private Handler(Client client, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { + private Handler( + Client client, + IndexingPressure indexingPressure, + @Nullable String waitForActiveShards, + @Nullable TimeValue timeout, + @Nullable String refresh + ) { this.client = client; + this.indexingPressure = indexingPressure; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; - createNewBulkRequest(BulkRequest.IncrementalState.EMPTY); + createNewBulkRequest(EMPTY_STATE); } public void addItems(List> items, Releasable releasable, Runnable nextItems) { @@ -64,35 +80,39 @@ public void addItems(List> items, Releasable releasable, Runn nextItems.run(); } else { assert bulkRequest != null; - internalAddItems(items, releasable); - - if (shouldBackOff()) { - final boolean isFirstRequest = incrementalRequestSubmitted == false; - incrementalRequestSubmitted = true; - - client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { - - @Override - public void onResponse(BulkResponse bulkResponse) { - responses.add(bulkResponse); - releaseCurrentReferences(); - createNewBulkRequest(bulkResponse.getIncrementalState()); - } - - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - } - }, nextItems::run)); + if (internalAddItems(items, releasable)) { + if (shouldBackOff()) { + final boolean isFirstRequest = incrementalRequestSubmitted == false; + incrementalRequestSubmitted = true; + + client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { + + @Override + public void onResponse(BulkResponse bulkResponse) { + responses.add(bulkResponse); + releaseCurrentReferences(); + createNewBulkRequest( + new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true) + ); + } + + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + } + }, nextItems)); + } else { + nextItems.run(); + } } else { nextItems.run(); } + } } private boolean shouldBackOff() { - // TODO: Implement Real Memory Logic - return bulkRequest.requests().size() >= 16; + return indexingPressure.shouldSplitBulks(); } public void lastItems(List> items, Releasable releasable, ActionListener listener) { @@ -101,25 +121,27 @@ public void lastItems(List> items, Releasable releasable, Act errorResponse(listener); } else { assert bulkRequest != null; - internalAddItems(items, releasable); + if (internalAddItems(items, releasable)) { + client.bulk(bulkRequest, new ActionListener<>() { - client.bulk(bulkRequest, new ActionListener<>() { + private final boolean isFirstRequest = incrementalRequestSubmitted == false; - private final boolean isFirstRequest = incrementalRequestSubmitted == false; - - @Override - public void onResponse(BulkResponse bulkResponse) { - responses.add(bulkResponse); - releaseCurrentReferences(); - listener.onResponse(combineResponses()); - } + @Override + public void onResponse(BulkResponse bulkResponse) { + responses.add(bulkResponse); + releaseCurrentReferences(); + listener.onResponse(combineResponses()); + } - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - errorResponse(listener); - } - }); + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + errorResponse(listener); + } + }); + } else { + errorResponse(listener); + } } } @@ -159,9 +181,22 @@ private void addItemLevelFailures(List> items) { responses.add(new BulkResponse(bulkItemResponses, 0, 0)); } - private void internalAddItems(List> items, Releasable releasable) { - bulkRequest.add(items); - releasables.add(releasable); + private boolean internalAddItems(List> items, Releasable releasable) { + try { + bulkRequest.add(items); + releasables.add(releasable); + releasables.add( + indexingPressure.markCoordinatingOperationStarted( + items.size(), + items.stream().mapToLong(Accountable::ramBytesUsed).sum(), + false + ) + ); + return true; + } catch (EsRejectedExecutionException e) { + handleBulkFailure(incrementalRequestSubmitted == false, e); + return false; + } } private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index c44ad505aea84..141bea3368edb 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -111,7 +111,12 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener {}; + } else { + releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem); + } final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor; ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index c023b00ec820f..18973ff43180d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -560,6 +560,7 @@ public void apply(Settings value, Settings current, Settings previous) { FsHealthService.REFRESH_INTERVAL_SETTING, FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, IndexingPressure.MAX_INDEXING_BYTES, + IndexingPressure.SPLIT_BULK_THRESHOLD, ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN, DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING, CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index 7f07cdd1c3b1a..f91fcad054117 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -29,6 +29,12 @@ public class IndexingPressure { Setting.Property.NodeScope ); + public static final Setting SPLIT_BULK_THRESHOLD = Setting.memorySizeSetting( + "indexing_pressure.memory.split_bulk_threshold", + "8.5%", + Setting.Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(IndexingPressure.class); private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); @@ -56,10 +62,12 @@ public class IndexingPressure { private final AtomicLong primaryDocumentRejections = new AtomicLong(0); private final long primaryAndCoordinatingLimits; + private final long splitBulkThreshold; private final long replicaLimits; public IndexingPressure(Settings settings) { this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); + this.splitBulkThreshold = SPLIT_BULK_THRESHOLD.get(settings).getBytes(); this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); } @@ -203,6 +211,10 @@ public Releasable markReplicaOperationStarted(int operations, long bytes, boolea }); } + public boolean shouldSplitBulks() { + return currentCombinedCoordinatingAndPrimaryBytes.get() >= splitBulkThreshold; + } + public IndexingPressureStats stats() { return new IndexingPressureStats( totalCombinedCoordinatingAndPrimaryBytes.get(), diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 2f76daa7e42ed..3cfb800170371 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -965,7 +965,7 @@ record PluginServiceInstances( ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits); SnapshotsService snapshotsService = new SnapshotsService( settings, diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 67912544be812..7bcfaa0252cc5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -118,6 +118,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.MockEngineFactoryPlugin; @@ -2068,6 +2069,9 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE.getKey(), randomFrom(1, 2, SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE) ); + if (randomBoolean()) { + builder.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), randomFrom("256B", "1KB", "64KB")); + } return builder.build(); }