diff --git a/CHANGES.md b/CHANGES.md index 7785db5486..98873a4794 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,33 @@ # Parquet # +### Version 1.12.2 ### + +Release Notes - Parquet - Version 1.12.2 + +#### Bug + +* [PARQUET-2094](https://issues.apache.org/jira/browse/PARQUET-2094) - Handle negative values in page headers + +### Version 1.12.1 ### + +Release Notes - Parquet - Version 1.12.1 + +#### Bug + +* [PARQUET-1633](https://issues.apache.org/jira/browse/PARQUET-1633) - Fix integer overflow +* [PARQUET-2022](https://issues.apache.org/jira/browse/PARQUET-2022) - ZstdDecompressorStream should close zstdInputStream +* [PARQUET-2027](https://issues.apache.org/jira/browse/PARQUET-2027) - Fix calculating directory offset for merge +* [PARQUET-2052](https://issues.apache.org/jira/browse/PARQUET-2052) - Integer overflow when writing huge binary using dictionary encoding +* [PARQUET-2054](https://issues.apache.org/jira/browse/PARQUET-2054) - fix TCP leaking when calling ParquetFileWriter.appendFile +* [PARQUET-2072](https://issues.apache.org/jira/browse/PARQUET-2072) - Do Not Determine Both Min/Max for Binary Stats +* [PARQUET-2073](https://issues.apache.org/jira/browse/PARQUET-2073) - Fix estimate remaining row count in ColumnWriteStoreBase. +* [PARQUET-2078](https://issues.apache.org/jira/browse/PARQUET-2078) - Failed to read parquet file after writing with the same parquet version + +#### Improvement + +* [PARQUET-2064](https://issues.apache.org/jira/browse/PARQUET-2064) - Make Range public accessible in RowRanges + ### Version 1.12.0 ### Release Notes - Parquet - Version 1.12.0 @@ -739,7 +766,7 @@ Release Notes - Parquet - Version 1.10.0 * ISSUE [346](https://github.com/Parquet/parquet-mr/pull/346): stop using strings and b64 for compressed input splits * ISSUE [345](https://github.com/Parquet/parquet-mr/pull/345): set cascading version to 2.5.3 * ISSUE [342](https://github.com/Parquet/parquet-mr/pull/342): compress kv pairs in ParquetInputSplits - + ### Version 1.4.0 ### * ISSUE [333](https://github.com/Parquet/parquet-mr/pull/333): Compress schemas in split * ISSUE [329](https://github.com/Parquet/parquet-mr/pull/329): fix filesystem resolution @@ -879,37 +906,37 @@ Release Notes - Parquet - Version 1.10.0 * ISSUE 159: Counter for mapred * ISSUE 156: Fix site * ISSUE 153: Fix projection required field - + ### Version 1.1.1 ### * ISSUE 150: add thrift validation on read ### Version 1.1.0 ### -* ISSUE 149: changing default block size to 128mb -* ISSUE 146: Fix and add unit tests for Hive nested types -* ISSUE 145: add getStatistics method to parquetloader -* ISSUE 144: Map key fields should allow other types than strings -* ISSUE 143: Fix empty encoding col metadata -* ISSUE 142: Fix total size row group -* ISSUE 141: add parquet counters for benchmark -* ISSUE 140: Implemented partial schema for GroupReadSupport -* ISSUE 138: fix bug of wrong column metadata size -* ISSUE 137: ParquetMetadataConverter bug -* ISSUE 133: Update plugin versions for maven aether migration - fixes #125 -* ISSUE 130: Schema validation should not validate the root element's name -* ISSUE 127: Adding dictionary encoding for non string types.. #99 -* ISSUE 125: Unable to build -* ISSUE 124: Fix Short and Byte types in Hive SerDe. -* ISSUE 123: Fix Snappy compressor in parquet-hadoop. -* ISSUE 120: Fix RLE bug with partial literal groups at end of stream. 
-* ISSUE 118: Refactor column reader -* ISSUE 115: Map key fields should allow other types than strings -* ISSUE 103: Map key fields should allow other types than strings -* ISSUE 99: Dictionary encoding for non string types (float double int long boolean) -* ISSUE 47: Add tests for parquet-scrooge and parquet-cascading +* ISSUE 149: changing default block size to 128mb +* ISSUE 146: Fix and add unit tests for Hive nested types +* ISSUE 145: add getStatistics method to parquetloader +* ISSUE 144: Map key fields should allow other types than strings +* ISSUE 143: Fix empty encoding col metadata +* ISSUE 142: Fix total size row group +* ISSUE 141: add parquet counters for benchmark +* ISSUE 140: Implemented partial schema for GroupReadSupport +* ISSUE 138: fix bug of wrong column metadata size +* ISSUE 137: ParquetMetadataConverter bug +* ISSUE 133: Update plugin versions for maven aether migration - fixes #125 +* ISSUE 130: Schema validation should not validate the root element's name +* ISSUE 127: Adding dictionary encoding for non string types.. #99 +* ISSUE 125: Unable to build +* ISSUE 124: Fix Short and Byte types in Hive SerDe. +* ISSUE 123: Fix Snappy compressor in parquet-hadoop. +* ISSUE 120: Fix RLE bug with partial literal groups at end of stream. +* ISSUE 118: Refactor column reader +* ISSUE 115: Map key fields should allow other types than strings +* ISSUE 103: Map key fields should allow other types than strings +* ISSUE 99: Dictionary encoding for non string types (float double int long boolean) +* ISSUE 47: Add tests for parquet-scrooge and parquet-cascading ### Version 1.0.1 ### -* ISSUE 126: Unit tests for parquet cascading -* ISSUE 121: fix wrong RecordConverter for ParquetTBaseScheme -* ISSUE 119: fix compatibility with thrift remove unused dependency +* ISSUE 126: Unit tests for parquet cascading +* ISSUE 121: fix wrong RecordConverter for ParquetTBaseScheme +* ISSUE 119: fix compatibility with thrift remove unused dependency ### Version 1.0.0 ### diff --git a/parquet-arrow/pom.xml b/parquet-arrow/pom.xml index dd20e9be9b..7d780a0a68 100644 --- a/parquet-arrow/pom.xml +++ b/parquet-arrow/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index c53071da88..7f77557a7d 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index 904e4e973e..97121e22c9 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-cascading-deprecated/pom.xml b/parquet-cascading-deprecated/pom.xml index 08f9d47753..00f92ec00b 100644 --- a/parquet-cascading-deprecated/pom.xml +++ b/parquet-cascading-deprecated/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-cascading3-deprecated/pom.xml b/parquet-cascading3-deprecated/pom.xml index 9c9d638416..64c28a97f6 100644 --- a/parquet-cascading3-deprecated/pom.xml +++ b/parquet-cascading3-deprecated/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index 379e81b4ec..d40a320e48 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 
1.12.2-kylin-r6 4.0.0 diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java index fbeebdfba6..d7aa82dcfd 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java @@ -108,10 +108,8 @@ public boolean apply(@Nullable ColumnDescriptor input) { })); // now check to see if the data is actually corrupt - ParquetFileReader reader = new ParquetFileReader(getConf(), - fakeMeta, path, footer.getBlocks(), columns); - - try { + try (ParquetFileReader reader = new ParquetFileReader(getConf(), + fakeMeta, path, footer.getBlocks(), columns)) { PageStatsValidator validator = new PageStatsValidator(); for (PageReadStore pages = reader.readNextRowGroup(); pages != null; pages = reader.readNextRowGroup()) { diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java index ca29dd0268..988ab0f40f 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java @@ -119,9 +119,10 @@ private String getParquetSchema(String source) throws IOException { switch (format) { case PARQUET: - return new ParquetFileReader( - getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER) - .getFileMetaData().getSchema().toString(); + try (ParquetFileReader reader = new ParquetFileReader( + getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)) { + return reader.getFileMetaData().getSchema().toString(); + } default: throw new IllegalArgumentException(String.format( "Could not get a Parquet schema for format %s: %s", format, source)); diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java index 20a694ff7f..7a167ed635 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java @@ -64,56 +64,57 @@ public int run() throws IOException { String source = targets.get(0); - ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source)); - MessageType schema = reader.getFileMetaData().getSchema(); - ColumnDescriptor descriptor = Util.descriptor(column, schema); - PrimitiveType type = Util.primitive(column, schema); - Preconditions.checkNotNull(type); - - DictionaryPageReadStore dictionaryReader; - int rowGroup = 0; - while ((dictionaryReader = reader.getNextDictionaryReader()) != null) { - DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor); - - Dictionary dict = page.getEncoding().initDictionary(descriptor, page); - - console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize()); - for (int i = 0; i <= dict.getMaxId(); i += 1) { - switch(type.getPrimitiveTypeName()) { - case BINARY: - if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) { + try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) { + MessageType schema = reader.getFileMetaData().getSchema(); + ColumnDescriptor descriptor = Util.descriptor(column, schema); + 
PrimitiveType type = Util.primitive(column, schema); + Preconditions.checkNotNull(type); + + DictionaryPageReadStore dictionaryReader; + int rowGroup = 0; + while ((dictionaryReader = reader.getNextDictionaryReader()) != null) { + DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor); + + Dictionary dict = page.getEncoding().initDictionary(descriptor, page); + + console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize()); + for (int i = 0; i <= dict.getMaxId(); i += 1) { + switch(type.getPrimitiveTypeName()) { + case BINARY: + if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) { + console.info("{}: {}", String.format("%6d", i), + Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70)); + } else { + console.info("{}: {}", String.format("%6d", i), + Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70)); + } + break; + case INT32: console.info("{}: {}", String.format("%6d", i), - Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70)); - } else { + dict.decodeToInt(i)); + break; + case INT64: console.info("{}: {}", String.format("%6d", i), - Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70)); - } - break; - case INT32: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToInt(i)); - break; - case INT64: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToLong(i)); - break; - case FLOAT: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToFloat(i)); - break; - case DOUBLE: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToDouble(i)); - break; - default: - throw new IllegalArgumentException( - "Unknown dictionary type: " + type.getPrimitiveTypeName()); + dict.decodeToLong(i)); + break; + case FLOAT: + console.info("{}: {}", String.format("%6d", i), + dict.decodeToFloat(i)); + break; + case DOUBLE: + console.info("{}: {}", String.format("%6d", i), + dict.decodeToDouble(i)); + break; + default: + throw new IllegalArgumentException( + "Unknown dictionary type: " + type.getPrimitiveTypeName()); + } } - } - reader.skipNextRowGroup(); + reader.skipNextRowGroup(); - rowGroup += 1; + rowGroup += 1; + } } console.info(""); diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java index 58321064e3..bf030ac606 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java @@ -75,57 +75,57 @@ public int run() throws IOException { "Cannot process multiple Parquet files."); String source = targets.get(0); - ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source)); - - MessageType schema = reader.getFileMetaData().getSchema(); - Map columns = Maps.newLinkedHashMap(); - if (this.columns == null || this.columns.isEmpty()) { - for (ColumnDescriptor descriptor : schema.getColumns()) { - columns.put(descriptor, primitive(schema, descriptor.getPath())); - } - } else { - for (String column : this.columns) { - columns.put(descriptor(column, schema), primitive(column, schema)); - } - } - - CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec(); - // accumulate formatted lines to print by column - Map> formatted = Maps.newLinkedHashMap(); - PageFormatter formatter = new PageFormatter(); - PageReadStore pageStore; - int rowGroupNum = 
0; - while ((pageStore = reader.readNextRowGroup()) != null) { - for (ColumnDescriptor descriptor : columns.keySet()) { - List lines = formatted.get(columnName(descriptor)); - if (lines == null) { - lines = Lists.newArrayList(); - formatted.put(columnName(descriptor), lines); + try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) { + MessageType schema = reader.getFileMetaData().getSchema(); + Map columns = Maps.newLinkedHashMap(); + if (this.columns == null || this.columns.isEmpty()) { + for (ColumnDescriptor descriptor : schema.getColumns()) { + columns.put(descriptor, primitive(schema, descriptor.getPath())); } - - formatter.setContext(rowGroupNum, columns.get(descriptor), codec); - PageReader pages = pageStore.getPageReader(descriptor); - - DictionaryPage dict = pages.readDictionaryPage(); - if (dict != null) { - lines.add(formatter.format(dict)); + } else { + for (String column : this.columns) { + columns.put(descriptor(column, schema), primitive(column, schema)); } - DataPage page; - while ((page = pages.readPage()) != null) { - lines.add(formatter.format(page)); + } + + CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec(); + // accumulate formatted lines to print by column + Map> formatted = Maps.newLinkedHashMap(); + PageFormatter formatter = new PageFormatter(); + PageReadStore pageStore; + int rowGroupNum = 0; + while ((pageStore = reader.readNextRowGroup()) != null) { + for (ColumnDescriptor descriptor : columns.keySet()) { + List lines = formatted.get(columnName(descriptor)); + if (lines == null) { + lines = Lists.newArrayList(); + formatted.put(columnName(descriptor), lines); + } + + formatter.setContext(rowGroupNum, columns.get(descriptor), codec); + PageReader pages = pageStore.getPageReader(descriptor); + + DictionaryPage dict = pages.readDictionaryPage(); + if (dict != null) { + lines.add(formatter.format(dict)); + } + DataPage page; + while ((page = pages.readPage()) != null) { + lines.add(formatter.format(page)); + } } + rowGroupNum += 1; } - rowGroupNum += 1; - } - // TODO: Show total column size and overall size per value in the column summary line - for (String columnName : formatted.keySet()) { - console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-'))); - console.info(formatter.getHeader()); - for (String line : formatted.get(columnName)) { - console.info(line); + // TODO: Show total column size and overall size per value in the column summary line + for (String columnName : formatted.keySet()) { + console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-'))); + console.info(formatter.getHeader()); + for (String line : formatted.get(columnName)) { + console.info(line); + } + console.info(""); } - console.info(""); } return 0; diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 30bd635117..eed7a7504b 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 7e55ebcdb9..1ca1217a17 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.column; +import 
static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; + import java.util.Objects; import java.util.OptionalLong; @@ -37,8 +39,6 @@ import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import org.apache.parquet.schema.MessageType; -import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; - /** * This class represents all the configurable Parquet properties. */ @@ -57,6 +57,8 @@ public class ParquetProperties { public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000; public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024; public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false; + public static final boolean DEFAULT_DYNAMIC_BLOOM_FILTER_ENABLED = true; + public static final int DEFAULT_BLOOM_FILTER_CANDIDATE_SIZE = 3; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; @@ -102,6 +104,8 @@ public static WriterVersion fromString(String name) { private final ColumnProperty bloomFilterNDVs; private final int maxBloomFilterBytes; private final ColumnProperty bloomFilterEnabled; + private final ColumnProperty dynamicBloomFilterEnabled; + private final ColumnProperty bloomFilterCandidateSize; private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; private final boolean enableByteStreamSplit; @@ -124,6 +128,8 @@ private ParquetProperties(Builder builder) { this.bloomFilterNDVs = builder.bloomFilterNDVs.build(); this.bloomFilterEnabled = builder.bloomFilterEnabled.build(); this.maxBloomFilterBytes = builder.maxBloomFilterBytes; + this.dynamicBloomFilterEnabled = builder.dynamicBloomFilterEnabled.build(); + this.bloomFilterCandidateSize = builder.bloomFilterCandidateSize.build(); this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; this.enableByteStreamSplit = builder.enableByteStreamSplit; @@ -266,6 +272,14 @@ public int getMaxBloomFilterBytes() { return maxBloomFilterBytes; } + public boolean getDynamicBloomFilterEnabled(ColumnDescriptor column) { + return dynamicBloomFilterEnabled.getValue(column); + } + + public int getBloomFilterCandidateSize(ColumnDescriptor column) { + return bloomFilterCandidateSize.getValue(column); + } + public static Builder builder() { return new Builder(); } @@ -306,6 +320,8 @@ public static class Builder { private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH; private final ColumnProperty.Builder bloomFilterNDVs; private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES; + private final ColumnProperty.Builder dynamicBloomFilterEnabled; + private final ColumnProperty.Builder bloomFilterCandidateSize; private final ColumnProperty.Builder bloomFilterEnabled; private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; @@ -315,6 +331,8 @@ private Builder() { enableDict = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); bloomFilterEnabled = ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED); bloomFilterNDVs = ColumnProperty.builder().withDefaultValue(null); + dynamicBloomFilterEnabled = ColumnProperty.builder().withDefaultValue(DEFAULT_DYNAMIC_BLOOM_FILTER_ENABLED); + bloomFilterCandidateSize = ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATE_SIZE); } private Builder(ParquetProperties toCopy) { @@ -331,6 +349,8 @@ private Builder(ParquetProperties toCopy) { this.pageWriteChecksumEnabled = 
toCopy.pageWriteChecksumEnabled; this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs); this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled); + this.dynamicBloomFilterEnabled = ColumnProperty.builder(toCopy.dynamicBloomFilterEnabled); + this.bloomFilterCandidateSize = ColumnProperty.builder(toCopy.bloomFilterCandidateSize); this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes; this.enableByteStreamSplit = toCopy.enableByteStreamSplit; } @@ -484,6 +504,17 @@ public Builder withBloomFilterEnabled(boolean enabled) { return this; } + public Builder withDynamicBloomFilterEnabled(String columnPath, boolean enabled) { + this.dynamicBloomFilterEnabled.withDefaultValue(enabled); + return this; + } + + public Builder withBloomFilterCandidateSize(String columnPath, int size) { + Preconditions.checkArgument(size > 0, "Invalid candidate size for column \"%s\": %d", columnPath, size); + this.bloomFilterCandidateSize.withDefaultValue(size); + return this; + } + /** * Enable or disable the bloom filter for the specified column. * One may either disable bloom filters for all columns by invoking {@link #withBloomFilterEnabled(boolean)} with a diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java index a5e7836b3a..8cfdace8de 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java @@ -237,10 +237,11 @@ private void sizeCheck() { } else { rowCountForNextRowCountCheck = min(rowCountForNextRowCountCheck, writer.getRowsWrittenSoFar() + pageRowCountLimit); } + //estimate remaining row count by previous input for next row count check long rowsToFillPage = usedMem == 0 ? props.getMaxRowCountForPageSizeCheck() - : (long) rows / usedMem * remainingMem; + : rows * remainingMem / usedMem; if (rowsToFillPage < minRecordToWait) { minRecordToWait = rowsToFillPage; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index fce085fe50..1049e7d22e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -31,11 +31,11 @@ import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; +import org.apache.parquet.column.values.bloomfilter.DynamicBlockBloomFilter; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.api.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.parquet.bytes.BytesInput; /** * Base implementation for {@link ColumnWriter} to be extended to specialize for V1 and V2 pages. 
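Two additions in this hunk are worth spelling out. The new ParquetProperties knobs are set through the builder; note that, as written in this patch, both setters ignore their columnPath argument and update the default for every column. A minimal sketch, assuming bloom filters are wanted for the writer (the column name "name" is illustrative only):

    ParquetProperties props = ParquetProperties.builder()
        .withBloomFilterEnabled(true)                   // existing switch
        .withDynamicBloomFilterEnabled("name", true)    // new: build a DynamicBlockBloomFilter
        .withBloomFilterCandidateSize("name", 5)        // new: number of candidate filters
        .build();

The sizeCheck() change matters because the old expression cast and then divided before multiplying, so the integer division truncated the estimated remaining row count to zero whenever fewer rows than bytes had been buffered. With illustrative values:

    long rows = 10_000, usedMem = 1_000_000, remainingMem = 48_576;
    long before = rows / usedMem * remainingMem;   // 0   -- 10_000 / 1_000_000 truncates first
    long after  = rows * remainingMem / usedMem;   // 485 -- multiply first, then divide
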
@@ -95,7 +95,13 @@ abstract class ColumnWriterBase implements ColumnWriter { int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), BlockSplitBloomFilter.DEFAULT_FPP); this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize); } else { - this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize); + boolean useDynamicBloomFilter = props.getDynamicBloomFilterEnabled(path); + int candidateSize = props.getBloomFilterCandidateSize(path); + if(useDynamicBloomFilter) { + this.bloomFilter = new DynamicBlockBloomFilter(maxBloomFilterSize, candidateSize, BlockSplitBloomFilter.DEFAULT_FPP, path); + } else { + this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize); + } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java index 6cd5395c7f..7715c079c6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java @@ -54,9 +54,13 @@ private BinaryStatistics(BinaryStatistics other) { @Override public void updateStats(Binary value) { if (!this.hasNonNullValue()) { - initializeStats(value, value); - } else { - updateStats(value, value); + min = value.copy(); + max = value.copy(); + this.markAsNotEmpty(); + } else if (comparator().compare(min, value) > 0) { + min = value.copy(); + } else if (comparator().compare(max, value) < 0) { + max = value.copy(); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java new file mode 100644 index 0000000000..aaca591e7f --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
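The rewritten BinaryStatistics.updateStats(Binary) inlines the old initializeStats/updateStats pair and chains the min and max checks with else-if, so after the first value at most one branch copies the incoming Binary (PARQUET-2072). A small sketch of the resulting behaviour, using the usual Statistics factory; this fragment is not part of the patch and assumes the standard parquet-column imports:

    PrimitiveType type = Types.required(PrimitiveTypeName.BINARY)
        .as(LogicalTypeAnnotation.stringType()).named("s");
    Statistics<?> stats = Statistics.createStats(type);

    stats.updateStats(Binary.fromString("m"));  // empty stats: "m" becomes both min and max
    stats.updateStats(Binary.fromString("a"));  // smaller than min: only the min branch copies
    stats.updateStats(Binary.fromString("z"));  // not smaller than min: the max branch copies

    // stats.genericGetMin() is now "a", stats.genericGetMax() is "z"
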
+ */ + +package org.apache.parquet.column.values.bloomfilter; + +import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES; +import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Objects; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.io.api.Binary; + +/** + * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in + * the candidates at the same time. + * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number + * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then + * remove bad bloom filter candidate during data insertion. + */ +public class DynamicBlockBloomFilter implements BloomFilter { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class); + + // multiple candidates, inserting data at the same time. If the deduplication value is greater than the + // expected NDV of candidate, it will be removed and finally choose the smallest candidate to write out. + private final TreeSet candidates = new TreeSet<>(); + + // the largest among candidates and used as an approximate deduplication counter + private BloomFilterCandidate maxCandidate; + + // the accumulator of the number of distinct values that have been inserted so far + private int distinctValueCounter = 0; + + // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted + private boolean finalized = false; + + private int maximumBytes = UPPER_BOUND_BYTES; + private int minimumBytes = LOWER_BOUND_BYTES; + // the hash strategy used in this bloom filter. + private final HashStrategy hashStrategy; + // the column to build bloom filter + private ColumnDescriptor column; + + public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) { + this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column); + } + + public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) { + this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column); + } + + public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy, + double fpp, int candidatesNum, ColumnDescriptor column) { + if (minimumBytes > maximumBytes) { + throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes"); + } + + if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) { + this.minimumBytes = minimumBytes; + } + + if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) { + this.maximumBytes = maximumBytes; + } + this.column = column; + switch (hashStrategy) { + case XXH64: + this.hashStrategy = hashStrategy; + break; + default: + throw new RuntimeException("Unsupported hash strategy"); + } + initCandidates(numBytes, candidatesNum, fpp); + } + + /** + * Given the maximum acceptable bytes size of bloom filter, generate candidates according + * to the max expected distinct values. 
The size of the candidate bytes needs to be a + * square number of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc. + * + * @param maxBytes the maximum acceptable bit size + * @param candidatesNum the number of candidates + * @param fpp the false positive probability + */ + private void initCandidates(int maxBytes, int candidatesNum, double fpp) { + int candidateByteSize = calculateTwoPowerSize(maxBytes); + for (int i = 1; i <= candidatesNum; i++) { + int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp); + // `candidateByteSize` is too small, just drop it + if (candidateExpectedNDV <= 0) { + break; + } + BloomFilterCandidate candidate = + new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy); + candidates.add(candidate); + candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2); + } + maxCandidate = candidates.last(); + } + + /** + * According to the size of bytes, calculate the expected number of distinct values. + * The expected number result may be slightly smaller than what `numBytes` can support. + * + * @param numBytes the bytes size + * @param fpp the false positive probability + * @return the expected number of distinct values + */ + private int expectedNDV(int numBytes, double fpp) { + int expectedNDV = 0; + int optimalBytes = 0; + int step = 500; + while (optimalBytes < numBytes) { + expectedNDV += step; + optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp); + } + // make sure it is slightly smaller than what `numBytes` can support + expectedNDV -= step; + + // numBytes is too small, 256 Bytes -> 1 NDV + if (expectedNDV <= 0 && numBytes >= 256) { + expectedNDV = 1; + } else if (expectedNDV <= 0) { + expectedNDV = 0; + } + return expectedNDV; + } + + /** + * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]] + * + * @param numBytes the bytes size + * @return power of 2 less than or equal to numBytes + */ + private int calculateTwoPowerSize(int numBytes) { + if (numBytes < minimumBytes) { + numBytes = minimumBytes; + } + // Get power of 2 less than or equal to numBytes if it is not power of 2. + if ((numBytes & (numBytes - 1)) != 0) { + numBytes = Integer.highestOneBit(numBytes) >> 1; + } + if (numBytes > maximumBytes || numBytes < 0) { + numBytes = maximumBytes; + } + if (numBytes < minimumBytes) { + numBytes = minimumBytes; + } + return numBytes; + } + + /** + * Used at the end of the insertion, select the candidate of the smallest size. + * At least one of the largest candidates will be kept when inserting data. + * + * @return the smallest candidate + */ + protected BloomFilterCandidate optimalCandidate() { + return candidates.stream() + .min(BloomFilterCandidate::compareTo).get(); + } + + protected TreeSet getCandidates() { + return candidates; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + finalized = true; + BloomFilterCandidate optimalBloomFilter = optimalCandidate(); + optimalBloomFilter.bloomFilter.writeTo(out); + String columnName = column != null && column.getPath() != null ? 
Arrays.toString(column.getPath()) : "unknown"; + LOG.info("The number of distinct values in {} is approximately {}, the optimal bloom filter NDV is {}, byte size is {}.", + columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(), + optimalBloomFilter.bloomFilter.getBitsetSize()); + } + + /** + * Insert an element to the multiple bloom filter candidates and remove the bad candidate + * if the number of distinct values exceeds its expected size. + * + * @param hash the hash result of element. + */ + @Override + public void insertHash(long hash) { + Preconditions.checkArgument(!finalized, + "Dynamic bloom filter insertion has been mark as finalized, no more data is allowed!"); + if (!maxCandidate.bloomFilter.findHash(hash)) { + distinctValueCounter++; + } + // distinct values exceed the expected size, remove the bad bloom filter (leave at least the max bloom filter candidate) + candidates.removeIf(candidate -> candidate.getExpectedNDV() < distinctValueCounter && candidate != maxCandidate); + candidates.forEach(candidate -> candidate.getBloomFilter().insertHash(hash)); + } + + @Override + public int getBitsetSize() { + return optimalCandidate().getBloomFilter().getBitsetSize(); + } + + @Override + public boolean findHash(long hash) { + return maxCandidate.bloomFilter.findHash(hash); + } + + @Override + public long hash(Object value) { + return maxCandidate.bloomFilter.hash(value); + } + + @Override + public HashStrategy getHashStrategy() { + return maxCandidate.bloomFilter.getHashStrategy(); + } + + @Override + public Algorithm getAlgorithm() { + return maxCandidate.bloomFilter.getAlgorithm(); + } + + @Override + public Compression getCompression() { + return maxCandidate.bloomFilter.getCompression(); + } + + @Override + public long hash(int value) { + return maxCandidate.bloomFilter.hash(value); + } + + @Override + public long hash(long value) { + return maxCandidate.bloomFilter.hash(value); + } + + @Override + public long hash(double value) { + return maxCandidate.bloomFilter.hash(value); + } + + @Override + public long hash(float value) { + return maxCandidate.bloomFilter.hash(value); + } + + @Override + public long hash(Binary value) { + return maxCandidate.bloomFilter.hash(value); + } + + protected class BloomFilterCandidate implements Comparable { + private BlockSplitBloomFilter bloomFilter; + private int expectedNDV; + + public BloomFilterCandidate(int expectedNDV, int candidateBytes, + int minimumBytes, int maximumBytes, HashStrategy hashStrategy) { + this.bloomFilter = new BlockSplitBloomFilter(candidateBytes, minimumBytes, maximumBytes, hashStrategy); + this.expectedNDV = expectedNDV; + } + + public BlockSplitBloomFilter getBloomFilter() { + return bloomFilter; + } + + public void setBloomFilter(BlockSplitBloomFilter bloomFilter) { + this.bloomFilter = bloomFilter; + } + + public int getExpectedNDV() { + return expectedNDV; + } + + public void setExpectedNDV(int expectedNDV) { + this.expectedNDV = expectedNDV; + } + + @Override + public int compareTo(BloomFilterCandidate o) { + return this.bloomFilter.getBitsetSize() - o.bloomFilter.getBitsetSize(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BloomFilterCandidate that = (BloomFilterCandidate) o; + return expectedNDV == that.expectedNDV && + Objects.equals(bloomFilter, that.bloomFilter); + } + + @Override + public int hashCode() { + return Objects.hash(bloomFilter, expectedNDV); + } + } +} diff --git 
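As a usage sketch for the class above (it mirrors the new TestBlockSplitBloomFilter#testDynamicBloomFilter later in this patch, and like that test it has to live in the org.apache.parquet.column.values.bloomfilter package because optimalCandidate() and getCandidates() are not public):

    // Cap the filter at 1 MiB and start 5 candidates (1 MiB, 512 KiB, 256 KiB, ...).
    DynamicBlockBloomFilter filter =
        new DynamicBlockBloomFilter(1024 * 1024, 5, /* fpp */ 0.01, /* column */ null);

    for (int i = 0; i < 50_000; i++) {
      filter.insertHash(filter.hash(Binary.fromString("value-" + i)));
    }

    // Candidates whose expected NDV fell below the ~50k distinct values seen so far have
    // been dropped; writeTo(out) serializes the smallest survivor.
    BlockSplitBloomFilter chosen = filter.optimalCandidate().getBloomFilter();
    System.out.println(chosen.getBitsetSize() + " byte bitset kept");
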
a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java index 2999f3cdc4..c4a985224f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -81,7 +81,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req protected boolean dictionaryTooBig; /* current size in bytes the dictionary will take once serialized */ - protected int dictionaryByteSize; + protected long dictionaryByteSize; /* size in bytes of the dictionary at the end of last dictionary encoded page (in case the current page falls back to PLAIN) */ protected int lastUsedDictionaryByteSize; @@ -173,7 +173,7 @@ public BytesInput getBytes() { BytesInput bytes = concat(BytesInput.from(bytesHeader), rleEncodedBytes); // remember size of dictionary when we last wrote a page lastUsedDictionarySize = getDictionarySize(); - lastUsedDictionaryByteSize = dictionaryByteSize; + lastUsedDictionaryByteSize = Math.toIntExact(dictionaryByteSize); return bytes; } catch (IOException e) { throw new ParquetEncodingException("could not encode the values", e); @@ -249,7 +249,7 @@ public void writeBytes(Binary v) { id = binaryDictionaryContent.size(); binaryDictionaryContent.put(v.copy(), id); // length as int (4 bytes) + actual bytes - dictionaryByteSize += 4 + v.length(); + dictionaryByteSize += 4L + v.length(); } encodedValues.add(id); } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index cf6a1cadd9..52a8e50926 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -37,7 +37,8 @@ * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long) */ public class RowRanges { - private static class Range { + // Make it public because some uppler layer application need to access it + public static class Range { // Returns the union of the two ranges or null if there are elements between them. private static Range union(Range left, Range right) { @@ -63,8 +64,8 @@ private static Range intersection(Range left, Range right) { return null; } - final long from; - final long to; + public final long from; + public final long to; // Creates a range of [from, to] (from and to are inclusive; empty ranges are not valid) Range(long from, long to) { @@ -299,6 +300,10 @@ public boolean isOverlapping(long from, long to) { (r1, r2) -> r1.isBefore(r2) ? -1 : r1.isAfter(r2) ? 1 : 0) >= 0; } + public List getRanges() { + return Collections.unmodifiableList(ranges); + } + @Override public String toString() { return ranges.toString(); diff --git a/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java new file mode 100644 index 0000000000..9c0cbe3999 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
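The DictionaryValuesWriter change widens dictionaryByteSize from int to long: the per-entry accounting of `4 + v.length()` can push the accumulator past Integer.MAX_VALUE and wrap negative, after which the dictionary-too-big check never fires (PARQUET-2052). The new TestDictionary case further down reproduces this with a mocked Binary; the wrap itself is easy to see in isolation:

    int  asInt  = 4 + 5;                // "hello" (5 bytes) already in the dictionary
    long asLong = 4 + 5;

    int huge = Integer.MAX_VALUE - 1;   // length of the next, mocked value
    asInt  += 4 + huge;                 // int arithmetic wraps: the accumulator turns negative
    asLong += 4L + huge;                // widened arithmetic stays positive: fallback to PLAIN triggers

The RowRanges hunk in the same file group is purely a visibility change for upper layers (PARQUET-2064): Range and its from/to fields become public and getRanges() returns an unmodifiable view, with no behavioural change.
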
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import org.apache.parquet.ParquetRuntimeException; + +public class InvalidFileOffsetException extends ParquetRuntimeException { + private static final long serialVersionUID = 1L; + + public InvalidFileOffsetException(String message) { + super(message); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java index 6adabe79d8..9403200995 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java @@ -144,6 +144,38 @@ public void testBloomFilterFPPAccuracy() { assertTrue(exist < totalCount * (FPP * 1.2)); } + @Test + public void testDynamicBloomFilter() { + int maxBloomFilterSize = 1024 * 1024; + int candidateSize = 10; + DynamicBlockBloomFilter dynamicBloomFilter = new DynamicBlockBloomFilter(maxBloomFilterSize, + candidateSize, 0.01, null); + + assertEquals(candidateSize, dynamicBloomFilter.getCandidates().size()); + + Set existedValue = new HashSet<>(); + while (existedValue.size() < 10000) { + String str = RandomStringUtils.randomAlphabetic(1, 64); + dynamicBloomFilter.insertHash(dynamicBloomFilter.hash(Binary.fromString(str))); + existedValue.add(str); + } + // removed some small bloom filter + assertEquals(4, dynamicBloomFilter.getCandidates().size()); + BlockSplitBloomFilter optimalCandidate = dynamicBloomFilter.optimalCandidate().getBloomFilter(); + for (String value : existedValue) { + assertTrue(optimalCandidate.findHash(optimalCandidate.hash(Binary.fromString(value)))); + } + + int maxCandidateNDV = dynamicBloomFilter.getCandidates().last().getExpectedNDV(); + while (existedValue.size() < maxCandidateNDV + 1) { + String str = RandomStringUtils.randomAlphabetic(1, 64); + dynamicBloomFilter.insertHash(dynamicBloomFilter.hash(Binary.fromString(str))); + existedValue.add(str); + } + // the number of distinct value exceeds the maximum candidate's expected NDV, so only the maximum candidate is kept + assertEquals(1, dynamicBloomFilter.getCandidates().size()); + } + @Test public void testEquals() { final String[] words = {"hello", "parquet", "bloom", "filter"}; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java index 2783b696d5..174fad8918 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java @@ -53,6 +53,7 @@ import 
org.apache.parquet.column.values.plain.PlainValuesWriter; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.mockito.Mockito; public class TestDictionary { @@ -171,6 +172,20 @@ public void testBinaryDictionaryFallBack() throws IOException { assertEquals(0, cw.getBufferedSize()); } + @Test + public void testBinaryDictionaryIntegerOverflow() { + Binary mock = Mockito.mock(Binary.class); + Mockito.when(mock.length()).thenReturn(Integer.MAX_VALUE - 1); + // make the writer happy + Mockito.when(mock.copy()).thenReturn(Binary.fromString(" world")); + + final ValuesWriter cw = newPlainBinaryDictionaryValuesWriter(100, 100); + cw.writeBytes(Binary.fromString("hello")); + cw.writeBytes(mock); + + assertEquals(PLAIN, cw.getEncoding()); + } + @Test public void testBinaryDictionaryChangedValues() throws IOException { int COUNT = 100; diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml index c4b944b3a2..30c5934b76 100644 --- a/parquet-common/pom.xml +++ b/parquet-common/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-encoding/pom.xml b/parquet-encoding/pom.xml index ac53aa6904..6bee29a8bd 100644 --- a/parquet-encoding/pom.xml +++ b/parquet-encoding/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml index d0aa0ee85a..cf422964ed 100644 --- a/parquet-format-structures/pom.xml +++ b/parquet-format-structures/pom.xml @@ -24,7 +24,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 parquet-format-structures diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/InvalidParquetMetadataException.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/InvalidParquetMetadataException.java new file mode 100644 index 0000000000..c0852bbbd2 --- /dev/null +++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/InvalidParquetMetadataException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.format; + +/** + * A specific RuntimeException thrown when invalid values are found in the Parquet file metadata (including the footer, + * page header etc.). 
+ */ +public class InvalidParquetMetadataException extends RuntimeException { + InvalidParquetMetadataException(String message) { + super(message); + } +} diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/MetadataValidator.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/MetadataValidator.java new file mode 100644 index 0000000000..b3738ec48f --- /dev/null +++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/MetadataValidator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.format; + +/** + * Utility class to validate different types of Parquet metadata (e.g. footer, page headers etc.). + */ +public class MetadataValidator { + + static PageHeader validate(PageHeader pageHeader) { + int compressed_page_size = pageHeader.getCompressed_page_size(); + validateValue(compressed_page_size >= 0, + String.format("Compressed page size must not be negative but was: %s", compressed_page_size)); + return pageHeader; + } + + private static void validateValue(boolean valid, String message) { + if (!valid) { + throw new InvalidParquetMetadataException(message); + } + } + + private MetadataValidator() { + // Private constructor to prevent instantiation + } + +} diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java index 36aaf7494e..cb62325f99 100644 --- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java +++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java @@ -129,7 +129,7 @@ public static PageHeader readPageHeader(InputStream from) throws IOException { public static PageHeader readPageHeader(InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { - return read(from, new PageHeader(), decryptor, AAD); + return MetadataValidator.validate(read(from, new PageHeader(), decryptor, AAD)); } public static void writeFileMetaData(org.apache.parquet.format.FileMetaData fileMetadata, diff --git a/parquet-format-structures/src/test/java/org/apache/parquet/format/TestUtil.java b/parquet-format-structures/src/test/java/org/apache/parquet/format/TestUtil.java index 1adf0998fb..685e2514b0 100644 --- a/parquet-format-structures/src/test/java/org/apache/parquet/format/TestUtil.java +++ b/parquet-format-structures/src/test/java/org/apache/parquet/format/TestUtil.java @@ -23,13 +23,16 @@ import static junit.framework.Assert.assertNull; import static org.apache.parquet.format.Util.readFileMetaData; import static org.apache.parquet.format.Util.writeFileMetaData; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import 
java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import org.junit.Test; - import org.apache.parquet.format.Util.DefaultFileMetaDataConsumer; + public class TestUtil { @Test @@ -77,6 +80,21 @@ public void testReadFileMetadata() throws Exception { assertEquals(md, md6); } + @Test + public void testInvalidPageHeader() throws IOException { + PageHeader ph = new PageHeader(PageType.DATA_PAGE, 100, -50); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Util.writePageHeader(ph, out); + + try { + Util.readPageHeader(in(out)); + fail("Expected exception but did not thrown"); + } catch (InvalidParquetMetadataException e) { + assertTrue("Exception message does not contain the expected parts", + e.getMessage().contains("Compressed page size")); + } + } + private ByteArrayInputStream in(ByteArrayOutputStream baos) { return new ByteArrayInputStream(baos.toByteArray()); } diff --git a/parquet-generator/pom.xml b/parquet-generator/pom.xml index 694fbf547e..61ba66b8dc 100644 --- a/parquet-generator/pom.xml +++ b/parquet-generator/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-hadoop-bundle/pom.xml b/parquet-hadoop-bundle/pom.xml index 6e1950a536..8efb3318e1 100644 --- a/parquet-hadoop-bundle/pom.xml +++ b/parquet-hadoop-bundle/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index 5e61b31031..1c53f21d14 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java index d98416445f..c5e8d521af 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java @@ -19,9 +19,13 @@ package org.apache.parquet.filter2.bloomfilterlevel; +import static org.apache.parquet.Preconditions.checkNotNull; + import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +37,6 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; -import static org.apache.parquet.Preconditions.checkNotNull; - public class BloomFilterImpl implements FilterPredicate.Visitor{ private static final Logger LOG = LoggerFactory.getLogger(BloomFilterImpl.class); private static final boolean BLOCK_MIGHT_MATCH = false; @@ -42,12 +44,27 @@ public class BloomFilterImpl implements FilterPredicate.Visitor{ private final Map columns = new HashMap(); + public static boolean canDropWithInfo(FilterPredicate pred, List columns, + BloomFilterReader bloomFilterReader, AtomicInteger bloomInfo) { + checkNotNull(pred, "pred"); + checkNotNull(columns, "columns"); + return pred.accept(new BloomFilterImpl(columns, bloomFilterReader, bloomInfo)); + } + public static boolean canDrop(FilterPredicate pred, List columns, BloomFilterReader bloomFilterReader) { checkNotNull(pred, "pred"); checkNotNull(columns, "columns"); return pred.accept(new BloomFilterImpl(columns, bloomFilterReader)); } + private BloomFilterImpl(List 
columnsList, BloomFilterReader bloomFilterReader, AtomicInteger bloomInfo) { + for (ColumnChunkMetaData chunk : columnsList) { + columns.put(chunk.getPath(), chunk); + } + this.bloomFilterReader = bloomFilterReader; + this.bloomInfo = bloomInfo; + } + private BloomFilterImpl(List columnsList, BloomFilterReader bloomFilterReader) { for (ColumnChunkMetaData chunk : columnsList) { columns.put(chunk.getPath(), chunk); @@ -56,6 +73,8 @@ private BloomFilterImpl(List columnsList, BloomFilterReader this.bloomFilterReader = bloomFilterReader; } + private AtomicInteger bloomInfo = new AtomicInteger(0); + private BloomFilterReader bloomFilterReader; private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) { @@ -82,6 +101,10 @@ public > Boolean visit(Operators.Eq eq) { try { BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta); + if (bloomFilter != null) { + // use bloom + bloomInfo.set(1); + } if (bloomFilter != null && !bloomFilter.findHash(bloomFilter.hash(value))) { return BLOCK_CANNOT_MATCH; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/QueryMetrics.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/QueryMetrics.java new file mode 100644 index 0000000000..ed9a547b3f --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/QueryMetrics.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.filter2.compat; + +public class QueryMetrics { + private String skipBloomFilter = ""; + private long skipBloomBlocks = 0; + private long skipBloomRows = 0; + private long totalBloomBlocks = 0; + private long totalPagesCount = 0; + private long filteredPagesCount = 0; + private long afterFilterPagesCount = 0; + + public String getSkipBloomFilter() { + return skipBloomFilter; + } + + public void setSkipBloomFilter(String skipBloomFilter) { + this.skipBloomFilter = skipBloomFilter; + } + + public long getSkipBloomBlocks() { + return skipBloomBlocks; + } + + public void setSkipBloomBlocks(long skipBloomBlocks) { + this.skipBloomBlocks = skipBloomBlocks; + } + + public long getSkipBloomRows() { + return skipBloomRows; + } + + public void setSkipBloomRows(long skipBloomRows) { + this.skipBloomRows = skipBloomRows; + } + + public long getTotalBloomBlocks() { + return totalBloomBlocks; + } + + public void setTotalBloomBlocks(long totalBloomBlocks) { + this.totalBloomBlocks = totalBloomBlocks; + } + + public long getTotalPagesCount() { + return totalPagesCount; + } + + public void setTotalPagesCount(long totalPagesCount) { + this.totalPagesCount = totalPagesCount; + } + + public long getFilteredPagesCount() { + return filteredPagesCount; + } + + public void setFilteredPagesCount(long filteredPagesCount) { + this.filteredPagesCount = filteredPagesCount; + } + + public long getAfterFilterPagesCount() { + return afterFilterPagesCount; + } + + public void setAfterFilterPagesCount(long afterFilterPagesCount) { + this.afterFilterPagesCount = afterFilterPagesCount; + } + + public void logParquetPageFilter(long total, long afterFilter) { + totalPagesCount += total; + filteredPagesCount += (total - afterFilter); + afterFilterPagesCount += afterFilter; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java index 73ff9aae67..77e23ed3e0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java @@ -22,6 +22,10 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.parquet.filter2.bloomfilterlevel.BloomFilterImpl; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -29,13 +33,12 @@ import org.apache.parquet.filter2.compat.FilterCompat.Visitor; import org.apache.parquet.filter2.dictionarylevel.DictionaryFilter; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator; import org.apache.parquet.filter2.statisticslevel.StatisticsFilter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.schema.MessageType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Given a {@link Filter} applies it to a list of BlockMetaData (row groups) @@ -43,10 +46,14 @@ * no filtering will be performed. 
*/ public class RowGroupFilter implements Visitor> { + + public static Logger LOGGER = LoggerFactory.getLogger(RowGroupFilter.class); + private final List blocks; private final MessageType schema; private final List levels; private final ParquetFileReader reader; + private QueryMetrics queryMetrics = new QueryMetrics(); public enum FilterLevel { STATISTICS, @@ -72,6 +79,12 @@ public static List filterRowGroups(List levels, Filt return filter.accept(new RowGroupFilter(levels, blocks, reader)); } + public static List filterRowGroups(List levels, Filter filter, List blocks, + ParquetFileReader reader, QueryMetrics queryMetrics) { + Objects.requireNonNull(filter, "filter cannot be null"); + return filter.accept(new RowGroupFilter(levels, blocks, reader, queryMetrics)); + } + @Deprecated private RowGroupFilter(List blocks, MessageType schema) { this.blocks = Objects.requireNonNull(blocks, "blocks cannnot be null"); @@ -87,6 +100,15 @@ private RowGroupFilter(List levels, List blocks, Par this.levels = levels; } + private RowGroupFilter(List levels, List blocks, ParquetFileReader reader, + QueryMetrics queryMetrics) { + this.blocks = Objects.requireNonNull(blocks, "blocks cannnot be null"); + this.reader = Objects.requireNonNull(reader, "reader cannnot be null"); + this.schema = reader.getFileMetaData().getSchema(); + this.levels = levels; + this.queryMetrics = queryMetrics; + } + @Override public List visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) { FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate(); @@ -95,7 +117,7 @@ public List visit(FilterCompat.FilterPredicateCompat filterPredic SchemaCompatibilityValidator.validate(filterPredicate, schema); List filteredBlocks = new ArrayList(); - + long start = System.currentTimeMillis(); for (BlockMetaData block : blocks) { boolean drop = false; @@ -108,14 +130,27 @@ public List visit(FilterCompat.FilterPredicateCompat filterPredic } if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) { - drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block)); + AtomicInteger bloomInfo = new AtomicInteger(0); + drop = BloomFilterImpl.canDropWithInfo(filterPredicate, block.getColumns(), + reader.getBloomFilterDataReader(block), bloomInfo); + if (bloomInfo.get() != 0) { + this.queryMetrics.setTotalBloomBlocks(this.queryMetrics.getTotalBloomBlocks() + 1); + if (drop) { + this.queryMetrics.setSkipBloomFilter(this.queryMetrics.getSkipBloomFilter() + filterPredicateCompat); + this.queryMetrics.setSkipBloomRows(this.queryMetrics.getSkipBloomRows() + block.getRowCount()); + this.queryMetrics.setSkipBloomBlocks(this.queryMetrics.getSkipBloomBlocks() + 1); + } + } } if(!drop) { filteredBlocks.add(block); } } - + long end = System.currentTimeMillis(); + if ((end - start) > 100) { + LOGGER.warn("Reading RowGroupFilter costs much time : " + (end - start)); + } return filteredBlocks; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 3a10b1c895..3c6e32cfde 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.ParquetReadOptions; +import 
org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; @@ -119,10 +120,10 @@ import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.InvalidFileOffsetException; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; -import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; @@ -201,8 +202,22 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque List blocks = parquetMetadata.getBlocks(); List rowGroups = new ArrayList(); long numRows = 0; + long preBlockStartPos = 0; + long preBlockCompressedSize = 0; for (BlockMetaData block : blocks) { numRows += block.getRowCount(); + long blockStartPos = block.getStartingPos(); + // first block + if (blockStartPos == 4) { + preBlockStartPos = 0; + preBlockCompressedSize = 0; + } + if (preBlockStartPos != 0) { + Preconditions.checkState(blockStartPos >= preBlockStartPos + preBlockCompressedSize, + "Invalid block starting position:" + blockStartPos); + } + preBlockStartPos = blockStartPos; + preBlockCompressedSize = block.getCompressedSize(); addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor); } FileMetaData fileMetaData = new FileMetaData( @@ -1226,14 +1241,36 @@ public ParquetMetadata readParquetMetadata(InputStream from) throws IOException static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) { List rowGroups = metaData.getRow_groups(); List newRowGroups = new ArrayList(); + long preStartIndex = 0; + long preCompressedSize = 0; + boolean firstColumnWithMetadata = true; + if (rowGroups != null && rowGroups.size() > 0) { + firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data(); + } for (RowGroup rowGroup : rowGroups) { long totalSize = 0; long startIndex; + ColumnChunk columnChunk = rowGroup.getColumns().get(0); + if (firstColumnWithMetadata) { + startIndex = getOffset(columnChunk); + } else { + assert rowGroup.isSetFile_offset(); + assert rowGroup.isSetTotal_compressed_size(); - if (rowGroup.isSetFile_offset()) { + //the file_offset of first block always holds the truth, while other blocks don't : + //see PARQUET-2078 for details startIndex = rowGroup.getFile_offset(); - } else { - startIndex = getOffset(rowGroup.getColumns().get(0)); + if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) { + //first row group's offset is always 4 + if (preStartIndex == 0) { + startIndex = 4; + } else { + // use minStartIndex(imprecise in case of padding, but good enough for filtering) + startIndex = preStartIndex + preCompressedSize; + } + } + preStartIndex = startIndex; + preCompressedSize = rowGroup.getTotal_compressed_size(); } if (rowGroup.isSetTotal_compressed_size()) { @@ -1254,16 +1291,59 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet return metaData; } + private static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) { + boolean 
invalid = false; + assert preStartIndex <= startIndex; + // checking the first rowGroup + if (preStartIndex == 0 && startIndex != 4) { + invalid = true; + return invalid; + } + + //calculate start index for other blocks + long minStartIndex = preStartIndex + preCompressedSize; + if (startIndex < minStartIndex) { + // a bad offset detected, try first column's offset + // can not use minStartIndex in case of padding + invalid = true; + } + + return invalid; + } + // Visible for testing static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) { List rowGroups = metaData.getRow_groups(); List newRowGroups = new ArrayList(); + long preStartIndex = 0; + long preCompressedSize = 0; + boolean firstColumnWithMetadata = true; + if (rowGroups != null && rowGroups.size() > 0) { + firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data(); + } for (RowGroup rowGroup : rowGroups) { long startIndex; - if (rowGroup.isSetFile_offset()) { - startIndex = rowGroup.getFile_offset(); + ColumnChunk columnChunk = rowGroup.getColumns().get(0); + if (firstColumnWithMetadata) { + startIndex = getOffset(columnChunk); } else { - startIndex = getOffset(rowGroup.getColumns().get(0)); + assert rowGroup.isSetFile_offset(); + assert rowGroup.isSetTotal_compressed_size(); + + //the file_offset of first block always holds the truth, while other blocks don't : + //see PARQUET-2078 for details + startIndex = rowGroup.getFile_offset(); + if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) { + //first row group's offset is always 4 + if (preStartIndex == 0) { + startIndex = 4; + } else { + throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, " + + "please use file range instead of block offset for split."); + } + } + preStartIndex = startIndex; + preCompressedSize = rowGroup.getTotal_compressed_size(); } if (filter.contains(startIndex)) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index f0e7af35c5..513e71295c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -89,7 +89,7 @@ public static CodecFactory createDirectCodecFactory(Configuration config, ByteBu class HeapBytesDecompressor extends BytesDecompressor { - private final CompressionCodec codec; + final CompressionCodec codec; private final Decompressor decompressor; HeapBytesDecompressor(CompressionCodecName codecName) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index 3d1bafe0a5..fc17edcb5e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -40,11 +40,13 @@ import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.hadoop.codec.SnappyCodec; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xerial.snappy.Snappy; /** * TODO: should this actually 
be called RowGroupImpl or something? @@ -72,14 +74,14 @@ static final class ColumnChunkPageReader implements PageReader { private final OffsetIndex offsetIndex; private final long rowCount; private int pageIndex = 0; - + private final BlockCipher.Decryptor blockDecryptor; private final byte[] dataPageAAD; private final byte[] dictionaryPageAAD; ColumnChunkPageReader(BytesInputDecompressor decompressor, List compressedPages, DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount, - BlockCipher.Decryptor blockDecryptor, byte[] fileAAD, + BlockCipher.Decryptor blockDecryptor, byte[] fileAAD, int rowGroupOrdinal, int columnOrdinal) { this.decompressor = decompressor; this.compressedPages = new ArrayDeque(compressedPages); @@ -91,9 +93,9 @@ static final class ColumnChunkPageReader implements PageReader { this.valueCount = count; this.offsetIndex = offsetIndex; this.rowCount = rowCount; - + this.blockDecryptor = blockDecryptor; - + if (null != blockDecryptor) { dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0); dictionaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage, rowGroupOrdinal, columnOrdinal, -1); @@ -102,12 +104,12 @@ static final class ColumnChunkPageReader implements PageReader { dictionaryPageAAD = null; } } - + private int getPageOrdinal(int currentPageIndex) { if (null == offsetIndex) { return currentPageIndex; } - + return offsetIndex.getPageOrdinal(currentPageIndex); } @@ -123,11 +125,11 @@ public DataPage readPage() { return null; } final int currentPageIndex = pageIndex++; - + if (null != blockDecryptor) { AesCipher.quickUpdatePageAAD(dataPageAAD, getPageOrdinal(currentPageIndex)); } - + return compressedPage.accept(new DataPage.Visitor() { @Override public DataPage visit(DataPageV1 dataPageV1) { @@ -136,8 +138,17 @@ public DataPage visit(DataPageV1 dataPageV1) { if (null != blockDecryptor) { bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD)); } - BytesInput decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize()); - + + BytesInput decompressed; + if (decompressor instanceof CodecFactory.HeapBytesDecompressor &&(((CodecFactory.HeapBytesDecompressor) decompressor).codec instanceof SnappyCodec)) { + byte[] bytesArray = bytes.toByteArray(); + byte[] decompressedBytes = new byte[dataPageV1.getUncompressedSize()]; + Snappy.uncompress(bytesArray, 0, bytesArray.length, decompressedBytes, 0); + decompressed = BytesInput.from(decompressedBytes); + } else { + decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()); + } + final DataPageV1 decompressedPage; if (offsetIndex == null) { decompressedPage = new DataPageV1( @@ -176,7 +187,7 @@ public DataPage visit(DataPageV2 dataPageV2) { return dataPageV2; } BytesInput pageBytes = dataPageV2.getData(); - + if (null != blockDecryptor) { try { pageBytes = BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD)); @@ -195,7 +206,7 @@ public DataPage visit(DataPageV2 dataPageV2) { throw new ParquetDecodingException("could not decompress page", e); } } - + if (offsetIndex == null) { return DataPageV2.uncompressed( dataPageV2.getRowCount(), @@ -218,7 +229,7 @@ public DataPage visit(DataPageV2 dataPageV2) { pageBytes, dataPageV2.getStatistics()); } - } + } }); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index d2e505babc..ecb2cf54a8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -65,7 +65,7 @@ class InternalParquetRecordWriter { private ColumnChunkPageWriteStore pageStore; private BloomFilterWriteStore bloomFilterWriteStore; private RecordConsumer recordConsumer; - + private InternalFileEncryptor fileEncryptor; private int rowGroupOrdinal; @@ -78,14 +78,14 @@ class InternalParquetRecordWriter { * @param compressor the codec used to compress */ public InternalParquetRecordWriter( - ParquetFileWriter parquetFileWriter, - WriteSupport writeSupport, - MessageType schema, - Map extraMetaData, - long rowGroupSize, - BytesCompressor compressor, - boolean validating, - ParquetProperties props) { + ParquetFileWriter parquetFileWriter, + WriteSupport writeSupport, + MessageType schema, + Map extraMetaData, + long rowGroupSize, + BytesCompressor compressor, + boolean validating, + ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null"); this.schema = schema; @@ -148,7 +148,8 @@ public long getDataSize() { } private void checkBlockSizeReached() throws IOException { - if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. + if (recordCount >= recordCountForNextMemCheck || writeSupport.needCheckRowSize) { // checking the memory size is relatively expensive, so let's not do it for every record. + writeSupport.needCheckRowSize = false; long memSize = columnStore.getBufferedSize(); long recordSize = memSize / recordCount; // flush the row group if it is within ~2 records of the limit @@ -172,7 +173,7 @@ private void checkBlockSizeReached() throws IOException { } private void flushRowGroupToStore() - throws IOException { + throws IOException { recordConsumer.flush(); LOG.debug("Flushing mem columnStore to file. 
allocated memory: {}", columnStore.getAllocatedSize()); if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) { @@ -187,8 +188,8 @@ private void flushRowGroupToStore() recordCount = 0; parquetFileWriter.endBlock(); this.nextRowGroupSize = Math.min( - parquetFileWriter.getNextRowGroupSize(), - rowGroupSizeThreshold); + parquetFileWriter.getNextRowGroupSize(), + rowGroupSizeThreshold); } columnStore = null; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java index fa25943b82..2edc585c4b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java @@ -55,7 +55,7 @@ public static Offsets getOffsets(SeekableInputStream input, ColumnChunkMetaData * (0 cannot be a valid offset because of the MAGIC bytes) * - The firstDataPageOffset might point to the dictionary page */ - dictionaryPageSize = readDictionaryPageSize(input, newChunkStart); + dictionaryPageSize = readDictionaryPageSize(input, chunk); } else { dictionaryPageSize = chunk.getFirstDataPageOffset() - chunk.getDictionaryPageOffset(); } @@ -68,12 +68,14 @@ public static Offsets getOffsets(SeekableInputStream input, ColumnChunkMetaData return new Offsets(firstDataPageOffset, dictionaryPageOffset); } - private static long readDictionaryPageSize(SeekableInputStream in, long pos) throws IOException { + private static long readDictionaryPageSize(SeekableInputStream in, ColumnChunkMetaData chunk) throws IOException { long origPos = -1; try { origPos = in.getPos(); + in.seek(chunk.getStartingPos()); + long headerStart = in.getPos(); PageHeader header = Util.readPageHeader(in); - long headerSize = in.getPos() - origPos; + long headerSize = in.getPos() - headerStart; return headerSize + header.getCompressed_page_size(); } finally { if (origPos != -1) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 791f9ef188..e1bf68be2c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -60,6 +60,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience.Private; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.ByteBufferInputStream; @@ -81,6 +85,7 @@ import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; import org.apache.parquet.crypto.ParquetCryptoRuntimeException; import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.QueryMetrics; import org.apache.parquet.filter2.compat.RowGroupFilter; import org.apache.parquet.format.BlockCipher; import org.apache.parquet.format.BloomFilterHeader; @@ -113,9 +118,6 @@ import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.yetus.audience.InterfaceAudience.Private; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Internal implementation of the Parquet file reader as a block container @@ -130,6 +132,8 @@ public class ParquetFileReader implements Closeable { 
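The file_offset recovery added to ParquetMetadataConverter above boils down to two invariants: the first row group starts right after the 4-byte magic, and every later row group starts at or after the previous start plus the previous compressed size. A condensed restatement with hypothetical offsets; the real code additionally falls back to the first column's offset or throws InvalidFileOffsetException, depending on the code path:

// Condensed form of the invalidFileOffset check introduced above (PARQUET-2078).
static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) {
  if (preStartIndex == 0) {
    return startIndex != 4;                              // first row group sits right after the "PAR1" magic
  }
  return startIndex < preStartIndex + preCompressedSize; // padding may push it further, never earlier
}

// Hypothetical values:
// invalidFileOffset(4, 0, 0)       -> false  (healthy first row group)
// invalidFileOffset(0, 0, 0)       -> true   (corrupted file_offset after a merge)
// invalidFileOffset(900, 4, 1000)  -> true   (offset points inside the previous row group)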
private final CRC32 crc; + public QueryMetrics queryMetrics = new QueryMetrics(); + /** * for files provided, check if there's a summary file. * If a summary file is found it is used otherwise the file footer is used. @@ -659,12 +663,12 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options) protected final SeekableInputStream f; private final InputFile file; - private final ParquetReadOptions options; + private ParquetReadOptions options; private final Map paths = new HashMap<>(); private final FileMetaData fileMetaData; // may be null - private final List blocks; - private final List blockIndexStores; - private final List blockRowRanges; + private List blocks; + private List blockIndexStores; + private List blockRowRanges; // not final. in some cases, this may be lazily loaded for backward-compat. private ParquetMetadata footer; @@ -750,17 +754,8 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) this.f = this.file.newStream(); this.fileMetaData = footer.getFileMetaData(); this.fileDecryptor = fileMetaData.getFileDecryptor(); - if (null == fileDecryptor) { - this.options = HadoopReadOptions.builder(conf).build(); - } else { - this.options = HadoopReadOptions.builder(conf) - .withDecryption(fileDecryptor.getDecryptionProperties()) - .build(); - } this.footer = footer; - this.blocks = filterRowGroups(footer.getBlocks()); - this.blockIndexStores = listWithNulls(this.blocks.size()); - this.blockRowRanges = listWithNulls(this.blocks.size()); + resetBlocks(conf); for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } @@ -768,9 +763,11 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) } public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { + long start = System.currentTimeMillis(); this.converter = new ParquetMetadataConverter(options); this.file = file; this.f = file.newStream(); + long openStreamEndTime = System.currentTimeMillis(); this.options = options; try { this.footer = readFooter(file, options, f, converter); @@ -780,6 +777,7 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx f.close(); throw e; } + long footerReadEndtime = System.currentTimeMillis(); this.fileMetaData = footer.getFileMetaData(); this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups! if (null != fileDecryptor && fileDecryptor.plaintextFile()) { @@ -793,6 +791,25 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx paths.put(ColumnPath.get(col.getPath()), col); } this.crc = options.usePageChecksumVerification() ? 
new CRC32() : null; + long filterRowGroupEndTime = System.currentTimeMillis(); + if ((filterRowGroupEndTime - start) > 50) { + LOG.info("open stream costs {} ms, read footer costs {} ms, filter rowGroups cost {} ms", + (openStreamEndTime - start), (footerReadEndtime - openStreamEndTime), + (filterRowGroupEndTime - footerReadEndtime)); + } + } + + public void resetBlocks(Configuration conf) throws IOException { + if (null == fileDecryptor) { + this.options = HadoopReadOptions.builder(conf).build(); + } else { + this.options = HadoopReadOptions.builder(conf) + .withDecryption(fileDecryptor.getDecryptionProperties()) + .build(); + } + this.blocks = filterRowGroups(footer.getBlocks()); + this.blockIndexStores = listWithNulls(this.blocks.size()); + this.blockRowRanges = listWithNulls(this.blocks.size()); } private static List listWithNulls(int size) { @@ -867,7 +884,7 @@ private List filterRowGroups(List blocks) throws I if (options.useBloomFilter()) { levels.add(BLOOMFILTER); } - return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); + return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this, queryMetrics); } return blocks; @@ -916,7 +933,7 @@ public PageReadStore readNextRowGroup() throws IOException { currentParts = new ConsecutivePartList(startingPos); allParts.add(currentParts); } - currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize())); + currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize())); } } // actually read all the chunks @@ -985,6 +1002,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { OffsetIndex filteredOffsetIndex = filterOffsetIndex(offsetIndex, rowRanges, block.getRowCount()); + this.queryMetrics.logParquetPageFilter(offsetIndex.getPageCount(), filteredOffsetIndex.getPageCount()); for (OffsetRange range : calculateOffsetRanges(filteredOffsetIndex, mc, offsetIndex.getOffset(0))) { BenchmarkCounter.incrementTotalBytes(range.getLength()); long startingPos = range.getOffset(); @@ -994,7 +1012,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { allParts.add(currentParts); } ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos, - (int) range.getLength()); + range.getLength()); currentParts.addChunk(chunkDescriptor); builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex); } @@ -1613,7 +1631,7 @@ private static class ChunkDescriptor { private final ColumnDescriptor col; private final ColumnChunkMetaData metadata; private final long fileOffset; - private final int size; + private final long size; /** * @param col column this chunk is part of @@ -1625,7 +1643,7 @@ private ChunkDescriptor( ColumnDescriptor col, ColumnChunkMetaData metadata, long fileOffset, - int size) { + long size) { super(); this.col = col; this.metadata = metadata; @@ -1657,8 +1675,8 @@ public boolean equals(Object obj) { private class ConsecutivePartList { private final long offset; - private int length; - private final List chunks = new ArrayList(); + private long length; + private final List chunks = new ArrayList<>(); /** * @param offset where the first chunk starts @@ -1686,8 +1704,8 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx List result = new ArrayList(chunks.size()); f.seek(offset); - int fullAllocations = length / options.getMaxAllocationSize(); - int lastAllocationSize = length % options.getMaxAllocationSize(); + int fullAllocations = Math.toIntExact(length / 
options.getMaxAllocationSize()); + int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize()); int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0); List buffers = new ArrayList<>(numAllocations); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index a246a52c73..8f425bda6b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.parquet.Preconditions; import org.apache.parquet.Version; @@ -62,19 +64,19 @@ import org.apache.parquet.crypto.ModuleCipherFactory; import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; import org.apache.parquet.crypto.ParquetCryptoRuntimeException; -import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; -import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.format.BlockCipher; import org.apache.parquet.format.Util; import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.GlobalMetaData; import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.internal.column.columnindex.ColumnIndex; @@ -84,14 +86,12 @@ import org.apache.parquet.internal.hadoop.metadata.IndexReference; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.OutputFile; -import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.PositionOutputStream; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.TypeUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Internal implementation of the Parquet file writer as a block container @@ -793,13 +793,16 @@ void writeColumnChunk(ColumnDescriptor descriptor, // write bloom filter if one of data pages is not dictionary encoded boolean isWriteBloomFilter = false; for (Encoding encoding : dataEncodings) { - if (encoding != Encoding.RLE_DICTIONARY) { + if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { isWriteBloomFilter = true; break; } } if (isWriteBloomFilter) { currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + } else { + LOG.info("skip write bloom filter because column is encoded as dictionary, column path {}", + descriptor.getPath()); } } LOG.debug("{}: 
write data pages", out.getPos()); @@ -852,6 +855,7 @@ public void endColumn() throws IOException { this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); this.uncompressedLength = 0; this.compressedLength = 0; + this.currentChunkDictionaryPageOffset = 0; columnIndexBuilder = null; offsetIndexBuilder = null; } @@ -887,7 +891,9 @@ public void endBlock() throws IOException { */ @Deprecated public void appendFile(Configuration conf, Path file) throws IOException { - ParquetFileReader.open(conf, file).appendTo(this); + try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) { + reader.appendTo(this); + } } public void appendFile(InputFile file) throws IOException { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 6be27e47bd..7d4d7e10f3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -153,6 +153,8 @@ public static enum JobSummaryLevel { public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled"; + public static final String BLOOM_FILTER_CANDIDATE_SIZE = "parquet.bloom.filter.candidate.size"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -226,10 +228,12 @@ public static int getBloomFilterMaxBytes(Configuration conf) { public static boolean getBloomFilterEnabled(Configuration conf) { return conf.getBoolean(BLOOM_FILTER_ENABLED, DEFAULT_BLOOM_FILTER_ENABLED); } + public static int getBlockSize(JobContext jobContext) { return getBlockSize(getConfiguration(jobContext)); } + public static int getPageSize(JobContext jobContext) { return getPageSize(getConfiguration(jobContext)); } @@ -458,6 +462,14 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withColumnConfig(BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false), propsBuilder::withBloomFilterEnabled) .withColumnConfig(BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV) + .withColumnConfig( + DYNAMIC_BLOOM_FILTER_ENABLED, + key -> conf.getBoolean(key, ParquetProperties.DEFAULT_DYNAMIC_BLOOM_FILTER_ENABLED), + propsBuilder::withDynamicBloomFilterEnabled) + .withColumnConfig( + BLOOM_FILTER_CANDIDATE_SIZE, + key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATE_SIZE), + propsBuilder::withBloomFilterCandidateSize) .parseConfig(conf); ParquetProperties props = propsBuilder.build(); @@ -474,9 +486,9 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp } WriteContext fileWriteContext = writeSupport.init(conf); - + FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext); - + ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf), fileWriteContext.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(), props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled(), encryptionProperties); diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index e1afaca994..18dc2f2e8b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -269,7 +269,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport ParquetFileWriter.Mode mode, WriteSupport writeSupport, CompressionCodecName compressionCodecName, - int rowGroupSize, + long rowGroupSize, boolean validating, Configuration conf, int maxPaddingSize, @@ -355,7 +355,7 @@ public abstract static class Builder> { private Configuration conf = new Configuration(); private ParquetFileWriter.Mode mode; private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; - private int rowGroupSize = DEFAULT_BLOCK_SIZE; + private long rowGroupSize = DEFAULT_BLOCK_SIZE; private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT; private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED; private ParquetProperties.Builder encodingPropsBuilder = @@ -432,8 +432,20 @@ public SELF withEncryption (FileEncryptionProperties encryptionProperties) { * * @param rowGroupSize an integer size in bytes * @return this builder for method chaining. + * @deprecated Use {@link #withRowGroupSize(long)} instead */ + @Deprecated public SELF withRowGroupSize(int rowGroupSize) { + return withRowGroupSize((long) rowGroupSize); + } + + /** + * Set the Parquet format row group size used by the constructed writer. + * + * @param rowGroupSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withRowGroupSize(long rowGroupSize) { this.rowGroupSize = rowGroupSize; return self(); } @@ -615,6 +627,29 @@ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) { return self(); } + /** + * When NDV (number of distinct values) for a specified column is not set, whether to use + * `DynamicBloomFilter` to automatically adjust the BloomFilter size according to `parquet.bloom.filter.max.bytes` + * + * @param columnPath the path of the column (dot-string) + * @param enabled whether to write bloom filter for the column + */ + public SELF withDynamicBloomFilterEnabled(String columnPath, boolean enabled) { + encodingPropsBuilder.withDynamicBloomFilterEnabled(columnPath, enabled); + return self(); + } + + /** + * When `DynamicBloomFilter` is enabled, set how many bloomFilters to split as candidates. + * + * @param columnPath the path of the column (dot-string) + * @param size the candidate size + */ + public SELF withBloomFilterCandidateSize(String columnPath, int size) { + encodingPropsBuilder.withBloomFilterCandidateSize(columnPath, size); + return self(); + } + /** * Set a property that will be available to the read path. For writers that use a Hadoop * configuration, this is the recommended way to add configuration values. 
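The new per-column builder hooks compose with the existing bloom-filter options. A minimal sketch of a writer configured for dynamic bloom filters; the output path and column name are illustrative, and ExampleParquetWriter is used only because it is the simplest concrete builder:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;

Path file = new Path("/tmp/people.parquet");                 // hypothetical output path
ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(file)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withRowGroupSize(128L * 1024 * 1024)                    // uses the new long overload
    .withBloomFilterEnabled("name", true)                    // write a bloom filter for this column
    .withDynamicBloomFilterEnabled("name", true)             // size it from observed values, not a fixed NDV
    .withBloomFilterCandidateSize("name", 10);               // number of candidate filters kept while writing

The Hadoop-side keys added in ParquetOutputFormat (parquet.bloom.filter.dynamic.enabled and parquet.bloom.filter.candidate.size) are wired through the same per-column config parser, so column-suffixed overrides should work the same way as for the existing bloom-filter keys.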
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java index 9549d5f492..f959a81771 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java @@ -36,6 +36,8 @@ */ abstract public class WriteSupport { + public boolean needCheckRowSize = false; + /** * information to be persisted in the file */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 2e0c558930..55f31ee366 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -53,7 +53,7 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) { return 0; } - + if (!outputBuffer.hasRemaining()) { inputBuffer.rewind(); Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0."); @@ -79,7 +79,7 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc // Return compressed output up to 'len' int numBytes = Math.min(len, outputBuffer.remaining()); outputBuffer.get(buffer, off, numBytes); - return numBytes; + return numBytes; } /** @@ -101,15 +101,17 @@ public synchronized void setInput(byte[] buffer, int off, int len) { SnappyUtil.validateBuffer(buffer, off, len); if (inputBuffer.capacity() - inputBuffer.position() < len) { - final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len); + int maxSize = Math.max(inputBuffer.position() * 2 , inputBuffer.position() + len); + ByteBuffer newBuffer = ByteBuffer.allocateDirect(maxSize); inputBuffer.rewind(); newBuffer.put(inputBuffer); final ByteBuffer oldBuffer = inputBuffer; inputBuffer = newBuffer; CleanUtil.cleanDirectBuffer(oldBuffer); - } else { - inputBuffer.limit(inputBuffer.position() + len); } + + inputBuffer.limit(inputBuffer.position() + len); + inputBuffer.put(buffer, off, len); } @@ -150,7 +152,7 @@ public boolean needsDictionary() { @Override public void setDictionary(byte[] b, int off, int len) { - // No-op + // No-op } } //class SnappyDecompressor diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java index a505e7bde0..a24612f2ee 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java @@ -50,4 +50,13 @@ public int read() throws 
IOException { public void resetState() throws IOException { // no-opt, doesn't apply to ZSTD } + + @Override + public void close() throws IOException { + try { + zstdInputStream.close(); + } finally { + super.close(); + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java index 4ebe15aecf..b51093d46f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java @@ -19,27 +19,14 @@ package org.apache.parquet.hadoop; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.crypto.ColumnEncryptionProperties; -import org.apache.parquet.crypto.DecryptionKeyRetrieverMock; -import org.apache.parquet.crypto.FileDecryptionProperties; -import org.apache.parquet.crypto.FileEncryptionProperties; -import org.apache.parquet.filter2.compat.FilterCompat; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.filter2.recordlevel.PhoneBookWriter; -import org.apache.parquet.hadoop.example.ExampleParquetWriter; -import org.apache.parquet.hadoop.example.GroupReadSupport; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.io.api.Binary; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn; +import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn; +import static org.apache.parquet.filter2.predicate.FilterApi.eq; +import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Files; @@ -48,17 +35,41 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.parquet.filter2.predicate.FilterApi.*; -import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; -import static org.junit.Assert.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.crypto.ColumnEncryptionProperties; +import org.apache.parquet.crypto.DecryptionKeyRetrieverMock; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import 
org.apache.parquet.filter2.recordlevel.PhoneBookWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.api.Binary; @RunWith(Parameterized.class) public class TestBloomFiltering { @@ -148,6 +159,22 @@ private static List generateNames(int rowCount) { return list; } + protected static List generateDictionaryData(int rowCount) { + List users = new ArrayList<>(); + List names = new ArrayList<>(); + for (int i = 0; i < rowCount / 5; i++) { + names.add("miller"); + names.add("anderson"); + names.add("thomas"); + names.add("chenLiang"); + names.add("len"); + } + for (int i = 0; i < rowCount; ++i) { + users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount))); + } + return users; + } + private static List generatePhoneNumbers() { int length = RANDOM.nextInt(5) - 1; if (length < 0) { @@ -178,18 +205,8 @@ private static PhoneBookWriter.Location generateLocation(int id, int rowCount) { } private List readUsers(FilterPredicate filter, boolean useOtherFiltering, - boolean useBloomFilter) throws IOException { - FileDecryptionProperties fileDecryptionProperties = null; - if (isEncrypted) { - DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock() - .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY) - .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1) - .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2); - - fileDecryptionProperties = FileDecryptionProperties.builder() - .withKeyRetriever(decryptionKeyRetrieverMock) - .build(); - } + boolean useBloomFilter) throws IOException { + FileDecryptionProperties fileDecryptionProperties = getFileDecryptionProperties(); return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file) .withFilter(FilterCompat.get(filter)) @@ -201,6 +218,20 @@ private List readUsers(FilterPredicate filter, boolean use .useColumnIndexFilter(useOtherFiltering)); } + public FileDecryptionProperties getFileDecryptionProperties() { + if (!isEncrypted) { + return null; + } + DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock() + .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY) + .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1) + .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2); + + return FileDecryptionProperties.builder() + .withKeyRetriever(decryptionKeyRetrieverMock) + .build(); + } + // Assumes that both lists are in the same order private static void assertContains(Stream expected, List actual) { Iterator expIt = expected.iterator(); @@ -237,7 +268,7 @@ private void assertCorrectFiltering(Predicate expectedFilt assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result); } - private static FileEncryptionProperties getFileEncryptionProperties() { + protected static FileEncryptionProperties getFileEncryptionProperties() { ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties .builder("id") .withKey(COLUMN_ENCRYPTION_KEY1) @@ -262,35 +293,55 @@ private static FileEncryptionProperties getFileEncryptionProperties() { return encryptionProperties; } - private static void writePhoneBookToFile(Path file, - ParquetProperties.WriterVersion parquetVersion, - FileEncryptionProperties encryptionProperties) 
throws IOException { + protected static void writePhoneBookToFile(Path file, + ParquetProperties.WriterVersion parquetVersion, + FileEncryptionProperties encryptionProperties, + boolean useDynamicBloomFilter) throws IOException { int pageSize = DATA.size() / 100; // Ensure that several pages will be created int rowGroupSize = pageSize * 4; // Ensure that there are more row-groups created - PhoneBookWriter.write(ExampleParquetWriter.builder(file) - .withWriteMode(OVERWRITE) - .withRowGroupSize(rowGroupSize) - .withPageSize(pageSize) + ExampleParquetWriter.Builder writeBuilder = ExampleParquetWriter.builder(file) + .withWriteMode(OVERWRITE) + .withRowGroupSize(rowGroupSize) + .withPageSize(pageSize) + .withEncryption(encryptionProperties) + .withWriterVersion(parquetVersion); + if (useDynamicBloomFilter) { + writeBuilder + .withBloomFilterEnabled("location.lat", true) + .withBloomFilterCandidateSize("location.lat", 10) + .withBloomFilterEnabled("name", true) + .withBloomFilterCandidateSize("name", 10) + .withBloomFilterEnabled("id", true) + .withBloomFilterCandidateSize("id", 10); + } else { + writeBuilder .withBloomFilterNDV("location.lat", 10000L) .withBloomFilterNDV("name", 10000L) - .withBloomFilterNDV("id", 10000L) - .withEncryption(encryptionProperties) - .withWriterVersion(parquetVersion), - DATA); + .withBloomFilterNDV("id", 10000L); + } + PhoneBookWriter.write(writeBuilder, DATA); } private static void deleteFile(Path file) throws IOException { file.getFileSystem(new Configuration()).delete(file, false); } + public Path getFile() { + return file; + } + @BeforeClass public static void createFiles() throws IOException { - writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, null); - writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, null); + createFiles(false); + } + + public static void createFiles(boolean useDynamicBloomFilter) throws IOException { + writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, null, useDynamicBloomFilter); + writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, null, useDynamicBloomFilter); FileEncryptionProperties encryptionProperties = getFileEncryptionProperties(); - writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties); - writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties); + writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties, useDynamicBloomFilter); + writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties, useDynamicBloomFilter); } @AfterClass @@ -322,4 +373,23 @@ record -> { }, eq(doubleColumn("location.lat"), 99.9)); } + + @Test + public void checkBloomFilterSize() throws IOException { + FileDecryptionProperties fileDecryptionProperties = getFileDecryptionProperties(); + final ParquetReadOptions readOptions = ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build(); + InputFile inputFile = HadoopInputFile.fromPath(getFile(), new Configuration()); + try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) { + fileReader.getRowGroups().forEach(block -> { + BloomFilterReader bloomFilterReader = fileReader.getBloomFilterDataReader(block); + block.getColumns().stream() + .filter(column -> column.getBloomFilterOffset() > 0) + .forEach(column -> { + int bitsetSize = bloomFilterReader.readBloomFilter(column).getBitsetSize(); + // when setting 
nvd to a fixed value 10000L, bitsetSize will always be 16384 + assertEquals(16384, bitsetSize); + }); + }); + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDynamicBlockBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDynamicBlockBloomFiltering.java new file mode 100644 index 0000000000..1f9f5fc1cc --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDynamicBlockBloomFiltering.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; + +public class TestDynamicBlockBloomFiltering extends TestBloomFiltering { + + @BeforeClass + public static void createFiles() throws IOException { + createFiles(true); + } + + public TestDynamicBlockBloomFiltering(Path file, boolean isEncrypted) { + super(file, isEncrypted); + } + + @Test + public void testSimpleFiltering() throws IOException { + super.testSimpleFiltering(); + } + + @Test + public void testNestedFiltering() throws IOException { + super.testNestedFiltering(); + } + + @Test + public void checkBloomFilterSize() throws IOException { + FileDecryptionProperties fileDecryptionProperties = getFileDecryptionProperties(); + final ParquetReadOptions readOptions = ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build(); + InputFile inputFile = HadoopInputFile.fromPath(getFile(), new Configuration()); + try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) { + fileReader.getRowGroups().forEach(block -> { + BloomFilterReader bloomFilterReader = fileReader.getBloomFilterDataReader(block); + block.getColumns().stream() + .filter(column -> column.getBloomFilterOffset() > 0) + .forEach(column -> { + int bitsetSize = bloomFilterReader.readBloomFilter(column).getBitsetSize(); + // set 10 candidates: + // [bitSize=8192, expectedNVD=500], [bitSize=16384, expectedNVD=1500] , [bitSize=32768, expectedNVD=3000], + // [bitSize=65536, expectedNVD=6500] ........ + // number of distinct values is less than 100, so the bitSize should be less than 131073. 
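// Rough sizing context for the assertions here and in TestBloomFiltering above, assuming the
// classic bound m = -n * ln(p) / (ln 2)^2 bits at Parquet's default FPP of 0.01, with the result
// rounded up to a power of two: a fixed NDV of 10000 needs roughly 96k bits (~12 KB), which
// rounds up to the 16384-byte bitset asserted above. The dynamic writer instead keeps the
// smallest candidate that still covers the observed distinct count (fewer than 100 names here),
// so the bitset stays at the minimum candidate size checked below.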
+ assertTrue(bitsetSize <= 8192); + }); + }); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java new file mode 100644 index 0000000000..90015f57e1 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn; +import static org.apache.parquet.filter2.predicate.FilterApi.eq; +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.buildMessage; +import static org.apache.parquet.schema.Types.required; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.junit.*; +import org.junit.rules.TemporaryFolder; + +/** + * This test is to test parquet-mr working with potential int overflows (when the sizes are greater than + * Integer.MAX_VALUE). The test requires ~3GB memory so it is likely to fail in the CI environment, so these + * tests are flagged to be ignored. 
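 * To make the numbers concrete (the constants are defined just below): DATA_SIZE is 256 bytes per
 * value and ROW_COUNT is Integer.MAX_VALUE / 256 + 1000, about 8.39 million rows, so the single
 * "data" column chunk carries roughly 2.15 GB of raw values, just past Integer.MAX_VALUE. That is
 * what exercises the int-to-long widening of ChunkDescriptor.size and ConsecutivePartList.length
 * earlier in this patch.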
+ */ +@Ignore +public class TestLargeColumnChunk { + private static final MessageType SCHEMA = buildMessage().addFields( + required(INT64).named("id"), + required(BINARY).named("data")) + .named("schema"); + private static final int DATA_SIZE = 256; + // Ensure that the size of the column chunk would overflow an int + private static final int ROW_COUNT = Integer.MAX_VALUE / DATA_SIZE + 1000; + private static final long RANDOM_SEED = 42; + private static final int ID_INDEX = SCHEMA.getFieldIndex("id"); + private static final int DATA_INDEX = SCHEMA.getFieldIndex("data"); + + private static final long ID_OF_FILTERED_DATA = ROW_COUNT / 2; + private static Binary VALUE_IN_DATA; + private static Binary VALUE_NOT_IN_DATA; + private static Path file; + + @ClassRule + public static TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void createFile() throws IOException { + file = new Path(folder.newFile().getAbsolutePath()); + + GroupFactory factory = new SimpleGroupFactory(SCHEMA); + Random random = new Random(RANDOM_SEED); + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(SCHEMA, conf); + try (ParquetWriter writer = ExampleParquetWriter + .builder(HadoopOutputFile.fromPath(file, conf)) + .withWriteMode(OVERWRITE) + .withConf(conf) + .withCompressionCodec(UNCOMPRESSED) + .withRowGroupSize(4L * 1024 * 1024 * 1024) // 4G to ensure all data goes to one row group + .withBloomFilterEnabled(true) + .build()) { + for (long id = 0; id < ROW_COUNT; ++id) { + Group group = factory.newGroup(); + group.add(ID_INDEX, id); + Binary data = nextBinary(random); + group.add(DATA_INDEX, data); + writer.write(group); + if (id == ID_OF_FILTERED_DATA) { + VALUE_IN_DATA = data; + } + } + } + VALUE_NOT_IN_DATA = nextBinary(random); + } + + private static Binary nextBinary(Random random) { + byte[] bytes = new byte[DATA_SIZE]; + random.nextBytes(bytes); + return Binary.fromConstantByteArray(bytes); + } + + @Test + public void validateAllData() throws IOException { + Random random = new Random(RANDOM_SEED); + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file).build()) { + for (long id = 0; id < ROW_COUNT; ++id) { + Group group = reader.read(); + assertEquals(id, group.getLong(ID_INDEX, 0)); + assertEquals(nextBinary(random), group.getBinary(DATA_INDEX, 0)); + } + assertNull("No more record should be read", reader.read()); + } + } + + @Test + public void validateFiltering() throws IOException { + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) + .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_IN_DATA))) + .build()) { + Group group = reader.read(); + assertEquals(ID_OF_FILTERED_DATA, group.getLong(ID_INDEX, 0)); + assertEquals(VALUE_IN_DATA, group.getBinary(DATA_INDEX, 0)); + assertNull("No more record should be read", reader.read()); + } + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) + .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_NOT_IN_DATA))) + .build()) { + assertNull("No record should be read", reader.read()); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 73ef70e462..8dcbf4acf4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -23,6 +23,10 @@ import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Version; import org.apache.parquet.bytes.BytesUtils; @@ -30,6 +34,8 @@ import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.io.ParquetEncodingException; import org.junit.Assume; import org.junit.Rule; @@ -67,6 +73,7 @@ import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics; import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.apache.parquet.hadoop.ParquetInputFormat.READ_SUPPORT_CLASS; import static org.junit.Assert.*; import static org.apache.parquet.column.Encoding.BIT_PACKED; import static org.apache.parquet.column.Encoding.PLAIN; @@ -75,6 +82,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.Type.Repetition.*; import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; +import static org.junit.Assert.assertEquals; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; @@ -204,39 +212,116 @@ public void testWriteRead() throws Exception { assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings()); { // read first block of col #1 - ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - assertNull(r.readNextRowGroup()); + try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + assertNull(r.readNextRowGroup()); + } } { // read all blocks of col #1 and #2 - ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); + try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); - 
validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); - pages = r.readNextRowGroup(); - assertEquals(4, pages.getRowCount()); + pages = r.readNextRowGroup(); + assertEquals(4, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); - validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); + validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); + validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); - assertNull(r.readNextRowGroup()); + assertNull(r.readNextRowGroup()); + } } PrintFooter.main(new String[] {path.toString()}); } + @Test + public void testWriteReadWithRecordReader() throws Exception { + File testFile = temp.newFile(); + testFile.delete(); + + Path path = new Path(testFile.toURI()); + Configuration configuration = new Configuration(); + + ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); + w.start(); + w.startBlock(3); + w.startColumn(C1, 5, CODEC); + w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.startColumn(C2, 6, CODEC); + long c2Starts = w.getPos(); + w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY)); + long c2p1Starts = w.getPos(); + w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + long c2Ends = w.getPos(); + w.endBlock(); + w.startBlock(4); + w.startColumn(C1, 7, CODEC); + long c1Bock2Starts = w.getPos(); + long c1p1Bock2Starts = w.getPos(); + w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + long c1Block2Ends = w.getPos(); + w.startColumn(C2, 8, CODEC); + w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap()); + + ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); + assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size()); + BlockMetaData rowGroup = readFooter.getBlocks().get(0); + assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize()); + + assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset()); + assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos()); + assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset()); + assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset()); + + BlockMetaData rowGroup2 = readFooter.getBlocks().get(1); + assertEquals(0, rowGroup2.getColumns().get(0).getDictionaryPageOffset()); + assertEquals(c1Bock2Starts, rowGroup2.getColumns().get(0).getStartingPos()); + assertEquals(c1p1Bock2Starts, rowGroup2.getColumns().get(0).getFirstDataPageOffset()); + assertEquals(c1Block2Ends - 
c1Bock2Starts, rowGroup2.getColumns().get(0).getTotalSize()); + + HashSet expectedEncoding=new HashSet(); + expectedEncoding.add(PLAIN); + expectedEncoding.add(BIT_PACKED); + assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings()); + + ParquetInputSplit split = new ParquetInputSplit(path, 0, w.getPos(),null, + readFooter.getBlocks(), SCHEMA.toString(), + readFooter.getFileMetaData().getSchema().toString(), + readFooter.getFileMetaData().getKeyValueMetaData(), + null); + ParquetInputFormat input = new ParquetInputFormat(); + configuration.set(READ_SUPPORT_CLASS, GroupReadSupport.class.getName()); + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(configuration, taskAttemptID); + RecordReader reader = input.createRecordReader(split, taskContext); + assertTrue(reader instanceof ParquetRecordReader); + //RowGroup.file_offset is checked here + reader.initialize(split, taskContext); + reader.close(); + } + @Test public void testWriteEmptyBlock() throws Exception { File testFile = temp.newFile(); @@ -281,12 +366,14 @@ public void testBloomFilterWriteRead() throws Exception { w.endBlock(); w.end(new HashMap<>()); ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); - ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath))); - BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0)); - BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0)); - assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello")))); - assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world")))); + + try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)))) { + BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0)); + BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0)); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello")))); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world")))); + } } @Test @@ -340,16 +427,16 @@ public void testWriteReadDataPageV2() throws Exception { expectedEncoding.add(PLAIN); assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings()); - ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); - - PageReadStore pages = reader.readNextRowGroup(); - assertEquals(14, pages.getRowCount()); - validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12); - validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(),data.toByteArray(), 12); - validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); - validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); - 
assertNull(reader.readNextRowGroup()); + try (ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { + PageReadStore pages = reader.readNextRowGroup(); + assertEquals(14, pages.getRowCount()); + validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12); + validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12); + validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); + validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); + assertNull(reader.readNextRowGroup()); + } } @Test @@ -426,35 +513,37 @@ public void testAlignmentWithPadding() throws Exception { 120, readFooter.getBlocks().get(1).getStartingPos()); { // read first block of col #1 - ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - assertNull(r.readNextRowGroup()); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + assertNull(r.readNextRowGroup()); + } } { // read all blocks of col #1 and #2 - ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); - pages = r.readNextRowGroup(); - assertEquals(4, pages.getRowCount()); + pages = r.readNextRowGroup(); + assertEquals(4, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); - validateContains(SCHEMA, pages, 
PATH2, 8, BytesInput.from(BYTES4)); + validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); + validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); - assertNull(r.readNextRowGroup()); + assertNull(r.readNextRowGroup()); + } } PrintFooter.main(new String[] {path.toString()}); } @@ -533,35 +622,36 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception { 109, readFooter.getBlocks().get(1).getStartingPos()); { // read first block of col #1 - ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - assertNull(r.readNextRowGroup()); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + assertNull(r.readNextRowGroup()); + } } { // read all blocks of col #1 and #2 - ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); - - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); - pages = r.readNextRowGroup(); - assertEquals(4, pages.getRowCount()); + pages = r.readNextRowGroup(); + assertEquals(4, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); - validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); + validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); + validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); - assertNull(r.readNextRowGroup()); + assertNull(r.readNextRowGroup()); + } } PrintFooter.main(new String[] {path.toString()}); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index de53e96264..673d5503c7 100644 --- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -269,14 +269,15 @@ public void testParquetFileWithBloomFilter() throws IOException { } } - ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration())); - BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); - BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData) - .readBloomFilter(blockMetaData.getColumns().get(0)); + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); + BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData) + .readBloomFilter(blockMetaData.getColumns().get(0)); - for (String name: testNames) { - assertTrue(bloomFilter.findHash( - LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer()))); + for (String name : testNames) { + assertTrue(bloomFilter.findHash( + LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer()))); + } } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java index bda5333523..82d48f411d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; @@ -68,6 +69,17 @@ public class TestParquetWriterAppendBlocks { public static final SimpleGroupFactory GROUP_FACTORY = new SimpleGroupFactory(FILE_SCHEMA); + private static final Path STATIC_FILE_1 = createPathFromCP("/test-append_1.parquet"); + private static final Path STATIC_FILE_2 = createPathFromCP("/test-append_2.parquet"); + + private static Path createPathFromCP(String path) { + try { + return new Path(TestParquetWriterAppendBlocks.class.getResource(path).toURI()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + public Path file1; public List file1content = new ArrayList(); public Path file2; @@ -134,6 +146,51 @@ public void testBasicBehavior() throws IOException { Assert.assertEquals("All records should be present", 0, expected.size()); } + /** + * This test is similar to {@link #testBasicBehavior()}, except that it uses static files generated by a previous + * release (1.11.1). It validates the fix for PARQUET-2027.
+ */ + @Test + public void testBasicBehaviorWithStaticFiles() throws IOException { + List expected = new ArrayList<>(); + readAll(STATIC_FILE_1, expected); + readAll(STATIC_FILE_2, expected); + + Path combinedFile = newTemp(); + ParquetFileWriter writer = new ParquetFileWriter( + CONF, FILE_SCHEMA, combinedFile); + writer.start(); + writer.appendFile(CONF, STATIC_FILE_1); + writer.appendFile(CONF, STATIC_FILE_2); + writer.end(EMPTY_METADATA); + + try (ParquetReader reader = ParquetReader + .builder(new GroupReadSupport(), combinedFile) + .build()) { + + for (Group expectedNext : expected) { + Group next = reader.read(); + // check each value; equals is not supported for simple records + Assert.assertEquals("Each id should match", + expectedNext.getInteger("id", 0), next.getInteger("id", 0)); + Assert.assertEquals("Each string should match", + expectedNext.getString("string", 0), next.getString("string", 0)); + } + Assert.assertNull("No extra records should be present", reader.read()); + } + + } + + private void readAll(Path file, List values) throws IOException { + try (ParquetReader reader = ParquetReader + .builder(new GroupReadSupport(), file) + .build()) { + for (Group g = reader.read(); g != null; g = reader.read()) { + values.add(g); + } + } + } + @Test public void testMergedMetadata() throws IOException { Path combinedFile = newTemp(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java index 69e11c14e9..fdb7c8677d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java @@ -93,29 +93,30 @@ public void testReadWrite() throws Exception { writeData(writer); writer.close(); - ParquetFileReader reader = ParquetFileReader.open(CONF, path); - assertEquals("Should have one row group", 1, reader.getRowGroups().size()); - BlockMetaData rowGroup = reader.getRowGroups().get(0); - - ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0); - EncodingStats dictStats = dictColumn.getEncodingStats(); - assertNotNull("Dict column should have non-null encoding stats", dictStats); - assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages()); - assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages()); - assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages()); - - ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1); - EncodingStats plainStats = plainColumn.getEncodingStats(); - assertNotNull("Plain column should have non-null encoding stats", plainStats); - assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages()); - assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages()); - assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages()); - - ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2); - EncodingStats fallbackStats = fallbackColumn.getEncodingStats(); - assertNotNull("Fallback column should have non-null encoding stats", fallbackStats); - assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages()); - assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages()); - assertTrue("Fallback column should have non-dict 
pages", fallbackStats.hasNonDictionaryEncodedPages()); + try (ParquetFileReader reader = ParquetFileReader.open(CONF, path)) { + assertEquals("Should have one row group", 1, reader.getRowGroups().size()); + BlockMetaData rowGroup = reader.getRowGroups().get(0); + + ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0); + EncodingStats dictStats = dictColumn.getEncodingStats(); + assertNotNull("Dict column should have non-null encoding stats", dictStats); + assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages()); + assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages()); + assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages()); + + ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1); + EncodingStats plainStats = plainColumn.getEncodingStats(); + assertNotNull("Plain column should have non-null encoding stats", plainStats); + assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages()); + assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages()); + assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages()); + + ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2); + EncodingStats fallbackStats = fallbackColumn.getEncodingStats(); + assertNotNull("Fallback column should have non-null encoding stats", fallbackStats); + assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages()); + assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages()); + assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages()); + } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java index c0d98266f7..b6bab26785 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java @@ -79,8 +79,8 @@ private void testZstd(ZstandardCodec codec, int dataSize) throws IOException { byte[] data = new byte[dataSize]; (new Random()).nextBytes(data); BytesInput compressedData = compress(codec, BytesInput.from(data)); - BytesInput decompressedData = decompress(codec, compressedData, data.length); - Assert.assertArrayEquals(data, decompressedData.toByteArray()); + byte[] decompressedData = decompress(codec, compressedData, data.length); + Assert.assertArrayEquals(data, decompressedData); } private BytesInput compress(ZstandardCodec codec, BytesInput bytes) throws IOException { @@ -91,10 +91,9 @@ private BytesInput compress(ZstandardCodec codec, BytesInput bytes) throws IOExc return BytesInput.from(compressedOutBuffer); } - private BytesInput decompress(ZstandardCodec codec, BytesInput bytes, int uncompressedSize) throws IOException { - BytesInput decompressed; + private byte[] decompress(ZstandardCodec codec, BytesInput bytes, int uncompressedSize) throws IOException { InputStream is = codec.createInputStream(bytes.toInputStream(), null); - decompressed = BytesInput.from(is, uncompressedSize); + byte[] decompressed = BytesInput.from(is, uncompressedSize).toByteArray(); is.close(); return decompressed; } diff --git a/parquet-hadoop/src/test/resources/test-append_1.parquet b/parquet-hadoop/src/test/resources/test-append_1.parquet new file 
mode 100644 index 0000000000..a255f86ebc Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-append_1.parquet differ diff --git a/parquet-hadoop/src/test/resources/test-append_2.parquet b/parquet-hadoop/src/test/resources/test-append_2.parquet new file mode 100644 index 0000000000..3081f893f5 Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-append_2.parquet differ diff --git a/parquet-jackson/pom.xml b/parquet-jackson/pom.xml index 3810886037..6e07c27396 100644 --- a/parquet-jackson/pom.xml +++ b/parquet-jackson/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-pig-bundle/pom.xml b/parquet-pig-bundle/pom.xml index a86f6eee97..a39ab12a44 100644 --- a/parquet-pig-bundle/pom.xml +++ b/parquet-pig-bundle/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml index ac109b5eb9..724df97a1b 100644 --- a/parquet-pig/pom.xml +++ b/parquet-pig/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml index e333ff733b..e3793aee15 100644 --- a/parquet-protobuf/pom.xml +++ b/parquet-protobuf/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-scala/pom.xml b/parquet-scala/pom.xml index 482dc07322..7f3bddc640 100644 --- a/parquet-scala/pom.xml +++ b/parquet-scala/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 @@ -46,13 +46,13 @@ https://scala-tools.org/repo-releases - + org.apache.parquet parquet-column ${project.version} - + org.scala-lang scala-library diff --git a/parquet-scrooge-deprecated/pom.xml b/parquet-scrooge-deprecated/pom.xml index 9c762e0b48..8c96f2054c 100644 --- a/parquet-scrooge-deprecated/pom.xml +++ b/parquet-scrooge-deprecated/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml index a9e216dee4..e30315a655 100644 --- a/parquet-thrift/pom.xml +++ b/parquet-thrift/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/parquet-tools-deprecated/pom.xml b/parquet-tools-deprecated/pom.xml index 3290b112a0..b741cc76d4 100644 --- a/parquet-tools-deprecated/pom.xml +++ b/parquet-tools-deprecated/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0 + 1.12.2-kylin-r6 4.0.0 diff --git a/pom.xml b/pom.xml index 7445e1c0d4..5b22962209 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.apache.parquet parquet - 1.12.0 + 1.12.2-kylin-r6 pom Apache Parquet MR @@ -20,7 +20,7 @@ scm:git:git@github.com:apache/parquet-mr.git scm:git:git@github.com:apache/parquet-mr.git scm:git:git@github.com:apache/parquet-mr.git - apache-parquet-1.12.0-rc4 + apache-parquet-1.12.2-rc0 @@ -114,7 +114,6 @@ 1.8 - -Xmx512m INFO @@ -156,7 +155,33 @@ test - + + + ${repository.id} + ${repository.url} + ${repository.name} + default + + + ${repository.id.snapshots} + ${repository.url.snapshots} + ${repository.name.snapshots} + default + + + + + + + + + + + + + + + @@ -376,6 +401,23 @@ ${maven.compiler.target} + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + true + + + + create-source-jar + + jar-no-fork + test-jar-no-fork + + + + org.apache.maven.plugins @@ -506,6 +548,10 @@ ${shade.prefix} + + 
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter#dictionaryByteSize @@ -598,7 +644,7 @@ ci-test WARN - -Xmx512m -XX:MaxJavaStackTraceDepth=10 + -XX:MaxJavaStackTraceDepth=10