Skip to content

Commit ab8f85a

Browse files
sunqijun1 (sunqijun.jun) authored and kkewwei committed
[BUG FIX] Using an excessively large reindex slice can lead to a JVM OutOfMemoryError on coordinator (opensearch-project#18964)
* bugfix for too many slices causing JVM OOM
  Signed-off-by: sunqijun.jun <[email protected]>
* add changelogs
  Signed-off-by: sunqijun.jun <[email protected]>
* fix spotlessApply
  Signed-off-by: sunqijun.jun <[email protected]>
---------
Signed-off-by: sunqijun.jun <[email protected]>
Signed-off-by: kkewwei <[email protected]>
Co-authored-by: sunqijun.jun <[email protected]>
Co-authored-by: kkewwei <[email protected]>
Signed-off-by: Ankit Jain <[email protected]>
1 parent d61a88a commit ab8f85a

File tree

6 files changed

+48
-2
lines changed

6 files changed

+48
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3838
- Fix pull-based ingestion pause state initialization during replica promotion ([#19212](https://github.com/opensearch-project/OpenSearch/pull/19212))
3939
- Fix QueryPhaseResultConsumer incomplete callback loops ([#19231](https://github.com/opensearch-project/OpenSearch/pull/19231))
4040
- Fix the `scaled_float` precision issue ([#19188](https://github.com/opensearch-project/OpenSearch/pull/19188))
41+
- Fix JVM OutOfMemoryError on the coordinator caused by using an excessively large reindex slice count ([#18964](https://github.com/opensearch-project/OpenSearch/pull/18964))
4142

4243
### Dependencies
4344
- Bump `com.netflix.nebula.ospackage-base` from 12.0.0 to 12.1.0 ([#19019](https://github.com/opensearch-project/OpenSearch/pull/19019))

modules/reindex/src/main/java/org/opensearch/index/reindex/BulkByScrollParallelizationHelper.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,13 @@
3636
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
3737
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
3838
import org.opensearch.action.search.SearchRequest;
39+
import org.opensearch.cluster.metadata.IndexMetadata;
40+
import org.opensearch.cluster.metadata.Metadata;
3941
import org.opensearch.cluster.node.DiscoveryNode;
4042
import org.opensearch.core.action.ActionListener;
4143
import org.opensearch.core.index.Index;
4244
import org.opensearch.core.tasks.TaskId;
45+
import org.opensearch.index.IndexSettings;
4346
import org.opensearch.index.mapper.IdFieldMapper;
4447
import org.opensearch.search.builder.SearchSourceBuilder;
4548
import org.opensearch.search.slice.SliceBuilder;
@@ -74,6 +77,7 @@ private BulkByScrollParallelizationHelper() {}
7477
* This method is equivalent to calling {@link #initTaskState} followed by {@link #executeSlicedAction}
7578
*/
7679
static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAction(
80+
Metadata metadata,
7781
Request request,
7882
BulkByScrollTask task,
7983
ActionType<BulkByScrollResponse> action,
@@ -85,7 +89,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
8589
initTaskState(task, request, client, new ActionListener<Void>() {
8690
@Override
8791
public void onResponse(Void aVoid) {
88-
executeSlicedAction(task, request, action, listener, client, node, workerAction);
92+
executeSlicedAction(metadata, task, request, action, listener, client, node, workerAction);
8993
}
9094

9195
@Override
@@ -106,6 +110,7 @@ public void onFailure(Exception e) {
106110
* This method can only be called after the task state is initialized {@link #initTaskState}.
107111
*/
108112
static <Request extends AbstractBulkByScrollRequest<Request>> void executeSlicedAction(
113+
Metadata metadata,
109114
BulkByScrollTask task,
110115
Request request,
111116
ActionType<BulkByScrollResponse> action,
@@ -115,7 +120,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSliced
115120
Runnable workerAction
116121
) {
117122
if (task.isLeader()) {
118-
sendSubRequests(client, action, node.getId(), task, request, listener);
123+
sendSubRequests(metadata, client, action, node.getId(), task, request, listener);
119124
} else if (task.isWorker()) {
120125
workerAction.run();
121126
} else {
@@ -182,6 +187,7 @@ private static int countSlicesBasedOnShards(ClusterSearchShardsResponse response
182187
}
183188

184189
private static <Request extends AbstractBulkByScrollRequest<Request>> void sendSubRequests(
190+
Metadata metadata,
185191
Client client,
186192
ActionType<BulkByScrollResponse> action,
187193
String localNodeId,
@@ -192,6 +198,24 @@ private static <Request extends AbstractBulkByScrollRequest<Request>> void sendS
192198

193199
LeaderBulkByScrollTaskState worker = task.getLeaderState();
194200
int totalSlices = worker.getSlices();
201+
for (String index : request.getSearchRequest().indices()) {
202+
IndexMetadata indexMetadata = metadata.index(index);
203+
if (indexMetadata != null && IndexSettings.MAX_SLICES_PER_SCROLL.get(indexMetadata.getSettings()) < totalSlices) {
204+
throw new IllegalArgumentException(
205+
"The number of slices ["
206+
+ totalSlices
207+
+ "] is too large. It must "
208+
+ "be less than ["
209+
+ IndexSettings.MAX_SLICES_PER_SCROLL.get(indexMetadata.getSettings())
210+
+ "]. "
211+
+ "This limit can be set by changing the ["
212+
+ IndexSettings.MAX_SLICES_PER_SCROLL.getKey()
213+
+ "] index"
214+
+ " level setting."
215+
);
216+
}
217+
}
218+
195219
TaskId parentTaskId = new TaskId(localNodeId, task.getId());
196220
for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), IdFieldMapper.NAME, totalSlices)) {
197221
// TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general....

modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen
133133
public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
134134
ActionListener<BulkByScrollResponse> remoteReindexActionListener = getRemoteReindexWrapperListener(listener, request);
135135
BulkByScrollParallelizationHelper.executeSlicedAction(
136+
clusterService.state().metadata(),
136137
task,
137138
request,
138139
ReindexAction.INSTANCE,

modules/reindex/src/main/java/org/opensearch/index/reindex/TransportDeleteByQueryAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public TransportDeleteByQueryAction(
7777
public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
7878
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
7979
BulkByScrollParallelizationHelper.startSlicedAction(
80+
clusterService.state().metadata(),
8081
request,
8182
bulkByScrollTask,
8283
DeleteByQueryAction.INSTANCE,

modules/reindex/src/main/java/org/opensearch/index/reindex/TransportUpdateByQueryAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public TransportUpdateByQueryAction(
8787
protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
8888
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
8989
BulkByScrollParallelizationHelper.startSlicedAction(
90+
clusterService.state().metadata(),
9091
request,
9192
bulkByScrollTask,
9293
UpdateByQueryAction.INSTANCE,

modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexBasicTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import static org.opensearch.index.query.QueryBuilders.termQuery;
5353
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
5454
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
55+
import static org.hamcrest.Matchers.containsString;
5556
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5657
import static org.hamcrest.Matchers.hasSize;
5758
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -483,4 +484,21 @@ private void verifyTransformedContent(String indexName, int expectedCount) {
483484
assertNotNull(source.get("date_field"));
484485
}
485486
}
487+
488+
public void testTooMuchSlices() throws InterruptedException {
489+
indexRandom(
490+
true,
491+
client().prepareIndex("source").setId("1").setSource("foo", "a"),
492+
client().prepareIndex("source").setId("2").setSource("foo", "a"),
493+
client().prepareIndex("source").setId("3").setSource("foo", "b"),
494+
client().prepareIndex("source").setId("4").setSource("foo", "c")
495+
);
496+
assertHitCount(client().prepareSearch("source").setSize(0).get(), 4);
497+
498+
int slices = 2000;
499+
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
500+
reindex().source("source").destination("dest").refresh(true).setSlices(slices).get();
501+
});
502+
assertThat(e.getMessage(), containsString("is too large"));
503+
}
486504
}

0 commit comments

Comments (0)