diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 9eb471f26c..6481b8f7c7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.Log;
 import org.apache.parquet.format.PageEncodingStats;
@@ -81,6 +82,20 @@ public class ParquetMetadataConverter {
   private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
 
+  private final boolean useSignedStringMinMax;
+
+  public ParquetMetadataConverter() {
+    this(false);
+  }
+
+  public ParquetMetadataConverter(Configuration conf) {
+    this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
+  }
+
+  private ParquetMetadataConverter(boolean useSignedStringMinMax) {
+    this.useSignedStringMinMax = useSignedStringMinMax;
+  }
+
   // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
   // sets of encodings. It is important that all collections inserted to this cache be
   // immutable and have thread-safe read-only access. This can be achieved by wrapping
@@ -308,14 +323,28 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist
     return fromParquetStatistics(null, statistics, type);
   }
 
+  /**
+   * @deprecated Use {@link #fromParquetStatistics(String, Statistics, PrimitiveType)} instead.
+   */
+  @Deprecated
   public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics
       (String createdBy, Statistics statistics, PrimitiveTypeName type) {
+    return fromParquetStatisticsInternal(createdBy, statistics, type, defaultSortOrder(type));
+  }
+
+  // Visible for testing
+  static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal
+      (String createdBy, Statistics statistics, PrimitiveTypeName type, SortOrder typeSortOrder) {
     // create stats object based on the column type
     org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type);
     // If there was no statistics written to the footer, create an empty Statistics object and return
 
     // NOTE: See docs in CorruptStatistics for explanation of why this check is needed
-    if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type)) {
+    // The sort order is checked to avoid returning min/max stats that are not
+    // valid with the type's sort order. Currently, all stats are aggregated
+    // using a signed ordering, which isn't valid for strings or unsigned ints.
+    if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type) &&
+        SortOrder.SIGNED == typeSortOrder) {
       if (statistics.isSetMax() && statistics.isSetMin()) {
         stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array());
       }
@@ -324,6 +353,112 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist
     return stats;
   }
 
+  public org.apache.parquet.column.statistics.Statistics fromParquetStatistics(
+      String createdBy, Statistics statistics, PrimitiveType type) {
+    SortOrder expectedOrder = overrideSortOrderToSigned(type) ?
+        SortOrder.SIGNED : sortOrder(type);
+    return fromParquetStatisticsInternal(
+        createdBy, statistics, type.getPrimitiveTypeName(), expectedOrder);
+  }
+
+  /**
+   * Sort order for page and column statistics. Types are associated with sort
+   * orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
+   * aggregated using a sort order. As of parquet-format version 2.3.1, the
+   * order used to aggregate stats is always SIGNED and is not stored in the
+   * Parquet file. These stats are discarded for types that need unsigned.
+   *
+   * See PARQUET-686.
+   */
+  enum SortOrder {
+    SIGNED,
+    UNSIGNED,
+    UNKNOWN
+  }
+
+  private static final Set<OriginalType> STRING_TYPES = Collections
+      .unmodifiableSet(new HashSet<>(Arrays.asList(
+          OriginalType.UTF8, OriginalType.ENUM, OriginalType.JSON
+      )));
+
+  /**
+   * Returns whether to use signed order min and max with a type. It is safe to
+   * use signed min and max when the type is a string type and contains only
+   * ASCII characters (where the sign bit was 0). This checks whether the type
+   * is a string type and uses {@code useSignedStringMinMax} to determine if
+   * only ASCII characters were written.
+   *
+   * @param type a primitive type with a logical type annotation
+   * @return true if signed order min/max can be used with this type
+   */
+  private boolean overrideSortOrderToSigned(PrimitiveType type) {
+    // even if the override is set, only return stats for string-ish types
+    // a null type annotation is considered string-ish because some writers
+    // failed to use the UTF8 annotation.
+    OriginalType annotation = type.getOriginalType();
+    return useSignedStringMinMax &&
+        PrimitiveTypeName.BINARY == type.getPrimitiveTypeName() &&
+        (annotation == null || STRING_TYPES.contains(annotation));
+  }
+
+  /**
+   * @param primitive a primitive physical type
+   * @return the default sort order used when the logical type is not known
+   */
+  private static SortOrder defaultSortOrder(PrimitiveTypeName primitive) {
+    switch (primitive) {
+      case BOOLEAN:
+      case INT32:
+      case INT64:
+      case FLOAT:
+      case DOUBLE:
+        return SortOrder.SIGNED;
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+      case INT96: // only used for timestamp, which uses unsigned values
+        return SortOrder.UNSIGNED;
+    }
+    return SortOrder.UNKNOWN;
+  }
+
+  /**
+   * @param primitive a primitive type with a logical type annotation
+   * @return the "correct" sort order of the type that applications assume
+   */
+  private static SortOrder sortOrder(PrimitiveType primitive) {
+    OriginalType annotation = primitive.getOriginalType();
+    if (annotation != null) {
+      switch (annotation) {
+        case INT_8:
+        case INT_16:
+        case INT_32:
+        case INT_64:
+        case DATE:
+        case TIME_MICROS:
+        case TIME_MILLIS:
+        case TIMESTAMP_MICROS:
+        case TIMESTAMP_MILLIS:
+          return SortOrder.SIGNED;
+        case UINT_8:
+        case UINT_16:
+        case UINT_32:
+        case UINT_64:
+        case ENUM:
+        case UTF8:
+        case BSON:
+        case JSON:
+          return SortOrder.UNSIGNED;
+        case DECIMAL:
+        case LIST:
+        case MAP:
+        case MAP_KEY_VALUE:
+        case INTERVAL:
+          return SortOrder.UNKNOWN;
+      }
+    }
+    return defaultSortOrder(primitive.getPrimitiveTypeName());
+  }
+
   public PrimitiveTypeName getPrimitive(Type type) {
     switch (type) {
       case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
@@ -687,7 +822,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
             fromParquetStatistics(
                 parquetMetadata.getCreated_by(),
                 metaData.statistics,
-                messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
+                messageType.getType(path.toArray()).asPrimitiveType()),
             metaData.data_page_offset,
             metaData.dictionary_page_offset,
             metaData.num_values,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 57cdb7dca8..9e95535b7c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -24,7 +24,6 @@ import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
 import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
@@ -53,6 +52,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -95,6 +95,7 @@ import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.PrimitiveType;
 
 /**
  * Internal implementation of the Parquet file reader as a block container
@@ -108,7 +109,7 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
-  private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
+  private final ParquetMetadataConverter converter;
 
   /**
    * for files provided, check if there's a summary file.
@@ -444,9 +445,19 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
    */
   public static final ParquetMetadata readFooter(
       InputFile file, MetadataFilter filter) throws IOException {
+    ParquetMetadataConverter converter;
+    // TODO: remove this temporary work-around.
+    // this is necessary to pass the Configuration to ParquetMetadataConverter
+    // and should be removed when there is a non-Hadoop configuration.
+    if (file instanceof HadoopInputFile) {
+      converter = new ParquetMetadataConverter(
+          ((HadoopInputFile) file).getConfiguration());
+    } else {
+      converter = new ParquetMetadataConverter();
+    }
     try (SeekableInputStream in = file.newStream()) {
-      return readFooter(converter, file.getLength(), file.toString(),
-          in, filter);
+
+      return readFooter(converter, file.getLength(), file.toString(), in, filter);
     }
   }
@@ -538,6 +549,7 @@ public ParquetFileReader(Configuration configuration, Path filePath, List blocks, List columns) throws IOException {
+    this.converter = new ParquetMetadataConverter(configuration);
     this.conf = configuration;
     this.fileMetaData = fileMetaData;
     FileSystem fs = filePath.getFileSystem(configuration);
@@ -569,6 +581,7 @@ private ParquetFileReader(Configuration configuration, Path file) throws IOExcep
    * @throws IOException if the file can not be opened
    */
   public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
+    this.converter = new ParquetMetadataConverter(conf);
     this.conf = conf;
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
@@ -592,6 +605,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
    * @throws IOException if the file can not be opened
    */
   public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
+    this.converter = new ParquetMetadataConverter(conf);
     this.conf = conf;
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
@@ -781,7 +795,7 @@ DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException {
         compressedPage.getEncoding());
   }
 
-  private static DictionaryPage readCompressedDictionary(
+  private DictionaryPage readCompressedDictionary(
       PageHeader pageHeader, SeekableInputStream fin) throws IOException {
     DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
@@ -843,6 +857,8 @@ protected PageHeader readPageHeader() throws IOException {
     public ColumnChunkPageReader readAllPages() throws IOException {
       List<DataPage> pagesInChunk = new ArrayList<DataPage>();
       DictionaryPage dictionaryPage = null;
+      PrimitiveType type = getFileMetaData().getSchema()
+          .getType(descriptor.col.getPath()).asPrimitiveType();
       long valuesCountReadSoFar = 0;
       while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
         PageHeader pageHeader = readPageHeader();
@@ -870,10 +886,10 @@ public ColumnChunkPageReader readAllPages() throws IOException {
                 this.readAsBytesInput(compressedPageSize),
                 dataHeaderV1.getNum_values(),
                 uncompressedPageSize,
-                fromParquetStatistics(
+                converter.fromParquetStatistics(
                     getFileMetaData().getCreatedBy(),
                     dataHeaderV1.getStatistics(),
-                    descriptor.col.getType()),
+                    type),
                 converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
                 converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
                 converter.getEncoding(dataHeaderV1.getEncoding())
@@ -893,10 +909,10 @@ public ColumnChunkPageReader readAllPages() throws IOException {
                 converter.getEncoding(dataHeaderV2.getEncoding()),
                 this.readAsBytesInput(dataSize),
                 uncompressedPageSize,
-                fromParquetStatistics(
+                converter.fromParquetStatistics(
                     getFileMetaData().getCreatedBy(),
                     dataHeaderV2.getStatistics(),
-                    descriptor.col.getType()),
+                    type),
                 dataHeaderV2.isIs_compressed()
                 ));
         valuesCountReadSoFar += dataHeaderV2.getNum_values();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index d5868d3011..fb876a8501 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -31,22 +31,28 @@ public class HadoopInputFile implements InputFile {
 
   private final FileSystem fs;
   private final FileStatus stat;
+  private final Configuration conf;
 
   public static HadoopInputFile fromPath(Path path, Configuration conf)
       throws IOException {
     FileSystem fs = path.getFileSystem(conf);
-    return new HadoopInputFile(fs, fs.getFileStatus(path));
+    return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
   }
 
   public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
       throws IOException {
     FileSystem fs = stat.getPath().getFileSystem(conf);
-    return new HadoopInputFile(fs, stat);
+    return new HadoopInputFile(fs, stat, conf);
   }
 
-  private HadoopInputFile(FileSystem fs, FileStatus stat) {
+  private HadoopInputFile(FileSystem fs, FileStatus stat, Configuration conf) {
     this.fs = fs;
     this.stat = stat;
+    this.conf = conf;
+  }
+
+  public Configuration getConfiguration() {
+    return conf;
   }
 
   @Override
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 3c888c37d9..35c35c19c6 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -45,6 +45,7 @@ import java.util.TreeSet;
 
 import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -401,8 +402,9 @@ public void testBinaryStats() {
     Assert.assertFalse("Num nulls should not be set", formatStats.isSetNull_count());
 
-    Statistics roundTripStats = ParquetMetadataConverter.fromParquetStatistics(
-        Version.FULL_VERSION, formatStats, PrimitiveTypeName.BINARY);
+    Statistics roundTripStats = ParquetMetadataConverter.fromParquetStatisticsInternal(
+        Version.FULL_VERSION, formatStats, PrimitiveTypeName.BINARY,
+        ParquetMetadataConverter.SortOrder.SIGNED);
 
     Assert.assertTrue(roundTripStats.isEmpty());
   }
@@ -515,4 +517,51 @@ public void testBooleanStats() {
     Assert.assertEquals("Num nulls should match", 3004, formatStats.getNull_count());
   }
+
+  @Test
+  public void testIgnoreStatsWithSignedSortOrder() {
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    BinaryStatistics stats = new BinaryStatistics();
+    stats.incrementNumNulls();
+    stats.updateStats(Binary.fromString("A"));
+    stats.incrementNumNulls();
+    stats.updateStats(Binary.fromString("z"));
+    stats.incrementNumNulls();
+
+    Statistics convertedStats = converter.fromParquetStatistics(
+        Version.FULL_VERSION,
+        ParquetMetadataConverter.toParquetStatistics(stats),
+        Types.required(PrimitiveTypeName.BINARY)
+            .as(OriginalType.UTF8).named("b"));
+
+    Assert.assertTrue("Stats should be empty", convertedStats.isEmpty());
+  }
+
+  @Test
+  public void testUseStatsWithSignedSortOrder() {
+    // override defaults and use stats that were accumulated using signed order
+    Configuration conf = new Configuration();
+    conf.setBoolean("parquet.strings.signed-min-max.enabled", true);
+
+    ParquetMetadataConverter converter = new ParquetMetadataConverter(conf);
+    BinaryStatistics stats = new BinaryStatistics();
+    stats.incrementNumNulls();
+    stats.updateStats(Binary.fromString("A"));
+    stats.incrementNumNulls();
+    stats.updateStats(Binary.fromString("z"));
+    stats.incrementNumNulls();
+
+    Statistics convertedStats = converter.fromParquetStatistics(
+        Version.FULL_VERSION,
+        ParquetMetadataConverter.toParquetStatistics(stats),
+        Types.required(PrimitiveTypeName.BINARY)
+            .as(OriginalType.UTF8).named("b"));
+
+    Assert.assertFalse("Stats should not be empty", convertedStats.isEmpty());
+    Assert.assertEquals("Should have 3 nulls", 3, convertedStats.getNumNulls());
+    Assert.assertEquals("Should have correct min (unsigned sort)",
+        Binary.fromString("A"), convertedStats.genericGetMin());
+    Assert.assertEquals("Should have correct max (unsigned sort)",
+        Binary.fromString("z"), convertedStats.genericGetMax());
+  }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 597daa801b..c56515fcaf 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -24,9 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.Version;
-import org.apache.parquet.VersionParser;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.junit.Assume;
@@ -452,8 +450,9 @@ public void testWriteReadStatistics() throws Exception {
     Path path = new Path(testFile.toURI());
     Configuration configuration = new Configuration();
+    configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
 
-    MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}");
+    MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b (UTF8);} required group c { required int64 d; }}");
     String[] path1 = {"a", "b"};
     ColumnDescriptor c1 = schema.getColumnDescription(path1);
     String[] path2 = {"c", "d"};
@@ -584,13 +583,14 @@ public void testWriteReadStatisticsAllNulls() throws Exception {
     testFile.delete();
 
     writeSchema = "message example {\n" +
-        "required binary content;\n" +
+        "required binary content (UTF8);\n" +
         "}";
 
     Path path = new Path(testFile.toURI());
 
     MessageType schema = MessageTypeParser.parseMessageType(writeSchema);
     Configuration configuration = new Configuration();
+    configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
     GroupWriteSupport.setSchema(schema, configuration);
 
     ParquetWriter writer = new ParquetWriter(path, configuration, new GroupWriteSupport());
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
index 1c416dd165..2407e6198a 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
@@ -114,6 +114,7 @@ public void testWriteStatistics() throws Exception {
                                  new RequiredPrimitiveFixture(false, (byte)100, (short)100, 100, 287l, -9.0d, "world"),
                                  new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17l, 9.63d, "hello"));
     final Configuration configuration = new Configuration();
+    configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
    final FileSystem fs = p.getFileSystem(configuration);
    FileStatus fileStatus = fs.getFileStatus(p);
    ParquetMetadata footer = ParquetFileReader.readFooter(configuration, p);
@@ -160,6 +161,7 @@ public void testWriteStatistics() throws Exception {
     // make new configuration and create file with new large stats
     final Configuration configuration_large = new Configuration();
+    configuration_large.setBoolean("parquet.strings.signed-min-max.enabled", true);
     final FileSystem fs_large = p_large.getFileSystem(configuration_large);
     FileStatus fileStatus_large = fs_large.getFileStatus(p_large);
     ParquetMetadata footer_large = ParquetFileReader.readFooter(configuration_large, p_large);
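
Usage note (illustrative, not part of the patch): with this change, min/max statistics read from footers and page headers of BINARY columns annotated as UTF8/ENUM/JSON (or unannotated) are discarded by default, because they were aggregated with signed byte ordering. A reader that knows its string values are all ASCII can opt back in through the new "parquet.strings.signed-min-max.enabled" property, as the updated tests do. The sketch below only uses APIs that appear in this diff (Configuration.setBoolean, ParquetFileReader.readFooter) plus the standard footer metadata getters; the file path and class name are placeholders, not part of the change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class ReadFooterStatsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt in to signed min/max for string columns; only safe when every
    // value in the file is ASCII, so signed and unsigned orderings agree.
    conf.setBoolean("parquet.strings.signed-min-max.enabled", true);

    // Placeholder path for illustration only.
    ParquetMetadata footer =
        ParquetFileReader.readFooter(conf, new Path("/tmp/strings.parquet"));

    for (BlockMetaData block : footer.getBlocks()) {
      for (ColumnChunkMetaData column : block.getColumns()) {
        // Without the flag above, getStatistics() for string columns is an
        // empty Statistics object after this patch.
        System.out.println(column.getPath() + ": " + column.getStatistics());
      }
    }
  }
}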