Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.io;

import java.io.IOException;

/**
* {@code ParquetDataSource} is an interface with the methods needed by Parquet
* to read data files using {@link SeekableInputStream} instances.
*/
public interface ParquetDataSource {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a SeekableInputStream provider with a length.
maybe call it InputFile ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn't too happy with the name either. InputFile is something I hadn't though of and sounds pretty good. I'll go with that.


/**
* Returns the file location.
*/
String getLocation();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we keep this as Path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is an abstraction that isn't tied to Hadoop or another FS library. A string location should be portable across implementations.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was thinking more on the lines of a java Path / URI.

Copy link
Member

@julienledem julienledem Sep 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need the location at all? Is this for showing in error messages? Maybe just toString is enough?


/**
* Returns the total length of the file, in bytes.
* @throws IOException if the length cannot be determined
*/
long getLength() throws IOException;

/**
* Opens a new {@link SeekableInputStream} for the underlying
* data file.
* @throws IOException if the stream cannot be opened.
*/
SeekableInputStream newStream() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopDataSource;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.ParquetDataSource;

/**
* Internal implementation of the Parquet file reader as a block container
Expand Down Expand Up @@ -410,8 +412,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, Path
* @throws IOException if an error occurs while reading the file
*/
public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
FileSystem fileSystem = file.getFileSystem(configuration);
return readFooter(configuration, fileSystem.getFileStatus(file), filter);
return readFooter(HadoopDataSource.fromPath(file, configuration), filter);
}

/**
Expand All @@ -431,12 +432,21 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
* @throws IOException if an error occurs while reading the file
*/
public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
FileSystem fileSystem = file.getPath().getFileSystem(configuration);
SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
try {
return readFooter(file.getLen(), file.getPath().toString(), in, filter);
} finally {
in.close();
return readFooter(HadoopDataSource.fromStatus(file, configuration), filter);
}

/**
* Reads the meta data block in the footer of the file using provided input stream
* @param file a {@link ParquetDataSource} to read
* @param filter the filter to apply to row groups
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
*/
public static final ParquetMetadata readFooter(
ParquetDataSource file, MetadataFilter filter) throws IOException {
try (SeekableInputStream in = file.newStream()) {
return readFooter(converter, file.getLength(), file.getLocation(),
in, filter);
}
}

Expand All @@ -449,7 +459,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
*/
public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
if (Log.DEBUG) {
LOG.debug("File length " + fileLen);
}
Expand Down Expand Up @@ -563,7 +573,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
FileSystem fs = file.getFileSystem(conf);
this.fileStatus = fs.getFileStatus(file);
this.f = HadoopStreams.wrap(fs.open(file));
this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
this.fileMetaData = footer.getFileMetaData();
this.blocks = footer.getBlocks();
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
Expand Down Expand Up @@ -602,7 +612,7 @@ public ParquetMetadata getFooter() {
if (footer == null) {
try {
// don't read the row groups because this.blocks is always set
this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
} catch (IOException e) {
throw new ParquetDecodingException("Unable to read file footer", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.hadoop.util;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.ParquetDataSource;
import java.io.IOException;

public class HadoopDataSource implements ParquetDataSource, Configurable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make it Configurable?
The conf is passed in the constructor and does not need to be settable or exposed.
even better once initialized, the conf is not used anymore. I would remove the conf field as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was to be able to create a ParquetMetadataConverter using its constructor that takes a Configuration, but I think it's better to remove this because that option should be removed in the next release.


private final FileSystem fs;
private final FileStatus stat;
private Configuration conf;

public static HadoopDataSource fromPath(Path path, Configuration conf)
throws IOException {
FileSystem fs = path.getFileSystem(conf);
return new HadoopDataSource(fs, fs.getFileStatus(path), conf);
}

public static HadoopDataSource fromStatus(FileStatus stat, Configuration conf)
throws IOException {
FileSystem fs = stat.getPath().getFileSystem(conf);
return new HadoopDataSource(fs, stat, conf);
}

private HadoopDataSource(FileSystem fs, FileStatus stat, Configuration conf) {
this.conf = conf;
this.fs = fs;
this.stat = stat;
}

@Override
public String getLocation() {
return stat.getPath().toString();
}

@Override
public long getLength() {
return stat.getLen();
}

@Override
public SeekableInputStream newStream() throws IOException {
return HadoopStreams.wrap(fs.open(stat.getPath()));
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

@Override
public Configuration getConf() {
return conf;
}
}