Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.parquet;

import com.facebook.presto.spi.type.Type;

import static java.util.Objects.requireNonNull;

/**
 * Base descriptor for a readable Parquet field: carries the Presto type of the
 * column plus the Parquet repetition/definition levels and the required flag
 * that drive null/repetition decoding.
 */
public abstract class Field
{
    private final Type type;
    private final int repetitionLevel;
    private final int definitionLevel;
    private final boolean required;

    /**
     * @param type Presto type of this field; must not be null
     * @param repetitionLevel Parquet repetition level of the field
     * @param definitionLevel Parquet definition level of the field
     * @param required whether the field is declared REQUIRED in the Parquet schema
     */
    protected Field(Type type, int repetitionLevel, int definitionLevel, boolean required)
    {
        this.required = required;
        this.definitionLevel = definitionLevel;
        this.repetitionLevel = repetitionLevel;
        this.type = requireNonNull(type, "type is required");
    }

    /** @return whether the field is declared REQUIRED in the Parquet schema */
    public boolean isRequired()
    {
        return required;
    }

    /** @return the Parquet definition level */
    public int getDefinitionLevel()
    {
        return definitionLevel;
    }

    /** @return the Parquet repetition level */
    public int getRepetitionLevel()
    {
        return repetitionLevel;
    }

    /** @return the Presto type of this field */
    public Type getType()
    {
        return type;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.parquet;

import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class GroupField
extends Field
{
private final ImmutableList<Optional<Field>> children;

public GroupField(Type type, int repetitionLevel, int definitionLevel, boolean required, ImmutableList<Optional<Field>> children)
{
super(type, repetitionLevel, definitionLevel, required);
this.children = requireNonNull(children, "children is required");
}

public List<Optional<Field>> getChildren()
{
return children;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return ImmutableList.copyOf(children);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed children type to ImmutableList instead, to make it obvious that copying is not needed

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static com.facebook.presto.hive.HiveUtil.closeWithSuppression;
import static com.facebook.presto.hive.HiveUtil.getDecimalType;
import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getDescriptors;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getParquetType;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.buildParquetPredicate;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.getParquetTupleDomain;
Expand Down Expand Up @@ -349,9 +350,10 @@ private ParquetRecordReader<FakeParquetRecord> createParquetRecordReader(
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
if (firstDataPage >= start && firstDataPage < start + length) {
if (predicatePushdownEnabled) {
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(fileSchema, requestedSchema, effectivePredicate);
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, fileSchema);
if (predicateMatches(parquetPredicate, block, dataSource, fileSchema, requestedSchema, parquetTupleDomain)) {
Map<List<String>, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
if (predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) {
offsets.add(block.getStartingPos());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.parquet.reader.ParquetReader;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PrestoException;
Expand All @@ -27,40 +26,35 @@
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.collect.ImmutableList;
import parquet.column.ColumnDescriptor;
import parquet.io.MessageColumnIO;
import parquet.schema.MessageType;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getDescriptor;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getFieldIndex;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getParquetType;
import static com.facebook.presto.spi.type.StandardTypes.ARRAY;
import static com.facebook.presto.spi.type.StandardTypes.MAP;
import static com.facebook.presto.spi.type.StandardTypes.ROW;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static parquet.io.ColumnIOConverter.constructField;

public class ParquetPageSource
implements ConnectorPageSource
{
private static final int MAX_VECTOR_LENGTH = 1024;

private final ParquetReader parquetReader;
private final ParquetDataSource dataSource;
private final MessageType fileSchema;
// for debugging heap dump
private final MessageType requestedSchema;
private final List<String> columnNames;
private final List<Type> types;
private final List<Optional<Field>> fields;

private final Block[] constantBlocks;
private final int[] hiveColumnIndexes;
Expand All @@ -70,28 +64,21 @@ public class ParquetPageSource
private long readTimeNanos;
private final boolean useParquetColumnNames;

private final AggregatedMemoryContext systemMemoryContext;

public ParquetPageSource(
ParquetReader parquetReader,
ParquetDataSource dataSource,
MessageType fileSchema,
MessageType requestedSchema,
MessageColumnIO messageColumnIO,
TypeManager typeManager,
Properties splitSchema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
TypeManager typeManager,
boolean useParquetColumnNames,
AggregatedMemoryContext systemMemoryContext)
boolean useParquetColumnNames)
{
requireNonNull(splitSchema, "splitSchema is null");
requireNonNull(columns, "columns is null");
requireNonNull(effectivePredicate, "effectivePredicate is null");

this.parquetReader = requireNonNull(parquetReader, "parquetReader is null");
this.dataSource = requireNonNull(dataSource, "dataSource is null");
this.fileSchema = requireNonNull(fileSchema, "fileSchema is null");
this.requestedSchema = requireNonNull(requestedSchema, "requestedSchema is null");
this.useParquetColumnNames = useParquetColumnNames;

int size = columns.size();
Expand All @@ -100,6 +87,7 @@ public ParquetPageSource(

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
for (int columnIndex = 0; columnIndex < size; columnIndex++) {
HiveColumnHandle column = columns.get(columnIndex);
checkState(column.getColumnType() == REGULAR, "column type must be regular");
Expand All @@ -109,22 +97,26 @@ public ParquetPageSource(

namesBuilder.add(name);
typesBuilder.add(type);

hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex();

if (getParquetType(column, fileSchema, useParquetColumnNames) == null) {
constantBlocks[columnIndex] = RunLengthEncodedBlock.create(type, null, MAX_VECTOR_LENGTH);
fieldsBuilder.add(Optional.empty());
}
else {
String columnName = useParquetColumnNames ? name : fileSchema.getFields().get(column.getHiveColumnIndex()).getName();
fieldsBuilder.add(constructField(type, messageColumnIO.getChild(columnName)));
}
}
types = typesBuilder.build();
fields = fieldsBuilder.build();
columnNames = namesBuilder.build();
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
}

@Override
public long getCompletedBytes()
{
return dataSource.getReadBytes();
return parquetReader.getDataSource().getReadBytes();
}

@Override
Expand All @@ -142,7 +134,7 @@ public boolean isFinished()
@Override
public long getSystemMemoryUsage()
{
return systemMemoryContext.getBytes();
return parquetReader.getSystemMemoryContext().getBytes();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind removing the systemMemoryContext field? It seems unrelated to the purpose of the PR. Same for the dataSource field.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ParquetPageSource class has a lot of constructor parameters. To simplify it a little bit it is possible to get systemMemoryContext and dataSource from parquetReader.

}

@Override
Expand All @@ -168,39 +160,19 @@ public Page getNextPage()
}
else {
Type type = types.get(fieldId);
Optional<Field> field = fields.get(fieldId);
int fieldIndex;
if (useParquetColumnNames) {
fieldIndex = getFieldIndex(fileSchema, columnNames.get(fieldId));
}
else {
fieldIndex = hiveColumnIndexes[fieldId];
}

if (fieldIndex == -1) {
blocks[fieldId] = RunLengthEncodedBlock.create(type, null, batchSize);
continue;
}

String fieldName = fileSchema.getFields().get(fieldIndex).getName();
List<String> path = new ArrayList<>();
path.add(fieldName);
if (ROW.equals(type.getTypeSignature().getBase())) {
blocks[fieldId] = parquetReader.readStruct(type, path);
}
else if (MAP.equals(type.getTypeSignature().getBase())) {
blocks[fieldId] = parquetReader.readMap(type, path);
}
else if (ARRAY.equals(type.getTypeSignature().getBase())) {
blocks[fieldId] = parquetReader.readArray(type, path);
if (fieldIndex != -1 && field.isPresent()) {
blocks[fieldId] = new LazyBlock(batchSize, new ParquetBlockLoader(field.get()));
}
else {
Optional<RichColumnDescriptor> descriptor = getDescriptor(fileSchema, requestedSchema, path);
if (descriptor.isPresent()) {
blocks[fieldId] = new LazyBlock(batchSize, new ParquetBlockLoader(descriptor.get(), type));
}
else {
blocks[fieldId] = RunLengthEncodedBlock.create(type, null, batchSize);
}
blocks[fieldId] = RunLengthEncodedBlock.create(type, null, batchSize);
}
}
}
Expand All @@ -210,11 +182,7 @@ else if (ARRAY.equals(type.getTypeSignature().getBase())) {
closeWithSuppression(e);
throw e;
}
catch (ParquetCorruptionException e) {
closeWithSuppression(e);
throw new PrestoException(HIVE_BAD_DATA, e);
}
catch (IOException | RuntimeException e) {
catch (RuntimeException e) {
closeWithSuppression(e);
throw new PrestoException(HIVE_CURSOR_ERROR, e);
}
Expand Down Expand Up @@ -254,14 +222,12 @@ private final class ParquetBlockLoader
implements LazyBlockLoader<LazyBlock>
{
private final int expectedBatchId = batchId;
private final ColumnDescriptor columnDescriptor;
private final Type type;
private final Field field;
private boolean loaded;

public ParquetBlockLoader(ColumnDescriptor columnDescriptor, Type type)
public ParquetBlockLoader(Field field)
{
this.columnDescriptor = columnDescriptor;
this.type = requireNonNull(type, "type is null");
this.field = requireNonNull(field, "field is null");
}

@Override
Expand All @@ -274,7 +240,7 @@ public final void load(LazyBlock lazyBlock)
checkState(batchId == expectedBatchId);

try {
Block block = parquetReader.readPrimitive(columnDescriptor, type);
Block block = parquetReader.readBlock(field);
lazyBlock.setBlock(block);
}
catch (ParquetCorruptionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.FileMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.MessageColumnIO;
import parquet.schema.MessageType;

import javax.inject.Inject;
Expand All @@ -45,6 +46,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -57,6 +59,8 @@
import static com.facebook.presto.hive.HiveSessionProperties.isParquetPredicatePushdownEnabled;
import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getColumnIO;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getDescriptors;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getParquetType;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.buildParquetPredicate;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.getParquetTupleDomain;
Expand Down Expand Up @@ -176,33 +180,30 @@ public static ParquetPageSource createParquetPageSource(
}

if (predicatePushdownEnabled) {
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(fileSchema, requestedSchema, effectivePredicate);
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, fileMetaData.getSchema());
Map<List<String>, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
final ParquetDataSource finalDataSource = dataSource;
blocks = blocks.stream()
.filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, fileSchema, requestedSchema, parquetTupleDomain))
.filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain))
.collect(toList());
}

MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ParquetReader parquetReader = new ParquetReader(
fileSchema,
requestedSchema,
messageColumnIO,
blocks,
dataSource,
typeManager,
systemMemoryContext);

return new ParquetPageSource(
parquetReader,
dataSource,
fileSchema,
requestedSchema,
messageColumnIO,
typeManager,
schema,
columns,
effectivePredicate,
typeManager,
useParquetColumnNames,
systemMemoryContext);
useParquetColumnNames);
}
catch (Exception e) {
try {
Expand Down
Loading