From f4b655a07c532798d28a62e57f1cbaa0abfb363d Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Mon, 2 Mar 2026 13:27:53 -0500 Subject: [PATCH 01/12] WIP --- .../elasticsearch/reindex/ReindexMetrics.java | 56 +++++++++++++++--- .../org/elasticsearch/reindex/Reindexer.java | 57 +++++++++++++------ 2 files changed, 88 insertions(+), 25 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index cd934614e6db3..aee0a1af8af2c 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -10,6 +10,8 @@ package org.elasticsearch.reindex; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.telemetry.metric.LongCounter; import org.elasticsearch.telemetry.metric.LongHistogram; import org.elasticsearch.telemetry.metric.MeterRegistry; @@ -29,6 +31,8 @@ public class ReindexMetrics { public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local"; public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote"; + public static final String ATTRIBUTE_NAME_SLICING_MODE = "slicing_mode"; + private final LongHistogram reindexTimeSecsHistogram; private final LongCounter reindexCompletionCounter; @@ -41,23 +45,23 @@ public ReindexMetrics(MeterRegistry meterRegistry) { ); } - public long recordTookTime(long tookTime, boolean remote) { - Map attributes = getAttributes(remote); + public long recordTookTime(long tookTime, boolean remote, SlicingMode slicingMode) { + Map attributes = getAttributes(remote, slicingMode); reindexTimeSecsHistogram.record(tookTime, attributes); return tookTime; } - public void recordSuccess(boolean remote) { - Map attributes = getAttributes(remote); + public void recordSuccess(boolean remote, SlicingMode slicingMode) { + Map attributes = getAttributes(remote, slicingMode); // attribute ATTRIBUTE_ERROR_TYPE being absent indicates success assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes"; reindexCompletionCounter.incrementBy(1, attributes); } - public void recordFailure(boolean remote, Throwable e) { - Map attributes = getAttributes(remote); + public void recordFailure(boolean remote, SlicingMode slicingMode, Throwable e) { + Map attributes = getAttributes(remote, slicingMode); // best effort to extract useful error type if possible String errorType; if (e instanceof ElasticsearchStatusException ese) { @@ -74,9 +78,47 @@ public void recordFailure(boolean remote, Throwable e) { reindexCompletionCounter.incrementBy(1, attributes); } - private Map getAttributes(boolean remote) { + public enum SlicingMode { + // slices resolved automatically from source index shard count (e.g. ?slices=auto) + AUTO("auto"), + // caller specifies a fixed slice count (e.g. ?slices=4) + FIXED("fixed"), + // caller provides slice id (e.g. "slice": { "id": 0, "max": 4 }) + MANUAL("manual"), + // no slicing (e.g. ?slices=1) + NONE("none"); + + private final String value; + + SlicingMode(String value) { + this.value = value; + } + + public String value() { + return value; + } + } + + /** + * Determines the {@link SlicingMode} from a reindex request. + */ + public static SlicingMode resolveSlicingMode(ReindexRequest request) { + if (request.getSearchRequest().source() != null && request.getSearchRequest().source().slice() != null) { + return SlicingMode.MANUAL; + } + int slices = request.getSlices(); + if (slices == AbstractBulkByScrollRequest.AUTO_SLICES) { + return SlicingMode.AUTO; + } else if (slices > 1) { + return SlicingMode.FIXED; + } + return SlicingMode.NONE; + } + + private Map getAttributes(boolean remote, SlicingMode slicingMode) { Map attributes = new HashMap<>(); attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL); + attributes.put(ATTRIBUTE_NAME_SLICING_MODE, slicingMode.value()); return attributes; } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index b047963afcfed..daa3ec60c2e1a 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -162,24 +162,32 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl projectResolver.getProjectState(clusterService.state()), reindexSslConfig, request, - workerListenerWithRelocationAndMetrics(listenerWithRelocations, startTime, request.getRemoteInfo() != null), + workerListenerWithRelocationAndMetrics(listenerWithRelocations, task, request, startTime), remoteVersion ); searchAction.start(); }; + final ActionListener responseListener = wrapWithMetrics( + listenerWithRelocations, + reindexMetrics, + task, + request, + startTime + ); + /** * If this is a request to reindex from remote, then we need to determine the remote version prior to execution * NB {@link ReindexRequest} forbids remote requests and slices > 1, so we're guaranteed to be running on the only slice */ if (REINDEX_PIT_SEARCH_ENABLED && request.getRemoteInfo() != null) { - lookupRemoteVersionAndExecute(task, request, listenerWithRelocations, workerAction); + lookupRemoteVersionAndExecute(task, request, responseListener, workerAction); } else { BulkByPaginatedSearchParallelizationHelper.executeSlicedAction( task, request, ReindexAction.INSTANCE, - listenerWithRelocations, + responseListener, client, clusterService.localNode(), null, @@ -196,7 +204,7 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl private void lookupRemoteVersionAndExecute( BulkByScrollTask task, ReindexRequest request, - ActionListener listenerWithRelocations, + ActionListener listener, Consumer workerAction ) { RemoteInfo remoteInfo = request.getRemoteInfo(); @@ -211,7 +219,7 @@ public void onResponse(Version version) { task, request, ReindexAction.INSTANCE, - listenerWithRelocations, + listener, client, clusterService.localNode(), version, @@ -222,12 +230,12 @@ public void onResponse(Version version) { @Override public void onFailure(Exception e) { - closeRestClientAndRun(restClient, () -> listenerWithRelocations.onFailure(e)); + closeRestClientAndRun(restClient, () -> listener.onFailure(e)); } @Override public void onRejection(Exception e) { - closeRestClientAndRun(restClient, () -> listenerWithRelocations.onFailure(e)); + closeRestClientAndRun(restClient, () -> listener.onFailure(e)); } }; RemoteReindexingUtils.lookupRemoteVersionWithRetries( @@ -259,14 +267,16 @@ private void closeRestClientAndRun(RestClient restClient, Runnable onCompletion) /** Wraps the listener with metrics tracking and relocation handling (if applicable). Visible for testing. */ ActionListener workerListenerWithRelocationAndMetrics( ActionListener potentiallyWrappedRelocationListener, - long startTime, - boolean isRemote + BulkByScrollTask task, + ReindexRequest request, + long startTime ) { final ActionListener metricListener = wrapWithMetrics( potentiallyWrappedRelocationListener, reindexMetrics, - startTime, - isRemote + task, + request, + startTime ); return metricListener.delegateFailure((l, resp) -> { @@ -282,16 +292,27 @@ ActionListener workerListenerWithRelocationAndMetrics( }); } - // Visible for testing + /** + * Wrap listener with reindex metrics. For sliced reindex, this should record once only when all slices complete + * Visible for testing + */ static ActionListener wrapWithMetrics( ActionListener listener, @Nullable ReindexMetrics metrics, - long startTime, - boolean isRemote + BulkByScrollTask task, + ReindexRequest request, + long startTime ) { if (metrics == null) { return listener; } + if (task.isWorker() && task.getParentTaskId().isSet()) { + // Do not record metrics for slice worker, only leader will record them when all slices are completed + // Potentially add slice-level metrics for slice worker + return listener; + } + final boolean isRemote = request.getRemoteInfo() != null; + final ReindexMetrics.SlicingMode slicingMode = ReindexMetrics.resolveSlicingMode(request); // todo(szy): add relocation metrics // add completion metrics var withCompletionMetrics = new ActionListener() { @@ -310,17 +331,17 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) { if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) { // record only the first sample error in metric Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get); - metrics.recordFailure(isRemote, e); + metrics.recordFailure(isRemote, slicingMode, e); listener.onResponse(bulkByScrollResponse); } else { - metrics.recordSuccess(isRemote); + metrics.recordSuccess(isRemote, slicingMode); listener.onResponse(bulkByScrollResponse); } } @Override public void onFailure(Exception e) { - metrics.recordFailure(isRemote, e); + metrics.recordFailure(isRemote, slicingMode, e); listener.onFailure(e); } }; @@ -328,7 +349,7 @@ public void onFailure(Exception e) { // add duration metric return ActionListener.runAfter(withCompletionMetrics, () -> { long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); - metrics.recordTookTime(elapsedTime, isRemote); + metrics.recordTookTime(elapsedTime, isRemote, slicingMode); }); } From 290a8cf26f23a87bfa3662cb83c506612d97c6a2 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Mon, 2 Mar 2026 13:55:53 -0500 Subject: [PATCH 02/12] unit tests --- .../reindex/ReindexMetricsTests.java | 52 +++++-- .../elasticsearch/reindex/ReindexerTests.java | 142 +++++++++++++++--- 2 files changed, 157 insertions(+), 37 deletions(-) diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index 9da6b8b3166bc..21ef9dcb323d6 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -10,7 +10,10 @@ package org.elasticsearch.reindex; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.RecordingMeterRegistry; @@ -20,11 +23,13 @@ import java.util.List; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_COUNTER; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.SlicingMode; public class ReindexMetricsTests extends ESTestCase { @@ -40,17 +45,16 @@ public void createMetrics() { public void testRecordTookTime() { long secondsTaken = randomLongBetween(1, Long.MAX_VALUE); - // first metric - metrics.recordTookTime(secondsTaken, false); + metrics.recordTookTime(secondsTaken, false, SlicingMode.NONE); List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); assertEquals(1, measurements.size()); assertEquals(secondsTaken, measurements.getFirst().getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(SlicingMode.NONE.value(), measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); - // second metric long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE); - metrics.recordTookTime(remoteSecondsTaken, true); + metrics.recordTookTime(remoteSecondsTaken, true, SlicingMode.AUTO); measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); assertEquals(2, measurements.size()); @@ -58,40 +62,40 @@ public void testRecordTookTime() { assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); assertEquals(remoteSecondsTaken, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(SlicingMode.AUTO.value(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); } public void testRecordSuccess() { - // first metric - metrics.recordSuccess(false); + metrics.recordSuccess(false, SlicingMode.NONE); List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(1, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(SlicingMode.NONE.value(), measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); assertNull(measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); - // second metric - metrics.recordSuccess(true); + metrics.recordSuccess(true, SlicingMode.FIXED); measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(2, measurements.size()); assertEquals(1, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(SlicingMode.FIXED.value(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); assertNull(measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); } public void testRecordFailure() { - // first metric - metrics.recordFailure(false, new IllegalArgumentException("random failure")); + metrics.recordFailure(false, SlicingMode.NONE, new IllegalArgumentException("random failure")); List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(1, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); assertEquals("java.lang.IllegalArgumentException", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(SlicingMode.NONE.value(), measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); - // second metric - metrics.recordFailure(true, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); + metrics.recordFailure(true, SlicingMode.AUTO, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(2, measurements.size()); @@ -99,5 +103,29 @@ public void testRecordFailure() { assertEquals(1, measurements.get(1).getLong()); assertEquals(RestStatus.BAD_REQUEST.name(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(SlicingMode.AUTO.value(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + } + + public void testResolveSlicingModeNone() { + ReindexRequest request = new ReindexRequest(); + assertEquals(SlicingMode.NONE, ReindexMetrics.resolveSlicingMode(request)); + } + + public void testResolveSlicingModeFixed() { + ReindexRequest request = new ReindexRequest(); + request.setSlices(5); + assertEquals(SlicingMode.FIXED, ReindexMetrics.resolveSlicingMode(request)); + } + + public void testResolveSlicingModeAuto() { + ReindexRequest request = new ReindexRequest(); + request.setSlices(0); + assertEquals(SlicingMode.AUTO, ReindexMetrics.resolveSlicingMode(request)); + } + + public void testResolveSlicingModeManual() { + ReindexRequest request = new ReindexRequest(); + request.getSearchRequest().source(new SearchSourceBuilder().slice(new SliceBuilder(0, 3))); + assertEquals(SlicingMode.MANUAL, ReindexMetrics.resolveSlicingMode(request)); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index 25e599a9772b0..05f95224d5509 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -57,38 +57,43 @@ public class ReindexerTests extends ESTestCase { + // --- wrapWithMetrics tests --- + public void testWrapWithMetricsSuccess() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); wrapped.onResponse(response); verify(listener).onResponse(response); - verify(metrics).recordSuccess(true); - verify(metrics, never()).recordFailure(anyBoolean(), any()); - verify(metrics).recordTookTime(anyLong(), eq(true)); + verify(metrics).recordSuccess(eq(false), any()); + verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); } public void testWrapWithMetricsFailure() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); Exception exception = new Exception("random failure"); wrapped.onFailure(exception); verify(listener).onFailure(exception); - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics).recordFailure(true, exception); - verify(metrics).recordTookTime(anyLong(), eq(true)); + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics).recordFailure(eq(false), any(), eq(exception)); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); } public void testWrapWithMetricsBulkFailure() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), false); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); Exception exception = new Exception("random failure"); Exception anotherException = new Exception("another failure"); @@ -99,15 +104,16 @@ public void testWrapWithMetricsBulkFailure() { wrapped.onResponse(response); verify(listener).onResponse(response); - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics).recordFailure(false, exception); - verify(metrics).recordTookTime(anyLong(), eq(false)); + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics).recordFailure(eq(false), any(), eq(exception)); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); } public void testWrapWithMetricsSearchFailure() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); Exception exception = new Exception("random failure"); Exception anotherException = new Exception("another failure"); @@ -118,9 +124,36 @@ public void testWrapWithMetricsSearchFailure() { wrapped.onResponse(response); verify(listener).onResponse(response); - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics).recordFailure(true, exception); - verify(metrics).recordTookTime(anyLong(), eq(true)); + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics).recordFailure(eq(false), any(), eq(exception)); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); + } + + public void testWrapWithMetricsSkipsSliceWorker() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + BulkByScrollTask task = createSliceWorkerTask(); + + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); + + assertSame(listener, wrapped); + verifyNoMoreInteractions(metrics); + } + + public void testWrapWithMetricsWrapsLeader() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + BulkByScrollTask task = createLeaderTask(); + + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); + + assertNotSame(listener, wrapped); + + BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); + wrapped.onResponse(response); + + verify(metrics).recordSuccess(eq(false), any()); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); } // listenerWithRelocations tests @@ -223,17 +256,16 @@ public void testWorkerListenerSkipsMetricsWhenRelocating() { final ReindexMetrics metrics = mock(); final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); final ActionListener outer = spy(ActionListener.noop()); + final BulkByScrollTask task = createNonSlicedWorkerTask(); - final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, randomNonNegativeLong(), randomBoolean()); + final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, task, reindexRequest(), randomNonNegativeLong()); final BulkByScrollResponse response = reindexResponseWithResumeInfo(); wrapped.onResponse(response); - // metrics should NOT be recorded for a relocation response - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics, never()).recordFailure(anyBoolean(), any()); - verify(metrics, never()).recordTookTime(anyLong(), anyBoolean()); - // outer listener should still receive the response + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); + verify(metrics, never()).recordTookTime(anyLong(), anyBoolean(), any()); verify(outer).onResponse(response); verifyNoMoreInteractions(metrics, outer); @@ -244,19 +276,37 @@ public void testWorkerListenerRecordsMetricsForNormalResponse() { final ReindexMetrics metrics = mock(); final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); final ActionListener outer = spy(ActionListener.noop()); + final BulkByScrollTask task = createNonSlicedWorkerTask(); - final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, randomNonNegativeLong(), true); + final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, task, reindexRequest(), randomNonNegativeLong()); final BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); wrapped.onResponse(response); verify(outer).onResponse(response); - verify(metrics).recordSuccess(true); - verify(metrics).recordTookTime(anyLong(), eq(true)); + verify(metrics).recordSuccess(eq(false), any()); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); verifyNoMoreInteractions(metrics, outer); } + public void testWorkerListenerSkipsMetricsForSliceWorker() { + assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); + final ReindexMetrics metrics = mock(); + final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); + final ActionListener outer = spy(ActionListener.noop()); + final BulkByScrollTask task = createSliceWorkerTask(); + + final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, task, reindexRequest(), randomNonNegativeLong()); + + final BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); + wrapped.onResponse(response); + + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); + verify(metrics, never()).recordTookTime(anyLong(), anyBoolean(), any()); + } + // --- helpers --- private BulkByScrollResponse reindexResponseWithBulkAndSearchFailures( @@ -289,6 +339,48 @@ private BulkByScrollResponse reindexResponseWithResumeInfo() { ); } + private static BulkByScrollTask createNonSlicedWorkerTask() { + BulkByScrollTask task = new BulkByScrollTask( + 1, + "test_type", + "test_action", + "test", + TaskId.EMPTY_TASK_ID, + Collections.emptyMap(), + false + ); + task.setWorker(Float.POSITIVE_INFINITY, null); + return task; + } + + private static BulkByScrollTask createSliceWorkerTask() { + BulkByScrollTask task = new BulkByScrollTask( + 2, + "test_type", + "test_action", + "test", + new TaskId("node", 1), + Collections.emptyMap(), + false + ); + task.setWorker(randomFloat(), 0); + return task; + } + + private static BulkByScrollTask createLeaderTask() { + BulkByScrollTask task = new BulkByScrollTask( + 3, + "test_type", + "test_action", + "test", + TaskId.EMPTY_TASK_ID, + Collections.emptyMap(), + false + ); + task.setWorkerCount(randomIntBetween(2, 10)); + return task; + } + private static BulkByScrollTask createTaskWithParentIdAndRelocationEnabled(final TaskId parentTaskId) { return new BulkByScrollTask(987, "test_type", "test_action", "test", parentTaskId, Collections.emptyMap(), true); } From b224f6a147dc7f445b5bff8cae12e60d5766abb7 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Mon, 2 Mar 2026 14:00:02 -0500 Subject: [PATCH 03/12] add IT --- .../index/reindex/ReindexPluginMetricsIT.java | 79 ++++++++++++++++++- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java index dcb4c92a0d4c9..e36d5e8f4d6a6 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java @@ -33,6 +33,7 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; @@ -115,6 +116,7 @@ public void testReindexFromRemoteMetrics() throws Exception { assertThat(completions.size(), equalTo(1)); assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name())); assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); }); // now create the source index @@ -131,6 +133,7 @@ public void testReindexFromRemoteMetrics() throws Exception { assertThat(completions.size(), equalTo(2)); assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); }); } @@ -189,6 +192,7 @@ public void testReindexMetrics() throws Exception { testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).forEach(m -> { assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + assertThat(m.attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); }); }); } @@ -211,13 +215,80 @@ public void testReindexMetricsWithBulkFailure() throws Exception { testTelemetryPlugin.collect(); assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(1)); + Measurement completion = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).getFirst(); assertThat( - testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER) - .getFirst() - .attributes() - .get(ATTRIBUTE_NAME_ERROR_TYPE), + completion.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo("org.elasticsearch.index.mapper.DocumentParsingException") ); + assertThat(completion.attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); + }); + } + + public void testReindexMetricsWithFixedSlices() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + indexRandom( + true, + prepareIndex("source").setId("1").setSource("foo", "a"), + prepareIndex("source").setId("2").setSource("foo", "b"), + prepareIndex("source").setId("3").setSource("foo", "c"), + prepareIndex("source").setId("4").setSource("foo", "d") + ); + assertHitCount(prepareSearch("source").setSize(0), 4); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + reindex().source("source").destination("dest_fixed").setSlices(2).get(); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + List histograms = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); + assertThat(histograms.size(), equalTo(1)); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("fixed")); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + + List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("fixed")); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + }); + } + + public void testReindexMetricsWithAutoSlices() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + indexRandom( + true, + prepareIndex("source").setId("1").setSource("foo", "a"), + prepareIndex("source").setId("2").setSource("foo", "b"), + prepareIndex("source").setId("3").setSource("foo", "c"), + prepareIndex("source").setId("4").setSource("foo", "d") + ); + assertHitCount(prepareSearch("source").setSize(0), 4); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + reindex().source("source").destination("dest_auto").setSlices(AbstractBulkByScrollRequest.AUTO_SLICES).get(); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + List histograms = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); + assertThat(histograms.size(), equalTo(1)); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("auto")); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + + List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("auto")); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); }); } From d82ebec66081dec702b99980a159e2d06dac4388 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Mon, 2 Mar 2026 14:10:52 -0500 Subject: [PATCH 04/12] minor --- .../main/java/org/elasticsearch/reindex/ReindexMetrics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index aee0a1af8af2c..1c8227a9725db 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -81,9 +81,9 @@ public void recordFailure(boolean remote, SlicingMode slicingMode, Throwable e) public enum SlicingMode { // slices resolved automatically from source index shard count (e.g. ?slices=auto) AUTO("auto"), - // caller specifies a fixed slice count (e.g. ?slices=4) + // reindex request specifies a fixed slice count (e.g. ?slices=4) FIXED("fixed"), - // caller provides slice id (e.g. "slice": { "id": 0, "max": 4 }) + // reindex request specifies a slice id (e.g. "slice": { "id": 0, "max": 4 }) MANUAL("manual"), // no slicing (e.g. ?slices=1) NONE("none"); From 180b17d3c2eb044c9fb3541c046969d7ba098775 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Mon, 2 Mar 2026 14:23:13 -0500 Subject: [PATCH 05/12] minor test --- .../elasticsearch/reindex/ReindexMetricsTests.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index 21ef9dcb323d6..0e6280602fdb3 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.reindex; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -111,15 +112,21 @@ public void testResolveSlicingModeNone() { assertEquals(SlicingMode.NONE, ReindexMetrics.resolveSlicingMode(request)); } + public void testResolveSlicingModeNoneOneSlice() { + ReindexRequest request = new ReindexRequest(); + request.setSlices(1); + assertEquals(SlicingMode.NONE, ReindexMetrics.resolveSlicingMode(request)); + } + public void testResolveSlicingModeFixed() { ReindexRequest request = new ReindexRequest(); - request.setSlices(5); + request.setSlices(randomIntBetween(2, 20)); assertEquals(SlicingMode.FIXED, ReindexMetrics.resolveSlicingMode(request)); } public void testResolveSlicingModeAuto() { ReindexRequest request = new ReindexRequest(); - request.setSlices(0); + request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); assertEquals(SlicingMode.AUTO, ReindexMetrics.resolveSlicingMode(request)); } From 8e08af845d663fe7768cd74a86a6b6ef6f7621ab Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Mon, 2 Mar 2026 16:51:25 -0500 Subject: [PATCH 06/12] fix metric name --- .../src/main/java/org/elasticsearch/reindex/ReindexMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index 1c8227a9725db..d49ee17f35e8b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -31,7 +31,7 @@ public class ReindexMetrics { public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local"; public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote"; - public static final String ATTRIBUTE_NAME_SLICING_MODE = "slicing_mode"; + public static final String ATTRIBUTE_NAME_SLICING_MODE = "es_reindex_slicing_mode"; private final LongHistogram reindexTimeSecsHistogram; private final LongCounter reindexCompletionCounter; From 1968c60280c0a1dd4cd0b0062b4e5db98b9b8c81 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Wed, 4 Mar 2026 18:13:52 -0500 Subject: [PATCH 07/12] Fix relocated sliced metrics --- .../management/ReindexRelocationIT.java | 26 +++++--- .../elasticsearch/reindex/ReindexMetrics.java | 23 +++---- .../org/elasticsearch/reindex/Reindexer.java | 62 +++++++++++-------- .../reindex/ReindexMetricsTests.java | 22 +++++-- .../elasticsearch/reindex/ReindexerTests.java | 16 +++++ 5 files changed, 94 insertions(+), 55 deletions(-) diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java index 37c4e99c2a427..6f113582c4746 100644 --- a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java +++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.reindex.ReindexMetrics; +import org.elasticsearch.reindex.ReindexMetrics.SlicingMode; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.rest.root.MainRestPlugin; @@ -53,6 +54,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -516,14 +518,24 @@ private void assertReindexSuccessMetricsOnNode(final String nodeName, final bool final TestTelemetryPlugin plugin = getTelemetryPlugin(nodeName); plugin.collect(); final List completions = plugin.getLongCounterMeasurement(ReindexMetrics.REINDEX_COMPLETION_COUNTER); - assertThat(completions.size(), equalTo(slices)); - for (final Measurement completion : completions) { - assertNull(completion.attributes().get(ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE)); - final String expectedSource = isRemote - ? ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE - : ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; - assertThat(completion.attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SOURCE), equalTo(expectedSource)); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE)); + final String expectedSource = isRemote ? ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE : ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; + assertThat(completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SOURCE), equalTo(expectedSource)); + SlicingMode slicingMode = null; + if (slices == 0) { + slicingMode = SlicingMode.AUTO; + } else if (slices == 1) { + slicingMode = SlicingMode.NONE; + } else if (slices > 1) { + slicingMode = SlicingMode.FIXED; + } else { + fail("invalid slices value: " + slices); } + assertThat( + completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE), + equalTo(slicingMode.name().toLowerCase(Locale.ROOT)) + ); } private void assertExpectedNumberOfDocumentsInDestinationIndex() throws IOException { diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index d49ee17f35e8b..596cf89153487 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -17,6 +17,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; import java.util.HashMap; +import java.util.Locale; import java.util.Map; public class ReindexMetrics { @@ -80,30 +81,20 @@ public void recordFailure(boolean remote, SlicingMode slicingMode, Throwable e) public enum SlicingMode { // slices resolved automatically from source index shard count (e.g. ?slices=auto) - AUTO("auto"), + AUTO(), // reindex request specifies a fixed slice count (e.g. ?slices=4) - FIXED("fixed"), + FIXED(), // reindex request specifies a slice id (e.g. "slice": { "id": 0, "max": 4 }) - MANUAL("manual"), + MANUAL(), // no slicing (e.g. ?slices=1) - NONE("none"); - - private final String value; - - SlicingMode(String value) { - this.value = value; - } - - public String value() { - return value; - } + NONE(); } /** * Determines the {@link SlicingMode} from a reindex request. */ public static SlicingMode resolveSlicingMode(ReindexRequest request) { - if (request.getSearchRequest().source() != null && request.getSearchRequest().source().slice() != null) { + if (request.getSearchRequest().source().slice() != null) { return SlicingMode.MANUAL; } int slices = request.getSlices(); @@ -118,7 +109,7 @@ public static SlicingMode resolveSlicingMode(ReindexRequest request) { private Map getAttributes(boolean remote, SlicingMode slicingMode) { Map attributes = new HashMap<>(); attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL); - attributes.put(ATTRIBUTE_NAME_SLICING_MODE, slicingMode.value()); + attributes.put(ATTRIBUTE_NAME_SLICING_MODE, slicingMode.name().toLowerCase(Locale.ROOT)); return attributes; } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 6de63b84ff700..7f14d909d3677 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -322,43 +322,53 @@ static ActionListener wrapWithMetrics( final boolean isRemote = request.getRemoteInfo() != null; final ReindexMetrics.SlicingMode slicingMode = ReindexMetrics.resolveSlicingMode(request); // todo(szy): add relocation metrics - // add completion metrics - var withCompletionMetrics = new ActionListener() { + return new ActionListener<>() { + private void recordDuration() { + long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); + metrics.recordTookTime(elapsedTime, isRemote, slicingMode); + } + @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures()) - .stream() - .flatMap(List::stream) - .map(PaginatedHitSource.SearchFailure::getReason) - .findFirst(); - var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures()) - .stream() - .flatMap(List::stream) - .map(BulkItemResponse.Failure::getCause) - .findFirst(); - if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) { - // record only the first sample error in metric - Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get); - metrics.recordFailure(isRemote, slicingMode, e); + if (bulkByScrollResponse.getTaskResumeInfo().isPresent()) { + // Task is being relocated; do not record metrics on the source node, the destination node will record metrics when + // the relocated task completes listener.onResponse(bulkByScrollResponse); - } else { - metrics.recordSuccess(isRemote, slicingMode); + return; + } + try { + var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures()) + .stream() + .flatMap(List::stream) + .map(PaginatedHitSource.SearchFailure::getReason) + .findFirst(); + var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures()) + .stream() + .flatMap(List::stream) + .map(BulkItemResponse.Failure::getCause) + .findFirst(); + if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) { + Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get); + metrics.recordFailure(isRemote, slicingMode, e); + } else { + metrics.recordSuccess(isRemote, slicingMode); + } listener.onResponse(bulkByScrollResponse); + } finally { + recordDuration(); } } @Override public void onFailure(Exception e) { - metrics.recordFailure(isRemote, slicingMode, e); - listener.onFailure(e); + try { + metrics.recordFailure(isRemote, slicingMode, e); + listener.onFailure(e); + } finally { + recordDuration(); + } } }; - - // add duration metric - return ActionListener.runAfter(withCompletionMetrics, () -> { - long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); - metrics.recordTookTime(elapsedTime, isRemote, slicingMode); - }); } /** Wraps the listener with relocation handling (if applicable). Visible for testing. */ diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index 0e6280602fdb3..7a4da0e2f0a56 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -22,6 +22,7 @@ import org.junit.Before; import java.util.List; +import java.util.Locale; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE; @@ -52,7 +53,10 @@ public void testRecordTookTime() { assertEquals(1, measurements.size()); assertEquals(secondsTaken, measurements.getFirst().getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.NONE.value(), measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals( + SlicingMode.NONE.name().toLowerCase(Locale.ROOT), + measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE) + ); long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE); metrics.recordTookTime(remoteSecondsTaken, true, SlicingMode.AUTO); @@ -63,7 +67,7 @@ public void testRecordTookTime() { assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); assertEquals(remoteSecondsTaken, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.AUTO.value(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals(SlicingMode.AUTO.name().toLowerCase(Locale.ROOT), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); } public void testRecordSuccess() { @@ -73,7 +77,10 @@ public void testRecordSuccess() { assertEquals(1, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.NONE.value(), measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals( + SlicingMode.NONE.name().toLowerCase(Locale.ROOT), + measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE) + ); assertNull(measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); metrics.recordSuccess(true, SlicingMode.FIXED); @@ -82,7 +89,7 @@ public void testRecordSuccess() { assertEquals(2, measurements.size()); assertEquals(1, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.FIXED.value(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals(SlicingMode.FIXED.name().toLowerCase(Locale.ROOT), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); assertNull(measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); } @@ -94,7 +101,10 @@ public void testRecordFailure() { assertEquals(1, measurements.getFirst().getLong()); assertEquals("java.lang.IllegalArgumentException", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.NONE.value(), measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals( + SlicingMode.NONE.name().toLowerCase(Locale.ROOT), + measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE) + ); metrics.recordFailure(true, SlicingMode.AUTO, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); @@ -104,7 +114,7 @@ public void testRecordFailure() { assertEquals(1, measurements.get(1).getLong()); assertEquals(RestStatus.BAD_REQUEST.name(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.AUTO.value(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals(SlicingMode.AUTO.name().toLowerCase(Locale.ROOT), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); } public void testResolveSlicingModeNone() { diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index 50e6df73186dc..f15701db1d85a 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -159,6 +159,22 @@ public void testWrapWithMetricsWrapsLeader() { verify(metrics).recordTookTime(anyLong(), eq(false), any()); } + public void testWrapWithMetricsSkipsMetricsWhenRelocating() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + BulkByScrollTask task = createNonSlicedWorkerTask(); + + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); + + BulkByScrollResponse response = reindexResponseWithResumeInfo(); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); + verify(metrics, never()).recordTookTime(anyLong(), anyBoolean(), any()); + } + // listenerWithRelocations tests public void testListenerWithRelocationsPassesThroughForWorkerWithLeaderParent() { From aea2eff4eb2c5c2b9fe4f15b4b8602215d13ec3f Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Thu, 5 Mar 2026 11:01:21 -0500 Subject: [PATCH 08/12] adress comments --- .../elasticsearch/reindex/ReindexMetrics.java | 8 +- .../org/elasticsearch/reindex/Reindexer.java | 32 +------ .../reindex/ReindexMetricsTests.java | 16 +--- .../elasticsearch/reindex/ReindexerTests.java | 94 ++++--------------- 4 files changed, 30 insertions(+), 120 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index 596cf89153487..17ad0a0af68a1 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -81,13 +81,13 @@ public void recordFailure(boolean remote, SlicingMode slicingMode, Throwable e) public enum SlicingMode { // slices resolved automatically from source index shard count (e.g. ?slices=auto) - AUTO(), + AUTO, // reindex request specifies a fixed slice count (e.g. ?slices=4) - FIXED(), + FIXED, // reindex request specifies a slice id (e.g. "slice": { "id": 0, "max": 4 }) - MANUAL(), + MANUAL, // no slicing (e.g. ?slices=1) - NONE(); + NONE } /** diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 7f14d909d3677..5aa288a380bee 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -170,7 +170,7 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl projectResolver.getProjectState(clusterService.state()), reindexSslConfig, request, - workerListenerWithRelocationAndMetrics(listenerWithRelocations, task, request, startTime), + wrapWithMetrics(listenerWithRelocations, reindexMetrics, task, request, startTime), remoteVersion ); searchAction.start(); @@ -272,34 +272,6 @@ private void closeRestClientAndRun(RestClient restClient, Runnable onCompletion) }); } - /** Wraps the listener with metrics tracking and relocation handling (if applicable). Visible for testing. */ - ActionListener workerListenerWithRelocationAndMetrics( - ActionListener potentiallyWrappedRelocationListener, - BulkByScrollTask task, - ReindexRequest request, - long startTime - ) { - final ActionListener metricListener = wrapWithMetrics( - potentiallyWrappedRelocationListener, - reindexMetrics, - task, - request, - startTime - ); - - return metricListener.delegateFailure((l, resp) -> { - // note: implicitly relies on TaskResumeInfo only being populated if a suitable node exists for relocating to. - final boolean willBeRelocatedThereforeDoNotRecordMetrics = resp.getTaskResumeInfo().isPresent(); - if (willBeRelocatedThereforeDoNotRecordMetrics) { - assert resp.getBulkFailures().isEmpty() : "bulk failures should be empty if relocating"; - assert resp.getSearchFailures().isEmpty() : "search failures should be empty if relocating"; - potentiallyWrappedRelocationListener.onResponse(resp); - return; - } - l.onResponse(resp); - }); - } - /** * Wrap listener with reindex metrics. For sliced reindex, this should record once only when all slices complete * Visible for testing @@ -333,6 +305,8 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) { if (bulkByScrollResponse.getTaskResumeInfo().isPresent()) { // Task is being relocated; do not record metrics on the source node, the destination node will record metrics when // the relocated task completes + assert bulkByScrollResponse.getBulkFailures().isEmpty() : "bulk failures should be empty if relocating"; + assert bulkByScrollResponse.getSearchFailures().isEmpty() : "search failures should be empty if relocating"; listener.onResponse(bulkByScrollResponse); return; } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index 7a4da0e2f0a56..25d5aac8e7bce 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -53,10 +53,7 @@ public void testRecordTookTime() { assertEquals(1, measurements.size()); assertEquals(secondsTaken, measurements.getFirst().getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals( - SlicingMode.NONE.name().toLowerCase(Locale.ROOT), - measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE) - ); + assertEquals("none", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE); metrics.recordTookTime(remoteSecondsTaken, true, SlicingMode.AUTO); @@ -67,7 +64,7 @@ public void testRecordTookTime() { assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); assertEquals(remoteSecondsTaken, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.AUTO.name().toLowerCase(Locale.ROOT), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals("auto", measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); } public void testRecordSuccess() { @@ -89,7 +86,7 @@ public void testRecordSuccess() { assertEquals(2, measurements.size()); assertEquals(1, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.FIXED.name().toLowerCase(Locale.ROOT), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals("fixed", measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); assertNull(measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); } @@ -101,10 +98,7 @@ public void testRecordFailure() { assertEquals(1, measurements.getFirst().getLong()); assertEquals("java.lang.IllegalArgumentException", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals( - SlicingMode.NONE.name().toLowerCase(Locale.ROOT), - measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE) - ); + assertEquals("none", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); metrics.recordFailure(true, SlicingMode.AUTO, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); @@ -114,7 +108,7 @@ public void testRecordFailure() { assertEquals(1, measurements.get(1).getLong()); assertEquals(RestStatus.BAD_REQUEST.name(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); - assertEquals(SlicingMode.AUTO.name().toLowerCase(Locale.ROOT), measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + assertEquals("auto", measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); } public void testResolveSlicingModeNone() { diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index f15701db1d85a..e65e64fe69f31 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -285,64 +285,6 @@ public void testListenerWithRelocationsTriggersRelocationWhenResumeInfoPresent() assertThat(exception.getMetadata("es.relocated_task_id"), equalTo(List.of("target-node:123"))); } - // --- workerListenerWithRelocationAndMetrics tests --- - - public void testWorkerListenerSkipsMetricsWhenRelocating() { - assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); - final ReindexMetrics metrics = mock(); - final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); - final ActionListener outer = spy(ActionListener.noop()); - final BulkByScrollTask task = createNonSlicedWorkerTask(); - - final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, task, reindexRequest(), randomNonNegativeLong()); - - final BulkByScrollResponse response = reindexResponseWithResumeInfo(); - wrapped.onResponse(response); - - verify(metrics, never()).recordSuccess(anyBoolean(), any()); - verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); - verify(metrics, never()).recordTookTime(anyLong(), anyBoolean(), any()); - verify(outer).onResponse(response); - - verifyNoMoreInteractions(metrics, outer); - } - - public void testWorkerListenerRecordsMetricsForNormalResponse() { - assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); - final ReindexMetrics metrics = mock(); - final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); - final ActionListener outer = spy(ActionListener.noop()); - final BulkByScrollTask task = createNonSlicedWorkerTask(); - - final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, task, reindexRequest(), randomNonNegativeLong()); - - final BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); - wrapped.onResponse(response); - - verify(outer).onResponse(response); - verify(metrics).recordSuccess(eq(false), any()); - verify(metrics).recordTookTime(anyLong(), eq(false), any()); - - verifyNoMoreInteractions(metrics, outer); - } - - public void testWorkerListenerSkipsMetricsForSliceWorker() { - assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); - final ReindexMetrics metrics = mock(); - final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); - final ActionListener outer = spy(ActionListener.noop()); - final BulkByScrollTask task = createSliceWorkerTask(); - - final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, task, reindexRequest(), randomNonNegativeLong()); - - final BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); - wrapped.onResponse(response); - - verify(metrics, never()).recordSuccess(anyBoolean(), any()); - verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); - verify(metrics, never()).recordTookTime(anyLong(), anyBoolean(), any()); - } - // --- helpers --- private BulkByScrollResponse reindexResponseWithBulkAndSearchFailures( @@ -377,13 +319,13 @@ private BulkByScrollResponse reindexResponseWithResumeInfo() { private static BulkByScrollTask createNonSlicedWorkerTask() { BulkByScrollTask task = new BulkByScrollTask( - 1, - "test_type", - "test_action", - "test", + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false + Map.of(), + randomBoolean() ); task.setWorker(Float.POSITIVE_INFINITY, null); return task; @@ -391,13 +333,13 @@ private static BulkByScrollTask createNonSlicedWorkerTask() { private static BulkByScrollTask createSliceWorkerTask() { BulkByScrollTask task = new BulkByScrollTask( - 2, - "test_type", - "test_action", - "test", + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), new TaskId("node", 1), - Collections.emptyMap(), - false + Map.of(), + randomBoolean() ); task.setWorker(randomFloat(), 0); return task; @@ -405,13 +347,13 @@ private static BulkByScrollTask createSliceWorkerTask() { private static BulkByScrollTask createLeaderTask() { BulkByScrollTask task = new BulkByScrollTask( - 3, - "test_type", - "test_action", - "test", + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false + Map.of(), + randomBoolean() ); task.setWorkerCount(randomIntBetween(2, 10)); return task; From 0ffebf5b17c8555c3cebf9cd3256f631e46b8dd2 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Thu, 5 Mar 2026 13:23:10 -0500 Subject: [PATCH 09/12] add tests --- .../management/ReindexRelocationIT.java | 138 +++++++++++++----- .../index/reindex/ReindexPluginMetricsIT.java | 37 +++++ 2 files changed, 141 insertions(+), 34 deletions(-) diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java index 6f113582c4746..8cb2924570c2d 100644 --- a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java +++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java @@ -112,13 +112,47 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } - public void testLocalReindexRelocation() throws Exception { - final int slices = randomIntBetween(1, 10); + public void testNonSlicedLocalReindexRelocation() throws Exception { + final int slices = 1; + testReindexRelocation( + (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), + localReindexDescription(), + slices, + false, + randomIntBetween(1, 5) + ); + } + + public void testFixedSlicedLocalReindexRelocation() throws Exception { + final int slices = randomIntBetween(2, 5); + testReindexRelocation( + (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), + localReindexDescription(), + slices, + false, + randomIntBetween(1, 5) + ); + } + + public void testAutoSlicedLocalReindexRelocation() throws Exception { + final int slices = 0; + testReindexRelocation( + (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), + localReindexDescription(), + slices, + false, + randomIntBetween(2, 5) + ); + } + + public void testAutoNonSlicedLocalReindexRelocation() throws Exception { + final int slices = 0; testReindexRelocation( (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), localReindexDescription(), slices, - false + false, + 1 // no slicing if only 1 shard ); } @@ -130,7 +164,7 @@ public void testNonSlicedRemoteReindexRelocation() throws Exception { .publishAddress() .address(); return startAsyncNonSlicedThrottledRemoteReindexOnNode(nodeBName, nodeAAddress); - }, remoteReindexDescription(), slices, true); + }, remoteReindexDescription(), slices, true, randomIntBetween(1, 5)); } // no test for remote sliced reindex since it's not allowed @@ -138,7 +172,8 @@ private void testReindexRelocation( final CheckedBiFunction startReindexGivenNodeAAndB, final Matcher expectedDescription, final int slices, - final boolean isRemote + final boolean isRemote, + final int shards ) throws Exception { assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); @@ -152,17 +187,18 @@ private void testReindexRelocation( final String nodeBId = nodeIdByName(nodeBName); ensureStableCluster(2); - createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName); - createIndexPinnedToNodeName(DEST_INDEX, nodeAName); + createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName, shards); + createIndexPinnedToNodeName(DEST_INDEX, nodeAName, shards); indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest); ensureGreen(SOURCE_INDEX, DEST_INDEX); // Start throttled async reindex on nodeB and check it has the expected state final TaskId originalTaskId = startReindexGivenNodeAAndB.apply(nodeAName, nodeBName); - final TaskResult originalReindex = getRunningReindex(originalTaskId); - assertThat("reindex should start on nodeB", originalReindex.getTask().taskId().getNodeId(), equalTo(nodeBId)); - assertRunningReindexTaskExpectedState(originalReindex.getTask(), expectedDescription, slices); - + assertBusy(() -> { + final TaskResult originalReindex = getRunningReindex(originalTaskId); + assertThat("reindex should start on nodeB", originalReindex.getTask().taskId().getNodeId(), equalTo(nodeBId)); + assertRunningReindexTaskExpectedState(originalReindex.getTask(), expectedDescription, slices, shards); + }); shutdownNodeNameAndRelocate(nodeBName); // Assert the original task is in .tasks index and has expected content (including relocated taskId on nodeA) @@ -170,18 +206,21 @@ private void testReindexRelocation( originalTaskId, nodeAId, expectedDescription, - slices + slices, + shards ); // Assert relocated reindex is running and has expected state - final TaskResult relocatedReindex = getRunningReindex(relocatedTaskId); - assertThat("relocated reindex should be on nodeA", relocatedReindex.getTask().taskId().getNodeId(), equalTo(nodeAId)); - assertRunningReindexTaskExpectedState(relocatedReindex.getTask(), expectedDescription, slices); + assertBusy(() -> { + final TaskResult relocatedReindex = getRunningReindex(relocatedTaskId); + assertThat("relocated reindex should be on nodeA", relocatedReindex.getTask().taskId().getNodeId(), equalTo(nodeAId)); + assertRunningReindexTaskExpectedState(relocatedReindex.getTask(), expectedDescription, slices, shards); + }); // Speed up reindex post-relocation to keep the test fast unthrottleReindex(relocatedTaskId); - assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, slices); + assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, slices, shards); // Assert nodeA recorded success metrics for the relocated reindex assertReindexSuccessMetricsOnNode(nodeAName, isRemote, slices); @@ -212,7 +251,8 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( final TaskId originalTaskId, final String relocatedNodeId, final Matcher expectedTaskDescription, - final int slices + final int slices, + final int shards ) { assertThat("task completed", originalResult.isCompleted(), is(true)); @@ -245,14 +285,15 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( assertThat(taskStatus.get("reason_cancelled"), is(nullValue())); assertThat((Integer) taskStatus.get("throttled_until_millis"), greaterThanOrEqualTo(0)); - if (slices >= 2) { + if (isSliced(slices, shards)) { + final int expectedSlices = getExpectedSlices(slices, shards); @SuppressWarnings("unchecked") final List> sliceStatuses = (List>) taskStatus.get("slices"); - assertThat(sliceStatuses.size(), equalTo(slices)); - for (int i = 0; i < slices; i++) { + assertThat(sliceStatuses.size(), equalTo(expectedSlices)); + for (int i = 0; i < expectedSlices; i++) { final Map slice = sliceStatuses.get(i); assertThat(slice.get("slice_id"), is(i)); - assertThat((double) slice.get("requests_per_second"), closeTo((double) requestsPerSecond / slices, 0.00001)); + assertThat((double) slice.get("requests_per_second"), closeTo((double) requestsPerSecond / expectedSlices, 0.00001)); } } else { assertThat(taskStatus.containsKey("slices"), is(false)); @@ -268,8 +309,12 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( return new TaskId(relocatedTaskId); } - private void assertRelocatedTaskExpectedEndState(final TaskId taskId, final Matcher expectedTaskDescription, final int slices) - throws Exception { + private void assertRelocatedTaskExpectedEndState( + final TaskId taskId, + final Matcher expectedTaskDescription, + final int slices, + final int shards + ) throws Exception { final SetOnce finishedResult = new SetOnce<>(); assertBusy(() -> finishedResult.set(getCompletedTaskResult(taskId)), 30, TimeUnit.SECONDS); @@ -317,10 +362,11 @@ private void assertRelocatedTaskExpectedEndState(final TaskId taskId, final Matc assertThat(taskStatus.get("reason_cancelled"), is(nullValue())); assertThat((Integer) taskStatus.get("throttled_until_millis"), greaterThanOrEqualTo(0)); - if (slices >= 2) { + if (isSliced(slices, shards)) { + final int expectedSlices = getExpectedSlices(slices, shards); @SuppressWarnings("unchecked") final List> responseSlices = (List>) innerResponse.get("slices"); - assertThat(responseSlices.size(), equalTo(slices)); + assertThat(responseSlices.size(), equalTo(expectedSlices)); int totalCreated = 0; for (Map slice : responseSlices) { assertThat(slice.get("requests_per_second"), is(-1.0)); @@ -336,7 +382,8 @@ private TaskId assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( final TaskId taskId, final String relocatedNodeId, final Matcher expectedTaskDescription, - final int slices + final int slices, + final int shards ) { ensureYellowAndNoInitializingShards(TaskResultsService.TASK_INDEX); // replicas won't be allocated assertNoFailures(indicesAdmin().prepareRefresh(TaskResultsService.TASK_INDEX).get()); @@ -353,14 +400,21 @@ private TaskId assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( throw new AssertionError("failed to parse task result from .tasks index", e); } - return assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId(result, taskId, relocatedNodeId, expectedTaskDescription, slices); + return assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( + result, + taskId, + relocatedNodeId, + expectedTaskDescription, + slices, + shards + ); } private TaskId startAsyncThrottledLocalReindexOnNode(final String nodeName, final int slices) throws Exception { try (RestClient restClient = createRestClient(nodeName)) { final Request request = new Request("POST", "/_reindex"); request.addParameter("wait_for_completion", "false"); - request.addParameter("slices", Integer.toString(slices)); + request.addParameter("slices", slices == 0 ? "auto" : Integer.toString(slices)); request.addParameter("requests_per_second", Integer.toString(requestsPerSecond)); request.setJsonEntity(Strings.format(""" { @@ -427,7 +481,8 @@ private TaskResult getRunningReindex(final TaskId taskId) { private void assertRunningReindexTaskExpectedState( final TaskInfo taskInfo, final Matcher expectedTaskDescription, - final int slices + final int slices, + final int shards ) { assertThat(taskInfo.action(), equalTo(ReindexAction.NAME)); assertThat(taskInfo.description(), is(expectedTaskDescription)); @@ -447,18 +502,33 @@ private void assertRunningReindexTaskExpectedState( assertThat(taskStatus.getSearchRetries(), is(0L)); assertThat(taskStatus.getThrottled(), greaterThanOrEqualTo(TimeValue.ZERO)); // sliced leader only reports on completed slices, so the status is completely empty until some slices complete - assertThat(taskStatus.getRequestsPerSecond(), equalTo(slices >= 2 ? 0.0f : requestsPerSecond)); + // assertThat(taskStatus.getRequestsPerSecond(), equalTo(isSliced(slices, shards) ? 0.0f : requestsPerSecond)); assertThat(taskStatus.getReasonCancelled(), is(nullValue())); assertThat(taskStatus.getThrottledUntil(), greaterThanOrEqualTo(TimeValue.ZERO)); - if (slices >= 2) { - final List expectedStatuses = Collections.nCopies(slices, null); + if (isSliced(slices, shards)) { + final int expectedSlices = getExpectedSlices(slices, shards); + final List expectedStatuses = Collections.nCopies(expectedSlices, null); assertThat("running slices statuses are null", taskStatus.getSliceStatuses(), equalTo(expectedStatuses)); } else { assertThat(taskStatus.getSliceStatuses().isEmpty(), is(true)); } } + private boolean isSliced(int slices, int shards) { + return slices > 1 || (slices == 0 && shards > 1); + } + + private int getExpectedSlices(int slices, int shards) { + if (slices > 1) { + return slices; + } else if (slices == 0) { + return Math.max(shards, 1); + } else { + return 1; + } + } + private TaskResult getCompletedTaskResult(final TaskId taskId) { final GetTaskResponse response = clusterAdmin().prepareGetTask(taskId).setWaitForCompletion(true).get(); final TaskResult task = response.getTask(); @@ -467,10 +537,10 @@ private TaskResult getCompletedTaskResult(final TaskId taskId) { return task; } - private void createIndexPinnedToNodeName(final String index, final String nodeName) { + private void createIndexPinnedToNodeName(final String index, final String nodeName, final int shards) { prepareCreate(index).setSettings( Settings.builder() - .put("index.number_of_shards", randomIntBetween(1, 3)) + .put("index.number_of_shards", shards) .put("index.number_of_replicas", 0) .put("index.routing.allocation.require._name", nodeName) ).get(); diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java index e36d5e8f4d6a6..653afed6f096b 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.rest.root.MainRestPlugin; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; @@ -258,6 +259,42 @@ public void testReindexMetricsWithFixedSlices() throws Exception { }); } + public void testReindexMetricsWithManualSlices() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + indexRandom( + true, + prepareIndex("source").setId("1").setSource("foo", "a"), + prepareIndex("source").setId("2").setSource("foo", "b"), + prepareIndex("source").setId("3").setSource("foo", "c"), + prepareIndex("source").setId("4").setSource("foo", "d") + ); + assertHitCount(prepareSearch("source").setSize(0), 4); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + ReindexRequestBuilder request = reindex().source("source").destination("dest_manual"); + request.source().slice(new SliceBuilder(0, 2)); + request.get(); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + List histograms = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); + assertThat(histograms.size(), equalTo(1)); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("manual")); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + + List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("manual")); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + }); + } + public void testReindexMetricsWithAutoSlices() throws Exception { final String dataNodeName = internalCluster().startNode(); From dadd1fbbbc54c58dd3ad12dc0808f81499fd460f Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Thu, 5 Mar 2026 13:32:27 -0500 Subject: [PATCH 10/12] fix --- .../elasticsearch/reindex/management/ReindexRelocationIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java index 8cb2924570c2d..eb42f58401219 100644 --- a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java +++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java @@ -502,7 +502,7 @@ private void assertRunningReindexTaskExpectedState( assertThat(taskStatus.getSearchRetries(), is(0L)); assertThat(taskStatus.getThrottled(), greaterThanOrEqualTo(TimeValue.ZERO)); // sliced leader only reports on completed slices, so the status is completely empty until some slices complete - // assertThat(taskStatus.getRequestsPerSecond(), equalTo(isSliced(slices, shards) ? 0.0f : requestsPerSecond)); + assertThat(taskStatus.getRequestsPerSecond(), equalTo(isSliced(slices, shards) ? 0.0f : requestsPerSecond)); assertThat(taskStatus.getReasonCancelled(), is(nullValue())); assertThat(taskStatus.getThrottledUntil(), greaterThanOrEqualTo(TimeValue.ZERO)); From 90ae2bbbe41cc907c3ed34ae42da321fe307a036 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Thu, 5 Mar 2026 14:38:11 -0500 Subject: [PATCH 11/12] minor --- .../src/main/java/org/elasticsearch/reindex/Reindexer.java | 4 ++-- .../test/java/org/elasticsearch/reindex/ReindexerTests.java | 4 +--- .../elasticsearch/index/reindex/BulkByScrollResponse.java | 5 ++++- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 5aa288a380bee..42cf314de5dd3 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -303,8 +303,8 @@ private void recordDuration() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { if (bulkByScrollResponse.getTaskResumeInfo().isPresent()) { - // Task is being relocated; do not record metrics on the source node, the destination node will record metrics when - // the relocated task completes + // Task will be relocated to a different node + // Do not record metrics on the source node, the destination node will record metrics when the relocated task completes assert bulkByScrollResponse.getBulkFailures().isEmpty() : "bulk failures should be empty if relocating"; assert bulkByScrollResponse.getSearchFailures().isEmpty() : "search failures should be empty if relocating"; listener.onResponse(bulkByScrollResponse); diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index e65e64fe69f31..3fd64ba3ed12f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -170,9 +170,7 @@ public void testWrapWithMetricsSkipsMetricsWhenRelocating() { wrapped.onResponse(response); verify(listener).onResponse(response); - verify(metrics, never()).recordSuccess(anyBoolean(), any()); - verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); - verify(metrics, never()).recordTookTime(anyLong(), anyBoolean(), any()); + verifyNoMoreInteractions(metrics); } // listenerWithRelocations tests diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java index bac1406885e2f..320d7902b810a 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java @@ -174,7 +174,10 @@ public boolean isTimedOut() { return timedOut; } - /** Resume info for relocation or empty if this is a final response. */ + /** + * Resume info for relocation. Only present if the task is not complete and needs to be relocated to a different node + * due to the current node being shut down, and a suitable destination node has been found. + * */ public Optional getTaskResumeInfo() { return Optional.ofNullable(resumeInfo); } From 8bcee2ce981666339f34a1d8d3e6829ba403e039 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Thu, 5 Mar 2026 15:07:43 -0500 Subject: [PATCH 12/12] fix listener --- .../org/elasticsearch/reindex/Reindexer.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 42cf314de5dd3..91b436025eb6d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -155,7 +155,13 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl // todo: move relocations to BulkByPaginatedSearchParallelizationHelper rather than having it in Reindexer, makes it generic // for update-by-query and delete-by-query - final ActionListener listenerWithRelocations = listenerWithRelocations(task, request, listener); + final ActionListener responseListener = wrapWithMetrics( + listenerWithRelocations(task, request, listener), + reindexMetrics, + task, + request, + startTime + ); Consumer workerAction = remoteVersion -> { ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task); @@ -170,26 +176,18 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl projectResolver.getProjectState(clusterService.state()), reindexSslConfig, request, - wrapWithMetrics(listenerWithRelocations, reindexMetrics, task, request, startTime), + responseListener, remoteVersion ); searchAction.start(); }; - final ActionListener responseListener = wrapWithMetrics( - listenerWithRelocations, - reindexMetrics, - task, - request, - startTime - ); - /** * If this is a request to reindex from remote, then we need to determine the remote version prior to execution * NB {@link ReindexRequest} forbids remote requests and slices > 1, so we're guaranteed to be running on the only slice */ if (featureService.clusterHasFeature(clusterService.state(), REINDEX_PIT_SEARCH_FEATURE) && request.getRemoteInfo() != null) { - lookupRemoteVersionAndExecute(task, request, listenerWithRelocations, workerAction); + lookupRemoteVersionAndExecute(task, request, responseListener, workerAction); } else { BulkByPaginatedSearchParallelizationHelper.executeSlicedAction( task,