Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate package plugin.core.state to dynamic properties #6755

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
Expand Down Expand Up @@ -45,6 +44,14 @@ public Property(String expression) {
this.expression = expression;
}

public Property(Map<?, ?> map) {
try {
expression = MAPPER.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Build a new Property object with a value already set.<br>
*
Expand Down
36 changes: 17 additions & 19 deletions core/src/main/java/io/kestra/plugin/core/state/AbstractState.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -28,37 +27,35 @@

public abstract class AbstractState extends Task {
private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};
public static final String TASKS_STATES = "tasks-states";

@Schema(
title = "The name of the state file."
)
@PluginProperty(dynamic = true)
@NotNull
@Builder.Default
protected String name = "default";
protected Property<String> name = Property.of("default");

@Schema(
title = "Share state for the current namespace.",
description = "By default, the state is isolated by namespace **and** flow, setting to `true` will allow to share the state between the **same** namespace"
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean namespace = false;
private final Property<Boolean> namespace = Property.of(false);

@Schema(
title = "Isolate the state with `taskrun.value`.",
description = "By default, the state will be isolated with `taskrun.value` (during iteration with each). Setting to `false` will allow using the same state for every run of the iteration."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean taskrunValue = true;
private final Property<Boolean> taskrunValue = Property.of(true);


protected Map<String, Object> get(RunContext runContext) throws IllegalVariableEvaluationException, IOException, ResourceExpiredException {
return JacksonMapper.ofJson(false).readValue(runContext.stateStore().getState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
), TYPE_REFERENCE);
}
Expand All @@ -75,9 +72,9 @@ protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<Str
Map<String, Object> merge = MapUtils.merge(current, runContext.render(map));

String key = runContext.stateStore().putState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext),
JacksonMapper.ofJson(false).writeValueAsBytes(merge)
);
Expand All @@ -87,14 +84,15 @@ protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<Str

protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
return runContext.stateStore().deleteState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
);
}

private String taskRunValue(RunContext runContext) {
return this.taskrunValue ? runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
private String taskRunValue(RunContext runContext) throws IllegalVariableEvaluationException {
return Boolean.TRUE.equals(runContext.render(this.taskrunValue).as(Boolean.class).orElseThrow()) ?
runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
}
}
9 changes: 4 additions & 5 deletions core/src/main/java/io/kestra/plugin/core/state/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -45,17 +45,16 @@ public class Delete extends AbstractState implements RunnableTask<Delete.Output>
@Schema(
title = "Raise an error if the state is not found."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean errorOnMissing = false;
private final Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {

boolean delete = this.delete(runContext);

if (errorOnMissing && !delete) {
throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name) + "'");
if (Boolean.TRUE.equals(runContext.render(errorOnMissing).as(Boolean.class).orElseThrow()) && !delete) {
throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name).as(String.class).orElseThrow() + "'");
}

return Output.builder()
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/state/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -46,9 +46,8 @@ public class Get extends AbstractState implements RunnableTask<Get.Output> {
@Schema(
title = "Raise an error if the state file is not found."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean errorOnMissing = false;
private final Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -57,7 +56,7 @@ public Output run(RunContext runContext) throws Exception {
try {
data = this.get(runContext);
} catch (FileNotFoundException e) {
if (this.errorOnMissing) {
if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow())) {
throw e;
}

Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/io/kestra/plugin/core/state/Set.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -59,16 +60,15 @@ public class Set extends AbstractState implements RunnableTask<Set.Output> {
@Schema(
title = "The data to be stored in the state store."
)
@PluginProperty(dynamic = true, additionalProperties = Object.class)
private Map<String, Object> data;
private Property<Map<String, Object>> data;

@Override
public Output run(RunContext runContext) throws Exception {
Pair<String, Map<String, Object>> data = this.merge(runContext, runContext.render(this.data));
Pair<String, Map<String, Object>> dataRendered = this.merge(runContext, runContext.render(this.data).asMap(String.class, Object.class));

return Output.builder()
.count(data.getRight().size())
.key(data.getLeft().toString())
.count(dataRendered.getRight().size())
.key(dataRendered.getLeft())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
Expand Down Expand Up @@ -31,27 +32,27 @@ private RunContext runContextFlow2(Task task) {
void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.namespace(true)
.data(Map.of(
.type(Set.class.getSimpleName())
.namespace(Property.of(true))
.data(Property.of(Map.of(
"john", "doe"
))
)))
.build();
Set.Output setOutput = set.run(runContextFlow1(set));
assertThat(setOutput.getCount(), is(1));

Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.namespace(true)
.type(Get.class.getSimpleName())
.namespace(Property.of(true))
.build();
Get.Output getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount(), is(1));
assertThat(getOutput.getData().get("john"), is("doe"));

get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.type(Get.class.getSimpleName())
.build();
getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount(), is(0));
Expand Down
23 changes: 12 additions & 11 deletions core/src/test/java/io/kestra/plugin/core/state/StateTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
Expand All @@ -24,7 +25,7 @@ class StateTest {
void run() throws Exception {
Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.type(Get.class.getName())
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, get, Map.of(
Expand All @@ -38,9 +39,9 @@ void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Map.of(
.data(new Property<>(Map.of(
"{{ inputs.key }}", "{{ inputs.inc }}"
))
)))
.build();
Set.Output setOutput = set.run(runContext);
assertThat(setOutput.getCount(), is(1));
Expand All @@ -56,10 +57,10 @@ void run() throws Exception {
set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Map.of(
.data(new Property<>(Map.of(
"{{ inputs.key }}", "2",
"test2", "3"
))
)))
.build();

setOutput = set.run(runContext);
Expand Down Expand Up @@ -99,9 +100,9 @@ void run() throws Exception {
void deleteThrow() {
Delete task = Delete.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.name(IdUtils.create())
.errorOnMissing(true)
.type(Get.class.getName())
.name(new Property<>(IdUtils.create()))
.errorOnMissing(Property.of(true))
.build();

assertThrows(FileNotFoundException.class, () -> {
Expand All @@ -113,9 +114,9 @@ void deleteThrow() {
void getThrow() {
Get task = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.name(IdUtils.create())
.errorOnMissing(true)
.type(Get.class.getName())
.name(new Property<>(IdUtils.create()))
.errorOnMissing(Property.of(true))
.build();

assertThrows(FileNotFoundException.class, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.time.DayOfWeek;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -239,7 +238,8 @@ void shouldNotReturnExecutionForBackFillWhenCurrentDateIsBeforeScheduleDate() th
}

@Test
void shouldReturnExecutionForBackFillWhenCurrentDateIsAfterScheduleDate() throws Exception {
void
shouldReturnExecutionForBackFillWhenCurrentDateIsAfterScheduleDate() throws Exception {
// Given
Schedule trigger = Schedule.builder().id("schedule").cron(TEST_CRON_EVERYDAY_AT_8).build();
ZonedDateTime now = ZonedDateTime.now();
Expand All @@ -259,15 +259,15 @@ void shouldReturnExecutionForBackFillWhenCurrentDateIsAfterScheduleDate() throws
}

@Test
void noBackfillNextDate() throws Exception {
void noBackfillNextDate() {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 * * *").build();
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(trigger), Optional.empty());

assertThat(next.getDayOfMonth(), is(ZonedDateTime.now().plusDays(1).getDayOfMonth()));
}

@Test
void noBackfillNextDateContext() throws Exception {
void noBackfillNextDateContext() {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 * * *").timezone("Europe/Paris").build();
ZonedDateTime date = ZonedDateTime.parse("2020-01-01T00:00:00+01:00[Europe/Paris]");
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(trigger), Optional.of(triggerContext(date, trigger)));
Expand Down Expand Up @@ -369,7 +369,7 @@ void impossibleNextConditions() throws Exception {
}

@Test
void lateMaximumDelay() throws Exception {
void lateMaximumDelay() {
Schedule trigger = Schedule.builder()
.id("schedule")
.cron("* * * * *")
Expand Down
Loading