Skip to content

Commit

Permalink
feat(task): introduce new log.Fetch task (#1059)
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye authored Mar 16, 2023
1 parent cfd8e48 commit 126c39d
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 8 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ subprojects {

task sourcesJar(type: Jar) {
dependsOn = [':core:copyGradleProperties']
dependsOn = [':ui:assembleFrontend']
archiveClassifier.set('sources')
from sourceSets.main.allSource
}
Expand Down
128 changes: 128 additions & 0 deletions core/src/main/java/io/kestra/core/tasks/log/Fetch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package io.kestra.core.tasks.log;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.event.Level;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Output execution logs in a file.",
description = "This task is useful to propagate your logs."
)
@Plugin(
examples = {
@Example(
code = {
"level: ERROR",
}
),
@Example(
code = {
"level: WARN",
"tasksId: ",
" - \"previous-task-id\""
}
),
@Example(
code = {
"level: WARN",
"executionId: \"{{execution.id}}\""
}
)
}
)
public class Fetch extends Task implements RunnableTask<Fetch.Output> {
@Schema(
title = "Filter on specific execution",
description = "If not set, will use the current execution"
)
@PluginProperty(dynamic = true)
private String executionId;

@Schema(
title = "Filter on specific task(s)"
)
@PluginProperty
private Collection<String> tasksId;

@Schema(
title = "Minimum log level you want to fetch"
)
@Builder.Default
@PluginProperty
private Level level = Level.INFO;

@Override
public Output run(RunContext runContext) throws Exception {
String executionId = this.executionId != null ? runContext.render(this.executionId) : (String) new HashMap<>((Map<String, Object>) runContext.getVariables().get("execution")).get("id");
LogRepositoryInterface logRepository = runContext.getApplicationContext().getBean(LogRepositoryInterface.class);

File tempFile = runContext.tempFile(".ion").toFile();
AtomicLong count = new AtomicLong();

try (OutputStream output = new FileOutputStream(tempFile)) {
if (this.tasksId != null) {
for (String taskId : tasksId) {
logRepository
.findByExecutionIdAndTaskId(executionId, taskId, level)
.forEach(throwConsumer(log -> {
count.incrementAndGet();
FileSerde.write(output, log);
}));
}
} else {
logRepository
.findByExecutionId(executionId, level)
.forEach(throwConsumer(log -> {
count.incrementAndGet();
FileSerde.write(output, log);
}));
}
}

return Output
.builder()
.uri(runContext.putTempFile(tempFile))
.size(count.get())
.build();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The size of the fetched rows"
)
private Long size;

@Schema(
title = "The uri of stored results",
description = "File format is ion"
)
private URI uri;
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 58;
public static long FLOWS_COUNT = 61;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
48 changes: 48 additions & 0 deletions core/src/test/java/io/kestra/core/tasks/FetchTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.kestra.core.tasks;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class FetchTest extends AbstractMemoryRunnerTest {
@Inject
FlowRepositoryInterface flowRepository;

@Test
void fetch() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(2));
}

@Test
void fetchWithTaskId() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log-taskid");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(1));
}

@Test
void fetchWithExecutionId() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "get-log-executionid");

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
TaskRun fetch = execution.getTaskRunList().get(2);
assertThat(fetch.getOutputs().get("size"), is(2));
}
}
12 changes: 12 additions & 0 deletions core/src/test/resources/flows/valids/get-log-executionid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
id: get-log-executionid
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.log.Fetch
id: get-log-task
executionId: "{{execution.id}}"
13 changes: 13 additions & 0 deletions core/src/test/resources/flows/valids/get-log-taskid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
id: get-log-taskid
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.log.Fetch
id: get-log-task
tasksId:
- task-1
11 changes: 11 additions & 0 deletions core/src/test/resources/flows/valids/get-log.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id: get-log
namespace: io.kestra.tests
tasks:
- type: io.kestra.core.tasks.debugs.Echo
id: task-1
format: task 1
- type: io.kestra.core.tasks.debugs.Echo
id: task-2
format: task 2
- type: io.kestra.core.tasks.log.Fetch
id: get-log-task
2 changes: 1 addition & 1 deletion core/src/test/resources/flows/valids/pause-delay.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
delay: PT1S
delay: PT10S
tasks:
- id: ko
type: io.kestra.core.tasks.scripts.Bash
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.kestra.repository.memory;

import io.kestra.core.models.executions.Execution;
import io.micronaut.data.model.Pageable;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import org.slf4j.event.Level;

import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Singleton;

import javax.annotation.Nullable;
import java.util.stream.Collectors;

@Singleton
@MemoryRepositoryEnabled
Expand All @@ -21,12 +21,18 @@ public class MemoryLogRepository implements LogRepositoryInterface {

@Override
public List<LogEntry> findByExecutionId(String id, Level minLevel) {
throw new UnsupportedOperationException();
return logs
.stream()
.filter(logEntry -> logEntry.getExecutionId().equals(id) && logEntry.getLevel().equals(minLevel))
.collect(Collectors.toList());
}

@Override
public List<LogEntry> findByExecutionIdAndTaskId(String executionId, String taskId, Level minLevel) {
throw new UnsupportedOperationException();
return logs
.stream()
.filter(logEntry -> logEntry.getExecutionId().equals(executionId) && logEntry.getTaskId().equals(taskId) && logEntry.getLevel().equals(minLevel))
.collect(Collectors.toList());
}

@Override
Expand Down

0 comments on commit 126c39d

Please sign in to comment.