Skip to content

Commit

Permalink
refactor: migrate package plugin.core.kv to dynamic properties
Browse files Browse the repository at this point in the history
migrate Delete, Get, Set, GetKeys from KV to dynamic properties
  • Loading branch information
mgabelle committed Jan 13, 2025
1 parent 589638c commit d052a87
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 102 deletions.
17 changes: 7 additions & 10 deletions core/src/main/java/io/kestra/plugin/core/kv/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
Expand Down Expand Up @@ -46,36 +46,33 @@ public class Delete extends Task implements RunnableTask<Delete.Output> {
@Schema(
title = "The key for which to delete the value."
)
@PluginProperty(dynamic = true)
private String key;
private Property<String> key;

@NotNull
@Schema(
title = "The namespace on which to set the value."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String namespace = "{{ flow.namespace }}";
private Property<String> namespace = new Property<>("{{ flow.namespace }}");

@NotNull
@Schema(
title = "Whether to fail if there is no value for the given key."
)
@PluginProperty
@Builder.Default
private boolean errorOnMissing = false;
private Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {
String renderedNamespace = runContext.render(this.namespace);
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElseThrow();

FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace());

String renderedKey = runContext.render(this.key);
String renderedKey = runContext.render(this.key).as(String.class).orElseThrow();

boolean deleted = runContext.namespaceKv(renderedNamespace).delete(renderedKey);
if (this.errorOnMissing && !deleted) {
if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow()) && !deleted) {
throw new NoSuchElementException("No value found for key '" + renderedKey + "' in namespace '" + renderedNamespace + "' and `errorOnMissing` is set to true");
}

Expand Down
17 changes: 7 additions & 10 deletions core/src/main/java/io/kestra/plugin/core/kv/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
Expand Down Expand Up @@ -51,37 +51,34 @@ public class Get extends Task implements RunnableTask<Get.Output> {
@Schema(
title = "The key for which to get the value."
)
@PluginProperty(dynamic = true)
private String key;
private Property<String> key;

@NotNull
@Schema(
title = "The namespace on which to get the value."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String namespace = "{{ flow.namespace }}";
private Property<String> namespace = new Property<>("{{ flow.namespace }}");

@NotNull
@Schema(
title = "Whether to fail if there is no value for the given key."
)
@PluginProperty
@Builder.Default
private boolean errorOnMissing = false;
private Property<Boolean> errorOnMissing = Property.of(false);


@Override
public Output run(RunContext runContext) throws Exception {
String renderedNamespace = runContext.render(this.namespace);
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null);

FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace());

String renderedKey = runContext.render(this.key);
String renderedKey = runContext.render(this.key).as(String.class).orElse(null);

Optional<KVValue> maybeValue = runContext.namespaceKv(renderedNamespace).getValue(renderedKey);
if (this.errorOnMissing && maybeValue.isEmpty()) {
if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow()) && maybeValue.isEmpty()) {
throw new NoSuchElementException("No value found for key '" + renderedKey + "' in namespace '" + renderedNamespace + "' and `errorOnMissing` is set to true");
}

Expand Down
12 changes: 5 additions & 7 deletions core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
Expand Down Expand Up @@ -50,26 +50,24 @@ public class GetKeys extends Task implements RunnableTask<GetKeys.Output> {
@Schema(
title = "The key for which to get the value."
)
@PluginProperty(dynamic = true)
private String prefix;
private Property<String> prefix;

@NotNull
@Schema(
title = "The namespace on which to get the value."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String namespace = "{{ flow.namespace }}";
private Property<String> namespace = new Property<>("{{ flow.namespace }}");


@Override
public Output run(RunContext runContext) throws Exception {
String renderedNamespace = runContext.render(this.namespace);
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null);

FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace());

String renderedPrefix = runContext.render(this.prefix);
String renderedPrefix = runContext.render(this.prefix).as(String.class).orElse(null);
Predicate<String> filter = renderedPrefix == null ? key -> true : key -> key.startsWith(renderedPrefix);

List<String> keys = runContext.namespaceKv(renderedNamespace).list().stream()
Expand Down
39 changes: 18 additions & 21 deletions core/src/main/java/io/kestra/plugin/core/kv/Set.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.kv.KVType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
Expand Down Expand Up @@ -56,56 +56,50 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
@Schema(
title = "The key for which to set the value."
)
@PluginProperty(dynamic = true)
private String key;
private Property<String> key;

@NotNull
@Schema(
title = "The value to map to the key."
)
@PluginProperty(dynamic = true)
private String value;
private Property<String> value;

@NotNull
@Schema(
title = "The namespace in which the KV pair will be stored. By default, Kestra will use the namespace of the flow."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String namespace = "{{ flow.namespace }}";
private Property<String> namespace = new Property<>("{{ flow.namespace }}");

@NotNull
@Schema(
title = "Whether to overwrite or fail if a value for the given key already exists."
)
@PluginProperty
@Builder.Default
private boolean overwrite = true;
private Property<Boolean> overwrite = Property.of(true);

@Schema(
title = "Optional Time-To-Live (TTL) duration for the key-value pair. If not set, the KV pair will never be deleted from internal storage."
)
@PluginProperty
private Duration ttl;
private Property<Duration> ttl;

@Schema(
title = "Enum representing the data type of the KV pair. If not set, the value will be stored as a string."
)
@PluginProperty
private KVType kvType;
private Property<KVType> kvType;

@Override
public VoidOutput run(RunContext runContext) throws Exception {
String renderedNamespace = runContext.render(this.namespace);
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null);

String renderedKey = runContext.render(this.key);
Object renderedValue = runContext.renderTyped(this.value);
String renderedKey = runContext.render(this.key).as(String.class).orElse(null);

Object renderedValue = runContext.renderTyped(this.value.toString());

KVStore kvStore = runContext.namespaceKv(renderedNamespace);

if (kvType != null) {
if (renderedValue instanceof String renderedValueStr) {
renderedValue = switch (kvType) {
if (kvType != null && renderedValue instanceof String renderedValueStr) {
renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
case DATETIME, DATE -> Instant.parse(renderedValueStr);
Expand All @@ -114,8 +108,11 @@ public VoidOutput run(RunContext runContext) throws Exception {
default -> renderedValue;
};
}
}
kvStore.put(renderedKey, new KVValueAndMetadata(new KVMetadata(ttl), renderedValue), this.overwrite);

kvStore.put(renderedKey, new KVValueAndMetadata(
new KVMetadata(runContext.render(ttl).as(Duration.class).orElse(null)), renderedValue),
runContext.render(this.overwrite).as(Boolean.class).orElseThrow()
);

return null;
}
Expand Down
13 changes: 7 additions & 6 deletions core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.kv;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
Expand All @@ -17,7 +18,7 @@
import static org.hamcrest.Matchers.is;

@KestraTest
public class DeleteTest {
class DeleteTest {
static final String TEST_KV_KEY = "test-key";

@Inject
Expand All @@ -38,8 +39,8 @@ void shouldOutputTrueGivenExistingKey() throws Exception {
Delete delete = Delete.builder()
.id(Delete.class.getSimpleName())
.type(Delete.class.getName())
.namespace("{{ inputs.namespace }}")
.key("{{ inputs.key }}")
.namespace(new Property<>("{{ inputs.namespace }}"))
.key(new Property<>("{{ inputs.key }}"))
.build();

final KVStore kv = runContext.namespaceKv(namespaceId);
Expand Down Expand Up @@ -67,16 +68,16 @@ void shouldOutputFalseGivenNonExistingKey() throws Exception {
Delete delete = Delete.builder()
.id(Delete.class.getSimpleName())
.type(Delete.class.getName())
.namespace(namespaceId)
.key("my-key")
.namespace(new Property<>(namespaceId))
.key(new Property<>("my-key"))
.build();

// When
Delete.Output run = delete.run(runContext);

assertThat(run.isDeleted(), is(false));

NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> delete.toBuilder().errorOnMissing(true).build().run(runContext));
NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> delete.toBuilder().errorOnMissing(Property.of(true)).build().run(runContext));
assertThat(noSuchElementException.getMessage(), is("No value found for key 'my-key' in namespace '" + namespaceId + "' and `errorOnMissing` is set to true"));
}
}
7 changes: 4 additions & 3 deletions core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.kv;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
Expand All @@ -16,7 +17,7 @@
import static org.hamcrest.Matchers.empty;

@KestraTest
public class GetKeysTest {
class GetKeysTest {
static final String TEST_KEY_PREFIX_TEST = "test";

@Inject
Expand Down Expand Up @@ -61,7 +62,7 @@ void shouldGetKeysGivenMatchingPrefix() throws Exception {
GetKeys getKeys = GetKeys.builder()
.id(GetKeys.class.getSimpleName())
.type(GetKeys.class.getName())
.prefix("{{ inputs.prefix }}")
.prefix(new Property<>("{{ inputs.prefix }}"))
.build();

final KVStore kv = runContext.namespaceKv(namespace);
Expand Down Expand Up @@ -90,7 +91,7 @@ void shouldGetNoKeysGivenEmptyKeyStore() throws Exception {
GetKeys getKeys = GetKeys.builder()
.id(GetKeys.class.getSimpleName())
.type(GetKeys.class.getName())
.prefix("{{ inputs.prefix }}")
.prefix(new Property<>("{{ inputs.prefix }}"))
.build();

// When
Expand Down
13 changes: 7 additions & 6 deletions core/src/test/java/io/kestra/plugin/core/kv/GetTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.kv;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
Expand All @@ -20,7 +21,7 @@
import static org.hamcrest.Matchers.nullValue;

@KestraTest
public class GetTest {
class GetTest {

static final String TEST_KV_KEY = "test-key";

Expand All @@ -44,8 +45,8 @@ void shouldGetGivenExistingKey() throws Exception {
Get get = Get.builder()
.id(Get.class.getSimpleName())
.type(Get.class.getName())
.namespace("{{ inputs.namespace }}")
.key("{{ inputs.key }}")
.namespace(new Property<>("{{ inputs.namespace }}"))
.key(new Property<>("{{ inputs.key }}"))
.build();


Expand Down Expand Up @@ -74,8 +75,8 @@ void shouldGetGivenNonExistingKey() throws Exception {
Get get = Get.builder()
.id(Get.class.getSimpleName())
.type(Get.class.getName())
.namespace(namespaceId)
.key("my-key")
.namespace(new Property<>(namespaceId))
.key(new Property<>("my-key"))
.build();

// When
Expand All @@ -84,7 +85,7 @@ void shouldGetGivenNonExistingKey() throws Exception {
// Then
assertThat(run.getValue(), nullValue());

NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> get.toBuilder().errorOnMissing(true).build().run(runContext));
NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> get.toBuilder().errorOnMissing(Property.of(true)).build().run(runContext));
assertThat(noSuchElementException.getMessage(), is("No value found for key 'my-key' in namespace '" + namespaceId + "' and `errorOnMissing` is set to true"));
}
}
Loading

0 comments on commit d052a87

Please sign in to comment.