Skip to content

Commit 7e16b71

Browse files
committed
feat(jdbc): implementation of trigger repository
1 parent 454a603 commit 7e16b71

File tree

42 files changed

+1432
-641
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1432
-641
lines changed

Diff for: cli/src/main/resources/application.yml

+6-8
Original file line numberDiff line numberDiff line change
@@ -206,25 +206,23 @@ kestra:
206206
delete.retention.ms: 86400000
207207

208208
jdbc:
209-
defaults:
210-
table-prefix: "kestra_"
211209
tables:
212210
queues:
213-
table: "${kestra.jdbc.defaults.table-prefix}queues"
211+
table: "queues"
214212
flows:
215-
table: "${kestra.jdbc.defaults.table-prefix}flows"
213+
table: "flows"
216214
cls: io.kestra.core.models.flows.Flow
217215
executions:
218-
table: "${kestra.jdbc.defaults.table-prefix}executions"
216+
table: "executions"
219217
cls: io.kestra.core.models.executions.Execution
220218
templates:
221-
table: "${kestra.jdbc.defaults.table-prefix}templates"
219+
table: "templates"
222220
cls: io.kestra.core.models.templates.Template
223221
triggers:
224-
table: "${kestra.jdbc.defaults.table-prefix}triggers"
222+
table: "triggers"
225223
cls: io.kestra.core.models.triggers.Trigger
226224
logs:
227-
table: "${kestra.jdbc.defaults.table-prefix}logs"
225+
table: "logs"
228226
cls: io.kestra.core.models.executions.LogEntry
229227

230228
elasticsearch:

Diff for: core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java

+3
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,7 @@ public interface TriggerRepositoryInterface {
1010
Optional<Trigger> findLast(TriggerContext trigger);
1111

1212
List<Trigger> findAll();
13+
14+
Trigger save(Trigger trigger);
1315
}
16+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
package io.kestra.core.repositories;
2+
3+
import com.devskiller.friendly_id.FriendlyId;
4+
import io.kestra.core.models.executions.Execution;
5+
import io.kestra.core.models.executions.TaskRun;
6+
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
7+
import io.kestra.core.models.executions.statistics.ExecutionCount;
8+
import io.kestra.core.models.executions.statistics.Flow;
9+
import io.kestra.core.models.flows.State;
10+
import io.kestra.core.models.tasks.ResolvedTask;
11+
import io.kestra.core.tasks.debugs.Return;
12+
import io.micronaut.data.model.Pageable;
13+
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
14+
import jakarta.inject.Inject;
15+
import org.junit.jupiter.api.Test;
16+
17+
import java.time.Duration;
18+
import java.time.ZonedDateTime;
19+
import java.util.*;
20+
21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.hamcrest.Matchers.greaterThan;
23+
import static org.hamcrest.Matchers.is;
24+
import static org.mockito.Mockito.doReturn;
25+
import static org.mockito.Mockito.spy;
26+
27+
@MicronautTest(transactional = false)
28+
public abstract class AbstractExecutionRepositoryTest {
29+
public static final String NAMESPACE = "io.kestra.unittest";
30+
public static final String FLOW = "full";
31+
32+
@Inject
33+
protected ExecutionRepositoryInterface executionRepository;
34+
35+
public static Execution.ExecutionBuilder builder(State.Type state, String flowId) {
36+
State finalState = randomDuration(state);
37+
38+
Execution.ExecutionBuilder execution = Execution.builder()
39+
.id(FriendlyId.createFriendlyId())
40+
.namespace(NAMESPACE)
41+
.flowId(flowId == null ? FLOW : flowId)
42+
.flowRevision(1)
43+
.state(finalState);
44+
45+
46+
List<TaskRun> taskRuns = Arrays.asList(
47+
TaskRun.of(execution.build(), ResolvedTask.of(
48+
Return.builder().id("first").type(Return.class.getName()).format("test").build())
49+
)
50+
.withState(State.Type.SUCCESS),
51+
spyTaskRun(TaskRun.of(execution.build(), ResolvedTask.of(
52+
Return.builder().id("second").type(Return.class.getName()).format("test").build())
53+
)
54+
.withState(state),
55+
state
56+
),
57+
TaskRun.of(execution.build(), ResolvedTask.of(
58+
Return.builder().id("third").type(Return.class.getName()).format("test").build())).withState(state)
59+
);
60+
61+
if (flowId == null) {
62+
return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1), taskRuns.get(2)));
63+
}
64+
65+
return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1)));
66+
}
67+
68+
69+
static TaskRun spyTaskRun(TaskRun taskRun, State.Type state) {
70+
TaskRun spy = spy(taskRun);
71+
72+
doReturn(randomDuration(state))
73+
.when(spy)
74+
.getState();
75+
76+
return spy;
77+
}
78+
79+
static State randomDuration(State.Type state) {
80+
State finalState = new State();
81+
82+
finalState = spy(finalState
83+
.withState(state != null ? state : State.Type.SUCCESS)
84+
);
85+
86+
Random rand = new Random();
87+
doReturn(Duration.ofSeconds(rand.nextInt(150)))
88+
.when(finalState)
89+
.getDuration();
90+
91+
return finalState;
92+
}
93+
94+
95+
protected void inject() {
96+
for (int i = 0; i < 28; i++) {
97+
executionRepository.save(builder(
98+
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
99+
i < 15 ? null : "second"
100+
).build());
101+
}
102+
}
103+
104+
@Test
105+
protected void find() {
106+
inject();
107+
108+
ArrayListTotal<Execution> executions = executionRepository.find("*", Pageable.from(1, 10), null);
109+
assertThat(executions.getTotal(), is(28L));
110+
assertThat(executions.size(), is(10));
111+
}
112+
113+
@Test
114+
protected void findTaskRun() {
115+
inject();
116+
117+
ArrayListTotal<TaskRun> executions = executionRepository.findTaskRun("*", Pageable.from(1, 10), null);
118+
assertThat(executions.getTotal(), is(71L));
119+
assertThat(executions.size(), is(10));
120+
}
121+
122+
123+
@Test
124+
protected void findById() {
125+
executionRepository.save(ExecutionFixture.EXECUTION_1);
126+
127+
Optional<Execution> full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId());
128+
assertThat(full.isPresent(), is(true));
129+
130+
full.ifPresent(current -> {
131+
assertThat(full.get().getId(), is(ExecutionFixture.EXECUTION_1.getId()));
132+
});
133+
}
134+
135+
@Test
136+
protected void mappingConflict() {
137+
executionRepository.save(ExecutionFixture.EXECUTION_2);
138+
executionRepository.save(ExecutionFixture.EXECUTION_1);
139+
140+
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(NAMESPACE, FLOW, Pageable.from(1, 10));
141+
142+
assertThat(page1.size(), is(2));
143+
}
144+
145+
@Test
146+
protected void dailyGroupByFlowStatistics() throws InterruptedException {
147+
for (int i = 0; i < 28; i++) {
148+
executionRepository.save(builder(
149+
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
150+
i < 15 ? null : "second"
151+
).build());
152+
}
153+
154+
// mysql need some time ...
155+
Thread.sleep(500);
156+
157+
Map<String, Map<String, List<DailyExecutionStatistics>>> result = executionRepository.dailyGroupByFlowStatistics(
158+
"*",
159+
ZonedDateTime.now().minusDays(10),
160+
ZonedDateTime.now(),
161+
false
162+
);
163+
164+
assertThat(result.size(), is(1));
165+
assertThat(result.get("io.kestra.unittest").size(), is(2));
166+
167+
DailyExecutionStatistics full = result.get("io.kestra.unittest").get(FLOW).get(10);
168+
DailyExecutionStatistics second = result.get("io.kestra.unittest").get("second").get(10);
169+
170+
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
171+
assertThat(full.getExecutionCounts().size(), is(9));
172+
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
173+
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
174+
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(7L));
175+
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));
176+
177+
assertThat(second.getDuration().getAvg().getSeconds(), greaterThan(0L));
178+
assertThat(second.getExecutionCounts().size(), is(9));
179+
assertThat(second.getExecutionCounts().get(State.Type.SUCCESS), is(13L));
180+
assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L));
181+
182+
result = executionRepository.dailyGroupByFlowStatistics(
183+
"*",
184+
ZonedDateTime.now().minusDays(10),
185+
ZonedDateTime.now(),
186+
true
187+
);
188+
189+
assertThat(result.size(), is(1));
190+
assertThat(result.get("io.kestra.unittest").size(), is(1));
191+
full = result.get("io.kestra.unittest").get("*").get(10);
192+
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
193+
assertThat(full.getExecutionCounts().size(), is(9));
194+
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
195+
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
196+
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(20L));
197+
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));
198+
}
199+
200+
@Test
201+
protected void dailyStatistics() throws InterruptedException {
202+
for (int i = 0; i < 28; i++) {
203+
executionRepository.save(builder(
204+
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
205+
i < 15 ? null : "second"
206+
).build());
207+
}
208+
209+
// mysql need some time ...
210+
Thread.sleep(500);
211+
212+
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
213+
"*",
214+
ZonedDateTime.now().minusDays(10),
215+
ZonedDateTime.now(),
216+
false
217+
);
218+
219+
assertThat(result.size(), is(11));
220+
assertThat(result.get(10).getExecutionCounts().size(), is(9));
221+
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
222+
223+
assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L));
224+
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L));
225+
assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(20L));
226+
}
227+
228+
@Test
229+
protected void taskRunsDailyStatistics() {
230+
for (int i = 0; i < 28; i++) {
231+
executionRepository.save(builder(
232+
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
233+
i < 15 ? null : "second"
234+
).build());
235+
}
236+
237+
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
238+
"*",
239+
ZonedDateTime.now().minusDays(10),
240+
ZonedDateTime.now(),
241+
true
242+
);
243+
244+
assertThat(result.size(), is(11));
245+
assertThat(result.get(10).getExecutionCounts().size(), is(9));
246+
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
247+
248+
assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L * 2));
249+
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L * 2));
250+
assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(55L));
251+
}
252+
253+
@SuppressWarnings("OptionalGetWithoutIsPresent")
254+
@Test
255+
protected void executionsCount() throws InterruptedException {
256+
for (int i = 0; i < 28; i++) {
257+
executionRepository.save(builder(
258+
State.Type.SUCCESS,
259+
i < 4 ? "first" : (i < 10 ? "second" : "third")
260+
).build());
261+
}
262+
263+
// mysql need some time ...
264+
Thread.sleep(500);
265+
266+
List<ExecutionCount> result = executionRepository.executionCounts(
267+
List.of(
268+
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "first"),
269+
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "second"),
270+
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "third"),
271+
new Flow(NAMESPACE, "missing")
272+
),
273+
"*",
274+
ZonedDateTime.now().minusDays(10),
275+
ZonedDateTime.now()
276+
);
277+
278+
assertThat(result.size(), is(4));
279+
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(4L));
280+
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("second")).findFirst().get().getCount(), is(6L));
281+
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount(), is(18L));
282+
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount(), is(0L));
283+
}
284+
}

Diff for: core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.google.common.collect.ImmutableList;
5-
import io.micronaut.context.event.ApplicationEventListener;
6-
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
7-
import org.junit.jupiter.api.BeforeEach;
8-
import org.junit.jupiter.api.Test;
95
import io.kestra.core.Helpers;
106
import io.kestra.core.events.CrudEvent;
117
import io.kestra.core.events.CrudEventType;
@@ -20,11 +16,14 @@
2016
import io.kestra.core.tasks.scripts.Bash;
2117
import io.kestra.core.utils.IdUtils;
2218
import io.kestra.core.utils.TestsUtils;
23-
19+
import io.micronaut.context.event.ApplicationEventListener;
20+
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
2421
import jakarta.inject.Inject;
2522
import jakarta.inject.Named;
2623
import jakarta.inject.Singleton;
27-
import javax.validation.ConstraintViolationException;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
2827
import java.io.IOException;
2928
import java.net.URISyntaxException;
3029
import java.util.ArrayList;
@@ -33,13 +32,14 @@
3332
import java.util.Optional;
3433
import java.util.concurrent.CountDownLatch;
3534
import java.util.concurrent.TimeUnit;
35+
import javax.validation.ConstraintViolationException;
3636

3737
import static org.hamcrest.MatcherAssert.assertThat;
3838
import static org.hamcrest.Matchers.is;
3939
import static org.hamcrest.Matchers.nullValue;
4040
import static org.junit.jupiter.api.Assertions.assertThrows;
4141

42-
@MicronautTest
42+
@MicronautTest(transactional = false)
4343
public abstract class AbstractFlowRepositoryTest {
4444
@Inject
4545
protected FlowRepositoryInterface flowRepository;

0 commit comments

Comments
 (0)