diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Delete.java b/core/src/main/java/io/kestra/plugin/core/kv/Delete.java index 36464362484..8e495fd4f05 100644 --- a/core/src/main/java/io/kestra/plugin/core/kv/Delete.java +++ b/core/src/main/java/io/kestra/plugin/core/kv/Delete.java @@ -2,7 +2,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.DefaultRunContext; @@ -46,36 +46,33 @@ public class Delete extends Task implements RunnableTask { @Schema( title = "The key for which to delete the value." ) - @PluginProperty(dynamic = true) - private String key; + private Property key; @NotNull @Schema( title = "The namespace on which to set the value." ) - @PluginProperty(dynamic = true) @Builder.Default - private String namespace = "{{ flow.namespace }}"; + private Property namespace = new Property<>("{{ flow.namespace }}"); @NotNull @Schema( title = "Whether to fail if there is no value for the given key." ) - @PluginProperty @Builder.Default - private boolean errorOnMissing = false; + private Property errorOnMissing = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { - String renderedNamespace = runContext.render(this.namespace); + String renderedNamespace = runContext.render(this.namespace).as(String.class).orElseThrow(); FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace()); - String renderedKey = runContext.render(this.key); + String renderedKey = runContext.render(this.key).as(String.class).orElseThrow(); boolean deleted = runContext.namespaceKv(renderedNamespace).delete(renderedKey); - if (this.errorOnMissing && !deleted) { + if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow()) && !deleted) { throw new NoSuchElementException("No value found for key '" + renderedKey + "' in namespace '" + renderedNamespace + "' and `errorOnMissing` is set to true"); } diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Get.java b/core/src/main/java/io/kestra/plugin/core/kv/Get.java index f54a9b11230..9f6eea048e6 100644 --- a/core/src/main/java/io/kestra/plugin/core/kv/Get.java +++ b/core/src/main/java/io/kestra/plugin/core/kv/Get.java @@ -2,7 +2,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.DefaultRunContext; @@ -51,37 +51,34 @@ public class Get extends Task implements RunnableTask { @Schema( title = "The key for which to get the value." ) - @PluginProperty(dynamic = true) - private String key; + private Property key; @NotNull @Schema( title = "The namespace on which to get the value." ) - @PluginProperty(dynamic = true) @Builder.Default - private String namespace = "{{ flow.namespace }}"; + private Property namespace = new Property<>("{{ flow.namespace }}"); @NotNull @Schema( title = "Whether to fail if there is no value for the given key." ) - @PluginProperty @Builder.Default - private boolean errorOnMissing = false; + private Property errorOnMissing = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { - String renderedNamespace = runContext.render(this.namespace); + String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null); FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace()); - String renderedKey = runContext.render(this.key); + String renderedKey = runContext.render(this.key).as(String.class).orElse(null); Optional maybeValue = runContext.namespaceKv(renderedNamespace).getValue(renderedKey); - if (this.errorOnMissing && maybeValue.isEmpty()) { + if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow()) && maybeValue.isEmpty()) { throw new NoSuchElementException("No value found for key '" + renderedKey + "' in namespace '" + renderedNamespace + "' and `errorOnMissing` is set to true"); } diff --git a/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java b/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java index ff60137e7f6..b729d4dbded 100644 --- a/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java +++ b/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java @@ -2,7 +2,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.DefaultRunContext; @@ -50,26 +50,24 @@ public class GetKeys extends Task implements RunnableTask { @Schema( title = "The key for which to get the value." ) - @PluginProperty(dynamic = true) - private String prefix; + private Property prefix; @NotNull @Schema( title = "The namespace on which to get the value." ) - @PluginProperty(dynamic = true) @Builder.Default - private String namespace = "{{ flow.namespace }}"; + private Property namespace = new Property<>("{{ flow.namespace }}"); @Override public Output run(RunContext runContext) throws Exception { - String renderedNamespace = runContext.render(this.namespace); + String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null); FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace()); - String renderedPrefix = runContext.render(this.prefix); + String renderedPrefix = runContext.render(this.prefix).as(String.class).orElse(null); Predicate filter = renderedPrefix == null ? key -> true : key -> key.startsWith(renderedPrefix); List keys = runContext.namespaceKv(renderedNamespace).list().stream() diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Set.java b/core/src/main/java/io/kestra/plugin/core/kv/Set.java index ebc0dbf3eeb..4bf2c170912 100644 --- a/core/src/main/java/io/kestra/plugin/core/kv/Set.java +++ b/core/src/main/java/io/kestra/plugin/core/kv/Set.java @@ -2,8 +2,8 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.kv.KVType; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.VoidOutput; @@ -56,56 +56,50 @@ public class Set extends Task implements RunnableTask { @Schema( title = "The key for which to set the value." ) - @PluginProperty(dynamic = true) - private String key; + private Property key; @NotNull @Schema( title = "The value to map to the key." ) - @PluginProperty(dynamic = true) - private String value; + private Property value; @NotNull @Schema( title = "The namespace in which the KV pair will be stored. By default, Kestra will use the namespace of the flow." ) - @PluginProperty(dynamic = true) @Builder.Default - private String namespace = "{{ flow.namespace }}"; + private Property namespace = new Property<>("{{ flow.namespace }}"); @NotNull @Schema( title = "Whether to overwrite or fail if a value for the given key already exists." ) - @PluginProperty @Builder.Default - private boolean overwrite = true; + private Property overwrite = Property.of(true); @Schema( title = "Optional Time-To-Live (TTL) duration for the key-value pair. If not set, the KV pair will never be deleted from internal storage." ) - @PluginProperty - private Duration ttl; + private Property ttl; @Schema( title = "Enum representing the data type of the KV pair. If not set, the value will be stored as a string." ) - @PluginProperty - private KVType kvType; + private Property kvType; @Override public VoidOutput run(RunContext runContext) throws Exception { - String renderedNamespace = runContext.render(this.namespace); + String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null); - String renderedKey = runContext.render(this.key); - Object renderedValue = runContext.renderTyped(this.value); + String renderedKey = runContext.render(this.key).as(String.class).orElse(null); + + Object renderedValue = runContext.renderTyped(this.value.toString()); KVStore kvStore = runContext.namespaceKv(renderedNamespace); - if (kvType != null) { - if (renderedValue instanceof String renderedValueStr) { - renderedValue = switch (kvType) { + if (kvType != null && renderedValue instanceof String renderedValueStr) { + renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) { case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class); case BOOLEAN -> Boolean.parseBoolean((String) renderedValue); case DATETIME, DATE -> Instant.parse(renderedValueStr); @@ -114,8 +108,11 @@ public VoidOutput run(RunContext runContext) throws Exception { default -> renderedValue; }; } - } - kvStore.put(renderedKey, new KVValueAndMetadata(new KVMetadata(ttl), renderedValue), this.overwrite); + + kvStore.put(renderedKey, new KVValueAndMetadata( + new KVMetadata(runContext.render(ttl).as(Duration.class).orElse(null)), renderedValue), + runContext.render(this.overwrite).as(Boolean.class).orElseThrow() + ); return null; } diff --git a/core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java b/core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java index fe8e91e9c3e..eb90d13c097 100644 --- a/core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java +++ b/core/src/test/java/io/kestra/plugin/core/kv/DeleteTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.core.kv; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.storages.kv.KVStore; @@ -17,7 +18,7 @@ import static org.hamcrest.Matchers.is; @KestraTest -public class DeleteTest { +class DeleteTest { static final String TEST_KV_KEY = "test-key"; @Inject @@ -38,8 +39,8 @@ void shouldOutputTrueGivenExistingKey() throws Exception { Delete delete = Delete.builder() .id(Delete.class.getSimpleName()) .type(Delete.class.getName()) - .namespace("{{ inputs.namespace }}") - .key("{{ inputs.key }}") + .namespace(new Property<>("{{ inputs.namespace }}")) + .key(new Property<>("{{ inputs.key }}")) .build(); final KVStore kv = runContext.namespaceKv(namespaceId); @@ -67,8 +68,8 @@ void shouldOutputFalseGivenNonExistingKey() throws Exception { Delete delete = Delete.builder() .id(Delete.class.getSimpleName()) .type(Delete.class.getName()) - .namespace(namespaceId) - .key("my-key") + .namespace(new Property<>(namespaceId)) + .key(new Property<>("my-key")) .build(); // When @@ -76,7 +77,7 @@ void shouldOutputFalseGivenNonExistingKey() throws Exception { assertThat(run.isDeleted(), is(false)); - NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> delete.toBuilder().errorOnMissing(true).build().run(runContext)); + NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> delete.toBuilder().errorOnMissing(Property.of(true)).build().run(runContext)); assertThat(noSuchElementException.getMessage(), is("No value found for key 'my-key' in namespace '" + namespaceId + "' and `errorOnMissing` is set to true")); } } diff --git a/core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java b/core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java index 276b4f15027..da5df18cce2 100644 --- a/core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java +++ b/core/src/test/java/io/kestra/plugin/core/kv/GetKeysTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.core.kv; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.storages.kv.KVStore; @@ -16,7 +17,7 @@ import static org.hamcrest.Matchers.empty; @KestraTest -public class GetKeysTest { +class GetKeysTest { static final String TEST_KEY_PREFIX_TEST = "test"; @Inject @@ -61,7 +62,7 @@ void shouldGetKeysGivenMatchingPrefix() throws Exception { GetKeys getKeys = GetKeys.builder() .id(GetKeys.class.getSimpleName()) .type(GetKeys.class.getName()) - .prefix("{{ inputs.prefix }}") + .prefix(new Property<>("{{ inputs.prefix }}")) .build(); final KVStore kv = runContext.namespaceKv(namespace); @@ -90,7 +91,7 @@ void shouldGetNoKeysGivenEmptyKeyStore() throws Exception { GetKeys getKeys = GetKeys.builder() .id(GetKeys.class.getSimpleName()) .type(GetKeys.class.getName()) - .prefix("{{ inputs.prefix }}") + .prefix(new Property<>("{{ inputs.prefix }}")) .build(); // When diff --git a/core/src/test/java/io/kestra/plugin/core/kv/GetTest.java b/core/src/test/java/io/kestra/plugin/core/kv/GetTest.java index 15aff03785d..907532576d3 100644 --- a/core/src/test/java/io/kestra/plugin/core/kv/GetTest.java +++ b/core/src/test/java/io/kestra/plugin/core/kv/GetTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.core.kv; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.storages.kv.KVStore; @@ -20,7 +21,7 @@ import static org.hamcrest.Matchers.nullValue; @KestraTest -public class GetTest { +class GetTest { static final String TEST_KV_KEY = "test-key"; @@ -44,8 +45,8 @@ void shouldGetGivenExistingKey() throws Exception { Get get = Get.builder() .id(Get.class.getSimpleName()) .type(Get.class.getName()) - .namespace("{{ inputs.namespace }}") - .key("{{ inputs.key }}") + .namespace(new Property<>("{{ inputs.namespace }}")) + .key(new Property<>("{{ inputs.key }}")) .build(); @@ -74,8 +75,8 @@ void shouldGetGivenNonExistingKey() throws Exception { Get get = Get.builder() .id(Get.class.getSimpleName()) .type(Get.class.getName()) - .namespace(namespaceId) - .key("my-key") + .namespace(new Property<>(namespaceId)) + .key(new Property<>("my-key")) .build(); // When @@ -84,7 +85,7 @@ void shouldGetGivenNonExistingKey() throws Exception { // Then assertThat(run.getValue(), nullValue()); - NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> get.toBuilder().errorOnMissing(true).build().run(runContext)); + NoSuchElementException noSuchElementException = Assertions.assertThrows(NoSuchElementException.class, () -> get.toBuilder().errorOnMissing(Property.of(true)).build().run(runContext)); assertThat(noSuchElementException.getMessage(), is("No value found for key 'my-key' in namespace '" + namespaceId + "' and `errorOnMissing` is set to true")); } } diff --git a/core/src/test/java/io/kestra/plugin/core/kv/SetTest.java b/core/src/test/java/io/kestra/plugin/core/kv/SetTest.java index dea7b03220b..b37e2589e4b 100644 --- a/core/src/test/java/io/kestra/plugin/core/kv/SetTest.java +++ b/core/src/test/java/io/kestra/plugin/core/kv/SetTest.java @@ -2,6 +2,7 @@ import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.kv.KVType; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.storages.StorageInterface; @@ -25,7 +26,7 @@ import static org.hamcrest.Matchers.nullValue; @KestraTest -public class SetTest { +class SetTest { static final String TEST_KEY = "test-key"; @Inject @@ -40,8 +41,8 @@ void shouldSetKVGivenNoNamespace() throws Exception { Set set = Set.builder() .id(Set.class.getSimpleName()) .type(Set.class.getName()) - .key("{{ inputs.key }}") - .value("{{ inputs.value }}") + .key(new Property<>("{{ inputs.key }}")) + .value(new Property<>("{{ inputs.value }}")) .build(); var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string"); @@ -73,9 +74,9 @@ void shouldSetKVGivenSameNamespace() throws Exception { Set set = Set.builder() .id(Set.class.getSimpleName()) .type(Set.class.getName()) - .key("{{ inputs.key }}") - .value("{{ inputs.value }}") - .namespace("io.kestra.test") + .key(new Property<>("{{ inputs.key }}")) + .value(new Property<>("{{ inputs.value }}")) + .namespace(new Property<>("io.kestra.test")) .build(); // When @@ -101,9 +102,9 @@ void shouldSetKVGivenChildNamespace() throws Exception { Set set = Set.builder() .id(Set.class.getSimpleName()) .type(Set.class.getName()) - .key("{{ inputs.key }}") - .value("{{ inputs.value }}") - .namespace("io.kestra") + .key(new Property<>("{{ inputs.key }}")) + .value(new Property<>("{{ inputs.value }}")) + .namespace(new Property<>("io.kestra")) .build(); // When set.run(runContext); @@ -128,9 +129,9 @@ void shouldFailGivenNonExistingNamespace() { Set set = Set.builder() .id(Set.class.getSimpleName()) .type(Set.class.getName()) - .key("{{ inputs.key }}") - .value("{{ inputs.value }}") - .namespace("not-found") + .key(new Property<>("{{ inputs.key }}")) + .value(new Property<>("{{ inputs.value }}")) + .namespace(new Property<>("not-found")) .build(); // When - Then @@ -143,9 +144,9 @@ void shouldSetKVGivenTTL() throws Exception { Set set = Set.builder() .id(Set.class.getSimpleName()) .type(Set.class.getName()) - .key("{{ inputs.key }}") - .value("{{ inputs.value }}") - .ttl(Duration.ofMinutes(5)) + .key(new Property<>("{{ inputs.key }}")) + .value(new Property<>("{{ inputs.value }}")) + .ttl(Property.of(Duration.ofMinutes(5))) .build(); var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string"); @@ -170,9 +171,9 @@ void shouldFailGivenExistingKeyAndOverwriteFalse() { Set set = Set.builder() .id(Set.class.getSimpleName()) .type(Set.class.getName()) - .key("{{ inputs.key }}") - .value("{{ inputs.value }}") - .overwrite(false) + .key(new Property<>("{{ inputs.key }}")) + .value(new Property<>("{{ inputs.value }}")) + .overwrite(Property.of(false)) .build(); var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string"); @@ -188,31 +189,33 @@ void shouldFailGivenExistingKeyAndOverwriteFalse() { @Test void typeSpecified() throws Exception { - // Given - Set set = Set.builder() - .id(Set.class.getSimpleName()) - .type(Set.class.getName()) - .key(TEST_KEY) - .build(); - - final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, null); - - final KVStore kv = runContext.namespaceKv(runContext.flowInfo().namespace()); + KVStore kv = createAndPerformSetTask("123.45", KVType.NUMBER); + assertThat(kv.getValue(TEST_KEY).orElseThrow().value(), is(123.45)); - set.toBuilder().value("123.45").kvType(KVType.NUMBER).build().run(runContext); - assertThat(kv.getValue(TEST_KEY).get().value(), is(123.45)); + kv = createAndPerformSetTask("true", KVType.BOOLEAN); + assertThat(kv.getValue(TEST_KEY).orElseThrow().value(), is(true)); - set.toBuilder().value("true").kvType(KVType.BOOLEAN).build().run(runContext); - assertThat(kv.getValue(TEST_KEY).get().value(), is(true)); + kv = createAndPerformSetTask("2023-05-02T01:02:03Z", KVType.DATETIME); + assertThat(kv.getValue(TEST_KEY).orElseThrow().value(), is(Instant.parse("2023-05-02T01:02:03Z"))); - set.toBuilder().value("2023-05-02T01:02:03Z").kvType(KVType.DATETIME).build().run(runContext); - assertThat(kv.getValue(TEST_KEY).get().value(), is(Instant.parse("2023-05-02T01:02:03Z"))); - - set.toBuilder().value("P1DT5S").kvType(KVType.DURATION).build().run(runContext); + kv = createAndPerformSetTask("P1DT5S", KVType.DURATION); // TODO Hack meanwhile we handle duration serialization as currently they are stored as bigint... - assertThat((long) Double.parseDouble(kv.getValue(TEST_KEY).get().value().toString()), is(Duration.ofDays(1).plus(Duration.ofSeconds(5)).toSeconds())); + assertThat((long) Double.parseDouble(kv.getValue(TEST_KEY).orElseThrow().value().toString()), is(Duration.ofDays(1).plus(Duration.ofSeconds(5)).toSeconds())); + + kv = createAndPerformSetTask("[{\"some\":\"value\"},{\"another\":\"value\"}]", KVType.JSON); + assertThat(kv.getValue(TEST_KEY).orElseThrow().value(), is(List.of(Map.of("some", "value"), Map.of("another", "value")))); + } - set.toBuilder().value("[{\"some\":\"value\"},{\"another\":\"value\"}]").kvType(KVType.JSON).build().run(runContext); - assertThat(kv.getValue(TEST_KEY).get().value(), is(List.of(Map.of("some", "value"), Map.of("another", "value")))); + private KVStore createAndPerformSetTask(String value, KVType type) throws Exception { + Set set = Set.builder() + .id(Set.class.getSimpleName()) + .type(Set.class.getName()) + .key(new Property<>(TEST_KEY)) + .value(new Property<>(value)) + .kvType(Property.of(type)) + .build(); + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, null); + set.run(runContext); + return runContext.namespaceKv(runContext.flowInfo().namespace()); } }