Skip to content

Commit

Permalink
feat(core): State task isolation per namespace
Browse files Browse the repository at this point in the history
close #544
  • Loading branch information
tchiotludo committed Aug 2, 2022
1 parent 41a4fdf commit dc40a61
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 15 deletions.
29 changes: 23 additions & 6 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,42 +465,59 @@ private URI putTempFile(File file, String prefix, String name) throws IOExceptio
}

@SuppressWarnings("unchecked")
private String taskStateFilePathPrefix(String name) {
private String taskStateFilePathPrefix(String name, Boolean namespace) {
Map<String, String> taskrun = (Map<String, String>) this.getVariables().get("taskrun");

return "/" + this.storageInterface.statePrefix(
((Map<String, String>) this.getVariables().get("flow")).get("namespace"),
((Map<String, String>) this.getVariables().get("flow")).get("id"),
namespace ? null : ((Map<String, String>) this.getVariables().get("flow")).get("id"),
name,
taskrun != null ? taskrun.getOrDefault("value", null) : null
);
}

public InputStream getTaskStateFile(String state, String name) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(state));
return this.getTaskStateFile(state, name, false);
}


public InputStream getTaskStateFile(String state, String name, Boolean namespace) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(state, namespace));
URI resolve = uri.resolve(uri.getPath() + "/" + name);

return this.storageInterface.get(resolve);
}

public URI putTaskStateFile(byte[] content, String state, String name) throws IOException {
return this.putTaskStateFile(content, state, name, false);
}

public URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace) throws IOException {
return this.putTempFile(
new ByteArrayInputStream(content),
this.taskStateFilePathPrefix(state),
this.taskStateFilePathPrefix(state, namespace),
name
);
}

public URI putTaskStateFile(File file, String state, String name) throws IOException {
return this.putTaskStateFile(file, state, name, false);
}

public URI putTaskStateFile(File file, String state, String name, Boolean namespace) throws IOException {
return this.putTempFile(
file,
this.taskStateFilePathPrefix(state),
this.taskStateFilePathPrefix(state, namespace),
name
);
}

public boolean deleteTaskStateFile(String state, String name) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(state));
return this.deleteTaskStateFile(state, name, false);
}

public boolean deleteTaskStateFile(String state, String name, Boolean namespace) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(state, namespace));
URI resolve = uri.resolve(uri.getPath() + "/" + name);

return this.storageInterface.delete(resolve);
Expand Down
19 changes: 13 additions & 6 deletions core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,19 @@ default String executionPrefix(TaskRun taskRun) {
);
}

default String statePrefix(String namespace, String flowId, @Nullable String name, @Nullable String value) {
ArrayList<String> paths = new ArrayList<>(List.of(
namespace.replace(".", "/"),
Slugify.of(flowId),
"states"
));
default String statePrefix(String namespace, @Nullable String flowId, @Nullable String name, @Nullable String value) {
String namespacePrefix = namespace.replace(".", "/");

ArrayList<String> paths = new ArrayList<>(
flowId == null ? List.of(
namespacePrefix,
"states"
) : List.of(
namespacePrefix,
Slugify.of(flowId),
"states"
)
);

if (name != null) {
paths.add(name);
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/java/io/kestra/core/tasks/states/AbstractState.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ public abstract class AbstractState extends Task {
@Builder.Default
protected String name = "default";

@Schema(
title = "Share state for the current namespace",
description = "By default, the state is isolated by namespace **and** flow, setting to `true` will allow to share the state between the **same** namespace"
)
@PluginProperty(dynamic = true)
@Builder.Default
private final Boolean namespace = false;

protected Map<String, Object> get(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
InputStream taskStateFile = runContext.getTaskStateFile("tasks-states", runContext.render(this.name));
InputStream taskStateFile = runContext.getTaskStateFile("tasks-states", runContext.render(this.name), this.namespace);

return JacksonMapper.ofJson(false).readValue(taskStateFile, TYPE_REFERENCE);
}
Expand All @@ -56,13 +64,14 @@ protected Pair<URI, Map<String, Object>> merge(RunContext runContext, Map<String
URI uri = runContext.putTaskStateFile(
JacksonMapper.ofJson(false).writeValueAsBytes(merge),
"tasks-states",
runContext.render(this.name)
runContext.render(this.name),
this.namespace
);

return Pair.of(uri, merge);
}

protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
return runContext.deleteTaskStateFile("tasks-states", runContext.render(this.name));
return runContext.deleteTaskStateFile("tasks-states", runContext.render(this.name), this.namespace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.kestra.core.tasks.states;

import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest
class StateNamespaceTest {
@Inject
RunContextFactory runContextFactory;

private RunContext runContextFlow1(Task task) {
return TestsUtils.mockRunContext(runContextFactory, task, Map.of());
}

private RunContext runContextFlow2(Task task) {
return TestsUtils.mockRunContext(runContextFactory, task, Map.of());
}

@Test
void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.namespace(true)
.data(Map.of(
"john", "doe"
))
.build();
Set.Output setOutput = set.run(runContextFlow1(set));
assertThat(setOutput.getCount(), is(1));

Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.namespace(true)
.build();
Get.Output getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount(), is(1));
assertThat(getOutput.getData().get("john"), is("doe"));

get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.build();
getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount(), is(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ protected <T> HttpResponse<T> validateFile(String executionId, URI path, String
return null;
}

prefix = storageInterface.statePrefix(flow.get().getNamespace(), null, null, null);
if (path.getPath().substring(1).startsWith(prefix)) {
return null;
}

// maybe redirect to correct execution
Optional<String> redirectedExecution = storageInterface.extractExecutionId(path);

Expand Down

0 comments on commit dc40a61

Please sign in to comment.