@@ -615,6 +615,28 @@ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) {
return self();
}

/**
* Sets the minimum number of rows to write before a page size check is done.
*
* @param min the minimum number of rows to write before a page size check is performed
* @return this builder for method chaining
*/
public SELF withMinRowCountForPageSizeCheck(int min) {
encodingPropsBuilder.withMinRowCountForPageSizeCheck(min);
return self();
}

/**
* Sets the maximum number of rows to write before a page size check is done.
*
* @param max the maximum number of rows to write before a page size check is performed
* @return this builder for method chaining
*/
public SELF withMaxRowCountForPageSizeCheck(int max) {
encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max);
return self();
}

/**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
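For quick reference, here is a minimal, self-contained sketch (not part of this diff) of how the two builder options added above might be used with the existing `ExampleParquetWriter` builder. The class name, output path, and the 100/10000 values are illustrative placeholders only; 100 and 10000 mirror the defaults in `ParquetProperties`.

```java
// Illustrative sketch only: exercises the new builder options added in this change.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public class PageSizeCheckExample {
  public static void main(String[] args) throws Exception {
    // Same single-column schema shape as the test below
    MessageType schema = Types.buildMessage()
        .required(BINARY).as(stringType()).named("str")
        .named("msg");

    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    Path path = new Path("/tmp/page-size-check-example.parquet"); // placeholder path
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
        .withConf(conf)
        .withMinRowCountForPageSizeCheck(100)    // check page size no sooner than every 100 rows
        .withMaxRowCountForPageSizeCheck(10000)  // and no later than every 10000 rows
        .build()) {
      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
      writer.write(factory.newGroup().append("str", "example"));
    }
  }
}
```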
@@ -45,6 +45,7 @@
import net.openhft.hashing.LongHashFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -279,4 +280,50 @@ public void testParquetFileWithBloomFilter() throws IOException {
LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
}
}

@Test
public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
1);
testParquetFileNumberOfBlocks(1, 1, 3);
}

private void testParquetFileNumberOfBlocks(int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck,
int expectedNumberOfBlocks) throws IOException {
MessageType schema = Types
.buildMessage()
.required(BINARY)
.as(stringType())
.named("str")
.named("msg");

Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);

File file = temp.newFile();
temp.delete();
Path path = new Path(file.getAbsolutePath());
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withConf(conf)
// Set row group size to 1, to make sure we flush every time
// minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
.withRowGroupSize(1)
.withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)
.withMaxRowCountForPageSizeCheck(maxRowCountForPageSizeCheck)
.build()) {

SimpleGroupFactory factory = new SimpleGroupFactory(schema);
writer.write(factory.newGroup().append("str", "foo"));
writer.write(factory.newGroup().append("str", "bar"));
writer.write(factory.newGroup().append("str", "baz"));
}

try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
ParquetMetadata footer = reader.getFooter();
assertEquals(expectedNumberOfBlocks, footer.getBlocks().size());
}
}
}
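A brief note on why the assertions hold: with the default record counts for the page size check (100 and 10000 in `ParquetProperties`), the writer does not perform a size check until at least 100 records have been written, so all three records end up in a single row group despite the 1-byte row group size; with min = max = 1, the size is checked after every record, each check finds the row group limit exceeded, and each record is flushed into its own block, giving 3 blocks. The snippet below is an optional inspection sketch, not part of this change; it assumes the same `path` and `conf` as the test above and simply prints the per-block row counts from the footer.

```java
// Optional inspection sketch: how many rows landed in each row group of the written file.
// Assumes `path` and `conf` from the test above are in scope.
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
  List<BlockMetaData> blocks = reader.getFooter().getBlocks();
  for (int i = 0; i < blocks.size(); i++) {
    System.out.println("block " + i + ": " + blocks.get(i).getRowCount() + " row(s)");
  }
}
```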