diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index 4d47839d401..198edc0f806 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -465,42 +465,59 @@ private URI putTempFile(File file, String prefix, String name) throws IOExceptio } @SuppressWarnings("unchecked") - private String taskStateFilePathPrefix(String name) { + private String taskStateFilePathPrefix(String name, Boolean namespace) { Map taskrun = (Map) this.getVariables().get("taskrun"); return "/" + this.storageInterface.statePrefix( ((Map) this.getVariables().get("flow")).get("namespace"), - ((Map) this.getVariables().get("flow")).get("id"), + namespace ? null : ((Map) this.getVariables().get("flow")).get("id"), name, taskrun != null ? taskrun.getOrDefault("value", null) : null ); } public InputStream getTaskStateFile(String state, String name) throws IOException { - URI uri = URI.create(this.taskStateFilePathPrefix(state)); + return this.getTaskStateFile(state, name, false); + } + + + public InputStream getTaskStateFile(String state, String name, Boolean namespace) throws IOException { + URI uri = URI.create(this.taskStateFilePathPrefix(state, namespace)); URI resolve = uri.resolve(uri.getPath() + "/" + name); return this.storageInterface.get(resolve); } public URI putTaskStateFile(byte[] content, String state, String name) throws IOException { + return this.putTaskStateFile(content, state, name, false); + } + + public URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace) throws IOException { return this.putTempFile( new ByteArrayInputStream(content), - this.taskStateFilePathPrefix(state), + this.taskStateFilePathPrefix(state, namespace), name ); } public URI putTaskStateFile(File file, String state, String name) throws IOException { + return this.putTaskStateFile(file, state, name, false); + } + + public URI putTaskStateFile(File file, String state, String name, Boolean namespace) throws IOException { return this.putTempFile( file, - this.taskStateFilePathPrefix(state), + this.taskStateFilePathPrefix(state, namespace), name ); } public boolean deleteTaskStateFile(String state, String name) throws IOException { - URI uri = URI.create(this.taskStateFilePathPrefix(state)); + return this.deleteTaskStateFile(state, name, false); + } + + public boolean deleteTaskStateFile(String state, String name, Boolean namespace) throws IOException { + URI uri = URI.create(this.taskStateFilePathPrefix(state, namespace)); URI resolve = uri.resolve(uri.getPath() + "/" + name); return this.storageInterface.delete(resolve); diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterface.java b/core/src/main/java/io/kestra/core/storages/StorageInterface.java index ed29e0c55fb..419b9414906 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageInterface.java +++ b/core/src/main/java/io/kestra/core/storages/StorageInterface.java @@ -63,12 +63,19 @@ default String executionPrefix(TaskRun taskRun) { ); } - default String statePrefix(String namespace, String flowId, @Nullable String name, @Nullable String value) { - ArrayList paths = new ArrayList<>(List.of( - namespace.replace(".", "/"), - Slugify.of(flowId), - "states" - )); + default String statePrefix(String namespace, @Nullable String flowId, @Nullable String name, @Nullable String value) { + String namespacePrefix = namespace.replace(".", "/"); + + ArrayList paths = new ArrayList<>( + flowId == null ? List.of( + namespacePrefix, + "states" + ) : List.of( + namespacePrefix, + Slugify.of(flowId), + "states" + ) + ); if (name != null) { paths.add(name); diff --git a/core/src/main/java/io/kestra/core/tasks/states/AbstractState.java b/core/src/main/java/io/kestra/core/tasks/states/AbstractState.java index 636e0fbb180..f40f597b20c 100644 --- a/core/src/main/java/io/kestra/core/tasks/states/AbstractState.java +++ b/core/src/main/java/io/kestra/core/tasks/states/AbstractState.java @@ -36,8 +36,16 @@ public abstract class AbstractState extends Task { @Builder.Default protected String name = "default"; + @Schema( + title = "Share state for the current namespace", + description = "By default, the state is isolated by namespace **and** flow, setting to `true` will allow to share the state between the **same** namespace" + ) + @PluginProperty(dynamic = true) + @Builder.Default + private final Boolean namespace = false; + protected Map get(RunContext runContext) throws IllegalVariableEvaluationException, IOException { - InputStream taskStateFile = runContext.getTaskStateFile("tasks-states", runContext.render(this.name)); + InputStream taskStateFile = runContext.getTaskStateFile("tasks-states", runContext.render(this.name), this.namespace); return JacksonMapper.ofJson(false).readValue(taskStateFile, TYPE_REFERENCE); } @@ -56,13 +64,14 @@ protected Pair> merge(RunContext runContext, Map HttpResponse validateFile(String executionId, URI path, String return null; } + prefix = storageInterface.statePrefix(flow.get().getNamespace(), null, null, null); + if (path.getPath().substring(1).startsWith(prefix)) { + return null; + } + // maybe redirect to correct execution Optional redirectedExecution = storageInterface.extractExecutionId(path);