Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.io;

import java.io.IOException;

/**
* {@code InputFile} is an interface with the methods needed by Parquet to read
* data files using {@link SeekableInputStream} instances.
*
* <p>An implementation exposes two things about one data file: its total
* length and a way to open a stream over its contents.
*/
public interface InputFile {

/**
* Returns the total length of the file, in bytes.
*
* @return the total length of the file, in bytes
* @throws IOException if the length cannot be determined
*/
long getLength() throws IOException;

/**
* Opens a new {@link SeekableInputStream} for the underlying
* data file.
*
* @return a newly opened {@link SeekableInputStream} for the underlying data file
* @throws IOException if the stream cannot be opened.
*/
SeekableInputStream newStream() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.InputFile;

/**
* Internal implementation of the Parquet file reader as a block container
Expand Down Expand Up @@ -410,8 +412,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, Path
* @throws IOException if an error occurs while reading the file
*/
public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
  // Resolve the path into an InputFile (HadoopInputFile.fromPath looks up the
  // file system and file status) and delegate to the InputFile-based overload.
  // The superseded FileStatus-based return was dropped: two consecutive
  // returns made the second statement unreachable.
  return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
}

/**
Expand All @@ -431,12 +432,21 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
* @throws IOException if an error occurs while reading the file
*/
public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
FileSystem fileSystem = file.getPath().getFileSystem(configuration);
SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
try {
return readFooter(file.getLen(), file.getPath().toString(), in, filter);
} finally {
in.close();
return readFooter(HadoopInputFile.fromStatus(file, configuration), filter);
}

/**
 * Reads the meta data block in the footer of the given {@link InputFile}.
 *
 * <p>A new stream is opened from {@code file} for the duration of the call and
 * is closed before this method returns, whether it completes normally or throws.
 *
 * @param file a {@link InputFile} to read
 * @param filter the filter to apply to row groups
 * @return the metadata blocks in the footer
 * @throws IOException if an error occurs while reading the file
 */
public static final ParquetMetadata readFooter(
    InputFile file, MetadataFilter filter) throws IOException {
  try (SeekableInputStream stream = file.newStream()) {
    // The file's string form is passed along as its path/name.
    final long length = file.getLength();
    final String name = file.toString();
    return readFooter(converter, length, name, stream, filter);
  }
}

Expand All @@ -449,7 +459,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
*/
public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
if (Log.DEBUG) {
LOG.debug("File length " + fileLen);
}
Expand Down Expand Up @@ -563,7 +573,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
FileSystem fs = file.getFileSystem(conf);
this.fileStatus = fs.getFileStatus(file);
this.f = HadoopStreams.wrap(fs.open(file));
this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
this.fileMetaData = footer.getFileMetaData();
this.blocks = footer.getBlocks();
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
Expand Down Expand Up @@ -602,7 +612,7 @@ public ParquetMetadata getFooter() {
if (footer == null) {
try {
// don't read the row groups because this.blocks is always set
this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
} catch (IOException e) {
throw new ParquetDecodingException("Unable to read file footer", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.hadoop.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.InputFile;
import java.io.IOException;

/**
 * {@link InputFile} implementation backed by a Hadoop {@link FileSystem}.
 *
 * <p>Each instance pairs a file system with the {@link FileStatus} of one file.
 * The reported length and path are taken from that status object.
 */
public class HadoopInputFile implements InputFile {

  private final FileSystem fileSystem;
  private final FileStatus status;

  /** Instances are created through {@link #fromPath} or {@link #fromStatus}. */
  private HadoopInputFile(FileSystem fileSystem, FileStatus status) {
    this.fileSystem = fileSystem;
    this.status = status;
  }

  /**
   * Creates an input file for {@code path}, looking up its status on the
   * file system that {@code path} resolves to under {@code conf}.
   *
   * @param path the file to read
   * @param conf the configuration used to resolve the file system
   * @return an input file for {@code path}
   * @throws IOException if the file system or the file status cannot be obtained
   */
  public static HadoopInputFile fromPath(Path path, Configuration conf)
      throws IOException {
    final FileSystem resolved = path.getFileSystem(conf);
    final FileStatus found = resolved.getFileStatus(path);
    return new HadoopInputFile(resolved, found);
  }

  /**
   * Creates an input file from an existing status object; the status is used
   * as given and is not looked up again.
   *
   * @param stat the status of the file to read
   * @param conf the configuration used to resolve the file system
   * @return an input file for the path held by {@code stat}
   * @throws IOException if the file system cannot be obtained
   */
  public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
      throws IOException {
    return new HadoopInputFile(stat.getPath().getFileSystem(conf), stat);
  }

  /** Returns the length recorded in the file status. */
  @Override
  public long getLength() {
    return status.getLen();
  }

  /** Opens the file on the backing file system and wraps the resulting stream. */
  @Override
  public SeekableInputStream newStream() throws IOException {
    final Path location = status.getPath();
    return HadoopStreams.wrap(fileSystem.open(location));
  }

  /** Returns the string form of the file's path. */
  @Override
  public String toString() {
    final Path location = status.getPath();
    return location.toString();
  }
}