Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.io.PathOffset;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class EqualityDeleteWriter<T> implements FileWriter<T, DeleteWriteResult> {
Expand All @@ -43,6 +44,7 @@ public class EqualityDeleteWriter<T> implements FileWriter<T, DeleteWriteResult>
private final int[] equalityFieldIds;
private final SortOrder sortOrder;
private DeleteFile deleteFile = null;
private long recordCount = 0;

public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String location,
PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata,
Expand All @@ -58,8 +60,10 @@ public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String
}

@Override
public void write(T row) {
public PathOffset write(T row) {
appender.add(row);
long offset = recordCount++;
return PathOffset.of(location, offset);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.io.PathOffset;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;

Expand All @@ -43,6 +44,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
private final PositionDelete<T> delete;
private final CharSequenceSet referencedDataFiles;
private DeleteFile deleteFile = null;
private long recordCount = 0;

public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format, String location,
PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) {
Expand All @@ -57,9 +59,11 @@ public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format
}

@Override
public void write(PositionDelete<T> positionDelete) {
public PathOffset write(PositionDelete<T> positionDelete) {
referencedDataFiles.add(positionDelete.path());
appender.add(positionDelete);
long offset = recordCount++;
return PathOffset.of(location, offset);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
protected abstract R aggregatedResult();

@Override
public void write(T row, PartitionSpec spec, StructLike partition) {
public PathOffset write(T row, PartitionSpec spec, StructLike partition) {
if (!spec.equals(currentSpec)) {
if (currentSpec != null) {
closeCurrentWriter();
Expand Down Expand Up @@ -100,7 +100,7 @@ public void write(T row, PartitionSpec spec, StructLike partition) {
this.currentWriter = newWriter(currentSpec, currentPartition);
}

currentWriter.write(row);
return currentWriter.write(row);
}

@Override
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/org/apache/iceberg/io/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class DataWriter<T> implements FileWriter<T, DataWriteResult> {
private final ByteBuffer keyMetadata;
private final SortOrder sortOrder;
private DataFile dataFile = null;
private long recordCount = 0;

public DataWriter(FileAppender<T> appender, FileFormat format, String location,
PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) {
Expand All @@ -57,8 +58,10 @@ public DataWriter(FileAppender<T> appender, FileFormat format, String location,
}

@Override
public void write(T row) {
public PathOffset write(T row) {
appender.add(row);
long offset = recordCount++;
return PathOffset.of(location, offset);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.deletes.PositionDelete;

class DefaultPartitioningWriterFactory<T> implements PartitioningWriterFactory<T> {
private final FileWriterFactory<T> writerFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final FileFormat fileFormat;
private final long targetFileSizeInBytes;
private final Type type;

DefaultPartitioningWriterFactory(
FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, FileFormat fileFormat, long targetFileSizeInBytes, Type type) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.fileFormat = fileFormat;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.type = type;
}

@Override
public PartitioningWriter<T, DataWriteResult> newDataWriter() {
return type == Type.CLUSTERED ?
new ClusteredDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes) :
new FanoutDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes);
}

@Override
public PartitioningWriter<T, DeleteWriteResult> newEqualityDeleteWriter() {
return type == Type.CLUSTERED ?
new ClusteredEqualityDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes) :
new FanoutEqualityDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes);
}

@Override
public PartitioningWriter<PositionDelete<T>, DeleteWriteResult> newPositionDeleteWriter() {
return type == Type.CLUSTERED ?
new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes) :
new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes);
}

enum Type {
CLUSTERED,
FANOUT,
}
}
88 changes: 88 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DirectTaskWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.IOException;
import java.util.function.Function;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.Tasks;

public class DirectTaskWriter<T> implements TaskWriter<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@openinx are you have any naming suggestion for this class, DirectTaskWriter, AppendTaskWriter,...?

@SuppressWarnings("rawtypes")
private static final Function UNPARTITION = s -> null;

@SuppressWarnings("unchecked")
public static <T> Function<T, StructLike> unpartition() {
return UNPARTITION;
}

private final PartitioningWriter<T, DataWriteResult> writer;
private final Function<T, StructLike> partitioner;
private final PartitionSpec spec;
private final FileIO io;

public DirectTaskWriter(
PartitioningWriterFactory<T> partitioningWriterFactory,
Function<T, StructLike> partitioner, PartitionSpec spec,
FileIO io) {
this.writer = partitioningWriterFactory.newDataWriter();
this.partitioner = partitioner;
this.spec = spec;
this.io = io;
}

@Override
public void write(T row) throws IOException {
StructLike partition = partitioner.apply(row);
writer.write(row, spec, partition);
}

@Override
public void abort() throws IOException {
close();

// clean up files created by this writer
WriteResult result = writeResult();
Tasks.foreach(result.dataFiles())
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
}

@Override
public WriteResult complete() throws IOException {
close();
return writeResult();
}

@Override
public void close() throws IOException {
writer.close();
}

private WriteResult writeResult() {
DataWriteResult result = writer.result();

return WriteResult.builder()
.addDataFiles(result.dataFiles())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* An equality delete writer capable of writing to multiple specs and partitions that keeps
* delete writers for each seen spec/partition pair open until this writer is closed.
*/
public class FanoutEqualityDeleteWriter<T> extends FanoutWriter<T, DeleteWriteResult> {

private final FileWriterFactory<T> writerFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final FileFormat fileFormat;
private final long targetFileSizeInBytes;
private final List<DeleteFile> deleteFiles;

public FanoutEqualityDeleteWriter(
FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.fileFormat = fileFormat;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.deleteFiles = Lists.newArrayList();
}

@Override
protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
// TODO: support ORC rolling writers
if (fileFormat == FileFormat.ORC) {
EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition);
} else {
return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
}
}

@Override
protected void addResult(DeleteWriteResult result) {
Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
deleteFiles.addAll(result.deleteFiles());
}

@Override
protected DeleteWriteResult aggregatedResult() {
return new DeleteWriteResult(deleteFiles);
}
}
Loading