parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -33,6 +33,7 @@
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnWriteStore;
@@ -74,6 +75,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private long nextRowGroupSize = 0;
private long recordCount = 0;
private long nextCheckRecordCount = 10;
private boolean closed;

private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -124,10 +126,23 @@ public Metrics metrics() {
return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
}

/**
* Returns the approximate length of the output file produced by this writer.
* <p>
* Prior to calling {@link ParquetWriter#close}, the result is approximate. After calling close, the length is
* exact.
*
* @return the approximate length of the output file produced by this writer or the exact length if this writer is
* closed.
*/
@Override
public long length() {
try {
return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
if (closed) {
return writer.getPos();
} else {
return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to get file length");
}
@@ -170,6 +185,8 @@ private void flushRowGroup(boolean finished) {
}

private void startRowGroup() {
Preconditions.checkState(!closed, "Writer is closed");

try {
this.nextRowGroupSize = Math.min(writer.getNextRowGroupSize(), targetRowGroupSize);
} catch (IOException e) {
@@ -189,8 +206,11 @@ private void startRowGroup() {

@Override
public void close() throws IOException {
flushRowGroup(true);
writeStore.close();
writer.end(metadata);
if (!closed) {
this.closed = true;
flushRowGroup(true);
writeStore.close();
writer.end(metadata);
}
}
}
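
The change above makes close() idempotent and tightens the length() contract. A minimal usage sketch of that contract, assuming the non-deprecated write path (a createWriterFunc is supplied, so the appender is this ParquetWriter) and reusing names that already appear in this diff (localOutput, ParquetAvroWriter::buildWriter); the method name writeAndMeasure is illustrative only:

static long writeAndMeasure(File file, Schema schema,
                            List<GenericData.Record> records) throws IOException {
  FileAppender<GenericData.Record> appender = Parquet.write(localOutput(file))
      .schema(schema)
      .createWriterFunc(ParquetAvroWriter::buildWriter)
      .build();

  appender.addAll(records);
  long approximate = appender.length(); // before close(): file position plus the buffered row-group size

  appender.close();                     // flushes the final row group and writes the footer
  appender.close();                     // safe: the new closed flag makes the second call a no-op

  return appender.length();             // after close(): exact length, matching the bytes on disk
}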
parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -19,6 +19,7 @@

package org.apache.iceberg.parquet;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@@ -53,20 +54,41 @@ static File writeRecords(TemporaryFolder temp, Schema schema,
}

static File writeRecords(
TemporaryFolder temp,
Schema schema, Map<String, String> properties,
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
GenericData.Record... records) throws IOException {
File tmpFolder = temp.newFolder("parquet");
String filename = UUID.randomUUID().toString();
File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
try (FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
.schema(schema)
.setAll(properties)
.createWriterFunc(createWriterFunc)
.build()) {
TemporaryFolder temp,
Schema schema, Map<String, String> properties,
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
GenericData.Record... records) throws IOException {
File file = createTempFile(temp);
write(file, schema, properties, createWriterFunc, records);
return file;
}

static long write(File file, Schema schema, Map<String, String> properties,
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
GenericData.Record... records) throws IOException {

long len = 0;

FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
.schema(schema)
.setAll(properties)
.createWriterFunc(createWriterFunc)
.build();

try (Closeable toClose = writer) {
writer.addAll(Lists.newArrayList(records));
len = writer.length(); // with the deprecated adapter we need to get the length before closing the writer
}
return file;

if (writer instanceof ParquetWriter) {
len = writer.length();
}
return len;
}

static File createTempFile(TemporaryFolder temp) throws IOException {
File tmpFolder = temp.newFolder("parquet");
String filename = UUID.randomUUID().toString();
return new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
}
}
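
A short usage sketch of the new helpers, assuming a JUnit test with a TemporaryFolder rule named temp, an Iceberg Schema, and two Avro records (record1, record2) already in scope; the assertion mirrors the new test in TestParquet below:

File file = ParquetWritingTestUtils.createTempFile(temp);
long reportedLength = ParquetWritingTestUtils.write(
    file,
    schema,
    Collections.emptyMap(),          // no extra write properties
    ParquetAvroWriter::buildWriter,  // non-deprecated path, so write() re-reads length() after close
    record1, record2);

// with the length()/close() fix, the reported length should equal the bytes actually written
Assert.assertEquals(file.length(), reportedLength);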
46 changes: 40 additions & 6 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -22,13 +22,15 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.util.Pair;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
@@ -38,7 +40,8 @@

import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords;
import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
import static org.apache.iceberg.parquet.ParquetWritingTestUtils.write;
import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestParquet {
@@ -49,7 +52,7 @@ public class TestParquet {
@Test
public void testRowGroupSizeConfigurable() throws IOException {
// Without an explicit writer function
File parquetFile = generateFileWithTwoRowGroups(null);
File parquetFile = generateFileWithTwoRowGroups(null).first();

try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
Assert.assertEquals(2, reader.getRowGroups().size());
@@ -58,14 +61,43 @@ public void testRowGroupSizeConfigurable() throws IOException {

@Test
public void testRowGroupSizeConfigurableWithWriter() throws IOException {
File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter);
File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter).first();

try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
Assert.assertEquals(2, reader.getRowGroups().size());
}
}

private File generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
@Test
public void testNumberOfBytesWritten() throws IOException {
Schema schema = new Schema(
optional(1, "intCol", IntegerType.get())
);

// this value was specifically derived to reproduce issue #1980
// the record count growth factor is 10000 (hardcoded)
// there are 10 checkSize method calls in total
// on the 10th (final) checkSize call, nextCheckRecordCount == 100100
// 100099 + 1 >= 100100
int recordCount = 100099;
File file = createTempFile(temp);

List<GenericData.Record> records = new ArrayList<>(recordCount);
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
for (int i = 1; i <= recordCount; i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put("intCol", i);
records.add(record);
}

long actualSize = write(file, schema, Collections.emptyMap(), ParquetAvroWriter::buildWriter,
records.toArray(new GenericData.Record[]{}));

long expectedSize = ParquetIO.file(localInput(file)).getLength();
Assert.assertEquals(expectedSize, actualSize);
}

private Pair<File, Long> generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
throws IOException {
Schema schema = new Schema(
optional(1, "intCol", IntegerType.get())
@@ -85,12 +117,14 @@ private File generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWrit
// Force multiple row groups by making the byte size very small
// Note there's also minimumRowGroupRecordCount which cannot be configured so we have to write
// at least that many records for a new row group to occur
return writeRecords(temp,
File file = createTempFile(temp);
long size = write(file,
schema,
ImmutableMap.of(
PARQUET_ROW_GROUP_SIZE_BYTES,
Integer.toString(minimumRowGroupRecordCount * Integer.BYTES)),
createWriterFunc,
records.toArray(new GenericData.Record[] {}));
records.toArray(new GenericData.Record[]{}));
return Pair.of(file, size);
}
}