Skip to content

Commit

Permalink
refactor: migrate package plugin.core.storage to dynamic properties (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Jan 15, 2025
1 parent ca8837f commit c4ec56f
Show file tree
Hide file tree
Showing 16 changed files with 72 additions and 72 deletions.
11 changes: 5 additions & 6 deletions core/src/main/java/io/kestra/plugin/core/storage/Concat.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.storage;

import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.models.property.Property;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
Expand Down Expand Up @@ -100,20 +101,18 @@ public class Concat extends Task implements RunnableTask<Concat.Output> {
@Schema(
title = "The separator to used between files, default is no separator."
)
@PluginProperty(dynamic = true)
private String separator;
private Property<String> separator;

@Schema(
title = "The extension of the created file, default is .tmp."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String extension = ".tmp";
private Property<String> extension = Property.of(".tmp");

@SuppressWarnings("unchecked")
@Override
public Concat.Output run(RunContext runContext) throws Exception {
File tempFile = runContext.workingDir().createTempFile(extension).toFile();
File tempFile = runContext.workingDir().createTempFile(runContext.render(extension).as(String.class).orElseThrow()).toFile();
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
List<String> finalFiles;
if (this.files instanceof List<?> listValue) {
Expand All @@ -136,7 +135,7 @@ public Concat.Output run(RunContext runContext) throws Exception {
}

if (separator != null) {
IOUtils.copy(new ByteArrayInputStream(this.separator.getBytes()), fileOutputStream);
IOUtils.copy(new ByteArrayInputStream(runContext.render(this.separator).as(String.class).orElseThrow().getBytes()), fileOutputStream);
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -61,9 +62,8 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
title = "The file to be deduplicated.",
description = "Must be a `kestra://` internal storage URI."
)
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

@Schema(
title = "The 'pebble' expression to be used for extracting the deduplication key from each item.",
Expand All @@ -79,7 +79,7 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
@Override
public Output run(RunContext runContext) throws Exception {

URI from = new URI(runContext.render(this.from));
URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());

final PebbleFieldExtractor keyExtractor = getKeyExtractor(runContext);

Expand Down
11 changes: 5 additions & 6 deletions core/src/main/java/io/kestra/plugin/core/storage/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
Expand Down Expand Up @@ -39,25 +40,23 @@ public class Delete extends Task implements RunnableTask<Delete.Output> {
title = "The file to be deleted.",
description = "Must be a `kestra://` storage URI."
)
@PluginProperty(dynamic = true)
@NotNull
private String uri;
private Property<String> uri;

@Schema(
title = "Raise an error if the file is not found."
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean errorOnMissing = false;
private final Property<Boolean> errorOnMissing = Property.of(false);

@Override
public Delete.Output run(RunContext runContext) throws Exception {
StorageInterface storageInterface = ((DefaultRunContext)runContext).getApplicationContext().getBean(StorageInterface.class);
URI render = URI.create(runContext.render(this.uri));
URI render = URI.create(runContext.render(this.uri).as(String.class).orElseThrow());

boolean delete = storageInterface.delete(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), render);

if (errorOnMissing && !delete) {
if (runContext.render(errorOnMissing).as(Boolean.class).orElseThrow() && !delete) {
throw new NoSuchElementException("Unable to find file '" + render + "'");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -61,9 +62,8 @@ public class FilterItems extends Task implements RunnableTask<FilterItems.Output
title = "The file to be filtered.",
description = "Must be a `kestra://` internal storage URI."
)
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

@Schema(
title = "The 'pebble' expression used to match items to be included or excluded.",
Expand All @@ -78,25 +78,23 @@ public class FilterItems extends Task implements RunnableTask<FilterItems.Output
title = "Specifies the action to perform with items that match the `filterCondition` predicate",
description = "Use `INCLUDE` to pass the item through, or `EXCLUDE` to drop the items."
)
@PluginProperty
@Builder.Default
private FilterType filterType = FilterType.INCLUDE;
private Property<FilterType> filterType = Property.of(FilterType.INCLUDE);

@Schema(
title = "Specifies the behavior when the expression fail to be evaluated on an item or return `null`.",
description = "Use `FAIL` to throw the exception and fail the task, `INCLUDE` to pass the item through, or `EXCLUDE` to drop the item."
)
@PluginProperty
@Builder.Default
private ErrorOrNullBehavior errorOrNullBehavior = ErrorOrNullBehavior.FAIL;
private Property<ErrorOrNullBehavior> errorOrNullBehavior = Property.of(ErrorOrNullBehavior.FAIL);

/**
* {@inheritDoc}
**/
@Override
public Output run(RunContext runContext) throws Exception {

URI from = new URI(runContext.render(this.from));
URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());

final PebbleExpressionPredicate predicate = getExpressionPredication(runContext);

Expand All @@ -116,10 +114,10 @@ public Output run(RunContext runContext) throws Exception {
exception = e;
}

FilterType action = this.filterType;
FilterType action = runContext.render(this.filterType).as(FilterType.class).orElseThrow();

if (match == null) {
switch (errorOrNullBehavior) {
switch (runContext.render(errorOrNullBehavior).as(ErrorOrNullBehavior.class).orElseThrow()) {
case FAIL -> {
if (exception != null) {
throw exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -132,13 +133,12 @@ public class LocalFiles extends Task implements RunnableTask<LocalFiles.LocalFil
title = "The files from the local filesystem to be sent to the Kestra's internal storage.",
description = "Must be a list of [glob](https://en.wikipedia.org/wiki/Glob_(programming)) expressions relative to the current working directory, some examples: `my-dir/**`, `my-dir/*/**` or `my-dir/my-file.txt`."
)
@PluginProperty(dynamic = true)
private List<String> outputs;
private Property<List<String>> outputs;

@Override
public LocalFilesOutput run(RunContext runContext) throws Exception {
FilesService.inputFiles(runContext, this.inputs);
Map<String, URI> outputFiles = FilesService.outputFiles(runContext, this.outputs);
Map<String, URI> outputFiles = FilesService.outputFiles(runContext, runContext.render(this.outputs).asList(String.class));

return LocalFilesOutput.builder()
.uris(outputFiles)
Expand Down
16 changes: 7 additions & 9 deletions core/src/main/java/io/kestra/plugin/core/storage/Reverse.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -45,30 +46,27 @@ public class Reverse extends Task implements RunnableTask<Reverse.Output> {
@Schema(
title = "The file to be split."
)
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

@Schema(
title = "The separator used to join the file into chunks. By default, it's a newline `\\n` character. If you are on Windows, you might want to use `\\r\\n` instead."
)
@PluginProperty(dynamic = true)
@Builder.Default
private String separator = "\n";
private Property<String> separator = Property.of("\n");

@Schema(
title = "The name of a supported charset"
)
@Builder.Default
@PluginProperty(dynamic = true)
private final String charset = StandardCharsets.UTF_8.name();
private final Property<String> charset = Property.of(StandardCharsets.UTF_8.name());

@Override
public Reverse.Output run(RunContext runContext) throws Exception {
URI from = new URI(runContext.render(this.from));
URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());
String extension = FileUtils.getExtension(from);
String separator = runContext.render(this.separator);
Charset charset = Charsets.toCharset(runContext.render(this.charset));
String separator = runContext.render(this.separator).as(String.class).orElseThrow();
Charset charset = Charsets.toCharset(runContext.render(this.charset).as(String.class).orElseThrow());

File tempFile = runContext.workingDir().createTempFile(extension).toFile();

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/storage/Size.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
Expand Down Expand Up @@ -38,14 +39,13 @@ public class Size extends Task implements RunnableTask<Size.Output> {
title = "The file whose size needs to be fetched.",
description = "Must be a `kestra://` storage URI."
)
@PluginProperty(dynamic = true)
@NotNull
private String uri;
private Property<String> uri;

@Override
public Size.Output run(RunContext runContext) throws Exception {
StorageInterface storageInterface = ((DefaultRunContext)runContext).getApplicationContext().getBean(StorageInterface.class);
URI render = URI.create(runContext.render(this.uri));
URI render = URI.create(runContext.render(this.uri).as(String.class).orElseThrow());

Long size = storageInterface.getAttributes(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), render).getSize();

Expand Down
5 changes: 2 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/storage/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public class Split extends Task implements RunnableTask<Split.Output>, StorageSp
@Schema(
title = "The file to be split."
)
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

private Property<String> bytes;

Expand All @@ -70,7 +69,7 @@ public class Split extends Task implements RunnableTask<Split.Output>, StorageSp

@Override
public Split.Output run(RunContext runContext) throws Exception {
URI from = new URI(runContext.render(this.from));
URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());

return Split.Output.builder()
.uris(StorageService.split(runContext, this, from))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.storage;

import com.google.common.io.CharStreams;
import io.kestra.core.models.property.Property;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.junit.annotations.KestraTest;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -49,8 +50,8 @@ void run(Boolean json) throws Exception {

Concat result = Concat.builder()
.files(json ? JacksonMapper.ofJson().writeValueAsString(files) : files)
.separator("\n")
.extension(".yml")
.separator(Property.of("\n"))
.extension(Property.of(".yml"))
.build();

Concat.Output run = result.run(runContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.storage;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.JacksonMapper;
Expand Down Expand Up @@ -48,7 +49,7 @@ void shouldDeduplicateFileGivenKeyExpression() throws Exception {

DeduplicateItems task = DeduplicateItems
.builder()
.from(generateKeyValueFile(values, runContext).toString())
.from(Property.of(generateKeyValueFile(values, runContext).toString()))
.expr(" {{ key }} ")
.build();

Expand Down Expand Up @@ -87,7 +88,7 @@ void shouldDeduplicateFileGivenKeyExpressionReturningArray() throws Exception {

DeduplicateItems task = DeduplicateItems
.builder()
.from(generateKeyValueFile(values, runContext).toString())
.from(Property.of(generateKeyValueFile(values, runContext).toString()))
.expr(" {{ key }}-{{ v1 }}")
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.core.storage;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import org.junit.jupiter.api.Test;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
Expand Down Expand Up @@ -39,7 +40,7 @@ void run() throws Exception {


Delete bash = Delete.builder()
.uri(put.toString())
.uri(Property.of(put.toString()))
.build();

Delete.Output run = bash.run(runContext);
Expand All @@ -50,8 +51,8 @@ void run() throws Exception {

assertThrows(NoSuchElementException.class, () -> {
Delete error = Delete.builder()
.uri(put.toString())
.errorOnMissing(true)
.uri(Property.of(put.toString()))
.errorOnMissing(Property.of(true))
.build();

error.run(runContext);
Expand Down
Loading

0 comments on commit c4ec56f

Please sign in to comment.