Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public interface FileIO extends Serializable, Closeable {
*/
InputFile newInputFile(String path);

/**
 * Get an {@link InputFile} instance to read bytes from the file at the given path, for callers
 * that already know the file's length.
 *
 * @param path an absolute file location
 * @param length the known length of the file at {@code path}, in bytes
 * @return an {@link InputFile} for the given path
 * @implSpec the default implementation delegates to {@link #newInputFile(String)} and ignores the
 *     length hint; implementations may use the hint to avoid a separate metadata lookup
 */
default InputFile newInputFile(String path, long length) {
return newInputFile(path);
}

/**
* Get a {@link OutputFile} instance to write bytes to the file at the given path.
*/
Expand Down
5 changes: 5 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public InputFile newInputFile(String path) {
return S3InputFile.fromLocation(path, client(), awsProperties, metrics);
}

// Forwards the caller-supplied length so S3InputFile can serve getLength() without
// an extra object-metadata request.
@Override
public InputFile newInputFile(String path, long length) {
return S3InputFile.fromLocation(path, length, client(), awsProperties, metrics);
}

@Override
public OutputFile newOutputFile(String path) {
return S3OutputFile.fromLocation(path, client(), awsProperties, metrics);
Expand Down
18 changes: 15 additions & 3 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,23 @@

public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryptedFile {
private NativeFileCryptoParameters nativeDecryptionParameters;
private Long length;

public static S3InputFile fromLocation(String location, S3Client client, AwsProperties awsProperties,
MetricsContext metrics) {
return new S3InputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()),
return new S3InputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()), null,
awsProperties, metrics);
}

S3InputFile(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics) {
/**
 * Create an input file for a location whose length is already known.
 *
 * <p>A non-positive {@code length} is treated as unknown and stored as {@code null}, deferring
 * to an object-metadata lookup when the length is first needed.
 */
public static S3InputFile fromLocation(String location, long length, S3Client client, AwsProperties awsProperties,
MetricsContext metrics) {
return new S3InputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()),
length > 0 ? length : null, awsProperties, metrics);
}

// length may be null, meaning "unknown"; getLength() then falls back to object metadata.
S3InputFile(S3Client client, S3URI uri, Long length, AwsProperties awsProperties, MetricsContext metrics) {
super(client, uri, awsProperties, metrics);
this.length = length;
}

/**
Expand All @@ -47,7 +55,11 @@ public static S3InputFile fromLocation(String location, S3Client client, AwsProp
*/
@Override
public long getLength() {
return getObjectMetadata().contentLength();
if (length == null) {
this.length = getObjectMetadata().contentLength();
}

return length;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public PositionOutputStream createOrOverwrite() {

@Override
public InputFile toInputFile() {
return new S3InputFile(client(), uri(), awsProperties(), metrics());
return new S3InputFile(client(), uri(), null, awsProperties(), metrics());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to GCS, could we just call S3InputFile.fromLocation(...) to avoid calling the constructor with null?

}

@Override
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io) {
public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s", manifest);
InputFile file = io.newInputFile(manifest.path());
InputFile file = io.newInputFile(manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
}
Expand Down Expand Up @@ -133,7 +133,7 @@ public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifes
Map<Integer, PartitionSpec> specsById) {
Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
"Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
InputFile file = io.newInputFile(manifest.path());
InputFile file = io.newInputFile(manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ private Set<String> findFilesToDelete(Set<ManifestFile> manifestsToScan, Set<Man
}

private static final Schema MANIFEST_PROJECTION = ManifestFile.schema()
.select("manifest_path", "added_snapshot_id", "deleted_data_files_count");
.select("manifest_path", "manifest_length", "added_snapshot_id", "deleted_data_files_count");

private CloseableIterable<ManifestFile> readManifestFiles(Snapshot snapshot) {
if (snapshot.manifestListLocation() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public InputFile newInputFile(String path) {
return HadoopInputFile.fromLocation(path, hadoopConf.get());
}

// Forwards the caller-provided length to HadoopInputFile so the file size does not
// need to be looked up again from the FileSystem.
@Override
public InputFile newInputFile(String path, long length) {
return HadoopInputFile.fromLocation(path, length, hadoopConf.get());
}

@Override
public OutputFile newOutputFile(String path) {
return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public static HadoopInputFile fromLocation(CharSequence location, Configuration
public static HadoopInputFile fromLocation(CharSequence location, long length,
Configuration conf) {
FileSystem fs = Util.getFs(new Path(location.toString()), conf);
return new HadoopInputFile(fs, location.toString(), length, conf);
if (length > 0) {
return new HadoopInputFile(fs, location.toString(), length, conf);
} else {
return new HadoopInputFile(fs, location.toString(), conf);
}
}

public static HadoopInputFile fromLocation(CharSequence location, FileSystem fs) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public InputFile newInputFile(String location) {
return io(location).newInputFile(location);
}

// Resolves the scheme-specific FileIO for this location and delegates,
// preserving the known-length hint.
@Override
public InputFile newInputFile(String location, long length) {
return io(location).newInputFile(location, length);
}

@Override
public OutputFile newOutputFile(String location) {
return io(location).newOutputFile(location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public void dropTableDataDeletesExpectedFiles() {
FileIO fileIO = Mockito.mock(FileIO.class);
Mockito.when(fileIO.newInputFile(Mockito.anyString()))
.thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0)));
Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
.thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));

CatalogUtil.dropTableData(fileIO, tableMetadata);
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Expand Down Expand Up @@ -90,6 +92,8 @@ public void dropTableDataDoNotThrowWhenDeletesFail() {
FileIO fileIO = Mockito.mock(FileIO.class);
Mockito.when(fileIO.newInputFile(Mockito.anyString()))
.thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0)));
Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
.thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
Mockito.doThrow(new RuntimeException()).when(fileIO).deleteFile(ArgumentMatchers.anyString());

CatalogUtil.dropTableData(fileIO, tableMetadata);
Expand Down
5 changes: 5 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public InputFile newInputFile(String path) {
return GCSInputFile.fromLocation(path, client(), gcpProperties, metrics);
}

// Forwards the caller-supplied length so GCSInputFile can serve getLength() without
// fetching blob metadata.
@Override
public InputFile newInputFile(String path, long length) {
return GCSInputFile.fromLocation(path, length, client(), gcpProperties, metrics);
}

@Override
public OutputFile newOutputFile(String path) {
return GCSOutputFile.fromLocation(path, client(), gcpProperties, metrics);
Expand Down
18 changes: 15 additions & 3 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,31 @@
import org.apache.iceberg.metrics.MetricsContext;

class GCSInputFile extends BaseGCSFile implements InputFile {
private Long length;

// Length is unknown here (passed as null); it is resolved lazily from blob metadata
// when getLength() is first called.
static GCSInputFile fromLocation(String location, Storage storage,
GCPProperties gcpProperties, MetricsContext metrics) {
return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), null, gcpProperties, metrics);
}

static GCSInputFile fromLocation(String location, long length, Storage storage,
GCPProperties gcpProperties, MetricsContext metrics) {
return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), gcpProperties, metrics);
return new GCSInputFile(
storage, BlobId.fromGsUtilUri(location), length > 0 ? length : null, gcpProperties, metrics);
}

GCSInputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties, MetricsContext metrics) {
// length is nullable: null means "unknown" and triggers a lazy metadata lookup in getLength().
GCSInputFile(Storage storage, BlobId blobId, Long length, GCPProperties gcpProperties, MetricsContext metrics) {
super(storage, blobId, gcpProperties, metrics);
this.length = length;
}

@Override
public long getLength() {
return getBlob().getSize();
if (length == null) {
this.length = getBlob().getSize();
}

return length;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public PositionOutputStream createOrOverwrite() {

@Override
public InputFile toInputFile() {
return new GCSInputFile(storage(), blobId(), gcpProperties(), metrics());
return new GCSInputFile(storage(), blobId(), null, gcpProperties(), metrics());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just use GCSInputFile.fromLocation(...) to avoid calling the constructor with null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just didn't want to change very much here. We probably could.

}
}