diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index b2b102b883ef..7e0a82681afa 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.deletes; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DeleteFile; @@ -29,10 +28,12 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class EqualityDeleteWriter implements Closeable { +public class EqualityDeleteWriter implements FileWriter { private final FileAppender appender; private final FileFormat format; private final String location; @@ -56,14 +57,32 @@ public EqualityDeleteWriter(FileAppender appender, FileFormat format, String this.equalityFieldIds = equalityFieldIds; } + @Override + public void write(T row) throws IOException { + appender.add(row); + } + + /** + * Writes equality deletes. + * + * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(Iterable)} instead. + */ + @Deprecated public void deleteAll(Iterable rows) { appender.addAll(rows); } + /** + * Writes an equality delete. + * + * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(Object)} instead. 
+ */ + @Deprecated public void delete(T row) { appender.add(row); } + @Override public long length() { return appender.length(); } @@ -89,4 +108,9 @@ public DeleteFile toDeleteFile() { Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); return deleteFile; } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile()); + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index 8c5eecfb924e..753242013ec1 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.deletes; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DeleteFile; @@ -28,11 +27,13 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.CharSequenceSet; -public class PositionDeleteWriter implements Closeable { +public class PositionDeleteWriter implements FileWriter, DeleteWriteResult> { private final FileAppender appender; private final FileFormat format; private final String location; @@ -40,7 +41,7 @@ public class PositionDeleteWriter implements Closeable { private final StructLike partition; private final ByteBuffer keyMetadata; private final PositionDelete delete; - private final CharSequenceSet pathSet; + private final CharSequenceSet referencedDataFiles; private DeleteFile deleteFile = null; public PositionDeleteWriter(FileAppender appender, FileFormat format, String location, @@ 
-52,18 +53,41 @@ public PositionDeleteWriter(FileAppender appender, FileFormat format this.partition = partition; this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null; this.delete = PositionDelete.create(); - this.pathSet = CharSequenceSet.empty(); + this.referencedDataFiles = CharSequenceSet.empty(); } + @Override + public void write(PositionDelete positionDelete) throws IOException { + referencedDataFiles.add(positionDelete.path()); + appender.add(positionDelete); + } + + /** + * Writes a position delete. + * + * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(PositionDelete)} instead. + */ + @Deprecated public void delete(CharSequence path, long pos) { delete(path, pos, null); } + /** + * Writes a position delete and persists the deleted row. + * + * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(PositionDelete)} instead. + */ + @Deprecated public void delete(CharSequence path, long pos, T row) { - pathSet.add(path); + referencedDataFiles.add(path); appender.add(delete.set(path, pos, row)); } + @Override + public long length() { + return appender.length(); + } + @Override public void close() throws IOException { if (deleteFile == null) { @@ -81,11 +105,16 @@ public void close() throws IOException { } public CharSequenceSet referencedDataFiles() { - return pathSet; + return referencedDataFiles; } public DeleteFile toDeleteFile() { Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); return deleteFile; } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile(), referencedDataFiles()); + } } diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java new file mode 100644 index 000000000000..97e450f96f05 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DataFile; + +/** + * A result of writing data files. + *

+ * Note that objects of this class are NOT meant to be serialized. Task or delta writers will wrap + * these results into their own serializable results that can be sent back to query engines. + */ +public class DataWriteResult { + private final List dataFiles; + + public DataWriteResult(DataFile dataFile) { + this.dataFiles = Collections.singletonList(dataFile); + } + + public DataWriteResult(List dataFiles) { + this.dataFiles = dataFiles; + } + + public List dataFiles() { + return dataFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 113557a26434..30e1cde22a4d 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.io; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DataFile; @@ -31,7 +30,7 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class DataWriter implements Closeable { +public class DataWriter implements FileWriter { private final FileAppender appender; private final FileFormat format; private final String location; @@ -57,10 +56,22 @@ public DataWriter(FileAppender appender, FileFormat format, String location, this.sortOrder = sortOrder; } + @Override + public void write(T row) { + appender.add(row); + } + + /** + * Writes a data record. + * + * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #write(Object)} instead. 
+ */ + @Deprecated public void add(T row) { appender.add(row); } + @Override public long length() { return appender.length(); } @@ -86,4 +97,9 @@ public DataFile toDataFile() { Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer"); return dataFile; } + + @Override + public DataWriteResult result() { + return new DataWriteResult(toDataFile()); + } } diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java new file mode 100644 index 000000000000..e8fe7934ace6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.util.CharSequenceSet; + +/** + * A result of writing delete files. + *

+ * Note that objects of this class are NOT meant to be serialized. Task or delta writers will wrap + * these results into their own serializable results that can be sent back to query engines. + */ +public class DeleteWriteResult { + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public DeleteWriteResult(DeleteFile deleteFile) { + this.deleteFiles = Collections.singletonList(deleteFile); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + public DeleteWriteResult(DeleteFile deleteFile, CharSequenceSet referencedDataFiles) { + this.deleteFiles = Collections.singletonList(deleteFile); + this.referencedDataFiles = referencedDataFiles; + } + + public DeleteWriteResult(List deleteFiles) { + this.deleteFiles = deleteFiles; + this.referencedDataFiles = CharSequenceSet.empty(); + } + + public DeleteWriteResult(List deleteFiles, CharSequenceSet referencedDataFiles) { + this.deleteFiles = deleteFiles; + this.referencedDataFiles = referencedDataFiles; + } + + public List deleteFiles() { + return deleteFiles; + } + + public CharSequenceSet referencedDataFiles() { + return referencedDataFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriter.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java new file mode 100644 index 000000000000..cbaca4f3ec5e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; + +/** + * A writer capable of writing files of a single type (i.e. data/delete) to one spec/partition. + *

+ * As opposed to {@link FileAppender}, this interface should be implemented by classes that not only + * append records to files but actually produce {@link DataFile}s or {@link DeleteFile}s objects + * with Iceberg metadata. Implementations may wrap {@link FileAppender}s with extra information + * such as spec, partition, sort order ID needed to construct {@link DataFile}s or {@link DeleteFile}s. + * + * @param the row type + * @param the result type + */ +public interface FileWriter extends Closeable { + + /** + * Writes rows to a predefined spec/partition. + * + * @param rows data or delete records + * @throws IOException in case of an error during the write process + */ + default void write(Iterable rows) throws IOException { + for (T row : rows) { + write(row); + } + } + + /** + * Writes a row to a predefined spec/partition. + * + * @param row a data or delete record + * @throws IOException in case of an error during the write process + */ + void write(T row) throws IOException; + + /** + * Returns the number of bytes that were currently written by this writer. + * + * @return the number of written bytes + */ + long length(); + + /** + * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. + * The result is valid only after the writer is closed. + * + * @return the file writer result + */ + R result(); +} diff --git a/core/src/main/java/org/apache/iceberg/io/WriterFactory.java b/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java similarity index 98% rename from core/src/main/java/org/apache/iceberg/io/WriterFactory.java rename to core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java index e797c1ec67bb..9b57676f099d 100644 --- a/core/src/main/java/org/apache/iceberg/io/WriterFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java @@ -28,7 +28,7 @@ /** * A factory for creating data and delete writers. 
*/ -public interface WriterFactory { +public interface FileWriterFactory { /** * Creates a new {@link DataWriter}. diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 0da20579a62a..f1c9e52a860a 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.io; -import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Comparator; @@ -28,15 +27,17 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; -class SortedPosDeleteWriter implements Closeable { +class SortedPosDeleteWriter implements FileWriter, DeleteWriteResult> { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; private final Map>> posDeletes = Maps.newHashMap(); @@ -51,6 +52,7 @@ class SortedPosDeleteWriter implements Closeable { private final long recordsNumThreshold; private int records = 0; + private boolean closed = false; SortedPosDeleteWriter(FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, @@ -71,6 +73,16 @@ class SortedPosDeleteWriter implements Closeable { this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD); } + @Override + public long length() { + throw new 
UnsupportedOperationException(this.getClass().getName() + " does not implement length"); + } + + @Override + public void write(PositionDelete payload) throws IOException { + delete(payload.path(), payload.pos(), payload.row()); + } + public void delete(CharSequence path, long pos) { delete(path, pos, null); } @@ -103,7 +115,16 @@ public CharSequenceSet referencedDataFiles() { @Override public void close() throws IOException { - flushDeletes(); + if (!closed) { + flushDeletes(); + this.closed = true; + } + } + + @Override + public DeleteWriteResult result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return new DeleteWriteResult(completedFiles, referencedDataFiles); } private void flushDeletes() { diff --git a/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java similarity index 94% rename from data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java rename to data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 48b368cab6f3..bc3a0b34eeb9 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -35,15 +35,15 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriterFactory; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.io.WriterFactory; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; /** * A base writer factory to be extended by query engine integrations. 
*/ -public abstract class BaseWriterFactory implements WriterFactory { +public abstract class BaseFileWriterFactory implements FileWriterFactory { private final Table table; private final FileFormat dataFileFormat; private final Schema dataSchema; @@ -54,10 +54,10 @@ public abstract class BaseWriterFactory implements WriterFactory { private final SortOrder equalityDeleteSortOrder; private final Schema positionDeleteRowSchema; - protected BaseWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, - SortOrder dataSortOrder, FileFormat deleteFileFormat, - int[] equalityFieldIds, Schema equalityDeleteRowSchema, - SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema) { + protected BaseFileWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, + SortOrder dataSortOrder, FileFormat deleteFileFormat, + int[] equalityFieldIds, Schema equalityDeleteRowSchema, + SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema) { this.table = table; this.dataFileFormat = dataFileFormat; this.dataSchema = dataSchema; diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java similarity index 91% rename from data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java rename to data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java index ec193264f375..734e5d3da8de 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java +++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java @@ -59,7 +59,7 @@ import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME; @RunWith(Parameterized.class) -public abstract class TestWriterFactory extends TableTestBase { +public abstract class TestFileWriterFactory extends TableTestBase { @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}") public static Object[] parameters() { return new Object[][] { @@ -81,7 +81,7 @@ public static 
Object[] parameters() { private StructLike partition = null; private OutputFileFactory fileFactory = null; - public TestWriterFactory(FileFormat fileFormat, boolean partitioned) { + public TestFileWriterFactory(FileFormat fileFormat, boolean partitioned) { super(TABLE_FORMAT_VERSION); this.fileFormat = fileFormat; this.partitioned = partitioned; @@ -94,8 +94,9 @@ public TestWriterFactory(FileFormat fileFormat, boolean partitioned) { ); } - protected abstract WriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, - Schema equalityDeleteRowSchema, Schema positionDeleteRowSchema); + protected abstract FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema, + Schema positionDeleteRowSchema); protected abstract T toRow(Integer id, String data); @@ -125,7 +126,7 @@ public void setupTable() throws Exception { @Test public void testDataWriter() throws IOException { - WriterFactory writerFactory = newWriterFactory(table.schema()); + FileWriterFactory writerFactory = newWriterFactory(table.schema()); DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); @@ -142,7 +143,7 @@ public void testEqualityDeleteWriter() throws IOException { List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); Schema equalityDeleteRowSchema = table.schema().select("id"); - WriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); + FileWriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); // write a data file DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); @@ -191,7 +192,7 @@ public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException { List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); Schema equalityDeleteRowSchema = table.schema().select("id"); - WriterFactory writerFactory = 
newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); + FileWriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); // write an unpartitioned data file DataFile firstDataFile = writeData(writerFactory, dataRows, table.spec(), partition); @@ -251,7 +252,7 @@ public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException { public void testPositionDeleteWriter() throws IOException { Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); - WriterFactory writerFactory = newWriterFactory(table.schema()); + FileWriterFactory writerFactory = newWriterFactory(table.schema()); // write a data file DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); @@ -297,7 +298,7 @@ public void testPositionDeleteWriter() throws IOException { public void testPositionDeleteWriterWithRow() throws IOException { Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); - WriterFactory writerFactory = newWriterFactory(table.schema(), table.schema()); + FileWriterFactory writerFactory = newWriterFactory(table.schema(), table.schema()); // write a data file DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); @@ -351,20 +352,20 @@ private PartitionKey initPartitionKey() { return partitionKey; } - private WriterFactory newWriterFactory(Schema dataSchema) { + private FileWriterFactory newWriterFactory(Schema dataSchema) { return newWriterFactory(dataSchema, null, null, null); } - private WriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, - Schema equalityDeleteRowSchema) { + private FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema) { return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null); } - private WriterFactory newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) { + 
private FileWriterFactory newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) { return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema); } - private DataFile writeData(WriterFactory writerFactory, List rows, + private DataFile writeData(FileWriterFactory writerFactory, List rows, PartitionSpec spec, StructLike partitionKey) throws IOException { EncryptedOutputFile file = newOutputFile(spec, partitionKey); @@ -379,7 +380,7 @@ private DataFile writeData(WriterFactory writerFactory, List rows, return writer.toDataFile(); } - private DeleteFile writeEqualityDeletes(WriterFactory writerFactory, List deletes, + private DeleteFile writeEqualityDeletes(FileWriterFactory writerFactory, List deletes, PartitionSpec spec, StructLike partitionKey) throws IOException { EncryptedOutputFile file = newOutputFile(spec, partitionKey); @@ -392,7 +393,7 @@ private DeleteFile writeEqualityDeletes(WriterFactory writerFactory, List return writer.toDeleteFile(); } - private Pair writePositionDeletes(WriterFactory writerFactory, + private Pair writePositionDeletes(FileWriterFactory writerFactory, List> deletes, PartitionSpec spec, StructLike partitionKey) throws IOException { diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java index 1cccef3c2072..758c803f5b7d 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java +++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java @@ -91,7 +91,7 @@ public TestWriterMetrics(FileFormat fileFormat) { this.fileFormat = fileFormat; } - protected abstract WriterFactory newWriterFactory(Schema dataSchema); + protected abstract FileWriterFactory newWriterFactory(Schema dataSchema); protected abstract T toRow(Integer id, String data, boolean boolValue, Long longValue); diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkWriterFactory.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java similarity index 92% rename from flink/src/main/java/org/apache/iceberg/flink/sink/FlinkWriterFactory.java rename to flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java index 422d9aaecf24..6ce2542b965f 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java @@ -30,7 +30,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseWriterFactory; +import org.apache.iceberg.data.BaseFileWriterFactory; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -45,16 +45,16 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; -class FlinkWriterFactory extends BaseWriterFactory implements Serializable { +class FlinkFileWriterFactory extends BaseFileWriterFactory implements Serializable { private RowType dataFlinkType; private RowType equalityDeleteFlinkType; private RowType positionDeleteFlinkType; - FlinkWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, RowType dataFlinkType, - SortOrder dataSortOrder, FileFormat deleteFileFormat, - int[] equalityFieldIds, Schema equalityDeleteRowSchema, RowType equalityDeleteFlinkType, - SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, - RowType positionDeleteFlinkType) { + FlinkFileWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, RowType dataFlinkType, + SortOrder dataSortOrder, FileFormat deleteFileFormat, + int[] equalityFieldIds, Schema equalityDeleteRowSchema, RowType equalityDeleteFlinkType, + SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, + 
RowType positionDeleteFlinkType) { super(table, dataFileFormat, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, positionDeleteRowSchema); @@ -233,13 +233,13 @@ Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType) { return this; } - FlinkWriterFactory build() { + FlinkFileWriterFactory build() { boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null; boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null; Preconditions.checkArgument(noEqualityDeleteConf || fullEqualityDeleteConf, "Equality field IDs and equality delete row schema must be set together"); - return new FlinkWriterFactory( + return new FlinkFileWriterFactory( table, dataFileFormat, dataSchema, dataFlinkType, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteFlinkType, equalityDeleteSortOrder, positionDeleteRowSchema, positionDeleteFlinkType); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterFactory.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java similarity index 78% rename from flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterFactory.java rename to flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java index b9fed8bfe94c..3223b6e28b92 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterFactory.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java @@ -27,21 +27,22 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataWrapper; import org.apache.iceberg.flink.SimpleDataUtil; -import org.apache.iceberg.io.TestWriterFactory; -import org.apache.iceberg.io.WriterFactory; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.io.TestFileWriterFactory; import 
org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.StructLikeSet; -public class TestFlinkWriterFactory extends TestWriterFactory { +public class TestFlinkFileWriterFactory extends TestFileWriterFactory { - public TestFlinkWriterFactory(FileFormat fileFormat, boolean partitioned) { + public TestFlinkFileWriterFactory(FileFormat fileFormat, boolean partitioned) { super(fileFormat, partitioned); } @Override - protected WriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, - Schema equalityDeleteRowSchema, Schema positionDeleteRowSchema) { - return FlinkWriterFactory.builderFor(table) + protected FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema, + Schema positionDeleteRowSchema) { + return FlinkFileWriterFactory.builderFor(table) .dataSchema(table.schema()) .dataFileFormat(format()) .deleteFileFormat(format()) diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java index df4662eb248a..f2890b1811e4 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java @@ -24,8 +24,8 @@ import org.apache.flink.table.data.StringData; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileWriterFactory; import org.apache.iceberg.io.TestWriterMetrics; -import org.apache.iceberg.io.WriterFactory; public class TestFlinkWriterMetrics extends TestWriterMetrics { @@ -34,8 +34,8 @@ public TestFlinkWriterMetrics(FileFormat fileFormat) { } @Override - protected WriterFactory newWriterFactory(Schema dataSchema) { - return FlinkWriterFactory.builderFor(table) + protected FileWriterFactory newWriterFactory(Schema dataSchema) { + return FlinkFileWriterFactory.builderFor(table) .dataSchema(table.schema()) 
.dataFileFormat(fileFormat) .deleteFileFormat(fileFormat) diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java similarity index 92% rename from spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java rename to spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index 1ce202d2971b..daff3b9da0ce 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -26,7 +26,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseWriterFactory; +import org.apache.iceberg.data.BaseFileWriterFactory; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -45,16 +45,16 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; -class SparkWriterFactory extends BaseWriterFactory<InternalRow> { +class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> { private StructType dataSparkType; private StructType equalityDeleteSparkType; private StructType positionDeleteSparkType; - SparkWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, StructType dataSparkType, - SortOrder dataSortOrder, FileFormat deleteFileFormat, - int[] equalityFieldIds, Schema equalityDeleteRowSchema, StructType equalityDeleteSparkType, - SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, - StructType positionDeleteSparkType) { + SparkFileWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, StructType dataSparkType, + SortOrder dataSortOrder, FileFormat deleteFileFormat, + int[] equalityFieldIds, Schema equalityDeleteRowSchema, 
StructType equalityDeleteSparkType, + SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, + StructType positionDeleteSparkType) { super(table, dataFileFormat, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, positionDeleteRowSchema); @@ -219,13 +219,13 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) { return this; } - SparkWriterFactory build() { + SparkFileWriterFactory build() { boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null; boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null; Preconditions.checkArgument(noEqualityDeleteConf || fullEqualityDeleteConf, "Equality field IDs and equality delete row schema must be set together"); - return new SparkWriterFactory( + return new SparkFileWriterFactory( table, dataFileFormat, dataSchema, dataSparkType, dataSortOrder, deleteFileFormat, equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSparkType, equalityDeleteSortOrder, positionDeleteRowSchema, positionDeleteSparkType); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java similarity index 78% rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java rename to spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java index eecbd665a19d..702e8ab98990 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java @@ -22,8 +22,8 @@ import java.util.List; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; -import org.apache.iceberg.io.TestWriterFactory; -import org.apache.iceberg.io.WriterFactory; +import org.apache.iceberg.io.FileWriterFactory; +import 
org.apache.iceberg.io.TestFileWriterFactory; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.StructLikeSet; @@ -32,17 +32,17 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; -public class TestSparkWriterFactory extends TestWriterFactory<InternalRow> { +public class TestSparkFileWriterFactory extends TestFileWriterFactory<InternalRow> { - public TestSparkWriterFactory(FileFormat fileFormat, boolean partitioned) { + public TestSparkFileWriterFactory(FileFormat fileFormat, boolean partitioned) { super(fileFormat, partitioned); } @Override - protected WriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds, - Schema equalityDeleteRowSchema, - Schema positionDeleteRowSchema) { - return SparkWriterFactory.builderFor(table) + protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds, + Schema equalityDeleteRowSchema, + Schema positionDeleteRowSchema) { + return SparkFileWriterFactory.builderFor(table) .dataSchema(table.schema()) .dataFileFormat(format()) .deleteFileFormat(format()) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java index 6e2dd7c0f35c..b17ed7d22780 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java @@ -21,8 +21,8 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileWriterFactory; import org.apache.iceberg.io.TestWriterMetrics; -import org.apache.iceberg.io.WriterFactory; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.unsafe.types.UTF8String; @@ -34,8 +34,8 @@ public TestSparkWriterMetrics(FileFormat fileFormat) { } 
@Override - protected WriterFactory<InternalRow> newWriterFactory(Schema dataSchema) { - return SparkWriterFactory.builderFor(table) + protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema) { + return SparkFileWriterFactory.builderFor(table) .dataSchema(table.schema()) .dataFileFormat(fileFormat) .deleteFileFormat(fileFormat)