diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d67ed755fa31..96164bd839841 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370) - Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386)) +- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java index c7d75108883dd..4d8c80954cd0a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java @@ -58,9 +58,14 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.Streams; import org.opensearch.core.action.ActionListener; import org.opensearch.core.tasks.TaskId; +import org.opensearch.core.tasks.resourcetracker.TaskResourceStats; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.core.tasks.resourcetracker.TaskThreadUsage; import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.mapper.StrictDynamicMappingException; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; @@ -73,11 +78,17 @@ import org.opensearch.transport.ReceiveTimeoutTransportException; import org.opensearch.transport.TransportService; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -103,6 +114,8 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; /** * Integration tests for task management API @@ -112,6 +125,26 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class TasksIT extends AbstractTasksIT { + protected final TaskInfo taskInfo = new TaskInfo( + new TaskId("fake", 1), + "test_type", + "test_action", + "test_description", + null, + 0L, + 1L, + false, + false, + TaskId.EMPTY_TASK_ID, + Collections.emptyMap(), + new TaskResourceStats(new HashMap<>() { + { + put("dummy-type1", new TaskResourceUsage(10, 20)); + } + }, new TaskThreadUsage(30, 40)), + 2L + ); + public void testTaskCounts() { // Run only on data nodes ListTasksResponse response = client().admin() @@ -879,46 +912,77 @@ public void testNodeNotFoundButTaskFound() throws Exception { // Save a fake task that looks like it is from a node that isn't part of the cluster CyclicBarrier b = new CyclicBarrier(2); TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class); - resultsService.storeResult( - new TaskResult( - new TaskInfo( - new TaskId("fake", 1), - "test", - "test", - "", - null, - 0, - 0, - false, - false, - TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - null - ), - new RuntimeException("test") - ), - new ActionListener() { + resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener() { + @Override + public void onResponse(Void response) { + try { + b.await(); + } catch (InterruptedException | BrokenBarrierException e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }); + b.await(); + + // Now we can find it! + GetTaskResponse response = expectFinishedTask(new TaskId("fake:1")); + TaskResult taskResult = response.getTask(); + TaskInfo task = taskResult.getTask(); + + assertEquals("fake", task.getTaskId().getNodeId()); + assertEquals(1, task.getTaskId().getId()); + assertEquals("test_type", task.getType()); + assertEquals("test_action", task.getAction()); + assertEquals("test_description", task.getDescription()); + assertEquals(0L, task.getStartTime()); + assertEquals(1L, task.getRunningTimeNanos()); + assertFalse(task.isCancellable()); + assertFalse(task.isCancelled()); + assertEquals(TaskId.EMPTY_TASK_ID, task.getParentTaskId()); + assertEquals(1, task.getResourceStats().getResourceUsageInfo().size()); + assertEquals(30, task.getResourceStats().getThreadUsage().getThreadExecutions()); + assertEquals(40, task.getResourceStats().getThreadUsage().getActiveThreads()); + assertEquals(Long.valueOf(2L), task.getCancellationStartTime()); + + assertNotNull(taskResult.getError()); + assertNull(taskResult.getResponse()); + } + + public void testStoreTaskResultFailsDueToMissingIndexMappingFields() throws IOException { + // given + TaskResultsService resultsService = spy(internalCluster().getInstance(TaskResultsService.class)); + + InputStream mockInputStream = getClass().getResourceAsStream("/org/opensearch/tasks/missing-fields-task-index-mapping.json"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Streams.copy(mockInputStream, out); + String mockJsonString = out.toString(StandardCharsets.UTF_8.name()); + + // when & then + doReturn(mockJsonString).when(resultsService).taskResultIndexMapping(); + + CompletionException thrown = assertThrows(CompletionException.class, () -> { + CompletableFuture future = new CompletableFuture<>(); + + resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener() { @Override public void onResponse(Void response) { - try { - b.await(); - } catch (InterruptedException | BrokenBarrierException e) { - onFailure(e); - } + future.complete(null); } @Override public void onFailure(Exception e) { - throw new RuntimeException(e); + future.completeExceptionally(e); } - } - ); - b.await(); + }); - // Now we can find it! - GetTaskResponse response = expectFinishedTask(new TaskId("fake:1")); - assertEquals("test", response.getTask().getTask().getAction()); - assertNotNull(response.getTask().getError()); - assertNull(response.getTask().getResponse()); + future.join(); + }); + + assertTrue(thrown.getCause() instanceof StrictDynamicMappingException); } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskResultsService.java b/server/src/main/java/org/opensearch/tasks/TaskResultsService.java index d1ee04bd5cb25..3d11bf77ae32a 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResultsService.java @@ -85,7 +85,7 @@ public class TaskResultsService { public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; - public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json + public static final int TASK_RESULT_MAPPING_VERSION = 5; // must match version in task-index-mapping.json /** * The backoff policy to use when saving a task result fails. The total wait diff --git a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json index 58b6b2d3bc873..21d418c472898 100644 --- a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json +++ b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json @@ -1,7 +1,7 @@ { "_doc" : { "_meta": { - "version": 4 + "version": 5 }, "dynamic" : "strict", "properties" : { @@ -34,6 +34,9 @@ "start_time_in_millis": { "type": "long" }, + "cancellation_time_millis": { + "type": "long" + }, "type": { "type": "keyword" }, @@ -47,6 +50,10 @@ "headers": { "type" : "object", "enabled" : false + }, + "resource_stats": { + "type" : "object", + "enabled" : false } } }, diff --git a/server/src/test/resources/org/opensearch/tasks/missing-fields-task-index-mapping.json b/server/src/test/resources/org/opensearch/tasks/missing-fields-task-index-mapping.json new file mode 100644 index 0000000000000..2e59bbc4803bf --- /dev/null +++ b/server/src/test/resources/org/opensearch/tasks/missing-fields-task-index-mapping.json @@ -0,0 +1,63 @@ +{ + "_doc" : { + "_meta": { + "version": 5 + }, + "dynamic" : "strict", + "properties" : { + "completed": { + "type": "boolean" + }, + "task" : { + "properties": { + "action": { + "type": "keyword" + }, + "cancellable": { + "type": "boolean" + }, + "cancelled": { + "type": "boolean" + }, + "id": { + "type": "long" + }, + "parent_task_id": { + "type": "keyword" + }, + "node": { + "type": "keyword" + }, + "running_time_in_nanos": { + "type": "long" + }, + "start_time_in_millis": { + "type": "long" + }, + "type": { + "type": "keyword" + }, + "status": { + "type" : "object", + "enabled" : false + }, + "description": { + "type": "text" + }, + "headers": { + "type" : "object", + "enabled" : false + } + } + }, + "response" : { + "type" : "object", + "enabled" : false + }, + "error" : { + "type" : "object", + "enabled" : false + } + } + } +}