diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
index a462749fcb83f..a1943d82f0051 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
@@ -101,6 +101,7 @@ public class HiveClientConfig
     private DataSize textMaxLineLength = new DataSize(100, MEGABYTE);
 
     private boolean useParquetColumnNames;
+    private boolean failOnCorruptedParquetStatistics = true;
 
     private boolean assumeCanonicalPartitionKeys;
 
@@ -903,6 +904,19 @@ public HiveClientConfig setUseParquetColumnNames(boolean useParquetColumnNames)
         return this;
     }
 
+    public boolean isFailOnCorruptedParquetStatistics()
+    {
+        return failOnCorruptedParquetStatistics;
+    }
+
+    @Config("hive.parquet.fail-on-corrupted-statistics")
+    @ConfigDescription("Fail when scanning Parquet files with corrupted statistics")
+    public HiveClientConfig setFailOnCorruptedParquetStatistics(boolean failOnCorruptedParquetStatistics)
+    {
+        this.failOnCorruptedParquetStatistics = failOnCorruptedParquetStatistics;
+        return this;
+    }
+
     public boolean isOptimizeMismatchedBucketCount()
     {
         return optimizeMismatchedBucketCount;
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
index d45641fb5a24f..a217f50b2296a 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
@@ -62,6 +62,7 @@ public final class HiveSessionProperties
     private static final String HIVE_STORAGE_FORMAT = "hive_storage_format";
     private static final String RESPECT_TABLE_FORMAT = "respect_table_format";
     private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names";
+    private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics";
     private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
     private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
     private static final String MAX_SPLIT_SIZE = "max_split_size";
@@ -225,6 +226,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
                         "Experimental: Parquet: Access Parquet columns using names from the file",
                         hiveClientConfig.isUseParquetColumnNames(),
                         false),
+                booleanProperty(
+                        PARQUET_FAIL_WITH_CORRUPTED_STATISTICS,
+                        "Parquet: Fail when scanning Parquet files with corrupted statistics",
+                        hiveClientConfig.isFailOnCorruptedParquetStatistics(),
+                        false),
                 dataSizeSessionProperty(
                         PARQUET_WRITER_BLOCK_SIZE,
                         "Parquet: Writer block size",
@@ -409,6 +415,11 @@ public static boolean isUseParquetColumnNames(ConnectorSession session)
         return session.getProperty(PARQUET_USE_COLUMN_NAME, Boolean.class);
     }
 
+    public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class);
+    }
+
     public static DataSize getParquetWriterBlockSize(ConnectorSession session)
     {
         return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class);
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java
index 04be3dd10922b..bb294534f6b73 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java
@@ -15,6 +15,7 @@
 
 import com.facebook.presto.hive.FileFormatDataSourceStats;
 import com.facebook.presto.parquet.ParquetDataSource;
+import com.facebook.presto.parquet.ParquetDataSourceId;
 import com.facebook.presto.spi.PrestoException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -27,24 +28,31 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static com.google.common.base.Strings.nullToEmpty;
 import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
 
 public class HdfsParquetDataSource
         implements ParquetDataSource
 {
-    private final String name;
+    private final ParquetDataSourceId id;
     private final long size;
     private final FSDataInputStream inputStream;
     private long readBytes;
     private final FileFormatDataSourceStats stats;
 
-    public HdfsParquetDataSource(Path path, long size, FSDataInputStream inputStream, FileFormatDataSourceStats stats)
+    public HdfsParquetDataSource(ParquetDataSourceId id, long size, FSDataInputStream inputStream, FileFormatDataSourceStats stats)
     {
-        this.name = path.toString();
+        this.id = requireNonNull(id, "id is null");
         this.size = size;
         this.inputStream = inputStream;
        this.stats = stats;
     }
 
+    @Override
+    public ParquetDataSourceId getId()
+    {
+        return id;
+    }
+
     @Override
     public final long getReadBytes()
     {
@@ -89,7 +97,7 @@ private void readInternal(long position, byte[] buffer, int bufferOffset, int bu
             throw e;
         }
         catch (Exception e) {
-            throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Error reading from %s at position %s", name, position), e);
+            throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Error reading from %s at position %s", id, position), e);
         }
     }
 
@@ -97,7 +105,7 @@ public static HdfsParquetDataSource buildHdfsParquetDataSource(FileSystem fileSy
     {
         try {
             FSDataInputStream inputStream = fileSystem.open(path);
-            return new HdfsParquetDataSource(path, fileSize, inputStream, stats);
+            return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats);
         }
         catch (Exception e) {
             if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") ||
@@ -110,6 +118,6 @@
 
     public static HdfsParquetDataSource buildHdfsParquetDataSource(FSDataInputStream inputStream, Path path, long fileSize, FileFormatDataSourceStats stats)
     {
-        return new HdfsParquetDataSource(path, fileSize, inputStream, stats);
+        return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats);
     }
 }
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
index 479221ce449b2..d84117f27c4c9 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
@@ -18,6 +18,7 @@
 import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.hive.HivePageSourceFactory;
 import com.facebook.presto.memory.context.AggregatedMemoryContext;
+import com.facebook.presto.parquet.ParquetCorruptionException;
 import com.facebook.presto.parquet.ParquetDataSource;
 import com.facebook.presto.parquet.RichColumnDescriptor;
 import com.facebook.presto.parquet.predicate.Predicate;
@@ -48,7 +49,6 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -58,8 +58,10 @@
 import java.util.Set;
 
 import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
+import static com.facebook.presto.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics;
 import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
 import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
 import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
@@ -123,6 +125,7 @@ public Optional createPageSource(
                 schema,
                 columns,
                 isUseParquetColumnNames(session),
+                isFailOnCorruptedParquetStatistics(session),
                 typeManager,
                 effectivePredicate,
                 stats));
@@ -139,6 +142,7 @@ public static ParquetPageSource createParquetPageSource(
             Properties schema,
             List<HiveColumnHandle> columns,
             boolean useParquetColumnNames,
+            boolean failOnCorruptedParquetStatistics,
             TypeManager typeManager,
             TupleDomain<HiveColumnHandle> effectivePredicate,
             FileFormatDataSourceStats stats)
@@ -162,11 +166,11 @@ public static ParquetPageSource createParquetPageSource(
             MessageType requestedSchema = new MessageType(fileSchema.getName(), fields);
 
-            List<BlockMetaData> blocks = new ArrayList<>();
+            ImmutableList.Builder<BlockMetaData> footerBlocks = ImmutableList.builder();
             for (BlockMetaData block : parquetMetadata.getBlocks()) {
                 long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
                 if (firstDataPage >= start && firstDataPage < start + length) {
-                    blocks.add(block);
+                    footerBlocks.add(block);
                 }
             }
 
@@ -174,13 +178,16 @@
             TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
             Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath);
             final ParquetDataSource finalDataSource = dataSource;
-            blocks = blocks.stream()
-                    .filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain))
-                    .collect(toList());
+            ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
+            for (BlockMetaData block : footerBlocks.build()) {
+                if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, failOnCorruptedParquetStatistics)) {
+                    blocks.add(block);
+                }
+            }
 
             MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
             ParquetReader parquetReader = new ParquetReader(
                     messageColumnIO,
-                    blocks,
+                    blocks.build(),
                     dataSource,
                     systemMemoryContext);
@@ -205,6 +212,9 @@
             if (e instanceof PrestoException) {
                 throw (PrestoException) e;
             }
+            if (e instanceof ParquetCorruptionException) {
+                throw new PrestoException(HIVE_BAD_DATA, e);
+            }
             if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") ||
                     e instanceof FileNotFoundException) {
                 throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e);
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java
index dcea7bd542b0a..4efded22907ef 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java
@@ -80,6 +80,7 @@ public void testDefaults()
                 .setWriteValidationThreads(16)
                 .setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
                 .setUseParquetColumnNames(false)
+                .setFailOnCorruptedParquetStatistics(true)
                 .setUseOrcColumnNames(false)
                 .setAssumeCanonicalPartitionKeys(false)
                 .setOrcBloomFiltersEnabled(false)
@@ -160,6 +161,7 @@ public void testExplicitPropertyMappings()
                 .put("hive.assume-canonical-partition-keys", "true")
                 .put("hive.text.max-line-length", "13MB")
                 .put("hive.parquet.use-column-names", "true")
+                .put("hive.parquet.fail-on-corrupted-statistics", "false")
                 .put("hive.orc.use-column-names", "true")
                 .put("hive.orc.bloom-filters.enabled", "true")
                 .put("hive.orc.default-bloom-filter-fpp", "0.96")
@@ -236,6 +238,7 @@ public void testExplicitPropertyMappings()
                 .setS3FileSystemType(S3FileSystemType.EMRFS)
                 .setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE))
                 .setUseParquetColumnNames(true)
+                .setFailOnCorruptedParquetStatistics(false)
                 .setUseOrcColumnNames(true)
                 .setAssumeCanonicalPartitionKeys(true)
                 .setOrcBloomFiltersEnabled(true)
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetDataSource.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetDataSource.java
index 43458a6661997..609c7b0bf1a12 100644
--- a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetDataSource.java
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetDataSource.java
@@ -19,6 +19,8 @@
 public interface ParquetDataSource
         extends Closeable
 {
+    ParquetDataSourceId getId();
+
     long getReadBytes();
 
     long getSize();
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetDataSourceId.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetDataSourceId.java
new file mode 100644
index 0000000000000..59bba0a598767
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetDataSourceId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public final class ParquetDataSourceId
+{
+    private final String id;
+
+    public ParquetDataSourceId(String id)
+    {
+        this.id = requireNonNull(id, "id is null");
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParquetDataSourceId that = (ParquetDataSourceId) o;
+        return Objects.equals(id, that.id);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(id);
+    }
+
+    @Override
+    public String toString()
+    {
+        return id;
+    }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/Predicate.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/Predicate.java
index 611f71a587356..e92a87c7a9933 100644
--- a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/Predicate.java
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/Predicate.java
@@ -13,6 +13,8 @@
  */
 package com.facebook.presto.parquet.predicate;
 
+import com.facebook.presto.parquet.ParquetCorruptionException;
+import com.facebook.presto.parquet.ParquetDataSourceId;
 import parquet.column.ColumnDescriptor;
 import parquet.column.statistics.Statistics;
 
@@ -23,7 +25,8 @@ public interface Predicate
     Predicate TRUE = new Predicate()
     {
         @Override
-        public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics)
+        public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id, boolean failOnCorruptedParquetStatistics)
+                throws ParquetCorruptionException
         {
             return true;
         }
@@ -41,8 +44,11 @@ public boolean matches(Map dictionaries)
      * @param numberOfRows the number of rows in the segment; this can be used with
      * Statistics to determine if a column is only null
      * @param statistics column statistics
+     * @param id Parquet file name
+     * @param failOnCorruptedParquetStatistics whether to fail query when scanning a Parquet file with corrupted statistics
      */
-    boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics);
+    boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id, boolean failOnCorruptedParquetStatistics)
+            throws ParquetCorruptionException;
 
     /**
      * Should the Parquet Reader process a file section with the specified dictionary.
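
Reviewer note: the `matches` overload above may now throw `ParquetCorruptionException` when `failOnCorruptedParquetStatistics` is set, instead of silently treating corrupted min/max values as "no statistics". The sketch below shows the intended call pattern on the connector side; the method name `rowGroupMatches` and its parameters are illustrative only, not part of the patch (the real call sites are `PredicateUtils.predicateMatches` and `ParquetPageSourceFactory` in the hunks that follow).

```java
// Illustrative sketch, assuming the types introduced or touched by this patch
// (Predicate, ParquetDataSource, ParquetCorruptionException, HIVE_BAD_DATA).
private static boolean rowGroupMatches(
        Predicate predicate,
        BlockMetaData rowGroup,
        Map<ColumnDescriptor, Statistics<?>> columnStatistics,
        ParquetDataSource dataSource,
        boolean failOnCorruptedParquetStatistics)
{
    try {
        // The data source id and the flag are threaded through so that corrupted
        // min/max statistics can be reported against a concrete file and column.
        return predicate.matches(rowGroup.getRowCount(), columnStatistics, dataSource.getId(), failOnCorruptedParquetStatistics);
    }
    catch (ParquetCorruptionException e) {
        // With the flag enabled the corruption surfaces here; ParquetPageSourceFactory
        // rethrows it as HIVE_BAD_DATA so the query fails instead of scanning the row group.
        throw new PrestoException(HIVE_BAD_DATA, e);
    }
}
```
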
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java index 6816d15ac19d1..b4b85c2079de1 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java @@ -14,6 +14,7 @@ package com.facebook.presto.parquet.predicate; import com.facebook.presto.parquet.DictionaryPage; +import com.facebook.presto.parquet.ParquetCorruptionException; import com.facebook.presto.parquet.ParquetDataSource; import com.facebook.presto.parquet.ParquetEncoding; import com.facebook.presto.parquet.RichColumnDescriptor; @@ -84,10 +85,11 @@ public static Predicate buildPredicate(MessageType requestedSchema, TupleDomain< return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build()); } - public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map, RichColumnDescriptor> descriptorsByPath, TupleDomain parquetTupleDomain) + public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map, RichColumnDescriptor> descriptorsByPath, TupleDomain parquetTupleDomain, boolean failOnCorruptedParquetStatistics) + throws ParquetCorruptionException { Map> columnStatistics = getStatistics(block, descriptorsByPath); - if (!parquetPredicate.matches(block.getRowCount(), columnStatistics)) { + if (!parquetPredicate.matches(block.getRowCount(), columnStatistics, dataSource.getId(), failOnCorruptedParquetStatistics)) { return false; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/TupleDomainParquetPredicate.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/TupleDomainParquetPredicate.java index 500844f96676f..b1d24247eab6d 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/TupleDomainParquetPredicate.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/TupleDomainParquetPredicate.java @@ -14,6 +14,8 @@ package com.facebook.presto.parquet.predicate; import com.facebook.presto.parquet.DictionaryPage; +import com.facebook.presto.parquet.ParquetCorruptionException; +import com.facebook.presto.parquet.ParquetDataSourceId; import com.facebook.presto.parquet.RichColumnDescriptor; import com.facebook.presto.parquet.dictionary.Dictionary; import com.facebook.presto.spi.predicate.Domain; @@ -54,6 +56,7 @@ import static com.facebook.presto.spi.type.TinyintType.TINYINT; import static com.facebook.presto.spi.type.Varchars.isVarcharType; import static java.lang.Float.floatToRawIntBits; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class TupleDomainParquetPredicate @@ -69,7 +72,8 @@ public TupleDomainParquetPredicate(TupleDomain effectivePredic } @Override - public boolean matches(long numberOfRows, Map> statistics) + public boolean matches(long numberOfRows, Map> statistics, ParquetDataSourceId id, boolean failOnCorruptedParquetStatistics) + throws ParquetCorruptionException { if (numberOfRows == 0) { return false; @@ -86,7 +90,7 @@ public boolean matches(long numberOfRows, Map> s domain = Domain.all(type); } else { - domain = getDomain(type, numberOfRows, columnStatistics); + domain = getDomain(type, numberOfRows, columnStatistics, id, column.toString(), failOnCorruptedParquetStatistics); } 
domains.put(column, domain); } @@ -113,7 +117,8 @@ public boolean matches(Map dictionaries) } @VisibleForTesting - public static Domain getDomain(Type type, long rowCount, Statistics statistics) + public static Domain getDomain(Type type, long rowCount, Statistics statistics, ParquetDataSourceId id, String column, boolean failOnCorruptedParquetStatistics) + throws ParquetCorruptionException { if (statistics == null || statistics.isEmpty()) { return Domain.all(type); @@ -125,7 +130,6 @@ public static Domain getDomain(Type type, long rowCount, Statistics statistic boolean hasNullValue = statistics.getNumNulls() != 0L; - // ignore corrupted statistics if (statistics.genericGetMin() == null || statistics.genericGetMax() == null) { return Domain.create(ValueSet.all(type), hasNullValue); } @@ -149,16 +153,16 @@ else if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) | ParquetIntegerStatistics parquetIntegerStatistics; if (statistics instanceof LongStatistics) { LongStatistics longStatistics = (LongStatistics) statistics; - // ignore corrupted statistics if (longStatistics.genericGetMin() > longStatistics.genericGetMax()) { + failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, longStatistics); return Domain.create(ValueSet.all(type), hasNullValue); } parquetIntegerStatistics = new ParquetIntegerStatistics(longStatistics.genericGetMin(), longStatistics.genericGetMax()); } else { IntStatistics intStatistics = (IntStatistics) statistics; - // ignore corrupted statistics if (intStatistics.genericGetMin() > intStatistics.genericGetMax()) { + failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, intStatistics); return Domain.create(ValueSet.all(type), hasNullValue); } parquetIntegerStatistics = new ParquetIntegerStatistics((long) intStatistics.getMin(), (long) intStatistics.getMax()); @@ -170,8 +174,8 @@ else if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) | } else if (type.equals(REAL) && statistics instanceof FloatStatistics) { FloatStatistics floatStatistics = (FloatStatistics) statistics; - // ignore corrupted statistics if (floatStatistics.genericGetMin() > floatStatistics.genericGetMax()) { + failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, floatStatistics); return Domain.create(ValueSet.all(type), hasNullValue); } @@ -183,8 +187,8 @@ else if (type.equals(REAL) && statistics instanceof FloatStatistics) { } else if (type.equals(DOUBLE) && statistics instanceof DoubleStatistics) { DoubleStatistics doubleStatistics = (DoubleStatistics) statistics; - // ignore corrupted statistics if (doubleStatistics.genericGetMin() > doubleStatistics.genericGetMax()) { + failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, doubleStatistics); return Domain.create(ValueSet.all(type), hasNullValue); } ParquetDoubleStatistics parquetDoubleStatistics = new ParquetDoubleStatistics(doubleStatistics.genericGetMin(), doubleStatistics.genericGetMax()); @@ -194,8 +198,8 @@ else if (isVarcharType(type) && statistics instanceof BinaryStatistics) { BinaryStatistics binaryStatistics = (BinaryStatistics) statistics; Slice minSlice = Slices.wrappedBuffer(binaryStatistics.getMin().getBytes()); Slice maxSlice = Slices.wrappedBuffer(binaryStatistics.getMax().getBytes()); - // ignore corrupted statistics if (minSlice.compareTo(maxSlice) > 0) { + failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, binaryStatistics); return Domain.create(ValueSet.all(type), hasNullValue); } 
ParquetStringStatistics parquetStringStatistics = new ParquetStringStatistics(minSlice, maxSlice); @@ -203,8 +207,8 @@ else if (isVarcharType(type) && statistics instanceof BinaryStatistics) { } else if (type.equals(DATE) && statistics instanceof IntStatistics) { IntStatistics intStatistics = (IntStatistics) statistics; - // ignore corrupted statistics if (intStatistics.genericGetMin() > intStatistics.genericGetMax()) { + failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, intStatistics); return Domain.create(ValueSet.all(type), hasNullValue); } ParquetIntegerStatistics parquetIntegerStatistics = new ParquetIntegerStatistics((long) intStatistics.getMin(), (long) intStatistics.getMax()); @@ -280,6 +284,14 @@ else if (isVarcharType(type) && columnDescriptor.getType() == PrimitiveTypeName. return null; } + private static void failWithCorruptionException(boolean failOnCorruptedParquetStatistics, String column, ParquetDataSourceId id, Statistics statistics) + throws ParquetCorruptionException + { + if (failOnCorruptedParquetStatistics) { + throw new ParquetCorruptionException(format("Corrupted statistics for column \"%s\" in Parquet file \"%s\": [%s]", column, id, statistics)); + } + } + private static > Domain createDomain(Type type, boolean hasNullValue, ParquetRangeStatistics rangeStatistics) { return createDomain(type, hasNullValue, rangeStatistics, value -> value); diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/TestTupleDomainParquetPredicate.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/TestTupleDomainParquetPredicate.java index 3cce026ad887d..dfaf6325bcfb1 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/TestTupleDomainParquetPredicate.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/TestTupleDomainParquetPredicate.java @@ -59,6 +59,7 @@ import static java.lang.Float.floatToRawIntBits; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static parquet.column.statistics.Statistics.getStatsBasedOnType; @@ -67,15 +68,19 @@ public class TestTupleDomainParquetPredicate { + private static final ParquetDataSourceId ID = new ParquetDataSourceId("testFile"); + @Test public void testBoolean() + throws ParquetCorruptionException { - assertEquals(getDomain(BOOLEAN, 0, null), all(BOOLEAN)); + String column = "BooleanColumn"; + assertEquals(getDomain(BOOLEAN, 0, null, ID, column, true), all(BOOLEAN)); - assertEquals(getDomain(BOOLEAN, 10, booleanColumnStats(true, true)), singleValue(BOOLEAN, true)); - assertEquals(getDomain(BOOLEAN, 10, booleanColumnStats(false, false)), singleValue(BOOLEAN, false)); + assertEquals(getDomain(BOOLEAN, 10, booleanColumnStats(true, true), ID, column, true), singleValue(BOOLEAN, true)); + assertEquals(getDomain(BOOLEAN, 10, booleanColumnStats(false, false), ID, column, true), singleValue(BOOLEAN, false)); - assertEquals(getDomain(BOOLEAN, 20, booleanColumnStats(false, true)), all(BOOLEAN)); + assertEquals(getDomain(BOOLEAN, 20, booleanColumnStats(false, true), ID, column, true), all(BOOLEAN)); } private static BooleanStatistics booleanColumnStats(boolean minimum, boolean maximum) @@ -87,12 +92,20 @@ private static BooleanStatistics booleanColumnStats(boolean minimum, boolean max @Test public void testBigint() + throws ParquetCorruptionException { - 
assertEquals(getDomain(BIGINT, 0, null), all(BIGINT)); - - assertEquals(getDomain(BIGINT, 10, longColumnStats(100L, 100L)), singleValue(BIGINT, 100L)); - - assertEquals(getDomain(BIGINT, 10, longColumnStats(0L, 100L)), create(ValueSet.ofRanges(range(BIGINT, 0L, true, 100L, true)), false)); + String column = "BigintColumn"; + assertEquals(getDomain(BIGINT, 0, null, ID, column, true), all(BIGINT)); + + assertEquals(getDomain(BIGINT, 10, longColumnStats(100L, 100L), ID, column, true), singleValue(BIGINT, 100L)); + + assertEquals(getDomain(BIGINT, 10, longColumnStats(0L, 100L), ID, column, true), create(ValueSet.ofRanges(range(BIGINT, 0L, true, 100L, true)), false)); + // ignore corrupted statistics + assertEquals(getDomain(BIGINT, 10, longColumnStats(100L, 0L), ID, column, false), create(ValueSet.all(BIGINT), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(BIGINT, 10, longColumnStats(100L, 10L), ID, column, true)) + .withMessage("Corrupted statistics for column \"BigintColumn\" in Parquet file \"testFile\": [min: 100, max: 10, num_nulls: 0]"); } private static LongStatistics longColumnStats(long minimum, long maximum) @@ -104,48 +117,82 @@ private static LongStatistics longColumnStats(long minimum, long maximum) @Test public void testInteger() + throws ParquetCorruptionException { - assertEquals(getDomain(INTEGER, 0, null), all(INTEGER)); + String column = "IntegerColumn"; + assertEquals(getDomain(INTEGER, 0, null, ID, column, true), all(INTEGER)); - assertEquals(getDomain(INTEGER, 10, longColumnStats(100, 100)), singleValue(INTEGER, 100L)); + assertEquals(getDomain(INTEGER, 10, longColumnStats(100, 100), ID, column, true), singleValue(INTEGER, 100L)); - assertEquals(getDomain(INTEGER, 10, longColumnStats(0, 100)), create(ValueSet.ofRanges(range(INTEGER, 0L, true, 100L, true)), false)); + assertEquals(getDomain(INTEGER, 10, longColumnStats(0, 100), ID, column, true), create(ValueSet.ofRanges(range(INTEGER, 0L, true, 100L, true)), false)); - assertEquals(getDomain(INTEGER, 20, longColumnStats(0, 2147483648L)), notNull(INTEGER)); + assertEquals(getDomain(INTEGER, 20, longColumnStats(0, 2147483648L), ID, column, true), notNull(INTEGER)); + // ignore corrupted statistics + assertEquals(getDomain(INTEGER, 10, longColumnStats(2147483648L, 0), ID, column, false), create(ValueSet.all(INTEGER), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(INTEGER, 10, longColumnStats(2147483648L, 10), ID, column, true)) + .withMessage("Corrupted statistics for column \"IntegerColumn\" in Parquet file \"testFile\": [min: 2147483648, max: 10, num_nulls: 0]"); } @Test public void testSmallint() + throws ParquetCorruptionException { - assertEquals(getDomain(SMALLINT, 0, null), all(SMALLINT)); + String column = "SmallintColumn"; + assertEquals(getDomain(SMALLINT, 0, null, ID, column, true), all(SMALLINT)); - assertEquals(getDomain(SMALLINT, 10, longColumnStats(100, 100)), singleValue(SMALLINT, 100L)); + assertEquals(getDomain(SMALLINT, 10, longColumnStats(100, 100), ID, column, true), singleValue(SMALLINT, 100L)); - assertEquals(getDomain(SMALLINT, 10, longColumnStats(0, 100)), create(ValueSet.ofRanges(range(SMALLINT, 0L, true, 100L, true)), false)); + assertEquals(getDomain(SMALLINT, 10, longColumnStats(0, 100), ID, column, true), create(ValueSet.ofRanges(range(SMALLINT, 0L, true, 100L, true)), false)); - assertEquals(getDomain(SMALLINT, 20, 
longColumnStats(0, 2147483648L)), notNull(SMALLINT)); + assertEquals(getDomain(SMALLINT, 20, longColumnStats(0, 2147483648L), ID, column, true), notNull(SMALLINT)); + // ignore corrupted statistics + assertEquals(getDomain(SMALLINT, 10, longColumnStats(2147483648L, 0), ID, column, false), create(ValueSet.all(SMALLINT), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(SMALLINT, 10, longColumnStats(2147483648L, 10), ID, column, true)) + .withMessage("Corrupted statistics for column \"SmallintColumn\" in Parquet file \"testFile\": [min: 2147483648, max: 10, num_nulls: 0]"); } @Test public void testTinyint() + throws ParquetCorruptionException { - assertEquals(getDomain(TINYINT, 0, null), all(TINYINT)); + String column = "TinyintColumn"; + assertEquals(getDomain(TINYINT, 0, null, ID, column, true), all(TINYINT)); - assertEquals(getDomain(TINYINT, 10, longColumnStats(100, 100)), singleValue(TINYINT, 100L)); + assertEquals(getDomain(TINYINT, 10, longColumnStats(100, 100), ID, column, true), singleValue(TINYINT, 100L)); - assertEquals(getDomain(TINYINT, 10, longColumnStats(0, 100)), create(ValueSet.ofRanges(range(TINYINT, 0L, true, 100L, true)), false)); + assertEquals(getDomain(TINYINT, 10, longColumnStats(0, 100), ID, column, true), create(ValueSet.ofRanges(range(TINYINT, 0L, true, 100L, true)), false)); - assertEquals(getDomain(TINYINT, 20, longColumnStats(0, 2147483648L)), notNull(TINYINT)); + assertEquals(getDomain(TINYINT, 20, longColumnStats(0, 2147483648L), ID, column, true), notNull(TINYINT)); + + // ignore corrupted statistics + assertEquals(getDomain(TINYINT, 10, longColumnStats(2147483648L, 0), ID, column, false), create(ValueSet.all(TINYINT), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(TINYINT, 10, longColumnStats(2147483648L, 10), ID, column, true)) + .withMessage("Corrupted statistics for column \"TinyintColumn\" in Parquet file \"testFile\": [min: 2147483648, max: 10, num_nulls: 0]"); } @Test public void testDouble() + throws ParquetCorruptionException { - assertEquals(getDomain(DOUBLE, 0, null), all(DOUBLE)); + String column = "DoubleColumn"; + assertEquals(getDomain(DOUBLE, 0, null, ID, column, true), all(DOUBLE)); + + assertEquals(getDomain(DOUBLE, 10, doubleColumnStats(42.24, 42.24), ID, column, true), singleValue(DOUBLE, 42.24)); - assertEquals(getDomain(DOUBLE, 10, doubleColumnStats(42.24, 42.24)), singleValue(DOUBLE, 42.24)); + assertEquals(getDomain(DOUBLE, 10, doubleColumnStats(3.3, 42.24), ID, column, true), create(ValueSet.ofRanges(range(DOUBLE, 3.3, true, 42.24, true)), false)); - assertEquals(getDomain(DOUBLE, 10, doubleColumnStats(3.3, 42.24)), create(ValueSet.ofRanges(range(DOUBLE, 3.3, true, 42.24, true)), false)); + // ignore corrupted statistics + assertEquals(getDomain(DOUBLE, 10, doubleColumnStats(42.24, 3.3), ID, column, false), create(ValueSet.all(DOUBLE), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(DOUBLE, 10, doubleColumnStats(42.24, 3.3), ID, column, true)) + .withMessage("Corrupted statistics for column \"DoubleColumn\" in Parquet file \"testFile\": [min: 42.24000, max: 3.30000, num_nulls: 0]"); } private static DoubleStatistics doubleColumnStats(double minimum, double maximum) @@ -157,14 +204,23 @@ private static DoubleStatistics doubleColumnStats(double minimum, double maximum @Test public 
void testString() + throws ParquetCorruptionException { - assertEquals(getDomain(createUnboundedVarcharType(), 0, null), all(createUnboundedVarcharType())); + String column = "StringColumn"; + assertEquals(getDomain(createUnboundedVarcharType(), 0, null, ID, column, true), all(createUnboundedVarcharType())); + + assertEquals(getDomain(createUnboundedVarcharType(), 10, stringColumnStats("taco", "taco"), ID, column, true), singleValue(createUnboundedVarcharType(), utf8Slice("taco"))); - assertEquals(getDomain(createUnboundedVarcharType(), 10, stringColumnStats("taco", "taco")), singleValue(createUnboundedVarcharType(), utf8Slice("taco"))); + assertEquals(getDomain(createUnboundedVarcharType(), 10, stringColumnStats("apple", "taco"), ID, column, true), create(ValueSet.ofRanges(range(createUnboundedVarcharType(), utf8Slice("apple"), true, utf8Slice("taco"), true)), false)); - assertEquals(getDomain(createUnboundedVarcharType(), 10, stringColumnStats("apple", "taco")), create(ValueSet.ofRanges(range(createUnboundedVarcharType(), utf8Slice("apple"), true, utf8Slice("taco"), true)), false)); + assertEquals(getDomain(createUnboundedVarcharType(), 10, stringColumnStats("中国", "美利坚"), ID, column, true), create(ValueSet.ofRanges(range(createUnboundedVarcharType(), utf8Slice("中国"), true, utf8Slice("美利坚"), true)), false)); - assertEquals(getDomain(createUnboundedVarcharType(), 10, stringColumnStats("中国", "美利坚")), create(ValueSet.ofRanges(range(createUnboundedVarcharType(), utf8Slice("中国"), true, utf8Slice("美利坚"), true)), false)); + // ignore corrupted statistics + assertEquals(getDomain(createUnboundedVarcharType(), 10, stringColumnStats("taco", "apple"), ID, column, false), create(ValueSet.all(createUnboundedVarcharType()), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(createUnboundedVarcharType(), 10, stringColumnStats("taco", "apple"), ID, column, true)) + .withMessage("Corrupted statistics for column \"StringColumn\" in Parquet file \"testFile\": [min: taco, max: apple, num_nulls: 0]"); } private static BinaryStatistics stringColumnStats(String minimum, String maximum) @@ -176,33 +232,48 @@ private static BinaryStatistics stringColumnStats(String minimum, String maximum @Test public void testFloat() + throws ParquetCorruptionException { - assertEquals(getDomain(REAL, 0, null), all(REAL)); + String column = "FloatColumn"; + assertEquals(getDomain(REAL, 0, null, ID, column, true), all(REAL)); float minimum = 4.3f; float maximum = 40.3f; - assertEquals(getDomain(REAL, 10, floatColumnStats(minimum, minimum)), singleValue(REAL, (long) floatToRawIntBits(minimum))); + assertEquals(getDomain(REAL, 10, floatColumnStats(minimum, minimum), ID, column, true), singleValue(REAL, (long) floatToRawIntBits(minimum))); assertEquals( - getDomain(REAL, 10, floatColumnStats(minimum, maximum)), + getDomain(REAL, 10, floatColumnStats(minimum, maximum), ID, column, true), create(ValueSet.ofRanges(range(REAL, (long) floatToRawIntBits(minimum), true, (long) floatToRawIntBits(maximum), true)), false)); - assertEquals(getDomain(REAL, 10, floatColumnStats(maximum, minimum)), create(ValueSet.all(REAL), false)); + // ignore corrupted statistics + assertEquals(getDomain(REAL, 10, floatColumnStats(maximum, minimum), ID, column, false), create(ValueSet.all(REAL), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(REAL, 10, floatColumnStats(maximum, minimum), ID, 
column, true)) + .withMessage("Corrupted statistics for column \"FloatColumn\" in Parquet file \"testFile\": [min: 40.30000, max: 4.30000, num_nulls: 0]"); } @Test public void testDate() + throws ParquetCorruptionException { - assertEquals(getDomain(DATE, 0, null), all(DATE)); - assertEquals(getDomain(DATE, 10, intColumnStats(100, 100)), singleValue(DATE, 100L)); - assertEquals(getDomain(DATE, 10, intColumnStats(0, 100)), create(ValueSet.ofRanges(range(DATE, 0L, true, 100L, true)), false)); - // assert corrupt stats are ignored properly - assertEquals(getDomain(DATE, 10, intColumnStats(200, 100)), create(ValueSet.all(DATE), false)); + String column = "DateColumn"; + assertEquals(getDomain(DATE, 0, null, ID, column, true), all(DATE)); + assertEquals(getDomain(DATE, 10, intColumnStats(100, 100), ID, column, true), singleValue(DATE, 100L)); + assertEquals(getDomain(DATE, 10, intColumnStats(0, 100), ID, column, true), create(ValueSet.ofRanges(range(DATE, 0L, true, 100L, true)), false)); + + // ignore corrupted statistics + assertEquals(getDomain(DATE, 10, intColumnStats(200, 100), ID, column, false), create(ValueSet.all(DATE), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(DATE, 10, intColumnStats(200, 100), ID, column, true)) + .withMessage("Corrupted statistics for column \"DateColumn\" in Parquet file \"testFile\": [min: 200, max: 100, num_nulls: 0]"); } @Test public void testMatchesWithStatistics() + throws ParquetCorruptionException { String value = "Test"; ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"path"}, BINARY, 0, 0); @@ -212,11 +283,12 @@ public void testMatchesWithStatistics() Statistics stats = getStatsBasedOnType(column.getType()); stats.setNumNulls(1L); stats.setMinMaxFromBytes(value.getBytes(), value.getBytes()); - assertTrue(parquetPredicate.matches(2, singletonMap(column, stats))); + assertTrue(parquetPredicate.matches(2, singletonMap(column, stats), ID, true)); } @Test public void testMatchesWithDescriptors() + throws ParquetCorruptionException { ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"path"}, BINARY, 0, 0); RichColumnDescriptor column = new RichColumnDescriptor(columnDescriptor, new PrimitiveType(OPTIONAL, BINARY, "Test column"));
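
Closing note: end to end, the new behavior is controlled by `hive.parquet.fail-on-corrupted-statistics` (default `true`) and can be overridden per session with `parquet_fail_with_corrupted_statistics`. The sketch below summarizes the two modes in the style of the tests above; it is illustrative only (`LongStatistics.setMinMax` comes from parquet-mr, and the test name is hypothetical).

```java
// Illustrative sketch of the two modes, mirroring the unit tests above; not part of the patch.
@Test
public void testCorruptedStatisticsModes()
        throws ParquetCorruptionException
{
    ParquetDataSourceId id = new ParquetDataSourceId("testFile");
    LongStatistics corrupted = new LongStatistics();
    corrupted.setMinMax(100L, 10L); // min > max, i.e. corrupted statistics

    // Lenient mode (flag = false): the corrupted statistics are ignored, the domain widens to
    // all BIGINT values, and the row group simply cannot be pruned.
    assertEquals(getDomain(BIGINT, 10, corrupted, id, "BigintColumn", false), create(ValueSet.all(BIGINT), false));

    // Strict mode (flag = true, the new default): the same statistics raise
    // ParquetCorruptionException, which the Hive connector maps to HIVE_BAD_DATA.
    assertThatExceptionOfType(ParquetCorruptionException.class)
            .isThrownBy(() -> getDomain(BIGINT, 10, corrupted, id, "BigintColumn", true))
            .withMessage("Corrupted statistics for column \"BigintColumn\" in Parquet file \"testFile\": [min: 100, max: 10, num_nulls: 0]");
}
```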