Skip to content

Commit

Permalink
feat(core, jdbc): change the state of a subflow restart parent execution
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 27, 2025
1 parent 4e3ed33 commit 7cf4955
Show file tree
Hide file tree
Showing 27 changed files with 421 additions and 368 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public interface QueueFactoryInterface {
String TRIGGER_NAMED = "triggerQueue";
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";

QueueInterface<Execution> execution();

Expand Down Expand Up @@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
WorkerTriggerResultQueueInterface workerTriggerResultQueue();

QueueInterface<SubflowExecutionResult> subflowExecutionResult();

QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
}
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public String key(Object object) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,12 @@ public static <T extends Task & ExecutableTask<?>> Optional<SubflowExecution<?>>
"namespace", currentFlow.getNamespace(),
"flowId", currentFlow.getId(),
"flowRevision", currentFlow.getRevision(),
"taskRunId", currentTaskRun.getId()
"taskRunId", currentTaskRun.getId(),
"taskId", currentTaskRun.getTaskId()
));
if (currentTaskRun.getOutputs() != null) {
variables.put("taskRunOutputs", currentTaskRun.getOutputs());
}
if (currentTaskRun.getValue() != null) {
variables.put("taskRunValue", currentTaskRun.getValue());
}
Expand Down Expand Up @@ -278,4 +282,27 @@ private static State.Type findTerminalState(Map<String, Integer> iterations, boo
}
return State.Type.SUCCESS;
}

public static SubflowExecutionResult subflowExecutionResultFromChildExecution(RunContext runContext, Flow flow, Execution execution, ExecutableTask<?> executableTask, TaskRun taskRun) {
try {
return executableTask
.createSubflowExecutionResult(runContext, taskRun, flow, execution)
.orElse(null);
} catch (Exception e) {
log.error("Unable to create the Subflow Execution Result", e);
// we return a fail subflow execution result to end the flow
return SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun.withState(State.Type.FAILED).withAttempts(List.of(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build())))
.build();
}
}

public static boolean isSubflow(Execution execution) {
return execution.getTrigger() != null && (
"io.kestra.plugin.core.flow.Subflow".equals(execution.getTrigger().getType()) ||
"io.kestra.plugin.core.flow.ForEachItem$ForEachItemExecutable".equals(execution.getTrigger().getType())
);
}
}
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class Executor {
private final List<WorkerTrigger> workerTriggers = new ArrayList<>();
private WorkerJob workerJobToResubmit;
private State.Type originalState;
private SubflowExecutionEnd subflowExecutionEnd;
private SubflowExecutionEnd joinedSubflowExecutionEnd;

/**
* The sequence id should be incremented each time the execution is persisted after mutation.
Expand Down Expand Up @@ -67,6 +69,10 @@ public Executor(SubflowExecutionResult subflowExecutionResult) {
this.joinedSubflowExecutionResult = subflowExecutionResult;
}

public Executor(SubflowExecutionEnd subflowExecutionEnd) {
this.joinedSubflowExecutionEnd = subflowExecutionEnd;
}

public Executor(WorkerJob workerJob) {
this.workerJobToResubmit = workerJob;
}
Expand Down Expand Up @@ -169,6 +175,11 @@ public Executor withExecutionKilled(final List<ExecutionKilledExecution> executi
return this;
}

public Executor withSubflowExecutionEnd(SubflowExecutionEnd subflowExecutionEnd) {
this.subflowExecutionEnd = subflowExecutionEnd;
return this;
}

public Executor serialize() {
return new Executor(
this.execution,
Expand Down
22 changes: 18 additions & 4 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.*;
Expand Down Expand Up @@ -873,17 +874,21 @@ private Executor handleExecutableTask(final Executor executor) {
);
} else {
executions.addAll(subflowExecutions);
if (!executableTask.waitForExecution()) {
// send immediately all workerTaskResult to ends the executable task
Optional<FlowWithSource> flow = flowExecutorInterface.findByExecution(subflowExecutions.getFirst().getExecution());
if (flow.isPresent()) {
// add SubflowExecutionResults to notify parents
for (SubflowExecution<?> subflowExecution : subflowExecutions) {
Optional<SubflowExecutionResult> subflowExecutionResult = executableTask.createSubflowExecutionResult(
runContext,
subflowExecution.getParentTaskRun().withState(State.Type.SUCCESS),
executor.getFlow(),
// if we didn't wait for the execution, we directly set the state to SUCCESS
executableTask.waitForExecution() ? subflowExecution.getParentTaskRun() : subflowExecution.getParentTaskRun().withState(State.Type.SUCCESS),
flow.get(),
subflowExecution.getExecution()
);
subflowExecutionResult.ifPresent(subflowExecutionResults::add);
}
} else {
log.error("Unable to find flow for execution {}", subflowExecutions.getFirst().getExecution().getId());
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -1030,6 +1035,15 @@ public void log(Logger log, Boolean in, SubflowExecutionResult value) {
);
}

public void log(Logger log, Boolean in, SubflowExecutionEnd value) {
log.debug(
"{} {} : {}",
in ? "<< IN " : ">> OUT",
value.getClass().getSimpleName(),
value.toStringState()
);
}

public void log(Logger log, Boolean in, Execution value) {
log.debug(
"{} {} [key='{}']\n{}",
Expand Down
33 changes: 33 additions & 0 deletions core/src/main/java/io/kestra/core/runners/SubflowExecutionEnd.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SubflowExecutionEnd {
private Execution childExecution;
private String parentExecutionId;
private String taskRunId;
private String taskId;
private State.Type state;
private Map<String, Object> outputs;

public String toStringState() {
return "SubflowExecutionEnd(" +
"childExecutionId=" + this.getChildExecution().getId() +
", parentExecutionId=" + this.getParentExecutionId() +
", taskId=" + this.getTaskId() +
", taskRunId=" + this.getTaskRunId() +
", state=" + this.getState().toString() +
")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junitpioneer.jupiter.RetryingTest;
Expand Down Expand Up @@ -90,6 +92,9 @@ public abstract class AbstractRunnerTest {
@Inject
private SLATestCase slaTestCase;

@Inject
private ChangeStateTestCase changeStateTestCase;

@Test
@ExecuteFlow("flows/valids/full.yaml")
void full(Execution execution) {
Expand Down Expand Up @@ -173,6 +178,18 @@ void restartMultiple() throws Exception {
restartCaseTest.restartMultiple();
}

@Test
@LoadFlows({"flows/valids/restart_always_failed.yaml"})
void restartFailedThenFailureWithGlobalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithGlobalErrors();
}

@RetryingTest(5)
@LoadFlows({"flows/valids/restart_local_errors.yaml"})
void restartFailedThenFailureWithLocalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithLocalErrors();
}

@Test
@LoadFlows({"flows/valids/restart-parent.yaml", "flows/valids/restart-child.yaml"})
void restartSubflow() throws Exception {
Expand Down Expand Up @@ -244,7 +261,7 @@ void flowWaitSuccess() throws Exception {
"flows/valids/task-flow.yaml",
"flows/valids/task-flow-inherited-labels.yaml"})
void flowWaitFailed() throws Exception {
flowCaseTest.waitFailed();
flowCaseTest.waitFailed();
}

@Test
Expand Down Expand Up @@ -342,6 +359,12 @@ protected void forEachItemSubflowOutputs() throws Exception {
forEachItemCaseTest.forEachItemWithSubflowOutputs();
}

@Test
@LoadFlows({"flows/valids/restart-for-each-item.yaml", "flows/valids/restart-child.yaml"})
void restartForEachItem() throws Exception {
forEachItemCaseTest.restartForEachItem();
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
Expand Down Expand Up @@ -468,4 +491,16 @@ void multipleIf() throws TimeoutException, QueueException {
assertThat(execution.getTaskRunList(), hasSize(12));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}

@Test
@ExecuteFlow("flows/valids/failed-first.yaml")
public void changeStateShouldEndsInSuccess(Execution execution) throws Exception {
changeStateTestCase.changeStateShouldEndsInSuccess(execution);
}

@Test
@LoadFlows({"flows/valids/failed-first.yaml", "flows/valids/subflow-parent-of-failed.yaml"})
public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception {
changeStateTestCase.changeStateInSubflowShouldEndsParentFlowInSuccess();
}
}
113 changes: 113 additions & 0 deletions core/src/test/java/io/kestra/core/runners/ChangeStateTestCase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@Singleton
public class ChangeStateTestCase {
@Inject
private FlowRepositoryInterface flowRepository;

@Inject
private ExecutionService executionService;

@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;

@Inject
private RunnerUtils runnerUtils;

public void changeStateShouldEndsInSuccess(Execution execution) throws Exception {
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED));

// await for the last execution
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();
Flux<Execution> receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if (execution.getId().equals(exec.getId()) && exec.getState().getCurrent() == State.Type.SUCCESS) {
lastExecution.set(exec);
latch.countDown();
}
});

Flow flow = flowRepository.findByExecution(execution);
Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
executionQueue.emit(markedAs);

assertThat(latch.await(10, TimeUnit.SECONDS), is(true));
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(lastExecution.get().getTaskRunList(), hasSize(2));
assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception {
// await for the subflow execution
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();
Flux<Execution> receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if ("failed-first".equals(exec.getFlowId()) && exec.getState().getCurrent() == State.Type.FAILED) {
lastExecution.set(exec);
latch.countDown();
}
});

// run the parent flow
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "subflow-parent-of-failed");
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED));

// assert on the subflow
assertThat(latch.await(10, TimeUnit.SECONDS), is(true));
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent(), is(State.Type.FAILED));
assertThat(lastExecution.get().getTaskRunList(), hasSize(1));
assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED));

// await for the parent execution
CountDownLatch parentLatch = new CountDownLatch(1);
AtomicReference<Execution> lastParentExecution = new AtomicReference<>();
receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if (execution.getId().equals(exec.getId()) && exec.getState().isTerminated()) {
lastParentExecution.set(exec);
parentLatch.countDown();
}
});

// restart the subflow
Flow flow = flowRepository.findByExecution(lastExecution.get());
Execution markedAs = executionService.markAs(lastExecution.get(), flow, lastExecution.get().getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
executionQueue.emit(markedAs);

// assert for the parent flow
assertThat(parentLatch.await(10, TimeUnit.SECONDS), is(true));
receivedExecutions.blockLast();
assertThat(lastParentExecution.get().getState().getCurrent(), is(State.Type.FAILED)); // FIXME should be success but it's FAILED on unit tests
assertThat(lastParentExecution.get().getTaskRunList(), hasSize(1));
assertThat(lastParentExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,16 @@ void shouldKillPausedExecutions() throws Exception {
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent(), is(State.Type.KILLED));
assertThat(killed.getState().getHistories(), hasSize(4));
}

@Test
@ExecuteFlow("flows/valids/failed-first.yaml")
void shouldRestartAfterChangeTaskState(Execution execution) throws Exception {
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED));

Flow flow = flowRepository.findByExecution(execution);
Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
assertThat(markedAs.getState().getCurrent(), is(State.Type.RESTARTED));
}
}
Loading

0 comments on commit 7cf4955

Please sign in to comment.