diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index b52a7e8fc38..37d1c818390 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -466,14 +466,14 @@ private URI putTempFile(InputStream inputStream, String prefix, String name) thr } private URI putTempFile(File file, String prefix, String name) throws IOException { - URI put = this.putTempFile(new FileInputStream(file), prefix, (name != null ? name : file.getName())); - - boolean delete = file.delete(); - if (!delete) { - runContextLogger.logger().warn("Failed to delete temporary file"); + try (InputStream fileInput = new FileInputStream(file)) { + return this.putTempFile(fileInput, prefix, (name != null ? name : file.getName())); + } finally { + boolean delete = file.delete(); + if (!delete) { + runContextLogger.logger().warn("Failed to delete temporary file"); + } } - - return put; } @SuppressWarnings("unchecked") @@ -505,11 +505,13 @@ public URI putTaskStateFile(byte[] content, String state, String name) throws IO } public URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace) throws IOException { - return this.putTempFile( - new ByteArrayInputStream(content), - this.taskStateFilePathPrefix(state, namespace), - name - ); + try (InputStream inputStream = new ByteArrayInputStream(content)) { + return this.putTempFile( + inputStream, + this.taskStateFilePathPrefix(state, namespace), + name + ); + } } public URI putTaskStateFile(File file, String state, String name) throws IOException { diff --git a/core/src/test/java/io/kestra/core/runners/RunContextTest.java b/core/src/test/java/io/kestra/core/runners/RunContextTest.java index 128f3271f56..f9cd3af1e9b 100644 --- a/core/src/test/java/io/kestra/core/runners/RunContextTest.java +++ b/core/src/test/java/io/kestra/core/runners/RunContextTest.java @@ -9,6 +9,7 @@ import io.kestra.core.models.flows.State; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; +import io.kestra.core.storages.StorageInterface; import io.kestra.core.utils.TestsUtils; import io.micronaut.context.annotation.Property; import org.exparity.hamcrest.date.ZonedDateTimeMatchers; @@ -16,6 +17,7 @@ import org.slf4j.event.Level; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Path; import java.time.Duration; @@ -29,8 +31,7 @@ import jakarta.inject.Named; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.*; import static org.hamcrest.number.OrderingComparison.greaterThan; @Property(name = "kestra.tasks.tmp-dir.path", value = "/tmp/sub/dir/tmp/") @@ -45,6 +46,9 @@ class RunContextTest extends AbstractMemoryRunnerTest { @Inject RunContextFactory runContextFactory; + @Inject + StorageInterface storageInterface; + @Inject MetricRegistry metricRegistry; @@ -152,6 +156,21 @@ void tempFiles() throws IOException { assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/"), is(true)); } + @Test + void largeInput() throws IOException, InterruptedException { + RunContext runContext = runContextFactory.of(); + Path path = runContext.tempFile(); + + long size = 1024L * 1024 * 1024; + + Process p = Runtime.getRuntime().exec(String.format("dd if=/dev/zero of=%s bs=1 count=1 seek=%s", path, size)); + p.waitFor(); + p.destroy(); + + URI uri = runContext.putTempFile(path.toFile()); + assertThat(storageInterface.size(uri), is(size+1)); + } + @Test void invalidTaskDefaults() throws TimeoutException, IOException, URISyntaxException { repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/invalid-task-defaults.yaml")));