Skip to content

Commit

Permalink
fix(cli): flow watcher should compute plugin defaults
Browse files Browse the repository at this point in the history
fixes #6908
  • Loading branch information
loicmathieu committed Jan 29, 2025
1 parent f4fdfc2 commit 2d98f90
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 58 deletions.
100 changes: 54 additions & 46 deletions cli/src/main/java/io/kestra/cli/services/FileChangedEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
Expand Down Expand Up @@ -36,6 +37,9 @@ public class FileChangedEventListener {
@Inject
private FlowRepositoryInterface flowRepositoryInterface;

@Inject
private PluginDefaultService pluginDefaultService;

@Inject
private YamlParser yamlParser;

Expand Down Expand Up @@ -64,7 +68,7 @@ public FileChangedEventListener(@Nullable FileWatchConfiguration fileWatchConfig

public void startListeningFromConfig() throws IOException, InterruptedException {
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, pluginDefaultService);
List<Path> paths = fileWatchConfiguration.getPaths();
this.setup(paths);

Expand Down Expand Up @@ -107,7 +111,6 @@ public void startListeningFromConfig() throws IOException, InterruptedException
} else {
log.info("File watching is disabled.");
}

}

public void startListening(List<Path> paths) throws IOException, InterruptedException {
Expand All @@ -118,60 +121,64 @@ public void startListening(List<Path> paths) throws IOException, InterruptedExce
WatchKey key;
while ((key = watchService.take()) != null) {
for (WatchEvent<?> watchEvent : key.pollEvents()) {
WatchEvent.Kind<?> kind = watchEvent.kind();
Path entry = (Path) watchEvent.context();

if (entry.toString().endsWith(".yml") || entry.toString().endsWith(".yaml")) {

if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {

Path filePath = ((Path) key.watchable()).resolve(entry);
if (Files.isDirectory(filePath)) {
loadFlowsFromFolder(filePath);
} else {

try {
String content = Files.readString(filePath, Charset.defaultCharset());

Optional<Flow> flow = parseFlow(content, entry);
if (flow.isPresent()) {
if (kind == StandardWatchEventKinds.ENTRY_MODIFY) {
// Check if we already have a file with the given path
if (flows.stream().anyMatch(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()))) {
Optional<FlowWithPath> previous = flows.stream().filter(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())).findFirst();
// Check if Flow from file has id/namespace updated
if (previous.isPresent() && !previous.get().uidWithoutRevision().equals(flow.get().uidWithoutRevision())) {
flows.removeIf(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()));
flowFilesManager.deleteFlow(previous.get().getTenantId(), previous.get().getNamespace(), previous.get().getId());
try {
WatchEvent.Kind<?> kind = watchEvent.kind();
Path entry = (Path) watchEvent.context();

if (entry.toString().endsWith(".yml") || entry.toString().endsWith(".yaml")) {

if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {

Path filePath = ((Path) key.watchable()).resolve(entry);
if (Files.isDirectory(filePath)) {
loadFlowsFromFolder(filePath);
} else {

try {
String content = Files.readString(filePath, Charset.defaultCharset());

Optional<Flow> flow = parseFlow(content, entry);
if (flow.isPresent()) {
if (kind == StandardWatchEventKinds.ENTRY_MODIFY) {
// Check if we already have a file with the given path
if (flows.stream().anyMatch(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()))) {
Optional<FlowWithPath> previous = flows.stream().filter(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())).findFirst();
// Check if Flow from file has id/namespace updated
if (previous.isPresent() && !previous.get().uidWithoutRevision().equals(flow.get().uidWithoutRevision())) {
flows.removeIf(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()));
flowFilesManager.deleteFlow(previous.get().getTenantId(), previous.get().getNamespace(), previous.get().getId());
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
}
} else {
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
}
} else {
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
}
} else {
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));

flowFilesManager.createOrUpdateFlow(flow.get(), content);
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
}

flowFilesManager.createOrUpdateFlow(flow.get(), content);
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
} catch (NoSuchFileException e) {
log.error("File not found: {}", entry, e);
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}

} catch (NoSuchFileException e) {
log.error("File not found: {}", entry, e);
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}
} else {
Path filePath = ((Path) key.watchable()).resolve(entry);
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
}
} else {
Path filePath = ((Path) key.watchable()).resolve(entry);
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
}
} catch (Exception e) {
log.error("Unexpected error while watching flows", e);
}
}
key.reset();
Expand Down Expand Up @@ -230,7 +237,8 @@ private void flowToFile(FlowWithSource flow, Path path) {
private Optional<Flow> parseFlow(String content, Path entry) {
try {
Flow flow = yamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
FlowWithSource withPluginDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
modelValidator.validate(withPluginDefault);
return Optional.of(flow);
} catch (ConstraintViolationException e) {
log.warn("Error while parsing flow: {}", entry, e);
Expand Down
28 changes: 16 additions & 12 deletions cli/src/main/java/io/kestra/cli/services/LocalFlowFileWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,36 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.context.annotation.Requires;
import io.kestra.core.services.PluginDefaultService;
import lombok.extern.slf4j.Slf4j;

@Requires(property = "micronaut.io.watch.enabled", value = "true")
@Slf4j
public class LocalFlowFileWatcher implements FlowFilesManager {
private FlowRepositoryInterface flowRepositoryInterface;
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;

public LocalFlowFileWatcher(FlowRepositoryInterface flowRepositoryInterface) {
this.flowRepositoryInterface = flowRepositoryInterface;
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
}

@Override
public FlowWithSource createOrUpdateFlow(Flow flow, String content) {
return flowRepositoryInterface.findById(null, flow.getNamespace(), flow.getId())
.map(previous -> flowRepositoryInterface.update(flow, previous, content, flow))
.orElseGet(() -> flowRepositoryInterface.create(flow, content, flow));
FlowWithSource withDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
return flowRepository.findById(null, flow.getNamespace(), flow.getId())
.map(previous -> flowRepository.update(flow, previous, content, withDefault))
.orElseGet(() -> flowRepository.create(flow, content, withDefault));
}

@Override
public void deleteFlow(FlowWithSource toDelete) {
flowRepositoryInterface.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepositoryInterface::delete);
log.error("Flow {} has been deleted", toDelete.getId());
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", toDelete.getId());
}

@Override
public void deleteFlow(String tenantId, String namespace, String id) {
flowRepositoryInterface.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepositoryInterface::delete);
log.error("Flow {} has been deleted", id);
flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package io.kestra.cli.services;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
class FileChangedEventListenerTest {
public static final String FILE_WATCH = "build/file-watch";
@Inject
private FileChangedEventListener fileWatcher;

@Inject
private FlowRepositoryInterface flowRepository;

private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean started = new AtomicBoolean(false);

@BeforeAll
static void setup() throws IOException {
if (!Files.exists(Path.of(FILE_WATCH))) {
Files.createDirectories(Path.of(FILE_WATCH));
}
}

@AfterAll
static void tearDown() throws IOException {
if (Files.exists(Path.of(FILE_WATCH))) {
FileUtils.deleteDirectory(Path.of(FILE_WATCH).toFile());
}
}

@BeforeEach
void beforeEach() throws Exception {
if (started.compareAndSet(false, true)) {
executorService.execute(throwRunnable(() -> fileWatcher.startListeningFromConfig()));
}
}

@Test
void test() throws IOException, TimeoutException {
// remove the flow if it already exists
flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));

// create a basic flow
String flow = """
id: myflow
namespace: io.kestra.tests.watch
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
""";
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(null, "io.kestra.tests.watch", "myflow").orElseThrow();
assertThat(myflow.getTasks(), hasSize(1));
assertThat(myflow.getTasks().getFirst().getId(), is("hello"));
assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));

// delete the flow
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
Await.until(
() -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}

@Test
void testWithPluginDefault() throws IOException, TimeoutException {
// remove the flow if it already exists
flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));

// create a flow with plugin default
String pluginDefault = """
id: pluginDefault
namespace: io.kestra.tests.watch
tasks:
- id: helloWithDefault
type: io.kestra.plugin.core.log.Log
pluginDefaults:
- type: io.kestra.plugin.core.log.Log
values:
message: Hello World!
""";
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks(), hasSize(1));
assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault"));
assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));

// delete both files
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
Await.until(
() -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}
}
12 changes: 12 additions & 0 deletions cli/src/test/resources/application-file-watch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
micronaut:
io:
watch:
enabled: true
paths:
- build/file-watch

kestra:
repository:
type: memory
queue:
type: memory

0 comments on commit 2d98f90

Please sign in to comment.