Feat Search:
Create operation enum for search
Add custom parser for the filter[field][operation]=value format (a parsing sketch follows below)
Create new annotation @QueryFilterFormat to bind to a specific POJO
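The filter[field][operation]=value convention above maps directly onto the QueryFilter record added in this commit (shown further down). The following is a minimal parsing sketch assuming a hypothetical helper class; the actual binder behind @QueryFilterFormat is not shown in this excerpt.

import io.kestra.core.models.QueryFilter;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only: turns a raw query-parameter key/value pair
// such as "filter[namespace][$eq]" = "company.team" into a QueryFilter.
class QueryFilterParser {
    private static final Pattern KEY = Pattern.compile("^filter\\[(\\w+)]\\[(\\$\\w+)]$");

    static QueryFilter parse(String key, String value) {
        Matcher matcher = KEY.matcher(key);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a filter parameter: " + key);
        }
        return QueryFilter.builder()
            .field(QueryFilter.Field.fromString(matcher.group(1)))   // e.g. "namespace"
            .operation(QueryFilter.Op.fromString(matcher.group(2)))  // e.g. "$eq"
            .value(value)
            .build();
    }
}

For example, parse("filter[namespace][$eq]", "company.team") would yield a filter with field NAMESPACE, operation EQUALS, and value "company.team".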

Feat filter:
Implement new abstract repo method
Implement the Elasticsearch find implementation

Feat filter WIP:
Implement the find method for search in the flow repository

Feat filter WIP:
Create new enum for all fields
Add the list of fields with their allowed operations

Feat filter WIP:
Expose the list of fields with their supported operations in the config API (sketched below)
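Given the Resource.asResourceList() helper added in QueryFilter.java below, the config API exposure could take roughly the following shape. This is a hedged sketch only: the route and controller name are assumptions, not the actual Kestra endpoint.

import io.kestra.core.models.QueryFilter;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.List;

// Illustrative controller: the route and class name are guesses, not the commit's real code.
@Controller("/api/v1/configs")
public class FilterConfigController {
    // Lists every resource with its filterable fields and the operations each field supports.
    @Get("/filters")
    public List<QueryFilter.ResourceField> filters() {
        return QueryFilter.Resource.asResourceList();
    }
}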

Feat filter WIP:
Handle scope and labels in the query filters mapper

Feat Filter (WIP):
Change the binder parser to handle the key/value format
Change all /search endpoints
Change all filter JDBC implementations based on the filters list (a condition-mapping sketch follows below)
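Later entries mention jOOQ (DSL.quoteName), so one plausible shape for the JDBC side is a mapping from a QueryFilter to a jOOQ Condition, sketched below. This is an assumption about the approach rather than the commit's actual implementation; column naming, date handling, and regex support vary by dialect.

import io.kestra.core.models.QueryFilter;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Collection;

// Illustrative mapping only, not the commit's code: builds a jOOQ Condition for one filter.
class JdbcFilterMapper {
    static Condition toCondition(QueryFilter filter, String columnName) {
        Field<Object> column = DSL.field(DSL.name(columnName));
        Object value = filter.value();
        return switch (filter.operation()) {
            case EQUALS -> column.eq(value);
            case NOT_EQUALS -> column.ne(value);
            case GREATER_THAN -> column.greaterOrEqual(value); // the Op value is "$gte"
            case LESS_THAN -> column.lessOrEqual(value);       // the Op value is "$lte"
            case IN -> column.in((Collection<?>) value);
            case NOT_IN -> column.notIn((Collection<?>) value);
            case STARTS_WITH -> column.startsWith(value);
            case ENDS_WITH -> column.endsWith(value);
            case CONTAINS -> column.contains(value);
            case REGEX -> column.likeRegex(value.toString());
        };
    }
}

The overall query would then AND the conditions produced for each entry of the List<QueryFilter>.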

Feat Filter (WIP):
Add the @Inherited annotation to propagate to the overriding methods

Feat Filter (WIP):
Fix the abstract unit tests

Feat Filter (WIP):
Handle the state.type fields

Feat Filter (WIP):
Handle the childFilter fields
Handle snake_case column names (conversion sketch below)
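The camelCase-to-snake_case column mapping mentioned here is a small, self-contained transformation. A sketch of one common way to do it follows; the helper name is an assumption, not necessarily the commit's implementation.

import java.util.Locale;

// Illustrative converter: "triggerExecutionId" -> "trigger_execution_id".
final class ColumnNames {
    private ColumnNames() {
    }

    static String toSnakeCase(String camelCase) {
        // Insert an underscore at each lower-case/upper-case boundary, then lower-case everything.
        return camelCase
            .replaceAll("([a-z0-9])([A-Z])", "$1_$2")
            .toLowerCase(Locale.ROOT);
    }
}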

Feat Filter (UNIT TEST REFACTORING):
Handle snake_case column names

Feat Filter (UNIT TEST REFACTORING):
Refactor columnName with DSL.quoteName

Feat Search (Fix Unit Test):
Refactor the generic filter method in the repo
Use the specific labels condition

Feat Search (Fix Unit Test):
Use the specific labels condition for execution labels

Refactoring:
Code cleanup

Unit test Fix:
Fix the specific log level filter

Unit test Fix:
Refactor Execution controller to handle time range and dates

Unit test Fix:
Refactor task controller to handle time range and dates

Unit test Fix:
Refactor flow controller test for the search endpoint

Unit test Fix:
Refactor log controller unit test for the new /search endpoint

Refactor:
Add a method to extract the query from the list of filters (sketched below)
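A method that extracts the free-text query from a filter list, as described here, could look like the sketch below; the class and method names are assumptions, and only the QueryFilter model from this commit is used.

import io.kestra.core.models.QueryFilter;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Illustrative helper: returns the value of the first QUERY filter, if any.
final class QueryFilterUtils {
    private QueryFilterUtils() {
    }

    static Optional<String> extractQuery(List<QueryFilter> filters) {
        return filters.stream()
            .filter(f -> f.field() == QueryFilter.Field.QUERY)
            .map(QueryFilter::value)
            .filter(Objects::nonNull)
            .map(Object::toString)
            .findFirst();
    }
}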

Change the scope list to mutable

Feat Filter:
Add scope handling in the JDBC query

Feat Filter:
Fix cast to ChildFilter

Feat Filter:
Synchronise with develop

Feat Filter:
Roll back deprecated params for all /search endpoints

Feat Filters:
Fix the received list of values in filter params

Feat Filters:
Add unit tests for the utils classes (a representative sketch follows below)
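Those tests presumably exercise the string-to-enum mappings; a representative JUnit 5 sketch, based only on the Op values visible in QueryFilter.java below (the test class itself is an assumption):

import io.kestra.core.models.QueryFilter;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

// Illustrative test, not the commit's actual test class.
class QueryFilterOpTest {
    @Test
    void shouldResolveKnownOperations() {
        assertEquals(QueryFilter.Op.EQUALS, QueryFilter.Op.fromString("$eq"));
        assertEquals(QueryFilter.Op.NOT_IN, QueryFilter.Op.fromString("$notIn"));
    }

    @Test
    void shouldRejectUnknownOperations() {
        // Enums.fromString (added in this commit) throws IllegalArgumentException for unmapped values.
        assertThrows(IllegalArgumentException.class, () -> QueryFilter.Op.fromString("$unknown"));
    }
}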

Feat Filters:
Remove unused query params

Refactor Filter:
Create new QueryFilter utils class
Mark all field params of the /search endpoints as deprecated
aeSouid committed Feb 3, 2025
1 parent 26c15a4 commit 7b083c8
Showing 44 changed files with 1,754 additions and 206 deletions.
250 changes: 250 additions & 0 deletions core/src/main/java/io/kestra/core/models/QueryFilter.java
@@ -0,0 +1,250 @@
package io.kestra.core.models;

import io.kestra.core.utils.Enums;
import lombok.Builder;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

@Builder
public record QueryFilter(
Field field,
Op operation,
Object value
) {
public enum Op {
EQUALS("$eq"),
NOT_EQUALS("$ne"),
GREATER_THAN("$gte"),
LESS_THAN("$lte"),
IN("$in"),
NOT_IN("$notIn"),
STARTS_WITH("$startsWith"),
ENDS_WITH("$endsWith"),
CONTAINS("$contains"),
REGEX("$regex");

private static final Map<String, Op> BY_VALUE = Arrays.stream(values())
.collect(Collectors.toMap(Op::value, Function.identity()));

private final String value;

Op(String value) {
this.value = value;
}

public static Op fromString(String value) {
return Enums.fromString(value, BY_VALUE, "operation");
}

public String value() {
return value;
}
}

public enum Field {
QUERY("q") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.REGEX);
}
},
SCOPE("scope") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
NAMESPACE("namespace") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
LABELS("labels") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
FLOW_ID("flowId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.IN, Op.NOT_IN);
}
},
START_DATE("startDate") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
END_DATE("endDate") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
STATE("state") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
}
},
TIME_RANGE("timeRange") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH,
Op.ENDS_WITH, Op.IN, Op.NOT_IN, Op.REGEX);
}
},
TRIGGER_EXECUTION_ID("triggerExecutionId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
TRIGGER_ID("triggerId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
CHILD_FILTER("childFilter") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
WORKER_ID("workerId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
EXISTING_ONLY("existingOnly") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
MIN_LEVEL("level") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
};

private static final Map<String, Field> BY_VALUE = Arrays.stream(values())
.collect(Collectors.toMap(Field::value, Function.identity()));

public abstract List<Op> supportedOp();

private final String value;

Field(String value) {
this.value = value;
}

public static Field fromString(String value) {
return Enums.fromString(value, BY_VALUE, "field");
}

public String value() {
return value;
}
}


public enum Resource {
FLOW {
@Override
public List<Field> supportedField() {
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE);
}
},
NAMESPACE {
@Override
public List<Field> supportedField() {
return List.of(Field.EXISTING_ONLY);
}
},
EXECUTION {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE, Field.TIME_RANGE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE
);
}
},
LOG {
@Override
public List<Field> supportedField() {
return List.of(Field.NAMESPACE, Field.START_DATE, Field.END_DATE,
Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
);
}
},
TASK {
@Override
public List<Field> supportedField() {
return List.of(Field.NAMESPACE, Field.QUERY, Field.END_DATE, Field.FLOW_ID, Field.START_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER
);
}
},
TEMPLATE {
@Override
public List<Field> supportedField() {
return List.of(Field.NAMESPACE, Field.QUERY);
}
},
TRIGGER {
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID
);
}
};

public abstract List<Field> supportedField();

/**
* Converts {@code Resource} enums to a list of {@code ResourceField},
* including fields and their supported operations.
*
* @return List of {@code ResourceField} with resource names, fields, and operations.
*/
public static List<ResourceField> asResourceList() {
return Arrays.stream(values())
.map(Resource::toResourceField)
.toList();
}

private static ResourceField toResourceField(Resource resource) {
List<FieldOp> fieldOps = resource.supportedField().stream()
.map(Resource::toFieldInfo)
.toList();
return new ResourceField(resource.name().toLowerCase(), fieldOps);
}

private static FieldOp toFieldInfo(Field field) {
List<Operation> operations = field.supportedOp().stream()
.map(Resource::toOperation)
.toList();
return new FieldOp(field.name().toLowerCase(), field.value(), operations);
}

private static Operation toOperation(Op op) {
return new Operation(op.name(), op.value());
}
}

public record ResourceField(String name, List<FieldOp> fields) {}
public record FieldOp(String name, String value, List<Operation> operations) {}
public record Operation(String name, String value) {}

}
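For reference, a short usage sketch of the record above: build a filter programmatically and check that the requested operation is allowed for the field. The validation step is an assumption about how callers might use supportedOp(), not code from this commit.

import io.kestra.core.models.QueryFilter;

public class QueryFilterExample {
    public static void main(String[] args) {
        // Equivalent to the query parameter filter[namespace][$startsWith]=io.kestra
        QueryFilter filter = QueryFilter.builder()
            .field(QueryFilter.Field.NAMESPACE)
            .operation(QueryFilter.Op.STARTS_WITH)
            .value("io.kestra")
            .build();

        // Reject operations the field does not declare in supportedOp().
        if (!filter.field().supportedOp().contains(filter.operation())) {
            throw new IllegalArgumentException(
                "Unsupported operation " + filter.operation() + " for field " + filter.field().value()
            );
        }

        System.out.println(filter);
    }
}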
@@ -1,5 +1,6 @@
package io.kestra.core.repositories;

import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
@@ -59,19 +60,9 @@ default Optional<Execution> findById(String tenantId, String id) {

ArrayListTotal<Execution> find(
Pageable pageable,
@Nullable String query,
@Nullable String tenantId,
@Nullable List<FlowScope> scope,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels,
@Nullable String triggerExecutionId,
@Nullable ChildFilter childFilter
@Nullable List<QueryFilter> filters
);

default Flux<Execution> find(
@Nullable String query,
@Nullable String tenantId,
@@ -103,18 +94,11 @@ Flux<Execution> find(
boolean allowDeleted
);


ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> states,
@Nullable Map<String, String> labels,
@Nullable String triggerExecutionId,
@Nullable ChildFilter childFilter
List<QueryFilter> filters
);

Execution delete(Execution execution);
@@ -1,5 +1,6 @@
package io.kestra.core.repositories;

import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
@@ -143,6 +144,12 @@ ArrayListTotal<Flow> find(
@Nullable Map<String, String> labels
);

ArrayListTotal<Flow> find(
Pageable pageable,
@Nullable String tenantId,
@Nullable List<QueryFilter> filters
);

List<FlowWithSource> findWithSource(
@Nullable String query,
@Nullable String tenantId,
@@ -1,5 +1,6 @@
package io.kestra.core.repositories;

import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.statistics.LogStatistics;
@@ -76,15 +77,9 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry

ArrayListTotal<LogEntry> find(
Pageable pageable,
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace,
@Nullable String flowId,
@Nullable String triggerId,
@Nullable Level minLevel,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
);
List<QueryFilter> filters
);

Flux<LogEntry> findAsync(
@Nullable String tenantId,
@@ -1,5 +1,6 @@
package io.kestra.core.repositories;

import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
@@ -29,6 +30,7 @@ public interface TriggerRepositoryInterface {
Trigger lock(String triggerUid, Function<Trigger, Trigger> function);

ArrayListTotal<Trigger> find(Pageable from, String query, String tenantId, String namespace, String flowId, String workerId);
ArrayListTotal<Trigger> find(Pageable from, String tenantId, List<QueryFilter> filters);

/**
* Counts the total number of triggers.
16 changes: 16 additions & 0 deletions core/src/main/java/io/kestra/core/utils/Enums.java
@@ -101,6 +101,22 @@ public static <T extends Enum<T>> Set<T> allExcept(final @NotNull Class<T> enumT
.collect(Collectors.toSet());
}

/**
* Converts a string to its corresponding enum value based on a provided mapping.
*
* @param value The string representation of the enum value.
* @param mapping A map of string values to enum constants.
* @param typeName A descriptive name of the enum type (used in error messages).
* @param <T> The type of the enum.
* @return The corresponding enum constant.
* @throws IllegalArgumentException If the string does not match any enum value.
*/
public static <T extends Enum<T>> T fromString(String value, Map<String, T> mapping, String typeName) {
return Optional.ofNullable(mapping.get(value))
.orElseThrow(() -> new IllegalArgumentException(
"Unsupported %s '%s'. Expected one of: %s".formatted(typeName, value, mapping.keySet())
));
}

private Enums() {
}
