diff --git a/.baseline/checkstyle/checkstyle.xml b/.baseline/checkstyle/checkstyle.xml index 9df746fe8636..54be02b39616 100644 --- a/.baseline/checkstyle/checkstyle.xml +++ b/.baseline/checkstyle/checkstyle.xml @@ -97,6 +97,7 @@ org.apache.iceberg.IsolationLevel.*, org.apache.iceberg.NullOrder.*, org.apache.iceberg.MetadataTableType.*, + org.apache.iceberg.MetadataColumns.*, org.apache.iceberg.SortDirection.*, org.apache.iceberg.TableProperties.*, org.apache.iceberg.types.Type.*, diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java index a8eb2eb37821..e1cf096cd003 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java +++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java @@ -44,6 +44,7 @@ private MetadataColumns() { Integer.MAX_VALUE - 101, "file_path", Types.StringType.get(), "Path of a file in which a deleted row is stored"); public static final NestedField DELETE_FILE_POS = NestedField.required( Integer.MAX_VALUE - 102, "pos", Types.LongType.get(), "Ordinal position of a deleted row in the data file"); + public static final String DELETE_FILE_ROW_FIELD_NAME = "row"; public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103; public static final String DELETE_FILE_ROW_DOC = "Deleted row values"; diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 4f661813157d..59715b01b0e7 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -321,6 +321,8 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { public DataWriter build() throws IOException { Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null when creating data writer for partitioned spec"); FileAppender fileAppender = appenderBuilder.build(); return new DataWriter<>(fileAppender, FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder); @@ -428,6 +430,10 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); Preconditions.checkState(createWriterFunc != null, "Cannot create equality delete file unless createWriterFunc is set"); + Preconditions.checkArgument(spec != null, + "Spec must not be null when creating equality delete writer"); + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); meta("delete-type", "equality"); meta("delete-field-ids", IntStream.of(equalityFieldIds) @@ -446,6 +452,10 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { public PositionDeleteWriter buildPositionWriter() throws IOException { Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids"); + Preconditions.checkArgument(spec != null, + "Spec must not be null when creating position delete writer"); + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); meta("delete-type", "position"); diff --git a/core/src/main/java/org/apache/iceberg/io/WriterFactory.java b/core/src/main/java/org/apache/iceberg/io/WriterFactory.java new file mode 100644 index 000000000000..e797c1ec67bb --- 
/dev/null +++ b/core/src/main/java/org/apache/iceberg/io/WriterFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; + +/** + * A factory for creating data and delete writers. + */ +public interface WriterFactory { + + /** + * Creates a new {@link DataWriter}. + * + * @param file the output file + * @param spec the partition spec written data belongs to + * @param partition the partition written data belongs to or null if the spec is unpartitioned + * @return the constructed data writer + */ + DataWriter newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition); + + /** + * Creates a new {@link EqualityDeleteWriter}. + * + * @param file the output file + * @param spec the partition spec written deletes belong to + * @param partition the partition written deletes belong to or null if the spec is unpartitioned + * @return the constructed equality delete writer + */ + EqualityDeleteWriter newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition); + + /** + * Creates a new {@link PositionDeleteWriter}. + * + * @param file the output file + * @param spec the partition spec written deletes belong to + * @param partition the partition written deletes belong to or null if the spec is unpartitioned + * @return the constructed position delete writer + */ + PositionDeleteWriter newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition); +} diff --git a/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java new file mode 100644 index 000000000000..72c0ea5868be --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.WriterFactory; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +/** + * A base writer factory to be extended by query engine integrations. + */ +public abstract class BaseWriterFactory implements WriterFactory { + private final Table table; + private final FileFormat dataFileFormat; + private final Schema dataSchema; + private final SortOrder dataSortOrder; + private final FileFormat deleteFileFormat; + private final int[] equalityFieldIds; + private final Schema equalityDeleteRowSchema; + private final SortOrder equalityDeleteSortOrder; + private final Schema positionDeleteRowSchema; + + protected BaseWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, + SortOrder dataSortOrder, FileFormat deleteFileFormat, + int[] equalityFieldIds, Schema equalityDeleteRowSchema, + SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema) { + this.table = table; + this.dataFileFormat = dataFileFormat; + this.dataSchema = dataSchema; + this.dataSortOrder = dataSortOrder; + this.deleteFileFormat = deleteFileFormat; + this.equalityFieldIds = equalityFieldIds; + this.equalityDeleteRowSchema = equalityDeleteRowSchema; + this.equalityDeleteSortOrder = equalityDeleteSortOrder; + this.positionDeleteRowSchema = positionDeleteRowSchema; + } + + protected abstract void configureDataWrite(Avro.DataWriteBuilder builder); + protected abstract void configureEqualityDelete(Avro.DeleteWriteBuilder builder); + protected abstract void configurePositionDelete(Avro.DeleteWriteBuilder builder); + + protected abstract void configureDataWrite(Parquet.DataWriteBuilder builder); + protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder); + protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder); + + // TODO: provide ways to configure ORC delete writers once we support them + protected abstract void configureDataWrite(ORC.DataWriteBuilder builder); + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + OutputFile outputFile = file.encryptingOutputFile(); + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table.properties(); + MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); + + try { + switch (dataFileFormat) { + case AVRO: + Avro.DataWriteBuilder avroBuilder = Avro.writeData(outputFile) + .schema(dataSchema) + .setAll(properties) + .metricsConfig(metricsConfig) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(dataSortOrder) + .overwrite(); + + configureDataWrite(avroBuilder); + 
+ return avroBuilder.build(); + + case PARQUET: + Parquet.DataWriteBuilder parquetBuilder = Parquet.writeData(outputFile) + .schema(dataSchema) + .setAll(properties) + .metricsConfig(metricsConfig) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(dataSortOrder) + .overwrite(); + + configureDataWrite(parquetBuilder); + + return parquetBuilder.build(); + + case ORC: + ORC.DataWriteBuilder orcBuilder = ORC.writeData(outputFile) + .schema(dataSchema) + .setAll(properties) + .metricsConfig(metricsConfig) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(dataSortOrder) + .overwrite(); + + configureDataWrite(orcBuilder); + + return orcBuilder.build(); + + default: + throw new UnsupportedOperationException("Unsupported data file format: " + dataFileFormat); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public EqualityDeleteWriter newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, + StructLike partition) { + OutputFile outputFile = file.encryptingOutputFile(); + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table.properties(); + MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); + + try { + switch (deleteFileFormat) { + case AVRO: + // TODO: support metrics configs in Avro equality delete writer + + Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile) + .setAll(properties) + .rowSchema(equalityDeleteRowSchema) + .equalityFieldIds(equalityFieldIds) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(equalityDeleteSortOrder) + .overwrite(); + + configureEqualityDelete(avroBuilder); + + return avroBuilder.buildEqualityWriter(); + + case PARQUET: + Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile) + .setAll(properties) + .metricsConfig(metricsConfig) + .rowSchema(equalityDeleteRowSchema) + .equalityFieldIds(equalityFieldIds) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(equalityDeleteSortOrder) + .overwrite(); + + configureEqualityDelete(parquetBuilder); + + return parquetBuilder.buildEqualityWriter(); + + default: + throw new UnsupportedOperationException("Unsupported format for equality deletes: " + deleteFileFormat); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new equality delete writer", e); + } + } + + @Override + public PositionDeleteWriter newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, + StructLike partition) { + OutputFile outputFile = file.encryptingOutputFile(); + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table.properties(); + + // TODO: build and pass a correct metrics config for position deletes + + try { + switch (deleteFileFormat) { + case AVRO: + Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile) + .setAll(properties) + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .overwrite(); + + configurePositionDelete(avroBuilder); + + return avroBuilder.buildPositionWriter(); + + case PARQUET: + Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile) + .setAll(properties) + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .overwrite(); + + configurePositionDelete(parquetBuilder); + + return 
parquetBuilder.buildPositionWriter(); + + default: + throw new UnsupportedOperationException("Unsupported format for position deletes: " + deleteFileFormat); + } + + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new position delete writer", e); + } + } + + protected Schema dataSchema() { + return dataSchema; + } + + protected Schema equalityDeleteRowSchema() { + return equalityDeleteRowSchema; + } + + protected Schema positionDeleteRowSchema() { + return positionDeleteRowSchema; + } +} diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java new file mode 100644 index 000000000000..ec193264f375 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME; + +@RunWith(Parameterized.class) +public abstract class TestWriterFactory extends TableTestBase { + @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}") + public static Object[] parameters() { + return new 
Object[][] { + new Object[]{FileFormat.AVRO, false}, + new Object[]{FileFormat.AVRO, true}, + new Object[]{FileFormat.PARQUET, false}, + new Object[]{FileFormat.PARQUET, true}, + new Object[]{FileFormat.ORC, false}, + new Object[]{FileFormat.ORC, true} + }; + } + + private static final int TABLE_FORMAT_VERSION = 2; + + private final FileFormat fileFormat; + private final boolean partitioned; + private final List dataRows; + + private StructLike partition = null; + private OutputFileFactory fileFactory = null; + + public TestWriterFactory(FileFormat fileFormat, boolean partitioned) { + super(TABLE_FORMAT_VERSION); + this.fileFormat = fileFormat; + this.partitioned = partitioned; + this.dataRows = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "aaa"), + toRow(3, "aaa"), + toRow(4, "aaa"), + toRow(5, "aaa") + ); + } + + protected abstract WriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema, Schema positionDeleteRowSchema); + + protected abstract T toRow(Integer id, String data); + + protected abstract StructLikeSet toSet(Iterable records); + + protected FileFormat format() { + return fileFormat; + } + + @Before + public void setupTable() throws Exception { + this.tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); // created during table creation + + this.metadataDir = new File(tableDir, "metadata"); + + if (partitioned) { + this.table = create(SCHEMA, SPEC); + this.partition = initPartitionKey(); + } else { + this.table = create(SCHEMA, PartitionSpec.unpartitioned()); + this.partition = null; + } + + this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build(); + } + + @Test + public void testDataWriter() throws IOException { + WriterFactory writerFactory = newWriterFactory(table.schema()); + + DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); + + table.newRowDelta() + .addRows(dataFile) + .commit(); + + Assert.assertEquals("Records should match", toSet(dataRows), actualRowSet("*")); + } + + @Test + public void testEqualityDeleteWriter() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); + Schema equalityDeleteRowSchema = table.schema().select("id"); + WriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); + + // write a data file + DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); + + // commit the written data file + table.newRowDelta() + .addRows(dataFile) + .commit(); + + // write an equality delete file + List deletes = ImmutableList.of( + toRow(1, "aaa"), + toRow(3, "bbb"), + toRow(5, "ccc") + ); + DeleteFile deleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition); + + // verify the written delete file + GenericRecord deleteRecord = GenericRecord.create(equalityDeleteRowSchema); + List expectedDeletes = ImmutableList.of( + deleteRecord.copy("id", 1), + deleteRecord.copy("id", 3), + deleteRecord.copy("id", 5) + ); + InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString()); + List actualDeletes = readFile(equalityDeleteRowSchema, inputDeleteFile); + Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes); + + // commit the written delete file + table.newRowDelta() + .addDeletes(deleteFile) + .commit(); + + // verify the delete file is applied correctly + 
List expectedRows = ImmutableList.of( + toRow(2, "aaa"), + toRow(4, "aaa") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + + @Test + public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + Assume.assumeFalse("Table must start unpartitioned", partitioned); + + List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); + Schema equalityDeleteRowSchema = table.schema().select("id"); + WriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); + + // write an unpartitioned data file + DataFile firstDataFile = writeData(writerFactory, dataRows, table.spec(), partition); + Assert.assertEquals("First data file must be unpartitioned", 0, firstDataFile.partition().size()); + + List deletes = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "aaa"), + toRow(3, "aaa"), + toRow(4, "aaa") + ); + + // write an unpartitioned delete file + DeleteFile firstDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition); + Assert.assertEquals("First delete file must be unpartitioned", 0, firstDeleteFile.partition().size()); + + // commit the first data and delete files + table.newAppend() + .appendFile(firstDataFile) + .commit(); + table.newRowDelta() + .addDeletes(firstDeleteFile) + .commit(); + + // evolve the spec + table.updateSpec() + .addField("data") + .commit(); + + partition = initPartitionKey(); + + // write a partitioned data file + DataFile secondDataFile = writeData(writerFactory, dataRows, table.spec(), partition); + Assert.assertEquals("Second data file must be partitioned", 1, secondDataFile.partition().size()); + + // write a partitioned delete file + DeleteFile secondDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition); + Assert.assertEquals("Second delete file must be partitioned", 1, secondDeleteFile.partition().size()); + + // commit the second data and delete files + table.newAppend() + .appendFile(secondDataFile) + .commit(); + table.newRowDelta() + .addDeletes(secondDeleteFile) + .commit(); + + // verify both delete files are applied correctly + List expectedRows = ImmutableList.of( + toRow(5, "aaa"), + toRow(5, "aaa") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + + @Test + public void testPositionDeleteWriter() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + WriterFactory writerFactory = newWriterFactory(table.schema()); + + // write a data file + DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); + + // write a position delete file + List> deletes = ImmutableList.of( + new PositionDelete().set(dataFile.path(), 0L, null), + new PositionDelete().set(dataFile.path(), 2L, null), + new PositionDelete().set(dataFile.path(), 4L, null) + ); + Pair result = writePositionDeletes(writerFactory, deletes, table.spec(), partition); + DeleteFile deleteFile = result.first(); + CharSequenceSet referencedDataFiles = result.second(); + + // verify the written delete file + GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema()); + List expectedDeletes = ImmutableList.of( + deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 0L), + deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 2L),
+ deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 4L) + ); + InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString()); + List actualDeletes = readFile(DeleteSchemaUtil.pathPosSchema(), inputDeleteFile); + Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes); + + // commit the data and delete files + table.newRowDelta() + .addRows(dataFile) + .addDeletes(deleteFile) + .validateDataFilesExist(referencedDataFiles) + .validateDeletedFiles() + .commit(); + + // verify the delete file is applied correctly + List expectedRows = ImmutableList.of( + toRow(2, "aaa"), + toRow(4, "aaa") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + + @Test + public void testPositionDeleteWriterWithRow() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + WriterFactory writerFactory = newWriterFactory(table.schema(), table.schema()); + + // write a data file + DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition); + + // write a position delete file and persist the deleted row + List> deletes = ImmutableList.of( + new PositionDelete().set(dataFile.path(), 0, dataRows.get(0)) + ); + Pair result = writePositionDeletes(writerFactory, deletes, table.spec(), partition); + DeleteFile deleteFile = result.first(); + CharSequenceSet referencedDataFiles = result.second(); + + // verify the written delete file + GenericRecord deletedRow = GenericRecord.create(table.schema()); + Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(table.schema()); + GenericRecord deleteRecord = GenericRecord.create(positionDeleteSchema); + Map deleteRecordColumns = ImmutableMap.of( + DELETE_FILE_PATH.name(), dataFile.path(), + DELETE_FILE_POS.name(), 0L, + DELETE_FILE_ROW_FIELD_NAME, deletedRow.copy("id", 1, "data", "aaa") + ); + List expectedDeletes = ImmutableList.of(deleteRecord.copy(deleteRecordColumns)); + InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString()); + List actualDeletes = readFile(positionDeleteSchema, inputDeleteFile); + Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes); + + // commit the data and delete files + table.newRowDelta() + .addRows(dataFile) + .addDeletes(deleteFile) + .validateDataFilesExist(referencedDataFiles) + .validateDeletedFiles() + .commit(); + + // verify the delete file is applied correctly + List expectedRows = ImmutableList.of( + toRow(2, "aaa"), + toRow(3, "aaa"), + toRow(4, "aaa"), + toRow(5, "aaa") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + + private PartitionKey initPartitionKey() { + Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa")); + + PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); + partitionKey.partition(record); + + return partitionKey; + } + + private WriterFactory newWriterFactory(Schema dataSchema) { + return newWriterFactory(dataSchema, null, null, null); + } + + private WriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema) { + return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null); + } + + private WriterFactory newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) { + return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema); + } + + private DataFile 
writeData(WriterFactory writerFactory, List rows, + PartitionSpec spec, StructLike partitionKey) throws IOException { + + EncryptedOutputFile file = newOutputFile(spec, partitionKey); + DataWriter writer = writerFactory.newDataWriter(file, spec, partitionKey); + + try (DataWriter closeableWriter = writer) { + for (T row : rows) { + closeableWriter.add(row); + } + } + + return writer.toDataFile(); + } + + private DeleteFile writeEqualityDeletes(WriterFactory writerFactory, List deletes, + PartitionSpec spec, StructLike partitionKey) throws IOException { + + EncryptedOutputFile file = newOutputFile(spec, partitionKey); + EqualityDeleteWriter writer = writerFactory.newEqualityDeleteWriter(file, spec, partitionKey); + + try (EqualityDeleteWriter closableWriter = writer) { + closableWriter.deleteAll(deletes); + } + + return writer.toDeleteFile(); + } + + private Pair writePositionDeletes(WriterFactory writerFactory, + List> deletes, + PartitionSpec spec, + StructLike partitionKey) throws IOException { + + EncryptedOutputFile file = newOutputFile(spec, partitionKey); + PositionDeleteWriter writer = writerFactory.newPositionDeleteWriter(file, spec, partitionKey); + + try (PositionDeleteWriter closableWriter = writer) { + for (PositionDelete delete : deletes) { + closableWriter.delete(delete.path(), delete.pos(), delete.row()); + } + } + + return Pair.of(writer.toDeleteFile(), writer.referencedDataFiles()); + } + + private List readFile(Schema schema, InputFile inputFile) throws IOException { + switch (fileFormat) { + case PARQUET: + try (CloseableIterable records = Parquet.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build()) { + + return ImmutableList.copyOf(records); + } + + case AVRO: + try (CloseableIterable records = Avro.read(inputFile) + .project(schema) + .createReaderFunc(DataReader::create) + .build()) { + + return ImmutableList.copyOf(records); + } + + default: + throw new UnsupportedOperationException("Unsupported read file format: " + fileFormat); + } + } + + private StructLikeSet actualRowSet(String... 
columns) throws IOException { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } + + private EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitionKey) { + return fileFactory.newOutputFile(spec, partitionKey); + } +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 1ab5c70183f9..417598b33f9d 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -215,6 +215,8 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { public DataWriter build() { Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null when creating data writer for partitioned spec"); FileAppender fileAppender = appenderBuilder.build(); return new DataWriter<>(fileAppender, FileFormat.ORC, location, spec, partition, keyMetadata, sortOrder); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index fef320984e1e..aadddbf4b34b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -456,6 +456,8 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { public DataWriter build() throws IOException { Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null when creating data writer for partitioned spec"); FileAppender fileAppender = appenderBuilder.build(); return new DataWriter<>(fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder); @@ -571,6 +573,10 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); Preconditions.checkState(createWriterFunc != null, "Cannot create equality delete file unless createWriterFunc is set"); + Preconditions.checkArgument(spec != null, + "Spec must not be null when creating equality delete writer"); + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); meta("delete-type", "equality"); meta("delete-field-ids", IntStream.of(equalityFieldIds) @@ -589,6 +595,10 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { public PositionDeleteWriter buildPositionWriter() throws IOException { Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids"); + Preconditions.checkArgument(spec != null, + "Spec must not be null when creating position delete writer"); + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); meta("delete-type", "position"); diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java new file mode 100644 index 000000000000..1ce202d2971b --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java @@ -0,0 
+1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.util.Locale; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.BaseWriterFactory; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.SparkAvroWriter; +import org.apache.iceberg.spark.data.SparkOrcWriter; +import org.apache.iceberg.spark.data.SparkParquetWriters; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; + +class SparkWriterFactory extends BaseWriterFactory { + private StructType dataSparkType; + private StructType equalityDeleteSparkType; + private StructType positionDeleteSparkType; + + SparkWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, StructType dataSparkType, + SortOrder dataSortOrder, FileFormat deleteFileFormat, + int[] equalityFieldIds, Schema equalityDeleteRowSchema, StructType equalityDeleteSparkType, + SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, + StructType positionDeleteSparkType) { + + super(table, dataFileFormat, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, + equalityDeleteRowSchema, equalityDeleteSortOrder, positionDeleteRowSchema); + + this.dataSparkType = dataSparkType; + this.equalityDeleteSparkType = equalityDeleteSparkType; + this.positionDeleteSparkType = positionDeleteSparkType; + } + + static Builder builderFor(Table table) { + return new Builder(table); + } + + @Override + protected void configureDataWrite(Avro.DataWriteBuilder builder) { + builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); + } + + @Override + protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { + builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); + } + + @Override + protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { + boolean withRow = 
positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined(); + if (withRow) { + // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos + StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME); + StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); + builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); + } + } + + @Override + protected void configureDataWrite(Parquet.DataWriteBuilder builder) { + builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); + } + + @Override + protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { + builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); + } + + @Override + protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { + builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); + builder.transformPaths(path -> UTF8String.fromString(path.toString())); + } + + @Override + protected void configureDataWrite(ORC.DataWriteBuilder builder) { + builder.createWriterFunc(SparkOrcWriter::new); + } + + private StructType dataSparkType() { + if (dataSparkType == null) { + Preconditions.checkNotNull(dataSchema(), "Data schema must not be null"); + this.dataSparkType = SparkSchemaUtil.convert(dataSchema()); + } + + return dataSparkType; + } + + private StructType equalityDeleteSparkType() { + if (equalityDeleteSparkType == null) { + Preconditions.checkNotNull(equalityDeleteRowSchema(), "Equality delete schema must not be null"); + this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema()); + } + + return equalityDeleteSparkType; + } + + private StructType positionDeleteSparkType() { + if (positionDeleteSparkType == null) { + // wrap the optional row schema into the position delete schema that contains path and position + Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema()); + this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema); + } + + return positionDeleteSparkType; + } + + static class Builder { + private final Table table; + private FileFormat dataFileFormat; + private Schema dataSchema; + private StructType dataSparkType; + private SortOrder dataSortOrder; + private FileFormat deleteFileFormat; + private int[] equalityFieldIds; + private Schema equalityDeleteRowSchema; + private StructType equalityDeleteSparkType; + private SortOrder equalityDeleteSortOrder; + private Schema positionDeleteRowSchema; + private StructType positionDeleteSparkType; + + Builder(Table table) { + this.table = table; + + Map properties = table.properties(); + + String dataFileFormatName = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + this.dataFileFormat = FileFormat.valueOf(dataFileFormatName.toUpperCase(Locale.ENGLISH)); + + String deleteFileFormatName = properties.getOrDefault(DELETE_DEFAULT_FILE_FORMAT, dataFileFormatName); + this.deleteFileFormat = FileFormat.valueOf(deleteFileFormatName.toUpperCase(Locale.ENGLISH)); + } + + Builder dataFileFormat(FileFormat newDataFileFormat) { + this.dataFileFormat = newDataFileFormat; + return this; + } + + Builder dataSchema(Schema newDataSchema) { + this.dataSchema = newDataSchema; + return this; + } + + Builder dataSparkType(StructType newDataSparkType) { + this.dataSparkType = newDataSparkType; + return this; + } + + Builder 
dataSortOrder(SortOrder newDataSortOrder) { + this.dataSortOrder = newDataSortOrder; + return this; + } + + Builder deleteFileFormat(FileFormat newDeleteFileFormat) { + this.deleteFileFormat = newDeleteFileFormat; + return this; + } + + Builder equalityFieldIds(int[] newEqualityFieldIds) { + this.equalityFieldIds = newEqualityFieldIds; + return this; + } + + Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) { + this.equalityDeleteRowSchema = newEqualityDeleteRowSchema; + return this; + } + + Builder equalityDeleteSparkType(StructType newEqualityDeleteSparkType) { + this.equalityDeleteSparkType = newEqualityDeleteSparkType; + return this; + } + + Builder equalityDeleteSortOrder(SortOrder newEqualityDeleteSortOrder) { + this.equalityDeleteSortOrder = newEqualityDeleteSortOrder; + return this; + } + + Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) { + this.positionDeleteRowSchema = newPositionDeleteRowSchema; + return this; + } + + Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) { + this.positionDeleteSparkType = newPositionDeleteSparkType; + return this; + } + + SparkWriterFactory build() { + boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null; + boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null; + Preconditions.checkArgument(noEqualityDeleteConf || fullEqualityDeleteConf, + "Equality field IDs and equality delete row schema must be set together"); + + return new SparkWriterFactory( + table, dataFileFormat, dataSchema, dataSparkType, dataSortOrder, deleteFileFormat, + equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSparkType, equalityDeleteSortOrder, + positionDeleteRowSchema, positionDeleteSparkType); + } + } +} diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java new file mode 100644 index 000000000000..eecbd665a19d --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.util.List; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.TestWriterFactory; +import org.apache.iceberg.io.WriterFactory; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +public class TestSparkWriterFactory extends TestWriterFactory { + + public TestSparkWriterFactory(FileFormat fileFormat, boolean partitioned) { + super(fileFormat, partitioned); + } + + @Override + protected WriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, + Schema equalityDeleteRowSchema, + Schema positionDeleteRowSchema) { + return SparkWriterFactory.builderFor(table) + .dataSchema(table.schema()) + .dataFileFormat(format()) + .deleteFileFormat(format()) + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(equalityDeleteRowSchema) + .positionDeleteRowSchema(positionDeleteRowSchema) + .build(); + } + + @Override + protected InternalRow toRow(Integer id, String data) { + InternalRow row = new GenericInternalRow(2); + row.update(0, id); + row.update(1, UTF8String.fromString(data)); + return row; + } + + @Override + protected StructLikeSet toSet(Iterable rows) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + StructType sparkType = SparkSchemaUtil.convert(table.schema()); + for (InternalRow row : rows) { + InternalRowWrapper wrapper = new InternalRowWrapper(sparkType); + set.add(wrapper.wrap(row)); + } + return set; + } +}
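Reviewer note (not part of the patch): the new WriterFactory interface is only exercised above through the TestWriterFactory helpers, so the intended end-to-end flow is easy to miss. The sketch below condenses how an engine integration is expected to drive it, mirroring the writeData helper in the tests: obtain an EncryptedOutputFile from an OutputFileFactory, create a writer for the target spec and partition, write the rows, close the writer, then commit the resulting DataFile. It assumes a format-v2 table, an OutputFileFactory built as in setupTable(), and some WriterFactory<Record> implementation (for example, one assembled the way SparkWriterFactory.builderFor(table) is in this patch); the class and method names in the sketch are illustrative only.

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.WriterFactory;

class WriterFactoryUsageSketch {
  // Writes the given rows into one data file for the table's current spec/partition and commits it.
  static DataFile writeAndCommit(Table table, OutputFileFactory fileFactory, WriterFactory<Record> writerFactory,
                                 StructLike partition, List<Record> rows) throws IOException {
    EncryptedOutputFile file = fileFactory.newOutputFile(table.spec(), partition);

    // partition must be non-null for a partitioned spec; the Preconditions added to Avro/Parquet/ORC enforce this
    DataWriter<Record> writer = writerFactory.newDataWriter(file, table.spec(), partition);

    try (DataWriter<Record> closeableWriter = writer) {
      for (Record row : rows) {
        closeableWriter.add(row);
      }
    }

    DataFile dataFile = writer.toDataFile();

    table.newRowDelta()
        .addRows(dataFile)
        .commit();

    return dataFile;
  }
}

The same pattern applies to newEqualityDeleteWriter and newPositionDeleteWriter: close the writer, call toDeleteFile(), and commit the result through RowDelta.addDeletes(...), as the tests above do.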