diff --git a/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
new file mode 100644
index 000000000000..68ad1f0ee0da
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
+
+  private final PartitioningWriter<T, DataWriteResult> dataWriter;
+  private final PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter;
+  private final PositionDelete<T> positionDelete;
+
+  private boolean closed;
+
+  public BasePositionDeltaWriter(PartitioningWriter<T, DataWriteResult> dataWriter,
+                                 PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter) {
+    Preconditions.checkArgument(dataWriter != null, "Data writer cannot be null");
+    Preconditions.checkArgument(deleteWriter != null, "Delete writer cannot be null");
+
+    this.dataWriter = dataWriter;
+    this.deleteWriter = deleteWriter;
+    this.positionDelete = PositionDelete.create();
+  }
+
+  @Override
+  public void insert(T row, PartitionSpec spec, StructLike partition) {
+    dataWriter.write(row, spec, partition);
+  }
+
+  @Override
+  public void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) {
+    positionDelete.set(path, pos, row);
+    deleteWriter.write(positionDelete, spec, partition);
+  }
+
+  @Override
+  public WriteResult result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+
+    DataWriteResult dataWriteResult = dataWriter.result();
+    DeleteWriteResult deleteWriteResult = deleteWriter.result();
+
+    return WriteResult.builder()
+        .addDataFiles(dataWriteResult.dataFiles())
+        .addDeleteFiles(deleteWriteResult.deleteFiles())
+        .addReferencedDataFiles(deleteWriteResult.referencedDataFiles())
+        .build();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      dataWriter.close();
+      deleteWriter.close();
+
+      this.closed = true;
+    }
+  }
+}
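
For context, a minimal usage sketch of the class above (not part of the patch): the delta writer fans insert calls out to the data writer and delete calls out to the position-delete writer, and result() may be read only after close(). The names writerFactory, fileFactory, table, format, targetFileSize, row, dataFilePath, and partition are assumed to be configured by the caller, the same way the tests later in this diff configure them.

// Sketch only, not part of the patch. Record is org.apache.iceberg.data.Record;
// writerFactory, fileFactory, table, format, targetFileSize, row, dataFilePath
// and partition are assumed to exist in the surrounding code.
ClusteredDataWriter<Record> dataWriter = new ClusteredDataWriter<>(
    writerFactory, fileFactory, table.io(), format, targetFileSize);
ClusteredPositionDeleteWriter<Record> deleteWriter = new ClusteredPositionDeleteWriter<>(
    writerFactory, fileFactory, table.io(), format, targetFileSize);

PositionDeltaWriter<Record> deltaWriter = new BasePositionDeltaWriter<>(dataWriter, deleteWriter);
try {
  deltaWriter.insert(row, table.spec(), partition);               // routed to dataWriter
  deltaWriter.delete(dataFilePath, 0L, table.spec(), partition);  // routed to deleteWriter
} finally {
  deltaWriter.close();                                            // required before result()
}
WriteResult result = deltaWriter.result();  // data files, delete files, referenced data files
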
diff --git a/core/src/main/java/org/apache/iceberg/io/EqualityDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/EqualityDeltaWriter.java
new file mode 100644
index 000000000000..196292047f3a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/EqualityDeltaWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+
+/**
+ * A writer capable of writing data and equality deletes that may belong to different specs and partitions.
+ *
+ * @param <T> the row type
+ */
+public interface EqualityDeltaWriter<T> extends Closeable {
+
+  /**
+   * Inserts a row into the provided spec/partition.
+   *
+   * @param row a data record
+   * @param spec a partition spec
+   * @param partition a partition or null if the spec is unpartitioned
+   */
+  void insert(T row, PartitionSpec spec, StructLike partition);
+
+  /**
+   * Deletes a row from the provided spec/partition.
+   * <p>
+   * This method assumes the delete record has the same schema as the rows that will be inserted.
+   *
+   * @param row a delete record
+   * @param spec a partition spec
+   * @param partition a partition or null if the spec is unpartitioned
+   */
+  void delete(T row, PartitionSpec spec, StructLike partition);
+
+  /**
+   * Deletes a key from the provided spec/partition.
+   * <p>
+   * This method assumes the delete key contains values only for equality fields.
+   *
+   * @param key a delete key
+   * @param spec a partition spec
+   * @param partition a partition or null if the spec is unpartitioned
+   */
+  void deleteKey(T key, PartitionSpec spec, StructLike partition);
+
+  /**
+   * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s.
+   * The result is valid only after the writer is closed.
+   *
+   * @return the writer result
+   */
+  WriteResult result();
+}
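
Since the patch adds only the interface above, a hypothetical illustration of the calling convention it documents (writer, record(...), key(...), spec, and partition are made-up names, and no implementation ships in this patch): for a table with schema (id int, data string) and equality field id, delete takes a record shaped like the inserted rows, while deleteKey takes a record carrying only the equality fields.

// Hypothetical calls against some EqualityDeltaWriter<Record> implementation;
// record(...) and key(...) are illustrative helpers, not Iceberg APIs.
writer.insert(record(1, "aaa"), spec, partition);    // full (id, data) record
writer.delete(record(1, "aaa"), spec, partition);    // delete record uses the full row schema
writer.deleteKey(key(1), spec, partition);           // key carries only the equality field id
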
diff --git a/core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
new file mode 100644
index 000000000000..2995bbf1296f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+
+/**
+ * A writer capable of writing data and position deletes that may belong to different specs and partitions.
+ *
+ * @param <T> the row type
+ */
+public interface PositionDeltaWriter<T> extends Closeable {
+
+  /**
+   * Inserts a row into the provided spec/partition.
+   *
+   * @param row a data record
+   * @param spec a partition spec
+   * @param partition a partition or null if the spec is unpartitioned
+   */
+  void insert(T row, PartitionSpec spec, StructLike partition);
+
+  /**
+   * Deletes a position in the provided spec/partition.
+   *
+   * @param path a data file path
+   * @param pos a position
+   * @param spec a partition spec
+   * @param partition a partition or null if the spec is unpartitioned
+   */
+  default void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition) {
+    delete(path, pos, null, spec, partition);
+  }
+
+  /**
+   * Deletes a position in the provided spec/partition and records the deleted row in the delete file.
+   *
+   * @param path a data file path
+   * @param pos a position
+   * @param row a deleted row
+   * @param spec a partition spec
+   * @param partition a partition or null if the spec is unpartitioned
+   */
+  void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition);
+
+  /**
+   * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s.
+   * The result is valid only after the writer is closed.
+   *
+   * @return the writer result
+   */
+  WriteResult result();
+}
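
Worth noting about the interface above: the default method makes a position-only delete equivalent to the row-carrying overload with a null row, so delete files written through it store (path, pos) pairs without a copy of the deleted row. The two call shapes side by side, with illustrative names:

// deltaWriter, dataFile, deletedRow, spec and partition are illustrative.
deltaWriter.delete(dataFile.path(), 7L, spec, partition);              // no row copy stored
deltaWriter.delete(dataFile.path(), 7L, deletedRow, spec, partition);  // deleted row persisted as well
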
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
new file mode 100644
index 000000000000..a344bb2d4817
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPositionDeltaWriters<T> extends WriterTestBase<T> {
+
+  // TODO: add ORC once we support ORC delete files
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET}
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPositionDeltaWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testPositionDeltaInsertOnly() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    ClusteredDataWriter<T> dataWriter = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+    ClusteredPositionDeleteWriter<T> deleteWriter = new ClusteredPositionDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+    PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(dataWriter, deleteWriter);
+
+    deltaWriter.insert(toRow(1, "aaa"), table.spec(), null);
+    deltaWriter.close();
+
+    WriteResult result = deltaWriter.result();
+    DataFile[] dataFiles = result.dataFiles();
+    DeleteFile[] deleteFiles = result.deleteFiles();
+    CharSequence[] referencedDataFiles = result.referencedDataFiles();
+
+    Assert.assertEquals("Must be 1 data file", 1, dataFiles.length);
+    Assert.assertEquals("Must be no delete files", 0, deleteFiles.length);
+    Assert.assertEquals("Must not reference data files", 0, referencedDataFiles.length);
+
+    RowDelta rowDelta = table.newRowDelta();
+    for (DataFile dataFile : dataFiles) {
+      rowDelta.addRows(dataFile);
+    }
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testPositionDeltaDeleteOnly() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add an unpartitioned data file
+    ImmutableList<T> rows1 = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(11, "aaa")
+    );
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+    table.newFastAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // partition by data
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file partitioned by data
+    ImmutableList<T> rows2 = ImmutableList.of(
+        toRow(3, "bbb"),
+        toRow(4, "bbb")
+    );
+    DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb"));
+    table.newFastAppend()
+        .appendFile(dataFile2)
+        .commit();
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec partitionedSpec = table.specs().get(1);
+
+    ClusteredDataWriter<T> dataWriter = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+    ClusteredPositionDeleteWriter<T> deleteWriter = new ClusteredPositionDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+    PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(dataWriter, deleteWriter);
+
+    deltaWriter.delete(dataFile1.path(), 2L, unpartitionedSpec, null);
+    deltaWriter.delete(dataFile2.path(), 1L, partitionedSpec, partitionKey(partitionedSpec, "bbb"));
+
+    deltaWriter.close();
+
+    WriteResult result = deltaWriter.result();
+    DataFile[] dataFiles = result.dataFiles();
+    DeleteFile[] deleteFiles = result.deleteFiles();
+    CharSequence[] referencedDataFiles = result.referencedDataFiles();
+
+    Assert.assertEquals("Must be 0 data files", 0, dataFiles.length);
+    Assert.assertEquals("Must be 2 delete files", 2, deleteFiles.length);
+    Assert.assertEquals("Must reference 2 data files", 2, referencedDataFiles.length);
+
+    RowDelta rowDelta = table.newRowDelta();
+    for (DeleteFile deleteFile : deleteFiles) {
+      rowDelta.addDeletes(deleteFile);
+    }
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testPositionDeltaMultipleSpecs() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add an unpartitioned data file
+    ImmutableList<T> rows1 = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(11, "aaa")
+    );
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+    table.newFastAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // partition by data
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file partitioned by data
+    ImmutableList<T> rows2 = ImmutableList.of(
+        toRow(3, "bbb"),
+        toRow(4, "bbb")
+    );
+    DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb"));
+    table.newFastAppend()
+        .appendFile(dataFile2)
+        .commit();
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec partitionedSpec = table.specs().get(1);
+
+    ClusteredDataWriter<T> dataWriter = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+    ClusteredPositionDeleteWriter<T> deleteWriter = new ClusteredPositionDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+    PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(dataWriter, deleteWriter);
+
+    deltaWriter.delete(dataFile1.path(), 2L, unpartitionedSpec, null);
+    deltaWriter.delete(dataFile2.path(), 1L, partitionedSpec, partitionKey(partitionedSpec, "bbb"));
+    deltaWriter.insert(toRow(10, "ccc"), partitionedSpec, partitionKey(partitionedSpec, "ccc"));
+
+    deltaWriter.close();
+
+    WriteResult result = deltaWriter.result();
+    DataFile[] dataFiles = result.dataFiles();
+    DeleteFile[] deleteFiles = result.deleteFiles();
+    CharSequence[] referencedDataFiles = result.referencedDataFiles();
+
+    Assert.assertEquals("Must be 1 data file", 1, dataFiles.length);
+    Assert.assertEquals("Must be 2 delete files", 2, deleteFiles.length);
+    Assert.assertEquals("Must reference 2 data files", 2, referencedDataFiles.length);
+
+    RowDelta rowDelta = table.newRowDelta();
+    for (DataFile dataFile : dataFiles) {
+      rowDelta.addRows(dataFile);
+    }
+    for (DeleteFile deleteFile : deleteFiles) {
+      rowDelta.addDeletes(deleteFile);
+    }
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(10, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+}
+ */ + +package org.apache.iceberg.flink.sink; + +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.io.TestPositionDeltaWriters; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; + +public class TestFlinkPositionDeltaWriters extends TestPositionDeltaWriters { + + public TestFlinkPositionDeltaWriters(FileFormat fileFormat) { + super(fileFormat); + } + + @Override + protected FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema, + Schema positionDeleteRowSchema) { + return FlinkFileWriterFactory.builderFor(table) + .dataSchema(table.schema()) + .dataFileFormat(format()) + .deleteFileFormat(format()) + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(equalityDeleteRowSchema) + .positionDeleteRowSchema(positionDeleteRowSchema) + .build(); + } + + @Override + protected RowData toRow(Integer id, String data) { + return SimpleDataUtil.createRowData(id, data); + } + + @Override + protected StructLikeSet toSet(Iterable rows) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + RowType flinkType = FlinkSchemaUtil.convert(table.schema()); + for (RowData row : rows) { + RowDataWrapper wrapper = new RowDataWrapper(flinkType, table.schema().asStruct()); + set.add(wrapper.wrap(row)); + } + return set; + } +} diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java new file mode 100644 index 000000000000..480448e13a8f --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.util.List; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.io.TestPositionDeltaWriters; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +public class TestSparkPositionDeltaWriters extends TestPositionDeltaWriters { + + public TestSparkPositionDeltaWriters(FileFormat fileFormat) { + super(fileFormat); + } + + @Override + protected FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema, + Schema positionDeleteRowSchema) { + return SparkFileWriterFactory.builderFor(table) + .dataSchema(table.schema()) + .dataFileFormat(format()) + .deleteFileFormat(format()) + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(equalityDeleteRowSchema) + .positionDeleteRowSchema(positionDeleteRowSchema) + .build(); + } + + @Override + protected InternalRow toRow(Integer id, String data) { + InternalRow row = new GenericInternalRow(2); + row.update(0, id); + row.update(1, UTF8String.fromString(data)); + return row; + } + + @Override + protected StructLikeSet toSet(Iterable rows) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + StructType sparkType = SparkSchemaUtil.convert(table.schema()); + for (InternalRow row : rows) { + InternalRowWrapper wrapper = new InternalRowWrapper(sparkType); + set.add(wrapper.wrap(row)); + } + return set; + } +}