diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java index 37c4e99c2a427..eb42f58401219 100644 --- a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java +++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexRelocationIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.reindex.ReindexMetrics; +import org.elasticsearch.reindex.ReindexMetrics.SlicingMode; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.rest.root.MainRestPlugin; @@ -53,6 +54,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -110,13 +112,47 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } - public void testLocalReindexRelocation() throws Exception { - final int slices = randomIntBetween(1, 10); + public void testNonSlicedLocalReindexRelocation() throws Exception { + final int slices = 1; + testReindexRelocation( + (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), + localReindexDescription(), + slices, + false, + randomIntBetween(1, 5) + ); + } + + public void testFixedSlicedLocalReindexRelocation() throws Exception { + final int slices = randomIntBetween(2, 5); + testReindexRelocation( + (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), + localReindexDescription(), + slices, + false, + randomIntBetween(1, 5) + ); + } + + public void testAutoSlicedLocalReindexRelocation() throws Exception { + final int slices = 0; + testReindexRelocation( + (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), + localReindexDescription(), + slices, + false, + randomIntBetween(2, 5) + ); + } + + public void testAutoNonSlicedLocalReindexRelocation() throws Exception { + final int slices = 0; testReindexRelocation( (nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices), localReindexDescription(), slices, - false + false, + 1 // no slicing if only 1 shard ); } @@ -128,7 +164,7 @@ public void testNonSlicedRemoteReindexRelocation() throws Exception { .publishAddress() .address(); return startAsyncNonSlicedThrottledRemoteReindexOnNode(nodeBName, nodeAAddress); - }, remoteReindexDescription(), slices, true); + }, remoteReindexDescription(), slices, true, randomIntBetween(1, 5)); } // no test for remote sliced reindex since it's not allowed @@ -136,7 +172,8 @@ private void testReindexRelocation( final CheckedBiFunction startReindexGivenNodeAAndB, final Matcher expectedDescription, final int slices, - final boolean isRemote + final boolean isRemote, + final int shards ) throws Exception { assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); @@ -150,17 +187,18 @@ private void testReindexRelocation( final String nodeBId = nodeIdByName(nodeBName); ensureStableCluster(2); - createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName); - createIndexPinnedToNodeName(DEST_INDEX, nodeAName); + createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName, shards); + createIndexPinnedToNodeName(DEST_INDEX, nodeAName, shards); indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest); ensureGreen(SOURCE_INDEX, DEST_INDEX); // Start throttled async reindex on nodeB and check it has the expected state final TaskId originalTaskId = startReindexGivenNodeAAndB.apply(nodeAName, nodeBName); - final TaskResult originalReindex = getRunningReindex(originalTaskId); - assertThat("reindex should start on nodeB", originalReindex.getTask().taskId().getNodeId(), equalTo(nodeBId)); - assertRunningReindexTaskExpectedState(originalReindex.getTask(), expectedDescription, slices); - + assertBusy(() -> { + final TaskResult originalReindex = getRunningReindex(originalTaskId); + assertThat("reindex should start on nodeB", originalReindex.getTask().taskId().getNodeId(), equalTo(nodeBId)); + assertRunningReindexTaskExpectedState(originalReindex.getTask(), expectedDescription, slices, shards); + }); shutdownNodeNameAndRelocate(nodeBName); // Assert the original task is in .tasks index and has expected content (including relocated taskId on nodeA) @@ -168,18 +206,21 @@ private void testReindexRelocation( originalTaskId, nodeAId, expectedDescription, - slices + slices, + shards ); // Assert relocated reindex is running and has expected state - final TaskResult relocatedReindex = getRunningReindex(relocatedTaskId); - assertThat("relocated reindex should be on nodeA", relocatedReindex.getTask().taskId().getNodeId(), equalTo(nodeAId)); - assertRunningReindexTaskExpectedState(relocatedReindex.getTask(), expectedDescription, slices); + assertBusy(() -> { + final TaskResult relocatedReindex = getRunningReindex(relocatedTaskId); + assertThat("relocated reindex should be on nodeA", relocatedReindex.getTask().taskId().getNodeId(), equalTo(nodeAId)); + assertRunningReindexTaskExpectedState(relocatedReindex.getTask(), expectedDescription, slices, shards); + }); // Speed up reindex post-relocation to keep the test fast unthrottleReindex(relocatedTaskId); - assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, slices); + assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, slices, shards); // Assert nodeA recorded success metrics for the relocated reindex assertReindexSuccessMetricsOnNode(nodeAName, isRemote, slices); @@ -210,7 +251,8 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( final TaskId originalTaskId, final String relocatedNodeId, final Matcher expectedTaskDescription, - final int slices + final int slices, + final int shards ) { assertThat("task completed", originalResult.isCompleted(), is(true)); @@ -243,14 +285,15 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( assertThat(taskStatus.get("reason_cancelled"), is(nullValue())); assertThat((Integer) taskStatus.get("throttled_until_millis"), greaterThanOrEqualTo(0)); - if (slices >= 2) { + if (isSliced(slices, shards)) { + final int expectedSlices = getExpectedSlices(slices, shards); @SuppressWarnings("unchecked") final List> sliceStatuses = (List>) taskStatus.get("slices"); - assertThat(sliceStatuses.size(), equalTo(slices)); - for (int i = 0; i < slices; i++) { + assertThat(sliceStatuses.size(), equalTo(expectedSlices)); + for (int i = 0; i < expectedSlices; i++) { final Map slice = sliceStatuses.get(i); assertThat(slice.get("slice_id"), is(i)); - assertThat((double) slice.get("requests_per_second"), closeTo((double) requestsPerSecond / slices, 0.00001)); + assertThat((double) slice.get("requests_per_second"), closeTo((double) requestsPerSecond / expectedSlices, 0.00001)); } } else { assertThat(taskStatus.containsKey("slices"), is(false)); @@ -266,8 +309,12 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( return new TaskId(relocatedTaskId); } - private void assertRelocatedTaskExpectedEndState(final TaskId taskId, final Matcher expectedTaskDescription, final int slices) - throws Exception { + private void assertRelocatedTaskExpectedEndState( + final TaskId taskId, + final Matcher expectedTaskDescription, + final int slices, + final int shards + ) throws Exception { final SetOnce finishedResult = new SetOnce<>(); assertBusy(() -> finishedResult.set(getCompletedTaskResult(taskId)), 30, TimeUnit.SECONDS); @@ -315,10 +362,11 @@ private void assertRelocatedTaskExpectedEndState(final TaskId taskId, final Matc assertThat(taskStatus.get("reason_cancelled"), is(nullValue())); assertThat((Integer) taskStatus.get("throttled_until_millis"), greaterThanOrEqualTo(0)); - if (slices >= 2) { + if (isSliced(slices, shards)) { + final int expectedSlices = getExpectedSlices(slices, shards); @SuppressWarnings("unchecked") final List> responseSlices = (List>) innerResponse.get("slices"); - assertThat(responseSlices.size(), equalTo(slices)); + assertThat(responseSlices.size(), equalTo(expectedSlices)); int totalCreated = 0; for (Map slice : responseSlices) { assertThat(slice.get("requests_per_second"), is(-1.0)); @@ -334,7 +382,8 @@ private TaskId assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( final TaskId taskId, final String relocatedNodeId, final Matcher expectedTaskDescription, - final int slices + final int slices, + final int shards ) { ensureYellowAndNoInitializingShards(TaskResultsService.TASK_INDEX); // replicas won't be allocated assertNoFailures(indicesAdmin().prepareRefresh(TaskResultsService.TASK_INDEX).get()); @@ -351,14 +400,21 @@ private TaskId assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId( throw new AssertionError("failed to parse task result from .tasks index", e); } - return assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId(result, taskId, relocatedNodeId, expectedTaskDescription, slices); + return assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId( + result, + taskId, + relocatedNodeId, + expectedTaskDescription, + slices, + shards + ); } private TaskId startAsyncThrottledLocalReindexOnNode(final String nodeName, final int slices) throws Exception { try (RestClient restClient = createRestClient(nodeName)) { final Request request = new Request("POST", "/_reindex"); request.addParameter("wait_for_completion", "false"); - request.addParameter("slices", Integer.toString(slices)); + request.addParameter("slices", slices == 0 ? "auto" : Integer.toString(slices)); request.addParameter("requests_per_second", Integer.toString(requestsPerSecond)); request.setJsonEntity(Strings.format(""" { @@ -425,7 +481,8 @@ private TaskResult getRunningReindex(final TaskId taskId) { private void assertRunningReindexTaskExpectedState( final TaskInfo taskInfo, final Matcher expectedTaskDescription, - final int slices + final int slices, + final int shards ) { assertThat(taskInfo.action(), equalTo(ReindexAction.NAME)); assertThat(taskInfo.description(), is(expectedTaskDescription)); @@ -445,18 +502,33 @@ private void assertRunningReindexTaskExpectedState( assertThat(taskStatus.getSearchRetries(), is(0L)); assertThat(taskStatus.getThrottled(), greaterThanOrEqualTo(TimeValue.ZERO)); // sliced leader only reports on completed slices, so the status is completely empty until some slices complete - assertThat(taskStatus.getRequestsPerSecond(), equalTo(slices >= 2 ? 0.0f : requestsPerSecond)); + assertThat(taskStatus.getRequestsPerSecond(), equalTo(isSliced(slices, shards) ? 0.0f : requestsPerSecond)); assertThat(taskStatus.getReasonCancelled(), is(nullValue())); assertThat(taskStatus.getThrottledUntil(), greaterThanOrEqualTo(TimeValue.ZERO)); - if (slices >= 2) { - final List expectedStatuses = Collections.nCopies(slices, null); + if (isSliced(slices, shards)) { + final int expectedSlices = getExpectedSlices(slices, shards); + final List expectedStatuses = Collections.nCopies(expectedSlices, null); assertThat("running slices statuses are null", taskStatus.getSliceStatuses(), equalTo(expectedStatuses)); } else { assertThat(taskStatus.getSliceStatuses().isEmpty(), is(true)); } } + private boolean isSliced(int slices, int shards) { + return slices > 1 || (slices == 0 && shards > 1); + } + + private int getExpectedSlices(int slices, int shards) { + if (slices > 1) { + return slices; + } else if (slices == 0) { + return Math.max(shards, 1); + } else { + return 1; + } + } + private TaskResult getCompletedTaskResult(final TaskId taskId) { final GetTaskResponse response = clusterAdmin().prepareGetTask(taskId).setWaitForCompletion(true).get(); final TaskResult task = response.getTask(); @@ -465,10 +537,10 @@ private TaskResult getCompletedTaskResult(final TaskId taskId) { return task; } - private void createIndexPinnedToNodeName(final String index, final String nodeName) { + private void createIndexPinnedToNodeName(final String index, final String nodeName, final int shards) { prepareCreate(index).setSettings( Settings.builder() - .put("index.number_of_shards", randomIntBetween(1, 3)) + .put("index.number_of_shards", shards) .put("index.number_of_replicas", 0) .put("index.routing.allocation.require._name", nodeName) ).get(); @@ -516,14 +588,24 @@ private void assertReindexSuccessMetricsOnNode(final String nodeName, final bool final TestTelemetryPlugin plugin = getTelemetryPlugin(nodeName); plugin.collect(); final List completions = plugin.getLongCounterMeasurement(ReindexMetrics.REINDEX_COMPLETION_COUNTER); - assertThat(completions.size(), equalTo(slices)); - for (final Measurement completion : completions) { - assertNull(completion.attributes().get(ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE)); - final String expectedSource = isRemote - ? ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE - : ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; - assertThat(completion.attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SOURCE), equalTo(expectedSource)); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE)); + final String expectedSource = isRemote ? ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE : ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; + assertThat(completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SOURCE), equalTo(expectedSource)); + SlicingMode slicingMode = null; + if (slices == 0) { + slicingMode = SlicingMode.AUTO; + } else if (slices == 1) { + slicingMode = SlicingMode.NONE; + } else if (slices > 1) { + slicingMode = SlicingMode.FIXED; + } else { + fail("invalid slices value: " + slices); } + assertThat( + completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE), + equalTo(slicingMode.name().toLowerCase(Locale.ROOT)) + ); } private void assertExpectedNumberOfDocumentsInDestinationIndex() throws IOException { diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java index dcb4c92a0d4c9..653afed6f096b 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.rest.root.MainRestPlugin; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; @@ -33,6 +34,7 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; @@ -115,6 +117,7 @@ public void testReindexFromRemoteMetrics() throws Exception { assertThat(completions.size(), equalTo(1)); assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name())); assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); }); // now create the source index @@ -131,6 +134,7 @@ public void testReindexFromRemoteMetrics() throws Exception { assertThat(completions.size(), equalTo(2)); assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); }); } @@ -189,6 +193,7 @@ public void testReindexMetrics() throws Exception { testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).forEach(m -> { assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + assertThat(m.attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); }); }); } @@ -211,13 +216,116 @@ public void testReindexMetricsWithBulkFailure() throws Exception { testTelemetryPlugin.collect(); assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(1)); + Measurement completion = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).getFirst(); assertThat( - testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER) - .getFirst() - .attributes() - .get(ATTRIBUTE_NAME_ERROR_TYPE), + completion.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo("org.elasticsearch.index.mapper.DocumentParsingException") ); + assertThat(completion.attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("none")); + }); + } + + public void testReindexMetricsWithFixedSlices() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + indexRandom( + true, + prepareIndex("source").setId("1").setSource("foo", "a"), + prepareIndex("source").setId("2").setSource("foo", "b"), + prepareIndex("source").setId("3").setSource("foo", "c"), + prepareIndex("source").setId("4").setSource("foo", "d") + ); + assertHitCount(prepareSearch("source").setSize(0), 4); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + reindex().source("source").destination("dest_fixed").setSlices(2).get(); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + List histograms = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); + assertThat(histograms.size(), equalTo(1)); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("fixed")); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + + List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("fixed")); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + }); + } + + public void testReindexMetricsWithManualSlices() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + indexRandom( + true, + prepareIndex("source").setId("1").setSource("foo", "a"), + prepareIndex("source").setId("2").setSource("foo", "b"), + prepareIndex("source").setId("3").setSource("foo", "c"), + prepareIndex("source").setId("4").setSource("foo", "d") + ); + assertHitCount(prepareSearch("source").setSize(0), 4); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + ReindexRequestBuilder request = reindex().source("source").destination("dest_manual"); + request.source().slice(new SliceBuilder(0, 2)); + request.get(); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + List histograms = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); + assertThat(histograms.size(), equalTo(1)); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("manual")); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + + List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("manual")); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + }); + } + + public void testReindexMetricsWithAutoSlices() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + indexRandom( + true, + prepareIndex("source").setId("1").setSource("foo", "a"), + prepareIndex("source").setId("2").setSource("foo", "b"), + prepareIndex("source").setId("3").setSource("foo", "c"), + prepareIndex("source").setId("4").setSource("foo", "d") + ); + assertHitCount(prepareSearch("source").setSize(0), 4); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + reindex().source("source").destination("dest_auto").setSlices(AbstractBulkByScrollRequest.AUTO_SLICES).get(); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + List histograms = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); + assertThat(histograms.size(), equalTo(1)); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("auto")); + assertThat(histograms.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + + List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER); + assertThat(completions.size(), equalTo(1)); + assertNull(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE), equalTo("auto")); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); }); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index cd934614e6db3..17ad0a0af68a1 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -10,11 +10,14 @@ package org.elasticsearch.reindex; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.telemetry.metric.LongCounter; import org.elasticsearch.telemetry.metric.LongHistogram; import org.elasticsearch.telemetry.metric.MeterRegistry; import java.util.HashMap; +import java.util.Locale; import java.util.Map; public class ReindexMetrics { @@ -29,6 +32,8 @@ public class ReindexMetrics { public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local"; public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote"; + public static final String ATTRIBUTE_NAME_SLICING_MODE = "es_reindex_slicing_mode"; + private final LongHistogram reindexTimeSecsHistogram; private final LongCounter reindexCompletionCounter; @@ -41,23 +46,23 @@ public ReindexMetrics(MeterRegistry meterRegistry) { ); } - public long recordTookTime(long tookTime, boolean remote) { - Map attributes = getAttributes(remote); + public long recordTookTime(long tookTime, boolean remote, SlicingMode slicingMode) { + Map attributes = getAttributes(remote, slicingMode); reindexTimeSecsHistogram.record(tookTime, attributes); return tookTime; } - public void recordSuccess(boolean remote) { - Map attributes = getAttributes(remote); + public void recordSuccess(boolean remote, SlicingMode slicingMode) { + Map attributes = getAttributes(remote, slicingMode); // attribute ATTRIBUTE_ERROR_TYPE being absent indicates success assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes"; reindexCompletionCounter.incrementBy(1, attributes); } - public void recordFailure(boolean remote, Throwable e) { - Map attributes = getAttributes(remote); + public void recordFailure(boolean remote, SlicingMode slicingMode, Throwable e) { + Map attributes = getAttributes(remote, slicingMode); // best effort to extract useful error type if possible String errorType; if (e instanceof ElasticsearchStatusException ese) { @@ -74,9 +79,37 @@ public void recordFailure(boolean remote, Throwable e) { reindexCompletionCounter.incrementBy(1, attributes); } - private Map getAttributes(boolean remote) { + public enum SlicingMode { + // slices resolved automatically from source index shard count (e.g. ?slices=auto) + AUTO, + // reindex request specifies a fixed slice count (e.g. ?slices=4) + FIXED, + // reindex request specifies a slice id (e.g. "slice": { "id": 0, "max": 4 }) + MANUAL, + // no slicing (e.g. ?slices=1) + NONE + } + + /** + * Determines the {@link SlicingMode} from a reindex request. + */ + public static SlicingMode resolveSlicingMode(ReindexRequest request) { + if (request.getSearchRequest().source().slice() != null) { + return SlicingMode.MANUAL; + } + int slices = request.getSlices(); + if (slices == AbstractBulkByScrollRequest.AUTO_SLICES) { + return SlicingMode.AUTO; + } else if (slices > 1) { + return SlicingMode.FIXED; + } + return SlicingMode.NONE; + } + + private Map getAttributes(boolean remote, SlicingMode slicingMode) { Map attributes = new HashMap<>(); attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL); + attributes.put(ATTRIBUTE_NAME_SLICING_MODE, slicingMode.name().toLowerCase(Locale.ROOT)); return attributes; } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 67bc2a495b4da..91b436025eb6d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -155,7 +155,13 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl // todo: move relocations to BulkByPaginatedSearchParallelizationHelper rather than having it in Reindexer, makes it generic // for update-by-query and delete-by-query - final ActionListener listenerWithRelocations = listenerWithRelocations(task, request, listener); + final ActionListener responseListener = wrapWithMetrics( + listenerWithRelocations(task, request, listener), + reindexMetrics, + task, + request, + startTime + ); Consumer workerAction = remoteVersion -> { ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task); @@ -170,7 +176,7 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl projectResolver.getProjectState(clusterService.state()), reindexSslConfig, request, - workerListenerWithRelocationAndMetrics(listenerWithRelocations, startTime, request.getRemoteInfo() != null), + responseListener, remoteVersion ); searchAction.start(); @@ -181,13 +187,13 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl * NB {@link ReindexRequest} forbids remote requests and slices > 1, so we're guaranteed to be running on the only slice */ if (featureService.clusterHasFeature(clusterService.state(), REINDEX_PIT_SEARCH_FEATURE) && request.getRemoteInfo() != null) { - lookupRemoteVersionAndExecute(task, request, listenerWithRelocations, workerAction); + lookupRemoteVersionAndExecute(task, request, responseListener, workerAction); } else { BulkByPaginatedSearchParallelizationHelper.executeSlicedAction( task, request, ReindexAction.INSTANCE, - listenerWithRelocations, + responseListener, client, clusterService.localNode(), null, @@ -204,7 +210,7 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl private void lookupRemoteVersionAndExecute( BulkByScrollTask task, ReindexRequest request, - ActionListener listenerWithRelocations, + ActionListener listener, Consumer workerAction ) { RemoteInfo remoteInfo = request.getRemoteInfo(); @@ -219,7 +225,7 @@ public void onResponse(Version version) { task, request, ReindexAction.INSTANCE, - listenerWithRelocations, + listener, client, clusterService.localNode(), version, @@ -230,12 +236,12 @@ public void onResponse(Version version) { @Override public void onFailure(Exception e) { - closeRestClientAndRun(restClient, () -> listenerWithRelocations.onFailure(e)); + closeRestClientAndRun(restClient, () -> listener.onFailure(e)); } @Override public void onRejection(Exception e) { - closeRestClientAndRun(restClient, () -> listenerWithRelocations.onFailure(e)); + closeRestClientAndRun(restClient, () -> listener.onFailure(e)); } }; RemoteReindexingUtils.lookupRemoteVersionWithRetries( @@ -264,80 +270,77 @@ private void closeRestClientAndRun(RestClient restClient, Runnable onCompletion) }); } - /** Wraps the listener with metrics tracking and relocation handling (if applicable). Visible for testing. */ - ActionListener workerListenerWithRelocationAndMetrics( - ActionListener potentiallyWrappedRelocationListener, - long startTime, - boolean isRemote - ) { - final ActionListener metricListener = wrapWithMetrics( - potentiallyWrappedRelocationListener, - reindexMetrics, - startTime, - isRemote - ); - - return metricListener.delegateFailure((l, resp) -> { - // note: implicitly relies on TaskResumeInfo only being populated if a suitable node exists for relocating to. - final boolean willBeRelocatedThereforeDoNotRecordMetrics = resp.getTaskResumeInfo().isPresent(); - if (willBeRelocatedThereforeDoNotRecordMetrics) { - assert resp.getBulkFailures().isEmpty() : "bulk failures should be empty if relocating"; - assert resp.getSearchFailures().isEmpty() : "search failures should be empty if relocating"; - potentiallyWrappedRelocationListener.onResponse(resp); - return; - } - l.onResponse(resp); - }); - } - - // Visible for testing + /** + * Wrap listener with reindex metrics. For sliced reindex, this should record once only when all slices complete + * Visible for testing + */ static ActionListener wrapWithMetrics( ActionListener listener, @Nullable ReindexMetrics metrics, - long startTime, - boolean isRemote + BulkByScrollTask task, + ReindexRequest request, + long startTime ) { if (metrics == null) { return listener; } + if (task.isWorker() && task.getParentTaskId().isSet()) { + // Do not record metrics for slice worker, only leader will record them when all slices are completed + // Potentially add slice-level metrics for slice worker + return listener; + } + final boolean isRemote = request.getRemoteInfo() != null; + final ReindexMetrics.SlicingMode slicingMode = ReindexMetrics.resolveSlicingMode(request); // todo(szy): add relocation metrics - // add completion metrics - var withCompletionMetrics = new ActionListener() { + return new ActionListener<>() { + private void recordDuration() { + long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); + metrics.recordTookTime(elapsedTime, isRemote, slicingMode); + } + @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures()) - .stream() - .flatMap(List::stream) - .map(PaginatedHitSource.SearchFailure::getReason) - .findFirst(); - var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures()) - .stream() - .flatMap(List::stream) - .map(BulkItemResponse.Failure::getCause) - .findFirst(); - if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) { - // record only the first sample error in metric - Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get); - metrics.recordFailure(isRemote, e); + if (bulkByScrollResponse.getTaskResumeInfo().isPresent()) { + // Task will be relocated to a different node + // Do not record metrics on the source node, the destination node will record metrics when the relocated task completes + assert bulkByScrollResponse.getBulkFailures().isEmpty() : "bulk failures should be empty if relocating"; + assert bulkByScrollResponse.getSearchFailures().isEmpty() : "search failures should be empty if relocating"; listener.onResponse(bulkByScrollResponse); - } else { - metrics.recordSuccess(isRemote); + return; + } + try { + var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures()) + .stream() + .flatMap(List::stream) + .map(PaginatedHitSource.SearchFailure::getReason) + .findFirst(); + var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures()) + .stream() + .flatMap(List::stream) + .map(BulkItemResponse.Failure::getCause) + .findFirst(); + if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) { + Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get); + metrics.recordFailure(isRemote, slicingMode, e); + } else { + metrics.recordSuccess(isRemote, slicingMode); + } listener.onResponse(bulkByScrollResponse); + } finally { + recordDuration(); } } @Override public void onFailure(Exception e) { - metrics.recordFailure(isRemote, e); - listener.onFailure(e); + try { + metrics.recordFailure(isRemote, slicingMode, e); + listener.onFailure(e); + } finally { + recordDuration(); + } } }; - - // add duration metric - return ActionListener.runAfter(withCompletionMetrics, () -> { - long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); - metrics.recordTookTime(elapsedTime, isRemote); - }); } /** Wraps the listener with relocation handling (if applicable). Visible for testing. */ diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index 9da6b8b3166bc..25d5aac8e7bce 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -10,7 +10,11 @@ package org.elasticsearch.reindex; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.RecordingMeterRegistry; @@ -18,13 +22,16 @@ import org.junit.Before; import java.util.List; +import java.util.Locale; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_COUNTER; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.SlicingMode; public class ReindexMetricsTests extends ESTestCase { @@ -40,17 +47,16 @@ public void createMetrics() { public void testRecordTookTime() { long secondsTaken = randomLongBetween(1, Long.MAX_VALUE); - // first metric - metrics.recordTookTime(secondsTaken, false); + metrics.recordTookTime(secondsTaken, false, SlicingMode.NONE); List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); assertEquals(1, measurements.size()); assertEquals(secondsTaken, measurements.getFirst().getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals("none", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); - // second metric long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE); - metrics.recordTookTime(remoteSecondsTaken, true); + metrics.recordTookTime(remoteSecondsTaken, true, SlicingMode.AUTO); measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); assertEquals(2, measurements.size()); @@ -58,40 +64,43 @@ public void testRecordTookTime() { assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); assertEquals(remoteSecondsTaken, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals("auto", measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); } public void testRecordSuccess() { - // first metric - metrics.recordSuccess(false); + metrics.recordSuccess(false, SlicingMode.NONE); List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(1, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals( + SlicingMode.NONE.name().toLowerCase(Locale.ROOT), + measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE) + ); assertNull(measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); - // second metric - metrics.recordSuccess(true); + metrics.recordSuccess(true, SlicingMode.FIXED); measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(2, measurements.size()); assertEquals(1, measurements.get(1).getLong()); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals("fixed", measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); assertNull(measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); } public void testRecordFailure() { - // first metric - metrics.recordFailure(false, new IllegalArgumentException("random failure")); + metrics.recordFailure(false, SlicingMode.NONE, new IllegalArgumentException("random failure")); List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(1, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); assertEquals("java.lang.IllegalArgumentException", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals("none", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); - // second metric - metrics.recordFailure(true, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); + metrics.recordFailure(true, SlicingMode.AUTO, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, REINDEX_COMPLETION_COUNTER); assertEquals(2, measurements.size()); @@ -99,5 +108,35 @@ public void testRecordFailure() { assertEquals(1, measurements.get(1).getLong()); assertEquals(RestStatus.BAD_REQUEST.name(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals("auto", measurements.get(1).attributes().get(ATTRIBUTE_NAME_SLICING_MODE)); + } + + public void testResolveSlicingModeNone() { + ReindexRequest request = new ReindexRequest(); + assertEquals(SlicingMode.NONE, ReindexMetrics.resolveSlicingMode(request)); + } + + public void testResolveSlicingModeNoneOneSlice() { + ReindexRequest request = new ReindexRequest(); + request.setSlices(1); + assertEquals(SlicingMode.NONE, ReindexMetrics.resolveSlicingMode(request)); + } + + public void testResolveSlicingModeFixed() { + ReindexRequest request = new ReindexRequest(); + request.setSlices(randomIntBetween(2, 20)); + assertEquals(SlicingMode.FIXED, ReindexMetrics.resolveSlicingMode(request)); + } + + public void testResolveSlicingModeAuto() { + ReindexRequest request = new ReindexRequest(); + request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + assertEquals(SlicingMode.AUTO, ReindexMetrics.resolveSlicingMode(request)); + } + + public void testResolveSlicingModeManual() { + ReindexRequest request = new ReindexRequest(); + request.getSearchRequest().source(new SearchSourceBuilder().slice(new SliceBuilder(0, 3))); + assertEquals(SlicingMode.MANUAL, ReindexMetrics.resolveSlicingMode(request)); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index 627501bb42988..3fd64ba3ed12f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -60,38 +60,43 @@ public class ReindexerTests extends ESTestCase { + // --- wrapWithMetrics tests --- + public void testWrapWithMetricsSuccess() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); wrapped.onResponse(response); verify(listener).onResponse(response); - verify(metrics).recordSuccess(true); - verify(metrics, never()).recordFailure(anyBoolean(), any()); - verify(metrics).recordTookTime(anyLong(), eq(true)); + verify(metrics).recordSuccess(eq(false), any()); + verify(metrics, never()).recordFailure(anyBoolean(), any(), any()); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); } public void testWrapWithMetricsFailure() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); Exception exception = new Exception("random failure"); wrapped.onFailure(exception); verify(listener).onFailure(exception); - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics).recordFailure(true, exception); - verify(metrics).recordTookTime(anyLong(), eq(true)); + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics).recordFailure(eq(false), any(), eq(exception)); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); } public void testWrapWithMetricsBulkFailure() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), false); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); Exception exception = new Exception("random failure"); Exception anotherException = new Exception("another failure"); @@ -102,15 +107,16 @@ public void testWrapWithMetricsBulkFailure() { wrapped.onResponse(response); verify(listener).onResponse(response); - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics).recordFailure(false, exception); - verify(metrics).recordTookTime(anyLong(), eq(false)); + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics).recordFailure(eq(false), any(), eq(exception)); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); } public void testWrapWithMetricsSearchFailure() { ReindexMetrics metrics = mock(); ActionListener listener = spy(ActionListener.noop()); - var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + BulkByScrollTask task = createNonSlicedWorkerTask(); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); Exception exception = new Exception("random failure"); Exception anotherException = new Exception("another failure"); @@ -121,9 +127,50 @@ public void testWrapWithMetricsSearchFailure() { wrapped.onResponse(response); verify(listener).onResponse(response); - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics).recordFailure(true, exception); - verify(metrics).recordTookTime(anyLong(), eq(true)); + verify(metrics, never()).recordSuccess(anyBoolean(), any()); + verify(metrics).recordFailure(eq(false), any(), eq(exception)); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); + } + + public void testWrapWithMetricsSkipsSliceWorker() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + BulkByScrollTask task = createSliceWorkerTask(); + + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); + + assertSame(listener, wrapped); + verifyNoMoreInteractions(metrics); + } + + public void testWrapWithMetricsWrapsLeader() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + BulkByScrollTask task = createLeaderTask(); + + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); + + assertNotSame(listener, wrapped); + + BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); + wrapped.onResponse(response); + + verify(metrics).recordSuccess(eq(false), any()); + verify(metrics).recordTookTime(anyLong(), eq(false), any()); + } + + public void testWrapWithMetricsSkipsMetricsWhenRelocating() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + BulkByScrollTask task = createNonSlicedWorkerTask(); + + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong()); + + BulkByScrollResponse response = reindexResponseWithResumeInfo(); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verifyNoMoreInteractions(metrics); } // listenerWithRelocations tests @@ -236,47 +283,6 @@ public void testListenerWithRelocationsTriggersRelocationWhenResumeInfoPresent() assertThat(exception.getMetadata("es.relocated_task_id"), equalTo(List.of("target-node:123"))); } - // --- workerListenerWithRelocationAndMetrics tests --- - - public void testWorkerListenerSkipsMetricsWhenRelocating() { - assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); - final ReindexMetrics metrics = mock(); - final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); - final ActionListener outer = spy(ActionListener.noop()); - - final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, randomNonNegativeLong(), randomBoolean()); - - final BulkByScrollResponse response = reindexResponseWithResumeInfo(); - wrapped.onResponse(response); - - // metrics should NOT be recorded for a relocation response - verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics, never()).recordFailure(anyBoolean(), any()); - verify(metrics, never()).recordTookTime(anyLong(), anyBoolean()); - // outer listener should still receive the response - verify(outer).onResponse(response); - - verifyNoMoreInteractions(metrics, outer); - } - - public void testWorkerListenerRecordsMetricsForNormalResponse() { - assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED); - final ReindexMetrics metrics = mock(); - final Reindexer reindexer = reindexerWithRelocationAndMetrics(metrics); - final ActionListener outer = spy(ActionListener.noop()); - - final var wrapped = reindexer.workerListenerWithRelocationAndMetrics(outer, randomNonNegativeLong(), true); - - final BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null); - wrapped.onResponse(response); - - verify(outer).onResponse(response); - verify(metrics).recordSuccess(true); - verify(metrics).recordTookTime(anyLong(), eq(true)); - - verifyNoMoreInteractions(metrics, outer); - } - // --- helpers --- private BulkByScrollResponse reindexResponseWithBulkAndSearchFailures( @@ -309,6 +315,48 @@ private BulkByScrollResponse reindexResponseWithResumeInfo() { ); } + private static BulkByScrollTask createNonSlicedWorkerTask() { + BulkByScrollTask task = new BulkByScrollTask( + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + TaskId.EMPTY_TASK_ID, + Map.of(), + randomBoolean() + ); + task.setWorker(Float.POSITIVE_INFINITY, null); + return task; + } + + private static BulkByScrollTask createSliceWorkerTask() { + BulkByScrollTask task = new BulkByScrollTask( + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + new TaskId("node", 1), + Map.of(), + randomBoolean() + ); + task.setWorker(randomFloat(), 0); + return task; + } + + private static BulkByScrollTask createLeaderTask() { + BulkByScrollTask task = new BulkByScrollTask( + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + TaskId.EMPTY_TASK_ID, + Map.of(), + randomBoolean() + ); + task.setWorkerCount(randomIntBetween(2, 10)); + return task; + } + private static BulkByScrollTask createTaskWithParentIdAndRelocationEnabled(final TaskId parentTaskId) { return new BulkByScrollTask(987, "test_type", "test_action", "test", parentTaskId, Collections.emptyMap(), true); } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java index bac1406885e2f..320d7902b810a 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java @@ -174,7 +174,10 @@ public boolean isTimedOut() { return timedOut; } - /** Resume info for relocation or empty if this is a final response. */ + /** + * Resume info for relocation. Only present if the task is not complete and needs to be relocated to a different node + * due to the current node being shut down, and a suitable destination node has been found. + * */ public Optional getTaskResumeInfo() { return Optional.ofNullable(resumeInfo); }