HiveClientConfig.java
@@ -101,6 +101,7 @@ public class HiveClientConfig
private DataSize textMaxLineLength = new DataSize(100, MEGABYTE);

private boolean useParquetColumnNames;
+ private boolean failOnCorruptedParquetStatistics = true;

private boolean assumeCanonicalPartitionKeys;

@@ -903,6 +904,19 @@ public HiveClientConfig setUseParquetColumnNames(boolean useParquetColumnNames)
return this;
}

+ public boolean isFailOnCorruptedParquetStatistics()
+ {
+ return failOnCorruptedParquetStatistics;
+ }
+
+ @Config("hive.parquet.fail-on-corrupted-statistics")
+ @ConfigDescription("Fail when scanning Parquet files with corrupted statistics")
+ public HiveClientConfig setFailOnCorruptedParquetStatistics(boolean failOnCorruptedParquetStatistics)
+ {
+ this.failOnCorruptedParquetStatistics = failOnCorruptedParquetStatistics;
+ return this;
+ }
+
public boolean isOptimizeMismatchedBucketCount()
{
return optimizeMismatchedBucketCount;
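The new flag defaults to true, so a cluster fails queries over Parquet files with corrupted statistics unless an operator opts out via the hive.parquet.fail-on-corrupted-statistics catalog property named in the @Config annotation above. A minimal usage sketch of the accessor pair (class and method names are taken from this diff; nothing else is assumed):

import com.facebook.presto.hive.HiveClientConfig;

public class FailOnCorruptedStatisticsDefaults
{
    public static void main(String[] args)
    {
        HiveClientConfig config = new HiveClientConfig();
        // Defaults to true: corrupted Parquet statistics abort the scan
        System.out.println(config.isFailOnCorruptedParquetStatistics());

        // Equivalent to hive.parquet.fail-on-corrupted-statistics=false
        // in the catalog properties file
        config.setFailOnCorruptedParquetStatistics(false);
        System.out.println(config.isFailOnCorruptedParquetStatistics());
    }
}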
HiveSessionProperties.java
@@ -62,6 +62,7 @@ public final class HiveSessionProperties
private static final String HIVE_STORAGE_FORMAT = "hive_storage_format";
private static final String RESPECT_TABLE_FORMAT = "respect_table_format";
private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names";
+ private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String MAX_SPLIT_SIZE = "max_split_size";
@@ -225,6 +226,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Experimental: Parquet: Access Parquet columns using names from the file",
hiveClientConfig.isUseParquetColumnNames(),
false),
+ booleanProperty(
+ PARQUET_FAIL_WITH_CORRUPTED_STATISTICS,
+ "Parquet: Fail when scanning Parquet files with corrupted statistics",
+ hiveClientConfig.isFailOnCorruptedParquetStatistics(),
+ false),
dataSizeSessionProperty(
PARQUET_WRITER_BLOCK_SIZE,
"Parquet: Writer block size",
@@ -409,6 +415,11 @@ public static boolean isUseParquetColumnNames(ConnectorSession session)
return session.getProperty(PARQUET_USE_COLUMN_NAME, Boolean.class);
}

+ public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession session)
+ {
+ return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class);
+ }
+
public static DataSize getParquetWriterBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class);
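Because the session property is seeded from HiveClientConfig, the catalog-wide default can be overridden per query. Assuming the catalog is named hive and following Presto's usual catalog.property qualification for connector session properties, a query could run SET SESSION hive.parquet_fail_with_corrupted_statistics = false before scanning a table with known-bad statistics.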
HdfsParquetDataSource.java
@@ -15,6 +15,7 @@

import com.facebook.presto.hive.FileFormatDataSourceStats;
import com.facebook.presto.parquet.ParquetDataSource;
+ import com.facebook.presto.parquet.ParquetDataSourceId;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -27,24 +28,31 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.google.common.base.Strings.nullToEmpty;
import static java.lang.String.format;
+ import static java.util.Objects.requireNonNull;

public class HdfsParquetDataSource
implements ParquetDataSource
{
- private final String name;
+ private final ParquetDataSourceId id;
private final long size;
private final FSDataInputStream inputStream;
private long readBytes;
private final FileFormatDataSourceStats stats;

- public HdfsParquetDataSource(Path path, long size, FSDataInputStream inputStream, FileFormatDataSourceStats stats)
+ public HdfsParquetDataSource(ParquetDataSourceId id, long size, FSDataInputStream inputStream, FileFormatDataSourceStats stats)
{
- this.name = path.toString();
+ this.id = requireNonNull(id, "id is null");
this.size = size;
this.inputStream = inputStream;
this.stats = stats;
}

+ @Override
+ public ParquetDataSourceId getId()
+ {
+ return id;
+ }
+
@Override
public final long getReadBytes()
{
@@ -89,15 +97,15 @@ private void readInternal(long position, byte[] buffer, int bufferOffset, int bu
throw e;
}
catch (Exception e) {
- throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Error reading from %s at position %s", name, position), e);
+ throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Error reading from %s at position %s", id, position), e);
}
}

public static HdfsParquetDataSource buildHdfsParquetDataSource(FileSystem fileSystem, Path path, long start, long length, long fileSize, FileFormatDataSourceStats stats)
{
try {
FSDataInputStream inputStream = fileSystem.open(path);
- return new HdfsParquetDataSource(path, fileSize, inputStream, stats);
+ return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats);
}
catch (Exception e) {
if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") ||
@@ -110,6 +118,6 @@ public static HdfsParquetDataSource buildHdfsParquetDataSource(FileSystem fileSy

public static HdfsParquetDataSource buildHdfsParquetDataSource(FSDataInputStream inputStream, Path path, long fileSize, FileFormatDataSourceStats stats)
{
- return new HdfsParquetDataSource(path, fileSize, inputStream, stats);
+ return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats);
}
}
ParquetPageSourceFactory.java
@@ -18,6 +18,7 @@
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HivePageSourceFactory;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
+ import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.ParquetDataSource;
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.predicate.Predicate;
@@ -48,7 +49,6 @@

import java.io.FileNotFoundException;
import java.io.IOException;
- import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -58,8 +58,10 @@
import java.util.Set;

import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
+ import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
+ import static com.facebook.presto.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics;
import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
@@ -123,6 +125,7 @@ public Optional<? extends ConnectorPageSource> createPageSource(
schema,
columns,
isUseParquetColumnNames(session),
+ isFailOnCorruptedParquetStatistics(session),
typeManager,
effectivePredicate,
stats));
@@ -139,6 +142,7 @@ public static ParquetPageSource createParquetPageSource(
Properties schema,
List<HiveColumnHandle> columns,
boolean useParquetColumnNames,
+ boolean failOnCorruptedParquetStatistics,
TypeManager typeManager,
TupleDomain<HiveColumnHandle> effectivePredicate,
FileFormatDataSourceStats stats)
@@ -162,25 +166,28 @@ public static ParquetPageSource createParquetPageSource(

MessageType requestedSchema = new MessageType(fileSchema.getName(), fields);

- List<BlockMetaData> blocks = new ArrayList<>();
+ ImmutableList.Builder<BlockMetaData> footerBlocks = ImmutableList.builder();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
if (firstDataPage >= start && firstDataPage < start + length) {
- blocks.add(block);
+ footerBlocks.add(block);
}
}

Map<List<String>, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
final ParquetDataSource finalDataSource = dataSource;
- blocks = blocks.stream()
- .filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain))
- .collect(toList());
+ ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
+ for (BlockMetaData block : footerBlocks.build()) {
+ if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, failOnCorruptedParquetStatistics)) {
+ blocks.add(block);
+ }
+ }
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
- blocks,
+ blocks.build(),
dataSource,
systemMemoryContext);

@@ -205,6 +212,9 @@ public static ParquetPageSource createParquetPageSource(
if (e instanceof PrestoException) {
throw (PrestoException) e;
}
+ if (e instanceof ParquetCorruptionException) {
+ throw new PrestoException(HIVE_BAD_DATA, e);
+ }
if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") ||
e instanceof FileNotFoundException) {
throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e);
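Two details of this hunk are easy to miss. First, the row-group filter changes from a Stream.filter call to a plain loop because predicateMatches now declares the checked ParquetCorruptionException, which a lambda passed to filter cannot throw; the explicit loop lets the exception propagate. Second, in the catch block the ParquetCorruptionException test sits after the PrestoException passthrough but before the generic fallbacks, so corrupted statistics surface as HIVE_BAD_DATA instead of the catch-all HIVE_CANNOT_OPEN_SPLIT.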
TestHiveClientConfig.java
@@ -80,6 +80,7 @@ public void testDefaults()
.setWriteValidationThreads(16)
.setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
.setUseParquetColumnNames(false)
+ .setFailOnCorruptedParquetStatistics(true)
.setUseOrcColumnNames(false)
.setAssumeCanonicalPartitionKeys(false)
.setOrcBloomFiltersEnabled(false)
@@ -160,6 +161,7 @@ public void testExplicitPropertyMappings()
.put("hive.assume-canonical-partition-keys", "true")
.put("hive.text.max-line-length", "13MB")
.put("hive.parquet.use-column-names", "true")
.put("hive.parquet.fail-on-corrupted-statistics", "false")
.put("hive.orc.use-column-names", "true")
.put("hive.orc.bloom-filters.enabled", "true")
.put("hive.orc.default-bloom-filter-fpp", "0.96")
@@ -236,6 +238,7 @@ public void testExplicitPropertyMappings()
.setS3FileSystemType(S3FileSystemType.EMRFS)
.setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE))
.setUseParquetColumnNames(true)
+ .setFailOnCorruptedParquetStatistics(false)
.setUseOrcColumnNames(true)
.setAssumeCanonicalPartitionKeys(true)
.setOrcBloomFiltersEnabled(true)
ParquetDataSource.java
@@ -19,6 +19,8 @@
public interface ParquetDataSource
extends Closeable
{
+ ParquetDataSourceId getId();
+
long getReadBytes();

long getSize();
ParquetDataSourceId.java (new file)
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.parquet;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

public final class ParquetDataSourceId
{
private final String id;

public ParquetDataSourceId(String id)
{
this.id = requireNonNull(id, "id is null");
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ParquetDataSourceId that = (ParquetDataSourceId) o;
return Objects.equals(id, that.id);
}

@Override
public int hashCode()
{
return Objects.hash(id);
}

@Override
public String toString()
{
return id;
}
}
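ParquetDataSourceId is an immutable value object wrapping the file path string. equals and hashCode make it usable as a map key and directly comparable in tests, and toString returns the raw id so it drops cleanly into error messages such as the "Error reading from %s" format in HdfsParquetDataSource above.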
Predicate.java
@@ -13,6 +13,8 @@
*/
package com.facebook.presto.parquet.predicate;

+ import com.facebook.presto.parquet.ParquetCorruptionException;
+ import com.facebook.presto.parquet.ParquetDataSourceId;
import parquet.column.ColumnDescriptor;
import parquet.column.statistics.Statistics;

@@ -23,7 +25,8 @@ public interface Predicate
Predicate TRUE = new Predicate()
{
@Override
- public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics)
+ public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id, boolean failOnCorruptedParquetStatistics)
+ throws ParquetCorruptionException
{
return true;
}
@@ -41,8 +44,11 @@ public boolean matches(Map<ColumnDescriptor, DictionaryDescriptor> dictionaries)
* @param numberOfRows the number of rows in the segment; this can be used with
* Statistics to determine if a column is only null
* @param statistics column statistics
+ * @param id Parquet file name
+ * @param failOnCorruptedParquetStatistics whether to fail query when scanning a Parquet file with corrupted statistics
*/
- boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics);
+ boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id, boolean failOnCorruptedParquetStatistics)
+ throws ParquetCorruptionException;

/**
* Should the Parquet Reader process a file section with the specified dictionary.
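Widening matches is a breaking change for every Predicate implementation, including the TRUE constant updated above. The diff does not show how an implementation should detect or react to corrupted statistics, but a hedged sketch follows; isCorrupted is a purely hypothetical helper, and a ParquetCorruptionException(String) constructor is assumed since the exception class is not part of this diff.

package com.facebook.presto.parquet.predicate;

import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.ParquetDataSourceId;
import parquet.column.ColumnDescriptor;
import parquet.column.statistics.Statistics;

import java.util.Map;

import static java.lang.String.format;

public class CorruptionAwarePredicate
        implements Predicate
{
    @Override
    public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id, boolean failOnCorruptedParquetStatistics)
            throws ParquetCorruptionException
    {
        for (Map.Entry<ColumnDescriptor, Statistics<?>> entry : statistics.entrySet()) {
            if (isCorrupted(entry.getValue())) {
                if (failOnCorruptedParquetStatistics) {
                    // ParquetDataSourceId.toString() puts the file name into the message
                    throw new ParquetCorruptionException(format("Corrupted statistics for column %s in Parquet file [%s]", entry.getKey(), id));
                }
                // Statistics cannot be trusted, so keep the row group rather than mis-prune it
                return true;
            }
        }
        return true; // a real implementation would prune on min/max statistics here
    }

    @Override
    public boolean matches(Map<ColumnDescriptor, DictionaryDescriptor> dictionaries)
    {
        return true;
    }

    private static boolean isCorrupted(Statistics<?> statistics)
    {
        // Hypothetical placeholder; a real check might flag min > max, for example
        return false;
    }
}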
PredicateUtils.java
@@ -14,6 +14,7 @@
package com.facebook.presto.parquet.predicate;

import com.facebook.presto.parquet.DictionaryPage;
+ import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.ParquetDataSource;
import com.facebook.presto.parquet.ParquetEncoding;
import com.facebook.presto.parquet.RichColumnDescriptor;
@@ -84,10 +85,11 @@ public static Predicate buildPredicate(MessageType requestedSchema, TupleDomain<
return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build());
}

- public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
+ public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, boolean failOnCorruptedParquetStatistics)
+ throws ParquetCorruptionException
{
Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, descriptorsByPath);
- if (!parquetPredicate.matches(block.getRowCount(), columnStatistics)) {
+ if (!parquetPredicate.matches(block.getRowCount(), columnStatistics, dataSource.getId(), failOnCorruptedParquetStatistics)) {
return false;
}

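predicateMatches is where the new plumbing converges: it pulls the file identity from dataSource.getId(), forwards it along with the failOnCorruptedParquetStatistics flag into Predicate.matches, and lets any resulting ParquetCorruptionException propagate to callers such as createParquetPageSource, which translates it into HIVE_BAD_DATA. When the flag is false, implementations are expected to treat corrupted statistics as unusable and keep the row group, trading pruning efficiency for query availability.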