Skip to content

Commit

Permalink
fix(core): ForEach failed with errors, finally and concurrency = 0
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 14, 2025
1 parent 0c0ff37 commit ca60638
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 79 deletions.
71 changes: 0 additions & 71 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,77 +217,6 @@ public static List<NextTaskRun> resolveParallelNexts(
);
}

/**
* resolveConcurrentNexts will resolve concurrent values
* For both concurrent vales and subtasks, see resolveParallelNexts()
*/
public static List<NextTaskRun> resolveConcurrentNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
Integer concurrency
) {
if (execution.getState().getCurrent() == State.Type.KILLING) {
return Collections.emptyList();
}

List<ResolvedTask> allTasks = execution.findTaskDependingFlowState(
tasks,
errors,
_finally,
parentTaskRun
);

// all tasks run
List<TaskRun> taskRuns = execution.findTaskRunByTasks(allTasks, parentTaskRun);

// find all non-terminated
long nonTerminatedCount = taskRuns
.stream()
.filter(taskRun -> !taskRun.getState().isTerminated())
.count();

if (concurrency > 0 && nonTerminatedCount >= concurrency) {
return Collections.emptyList();
}

long concurrencySlots = concurrency == 0 ? Integer.MAX_VALUE : concurrency - nonTerminatedCount;

// first one
if (taskRuns.isEmpty()) {
Map<String, List<ResolvedTask>> collect = allTasks
.stream()
.collect(Collectors.groupingBy(ResolvedTask::getValue, LinkedHashMap::new, Collectors.toList()));
return collect.values().stream()
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
.toList()
.reversed();
}

// start as many tasks as we have concurrency slots
Map<String, List<ResolvedTask>> collect = allTasks
.stream()
.collect(Collectors.groupingBy(ResolvedTask::getValue, LinkedHashMap::new, Collectors.toList()));

return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
.toList();
}

private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
return tasks.stream()
.filter(resolvedTask -> taskRuns.stream()
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
)
.toList();
}

public static List<NextTaskRun> resolveDagNexts(
Execution execution,
List<ResolvedTask> tasks,
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/flow/ForEach.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String>
GraphUtils.parallel(
subGraph,
this.getTasks(),
this.getErrors(),
this.getFinally(),
this.errors,
this._finally,
taskRun,
execution
);
Expand Down Expand Up @@ -224,14 +224,14 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
if (this.concurrencyLimit == 1) {
return FlowableUtils.resolveSequentialNexts(
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.values),
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun
);
}

return FlowableUtils.resolveConcurrentNexts(
return FlowableUtils.resolveParallelNexts(
execution,
FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.values),
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ void parallelWithErrors() throws QueueException, TimeoutException {
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

// @FIXME
@Disabled("ForEach is not working with errors neither finally")
@Test
@LoadFlows({"flows/valids/finally-foreach.yaml"})
void forEachWithoutErrors() throws QueueException, TimeoutException {
Expand All @@ -188,8 +186,6 @@ void forEachWithoutErrors() throws QueueException, TimeoutException {
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

// @FIXME
@Disabled("ForEach is not working with errors neither finally")
@Test
@LoadFlows({"flows/valids/finally-foreach.yaml"})
void forEachWithErrors() throws QueueException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,13 @@ void disabledTasks(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(1));
}

@Test
@ExecuteFlow("flows/valids/foreach-error.yaml")
void errors(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(6));
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}
}
20 changes: 20 additions & 0 deletions core/src/test/resources/flows/valids/foreach-error.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
id: each-error
namespace: io.kestra.tests

tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEach
values: '["1", "2", "3"]'
concurrencyLimit: 0
tasks:
- id: ko
type: io.kestra.plugin.core.execution.Fail

errors:
- id: e1
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"

- id: e2
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"

0 comments on commit ca60638

Please sign in to comment.