Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate possible fields from plugin.core.flow to dynamic properties #6837

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
}

@Override
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, Flow flow, Execution execution) {
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, Flow flow, Execution execution) throws IllegalVariableEvaluationException {
return subflowTask.createSubflowExecutionResult(runContext, taskRun, flow, execution);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.models.tasks;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
Expand Down Expand Up @@ -31,7 +32,7 @@ List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext,
TaskRun taskRun,
Flow flow,
Execution execution);
Execution execution) throws IllegalVariableEvaluationException;

/**
* Whether to wait for the execution(s) of the subflow before terminating this tasks
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/flow/Dag.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -93,8 +94,7 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
title = "Number of concurrent parallel tasks that can be running at any point in time.",
description = "If the value is `0`, no concurrency limit exists for the tasks in a DAG and all tasks that can run in parallel will start at the same time."
)
@PluginProperty
private final Integer concurrent = 0;
private final Property<Integer> concurrent = Property.of(0);

@Valid
@NotEmpty
Expand Down Expand Up @@ -171,7 +171,7 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun,
this.concurrent,
runContext.render(this.concurrent).as(Integer.class).orElseThrow(),
this.tasks
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.VoidOutput;
Expand Down Expand Up @@ -128,8 +129,7 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
title = "Number of concurrent parallel tasks that can be running at any point in time.",
description = "If the value is `0`, no limit exist and all the tasks will start at the same time."
)
@PluginProperty
private final Integer concurrent = 0;
private final Property<Integer> concurrent = Property.of(0);

@NotNull
@PluginProperty(dynamic = true)
Expand Down Expand Up @@ -191,7 +191,7 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun,
this.concurrent
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
);
}
}
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/flow/If.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -69,12 +70,11 @@
aliases = "io.kestra.core.tasks.flows.If"
)
public class If extends Task implements FlowableTask<If.Output> {
@PluginProperty(dynamic = true)
@Schema(
title = "The `If` condition which can be any expression that evaluates to a boolean value.",
description = "Boolean coercion allows 0, -0, null and '' to evaluate to false, all other values will evaluate to true."
)
private String condition;
private Property<String> condition;

@Valid
@PluginProperty
Expand Down Expand Up @@ -205,7 +205,7 @@ public If.Output outputs(RunContext runContext) throws Exception {
}

private Boolean isTrue(RunContext runContext) throws IllegalVariableEvaluationException {
String rendered = runContext.render(condition);
String rendered = runContext.render(condition).as(String.class).orElse(null);
return TruthUtils.isTruthy(rendered);
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/flow/Parallel.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -71,8 +72,7 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
title = "Number of concurrent parallel tasks that can be running at any point in time.",
description = "If the value is `0`, no limit exist and all tasks will start at the same time."
)
@PluginProperty
private final Integer concurrent = 0;
private final Property<Integer> concurrent = Property.of(0);

@Valid
@PluginProperty
Expand Down Expand Up @@ -134,7 +134,7 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun,
this.concurrent
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
);
}
}
9 changes: 5 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/flow/Sleep.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
Expand Down Expand Up @@ -42,15 +43,15 @@ public class Sleep extends Task implements RunnableTask<VoidOutput> {
title = "Duration to sleep",
description = "The time duration in ISO-8601 format (e.g., `PT5S` for 5 seconds)."
)
@PluginProperty
@NotNull
private Duration duration;
private Property<Duration> duration;

public VoidOutput run(RunContext runContext) throws Exception {
runContext.logger().info("Waiting for {}", duration);
var renderedDuration = runContext.render(duration).as(Duration.class).orElseThrow();
runContext.logger().info("Waiting for {}", renderedDuration);

// Wait for the specified duration
TimeUnit.MILLISECONDS.sleep(duration.toMillis());
TimeUnit.MILLISECONDS.sleep(renderedDuration.toMillis());

return null;
}
Expand Down
21 changes: 10 additions & 11 deletions core/src/main/java/io/kestra/plugin/core/flow/Subflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
Expand Down Expand Up @@ -103,8 +104,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
@Schema(
title = "The inputs to pass to the subflow to be executed."
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
private Property<Map<String, Object>> inputs;

@Schema(
title = "The labels to pass to the subflow to be executed.",
Expand All @@ -127,16 +127,14 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
title = "Whether to fail the current execution if the subflow execution fails or is killed.",
description = "Note that this option works only if `wait` is set to `true`."
)
@PluginProperty
private final Boolean transmitFailed = true;
private final Property<Boolean> transmitFailed = Property.of(true);

@Builder.Default
@Schema(
title = "Whether the subflow should inherit labels from this execution that triggered it.",
description = "By default, labels are not passed to the subflow execution. If you set this option to `true`, the child flow execution will inherit all labels from the parent execution."
)
@PluginProperty
private final Boolean inheritLabels = false;
private final Property<Boolean> inheritLabels = Property.of(false);

/**
* @deprecated Output value should now be defined part of the Flow definition.
Expand Down Expand Up @@ -173,8 +171,9 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
Execution currentExecution,
TaskRun currentTaskRun) throws InternalException {
Map<String, Object> inputs = new HashMap<>();
if (this.inputs != null) {
inputs.putAll(runContext.render(this.inputs));
var renderedInputs = runContext.render(this.inputs).asMap(String.class, Object.class);
if (!renderedInputs.isEmpty()) {
inputs.putAll(renderedInputs);
}

return ExecutableUtils.subflowExecution(
Expand All @@ -186,7 +185,7 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
currentTaskRun,
inputs,
labels,
inheritLabels,
runContext.render(inheritLabels).as(Boolean.class).orElseThrow(),
scheduleDate
)
.<List<SubflowExecution<?>>>map(subflowExecution -> List.of(subflowExecution))
Expand All @@ -199,7 +198,7 @@ public Optional<SubflowExecutionResult> createSubflowExecutionResult(
TaskRun taskRun,
io.kestra.core.models.flows.Flow flow,
Execution execution
) {
) throws IllegalVariableEvaluationException {
// we only create a worker task result when the execution is terminated
if (!taskRun.getState().isTerminated()) {
return Optional.empty();
Expand Down Expand Up @@ -250,7 +249,7 @@ public Optional<SubflowExecutionResult> createSubflowExecutionResult(

taskRun = taskRun.withOutputs(builder.build().toMap());

State.Type finalState = ExecutableUtils.guessState(execution, this.transmitFailed, this.isAllowFailure(), this.isAllowWarning());
State.Type finalState = ExecutableUtils.guessState(execution, runContext.render(this.transmitFailed).as(Boolean.class).orElseThrow(), this.isAllowFailure(), this.isAllowWarning());
if (taskRun.getState().getCurrent() != finalState) {
taskRun = taskRun.withState(finalState);
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/flow/Switch.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -95,8 +96,7 @@ public class Switch extends Task implements FlowableTask<Switch.Output> {
@Schema(
title = "The value to be evaluated."
)
@PluginProperty(dynamic = true)
private String value;
private Property<String> value;

// @FIXME: @Valid break on io.micronaut.validation.validator.DefaultValidator#cascadeToOne with "Cannot validate java.util.ArrayList"
// @Valid
Expand Down Expand Up @@ -124,7 +124,7 @@ public List<Task> getFinally() {
}

private String rendererValue(RunContext runContext) throws IllegalVariableEvaluationException {
return runContext.render(this.value);
return runContext.render(this.value).as(String.class).orElse(null);
}

@Override
Expand Down
30 changes: 14 additions & 16 deletions core/src/main/java/io/kestra/plugin/core/flow/WaitFor.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -89,19 +90,17 @@ public List<Task> getFinally() {
private List<Task> tasks;

@NotNull
@PluginProperty(dynamic = true)
@Schema(
title = "The condition expression that should evaluate to `true` or `false`.",
description = "Boolean coercion allows 0, -0, null and '' to evaluate to false; all other values will evaluate to true."
)
private String condition;
private Property<String> condition;

@Schema(
title = "If set to `true`, the task run will end in a failed state once the `maxIterations` or `maxDuration` are reached."
)
@Builder.Default
@PluginProperty
private Boolean failOnMaxReached = false;
private Property<Boolean> failOnMaxReached = Property.of(false);

@Schema(
title = "Check the frequency configuration."
Expand Down Expand Up @@ -158,16 +157,16 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution

public Instant nextExecutionDate(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (!this.reachedMaximums(runContext, execution, parentTaskRun, false)) {
String continueLoop = runContext.render(this.condition);
String continueLoop = runContext.render(this.condition).as(String.class).orElseThrow();
if (!TruthUtils.isTruthy(continueLoop)) {
return Instant.now().plus(this.checkFrequency.interval);
return Instant.now().plus(runContext.render(this.checkFrequency.interval).as(Duration.class).orElseThrow());
}
}

return null;
}

private boolean reachedMaximums(RunContext runContext, Execution execution, TaskRun parentTaskRun, Boolean printLog) {
private boolean reachedMaximums(RunContext runContext, Execution execution, TaskRun parentTaskRun, Boolean printLog) throws IllegalVariableEvaluationException {
Logger logger = runContext.logger();

if (!this.childTaskRunExecuted(execution, parentTaskRun)) {
Expand All @@ -177,14 +176,16 @@ private boolean reachedMaximums(RunContext runContext, Execution execution, Task
Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
.map(outputs -> (Integer) outputs.get("iterationCount"))
.orElse(0);
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
if (this.checkFrequency.maxIterations != null &&
iterationCount != null &&
iterationCount > runContext.render(this.checkFrequency.maxIterations).as(Integer.class).orElseThrow()) {
if (printLog) {logger.warn("Max iterations reached");}
return true;
}

Instant creationDate = parentTaskRun.getState().getHistories().getFirst().getDate();
if (this.checkFrequency.maxDuration != null &&
creationDate != null && creationDate.plus(this.checkFrequency.maxDuration).isBefore(Instant.now())) {
creationDate != null && creationDate.plus(runContext.render(this.checkFrequency.maxDuration).as(Duration.class).orElseThrow()).isBefore(Instant.now())) {
if (printLog) {logger.warn("Max duration reached");}

return true;
Expand All @@ -200,7 +201,7 @@ public Optional<State.Type> resolveState(RunContext runContext, Execution execut
return Optional.empty();
}

if (childTaskExecuted && this.reachedMaximums(runContext, execution, parentTaskRun, true) && this.failOnMaxReached) {
if (childTaskExecuted && this.reachedMaximums(runContext, execution, parentTaskRun, true) && runContext.render(this.failOnMaxReached).as(Boolean.class).orElseThrow()) {
return Optional.of(State.Type.FAILED);
}

Expand Down Expand Up @@ -268,21 +269,18 @@ public static class CheckFrequency {
title = "Maximum count of iterations."
)
@Builder.Default
@PluginProperty
private Integer maxIterations = 100;
private Property<Integer> maxIterations = Property.of(100);

@Schema(
title = "Maximum duration of the task."
)
@Builder.Default
@PluginProperty
private Duration maxDuration = Duration.ofHours(1);
private Property<Duration> maxDuration = Property.of(Duration.ofHours(1));

@Schema(
title = "Interval between each iteration."
)
@Builder.Default
@PluginProperty
private Duration interval = Duration.ofSeconds(1);
private Property<Duration> interval = Property.of(Duration.ofSeconds(1));
}
}
Loading
Loading