Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import io.prestosql.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
Expand Down Expand Up @@ -94,7 +93,6 @@
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA;
import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcLazyReadSmallRanges;
import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcMaxBufferSize;
Expand Down Expand Up @@ -165,6 +163,7 @@ public ConnectorPageSource createPageSource(
new Path(split.getPath()),
split.getStart(),
split.getLength(),
split.getFileSize(),
split.getFileFormat(),
regularColumns,
table.getPredicate());
Expand All @@ -178,20 +177,13 @@ private ConnectorPageSource createDataPageSource(
Path path,
long start,
long length,
long fileSize,
FileFormat fileFormat,
List<IcebergColumnHandle> dataColumns,
TupleDomain<IcebergColumnHandle> predicate)
{
switch (fileFormat) {
case ORC:
FileStatus fileStatus = null;
try {
fileStatus = hdfsEnvironment.doAs(session.getUser(), () -> hdfsEnvironment.getFileSystem(hdfsContext, path).getFileStatus(path));
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, e);
}
long fileSize = fileStatus.getLen();
return createOrcPageSource(
hdfsEnvironment,
session.getUser(),
Expand Down Expand Up @@ -220,6 +212,7 @@ private ConnectorPageSource createDataPageSource(
path,
start,
length,
fileSize,
dataColumns,
parquetReaderOptions
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
Expand Down Expand Up @@ -346,6 +339,7 @@ private static ConnectorPageSource createParquetPageSource(
Path path,
long start,
long length,
long fileSize,
List<IcebergColumnHandle> regularColumns,
ParquetReaderOptions options,
TupleDomain<IcebergColumnHandle> effectivePredicate,
Expand All @@ -356,10 +350,8 @@ private static ConnectorPageSource createParquetPageSource(
ParquetDataSource dataSource = null;
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(user, path, configuration);
FileStatus fileStatus = hdfsEnvironment.doAs(user, () -> fileSystem.getFileStatus(path));
long estimatedFileSize = fileStatus.getLen();
FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), estimatedFileSize, inputStream, fileFormatDataSourceStats, options);
dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, fileFormatDataSourceStats, options);
ParquetDataSource theDataSource = dataSource; // extra variable required for lambda below
ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(theDataSource));
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class IcebergSplit
private final String path;
private final long start;
private final long length;
private final long fileSize;
private final FileFormat fileFormat;
private final List<HostAddress> addresses;
private final Map<Integer, String> partitionKeys;
Expand All @@ -43,13 +44,15 @@ public IcebergSplit(
@JsonProperty("path") String path,
@JsonProperty("start") long start,
@JsonProperty("length") long length,
@JsonProperty("fileSize") long fileSize,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("partitionKeys") Map<Integer, String> partitionKeys)
{
this.path = requireNonNull(path, "path is null");
this.start = start;
this.length = length;
this.fileSize = fileSize;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.partitionKeys = Collections.unmodifiableMap(requireNonNull(partitionKeys, "partitionKeys is null"));
Expand Down Expand Up @@ -86,6 +89,12 @@ public long getLength()
return length;
}

    @JsonProperty
    // Total size of the underlying data file in bytes (serialized with the split so
    // readers can avoid an extra getFileStatus() round-trip to the filesystem).
    public long getFileSize()
    {
        return fileSize;
    }

@JsonProperty
public FileFormat getFileFormat()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
task.file().path().toString(),
task.start(),
task.length(),
task.file().fileSizeInBytes(),
task.file().format(),
ImmutableList.of(),
getPartitionKeys(task));
Expand Down