From 2d98f909de9feea8c49ee7b0a67eded55bfb67ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 29 Jan 2025 12:17:23 +0100 Subject: [PATCH] fix(cli): flow watcher should compute plugin defaults fixes #6908 --- .../services/FileChangedEventListener.java | 100 +++++++------- .../cli/services/LocalFlowFileWatcher.java | 28 ++-- .../FileChangedEventListenerTest.java | 130 ++++++++++++++++++ .../test/resources/application-file-watch.yml | 12 ++ 4 files changed, 212 insertions(+), 58 deletions(-) create mode 100644 cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java create mode 100644 cli/src/test/resources/application-file-watch.yml diff --git a/cli/src/main/java/io/kestra/cli/services/FileChangedEventListener.java b/cli/src/main/java/io/kestra/cli/services/FileChangedEventListener.java index 3e7e146036e..18b399b1642 100644 --- a/cli/src/main/java/io/kestra/cli/services/FileChangedEventListener.java +++ b/cli/src/main/java/io/kestra/cli/services/FileChangedEventListener.java @@ -7,6 +7,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.serializers.YamlParser; import io.kestra.core.services.FlowListenersInterface; +import io.kestra.core.services.PluginDefaultService; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; import io.micronaut.scheduling.io.watch.FileWatchConfiguration; @@ -36,6 +37,9 @@ public class FileChangedEventListener { @Inject private FlowRepositoryInterface flowRepositoryInterface; + @Inject + private PluginDefaultService pluginDefaultService; + @Inject private YamlParser yamlParser; @@ -64,7 +68,7 @@ public FileChangedEventListener(@Nullable FileWatchConfiguration fileWatchConfig public void startListeningFromConfig() throws IOException, InterruptedException { if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) { - this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface); + this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, pluginDefaultService); List paths = fileWatchConfiguration.getPaths(); this.setup(paths); @@ -107,7 +111,6 @@ public void startListeningFromConfig() throws IOException, InterruptedException } else { log.info("File watching is disabled."); } - } public void startListening(List paths) throws IOException, InterruptedException { @@ -118,60 +121,64 @@ public void startListening(List paths) throws IOException, InterruptedExce WatchKey key; while ((key = watchService.take()) != null) { for (WatchEvent watchEvent : key.pollEvents()) { - WatchEvent.Kind kind = watchEvent.kind(); - Path entry = (Path) watchEvent.context(); - - if (entry.toString().endsWith(".yml") || entry.toString().endsWith(".yaml")) { - - if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { - - Path filePath = ((Path) key.watchable()).resolve(entry); - if (Files.isDirectory(filePath)) { - loadFlowsFromFolder(filePath); - } else { - - try { - String content = Files.readString(filePath, Charset.defaultCharset()); - - Optional flow = parseFlow(content, entry); - if (flow.isPresent()) { - if (kind == StandardWatchEventKinds.ENTRY_MODIFY) { - // Check if we already have a file with the given path - if (flows.stream().anyMatch(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()))) { - Optional previous = flows.stream().filter(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())).findFirst(); - // Check if Flow from file has id/namespace updated - if (previous.isPresent() && !previous.get().uidWithoutRevision().equals(flow.get().uidWithoutRevision())) { - flows.removeIf(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())); - flowFilesManager.deleteFlow(previous.get().getTenantId(), previous.get().getNamespace(), previous.get().getId()); + try { + WatchEvent.Kind kind = watchEvent.kind(); + Path entry = (Path) watchEvent.context(); + + if (entry.toString().endsWith(".yml") || entry.toString().endsWith(".yaml")) { + + if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { + + Path filePath = ((Path) key.watchable()).resolve(entry); + if (Files.isDirectory(filePath)) { + loadFlowsFromFolder(filePath); + } else { + + try { + String content = Files.readString(filePath, Charset.defaultCharset()); + + Optional flow = parseFlow(content, entry); + if (flow.isPresent()) { + if (kind == StandardWatchEventKinds.ENTRY_MODIFY) { + // Check if we already have a file with the given path + if (flows.stream().anyMatch(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()))) { + Optional previous = flows.stream().filter(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())).findFirst(); + // Check if Flow from file has id/namespace updated + if (previous.isPresent() && !previous.get().uidWithoutRevision().equals(flow.get().uidWithoutRevision())) { + flows.removeIf(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())); + flowFilesManager.deleteFlow(previous.get().getTenantId(), previous.get().getNamespace(), previous.get().getId()); + flows.add(FlowWithPath.of(flow.get(), filePath.toString())); + } + } else { flows.add(FlowWithPath.of(flow.get(), filePath.toString())); } } else { flows.add(FlowWithPath.of(flow.get(), filePath.toString())); } - } else { - flows.add(FlowWithPath.of(flow.get(), filePath.toString())); + + flowFilesManager.createOrUpdateFlow(flow.get(), content); + log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry); } - flowFilesManager.createOrUpdateFlow(flow.get(), content); - log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry); + } catch (NoSuchFileException e) { + log.error("File not found: {}", entry, e); + } catch (IOException e) { + log.error("Error reading file: {}", entry, e); } - - } catch (NoSuchFileException e) { - log.error("File not found: {}", entry, e); - } catch (IOException e) { - log.error("Error reading file: {}", entry, e); } + } else { + Path filePath = ((Path) key.watchable()).resolve(entry); + flows.stream() + .filter(flow -> flow.getPath().equals(filePath.toString())) + .findFirst() + .ifPresent(flowWithPath -> { + flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId()); + this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision())); + }); } - } else { - Path filePath = ((Path) key.watchable()).resolve(entry); - flows.stream() - .filter(flow -> flow.getPath().equals(filePath.toString())) - .findFirst() - .ifPresent(flowWithPath -> { - flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId()); - this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision())); - }); } + } catch (Exception e) { + log.error("Unexpected error while watching flows", e); } } key.reset(); @@ -230,7 +237,8 @@ private void flowToFile(FlowWithSource flow, Path path) { private Optional parseFlow(String content, Path entry) { try { Flow flow = yamlParser.parse(content, Flow.class); - modelValidator.validate(flow); + FlowWithSource withPluginDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content)); + modelValidator.validate(withPluginDefault); return Optional.of(flow); } catch (ConstraintViolationException e) { log.warn("Error while parsing flow: {}", entry, e); diff --git a/cli/src/main/java/io/kestra/cli/services/LocalFlowFileWatcher.java b/cli/src/main/java/io/kestra/cli/services/LocalFlowFileWatcher.java index 2cf8ee53ce1..a9ba8673967 100644 --- a/cli/src/main/java/io/kestra/cli/services/LocalFlowFileWatcher.java +++ b/cli/src/main/java/io/kestra/cli/services/LocalFlowFileWatcher.java @@ -3,32 +3,36 @@ import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.repositories.FlowRepositoryInterface; -import io.micronaut.context.annotation.Requires; +import io.kestra.core.services.PluginDefaultService; import lombok.extern.slf4j.Slf4j; -@Requires(property = "micronaut.io.watch.enabled", value = "true") @Slf4j public class LocalFlowFileWatcher implements FlowFilesManager { - private FlowRepositoryInterface flowRepositoryInterface; + private final FlowRepositoryInterface flowRepository; + private final PluginDefaultService pluginDefaultService; - public LocalFlowFileWatcher(FlowRepositoryInterface flowRepositoryInterface) { - this.flowRepositoryInterface = flowRepositoryInterface; + public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) { + this.flowRepository = flowRepository; + this.pluginDefaultService = pluginDefaultService; } + @Override public FlowWithSource createOrUpdateFlow(Flow flow, String content) { - return flowRepositoryInterface.findById(null, flow.getNamespace(), flow.getId()) - .map(previous -> flowRepositoryInterface.update(flow, previous, content, flow)) - .orElseGet(() -> flowRepositoryInterface.create(flow, content, flow)); + FlowWithSource withDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content)); + return flowRepository.findById(null, flow.getNamespace(), flow.getId()) + .map(previous -> flowRepository.update(flow, previous, content, withDefault)) + .orElseGet(() -> flowRepository.create(flow, content, withDefault)); } + @Override public void deleteFlow(FlowWithSource toDelete) { - flowRepositoryInterface.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepositoryInterface::delete); - log.error("Flow {} has been deleted", toDelete.getId()); + flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete); + log.info("Flow {} has been deleted", toDelete.getId()); } @Override public void deleteFlow(String tenantId, String namespace, String id) { - flowRepositoryInterface.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepositoryInterface::delete); - log.error("Flow {} has been deleted", id); + flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete); + log.info("Flow {} has been deleted", id); } } diff --git a/cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java b/cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java new file mode 100644 index 00000000000..49b207a125b --- /dev/null +++ b/cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java @@ -0,0 +1,130 @@ +package io.kestra.cli.services; + +import io.kestra.core.models.flows.Flow; +import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.utils.Await; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static io.kestra.core.utils.Rethrow.throwRunnable; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@MicronautTest(environments = {"test", "file-watch"}, transactional = false) +class FileChangedEventListenerTest { + public static final String FILE_WATCH = "build/file-watch"; + @Inject + private FileChangedEventListener fileWatcher; + + @Inject + private FlowRepositoryInterface flowRepository; + + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final AtomicBoolean started = new AtomicBoolean(false); + + @BeforeAll + static void setup() throws IOException { + if (!Files.exists(Path.of(FILE_WATCH))) { + Files.createDirectories(Path.of(FILE_WATCH)); + } + } + + @AfterAll + static void tearDown() throws IOException { + if (Files.exists(Path.of(FILE_WATCH))) { + FileUtils.deleteDirectory(Path.of(FILE_WATCH).toFile()); + } + } + + @BeforeEach + void beforeEach() throws Exception { + if (started.compareAndSet(false, true)) { + executorService.execute(throwRunnable(() -> fileWatcher.startListeningFromConfig())); + } + } + + @Test + void test() throws IOException, TimeoutException { + // remove the flow if it already exists + flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow)); + + // create a basic flow + String flow = """ + id: myflow + namespace: io.kestra.tests.watch + + tasks: + - id: hello + type: io.kestra.plugin.core.log.Log + message: Hello World! 🚀 + """; + Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes()); + Await.until( + () -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isPresent(), + Duration.ofMillis(100), + Duration.ofSeconds(10) + ); + Flow myflow = flowRepository.findById(null, "io.kestra.tests.watch", "myflow").orElseThrow(); + assertThat(myflow.getTasks(), hasSize(1)); + assertThat(myflow.getTasks().getFirst().getId(), is("hello")); + assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log")); + + // delete the flow + Files.delete(Path.of(FILE_WATCH + "/myflow.yaml")); + Await.until( + () -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isEmpty(), + Duration.ofMillis(100), + Duration.ofSeconds(10) + ); + } + + @Test + void testWithPluginDefault() throws IOException, TimeoutException { + // remove the flow if it already exists + flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow)); + + // create a flow with plugin default + String pluginDefault = """ + id: pluginDefault + namespace: io.kestra.tests.watch + + tasks: + - id: helloWithDefault + type: io.kestra.plugin.core.log.Log + + pluginDefaults: + - type: io.kestra.plugin.core.log.Log + values: + message: Hello World! + """; + Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes()); + Await.until( + () -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isPresent(), + Duration.ofMillis(100), + Duration.ofSeconds(10) + ); + Flow pluginDefaultFlow = flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").orElseThrow(); + assertThat(pluginDefaultFlow.getTasks(), hasSize(1)); + assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault")); + assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log")); + + // delete both files + Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml")); + Await.until( + () -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isEmpty(), + Duration.ofMillis(100), + Duration.ofSeconds(10) + ); + } +} \ No newline at end of file diff --git a/cli/src/test/resources/application-file-watch.yml b/cli/src/test/resources/application-file-watch.yml new file mode 100644 index 00000000000..34f7856c6bb --- /dev/null +++ b/cli/src/test/resources/application-file-watch.yml @@ -0,0 +1,12 @@ +micronaut: + io: + watch: + enabled: true + paths: + - build/file-watch + +kestra: + repository: + type: memory + queue: + type: memory \ No newline at end of file