diff --git a/api/src/main/java/org/apache/iceberg/io/OutputFile.java b/api/src/main/java/org/apache/iceberg/io/OutputFile.java index 34b4e54abf62..67195c46c448 100644 --- a/api/src/main/java/org/apache/iceberg/io/OutputFile.java +++ b/api/src/main/java/org/apache/iceberg/io/OutputFile.java @@ -48,6 +48,7 @@ public interface OutputFile { * * @return an output stream that can report its position * @throws RuntimeIOException If the implementation throws an {@link IOException} + * @throws SecurityException If staging directory creation fails due to missing JVM level permission */ PositionOutputStream createOrOverwrite(); diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index 690dc9aea255..f66a2967d05f 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -175,6 +175,7 @@ private void newStream() throws IOException { stream.close(); } + createStagingDirectoryIfNotExists(); currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory); currentStagingFile.deleteOnExit(); stagingFiles.add(currentStagingFile); @@ -328,6 +329,26 @@ private static InputStream uncheckedInputStream(File file) { } } + private void createStagingDirectoryIfNotExists() throws IOException, SecurityException { + if (!stagingDirectory.exists()) { + LOG.info("Staging directory does not exist, trying to create one: {}", + stagingDirectory.getAbsolutePath()); + boolean createdStagingDirectory = stagingDirectory.mkdirs(); + if (createdStagingDirectory) { + LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath()); + } else { + if (stagingDirectory.exists()) { + LOG.info("Successfully created staging directory by another process: {}", + stagingDirectory.getAbsolutePath()); + } else { + throw new IOException( + "Failed to create staging directory due to some unknown reason: " + stagingDirectory + .getAbsolutePath()); + } + } + } + } + @SuppressWarnings("checkstyle:NoFinalizer") @Override protected void finalize() throws Throwable { diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java index b4dc1ecf0b5e..b0f8b7384513 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java @@ -20,6 +20,7 @@ package org.apache.iceberg.aws.s3; import com.adobe.testing.s3mock.junit4.S3MockRule; +import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; @@ -29,6 +30,7 @@ import java.util.stream.Stream; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -72,6 +74,7 @@ public class S3OutputStreamTest { private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3)); private final Random random = new Random(1); private final Path tmpDir = Files.createTempDirectory("s3fileio-test-"); + private final String newTmpDirectory = "/tmp/newStagingDirectory"; private final AwsProperties properties = new AwsProperties(ImmutableMap.of( AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024), @@ -85,6 +88,14 @@ public void before() { s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build()); } + @After + public void after() { + File newStagingDirectory = new File(newTmpDirectory); + if (newStagingDirectory.exists()) { + newStagingDirectory.delete(); + } + } + @Test public void testWrite() { // Run tests for both byte and array write paths @@ -140,6 +151,14 @@ public void testMultipleClose() throws IOException { stream.close(); } + @Test + public void testStagingDirectoryCreation() throws IOException { + AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of( + AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory)); + S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties); + stream.close(); + } + private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) { try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) { if (arrayWrite) {