Skip to content

Commit

Permalink
fix(core): fromIon is reading only the first rows by default, adding …
Browse files Browse the repository at this point in the history
…a parameter to read all rows
  • Loading branch information
tchiotludo committed Jan 22, 2025
1 parent 33308c4 commit b84b5c1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package io.kestra.core.runners.pebble.functions;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.FileSerde;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import reactor.core.publisher.Flux;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class FromIonFunction implements Function {
private static final ObjectMapper MAPPER = JacksonMapper.ofIon();

public List<String> getArgumentNames() {
return List.of("ion");
return List.of("ion", "allRows");
}

@Override
Expand All @@ -32,11 +33,26 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC
throw new PebbleException(null, "The 'fromIon' function expects an argument 'ion' with type string.", lineNumber, self.getName());
}

String ion = (String) args.get("ion");;
boolean allRows = args.containsKey("allRows") ? (Boolean) args.get("allRows") : false;

try {
return MAPPER.readValue(ion, JacksonMapper.OBJECT_TYPE_REFERENCE);
} catch (JsonProcessingException e) {
String ion = (String) args.get("ion");;

Flux<Object> flux = FileSerde.readAll(new BufferedReader(new StringReader(ion)));

if (!allRows) {
flux = flux.take(1);
}

Stream<Object> data = flux
.toStream();

if (allRows) {
return data.toList();
}

return data.findFirst().orElse(null);
} catch (RuntimeException | IOException e) {
throw new PebbleException(null, "Invalid ion: " + e.getMessage(), lineNumber, self.getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package io.kestra.core.runners.pebble.functions;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.*;
import java.net.URI;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -17,6 +23,9 @@ class FromIonFunctionTest {
@Inject
VariableRenderer variableRenderer;

@Inject
StorageInterface storageInterface;

@Test
void ionDecodeFunction() throws IllegalVariableEvaluationException {
String render = variableRenderer.render("{{ fromIon('{date:2024-04-21T23:00:00.000Z, title:\"Main_Page\",views:109787}').title }}", Map.of());
Expand All @@ -26,6 +35,35 @@ void ionDecodeFunction() throws IllegalVariableEvaluationException {
assertThat(render, emptyString());
}

@Test
void multiLine() throws IllegalVariableEvaluationException, IOException {
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);
for (int i = 0; i < 10; i++) {
FileSerde.write(output, ImmutableMap.of(
"id", i,
"name", "john"
));
}

Map<String, Object> variables = Map.of(
"flow", Map.of("id", "test", "namespace", "unit"),
"execution", Map.of("id", "id-exec")
);

URI internalStorageURI = URI.create("/unit/test/executions/id-exec/" + IdUtils.create() + ".ion");
URI internalStorageFile = storageInterface.put(null, "unit", internalStorageURI, new FileInputStream(tempFile));

String render = variableRenderer.render("{{ fromIon(read('" + internalStorageFile + "'), allRows=true) }}", variables);
assertThat(render, containsString("\"id\":0"));
assertThat(render, containsString("\"id\":9"));

render = variableRenderer.render("{{ fromIon(read('" + internalStorageFile + "')) }}", variables);
assertThat(render, containsString("\"id\":0"));
assertThat(render, not((containsString("\"id\":9"))));
}


@Test
void exception() {
assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ fromIon() }}", Map.of()));
Expand Down

0 comments on commit b84b5c1

Please sign in to comment.