diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index e9c326410b..6070741a29 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1027,6 +1027,32 @@ public WorkflowModel decide(String workflowId) { } } + /** + * This method overloads the {@link #decide(String)}. It will acquire a lock and evaluate the + * state of the workflow. + * + * @param workflow the workflow to evaluate the state for + * @return the workflow + */ + public WorkflowModel decideWithLock(WorkflowModel workflow) { + if (workflow == null) { + return null; + } + StopWatch watch = new StopWatch(); + watch.start(); + if (!executionLockService.acquireLock(workflow.getWorkflowId())) { + return null; + } + try { + return decide(workflow); + + } finally { + executionLockService.releaseLock(workflow.getWorkflowId()); + watch.stop(); + Monitors.recordWorkflowDecisionTime(watch.getTime()); + } + } + /** * @param workflow the workflow to evaluate the state for * @return true if the workflow has completed (success or failed), false otherwise. Note: This diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java index ad47595232..49ca8a632d 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java @@ -109,6 +109,11 @@ public void verifyAndRepairWorkflowTasks(String workflowId) { () -> new NotFoundException( "Could not find workflow: " + workflowId)); + verifyAndRepairWorkflowTasks(workflow); + } + + /** Verify and repair tasks in a workflow. */ + public void verifyAndRepairWorkflowTasks(WorkflowModel workflow) { workflow.getTasks().forEach(this::verifyAndRepairTask); // repair the parent workflow if needed verifyAndRepairWorkflow(workflow.getParentWorkflowId()); diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 46e8ca6f55..cd4b46be75 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -28,6 +28,7 @@ import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.core.WorkflowContext; import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.dao.QueueDAO; @@ -48,6 +49,7 @@ public class WorkflowSweeper { private final WorkflowExecutor workflowExecutor; private final WorkflowRepairService workflowRepairService; private final QueueDAO queueDAO; + private final ExecutionDAOFacade executionDAOFacade; private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName(); @@ -56,10 +58,12 @@ public WorkflowSweeper( WorkflowExecutor workflowExecutor, Optional workflowRepairService, ConductorProperties properties, - QueueDAO queueDAO) { + QueueDAO queueDAO, + ExecutionDAOFacade executionDAOFacade) { this.properties = properties; this.queueDAO = queueDAO; this.workflowExecutor = workflowExecutor; + this.executionDAOFacade = executionDAOFacade; this.workflowRepairService = workflowRepairService.orElse(null); LOGGER.info("WorkflowSweeper initialized."); } @@ -77,12 +81,14 @@ public void sweep(String workflowId) { WorkflowContext.set(workflowContext); LOGGER.debug("Running sweeper for workflow {}", workflowId); + workflow = executionDAOFacade.getWorkflowModel(workflowId, true); + if (workflowRepairService != null) { // Verify and repair tasks in the workflow. - workflowRepairService.verifyAndRepairWorkflowTasks(workflowId); + workflowRepairService.verifyAndRepairWorkflowTasks(workflow); } - workflow = workflowExecutor.decide(workflowId); + workflow = workflowExecutor.decideWithLock(workflow); if (workflow != null && workflow.getStatus().isTerminal()) { queueDAO.remove(DECIDER_QUEUE, workflowId); return; diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 1ed45dd7bd..7951d5e923 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -23,6 +23,7 @@ import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.model.TaskModel; @@ -42,6 +43,7 @@ public class TestWorkflowSweeper { private WorkflowExecutor workflowExecutor; private WorkflowRepairService workflowRepairService; private QueueDAO queueDAO; + private ExecutionDAOFacade executionDAOFacade; private WorkflowSweeper workflowSweeper; private int defaultPostPoneOffSetSeconds = 1800; @@ -52,9 +54,14 @@ public void setUp() { workflowExecutor = mock(WorkflowExecutor.class); queueDAO = mock(QueueDAO.class); workflowRepairService = mock(WorkflowRepairService.class); + executionDAOFacade = mock(ExecutionDAOFacade.class); workflowSweeper = new WorkflowSweeper( - workflowExecutor, Optional.of(workflowRepairService), properties, queueDAO); + workflowExecutor, + Optional.of(workflowRepairService), + properties, + queueDAO, + executionDAOFacade); } @Test