Skip to content

Commit

Permalink
[BUG] Add missing fields to resolve Strict Dynamic Mapping issue in .…
Browse files Browse the repository at this point in the history
…tasks index (opensearch-project#16060)

- Fixed issue where `.tasks` index failed to update due to StrictDynamicMappingException when a task was cancelled.
- Added missing `cancellation_time_millis` and `resource_stats` fields to `task-index-mapping.json`.
- Ensured proper task result storage by updating the mappings.
- Changed the version in the meta field from 4 to 5 to reflect the updated mappings.

Signed-off-by: inpink <[email protected]>
  • Loading branch information
inpink committed Oct 19, 2024
1 parent 0bded88 commit f09fe85
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)

- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.Streams;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.tasks.resourcetracker.TaskResourceStats;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.tasks.resourcetracker.TaskThreadUsage;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.mapper.StrictDynamicMappingException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
Expand All @@ -73,11 +78,17 @@
import org.opensearch.transport.ReceiveTimeoutTransportException;
import org.opensearch.transport.TransportService;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand All @@ -103,6 +114,8 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

/**
* Integration tests for task management API
Expand All @@ -112,6 +125,26 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class TasksIT extends AbstractTasksIT {

protected final TaskInfo taskInfo = new TaskInfo(
new TaskId("fake", 1),
"test_type",
"test_action",
"test_description",
null,
0L,
1L,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
new TaskResourceStats(new HashMap<>() {
{
put("dummy-type1", new TaskResourceUsage(10, 20));
}
}, new TaskThreadUsage(30, 40)),
2L
);

public void testTaskCounts() {
// Run only on data nodes
ListTasksResponse response = client().admin()
Expand Down Expand Up @@ -879,46 +912,77 @@ public void testNodeNotFoundButTaskFound() throws Exception {
// Save a fake task that looks like it is from a node that isn't part of the cluster
CyclicBarrier b = new CyclicBarrier(2);
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(
new TaskResult(
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
null
),
new RuntimeException("test")
),
new ActionListener<Void>() {
resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
try {
b.await();
} catch (InterruptedException | BrokenBarrierException e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
});
b.await();

// Now we can find it!
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
TaskResult taskResult = response.getTask();
TaskInfo task = taskResult.getTask();

assertEquals("fake", task.getTaskId().getNodeId());
assertEquals(1, task.getTaskId().getId());
assertEquals("test_type", task.getType());
assertEquals("test_action", task.getAction());
assertEquals("test_description", task.getDescription());
assertEquals(0L, task.getStartTime());
assertEquals(1L, task.getRunningTimeNanos());
assertFalse(task.isCancellable());
assertFalse(task.isCancelled());
assertEquals(TaskId.EMPTY_TASK_ID, task.getParentTaskId());
assertEquals(1, task.getResourceStats().getResourceUsageInfo().size());
assertEquals(30, task.getResourceStats().getThreadUsage().getThreadExecutions());
assertEquals(40, task.getResourceStats().getThreadUsage().getActiveThreads());
assertEquals(Long.valueOf(2L), task.getCancellationStartTime());

assertNotNull(taskResult.getError());
assertNull(taskResult.getResponse());
}

public void testStoreTaskResultFailsDueToMissingIndexMappingFields() throws IOException {
// given
TaskResultsService resultsService = spy(internalCluster().getInstance(TaskResultsService.class));

InputStream mockInputStream = getClass().getResourceAsStream("/org/opensearch/tasks/missing-fields-task-index-mapping.json");
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(mockInputStream, out);
String mockJsonString = out.toString(StandardCharsets.UTF_8.name());

// when & then
doReturn(mockJsonString).when(resultsService).taskResultIndexMapping();

CompletionException thrown = assertThrows(CompletionException.class, () -> {
CompletableFuture<Void> future = new CompletableFuture<>();

resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
try {
b.await();
} catch (InterruptedException | BrokenBarrierException e) {
onFailure(e);
}
future.complete(null);
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
future.completeExceptionally(e);
}
}
);
b.await();
});

// Now we can find it!
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
assertEquals("test", response.getTask().getTask().getAction());
assertNotNull(response.getTask().getError());
assertNull(response.getTask().getResponse());
future.join();
});

assertTrue(thrown.getCause() instanceof StrictDynamicMappingException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class TaskResultsService {

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json
public static final int TASK_RESULT_MAPPING_VERSION = 5; // must match version in task-index-mapping.json

/**
* The backoff policy to use when saving a task result fails. The total wait
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"_doc" : {
"_meta": {
"version": 4
"version": 5
},
"dynamic" : "strict",
"properties" : {
Expand Down Expand Up @@ -34,6 +34,9 @@
"start_time_in_millis": {
"type": "long"
},
"cancellation_time_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
Expand All @@ -47,6 +50,10 @@
"headers": {
"type" : "object",
"enabled" : false
},
"resource_stats": {
"type" : "object",
"enabled" : false
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"_doc" : {
"_meta": {
"version": 5
},
"dynamic" : "strict",
"properties" : {
"completed": {
"type": "boolean"
},
"task" : {
"properties": {
"action": {
"type": "keyword"
},
"cancellable": {
"type": "boolean"
},
"cancelled": {
"type": "boolean"
},
"id": {
"type": "long"
},
"parent_task_id": {
"type": "keyword"
},
"node": {
"type": "keyword"
},
"running_time_in_nanos": {
"type": "long"
},
"start_time_in_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
"status": {
"type" : "object",
"enabled" : false
},
"description": {
"type": "text"
},
"headers": {
"type" : "object",
"enabled" : false
}
}
},
"response" : {
"type" : "object",
"enabled" : false
},
"error" : {
"type" : "object",
"enabled" : false
}
}
}
}

0 comments on commit f09fe85

Please sign in to comment.