From a6d746ae5a45cb9747546aa07e4c6d524417e3bc Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle <54168385+mgabelle@users.noreply.github.com> Date: Mon, 20 Jan 2025 11:05:11 +0100 Subject: [PATCH] refactor: migrate package plugin.core.execution to dynamic properties (#6708) migrate Count migrate Fail migrate PurgeExecutions migrate Resume --- .../kestra/plugin/core/execution/Count.java | 34 ++++++------ .../io/kestra/plugin/core/execution/Fail.java | 13 +++-- .../core/execution/PurgeExecutions.java | 53 ++++++++----------- .../kestra/plugin/core/execution/Resume.java | 25 +++++---- .../core/execution/PurgeExecutionsTest.java | 13 ++--- 5 files changed, 67 insertions(+), 71 deletions(-) diff --git a/core/src/main/java/io/kestra/plugin/core/execution/Count.java b/core/src/main/java/io/kestra/plugin/core/execution/Count.java index 5d032e3f06b..c3fcfb0f380 100644 --- a/core/src/main/java/io/kestra/plugin/core/execution/Count.java +++ b/core/src/main/java/io/kestra/plugin/core/execution/Count.java @@ -7,6 +7,7 @@ import io.kestra.core.models.executions.statistics.ExecutionCount; import io.kestra.core.models.executions.statistics.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.repositories.ExecutionRepositoryInterface; @@ -41,7 +42,7 @@ code = """ id: executions_count namespace: company.team - + tasks: - id: counts type: io.kestra.plugin.core.execution.Counts @@ -64,7 +65,7 @@ "text": ":warning: Flow `{{ jq taskrun.value '.namespace' true }}`.`{{ jq taskrun.value '.flowId' true }}` has no execution for last 24h!" } url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX" - + triggers: - id: schedule type: io.kestra.plugin.core.trigger.Schedule @@ -86,21 +87,18 @@ public class Count extends Task implements RunnableTask { @Schema( title = "A list of states to be filtered." ) - @PluginProperty - protected List states; + protected Property> states; @NotNull @Schema( title = "The start date." ) - @PluginProperty(dynamic = true) - protected String startDate; + protected Property startDate; @Schema( title = "The end date." ) - @PluginProperty(dynamic = true) - protected String endDate; + protected Property endDate; @NotNull @Schema( @@ -110,11 +108,9 @@ public class Count extends Task implements RunnableTask { "- ```yaml {{ eq count 0 }} ```: no execution found\n" + "- ```yaml {{ gte count 5 }} ```: more than 5 executions\n" ) - @PluginProperty(dynamic = true) - protected String expression; + protected Property expression; - @PluginProperty - protected List namespaces; + protected Property> namespaces; @Override public Output run(RunContext runContext) throws Exception { @@ -134,17 +130,19 @@ public Output run(RunContext runContext) throws Exception { if (flows != null) { flows.forEach(flow -> flowService.checkAllowedNamespace(flowInfo.tenantId(), flow.getNamespace(), flowInfo.tenantId(), flowInfo.namespace())); } + if (namespaces != null) { - namespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace())); + var renderedNamespaces = runContext.render(this.namespaces).asList(String.class); + renderedNamespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace())); } List executionCounts = executionRepository.executionCounts( flowInfo.tenantId(), flows, - this.states, - startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null, - endDate != null ? ZonedDateTime.parse(runContext.render(endDate)) : null, - namespaces + runContext.render(this.states).asList(State.Type.class), + startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null, + endDate != null ? ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()) : null, + runContext.render(this.namespaces).asList(String.class) ); logger.trace("{} flows matching filters", executionCounts.size()); @@ -153,7 +151,7 @@ public Output run(RunContext runContext) throws Exception { .stream() .filter(throwPredicate(item -> runContext .render( - this.expression, + runContext.render(this.expression).as(String.class).orElseThrow(), ImmutableMap.of("count", item.getCount().intValue()) ) .equals("true") diff --git a/core/src/main/java/io/kestra/plugin/core/execution/Fail.java b/core/src/main/java/io/kestra/plugin/core/execution/Fail.java index c1eec4d764f..6ec53ce0e24 100644 --- a/core/src/main/java/io/kestra/plugin/core/execution/Fail.java +++ b/core/src/main/java/io/kestra/plugin/core/execution/Fail.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.VoidOutput; @@ -89,29 +90,27 @@ aliases = "io.kestra.core.tasks.executions.Fail" ) public class Fail extends Task implements RunnableTask { - @PluginProperty(dynamic = true) @Schema( title = "Optional condition, must coerce to a boolean.", description = "Boolean coercion allows 0, -0, and '' to coerce to false, all other values to coerce to true." ) - private String condition; + private Property condition; - @PluginProperty(dynamic = true) @Schema(title = "Optional error message.") @Builder.Default - private String errorMessage = "Task failure"; + private Property errorMessage = Property.of("Task failure"); @Override public VoidOutput run(RunContext runContext) throws Exception { if (condition != null) { - String rendered = runContext.render(condition); + String rendered = runContext.render(condition).as(String.class).orElse(null); if (TruthUtils.isTruthy(rendered)) { - runContext.logger().error(runContext.render(errorMessage)); + runContext.logger().error(runContext.render(errorMessage).as(String.class).orElse(null)); throw new Exception("Fail on a condition"); } return null; } - throw new Exception(runContext.render(errorMessage)); + throw new Exception(runContext.render(errorMessage).as(String.class).orElse(null)); } } diff --git a/core/src/main/java/io/kestra/plugin/core/execution/PurgeExecutions.java b/core/src/main/java/io/kestra/plugin/core/execution/PurgeExecutions.java index ad32c9659d3..5616b5a1024 100644 --- a/core/src/main/java/io/kestra/plugin/core/execution/PurgeExecutions.java +++ b/core/src/main/java/io/kestra/plugin/core/execution/PurgeExecutions.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.DefaultRunContext; @@ -48,44 +49,38 @@ public class PurgeExecutions extends Task implements RunnableTask namespace; @Schema( title = "The flow ID to be purged.", description = "You need to provide the `namespace` properties if you want to purge a flow." ) - @PluginProperty(dynamic = true) - private String flowId; + private Property flowId; @Schema( title = "The minimum date to be purged.", description = "All data of flows executed after this date will be purged." ) - @PluginProperty(dynamic = true) - private String startDate; + private Property startDate; @Schema( title = "The maximum date to be purged.", description = "All data of flows executed before this date will be purged." ) - @PluginProperty(dynamic = true) @NotNull - private String endDate; + private Property endDate; @Schema( title = "The state of the executions to be purged.", description = "If not set, executions for any states will be purged." ) - @PluginProperty - private List states; + private Property> states; @Schema( title = "Whether to purge executions." ) - @PluginProperty @Builder.Default - private boolean purgeExecution = true; + private Property purgeExecution = Property.of(true); @Schema( title = "Whether to purge execution's logs.", @@ -93,23 +88,20 @@ public class PurgeExecutions extends Task implements RunnableTask purgeLog = Property.of(true); @Schema( title = "Whether to purge execution's metrics." ) - @PluginProperty @Builder.Default - private boolean purgeMetric = true; + private Property purgeMetric = Property.of(true); @Schema( title = "Whether to purge execution's files from the Kestra's internal storage." ) - @PluginProperty @Builder.Default - private boolean purgeStorage = true; + private Property purgeStorage = Property.of(true); @Override public PurgeExecutions.Output run(RunContext runContext) throws Exception { @@ -118,23 +110,24 @@ public PurgeExecutions.Output run(RunContext runContext) throws Exception { // validate that this namespace is authorized on the target namespace / all namespaces var flowInfo = runContext.flowInfo(); - if (namespace == null){ + String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null); + if (renderedNamespace == null){ flowService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace()); - } else if (!runContext.render(namespace).equals(flowInfo.namespace())) { - flowService.checkAllowedNamespace(flowInfo.tenantId(), runContext.render(namespace), flowInfo.tenantId(), flowInfo.namespace()); + } else if (!renderedNamespace.equals(flowInfo.namespace())) { + flowService.checkAllowedNamespace(flowInfo.tenantId(), renderedNamespace, flowInfo.tenantId(), flowInfo.namespace()); } ExecutionService.PurgeResult purgeResult = executionService.purge( - purgeExecution, - purgeLog, - purgeMetric, - purgeStorage, + runContext.render(this.purgeExecution).as(Boolean.class).orElseThrow(), + runContext.render(this.purgeLog).as(Boolean.class).orElseThrow(), + runContext.render(this.purgeMetric).as(Boolean.class).orElseThrow(), + runContext.render(this.purgeStorage).as(Boolean.class).orElseThrow(), flowInfo.tenantId(), - runContext.render(namespace), - runContext.render(flowId), - startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null, - ZonedDateTime.parse(runContext.render(endDate)), - states + renderedNamespace, + runContext.render(flowId).as(String.class).orElse(null), + startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null, + ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()), + this.states == null ? null : runContext.render(this.states).asList(State.Type.class) ); return Output.builder() diff --git a/core/src/main/java/io/kestra/plugin/core/execution/Resume.java b/core/src/main/java/io/kestra/plugin/core/execution/Resume.java index 62752a72cff..8bb4d9a854f 100644 --- a/core/src/main/java/io/kestra/plugin/core/execution/Resume.java +++ b/core/src/main/java/io/kestra/plugin/core/execution/Resume.java @@ -1,11 +1,13 @@ package io.kestra.plugin.core.execution; +import com.ctc.wstx.util.PrefixedName; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.VoidOutput; @@ -49,14 +51,12 @@ public class Resume extends Task implements RunnableTask { @Schema( title = "Filter for a specific namespace in case `executionId` is set. In case you wonder why `executionId` is not enough — we require specifying the namespace to make permissions explicit. The Enterprise Edition of Kestra allows you to resume executions from another namespaces only if the permissions allow it. Check the [Allowed Namespaces](https://kestra.io/docs/enterprise/allowed-namespaces) documentation for more details." ) - @PluginProperty(dynamic = true) - private String namespace; + private Property namespace; @Schema( title = "Filter for a specific flow identifier in case `executionId` is set." ) - @PluginProperty(dynamic = true) - private String flowId; + private Property flowId; @Schema( title = "Filter for a specific execution.", @@ -67,19 +67,22 @@ public class Resume extends Task implements RunnableTask { If `executionId` is not set, the task will use the ID of the current execution.""" ) - @PluginProperty(dynamic = true) - private String executionId; + private Property executionId; @Schema( title = "Inputs to be passed to the execution when it's resumed." ) - @PluginProperty(dynamic = true) - private Map inputs; + private Property> inputs; @SuppressWarnings("unchecked") @Override public VoidOutput run(RunContext runContext) throws Exception { - var executionInfo = PluginUtilsService.executionFromTaskParameters(runContext, this.namespace, this.flowId, this.executionId); + var executionInfo = PluginUtilsService.executionFromTaskParameters( + runContext, + runContext.render(this.namespace).as(String.class).orElse(null), + runContext.render(this.flowId).as(String.class).orElse(null), + runContext.render(this.executionId).as(String.class).orElse(null) + ); ApplicationContext applicationContext = ((DefaultRunContext)runContext).getApplicationContext(); ExecutionService executionService = applicationContext.getBean(ExecutionService.class); @@ -90,7 +93,9 @@ public VoidOutput run(RunContext runContext) throws Exception { Execution execution = executionRepository.findById(executionInfo.tenantId(), executionInfo.id()) .orElseThrow(() -> new IllegalArgumentException("No execution found for execution id " + executionInfo.id())); Flow flow = flowExecutor.findByExecution(execution).orElseThrow(() -> new IllegalArgumentException("Flow not found for execution id " + executionInfo.id())); - Map renderedInputs = inputs != null ? runContext.render(inputs) : null; + + Map renderedInputs = runContext.render(this.inputs).asMap(String.class, Object.class); + renderedInputs = !renderedInputs.isEmpty() ? renderedInputs : null; Execution resumed = executionService.resume(execution, flow, State.Type.RUNNING, renderedInputs); executionQueue.emit(resumed); diff --git a/core/src/test/java/io/kestra/plugin/core/execution/PurgeExecutionsTest.java b/core/src/test/java/io/kestra/plugin/core/execution/PurgeExecutionsTest.java index e5e559c86c8..3560095dccb 100644 --- a/core/src/test/java/io/kestra/plugin/core/execution/PurgeExecutionsTest.java +++ b/core/src/test/java/io/kestra/plugin/core/execution/PurgeExecutionsTest.java @@ -3,6 +3,7 @@ import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.utils.IdUtils; @@ -39,9 +40,9 @@ void run() throws Exception { executionRepository.save(execution); var purge = PurgeExecutions.builder() - .flowId(flowId) - .namespace(namespace) - .endDate(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)) + .flowId(Property.of(flowId)) + .namespace(Property.of(namespace)) + .endDate(Property.of(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))) .build(); var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId))); var output = purge.run(runContext); @@ -65,9 +66,9 @@ void deleted() throws Exception { executionRepository.delete(execution); var purge = PurgeExecutions.builder() - .namespace(namespace) - .flowId(flowId) - .endDate(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)) + .namespace(Property.of(namespace)) + .flowId(Property.of(flowId)) + .endDate(Property.of(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))) .build(); var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId))); var output = purge.run(runContext);