From 01a2868c6bc723c32d3ecd50dc891d3b8feb7998 Mon Sep 17 00:00:00 2001 From: amogh-jahagirdar Date: Wed, 13 Mar 2024 11:24:26 -0700 Subject: [PATCH] API: Fix default FileIO#newInputFile ManifestFile, DataFile and DeleteFile implementations --- .../java/org/apache/iceberg/io/FileIO.java | 12 ++-- .../apache/iceberg/aws/s3/TestS3FileIO.java | 58 +++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index fc6a53367f21..a521cbf79d7f 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -49,25 +49,25 @@ default InputFile newInputFile(String path, long length) { default InputFile newInputFile(DataFile file) { Preconditions.checkArgument( file.keyMetadata() == null, - "Cannot decrypt data file: {} (use EncryptingFileIO)", + "Cannot decrypt data file: %s (use EncryptingFileIO)", file.path()); - return newInputFile(file.path().toString()); + return newInputFile(file.path().toString(), file.fileSizeInBytes()); } default InputFile newInputFile(DeleteFile file) { Preconditions.checkArgument( file.keyMetadata() == null, - "Cannot decrypt delete file: {} (use EncryptingFileIO)", + "Cannot decrypt delete file: %s (use EncryptingFileIO)", file.path()); - return newInputFile(file.path().toString()); + return newInputFile(file.path().toString(), file.fileSizeInBytes()); } default InputFile newInputFile(ManifestFile manifest) { Preconditions.checkArgument( manifest.keyMetadata() == null, - "Cannot decrypt manifest: {} (use EncryptingFileIO)", + "Cannot decrypt manifest: %s (use EncryptingFileIO)", manifest.path()); - return newInputFile(manifest.path()); + return newInputFile(manifest.path(), manifest.length()); } /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index a74e574c9751..26c9bc133b13 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -22,6 +22,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -38,6 +40,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.aws.AwsProperties; @@ -74,6 +83,7 @@ import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.S3Error; @ExtendWith(S3MockExtension.class) @@ -377,6 +387,54 @@ public void testResolvingFileIOLoad() { Assertions.assertThat(result).isInstanceOf(S3FileIO.class); } + @Test + public void testInputFileWithDataFile() throws IOException { + String location = "s3://bucket/path/to/data-file.parquet"; + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(location) + .withFileSizeInBytes(123L) + .withFormat(FileFormat.PARQUET) + .withRecordCount(123L) + .build(); + OutputStream outputStream = s3FileIO.newOutputFile(location).create(); + byte[] data = "testing".getBytes(); + outputStream.write(data); + outputStream.close(); + + InputFile inputFile = s3FileIO.newInputFile(dataFile); + reset(s3mock); + + Assertions.assertThat(inputFile.getLength()) + .as("Data file length should be determined from the file size stats") + .isEqualTo(123L); + verify(s3mock, never()).headObject(any(HeadObjectRequest.class)); + } + + @Test + public void testInputFileWithManifest() throws IOException { + String dataFileLocation = "s3://bucket/path/to/data-file-2.parquet"; + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(dataFileLocation) + .withFileSizeInBytes(123L) + .withFormat(FileFormat.PARQUET) + .withRecordCount(123L) + .build(); + String manifestLocation = "s3://bucket/path/to/manifest.avro"; + OutputFile outputFile = s3FileIO.newOutputFile(manifestLocation); + ManifestWriter writer = + ManifestFiles.write(PartitionSpec.unpartitioned(), outputFile); + writer.add(dataFile); + writer.close(); + ManifestFile manifest = writer.toManifestFile(); + InputFile inputFile = s3FileIO.newInputFile(manifest); + reset(s3mock); + + Assertions.assertThat(inputFile.getLength()).isEqualTo(manifest.length()); + verify(s3mock, never()).headObject(any(HeadObjectRequest.class)); + } + private void createRandomObjects(String prefix, int count) { S3URI s3URI = new S3URI(prefix);