-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Add PersistentTasksClusterService::unassignPersistentTask method #37576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
6340be8
11af7df
5cd2f9d
df24e18
5fcf03a
7756bc7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,8 +46,10 @@ | |
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; | ||
| import static org.hamcrest.Matchers.empty; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.hasSize; | ||
| import static org.hamcrest.Matchers.notNullValue; | ||
| import static org.hamcrest.Matchers.nullValue; | ||
| import static org.hamcrest.core.Is.is; | ||
|
|
||
| @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2) | ||
| public class PersistentTasksExecutorIT extends ESIntegTestCase { | ||
|
|
@@ -155,11 +157,8 @@ public void testPersistentActionWithNoAvailableNode() throws Exception { | |
| Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); | ||
| String newNode = internalCluster().startNode(nodeSettings); | ||
| String newNodeId = internalCluster().clusterService(newNode).localNode().getId(); | ||
| assertBusy(() -> { | ||
| // Wait for the task to start | ||
| assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() | ||
| .size(), equalTo(1)); | ||
| }); | ||
| waitForTaskToStart(); | ||
|
|
||
| TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") | ||
| .get().getTasks().get(0); | ||
|
|
||
|
|
@@ -199,11 +198,7 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception | |
|
|
||
| TestPersistentTasksExecutor.setNonClusterStateCondition(true); | ||
|
|
||
| assertBusy(() -> { | ||
| // Wait for the task to start | ||
| assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() | ||
| .size(), equalTo(1)); | ||
| }); | ||
| waitForTaskToStart(); | ||
| TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") | ||
| .get().getTasks().get(0); | ||
|
|
||
|
|
@@ -221,12 +216,7 @@ public void testPersistentActionStatusUpdate() throws Exception { | |
| PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); | ||
| persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); | ||
| String taskId = future.get().getId(); | ||
|
|
||
| assertBusy(() -> { | ||
| // Wait for the task to start | ||
| assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() | ||
| .size(), equalTo(1)); | ||
| }); | ||
| waitForTaskToStart(); | ||
| TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") | ||
| .get().getTasks().get(0); | ||
|
|
||
|
|
@@ -307,6 +297,63 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception { | |
| }); | ||
| } | ||
|
|
||
| public void testUnassignRunningPersistentTask() throws Exception { | ||
| PersistentTasksClusterService persistentTasksClusterService = | ||
| internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName()); | ||
| // Speed up rechecks to a rate that is quicker than what settings would allow | ||
| persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1)); | ||
| PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); | ||
| PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); | ||
| TestParams testParams = new TestParams("Blah"); | ||
| testParams.setExecutorNodeAttr("test"); | ||
| persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); | ||
| PersistentTask<TestParams> task = future.get(); | ||
| String taskId = task.getId(); | ||
|
|
||
| Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); | ||
| internalCluster().startNode(nodeSettings); | ||
|
|
||
| waitForTaskToStart(); | ||
|
|
||
| PlainActionFuture<PersistentTask<?>> unassignmentFuture = new PlainActionFuture<>(); | ||
|
|
||
| // Disallow re-assignment after it is unallocated to verify master and node state | ||
| TestPersistentTasksExecutor.setNonClusterStateCondition(false); | ||
|
|
||
| persistentTasksClusterService.unassignPersistentTask(taskId, | ||
| task.getAllocationId() + 1, | ||
| "unassignment test", | ||
| unassignmentFuture); | ||
| PersistentTask<?> unassignedTask = unassignmentFuture.get(); | ||
| assertThat(unassignedTask.getId(), equalTo(taskId)); | ||
| assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test")); | ||
| assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue())); | ||
|
|
||
| assertBusy(() -> { | ||
| // Verify that the task is NOT running on the node | ||
| List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() | ||
| .getTasks(); | ||
| assertThat(tasks.size(), equalTo(0)); | ||
|
|
||
| // Verify that the task is STILL in internal cluster state | ||
| internalClusterHasSingleTask(taskId); | ||
| }); | ||
|
|
||
| // Allow it to be reassigned again to the same node | ||
| TestPersistentTasksExecutor.setNonClusterStateCondition(true); | ||
|
|
||
| // Verify it starts again | ||
| waitForTaskToStart(); | ||
|
|
||
| // Assert that we still have it in master state | ||
|
||
| internalClusterHasSingleTask(taskId); | ||
|
|
||
| // Complete or cancel the running task | ||
| TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") | ||
| .get().getTasks().get(0); | ||
| stopOrCancelTask(taskInfo.getTaskId()); | ||
| } | ||
|
|
||
| private void stopOrCancelTask(TaskId taskId) { | ||
| if (randomBoolean()) { | ||
| logger.info("Completing the running task"); | ||
|
|
@@ -322,6 +369,25 @@ private void stopOrCancelTask(TaskId taskId) { | |
| } | ||
| } | ||
|
|
||
| private static void waitForTaskToStart() throws Exception { | ||
| assertBusy(() -> { | ||
| // Wait for the task to start | ||
| assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() | ||
| .size(), equalTo(1)); | ||
| }); | ||
| } | ||
|
|
||
| private static void internalClusterHasSingleTask(String taskId) { | ||
|
||
| Collection<PersistentTask<?>> clusterTasks = ((PersistentTasksCustomMetaData) internalCluster() | ||
| .clusterService() | ||
| .state() | ||
| .getMetaData() | ||
| .custom(PersistentTasksCustomMetaData.TYPE)) | ||
| .tasks(); | ||
| assertThat(clusterTasks, hasSize(1)); | ||
| assertThat(clusterTasks.iterator().next().getId(), equalTo(taskId)); | ||
| } | ||
|
|
||
| private void assertNoRunningTasks() throws Exception { | ||
| assertBusy(() -> { | ||
| // Wait for the task to finish | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a risk in setting it to such a low value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already another test that sets 1ms for the recheck interval. I agree 1ms would be a completely inappropriate setting for production, but that's why it's being set via a method that end users cannot call. I think if a test fails because the interval is low then it will be exposing a bug that could happen with a higher setting, just much less frequently. During this test it's true that the master node will be doing a lot of work iterating through the persistent tasks list, but it won't be doing the other work that a production master node would be doing, so a modern CPU should be able to cope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the clarification. I just wanted to be sure that this value couldn't overwhelm any thread pool and cause other issues.