From 06337f5b2c7dab13024ef63429ed80cfe19ec02f Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Mon, 13 Jun 2022 12:52:59 +0200 Subject: [PATCH] feat(jdbc): introduce h2 in memory runner --- cli/build.gradle | 1 + .../MultipleConditionWindow.java | 3 - .../AbstractExecutionRepositoryTest.java | 10 +- jdbc-h2/build.gradle | 15 ++ .../repository/h2/H2ExecutionRepository.java | 26 +++ .../repository/h2/H2FlowRepository.java | 29 +++ .../kestra/repository/h2/H2LogRepository.java | 25 +++ .../io/kestra/repository/h2/H2Repository.java | 81 +++++++ .../repository/h2/H2RepositoryEnabled.java | 14 ++ .../repository/h2/H2TemplateRepository.java | 25 +++ .../repository/h2/H2TriggerRepository.java | 16 ++ .../runner/h2/H2ExecutorStateStorage.java | 15 ++ .../java/io/kestra/runner/h2/H2Functions.java | 63 ++++++ .../runner/h2/H2MultipleConditionStorage.java | 15 ++ .../java/io/kestra/runner/h2/H2Queue.java | 74 +++++++ .../io/kestra/runner/h2/H2QueueEnabled.java | 14 ++ .../io/kestra/runner/h2/H2QueueFactory.java | 108 ++++++++++ .../h2/H2WorkerTaskExecutionStorage.java | 15 ++ .../kestra/runner/h2/H2WorkerTaskQueue.java | 40 ++++ .../runner/h2/H2lExecutionDelayStorage.java | 15 ++ .../resources/migrations/h2/V1__initial.sql | 197 ++++++++++++++++++ .../h2/H2ExecutionRepositoryTest.java | 24 +++ .../repository/h2/H2LogRepositoryTest.java | 7 + .../h2/H2TemplateRepositoryTest.java | 7 + .../h2/H2TriggerRepositoryTest.java | 7 + .../repository/h2/H2lFlowRepositoryTest.java | 7 + .../kestra/runner/h2/H2FlowListenersTest.java | 27 +++ .../io/kestra/runner/h2/H2FunctionsTest.java | 39 ++++ .../io/kestra/runner/h2/H2JdbcTestUtils.java | 31 +++ .../h2/H2MultipleConditionStorageTest.java | 35 ++++ .../java/io/kestra/runner/h2/H2QueueTest.java | 7 + .../io/kestra/runner/h2/H2RunnerTest.java | 7 + .../h2/H2WorkerTaskExecutionStorageTest.java | 7 + .../h2/H2SchedulerScheduleTest.java | 17 ++ jdbc-h2/src/test/resources/application.yml | 55 +++++ jdbc-h2/src/test/resources/logback.xml | 10 + .../org.mockito.plugins.MockMaker | 1 + .../repository/mysql/MysqlFlowRepository.java | 6 - .../repository/mysql/MysqlLogRepository.java | 5 +- .../mysql/MysqlTemplateRepository.java | 2 +- .../mysql/MysqlTemplateRepositoryTest.java | 6 + .../postgres/PostgresFlowRepository.java | 6 - .../postgres/PostgresRepository.java | 9 +- .../kestra/runner/postgres/PostgresQueue.java | 5 +- .../migrations/postgres/V1__initial.sql | 2 +- .../kestra/jdbc/AbstractJdbcRepository.java | 42 +++- .../java/io/kestra/jdbc/JooqSettings.java | 4 +- .../AbstractExecutionRepository.java | 96 +++++---- .../repository/AbstractFlowRepository.java | 64 +++--- .../repository/AbstractLogRepository.java | 22 +- .../jdbc/repository/AbstractRepository.java | 11 +- .../AbstractTemplateRepository.java | 18 +- .../repository/AbstractTriggerRepository.java | 10 +- .../runner/AbstractExecutionDelayStorage.java | 4 +- .../runner/AbstractExecutorStateStorage.java | 5 +- .../AbstractJdbcMultipleConditionStorage.java | 14 +- .../AbstractWorkerTaskExecutionStorage.java | 4 +- .../java/io/kestra/jdbc/runner/JdbcQueue.java | 11 +- .../kestra/jdbc/runner/JdbcRunnerEnabled.java | 2 +- .../jdbc/AbstractJdbcRepositoryTest.java | 44 ++++ .../java/io/kestra/jdbc/JdbcTestUtils.java | 7 +- .../AbstractJdbcTemplateRepositoryTest.java | 1 - settings.gradle | 1 + 63 files changed, 1326 insertions(+), 164 deletions(-) create mode 100644 jdbc-h2/build.gradle create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2FlowRepository.java create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2LogRepository.java create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2Repository.java create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2RepositoryEnabled.java create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2TemplateRepository.java create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2TriggerRepository.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2ExecutorStateStorage.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2Functions.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2MultipleConditionStorage.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2Queue.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueEnabled.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerTaskExecutionStorage.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2WorkerTaskQueue.java create mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2lExecutionDelayStorage.java create mode 100644 jdbc-h2/src/main/resources/migrations/h2/V1__initial.sql create mode 100644 jdbc-h2/src/test/java/io/kestra/repository/h2/H2ExecutionRepositoryTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/repository/h2/H2LogRepositoryTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/repository/h2/H2TemplateRepositoryTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/repository/h2/H2TriggerRepositoryTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/repository/h2/H2lFlowRepositoryTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2FlowListenersTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2FunctionsTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2JdbcTestUtils.java create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2MultipleConditionStorageTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2QueueTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2RunnerTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2WorkerTaskExecutionStorageTest.java create mode 100644 jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java create mode 100644 jdbc-h2/src/test/resources/application.yml create mode 100644 jdbc-h2/src/test/resources/logback.xml create mode 100644 jdbc-h2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker create mode 100644 jdbc/src/test/java/io/kestra/jdbc/AbstractJdbcRepositoryTest.java diff --git a/cli/build.gradle b/cli/build.gradle index 3f648df3668..a784377e4eb 100644 --- a/cli/build.gradle +++ b/cli/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation project(":indexer-kafka-elasticsearch") implementation project(":jdbc") + implementation project(":jdbc-h2") implementation project(":jdbc-mysql") implementation project(":jdbc-postgres") diff --git a/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionWindow.java b/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionWindow.java index 41da1db49b5..e0220c8a252 100644 --- a/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionWindow.java +++ b/core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleConditionWindow.java @@ -20,11 +20,8 @@ public class MultipleConditionWindow { String conditionId; - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm[:ss][.SSS]XXX") ZonedDateTime start; - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm[:ss][.SSS]XXX") ZonedDateTime end; Map results; diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java index b0ab46e87ca..077ed2ab536 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java @@ -169,14 +169,14 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException { DailyExecutionStatistics full = result.get("io.kestra.unittest").get(FLOW).get(10); DailyExecutionStatistics second = result.get("io.kestra.unittest").get("second").get(10); - assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(full.getDuration().getAvg().toMillis(), greaterThan(0L)); assertThat(full.getExecutionCounts().size(), is(9)); assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L)); assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L)); assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(7L)); assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L)); - assertThat(second.getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(second.getDuration().getAvg().toMillis(), greaterThan(0L)); assertThat(second.getExecutionCounts().size(), is(9)); assertThat(second.getExecutionCounts().get(State.Type.SUCCESS), is(13L)); assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L)); @@ -193,7 +193,7 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException { assertThat(result.size(), is(1)); assertThat(result.get("io.kestra.unittest").size(), is(1)); full = result.get("io.kestra.unittest").get("*").get(10); - assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(full.getDuration().getAvg().toMillis(), greaterThan(0L)); assertThat(full.getExecutionCounts().size(), is(9)); assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L)); assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L)); @@ -224,7 +224,7 @@ protected void dailyStatistics() throws InterruptedException { assertThat(result.size(), is(11)); assertThat(result.get(10).getExecutionCounts().size(), is(9)); - assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(result.get(10).getDuration().getAvg().toMillis(), greaterThan(0L)); assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L)); assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L)); @@ -251,7 +251,7 @@ protected void taskRunsDailyStatistics() { assertThat(result.size(), is(11)); assertThat(result.get(10).getExecutionCounts().size(), is(9)); - assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(result.get(10).getDuration().getAvg().toMillis(), greaterThan(0L)); assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L * 2)); assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L * 2)); diff --git a/jdbc-h2/build.gradle b/jdbc-h2/build.gradle new file mode 100644 index 00000000000..1f8634ad1c0 --- /dev/null +++ b/jdbc-h2/build.gradle @@ -0,0 +1,15 @@ +publishSonatypePublicationPublicationToSonatypeRepository.enabled = false + +dependencies { + implementation project(":core") + implementation project(":jdbc") + + implementation("io.micronaut.sql:micronaut-jooq") + runtimeOnly("com.h2database:h2:2.1.212") + + testImplementation project(':core').sourceSets.test.output + testImplementation project(':jdbc').sourceSets.test.output + testImplementation project(':runner-memory') + testImplementation project(':storage-local') + testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1' +} diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java new file mode 100644 index 00000000000..64ce8ee975c --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java @@ -0,0 +1,26 @@ +package io.kestra.repository.h2; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.repositories.ExecutionRepositoryInterface; +import io.kestra.jdbc.repository.AbstractExecutionRepository; +import io.kestra.jdbc.runner.AbstractExecutorStateStorage; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.jooq.Condition; + +import java.util.List; + +@Singleton +@H2RepositoryEnabled +public class H2ExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface { + @Inject + public H2ExecutionRepository(ApplicationContext applicationContext, AbstractExecutorStateStorage executorStateStorage) { + super(new H2Repository<>(Execution.class, applicationContext), executorStateStorage); + } + + @Override + protected Condition findCondition(String query) { + return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2FlowRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2FlowRepository.java new file mode 100644 index 00000000000..6b88844d588 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2FlowRepository.java @@ -0,0 +1,29 @@ +package io.kestra.repository.h2; + +import io.kestra.core.models.flows.Flow; +import io.kestra.jdbc.repository.AbstractFlowRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.jooq.Condition; + +import java.util.List; + +@Singleton +@H2RepositoryEnabled +public class H2FlowRepository extends AbstractFlowRepository { + @Inject + public H2FlowRepository(ApplicationContext applicationContext) { + super(new H2Repository<>(Flow.class, applicationContext), applicationContext); + } + + @Override + protected Condition findCondition(String query) { + return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); + } + + @Override + protected Condition findSourceCodeCondition(String query) { + return this.jdbcRepository.fullTextCondition(List.of("source_code"), query); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2LogRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2LogRepository.java new file mode 100644 index 00000000000..654aff5b231 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2LogRepository.java @@ -0,0 +1,25 @@ +package io.kestra.repository.h2; + +import io.kestra.core.models.executions.LogEntry; +import io.kestra.jdbc.repository.AbstractLogRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.jooq.Condition; + +import java.util.List; + +@Singleton +@H2RepositoryEnabled +public class H2LogRepository extends AbstractLogRepository { + @Inject + public H2LogRepository(ApplicationContext applicationContext) { + super(new H2Repository<>(LogEntry.class, applicationContext)); + } + + @Override + protected Condition findCondition(String query) { + return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); + } +} + diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2Repository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2Repository.java new file mode 100644 index 00000000000..96cd360cf11 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2Repository.java @@ -0,0 +1,81 @@ +package io.kestra.repository.h2; + +import io.kestra.core.repositories.ArrayListTotal; +import io.kestra.jdbc.AbstractJdbcRepository; +import io.kestra.jdbc.repository.AbstractRepository; +import io.micronaut.context.ApplicationContext; +import io.micronaut.data.model.Pageable; +import lombok.SneakyThrows; +import org.jooq.*; +import org.jooq.impl.DSL; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class H2Repository extends AbstractJdbcRepository { + public H2Repository(Class cls, ApplicationContext applicationContext) { + super(cls, applicationContext); + } + + @SneakyThrows + public void persist(T entity, DSLContext context, @Nullable Map, Object> fields) { + Map, Object> finalFields = fields == null ? this.persistFields(entity) : fields; + + context + .insertInto(table) + .set(AbstractRepository.field("key"), key(entity)) + .set(finalFields) + .onConflict(AbstractRepository.field("key")) + .doUpdate() + .set(finalFields) + .execute(); + } + + public Condition fullTextCondition(List fields, String query) { + if (query == null || query.equals("*")) { + return DSL.trueCondition(); + } + + if (fields.size() > 1) { + throw new IllegalStateException("Too many fields for h2 '" + fields + "'"); + } + + Field field = AbstractRepository.field(fields.get(0)); + + List match = Arrays + .stream(query.split("\\p{P}|\\p{S}|\\p{Z}")) + .map(s -> field.likeIgnoreCase("%" + s.toUpperCase(Locale.ROOT) + "%")) + .collect(Collectors.toList()); + + if (match.size() == 0) { + return DSL.falseCondition(); + } + + return DSL.and(match); + } + + @SuppressWarnings("unchecked") + public ArrayListTotal fetchPage(DSLContext context, SelectConditionStep select, Pageable pageable, RecordMapper mapper) { + Result results = this.limit( + context.select(DSL.asterisk(), DSL.count().over().as("total_count")) + .from(this + .sort(select, pageable) + .asTable("page") + ) + .where(DSL.trueCondition()), + pageable + ) + .fetch(); + + Integer totalCount = results.size() > 0 ? results.get(0).get("total_count", Integer.class) : 0; + + List map = results + .map((Record record) -> mapper.map((R) record)); + + return new ArrayListTotal<>(map, totalCount); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2RepositoryEnabled.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2RepositoryEnabled.java new file mode 100644 index 00000000000..5eacd0fd440 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2RepositoryEnabled.java @@ -0,0 +1,14 @@ +package io.kestra.repository.h2; + +import io.micronaut.context.annotation.DefaultImplementation; +import io.micronaut.context.annotation.Requires; + +import java.lang.annotation.*; + +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.PACKAGE, ElementType.TYPE}) +@Requires(property = "kestra.repository.type", value = "h2") +@DefaultImplementation +public @interface H2RepositoryEnabled { +} \ No newline at end of file diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2TemplateRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2TemplateRepository.java new file mode 100644 index 00000000000..b949f2bee33 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2TemplateRepository.java @@ -0,0 +1,25 @@ +package io.kestra.repository.h2; + +import io.kestra.core.models.templates.Template; +import io.kestra.core.repositories.TemplateRepositoryInterface; +import io.kestra.jdbc.repository.AbstractTemplateRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.jooq.Condition; + +import java.util.List; + +@Singleton +@H2RepositoryEnabled +public class H2TemplateRepository extends AbstractTemplateRepository implements TemplateRepositoryInterface { + @Inject + public H2TemplateRepository(ApplicationContext applicationContext) { + super(new H2Repository<>(Template.class, applicationContext), applicationContext); + } + + @Override + protected Condition findCondition(String query) { + return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2TriggerRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2TriggerRepository.java new file mode 100644 index 00000000000..39b987566dc --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2TriggerRepository.java @@ -0,0 +1,16 @@ +package io.kestra.repository.h2; + +import io.kestra.core.models.triggers.Trigger; +import io.kestra.jdbc.repository.AbstractTriggerRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@H2RepositoryEnabled +public class H2TriggerRepository extends AbstractTriggerRepository { + @Inject + public H2TriggerRepository(ApplicationContext applicationContext) { + super(new H2Repository<>(Trigger.class, applicationContext)); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2ExecutorStateStorage.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2ExecutorStateStorage.java new file mode 100644 index 00000000000..6d9d8b4414b --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2ExecutorStateStorage.java @@ -0,0 +1,15 @@ +package io.kestra.runner.h2; + +import io.kestra.jdbc.runner.AbstractExecutorStateStorage; +import io.kestra.jdbc.runner.JdbcExecutorState; +import io.kestra.repository.h2.H2Repository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Singleton; + +@Singleton +@H2QueueEnabled +public class H2ExecutorStateStorage extends AbstractExecutorStateStorage { + public H2ExecutorStateStorage(ApplicationContext applicationContext) { + super(new H2Repository<>(JdbcExecutorState.class, applicationContext)); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2Functions.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2Functions.java new file mode 100644 index 00000000000..ca6da6805c9 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2Functions.java @@ -0,0 +1,63 @@ +package io.kestra.runner.h2; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import io.kestra.core.serializers.JacksonMapper; +import lombok.SneakyThrows; +import net.thisptr.jackson.jq.BuiltinFunctionLoader; +import net.thisptr.jackson.jq.JsonQuery; +import net.thisptr.jackson.jq.Scope; +import net.thisptr.jackson.jq.Versions; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +public class H2Functions { + private static final Scope rootScope; + private static final Scope scope; + + static { + rootScope = Scope.newEmptyScope(); + BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, rootScope); + scope = Scope.newEmptyScope(); + } + + public static Boolean jqBoolean(String value, String expression) { + return H2Functions.jq(value, expression, JsonNode::asBoolean); + } + + public static String jqString(String value, String expression) { + return H2Functions.jq(value, expression, JsonNode::asText); + } + + public static Long jqLong(String value, String expression) { + return H2Functions.jq(value, expression, JsonNode::asLong); + } + + public static Integer jqInteger(String value, String expression) { + return H2Functions.jq(value, expression, JsonNode::asInt); + } + + public static Double jqDouble(String value, String expression) { + return H2Functions.jq(value, expression, JsonNode::asDouble); + } + + @SneakyThrows + private static T jq(String value, String expression, Function function) { + JsonQuery q = JsonQuery.compile(expression, Versions.JQ_1_6); + + final List out = new ArrayList<>(); + JsonNode in = JacksonMapper.ofJson().readTree(value); + + q.apply(scope, in, out::add); + + JsonNode node = out.get(0); + + if (node instanceof NullNode) { + return null; + } else { + return function.apply(node); + } + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2MultipleConditionStorage.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2MultipleConditionStorage.java new file mode 100644 index 00000000000..82394e2b472 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2MultipleConditionStorage.java @@ -0,0 +1,15 @@ +package io.kestra.runner.h2; + +import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; +import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage; +import io.kestra.repository.h2.H2Repository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Singleton; + +@Singleton +@H2QueueEnabled +public class H2MultipleConditionStorage extends AbstractJdbcMultipleConditionStorage { + public H2MultipleConditionStorage(ApplicationContext applicationContext) { + super(new H2Repository<>(MultipleConditionWindow.class, applicationContext)); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2Queue.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2Queue.java new file mode 100644 index 00000000000..0f6b0606770 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2Queue.java @@ -0,0 +1,74 @@ +package io.kestra.runner.h2; + +import io.kestra.jdbc.runner.JdbcQueue; +import io.micronaut.context.ApplicationContext; +import io.micronaut.core.annotation.NonNull; +import org.jooq.DSLContext; +import org.jooq.Record; +import org.jooq.Result; + +import java.util.List; +import java.util.stream.Collectors; + +public class H2Queue extends JdbcQueue { + public H2Queue(Class cls, ApplicationContext applicationContext) { + super(cls, applicationContext); + } + + protected Result receiveFetch(DSLContext ctx, @NonNull Integer offset) { + return ctx + .resultQuery( + "SELECT" + "\n" + + " \"value\"," + "\n" + + " \"offset\"" + "\n" + + "FROM " + table.getName() + "\n" + + "WHERE 1 = 1" + "\n" + + (offset != 0 ? "AND \"offset\" > ?" + "\n" : "") + + "AND \"type\" = ? " + "\n" + + "ORDER BY \"offset\" ASC" + "\n" + + "LIMIT 10" + "\n" + + "FOR UPDATE", + offset != 0 ? offset : this.cls.getName(), + this.cls.getName() + ) + .fetch(); + } + + protected Result receiveFetch(DSLContext ctx, String consumerGroup) { + return ctx + .resultQuery( + "SELECT" + "\n" + + " \"value\"," + "\n" + + " \"offset\"" + "\n" + + "FROM " + table.getName() + "\n" + + "WHERE (" + + " \"consumers\" IS NULL" + "\n" + + " OR NOT(ARRAY_CONTAINS(\"consumers\", ?))" + "\n" + + ")" + "\n" + + "AND \"type\" = ?" + "\n" + + "ORDER BY \"offset\" ASC" + "\n" + + "LIMIT 10" + "\n" + + "FOR UPDATE", + consumerGroup, + this.cls.getName() + ) + .fetch(); + } + + @Override + protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, List offsets) { + ctx + .query( + "UPDATE " + table.getName() + "\n" + + "SET \"consumers\" = COALESCE(\"consumers\", ARRAY[]) || ARRAY['" + consumerGroup + "']\n" + + "WHERE \"offset\" IN (" + + offsets + .stream() + .map(Object::toString) + .collect(Collectors.joining(",")) + + ")", + consumerGroup + ) + .execute(); + } +} diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueEnabled.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueEnabled.java new file mode 100644 index 00000000000..55048f29592 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueEnabled.java @@ -0,0 +1,14 @@ +package io.kestra.runner.h2; + +import io.micronaut.context.annotation.DefaultImplementation; +import io.micronaut.context.annotation.Requires; + +import java.lang.annotation.*; + +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.PACKAGE, ElementType.TYPE}) +@Requires(property = "kestra.queue.type", value = "h2") +@DefaultImplementation +public @interface H2QueueEnabled { +} \ No newline at end of file diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java new file mode 100644 index 00000000000..7767a66c010 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java @@ -0,0 +1,108 @@ +package io.kestra.runner.h2; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.ExecutionKilled; +import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.templates.Template; +import io.kestra.core.models.triggers.Trigger; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.WorkerTaskQueueInterface; +import io.kestra.core.runners.*; +import io.micronaut.context.ApplicationContext; +import io.micronaut.context.annotation.Factory; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import org.apache.commons.lang3.NotImplementedException; + +@Factory +@H2QueueEnabled +public class H2QueueFactory implements QueueFactoryInterface { + @Inject + ApplicationContext applicationContext; + + @Override + @Singleton + @Named(QueueFactoryInterface.EXECUTION_NAMED) + public QueueInterface execution() { + return new H2Queue<>(Execution.class, applicationContext); + } + + @Override + @Singleton + @Named(QueueFactoryInterface.EXECUTOR_NAMED) + public QueueInterface executor() { + throw new NotImplementedException(); + } + + @Override + @Singleton + @Named(QueueFactoryInterface.WORKERTASK_NAMED) + public QueueInterface workerTask() { + return new H2Queue<>(WorkerTask.class, applicationContext); + } + + @Override + @Singleton + @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) + public QueueInterface workerTaskResult() { + return new H2Queue<>(WorkerTaskResult.class, applicationContext); + } + + @Override + @Singleton + @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) + public QueueInterface logEntry() { + return new H2Queue<>(LogEntry.class, applicationContext); + } + + @Override + @Singleton + @Named(QueueFactoryInterface.FLOW_NAMED) + public QueueInterface flow() { + return new H2Queue<>(Flow.class, applicationContext); + } + + @Override + @Singleton + @Named(QueueFactoryInterface.KILL_NAMED) + public QueueInterface kill() { + return new H2Queue<>(ExecutionKilled.class, applicationContext); + } + + @Override + @Singleton + @Named(QueueFactoryInterface.TEMPLATE_NAMED) + public QueueInterface