diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 8b5d00026887..390012624ddf 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnWriteStore;
@@ -74,6 +75,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private long nextRowGroupSize = 0;
   private long recordCount = 0;
   private long nextCheckRecordCount = 10;
+  private boolean closed;
 
   private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -124,10 +126,23 @@ public Metrics metrics() {
     return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
   }
 
+  /**
+   * Returns the approximate length of the output file produced by this writer.
+   * <p>
+   * Prior to calling {@link ParquetWriter#close}, the result is approximate. After calling close, the length is
+   * exact.
+   *
+   * @return the approximate length of the output file produced by this writer, or the exact length if this writer is
+   *     closed
+   */
   @Override
   public long length() {
     try {
-      return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+      if (closed) {
+        return writer.getPos();
+      } else {
+        return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+      }
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to get file length");
     }
@@ -170,6 +185,8 @@ private void flushRowGroup(boolean finished) {
   }
 
   private void startRowGroup() {
+    Preconditions.checkState(!closed, "Writer is closed");
+
     try {
       this.nextRowGroupSize = Math.min(writer.getNextRowGroupSize(), targetRowGroupSize);
     } catch (IOException e) {
@@ -189,8 +206,11 @@ private void startRowGroup() {
 
   @Override
   public void close() throws IOException {
-    flushRowGroup(true);
-    writeStore.close();
-    writer.end(metadata);
+    if (!closed) {
+      this.closed = true;
+      flushRowGroup(true);
+      writeStore.close();
+      writer.end(metadata);
+    }
   }
 }
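For context, a minimal usage sketch of the contract the new length() establishes. This is not part of the patch: writer construction mirrors the test utilities below, and file, schema, and records are illustrative placeholders.

    FileAppender<GenericData.Record> appender = Parquet.write(localOutput(file))
        .schema(schema)
        .createWriterFunc(ParquetAvroWriter::buildWriter)
        .build();
    appender.addAll(records);
    long approx = appender.length(); // open writer: file position plus buffered row-group bytes
    appender.close();                // flushes the final row group and writes the footer
    long exact = appender.length();  // closed writer: file position only, matching the bytes on disk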
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index 2a0f6a4f63f6..24effa7496a5 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.parquet;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
@@ -53,20 +54,41 @@ static File writeRecords(TemporaryFolder temp, Schema schema,
   }
 
   static File writeRecords(
-    TemporaryFolder temp,
-    Schema schema, Map<String, String> properties,
-    Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
-    GenericData.Record... records) throws IOException {
-    File tmpFolder = temp.newFolder("parquet");
-    String filename = UUID.randomUUID().toString();
-    File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
-    try (FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
-        .schema(schema)
-        .setAll(properties)
-        .createWriterFunc(createWriterFunc)
-        .build()) {
+      TemporaryFolder temp,
+      Schema schema, Map<String, String> properties,
+      Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+      GenericData.Record... records) throws IOException {
+    File file = createTempFile(temp);
+    write(file, schema, properties, createWriterFunc, records);
+    return file;
+  }
+
+  static long write(File file, Schema schema, Map<String, String> properties,
+                    Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+                    GenericData.Record... records) throws IOException {
+
+    long len = 0;
+
+    FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
+        .schema(schema)
+        .setAll(properties)
+        .createWriterFunc(createWriterFunc)
+        .build();
+
+    try (Closeable toClose = writer) {
       writer.addAll(Lists.newArrayList(records));
+      len = writer.length(); // the deprecated adapter requires reading the length before the writer is closed
     }
-    return file;
+
+    if (writer instanceof ParquetWriter) {
+      len = writer.length();
+    }
+    return len;
+  }
+
+  static File createTempFile(TemporaryFolder temp) throws IOException {
+    File tmpFolder = temp.newFolder("parquet");
+    String filename = UUID.randomUUID().toString();
+    return new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
   }
 }
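The write() helper above reads length() twice because the two appender implementations disagree on when the value is reliable: the deprecated adapter must be measured while still open, while the patched ParquetWriter reports the exact size only after close. A hedged sketch of the lifecycle rules this implies, assuming only what the patch itself shows (writer is the FileAppender built in write()):

    writer.close();  // first call: flushes the last row group and writes the footer
    writer.close();  // second call: a no-op, guarded by the new closed flag
    // Any later code path that reaches startRowGroup() now fails fast via
    // Preconditions.checkState(!closed, "Writer is closed"), surfacing an
    // IllegalStateException instead of silently corrupting the file.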
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index d9d5eb0af93c..c931e26601aa 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -22,6 +22,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
 import org.apache.avro.generic.GenericData;
@@ -29,6 +30,7 @@
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.util.Pair;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
@@ -38,7 +40,8 @@
 
 import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
-import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.write;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
 public class TestParquet {
@@ -49,7 +52,7 @@ public class TestParquet {
   @Test
   public void testRowGroupSizeConfigurable() throws IOException {
     // Without an explicit writer function
-    File parquetFile = generateFileWithTwoRowGroups(null);
+    File parquetFile = generateFileWithTwoRowGroups(null).first();
 
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
       Assert.assertEquals(2, reader.getRowGroups().size());
@@ -58,14 +61,43 @@ public void testRowGroupSizeConfigurable() throws IOException {
 
   @Test
   public void testRowGroupSizeConfigurableWithWriter() throws IOException {
-    File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter);
+    File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter).first();
 
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
       Assert.assertEquals(2, reader.getRowGroups().size());
     }
   }
 
-  private File generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
+  @Test
+  public void testNumberOfBytesWritten() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "intCol", IntegerType.get())
+    );
+
+    // this value was derived specifically to reproduce issue #1980:
+    // the record count grow factor is 10000 (hardcoded),
+    // so checkSize is called 10 times in total;
+    // on the 10th (final) checkSize call, nextCheckRecordCount == 100100,
+    // and 100099 + 1 >= 100100
+    int recordCount = 100099;
+    File file = createTempFile(temp);
+
+    List<GenericData.Record> records = new ArrayList<>(recordCount);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    for (int i = 1; i <= recordCount; i++) {
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      record.put("intCol", i);
+      records.add(record);
+    }
+
+    long actualSize = write(file, schema, Collections.emptyMap(), ParquetAvroWriter::buildWriter,
+        records.toArray(new GenericData.Record[]{}));
+
+    long expectedSize = ParquetIO.file(localInput(file)).getLength();
+    Assert.assertEquals(expectedSize, actualSize);
+  }
+
+  private Pair<File, Long> generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
       throws IOException {
     Schema schema = new Schema(
         optional(1, "intCol", IntegerType.get())
@@ -85,12 +117,14 @@ private File generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
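Taken together, the new test pins down a single invariant: once the appender is closed, length() must equal the bytes actually on disk. Before this patch, length() after close still consulted writeStore.isColumnFlushNeeded() and getBufferedSize() on the already-closed write store, so the reported value could disagree with the file. A hedged standalone restatement of the check, reusing the helpers added in this patch with schema, temp, and records as in the test above:

    File file = createTempFile(temp);
    long reported = write(file, schema, Collections.emptyMap(),
        ParquetAvroWriter::buildWriter, records);
    long onDisk = ParquetIO.file(localInput(file)).getLength();
    Assert.assertEquals(onDisk, reported); // fails without the closed check in length()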