Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.cache.MetadataReader;
import com.facebook.presto.parquet.predicate.Predicate;
import com.facebook.presto.parquet.reader.ColumnIndexFilterUtils;
import com.facebook.presto.parquet.reader.ParquetReader;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
Expand All @@ -55,6 +56,7 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.GroupType;
Expand All @@ -64,6 +66,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -164,7 +167,8 @@ public ConnectorPageSource createPageSource(
isParquetBatchReaderVerificationEnabled(session),
typeManager,
deltaTableLayoutHandle.getPredicate(),
fileFormatDataSourceStats);
fileFormatDataSourceStats,
false);

return new DeltaPageSource(
deltaColumnHandles,
Expand Down Expand Up @@ -207,7 +211,8 @@ private static ConnectorPageSource createParquetPageSource(
boolean verificationEnabled,
TypeManager typeManager,
TupleDomain<DeltaColumnHandle> effectivePredicate,
FileFormatDataSourceStats stats)
FileFormatDataSourceStats stats,
boolean columnIndexFilterEnabled)
{
AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();

Expand Down Expand Up @@ -243,9 +248,12 @@ private static ConnectorPageSource createParquetPageSource(
Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
final ParquetDataSource finalDataSource = dataSource;
ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
List<ColumnIndexStore> blockIndexStores = new ArrayList<>();
for (BlockMetaData block : footerBlocks.build()) {
if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain)) {
Optional<ColumnIndexStore> columnIndexStore = ColumnIndexFilterUtils.getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled);
if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) {
blocks.add(block);
blockIndexStores.add(columnIndexStore.orElse(null));
}
}
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
Expand All @@ -256,7 +264,10 @@ private static ConnectorPageSource createParquetPageSource(
systemMemoryContext,
maxReadBlockSize,
batchReaderEnabled,
verificationEnabled);
verificationEnabled,
parquetPredicate,
blockIndexStores,
columnIndexFilterEnabled);

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ public class HiveClientConfig

private boolean userDefinedTypeEncodingEnabled;

private boolean columnIndexFilterEnabled;

@Min(0)
public int getMaxInitialSplits()
{
Expand Down Expand Up @@ -1704,6 +1706,19 @@ public int getMaterializedViewMissingPartitionsThreshold()
return this.materializedViewMissingPartitionsThreshold;
}

@Config("hive.parquet-column-index-filter-enabled")
@ConfigDescription("enable using parquet column index filter")
public HiveClientConfig setReadColumnIndexFilter(boolean columnIndexFilterEnabled)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setColumnIndexFilterEnabled

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I change to 'setColumnIndexFilterEnabled' it always causes some odd test failures that seem unrelated. I changed it to setColumnIndexFilter

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that even if we change it to setColumnIndexFilterEnabled it still doesn't work either. Just keep the original name for now.

{
this.columnIndexFilterEnabled = columnIndexFilterEnabled;
return this;
}

/**
 * Returns whether Parquet column-index based filtering is enabled
 * (set via {@code hive.parquet-column-index-filter-enabled}).
 *
 * @return {@code true} if the column index filter should be applied when reading Parquet files
 */
public boolean getReadColumnIndexFilter()
{
    return columnIndexFilterEnabled;
}

@Config("hive.size-based-split-weights-enabled")
public HiveClientConfig setSizeBasedSplitWeightsEnabled(boolean sizeBasedSplitWeightsEnabled)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public final class HiveSessionProperties
public static final String VERBOSE_RUNTIME_STATS_ENABLED = "verbose_runtime_stats_enabled";
private static final String DWRF_WRITER_STRIPE_CACHE_ENABLED = "dwrf_writer_stripe_cache_enabled";
private static final String DWRF_WRITER_STRIPE_CACHE_SIZE = "dwrf_writer_stripe_cache_size";
public static final String USE_COLUMN_INDEX_FILTER = "use_column_index_filter";
public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled";
public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
private static final String USE_RECORD_PAGE_SOURCE_FOR_CUSTOM_SPLIT = "use_record_page_source_for_custom_split";
Expand Down Expand Up @@ -628,6 +629,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Maximum size of DWRF stripe cache to be held in memory",
orcFileWriterConfig.getDwrfStripeCacheMaxSize(),
false),
booleanProperty(
USE_COLUMN_INDEX_FILTER,
"should use column index statistics filtering",
hiveClientConfig.getReadColumnIndexFilter(),
false),
booleanProperty(
SIZE_BASED_SPLIT_WEIGHTS_ENABLED,
"Enable estimating split weights based on size in bytes",
Expand Down Expand Up @@ -1118,6 +1124,11 @@ public static DataSize getDwrfWriterStripeCacheeMaxSize(ConnectorSession session
return session.getProperty(DWRF_WRITER_STRIPE_CACHE_SIZE, DataSize.class);
}

/**
 * Reads the {@code use_column_index_filter} session property.
 *
 * @param session the connector session to read the property from
 * @return whether Parquet column index statistics filtering is enabled for this session
 */
public static boolean columnIndexFilterEnabled(ConnectorSession session)
{
    Boolean enabled = session.getProperty(USE_COLUMN_INDEX_FILTER, Boolean.class);
    return enabled;
}

public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session)
{
return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Optional;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
Expand Down Expand Up @@ -67,6 +74,30 @@ protected void readInternal(long position, byte[] buffer, int bufferOffset, int
}
}

/**
 * Reads the Parquet column index for the given column chunk, if the file contains one.
 * <p>
 * NOTE(review): seeks the shared {@code inputStream}, so concurrent readers must not
 * interleave calls — confirm callers are single-threaded per data source.
 *
 * @param column metadata of the column chunk whose index should be read
 * @return the decoded column index, or {@link Optional#empty()} when the chunk has no index reference
 * @throws IOException if seeking or reading the index from the stream fails
 */
@Override
public Optional<ColumnIndex> readColumnIndex(ColumnChunkMetaData column)
        throws IOException
{
    IndexReference reference = column.getColumnIndexReference();
    if (reference == null) {
        return Optional.empty();
    }
    inputStream.seek(reference.getOffset());
    ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), Util.readColumnIndex(inputStream));
    return Optional.of(columnIndex);
}

/**
 * Reads the Parquet offset index for the given column chunk, if the file contains one.
 * <p>
 * NOTE(review): seeks the shared {@code inputStream}, so concurrent readers must not
 * interleave calls — confirm callers are single-threaded per data source.
 *
 * @param column metadata of the column chunk whose offset index should be read
 * @return the decoded offset index, or {@link Optional#empty()} when the chunk has no index reference
 * @throws IOException if seeking or reading the index from the stream fails
 */
@Override
public Optional<OffsetIndex> readOffsetIndex(ColumnChunkMetaData column)
        throws IOException
{
    IndexReference reference = column.getOffsetIndexReference();
    if (reference == null) {
        return Optional.empty();
    }
    inputStream.seek(reference.getOffset());
    OffsetIndex offsetIndex = ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(inputStream));
    return Optional.of(offsetIndex);
}

public static HdfsParquetDataSource buildHdfsParquetDataSource(FileSystem fileSystem, Path path, long start, long length, FileFormatDataSourceStats stats)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.cache.ParquetMetadataSource;
import com.facebook.presto.parquet.predicate.Predicate;
import com.facebook.presto.parquet.reader.ColumnIndexFilterUtils;
import com.facebook.presto.parquet.reader.ParquetReader;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
Expand All @@ -52,6 +53,7 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.GroupType;
Expand All @@ -63,6 +65,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -94,6 +97,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static com.facebook.presto.hive.HiveSessionProperties.columnIndexFilterEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.getParquetMaxReadBlockSize;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetBatchReaderVerificationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetBatchReadsEnabled;
Expand Down Expand Up @@ -186,7 +190,8 @@ public Optional<? extends ConnectorPageSource> createPageSource(
effectivePredicate,
stats,
hiveFileContext,
parquetMetadataSource));
parquetMetadataSource,
columnIndexFilterEnabled(session)));
}

public static ConnectorPageSource createParquetPageSource(
Expand All @@ -208,7 +213,8 @@ public static ConnectorPageSource createParquetPageSource(
TupleDomain<HiveColumnHandle> effectivePredicate,
FileFormatDataSourceStats stats,
HiveFileContext hiveFileContext,
ParquetMetadataSource parquetMetadataSource)
ParquetMetadataSource parquetMetadataSource,
boolean columnIndexFilterEnabled)
{
AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();

Expand Down Expand Up @@ -248,9 +254,12 @@ public static ConnectorPageSource createParquetPageSource(
Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
final ParquetDataSource finalDataSource = dataSource;
ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
List<ColumnIndexStore> blockIndexStores = new ArrayList<>();
for (BlockMetaData block : footerBlocks.build()) {
if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain)) {
Optional<ColumnIndexStore> columnIndexStore = ColumnIndexFilterUtils.getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled);
if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) {
blocks.add(block);
blockIndexStores.add(columnIndexStore.orElse(null));
hiveFileContext.incrementCounter("parquet.blocksRead", 1);
hiveFileContext.incrementCounter("parquet.rowsRead", block.getRowCount());
hiveFileContext.incrementCounter("parquet.totalBytesRead", block.getTotalByteSize());
Expand All @@ -269,7 +278,10 @@ public static ConnectorPageSource createParquetPageSource(
systemMemoryContext,
maxReadBlockSize,
batchReaderEnabled,
verificationEnabled);
verificationEnabled,
parquetPredicate,
blockIndexStores,
columnIndexFilterEnabled);

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public void testDefaults()
.setPartitionLeaseDuration(new Duration(0, TimeUnit.SECONDS))
.setMaterializedViewMissingPartitionsThreshold(100)
.setLooseMemoryAccountingEnabled(false)
.setReadColumnIndexFilter(false)
.setSizeBasedSplitWeightsEnabled(true)
.setMinimumAssignedSplitWeight(0.05)
.setUserDefinedTypeEncodingEnabled(false)
Expand Down Expand Up @@ -278,6 +279,7 @@ public void testExplicitPropertyMappings()
.put("hive.loose-memory-accounting-enabled", "true")
.put("hive.verbose-runtime-stats-enabled", "true")
.put("hive.materialized-view-missing-partitions-threshold", "50")
.put("hive.parquet-column-index-filter-enabled", "true")
.put("hive.size-based-split-weights-enabled", "false")
.put("hive.user-defined-type-encoding-enabled", "true")
.put("hive.minimum-assigned-split-weight", "1.0")
Expand Down Expand Up @@ -396,6 +398,7 @@ public void testExplicitPropertyMappings()
.setPartitionLeaseDuration(new Duration(4, TimeUnit.HOURS))
.setMaterializedViewMissingPartitionsThreshold(50)
.setLooseMemoryAccountingEnabled(true)
.setReadColumnIndexFilter(true)
.setSizeBasedSplitWeightsEnabled(false)
.setMinimumAssignedSplitWeight(1.0)
.setUserDefinedTypeEncodingEnabled(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ ParquetPageSource createParquetPageSource()
fields.add(ColumnIOConverter.constructField(getTypeFromTypeSignature(), messageColumnIO.getChild(i)));
}

ParquetReader parquetReader = new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), batchReadEnabled, enableVerification);

ParquetReader parquetReader = new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), batchReadEnabled, enableVerification, null, null, false);
return new ParquetPageSource(parquetReader, Collections.nCopies(channelCount, type), fields, columnNames, new RuntimeStats());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.cache.MetadataReader;
import com.facebook.presto.parquet.predicate.Predicate;
import com.facebook.presto.parquet.reader.ColumnIndexFilterUtils;
import com.facebook.presto.parquet.reader.ParquetReader;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
Expand All @@ -78,6 +79,7 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;

Expand Down Expand Up @@ -231,7 +233,8 @@ private ConnectorPageSource createDataPageSource(
isParquetBatchReadsEnabled(session),
isParquetBatchReaderVerificationEnabled(session),
predicate,
fileFormatDataSourceStats);
fileFormatDataSourceStats,
false);
case ORC:
OrcReaderOptions readerOptions = new OrcReaderOptions(
getOrcMaxMergeDistance(session),
Expand Down Expand Up @@ -281,7 +284,8 @@ private static ConnectorPageSource createParquetPageSource(
boolean batchReaderEnabled,
boolean verificationEnabled,
TupleDomain<IcebergColumnHandle> effectivePredicate,
FileFormatDataSourceStats fileFormatDataSourceStats)
FileFormatDataSourceStats fileFormatDataSourceStats,
boolean columnIndexFilterEnabled)
{
AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();

Expand Down Expand Up @@ -319,13 +323,16 @@ private static ConnectorPageSource createParquetPageSource(
Map<List<String>, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);

final ParquetDataSource finalDataSource = dataSource;
List<BlockMetaData> blocks = new ArrayList<>();
List<ColumnIndexStore> blockIndexStores = new ArrayList<>();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
Optional<ColumnIndexStore> columnIndexStore = ColumnIndexFilterUtils.getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled);
if ((firstDataPage >= start) && (firstDataPage < (start + length)) &&
predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) {
predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) {
blocks.add(block);
blockIndexStores.add(columnIndexStore.orElse(null));
}
}

Expand All @@ -337,7 +344,10 @@ private static ConnectorPageSource createParquetPageSource(
systemMemoryContext,
maxReadBlockSize,
batchReaderEnabled,
verificationEnabled);
verificationEnabled,
parquetPredicate,
blockIndexStores,
columnIndexFilterEnabled);

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> prestoTypes = ImmutableList.builder();
Expand Down
7 changes: 7 additions & 0 deletions presto-parquet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@
<artifactId>jmh-generator-annprocess</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@

import com.facebook.presto.parquet.reader.ColumnChunk;
import com.facebook.presto.parquet.reader.PageReader;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;

public interface ColumnReader
{
boolean isInitialized();

void init(PageReader pageReader, Field field);
void init(PageReader pageReader, Field field, RowRanges rowRanges);

void prepareNextRead(int batchSize);

Expand Down
Loading