Skip to content

Commit

Permalink
refactor: migrate package plugin.core.state to dynamic properties (#6755
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mgabelle authored Jan 15, 2025
1 parent b3b3a5b commit ca8837f
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
Expand Down Expand Up @@ -45,6 +44,14 @@ public Property(String expression) {
this.expression = expression;
}

public Property(Map<?, ?> map) {
try {
expression = MAPPER.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Build a new Property object with a value already set.<br>
*
Expand Down
36 changes: 17 additions & 19 deletions core/src/main/java/io/kestra/plugin/core/state/AbstractState.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -28,37 +27,35 @@

public abstract class AbstractState extends Task {
private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};
public static final String TASKS_STATES = "tasks-states";

@Schema(
title = "The name of the state file."
)
@PluginProperty(dynamic = true)
@NotNull
@Builder.Default
protected String name = "default";
protected Property<String> name = Property.of("default");

@Schema(
title = "Share state for the current namespace.",
description = "By default, the state is isolated by namespace **and** flow, setting to `true` will allow to share the state between the **same** namespace"
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean namespace = false;
private final Property<Boolean> namespace = Property.of(false);

@Schema(
title = "Isolate the state with `taskrun.value`.",
description = "By default, the state will be isolated with `taskrun.value` (during iteration with each). Setting to `false` will allow using the same state for every run of the iteration."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean taskrunValue = true;
private final Property<Boolean> taskrunValue = Property.of(true);


protected Map<String, Object> get(RunContext runContext) throws IllegalVariableEvaluationException, IOException, ResourceExpiredException {
return JacksonMapper.ofJson(false).readValue(runContext.stateStore().getState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
), TYPE_REFERENCE);
}
Expand All @@ -75,9 +72,9 @@ protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<Str
Map<String, Object> merge = MapUtils.merge(current, runContext.render(map));

String key = runContext.stateStore().putState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext),
JacksonMapper.ofJson(false).writeValueAsBytes(merge)
);
Expand All @@ -87,14 +84,15 @@ protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<Str

protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
return runContext.stateStore().deleteState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
);
}

private String taskRunValue(RunContext runContext) {
return this.taskrunValue ? runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
private String taskRunValue(RunContext runContext) throws IllegalVariableEvaluationException {
return Boolean.TRUE.equals(runContext.render(this.taskrunValue).as(Boolean.class).orElseThrow()) ?
runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
}
}
9 changes: 4 additions & 5 deletions core/src/main/java/io/kestra/plugin/core/state/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -45,17 +45,16 @@ public class Delete extends AbstractState implements RunnableTask<Delete.Output>
@Schema(
title = "Raise an error if the state is not found."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean errorOnMissing = false;
private final Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {

boolean delete = this.delete(runContext);

if (errorOnMissing && !delete) {
throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name) + "'");
if (Boolean.TRUE.equals(runContext.render(errorOnMissing).as(Boolean.class).orElseThrow()) && !delete) {
throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name).as(String.class).orElseThrow() + "'");
}

return Output.builder()
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/state/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -46,9 +46,8 @@ public class Get extends AbstractState implements RunnableTask<Get.Output> {
@Schema(
title = "Raise an error if the state file is not found."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean errorOnMissing = false;
private final Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -57,7 +56,7 @@ public Output run(RunContext runContext) throws Exception {
try {
data = this.get(runContext);
} catch (FileNotFoundException e) {
if (this.errorOnMissing) {
if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow())) {
throw e;
}

Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/io/kestra/plugin/core/state/Set.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -59,16 +60,15 @@ public class Set extends AbstractState implements RunnableTask<Set.Output> {
@Schema(
title = "The data to be stored in the state store."
)
@PluginProperty(dynamic = true, additionalProperties = Object.class)
private Map<String, Object> data;
private Property<Map<String, Object>> data;

@Override
public Output run(RunContext runContext) throws Exception {
Pair<String, Map<String, Object>> data = this.merge(runContext, runContext.render(this.data));
Pair<String, Map<String, Object>> dataRendered = this.merge(runContext, runContext.render(this.data).asMap(String.class, Object.class));

return Output.builder()
.count(data.getRight().size())
.key(data.getLeft().toString())
.count(dataRendered.getRight().size())
.key(dataRendered.getLeft())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
Expand Down Expand Up @@ -31,27 +32,27 @@ private RunContext runContextFlow2(Task task) {
void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.namespace(true)
.data(Map.of(
.type(Set.class.getSimpleName())
.namespace(Property.of(true))
.data(Property.of(Map.of(
"john", "doe"
))
)))
.build();
Set.Output setOutput = set.run(runContextFlow1(set));
assertThat(setOutput.getCount(), is(1));

Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.namespace(true)
.type(Get.class.getSimpleName())
.namespace(Property.of(true))
.build();
Get.Output getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount(), is(1));
assertThat(getOutput.getData().get("john"), is("doe"));

get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.type(Get.class.getSimpleName())
.build();
getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount(), is(0));
Expand Down
23 changes: 12 additions & 11 deletions core/src/test/java/io/kestra/plugin/core/state/StateTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
Expand All @@ -24,7 +25,7 @@ class StateTest {
void run() throws Exception {
Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.type(Get.class.getName())
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, get, Map.of(
Expand All @@ -38,9 +39,9 @@ void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Map.of(
.data(new Property<>(Map.of(
"{{ inputs.key }}", "{{ inputs.inc }}"
))
)))
.build();
Set.Output setOutput = set.run(runContext);
assertThat(setOutput.getCount(), is(1));
Expand All @@ -56,10 +57,10 @@ void run() throws Exception {
set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Map.of(
.data(new Property<>(Map.of(
"{{ inputs.key }}", "2",
"test2", "3"
))
)))
.build();

setOutput = set.run(runContext);
Expand Down Expand Up @@ -99,9 +100,9 @@ void run() throws Exception {
void deleteThrow() {
Delete task = Delete.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.name(IdUtils.create())
.errorOnMissing(true)
.type(Get.class.getName())
.name(new Property<>(IdUtils.create()))
.errorOnMissing(Property.of(true))
.build();

assertThrows(FileNotFoundException.class, () -> {
Expand All @@ -113,9 +114,9 @@ void deleteThrow() {
void getThrow() {
Get task = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.name(IdUtils.create())
.errorOnMissing(true)
.type(Get.class.getName())
.name(new Property<>(IdUtils.create()))
.errorOnMissing(Property.of(true))
.build();

assertThrows(FileNotFoundException.class, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import java.time.DayOfWeek;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -240,7 +239,8 @@ void shouldNotReturnExecutionForBackFillWhenCurrentDateIsBeforeScheduleDate() th
}

@Test
void shouldReturnExecutionForBackFillWhenCurrentDateIsAfterScheduleDate() throws Exception {
void
shouldReturnExecutionForBackFillWhenCurrentDateIsAfterScheduleDate() throws Exception {
// Given
Schedule trigger = Schedule.builder().id("schedule").cron(TEST_CRON_EVERYDAY_AT_8).build();
ZonedDateTime now = ZonedDateTime.now();
Expand All @@ -260,15 +260,15 @@ void shouldReturnExecutionForBackFillWhenCurrentDateIsAfterScheduleDate() throws
}

@Test
void noBackfillNextDate() throws Exception {
void noBackfillNextDate() {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 * * *").build();
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(trigger), Optional.empty());

assertThat(next.getDayOfMonth(), is(ZonedDateTime.now().plusDays(1).getDayOfMonth()));
}

@Test
void noBackfillNextDateContext() throws Exception {
void noBackfillNextDateContext() {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 * * *").timezone("Europe/Paris").build();
ZonedDateTime date = ZonedDateTime.parse("2020-01-01T00:00:00+01:00[Europe/Paris]");
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(trigger), Optional.of(triggerContext(date, trigger)));
Expand Down Expand Up @@ -370,7 +370,7 @@ void impossibleNextConditions() throws Exception {
}

@Test
void lateMaximumDelay() throws Exception {
void lateMaximumDelay() {
Schedule trigger = Schedule.builder()
.id("schedule")
.cron("* * * * *")
Expand Down

0 comments on commit ca8837f

Please sign in to comment.