3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -100,4 +100,7 @@ private TableProperties() {}

public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";

public static final String WRITE_TARGET_FILE_SIZE = "write.target-file-size";
public static final long WRITE_TARGET_FILE_SIZE_DEFAULT = Long.MAX_VALUE;
}
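
The new property can be set like any other table property. A minimal sketch using the UpdateProperties API (the 512 MB value is illustrative, and `table` is assumed to be an org.apache.iceberg.Table loaded elsewhere):

    // assumption: `table` was loaded elsewhere, e.g. via a catalog
    table.updateProperties()
        .set(TableProperties.WRITE_TARGET_FILE_SIZE, String.valueOf(512L * 1024 * 1024))
        .commit();
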
2 changes: 2 additions & 0 deletions site/docs/configuration.md
@@ -25,6 +25,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
| write.target-file-size | Long.MAX_VALUE | Target size in bytes when writing data files; a writer rolls over to a new file once this size is reached |

### Table behavior properties

@@ -75,4 +76,5 @@ df.write
| Spark option | Default | Description |
| ------------ | -------------------------- | ------------------------------------------------------------ |
| write-format | Table write.format.default | File format to use for this write operation; parquet or avro |
| target-file-size | Table write.target-file-size | Overrides this table's write.target-file-size for this write |

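The new Spark option overrides the table-level setting for a single write. A hedged usage sketch, where `df` (a Dataset<Row>) and `location` are placeholders:

    df.write()
        .format("iceberg")
        .option("target-file-size", 128L * 1024 * 1024)  // 128 MB for this write only
        .mode("append")
        .save(location);
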
218 changes: 148 additions & 70 deletions spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -24,7 +24,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -33,7 +32,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -77,6 +75,8 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_DEFAULT;

// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
class Writer implements DataSourceWriter {
@@ -89,6 +89,7 @@ class Writer implements DataSourceWriter {
private final boolean replacePartitions;
private final String applicationId;
private final String wapId;
private final long targetFileSize;

Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
this(table, options, replacePartitions, applicationId, null);
@@ -102,6 +103,10 @@ class Writer implements DataSourceWriter {
this.replacePartitions = replacePartitions;
this.applicationId = applicationId;
this.wapId = wapId;

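// resolution order: Spark write option > table property > default; with the
// Long.MAX_VALUE default, rolling is effectively disabled unless configured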
long tableTargetFileSize = Long.parseLong(table.properties().getOrDefault(
WRITE_TARGET_FILE_SIZE, String.valueOf(WRITE_TARGET_FILE_SIZE_DEFAULT)));
this.targetFileSize = options.getLong("target-file-size", tableTargetFileSize);
}

private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -119,7 +124,7 @@ private boolean isWapTable() {
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
return new WriterFactory(
table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager);
table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager, targetFileSize);
}

@Override
@@ -242,34 +247,31 @@ private static class WriterFactory implements DataWriterFactory<InternalRow> {
private final FileFormat format;
private final LocationProvider locations;
private final Map<String, String> properties;
private final String uuid = UUID.randomUUID().toString();
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
private final long targetFileSize;

WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager) {
Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
long targetFileSize) {
this.spec = spec;
this.format = format;
this.locations = locations;
this.properties = properties;
this.fileIo = fileIo;
this.encryptionManager = encryptionManager;
this.targetFileSize = targetFileSize;
}

@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
OutputFileFactory<EncryptedOutputFile> fileFactory = new EncryptedOutputFileFactory(partitionId, taskId, epochId);
AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();

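// both writer types share the file factory, appender factory, and target size;
// they differ only in how output file locations are derived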
if (spec.fields().isEmpty()) {
OutputFile outputFile = fileIo.newOutputFile(locations.newDataLocation(filename));
return new UnpartitionedWriter(encryptionManager.encrypt(outputFile), format, factory, fileIo);
return new UnpartitionedWriter(fileFactory, format, appenderFactory, fileIo, targetFileSize);
} else {
Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey =
key -> {
OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, filename));
return encryptionManager.encrypt(rawOutputFile);
};
return new PartitionedWriter(spec, format, factory, newOutputFileForKey, fileIo);
return new PartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
}
}

@@ -303,66 +305,128 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor
}
}
}

private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
Contributor: I don't think OutputFileFactory is needed. This is the only implementation of it. This could also be named OutputFileFactory and not mention encryption, because the files may not actually be encrypted if the plaintext encryption manager is used.

Author: Agreed.

private final int partitionId;
private final long taskId;
private final long epochId;

EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
this.partitionId = partitionId;
this.taskId = taskId;
this.epochId = epochId;
}

private String generateFilename() {
return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
}

/**
* Generates EncryptedOutputFile for UnpartitionedWriter.
*/
public EncryptedOutputFile newOutputFile() {
OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
return encryptionManager.encrypt(file);
}

/**
* Generates EncryptedOutputFile for PartitionedWriter.
*/
public EncryptedOutputFile newOutputFile(PartitionKey key) {
OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
return encryptionManager.encrypt(rawOutputFile);
}
}
}

private interface AppenderFactory<T> {
FileAppender<T> newAppender(OutputFile file, FileFormat format);
}

private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
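// one method per writer type: unpartitioned writers need no key, while partitioned
// writers resolve each file's location from the current partition key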
private interface OutputFileFactory<T> {
T newOutputFile();
T newOutputFile(PartitionKey key);
}

private static class UnpartitionedWriter implements DataWriter<InternalRow> {
private final FileIO fileIo;
private FileAppender<InternalRow> appender = null;
private Metrics metrics = null;
private List<Long> offsetRanges = null;
private final EncryptedOutputFile file;
private FileAppender<InternalRow> currentAppender = null;
private final OutputFileFactory<EncryptedOutputFile> fileFactory;
private final FileFormat format;
private final AppenderFactory<InternalRow> appenderFactory;
private EncryptedOutputFile currentFile = null;
private final List<DataFile> completedFiles = Lists.newArrayList();
private final long targetFileSize;

UnpartitionedWriter(
EncryptedOutputFile outputFile,
OutputFileFactory<EncryptedOutputFile> fileFactory,
FileFormat format,
AppenderFactory<InternalRow> factory,
FileIO fileIo) {
AppenderFactory<InternalRow> appenderFactory,
FileIO fileIo,
long targetFileSize) {
this.fileFactory = fileFactory;
this.format = format;
this.appenderFactory = appenderFactory;
this.fileIo = fileIo;
this.file = outputFile;
this.appender = factory.newAppender(file.encryptingOutputFile(), format);
this.targetFileSize = targetFileSize;

openCurrent();
}

@Override
public void write(InternalRow record) {
appender.add(record);
public void write(InternalRow record) throws IOException {
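// roll over once the in-progress file reaches the target size; length() may be an
// estimate for formats that buffer, so final file sizes can differ slightly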
if (currentAppender.length() >= targetFileSize) {
closeCurrent();
openCurrent();
}

currentAppender.add(record);
}

@Override
public WriterCommitMessage commit() throws IOException {
Preconditions.checkArgument(appender != null, "Commit called on a closed writer: %s", this);
Preconditions.checkArgument(currentAppender != null, "Commit called on a closed writer: %s", this);

// metrics and splitOffsets are populated on close
close();

if (metrics.recordCount() == 0L) {
fileIo.deleteFile(file.encryptingOutputFile());
return new TaskCommit();
}

DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null, metrics, offsetRanges);
closeCurrent();

return new TaskCommit(dataFile);
return new TaskCommit(completedFiles);
}

@Override
public void abort() throws IOException {
Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this);
Preconditions.checkArgument(currentAppender != null, "Abort called on a closed writer: %s", this);

close();
fileIo.deleteFile(file.encryptingOutputFile());
closeCurrent();

// clean up files created by this writer
Tasks.foreach(completedFiles)
.throwFailureWhenFinished()
.noRetry()
.run(file -> fileIo.deleteFile(file.path().toString()));
}

@Override
public void close() throws IOException {
if (this.appender != null) {
this.appender.close();
this.metrics = appender.metrics();
this.offsetRanges = appender.splitOffsets();
this.appender = null;
private void openCurrent() {
this.currentFile = fileFactory.newOutputFile();
this.currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
}

private void closeCurrent() throws IOException {
if (currentAppender != null) {
currentAppender.close();
// metrics are only valid after the appender is closed
Metrics metrics = currentAppender.metrics();
List<Long> splitOffsets = currentAppender.splitOffsets();
this.currentAppender = null;

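// drop files that received no rows rather than committing empty data files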
if (metrics.recordCount() == 0L) {
fileIo.deleteFile(currentFile.encryptingOutputFile());
} else {
DataFile dataFile = DataFiles.fromEncryptedOutputFile(currentFile, null, metrics, splitOffsets);
completedFiles.add(dataFile);
}

this.currentFile = null;
}
}
}
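
Both writers now share the same open/append/roll/close lifecycle. A minimal standalone sketch of the pattern, with hypothetical SizedAppender and RollingWriter types standing in for FileAppender and the Spark writers (DataFile conversion is elided):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Supplier;

    // hypothetical stand-in for FileAppender: reports bytes written so far
    interface SizedAppender<T> {
      void add(T record);
      long length();
      void close() throws IOException;
    }

    class RollingWriter<T> {
      private final Supplier<SizedAppender<T>> openFile;  // opens a fresh output file
      private final long targetSize;
      private final List<SizedAppender<T>> completed = new ArrayList<>();
      private SizedAppender<T> current;

      RollingWriter(Supplier<SizedAppender<T>> openFile, long targetSize) {
        this.openFile = openFile;
        this.targetSize = targetSize;
        this.current = openFile.get();
      }

      void write(T record) throws IOException {
        // same check as above: roll before appending once the target is reached
        if (current.length() >= targetSize) {
          current.close();
          completed.add(current);
          this.current = openFile.get();
        }
        current.add(record);
      }
    }
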
@@ -372,10 +436,11 @@ private static class PartitionedWriter implements DataWriter<InternalRow> {
private final List<DataFile> completedFiles = Lists.newArrayList();
private final PartitionSpec spec;
private final FileFormat format;
private final AppenderFactory<InternalRow> factory;
private final Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey;
private final AppenderFactory<InternalRow> appenderFactory;
private final OutputFileFactory<EncryptedOutputFile> fileFactory;
private final PartitionKey key;
private final FileIO fileIo;
private final long targetFileSize;

private PartitionKey currentKey = null;
private FileAppender<InternalRow> currentAppender = null;
@@ -384,15 +449,17 @@ private static class PartitionedWriter implements DataWriter<InternalRow> {
PartitionedWriter(
PartitionSpec spec,
FileFormat format,
AppenderFactory<InternalRow> factory,
Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey,
FileIO fileIo) {
AppenderFactory<InternalRow> appenderFactory,
OutputFileFactory<EncryptedOutputFile> fileFactory,
FileIO fileIo,
long targetFileSize) {
this.spec = spec;
this.format = format;
this.factory = factory;
this.newOutputFileForKey = newOutputFileForKey;
this.appenderFactory = appenderFactory;
this.fileFactory = fileFactory;
this.key = new PartitionKey(spec);
this.fileIo = fileIo;
this.targetFileSize = targetFileSize;
}

@Override
@@ -401,17 +468,23 @@ public void write(InternalRow row) throws IOException {

if (!key.equals(currentKey)) {
closeCurrent();
completedPartitions.add(currentKey);

if (completedPartitions.contains(key)) {
// if rows are not correctly grouped, detect and fail the write
PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
LOG.warn("Duplicate key: {} == {}", existingKey, key);
throw new IllegalStateException("Already closed file for partition: " + key.toPath());
throw new IllegalStateException("Already closed files for partition: " + key.toPath());
}

this.currentKey = key.copy();
this.currentFile = newOutputFileForKey.apply(currentKey);
this.currentAppender = factory.newAppender(currentFile.encryptingOutputFile(), format);
this.currentFile = fileFactory.newOutputFile(currentKey);
this.currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
}

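// within a partition, also roll to a new file once the current one reaches the target size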
if (currentAppender.length() >= targetFileSize) {
closeCurrent();
openCurrent();
}

currentAppender.add(row);
@@ -425,17 +498,18 @@ public WriterCommitMessage commit() throws IOException {

@Override
public void abort() throws IOException {
closeCurrent();

// clean up files created by this writer
Tasks.foreach(completedFiles)
.throwFailureWhenFinished()
.noRetry()
.run(file -> fileIo.deleteFile(file.path().toString()));
}

if (currentAppender != null) {
currentAppender.close();
this.currentAppender = null;
fileIo.deleteFile(currentFile.encryptingOutputFile());
}
private void openCurrent() {
this.currentFile = fileFactory.newOutputFile(currentKey);
this.currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
}

private void closeCurrent() throws IOException {
@@ -446,15 +520,19 @@ private void closeCurrent() throws IOException {
List<Long> splitOffsets = currentAppender.splitOffsets();
this.currentAppender = null;

DataFile dataFile = DataFiles.builder(spec)
.withEncryptedOutputFile(currentFile)
.withPartition(currentKey)
.withMetrics(metrics)
.withSplitOffsets(splitOffsets)
.build();
if (metrics.recordCount() == 0L) {
fileIo.deleteFile(currentFile.encryptingOutputFile());
} else {
DataFile dataFile = DataFiles.builder(spec)
.withEncryptedOutputFile(currentFile)
.withPartition(currentKey)
.withMetrics(metrics)
.withSplitOffsets(splitOffsets)
.build();
completedFiles.add(dataFile);
}

completedPartitions.add(currentKey);
completedFiles.add(dataFile);
this.currentFile = null;
}
}
}