diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowListeners.java b/core/src/main/java/io/kestra/core/runners/FlowListeners.java similarity index 95% rename from runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowListeners.java rename to core/src/main/java/io/kestra/core/runners/FlowListeners.java index 2782acd63cd..4531ad63149 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryFlowListeners.java +++ b/core/src/main/java/io/kestra/core/runners/FlowListeners.java @@ -1,4 +1,4 @@ -package io.kestra.runner.memory; +package io.kestra.core.runners; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -20,8 +20,7 @@ @Singleton @Slf4j -@MemoryQueueEnabled -public class MemoryFlowListeners implements FlowListenersInterface { +public class FlowListeners implements FlowListenersInterface { private static final ObjectMapper MAPPER = JacksonMapper.ofJson(); private static final TypeReference> TYPE_REFERENCE = new TypeReference<>(){}; @@ -32,7 +31,7 @@ public class MemoryFlowListeners implements FlowListenersInterface { private final List>> consumers = new ArrayList<>(); @Inject - public MemoryFlowListeners( + public FlowListeners( FlowRepositoryInterface flowRepository, @Named(QueueFactoryInterface.FLOW_NAMED) QueueInterface flowQueue ) { diff --git a/core/src/test/java/io/kestra/core/runners/FlowListenersTest.java b/core/src/test/java/io/kestra/core/runners/FlowListenersTest.java index b8c93f14749..53a4da18edd 100644 --- a/core/src/test/java/io/kestra/core/runners/FlowListenersTest.java +++ b/core/src/test/java/io/kestra/core/runners/FlowListenersTest.java @@ -7,7 +7,6 @@ import io.kestra.core.services.FlowListenersInterface; import io.kestra.core.tasks.debugs.Return; import io.kestra.core.utils.IdUtils; -import io.kestra.runner.memory.MemoryFlowListeners; import java.util.Collections; import java.util.concurrent.CountDownLatch; @@ -18,7 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -@MicronautTest +@MicronautTest(transactional = false) abstract public class FlowListenersTest { @Inject protected FlowRepositoryInterface flowRepository; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java index ac2d98478fd..fb00bf3b99e 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java @@ -6,7 +6,7 @@ import io.kestra.core.models.flows.State; import io.kestra.core.models.triggers.Trigger; import io.kestra.core.models.triggers.types.Schedule; -import io.kestra.runner.memory.MemoryFlowListeners; +import io.kestra.core.runners.FlowListeners; import jakarta.inject.Inject; import org.junit.jupiter.api.Test; @@ -26,7 +26,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest { @Inject - protected MemoryFlowListeners flowListenersService; + protected FlowListeners flowListenersService; @Inject protected SchedulerTriggerStateInterface triggerState; @@ -58,7 +58,7 @@ private static Flow createScheduleFlow() { @Test void schedule() throws Exception { // mock flow listeners - MemoryFlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState); CountDownLatch queueCount = new CountDownLatch(4); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java index 231075502f2..096fdbdd1da 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java @@ -1,11 +1,12 @@ package io.kestra.core.schedulers; -import org.junit.jupiter.api.Test; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; import io.kestra.core.models.triggers.types.Schedule; -import io.kestra.runner.memory.MemoryFlowListeners; +import io.kestra.core.runners.FlowListeners; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; @@ -15,8 +16,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import jakarta.inject.Inject; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; @@ -24,7 +23,7 @@ class SchedulerScheduleTest extends AbstractSchedulerTest { @Inject - protected MemoryFlowListeners flowListenersService; + protected FlowListeners flowListenersService; @Inject protected SchedulerTriggerStateInterface triggerState; @@ -58,7 +57,7 @@ private static ZonedDateTime date(int minus) { @Test void schedule() throws Exception { // mock flow listeners - MemoryFlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState); CountDownLatch queueCount = new CountDownLatch(5); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java index 8d10c4e97f0..62877cf1199 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java @@ -1,11 +1,12 @@ package io.kestra.core.schedulers; -import io.kestra.core.models.flows.TaskDefault; -import org.junit.jupiter.api.Test; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; -import io.kestra.runner.memory.MemoryFlowListeners; +import io.kestra.core.models.flows.TaskDefault; +import io.kestra.core.runners.FlowListeners; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.List; @@ -15,8 +16,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import jakarta.inject.Inject; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; @@ -24,7 +23,7 @@ public class SchedulerThreadTest extends AbstractSchedulerTest { @Inject - protected MemoryFlowListeners flowListenersService; + protected FlowListeners flowListenersService; @Inject protected SchedulerTriggerStateInterface triggerState; @@ -49,7 +48,7 @@ public static Flow createThreadFlow() { @Test void thread() throws Exception { // mock flow listeners - MemoryFlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState); CountDownLatch queueCount = new CountDownLatch(2); diff --git a/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlFlowListenersTest.java b/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlFlowListenersTest.java new file mode 100644 index 00000000000..53f81be4560 --- /dev/null +++ b/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlFlowListenersTest.java @@ -0,0 +1,27 @@ +package io.kestra.runner.mysql; + +import io.kestra.core.runners.FlowListeners; +import io.kestra.core.runners.FlowListenersTest; +import io.kestra.jdbc.JdbcTestUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class MysqlFlowListenersTest extends FlowListenersTest { + @Inject + FlowListeners flowListenersService; + + @Inject + JdbcTestUtils jdbcTestUtils; + + @Test + public void all() { + this.suite(flowListenersService); + } + + @BeforeEach + protected void init() { + jdbcTestUtils.drop(); + jdbcTestUtils.migrate(); + } +} \ No newline at end of file diff --git a/jdbc-postgres/build.gradle b/jdbc-postgres/build.gradle index 7e33d4abf7c..a24720b1bd0 100644 --- a/jdbc-postgres/build.gradle +++ b/jdbc-postgres/build.gradle @@ -10,5 +10,6 @@ dependencies { testImplementation project(':core').sourceSets.test.output testImplementation project(':jdbc').sourceSets.test.output testImplementation project(':runner-memory') + testImplementation project(':storage-local') testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1' } diff --git a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresFlowListenersTest.java b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresFlowListenersTest.java new file mode 100644 index 00000000000..0f7baf915ea --- /dev/null +++ b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresFlowListenersTest.java @@ -0,0 +1,27 @@ +package io.kestra.runner.postgres; + +import io.kestra.core.runners.FlowListeners; +import io.kestra.core.runners.FlowListenersTest; +import io.kestra.jdbc.JdbcTestUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class PostgresFlowListenersTest extends FlowListenersTest { + @Inject + FlowListeners flowListenersService; + + @Inject + JdbcTestUtils jdbcTestUtils; + + @Test + public void all() { + this.suite(flowListenersService); + } + + @BeforeEach + protected void init() { + jdbcTestUtils.drop(); + jdbcTestUtils.migrate(); + } +} \ No newline at end of file diff --git a/jdbc-postgres/src/test/resources/application.yml b/jdbc-postgres/src/test/resources/application.yml index f4c0b53afb8..3f12fafa2a0 100644 --- a/jdbc-postgres/src/test/resources/application.yml +++ b/jdbc-postgres/src/test/resources/application.yml @@ -18,6 +18,10 @@ kestra: type: postgres repository: type: postgres + storage: + type: local + local: + base-path: /tmp/unittest jdbc: tables: diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java index edf8c82b146..f9e57348281 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java @@ -1,6 +1,8 @@ package io.kestra.runner.kafka; +import io.kestra.core.runners.FlowListeners; import io.kestra.runner.kafka.services.*; +import io.micronaut.context.annotation.Replaces; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -28,6 +30,7 @@ @Singleton @Slf4j @KafkaQueueEnabled +@Replaces(FlowListeners.class) public class KafkaFlowListeners implements FlowListenersInterface { private final KafkaAdminService kafkaAdminService; private final KafkaStreamService kafkaStreamService; diff --git a/runner-memory/src/test/java/io/kestra/runner/memory/MemoryFlowListenersTest.java b/runner-memory/src/test/java/io/kestra/runner/memory/MemoryFlowListenersTest.java index 8615b488c79..9e14ed7150a 100644 --- a/runner-memory/src/test/java/io/kestra/runner/memory/MemoryFlowListenersTest.java +++ b/runner-memory/src/test/java/io/kestra/runner/memory/MemoryFlowListenersTest.java @@ -1,5 +1,6 @@ package io.kestra.runner.memory; +import io.kestra.core.runners.FlowListeners; import org.junit.jupiter.api.Test; import io.kestra.core.runners.FlowListenersTest; @@ -7,7 +8,7 @@ class MemoryFlowListenersTest extends FlowListenersTest { @Inject - MemoryFlowListeners flowListenersService; + FlowListeners flowListenersService; @Test public void all() {