diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 88057bfd09d6..7479fd69f307 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -100,4 +100,7 @@ private TableProperties() {}
 
   public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
   public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
+
+  public static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes";
+  public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = Long.MAX_VALUE;
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 593de9b9c778..14838d5d0ca5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -126,7 +126,7 @@ public Metrics metrics() {
   @Override
   public long length() {
     try {
-      return writer.getPos();
+      return writer.getPos() + writeStore.getBufferedSize();
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to get file length");
     }
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 3e939fe350eb..419ef22232dc 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -25,6 +25,7 @@ Iceberg tables support table properties to configure table behavior, like the default split size for readers:
 | write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
 | write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
 | write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
+| write.target-file-size-bytes | Long.MAX_VALUE | Controls the size of files generated to target about this many bytes |
 
 ### Table behavior properties
 
@@ -75,4 +76,5 @@ df.write
 
 | Spark option | Default | Description |
 | ------------ | -------------------------- | ------------------------------------------------------------ |
 | write-format | Table write.format.default | File format to use for this write operation; parquet or avro |
+| target-file-size-bytes | Table write.target-file-size-bytes | Overrides this table's write.target-file-size-bytes for this write |
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index f5205057e584..1b8ba50a2729 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -19,12 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -33,7 +31,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.function.Function;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -57,6 +54,7 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.data.SparkAvroWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -77,6 +75,8 @@
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 // TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
 class Writer implements DataSourceWriter {
@@ -89,6 +89,7 @@ class Writer implements DataSourceWriter {
   private final boolean replacePartitions;
   private final String applicationId;
   private final String wapId;
+  private final long targetFileSize;
 
   Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
     this(table, options, replacePartitions, applicationId, null);
   }
@@ -102,6 +103,10 @@ class Writer implements DataSourceWriter {
     this.replacePartitions = replacePartitions;
     this.applicationId = applicationId;
     this.wapId = wapId;
+
+    long tableTargetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
   }
 
   private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -119,7 +124,7 @@ private boolean isWapTable() {
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager);
+        table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager, targetFileSize);
   }
 
   @Override
@@ -242,34 +247,31 @@ private static class WriterFactory implements DataWriterFactory<InternalRow> {
     private final FileFormat format;
     private final LocationProvider locations;
     private final Map<String, String> properties;
-    private final String uuid = UUID.randomUUID().toString();
     private final FileIO fileIo;
     private final EncryptionManager encryptionManager;
+    private final long targetFileSize;
 
     WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                  Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager) {
+                  Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
+                  long targetFileSize) {
       this.spec = spec;
       this.format = format;
       this.locations = locations;
       this.properties = properties;
       this.fileIo = fileIo;
       this.encryptionManager = encryptionManager;
+      this.targetFileSize = targetFileSize;
     }
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-      String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
-      AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
+      OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
+      AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
+
       if (spec.fields().isEmpty()) {
-        OutputFile outputFile = fileIo.newOutputFile(locations.newDataLocation(filename));
-        return new UnpartitionedWriter(encryptionManager.encrypt(outputFile), format, factory, fileIo);
+        return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
       } else {
-        Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey =
-            key -> {
-              OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, filename));
-              return encryptionManager.encrypt(rawOutputFile);
-            };
-        return new PartitionedWriter(spec, format, factory, newOutputFileForKey, fileIo);
+        return new PartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
       }
     }
@@ -303,142 +305,119 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
         }
       }
     }
-  }
-
-  private interface AppenderFactory<T> {
-    FileAppender<T> newAppender(OutputFile file, FileFormat format);
-  }
-
-  private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
-    private final FileIO fileIo;
-    private FileAppender<InternalRow> appender = null;
-    private Metrics metrics = null;
-    private List<Long> offsetRanges = null;
-    private final EncryptedOutputFile file;
-
-    UnpartitionedWriter(
-        EncryptedOutputFile outputFile,
-        FileFormat format,
-        AppenderFactory<InternalRow> factory,
-        FileIO fileIo) {
-      this.fileIo = fileIo;
-      this.file = outputFile;
-      this.appender = factory.newAppender(file.encryptingOutputFile(), format);
-    }
-    @Override
-    public void write(InternalRow record) {
-      appender.add(record);
-    }
-
-    @Override
-    public WriterCommitMessage commit() throws IOException {
-      Preconditions.checkArgument(appender != null, "Commit called on a closed writer: %s", this);
-
-      // metrics and splitOffsets are populated on close
-      close();
-
-      if (metrics.recordCount() == 0L) {
-        fileIo.deleteFile(file.encryptingOutputFile());
-        return new TaskCommit();
+    private class OutputFileFactory {
+      private final int partitionId;
+      private final long taskId;
+      private final long epochId;
+      // The uuid makes it possible to tell from two paths that they were written by the same operation.
+      // That is useful, for example, when a Spark job dies and leaves files behind: they can all be found
+      // with a recursive listing and grep.
+      private final String uuid = UUID.randomUUID().toString();
+      private int fileCount;
+
+      OutputFileFactory(int partitionId, long taskId, long epochId) {
+        this.partitionId = partitionId;
+        this.taskId = taskId;
+        this.epochId = epochId;
+        this.fileCount = 0;
       }
-      DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null, metrics, offsetRanges);
-
-      return new TaskCommit(dataFile);
-    }
-
-    @Override
-    public void abort() throws IOException {
-      Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this);
+      private String generateFilename() {
+        return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
+      }
-      close();
-      fileIo.deleteFile(file.encryptingOutputFile());
-    }
+      /**
+       * Generates an EncryptedOutputFile for UnpartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile() {
+        OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+        return encryptionManager.encrypt(file);
+      }
-    @Override
-    public void close() throws IOException {
-      if (this.appender != null) {
-        this.appender.close();
-        this.metrics = appender.metrics();
-        this.offsetRanges = appender.splitOffsets();
-        this.appender = null;
+      /**
+       * Generates an EncryptedOutputFile for PartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile(PartitionKey key) {
+        OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+        return encryptionManager.encrypt(rawOutputFile);
+      }
     }
   }
 
-  private static class PartitionedWriter implements DataWriter<InternalRow> {
-    private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+  private interface AppenderFactory<T> {
+    FileAppender<T> newAppender(OutputFile file, FileFormat format);
+  }
+
+  private abstract static class BaseWriter implements DataWriter<InternalRow> {
+    protected static final int ROWS_DIVISOR = 1000;
+    private final List<DataFile> completedFiles = Lists.newArrayList();
     private final PartitionSpec spec;
     private final FileFormat format;
-    private final AppenderFactory<InternalRow> factory;
-    private final Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey;
-    private final PartitionKey key;
+    private final AppenderFactory<InternalRow> appenderFactory;
+    private final WriterFactory.OutputFileFactory fileFactory;
     private final FileIO fileIo;
-
+    private final long targetFileSize;
     private PartitionKey currentKey = null;
    private FileAppender<InternalRow> currentAppender = null;
     private EncryptedOutputFile currentFile = null;
+    private long currentRows = 0;
 
-    PartitionedWriter(
-        PartitionSpec spec,
-        FileFormat format,
-        AppenderFactory<InternalRow> factory,
-        Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey,
-        FileIO fileIo) {
+    BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
+               WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       this.spec = spec;
       this.format = format;
-      this.factory = factory;
-      this.newOutputFileForKey = newOutputFileForKey;
-      this.key = new PartitionKey(spec);
+      this.appenderFactory = appenderFactory;
+      this.fileFactory = fileFactory;
       this.fileIo = fileIo;
+      this.targetFileSize = targetFileSize;
     }
 
     @Override
-    public void write(InternalRow row) throws IOException {
-      key.partition(row);
+    public abstract void write(InternalRow row) throws IOException;
 
-      if (!key.equals(currentKey)) {
+    public void writeInternal(InternalRow row) throws IOException {
+      if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
         closeCurrent();
-
-        if (completedPartitions.contains(key)) {
-          // if rows are not correctly grouped, detect and fail the write
-          PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
-          LOG.warn("Duplicate key: {} == {}", existingKey, key);
-          throw new IllegalStateException("Already closed file for partition: " + key.toPath());
-        }
-
-        this.currentKey = key.copy();
-        this.currentFile = newOutputFileForKey.apply(currentKey);
-        this.currentAppender = factory.newAppender(currentFile.encryptingOutputFile(), format);
+        openCurrent();
       }
 
       currentAppender.add(row);
+      currentRows++;
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
       closeCurrent();
+
       return new TaskCommit(completedFiles);
     }
 
     @Override
     public void abort() throws IOException {
+      closeCurrent();
+
       // clean up files created by this writer
       Tasks.foreach(completedFiles)
           .throwFailureWhenFinished()
           .noRetry()
           .run(file -> fileIo.deleteFile(file.path().toString()));
+    }
 
-      if (currentAppender != null) {
-        currentAppender.close();
-        this.currentAppender = null;
-        fileIo.deleteFile(currentFile.encryptingOutputFile());
+    protected void openCurrent() {
+      if (spec.fields().size() == 0) {
+        // unpartitioned
+        currentFile = fileFactory.newOutputFile();
+      } else {
+        // partitioned
+        currentFile = fileFactory.newOutputFile(currentKey);
       }
+      currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
+      currentRows = 0;
     }
 
-    private void closeCurrent() throws IOException {
+    protected void closeCurrent() throws IOException {
       if (currentAppender != null) {
         currentAppender.close();
         // metrics are only valid after the appender is closed
@@ -446,16 +425,89 @@ private void closeCurrent() throws IOException {
         List<Long> splitOffsets = currentAppender.splitOffsets();
         this.currentAppender = null;
 
-        DataFile dataFile = DataFiles.builder(spec)
-            .withEncryptedOutputFile(currentFile)
-            .withPartition(currentKey)
-            .withMetrics(metrics)
-            .withSplitOffsets(splitOffsets)
-            .build();
+        if (metrics.recordCount() == 0L) {
+          fileIo.deleteFile(currentFile.encryptingOutputFile());
+        } else {
+          DataFile dataFile = DataFiles.builder(spec)
+              .withEncryptedOutputFile(currentFile)
+              .withPartition(spec.fields().size() == 0 ? null : currentKey) // set null if unpartitioned
+              .withMetrics(metrics)
+              .withSplitOffsets(splitOffsets)
+              .build();
+          completedFiles.add(dataFile);
+        }
+
+        this.currentFile = null;
+      }
     }
+
+    protected PartitionKey getCurrentKey() {
+      return currentKey;
+    }
+
+    protected void setCurrentKey(PartitionKey currentKey) {
+      this.currentKey = currentKey;
+    }
+  }
+
+  private static class UnpartitionedWriter extends BaseWriter {
+    UnpartitionedWriter(
+        PartitionSpec spec,
+        FileFormat format,
+        AppenderFactory<InternalRow> appenderFactory,
+        WriterFactory.OutputFileFactory fileFactory,
+        FileIO fileIo,
+        long targetFileSize) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+
+      openCurrent();
+    }
+
+    @Override
+    public void write(InternalRow row) throws IOException {
+      writeInternal(row);
+    }
+  }
+
+  private static class PartitionedWriter extends BaseWriter {
+    private final PartitionKey key;
+    private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+
+    PartitionedWriter(
+        PartitionSpec spec,
+        FileFormat format,
+        AppenderFactory<InternalRow> appenderFactory,
+        WriterFactory.OutputFileFactory fileFactory,
+        FileIO fileIo,
+        long targetFileSize) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+
+      this.key = new PartitionKey(spec);
+    }
+
+    @Override
+    public void write(InternalRow row) throws IOException {
+      key.partition(row);
+      PartitionKey currentKey = getCurrentKey();
+      if (!key.equals(currentKey)) {
         closeCurrent();
         completedPartitions.add(currentKey);
-        completedFiles.add(dataFile);
+
+        if (completedPartitions.contains(key)) {
+          // if rows are not correctly grouped, detect and fail the write
+          PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
+          LOG.warn("Duplicate key: {} == {}", existingKey, key);
+          throw new IllegalStateException("Already closed files for partition: " + key.toPath());
+        }
+
+        setCurrentKey(key.copy());
+        openCurrent();
       }
+
+      writeInternal(row);
     }
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index d8d164b59a5e..79a50d25da19 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
@@ -247,4 +248,94 @@ public void testUnpartitionedOverwrite() throws IOException {
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
   }
+
+  @Test
+  public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    table.updateProperties()
+        .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // 4 bytes is always exceeded, so a new file starts at every 1000-row size check
+        .commit();
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
+    for (int i = 0; i < 4000; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+
+    List<DataFile> files = Lists.newArrayList();
+    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+      for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
+        files.add(file);
+      }
+    }
+    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+    Assert.assertTrue("Every DataFile should contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+  }
+
+  @Test
+  public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
+    for (int i = 0; i < 2000; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+      expected.add(new SimpleRecord(i, "b"));
+      expected.add(new SimpleRecord(i, "c"));
+      expected.add(new SimpleRecord(i, "d"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").sort("data").write()
+        .format("iceberg")
+        .mode("append")
+        .option("target-file-size-bytes", 4) // 4 bytes is always exceeded, so a new file starts at every 1000-row size check
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+
+    List<DataFile> files = Lists.newArrayList();
+    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+      for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
+        files.add(file);
+      }
+    }
+    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+    Assert.assertTrue("Every DataFile should contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+  }
 }
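A minimal usage sketch for reviewers, mirroring the tests above rather than adding anything to the patch. The `table`, `df`, and `location` variables are assumed to already exist, and the 128 MB table default with a 64 MB per-write override are arbitrary illustrative values:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class TargetFileSizeExample {
  static void configureAndAppend(Table table, Dataset<Row> df, String location) {
    // Table-level default: target roughly 128 MB data files for every write to this table.
    table.updateProperties()
        .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, String.valueOf(128L * 1024 * 1024))
        .commit();

    // Per-write override: this append targets roughly 64 MB files instead.
    df.write()
        .format("iceberg")
        .mode("append")
        .option("target-file-size-bytes", 64L * 1024 * 1024)
        .save(location);
  }
}
```

Setting the table property gives every writer of the table the same default, while the Spark write option only affects the single operation that passes it.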
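A note on how the writer-side pieces fit together: `ParquetWriter.length()` now adds the write store's buffered size because `writer.getPos()` only advances when a row group is flushed to the output stream, so a check based on `getPos()` alone would not see rows still buffered in memory. `BaseWriter.writeInternal` consults that length only once every `ROWS_DIVISOR` (1000) rows to keep the check cheap, which is why the tests above end up with files of exactly 1000 rows when the target is only a few bytes. A rough sketch of the decision, with illustrative names (the real check is inlined in the diff above):

```java
// Illustrative sketch only; it mirrors the condition in BaseWriter.writeInternal and is not part of the patch.
class RollCheckSketch {
  // The length check is amortized to once every ROWS_DIVISOR rows because computing the current
  // length also has to account for Parquet's in-memory buffered pages.
  private static final int ROWS_DIVISOR = 1000;

  static boolean shouldRollToNewFile(long rowsInCurrentFile, long currentLengthBytes, long targetFileSizeBytes) {
    return rowsInCurrentFile % ROWS_DIVISOR == 0 && currentLengthBytes >= targetFileSizeBytes;
  }
}
```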