Skip to content

Commit

Permalink
fix(cli): flow watcher should compute plugin defaults
Browse files Browse the repository at this point in the history
fixes #6908
  • Loading branch information
loicmathieu committed Jan 29, 2025
1 parent 0661899 commit ad351ec
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
Expand Down Expand Up @@ -36,6 +37,9 @@ public class FileChangedEventListener {
@Inject
private FlowRepositoryInterface flowRepositoryInterface;

@Inject
private PluginDefaultService pluginDefaultService;

@Inject
private YamlParser yamlParser;

Expand Down Expand Up @@ -64,7 +68,7 @@ public FileChangedEventListener(@Nullable FileWatchConfiguration fileWatchConfig

public void startListeningFromConfig() throws IOException, InterruptedException {
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, pluginDefaultService);
List<Path> paths = fileWatchConfiguration.getPaths();
this.setup(paths);

Expand Down Expand Up @@ -107,7 +111,6 @@ public void startListeningFromConfig() throws IOException, InterruptedException
} else {
log.info("File watching is disabled.");
}

}

public void startListening(List<Path> paths) throws IOException, InterruptedException {
Expand Down Expand Up @@ -230,7 +233,8 @@ private void flowToFile(FlowWithSource flow, Path path) {
private Optional<Flow> parseFlow(String content, Path entry) {
try {
Flow flow = yamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
FlowWithSource withPluginDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
modelValidator.validate(withPluginDefault);
return Optional.of(flow);
} catch (ConstraintViolationException e) {
log.warn("Error while parsing flow: {}", entry, e);
Expand Down
28 changes: 16 additions & 12 deletions cli/src/main/java/io/kestra/cli/services/LocalFlowFileWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,36 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.context.annotation.Requires;
import io.kestra.core.services.PluginDefaultService;
import lombok.extern.slf4j.Slf4j;

@Requires(property = "micronaut.io.watch.enabled", value = "true")
@Slf4j
public class LocalFlowFileWatcher implements FlowFilesManager {
private FlowRepositoryInterface flowRepositoryInterface;
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;

public LocalFlowFileWatcher(FlowRepositoryInterface flowRepositoryInterface) {
this.flowRepositoryInterface = flowRepositoryInterface;
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
}

@Override
public FlowWithSource createOrUpdateFlow(Flow flow, String content) {
return flowRepositoryInterface.findById(null, flow.getNamespace(), flow.getId())
.map(previous -> flowRepositoryInterface.update(flow, previous, content, flow))
.orElseGet(() -> flowRepositoryInterface.create(flow, content, flow));
FlowWithSource withDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
return flowRepository.findById(null, flow.getNamespace(), flow.getId())
.map(previous -> flowRepository.update(flow, previous, content, withDefault))
.orElseGet(() -> flowRepository.create(flow, content, withDefault));
}

@Override
public void deleteFlow(FlowWithSource toDelete) {
flowRepositoryInterface.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepositoryInterface::delete);
log.error("Flow {} has been deleted", toDelete.getId());
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", toDelete.getId());
}

@Override
public void deleteFlow(String tenantId, String namespace, String id) {
flowRepositoryInterface.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepositoryInterface::delete);
log.error("Flow {} has been deleted", id);
flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package io.kestra.cli.services;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.checkerframework.checker.units.qual.A;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
class FileChangedEventListenerTest {
public static final String FILE_WATCH = "build/file-watch";
@Inject
private FileChangedEventListener fileWatcher;

@Inject
private FlowRepositoryInterface flowRepository;

private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean started = new AtomicBoolean(false);

@BeforeAll
static void setup() throws IOException {
if (!Files.exists(Path.of(FILE_WATCH))) {
Files.createDirectories(Path.of(FILE_WATCH));
}
}

@AfterAll
static void tearDown() throws IOException {
if (Files.exists(Path.of(FILE_WATCH))) {
FileUtils.deleteDirectory(Path.of(FILE_WATCH).toFile());
}
}

@BeforeEach
void beforeEach() throws Exception {
if (started.compareAndSet(false, true)) {
executorService.execute(throwRunnable(() -> fileWatcher.startListeningFromConfig()));
}
}

@Test
void test() throws IOException, TimeoutException {
List<Flow> all = flowRepository.findAll(null);
assertThat(all, empty());

Check failure on line 65 in cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

FileChangedEventListenerTest.test()

java.lang.AssertionError: Expected: an empty collection but: <[Flow(super=io.kestra.core.models.flows.Flow@1041c6e7, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@b06cef11, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@80d30a98, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@3ee8c443, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@2e09bdb6, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@a3d3d2bf, format=Lorem ipsum dolor sit amet), Return(super=io.kestra.plugin.core.debug.Return@7f8fed4f, format=Lorem ipsum dolor sit amet Lorem ipsum dolor sit amet)], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@f14b2657, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@c95910fa, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@7971fade, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@8547de81, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@ea80443d, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@8981f712, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@304ab27f, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@382248b2, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@9c1324de, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@3e89e440, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@90705498, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@1265daa1, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null)]>
Raw output
java.lang.AssertionError: 
Expected: an empty collection
     but: <[Flow(super=io.kestra.core.models.flows.Flow@1041c6e7, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@b06cef11, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@80d30a98, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@3ee8c443, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@2e09bdb6, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@a3d3d2bf, format=Lorem ipsum dolor sit amet), Return(super=io.kestra.plugin.core.debug.Return@7f8fed4f, format=Lorem ipsum dolor sit amet
Lorem ipsum dolor sit amet)], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@f14b2657, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@c95910fa, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@7971fade, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@8547de81, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@ea80443d, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@8981f712, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@304ab27f, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@382248b2, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@9c1324de, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@3e89e440, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@90705498, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@1265daa1, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null)]>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
	at io.kestra.cli.services.FileChangedEventListenerTest.test(FileChangedEventListenerTest.java:65)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

// create a basic flow
String flow = """
id: myflow
namespace: company.team
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
""";
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(null, "company.team", "myflow").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(null, "company.team", "myflow").orElseThrow();
assertThat(myflow.getTasks(), hasSize(1));
assertThat(myflow.getTasks().getFirst().getId(), is("hello"));
assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));

// delete the flow
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
Await.until(
() -> flowRepository.findById(null, "company.team", "myflow").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
all = flowRepository.findAll(null);
assertThat(all, empty());
}

@Test
void testWithPluginDefault() throws IOException, TimeoutException {
List<Flow> all = flowRepository.findAll(null);
assertThat(all, empty());

Check failure on line 102 in cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

FileChangedEventListenerTest.testWithPluginDefault()

java.lang.AssertionError: Expected: an empty collection but: <[Flow(super=io.kestra.core.models.flows.Flow@e2cfe57b, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@5e55d754, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@ed9bf126, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@93de40d9, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@e6cc6869, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@ef862ec4, format=Lorem ipsum dolor sit amet), Return(super=io.kestra.plugin.core.debug.Return@dcf7afe9, format=Lorem ipsum dolor sit amet Lorem ipsum dolor sit amet)], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@9ea8f9e4, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@6738fe8f, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@f615b889, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@808c9fb0, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@60eaa65e, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@6805a609, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@de75cc2a, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@bc35e49, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@a0453df6, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@339977b6, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@2d7b9043, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@44b79824, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null)]>
Raw output
java.lang.AssertionError: 
Expected: an empty collection
     but: <[Flow(super=io.kestra.core.models.flows.Flow@e2cfe57b, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@5e55d754, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@ed9bf126, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@93de40d9, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@e6cc6869, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@ef862ec4, format=Lorem ipsum dolor sit amet), Return(super=io.kestra.plugin.core.debug.Return@dcf7afe9, format=Lorem ipsum dolor sit amet
Lorem ipsum dolor sit amet)], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@9ea8f9e4, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@6738fe8f, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@f615b889, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@808c9fb0, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@60eaa65e, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@6805a609, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@de75cc2a, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@bc35e49, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@a0453df6, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@339977b6, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null), Flow(super=io.kestra.core.models.flows.Flow@2d7b9043, description=null, labels=null, variables=null, tasks=[Return(super=io.kestra.plugin.core.debug.Return@44b79824, format={{taskrun.startDate}})], errors=null, _finally=null, listeners=null, triggers=null, pluginDefaults=null, taskDefaults=null, concurrency=null, outputs=null, retry=null, sla=null)]>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
	at io.kestra.cli.services.FileChangedEventListenerTest.testWithPluginDefault(FileChangedEventListenerTest.java:102)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

// create a flow with plugin default
String pluginDefault = """
id: pluginDefault
namespace: company.team
tasks:
- id: helloWithDefault
type: io.kestra.plugin.core.log.Log
pluginDefaults:
- type: io.kestra.plugin.core.log.Log
values:
message: Hello World!
""";
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(null, "company.team", "pluginDefault").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(null, "company.team", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks(), hasSize(1));
assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault"));
assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));

// delete both files
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
Await.until(
() -> flowRepository.findById(null, "company.team", "pluginDefault").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
all = flowRepository.findAll(null);
assertThat(all, empty());
}
}
12 changes: 12 additions & 0 deletions cli/src/test/resources/application-file-watch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
micronaut:
io:
watch:
enabled: true
paths:
- build/file-watch

kestra:
repository:
type: memory
queue:
type: memory

0 comments on commit ad351ec

Please sign in to comment.