Skip to content

Commit

Permalink
refactor: migrate plugin.core.namespace to dynamic properties (#6832)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Jan 20, 2025
1 parent 0283129 commit 914bd94
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -75,8 +76,7 @@ public class DeleteFiles extends Task implements RunnableTask<Output> {
@Schema(
title = "The namespace from which the files should be deleted."
)
@PluginProperty(dynamic = true)
private String namespace;
private Property<String> namespace;

@NotNull
@Schema(
Expand All @@ -92,15 +92,14 @@ public class DeleteFiles extends Task implements RunnableTask<Output> {
description = "If true, parent folders that become empty after file deletion will also be removed.",
defaultValue = "false"
)
@PluginProperty(dynamic = false)
@Builder.Default
private Boolean deleteParentFolder = false;
private Property<Boolean> deleteParentFolder = Property.of(false);

@SuppressWarnings("unchecked")
@Override
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
String renderedNamespace = runContext.render(this.namespace);
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElseThrow();

final Namespace namespace = runContext.storage().namespace(renderedNamespace);

Expand All @@ -113,15 +112,17 @@ public Output run(RunContext runContext) throws Exception {
throw new IllegalArgumentException("Files must be a String or a list of String");
}

var deleteParent = runContext.render(this.deleteParentFolder).as(Boolean.class).orElseThrow();

List<NamespaceFile> matched = namespace.findAllFilesMatching(PathMatcherPredicate.matches(renderedFiles));
Set<String> parentFolders = Boolean.TRUE.equals(deleteParentFolder) ? new TreeSet<>() : null;
Set<String> parentFolders = Boolean.TRUE.equals(deleteParent) ? new TreeSet<>() : null;
long count = matched
.stream()
.map(Rethrow.throwFunction(file -> {
if (namespace.delete(NamespaceFile.of(renderedNamespace, Path.of(file.path().replace("\\","/"))).storagePath())) {
logger.debug(String.format("Deleted %s", (file.path())));

if (Boolean.TRUE.equals(deleteParentFolder)) {
if (Boolean.TRUE.equals(deleteParent)) {
trackParentFolder(file, parentFolders);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -75,8 +76,7 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
@Schema(
title = "The namespace from which you want to download files."
)
@PluginProperty(dynamic = true)
private String namespace;
private Property<String> namespace;

@NotNull
@Schema(
Expand All @@ -90,17 +90,16 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
@Schema(
title = "The folder where the downloaded files will be stored"
)
@PluginProperty(dynamic = true)
@Builder.Default
private String destination = "";
private Property<String> destination = Property.of("");


@Override
@SuppressWarnings("unchecked")
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
String renderedNamespace = runContext.render(this.namespace);
String renderedDestination = runContext.render(destination);
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElseThrow();
String renderedDestination = runContext.render(destination).as(String.class).orElseThrow();

final Namespace namespace = runContext.storage().namespace(renderedNamespace);

Expand Down
29 changes: 16 additions & 13 deletions core/src/main/java/io/kestra/plugin/core/namespace/UploadFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -114,16 +115,14 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
@Schema(
title = "The namespace to which the files will be uploaded."
)
@PluginProperty(dynamic = true)
private String namespace;
private Property<String> namespace;

@Schema(
title = "A list of Regex that match files in the current directory.",
description = "This should be a list of Regex matching the [Apache Ant patterns](https://ant.apache.org/manual/dirtasks.html#patterns)." +
"It's primarily intended to be used with the `WorkingDirectory` task"
)
@PluginProperty(dynamic = true)
private List<String> files;
private Property<List<String>> files;

@Schema(
title = "A map of key-value pairs where the key is the filename and the value is the URI of the file to upload.",
Expand All @@ -141,33 +140,33 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
title = "The destination folder.",
description = "Required when providing a list of files."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String destination = "/";
private Property<String> destination = Property.of("/");

@Builder.Default

@Schema(
title = "Which action to take when uploading a file that already exists.",
description = "Can be one of the following options: OVERWRITE, ERROR or SKIP. Default is OVERWRITE."
)
private Namespace.Conflicts conflict = Namespace.Conflicts.OVERWRITE;
private Property<Namespace.Conflicts> conflict = Property.of(Namespace.Conflicts.OVERWRITE);

@Override
@SuppressWarnings({"unchecked"})
public UploadFiles.Output run(RunContext runContext) throws Exception {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
String renderedNamespace = namespace != null ? runContext.render(namespace) : flowInfo.namespace();
String renderedDestination = checkLeadingSlash(runContext.render(destination));
String renderedNamespace = namespace != null ? runContext.render(namespace).as(String.class).orElseThrow() : flowInfo.namespace();
String renderedDestination = checkLeadingSlash(runContext.render(destination).as(String.class).orElseThrow());

final Namespace storageNamespace = runContext.storage().namespace(renderedNamespace);

if (files == null && filesMap == null) {
throw new IllegalArgumentException("files or filesMap is required");
}

if (files != null) {
this.uploadFiles(runContext, files, storageNamespace, renderedDestination);
var renderedFiles = runContext.render(this.files).asList(String.class);
if (!renderedFiles.isEmpty()) {
this.uploadFiles(runContext, renderedFiles, storageNamespace, renderedDestination);
}

if (filesMap != null) {
Expand All @@ -192,7 +191,7 @@ private void uploadFiles(RunContext runContext, List<String> files, Namespace st
Path resolve = Paths.get("/").resolve(runContext.workingDir().path().relativize(file.toPath()));

Path targetFilePath = Path.of(destination, resolve.toString());
storageNamespace.putFile(targetFilePath, new FileInputStream(file), conflict);
storageNamespace.putFile(targetFilePath, new FileInputStream(file), runContext.render(conflict).as(Namespace.Conflicts.class).orElseThrow());
}
}

Expand All @@ -204,7 +203,11 @@ private void uploadFilesMap(RunContext runContext, Map<String, Object> filesMap,
if (key instanceof String targetFilePath && value instanceof String stringSourceFileURI) {
URI sourceFileURI = URI.create(stringSourceFileURI);
if (runContext.storage().isFileExist(sourceFileURI)) {
storageNamespace.putFile(Path.of(destination + targetFilePath), runContext.storage().getFile(sourceFileURI), conflict);
storageNamespace.putFile(
Path.of(destination + targetFilePath),
runContext.storage().getFile(sourceFileURI),
runContext.render(conflict).as(Namespace.Conflicts.class).orElseThrow()
);
}
} else {
throw new IllegalArgumentException("filesMap must be a Map<String, String>");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.namespace;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.Namespace;
Expand All @@ -20,7 +21,7 @@
import static org.hamcrest.Matchers.notNullValue;

@KestraTest
public class DeleteFilesTest {
class DeleteFilesTest {
@Inject
RunContextFactory runContextFactory;

Expand All @@ -33,7 +34,7 @@ void shouldDeleteNamespaceFilesForMatchingExpression() throws Exception {
.id(DeleteFiles.class.getSimpleName())
.type(DeleteFiles.class.getName())
.files(List.of("**test1*"))
.namespace("{{ inputs.namespace }}")
.namespace(new Property<>("{{ inputs.namespace }}"))
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId));
Expand All @@ -60,8 +61,8 @@ void shouldDeleteParentFolder() throws Exception {
.id(DeleteFiles.class.getSimpleName())
.type(DeleteFiles.class.getName())
.files(List.of("**/file.txt"))
.namespace("{{ inputs.namespace }}")
.deleteParentFolder(true)
.namespace(new Property<>("{{ inputs.namespace }}"))
.deleteParentFolder(Property.of(true))
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId));
Expand All @@ -88,8 +89,8 @@ void shouldNotDeleteParentFolderWhenFlagIsFalse() throws Exception {
.id(DeleteFiles.class.getSimpleName())
.type(DeleteFiles.class.getName())
.files(List.of("**/file.txt"))
.namespace("{{ inputs.namespace }}")
.deleteParentFolder(false)
.namespace(new Property<>("{{ inputs.namespace }}"))
.deleteParentFolder(Property.of(false))
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId));
Expand All @@ -116,8 +117,8 @@ void shouldNotDeleteParentFolderWhenMultipleFilesExist() throws Exception {
.id(DeleteFiles.class.getSimpleName())
.type(DeleteFiles.class.getName())
.files(List.of("**/file1.txt"))
.namespace("{{ inputs.namespace }}")
.deleteParentFolder(true)
.namespace(new Property<>("{{ inputs.namespace }}"))
.deleteParentFolder(Property.of(true))
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.namespace;

import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.Namespace;
Expand Down Expand Up @@ -35,7 +36,7 @@ void shouldDownloadNamespaceFile() throws Exception {
.id(DownloadFiles.class.getSimpleName())
.type(DownloadFiles.class.getName())
.files(List.of("**test1.txt"))
.namespace("{{ inputs.namespace }}")
.namespace(new Property<>("{{ inputs.namespace }}"))
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, downloadFiles, Map.of("namespace", namespaceId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.devskiller.friendly_id.FriendlyId;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.Namespace;
Expand Down Expand Up @@ -53,9 +54,9 @@ void shouldThrowExceptionGivenAlreadyExistingFileWhenConflictError() throws Exce
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.namespace(namespace)
.conflict(Namespace.Conflicts.ERROR)
.destination("/folder")
.namespace(Property.of(namespace))
.conflict(Property.of(Namespace.Conflicts.ERROR))
.destination(Property.of("/folder"))
.build();

RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of());
Expand All @@ -75,8 +76,8 @@ void shouldPutFileGivenAlreadyExistingFileWhenConflictOverwrite() throws Except
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.namespace("{{ inputs.namespace }}")
.destination("/folder")
.namespace(new Property<>("{{ inputs.namespace }}"))
.destination(Property.of("/folder"))
.build();

RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of("namespace", namespace));
Expand Down Expand Up @@ -113,9 +114,9 @@ void shouldPutFileGivenAlreadyExistingFileWhenConflictSkip() throws Exception {
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.namespace(namespace)
.conflict(Namespace.Conflicts.SKIP)
.destination("/folder")
.namespace(Property.of(namespace))
.conflict(Property.of(Namespace.Conflicts.SKIP))
.destination(Property.of("/folder"))
.build();

RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of());
Expand Down Expand Up @@ -150,10 +151,10 @@ void shouldPutFileFromRegex() throws Exception {
UploadFiles uploadFile = UploadFiles.builder()
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.files(List.of("glob:**application**"))
.namespace(namespace)
.conflict(Namespace.Conflicts.SKIP)
.destination("/folder/")
.files(Property.of(List.of("glob:**application**")))
.namespace(Property.of(namespace))
.conflict(Property.of(Namespace.Conflicts.SKIP))
.destination(Property.of("/folder/"))
.build();

RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of());
Expand Down

0 comments on commit 914bd94

Please sign in to comment.