diff --git a/parquet-benchmarks/run_checksums.sh b/parquet-benchmarks/run_checksums.sh new file mode 100755 index 0000000000..e798488157 --- /dev/null +++ b/parquet-benchmarks/run_checksums.sh @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#!/usr/bin/env bash + +SCRIPT_PATH=$( cd "$(dirname "$0")" ; pwd -P ) + +echo "Page level CRC checksum benchmarks" +echo "Running write benchmarks" +java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumWriteBenchmarks -bm ss "$@" +echo "Running read benchmarks" +java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumReadBenchmarks -bm ss "$@" diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java index d9ef4fd32b..f039403bfc 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java @@ -37,4 +37,26 @@ public class BenchmarkFiles { // public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO"); public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY"); public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP"); + + // Page checksum files + public static final Path file_100K_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-UNCOMPRESSED"); + public static final Path file_100K_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-UNCOMPRESSED"); + public static final Path file_1M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-UNCOMPRESSED"); + public static final Path file_1M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-UNCOMPRESSED"); + public static final Path file_10M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-UNCOMPRESSED"); + public static final Path file_10M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-UNCOMPRESSED"); + + public static final Path file_100K_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-GZIP"); + public static final Path file_100K_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-GZIP"); + public static final Path file_1M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-GZIP"); + public static final Path file_1M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-GZIP"); + public static final Path file_10M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-GZIP"); + public static final Path file_10M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-GZIP"); + + public 
static final Path file_100K_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-SNAPPY"); + public static final Path file_100K_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-SNAPPY"); + public static final Path file_1M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-SNAPPY"); + public static final Path file_1M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-SNAPPY"); + public static final Path file_10M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-SNAPPY"); + public static final Path file_10M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-SNAPPY"); } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java new file mode 100644 index 0000000000..6c62cc6e6d --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; + +import static java.util.UUID.randomUUID; +import static org.apache.parquet.benchmarks.BenchmarkConstants.*; +import static org.apache.parquet.benchmarks.BenchmarkFiles.*; + +import java.io.IOException; +import java.util.Random; + +import static org.apache.parquet.benchmarks.BenchmarkUtils.deleteIfExists; +import static org.apache.parquet.benchmarks.BenchmarkUtils.exists; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*; + +public class PageChecksumDataGenerator { + + private final MessageType SCHEMA = MessageTypeParser.parseMessageType( + "message m {" + + " required int64 long_field;" + + " required binary binary_field;" + + " required group group {" + + " repeated int32 int_field;" + + " }" + + "}"); + + public void generateData(Path outFile, int nRows, boolean writeChecksums, + CompressionCodecName compression) throws IOException { + if (exists(configuration, outFile)) { + System.out.println("File already exists " + outFile); + return; + } + + ParquetWriter writer = ExampleParquetWriter.builder(outFile) + .withConf(configuration) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withCompressionCodec(compression) + .withDictionaryEncoding(true) + .withType(SCHEMA) + .withPageWriteChecksumEnabled(writeChecksums) + .build(); + + GroupFactory groupFactory = new SimpleGroupFactory(SCHEMA); + Random rand = new Random(42); + for (int i = 0; i < nRows; i++) { + Group group = groupFactory.newGroup(); + group + .append("long_field", (long) i) + .append("binary_field", randomUUID().toString()) + .addGroup("group") + // Force dictionary encoding by performing modulo + .append("int_field", rand.nextInt() % 100) + .append("int_field", rand.nextInt() % 100) + .append("int_field", rand.nextInt() % 100) + .append("int_field", rand.nextInt() % 100); + writer.write(group); + } + + writer.close(); + } + + public void generateAll() { + try { + // No need to generate the non-checksum versions, as the files generated here are only used in + // the read benchmarks + generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED); + generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP); + generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY); + generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED); + generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP); + generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY); + generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED); + generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP); + generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void cleanup() { + deleteIfExists(configuration, file_100K_NOCHECKSUMS_UNCOMPRESSED); + deleteIfExists(configuration, file_100K_CHECKSUMS_UNCOMPRESSED); + deleteIfExists(configuration, file_100K_NOCHECKSUMS_GZIP); + 
deleteIfExists(configuration, file_100K_CHECKSUMS_GZIP); + deleteIfExists(configuration, file_100K_NOCHECKSUMS_SNAPPY); + deleteIfExists(configuration, file_100K_CHECKSUMS_SNAPPY); + deleteIfExists(configuration, file_1M_NOCHECKSUMS_UNCOMPRESSED); + deleteIfExists(configuration, file_1M_CHECKSUMS_UNCOMPRESSED); + deleteIfExists(configuration, file_1M_NOCHECKSUMS_GZIP); + deleteIfExists(configuration, file_1M_CHECKSUMS_GZIP); + deleteIfExists(configuration, file_1M_NOCHECKSUMS_SNAPPY); + deleteIfExists(configuration, file_1M_CHECKSUMS_SNAPPY); + deleteIfExists(configuration, file_10M_NOCHECKSUMS_UNCOMPRESSED); + deleteIfExists(configuration, file_10M_CHECKSUMS_UNCOMPRESSED); + deleteIfExists(configuration, file_10M_NOCHECKSUMS_GZIP); + deleteIfExists(configuration, file_10M_CHECKSUMS_GZIP); + deleteIfExists(configuration, file_10M_NOCHECKSUMS_SNAPPY); + deleteIfExists(configuration, file_10M_CHECKSUMS_SNAPPY); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java new file mode 100644 index 0000000000..db23eeb672 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_K; +import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_MILLION; +import static org.apache.parquet.benchmarks.BenchmarkFiles.configuration; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY; + +import java.io.IOException; + +@State(Scope.Thread) +public class PageChecksumReadBenchmarks { + + private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator(); + + @Setup(Level.Trial) + public void setup() { + pageChecksumDataGenerator.generateAll(); + } + + @Setup(Level.Trial) + public void cleanup() { + pageChecksumDataGenerator.cleanup(); + } + + private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole) + throws IOException { + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) + .withConf(configuration) + .usePageChecksumVerification(verifyChecksums) + .build()) { + for (int i = 0; i < nRows; i++) { + Group group = reader.read(); + blackhole.consume(group.getLong("long_field", 0)); + blackhole.consume(group.getBinary("binary_field", 0)); + Group subgroup = group.getGroup("group", 0); + blackhole.consume(subgroup.getInteger("int_field", 0)); + blackhole.consume(subgroup.getInteger("int_field", 1)); + blackhole.consume(subgroup.getInteger("int_field", 2)); + blackhole.consume(subgroup.getInteger("int_field", 3)); + } + } + } + + // 100k rows, uncompressed, GZIP, Snappy + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException { + readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void 
read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException { + readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException { + readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole); + } + + // 1M rows, uncompressed, GZIP, Snappy + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException { + readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException { + readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException { + readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole); + } + + // 10M rows, uncompressed, GZIP, Snappy + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole); + } + +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java new file mode 100644 index 
0000000000..c743dde01e --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_K; +import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_MILLION; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_UNCOMPRESSED; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_SNAPPY; + +import java.io.IOException; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*; + +@State(Scope.Thread) +public class PageChecksumWriteBenchmarks { + + private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator(); + + @Setup(Level.Iteration) + public void cleanup() { + pageChecksumDataGenerator.cleanup(); + } + + // 100k rows, uncompressed, GZIP, Snappy + + @Benchmark 
@BenchmarkMode(Mode.SingleShotTime) + public void write100KRowsUncompressedWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, UNCOMPRESSED); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write100KRowsUncompressedWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write100KRowsGzipWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_GZIP, 100 * ONE_K, false, GZIP); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write100KRowsGzipWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write100KRowsSnappyWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_SNAPPY, 100 * ONE_K, false, SNAPPY); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write100KRowsSnappyWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY); + } + + // 1M rows, uncompressed, GZIP, Snappy + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write1MRowsUncompressedWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, UNCOMPRESSED); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write1MRowsUncompressedWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write1MRowsGzipWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_GZIP, ONE_MILLION, false, GZIP); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write1MRowsGzipWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write1MRowsSnappyWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_SNAPPY, ONE_MILLION, false, SNAPPY); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write1MRowsSnappyWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY); + } + + // 10M rows, uncompressed, GZIP, Snappy + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsUncompressedWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, UNCOMPRESSED); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsUncompressedWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsGzipWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_GZIP, 10 * ONE_MILLION, false, GZIP); + } + + @Benchmark 
@BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsGzipWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsSnappyWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, SNAPPY); + } + + @Benchmark @BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsSnappyWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY); + } + +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 41e482cfdd..7492b547c8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -50,6 +50,8 @@ public class ParquetProperties { public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64; public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000; + public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; + public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); private static final int MIN_SLAB_SIZE = 64; @@ -87,10 +89,12 @@ public static WriterVersion fromString(String name) { private final ValuesWriterFactory valuesWriterFactory; private final int columnIndexTruncateLength; private final int pageRowCountLimit; + private final boolean pageWriteChecksumEnabled; private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator, - ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) { + ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit, + boolean pageWriteChecksumEnabled) { this.pageSizeThreshold = pageSize; this.initialSlabSize = CapacityByteArrayOutputStream .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10); @@ -105,6 +109,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag this.valuesWriterFactory = writerFactory; this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength; this.pageRowCountLimit = pageRowCountLimit; + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; } public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) { @@ -201,6 +206,10 @@ public int getPageRowCountLimit() { return pageRowCountLimit; } + public boolean getPageWriteChecksumEnabled() { + return pageWriteChecksumEnabled; + } + public static Builder builder() { return new Builder(); } @@ -221,6 +230,7 @@ public static class Builder { private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; + private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; private Builder() { } @@ -236,6 +246,7 @@ private Builder(ParquetProperties toCopy) { this.valuesWriterFactory = toCopy.valuesWriterFactory; this.allocator = toCopy.allocator; this.pageRowCountLimit = toCopy.pageRowCountLimit; + 
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; } /** @@ -330,11 +341,17 @@ public Builder withPageRowCountLimit(int rowCount) { return this; } + public Builder withPageWriteChecksumEnabled(boolean val) { + this.pageWriteChecksumEnabled = val; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck, - estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit); + estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, + pageRowCountLimit, pageWriteChecksumEnabled); // we pass a constructed but uninitialized factory to ParquetProperties above as currently // creation of ValuesWriters is invoked from within ParquetProperties. In the future // we'd like to decouple that and won't need to pass an object to properties and then pass the diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java index 606f9f7421..0489449e91 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.column.page; +import java.util.OptionalInt; + /** * one page in a chunk */ @@ -43,4 +45,18 @@ public int getUncompressedSize() { return uncompressedSize; } + // Note: the following field is only used for testing purposes and is NOT used in checksum + // verification. The crc value here will merely be a copy of the actual crc field read in + // ParquetFileReader.Chunk.readAllPages() + private OptionalInt crc = OptionalInt.empty(); + + // Visible for testing + public void setCrc(int crc) { + this.crc = OptionalInt.of(crc); + } + + // Visible for testing + public OptionalInt getCrc() { + return crc; + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index 4f5c78adb2..13ab80b01e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -30,9 +30,10 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; +import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; public class HadoopReadOptions extends ParquetReadOptions { @@ -45,6 +46,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, boolean useDictionaryFilter, boolean useRecordFilter, boolean useColumnIndexFilter, + boolean usePageChecksumVerification, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -54,7 +56,8 @@ private HadoopReadOptions(boolean useSignedStringMinMax, Configuration conf) { super( useSignedStringMinMax, useStatsFilter, 
useDictionaryFilter, useRecordFilter, useColumnIndexFilter, - recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties + usePageChecksumVerification, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, + properties ); this.conf = conf; } @@ -86,6 +89,8 @@ public Builder(Configuration conf) { useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true)); useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true)); useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true)); + usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, + usePageChecksumVerification)); withCodecFactory(HadoopCodecs.newFactory(conf, 0)); withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); @@ -98,9 +103,9 @@ public Builder(Configuration conf) { @Override public ParquetReadOptions build() { return new HadoopReadOptions( - useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, - recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, - conf); + useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, + useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter, + codecFactory, allocator, maxAllocationSize, properties, conf); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 846d3bd809..f0590236c5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -40,12 +40,14 @@ public class ParquetReadOptions { private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true; private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true; private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB + private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false; private final boolean useSignedStringMinMax; private final boolean useStatsFilter; private final boolean useDictionaryFilter; private final boolean useRecordFilter; private final boolean useColumnIndexFilter; + private final boolean usePageChecksumVerification; private final FilterCompat.Filter recordFilter; private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; @@ -58,6 +60,7 @@ public class ParquetReadOptions { boolean useDictionaryFilter, boolean useRecordFilter, boolean useColumnIndexFilter, + boolean usePageChecksumVerification, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -69,6 +72,7 @@ public class ParquetReadOptions { this.useDictionaryFilter = useDictionaryFilter; this.useRecordFilter = useRecordFilter; this.useColumnIndexFilter = useColumnIndexFilter; + this.usePageChecksumVerification = usePageChecksumVerification; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -97,6 +101,10 @@ public boolean useColumnIndexFilter() { return useColumnIndexFilter; } + public boolean usePageChecksumVerification() { + return usePageChecksumVerification; + } + public FilterCompat.Filter getRecordFilter() { return recordFilter; } @@ -143,6 +151,7 @@ public static class Builder { protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; 
protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT; + protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT; protected FilterCompat.Filter recordFilter = null; protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; // the page size parameter isn't used when only using the codec factory to get decompressors @@ -200,6 +209,16 @@ public Builder useColumnIndexFilter() { return useColumnIndexFilter(true); } + + public Builder usePageChecksumVerification(boolean usePageChecksumVerification) { + this.usePageChecksumVerification = usePageChecksumVerification; + return this; + } + + public Builder usePageChecksumVerification() { + return usePageChecksumVerification(true); + } + public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) { this.recordFilter = rowGroupFilter; return this; @@ -235,6 +254,11 @@ public Builder withMaxAllocationInBytes(int allocationSizeInBytes) { return this; } + public Builder withPageChecksumVerification(boolean val) { + this.usePageChecksumVerification = val; + return this; + } + public Builder set(String key, String value) { properties.put(key, value); return this; @@ -249,6 +273,7 @@ public Builder copy(ParquetReadOptions options) { withMetadataFilter(options.metadataFilter); withCodecFactory(options.codecFactory); withAllocator(options.allocator); + withPageChecksumVerification(options.usePageChecksumVerification); for (Map.Entry keyValue : options.properties.entrySet()) { set(keyValue.getKey(), keyValue.getValue()); } @@ -257,8 +282,9 @@ public Builder copy(ParquetReadOptions options) { public ParquetReadOptions build() { return new ParquetReadOptions( - useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, - recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties); + useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, + useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter, + codecFactory, allocator, maxAllocationSize, properties); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index fb0ca7b09a..deeda658d7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1364,14 +1364,30 @@ public void writeDataPageHeader( to); } + private PageHeader newDataPageHeader( + int uncompressedSize, int compressedSize, + int valueCount, + org.apache.parquet.column.Encoding rlEncoding, + org.apache.parquet.column.Encoding dlEncoding, + org.apache.parquet.column.Encoding valuesEncoding) { + PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize); + pageHeader.setData_page_header(new DataPageHeader( + valueCount, + getEncoding(valuesEncoding), + getEncoding(dlEncoding), + getEncoding(rlEncoding))); + return pageHeader; + } + private PageHeader newDataPageHeader( int uncompressedSize, int compressedSize, int valueCount, org.apache.parquet.column.Encoding rlEncoding, org.apache.parquet.column.Encoding dlEncoding, - org.apache.parquet.column.Encoding valuesEncoding) { + org.apache.parquet.column.Encoding valuesEncoding, + int crc) { PageHeader 
pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize); - // TODO: pageHeader.crc = ...; + pageHeader.setCrc(crc); pageHeader.setData_page_header(new DataPageHeader( valueCount, getEncoding(valuesEncoding), @@ -1397,6 +1413,22 @@ public void writeDataPageV2Header( rlByteLength, dlByteLength), to); } + public void writeDataPageV1Header( + int uncompressedSize, + int compressedSize, + int valueCount, + org.apache.parquet.column.Encoding rlEncoding, + org.apache.parquet.column.Encoding dlEncoding, + org.apache.parquet.column.Encoding valuesEncoding, + OutputStream to) throws IOException { + writePageHeader(newDataPageHeader(uncompressedSize, + compressedSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding), to); + } + public void writeDataPageV1Header( int uncompressedSize, int compressedSize, @@ -1404,13 +1436,15 @@ public void writeDataPageV1Header( org.apache.parquet.column.Encoding rlEncoding, org.apache.parquet.column.Encoding dlEncoding, org.apache.parquet.column.Encoding valuesEncoding, + int crc, OutputStream to) throws IOException { writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, rlEncoding, dlEncoding, - valuesEncoding), to); + valuesEncoding, + crc), to); } public void writeDataPageV2Header( @@ -1442,10 +1476,19 @@ private PageHeader newDataPageV2Header( return pageHeader; } + public void writeDictionaryPageHeader( + int uncompressedSize, int compressedSize, int valueCount, + org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException { + PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize); + pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding))); + writePageHeader(pageHeader, to); + } + public void writeDictionaryPageHeader( int uncompressedSize, int compressedSize, int valueCount, - org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException { + org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to) throws IOException { PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize); + pageHeader.setCrc(crc); pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding))); writePageHeader(pageHeader, to); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index 0ca9fe3fe4..2e646e7290 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -99,8 +99,9 @@ public DataPage readPage() { public DataPage visit(DataPageV1 dataPageV1) { try { BytesInput decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()); + final DataPageV1 decompressedPage; if (offsetIndex == null) { - return new DataPageV1( + decompressedPage = new DataPageV1( decompressed, dataPageV1.getValueCount(), dataPageV1.getUncompressedSize(), @@ -110,7 +111,7 @@ public DataPage visit(DataPageV1 dataPageV1) { dataPageV1.getValueEncoding()); } else { long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex); - return new DataPageV1( + decompressedPage = new DataPageV1( decompressed, dataPageV1.getValueCount(), dataPageV1.getUncompressedSize(), @@ -121,6 +122,10 @@ public DataPage visit(DataPageV1 
dataPageV1) { dataPageV1.getDlEncoding(), dataPageV1.getValueEncoding()); } + if (dataPageV1.getCrc().isPresent()) { + decompressedPage.setCrc(dataPageV1.getCrc().getAsInt()); + } + return decompressedPage; } catch (IOException e) { throw new ParquetDecodingException("could not decompress page", e); } @@ -185,10 +190,14 @@ public DictionaryPage readDictionaryPage() { return null; } try { - return new DictionaryPage( - decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()), - compressedDictionaryPage.getDictionarySize(), - compressedDictionaryPage.getEncoding()); + DictionaryPage decompressedPage = new DictionaryPage( + decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()), + compressedDictionaryPage.getDictionarySize(), + compressedDictionaryPage.getEncoding()); + if (compressedDictionaryPage.getCrc().isPresent()) { + decompressedPage.setCrc(compressedDictionaryPage.getCrc().getAsInt()); + } + return decompressedPage; } catch (IOException e) { throw new ParquetDecodingException("Could not decompress dictionary page", e); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index f85d374e8d..72f26fc115 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -26,11 +26,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.zip.CRC32; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.ConcatenatingByteArrayCollector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.page.PageWriter; @@ -74,16 +76,22 @@ private static final class ColumnChunkPageWriter implements PageWriter { private Statistics totalStatistics; private final ByteBufferAllocator allocator; + private final CRC32 crc; + boolean pageWriteChecksumEnabled; + private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, ByteBufferAllocator allocator, - int columnIndexTruncateLength) { + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled) { this.path = path; this.compressor = compressor; this.allocator = allocator; this.buf = new ConcatenatingByteArrayCollector(); this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; + this.crc = pageWriteChecksumEnabled ? 
new CRC32() : null; } @Override @@ -119,14 +127,28 @@ public void writePage(BytesInput bytes, + compressedSize); } tempOutputStream.reset(); - parquetMetadataConverter.writeDataPageV1Header( + if (pageWriteChecksumEnabled) { + crc.reset(); + crc.update(compressedBytes.toByteArray()); + parquetMetadataConverter.writeDataPageV1Header( (int)uncompressedSize, (int)compressedSize, valueCount, rlEncoding, dlEncoding, valuesEncoding, + (int) crc.getValue(), tempOutputStream); + } else { + parquetMetadataConverter.writeDataPageV1Header( + (int)uncompressedSize, + (int)compressedSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + tempOutputStream); + } this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; @@ -273,10 +295,16 @@ public String memUsageString(String prefix) { private final MessageType schema; public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator, - int columnIndexTruncateLength) { + int columnIndexTruncateLength) { + this(compressor, schema, allocator, columnIndexTruncateLength, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); + } + + public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator, + int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) { this.schema = schema; for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength)); + writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled)); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index d8af379d13..c3da3239b2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -102,7 +102,7 @@ public ParquetMetadata getFooter() { private void initStore() { pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(), - props.getColumnIndexTruncateLength()); + props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled()); columnStore = props.newColumnWriteStore(schema, pageStore); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 8e205f6487..4acd4c4668 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -50,6 +50,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -113,6 +114,8 @@ public class ParquetFileReader implements Closeable { private final ParquetMetadataConverter converter; + private final CRC32 crc; + /** * for files provided, check if there's a summary file. * If a summary file is found it is used otherwise the file footer is used. 
@@ -659,6 +662,7 @@ public ParquetFileReader( for (ColumnDescriptor col : columns) { paths.put(ColumnPath.get(col.getPath()), col); } + this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } /** @@ -695,6 +699,7 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } + this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { @@ -717,6 +722,7 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } + this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } private static List listWithNulls(int size) { @@ -1163,6 +1169,18 @@ protected PageHeader readPageHeader() throws IOException { return Util.readPageHeader(stream); } + /** + * Calculate checksum of input bytes, throw decoding exception if it does not match the provided + * reference crc + */ + private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) { + crc.reset(); + crc.update(bytes); + if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) { + throw new ParquetDecodingException(exceptionMsg); + } + } + /** * Read all of the pages in a given column chunk. * @return the list of pages @@ -1178,36 +1196,54 @@ public ColumnChunkPageReader readAllPages() throws IOException { PageHeader pageHeader = readPageHeader(); int uncompressedPageSize = pageHeader.getUncompressed_page_size(); int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; switch (pageHeader.type) { case DICTIONARY_PAGE: // there is only one dictionary page per column chunk if (dictionaryPage != null) { throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col); } + pageBytes = this.readAsBytesInput(compressedPageSize); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(), + "could not verify dictionary page integrity, CRC checksum verification failed"); + } DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); dictionaryPage = new DictionaryPage( - this.readAsBytesInput(compressedPageSize), + pageBytes, uncompressedPageSize, dicHeader.getNum_values(), converter.getEncoding(dicHeader.getEncoding()) ); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + dictionaryPage.setCrc(pageHeader.getCrc()); + } break; case DATA_PAGE: DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); - pagesInChunk.add( - new DataPageV1( - this.readAsBytesInput(compressedPageSize), - dataHeaderV1.getNum_values(), - uncompressedPageSize, - converter.fromParquetStatistics( - getFileMetaData().getCreatedBy(), - dataHeaderV1.getStatistics(), - type), - converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()), - converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()), - converter.getEncoding(dataHeaderV1.getEncoding()) - )); + pageBytes = this.readAsBytesInput(compressedPageSize); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(), + "could not verify page integrity, CRC checksum verification failed"); + } + DataPageV1 dataPageV1 = new 
DataPageV1( + pageBytes, + dataHeaderV1.getNum_values(), + uncompressedPageSize, + converter.fromParquetStatistics( + getFileMetaData().getCreatedBy(), + dataHeaderV1.getStatistics(), + type), + converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()), + converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()), + converter.getEncoding(dataHeaderV1.getEncoding())); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + dataPageV1.setCrc(pageHeader.getCrc()); + } + pagesInChunk.add(dataPageV1); valuesCountReadSoFar += dataHeaderV1.getNum_values(); ++dataPageCountReadSoFar; break; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index c875702f54..50cd31e457 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -141,6 +142,9 @@ public static enum Mode { // set when end is called private ParquetMetadata footer = null; + private final CRC32 crc; + private boolean pageWriteChecksumEnabled; + /** * Captures the order in which methods should be called */ @@ -200,7 +204,7 @@ private final STATE error() throws IOException { */ @Deprecated public ParquetFileWriter(Configuration configuration, MessageType schema, - Path file) throws IOException { + Path file) throws IOException { this(HadoopOutputFile.fromPath(file, configuration), schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT); } @@ -253,7 +257,8 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, long rowGroupSize, int maxPaddingSize) throws IOException { this(file, schema, mode, rowGroupSize, maxPaddingSize, - ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH); + ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } /** * @param file OutputFile to create or overwrite @@ -262,10 +267,12 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, * @param rowGroupSize the row group size * @param maxPaddingSize the maximum padding * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to + * @param pageWriteChecksumEnabled whether to write out page level checksums * @throws IOException if the file can not be created */ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, - long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength) + long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled) throws IOException { TypeUtil.checkValidWriteSchema(schema); @@ -287,6 +294,8 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, this.encodingStatsBuilder = new EncodingStats.Builder(); this.columnIndexTruncateLength = columnIndexTruncateLength; + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; + this.crc = pageWriteChecksumEnabled ? 
new CRC32() : null; } /** @@ -311,6 +320,8 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, this.encodingStatsBuilder = new EncodingStats.Builder(); // no truncation is needed for testing this.columnIndexTruncateLength = Integer.MAX_VALUE; + this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration); + this.crc = pageWriteChecksumEnabled ? new CRC32() : null; } /** * start the file @@ -380,12 +391,24 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOExceptio currentChunkDictionaryPageOffset = out.getPos(); int uncompressedSize = dictionaryPage.getUncompressedSize(); int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts - metadataConverter.writeDictionaryPageHeader( + if (pageWriteChecksumEnabled) { + crc.reset(); + crc.update(dictionaryPage.getBytes().toByteArray()); + metadataConverter.writeDictionaryPageHeader( uncompressedSize, compressedPageSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding(), + (int) crc.getValue(), out); + } else { + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + out); + } long headerSize = out.getPos() - currentChunkDictionaryPageOffset; this.uncompressedLength += uncompressedSize + headerSize; this.compressedLength += compressedPageSize + headerSize; @@ -505,13 +528,26 @@ private void innerWriteDataPage( } LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); int compressedPageSize = (int) bytes.size(); - metadataConverter.writeDataPageV1Header( + if (pageWriteChecksumEnabled) { + crc.reset(); + crc.update(bytes.toByteArray()); + metadataConverter.writeDataPageV1Header( uncompressedPageSize, compressedPageSize, valueCount, rlEncoding, dlEncoding, valuesEncoding, + (int) crc.getValue(), out); + } else { + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out); + } long headerSize = out.getPos() - beforeHeader; this.uncompressedLength += uncompressedPageSize + headerSize; this.compressedLength += compressedPageSize + headerSize; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index b8fce2f65d..7eab611b61 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -134,6 +134,11 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled"; + /** + * key to configure whether page level checksum verification is enabled + */ + public static final String PAGE_VERIFY_CHECKSUM_ENABLED = "parquet.page.verify-checksum.enabled"; + /** * key to turn on or off task side metadata loading (default true) * if true then metadata is read on the task side and some tasks may finish immediately. 
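
Reader-side usage note: the new `parquet.page.verify-checksum.enabled` key defined just above (`ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED`) and the `usePageChecksumVerification` option added to `ParquetReadOptions`/`ParquetReader.Builder` elsewhere in this diff both default to off, so existing readers keep their current behavior. A minimal sketch of how a caller might turn verification on follows; the class name and file path are placeholders, while the builder calls mirror what the new read benchmarks in this change already do. Verification only kicks in for pages whose headers actually carry a crc field, so files written without checksums still read cleanly.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class PageChecksumReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hadoop-config route: the key added above, picked up by HadoopReadOptions.Builder.
    conf.setBoolean("parquet.page.verify-checksum.enabled", true);

    // Builder route added by this change; verification is off by default.
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/example.parquet")) // placeholder path
            .withConf(conf)
            .usePageChecksumVerification(true)
            .build()) {
      Group record;
      while ((record = reader.read()) != null) {
        // A corrupted page surfaces as a ParquetDecodingException:
        // "could not verify page integrity, CRC checksum verification failed"
        System.out.println(record);
      }
    }
  }
}
```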
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index cd25b23e7e..afcbbff1c3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -145,6 +145,7 @@ public static enum JobSummaryLevel { public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate"; public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; + public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -338,6 +339,18 @@ private static int getPageRowCountLimit(Configuration conf) { return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT); } + public static void setPageWriteChecksumEnabled(JobContext jobContext, boolean val) { + setPageWriteChecksumEnabled(getConfiguration(jobContext), val); + } + + public static void setPageWriteChecksumEnabled(Configuration conf, boolean val) { + conf.setBoolean(PAGE_WRITE_CHECKSUM_ENABLED, val); + } + + public static boolean getPageWriteChecksumEnabled(Configuration conf) { + return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); + } + private WriteSupport writeSupport; private ParquetOutputCommitter committer; @@ -409,6 +422,7 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)) .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) + .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)) .build(); long blockSize = getLongBlockSize(conf); @@ -428,11 +442,13 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck()); LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength()); LOG.info("Page row count limit to {}", props.getPageRowCountLimit()); + LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? 
"on" : "off"); } WriteContext init = writeSupport.init(conf); ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf), - init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength()); + init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(), + props.getPageWriteChecksumEnabled()); w.start(); float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index de20808ff8..28e1967398 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -280,6 +280,16 @@ public Builder useColumnIndexFilter() { return this; } + public Builder usePageChecksumVerification(boolean usePageChecksumVerification) { + optionsBuilder.usePageChecksumVerification(usePageChecksumVerification); + return this; + } + + public Builder usePageChecksumVerification() { + optionsBuilder.usePageChecksumVerification(); + return this; + } + public Builder withFileRange(long start, long end) { optionsBuilder.withRange(start, end); return this; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 1ed5e32ca7..7fb71864b7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -278,7 +278,8 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport MessageType schema = writeContext.getSchema(); ParquetFileWriter fileWriter = new ParquetFileWriter( - file, schema, mode, rowGroupSize, maxPaddingSize, encodingProps.getColumnIndexTruncateLength()); + file, schema, mode, rowGroupSize, maxPaddingSize, + encodingProps.getColumnIndexTruncateLength(), encodingProps.getPageWriteChecksumEnabled()); fileWriter.start(); this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold()); @@ -515,6 +516,27 @@ public SELF withWriterVersion(WriterVersion version) { return self(); } + /** + * Enables writing page level checksums for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF enablePageWriteChecksum() { + encodingPropsBuilder.withPageWriteChecksumEnabled(true); + return self(); + } + + /** + * Enables writing page level checksums for the constructed writer. + * + * @param enablePageWriteChecksum whether page checksums should be written out + * @return this builder for method chaining. + */ + public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { + encodingPropsBuilder.withPageWriteChecksumEnabled(enablePageWriteChecksum); + return self(); + } + /** * Set a property that will be available to the read path. For writers that use a Hadoop * configuration, this is the recommended way to add configuration values. 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index c353ee3fe7..88c8d83ea1 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java new file mode 100644 index 0000000000..61a9d63319 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.hadoop; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.zip.CRC32; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.Page; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.codec.SnappyCompressor; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests that page level checksums are correctly written and that checksum verification works as + * expected + */ +public class TestDataPageV1Checksums { + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private static final Statistics EMPTY_STATS_INT32 = Statistics.getBuilderForReading( + Types.required(INT32).named("a")).build(); + + private CRC32 crc = new CRC32(); + + // Sample data, two columns 'a' and 'b' (both int32), + + private static final int PAGE_SIZE = 1024 * 1024; // 1MB + + private static final MessageType schemaSimple = MessageTypeParser.parseMessageType( + "message m {" + + " required int32 a;" + + " required int32 b;" + + "}"); + private static final ColumnDescriptor colADesc = schemaSimple.getColumns().get(0); + private static final ColumnDescriptor colBDesc = schemaSimple.getColumns().get(1); + private static final byte[] colAPage1Bytes = new byte[PAGE_SIZE]; + private static final byte[] colAPage2Bytes = new byte[PAGE_SIZE]; + private static final byte[] colBPage1Bytes = new byte[PAGE_SIZE]; + private static final byte[] colBPage2Bytes = new byte[PAGE_SIZE]; + private static final int numRecordsLargeFile = (2 * PAGE_SIZE) / Integer.BYTES; + + /** Write out sample Parquet file using ColumnChunkPageWriteStore directly, return path to file */ + private Path writeSimpleParquetFile(Configuration conf, 
CompressionCodecName compression) + throws IOException { + File file = tempFolder.newFile(); + file.delete(); + Path path = new Path(file.toURI()); + + for (int i = 0; i < PAGE_SIZE; i++) { + colAPage1Bytes[i] = (byte) i; + colAPage2Bytes[i] = (byte) -i; + colBPage1Bytes[i] = (byte) (i + 100); + colBPage2Bytes[i] = (byte) (i - 100); + } + + ParquetFileWriter writer = new ParquetFileWriter(conf, schemaSimple, path, + ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT); + + writer.start(); + writer.startBlock(numRecordsLargeFile); + + CodecFactory codecFactory = new CodecFactory(conf, PAGE_SIZE); + CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compression); + + ColumnChunkPageWriteStore writeStore = new ColumnChunkPageWriteStore( + compressor, schemaSimple, new HeapByteBufferAllocator(), + Integer.MAX_VALUE, ParquetOutputFormat.getPageWriteChecksumEnabled(conf)); + + PageWriter pageWriter = writeStore.getPageWriter(colADesc); + pageWriter.writePage(BytesInput.from(colAPage1Bytes), numRecordsLargeFile / 2, + numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN); + pageWriter.writePage(BytesInput.from(colAPage2Bytes), numRecordsLargeFile / 2, + numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN); + + pageWriter = writeStore.getPageWriter(colBDesc); + pageWriter.writePage(BytesInput.from(colBPage1Bytes), numRecordsLargeFile / 2, + numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN); + pageWriter.writePage(BytesInput.from(colBPage2Bytes), numRecordsLargeFile / 2, + numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN); + + writeStore.flushToFileWriter(writer); + + writer.endBlock(); + writer.end(new HashMap<>()); + + codecFactory.release(); + + return path; + } + + // Sample data, nested schema with nulls + + private static final MessageType schemaNestedWithNulls = MessageTypeParser.parseMessageType( + "message m {" + + " optional group c {" + + " required int64 id;" + + " required group d {" + + " repeated int32 val;" + + " }" + + " }" + + "}"); + private static final ColumnDescriptor colCIdDesc = schemaNestedWithNulls.getColumns().get(0); + private static final ColumnDescriptor colDValDesc = schemaNestedWithNulls.getColumns().get(1); + + private static final double nullRatio = 0.3; + private static final int numRecordsNestedWithNullsFile = 2000; + + private Path writeNestedWithNullsSampleParquetFile(Configuration conf, + boolean dictionaryEncoding, + CompressionCodecName compression) + throws IOException { + File file = tempFolder.newFile(); + file.delete(); + Path path = new Path(file.toURI()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withConf(conf) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withCompressionCodec(compression) + .withDictionaryEncoding(dictionaryEncoding) + .withType(schemaNestedWithNulls) + .withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf)) + .build()) { + GroupFactory groupFactory = new SimpleGroupFactory(schemaNestedWithNulls); + Random rand = new Random(42); + + for (int i = 0; i < numRecordsNestedWithNullsFile; i++) { + Group group = groupFactory.newGroup(); + if (rand.nextDouble() > nullRatio) { + // With equal probability, write out either 1 or 3 values in group e. To ensure our values + // are dictionary encoded when required, perform modulo. 
+ if (rand.nextDouble() > 0.5) { + group.addGroup("c").append("id", (long) i).addGroup("d") + .append("val", rand.nextInt() % 10); + } else { + group.addGroup("c").append("id", (long) i).addGroup("d") + .append("val", rand.nextInt() % 10) + .append("val", rand.nextInt() % 10) + .append("val", rand.nextInt() % 10); + } + } + writer.write(group); + } + } + + return path; + } + + /** + * Enable writing out page level crc checksum, disable verification in read path but check that + * the crc checksums are correct. Tests whether we successfully write out correct crc checksums + * without potentially failing on the read path verification . + */ + @Test + public void testWriteOnVerifyOff() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false); + + Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED); + + try (ParquetFileReader reader = getParquetFileReader(path, conf, + Arrays.asList(colADesc, colBDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore); + assertCrcSetAndCorrect(colAPage1, colAPage1Bytes); + assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes); + + DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore); + assertCrcSetAndCorrect(colAPage2, colAPage2Bytes); + assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes); + + DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore); + assertCrcSetAndCorrect(colBPage1, colBPage1Bytes); + assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes); + + DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore); + assertCrcSetAndCorrect(colBPage2, colBPage2Bytes); + assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes); + } + } + + /** Test that we do not write out checksums if the feature is turned off */ + @Test + public void testWriteOffVerifyOff() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false); + + Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED); + + try (ParquetFileReader reader = getParquetFileReader(path, conf, + Arrays.asList(colADesc, colBDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + assertCrcNotSet(readNextPage(colADesc, pageReadStore)); + assertCrcNotSet(readNextPage(colADesc, pageReadStore)); + assertCrcNotSet(readNextPage(colBDesc, pageReadStore)); + assertCrcNotSet(readNextPage(colBDesc, pageReadStore)); + } + } + + /** + * Do not write out page level crc checksums, but enable verification on the read path. Tests + * that the read still succeeds and does not throw an exception. 
+ */ + @Test + public void testWriteOffVerifyOn() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true); + + Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED); + + try (ParquetFileReader reader = getParquetFileReader(path, conf, + Arrays.asList(colADesc, colBDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + assertCorrectContent(readNextPage(colADesc, pageReadStore).getBytes().toByteArray(), + colAPage1Bytes); + assertCorrectContent(readNextPage(colADesc, pageReadStore).getBytes().toByteArray(), + colAPage2Bytes); + assertCorrectContent(readNextPage(colBDesc, pageReadStore).getBytes().toByteArray(), + colBPage1Bytes); + assertCorrectContent(readNextPage(colBDesc, pageReadStore).getBytes().toByteArray(), + colBPage2Bytes); + } + } + + /** + * Write out checksums and verify them on the read path. Tests that crc is set and that we can + * read back what we wrote if checksums are enabled on both the write and read path. + */ + @Test + public void testWriteOnVerifyOn() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true); + + Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED); + + try (ParquetFileReader reader = getParquetFileReader(path, conf, + Arrays.asList(colADesc, colBDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore); + assertCrcSetAndCorrect(colAPage1, colAPage1Bytes); + assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes); + + DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore); + assertCrcSetAndCorrect(colAPage2, colAPage2Bytes); + assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes); + + DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore); + assertCrcSetAndCorrect(colBPage1, colBPage1Bytes); + assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes); + + DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore); + assertCrcSetAndCorrect(colBPage2, colBPage2Bytes); + assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes); + } + } + + /** + * Test whether corruption in the page content is detected by checksum verification + */ + @Test + public void testCorruptedPage() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true); + + Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED); + + InputFile inputFile = HadoopInputFile.fromPath(path, conf); + try (SeekableInputStream inputStream = inputFile.newStream()) { + int fileLen = (int) inputFile.getLength(); + byte[] fileBytes = new byte[fileLen]; + inputStream.readFully(fileBytes); + inputStream.close(); + + // There are 4 pages in total (2 per column), we corrupt the first page of the first column + // and the second page of the second column. 
We do this by altering a byte roughly in the + // middle of each page to be corrupted + fileBytes[fileLen / 8]++; + fileBytes[fileLen / 8 + ((fileLen / 4) * 3)]++; + + OutputFile outputFile = HadoopOutputFile.fromPath(path, conf); + try (PositionOutputStream outputStream = outputFile.createOrOverwrite(1024 * 1024)) { + outputStream.write(fileBytes); + outputStream.close(); + + // First we disable checksum verification, the corruption will go undetected as it is in the + // data section of the page + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false); + try (ParquetFileReader reader = getParquetFileReader(path, conf, + Arrays.asList(colADesc, colBDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore); + assertFalse("Data in page was not corrupted", + Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes)); + readNextPage(colADesc, pageReadStore); + readNextPage(colBDesc, pageReadStore); + DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore); + assertFalse("Data in page was not corrupted", + Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes)); + } + + // Now we enable checksum verification, the corruption should be detected + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true); + try (ParquetFileReader reader = + getParquetFileReader(path, conf, Arrays.asList(colADesc, colBDesc))) { + // We expect an exception on the first encountered corrupt page (in readAllPages) + assertVerificationFailed(reader); + } + } + } + } + + /** + * Tests that the checksum is calculated using the compressed version of the data and that + * checksum verification succeeds + */ + @Test + public void testCompression() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true); + + Path path = writeSimpleParquetFile(conf, CompressionCodecName.SNAPPY); + + try (ParquetFileReader reader = getParquetFileReader(path, conf, + Arrays.asList(colADesc, colBDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore); + assertCrcSetAndCorrect(colAPage1, snappy(colAPage1Bytes)); + assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes); + + DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore); + assertCrcSetAndCorrect(colAPage2, snappy(colAPage2Bytes)); + assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes); + + DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore); + assertCrcSetAndCorrect(colBPage1, snappy(colBPage1Bytes)); + assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes); + + DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore); + assertCrcSetAndCorrect(colBPage2, snappy(colBPage2Bytes)); + assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes); + } + } + + /** + * Tests that we adhere to the checksum calculation specification, namely that the crc is + * calculated using the compressed concatenation of the repetition levels, definition levels and + * the actual data. This is done by generating sample data with a nested schema containing nulls + * (generating non trivial repetition and definition levels). 
+ */ + @Test + public void testNestedWithNulls() throws IOException { + Configuration conf = new Configuration(); + + // Write out sample file via the non-checksum code path, extract the raw bytes to calculate the + // reference crc with + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false); + Path refPath = writeNestedWithNullsSampleParquetFile(conf, false, CompressionCodecName.SNAPPY); + + try (ParquetFileReader refReader = getParquetFileReader(refPath, conf, + Arrays.asList(colCIdDesc, colDValDesc))) { + PageReadStore refPageReadStore = refReader.readNextRowGroup(); + byte[] colCIdPageBytes = readNextPage(colCIdDesc, refPageReadStore).getBytes().toByteArray(); + byte[] colDValPageBytes = readNextPage(colDValDesc, refPageReadStore).getBytes().toByteArray(); + + // Write out sample file with checksums + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true); + Path path = writeNestedWithNullsSampleParquetFile(conf, false, CompressionCodecName.SNAPPY); + + try (ParquetFileReader reader = getParquetFileReader(path, conf, + Arrays.asList(colCIdDesc, colDValDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + DataPageV1 colCIdPage = readNextPage(colCIdDesc, pageReadStore); + assertCrcSetAndCorrect(colCIdPage, snappy(colCIdPageBytes)); + assertCorrectContent(colCIdPage.getBytes().toByteArray(), colCIdPageBytes); + + DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore); + assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes)); + assertCorrectContent(colDValPage.getBytes().toByteArray(), colDValPageBytes); + } + } + } + + @Test + public void testDictionaryEncoding() throws IOException { + Configuration conf = new Configuration(); + + // Write out dictionary encoded sample file via the non-checksum code path, extract the raw + // bytes to calculate the reference crc with + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false); + Path refPath = writeNestedWithNullsSampleParquetFile(conf, true, CompressionCodecName.SNAPPY); + + try (ParquetFileReader refReader = + getParquetFileReader(refPath, conf, Collections.singletonList(colDValDesc))) { + PageReadStore refPageReadStore = refReader.readNextRowGroup(); + // Read (decompressed) dictionary page + byte[] dictPageBytes = readDictPage(colDValDesc, refPageReadStore).getBytes().toByteArray(); + byte[] colDValPageBytes = readNextPage(colDValDesc, refPageReadStore).getBytes().toByteArray(); + + // Write out sample file with checksums + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true); + conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true); + Path path = writeNestedWithNullsSampleParquetFile(conf, true, CompressionCodecName.SNAPPY); + + try (ParquetFileReader reader = + getParquetFileReader(path, conf, Collections.singletonList(colDValDesc))) { + PageReadStore pageReadStore = reader.readNextRowGroup(); + + DictionaryPage dictPage = readDictPage(colDValDesc, pageReadStore); + assertCrcSetAndCorrect(dictPage, snappy(dictPageBytes)); + assertCorrectContent(dictPage.getBytes().toByteArray(), dictPageBytes); + + DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore); + assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes)); + assertCorrectContent(colDValPage.getBytes().toByteArray(), 
colDValPageBytes); + } + } + } + + /** Compress using snappy */ + private byte[] snappy(byte[] bytes) throws IOException { + SnappyCompressor compressor = new SnappyCompressor(); + compressor.reset(); + compressor.setInput(bytes, 0, bytes.length); + compressor.finish(); + byte[] buffer = new byte[bytes.length * 2]; + int compressedSize = compressor.compress(buffer, 0, buffer.length); + return Arrays.copyOfRange(buffer, 0, compressedSize); + } + + /** Construct ParquetFileReader for input file and columns */ + private ParquetFileReader getParquetFileReader(Path path, Configuration conf, + List columns) + throws IOException { + ParquetMetadata footer = ParquetFileReader.readFooter(conf, path); + return new ParquetFileReader(conf, footer.getFileMetaData(), path, + footer.getBlocks(), columns); + } + + /** Read the dictionary page for the column */ + private DictionaryPage readDictPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) { + return pageReadStore.getPageReader(colDesc).readDictionaryPage(); + } + + /** Read the next page for a column */ + private DataPageV1 readNextPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) { + return (DataPageV1) pageReadStore.getPageReader(colDesc).readPage(); + } + + /** + * Compare the extracted (decompressed) bytes to the reference bytes + */ + private void assertCorrectContent(byte[] pageBytes, byte[] referenceBytes) { + assertArrayEquals("Read page content was different from expected page content", referenceBytes, + pageBytes); + } + + /** + * Verify that the crc is set in a page, calculate the reference crc using the reference bytes and + * check that the crc's are identical. + */ + private void assertCrcSetAndCorrect(Page page, byte[] referenceBytes) { + assertTrue("Checksum was not set in page", page.getCrc().isPresent()); + int crcFromPage = page.getCrc().getAsInt(); + crc.reset(); + crc.update(referenceBytes); + assertEquals("Checksum found in page did not match calculated reference checksum", + crc.getValue(), (long) crcFromPage & 0xffffffffL); + } + + /** Verify that the crc is not set */ + private void assertCrcNotSet(Page page) { + assertFalse("Checksum was set in page", page.getCrc().isPresent()); + } + + /** + * Read the next page for a column, fail if this did not throw an checksum verification exception, + * if the read succeeds (no exception was thrown ), verify that the checksum was not set. 
+ */ + private void assertVerificationFailed(ParquetFileReader reader) { + try { + reader.readNextRowGroup(); + fail("Expected checksum verification exception to be thrown"); + } catch (Exception e) { + assertTrue("Thrown exception is of incorrect type", e instanceof ParquetDecodingException); + assertTrue("Did not catch checksum verification ParquetDecodingException", + e.getMessage().contains("CRC checksum verification failed")); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 917ad57910..8763cace8b 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -225,6 +225,8 @@ public void testAlignmentWithPadding() throws Exception { Path path = new Path(testFile.toURI()); Configuration conf = new Configuration(); + // Disable writing out checksums as hardcoded byte offsets in assertions below expect it + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); // uses the test constructor ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60); @@ -330,6 +332,8 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception { Path path = new Path(testFile.toURI()); Configuration conf = new Configuration(); + // Disable writing out checksums as hardcoded byte offsets in assertions below expect it + conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); // uses the test constructor ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50); diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java index 27043b9480..eaf6e8e8c4 100644 --- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java +++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.zip.CRC32; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -73,6 +74,8 @@ public class DumpCommand extends ArgsOnlyCommand { public static final int BLOCK_BUFFER_SIZE = 64 * 1024; public static final String[] USAGE = new String[] { "", "where is the parquet file to print to stdout" }; + private static CRC32 crc = new CRC32(); + public static final Options OPTIONS; static { OPTIONS = new Options(); @@ -242,6 +245,12 @@ store, new DumpGroupConverter(), schema, } } + private static boolean verifyCrc(int referenceCrc, byte[] bytes) { + crc.reset(); + crc.update(bytes); + return crc.getValue() == ((long) referenceCrc & 0xffffffffL); + } + public static void dump(final PrettyPrintWriter out, PageReadStore store, ColumnDescriptor column) throws IOException { PageReader reader = store.getPageReader(column); @@ -274,6 +283,15 @@ public Void visit(DataPageV1 pageV1) { } else { out.format(" ST:[none]"); } + if (pageV1.getCrc().isPresent()) { + try { + out.format(" CRC:%s", verifyCrc(pageV1.getCrc().getAsInt(), pageV1.getBytes().toByteArray()) ? "[verified]" : "[PAGE CORRUPT]"); + } catch (IOException e) { + out.format(" CRC:[error getting page bytes]"); + } + } else { + out.format(" CRC:[none]"); + } return null; }
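
As the new test's assertVerificationFailed helper and the parquet-tools output above show, a failed check surfaces on the read path as a ParquetDecodingException whose message contains "CRC checksum verification failed". A hedged sketch of how application code might distinguish checksum corruption from other decoding errors, mirroring the test helper (the method name is illustrative):

import java.io.IOException;

import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.ParquetDecodingException;

public class CrcFailureHandlingSketch {
  // Returns true if reading the next row group fails specifically because a
  // page-level CRC did not match; other decoding errors are rethrown.
  static boolean failsCrcVerification(ParquetFileReader reader) throws IOException {
    try {
      reader.readNextRowGroup();
      return false;
    } catch (ParquetDecodingException e) {
      if (e.getMessage() != null
          && e.getMessage().contains("CRC checksum verification failed")) {
        return true;
      }
      throw e;
    }
  }
}
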