diff --git a/CHANGES.md b/CHANGES.md index e8ba58f482..e56b4d89fe 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -26,6 +26,7 @@ Release Notes - Parquet - Version 1.11.0 #### Bug * [PARQUET-1364](https://issues.apache.org/jira/browse/PARQUET-1364) - Column Indexes: Invalid row indexes for pages starting with nulls +* [PARQUET-138](https://issues.apache.org/jira/browse/PARQUET-138) - Parquet should allow a merge between required and optional schemas * [PARQUET-952](https://issues.apache.org/jira/browse/PARQUET-952) - Avro union with single type fails with 'is not a group' * [PARQUET-1128](https://issues.apache.org/jira/browse/PARQUET-1128) - \[Java\] Upgrade the Apache Arrow version to 0.8.0 for SchemaConverter * [PARQUET-1285](https://issues.apache.org/jira/browse/PARQUET-1285) - \[Java\] SchemaConverter should not convert from TimeUnit.SECOND AND TimeUnit.NANOSECOND of Arrow @@ -49,18 +50,29 @@ Release Notes - Parquet - Version 1.11.0 * [PARQUET-1456](https://issues.apache.org/jira/browse/PARQUET-1456) - Use page index, ParquetFileReader throw ArrayIndexOutOfBoundsException * [PARQUET-1460](https://issues.apache.org/jira/browse/PARQUET-1460) - Fix javadoc errors and include javadoc checking in Travis checks * [PARQUET-1461](https://issues.apache.org/jira/browse/PARQUET-1461) - Third party code does not compile after parquet-mr minor version update +* [PARQUET-1470](https://issues.apache.org/jira/browse/PARQUET-1470) - Inputstream leakage in ParquetFileWriter.appendFile * [PARQUET-1472](https://issues.apache.org/jira/browse/PARQUET-1472) - Dictionary filter fails on FIXED\_LEN\_BYTE\_ARRAY +* [PARQUET-1475](https://issues.apache.org/jira/browse/PARQUET-1475) - DirectCodecFactory's ParquetCompressionCodecException drops a passed in cause in one constructor * [PARQUET-1478](https://issues.apache.org/jira/browse/PARQUET-1478) - Can't read spec compliant, 3-level lists via parquet-proto +* [PARQUET-1480](https://issues.apache.org/jira/browse/PARQUET-1480) - INT96 to avro not yet implemented error should mention deprecation +* [PARQUET-1485](https://issues.apache.org/jira/browse/PARQUET-1485) - Snappy Decompressor/Compressor may cause direct memory leak +* [PARQUET-1498](https://issues.apache.org/jira/browse/PARQUET-1498) - \[Java\] Add instructions to install thrift via homebrew +* [PARQUET-1510](https://issues.apache.org/jira/browse/PARQUET-1510) - Dictionary filter skips null values when evaluating not-equals. 
+* [PARQUET-1514](https://issues.apache.org/jira/browse/PARQUET-1514) - ParquetFileWriter Records Compressed Bytes instead of Uncompressed Bytes +* [PARQUET-1527](https://issues.apache.org/jira/browse/PARQUET-1527) - \[parquet-tools\] cat command throw java.lang.ClassCastException +* [PARQUET-1529](https://issues.apache.org/jira/browse/PARQUET-1529) - Shade fastutil in all modules where used +* [PARQUET-1531](https://issues.apache.org/jira/browse/PARQUET-1531) - Page row count limit causes empty pages to be written from MessageColumnIO +* [PARQUET-1533](https://issues.apache.org/jira/browse/PARQUET-1533) - TestSnappy() throws OOM exception with Parquet-1485 change #### New Feature * [PARQUET-1201](https://issues.apache.org/jira/browse/PARQUET-1201) - Column indexes * [PARQUET-1253](https://issues.apache.org/jira/browse/PARQUET-1253) - Support for new logical type representation -* [PARQUET-1381](https://issues.apache.org/jira/browse/PARQUET-1381) - Add merge blocks command to parquet-tools * [PARQUET-1388](https://issues.apache.org/jira/browse/PARQUET-1388) - Nanosecond precision time and timestamp - parquet-mr #### Improvement +* [PARQUET-1280](https://issues.apache.org/jira/browse/PARQUET-1280) - \[parquet-protobuf\] Use maven protoc plugin * [PARQUET-1321](https://issues.apache.org/jira/browse/PARQUET-1321) - LogicalTypeAnnotation.LogicalTypeAnnotationVisitor#visit methods should have a return value * [PARQUET-1335](https://issues.apache.org/jira/browse/PARQUET-1335) - Logical type names in parquet-mr are not consistent with parquet-format * [PARQUET-1336](https://issues.apache.org/jira/browse/PARQUET-1336) - PrimitiveComparator should implements Serializable @@ -73,18 +85,41 @@ Release Notes - Parquet - Version 1.11.0 * [PARQUET-1418](https://issues.apache.org/jira/browse/PARQUET-1418) - Run integration tests in Travis * [PARQUET-1435](https://issues.apache.org/jira/browse/PARQUET-1435) - Benchmark filtering column-indexes * [PARQUET-1462](https://issues.apache.org/jira/browse/PARQUET-1462) - Allow specifying new development version in prepare-release.sh +* [PARQUET-1466](https://issues.apache.org/jira/browse/PARQUET-1466) - Upgrade to the latest guava 27.0-jre * [PARQUET-1474](https://issues.apache.org/jira/browse/PARQUET-1474) - Less verbose and lower level logging for missing column/offset indexes * [PARQUET-1476](https://issues.apache.org/jira/browse/PARQUET-1476) - Don't emit a warning message for files without new logical type * [PARQUET-1487](https://issues.apache.org/jira/browse/PARQUET-1487) - Do not write original type for timezone-agnostic timestamps * [PARQUET-1489](https://issues.apache.org/jira/browse/PARQUET-1489) - Insufficient documentation for UserDefinedPredicate.keep(T) - +* [PARQUET-1490](https://issues.apache.org/jira/browse/PARQUET-1490) - Add branch-specific Travis steps +* [PARQUET-1492](https://issues.apache.org/jira/browse/PARQUET-1492) - Remove protobuf install in travis build +* [PARQUET-1500](https://issues.apache.org/jira/browse/PARQUET-1500) - Remove the Closables +* [PARQUET-1502](https://issues.apache.org/jira/browse/PARQUET-1502) - Convert FIXED\_LEN\_BYTE\_ARRAY to arrow type in +* [PARQUET-1503](https://issues.apache.org/jira/browse/PARQUET-1503) - Remove Ints Utility Class +* [PARQUET-1504](https://issues.apache.org/jira/browse/PARQUET-1504) - Add an option to convert Parquet Int96 to Arrow Timestamp +* [PARQUET-1505](https://issues.apache.org/jira/browse/PARQUET-1505) - Use Java 7 NIO StandardCharsets +* 
[PARQUET-1506](https://issues.apache.org/jira/browse/PARQUET-1506) - Migrate from maven-thrift-plugin to thrift-maven-plugin +* [PARQUET-1507](https://issues.apache.org/jira/browse/PARQUET-1507) - Bump Apache Thrift to 0.12.0 +* [PARQUET-1509](https://issues.apache.org/jira/browse/PARQUET-1509) - Update Docs for Hive Deprecation +* [PARQUET-1513](https://issues.apache.org/jira/browse/PARQUET-1513) - HiddenFileFilter Streamline +* [PARQUET-1518](https://issues.apache.org/jira/browse/PARQUET-1518) - Bump Jackson2 version of parquet-cli #### Task * [PARQUET-968](https://issues.apache.org/jira/browse/PARQUET-968) - Add Hive/Presto support in ProtoParquet +* [PARQUET-1294](https://issues.apache.org/jira/browse/PARQUET-1294) - Update release scripts for the new Apache policy +* [PARQUET-1434](https://issues.apache.org/jira/browse/PARQUET-1434) - Release parquet-mr 1.11.0 * [PARQUET-1436](https://issues.apache.org/jira/browse/PARQUET-1436) - TimestampMicrosStringifier shows wrong microseconds for timestamps before 1970 * [PARQUET-1452](https://issues.apache.org/jira/browse/PARQUET-1452) - Deprecate old logical types API +### Version 1.10.1 ### + +Release Notes - Parquet - Version 1.10.1 + +#### Bug + +* [PARQUET-1510](https://issues.apache.org/jira/browse/PARQUET-1510) \- Dictionary filter skips null values when evaluating not-equals. +* [PARQUET-1309](https://issues.apache.org/jira/browse/PARQUET-1309) \- Parquet Java uses incorrect stats and dictionary filter properties + ### Version 1.10.0 ### Release Notes - Parquet - Version 1.10.0 diff --git a/FORK.md b/FORK.md index 7b8002db9a..6184e3bda9 100644 --- a/FORK.md +++ b/FORK.md @@ -1,8 +1,5 @@ # Differences to mainline -This repo exists mostly to make releases of parquet-mr more often. The only difference to upstream is as follows: - -1. Solution for [PARQUET-686](https://issues.apache.org/jira/browse/PARQUET-686). -2. Temporarily revert [PARQUET-1414](https://issues.apache.org/jira/browse/PARQUET-1414) because it causes Spark to write unreadable empty Parquet pages. +This repo exists mostly to make releases of parquet-mr more often. The only difference to upstream is the solution for [PARQUET-686](https://issues.apache.org/jira/browse/PARQUET-686). The change we made, which upstream only made in statistics v2, is to make binary comparison unsigned and to declare all statistics written prior to that change corrupted. This lets us take advantage of binary statistics sooner and removes the burden on the user of knowing whether they should account for signed binary comparison in their values. diff --git a/README.md b/README.md index 890b30aaaa..c93adba8e6 100644 --- a/README.md +++ b/README.md @@ -35,14 +35,21 @@ Parquet-MR uses Maven to build and depends on the thrift compiler (protoc is now To build and install the thrift compiler, run: ``` -wget -nv http://archive.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz -tar xzf thrift-0.9.3.tar.gz -cd thrift-0.9.3 +wget -nv http://archive.apache.org/dist/thrift/0.12.0/thrift-0.12.0.tar.gz +tar xzf thrift-0.12.0.tar.gz +cd thrift-0.12.0 chmod +x ./configure ./configure --disable-gen-erl --disable-gen-hs --without-ruby --without-haskell --without-erlang --without-php --without-nodejs sudo make install ``` +If you're on OSX and use Homebrew, you can instead install Thrift 0.12.0 with `brew` and ensure that it comes first in your `PATH`.
+ + ``` +brew install thrift@0.12.0 +export PATH="/usr/local/opt/thrift@0.12.0/bin:$PATH" + ``` + ### Build Parquet with Maven Once protobuf and thrift are available in your path, you can build the project by running: @@ -57,7 +64,7 @@ Parquet is a very active project, and new features are being added quickly. Here * Type-specific encoding -* Hive integration +* Hive integration (deprecated) * Pig integration * Cascading integration * Crunch integration @@ -120,6 +127,8 @@ If the data was stored using Pig, things will "just work". If the data was store Hive integration is provided via the [parquet-hive](https://github.com/apache/parquet-mr/tree/master/parquet-hive) sub-project. +Hive integration is deprecated within the Parquet project and is now maintained by Apache Hive. + ## Build To run the unit tests: `mvn test` diff --git a/dev/docker-images/Dockerfile b/dev/docker-images/Dockerfile index e0b19707be..bd5ccc339a 100644 --- a/dev/docker-images/Dockerfile +++ b/dev/docker-images/Dockerfile @@ -99,10 +99,10 @@ RUN groupadd --gid 3434 circleci \ SHELL ["/bin/bash", "-eux", "-o", "pipefail", "-c"] -RUN THRIFT_URL="http://archive.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz" \ +RUN THRIFT_URL="http://archive.apache.org/dist/thrift/0.12.0/thrift-0.12.0.tar.gz" \ && curl --silent --show-error --location --fail --retry 3 --output /tmp/thrift.tar.gz $THRIFT_URL \ && tar -C /tmp -xzvf /tmp/thrift.tar.gz \ - && cd /tmp/thrift-0.9.3 \ + && cd /tmp/thrift-0.12.0 \ && chmod +x ./configure \ && ./configure --disable-gen-erl --disable-gen-hs --without-ruby --without-haskell --without-erlang --without-python \ && make install \ diff --git a/dev/travis-before_install.sh b/dev/travis-before_install.sh index 5161f223f4..67fee74fd0 100644 --- a/dev/travis-before_install.sh +++ b/dev/travis-before_install.sh @@ -19,6 +19,8 @@ # This script gets invoked by .travis.yml in the before_install step ################################################################################ +export THRIFT_VERSION=0.12.0 + set -e date sudo apt-get update -qq @@ -27,9 +29,9 @@ sudo apt-get install -qq build-essential pv autoconf automake libtool curl make libevent-dev automake libtool flex bison pkg-config g++ libssl-dev xmlstarlet date pwd -wget -nv http://archive.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz -tar zxf thrift-0.9.3.tar.gz -cd thrift-0.9.3 +wget -nv http://archive.apache.org/dist/thrift/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}.tar.gz +tar zxf thrift-${THRIFT_VERSION}.tar.gz +cd thrift-${THRIFT_VERSION} chmod +x ./configure ./configure --disable-gen-erl --disable-gen-hs --without-ruby --without-haskell --without-erlang --without-php --without-nodejs sudo make install diff --git a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java index 51057c589e..6275ca37b1 100644 --- a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java +++ b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java @@ -86,10 +86,19 @@ */ public class SchemaConverter { + // Indicates if Int96 should be converted to Arrow Timestamp + private final boolean convertInt96ToArrowTimestamp; + /** * For when we'll need this to be configurable */ public SchemaConverter() { + this(false); + } + + // TODO(PARQUET-1511): pass the parameters in a configuration object + public SchemaConverter(final boolean convertInt96ToArrowTimestamp) { + this.convertInt96ToArrowTimestamp = 
convertInt96ToArrowTimestamp; } /** @@ -492,13 +501,26 @@ private String getTimeZone(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation @Override public TypeMapping convertINT96(PrimitiveTypeName primitiveTypeName) throws RuntimeException { - // Possibly timestamp - return field(new ArrowType.Binary()); + if (convertInt96ToArrowTimestamp) { + return field(new ArrowType.Timestamp(TimeUnit.NANOSECOND, null)); + } else { + return field(new ArrowType.Binary()); + } } @Override public TypeMapping convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) throws RuntimeException { - return field(new ArrowType.Binary()); + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation == null) { + return field(new ArrowType.Binary()); + } + + return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() { + @Override + public Optional<TypeMapping> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { + return of(decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale())); + } + }).orElseThrow(() -> new IllegalArgumentException("illegal type " + type)); } @Override diff --git a/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java index c962b5456f..764621a13c 100644 --- a/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java +++ b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java @@ -47,6 +47,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; import java.io.IOException; import java.util.List; @@ -419,6 +420,47 @@ public void testParquetInt64TimeMicrosToArrow() { Assert.assertEquals(expected, converter.fromParquet(parquet).getArrowSchema()); } + @Test + public void testParquetFixedBinaryToArrow() { + MessageType parquet = Types.buildMessage() + .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).named("a")).named("root"); + Schema expected = new Schema(asList( + field("a", new ArrowType.Binary()) + )); + Assert.assertEquals(expected, converter.fromParquet(parquet).getArrowSchema()); + } + + @Test + public void testParquetFixedBinaryToArrowDecimal() { + MessageType parquet = Types.buildMessage() + .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(5).as(DECIMAL).precision(8).scale(2).named("a")).named("root"); + Schema expected = new Schema(asList( + field("a", new ArrowType.Decimal(8, 2)) + )); + Assert.assertEquals(expected, converter.fromParquet(parquet).getArrowSchema()); + } + + @Test + public void testParquetInt96ToArrowBinary() { + MessageType parquet = Types.buildMessage() + .addField(Types.optional(INT96).named("a")).named("root"); + Schema expected = new Schema(asList( + field("a", new ArrowType.Binary()) + )); + Assert.assertEquals(expected, converter.fromParquet(parquet).getArrowSchema()); + } + + @Test + public void testParquetInt96ToArrowTimestamp() { + final SchemaConverter converterInt96ToTimestamp = new SchemaConverter(true); + MessageType parquet = Types.buildMessage() + .addField(Types.optional(INT96).named("a")).named("root"); + Schema expected = new Schema(asList( + field("a", new 
ArrowType.Timestamp(TimeUnit.NANOSECOND, null)) + )); + Assert.assertEquals(expected, converterInt96ToTimestamp.fromParquet(parquet).getArrowSchema()); + } + @Test(expected = IllegalStateException.class) public void testParquetInt64TimeMillisToArrow() { converter.fromParquet(Types.buildMessage() diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 953bfe4f9f..16498038f6 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -161,6 +161,10 @@ +      <plugin> +        <groupId>org.apache.maven.plugins</groupId> +        <artifactId>maven-shade-plugin</artifactId> +      </plugin> diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 558446e6ba..b4bac2f101 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -291,7 +291,7 @@ public Schema convertINT64(PrimitiveTypeName primitiveTypeName) { } @Override public Schema convertINT96(PrimitiveTypeName primitiveTypeName) { - throw new IllegalArgumentException("INT96 not yet implemented."); + throw new IllegalArgumentException("INT96 not implemented and is deprecated"); } @Override public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) { diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 69a73cb062..436893873f 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -18,7 +18,6 @@ */ package org.apache.parquet.avro; -import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; @@ -27,6 +26,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -369,7 +369,7 @@ public void testAll() throws Exception { .set("mylong", 2L) .set("myfloat", 3.1f) .set("mydouble", 4.1) - .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8))) + .set("mybytes", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))) .set("mystring", "hello") .set("mynestedrecord", nestedRecord) .set("myenum", "a") @@ -398,7 +398,7 @@ public void testAll() throws Exception { assertEquals(2L, nextRecord.get("mylong")); assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, nextRecord.get("mydouble")); - assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); + assertEquals(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), nextRecord.get("mybytes")); assertEquals(str("hello"), nextRecord.get("mystring")); assertEquals(expectedEnumSymbol, nextRecord.get("myenum")); assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); @@ -567,7 +567,7 @@ public void write(Map record) { record.put("mylong", 2L); record.put("myfloat", 3.1f); record.put("mydouble", 4.1); - record.put("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8))); + record.put("mybytes", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))); record.put("mystring", "hello"); record.put("myenum", "a"); record.put("mynestedint", 1); @@ -615,7 +615,7 @@ public void write(Map record) { assertEquals(2L, nextRecord.get("mylong")); assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, 
nextRecord.get("mydouble")); - assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); + assertEquals(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), nextRecord.get("mybytes")); assertEquals(str("hello"), nextRecord.get("mystring")); assertEquals(str("a"), nextRecord.get("myenum")); // enum symbols are unknown assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java index af6f938115..bcf553eb73 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java @@ -18,12 +18,12 @@ */ package org.apache.parquet.avro; -import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; import java.io.File; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -247,7 +247,7 @@ public void testAll() throws Exception { .set("mylong", 2L) .set("myfloat", 3.1f) .set("mydouble", 4.1) - .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8))) + .set("mybytes", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))) .set("mystring", "hello") .set("mynestedrecord", nestedRecord) .set("myenum", "a") @@ -276,7 +276,7 @@ public void testAll() throws Exception { assertEquals(2L, nextRecord.get("mylong")); assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, nextRecord.get("mydouble")); - assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); + assertEquals(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), nextRecord.get("mybytes")); assertEquals(str("hello"), nextRecord.get("mystring")); assertEquals(expectedEnumSymbol, nextRecord.get("myenum")); assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); @@ -327,7 +327,7 @@ public void testArrayWithNullValues() throws Exception { .set("mylong", 2L) .set("myfloat", 3.1f) .set("mydouble", 4.1) - .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8))) + .set("mybytes", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))) .set("mystring", "hello") .set("mynestedrecord", nestedRecord) .set("myenum", "a") @@ -512,7 +512,7 @@ public void write(Map record) { record.put("mylong", 2L); record.put("myfloat", 3.1f); record.put("mydouble", 4.1); - record.put("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8))); + record.put("mybytes", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))); record.put("mystring", "hello"); record.put("myenum", "a"); record.put("mynestedint", 1); @@ -573,7 +573,7 @@ public void write(Map record) { assertEquals(2L, nextRecord.get("mylong")); assertEquals(3.1f, nextRecord.get("myfloat")); assertEquals(4.1, nextRecord.get("mydouble")); - assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes")); + assertEquals(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), nextRecord.get("mybytes")); assertEquals(str("hello"), nextRecord.get("mystring")); assertEquals(str("a"), nextRecord.get("myenum")); assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index 39b75f70ab..fe72f3bbf7 
100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -32,7 +32,7 @@ <url>https://parquet.apache.org</url> -    <jmh.version>1.17.3</jmh.version> +    <jmh.version>1.21</jmh.version> <artifactId>parquet-benchmarks</artifactId> @@ -73,7 +73,11 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>${slf4j.version}</version> -      <scope>test</scope> </dependency> +    <dependency> +      <groupId>it.unimi.dsi</groupId> +      <artifactId>fastutil</artifactId> +      <version>${fastutil.version}</version> +    </dependency> diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FilteringBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FilteringBenchmarks.java new file mode 100644 index 0000000000..ac47f7bc2f --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FilteringBenchmarks.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.openjdk.jmh.annotations.Level.Invocation; +import static org.openjdk.jmh.annotations.Mode.SingleShotTime; +import static org.openjdk.jmh.annotations.Scope.Benchmark; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetReader.Builder; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Setup; +import 
org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongList; + +/** + * Benchmarks related to the different filtering options (e.g. w or w/o column index). + *
+ * <p>
+ * To execute this benchmark, a jar file must first be built from this module. The jar file can then be executed using
+ * the JMH framework.<br>
+ * The following one-liner (to be executed in the parquet-benchmarks submodule) generates result statistics in the
+ * file {@code jmh-result.json}. The resulting JSON can be visualized with the tool at
+ * <a href="https://jmh.morethan.io">https://jmh.morethan.io</a>.
+ *
+ * <pre>
+ * mvn clean package && java -jar target/parquet-benchmarks.jar org.apache.parquet.benchmarks.FilteringBenchmarks -rf json
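+ * </pre>
+ * JMH also accepts a benchmark-name regex on the command line, so a single benchmark of this class can be run on its
+ * own; the pattern below is just an illustration:
+ * <pre>
+ * mvn clean package && java -jar target/parquet-benchmarks.jar '.*FilteringBenchmarks.benchmarkPageSize.*' -rf json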
+ * </pre>
+ */ +@BenchmarkMode(SingleShotTime) +@Fork(1) +@Warmup(iterations = 10, batchSize = 1) +@Measurement(iterations = 50, batchSize = 1) +@OutputTimeUnit(MILLISECONDS) +public class FilteringBenchmarks { + private static final int RECORD_COUNT = 2_000_000; + private static final Logger LOGGER = LoggerFactory.getLogger(FilteringBenchmarks.class); + + /* + * For logging human readable file size + */ + private static class FileSize { + private static final String[] SUFFIXES = { "KiB", "MiB", "GiB", "TiB", "PiB", "EiB" }; + private final Path file; + + FileSize(Path file) { + this.file = file; + } + + @Override + public String toString() { + try { + FileSystem fs = file.getFileSystem(new Configuration()); + long bytes = fs.getFileStatus(file).getLen(); + int exp = (int) (Math.log(bytes) / Math.log(1024)); + if (exp == 0) { + return Long.toString(bytes); + } + String suffix = SUFFIXES[exp - 1]; + return String.format("%d [%.2f%s]", bytes, bytes / Math.pow(1024, exp), suffix); + } catch (IOException e) { + return "N/A"; + } + } + } + + /* + * For generating binary values + */ + private static class StringGenerator { + private static final int MAX_LENGTH = 100; + private static final int MIN_LENGTH = 50; + private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ "; + private final Random random = new Random(44); + + String nextString() { + char[] str = new char[MIN_LENGTH + random.nextInt(MAX_LENGTH - MIN_LENGTH)]; + for (int i = 0, n = str.length; i < n; ++i) { + str[i] = ALPHABET.charAt(random.nextInt(ALPHABET.length())); + } + return new String(str); + } + } + + interface ReadConfigurator { + static ReadConfigurator DEFAULT = new ReadConfigurator() { + @Override + public Builder configureBuilder(Builder builder) { + return builder; + } + + @Override + public String toString() { + return "DEFAULT"; + } + }; + + ParquetReader.Builder configureBuilder(ParquetReader.Builder builder); + } + + interface WriteConfigurator { + static WriteConfigurator DEFAULT = new WriteConfigurator() { + @Override + public org.apache.parquet.hadoop.ParquetWriter.Builder configureBuilder( + org.apache.parquet.hadoop.ParquetWriter.Builder builder) { + return builder; + } + + @Override + public String toString() { + return "DEFAULT"; + } + }; + + ParquetWriter.Builder configureBuilder(ParquetWriter.Builder builder); + } + + public static enum ColumnIndexUsage implements ReadConfigurator { + WITHOUT_COLUMN_INDEX { + @Override + public Builder configureBuilder(Builder builder) { + return builder.useColumnIndexFilter(false); + } + }, + WITH_COLUMN_INDEX { + @Override + public Builder configureBuilder(Builder builder) { + return builder.useColumnIndexFilter(true); + } + }; + } + + public static enum ColumnCharacteristic { + SORTED { + @Override + void arrangeData(long[] data) { + Arrays.parallelSort(data); + } + }, + CLUSTERED_9 { + @Override + void arrangeData(long[] data) { + arrangeToBuckets(data, 9); + } + }, + CLUSTERED_5 { + @Override + void arrangeData(long[] data) { + arrangeToBuckets(data, 5); + } + }, + CLUSTERED_3 { + @Override + void arrangeData(long[] data) { + arrangeToBuckets(data, 3); + } + }, + RANDOM { + @Override + void arrangeData(long[] data) { + // Nothing to do + } + }; + abstract void arrangeData(long[] data); + + public static void arrangeToBuckets(long[] data, int bucketCnt) { + long bucketSize = (long) (Long.MAX_VALUE / (bucketCnt / 2.0)); + long bucketBorders[] = new long[bucketCnt - 1]; + for (int i = 0, n = bucketBorders.length; i < n; ++i) { + bucketBorders[i] 
= Long.MIN_VALUE + (i + 1) * bucketSize; + } + LongList[] buckets = new LongList[bucketCnt]; + for (int i = 0; i < bucketCnt; ++i) { + buckets[i] = new LongArrayList(data.length / bucketCnt); + } + for (int i = 0, n = data.length; i < n; ++i) { + long value = data[i]; + int bucket = Arrays.binarySearch(bucketBorders, value); + if (bucket < 0) { + bucket = -(bucket + 1); + } + buckets[bucket].add(value); + } + // Stitch the buckets back into data, alternating around the middle bucket, so the values end up clustered + int offset = 0; + int mid = bucketCnt / 2; + for (int i = 0; i < bucketCnt; ++i) { + int bucketIndex; + if (i % 2 == 0) { + bucketIndex = mid + i / 2; + } else { + bucketIndex = mid - i / 2 - 1; + } + LongList bucket = buckets[bucketIndex]; + bucket.getElements(0, data, offset, bucket.size()); + offset += bucket.size(); + } + } + } + + public enum PageRowLimit implements WriteConfigurator { + PAGE_ROW_COUNT_1K { + @Override + public ParquetWriter.Builder configureBuilder(ParquetWriter.Builder builder) { + return builder + .withPageSize(Integer.MAX_VALUE) // Ensure that only the row count limit is taken into account + .withPageRowCountLimit(1_000); + } + }, + PAGE_ROW_COUNT_10K { + @Override + public ParquetWriter.Builder configureBuilder(ParquetWriter.Builder builder) { + return builder + .withPageSize(Integer.MAX_VALUE) // Ensure that only the row count limit is taken into account + .withPageRowCountLimit(10_000); + } + }, + PAGE_ROW_COUNT_50K { + @Override + public ParquetWriter.Builder configureBuilder(ParquetWriter.Builder builder) { + return builder + .withPageSize(Integer.MAX_VALUE) // Ensure that only the row count limit is taken into account + .withPageRowCountLimit(50_000); + } + }, + PAGE_ROW_COUNT_100K { + @Override + public ParquetWriter.Builder configureBuilder(ParquetWriter.Builder builder) { + return builder + .withPageSize(Integer.MAX_VALUE) // Ensure that only the row count limit is taken into account + .withPageRowCountLimit(100_000); + } + }; + } + + @State(Benchmark) + public static abstract class BaseContext { + private static final MessageType SCHEMA = Types.buildMessage() + .required(INT64).named("int64_col") + .required(BINARY).as(stringType()).named("dummy1_col") + .required(BINARY).as(stringType()).named("dummy2_col") + .required(BINARY).as(stringType()).named("dummy3_col") + .required(BINARY).as(stringType()).named("dummy4_col") + .required(BINARY).as(stringType()).named("dummy5_col") + .named("schema"); + public static LongColumn COLUMN = FilterApi.longColumn("int64_col"); + private Path file; + private Random random; + private StringGenerator dummyGenerator; + @Param + private ColumnCharacteristic characteristic; + + @Setup + public void writeFile() throws IOException { + WriteConfigurator writeConfigurator = getWriteConfigurator(); + file = new Path( + Files.createTempFile("benchmark-filtering_" + characteristic + '_' + writeConfigurator + '_', ".parquet") + .toAbsolutePath().toString()); + long[] data = generateData(); + characteristic.arrangeData(data); + try (ParquetWriter writer = writeConfigurator.configureBuilder(ExampleParquetWriter.builder(file) + .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, SCHEMA.toString()) + .withRowGroupSize(Integer.MAX_VALUE) // Ensure we have only one row-group per file + .withWriteMode(OVERWRITE)) + .build()) { + for (long value : data) { + Group group = new SimpleGroup(SCHEMA); + group.add(0, value); + group.add(1, Binary.fromString(dummyGenerator.nextString())); + group.add(2, Binary.fromString(dummyGenerator.nextString())); + group.add(3, Binary.fromString(dummyGenerator.nextString())); + group.add(4, 
Binary.fromString(dummyGenerator.nextString())); + group.add(5, Binary.fromString(dummyGenerator.nextString())); + writer.write(group); + } + } + } + + WriteConfigurator getWriteConfigurator() { + return WriteConfigurator.DEFAULT; + } + + ReadConfigurator getReadConfigurator() { + return ReadConfigurator.DEFAULT; + } + + private long[] generateData() { + Random random = new Random(43); + long[] data = new long[RECORD_COUNT]; + for (int i = 0, n = data.length; i < n; ++i) { + data[i] = random.nextLong(); + } + return data; + } + + // Resetting the random so every measurement would use the same sequence + @Setup + public void resetRandom() { + random = new Random(42); + dummyGenerator = new StringGenerator(); + } + + @Setup(Invocation) + public void gc() { + System.gc(); + } + + @TearDown + public void deleteFile() throws IOException { + LOGGER.info("Deleting file {} (size: {})", file, new FileSize(file)); + file.getFileSystem(new Configuration()).delete(file, false); + } + + public ParquetReader.Builder createReaderBuilder() throws IOException { + ReadConfigurator readConfigurator = getReadConfigurator(); + return readConfigurator.configureBuilder( + new ParquetReader.Builder(HadoopInputFile.fromPath(file, new Configuration())) { + @Override + protected ReadSupport getReadSupport() { + return new GroupReadSupport(); + } + }.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, SCHEMA.toString())); + } + + public Random getRandom() { + return random; + } + } + + @State(Benchmark) + public static class WithOrWithoutColumnIndexContext extends BaseContext { + @Param + private ColumnIndexUsage columnIndexUsage; + + @Override + ReadConfigurator getReadConfigurator() { + return columnIndexUsage; + } + } + + @State(Benchmark) + public static class PageSizeContext extends BaseContext { + @Param + private PageRowLimit pageRowLimit; + + @Override + WriteConfigurator getWriteConfigurator() { + return pageRowLimit; + } + + @Override + ReadConfigurator getReadConfigurator() { + return ColumnIndexUsage.WITH_COLUMN_INDEX; + } + } + + @Benchmark + public void benchmarkWithOrWithoutColumnIndex(Blackhole blackhole, WithOrWithoutColumnIndexContext context) + throws Exception { + benchmark(blackhole, context); + } + + @Benchmark + public void benchmarkPageSize(Blackhole blackhole, PageSizeContext context) throws Exception { + benchmark(blackhole, context); + } + + private void benchmark(Blackhole blackhole, BaseContext context) throws Exception { + FilterPredicate filter = FilterApi.eq(BaseContext.COLUMN, context.getRandom().nextLong()); + try (ParquetReader reader = context.createReaderBuilder() + .withFilter(FilterCompat.get(filter)) + .build()) { + blackhole.consume(reader.read()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java new file mode 100644 index 0000000000..324775b9a3 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.openjdk.jmh.annotations.Mode.SingleShotTime; +import static org.openjdk.jmh.annotations.Scope.Benchmark; + +import java.io.IOException; +import java.util.Random; + +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter.Mode; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmark to measure writing nested null values. (See PARQUET-343 for details.) + *
+ * <p>
+ * To execute this benchmark, a jar file must first be built from this module. The jar file can then be executed using
+ * the JMH framework.<br>
+ * The following one-liner (to be executed in the parquet-benchmarks submodule) generates result statistics in the
+ * file {@code jmh-result.json}. The resulting JSON can be visualized with the tool at
+ * <a href="https://jmh.morethan.io">https://jmh.morethan.io</a>.
+ *
+ * <pre>
+ * mvn clean package && java -jar target/parquet-benchmarks.jar org.apache.parquet.benchmarks.NestedNullWritingBenchmarks -rf json
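+ * </pre>
+ * JMH can also write the results to a named file via its {@code -rff} option (the file name below is just an
+ * example):
+ * <pre>
+ * mvn clean package && java -jar target/parquet-benchmarks.jar org.apache.parquet.benchmarks.NestedNullWritingBenchmarks -rf json -rff nested-null-result.json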
+ * </pre>
+ */ +@BenchmarkMode(SingleShotTime) +@Fork(1) +@Warmup(iterations = 10, batchSize = 1) +@Measurement(iterations = 50, batchSize = 1) +@OutputTimeUnit(MILLISECONDS) +@State(Benchmark) +public class NestedNullWritingBenchmarks { + private static final MessageType SCHEMA = Types.buildMessage() + .optionalList() + .optionalElement(INT32) + .named("int_list") + .optionalList() + .optionalListElement() + .optionalElement(BINARY) + .named("dummy_list") + .optionalMap() + .key(BINARY) + .value(BINARY, OPTIONAL) + .named("dummy_map") + .optionalGroup() + .optional(BINARY).named("dummy_group_value1") + .optional(BINARY).named("dummy_group_value2") + .optional(BINARY).named("dummy_group_value3") + .named("dummy_group") + .named("msg"); + private static final int RECORD_COUNT = 10_000_000; + private static final double NULL_RATIO = 0.99; + private static final OutputFile BLACK_HOLE = new OutputFile() { + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return -1L; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return create(blockSizeHint); + } + + @Override + public PositionOutputStream create(long blockSizeHint) { + return new PositionOutputStream() { + private long pos; + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void write(int b) throws IOException { + ++pos; + } + }; + } + }; + + private static class ValueGenerator { + private static final GroupFactory FACTORY = new SimpleGroupFactory(SCHEMA); + private static final Group NULL = FACTORY.newGroup(); + private final Random random = new Random(42); + + public Group nextValue() { + if (random.nextDouble() > NULL_RATIO) { + Group group = FACTORY.newGroup(); + group.addGroup("int_list").addGroup("list").append("element", random.nextInt()); + return group; + } else { + return NULL; + } + } + } + + @Benchmark + public void benchmarkWriting() throws IOException { + ValueGenerator generator = new ValueGenerator(); + try (ParquetWriter writer = ExampleParquetWriter.builder(BLACK_HOLE) + .withWriteMode(Mode.OVERWRITE) + .withType(SCHEMA) + .build()) { + for (int i = 0; i < RECORD_COUNT; ++i) { + writer.write(generator.nextValue()); + } + } + } +} diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java index f385fde732..96ca5a5d49 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java @@ -20,7 +20,6 @@ package org.apache.parquet.cli; import com.beust.jcommander.internal.Lists; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.io.CharStreams; import com.google.common.io.Resources; @@ -52,7 +51,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.security.AccessController; import java.util.Iterator; import java.util.List; @@ -60,9 +59,6 @@ public abstract class BaseCommand implements Command, Configurable { - @VisibleForTesting - static final Charset UTF8 = Charset.forName("utf8"); - private static final String RESOURCE_URI_SCHEME = "resource"; private static final String STDIN_AS_SOURCE = "stdin"; @@ -103,7 +99,7 @@ public void output(String content, Logger console, String filename) } else { FSDataOutputStream 
outgoing = create(filename); try { - outgoing.write(content.getBytes(UTF8)); + outgoing.write(content.getBytes(StandardCharsets.UTF_8)); } finally { outgoing.close(); } diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index cfdc5553dc..8bfba0257c 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -109,28 +109,6 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> -        <executions> -          <execution> -            <phase>package</phase> -            <goals> -              <goal>shade</goal> -            </goals> -            <configuration> -              <minimizeJar>true</minimizeJar> -              <artifactSet> -                <includes> -                  <include>it.unimi.dsi:fastutil</include> -                </includes> -              </artifactSet> -              <relocations> -                <relocation> -                  <pattern>it.unimi.dsi</pattern> -                  <shadedPattern>org.apache.parquet.it.unimi.dsi</shadedPattern> -                </relocation> -              </relocations> -            </configuration> -          </execution> -        </executions> <groupId>org.codehaus.mojo</groupId> diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java index e14c7dcad4..db5320a006 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java @@ -52,7 +52,7 @@ public interface ColumnWriteStore { abstract public long getBufferedSize(); /** - * used for debugging pupose + * used for debugging purpose * @return a formated string representing memory usage per column */ abstract public String memUsageString(); @@ -62,4 +62,14 @@ abstract public void close(); + /** + * Returns whether flushing the possibly cached values (or nulls) to the underlying column writers is necessary, + * because the pages might be closed after the next invocation of {@link #endRecord()}. + * + * @return {@code true} if all the values shall be written to the underlying column writers before calling + * {@link #endRecord()} + */ + default boolean isColumnFlushNeeded() { + return false; + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index b173239332..41e482cfdd 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -48,6 +48,7 @@ public class ParquetProperties { public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64; + public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000; public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); @@ -85,10 +86,11 @@ public static WriterVersion fromString(String name) { private final ByteBufferAllocator allocator; private final ValuesWriterFactory valuesWriterFactory; private final int columnIndexTruncateLength; + private final int pageRowCountLimit; private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator, - ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) { + ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) { this.pageSizeThreshold = pageSize; this.initialSlabSize = CapacityByteArrayOutputStream .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10); @@ -102,6 +104,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag this.valuesWriterFactory = writerFactory; this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength; + this.pageRowCountLimit = 
pageRowCountLimit; } public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) { @@ -194,6 +197,10 @@ public boolean estimateNextSizeCheck() { return estimateNextSizeCheck; } + public int getPageRowCountLimit() { + return pageRowCountLimit; + } + public static Builder builder() { return new Builder(); } @@ -213,18 +220,22 @@ public static class Builder { private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; + private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private Builder() { } private Builder(ParquetProperties toCopy) { + this.pageSize = toCopy.pageSizeThreshold; this.enableDict = toCopy.enableDictionary; this.dictPageSize = toCopy.dictionaryPageSizeThreshold; this.writerVersion = toCopy.writerVersion; this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck; this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck; this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck; + this.valuesWriterFactory = toCopy.valuesWriterFactory; this.allocator = toCopy.allocator; + this.pageRowCountLimit = toCopy.pageRowCountLimit; } /** @@ -313,11 +324,17 @@ public Builder withColumnIndexTruncateLength(int length) { return this; } + public Builder withPageRowCountLimit(int rowCount) { + Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount); + pageRowCountLimit = rowCount; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck, - estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength); + estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit); // we pass a constructed but uninitialized factory to ParquetProperties above as currently // creation of ValuesWriters is invoked from within ParquetProperties. 
In the future // we'd like to decouple that and won't need to pass an object to properties and then pass the diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java index b7e159775f..755985d1ae 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java @@ -85,7 +85,7 @@ public ColumnReader getColumnReader(ColumnDescriptor path) { } } - public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) { + private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) { PrimitiveConverter converter = getPrimitiveConverter(path); return new ColumnReaderImpl(path, pageReader, converter, writerVersion); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java index 5cd7d876e4..2670c31dcd 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java @@ -67,7 +67,7 @@ private interface ColumnWriterProvider { this.columns = new TreeMap<>(); - this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck(); + this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit()); columnWriterProvider = new ColumnWriterProvider() { @Override @@ -95,7 +95,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) { } this.columns = unmodifiableMap(mcolumns); - this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck(); + this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit()); columnWriterProvider = new ColumnWriterProvider() { @Override @@ -190,13 +190,17 @@ public void endRecord() { private void sizeCheck() { long minRecordToWait = Long.MAX_VALUE; + int pageRowCountLimit = props.getPageRowCountLimit(); + long rowCountForNextRowCountCheck = rowCount + pageRowCountLimit; for (ColumnWriterBase writer : columns.values()) { long usedMem = writer.getCurrentPageBufferedSize(); long rows = rowCount - writer.getRowsWrittenSoFar(); long remainingMem = props.getPageSizeThreshold() - usedMem; - if (remainingMem <= thresholdTolerance) { + if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) { writer.writePage(); remainingMem = props.getPageSizeThreshold(); + } else { + rowCountForNextRowCountCheck = min(rowCountForNextRowCountCheck, writer.getRowsWrittenSoFar() + pageRowCountLimit); } long rowsToFillPage = usedMem == 0 ? 
@@ -219,5 +223,15 @@ private void sizeCheck() { } else { rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck(); } + + // Do the check earlier if required to keep the row count limit + if (rowCountForNextRowCountCheck < rowCountForNextSizeCheck) { + rowCountForNextSizeCheck = rowCountForNextRowCountCheck; + } + } + + @Override + public boolean isColumnFlushNeeded() { + return rowCount + 1 >= rowCountForNextSizeCheck; } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index 3788c82e46..8fc7d31ba1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -305,6 +305,9 @@ long getRowsWrittenSoFar() { * Writes the current data to a new page in the page store */ void writePage() { + if (valueCount == 0) { + throw new ParquetEncodingException("writing empty page"); + } this.rowsWrittenSoFar += pageRowCount; if (DEBUG) LOG.debug("write page"); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java index 04076c96ba..e4e8563cb9 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java @@ -20,7 +20,6 @@ import java.io.IOException; -import org.apache.parquet.Ints; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; @@ -78,7 +77,7 @@ void writePage(int rowCount, int valueCount, Statistics statistics, ValuesWri Encoding encoding = values.getEncoding(); pageWriter.writePageV2( rowCount, - Ints.checkedCast(statistics.getNumNulls()), + Math.toIntExact(statistics.getNumNulls()), valueCount, repetitionLevels.getBytes(), definitionLevels.getBytes(), diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java index b1f68aefba..f03ffc0349 100755 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java @@ -20,7 +20,6 @@ import java.util.Optional; -import org.apache.parquet.Ints; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.statistics.Statistics; @@ -44,7 +43,7 @@ public class DataPageV1 extends DataPage { * @param valuesEncoding the values encoding for this page */ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) { - super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount); + super(Math.toIntExact(bytes.size()), uncompressedSize, valueCount); this.bytes = bytes; this.statistics = statistics; this.rlEncoding = rlEncoding; @@ -66,7 +65,7 @@ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statis */ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex, int rowCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) { - super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount, firstRowIndex); + 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
index b1f68aefba..f03ffc0349 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
@@ -20,7 +20,6 @@
 
 import java.util.Optional;
 
-import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.Statistics;
@@ -44,7 +43,7 @@ public class DataPageV1 extends DataPage {
    * @param valuesEncoding the values encoding for this page
    */
   public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
-    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount);
+    super(Math.toIntExact(bytes.size()), uncompressedSize, valueCount);
     this.bytes = bytes;
     this.statistics = statistics;
     this.rlEncoding = rlEncoding;
@@ -66,7 +65,7 @@ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statis
    */
   public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex, int rowCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
-    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount, firstRowIndex);
+    super(Math.toIntExact(bytes.size()), uncompressedSize, valueCount, firstRowIndex);
     this.bytes = bytes;
     this.statistics = statistics;
     this.rlEncoding = rlEncoding;
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
index a1700aea00..706b699da8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
@@ -20,7 +20,6 @@
 
 import java.util.Optional;
 
-import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.Statistics;
@@ -47,7 +46,7 @@ public static DataPageV2 uncompressed(
         rowCount, nullCount, valueCount,
         repetitionLevels, definitionLevels,
         dataEncoding, data,
-        Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
+        Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()),
         statistics,
         false);
   }
@@ -73,7 +72,7 @@ public static DataPageV2 uncompressed(
         rowCount, nullCount, valueCount, firstRowIndex,
         repetitionLevels, definitionLevels,
         dataEncoding, data,
-        Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
+        Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()),
         statistics,
         false);
   }
@@ -121,7 +120,7 @@ public DataPageV2(
       int uncompressedSize,
       Statistics<?> statistics,
       boolean isCompressed) {
-    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
+    super(Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
     this.rowCount = rowCount;
     this.nullCount = nullCount;
     this.repetitionLevels = repetitionLevels;
@@ -139,7 +138,7 @@ private DataPageV2(
       int uncompressedSize,
       Statistics<?> statistics,
       boolean isCompressed) {
-    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize,
+    super(Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize,
         valueCount, firstRowIndex);
     this.rowCount = rowCount;
     this.nullCount = nullCount;
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
index 2401fef400..21e1114f67 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
@@ -22,7 +22,6 @@
 
 import java.io.IOException;
 
-import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 
@@ -53,7 +52,7 @@ public DictionaryPage(BytesInput bytes, int dictionarySize, Encoding encoding) {
    * @param encoding the encoding used
    */
   public DictionaryPage(BytesInput bytes, int uncompressedSize, int dictionarySize, Encoding encoding) {
-    super(Ints.checkedCast(bytes.size()), uncompressedSize);
+    super(Math.toIntExact(bytes.size()), uncompressedSize);
     this.bytes = checkNotNull(bytes, "bytes");
     this.dictionarySize = dictionarySize;
     this.encoding = checkNotNull(encoding, "encoding");
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index a51a8c4d82..8edf308227 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -22,7 +22,6 @@
 import java.util.Objects;
 
 import org.apache.parquet.bytes.ByteBufferAllocator;
-import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.values.ValuesWriter;
@@ -68,7 +67,7 @@ public BytesInput getBytes() {
     try {
       // prepend the length of the column
       BytesInput rle = encoder.toBytes();
-      return BytesInput.concat(BytesInput.fromInt(Ints.checkedCast(rle.size())), rle);
+      return BytesInput.concat(BytesInput.fromInt(Math.toIntExact(rle.size())), rle);
     } catch (IOException e) {
       throw new ParquetEncodingException(e);
     }
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index f1da36302f..8fc4f91957 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -297,6 +297,13 @@ public void startMessage() {
     @Override
     public void endMessage() {
       writeNullForMissingFieldsAtCurrentLevel();
+
+      // We need to flush the cached null values before ending the record to ensure that everything is sent to the
+      // writer before the current page is closed
+      if (columns.isColumnFlushNeeded()) {
+        flush();
+      }
+
       columns.endRecord();
       if (DEBUG) log("< MESSAGE END >");
       if (DEBUG) printState();
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index 85c82bddbd..021d171ccb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -23,7 +23,6 @@
 import java.io.ObjectStreamException;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -31,12 +30,9 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
-import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.PrimitiveComparator;
 
-import static org.apache.parquet.bytes.BytesUtils.UTF8;
-
 abstract public class Binary implements Comparable<Binary>, Serializable {
 
   protected boolean isBackingBytesReused;
@@ -133,11 +129,10 @@ public ByteArraySliceBackedBinary(byte[] value, int offset, int length, boolean
 
     @Override
     public String toStringUsingUTF8() {
-      return UTF8.decode(ByteBuffer.wrap(value, offset, length)).toString();
-      // TODO: figure out why the following line was much slower
-      // rdb: new String(...) is slower because it instantiates a new Decoder,
-      // while Charset#decode uses a thread-local decoder cache
-      // return new String(value, offset, length, BytesUtils.UTF8);
+      // Charset#decode uses a thread-local decoder cache and is faster than
+      // new String(...) which instantiates a new Decoder per invocation
+      return StandardCharsets.UTF_8
+          .decode(ByteBuffer.wrap(value, offset, length)).toString();
     }
 
     @Override
@@ -220,11 +215,7 @@ public String toString() {
     }
 
     private static ByteBuffer encodeUTF8(String value) {
-      try {
-        return ByteBuffer.wrap(value.getBytes("UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        throw new ParquetEncodingException("UTF-8 not supported.", e);
-      }
+      return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
     }
   }
 
@@ -284,7 +275,7 @@ public ByteArrayBackedBinary(byte[] value, boolean isBackingBytesReused) {
 
     @Override
     public String toStringUsingUTF8() {
-      return UTF8.decode(ByteBuffer.wrap(value)).toString();
+      return StandardCharsets.UTF_8.decode(ByteBuffer.wrap(value)).toString();
     }
 
     @Override
@@ -393,11 +384,8 @@ public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean
     public String toStringUsingUTF8() {
       String ret;
       if (value.hasArray()) {
-        try {
-          ret = new String(value.array(), value.arrayOffset() + offset, length, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-          throw new ParquetDecodingException("UTF-8 not supported");
-        }
+        ret = new String(value.array(), value.arrayOffset() + offset, length,
+            StandardCharsets.UTF_8);
       } else {
         int limit = value.limit();
         value.limit(offset+length);
@@ -406,7 +394,7 @@ public String toStringUsingUTF8() {
         // no corresponding interface to read a subset of a buffer, would have to slice it
         // which creates another ByteBuffer object or do what is done here to adjust the
         // limit/offset and set them back after
-        ret = UTF8.decode(value).toString();
+        ret = StandardCharsets.UTF_8.decode(value).toString();
         value.limit(limit);
         value.position(position);
       }
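The rewritten comment above preserves the original rationale: Charset#decode caches a CharsetDecoder in a thread-local, while the new String(byte[], ...) path sets up fresh decoding state per call. A small sketch of the two equivalent conversions (the byte content is illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Utf8DecodeDemo {
  public static void main(String[] args) {
    byte[] bytes = "parquet".getBytes(StandardCharsets.UTF_8);
    // Reuses a thread-local CharsetDecoder under the hood
    String viaCharset = StandardCharsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
    // Allocates fresh decoder state on every call
    String viaString = new String(bytes, StandardCharsets.UTF_8);
    System.out.println(viaCharset.equals(viaString)); // true; only the performance differs
  }
}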
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
index 64e7062959..52184e1044 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -400,9 +400,6 @@ List<Type> mergeFields(GroupType toMerge, boolean strict) {
       Type merged;
       if (toMerge.containsField(type.getName())) {
         Type fieldToMerge = toMerge.getType(type.getName());
-        if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
-          throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
-        }
         if (type.getLogicalTypeAnnotation() != null && !type.getLogicalTypeAnnotation().equals(fieldToMerge.getLogicalTypeAnnotation())) {
           throw new IncompatibleSchemaModificationException("cannot merge logical type " + fieldToMerge.getLogicalTypeAnnotation() + " into " + type.getLogicalTypeAnnotation());
         }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 9ab53b2b03..b59fdec15d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -750,7 +750,8 @@ protected Type union(Type toMerge, boolean strict) {
       }
     }
 
-    Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(primitive, toMerge.getRepetition());
+    Repetition repetition = Repetition.leastRestrictive(this.getRepetition(), toMerge.getRepetition());
+    Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(primitive, repetition);
 
     if (PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == primitive) {
       builder.length(length);
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
index d046957374..dd13ec112a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
@@ -20,6 +20,7 @@
 
 import static org.apache.parquet.Preconditions.checkNotNull;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.parquet.io.InvalidRecordException;
@@ -111,6 +112,28 @@ public boolean isMoreRestrictiveThan(Repetition other) {
      */
     abstract public boolean isMoreRestrictiveThan(Repetition other);
 
+    /**
+     * @param repetitions repetitions to traverse
+     * @return the least restrictive repetition of all repetitions provided
+     */
+    public static Repetition leastRestrictive(Repetition... repetitions) {
+      boolean hasOptional = false;
+
+      for (Repetition repetition : repetitions) {
+        if (repetition == REPEATED) {
+          return REPEATED;
+        } else if (repetition == OPTIONAL) {
+          hasOptional = true;
+        }
+      }
+
+      if (hasOptional) {
+        return OPTIONAL;
+      }
+
+      return REQUIRED;
+    }
   }
 
   private final String name;
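With the repetition check removed from GroupType.mergeFields, a schema union now widens repetition instead of failing (PARQUET-138): REQUIRED is the most restrictive, REPEATED the least, and leastRestrictive picks the widest repetition seen. A sketch of how merging a required and an optional schema now resolves, using the APIs shown above:

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type.Repetition;

public class SchemaUnionDemo {
  public static void main(String[] args) {
    MessageType required = new MessageType("root1", new PrimitiveType(Repetition.REQUIRED, BINARY, "a"));
    MessageType optional = new MessageType("root2", new PrimitiveType(Repetition.OPTIONAL, BINARY, "a"));
    // Previously threw IncompatibleSchemaModificationException; now the merged
    // field is OPTIONAL, the least restrictive of the two repetitions
    System.out.println(required.union(optional));
  }
}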
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index e5db38c945..f89d0cbf7a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -18,19 +18,28 @@
  */
 package org.apache.parquet.column.mem;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,6 +175,68 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
     }
   }
 
+  @Test
+  public void testPageSize() {
+    MessageType schema = Types.buildMessage()
+        .requiredList().requiredElement(BINARY).named("binary_col")
+        .requiredList().requiredElement(INT32).named("int32_col")
+        .named("msg");
+    System.out.println(schema);
+    MemPageStore memPageStore = new MemPageStore(123);
+
+    // Using V2 pages so we have rowCount info
+    ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
+        .withPageSize(1024) // Less than 10 records for binary_col
+        .withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
+        .withPageRowCountLimit(10)
+        .withDictionaryEncoding(false) // Enforce having large binary_col pages
+        .build());
+    ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] { "binary_col", "list", "element" });
+    ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
+    ColumnDescriptor int32Col = schema.getColumnDescription(new String[] { "int32_col", "list", "element" });
+    ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);
+    // Writing 123 records
+    for (int i = 0; i < 123; ++i) {
+      // Writing 10 values per record
+      for (int j = 0; j < 10; ++j) {
+        binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2);
+        int32ColWriter.write(42, j == 0 ? 0 : 2, 2);
+      }
+      writeStore.endRecord();
+    }
+    writeStore.flush();
+
+    // Check that all the binary_col pages are <= 1024 bytes
+    {
+      PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
+      assertEquals(1230, binaryColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < binaryColPageReader.getTotalValueCount()) {
+        DataPage page = binaryColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
+        assertTrue("Compressed size should be at most 1024", page.getCompressedSize() <= 1024);
+      }
+    }
+
+    // Check that all the int32_col pages contain <= 10 rows
+    {
+      PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
+      assertEquals(1230, int32ColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < int32ColPageReader.getTotalValueCount()) {
+        DataPage page = int32ColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
+        assertTrue("Row count should be at most 10", page.getIndexRowCount().get() <= 10);
+      }
+    }
+  }
+
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
     return new ColumnWriteStoreV1(memPageStore,
         ParquetProperties.builder()
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index ba3f9034ad..2783b696d5 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -27,8 +27,8 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Assert;
@@ -627,9 +627,8 @@ private void writeRepeated(int COUNT, ValuesWriter cw, String prefix) {
     }
   }
 
-  private void writeRepeatedWithReuse(int COUNT, ValuesWriter cw,
-      String prefix) throws UnsupportedEncodingException {
-    Binary reused = Binary.fromReusedByteArray((prefix + "0").getBytes("UTF-8"));
+  private void writeRepeatedWithReuse(int COUNT, ValuesWriter cw, String prefix) {
+    Binary reused = Binary.fromReusedByteArray((prefix + "0").getBytes(StandardCharsets.UTF_8));
     for (int i = 0; i < COUNT; i++) {
       Binary content = Binary.fromString(prefix + i % 10);
       System.arraycopy(content.getBytesUnsafe(), 0, reused.getBytesUnsafe(), 0, reused.length());
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
index e511d4252f..ee7663c31a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
@@ -20,9 +20,6 @@
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.OriginalType.LIST;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
@@ -30,6 +27,9 @@
 import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.junit.Test;
 
@@ -89,12 +89,11 @@ public void testMergeSchema() {
     MessageType t4 = new MessageType("root2",
         new PrimitiveType(REQUIRED, BINARY, "a"));
 
-    try {
-      t3.union(t4);
-      fail("moving from optional to required");
-    } catch (IncompatibleSchemaModificationException e) {
-      assertEquals("repetition constraint is more restrictive: can not merge type required binary a into optional binary a", e.getMessage());
-    }
+    assertEquals(
+        t3.union(t4),
+        new MessageType("root1",
+            new PrimitiveType(OPTIONAL, BINARY, "a"))
+        );
 
     assertEquals(
         t4.union(t3),
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestRepetitionType.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestRepetitionType.java
new file mode 100644
index 0000000000..524b03c66c
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestRepetitionType.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRepetitionType {
+  @Test
+  public void testLeastRestrictiveRepetition() {
+    Type.Repetition REQUIRED = Type.Repetition.REQUIRED;
+    Type.Repetition OPTIONAL = Type.Repetition.OPTIONAL;
+    Type.Repetition REPEATED = Type.Repetition.REPEATED;
+
+    assertEquals(REPEATED, Type.Repetition.leastRestrictive(REQUIRED, OPTIONAL, REPEATED, REQUIRED, OPTIONAL, REPEATED));
+    assertEquals(OPTIONAL, Type.Repetition.leastRestrictive(REQUIRED, OPTIONAL, REQUIRED, OPTIONAL));
+    assertEquals(REQUIRED, Type.Repetition.leastRestrictive(REQUIRED, REQUIRED));
+  }
+}
diff --git a/parquet-common/src/main/java/org/apache/parquet/Closeables.java b/parquet-common/src/main/java/org/apache/parquet/Closeables.java
index 086f6cc777..2f312aa58f 100644
--- a/parquet-common/src/main/java/org/apache/parquet/Closeables.java
+++ b/parquet-common/src/main/java/org/apache/parquet/Closeables.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,7 +26,9 @@
 
 /**
  * Utility for working with {@link java.io.Closeable}ss
+ * @deprecated will be removed in 2.0.0. Use Java try-with-resources instead.
  */
+@Deprecated
 public final class Closeables {
   private Closeables() { }
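Closeables is deprecated above in favor of the language-level try-with-resources statement (PARQUET-1500), which closes resources automatically, in reverse order, even when an exception is thrown. A minimal before/after sketch; the file name is illustrative:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class TryWithResourcesDemo {
  public static void main(String[] args) throws IOException {
    // Replaces the manual pattern of Closeables.close(in) in a finally block:
    // the stream is closed automatically when the block exits
    try (InputStream in = new FileInputStream("example.bin")) { // hypothetical file
      System.out.println(in.read());
    }
  }
}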
diff --git a/parquet-common/src/main/java/org/apache/parquet/Ints.java b/parquet-common/src/main/java/org/apache/parquet/Ints.java
index 6137236d9b..4a362f804f 100644
--- a/parquet-common/src/main/java/org/apache/parquet/Ints.java
+++ b/parquet-common/src/main/java/org/apache/parquet/Ints.java
@@ -21,6 +21,7 @@
 /**
  * Utilities for working with ints
  */
+@Deprecated
 public final class Ints {
   private Ints() { }
 
@@ -31,6 +32,7 @@ private Ints() { }
    * @param value a long to be casted to an int
    * @return an int that is == to value
    * @throws IllegalArgumentException if value can't be casted to an int
+   * @deprecated replaced by {@link java.lang.Math#toIntExact(long)}
    */
   public static int checkedCast(long value) {
     int valueI = (int) value;
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
index 2657c7e96e..2c8162cd1c 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
@@ -24,6 +24,7 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,8 @@
 public class BytesUtils {
   private static final Logger LOG = LoggerFactory.getLogger(BytesUtils.class);
 
+  /** @deprecated Use {@link StandardCharsets#UTF_8} instead */
+  @Deprecated
   public static final Charset UTF8 = Charset.forName("UTF-8");
 
   /**
diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml
index 60ecfefb2f..5ec79aeeea 100644
--- a/parquet-format-structures/pom.xml
+++ b/parquet-format-structures/pom.xml
@@ -68,9 +68,9 @@
       <plugin>
-        <groupId>org.apache.thrift.tools</groupId>
-        <artifactId>maven-thrift-plugin</artifactId>
-        <version>0.1.11</version>
+        <groupId>org.apache.thrift</groupId>
+        <artifactId>thrift-maven-plugin</artifactId>
+        <version>${thrift-maven-plugin.version}</version>
         <configuration>
           <thriftSourceRoot>${parquet.thrift.path}</thriftSourceRoot>
           <thriftExecutable>${format.thrift.executable}</thriftExecutable>
@@ -158,6 +158,11 @@
       <artifactId>libthrift</artifactId>
       <version>${format.thrift.version}</version>
     </dependency>
+    <dependency>
+      <groupId>javax.annotation</groupId>
+      <artifactId>javax.annotation-api</artifactId>
+      <version>1.3.2</version>
+    </dependency>
   </dependencies>
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index 32c162c536..bca4992f21 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -121,6 +121,10 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+      </plugin>
     </plugins>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index ecd104327f..52e1458fb6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -189,7 +189,10 @@ public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
 
     try {
       Set<T> dictSet = expandDictionary(meta);
-      if (dictSet != null && dictSet.size() == 1 && dictSet.contains(value)) {
+      boolean mayContainNull = (meta.getStatistics() == null
+          || !meta.getStatistics().isNumNullsSet()
+          || meta.getStatistics().getNumNulls() > 0);
+      if (dictSet != null && dictSet.size() == 1 && dictSet.contains(value) && !mayContainNull) {
        return BLOCK_CANNOT_MATCH;
       }
     } catch (IOException e) {
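The DictionaryFilter fix above (PARQUET-1510) accounts for nulls, which never appear in a dictionary: a block whose dictionary holds only the excluded value may still contain rows satisfying not-equals, namely the null ones, so the block can only be dropped when statistics prove nulls are absent. A standalone model of the decision, under the simplifying assumption of string values (the helper and its names are illustrative, not Parquet API):

import java.util.Collections;
import java.util.Set;

public class NotEqDropDemo {
  // Drop the block for notEq(column, value) only if the dictionary contains
  // nothing but `value` AND the statistics prove the column has no nulls.
  static boolean canDropForNotEq(Set<String> dictSet, String value, Long numNulls) {
    boolean mayContainNull = (numNulls == null) || numNulls > 0; // unknown counts as "may"
    return dictSet != null && dictSet.size() == 1 && dictSet.contains(value) && !mayContainNull;
  }

  public static void main(String[] args) {
    Set<String> dict = Collections.singleton("sharp");
    System.out.println(canDropForNotEq(dict, "sharp", 0L));   // true: only "sharp", no nulls
    System.out.println(canDropForNotEq(dict, "sharp", 3L));   // false: null rows match notEq
    System.out.println(canDropForNotEq(dict, "sharp", null)); // false: null count unknown
  }
}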
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 0dc71e0743..0ca9fe3fe4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -18,8 +18,6 @@
  */
 package org.apache.parquet.hadoop;
 
-import static org.apache.parquet.Ints.checkedCast;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -27,7 +25,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.PrimitiveIterator;
-import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.DataPage;
@@ -118,7 +115,7 @@ public DataPage visit(DataPageV1 dataPageV1) {
               dataPageV1.getValueCount(),
               dataPageV1.getUncompressedSize(),
               firstRowIndex,
-              checkedCast(offsetIndex.getLastRowIndex(currentPageIndex, rowCount) - firstRowIndex + 1),
+              Math.toIntExact(offsetIndex.getLastRowIndex(currentPageIndex, rowCount) - firstRowIndex + 1),
               dataPageV1.getStatistics(),
               dataPageV1.getRlEncoding(),
               dataPageV1.getDlEncoding(),
@@ -148,7 +145,7 @@ public DataPage visit(DataPageV2 dataPageV2) {
           }
         }
         try {
-          int uncompressedSize = Ints.checkedCast(
+          int uncompressedSize = Math.toIntExact(
               dataPageV2.getUncompressedSize()
                   - dataPageV2.getDefinitionLevels().size()
                   - dataPageV2.getRepetitionLevels().size());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index f87630bf24..f85d374e8d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -292,9 +292,4 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     }
   }
 
-  void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
-    ColumnChunkPageWriter pageWriter = writers.get(path);
-    pageWriter.writeToFileWriter(writer);
-  }
-
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 2ed7d0ee78..ab7d7152fa 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -42,7 +42,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -67,7 +66,6 @@
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
-import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -1408,7 +1406,27 @@ public void addChunk(ChunkDescriptor descriptor) {
      * @throws IOException if there is an error while reading from the stream
      */
     public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
-      List<ByteBuffer> buffers = readBlocks(f, offset, length);
+      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
+      f.seek(offset);
+
+      int fullAllocations = length / options.getMaxAllocationSize();
+      int lastAllocationSize = length % options.getMaxAllocationSize();
+
+      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+      for (int i = 0; i < fullAllocations; i += 1) {
+        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+      }
+
+      if (lastAllocationSize > 0) {
+        buffers.add(options.getAllocator().allocate(lastAllocationSize));
+      }
+
+      for (ByteBuffer buffer : buffers) {
+        f.readFully(buffer);
+        buffer.flip();
+      }
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1428,72 +1446,4 @@ public long endPos() {
     }
 
-    /**
-     * @param f file to read the blocks from
-     * @return the ByteBuffer blocks
-     * @throws IOException if there is an error while reading from the stream
-     */
-    List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
-      f.seek(offset);
-
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
-
-      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-      for (int i = 0; i < fullAllocations; i++) {
-        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-      }
-
-      if (lastAllocationSize > 0) {
-        buffers.add(options.getAllocator().allocate(lastAllocationSize));
-      }
-
-      for (ByteBuffer buffer : buffers) {
-        f.readFully(buffer);
-        buffer.flip();
-      }
-      return buffers;
-    }
-
-    Optional<ColumnChunkPageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) {
-      BlockMetaData block = blocks.get(blockIndex);
-      if (block.getRowCount() == 0) {
-        throw new RuntimeException("Illegal row group of 0 rows");
-      }
-      Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
-
-      return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()))
-          .map(chunk -> readChunk(f, chunk));
-    }
-
-    private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
-      try {
-        List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset, descriptor.size);
-        ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
-        Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f, null);
-        return chunk.readAllPages();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
-      for (ColumnChunkMetaData column : block.getColumns()) {
-        if (Arrays.equals(column.getPath().toArray(), path)) {
-          return Optional.of(column);
-        }
-      }
-      return Optional.empty();
-    }
-
-    public int blocksCount() {
-      return blocks.size();
-    }
-
-    public BlockMetaData getBlockMetaData(int blockIndex) {
-      return blocks.get(blockIndex);
-    }
-
   }
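The readAll change inlines the former readBlocks helper but keeps its allocation strategy: a range of `length` bytes is covered by `length / maxAllocationSize` full-size buffers plus one remainder buffer, so no single allocation exceeds the configured maximum. A standalone sketch of just the arithmetic:

import java.util.ArrayList;
import java.util.List;

public class ChunkedAllocationDemo {
  // Returns the buffer sizes used to cover `length` bytes without exceeding `maxAlloc`
  static List<Integer> allocationSizes(int length, int maxAlloc) {
    int fullAllocations = length / maxAlloc;
    int lastAllocationSize = length % maxAlloc;
    List<Integer> sizes = new ArrayList<>(fullAllocations + (lastAllocationSize > 0 ? 1 : 0));
    for (int i = 0; i < fullAllocations; i++) {
      sizes.add(maxAlloc);
    }
    if (lastAllocationSize > 0) {
      sizes.add(lastAllocationSize);
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(allocationSizes(2500, 1000)); // [1000, 1000, 500]
  }
}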
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 20efe47573..c875702f54 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -24,17 +24,14 @@
 import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,23 +42,14 @@
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.Util;
@@ -72,7 +60,6 @@
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.BlocksCombiner;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
@@ -101,7 +88,7 @@ public class ParquetFileWriter {
   public static final String PARQUET_METADATA_FILE = "_metadata";
   public static final String MAGIC_STR = "PAR1";
-  public static final byte[] MAGIC = MAGIC_STR.getBytes(Charset.forName("ASCII"));
+  public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
   public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
   public static final int CURRENT_VERSION = 1;
@@ -664,116 +651,8 @@ public void appendFile(Configuration conf, Path file) throws IOException {
   }
 
   public void appendFile(InputFile file) throws IOException {
-    ParquetFileReader.open(file).appendTo(this);
-  }
-
-  public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
-    List<ParquetFileReader> readers = getReaders(inputFiles);
-    try {
-      ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-      ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
-      this.start();
-      List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
-      for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
-        for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
-          ColumnDescriptor path = schema.getColumns().get(columnIndex);
-          ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
-          ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(schema, store, ParquetProperties.builder().build());
-          for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
-            ParquetFileReader parquetFileReader = smallBlock.getReader();
-            try {
-              Optional<ColumnChunkPageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
-              ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
-              if (columnChunkPageReader.isPresent()) {
-                ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
-                for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
-                  consumeTriplet(columnWriteStoreV1, columnWriter, columnReader);
-                }
-              } else {
-                MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
-                String[] parentPath = getExisingParentPath(path, inputFileSchema);
-                int def = parquetFileReader.getFileMetaData().getSchema().getMaxDefinitionLevel(parentPath);
-                int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
-                for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
-                  columnWriter.writeNull(rep, def);
-                  if (def == 0) {
-                    // V1 pages also respect record boundaries so we have to mark them
-                    columnWriteStoreV1.endRecord();
-                  }
-                }
-              }
-            } catch (Exception e) {
-              LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
-            }
-          }
-          if (columnIndex == 0) {
-            this.startBlock(smallBlocks.getRowCount());
-          }
-          columnWriteStoreV1.flush();
-          store.flushToFileWriter(path, this);
-        }
-        this.endBlock();
-      }
-      this.end(Collections.emptyMap());
-    }finally {
-      BlocksCombiner.closeReaders(readers);
-    }
-    return 0;
-  }
-
-  private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
-    List<String> parentPath = Arrays.asList(path.getPath());
-    while (parentPath.size() > 0 && !inputFileSchema.containsPath(parentPath.toArray(new String[parentPath.size()]))) {
-      parentPath = parentPath.subList(0, parentPath.size() - 1);
-    }
-    return parentPath.toArray(new String[parentPath.size()]);
-  }
-
-  private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
-    List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
-    for (InputFile inputFile : inputFiles) {
-      readers.add(ParquetFileReader.open(inputFile));
-    }
-    return readers;
-  }
-
-  private void consumeTriplet(ColumnWriteStore columnWriteStore, ColumnWriter columnWriter, ColumnReader columnReader) {
-    int definitionLevel = columnReader.getCurrentDefinitionLevel();
-    int repetitionLevel = columnReader.getCurrentRepetitionLevel();
-    ColumnDescriptor column = columnReader.getDescriptor();
-    PrimitiveType type = column.getPrimitiveType();
-    if (definitionLevel < column.getMaxDefinitionLevel()) {
-      columnWriter.writeNull(repetitionLevel, definitionLevel);
-    } else {
-      switch (type.getPrimitiveTypeName()) {
-        case INT32:
-          columnWriter.write(columnReader.getInteger(), repetitionLevel, definitionLevel);
-          break;
-        case INT64:
-          columnWriter.write(columnReader.getLong(), repetitionLevel, definitionLevel);
-          break;
-        case BINARY:
-        case FIXED_LEN_BYTE_ARRAY:
-        case INT96:
-          columnWriter.write(columnReader.getBinary(), repetitionLevel, definitionLevel);
-          break;
-        case BOOLEAN:
-          columnWriter.write(columnReader.getBoolean(), repetitionLevel, definitionLevel);
-          break;
-        case FLOAT:
-          columnWriter.write(columnReader.getFloat(), repetitionLevel, definitionLevel);
-          break;
-        case DOUBLE:
-          columnWriter.write(columnReader.getDouble(), repetitionLevel, definitionLevel);
-          break;
-        default:
-          throw new IllegalArgumentException("Unknown primitive type " + type);
-      }
-    }
-    columnReader.consume();
-    if (repetitionLevel == 0) {
-      // V1 pages also respect record boundaries so we have to mark them
-      columnWriteStore.endRecord();
+    try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+      reader.appendTo(this);
     }
   }
@@ -848,7 +727,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
     // copy the data for all chunks
     long start = -1;
     long length = 0;
-    long blockCompressedSize = 0;
+    long blockUncompressedSize = 0L;
     for (int i = 0; i < columnsInOrder.size(); i += 1) {
       ColumnChunkMetaData chunk = columnsInOrder.get(i);
 
@@ -889,10 +768,10 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
           chunk.getTotalSize(),
           chunk.getTotalUncompressedSize()));
 
-      blockCompressedSize += chunk.getTotalSize();
+      blockUncompressedSize += chunk.getTotalUncompressedSize();
     }
 
-    currentBlock.setTotalByteSize(blockCompressedSize);
+    currentBlock.setTotalByteSize(blockUncompressedSize);
 
     endBlock();
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 0789bf50d4..04cbd15c0b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -144,6 +144,7 @@ public static enum JobSummaryLevel {
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+  public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -325,6 +326,18 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
     return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
   }
 
+  public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
+    setPageRowCountLimit(getConfiguration(jobContext), rowCount);
+  }
+
+  public static void setPageRowCountLimit(Configuration conf, int rowCount) {
+    conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
+  }
+
+  private static int getPageRowCountLimit(Configuration conf) {
+    return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
+  }
+
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -380,6 +393,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
+        .withPageRowCountLimit(getPageRowCountLimit(conf))
         .build();
 
     long blockSize = getLongBlockSize(conf);
@@ -398,6 +412,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+      LOG.info("Page row count limit is: {}", props.getPageRowCountLimit());
     }
 
     WriteContext init = writeSupport.init(conf);
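The new page row count limit is wired through a Hadoop property above. A minimal sketch of configuring it for an MR job; the limit value is illustrative, and both forms below are equivalent per the methods added in this hunk:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class PageRowCountLimitConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap each data page at 20000 rows (illustrative value)
    ParquetOutputFormat.setPageRowCountLimit(conf, 20000);
    // ...or set the raw property directly
    conf.setInt("parquet.page.row.count.limit", 20000);
    System.out.println(conf.get("parquet.page.row.count.limit"));
  }
}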
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 5b0e4f82d1..1ed5e32ca7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -425,6 +425,17 @@ public SELF withPageSize(int pageSize) {
       return self();
     }
 
+    /**
+     * Sets the Parquet format page row count limit used by the constructed writer.
+     *
+     * @param rowCount limit for the number of rows stored in a page
+     * @return this builder for method chaining
+     */
+    public SELF withPageRowCountLimit(int rowCount) {
+      encodingPropsBuilder.withPageRowCountLimit(rowCount);
+      return self();
+    }
+
     /**
      * Set the Parquet format dictionary page size used by the constructed
      * writer.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
new file mode 100644
index 0000000000..8bf24b2ca5
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class that uses reflection to clean up direct buffers. Reflection is
+ * needed for compatibility with both Java 8 and Java 9+, because the Cleaner
+ * class was moved to a different package in Java 9.
+ */
+public class CleanUtil {
+  private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);
+  private static final Field CLEANER_FIELD;
+  private static final Method CLEAN_METHOD;
+
+  static {
+    ByteBuffer buf = null;
+    Field cleanerField = null;
+    Method cleanMethod = null;
+    try {
+      buf = ByteBuffer.allocateDirect(1);
+      cleanerField = buf.getClass().getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+      Object cleaner = cleanerField.get(buf);
+      cleanMethod = cleaner.getClass().getDeclaredMethod("clean");
+    } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
+      logger.warn("Initialization failed for cleanerField or cleanMethod", e);
+    } finally {
+      clean(buf);
+    }
+    CLEANER_FIELD = cleanerField;
+    CLEAN_METHOD = cleanMethod;
+  }
+
+  public static void clean(ByteBuffer buffer) {
+    if (CLEANER_FIELD == null || CLEAN_METHOD == null) {
+      return;
+    }
+    try {
+      Object cleaner = CLEANER_FIELD.get(buffer);
+      CLEAN_METHOD.invoke(cleaner);
+    } catch (IllegalAccessException | InvocationTargetException | NullPointerException e) {
+      // Ignore clean failure
+      logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e);
+    }
+  }
+}
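The SnappyCompressor/SnappyDecompressor hunks below use CleanUtil to release direct buffers eagerly instead of waiting for GC (PARQUET-1485). A usage sketch; note that on Java 9+ this reflective access may emit illegal-access warnings or require a JVM flag such as --add-opens, which is an assumption about deployment rather than something the patch enforces:

import java.nio.ByteBuffer;

import org.apache.parquet.hadoop.codec.CleanUtil;

public class CleanUtilDemo {
  public static void main(String[] args) {
    ByteBuffer direct = ByteBuffer.allocateDirect(1024);
    // Frees the off-heap memory immediately; the buffer must not be used afterwards
    CleanUtil.clean(direct);
  }
}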
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index d0270ca7c1..4720c08445 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -66,7 +66,9 @@ public synchronized int compress(byte[] buffer, int off, int len) throws IOExcep
     // There is uncompressed input, compress it now
     int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
     if (maxOutputSize > outputBuffer.capacity()) {
+      ByteBuffer oldBuffer = outputBuffer;
       outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
+      CleanUtil.clean(oldBuffer);
     }
     // Reset the previous outputBuffer
     outputBuffer.clear();
@@ -97,7 +99,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
       ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
       inputBuffer.rewind();
       tmp.put(inputBuffer);
+      ByteBuffer oldBuffer = inputBuffer;
       inputBuffer = tmp;
+      CleanUtil.clean(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -109,7 +113,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
 
   @Override
   public void end() {
-    // No-op
+    CleanUtil.clean(inputBuffer);
+    CleanUtil.clean(outputBuffer);
   }
 
   @Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 190f8d5318..c3da63f9c4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -34,7 +34,7 @@ public class SnappyDecompressor implements Decompressor {
   private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
   private boolean finished;
-  
+
   /**
    * Fills specified buffer with uncompressed data. Returns actual number
    * of bytes of uncompressed data. A return value of 0 indicates that
@@ -61,7 +61,9 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc
     // There is compressed input, decompress it now.
     int decompressedSize = Snappy.uncompressedLength(inputBuffer);
     if (decompressedSize > outputBuffer.capacity()) {
+      ByteBuffer oldBuffer = outputBuffer;
       outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
+      CleanUtil.clean(oldBuffer);
     }
 
     // Reset the previous outputBuffer (i.e. set position to 0)
@@ -102,7 +104,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
       ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
       inputBuffer.rewind();
       newBuffer.put(inputBuffer);
-      inputBuffer = newBuffer;
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = newBuffer;
+      CleanUtil.clean(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -111,7 +115,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
 
   @Override
   public void end() {
-    // No-op
+    CleanUtil.clean(inputBuffer);
+    CleanUtil.clean(outputBuffer);
   }
 
   @Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 88879c206a..12a67d301d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -25,6 +25,7 @@
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -47,6 +48,17 @@ public static Builder builder(Path file) {
     return new Builder(file);
   }
 
+  /**
+   * Creates a Builder for configuring ParquetWriter with the example object
+   * model. THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE.
+   *
+   * @param file the output file to create
+   * @return a {@link Builder} to create a {@link ParquetWriter}
+   */
+  public static Builder builder(OutputFile file) {
+    return new Builder(file);
+  }
+
   /**
   * Create a new {@link ExampleParquetWriter}.
    *
@@ -78,6 +90,10 @@ private Builder(Path file) {
       super(file);
     }
 
+    private Builder(OutputFile file) {
+      super(file);
+    }
+
     public Builder withType(MessageType type) {
       this.type = type;
       return this;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
deleted file mode 100644
index 02dadc7f54..0000000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.util;
-
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.util.Collections.unmodifiableList;
-
-public class BlocksCombiner {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BlocksCombiner.class);
-
-  public static List<SmallBlocksUnion> combineLargeBlocks(List<ParquetFileReader> readers, long maxBlockSize) {
-    List<SmallBlocksUnion> blocks = new ArrayList<>();
-    long largeBlockSize = 0;
-    long largeBlockRecords = 0;
-    List<SmallBlock> smallBlocks = new ArrayList<>();
-    for (ParquetFileReader reader : readers) {
-      for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
-        BlockMetaData block = reader.getBlockMetaData(blockIndex);
-        if (!smallBlocks.isEmpty() && largeBlockSize + block.getTotalByteSize() > maxBlockSize) {
-          blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
-          smallBlocks = new ArrayList<>();
-          largeBlockSize = 0;
-          largeBlockRecords = 0;
-        }
-        largeBlockSize += block.getTotalByteSize();
-        largeBlockRecords += block.getRowCount();
-        smallBlocks.add(new SmallBlock(reader, blockIndex));
-      }
-    }
-    if (!smallBlocks.isEmpty()) {
-      blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
-    }
-    return unmodifiableList(blocks);
-  }
-
-  public static void closeReaders(List<ParquetFileReader> readers) {
-    readers.forEach(r -> {
-      try {
-        r.close();
-      } catch (IOException e) {
-        LOG.error("Error closing reader {}", r.getFile(), e);
-      }
-    });
-  }
-
-  public static class SmallBlocksUnion {
-    private final List<SmallBlock> blocks;
-    private final long rowCount;
-
-    public SmallBlocksUnion(List<SmallBlock> blocks, long rowCount) {
-      this.blocks = blocks;
-      this.rowCount = rowCount;
-    }
-
-    public List<SmallBlock> getBlocks() {
-      return blocks;
-    }
-
-    public long getRowCount() {
-      return rowCount;
-    }
-  }
-
-  public static class SmallBlock {
-    private final ParquetFileReader reader;
-    private final int blockIndex;
-
-    public SmallBlock(ParquetFileReader reader, int blockIndex) {
-      this.reader = reader;
-      this.blockIndex = blockIndex;
-    }
-
-    public ParquetFileReader getReader() {
-      return reader;
-    }
-
-    public int getBlockIndex() {
-      return blockIndex;
-    }
-  }
-}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
index 1817bb279a..d5de3417b9 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
@@ -21,6 +21,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
+/**
+ * A {@link PathFilter} that filters out hidden files. A file is considered
+ * hidden, and is skipped during processing, when its name starts with a
+ * period ('.') or an underscore ('_').
+ */
 public class HiddenFileFilter implements PathFilter {
   public static final HiddenFileFilter INSTANCE = new HiddenFileFilter();
 
@@ -28,6 +33,7 @@ private HiddenFileFilter() {}
 
   @Override
   public boolean accept(Path p) {
-    return !p.getName().startsWith("_") && !p.getName().startsWith(".");
+    final char c = p.getName().charAt(0);
+    return c != '.' && c != '_';
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
index 529115b3f5..06d5fea55d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
@@ -1,4 +1,4 @@
-/* 
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -6,9 +6,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
- * 
+ *
 *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,13 +23,13 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,22 +53,13 @@ private SerializationUtil() { }
    * @throws IOException if there is an error while writing
    */
   public static void writeObjectToConfAsBase64(String key, Object obj, Configuration conf) throws IOException {
-    ByteArrayOutputStream baos = null;
-    GZIPOutputStream gos = null;
-    ObjectOutputStream oos = null;
-
-    try {
-      baos = new ByteArrayOutputStream();
-      gos = new GZIPOutputStream(baos);
-      oos = new ObjectOutputStream(gos);
-      oos.writeObject(obj);
-    } finally {
-      Closeables.close(oos);
-      Closeables.close(gos);
-      Closeables.close(baos);
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (GZIPOutputStream gos = new GZIPOutputStream(baos);
+          ObjectOutputStream oos = new ObjectOutputStream(gos)) {
+        oos.writeObject(obj);
+      }
+      conf.set(key, new String(Base64.encodeBase64(baos.toByteArray()), StandardCharsets.UTF_8));
     }
-
-    conf.set(key, new String(Base64.encodeBase64(baos.toByteArray()), "UTF-8"));
   }
 
   /**
@@ -88,25 +79,16 @@ public static <T> T readObjectFromConfAsBase64(String key, Configuration conf) t
       return null;
     }
 
-    byte[] bytes = Base64.decodeBase64(b64.getBytes("UTF-8"));
-
-    ByteArrayInputStream bais = null;
-    GZIPInputStream gis = null;
-    ObjectInputStream ois = null;
+    byte[] bytes = Base64.decodeBase64(b64.getBytes(StandardCharsets.UTF_8));
 
-    try {
-      bais = new ByteArrayInputStream(bytes);
-      gis = new GZIPInputStream(bais);
-      ois = new ObjectInputStream(gis);
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        GZIPInputStream gis = new GZIPInputStream(bais);
+        ObjectInputStream ois = new ObjectInputStream(gis)) {
       return (T) ois.readObject();
     } catch (ClassNotFoundException e) {
       throw new IOException("Could not read object from config with key " + key, e);
     } catch (ClassCastException e) {
       throw new IOException("Couldn't cast object read from config with key " + key, e);
-    } finally {
-      Closeables.close(ois);
-      Closeables.close(gis);
-      Closeables.close(bais);
     }
   }
 }
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index 241ac5844f..537265770b 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -90,6 +90,7 @@ public class DictionaryFilterTest { "message test { " + "required binary binary_field; " + "required binary single_value_field; " + + "optional binary optional_single_value_field; " + "required fixed_len_byte_array(17) fixed_field (DECIMAL(40,4)); " + "required int32 int32_field; " + "required int64 int64_field; " @@ -167,6 +168,11 @@ private static void writeData(SimpleGroupFactory f, ParquetWriter writer) ALPHABET.substring(index, index+1) : UUID.randomUUID().toString()) .append("int96_field", INT96_VALUES[i % INT96_VALUES.length]); + // 10% of the time, leave the field null + if (index % 10 > 0) { + group.append("optional_single_value_field", "sharp"); + } + writer.write(group); } writer.close(); @@ -258,7 +264,7 @@ public void testDictionaryEncodedColumns() throws Exception { @SuppressWarnings("deprecation") private void testDictionaryEncodedColumnsV1() throws Exception { Set dictionaryEncodedColumns = new HashSet(Arrays.asList( - "binary_field", "single_value_field", "int32_field", "int64_field", + "binary_field", "single_value_field", "optional_single_value_field", "int32_field", "int64_field", "double_field", "float_field", "int96_field")); for (ColumnChunkMetaData column : ccmd) { String name = column.getPath().toDotString(); @@ -283,7 +289,7 @@ private void testDictionaryEncodedColumnsV1() throws Exception { private void testDictionaryEncodedColumnsV2() throws Exception { Set dictionaryEncodedColumns = new HashSet(Arrays.asList( - "binary_field", "single_value_field", "fixed_field", "int32_field", + "binary_field", "single_value_field", "optional_single_value_field", "fixed_field", "int32_field", "int64_field", "double_field", "float_field", "int96_field")); for (ColumnChunkMetaData column : ccmd) { EncodingStats encStats = column.getEncodingStats(); @@ -357,6 +363,7 @@ public void testEqInt96() throws Exception { @Test public void testNotEqBinary() throws Exception { BinaryColumn sharp = binaryColumn("single_value_field"); + BinaryColumn sharpAndNull = binaryColumn("optional_single_value_field"); BinaryColumn b = binaryColumn("binary_field"); assertTrue("Should drop block with only the excluded value", @@ -365,6 +372,12 @@ public void testNotEqBinary() throws Exception { assertFalse("Should not drop block with any other value", canDrop(notEq(sharp, Binary.fromString("applause")), ccmd, dictionaries)); + assertFalse("Should not drop block with only the excluded value and null", + canDrop(notEq(sharpAndNull, Binary.fromString("sharp")), ccmd, dictionaries)); + + assertFalse("Should not drop block with any other value", + canDrop(notEq(sharpAndNull, Binary.fromString("applause")), ccmd, dictionaries)); + assertFalse("Should not drop block with a known value", canDrop(notEq(b, Binary.fromString("x")), ccmd, dictionaries)); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 6fc3c72f84..25c9608563 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -21,6 +21,7 @@ import static 
java.util.Arrays.asList; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY; import static org.apache.parquet.column.Encoding.PLAIN; @@ -32,20 +33,24 @@ import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.example.ExampleInputFormat; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.InvalidSchemaException; -import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.junit.Assert; import org.junit.Rule; @@ -54,6 +59,7 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; @@ -166,4 +172,39 @@ public Void call() throws IOException { Assert.assertFalse("Should not create a file when schema is rejected", file.exists()); } + + // Testing the issue of PARQUET-1531 where writing null nested rows leads to empty pages if the page row count limit + // is reached. 
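Before the test that follows, a word on the knob it turns: withPageRowCountLimit(10) makes the writer close a page after at most 10 rows, and before this fix a row whose column values were all null could cause MessageColumnIO to flush a page containing no values at all. A minimal writer-side sketch of that setup, assuming an illustrative schema string, output path, and record count (not the test itself):

```java
// Sketch: a writer whose pages close every 10 rows. Writing rows where the
// optional list is entirely null exercised the empty-page path that
// PARQUET-1531 fixes. All names below are illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class PageRowCountLimitSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message msg { optional group str_list (LIST) { repeated group list { optional binary element (UTF8); } } }");
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);
    // No values set on the group: every column of the row is null.
    Group allNull = new SimpleGroupFactory(schema).newGroup();
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/null-rows.parquet"))
        .withPageRowCountLimit(10) // flush a page after at most 10 rows
        .withConf(conf)
        .build()) {
      for (int i = 0; i < 100; i++) {
        writer.write(allNull);
      }
    }
  }
}
```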
+ @Test + public void testNullValuesWithPageRowLimit() throws IOException { + MessageType schema = Types.buildMessage().optionalList().optionalElement(BINARY).as(stringType()).named("str_list") + .named("msg"); + final int recordCount = 100; + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + GroupFactory factory = new SimpleGroupFactory(schema); + Group listNull = factory.newGroup(); + + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(10) + .withConf(conf) + .build()) { + for (int i = 0; i < recordCount; ++i) { + writer.write(listNull); + } + } + + try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + int readRecordCount = 0; + for (Group group = reader.read(); group != null; group = reader.read()) { + assertEquals(listNull.toString(), group.toString()); + ++readRecordCount; + } + assertEquals("Number of records read should be equal to the number written", recordCount, readRecordCount); + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java deleted file mode 100644 index a972238cbe..0000000000 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.parquet.hadoop; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.Preconditions; -import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; -import org.apache.parquet.hadoop.example.ExampleParquetWriter; -import org.apache.parquet.hadoop.example.GroupReadSupport; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.hadoop.metadata.FileMetaData; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.io.InputFile; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Types; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; - -import static java.util.Arrays.asList; -import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; -import static org.apache.parquet.schema.OriginalType.UTF8; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; - -public class TestParquetWriterMergeBlocks { - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - public static final int FILE_SIZE = 10000; - public static final Configuration CONF = new Configuration(); - public static final Map EMPTY_METADATA = - new HashMap(); - public static final MessageType FILE_SCHEMA = Types.buildMessage() - .required(INT32).named("id") - .required(BINARY).as(UTF8).named("string") - .named("AppendTest"); - public static final SimpleGroupFactory GROUP_FACTORY = - new SimpleGroupFactory(FILE_SCHEMA); - - public Path file1; - public List file1content = new ArrayList(); - public Path file2; - public List file2content = new ArrayList(); - - @Before - public void createSourceData() throws IOException { - this.file1 = newTemp(); - this.file2 = newTemp(); - - ParquetWriter writer1 = ExampleParquetWriter.builder(file1) - .withType(FILE_SCHEMA) - .build(); - ParquetWriter writer2 = ExampleParquetWriter.builder(file2) - .withType(FILE_SCHEMA) - .build(); - - for (int i = 0; i < FILE_SIZE; i += 1) { - Group group1 = GROUP_FACTORY.newGroup(); - group1.add("id", i); - group1.add("string", UUID.randomUUID().toString()); - writer1.write(group1); - file1content.add(group1); - - Group group2 = GROUP_FACTORY.newGroup(); - group2.add("id", FILE_SIZE+i); - group2.add("string", UUID.randomUUID().toString()); - writer2.write(group2); - file2content.add(group2); - } - - writer1.close(); - writer2.close(); - } - - @Test - public void testBasicBehavior() throws IOException { - Path combinedFile = newTemp(); - ParquetFileWriter writer = new ParquetFileWriter( - CONF, FILE_SCHEMA, combinedFile); - - // Merge schema and extraMeta - List inputFiles = asList(file1, file2); - FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, 
CONF).getFileMetaData(); - List inputFileList = toInputFiles(inputFiles); - CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(CompressionCodecName.SNAPPY); - - writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024); - - LinkedList expected = new LinkedList<>(); - expected.addAll(file1content); - expected.addAll(file2content); - - ParquetReader reader = ParquetReader - .builder(new GroupReadSupport(), combinedFile) - .build(); - - Group next; - while ((next = reader.read()) != null) { - Group expectedNext = expected.removeFirst(); - // check each value; equals is not supported for simple records - Assert.assertEquals("Each id should match", - expectedNext.getInteger("id", 0), next.getInteger("id", 0)); - Assert.assertEquals("Each string should match", - expectedNext.getString("string", 0), next.getString("string", 0)); - } - - Assert.assertEquals("All records should be present", 0, expected.size()); - } - - private List toInputFiles(List inputFiles) { - return inputFiles.stream() - .map(input -> { - try { - return HadoopInputFile.fromPath(input, CONF); - } catch (Exception e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - } - - @Test - public void testMergedMetadata() throws IOException { - Path combinedFile = newTemp(); - ParquetFileWriter writer = new ParquetFileWriter( - CONF, FILE_SCHEMA, combinedFile); - - // Merge schema and extraMeta - List inputFiles = asList(file1, file2); - FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, CONF).getFileMetaData(); - List inputFileList = toInputFiles(inputFiles); - CompressionCodecName codecName = CompressionCodecName.GZIP; - CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName); - writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024); - - ParquetMetadata combinedFooter = ParquetFileReader.readFooter( - CONF, combinedFile, NO_FILTER); - ParquetMetadata f1Footer = ParquetFileReader.readFooter( - CONF, file1, NO_FILTER); - ParquetMetadata f2Footer = ParquetFileReader.readFooter( - CONF, file2, NO_FILTER); - - LinkedList expectedRowGroups = new LinkedList<>(); - expectedRowGroups.addAll(f1Footer.getBlocks()); - expectedRowGroups.addAll(f2Footer.getBlocks()); - long totalRowCount = expectedRowGroups.stream().mapToLong(BlockMetaData::getRowCount).sum(); - Assert.assertEquals("Combined should have a single row group", - 1, - combinedFooter.getBlocks().size()); - - BlockMetaData rowGroup = combinedFooter.getBlocks().get(0); - Assert.assertEquals("Row count should match", - totalRowCount, rowGroup.getRowCount()); - assertColumnsEquivalent(f1Footer.getBlocks().get(0).getColumns(), rowGroup.getColumns(), codecName); - } - - public void assertColumnsEquivalent(List expected, - List actual, - CompressionCodecName codecName) { - Assert.assertEquals("Should have the expected columns", - expected.size(), actual.size()); - for (int i = 0; i < actual.size(); i += 1) { - long numNulls = 0; - long valueCount = 0; - ColumnChunkMetaData current = actual.get(i); - Statistics statistics = current.getStatistics(); - numNulls += statistics.getNumNulls(); - valueCount += current.getValueCount(); - if (i != 0) { - ColumnChunkMetaData previous = actual.get(i - 1); - long expectedStart = previous.getStartingPos() + previous.getTotalSize(); - Assert.assertEquals("Should start after the previous column", - expectedStart, current.getStartingPos()); - } - - 
assertColumnMetadataEquivalent(expected.get(i), current, codecName, numNulls, valueCount); - } - } - - public void assertColumnMetadataEquivalent(ColumnChunkMetaData expected, - ColumnChunkMetaData actual, - CompressionCodecName codecName, - long numNulls, - long valueCount) { - Assert.assertEquals("Should be the expected column", - expected.getPath(), expected.getPath()); - Assert.assertEquals("Primitive type should not change", - expected.getType(), actual.getType()); - Assert.assertEquals("Compression codec should not change", - codecName, actual.getCodec()); - Assert.assertEquals("Data encodings should not change", - expected.getEncodings(), actual.getEncodings()); - Assert.assertEquals("Statistics should not change", - numNulls, actual.getStatistics().getNumNulls()); - Assert.assertEquals("Number of values should not change", - valueCount, actual.getValueCount()); - - } - - @Test - public void testAllowDroppingColumns() throws IOException { - MessageType droppedColumnSchema = Types.buildMessage() - .required(BINARY).as(UTF8).named("string") - .named("AppendTest"); - - Path droppedColumnFile = newTemp(); - List inputFiles = asList(file1, file2); - ParquetFileWriter writer = new ParquetFileWriter( - CONF, droppedColumnSchema, droppedColumnFile); - List inputFileList = toInputFiles(inputFiles); - CompressionCodecName codecName = CompressionCodecName.GZIP; - CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName); - writer.merge(inputFileList, compressor, "", 128*1024*1024); - - LinkedList expected = new LinkedList(); - expected.addAll(file1content); - expected.addAll(file2content); - - ParquetMetadata footer = ParquetFileReader.readFooter( - CONF, droppedColumnFile, NO_FILTER); - for (BlockMetaData rowGroup : footer.getBlocks()) { - Assert.assertEquals("Should have only the string column", - 1, rowGroup.getColumns().size()); - } - - ParquetReader reader = ParquetReader - .builder(new GroupReadSupport(), droppedColumnFile) - .build(); - - Group next; - while ((next = reader.read()) != null) { - Group expectedNext = expected.removeFirst(); - Assert.assertEquals("Each string should match", - expectedNext.getString("string", 0), next.getString("string", 0)); - } - - Assert.assertEquals("All records should be present", 0, expected.size()); - } - - private Path newTemp() throws IOException { - File file = temp.newFile(); - Preconditions.checkArgument(file.delete(), "Could not remove temp file"); - return new Path(file.toString()); - } -} diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java index 6d5b31380f..fe64587560 100644 --- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java +++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java @@ -19,29 +19,20 @@ package org.apache.parquet.tools.command; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.parquet.hadoop.CodecFactory; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.HiddenFileFilter; import org.apache.parquet.hadoop.ParquetFileWriter; import 
org.apache.parquet.hadoop.metadata.FileMetaData; -import org.apache.parquet.io.InputFile; import org.apache.parquet.tools.Main; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; public class MergeCommand extends ArgsOnlyCommand { public static final String[] USAGE = new String[] { @@ -58,43 +49,12 @@ public class MergeCommand extends ArgsOnlyCommand { private Configuration conf; - private static final Options OPTIONS; - static { - OPTIONS = new Options(); - - Option block = Option.builder("b") - .longOpt("block") - .desc("Merge adjacent blocks into one up to upper bound size limit default to 128 MB") - .build(); - - Option limit = Option.builder("l") - .longOpt("limit") - .desc("Upper bound for merged block size in megabytes. Default: 128 MB") - .hasArg() - .build(); - - Option codec = Option.builder("c") - .longOpt("codec") - .desc("Compression codec name. Default: SNAPPY. Valid values: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD") - .hasArg() - .build(); - - OPTIONS.addOption(limit); - OPTIONS.addOption(block); - OPTIONS.addOption(codec); - } - public MergeCommand() { super(2, MAX_FILE_NUM + 1); conf = new Configuration(); } - @Override - public Options getOptions() { - return OPTIONS; - } - @Override public String[] getUsageDescription() { return USAGE; @@ -103,32 +63,18 @@ public String[] getUsageDescription() { @Override public String getCommandDescription() { return "Merges multiple Parquet files into one. " + - "Without -b option the command doesn't merge row groups, just places one after the other. " + + "The command doesn't merge row groups, just places one after the other. " + "When used to merge many small files, the resulting file will still contain small row groups, " + - "which usually leads to bad query performance. " + - "To have adjacent small blocks merged together use -b option. " + - "Blocks will be grouped into larger one until the upper bound is reached. " + - "Default block upper bound 128 MB and default compression SNAPPY can be customized using -l and -c options"; + "which usually leads to bad query performance."; } @Override public void execute(CommandLine options) throws Exception { - boolean mergeBlocks = options.hasOption('b'); - int maxBlockSize = options.hasOption('l')? Integer.parseInt(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE; - CompressionCodecName compressionCodec = options.hasOption('c') ? 
CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY; // Prepare arguments List args = options.getArgList(); List inputFiles = getInputFiles(args.subList(0, args.size() - 1)); Path outputFile = new Path(args.get(args.size() - 1)); - if (mergeBlocks) { - CodecFactory.BytesCompressor compressor = new CodecFactory(conf, DEFAULT_PAGE_SIZE).getCompressor(compressionCodec); - mergeBlocks(maxBlockSize, compressor, inputFiles, outputFile); - } else { - mergeFiles(inputFiles, outputFile); - } - } - private void mergeFiles(List inputFiles, Path outputFile) throws IOException { // Merge schema and extraMeta FileMetaData mergedMeta = mergedMetadata(inputFiles); PrintWriter out = new PrintWriter(Main.out, true); @@ -157,23 +103,6 @@ private void mergeFiles(List inputFiles, Path outputFile) throws IOExcepti writer.end(mergedMeta.getKeyValueMetaData()); } - private void mergeBlocks(int maxBlockSize, CodecFactory.BytesCompressor compressor, List inputFiles, Path outputFile) throws IOException { - // Merge schema and extraMeta - FileMetaData mergedMeta = mergedMetadata(inputFiles); - - // Merge data - ParquetFileWriter writer = new ParquetFileWriter(conf, mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE); - List inputFileList = inputFiles.stream() - .map(input -> { - try { - return HadoopInputFile.fromPath(input, conf); - } catch (Exception e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), maxBlockSize); - } - private FileMetaData mergedMetadata(List inputFiles) throws IOException { return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf).getFileMetaData(); } diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java b/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java index 7a1c81d6f8..4e6129043e 100644 --- a/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java +++ b/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java @@ -71,6 +71,7 @@ public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotat } }).orElse(new SimplePrimitiveConverter(field.getName())); } + return new SimplePrimitiveConverter(field.getName()); } GroupType groupType = field.asGroupType(); diff --git a/parquet-tools/src/test/java/org/apache/parquet/tools/read/TestSimpleRecordConverter.java b/parquet-tools/src/test/java/org/apache/parquet/tools/read/TestSimpleRecordConverter.java new file mode 100644 index 0000000000..69a339cdc3 --- /dev/null +++ b/parquet-tools/src/test/java/org/apache/parquet/tools/read/TestSimpleRecordConverter.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + */ + +package org.apache.parquet.tools.read; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class TestSimpleRecordConverter { + + private static final String INT32_FIELD = "int32_field"; + private static final String INT64_FIELD = "int64_field"; + private static final String FLOAT_FIELD = "float_field"; + private static final String DOUBLE_FIELD = "double_field"; + private static final String BINARY_FIELD = "binary_field"; + private static final String FIXED_LEN_BYTE_ARRAY_FIELD = "flba_field"; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConverter() throws IOException { + try ( + ParquetReader<SimpleRecord> reader = + ParquetReader.builder(new SimpleReadSupport(), new Path(testFile().getAbsolutePath())).build()) { + for (SimpleRecord record = reader.read(); record != null; record = reader.read()) { + for (SimpleRecord.NameValue value : record.getValues()) { + switch(value.getName()) { + case INT32_FIELD: + Assert.assertEquals(32, value.getValue()); + break; + case INT64_FIELD: + Assert.assertEquals(64L, value.getValue()); + break; + case FLOAT_FIELD: + Assert.assertEquals(1.0f, value.getValue()); + break; + case DOUBLE_FIELD: + Assert.assertEquals(2.0d, value.getValue()); + break; + case BINARY_FIELD: + Assert.assertArrayEquals("foobar".getBytes(), (byte[])value.getValue()); + break; + case FIXED_LEN_BYTE_ARRAY_FIELD: + Assert.assertArrayEquals(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }, (byte[])value.getValue()); + break; + } + } + } + } + } + + @Before + public void setUp() throws IOException { + createTestParquetFile(); + } + + private static MessageType createSchema() { + return new MessageType("schema", + new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, INT32_FIELD), + new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, INT64_FIELD), + new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.FLOAT, FLOAT_FIELD), + new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.DOUBLE, DOUBLE_FIELD), + new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, BINARY_FIELD), + new PrimitiveType(Type.Repetition.REQUIRED, + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 12, FIXED_LEN_BYTE_ARRAY_FIELD) + ); + } + + private void createTestParquetFile() throws IOException { + Path fsPath = new Path(testFile().getPath()); + Configuration conf = new Configuration(); + + MessageType schema = createSchema(); + SimpleGroupFactory fact = new SimpleGroupFactory(schema); + GroupWriteSupport.setSchema(schema, conf); + + try 
( + ParquetWriter<Group> writer = new ParquetWriter<>( + fsPath, + new GroupWriteSupport(), + CompressionCodecName.UNCOMPRESSED, + 1024, + 1024, + 512, + true, + false, + ParquetProperties.WriterVersion.PARQUET_2_0, + conf)) { + writer.write(fact.newGroup() + .append(INT32_FIELD, 32) + .append(INT64_FIELD, 64L) + .append(FLOAT_FIELD, 1.0f) + .append(DOUBLE_FIELD, 2.0d) + .append(BINARY_FIELD, Binary.fromString("foobar")) + .append(FIXED_LEN_BYTE_ARRAY_FIELD, + Binary.fromConstantByteArray(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }))); + } + } + + private File testFile() { + return new File(this.tempFolder.getRoot(), getClass().getSimpleName() + ".parquet"); + } +} diff --git a/pom.xml b/pom.xml index 904d7daf8a..9b8a5db9ec 100644 --- a/pom.xml +++ b/pom.xml @@ -117,8 +117,8 @@ 0.16.0 h2 0.10.0 - 0.9.3 - 0.9.3 + 0.12.0 + 0.12.0 7.0.13 0.9.33 1.7.22 @@ -129,7 +129,7 @@ 2.3 - 2.3.1 + 2.9.8 1.35 1.10
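To close, a few of the hunks above are easier to sanity-check with small standalone sketches. First, the SerializationUtil rewrite near the top of the patch: the contract is unchanged, only the stream handling moved to try-with-resources and the charset to StandardCharsets. A round-trip sketch (the key name and payload here are invented):

```java
// Round-trip an arbitrary Serializable value through a Hadoop Configuration,
// using the two methods touched by this patch.
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.util.SerializationUtil;

public class SerializationUtilRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    HashMap<String, Long> payload = new HashMap<>();
    payload.put("rows", 10000L);

    // Gzips, base64-encodes, and stores the object under the given key.
    SerializationUtil.writeObjectToConfAsBase64("example.serialized.key", payload, conf);

    // Reads it back; returns null if the key is absent.
    HashMap<String, Long> restored =
        SerializationUtil.readObjectFromConfAsBase64("example.serialized.key", conf);
    System.out.println(restored); // {rows=10000}
  }
}
```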
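Next, the DictionaryFilterTest additions: for an optional column, a not-equals predicate is satisfied by null rows, and a dictionary only describes the non-null values. So even when the dictionary holds nothing but the excluded value, the row group cannot be dropped, which is exactly what the new assertions pin down. A sketch of the predicate in question (column name taken from the test schema above):

```java
// Build the not-equals predicate from the test. For the optional column,
// null rows match notEq("sharp") even though "sharp" is the only dictionary
// entry, so dictionary-based row-group pruning must keep the block.
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.io.api.Binary;

public class NotEqNullSketch {
  public static void main(String[] args) {
    BinaryColumn column = FilterApi.binaryColumn("optional_single_value_field");
    FilterPredicate predicate = FilterApi.notEq(column, Binary.fromString("sharp"));
    System.out.println(predicate); // e.g. noteq(optional_single_value_field, ...)
  }
}
```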
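Finally, the MergeCommand change: with the block-merging options gone, merge is again a pure file-level concatenation, so row groups are copied as-is and many small inputs still yield many small row groups. A rough standalone equivalent of the surviving mergeFiles path, as a sketch under assumed paths (mergeMetadataFiles and this ParquetFileWriter constructor appear in the patch above; appendFile is assumed to behave as described):

```java
// File-level merge: copy each input's row groups into one output file without
// re-encoding, mirroring what the simplified command still does.
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FileLevelMergeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    List<Path> inputs = Arrays.asList(new Path("/tmp/part-0.parquet"), new Path("/tmp/part-1.parquet"));
    Path output = new Path("/tmp/merged.parquet");

    // Merged schema and key/value metadata across all inputs.
    FileMetaData merged = ParquetFileWriter.mergeMetadataFiles(inputs, conf).getFileMetaData();

    ParquetFileWriter writer = new ParquetFileWriter(conf, merged.getSchema(), output);
    writer.start();
    for (Path input : inputs) {
      // Appends the input's row groups unchanged, so block sizes are preserved.
      writer.appendFile(HadoopInputFile.fromPath(input, conf));
    }
    writer.end(merged.getKeyValueMetaData());
  }
}
```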