PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.

This will improve reading big datasets with a large schema (thousands of columns).
Instead, row group metadata can be read in the tasks, where each task reads only the metadata of the file it is reading.
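
As a sketch of the intended usage (an editorial illustration, not part of the commit message): the converter in the diff below gains a MetadataFilter parameter, so a client can skip row group metadata entirely while each task keeps only the row groups that fall in its split. The class name parquet.format.converter.ParquetMetadataConverter and the footer-stream handling are assumptions here.

import java.io.IOException;
import java.io.InputStream;

import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.metadata.ParquetMetadata;

// Hedged caller-side sketch of the filters introduced by this commit.
// Assumes the enclosing class in the diff is ParquetMetadataConverter and
// that `footer` is already positioned at the start of the file footer.
class FooterReadSketch {
  ParquetMetadata clientSide(ParquetMetadataConverter converter, InputStream footer)
      throws IOException {
    // Client side: read schema and key/value metadata only; the per-row-group
    // metadata that dominates footer size on wide schemas is skipped.
    return converter.readParquetMetadata(footer, ParquetMetadataConverter.SKIP_ROW_GROUPS);
  }

  ParquetMetadata taskSide(ParquetMetadataConverter converter, InputStream footer,
      long splitStart, long splitEnd) throws IOException {
    // Task side: keep only row groups whose midpoint lies in [splitStart, splitEnd).
    return converter.readParquetMetadata(footer, ParquetMetadataConverter.range(splitStart, splitEnd));
  }
}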

Author: julien <julien@twitter.com>

Closes apache#45 from julienledem/skip_reading_row_groups and squashes the following commits:

ccdd08c [julien] fix parquet-hive
24a2050 [julien] Merge branch 'master' into skip_reading_row_groups
3d7e35a [julien] address review feedback
5b6bd1b [julien] more tests
323d254 [julien] add unit tests
f599259 [julien] review feedback
fb11f02 [julien] fix backward compatibility check
2c20b46 [julien] cleanup readFooters methods
3da37d8 [julien] fix read summary
ab95a45 [julien] cleanup
4d16df3 [julien] implement task side metadata
9bb8059 [julien] first stab at integrating skipping row groups
julienledem authored and tongjiechen committed Oct 8, 2014
1 parent faf4239 commit 29cca34
Showing 22 changed files with 1,361 additions and 724 deletions.
@@ -32,6 +32,7 @@
import parquet.Log;
import parquet.common.schema.ColumnPath;
import parquet.format.ColumnChunk;
import parquet.format.ColumnMetaData;
import parquet.format.ConvertedType;
import parquet.format.DataPageHeader;
import parquet.format.DictionaryPageHeader;
@@ -58,7 +59,7 @@
import parquet.schema.Type.Repetition;
import parquet.schema.TypeVisitor;
import parquet.schema.Types;

import static java.lang.Math.min;
import static parquet.format.Util.readFileMetaData;
import static parquet.format.Util.writePageHeader;

@@ -340,8 +341,124 @@ private void addKeyValue(FileMetaData fileMetaData, String key, String value) {
fileMetaData.addToKey_value_metadata(keyValue);
}

private static interface MetadataFilterVisitor<T, E extends Throwable> {
T visit(NoFilter filter) throws E;
T visit(SkipMetadataFilter filter) throws E;
T visit(RangeMetadataFilter filter) throws E;
}

public abstract static class MetadataFilter {
private MetadataFilter() {}
abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
}
public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
/**
* [ startOffset, endOffset )
* @param startOffset
* @param endOffset
* @return the filter
*/
public static final MetadataFilter range(long startOffset, long endOffset) {
return new RangeMetadataFilter(startOffset, endOffset);
}
private static final class NoFilter extends MetadataFilter {
private NoFilter() {}
@Override
<T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
return visitor.visit(this);
}
@Override
public String toString() {
return "NO_FILTER";
}
}
private static final class SkipMetadataFilter extends MetadataFilter {
private SkipMetadataFilter() {}
@Override
<T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
return visitor.visit(this);
}
@Override
public String toString() {
return "SKIP_ROW_GROUPS";
}
}
/**
* [ startOffset, endOffset )
* @author Julien Le Dem
*/
static final class RangeMetadataFilter extends MetadataFilter {
final long startOffset;
final long endOffset;
RangeMetadataFilter(long startOffset, long endOffset) {
super();
this.startOffset = startOffset;
this.endOffset = endOffset;
}
@Override
<T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
return visitor.visit(this);
}
boolean contains(long offset) {
return offset >= this.startOffset && offset < this.endOffset;
}
@Override
public String toString() {
return "range(s:" + startOffset + ", e:" + endOffset + ")";
}
}

@Deprecated
public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
return readParquetMetadata(from, NO_FILTER);
}

static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
long totalSize = 0;
long startIndex = getOffset(rowGroup.getColumns().get(0));
for (ColumnChunk col : rowGroup.getColumns()) {
totalSize += col.getMeta_data().getTotal_compressed_size();
}
long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
newRowGroups.add(rowGroup);
}
}
metaData.setRow_groups(newRowGroups);
return metaData;
}

static long getOffset(RowGroup rowGroup) {
return getOffset(rowGroup.getColumns().get(0));
}
static long getOffset(ColumnChunk columnChunk) {
ColumnMetaData md = columnChunk.getMeta_data();
long offset = md.getData_page_offset();
if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
offset = md.getDictionary_page_offset();
}
return offset;
}

public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
@Override
public FileMetaData visit(NoFilter filter) throws IOException {
return readFileMetaData(from);
}
@Override
public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
return readFileMetaData(from, true);
}
@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return filterFileMetaData(readFileMetaData(from), filter);
}
});
if (Log.DEBUG) LOG.debug(fileMetaData);
ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
@@ -352,37 +469,39 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
if (row_groups != null) {
  for (RowGroup rowGroup : row_groups) {
    BlockMetaData blockMetaData = new BlockMetaData();
    blockMetaData.setRowCount(rowGroup.getNum_rows());
    blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
    List<ColumnChunk> columns = rowGroup.getColumns();
    String filePath = columns.get(0).getFile_path();
    for (ColumnChunk columnChunk : columns) {
      if ((filePath == null && columnChunk.getFile_path() != null)
          || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
        throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
      }
      parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
      ColumnPath path = getPath(metaData);
      ColumnChunkMetaData column = ColumnChunkMetaData.get(
          path,
          messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
          CompressionCodecName.fromParquet(metaData.codec),
          fromFormatEncodings(metaData.encodings),
          fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
          metaData.data_page_offset,
          metaData.dictionary_page_offset,
          metaData.num_values,
          metaData.total_compressed_size,
          metaData.total_uncompressed_size);
      // TODO
      // index_page_offset
      // key_value_metadata
      blockMetaData.addColumn(column);
    }
    blockMetaData.setPath(filePath);
    blocks.add(blockMetaData);
  }
}
Map<String, String> keyValueMetaData = new HashMap<String, String>();
List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();
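
To make the range filter concrete, here is a small standalone demo (an editorial illustration, not from the commit) of the midpoint rule implemented by filterFileMetaData and RangeMetadataFilter.contains above: each row group is assigned to the single task whose half-open range [startOffset, endOffset) contains the row group's midpoint, so a row group straddling a split boundary is read exactly once. All offsets and sizes are invented.

// Standalone demo of the midpoint assignment rule; numbers are made up.
public class MidpointRuleDemo {

  // Same half-open containment test as RangeMetadataFilter.contains.
  static boolean contains(long start, long end, long offset) {
    return offset >= start && offset < end;
  }

  public static void main(String[] args) {
    long rowGroupStart = 100000000L;        // offset of the first column chunk
    long totalCompressedSize = 60000000L;   // summed over all column chunks
    long midPoint = rowGroupStart + totalCompressedSize / 2;  // 130000000

    // Two adjacent task splits of 128000000 bytes each.
    long splitSize = 128000000L;
    boolean firstTask = contains(0L, splitSize, midPoint);              // false
    boolean secondTask = contains(splitSize, 2 * splitSize, midPoint);  // true

    // The row group straddles the boundary, but its midpoint lands in exactly
    // one half-open range, so only the second task reads it.
    System.out.println("first=" + firstTask + " second=" + secondTask);
  }
}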