Skip to content

Commit 797821f

Browse files
committed
[BUG] Add missing fields to resolve Strict Dynamic Mapping issue in .tasks index (opensearch-project#16060)
- Fixed issue where `.tasks` index failed to update due to StrictDynamicMappingException when a task was cancelled. - Added missing `cancellation_time_millis` and `resource_stats` fields to `task-index-mapping.json`. - Ensured proper task result storage by updating the mappings. - Changed the version in the meta field from 4 to 5 to reflect the updated mappings. Signed-off-by: inpink <[email protected]>
1 parent 0bded88 commit 797821f

File tree

5 files changed

+170
-35
lines changed

5 files changed

+170
-35
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8787
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)
8888

8989
- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))
90+
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))
9091

9192
### Security
9293

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java

+97-33
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,14 @@
5858
import org.opensearch.common.collect.Tuple;
5959
import org.opensearch.common.regex.Regex;
6060
import org.opensearch.common.settings.Settings;
61+
import org.opensearch.common.util.io.Streams;
6162
import org.opensearch.core.action.ActionListener;
6263
import org.opensearch.core.tasks.TaskId;
64+
import org.opensearch.core.tasks.resourcetracker.TaskResourceStats;
65+
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
66+
import org.opensearch.core.tasks.resourcetracker.TaskThreadUsage;
6367
import org.opensearch.core.xcontent.MediaTypeRegistry;
68+
import org.opensearch.index.mapper.StrictDynamicMappingException;
6469
import org.opensearch.index.query.QueryBuilders;
6570
import org.opensearch.search.builder.SearchSourceBuilder;
6671
import org.opensearch.tasks.Task;
@@ -73,11 +78,17 @@
7378
import org.opensearch.transport.ReceiveTimeoutTransportException;
7479
import org.opensearch.transport.TransportService;
7580

81+
import java.io.ByteArrayOutputStream;
82+
import java.io.IOException;
83+
import java.io.InputStream;
84+
import java.nio.charset.StandardCharsets;
7685
import java.util.Collections;
7786
import java.util.HashMap;
7887
import java.util.List;
7988
import java.util.Map;
8089
import java.util.concurrent.BrokenBarrierException;
90+
import java.util.concurrent.CompletableFuture;
91+
import java.util.concurrent.CompletionException;
8192
import java.util.concurrent.CountDownLatch;
8293
import java.util.concurrent.CyclicBarrier;
8394
import java.util.concurrent.TimeUnit;
@@ -103,6 +114,8 @@
103114
import static org.hamcrest.Matchers.not;
104115
import static org.hamcrest.Matchers.notNullValue;
105116
import static org.hamcrest.Matchers.startsWith;
117+
import static org.mockito.Mockito.doReturn;
118+
import static org.mockito.Mockito.spy;
106119

107120
/**
108121
* Integration tests for task management API
@@ -112,6 +125,26 @@
112125
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
113126
public class TasksIT extends AbstractTasksIT {
114127

128+
protected final TaskInfo taskInfo = new TaskInfo(
129+
new TaskId("fake", 1),
130+
"test_type",
131+
"test_action",
132+
"test_description",
133+
null,
134+
0L,
135+
1L,
136+
false,
137+
false,
138+
TaskId.EMPTY_TASK_ID,
139+
Collections.emptyMap(),
140+
new TaskResourceStats(new HashMap<>() {
141+
{
142+
put("dummy-type1", new TaskResourceUsage(10, 20));
143+
}
144+
}, new TaskThreadUsage(30, 40)),
145+
2L
146+
);
147+
115148
public void testTaskCounts() {
116149
// Run only on data nodes
117150
ListTasksResponse response = client().admin()
@@ -879,46 +912,77 @@ public void testNodeNotFoundButTaskFound() throws Exception {
879912
// Save a fake task that looks like it is from a node that isn't part of the cluster
880913
CyclicBarrier b = new CyclicBarrier(2);
881914
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
882-
resultsService.storeResult(
883-
new TaskResult(
884-
new TaskInfo(
885-
new TaskId("fake", 1),
886-
"test",
887-
"test",
888-
"",
889-
null,
890-
0,
891-
0,
892-
false,
893-
false,
894-
TaskId.EMPTY_TASK_ID,
895-
Collections.emptyMap(),
896-
null
897-
),
898-
new RuntimeException("test")
899-
),
900-
new ActionListener<Void>() {
915+
resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
916+
@Override
917+
public void onResponse(Void response) {
918+
try {
919+
b.await();
920+
} catch (InterruptedException | BrokenBarrierException e) {
921+
onFailure(e);
922+
}
923+
}
924+
925+
@Override
926+
public void onFailure(Exception e) {
927+
throw new RuntimeException(e);
928+
}
929+
});
930+
b.await();
931+
932+
// Now we can find it!
933+
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
934+
TaskResult taskResult = response.getTask();
935+
TaskInfo task = taskResult.getTask();
936+
937+
assertEquals("fake", task.getTaskId().getNodeId());
938+
assertEquals(1, task.getTaskId().getId());
939+
assertEquals("test_type", task.getType());
940+
assertEquals("test_action", task.getAction());
941+
assertEquals("test_description", task.getDescription());
942+
assertEquals(0L, task.getStartTime());
943+
assertEquals(1L, task.getRunningTimeNanos());
944+
assertFalse(task.isCancellable());
945+
assertFalse(task.isCancelled());
946+
assertEquals(TaskId.EMPTY_TASK_ID, task.getParentTaskId());
947+
assertEquals(1, task.getResourceStats().getResourceUsageInfo().size());
948+
assertEquals(30, task.getResourceStats().getThreadUsage().getThreadExecutions());
949+
assertEquals(40, task.getResourceStats().getThreadUsage().getActiveThreads());
950+
assertEquals(Long.valueOf(2L), task.getCancellationStartTime());
951+
952+
assertNotNull(taskResult.getError());
953+
assertNull(taskResult.getResponse());
954+
}
955+
956+
public void testStoreTaskResultFailsDueToMissingIndexMappingFields() throws IOException {
957+
// given
958+
TaskResultsService resultsService = spy(internalCluster().getInstance(TaskResultsService.class));
959+
960+
InputStream mockInputStream = getClass().getResourceAsStream("/org/opensearch/tasks/missing-fields-task-index-mapping.json");
961+
ByteArrayOutputStream out = new ByteArrayOutputStream();
962+
Streams.copy(mockInputStream, out);
963+
String mockJsonString = out.toString(StandardCharsets.UTF_8.name());
964+
965+
// when & then
966+
doReturn(mockJsonString).when(resultsService).taskResultIndexMapping();
967+
968+
CompletionException thrown = assertThrows(CompletionException.class, () -> {
969+
CompletableFuture<Void> future = new CompletableFuture<>();
970+
971+
resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
901972
@Override
902973
public void onResponse(Void response) {
903-
try {
904-
b.await();
905-
} catch (InterruptedException | BrokenBarrierException e) {
906-
onFailure(e);
907-
}
974+
future.complete(null);
908975
}
909976

910977
@Override
911978
public void onFailure(Exception e) {
912-
throw new RuntimeException(e);
979+
future.completeExceptionally(e);
913980
}
914-
}
915-
);
916-
b.await();
981+
});
917982

918-
// Now we can find it!
919-
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
920-
assertEquals("test", response.getTask().getTask().getAction());
921-
assertNotNull(response.getTask().getError());
922-
assertNull(response.getTask().getResponse());
983+
future.join();
984+
});
985+
986+
assertTrue(thrown.getCause() instanceof StrictDynamicMappingException);
923987
}
924988
}

server/src/main/java/org/opensearch/tasks/TaskResultsService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public class TaskResultsService {
8585

8686
public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";
8787

88-
public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json
88+
public static final int TASK_RESULT_MAPPING_VERSION = 5; // must match version in task-index-mapping.json
8989

9090
/**
9191
* The backoff policy to use when saving a task result fails. The total wait

server/src/main/resources/org/opensearch/tasks/task-index-mapping.json

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"_doc" : {
33
"_meta": {
4-
"version": 4
4+
"version": 5
55
},
66
"dynamic" : "strict",
77
"properties" : {
@@ -34,6 +34,9 @@
3434
"start_time_in_millis": {
3535
"type": "long"
3636
},
37+
"cancellation_time_millis": {
38+
"type": "long"
39+
},
3740
"type": {
3841
"type": "keyword"
3942
},
@@ -47,6 +50,10 @@
4750
"headers": {
4851
"type" : "object",
4952
"enabled" : false
53+
},
54+
"resource_stats": {
55+
"type" : "object",
56+
"enabled" : false
5057
}
5158
}
5259
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
{
2+
"_doc" : {
3+
"_meta": {
4+
"version": 5
5+
},
6+
"dynamic" : "strict",
7+
"properties" : {
8+
"completed": {
9+
"type": "boolean"
10+
},
11+
"task" : {
12+
"properties": {
13+
"action": {
14+
"type": "keyword"
15+
},
16+
"cancellable": {
17+
"type": "boolean"
18+
},
19+
"cancelled": {
20+
"type": "boolean"
21+
},
22+
"id": {
23+
"type": "long"
24+
},
25+
"parent_task_id": {
26+
"type": "keyword"
27+
},
28+
"node": {
29+
"type": "keyword"
30+
},
31+
"running_time_in_nanos": {
32+
"type": "long"
33+
},
34+
"start_time_in_millis": {
35+
"type": "long"
36+
},
37+
"type": {
38+
"type": "keyword"
39+
},
40+
"status": {
41+
"type" : "object",
42+
"enabled" : false
43+
},
44+
"description": {
45+
"type": "text"
46+
},
47+
"headers": {
48+
"type" : "object",
49+
"enabled" : false
50+
}
51+
}
52+
},
53+
"response" : {
54+
"type" : "object",
55+
"enabled" : false
56+
},
57+
"error" : {
58+
"type" : "object",
59+
"enabled" : false
60+
}
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)