Skip to content

Commit

Permalink
fix(core): ForEach failed with errors, finally and concurrency = 0
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 16, 2025
1 parent 979c131 commit 246602f
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 6 deletions.
14 changes: 14 additions & 0 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,19 @@ public static List<NextTaskRun> resolveConcurrentNexts(
parentTaskRun
);

boolean isTasks = tasks.equals(allTasks);

// errors & finally must be run as sequential tasks
if (!isTasks) {
return resolveSequentialNexts(
execution,
tasks,
errors,
_finally,
parentTaskRun
);
}

// all tasks run
List<TaskRun> taskRuns = execution.findTaskRunByTasks(allTasks, parentTaskRun);

Expand All @@ -260,6 +273,7 @@ public static List<NextTaskRun> resolveConcurrentNexts(
Map<String, List<ResolvedTask>> collect = allTasks
.stream()
.collect(Collectors.groupingBy(ResolvedTask::getValue, LinkedHashMap::new, Collectors.toList()));

return collect.values().stream()
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/plugin/core/flow/ForEach.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String>
GraphUtils.parallel(
subGraph,
this.getTasks(),
this.getErrors(),
this.getFinally(),
this.errors,
this._finally,
taskRun,
execution
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ void parallelWithErrors() throws QueueException, TimeoutException {
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

// @FIXME
@Disabled("ForEach is not working with errors neither finally")
@Test
@LoadFlows({"flows/valids/finally-foreach.yaml"})
void forEachWithoutErrors() throws QueueException, TimeoutException {
Expand All @@ -200,8 +198,6 @@ void forEachWithoutErrors() throws QueueException, TimeoutException {
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

// @FIXME
@Disabled("ForEach is not working with errors neither finally")
@Test
@LoadFlows({"flows/valids/finally-foreach.yaml"})
void forEachWithErrors() throws QueueException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,13 @@ void disabledTasks(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(1));
}

@Test
@ExecuteFlow("flows/valids/foreach-error.yaml")
void errors(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(6));
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}
}
20 changes: 20 additions & 0 deletions core/src/test/resources/flows/valids/foreach-error.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
id: each-error
namespace: io.kestra.tests

tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEach
values: '["1", "2", "3"]'
concurrencyLimit: 0
tasks:
- id: ko
type: io.kestra.plugin.core.execution.Fail

errors:
- id: e1
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"

- id: e2
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"

0 comments on commit 246602f

Please sign in to comment.