diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 57b240dd3d..1098525015 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -254,7 +254,8 @@ private void populateWorkflowOutput(Workflow workflow) { * Removes a workflow from the system * * @param workflowId the id of the workflow to be deleted - * @param archiveWorkflow flag to indicate if the workflow should be archived before deletion + * @param archiveWorkflow flag to indicate if the workflow and associated tasks should be + * archived before deletion */ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { Validate.notBlank(workflowId, "Workflow id cannot be blank"); diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index b8ac050eca..c8bb0e6b5d 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -44,6 +44,7 @@ import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils; +import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.*; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; @@ -332,19 +333,43 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) { * Removes the workflow from the data store. 
* * @param workflowId the id of the workflow to be removed - * @param archiveWorkflow if true, the workflow will be archived in the {@link IndexDAO} after - * removal from {@link ExecutionDAO} + * @param archiveWorkflow if true, the workflow and associated tasks will be archived in the + * {@link IndexDAO} after removal from {@link ExecutionDAO}. */ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); + executionDAO.removeWorkflow(workflowId); try { removeWorkflowIndex(workflow, archiveWorkflow); } catch (JsonProcessingException e) { throw new TransientException("Workflow can not be serialized to json", e); } - executionDAO.removeWorkflow(workflowId); + workflow.getTasks() + .forEach( + task -> { + try { + removeTaskIndex(workflow, task, archiveWorkflow); + } catch (JsonProcessingException e) { + throw new TransientException( + String.format( + "Task %s of workflow %s can not be serialized to json", + task.getTaskId(), workflow.getWorkflowId()), + e); + } + + try { + queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); + } catch (Exception e) { + LOGGER.info( + "Error removing task: {} of workflow: {} from {} queue", + task.getTaskId(), + workflowId, + QueueUtils.getQueueName(task), + e); + } + }); try { queueDAO.remove(DECIDER_QUEUE, workflowId); @@ -509,6 +534,29 @@ public void removeTask(String taskId) { executionDAO.removeTask(taskId); } + private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean archiveTask) + throws JsonProcessingException { + if (archiveTask) { + if (task.getStatus().isTerminal()) { + // Only allow archival if task is in terminal state + // DO NOT archive async, since if archival errors out, task data will be lost + indexDAO.updateTask( + workflow.getWorkflowId(), + task.getTaskId(), + new String[] {ARCHIVED_FIELD}, + new Object[] {true}); + } else { + throw new IllegalArgumentException( + String.format( + "Cannot 
archive task: %s of workflow: %s with status: %s", + task.getTaskId(), workflow.getWorkflowId(), task.getStatus())); + } + } else { + // Not archiving, remove task from index + indexDAO.asyncRemoveTask(workflow.getWorkflowId(), task.getTaskId()); + } + } + public void extendLease(TaskModel taskModel) { taskModel.setUpdateTime(System.currentTimeMillis()); executionDAO.updateTask(taskModel); diff --git a/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java b/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java index 4a7f427cbc..05d6550b78 100644 --- a/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java +++ b/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java @@ -78,6 +78,23 @@ public CompletableFuture asyncUpdateWorkflow( return CompletableFuture.completedFuture(null); } + @Override + public void removeTask(String workflowId, String taskId) {} + + @Override + public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { + return CompletableFuture.completedFuture(null); + } + + @Override + public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {} + + @Override + public CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values) { + return CompletableFuture.completedFuture(null); + } + @Override public String get(String workflowInstanceId, String key) { return null; diff --git a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java index 490758d151..927856657a 100644 --- a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java @@ -111,6 +111,45 @@ SearchResult searchTasks( CompletableFuture asyncUpdateWorkflow( String workflowInstanceId, String[] keys, Object[] values); + /** + * Remove the task index + * + * @param workflowId workflow containing task + * @param taskId task to 
be removed + */ + void removeTask(String workflowId, String taskId); + + /** + * Remove the task index asynchronously + * + * @param workflowId workflow containing task + * @param taskId task to be removed + * @return CompletableFuture of type void + */ + CompletableFuture asyncRemoveTask(String workflowId, String taskId); + + /** + * Updates the index + * + * @param workflowId id of the workflow + * @param taskId id of the task + * @param keys keys to be updated + * @param values values. Number of keys and values MUST match. + */ + void updateTask(String workflowId, String taskId, String[] keys, Object[] values); + + /** + * Updates the index + * + * @param workflowId id of the workflow + * @param taskId id of the task + * @param keys keys to be updated + * @param values values. Number of keys and values MUST match. + * @return CompletableFuture of type void + */ + CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values); + /** * Retrieves a specific field from the index * diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java index beba7af92c..bb760ac684 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java @@ -134,7 +134,7 @@ Workflow getExecutionStatus( * Removes the workflow from the system. * * @param workflowId WorkflowID of the workflow you want to remove from system. - * @param archiveWorkflow Archives the workflow. + * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. 
*/ void deleteWorkflow( @NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId, diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java index 864368f85b..99732a23a2 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java @@ -190,7 +190,7 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) { * Removes the workflow from the system. * * @param workflowId WorkflowID of the workflow you want to remove from system. - * @param archiveWorkflow Archives the workflow. + * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. */ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { executionService.removeWorkflow(workflowId, archiveWorkflow); diff --git a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java index 69a4f83c2a..aceccf2b28 100644 --- a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java +++ b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java @@ -139,11 +139,21 @@ public void testGetWorkflowsByCorrelationId() { @Test public void testRemoveWorkflow() { WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowId("workflowId"); workflow.setStatus(WorkflowModel.Status.COMPLETED); + + TaskModel task = new TaskModel(); + task.setTaskId("taskId"); + workflow.setTasks(Collections.singletonList(task)); + when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); executionDAOFacade.removeWorkflow("workflowId", false); - verify(indexDAO, never()).updateWorkflow(any(), any(), any()); - verify(indexDAO, times(1)).asyncRemoveWorkflow(workflow.getWorkflowId()); + 
verify(executionDAO, times(1)).removeWorkflow(anyString()); + verify(executionDAO, never()).removeTask(anyString()); + verify(indexDAO, never()).updateWorkflow(anyString(), any(), any()); + verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); + verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString()); + verify(indexDAO, times(1)).asyncRemoveTask(anyString(), anyString()); } @Test @@ -153,8 +163,12 @@ public void testArchiveWorkflow() throws Exception { when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); executionDAOFacade.removeWorkflow("workflowId", true); - verify(indexDAO, times(1)).updateWorkflow(any(), any(), any()); - verify(indexDAO, never()).removeWorkflow(any()); + verify(executionDAO, times(1)).removeWorkflow(anyString()); + verify(executionDAO, never()).removeTask(anyString()); + verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any()); + verify(indexDAO, times(15)).updateTask(anyString(), anyString(), any(), any()); + verify(indexDAO, never()).removeWorkflow(anyString()); + verify(indexDAO, never()).removeTask(anyString(), anyString()); } @Test diff --git a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java index f5154ff169..dd61089835 100644 --- a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java @@ -41,12 +41,7 @@ import static com.netflix.conductor.TestUtils.getConstraintViolationMessages; import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.*; 
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -184,12 +179,30 @@ public void testNotFoundExceptionGetExecutionStatus() { @Test public void testDeleteWorkflow() { - workflowService.deleteWorkflow("w123", true); - verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean()); + workflowService.deleteWorkflow("w123", false); + verify(executionService, times(1)).removeWorkflow(anyString(), eq(false)); } @Test(expected = ConstraintViolationException.class) public void testInvalidDeleteWorkflow() { + try { + workflowService.deleteWorkflow(null, false); + } catch (ConstraintViolationException ex) { + assertEquals(1, ex.getConstraintViolations().size()); + Set messages = getConstraintViolationMessages(ex.getConstraintViolations()); + assertTrue(messages.contains("WorkflowId cannot be null or empty.")); + throw ex; + } + } + + @Test + public void testArchiveWorkflow() { + workflowService.deleteWorkflow("w123", true); + verify(executionService, times(1)).removeWorkflow(anyString(), eq(true)); + } + + @Test(expected = ConstraintViolationException.class) + public void testInvalidArchiveWorkflow() { try { workflowService.deleteWorkflow(null, true); } catch (ConstraintViolationException ex) { diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java index 716be69815..b99c858549 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java @@ -732,6 +732,98 @@ public CompletableFuture asyncUpdateWorkflow( () -> updateWorkflow(workflowInstanceId, keys, values), executorService); } + @Override + public void removeTask(String workflowId, String taskId) { + try { + long startTime = Instant.now().toEpochMilli(); + String 
docType = StringUtils.isBlank(docTypeOverride) ? TASK_DOC_TYPE : docTypeOverride; + + SearchResult taskSearchResult = + searchTasks( + String.format( + "(taskId='%s') AND (workflowId='%s')", taskId, workflowId), + "*", + 0, + 1, + null); + + if (taskSearchResult.getTotalHits() == 0) { + LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId); + Monitors.error(CLASS_NAME, "removeTask"); + return; + } + + DeleteRequest request = new DeleteRequest(taskIndexName, docType, taskId); + DeleteResponse response = elasticSearchClient.delete(request).actionGet(); + long endTime = Instant.now().toEpochMilli(); + + if (response.getResult() != DocWriteResponse.Result.DELETED) { + LOGGER.error( + "Index removal failed - task not found by id: {} of workflow: {}", + taskId, + workflowId); + Monitors.error(CLASS_NAME, "removeTask"); + return; + } + LOGGER.debug( + "Time taken {} for removing task:{} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("remove_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } catch (Exception e) { + LOGGER.error( + "Failed to remove task: {} of workflow: {} from index", taskId, workflowId, e); + Monitors.error(CLASS_NAME, "removeTask"); + } + } + + @Override + public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { + return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService); + } + + @Override + public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) { + if (keys.length != values.length) { + throw new IllegalArgumentException("Number of keys and values do not match"); + } + + long startTime = Instant.now().toEpochMilli(); + String docType = StringUtils.isBlank(docTypeOverride) ? 
TASK_DOC_TYPE : docTypeOverride; + + UpdateRequest request = new UpdateRequest(taskIndexName, docType, taskId); + Map source = + IntStream.range(0, keys.length) + .boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + request.doc(source); + LOGGER.debug( + "Updating task: {} of workflow: {} in elasticsearch index: {}", + taskId, + workflowId, + taskIndexName); + elasticSearchClient.update(request).actionGet(); + long endTime = Instant.now().toEpochMilli(); + LOGGER.debug( + "Time taken {} for updating task: {} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("update_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } + + @Override + public CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values) { + return CompletableFuture.runAsync( + () -> updateTask(workflowId, taskId, keys, values), executorService); + } + @Override public String get(String workflowInstanceId, String fieldToGet) { String docType = StringUtils.isBlank(docTypeOverride) ? WORKFLOW_DOC_TYPE : docTypeOverride; diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java index 764329ab35..6ebf161329 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java @@ -810,6 +810,96 @@ public CompletableFuture asyncUpdateWorkflow( () -> updateWorkflow(workflowInstanceId, keys, values), executorService); } + @Override + public void removeTask(String workflowId, String taskId) { + long startTime = Instant.now().toEpochMilli(); + String docType = StringUtils.isBlank(docTypeOverride) ? 
TASK_DOC_TYPE : docTypeOverride; + + SearchResult taskSearchResult = + searchTasks( + String.format("(taskId='%s') AND (workflowId='%s')", taskId, workflowId), + "*", + 0, + 1, + null); + + if (taskSearchResult.getTotalHits() == 0) { + LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId); + Monitors.error(className, "removeTask"); + return; + } + + DeleteRequest request = new DeleteRequest(taskIndexName, docType, taskId); + + try { + DeleteResponse response = elasticSearchClient.delete(request); + + if (response.getResult() != DocWriteResponse.Result.DELETED) { + LOGGER.error("Index removal failed - task not found by id: {} of workflow: {}", taskId, workflowId); + Monitors.error(className, "removeTask"); + return; + } + long endTime = Instant.now().toEpochMilli(); + LOGGER.debug( + "Time taken {} for removing task:{} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("remove_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } catch (IOException e) { + LOGGER.error( + "Failed to remove task {} of workflow: {} from index", taskId, workflowId, e); + Monitors.error(className, "removeTask"); + } + } + + @Override + public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { + return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService); + } + + @Override + public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) { + try { + if (keys.length != values.length) { + throw new IllegalArgumentException("Number of keys and values do not match"); + } + + long startTime = Instant.now().toEpochMilli(); + String docType = StringUtils.isBlank(docTypeOverride) ? 
TASK_DOC_TYPE : docTypeOverride; + UpdateRequest request = new UpdateRequest(taskIndexName, docType, taskId); + Map source = + IntStream.range(0, keys.length) + .boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + request.doc(source); + + LOGGER.debug("Updating task: {} of workflow: {} with {}", taskId, workflowId, source); + elasticSearchClient.update(request, RequestOptions.DEFAULT); + long endTime = Instant.now().toEpochMilli(); + LOGGER.debug( + "Time taken {} for updating task: {} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("update_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } catch (Exception e) { + LOGGER.error("Failed to update task: {} of workflow: {}", taskId, workflowId, e); + Monitors.error(className, "update"); + } + } + + @Override + public CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values) { + return CompletableFuture.runAsync( + () -> updateTask(workflowId, taskId, keys, values), executorService); + } + @Override public String get(String workflowInstanceId, String fieldToGet) { diff --git a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java index 1597d3cb16..75dd64454c 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java +++ b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java @@ -34,8 +34,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; public class 
TestElasticSearchDAOV6 extends ElasticSearchDaoBaseTest { @@ -196,6 +196,84 @@ public void shouldIndexTaskAsync() throws Exception { assertEquals(taskSummary.getTaskId(), tasks.get(0)); } + @Test + public void shouldRemoveTask() { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // wait for workflow to be indexed + tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldAsyncRemoveTask() throws Exception { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // wait for workflow to be indexed + tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotRemoveTaskWhenNotAssociatedWithWorkflow() { + TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for 
the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask("InvalidWorkflow", taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotAsyncRemoveTaskWhenNotAssociatedWithWorkflow() throws Exception { + TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask("InvalidWorkflow", taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + @Test public void shouldAddTaskExecutionLogs() { List logs = new ArrayList<>(); diff --git a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java index 50117ce138..154b891add 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java +++ b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java @@ -38,8 +38,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestElasticSearchRestDAOV6 extends ElasticSearchRestDaoBaseTest { @@ -195,6 +194,84 @@ public void shouldIndexTaskAsync() throws Exception { assertEquals(taskSummary.getTaskId(), tasks.get(0)); } + @Test + public void shouldRemoveTask() { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // 
wait for workflow to be indexed + tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldAsyncRemoveTask() throws Exception { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // wait for workflow to be indexed + tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotRemoveTaskWhenNotAssociatedWithWorkflow() { + TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask("InvalidWorkflow", taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotAsyncRemoveTaskWhenNotAssociatedWithWorkflow() throws Exception { + 
TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask("InvalidWorkflow", taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + @Test public void shouldAddTaskExecutionLogs() { List logs = new ArrayList<>(); diff --git a/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java index 1102cd494d..4cb8d234d7 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java +++ b/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java @@ -52,6 +52,18 @@ public static TaskSummary loadTaskSnapshot(ObjectMapper objectMapper, String res } } + public static TaskSummary loadTaskSnapshot( + ObjectMapper objectMapper, String resourceFileName, String workflowId) { + try { + String content = loadJsonResource(resourceFileName); + content = content.replace(WORKFLOW_INSTANCE_ID_PLACEHOLDER, workflowId); + + return objectMapper.readValue(content, TaskSummary.class); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + public static String loadJsonResource(String resourceFileName) { try { return FileUtils.readFileToString( diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy index 3834323f38..44f7ade064 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy @@ -23,6 +23,8 @@ import 
com.netflix.conductor.common.run.Workflow import com.netflix.conductor.common.utils.ExternalPayloadStorage import com.netflix.conductor.core.exception.NotFoundException import com.netflix.conductor.core.exception.TransientException +import com.netflix.conductor.core.utils.QueueUtils +import com.netflix.conductor.core.utils.Utils import com.netflix.conductor.rest.controllers.TaskResource import com.netflix.conductor.rest.controllers.WorkflowResource import com.netflix.conductor.test.base.AbstractResiliencySpecification @@ -255,10 +257,16 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { workflowResource.delete(workflowInstanceId, false) then: "Verify queueDAO is called to remove from _deciderQueue" - 1 * queueDAO._ + 1 * queueDAO.remove(Utils.DECIDER_QUEUE, _) - when: "We try to get deleted workflow" - workflowResource.getExecutionStatus(workflowInstanceId, true) + when: "We try to get deleted workflow, verify the status and check if tasks are not removed from queue" + with(workflowResource.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.TERMINATED + tasks.size() == 1 + tasks[0].taskType == 'integration_task_1' + tasks[0].status == Task.Status.CANCELED + 0 * queueDAO.remove(QueueUtils.getQueueName(tasks[0]), _) + } then: thrown(NotFoundException.class)