From 07f6b97beb0c6e43d8a71bf04c90722036ed6f74 Mon Sep 17 00:00:00 2001
From: jinyang_li
Date: Fri, 4 Oct 2024 09:50:25 -0700
Subject: [PATCH 1/7] add unit test for parquet row groups size

---
 .../apache/iceberg/parquet/TestParquet.java   | 50 +++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index ae0a822d3464..df3a6d54962b 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -35,6 +35,8 @@
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
+import java.util.UUID;
 import java.util.function.Function;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -46,6 +48,7 @@
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -57,6 +60,7 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -219,6 +223,52 @@ public void testTwoLevelList() throws IOException {
     assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
   }
 
+
+  @Test
+  public void testParquetRowGroupSize() throws IOException {
+    // verify parquet row group size should be close to configured size
+    ImmutableList.Builder<Types.NestedField> columnsBuilder = ImmutableList.builder();
+
+    for (int i = 1; i <= 50; i++) {
+      columnsBuilder.add(optional(i, "stringCol" + i, Types.StringType.get()));
+    }
+
+    List<Types.NestedField> columns = columnsBuilder.build();
+    Schema schema = new Schema(columns);
+
+    int recordCount = 10000;
+    File file = createTempFile(temp);
+
+    List<GenericData.Record> records = Lists.newArrayListWithCapacity(recordCount);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    for (int i = 1; i <= recordCount; i++) {
+      Random random = new Random();
+      String value = "A".repeat(100);
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      for (Types.NestedField column : columns) {
+        record.put(column.name(), value);
+      }
+
+      records.add(record);
+    }
+
+    long actualSize =
+            write(
+                file,
+                schema,
+                ImmutableMap.of("write.parquet.row-group-size-bytes", "1048576"),
+                ParquetAvroWriter::buildWriter,
+                records.toArray(new GenericData.Record[] {}));
+
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
+      ParquetMetadata footer = reader.getFooter();
+      for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
+        BlockMetaData blockMetaData = footer.getBlocks().get(i);
+        System.out.println("Block " + i + " compressed size: " + blockMetaData.getCompressedSize());
+      }
+    }
+  }
+
   private Pair<File, Long> generateFile(
       Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
       int desiredRecordCount,
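Note on patch 1: it only prints the block sizes; patch 3 below turns the printout into assertions. To inspect row-group layout outside the Iceberg test harness, a standalone sketch along these lines reads the same footer metadata through stock parquet-mr APIs (the file path is a placeholder, and going through HadoopInputFile directly is this sketch's assumption, not something the patch does):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class RowGroupInspector {
      public static void main(String[] args) throws Exception {
        // The path is illustrative; point it at any local Parquet file.
        HadoopInputFile input =
            HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), new Configuration());
        try (ParquetFileReader reader = ParquetFileReader.open(input)) {
          ParquetMetadata footer = reader.getFooter();
          for (int i = 0; i < footer.getBlocks().size(); i++) {
            BlockMetaData block = footer.getBlocks().get(i);
            System.out.printf(
                "row group %d: %d rows, %d bytes compressed, %d bytes uncompressed%n",
                i, block.getRowCount(), block.getCompressedSize(), block.getTotalByteSize());
          }
        }
      }
    }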
From 5854ccb8a23362a87ef6ec9750963b43e23a5180 Mon Sep 17 00:00:00 2001
From: jinyang_li
Date: Fri, 4 Oct 2024 10:51:45 -0700
Subject: [PATCH 2/7] Estimate row group size

---
 .../apache/iceberg/parquet/ParquetWriter.java | 18 +++++++++++++++++-
 .../apache/iceberg/parquet/TestParquet.java   |  7 +++----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 099cffc33bb8..f529f50efa70 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -66,6 +66,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private boolean closed;
   private ParquetFileWriter writer;
   private int rowGroupOrdinal;
+  private long currentBufferSize = 0;
+  private long totalBufferSize = 0;
+  private long totalRowGroupSize = 0;
 
   private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -132,7 +135,9 @@ private void ensureWriterInitialized() {
   @Override
   public void add(T value) {
     recordCount += 1;
+    long sizeBeforeWrite = writeStore.getBufferedSize();
     model.write(0, value);
+    this.currentBufferSize += writeStore.getBufferedSize() - sizeBeforeWrite;
     writeStore.endRecord();
     checkSize();
   }
@@ -185,9 +190,17 @@ public List<Long> splitOffsets() {
     return null;
   }
 
+  private long estimateBufferSize() {
+    if (totalRowGroupSize == 0 || totalBufferSize == 0) {
+      return currentBufferSize;
+    }
+
+    return currentBufferSize * totalRowGroupSize / totalBufferSize;
+  }
+
   private void checkSize() {
     if (recordCount >= nextCheckRecordCount) {
-      long bufferedSize = writeStore.getBufferedSize();
+      long bufferedSize = estimateBufferSize();
       double avgRecordSize = ((double) bufferedSize) / recordCount;
 
       if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
@@ -211,6 +224,8 @@ private void flushRowGroup(boolean finished) {
         writer.startBlock(recordCount);
         writeStore.flush();
         pageStore.flushToFileWriter(writer);
+        totalBufferSize += currentBufferSize;
+        totalRowGroupSize += writeStore.getBufferedSize();
         writer.endBlock();
         if (!finished) {
           writeStore.close();
@@ -245,6 +260,7 @@ private void startRowGroup() {
     this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, pageStore);
 
     model.setColumnStore(writeStore);
+    this.currentBufferSize = 0;
   }
 
   @Override
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index df3a6d54962b..ddf1a6e184ef 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -236,17 +236,16 @@ public void testParquetRowGroupSize() throws IOException {
     List<Types.NestedField> columns = columnsBuilder.build();
     Schema schema = new Schema(columns);
 
-    int recordCount = 10000;
+    int recordCount = 100000;
     File file = createTempFile(temp);
 
     List<GenericData.Record> records = Lists.newArrayListWithCapacity(recordCount);
     org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
     for (int i = 1; i <= recordCount; i++) {
-      Random random = new Random();
-      String value = "A".repeat(100);
       GenericData.Record record = new GenericData.Record(avroSchema);
       for (Types.NestedField column : columns) {
-        record.put(column.name(), value);
+        String value = column.name().repeat(10) + i;
+        record.put(column.name(), value);
       }
 
       records.add(record);
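The idea behind estimateBufferSize() in patch 2: the column write store's getBufferedSize() reports raw, unencoded bytes, which can be far larger than what a row group occupies once encoding and compression are applied. The writer therefore scales the raw count by the on-disk-to-raw ratio observed from already-flushed groups. A rough illustration of that arithmetic, with invented figures (nothing here is measured):

    public class RowGroupSizeEstimateDemo {
      public static void main(String[] args) {
        // Invented figures purely to illustrate the scaling.
        long totalBufferSize = 40L * 1024 * 1024;   // raw bytes buffered across flushed row groups
        long totalRowGroupSize = 10L * 1024 * 1024; // bytes those groups took after encoding
        long currentBufferSize = 8L * 1024 * 1024;  // raw bytes buffered in the open group

        // Earlier groups shrank 4x on write, so 8 MiB of raw data is estimated at ~2 MiB on disk.
        long estimate = currentBufferSize * totalRowGroupSize / totalBufferSize;
        System.out.println(estimate); // 2097152, i.e. 2 MiB
      }
    }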
From c83198e4319b67bd1282fc82a8277627df664cdb Mon Sep 17 00:00:00 2001
From: jinyang_li
Date: Fri, 4 Oct 2024 13:28:40 -0700
Subject: [PATCH 3/7] code style

---
 .../apache/iceberg/parquet/TestParquet.java   | 36 +++++++++----------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index ddf1a6e184ef..009a1097a321 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -35,9 +35,8 @@
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
-import java.util.UUID;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -223,20 +222,18 @@ public void testTwoLevelList() throws IOException {
     assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
   }
 
-
   @Test
   public void testParquetRowGroupSize() throws IOException {
     // verify parquet row group size should be close to configured size
-    ImmutableList.Builder<Types.NestedField> columnsBuilder = ImmutableList.builder();
-
-    for (int i = 1; i <= 50; i++) {
-      columnsBuilder.add(optional(i, "stringCol" + i, Types.StringType.get()));
-    }
+    int recordCount = 100000;
+    int columnCount = 50;
 
-    List<Types.NestedField> columns = columnsBuilder.build();
+    List<Types.NestedField> columns =
+        IntStream.rangeClosed(1, columnCount)
+            .mapToObj(i -> optional(i, "stringCol" + i, Types.StringType.get()))
+            .collect(ImmutableList.toImmutableList());
     Schema schema = new Schema(columns);
 
-    int recordCount = 100000;
     File file = createTempFile(temp);
 
     List<GenericData.Record> records = Lists.newArrayListWithCapacity(recordCount);
@@ -252,19 +249,22 @@ public void testParquetRowGroupSize() throws IOException {
     }
 
     long actualSize =
-            write(
-                file,
-                schema,
-                ImmutableMap.of("write.parquet.row-group-size-bytes", "1048576"),
-                ParquetAvroWriter::buildWriter,
-                records.toArray(new GenericData.Record[] {}));
+        write(
+            file,
+            schema,
+            ImmutableMap.of("write.parquet.row-group-size-bytes", "1048576"),
+            ParquetAvroWriter::buildWriter,
+            records.toArray(new GenericData.Record[] {}));
 
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) {
       ParquetMetadata footer = reader.getFooter();
       for (int i = 1; i < footer.getBlocks().size() - 1; i++) {
-        BlockMetaData blockMetaData = footer.getBlocks().get(i);
-        System.out.println("Block " + i + " compressed size: " + blockMetaData.getCompressedSize());
+        assertThat(footer.getBlocks().get(i).getCompressedSize())
+            .isBetween((long) 900 * 1024, (long) 1200 * 1024);
       }
+
+      assertThat(footer.getBlocks().get(footer.getBlocks().size() - 1).getCompressedSize())
+          .isLessThan((long) 1200 * 1024);
     }
   }
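Note the loop bounds in the assertions above: block 0 is skipped because the first row group is flushed before any raw-to-encoded ratio history exists (the estimate falls back to the raw buffered size and flushes early), and the final block gets only an upper bound because it merely holds whatever records remained when the file closed. A sketch of the same check with named constants instead of the hard-coded 900 KiB and 1200 KiB bounds (TARGET_BYTES and TOLERANCE are illustrative, not from the patch):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.List;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    class RowGroupAssertions {
      // Illustrative constants, not part of the patch.
      static final long TARGET_BYTES = 1024 * 1024;
      static final double TOLERANCE = 0.2;

      static void assertInteriorRowGroupsNearTarget(ParquetMetadata footer) {
        List<BlockMetaData> blocks = footer.getBlocks();
        // Skip block 0 (written before any raw-to-encoded ratio history exists) and the
        // final block (it only holds whatever records remained at close time).
        for (int i = 1; i < blocks.size() - 1; i++) {
          assertThat(blocks.get(i).getCompressedSize())
              .isBetween(
                  (long) (TARGET_BYTES * (1 - TOLERANCE)), (long) (TARGET_BYTES * (1 + TOLERANCE)));
        }
      }
    }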
From 5e646685cf116570e32ececf388bf15782b6049a Mon Sep 17 00:00:00 2001
From: jinyang_li
Date: Mon, 7 Oct 2024 15:54:50 -0700
Subject: [PATCH 4/7] fix testBinPackCombineMediumFiles test failure

---
 .../main/java/org/apache/iceberg/parquet/ParquetWriter.java  | 4 ++--
 .../iceberg/spark/actions/TestRewriteDataFilesAction.java    | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index f529f50efa70..ff0aa3685b51 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -172,7 +172,7 @@ public long length() {
     if (!closed && recordCount > 0) {
       // recordCount > 0 when there are records in the write store that have not been flushed to
       // the Parquet file
-      length += writeStore.getBufferedSize();
+      length += estimateBufferSize();
     }
 
     return length;
@@ -192,7 +192,7 @@ public List<Long> splitOffsets() {
 
   private long estimateBufferSize() {
     if (totalRowGroupSize == 0 || totalBufferSize == 0) {
-      return currentBufferSize;
+      return writeStore.getBufferedSize();
     }
 
     return currentBufferSize * totalRowGroupSize / totalBufferSize;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b67ee87c7d3e..89dbacb53091 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -557,8 +557,8 @@ public void testBinPackCombineMixedFiles() {
 
   @Test
   public void testBinPackCombineMediumFiles() {
-    Table table = createTable(4);
-    shouldHaveFiles(table, 4);
+    Table table = createTable(6);
+    shouldHaveFiles(table, 6);
 
     List<Object[]> expectedRecords = currentData();
     int targetSize = ((int) testDataSize(table) / 3);
@@ -577,7 +577,7 @@ public void testBinPackCombineMediumFiles() {
 
     assertThat(result.rewrittenDataFilesCount())
-        .as("Action should delete 4 data files")
-        .isEqualTo(4);
+        .as("Action should delete 6 data files")
+        .isEqualTo(6);
 
     assertThat(result.addedDataFilesCount()).as("Action should add 3 data files").isEqualTo(3);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
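Patch 4 also routes FileAppender.length() through the estimate, which is why the Spark rewrite test needed adjusting: length() is what callers poll to decide when a data file is full, and the previous raw-buffer figure skewed high, closing files earlier than the new estimate does, so the test needs more input files to produce the same bin-packing behavior. A minimal sketch of that usage pattern (the AppenderFactory interface is a hypothetical stand-in for illustration, not an Iceberg API):

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.io.FileAppender;

    class RollingWriteSketch<T> {
      // Hypothetical factory so the sketch is self-contained; not part of Iceberg.
      interface AppenderFactory<T> {
        FileAppender<T> newAppender() throws IOException;
      }

      void writeRolling(List<T> records, AppenderFactory<T> factory, long targetFileSize)
          throws IOException {
        FileAppender<T> appender = factory.newAppender();
        for (T record : records) {
          appender.add(record);
          // length() = bytes already flushed + estimated size of the buffered row group;
          // before this patch the buffered part used raw (pre-encoding) bytes and skewed high.
          if (appender.length() >= targetFileSize) {
            appender.close();
            appender = factory.newAppender();
          }
        }
        appender.close();
      }
    }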
From 742eaac8d2d429530bd169548231de6fb8445e2e Mon Sep 17 00:00:00 2001
From: jinyang_li
Date: Tue, 8 Oct 2024 11:20:49 -0700
Subject: [PATCH 5/7] address comments

---
 .../apache/iceberg/parquet/ParquetWriter.java | 24 +++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index ff0aa3685b51..ba09dd7cb76b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -66,8 +66,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private boolean closed;
   private ParquetFileWriter writer;
   private int rowGroupOrdinal;
-  private long currentBufferSize = 0;
-  private long totalBufferSize = 0;
+  private long currentRawBufferedSize = 0;
+  private long totalRawBufferedSize = 0;
   private long totalRowGroupSize = 0;
 
   private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -137,7 +137,7 @@ public void add(T value) {
     recordCount += 1;
     long sizeBeforeWrite = writeStore.getBufferedSize();
     model.write(0, value);
-    this.currentBufferSize += writeStore.getBufferedSize() - sizeBeforeWrite;
+    this.currentRawBufferedSize += writeStore.getBufferedSize() - sizeBeforeWrite;
     writeStore.endRecord();
     checkSize();
   }
@@ -172,7 +172,7 @@ public long length() {
     if (!closed && recordCount > 0) {
       // recordCount > 0 when there are records in the write store that have not been flushed to
       // the Parquet file
-      length += estimateBufferSize();
+      length += estimateBufferedSize();
     }
 
     return length;
@@ -190,17 +190,21 @@ public List<Long> splitOffsets() {
     return null;
   }
 
-  private long estimateBufferSize() {
-    if (totalRowGroupSize == 0 || totalBufferSize == 0) {
+  /*
+   * Data size can shrink after being written out due to encoding and compression.
+   * Use the ratio totalRowGroupSize / totalRawBufferedSize to estimate the written-out size.
+   */
+  private long estimateBufferedSize() {
+    if (totalRowGroupSize == 0 || totalRawBufferedSize == 0 || currentRawBufferedSize == 0) {
       return writeStore.getBufferedSize();
     }
 
-    return currentBufferSize * totalRowGroupSize / totalBufferSize;
+    return currentRawBufferedSize * totalRowGroupSize / totalRawBufferedSize;
   }
 
   private void checkSize() {
     if (recordCount >= nextCheckRecordCount) {
-      long bufferedSize = estimateBufferSize();
+      long bufferedSize = estimateBufferedSize();
       double avgRecordSize = ((double) bufferedSize) / recordCount;
 
       if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
@@ -224,7 +228,7 @@ private void flushRowGroup(boolean finished) {
         writer.startBlock(recordCount);
         writeStore.flush();
         pageStore.flushToFileWriter(writer);
-        totalBufferSize += currentBufferSize;
+        totalRawBufferedSize += currentRawBufferedSize;
         totalRowGroupSize += writeStore.getBufferedSize();
         writer.endBlock();
@@ -260,7 +264,7 @@ private void startRowGroup() {
     this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, pageStore);
 
     model.setColumnStore(writeStore);
-    this.currentBufferSize = 0;
+    this.currentRawBufferedSize = 0;
   }
 
   @Override

From 07fe92754b0e3fde874412cd809ab8b04f57ecbb Mon Sep 17 00:00:00 2001
From: jinyang_li
Date: Tue, 8 Oct 2024 22:47:05 -0700
Subject: [PATCH 6/7] remove -2 estimate

---
 .../main/java/org/apache/iceberg/parquet/ParquetWriter.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index ba09dd7cb76b..5296e8573dc0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -205,11 +205,10 @@ private long estimateBufferedSize() {
   private void checkSize() {
     if (recordCount >= nextCheckRecordCount) {
       long bufferedSize = estimateBufferedSize();
-      double avgRecordSize = ((double) bufferedSize) / recordCount;
-
-      if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
+      if (bufferedSize > targetRowGroupSize) {
         flushRowGroup(false);
       } else {
+        double avgRecordSize = ((double) bufferedSize) / recordCount;
         long remainingSpace = targetRowGroupSize - bufferedSize;
         long remainingRecords = (long) (remainingSpace / avgRecordSize);
         this.nextCheckRecordCount =

From a6e9ef3b1a0bd681d6ab463f2fb7da19d201e857 Mon Sep 17 00:00:00 2001
From: jinyang_li
Date: Wed, 9 Oct 2024 15:14:22 -0700
Subject: [PATCH 7/7] update unit tests

---
 core/src/test/java/org/apache/iceberg/TestMetrics.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
index 2c4849135f64..a161231c160a 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetrics.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -533,7 +533,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce
     assertThat(recordsFile).isNotNull();
 
     // rowgroup size should be > 1
-    assertThat(splitCount(recordsFile)).isEqualTo(3);
+    assertThat(splitCount(recordsFile)).isEqualTo(2);
 
     assertThat(metrics.recordCount()).isEqualTo(201L);
     assertCounts(1, 201L, 0L, metrics);
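Taken together, patches 5-7 leave the size check in its final shape: flush as soon as the estimated on-disk size crosses the target, otherwise schedule the next check from the average record size. A trace of the bookkeeping across two row groups, with invented sizes (this mirrors the logic; it is not code from the patch):

    public class RowGroupLifecycleTrace {
      public static void main(String[] args) {
        long targetRowGroupSize = 1024L * 1024; // 1 MiB target
        long totalRawBufferedSize = 0;
        long totalRowGroupSize = 0;

        // Row group 1: no flush history, so the size check sees the raw buffered bytes.
        long currentRawBufferedSize = 1_100_000; // raw bytes cross the target first
        long estimate = currentRawBufferedSize;  // fallback branch of estimateBufferedSize()
        System.out.println(estimate > targetRowGroupSize); // true -> group flushes

        // Suppose encoding/compression shrinks the group 4x when it hits the file.
        long encodedSize = 275_000;
        totalRawBufferedSize += currentRawBufferedSize;
        totalRowGroupSize += encodedSize;

        // Row group 2: the observed ratio now scales the raw buffered bytes down,
        // so roughly 4x more raw data fits before the check fires.
        currentRawBufferedSize = 4_000_000;
        estimate = currentRawBufferedSize * totalRowGroupSize / totalRawBufferedSize;
        System.out.println(estimate); // 1000000 -- the group now lands near the 1 MiB target
      }
    }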