@@ -0,0 +1,151 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.benchmarks;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.openjdk.jmh.annotations.Mode.SingleShotTime;
import static org.openjdk.jmh.annotations.Scope.Benchmark;

import java.io.IOException;
import java.util.Random;

import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

/**
 * Benchmark to measure writing nested null values. (See PARQUET-343 for details.)
 * <p>
 * To execute this benchmark, build the jar file of this module; the jar can then be run using the JMH
 * framework.<br>
 * The following one-liner (to be executed in the parquet-benchmarks submodule) generates result statistics in the
 * file {@code jmh-result.json}. The JSON can be visualized using the tool at
 * <a href="https://jmh.morethan.io">https://jmh.morethan.io</a>.
 *
 * <pre>
 * mvn clean package &amp;&amp; java -jar target/parquet-benchmarks.jar org.apache.parquet.benchmarks.NestedNullWritingBenchmarks -rf json
 * </pre>
 */
@BenchmarkMode(SingleShotTime)
@Fork(1)
@Warmup(iterations = 10, batchSize = 1)
@Measurement(iterations = 50, batchSize = 1)
@OutputTimeUnit(MILLISECONDS)
@State(Benchmark)
public class NestedNullWritingBenchmarks {
  private static final MessageType SCHEMA = Types.buildMessage()
      .optionalList()
      .optionalElement(INT32)
      .named("int_list")
      .optionalList()
      .optionalListElement()
      .optionalElement(BINARY)
      .named("dummy_list")
      .optionalMap()
      .key(BINARY)
      .value(BINARY, OPTIONAL)
      .named("dummy_map")
      .optionalGroup()
      .optional(BINARY).named("dummy_group_value1")
      .optional(BINARY).named("dummy_group_value2")
      .optional(BINARY).named("dummy_group_value3")
      .named("dummy_group")
      .named("msg");
  private static final int RECORD_COUNT = 10_000_000;
  private static final double NULL_RATIO = 0.99;
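  // An OutputFile that swallows every byte and only tracks the position, so the benchmark measures the cost of
  // the write path itself rather than any real I/O.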
  private static final OutputFile BLACK_HOLE = new OutputFile() {
    @Override
    public boolean supportsBlockSize() {
      return false;
    }

    @Override
    public long defaultBlockSize() {
      return -1L;
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) {
      return create(blockSizeHint);
    }

    @Override
    public PositionOutputStream create(long blockSizeHint) {
      return new PositionOutputStream() {
        private long pos;

        @Override
        public long getPos() throws IOException {
          return pos;
        }

        @Override
        public void write(int b) throws IOException {
          ++pos;
        }
      };
    }
  };

  private static class ValueGenerator {
    private static final GroupFactory FACTORY = new SimpleGroupFactory(SCHEMA);
    private static final Group NULL = FACTORY.newGroup();
    private final Random random = new Random(42);

    public Group nextValue() {
      if (random.nextDouble() > NULL_RATIO) {
        Group group = FACTORY.newGroup();
        group.addGroup("int_list").addGroup("list").append("element", random.nextInt());
        return group;
      } else {
        return NULL;
      }
    }
  }

  @Benchmark
  public void benchmarkWriting() throws IOException {
    ValueGenerator generator = new ValueGenerator();
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(BLACK_HOLE)
        .withWriteMode(Mode.OVERWRITE)
        .withType(SCHEMA)
        .build()) {
      for (int i = 0; i < RECORD_COUNT; ++i) {
        writer.write(generator.nextValue());
      }
    }
  }
}
@@ -52,7 +52,7 @@ public interface ColumnWriteStore {
  abstract public long getBufferedSize();

  /**
-   * used for debugging pupose
+   * used for debugging purpose
   * @return a formatted string representing memory usage per column
   */
  abstract public String memUsageString();
@@ -62,4 +62,14 @@ public interface ColumnWriteStore {
   */
  abstract public void close();

  /**
   * Returns whether the possibly cached values (or nulls) have to be flushed to the underlying column writers,
   * because a page might be closed after the next invocation of {@link #endRecord()}.
   *
   * @return {@code true} if all the values shall be written to the underlying column writers before calling
   * {@link #endRecord()}
   */
  default boolean isColumnFlushNeeded() {
    return false;
  }
}
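
A minimal sketch of the caller contract that isColumnFlushNeeded() establishes, assuming a record writer that
caches null values (the flushCachedValues() helper below is hypothetical; the actual call site is the
MessageColumnIO change further down in this diff):

    // Before ending a record, ask the store whether the next endRecord() might close the current pages.
    if (columns.isColumnFlushNeeded()) {
      flushCachedValues(); // hypothetical helper: writes any cached nulls to the column writers
    }
    columns.endRecord(); // safe: no values remain cached if a page is closed here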
@@ -229,4 +229,9 @@ private void sizeCheck() {
      rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
    }
  }

  @Override
  public boolean isColumnFlushNeeded() {
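    // The next endRecord() will increment rowCount and trigger sizeCheck(), which may close the current pages.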
    return rowCount + 1 >= rowCountForNextSizeCheck;
  }
}
@@ -305,6 +305,9 @@ long getRowsWrittenSoFar() {
   * Writes the current data to a new page in the page store
   */
  void writePage() {
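    // Fail fast rather than silently writing a page with no values (the symptom of PARQUET-1531).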
    if (valueCount == 0) {
      throw new ParquetEncodingException("writing empty page");
    }
    this.rowsWrittenSoFar += pageRowCount;
    if (DEBUG)
      LOG.debug("write page");
@@ -297,6 +297,13 @@ public void startMessage() {
    @Override
    public void endMessage() {
      writeNullForMissingFieldsAtCurrentLevel();

      // We need to flush the cached null values before ending the record to ensure that everything is sent to the
      // writer before the current page is closed
      if (columns.isColumnFlushNeeded()) {
        flush();
      }

      columns.endRecord();
      if (DEBUG) log("< MESSAGE END >");
      if (DEBUG) printState();
@@ -25,6 +25,7 @@
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.HashMap;
@@ -47,6 +48,17 @@ public static Builder builder(Path file) {
    return new Builder(file);
  }

  /**
   * Creates a Builder for configuring ParquetWriter with the example object
   * model. THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE.
   *
   * @param file the output file to create
   * @return a {@link Builder} to create a {@link ParquetWriter}
   */
  public static Builder builder(OutputFile file) {
    return new Builder(file);
  }

  /**
   * Create a new {@link ExampleParquetWriter}.
   *
@@ -78,6 +90,10 @@ private Builder(Path file) {
      super(file);
    }

    private Builder(OutputFile file) {
      super(file);
    }

    public Builder withType(MessageType type) {
      this.type = type;
      return this;
@@ -21,6 +21,7 @@
import static java.util.Arrays.asList;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
import static org.apache.parquet.column.Encoding.PLAIN;
@@ -32,20 +33,24 @@
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.example.ExampleInputFormat;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Rule;
@@ -54,6 +59,7 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
@@ -166,4 +172,39 @@ public Void call() throws IOException {
    Assert.assertFalse("Should not create a file when schema is rejected",
        file.exists());
  }

  // Tests the issue of PARQUET-1531 where writing rows with null nested values leads to empty pages if the page
  // row count limit is reached before the cached nulls are flushed.
  @Test
  public void testNullValuesWithPageRowLimit() throws IOException {
    MessageType schema = Types.buildMessage()
        .optionalList().optionalElement(BINARY).as(stringType()).named("str_list")
        .named("msg");
    final int recordCount = 100;
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    GroupFactory factory = new SimpleGroupFactory(schema);
    Group listNull = factory.newGroup();

    File file = temp.newFile();
    file.delete();
    Path path = new Path(file.getAbsolutePath());
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
        .withPageRowCountLimit(10)
        .withConf(conf)
        .build()) {
      for (int i = 0; i < recordCount; ++i) {
        writer.write(listNull);
      }
    }

    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).build()) {
      int readRecordCount = 0;
      for (Group group = reader.read(); group != null; group = reader.read()) {
        assertEquals(listNull.toString(), group.toString());
        ++readRecordCount;
      }
      assertEquals("Number of records read should be equal to the number written", recordCount, readRecordCount);
    }
  }
}