Skip to content

Commit

Permalink
refactor: migrate package plugin.core.state to dynamic properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Jan 15, 2025
1 parent f217d33 commit 9b3e7db
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public Property(String expression) {
this.expression = expression;
}

public Property(Map<?, ?> map) {
try {
expression = MAPPER.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Build a new Property object with a value already set.<br>
*
Expand Down
36 changes: 17 additions & 19 deletions core/src/main/java/io/kestra/plugin/core/state/AbstractState.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -28,37 +27,35 @@

public abstract class AbstractState extends Task {
private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};
public static final String TASKS_STATES = "tasks-states";

@Schema(
title = "The name of the state file."
)
@PluginProperty(dynamic = true)
@NotNull
@Builder.Default
protected String name = "default";
protected Property<String> name = Property.of("default");

@Schema(
title = "Share state for the current namespace.",
description = "By default, the state is isolated by namespace **and** flow, setting to `true` will allow to share the state between the **same** namespace"
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean namespace = false;
private final Property<Boolean> namespace = Property.of(false);

@Schema(
title = "Isolate the state with `taskrun.value`.",
description = "By default, the state will be isolated with `taskrun.value` (during iteration with each). Setting to `false` will allow using the same state for every run of the iteration."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean taskrunValue = true;
private final Property<Boolean> taskrunValue = Property.of(true);


protected Map<String, Object> get(RunContext runContext) throws IllegalVariableEvaluationException, IOException, ResourceExpiredException {
return JacksonMapper.ofJson(false).readValue(runContext.stateStore().getState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
), TYPE_REFERENCE);
}
Expand All @@ -75,9 +72,9 @@ protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<Str
Map<String, Object> merge = MapUtils.merge(current, runContext.render(map));

String key = runContext.stateStore().putState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext),
JacksonMapper.ofJson(false).writeValueAsBytes(merge)
);
Expand All @@ -87,14 +84,15 @@ protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<Str

protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
return runContext.stateStore().deleteState(
!this.namespace,
"tasks-states",
runContext.render(this.name),
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
);
}

private String taskRunValue(RunContext runContext) {
return this.taskrunValue ? runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
private String taskRunValue(RunContext runContext) throws IllegalVariableEvaluationException {
return Boolean.TRUE.equals(runContext.render(this.taskrunValue).as(Boolean.class).orElseThrow()) ?
runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
}
}
9 changes: 4 additions & 5 deletions core/src/main/java/io/kestra/plugin/core/state/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -45,17 +45,16 @@ public class Delete extends AbstractState implements RunnableTask<Delete.Output>
@Schema(
title = "Raise an error if the state is not found."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean errorOnMissing = false;
private final Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {

boolean delete = this.delete(runContext);

if (errorOnMissing && !delete) {
throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name) + "'");
if (Boolean.TRUE.equals(runContext.render(errorOnMissing).as(Boolean.class).orElseThrow()) && !delete) {
throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name).as(String.class).orElseThrow() + "'");
}

return Output.builder()
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/state/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -46,9 +46,8 @@ public class Get extends AbstractState implements RunnableTask<Get.Output> {
@Schema(
title = "Raise an error if the state file is not found."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean errorOnMissing = false;
private final Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -57,7 +56,7 @@ public Output run(RunContext runContext) throws Exception {
try {
data = this.get(runContext);
} catch (FileNotFoundException e) {
if (this.errorOnMissing) {
if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow())) {
throw e;
}

Expand Down
29 changes: 16 additions & 13 deletions core/src/main/java/io/kestra/plugin/core/state/Set.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand All @@ -20,14 +21,17 @@
@NoArgsConstructor
@Schema(
title = "Set a state in the state store.",
description = "Values will be merged: \n" +
"* If you provide a new key, the new key will be added.\n" +
"* If you provide an existing key, the previous key will be overwrite.\n" +
"\n" +
"::alert{type=\"warning\"}\n" +
"This method is not concurrency safe. If many executions for the same flow are concurrent, there is no guarantee on isolation on the value.\n" +
"The value can be overwritten by other executions.\n" +
"::\n"
description = """
Values will be merged:
* If you provide a new key, the new key will be added.
* If you provide an existing key, the previous key will be overwrite.
\s
::alert{type=\\"warning\\"}
This method is not concurrency safe. If many executions for the same flow are concurrent, there is no guarantee on isolation on the value.
The value can be overwritten by other executions.
::
"""
)
@Plugin(
examples = {
Expand Down Expand Up @@ -59,16 +63,15 @@ public class Set extends AbstractState implements RunnableTask<Set.Output> {
@Schema(
title = "The data to be stored in the state store."
)
@PluginProperty(dynamic = true, additionalProperties = Object.class)
private Map<String, Object> data;
private Property<Map<String, Object>> data;

@Override
public Output run(RunContext runContext) throws Exception {
Pair<String, Map<String, Object>> data = this.merge(runContext, runContext.render(this.data));
Pair<String, Map<String, Object>> dataRendered = this.merge(runContext, runContext.render(this.data).asMap(String.class, Object.class));

return Output.builder()
.count(data.getRight().size())
.key(data.getLeft().toString())
.count(dataRendered.getRight().size())
.key(dataRendered.getLeft())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
Expand Down Expand Up @@ -32,18 +33,18 @@ void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.namespace(true)
.data(Map.of(
.namespace(Property.of(true))
.data(Property.of(Map.of(
"john", "doe"
))
)))
.build();
Set.Output setOutput = set.run(runContextFlow1(set));

Check failure on line 41 in core/src/test/java/io/kestra/plugin/core/state/StateNamespaceTest.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

StateNamespaceTest.run()

jakarta.validation.ConstraintViolationException: type: must match "\p{javaJavaIdentifierStart}\p{javaJavaIdentifierPart}*(\.\p{javaJavaIdentifierStart}\p{javaJavaIdentifierPart}*)*"
Raw output
jakarta.validation.ConstraintViolationException: type: must match "\p{javaJavaIdentifierStart}\p{javaJavaIdentifierPart}*(\.\p{javaJavaIdentifierStart}\p{javaJavaIdentifierPart}*)*"
	at app//io.kestra.core.runners.RunContextProperty.validate(RunContextProperty.java:46)
	at app//io.kestra.core.runners.RunContextProperty.validate(RunContextProperty.java:52)
	at app//io.kestra.core.runners.RunContextProperty.asMap(RunContextProperty.java:146)
	at app//io.kestra.plugin.core.state.Set.run(Set.java:70)
	at app//io.kestra.plugin.core.state.StateNamespaceTest.run(StateNamespaceTest.java:41)
	at [email protected]/java.lang.reflect.Method.invoke(Method.java:580)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at [email protected]/java.util.ArrayList.forEach(ArrayList.java:1596)
	at [email protected]/java.util.ArrayList.forEach(ArrayList.java:1596)
assertThat(setOutput.getCount(), is(1));

Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.namespace(true)
.namespace(Property.of(true))
.build();
Get.Output getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount(), is(1));
Expand Down
19 changes: 10 additions & 9 deletions core/src/test/java/io/kestra/plugin/core/state/StateTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
Expand All @@ -24,7 +25,7 @@ class StateTest {
void run() throws Exception {
Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.type(Get.class.getName())
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, get, Map.of(
Expand All @@ -38,9 +39,9 @@ void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Map.of(
.data(new Property<>(Map.of(
"{{ inputs.key }}", "{{ inputs.inc }}"
))
)))
.build();
Set.Output setOutput = set.run(runContext);
assertThat(setOutput.getCount(), is(1));
Expand All @@ -56,10 +57,10 @@ void run() throws Exception {
set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Map.of(
.data(new Property<>(Map.of(
"{{ inputs.key }}", "2",
"test2", "3"
))
)))
.build();

setOutput = set.run(runContext);
Expand Down Expand Up @@ -100,8 +101,8 @@ void deleteThrow() {
Delete task = Delete.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.name(IdUtils.create())
.errorOnMissing(true)
.name(new Property<>(IdUtils.create()))
.errorOnMissing(Property.of(true))
.build();

assertThrows(FileNotFoundException.class, () -> {
Expand All @@ -114,8 +115,8 @@ void getThrow() {
Get task = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.name(IdUtils.create())
.errorOnMissing(true)
.name(new Property<>(IdUtils.create()))
.errorOnMissing(Property.of(true))
.build();

assertThrows(FileNotFoundException.class, () -> {
Expand Down

0 comments on commit 9b3e7db

Please sign in to comment.