From 2359c4f744c75c6e99a3e84eadc0cdf716035009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 27 Jan 2025 12:48:15 +0100 Subject: [PATCH] fix(core, jdbc): Count task --- .../kestra/core/models/property/Property.java | 5 +- .../kestra/plugin/core/execution/Count.java | 14 ++- .../plugin/core/execution/CountTest.java | 93 +++++++++++++++++++ .../org.mockito.plugins.MockMaker | 1 + .../AbstractJdbcExecutionRepository.java | 6 +- 5 files changed, 107 insertions(+), 12 deletions(-) create mode 100644 core/src/test/java/io/kestra/plugin/core/execution/CountTest.java create mode 100644 core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/core/src/main/java/io/kestra/core/models/property/Property.java b/core/src/main/java/io/kestra/core/models/property/Property.java index 9f95b52d8d9..4fc0998c0cc 100644 --- a/core/src/main/java/io/kestra/core/models/property/Property.java +++ b/core/src/main/java/io/kestra/core/models/property/Property.java @@ -37,7 +37,6 @@ public class Property { .copy() .configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false); - @Getter private String expression; private T value; @@ -53,6 +52,10 @@ public Property(Map map) { } } + String getExpression() { + return expression; + } + /** * Build a new Property object with a value already set.
* diff --git a/core/src/main/java/io/kestra/plugin/core/execution/Count.java b/core/src/main/java/io/kestra/plugin/core/execution/Count.java index 26233230c2c..a81dc97d671 100644 --- a/core/src/main/java/io/kestra/plugin/core/execution/Count.java +++ b/core/src/main/java/io/kestra/plugin/core/execution/Count.java @@ -45,7 +45,7 @@ tasks: - id: counts - type: io.kestra.plugin.core.execution.Counts + type: io.kestra.plugin.core.execution.Count expression: "{{ count == 0 }}" flows: - namespace: company.team @@ -108,7 +108,8 @@ public class Count extends Task implements RunnableTask { "- ```yaml {{ eq count 0 }} ```: no execution found\n" + "- ```yaml {{ gte count 5 }} ```: more than 5 executions\n" ) - protected Property expression; + @PluginProperty(dynamic = true) // we cannot use `Property` as we render it multiple time with different variables, which is an issue for the property cache + protected String expression; protected Property> namespaces; @@ -140,8 +141,8 @@ public Output run(RunContext runContext) throws Exception { flowInfo.tenantId(), flows, runContext.render(this.states).asList(State.Type.class), - startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null, - endDate != null ? ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()) : null, + runContext.render(this.startDate).as(String.class).map(ZonedDateTime::parse).orElse(null), + runContext.render(this.endDate).as(String.class).map(ZonedDateTime::parse).orElse(null), runContext.render(this.namespaces).asList(String.class) ); @@ -150,10 +151,7 @@ public Output run(RunContext runContext) throws Exception { List count = executionCounts .stream() .filter(throwPredicate(item -> runContext - .render( - this.expression.getExpression(), - ImmutableMap.of("count", item.getCount().intValue()) - ) + .render(this.expression, ImmutableMap.of("count", item.getCount().intValue())) .equals("true") )) .map(item -> Result.builder() diff --git a/core/src/test/java/io/kestra/plugin/core/execution/CountTest.java b/core/src/test/java/io/kestra/plugin/core/execution/CountTest.java new file mode 100644 index 00000000000..e5843863216 --- /dev/null +++ b/core/src/test/java/io/kestra/plugin/core/execution/CountTest.java @@ -0,0 +1,93 @@ +package io.kestra.plugin.core.execution; + +import com.google.common.collect.ImmutableMap; +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.executions.statistics.Flow; +import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; +import io.kestra.core.repositories.AbstractExecutionRepositoryTest; +import io.kestra.core.repositories.ExecutionRepositoryInterface; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@KestraTest +class CountTest { + @Inject + RunContextFactory runContextFactory; + + @Inject + ExecutionRepositoryInterface executionRepository; + + + @Test + void run() throws Exception { + for (int i = 0; i < 28; i++) { + executionRepository.save(AbstractExecutionRepositoryTest.builder( + i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), + i < 4 ? "first" : (i < 10 ? "second" : "third") + ).build()); + } + // matching one + Count task = Count.builder() + .id(IdUtils.create()) + .type(Count.class.getName()) + .flows(List.of( + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "first"), + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "second"), + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "third") + )) + .expression("{{ count >= 5 }}") + .startDate(new Property<>("{{ now() | dateAdd (-30, 'DAYS') }}")) + .endDate(new Property<>("{{ now() }}")) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of("namespace", "io.kestra.unittest")); + Count.Output run = task.run(runContext); + + assertThat(run.getResults().size(), is(2)); + assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("second")).count(), is(1L)); + assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("second")).findFirst().get().getCount(), is(6L)); + assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("third")).count(), is(1L)); + assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("third")).findFirst().get().getCount(), is(18L)); + assertThat(run.getTotal(), is(24L)); + + // add state filter no result + run = Count.builder() + .flows(List.of( + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "first"), + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "second"), + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "third") + )) + .states(Property.of(List.of(State.Type.RUNNING))) + .expression("{{ count >= 5 }}") + .build() + .run(runContext); + + assertThat(run.getResults().size(), is(0)); + + // non-matching entry + run = Count.builder() + .flows(List.of( + new Flow("io.kestra.test", "missing"), + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "second"), + new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "third") + )) + .expression("{{ count == 0 }}") + .build() + .run(runContext); + + assertThat(run.getResults().size(), is(1)); + assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("missing")).count(), is(1L)); + assertThat(run.getTotal(), is(0L)); + + } +} \ No newline at end of file diff --git a/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000000..1f0955d450f --- /dev/null +++ b/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java index cde929fa709..73b056b7dd4 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java @@ -877,7 +877,7 @@ public List executionCounts( select = select.and(START_DATE_FIELD.greaterOrEqual(finalStartDate.toOffsetDateTime())); select = select.and(START_DATE_FIELD.lessOrEqual(finalEndDate.toOffsetDateTime())); - if (states != null) { + if (!ListUtils.isEmpty(states)) { select = select.and(this.statesFilter(states)); } @@ -921,7 +921,7 @@ public List executionCounts( List counts = new ArrayList<>(); // fill missing with count at 0 - if (flows != null) { + if (!ListUtils.isEmpty(flows)) { counts.addAll(flows .stream() .map(flow -> result @@ -939,7 +939,7 @@ public List executionCounts( .toList()); } - if (namespaces != null) { + if (!ListUtils.isEmpty(namespaces)) { Map groupedByNamespace = result.stream() .collect(Collectors.groupingBy( ExecutionCount::getNamespace,