Skip to content

Commit

Permalink
fix(cli): flow watcher should compute plugin defaults
Browse files Browse the repository at this point in the history
fixes #6908
  • Loading branch information
loicmathieu committed Jan 29, 2025
1 parent 0661899 commit b190fc5
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
Expand Down Expand Up @@ -36,6 +37,9 @@ public class FileChangedEventListener {
@Inject
private FlowRepositoryInterface flowRepositoryInterface;

@Inject
private PluginDefaultService pluginDefaultService;

@Inject
private YamlParser yamlParser;

Expand Down Expand Up @@ -64,7 +68,7 @@ public FileChangedEventListener(@Nullable FileWatchConfiguration fileWatchConfig

public void startListeningFromConfig() throws IOException, InterruptedException {
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, pluginDefaultService);
List<Path> paths = fileWatchConfiguration.getPaths();
this.setup(paths);

Expand Down Expand Up @@ -107,7 +111,6 @@ public void startListeningFromConfig() throws IOException, InterruptedException
} else {
log.info("File watching is disabled.");
}

}

public void startListening(List<Path> paths) throws IOException, InterruptedException {
Expand Down Expand Up @@ -230,7 +233,8 @@ private void flowToFile(FlowWithSource flow, Path path) {
private Optional<Flow> parseFlow(String content, Path entry) {
try {
Flow flow = yamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
FlowWithSource withPluginDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
modelValidator.validate(withPluginDefault);
return Optional.of(flow);
} catch (ConstraintViolationException e) {
log.warn("Error while parsing flow: {}", entry, e);
Expand Down
28 changes: 16 additions & 12 deletions cli/src/main/java/io/kestra/cli/services/LocalFlowFileWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,36 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.context.annotation.Requires;
import io.kestra.core.services.PluginDefaultService;
import lombok.extern.slf4j.Slf4j;

@Requires(property = "micronaut.io.watch.enabled", value = "true")
@Slf4j
public class LocalFlowFileWatcher implements FlowFilesManager {
private FlowRepositoryInterface flowRepositoryInterface;
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;

public LocalFlowFileWatcher(FlowRepositoryInterface flowRepositoryInterface) {
this.flowRepositoryInterface = flowRepositoryInterface;
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
}

@Override
public FlowWithSource createOrUpdateFlow(Flow flow, String content) {
return flowRepositoryInterface.findById(null, flow.getNamespace(), flow.getId())
.map(previous -> flowRepositoryInterface.update(flow, previous, content, flow))
.orElseGet(() -> flowRepositoryInterface.create(flow, content, flow));
FlowWithSource withDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
return flowRepository.findById(null, flow.getNamespace(), flow.getId())
.map(previous -> flowRepository.update(flow, previous, content, withDefault))
.orElseGet(() -> flowRepository.create(flow, content, withDefault));
}

@Override
public void deleteFlow(FlowWithSource toDelete) {
flowRepositoryInterface.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepositoryInterface::delete);
log.error("Flow {} has been deleted", toDelete.getId());
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", toDelete.getId());
}

@Override
public void deleteFlow(String tenantId, String namespace, String id) {
flowRepositoryInterface.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepositoryInterface::delete);
log.error("Flow {} has been deleted", id);
flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package io.kestra.cli.services;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.checkerframework.checker.units.qual.A;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
class FileChangedEventListenerTest {
public static final String FILE_WATCH = "build/file-watch";
@Inject
private FileChangedEventListener fileWatcher;

@Inject
private FlowRepositoryInterface flowRepository;

private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean started = new AtomicBoolean(false);

@BeforeAll
static void setup() throws IOException {
if (!Files.exists(Path.of(FILE_WATCH))) {
Files.createDirectories(Path.of(FILE_WATCH));
}
}

@AfterAll
static void tearDown() throws IOException {
if (Files.exists(Path.of(FILE_WATCH))) {
FileUtils.deleteDirectory(Path.of(FILE_WATCH).toFile());
}
}

@BeforeEach
void beforeEach() throws Exception {
if (started.compareAndSet(false, true)) {
executorService.execute(throwRunnable(() -> fileWatcher.startListeningFromConfig()));
}
}

@Test
void test() throws IOException, TimeoutException {
// remove the flow if it already exists
flowRepository.findByIdWithSource(null, "company.team", "myflow").ifPresent(flow -> flowRepository.delete(flow));

// create a basic flow
String flow = """
id: myflow
namespace: company.team
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
""";
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(null, "company.team", "myflow").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(null, "company.team", "myflow").orElseThrow();
assertThat(myflow.getTasks(), hasSize(1));
assertThat(myflow.getTasks().getFirst().getId(), is("hello"));
assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));

// delete the flow
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
Await.until(

Check failure on line 90 in cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

FileChangedEventListenerTest.test()

java.util.concurrent.TimeoutException: Await failed to terminate within PT10S.
Raw output
java.util.concurrent.TimeoutException: Await failed to terminate within PT10S.
	at io.kestra.core.utils.Await.until(Await.java:42)
	at io.kestra.core.utils.Await.until(Await.java:31)
	at io.kestra.cli.services.FileChangedEventListenerTest.test(FileChangedEventListenerTest.java:90)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
() -> flowRepository.findById(null, "company.team", "myflow").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}

@Test
void testWithPluginDefault() throws IOException, TimeoutException {
// remove the flow if it already exists
flowRepository.findByIdWithSource(null, "company.team", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));

// create a flow with plugin default
String pluginDefault = """
id: pluginDefault
namespace: company.team
tasks:
- id: helloWithDefault
type: io.kestra.plugin.core.log.Log
pluginDefaults:
- type: io.kestra.plugin.core.log.Log
values:
message: Hello World!
""";
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
Await.until(

Check failure on line 117 in cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

FileChangedEventListenerTest.testWithPluginDefault()

java.util.concurrent.TimeoutException: Await failed to terminate within PT10S.
Raw output
java.util.concurrent.TimeoutException: Await failed to terminate within PT10S.
	at io.kestra.core.utils.Await.until(Await.java:42)
	at io.kestra.core.utils.Await.until(Await.java:31)
	at io.kestra.cli.services.FileChangedEventListenerTest.testWithPluginDefault(FileChangedEventListenerTest.java:117)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
() -> flowRepository.findById(null, "company.team", "pluginDefault").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(null, "company.team", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks(), hasSize(1));
assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault"));
assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));

// delete both files
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
Await.until(
() -> flowRepository.findById(null, "company.team", "pluginDefault").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}
}
12 changes: 12 additions & 0 deletions cli/src/test/resources/application-file-watch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
micronaut:
io:
watch:
enabled: true
paths:
- build/file-watch

kestra:
repository:
type: memory
queue:
type: memory

0 comments on commit b190fc5

Please sign in to comment.