@@ -61,6 +61,10 @@ public void delete(T row) {
appender.add(row);
}

public long length() {
return appender.length();
}

@Override
public void close() throws IOException {
if (deleteFile == null) {
109 changes: 78 additions & 31 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -23,17 +23,19 @@
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;

public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private final List<DataFile> completedFiles = Lists.newArrayList();
private final List<DeleteFile> completedDeletes = Lists.newArrayList();
private final PartitionSpec spec;
private final FileFormat format;
private final FileAppenderFactory<T> appenderFactory;
@@ -56,7 +58,7 @@ public void abort() throws IOException {
close();

// clean up files created by this writer
Tasks.foreach(completedFiles)
Tasks.foreach(Iterables.concat(completedFiles, completedDeletes))
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
@@ -69,21 +71,26 @@ public DataFile[] complete() throws IOException {
return completedFiles.toArray(new DataFile[0]);
}

protected class RollingFileWriter implements Closeable {
protected abstract class BaseRollingWriter<T, W extends Closeable> implements Closeable {
private static final int ROWS_DIVISOR = 1000;
private final PartitionKey partitionKey;

private EncryptedOutputFile currentFile = null;
private FileAppender<T> currentAppender = null;
private W currentWriter = null;
private long currentRows = 0;

public RollingFileWriter(PartitionKey partitionKey) {
public BaseRollingWriter(PartitionKey partitionKey) {
this.partitionKey = partitionKey;
openCurrent();
}

public void add(T record) throws IOException {
this.currentAppender.add(record);
abstract W newWriter(EncryptedOutputFile file, PartitionKey key);
abstract long length(W writer);
abstract void write(W writer, T record);
abstract void complete(W closedWriter);

public void write(T record) throws IOException {
write(currentWriter, record);
this.currentRows++;

if (shouldRollToNewFile()) {
@@ -95,45 +102,33 @@ public void add(T record) throws IOException {
private void openCurrent() {
if (partitionKey == null) {
// unpartitioned
currentFile = fileFactory.newOutputFile();
this.currentFile = fileFactory.newOutputFile();
} else {
// partitioned
currentFile = fileFactory.newOutputFile(partitionKey);
this.currentFile = fileFactory.newOutputFile(partitionKey);
}
currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
currentRows = 0;
this.currentWriter = newWriter(currentFile, partitionKey);
this.currentRows = 0;
}

private boolean shouldRollToNewFile() {
// TODO: ORC files do not support checking the target file size before the file is closed
return !format.equals(FileFormat.ORC) &&
currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize;
currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
}

private void closeCurrent() throws IOException {
if (currentAppender != null) {
currentAppender.close();
// metrics are only valid after the appender is closed
Metrics metrics = currentAppender.metrics();
long fileSizeInBytes = currentAppender.length();
List<Long> splitOffsets = currentAppender.splitOffsets();
this.currentAppender = null;

if (metrics.recordCount() == 0L) {
if (currentWriter != null) {
currentWriter.close();

if (currentRows == 0L) {
io.deleteFile(currentFile.encryptingOutputFile());
} else {
DataFile dataFile = DataFiles.builder(spec)
.withEncryptionKeyMetadata(currentFile.keyMetadata())
.withPath(currentFile.encryptingOutputFile().location())
.withFileSizeInBytes(fileSizeInBytes)
.withPartition(spec.fields().size() == 0 ? null : partitionKey) // set null if unpartitioned
.withMetrics(metrics)
.withSplitOffsets(splitOffsets)
.build();
completedFiles.add(dataFile);
complete(currentWriter);
}

this.currentFile = null;
this.currentWriter = null;
this.currentRows = 0;
}
}
@@ -143,4 +138,56 @@ public void close() throws IOException {
closeCurrent();
}
}

protected class RollingFileWriter extends BaseRollingWriter<T, DataWriter<T>> {
public RollingFileWriter(PartitionKey partitionKey) {
super(partitionKey);
}

@Override
DataWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
return appenderFactory.newDataWriter(file, format, key);
}

@Override
long length(DataWriter<T> writer) {
return writer.length();
}

@Override
void write(DataWriter<T> writer, T record) {
writer.add(record);
}

@Override
void complete(DataWriter<T> closedWriter) {
completedFiles.add(closedWriter.toDataFile());
}
}

protected class RollingEqDeleteWriter extends BaseRollingWriter<T, EqualityDeleteWriter<T>> {
public RollingEqDeleteWriter(PartitionKey partitionKey) {
super(partitionKey);
}

@Override
EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
return appenderFactory.newEqDeleteWriter(file, format, key);
}

@Override
long length(EqualityDeleteWriter<T> writer) {
return writer.length();
}

@Override
void write(EqualityDeleteWriter<T> writer, T record) {
writer.delete(record);
}

@Override
void complete(EqualityDeleteWriter<T> closedWriter) {
completedDeletes.add(closedWriter.toDeleteFile());
}
}
}
79 changes: 79 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DataWriter.java
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DataWriter<T> implements Closeable {
private final FileAppender<T> appender;
private final FileFormat format;
private final String location;
private final PartitionSpec spec;
private final StructLike partition;
private final ByteBuffer keyMetadata;
private DataFile dataFile = null;

public DataWriter(FileAppender<T> appender, FileFormat format, String location,
PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) {
this.appender = appender;
this.format = format;
this.location = location;
this.spec = spec;
this.partition = partition;
this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
}

public void addAll(Iterable<T> rows) {
appender.addAll(rows);
}

public void add(T row) {
appender.add(row);
}

public long length() {
return appender.length();
}

@Override
public void close() throws IOException {
if (dataFile == null) {
appender.close();
this.dataFile = DataFiles.builder(spec)
.withFormat(format)
.withPath(location)
.withPartition(partition)
.withEncryptionKeyMetadata(keyMetadata)
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.build();
}
}

public DataFile toDataFile() {
Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer");
return dataFile;
}
}
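The contract above is worth spelling out: rows go in through add/addAll, the DataFile metadata is built exactly once when the writer is closed, and toDataFile refuses to answer for an unclosed writer. A minimal usage sketch, assuming the caller already has a FileAppender, a PartitionSpec, and an output location; the helper name and variables are illustrative, not part of this PR:

```java
// Sketch: driving DataWriter directly. Partition and key metadata are omitted
// (unpartitioned, unencrypted). Names here are illustrative, not from this PR.
static <T> DataFile writeDataFile(FileAppender<T> appender, PartitionSpec spec,
                                  String location, Iterable<T> rows) throws IOException {
  DataWriter<T> writer = new DataWriter<>(appender, FileFormat.PARQUET, location, spec, null, null);
  try {
    writer.addAll(rows);      // delegates to the wrapped FileAppender
  } finally {
    writer.close();           // closes the appender and builds the DataFile exactly once
  }
  return writer.toDataFile(); // throws IllegalStateException if the writer was not closed
}
```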
34 changes: 34 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -20,6 +20,10 @@
package org.apache.iceberg.io;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;

/**
* Factory to create a new {@link FileAppender} to write records.
@@ -36,4 +40,34 @@ public interface FileAppenderFactory<T> {
* @return a newly created {@link FileAppender}
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

/**
* Create a new {@link DataWriter}.
*
* @param outputFile an OutputFile used to create an output stream.
* @param format a file format
* @param partition a tuple of partition values
* @return a newly created {@link DataWriter} for rows
*/
DataWriter<T> newDataWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);

/**
* Create a new {@link EqualityDeleteWriter}.
*
* @param outputFile an OutputFile used to create an output stream.
* @param format a file format
* @param partition a tuple of partition values
* @return a newly created {@link EqualityDeleteWriter} for equality deletes
*/
EqualityDeleteWriter<T> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);

/**
* Create a new {@link PositionDeleteWriter}.
*
* @param outputFile an OutputFile used to create an output stream.
* @param format a file format
* @param partition a tuple of partition values
* @return a newly created {@link PositionDeleteWriter} for position deletes
*/
PositionDeleteWriter<T> newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);
}
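These factory methods are what keep the rolling writers above format-agnostic. A hedged sketch of the equality-delete path through the factory, assuming an existing FileAppenderFactory<T> implementation, an EncryptedOutputFile, and a partition tuple (all names illustrative, not from this PR):

```java
// Sketch: producing a DeleteFile of equality deletes through the factory.
// `appenderFactory`, `outputFile`, `partition`, and `deletedRows` are assumed to exist.
static <T> DeleteFile writeEqDeletes(FileAppenderFactory<T> appenderFactory,
                                     EncryptedOutputFile outputFile, StructLike partition,
                                     Iterable<T> deletedRows) throws IOException {
  EqualityDeleteWriter<T> eqWriter =
      appenderFactory.newEqDeleteWriter(outputFile, FileFormat.PARQUET, partition);
  try {
    for (T deleted : deletedRows) {
      eqWriter.delete(deleted);   // same call RollingEqDeleteWriter makes per record
    }
  } finally {
    eqWriter.close();             // finalizes the delete file metadata
  }
  return eqWriter.toDeleteFile(); // a task writer would add this to completedDeletes
}
```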
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.io;

import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

public class UnpartitionedDeltaWriter<T> extends BaseTaskWriter<T> {
Member commented:

I think we would still need an extra DeltaWriter between the TaskWriter and the rolling writers. A fanout TaskWriter receives rows from different partitions or buckets, and the writer for each partition (or bucket) has to accept both data records and equality deletes — that per-partition writer is what I'd call the DeltaWriter.

That way, all of the equality-delete logic lives in a single common DeltaWriter class, and the TaskWriter only decides how to dispatch records to the DeltaWriter's methods with a customized policy. For example, Flink's RowData carries INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER; when a row is a DELETE, the fanout policy can route it to the DeltaWriter's delete method.

private final RollingFileWriter currentWriter;
private final RollingEqDeleteWriter currentEqDeletes;

public UnpartitionedDeltaWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
currentWriter = new RollingFileWriter(null);
currentEqDeletes = new RollingEqDeleteWriter(null);
}

@Override
public void write(T record) throws IOException {
currentWriter.write(record);
}

public void delete(T record) throws IOException {
Member commented:

Would it be better to add this delete(T record) to the TaskWriter interface?

Member replied:

Maybe not TaskWriter — it belongs on the DeltaWriter.

Contributor (author) replied:

Probably. I just wanted to demonstrate that we can add a delete here that works with the rolling writer. What we actually expose will probably be different.

currentEqDeletes.write(record); // routes the row to the rolling equality-delete writer
}

@Override
public void close() throws IOException {
currentWriter.close();
currentEqDeletes.close();
}
}
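The review thread above points at the intended shape: a per-partition DeltaWriter that owns one rolling data writer and one rolling equality-delete writer, so a fanout task writer only has to route each incoming row. A hypothetical sketch of such a class, written as an inner class of a BaseTaskWriter<T> subclass so it can reach the rolling writers defined in this PR; neither the class nor its method names are part of the PR:

```java
// Hypothetical per-partition DeltaWriter, as suggested in the review discussion.
// A fanout task writer would keep one instance per partition/bucket and dispatch by
// row kind (e.g. Flink RowData INSERT/UPDATE_AFTER -> write, DELETE/UPDATE_BEFORE -> delete).
protected class DeltaWriter implements Closeable {
  private final RollingFileWriter dataWriter;
  private final RollingEqDeleteWriter eqDeleteWriter;

  DeltaWriter(PartitionKey partitionKey) {
    this.dataWriter = new RollingFileWriter(partitionKey);
    this.eqDeleteWriter = new RollingEqDeleteWriter(partitionKey);
  }

  public void write(T row) throws IOException {
    dataWriter.write(row);        // row lands in a data file
  }

  public void delete(T row) throws IOException {
    eqDeleteWriter.write(row);    // row lands in an equality delete file
  }

  @Override
  public void close() throws IOException {
    dataWriter.close();
    eqDeleteWriter.close();
  }
}
```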