From de2ca100f7aefa2a2d2c3041139fad089f56a734 Mon Sep 17 00:00:00 2001 From: Bert Verstraete Date: Sun, 12 Nov 2023 09:51:54 +0100 Subject: [PATCH] Fix WorkflowRepairService and AsyncSystemTaskExecutor race condition (#3836) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue Summary: There's a race condition in the system involving async system tasks and the WorkflowRepairService. For example, when a SUB_WORKFLOW task starts, the WorkflowRepairService sometimes erroneously reinserts the task into the processing queue because it perceives the task as out-of-sync between the ExecutionDAO and the queueDAO. This issue stems from the AsyncSystemTaskExecutor updating a task's status only after it removes it from the queue, creating a window where the WorkflowRepairService can wrongly assess the task state. This leads to duplicate subworkflows/http/… tasks being executed concurrently, which complicates maintaining idempotency of Tasks. Proposed Solution: To resolve the issue, it's suggested that the AsyncSystemTaskExecutor should update the status of tasks before removing them from the queue. This should close the window where the WorkflowRepairService can misidentify the task state and prevent unnecessary re-queuing of tasks. An edge case we’ve considered is if the process crashes after the task is updated but before it's removed from the queue. If that happens, the executor will simply remove the task from the queue the next time it runs, thereby not affecting system correctness. 
Co-authored-by: Jaim Silva --- .../core/execution/AsyncSystemTaskExecutor.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java index dd7975425f..331a2b4ac9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -110,6 +110,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { } boolean hasTaskExecutionCompleted = false; + boolean shouldRemoveTaskFromQueue = false; String workflowId = task.getWorkflowInstanceId(); // if we are here the Task object is updated and needs to be persisted regardless of an // exception @@ -130,7 +131,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { String.format( "Workflow is in %s state", workflow.getStatus().toString())); } - queueDAO.remove(queueName, task.getTaskId()); + shouldRemoveTaskFromQueue = true; return; } @@ -156,13 +157,12 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { // Update message in Task queue based on Task status // Remove asyncComplete system tasks from the queue that are not in SCHEDULED state if (isTaskAsyncComplete && task.getStatus() != TaskModel.Status.SCHEDULED) { - queueDAO.remove(queueName, task.getTaskId()); + shouldRemoveTaskFromQueue = true; hasTaskExecutionCompleted = true; } else if (task.getStatus().isTerminal()) { task.setEndTime(System.currentTimeMillis()); - queueDAO.remove(queueName, task.getTaskId()); + shouldRemoveTaskFromQueue = true; hasTaskExecutionCompleted = true; - LOGGER.debug("{} removed from queue: {}", task, queueName); } else { task.setCallbackAfterSeconds(systemTaskCallbackTime); systemTask @@ -188,6 +188,10 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { 
LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e); } finally { executionDAOFacade.updateTask(task); + if (shouldRemoveTaskFromQueue) { + queueDAO.remove(queueName, task.getTaskId()); + LOGGER.debug("{} removed from queue: {}", task, queueName); + } // if the current task execution has completed, then the workflow needs to be evaluated if (hasTaskExecutionCompleted) { workflowExecutor.decide(workflowId);