Skip to content

Commit

Permalink
fix(core, jdbc): Count task
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 27, 2025
1 parent 1bcc22a commit 2359c4f
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class Property<T> {
.copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);

@Getter
private String expression;
private T value;

Expand All @@ -53,6 +52,10 @@ public Property(Map<?, ?> map) {
}
}

String getExpression() {
return expression;
}

/**
* Build a new Property object with a value already set.<br>
*
Expand Down
14 changes: 6 additions & 8 deletions core/src/main/java/io/kestra/plugin/core/execution/Count.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
tasks:
- id: counts
type: io.kestra.plugin.core.execution.Counts
type: io.kestra.plugin.core.execution.Count
expression: "{{ count == 0 }}"
flows:
- namespace: company.team
Expand Down Expand Up @@ -108,7 +108,8 @@ public class Count extends Task implements RunnableTask<Count.Output> {
"- ```yaml {{ eq count 0 }} ```: no execution found\n" +
"- ```yaml {{ gte count 5 }} ```: more than 5 executions\n"
)
protected Property<String> expression;
@PluginProperty(dynamic = true) // we cannot use `Property` as we render it multiple time with different variables, which is an issue for the property cache
protected String expression;

protected Property<List<String>> namespaces;

Expand Down Expand Up @@ -140,8 +141,8 @@ public Output run(RunContext runContext) throws Exception {
flowInfo.tenantId(),
flows,
runContext.render(this.states).asList(State.Type.class),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()) : null,
runContext.render(this.startDate).as(String.class).map(ZonedDateTime::parse).orElse(null),
runContext.render(this.endDate).as(String.class).map(ZonedDateTime::parse).orElse(null),
runContext.render(this.namespaces).asList(String.class)
);

Expand All @@ -150,10 +151,7 @@ public Output run(RunContext runContext) throws Exception {
List<Result> count = executionCounts
.stream()
.filter(throwPredicate(item -> runContext
.render(
this.expression.getExpression(),
ImmutableMap.of("count", item.getCount().intValue())
)
.render(this.expression, ImmutableMap.of("count", item.getCount().intValue()))
.equals("true")
))
.map(item -> Result.builder()
Expand Down
93 changes: 93 additions & 0 deletions core/src/test/java/io/kestra/plugin/core/execution/CountTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.kestra.plugin.core.execution;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.AbstractExecutionRepositoryTest;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@KestraTest
class CountTest {
@Inject
RunContextFactory runContextFactory;

@Inject
ExecutionRepositoryInterface executionRepository;


@Test
void run() throws Exception {
for (int i = 0; i < 28; i++) {
executionRepository.save(AbstractExecutionRepositoryTest.builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 4 ? "first" : (i < 10 ? "second" : "third")
).build());
}
// matching one
Count task = Count.builder()
.id(IdUtils.create())
.type(Count.class.getName())
.flows(List.of(
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "first"),
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "second"),
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "third")
))
.expression("{{ count >= 5 }}")
.startDate(new Property<>("{{ now() | dateAdd (-30, 'DAYS') }}"))
.endDate(new Property<>("{{ now() }}"))
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of("namespace", "io.kestra.unittest"));
Count.Output run = task.run(runContext);

assertThat(run.getResults().size(), is(2));
assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("second")).count(), is(1L));
assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("second")).findFirst().get().getCount(), is(6L));
assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("third")).count(), is(1L));
assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("third")).findFirst().get().getCount(), is(18L));
assertThat(run.getTotal(), is(24L));

// add state filter no result
run = Count.builder()
.flows(List.of(
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "first"),
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "second"),
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "third")
))
.states(Property.of(List.of(State.Type.RUNNING)))
.expression("{{ count >= 5 }}")
.build()
.run(runContext);

assertThat(run.getResults().size(), is(0));

// non-matching entry
run = Count.builder()
.flows(List.of(
new Flow("io.kestra.test", "missing"),
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "second"),
new Flow(AbstractExecutionRepositoryTest.NAMESPACE, "third")
))
.expression("{{ count == 0 }}")
.build()
.run(runContext);

assertThat(run.getResults().size(), is(1));
assertThat(run.getResults().stream().filter(f -> f.getFlowId().equals("missing")).count(), is(1L));
assertThat(run.getTotal(), is(0L));

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ public List<ExecutionCount> executionCounts(
select = select.and(START_DATE_FIELD.greaterOrEqual(finalStartDate.toOffsetDateTime()));
select = select.and(START_DATE_FIELD.lessOrEqual(finalEndDate.toOffsetDateTime()));

if (states != null) {
if (!ListUtils.isEmpty(states)) {
select = select.and(this.statesFilter(states));
}

Expand Down Expand Up @@ -921,7 +921,7 @@ public List<ExecutionCount> executionCounts(

List<ExecutionCount> counts = new ArrayList<>();
// fill missing with count at 0
if (flows != null) {
if (!ListUtils.isEmpty(flows)) {
counts.addAll(flows
.stream()
.map(flow -> result
Expand All @@ -939,7 +939,7 @@ public List<ExecutionCount> executionCounts(
.toList());
}

if (namespaces != null) {
if (!ListUtils.isEmpty(namespaces)) {
Map<String, Long> groupedByNamespace = result.stream()
.collect(Collectors.groupingBy(
ExecutionCount::getNamespace,
Expand Down

0 comments on commit 2359c4f

Please sign in to comment.