diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
index 1838ef749b08..05bb8ec77131 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
@@ -61,6 +61,10 @@ public void delete(T row) {
     appender.add(row);
   }
 
+  public long length() {
+    return appender.length();
+  }
+
   @Override
   public void close() throws IOException {
     if (deleteFile == null) {
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index ca16318d8bdc..5542305f6c76 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -23,17 +23,19 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Tasks;
 
 public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final List<DeleteFile> completedDeletes = Lists.newArrayList();
   private final PartitionSpec spec;
   private final FileFormat format;
   private final FileAppenderFactory<T> appenderFactory;
@@ -56,7 +58,7 @@ public void abort() throws IOException {
     close();
 
     // clean up files created by this writer
-    Tasks.foreach(completedFiles)
+    Tasks.foreach(Iterables.concat(completedFiles, completedDeletes))
         .throwFailureWhenFinished()
         .noRetry()
         .run(file -> io.deleteFile(file.path().toString()));
@@ -69,21 +71,26 @@ public DataFile[] complete() throws IOException {
     return completedFiles.toArray(new DataFile[0]);
   }
 
-  protected class RollingFileWriter implements Closeable {
+  protected abstract class BaseRollingWriter<W extends Closeable> implements Closeable {
     private static final int ROWS_DIVISOR = 1000;
     private final PartitionKey partitionKey;
 
     private EncryptedOutputFile currentFile = null;
-    private FileAppender<T> currentAppender = null;
+    private W currentWriter = null;
     private long currentRows = 0;
 
-    public RollingFileWriter(PartitionKey partitionKey) {
+    public BaseRollingWriter(PartitionKey partitionKey) {
       this.partitionKey = partitionKey;
       openCurrent();
     }
 
-    public void add(T record) throws IOException {
-      this.currentAppender.add(record);
+    abstract W newWriter(EncryptedOutputFile file, PartitionKey key);
+    abstract long length(W writer);
+    abstract void write(W writer, T record);
+    abstract void complete(W closedWriter);
+
+    public void write(T record) throws IOException {
+      write(currentWriter, record);
       this.currentRows++;
 
       if (shouldRollToNewFile()) {
@@ -95,45 +102,33 @@ public void add(T record) throws IOException {
 
     private void openCurrent() {
       if (partitionKey == null) {
         // unpartitioned
-        currentFile = fileFactory.newOutputFile();
+        this.currentFile = fileFactory.newOutputFile();
       } else {
         // partitioned
-        currentFile = fileFactory.newOutputFile(partitionKey);
+        this.currentFile = fileFactory.newOutputFile(partitionKey);
       }
 
-      currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
-      currentRows = 0;
+      this.currentWriter = newWriter(currentFile, partitionKey);
+      this.currentRows = 0;
     }
 
     private boolean shouldRollToNewFile() {
       // TODO: ORC file now not support target file size before closed
       return !format.equals(FileFormat.ORC) &&
-          currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize;
+          currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
     }
 
     private void closeCurrent() throws IOException {
-      if (currentAppender != null) {
-        currentAppender.close();
-        // metrics are only valid after the appender is closed
-        Metrics metrics = currentAppender.metrics();
-        long fileSizeInBytes = currentAppender.length();
-        List<Long> splitOffsets = currentAppender.splitOffsets();
-        this.currentAppender = null;
-
-        if (metrics.recordCount() == 0L) {
+      if (currentWriter != null) {
+        currentWriter.close();
+
+        if (currentRows == 0L) {
           io.deleteFile(currentFile.encryptingOutputFile());
         } else {
-          DataFile dataFile = DataFiles.builder(spec)
-              .withEncryptionKeyMetadata(currentFile.keyMetadata())
-              .withPath(currentFile.encryptingOutputFile().location())
-              .withFileSizeInBytes(fileSizeInBytes)
-              .withPartition(spec.fields().size() == 0 ? null : partitionKey) // set null if unpartitioned
-              .withMetrics(metrics)
-              .withSplitOffsets(splitOffsets)
-              .build();
-          completedFiles.add(dataFile);
+          complete(currentWriter);
         }
 
         this.currentFile = null;
+        this.currentWriter = null;
         this.currentRows = 0;
       }
     }
@@ -143,4 +138,56 @@ public void close() throws IOException {
       closeCurrent();
     }
   }
+
+  protected class RollingFileWriter extends BaseRollingWriter<DataWriter<T>> {
+    public RollingFileWriter(PartitionKey partitionKey) {
+      super(partitionKey);
+    }
+
+    @Override
+    DataWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
+      return appenderFactory.newDataWriter(file, format, key);
+    }
+
+    @Override
+    long length(DataWriter<T> writer) {
+      return writer.length();
+    }
+
+    @Override
+    void write(DataWriter<T> writer, T record) {
+      writer.add(record);
+    }
+
+    @Override
+    void complete(DataWriter<T> closedWriter) {
+      completedFiles.add(closedWriter.toDataFile());
+    }
+  }
+
+  protected class RollingEqDeleteWriter extends BaseRollingWriter<EqualityDeleteWriter<T>> {
+    public RollingEqDeleteWriter(PartitionKey partitionKey) {
+      super(partitionKey);
+    }
+
+    @Override
+    EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
+      return appenderFactory.newEqDeleteWriter(file, format, key);
+    }
+
+    @Override
+    long length(EqualityDeleteWriter<T> writer) {
+      return writer.length();
+    }
+
+    @Override
+    void write(EqualityDeleteWriter<T> writer, T record) {
+      writer.delete(record);
+    }
+
+    @Override
+    void complete(EqualityDeleteWriter<T> closedWriter) {
+      completedDeletes.add(closedWriter.toDeleteFile());
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java
new file mode 100644
index 000000000000..dd7deea3402e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class DataWriter<T> implements Closeable {
+  private final FileAppender<T> appender;
+  private final FileFormat format;
+  private final String location;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+  private final ByteBuffer keyMetadata;
+  private DataFile dataFile = null;
+
+  public DataWriter(FileAppender<T> appender, FileFormat format, String location,
+                    PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) {
+    this.appender = appender;
+    this.format = format;
+    this.location = location;
+    this.spec = spec;
+    this.partition = partition;
+    this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
+  }
+
+  public void addAll(Iterable<T> rows) {
+    appender.addAll(rows);
+  }
+
+  public void add(T row) {
+    appender.add(row);
+  }
+
+  public long length() {
+    return appender.length();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (dataFile == null) {
+      appender.close();
+      this.dataFile = DataFiles.builder(spec)
+          .withFormat(format)
+          .withPath(location)
+          .withPartition(partition)
+          .withEncryptionKeyMetadata(keyMetadata)
+          .withFileSizeInBytes(appender.length())
+          .withMetrics(appender.metrics())
+          .withSplitOffsets(appender.splitOffsets())
+          .build();
+    }
+  }
+
+  public DataFile toDataFile() {
+    Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer");
+    return dataFile;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
index 9afdca460f0a..e2d8f88f1fe9 100644
--- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -20,6 +20,10 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 
 /**
  * Factory to create a new {@link FileAppender} to write records.
@@ -36,4 +40,34 @@ public interface FileAppenderFactory<T> {
    * @return a newly created {@link FileAppender}
    */
   FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);
+
+  /**
+   * Create a new {@link DataWriter}.
+   *
+   * @param outputFile an OutputFile used to create an output stream.
+   * @param format a file format
+   * @param partition a tuple of partition values
+   * @return a newly created {@link DataWriter} for rows
+   */
+  DataWriter<T> newDataWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);
+
+  /**
+   * Create a new {@link EqualityDeleteWriter}.
+   *
+   * @param outputFile an OutputFile used to create an output stream.
+   * @param format a file format
+   * @param partition a tuple of partition values
+   * @return a newly created {@link EqualityDeleteWriter} for equality deletes
+   */
+  EqualityDeleteWriter<T> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);
+
+  /**
+   * Create a new {@link PositionDeleteWriter}.
+   *
+   * @param outputFile an OutputFile used to create an output stream.
+   * @param format a file format
+   * @param partition a tuple of partition values
+   * @return a newly created {@link PositionDeleteWriter} for position deletes
+   */
+  PositionDeleteWriter<T> newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/UnpartitionedDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/UnpartitionedDeltaWriter.java
new file mode 100644
index 000000000000..25840604b05e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/UnpartitionedDeltaWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+
+public class UnpartitionedDeltaWriter<T> extends BaseTaskWriter<T> {
+
+  private final RollingFileWriter currentWriter;
+  private final RollingEqDeleteWriter currentEqDeletes;
+
+  public UnpartitionedDeltaWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                  OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    currentWriter = new RollingFileWriter(null);
+    currentEqDeletes = new RollingEqDeleteWriter(null);
+  }
+
+  @Override
+  public void write(T record) throws IOException {
+    currentWriter.write(record);
+  }
+
+  public void delete(T record) throws IOException {
+    currentEqDeletes.write(record);
+  }
+
+  @Override
+  public void close() throws IOException {
+    currentWriter.close();
+    currentEqDeletes.close();
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 3383fe7c29f7..e77b39b27a9f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -20,12 +20,19 @@
 package org.apache.iceberg.spark.source;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFile;
@@ -41,11 +48,16 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
   private final Map<String, String> properties;
   private final Schema writeSchema;
   private final StructType dsSchema;
+  private final PartitionSpec spec;
+  private final int[] equalityFieldIds;
 
   SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
     this.properties = properties;
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
+    // TODO: set these for real
+    this.spec = PartitionSpec.unpartitioned();
+    this.equalityFieldIds = new int[] { 0 };
   }
 
   @Override
@@ -86,4 +98,81 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor
       throw new RuntimeIOException(e);
     }
   }
+
+  @Override
+  public DataWriter<InternalRow> newDataWriter(EncryptedOutputFile file, FileFormat fileFormat, StructLike partition) {
+    return new DataWriter<>(
+        newAppender(file.encryptingOutputFile(), fileFormat), fileFormat,
+        file.encryptingOutputFile().location(), spec, partition, file.keyMetadata());
+  }
+
+  @Override
+  public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
+                                                             StructLike partition) {
+    try {
+      switch (format) {
+        case PARQUET:
+          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
+              .overwrite()
+              .rowSchema(writeSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .equalityFieldIds(equalityFieldIds)
+              .withKeyMetadata(outputFile.keyMetadata())
+              .buildEqualityWriter();
+
+        case AVRO:
+          return Avro.writeDeletes(outputFile.encryptingOutputFile())
+              .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
+              .overwrite()
+              .rowSchema(writeSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .equalityFieldIds(equalityFieldIds)
+              .withKeyMetadata(outputFile.keyMetadata())
+              .buildEqualityWriter();
+
+        default:
+          throw new UnsupportedOperationException("Cannot write unsupported format: " + format);
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new equality delete writer", e);
+    }
+  }
+
+  @Override
+  public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
+                                                              StructLike partition) {
+    try {
+      switch (format) {
+        case PARQUET:
+          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
+              .overwrite()
+              .rowSchema(writeSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(outputFile.keyMetadata())
+              .buildPositionWriter();
+
+        case AVRO:
+          return Avro.writeDeletes(outputFile.encryptingOutputFile())
+              .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
+              .overwrite()
+              .rowSchema(writeSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(outputFile.keyMetadata())
+              .buildPositionWriter();
+
+        default:
+          throw new UnsupportedOperationException("Cannot write unsupported format: " + format);
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new position delete writer", e);
+    }
+  }
+}
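
Editor's note, not part of the patch: a minimal sketch of how the pieces above compose for a caller. The class name DeltaWriterSketch, the writeDelta helper, and its parameters (table, appenderFactory, fileFactory, the two rows, and the 128 MB target size) are illustrative assumptions; only UnpartitionedDeltaWriter, DataWriter, and the new FileAppenderFactory methods come from this diff.

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedDeltaWriter;
import org.apache.spark.sql.catalyst.InternalRow;

class DeltaWriterSketch {
  // Writes one inserted row and one equality-deleted row, then closes the writer.
  // table, appenderFactory, fileFactory, and the rows are assumed to be built the
  // same way the existing Spark write path builds them.
  static DataFile[] writeDelta(Table table, FileAppenderFactory<InternalRow> appenderFactory,
                               OutputFileFactory fileFactory, InternalRow insertedRow,
                               InternalRow deletedRow) throws IOException {
    UnpartitionedDeltaWriter<InternalRow> writer = new UnpartitionedDeltaWriter<>(
        PartitionSpec.unpartitioned(), FileFormat.PARQUET, appenderFactory,
        fileFactory, table.io(), 128 * 1024 * 1024 /* assumed target file size */);

    writer.write(insertedRow);  // routed through RollingFileWriter to DataWriter.add
    writer.delete(deletedRow);  // routed through RollingEqDeleteWriter to EqualityDeleteWriter.delete

    writer.close();
    // Note: complete() still returns only data files; completedDeletes is cleaned
    // up by abort() but not yet surfaced to callers in this patch.
    return writer.complete();
  }
}

One design point worth flagging for review: because complete() returns DataFile[] only, a caller of the new delete path has no way to commit the equality delete files it produced, which suggests a follow-up change to the TaskWriter result type.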