Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Remove tasks from Elasticsearch when workflow is removed #3300

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.*;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
Expand Down Expand Up @@ -333,24 +334,69 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
*
* @param workflowId the id of the workflow to be removed
* @param archiveWorkflow if true, the workflow will be archived in the {@link IndexDAO} after
* removal from {@link ExecutionDAO}
* removal from {@link ExecutionDAO}. Next to this if removeTasks is true, the tasks
* associated with the workflow will also be archived in the {@link IndexDAO} after removal
* from {@link ExecutionDAO}.
* @param removeTasks if true, the tasks associated with the workflow will be removed
*/
public void removeWorkflow(String workflowId, boolean archiveWorkflow) {
public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) {
if (!removeTasks) {
LOGGER.info("Not removing tasks of workflow: {}", workflowId);
}

WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true);
List<TaskModel> tasks = workflow.getTasks();

executionDAO.removeWorkflow(workflowId);
if (removeTasks) {
tasks.forEach(
task -> {
executionDAO.removeTask(task.getTaskId());
});
}

try {
removeWorkflowIndex(workflow, archiveWorkflow);
} catch (JsonProcessingException e) {
throw new TransientException("Workflow can not be serialized to json", e);
}

executionDAO.removeWorkflow(workflowId);
if (removeTasks) {
tasks.forEach(
task -> {
try {
removeTaskIndex(workflow, task, archiveWorkflow);
} catch (JsonProcessingException e) {
throw new TransientException(
String.format(
"Task %s of workflow %s can not be serialized to json",
task.getTaskId(), workflow.getWorkflowId()),
e);
}
});
}

try {
queueDAO.remove(DECIDER_QUEUE, workflowId);
} catch (Exception e) {
LOGGER.info("Error removing workflow: {} from decider queue", workflowId, e);
}

if (removeTasks) {
tasks.forEach(
task -> {
try {
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
} catch (Exception e) {
LOGGER.info(
"Error removing task: {} of workflow: {} from {} queue",
workflowId,
task.getTaskId(),
QueueUtils.getQueueName(task),
e);
}
});
}
}

private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow)
Expand Down Expand Up @@ -509,6 +555,29 @@ public void removeTask(String taskId) {
executionDAO.removeTask(taskId);
}

private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean archiveTask)
throws JsonProcessingException {
if (archiveTask) {
if (task.getStatus().isTerminal()) {
// Only allow archival if task is in terminal state
// DO NOT archive async, since if archival errors out, task data will be lost
indexDAO.updateTask(
RemcoBuddelmeijer marked this conversation as resolved.
Show resolved Hide resolved
workflow.getWorkflowId(),
task.getTaskId(),
new String[] {RAW_JSON_FIELD, ARCHIVED_FIELD},
new Object[] {objectMapper.writeValueAsString(task), true});
} else {
throw new IllegalArgumentException(
String.format(
"Cannot archive task: %s of workflow: %s with status: %s",
task.getTaskId(), workflow.getWorkflowId(), task.getStatus()));
}
} else {
// Not archiving, remove task from index
indexDAO.asyncRemoveTask(workflow.getWorkflowId(), task.getTaskId());
}
}

public void extendLease(TaskModel taskModel) {
taskModel.setUpdateTime(System.currentTimeMillis());
executionDAO.updateTask(taskModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,9 @@ public String startWorkflow(

// It's possible the remove workflow call hits an exception as well, in that case we
// want to log both errors to help diagnosis.
// For now the tasks are not removed or archived.
try {
executionDAOFacade.removeWorkflow(workflowId, false);
executionDAOFacade.removeWorkflow(workflowId, false, false);
} catch (Exception rwe) {
LOGGER.error("Could not remove the workflowId: " + workflowId, rwe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ public CompletableFuture<Void> asyncUpdateWorkflow(
return CompletableFuture.completedFuture(null);
}

@Override
public void removeTask(String workflowId, String taskId) {}

@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
return CompletableFuture.completedFuture(null);
}

@Override
public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {}

@Override
public CompletableFuture<Void> asyncUpdateTask(
String workflowId, String taskId, String[] keys, Object[] values) {
return CompletableFuture.completedFuture(null);
}

@Override
public String get(String workflowInstanceId, String key) {
return null;
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,45 @@ SearchResult<String> searchTasks(
CompletableFuture<Void> asyncUpdateWorkflow(
String workflowInstanceId, String[] keys, Object[] values);

/**
* Remove the task index
*
* @param workflowId workflow containing task
* @param taskId task to be removed
*/
void removeTask(String workflowId, String taskId);

/**
* Remove the task index asynchronously
*
* @param workflowId workflow containing task
* @param taskId task to be removed
* @return CompletableFuture of type void
*/
CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId);

/**
* Updates the index
*
* @param workflowId id of the workflow
* @param taskId id of the task
* @param keys keys to be updated
* @param values values. Number of keys and values MUST match.
*/
void updateTask(String workflowId, String taskId, String[] keys, Object[] values);

/**
* Updates the index
*
* @param workflowId id of the workflow
* @param taskId id of the task
* @param keys keys to be updated
* @param values values. Number of keys and values MUST match.
* @return CompletableFuture of type void
*/
CompletableFuture<Void> asyncUpdateTask(
String workflowId, String taskId, String[] keys, Object[] values);

/**
* Retrieves a specific field from the index
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ public List<String> getRunningWorkflows(String workflowName, int version) {
return executionDAOFacade.getRunningWorkflowIds(workflowName, version);
}

public void removeWorkflow(String workflowId, boolean archiveWorkflow) {
executionDAOFacade.removeWorkflow(workflowId, archiveWorkflow);
public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) {
executionDAOFacade.removeWorkflow(workflowId, archiveWorkflow, removeTasks);
}

public SearchResult<WorkflowSummary> search(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,14 @@ Workflow getExecutionStatus(
* Removes the workflow from the system.
*
* @param workflowId WorkflowID of the workflow you want to remove from system.
* @param archiveWorkflow Archives the workflow.
* @param archiveWorkflow Archives the workflow instead of removing it. Next to this if
* removeTasks is true, the tasks associated with the workflow will be archived as well.
* @param removeTasks Whether to remove the associated tasks from system.
*/
void deleteWorkflow(
@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId,
boolean archiveWorkflow);
boolean archiveWorkflow,
boolean removeTasks);

/**
* Retrieves all the running workflows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,12 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) {
* Removes the workflow from the system.
*
* @param workflowId WorkflowID of the workflow you want to remove from system.
* @param archiveWorkflow Archives the workflow.
* @param archiveWorkflow Archives the workflow instead of removing it. Next to this if
* removeTasks is true, the tasks associated with the workflow will be archived as well.
* @param removeTasks Whether to remove the associated tasks from system.
*/
public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
executionService.removeWorkflow(workflowId, archiveWorkflow);
public void deleteWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) {
executionService.removeWorkflow(workflowId, archiveWorkflow, removeTasks);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,65 @@ public void testGetWorkflowsByCorrelationId() {
}

@Test
public void testRemoveWorkflow() {
public void testRemoveWorkflowWithoutTasks() {
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("workflowId");
workflow.setStatus(WorkflowModel.Status.COMPLETED);

TaskModel task = new TaskModel();
task.setTaskId("taskId");
workflow.setTasks(Collections.singletonList(task));

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
executionDAOFacade.removeWorkflow("workflowId", false, false);
verify(indexDAO, never()).updateWorkflow(anyString(), any(), any());
verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any());
verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString());
verify(indexDAO, never()).asyncRemoveTask(anyString(), anyString());
}

@Test
public void testRemoveWorkflowWithTasks() {
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("workflowId");
workflow.setStatus(WorkflowModel.Status.COMPLETED);

TaskModel task = new TaskModel();
task.setTaskId("taskId");
workflow.setTasks(Collections.singletonList(task));

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
executionDAOFacade.removeWorkflow("workflowId", false, true);
verify(indexDAO, never()).updateWorkflow(anyString(), any(), any());
verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any());
verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString());
verify(indexDAO, times(1)).asyncRemoveTask(anyString(), anyString());
}

@Test
public void testArchiveWorkflowWithoutTasks() throws Exception {
InputStream stream = TestDeciderService.class.getResourceAsStream("/completed.json");
WorkflowModel workflow = objectMapper.readValue(stream, WorkflowModel.class);

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
executionDAOFacade.removeWorkflow("workflowId", false);
verify(indexDAO, never()).updateWorkflow(any(), any(), any());
verify(indexDAO, times(1)).asyncRemoveWorkflow(workflow.getWorkflowId());
executionDAOFacade.removeWorkflow("workflowId", true, false);
verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any());
verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any());
verify(indexDAO, never()).removeWorkflow(anyString());
verify(indexDAO, never()).removeTask(anyString(), anyString());
}

@Test
public void testArchiveWorkflow() throws Exception {
public void testArchiveWorkflowWithTasks() throws Exception {
InputStream stream = TestDeciderService.class.getResourceAsStream("/completed.json");
WorkflowModel workflow = objectMapper.readValue(stream, WorkflowModel.class);

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
executionDAOFacade.removeWorkflow("workflowId", true);
verify(indexDAO, times(1)).updateWorkflow(any(), any(), any());
verify(indexDAO, never()).removeWorkflow(any());
executionDAOFacade.removeWorkflow("workflowId", true, true);
verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any());
verify(indexDAO, times(15)).updateTask(anyString(), anyString(), any(), any());
verify(indexDAO, never()).removeWorkflow(anyString());
verify(indexDAO, never()).removeTask(anyString(), anyString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,7 @@
import static com.netflix.conductor.TestUtils.getConstraintViolationMessages;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -244,15 +238,33 @@ public void testNotFoundExceptionGetExecutionStatus() {
}

@Test
public void testDeleteWorkflow() {
workflowService.deleteWorkflow("w123", true);
verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean());
public void testDeleteWorkflowWithoutTask() {
workflowService.deleteWorkflow("w123", true, false);
verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean(), eq(false));
}

@Test
public void testDeleteWorkflowWithTask() {
workflowService.deleteWorkflow("w123", true, true);
verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean(), eq(true));
}

@Test(expected = ConstraintViolationException.class)
public void testInvalidDeleteWorkflowWithoutTask() {
try {
workflowService.deleteWorkflow(null, true, false);
} catch (ConstraintViolationException ex) {
assertEquals(1, ex.getConstraintViolations().size());
Set<String> messages = getConstraintViolationMessages(ex.getConstraintViolations());
assertTrue(messages.contains("WorkflowId cannot be null or empty."));
throw ex;
}
}

@Test(expected = ConstraintViolationException.class)
public void testInvalidDeleteWorkflow() {
public void testInvalidDeleteWorkflowWithTask() {
try {
workflowService.deleteWorkflow(null, true);
workflowService.deleteWorkflow(null, true, true);
} catch (ConstraintViolationException ex) {
assertEquals(1, ex.getConstraintViolations().size());
Set<String> messages = getConstraintViolationMessages(ex.getConstraintViolations());
Expand Down
Loading