diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java
index 187b53fa2a0f..7bf725ff7e6e 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileIO.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java
@@ -37,6 +37,13 @@ public interface FileIO extends Serializable, Closeable {
    */
   InputFile newInputFile(String path);
 
+  /**
+   * Get a {@link InputFile} instance to read bytes from the file at the given path, with a known file length.
+   */
+  default InputFile newInputFile(String path, long length) {
+    return newInputFile(path);
+  }
+
   /**
    * Get a {@link OutputFile} instance to write bytes to the file at the given path.
    */
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index f09eb948c545..5d5d88eafbe9 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -119,6 +119,11 @@ public InputFile newInputFile(String path) {
     return S3InputFile.fromLocation(path, client(), awsProperties, metrics);
   }
 
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return S3InputFile.fromLocation(path, length, client(), awsProperties, metrics);
+  }
+
   @Override
   public OutputFile newOutputFile(String path) {
     return S3OutputFile.fromLocation(path, client(), awsProperties, metrics);
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
index 8e8a81e7bfd4..e4862e0dcfbd 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
@@ -29,15 +29,23 @@ public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryptedFile {
   private NativeFileCryptoParameters nativeDecryptionParameters;
+  private Long length;
 
   public static S3InputFile fromLocation(String location, S3Client client, AwsProperties awsProperties,
                                          MetricsContext metrics) {
-    return new S3InputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()),
+    return new S3InputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()), null,
         awsProperties, metrics);
   }
 
-  S3InputFile(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics) {
+  public static S3InputFile fromLocation(String location, long length, S3Client client, AwsProperties awsProperties,
+                                         MetricsContext metrics) {
+    return new S3InputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()),
+        length > 0 ? length : null, awsProperties, metrics);
+  }
+
+  S3InputFile(S3Client client, S3URI uri, Long length, AwsProperties awsProperties, MetricsContext metrics) {
     super(client, uri, awsProperties, metrics);
+    this.length = length;
   }
 
   /**
@@ -47,7 +55,11 @@ public static S3InputFile fromLocation(String location, S3Client client, AwsProp
    */
   @Override
   public long getLength() {
-    return getObjectMetadata().contentLength();
+    if (length == null) {
+      this.length = getObjectMetadata().contentLength();
+    }
+
+    return length;
   }
 
   @Override
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
index 7c29097d6cfb..f2af425f1c08 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -70,7 +70,7 @@ public PositionOutputStream createOrOverwrite() {
 
   @Override
   public InputFile toInputFile() {
-    return new S3InputFile(client(), uri(), awsProperties(), metrics());
+    return new S3InputFile(client(), uri(), null, awsProperties(), metrics());
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 9dbf50e762c7..52e82c969621 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -82,7 +82,7 @@ public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io) {
   public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
         "Cannot read a delete manifest with a ManifestReader: %s", manifest);
-    InputFile file = io.newInputFile(manifest.path());
+    InputFile file = io.newInputFile(manifest.path(), manifest.length());
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
   }
@@ -133,7 +133,7 @@ public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifes
                                                               Map<Integer, PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
         "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
-    InputFile file = io.newInputFile(manifest.path());
+    InputFile file = io.newInputFile(manifest.path(), manifest.length());
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
   }
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index cfad2a4b3b21..dd96b365969a 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -563,7 +563,7 @@ private Set<String> findFilesToDelete(Set<ManifestFile> manifestsToScan, Set<ManifestFile>
   private Set<ManifestFile> readManifestFiles(Snapshot snapshot) {
     if (snapshot.manifestListLocation() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index fd207eb2f737..2ba2cbf2693b 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -67,6 +67,11 @@ public InputFile newInputFile(String path) {
     return HadoopInputFile.fromLocation(path, hadoopConf.get());
   }
 
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return HadoopInputFile.fromLocation(path, length, hadoopConf.get());
+  }
+
   @Override
   public OutputFile newOutputFile(String path) {
     return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
index 7393c91ce32b..8e39dcaf09c7 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
@@ -61,7 +61,11 @@ public static HadoopInputFile fromLocation(CharSequence location, Configuration
 
   public static HadoopInputFile fromLocation(CharSequence location, long length, Configuration conf) {
     FileSystem fs = Util.getFs(new Path(location.toString()), conf);
-    return new HadoopInputFile(fs, location.toString(), length, conf);
+    if (length > 0) {
+      return new HadoopInputFile(fs, location.toString(), length, conf);
+    } else {
+      return new HadoopInputFile(fs, location.toString(), conf);
+    }
   }
 
   public static HadoopInputFile fromLocation(CharSequence location, FileSystem fs) {
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 3b6d9725f521..3815d5da5402 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -63,6 +63,11 @@ public InputFile newInputFile(String location) {
     return io(location).newInputFile(location);
   }
 
+  @Override
+  public InputFile newInputFile(String location, long length) {
+    return io(location).newInputFile(location, length);
+  }
+
   @Override
   public OutputFile newOutputFile(String location) {
     return io(location).newOutputFile(location);
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
index 4c114a280465..449445a91833 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
@@ -60,6 +60,8 @@ public void dropTableDataDeletesExpectedFiles() {
     FileIO fileIO = Mockito.mock(FileIO.class);
     Mockito.when(fileIO.newInputFile(Mockito.anyString()))
         .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0)));
+    Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
+        .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
 
     CatalogUtil.dropTableData(fileIO, tableMetadata);
 
     ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
@@ -90,6 +92,8 @@ public void dropTableDataDoNotThrowWhenDeletesFail() {
     FileIO fileIO = Mockito.mock(FileIO.class);
     Mockito.when(fileIO.newInputFile(Mockito.anyString()))
         .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0)));
+    Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
+        .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
     Mockito.doThrow(new RuntimeException()).when(fileIO).deleteFile(ArgumentMatchers.anyString());
 
     CatalogUtil.dropTableData(fileIO, tableMetadata);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 24e37e23d841..ecb520f1d20b 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -80,6 +80,11 @@ public InputFile newInputFile(String path) {
     return GCSInputFile.fromLocation(path, client(), gcpProperties, metrics);
   }
 
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return GCSInputFile.fromLocation(path, length, client(), gcpProperties, metrics);
+  }
+
   @Override
   public OutputFile newOutputFile(String path) {
     return GCSOutputFile.fromLocation(path, client(), gcpProperties, metrics);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
index 82b6f10ae8cc..c220615e40ec 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -27,19 +27,31 @@ import org.apache.iceberg.metrics.MetricsContext;
 
 class GCSInputFile extends BaseGCSFile implements InputFile {
+  private Long length;
 
   static GCSInputFile fromLocation(String location, Storage storage,
+                                   GCPProperties gcpProperties, MetricsContext metrics) {
+    return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), null, gcpProperties, metrics);
+  }
+
+  static GCSInputFile fromLocation(String location, long length, Storage storage,
                                    GCPProperties gcpProperties, MetricsContext metrics) {
-    return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), gcpProperties, metrics);
+    return new GCSInputFile(
+        storage, BlobId.fromGsUtilUri(location), length > 0 ? length : null, gcpProperties, metrics);
   }
 
-  GCSInputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties, MetricsContext metrics) {
+  GCSInputFile(Storage storage, BlobId blobId, Long length, GCPProperties gcpProperties, MetricsContext metrics) {
     super(storage, blobId, gcpProperties, metrics);
+    this.length = length;
   }
 
   @Override
   public long getLength() {
-    return getBlob().getSize();
+    if (length == null) {
+      this.length = getBlob().getSize();
+    }
+
+    return length;
   }
 
   @Override
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
index 9c01b5dae0b4..a8f5d608082b 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
@@ -67,6 +67,6 @@ public PositionOutputStream createOrOverwrite() {
 
   @Override
   public InputFile toInputFile() {
-    return new GCSInputFile(storage(), blobId(), gcpProperties(), metrics());
+    return new GCSInputFile(storage(), blobId(), null, gcpProperties(), metrics());
   }
 }
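
Not part of the patch, but a minimal caller-side sketch of what the new overload buys. Passing an already-known size (in the patch it comes from ManifestFile.length()) lets the returned InputFile answer getLength() from the cached value instead of issuing a stat/HEAD request, while the single-argument form keeps the old lazy-lookup behaviour through the new default method. The path and length below are made up, and the snippet assumes an Iceberg build that includes this change:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.InputFile;

public class KnownLengthExample {
  public static void main(String[] args) {
    // Hypothetical values; in the patch they come from ManifestFile.path() and ManifestFile.length().
    String manifestPath = "file:///tmp/example-manifest.avro";
    long knownLength = 6432L;

    HadoopFileIO io = new HadoopFileIO(new Configuration());

    // New overload: the supplied length is carried by the InputFile, so getLength()
    // can return it without consulting the file system.
    InputFile withKnownLength = io.newInputFile(manifestPath, knownLength);
    System.out.println(withKnownLength.getLength());  // prints 6432

    // Existing behaviour is unchanged: without a length, the file is stat'd lazily as before.
    InputFile withoutLength = io.newInputFile(manifestPath);
    System.out.println(withoutLength.exists());
  }
}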