diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index e1afaca994..696fec3140 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -615,6 +615,28 @@ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) {
     return self();
   }
 
+  /**
+   * Sets the minimum number of rows to write before a page size check is done.
+   *
+   * @param min writes at least `min` rows before invoking a page size check
+   * @return this builder for method chaining
+   */
+  public SELF withMinRowCountForPageSizeCheck(int min) {
+    encodingPropsBuilder.withMinRowCountForPageSizeCheck(min);
+    return self();
+  }
+
+  /**
+   * Sets the maximum number of rows to write before a page size check is done.
+   *
+   * @param max makes a page size check after `max` rows have been written
+   * @return this builder for method chaining
+   */
+  public SELF withMaxRowCountForPageSizeCheck(int max) {
+    encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max);
+    return self();
+  }
+
   /**
    * Set a property that will be available to the read path. For writers that use a Hadoop
    * configuration, this is the recommended way to add configuration values.
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index de53e96264..9e9b735f32 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -45,6 +45,7 @@
 import net.openhft.hashing.LongHashFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -279,4 +280,50 @@ public void testParquetFileWithBloomFilter() throws IOException {
           LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
     }
   }
+
+  @Test
+  public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
+    testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+      ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+      1);
+    testParquetFileNumberOfBlocks(1, 1, 3);
+
+  }
+
+  private void testParquetFileNumberOfBlocks(int minRowCountForPageSizeCheck,
+                                             int maxRowCountForPageSizeCheck,
+                                             int expectedNumberOfBlocks) throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    File file = temp.newFile();
+    temp.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter writer = ExampleParquetWriter.builder(path)
+      .withConf(conf)
+      // Set row group size to 1, to make sure we flush every time
+      // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+      .withRowGroupSize(1)
+      .withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)
+      .withMaxRowCountForPageSizeCheck(maxRowCountForPageSizeCheck)
+      .build()) {
+
+      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+      writer.write(factory.newGroup().append("str", "foo"));
+      writer.write(factory.newGroup().append("str", "bar"));
+      writer.write(factory.newGroup().append("str", "baz"));
+    }
+
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+      ParquetMetadata footer = reader.getFooter();
+      assertEquals(expectedNumberOfBlocks, footer.getBlocks().size());
+    }
+  }
 }
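
For reference, a minimal usage sketch of the two builder methods added by this patch. The class name, output path, and the 100 / 10_000 thresholds are illustrative only; everything else uses the existing ExampleParquetWriter / GroupWriteSupport API as in the test above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class PageSizeCheckExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message msg { required binary str (UTF8); }");
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/example.parquet"))
        .withConf(conf)
        // Illustrative thresholds: check the current page size no earlier than
        // every 100 rows and no later than every 10_000 rows.
        .withMinRowCountForPageSizeCheck(100)
        .withMaxRowCountForPageSizeCheck(10_000)
        .build()) {
      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
      writer.write(factory.newGroup().append("str", "foo"));
    }
  }
}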