Skip to content

Commit

Permalink
fix(core): subflow labels must not be overriden by parent flow ones
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 31, 2025
1 parent ceda5eb commit d12dd17
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
13 changes: 12 additions & 1 deletion core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.Storage;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.kestra.core.trace.propagation.ExecutionTextMapSetter;
import io.opentelemetry.api.OpenTelemetry;
Expand Down Expand Up @@ -153,7 +154,7 @@ public static <T extends Task & ExecutableTask<?>> Optional<SubflowExecution<?>>
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}

List<Label> newLabels = inheritLabels ? new ArrayList<>(currentExecution.getLabels()) : new ArrayList<>(systemLabels(currentExecution));
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
if (labels != null) {
labels.forEach(throwConsumer(label -> newLabels.add(new Label(runContext.render(label.key()), runContext.render(label.value())))));
}
Expand Down Expand Up @@ -201,6 +202,16 @@ public static <T extends Task & ExecutableTask<?>> Optional<SubflowExecution<?>>
}));
}

private static List<Label> filterLabels(List<Label> labels, Flow flow) {
if (ListUtils.isEmpty(flow.getLabels())) {
return labels;
}

return labels.stream()
.filter(label -> flow.getLabels().stream().noneMatch(flowLabel -> flowLabel.key().equals(label.key())))
.toList();
}

private static List<Label> systemLabels(Execution execution) {
return Streams.of(execution.getLabels())
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))
Expand Down
11 changes: 7 additions & 4 deletions core/src/test/java/io/kestra/plugin/core/flow/FlowCaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,24 @@ void run(String input, State.Type fromState, State.Type triggerState, int count,
assertThat(triggered.get().getState().getCurrent(), is(triggerState));

if (testInherited) {
assertThat(triggered.get().getLabels().size(), is(5));
assertThat(triggered.get().getLabels().size(), is(6));
assertThat(triggered.get().getLabels(), hasItems(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label("mainFlowExecutionLabel", "execFoo"),
new Label("mainFlowLabel", "flowFoo"),
new Label("launchTaskLabel", "launchFoo"),
new Label("switchFlowLabel", "switchFoo")
new Label("switchFlowLabel", "switchFoo"),
new Label("overriding", "child")
));
} else {
assertThat(triggered.get().getLabels().size(), is(3));
assertThat(triggered.get().getLabels().size(), is(4));
assertThat(triggered.get().getLabels(), hasItems(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label("launchTaskLabel", "launchFoo"),
new Label("switchFlowLabel", "switchFoo")
new Label("switchFlowLabel", "switchFoo"),
new Label("overriding", "child")
));
assertThat(triggered.get().getLabels(), not(hasItems(new Label("inherited", "label"))));
}
}
}
1 change: 1 addition & 0 deletions core/src/test/resources/flows/valids/switch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ inputs:

labels:
switchFlowLabel: switchFoo
overriding: child

tasks:
- id: parent-seq
Expand Down
1 change: 1 addition & 0 deletions core/src/test/resources/flows/valids/task-flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ inputs:

labels:
mainFlowLabel: flowFoo
overriding: parent

tasks:
- id: launch
Expand Down

0 comments on commit d12dd17

Please sign in to comment.