Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.reindex.ReindexMetrics;
import org.elasticsearch.reindex.ReindexMetrics.SlicingMode;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.reindex.TransportReindexAction;
import org.elasticsearch.rest.root.MainRestPlugin;
Expand All @@ -53,6 +54,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -110,13 +112,47 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.build();
}

public void testLocalReindexRelocation() throws Exception {
final int slices = randomIntBetween(1, 10);
/**
 * Relocates a local reindex that explicitly requests a single slice, i.e. a non-sliced reindex.
 * The shard count handed to {@code testReindexRelocation} is random between 1 and 5.
 */
public void testNonSlicedLocalReindexRelocation() throws Exception {
    final int singleSlice = 1;
    testReindexRelocation(
        (ignoredNodeA, reindexNode) -> startAsyncThrottledLocalReindexOnNode(reindexNode, singleSlice),
        localReindexDescription(),
        singleSlice,
        false, // local, not remote
        randomIntBetween(1, 5) // shards
    );
}

/**
 * Relocates a local reindex that requests a fixed number of slices (random, between 2 and 5).
 * The shard count handed to {@code testReindexRelocation} is random between 1 and 5.
 */
public void testFixedSlicedLocalReindexRelocation() throws Exception {
    final int fixedSlices = randomIntBetween(2, 5);
    testReindexRelocation(
        (ignoredNodeA, reindexNode) -> startAsyncThrottledLocalReindexOnNode(reindexNode, fixedSlices),
        localReindexDescription(),
        fixedSlices,
        false, // local, not remote
        randomIntBetween(1, 5) // shards
    );
}

/**
 * Relocates a local reindex that asks for automatic slicing (represented in this test as
 * {@code slices == 0}) with a random shard count between 2 and 5.
 */
public void testAutoSlicedLocalReindexRelocation() throws Exception {
    final int autoSlices = 0;
    testReindexRelocation(
        (ignoredNodeA, reindexNode) -> startAsyncThrottledLocalReindexOnNode(reindexNode, autoSlices),
        localReindexDescription(),
        autoSlices,
        false, // local, not remote
        randomIntBetween(2, 5) // shards: more than one
    );
}

public void testAutoNonSlicedLocalReindexRelocation() throws Exception {
final int slices = 0;
testReindexRelocation(
(nodeAName, nodeBName) -> startAsyncThrottledLocalReindexOnNode(nodeBName, slices),
localReindexDescription(),
slices,
false
false,
1 // no slicing if only 1 shard
);
}

Expand All @@ -128,15 +164,16 @@ public void testNonSlicedRemoteReindexRelocation() throws Exception {
.publishAddress()
.address();
return startAsyncNonSlicedThrottledRemoteReindexOnNode(nodeBName, nodeAAddress);
}, remoteReindexDescription(), slices, true);
}, remoteReindexDescription(), slices, true, randomIntBetween(1, 5));
}
// There is no test for a sliced remote reindex because slicing a remote reindex is not allowed.

private void testReindexRelocation(
final CheckedBiFunction<String, String, TaskId, Exception> startReindexGivenNodeAAndB,
final Matcher<String> expectedDescription,
final int slices,
final boolean isRemote
final boolean isRemote,
final int shards
) throws Exception {
assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED);

Expand All @@ -150,36 +187,40 @@ private void testReindexRelocation(
final String nodeBId = nodeIdByName(nodeBName);
ensureStableCluster(2);

createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName);
createIndexPinnedToNodeName(DEST_INDEX, nodeAName);
createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName, shards);
createIndexPinnedToNodeName(DEST_INDEX, nodeAName, shards);
indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest);
ensureGreen(SOURCE_INDEX, DEST_INDEX);

// Start throttled async reindex on nodeB and check it has the expected state
final TaskId originalTaskId = startReindexGivenNodeAAndB.apply(nodeAName, nodeBName);
final TaskResult originalReindex = getRunningReindex(originalTaskId);
assertThat("reindex should start on nodeB", originalReindex.getTask().taskId().getNodeId(), equalTo(nodeBId));
assertRunningReindexTaskExpectedState(originalReindex.getTask(), expectedDescription, slices);

assertBusy(() -> {
final TaskResult originalReindex = getRunningReindex(originalTaskId);
assertThat("reindex should start on nodeB", originalReindex.getTask().taskId().getNodeId(), equalTo(nodeBId));
assertRunningReindexTaskExpectedState(originalReindex.getTask(), expectedDescription, slices, shards);
});
shutdownNodeNameAndRelocate(nodeBName);

// Assert the original task is in .tasks index and has expected content (including relocated taskId on nodeA)
final TaskId relocatedTaskId = assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId(
originalTaskId,
nodeAId,
expectedDescription,
slices
slices,
shards
);

// Assert relocated reindex is running and has expected state
final TaskResult relocatedReindex = getRunningReindex(relocatedTaskId);
assertThat("relocated reindex should be on nodeA", relocatedReindex.getTask().taskId().getNodeId(), equalTo(nodeAId));
assertRunningReindexTaskExpectedState(relocatedReindex.getTask(), expectedDescription, slices);
assertBusy(() -> {
final TaskResult relocatedReindex = getRunningReindex(relocatedTaskId);
assertThat("relocated reindex should be on nodeA", relocatedReindex.getTask().taskId().getNodeId(), equalTo(nodeAId));
assertRunningReindexTaskExpectedState(relocatedReindex.getTask(), expectedDescription, slices, shards);
});

// Speed up reindex post-relocation to keep the test fast
unthrottleReindex(relocatedTaskId);

assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, slices);
assertRelocatedTaskExpectedEndState(relocatedTaskId, expectedDescription, slices, shards);

// Assert nodeA recorded success metrics for the relocated reindex
assertReindexSuccessMetricsOnNode(nodeAName, isRemote, slices);
Expand Down Expand Up @@ -210,7 +251,8 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId(
final TaskId originalTaskId,
final String relocatedNodeId,
final Matcher<String> expectedTaskDescription,
final int slices
final int slices,
final int shards
) {
assertThat("task completed", originalResult.isCompleted(), is(true));

Expand Down Expand Up @@ -243,14 +285,15 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId(
assertThat(taskStatus.get("reason_cancelled"), is(nullValue()));
assertThat((Integer) taskStatus.get("throttled_until_millis"), greaterThanOrEqualTo(0));

if (slices >= 2) {
if (isSliced(slices, shards)) {
final int expectedSlices = getExpectedSlices(slices, shards);
@SuppressWarnings("unchecked")
final List<Map<String, Object>> sliceStatuses = (List<Map<String, Object>>) taskStatus.get("slices");
assertThat(sliceStatuses.size(), equalTo(slices));
for (int i = 0; i < slices; i++) {
assertThat(sliceStatuses.size(), equalTo(expectedSlices));
for (int i = 0; i < expectedSlices; i++) {
final Map<String, Object> slice = sliceStatuses.get(i);
assertThat(slice.get("slice_id"), is(i));
assertThat((double) slice.get("requests_per_second"), closeTo((double) requestsPerSecond / slices, 0.00001));
assertThat((double) slice.get("requests_per_second"), closeTo((double) requestsPerSecond / expectedSlices, 0.00001));
}
} else {
assertThat(taskStatus.containsKey("slices"), is(false));
Expand All @@ -266,8 +309,12 @@ private TaskId assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId(
return new TaskId(relocatedTaskId);
}

private void assertRelocatedTaskExpectedEndState(final TaskId taskId, final Matcher<String> expectedTaskDescription, final int slices)
throws Exception {
private void assertRelocatedTaskExpectedEndState(
final TaskId taskId,
final Matcher<String> expectedTaskDescription,
final int slices,
final int shards
) throws Exception {
final SetOnce<TaskResult> finishedResult = new SetOnce<>();

assertBusy(() -> finishedResult.set(getCompletedTaskResult(taskId)), 30, TimeUnit.SECONDS);
Expand Down Expand Up @@ -315,10 +362,11 @@ private void assertRelocatedTaskExpectedEndState(final TaskId taskId, final Matc
assertThat(taskStatus.get("reason_cancelled"), is(nullValue()));
assertThat((Integer) taskStatus.get("throttled_until_millis"), greaterThanOrEqualTo(0));

if (slices >= 2) {
if (isSliced(slices, shards)) {
final int expectedSlices = getExpectedSlices(slices, shards);
@SuppressWarnings("unchecked")
final List<Map<String, Object>> responseSlices = (List<Map<String, Object>>) innerResponse.get("slices");
assertThat(responseSlices.size(), equalTo(slices));
assertThat(responseSlices.size(), equalTo(expectedSlices));
int totalCreated = 0;
for (Map<String, Object> slice : responseSlices) {
assertThat(slice.get("requests_per_second"), is(-1.0));
Expand All @@ -334,7 +382,8 @@ private TaskId assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId(
final TaskId taskId,
final String relocatedNodeId,
final Matcher<String> expectedTaskDescription,
final int slices
final int slices,
final int shards
) {
ensureYellowAndNoInitializingShards(TaskResultsService.TASK_INDEX); // replicas won't be allocated
assertNoFailures(indicesAdmin().prepareRefresh(TaskResultsService.TASK_INDEX).get());
Expand All @@ -351,14 +400,21 @@ private TaskId assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId(
throw new AssertionError("failed to parse task result from .tasks index", e);
}

return assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId(result, taskId, relocatedNodeId, expectedTaskDescription, slices);
return assertOriginalTaskExpectedEndStateAndGetRelocatedTaskId(
result,
taskId,
relocatedNodeId,
expectedTaskDescription,
slices,
shards
);
}

private TaskId startAsyncThrottledLocalReindexOnNode(final String nodeName, final int slices) throws Exception {
try (RestClient restClient = createRestClient(nodeName)) {
final Request request = new Request("POST", "/_reindex");
request.addParameter("wait_for_completion", "false");
request.addParameter("slices", Integer.toString(slices));
request.addParameter("slices", slices == 0 ? "auto" : Integer.toString(slices));
request.addParameter("requests_per_second", Integer.toString(requestsPerSecond));
request.setJsonEntity(Strings.format("""
{
Expand Down Expand Up @@ -425,7 +481,8 @@ private TaskResult getRunningReindex(final TaskId taskId) {
private void assertRunningReindexTaskExpectedState(
final TaskInfo taskInfo,
final Matcher<String> expectedTaskDescription,
final int slices
final int slices,
final int shards
) {
assertThat(taskInfo.action(), equalTo(ReindexAction.NAME));
assertThat(taskInfo.description(), is(expectedTaskDescription));
Expand All @@ -445,18 +502,33 @@ private void assertRunningReindexTaskExpectedState(
assertThat(taskStatus.getSearchRetries(), is(0L));
assertThat(taskStatus.getThrottled(), greaterThanOrEqualTo(TimeValue.ZERO));
// sliced leader only reports on completed slices, so the status is completely empty until some slices complete
assertThat(taskStatus.getRequestsPerSecond(), equalTo(slices >= 2 ? 0.0f : requestsPerSecond));
assertThat(taskStatus.getRequestsPerSecond(), equalTo(isSliced(slices, shards) ? 0.0f : requestsPerSecond));
assertThat(taskStatus.getReasonCancelled(), is(nullValue()));
assertThat(taskStatus.getThrottledUntil(), greaterThanOrEqualTo(TimeValue.ZERO));

if (slices >= 2) {
final List<BulkByScrollTask.StatusOrException> expectedStatuses = Collections.nCopies(slices, null);
if (isSliced(slices, shards)) {
final int expectedSlices = getExpectedSlices(slices, shards);
final List<BulkByScrollTask.StatusOrException> expectedStatuses = Collections.nCopies(expectedSlices, null);
assertThat("running slices statuses are null", taskStatus.getSliceStatuses(), equalTo(expectedStatuses));
} else {
assertThat(taskStatus.getSliceStatuses().isEmpty(), is(true));
}
}

/**
 * Tells whether a reindex started with the given {@code slices} value is expected to be sliced.
 *
 * @param slices requested slice count; {@code 0} stands for automatic slicing
 * @param shards number of shards, only consulted when {@code slices} is {@code 0}
 * @return {@code true} if {@code slices} is greater than 1, or if {@code slices} is {@code 0}
 *         and there is more than one shard; {@code false} otherwise
 */
private boolean isSliced(int slices, int shards) {
    if (slices == 0) {
        // automatic slicing only results in slices when there is more than one shard
        return shards > 1;
    }
    return slices > 1;
}

/**
 * Computes how many slices the test expects for the given {@code slices} value.
 *
 * @param slices requested slice count; {@code 0} stands for automatic slicing
 * @param shards number of shards, only consulted when {@code slices} is {@code 0}
 * @return {@code slices} when it is greater than 1; the shard count (but never less than 1)
 *         when {@code slices} is {@code 0}; {@code 1} for every other value
 */
private int getExpectedSlices(int slices, int shards) {
    if (slices == 0) {
        // automatic slicing: one slice per shard, with a floor of one
        return shards < 1 ? 1 : shards;
    }
    return slices > 1 ? slices : 1;
}

private TaskResult getCompletedTaskResult(final TaskId taskId) {
final GetTaskResponse response = clusterAdmin().prepareGetTask(taskId).setWaitForCompletion(true).get();
final TaskResult task = response.getTask();
Expand All @@ -465,10 +537,10 @@ private TaskResult getCompletedTaskResult(final TaskId taskId) {
return task;
}

private void createIndexPinnedToNodeName(final String index, final String nodeName) {
private void createIndexPinnedToNodeName(final String index, final String nodeName, final int shards) {
prepareCreate(index).setSettings(
Settings.builder()
.put("index.number_of_shards", randomIntBetween(1, 3))
.put("index.number_of_shards", shards)
.put("index.number_of_replicas", 0)
.put("index.routing.allocation.require._name", nodeName)
).get();
Expand Down Expand Up @@ -516,14 +588,24 @@ private void assertReindexSuccessMetricsOnNode(final String nodeName, final bool
final TestTelemetryPlugin plugin = getTelemetryPlugin(nodeName);
plugin.collect();
final List<Measurement> completions = plugin.getLongCounterMeasurement(ReindexMetrics.REINDEX_COMPLETION_COUNTER);
assertThat(completions.size(), equalTo(slices));
for (final Measurement completion : completions) {
assertNull(completion.attributes().get(ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE));
final String expectedSource = isRemote
? ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE
: ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL;
assertThat(completion.attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SOURCE), equalTo(expectedSource));
assertThat(completions.size(), equalTo(1));
assertNull(completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE));
final String expectedSource = isRemote ? ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE : ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL;
assertThat(completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SOURCE), equalTo(expectedSource));
SlicingMode slicingMode = null;
if (slices == 0) {
slicingMode = SlicingMode.AUTO;
} else if (slices == 1) {
slicingMode = SlicingMode.NONE;
} else if (slices > 1) {
slicingMode = SlicingMode.FIXED;
} else {
fail("invalid slices value: " + slices);
}
assertThat(
completions.getFirst().attributes().get(ReindexMetrics.ATTRIBUTE_NAME_SLICING_MODE),
equalTo(slicingMode.name().toLowerCase(Locale.ROOT))
);
}

private void assertExpectedNumberOfDocumentsInDestinationIndex() throws IOException {
Expand Down
Loading