Skip to content

Commit

Permalink
feat(jdbc): implementation of log repository
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 7e16b71 commit 94aaad9
Show file tree
Hide file tree
Showing 23 changed files with 523 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.kestra.core.repositories;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.utils.IdUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

import java.time.Instant;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest(transactional = false)
public abstract class AbstractLogRepositoryTest {
@Inject
protected LogRepositoryInterface logRepository;

private static LogEntry.LogEntryBuilder logEntry() {
return LogEntry.builder()
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
.taskId("taskId")
.executionId(IdUtils.create())
.taskRunId(IdUtils.create())
.attemptNumber(0)
.timestamp(Instant.now())
.level(Level.INFO)
.thread("")
.message("john doe");
}

@Test
void all() {
LogEntry.LogEntryBuilder builder = logEntry();

ArrayListTotal<LogEntry> find = logRepository.find("*", Pageable.UNPAGED, null);
assertThat(find.size(), is(0));

LogEntry save = logRepository.save(builder.build());

find = logRepository.find("doe", Pageable.UNPAGED, null);
assertThat(find.size(), is(1));
assertThat(find.get(0).getExecutionId(), is(save.getExecutionId()));

find = logRepository.find("*", Pageable.UNPAGED, null);
assertThat(find.size(), is(1));
assertThat(find.get(0).getExecutionId(), is(save.getExecutionId()));

List<LogEntry> list = logRepository.findByExecutionId(save.getExecutionId(), null);
assertThat(list.size(), is(1));
assertThat(list.get(0).getExecutionId(), is(save.getExecutionId()));

list = logRepository.findByExecutionIdAndTaskId(save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size(), is(1));
assertThat(list.get(0).getExecutionId(), is(save.getExecutionId()));


list = logRepository.findByExecutionIdAndTaskRunId(save.getExecutionId(), save.getTaskRunId(), null);
assertThat(list.size(), is(1));
assertThat(list.get(0).getExecutionId(), is(save.getExecutionId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.Arrays;

@Singleton
@MysqlRepositoryEnabled
Expand All @@ -14,4 +17,10 @@ public class MysqlExecutionRepository extends AbstractExecutionRepository implem
public MysqlExecutionRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Execution.class, applicationContext), applicationContext);
}


@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,65 +24,13 @@ public MysqlFlowRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Flow.class, applicationContext), applicationContext);
}

@SuppressWarnings("unchecked")
private <R extends Record> SelectConditionStep<R> fullTextSelect(DSLContext context, List<Field<Object>> field) {
ArrayList<Field<Object>> fields = new ArrayList<>(Collections.singletonList(DSL.field("value")));

if (field != null) {
fields.addAll(field);
}

return (SelectConditionStep<R>) context
.select(fields)
.hint("SQL_CALC_FOUND_ROWS")
.from(lastRevision(false))
.join(jdbcRepository.getTable().as("ft"))
.on(
DSL.field("ft.key").eq(DSL.field("rev.key"))
.and(DSL.field("ft.revision").eq(DSL.field("rev.revision")))
)
.where(this.defaultFilter());
}

public ArrayListTotal<Flow> find(String query, Pageable pageable) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record1<Object>> select = this.fullTextSelect(context, Collections.emptyList());

if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query));
}

return this.jdbcRepository.fetchPage(context, select, pageable);
});

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query);
}

@Override
public ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record> select = this.fullTextSelect(context, Collections.singletonList(DSL.field("source_code")));

if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("source_code"), query));
}

return this.jdbcRepository.fetchPage(
context,
select,
pageable,
record -> new SearchResult<>(
this.jdbcRepository.map(record),
this.jdbcRepository.fragments(query, record.getValue("source_code", String.class))
)
);
});
protected Condition findSourceCodeCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("source_code"), query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.kestra.repository.mysql;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.jdbc.repository.AbstractLogRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.Arrays;

@Singleton
@MysqlRepositoryEnabled
public class MysqlLogRepository extends AbstractLogRepository {
@Inject
public MysqlLogRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(LogEntry.class, applicationContext));
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "flow_id", "task_id", "execution_id", "taskrun_id", "trigger_id", "message", "thread"), query);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.Arrays;

@Singleton
@MysqlRepositoryEnabled
Expand All @@ -14,4 +17,9 @@ public class MysqlTemplateRepository extends AbstractTemplateRepository implemen
public MysqlTemplateRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Template.class, applicationContext), applicationContext);
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "flow_id", "id"), query);
}
}
33 changes: 33 additions & 0 deletions jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,36 @@ CREATE TABLE triggers (
`trigger_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.triggerId') STORED NOT NULL,
INDEX ix_executions_id (namespace, flow_id, trigger_id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


CREATE TABLE logs (
`key` INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
`value` JSON NOT NULL,
`deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`task_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskId') STORED NOT NULL,
`execution_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.executionId') STORED NOT NULL,
`taskrun_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskRunId') STORED,
`attempt_number` INT GENERATED ALWAYS AS (value ->> '$.attemptNumber') STORED NOT NULL,
`trigger_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.triggerId') STORED,
`message` TEXT GENERATED ALWAYS AS (value ->> '$.message') STORED,
`thread` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.thread') STORED,
`level` ENUM(
'ERROR',
'WARN',
'INFO',
'DEBUG',
'TRACE'
) GENERATED ALWAYS AS (value ->> '$.level') STORED NOT NULL,
`timestamp` TIMESTAMP GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.timestamp' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,

INDEX logs_namespace (namespace),
INDEX logs_flowId (flow_id),
INDEX logs_task_id (task_id),
INDEX logs_execution_id (execution_id),
INDEX logs_taskrun_id (taskrun_id),
INDEX logs_trigger_id (trigger_id),
INDEX logs_timestamp (timestamp),
FULLTEXT ix_fulltext (namespace, flow_id, task_id, execution_id, taskrun_id, trigger_id, message, thread)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.repository.mysql;

import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest;

public class MysqlLogRepositoryTest extends AbstractJdbcLogRepositoryTest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.Collections;

@Singleton
@PostgresRepositoryEnabled
Expand All @@ -14,4 +17,9 @@ public class PostgresExecutionRepository extends AbstractExecutionRepository imp
public PostgresExecutionRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Execution.class, applicationContext), applicationContext);
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,63 +23,13 @@ public PostgresFlowRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Flow.class, applicationContext), applicationContext);
}

@SuppressWarnings("unchecked")
private <R extends Record, E> SelectConditionStep<R> fullTextSelect(DSLContext context, List<Field<Object>> field) {
ArrayList<Field<Object>> fields = new ArrayList<>(Collections.singletonList(DSL.field("value")));

if (field != null) {
fields.addAll(field);
}

return (SelectConditionStep<R>) context
.select(fields)
.from(lastRevision(false))
.join(jdbcRepository.getTable().as("ft"))
.on(
DSL.field("ft.key").eq(DSL.field("rev.key"))
.and(DSL.field("ft.revision").eq(DSL.field("rev.revision")))
)
.where(this.defaultFilter());
}

public ArrayListTotal<Flow> find(String query, Pageable pageable) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record1<Object>> select = this.fullTextSelect(context, Collections.emptyList());

if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}

return this.jdbcRepository.fetchPage(context, select, pageable);
});
@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query);
}

@Override
public ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record> select = this.fullTextSelect(context, Collections.singletonList(DSL.field("source_code")));

if (query != null) {
select.and(DSL.condition("source_code @@ TO_TSQUERY('simple', ?)", query));
}

return this.jdbcRepository.fetchPage(
context,
select,
pageable,
record -> new SearchResult<>(
this.jdbcRepository.map(record),
this.jdbcRepository.fragments(query, record.getValue("value", String.class))
)
);
});
protected Condition findSourceCodeCondition(String query) {
return DSL.condition("source_code @@ TO_TSQUERY('simple', ?)", query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.jdbc.repository.AbstractLogRepository;
import io.kestra.jdbc.repository.AbstractTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.Collections;

@Singleton
@PostgresRepositoryEnabled
public class PostgresLogRepository extends AbstractLogRepository implements LogRepositoryInterface {
@Inject
public PostgresLogRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(LogEntry.class, applicationContext));
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query);
}
}
Loading

0 comments on commit 94aaad9

Please sign in to comment.