diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
index e70c3efb936b..9caf2717661c 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
@@ -154,6 +154,19 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile file,
             .equalityFieldIds(equalityFieldIds)
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .withPartition(partition)
+            .overwrite()
+            .setAll(config)
+            .metricsConfig(metricsConfig)
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(file.keyMetadata())
+            .equalityFieldIds(equalityFieldIds)
+            .buildEqualityWriter();
+
       case PARQUET:
         return Parquet.writeDeletes(file.encryptingOutputFile())
             .createWriterFunc(GenericParquetWriter::buildWriter)
@@ -193,6 +206,17 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .withPartition(partition)
+            .overwrite()
+            .setAll(config)
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(file.keyMetadata())
+            .buildPositionWriter();
+
       case PARQUET:
         return Parquet.writeDeletes(file.encryptingOutputFile())
             .createWriterFunc(GenericParquetWriter::buildWriter)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
index 0a88cbcea193..978f86668088 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -35,11 +35,13 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -67,6 +69,8 @@ public static Object[] parameters() {
     return new Object[][] {
         new Object[] {"avro", false},
         new Object[] {"avro", true},
+        new Object[] {"orc", false},
+        new Object[] {"orc", true},
         new Object[] {"parquet", false},
         new Object[] {"parquet", true}
     };
@@ -326,6 +330,12 @@ private CloseableIterable<Record> createReader(Schema schema, InputFile inputFil
             .createReaderFunc(DataReader::create)
             .build();
 
+      case ORC:
+        return ORC.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+            .build();
+
       default:
         throw new UnsupportedOperationException("Unsupported file format: " + format);
     }
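The GenericAppenderFactory change routes ORC through the same writeDeletes builder shape already used by the Avro and Parquet branches, and TestAppenderFactory then runs the full writer/reader matrix against the new format. Below is a minimal sketch of driving the equality-delete builder directly, outside the factory. The output path, the one-column schema, and the field id are hypothetical; it also assumes the builder accepts field ids as varargs (as the Parquet builder does) and the EqualityDeleteWriter.deleteAll(Iterable) API of this era:

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;

static void writeOrcEqDeletes() throws IOException {
  // One-column schema: deletes match data rows on "id" (field id 1).
  Schema idSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

  EqualityDeleteWriter<Record> eqWriter =
      ORC.writeDeletes(Files.localOutput(new File("/tmp/eq-deletes.orc")))
          .createWriterFunc(GenericOrcWriter::buildWriter)
          .overwrite()
          .rowSchema(idSchema)
          .withSpec(PartitionSpec.unpartitioned())
          .equalityFieldIds(1)
          .buildEqualityWriter();

  // Each written record marks every data row with id = 42 as deleted.
  Record deletedRow = GenericRecord.create(idSchema);
  deletedRow.setField("id", 42L);

  try (EqualityDeleteWriter<Record> closeable = eqWriter) {
    closeable.deleteAll(ImmutableList.of(deletedRow));
  }
}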
diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
index b141e7c50848..1806ecc82d5e 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
@@ -37,8 +37,10 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -62,7 +64,8 @@ public class TestGenericSortedPosDeleteWriter extends TableTestBase {
   public static Object[] parameters() {
     return new Object[][] {
         new Object[] {"avro"},
-        new Object[] {"parquet"},
+        new Object[] {"orc"},
+        new Object[] {"parquet"}
     };
   }
 
@@ -326,6 +329,13 @@ private List<Record> readRecordsAsList(Schema schema, CharSequence path) throws
             .build();
         break;
 
+      case ORC:
+        iterable = ORC.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+            .build();
+        break;
+
       default:
         throw new UnsupportedOperationException("Unsupported file format: " + format);
     }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
index a344bb2d4817..76e1b19d69fe 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
@@ -39,12 +39,11 @@
 @RunWith(Parameterized.class)
 public abstract class TestPositionDeltaWriters<T> extends WriterTestBase<T> {
 
-  // TODO: add ORC once we support ORC delete files
-
   @Parameterized.Parameters(name = "FileFormat={0}")
   public static Object[] parameters() {
     return new Object[][] {
         new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.ORC},
         new Object[]{FileFormat.PARQUET}
     };
   }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index 6231a569cd2d..c928fbe83aa8 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -42,7 +42,9 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -71,6 +73,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
   public static Object[][] parameters() {
     return new Object[][] {
         {"avro"},
+        {"orc"},
         {"parquet"}
     };
   }
@@ -503,6 +506,13 @@ private List<Record> readRecordsAsList(Schema schema, CharSequence path) throws
             .build();
         break;
 
+      case ORC:
+        iterable = ORC.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+            .build();
+        break;
+
       default:
         throw new UnsupportedOperationException("Unsupported file format: " + format);
     }
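All three data-module suites verify deletes by reading the files back through a projected reader, and the new ORC branches mirror the existing Avro and Parquet ones. A read-back sketch under the same assumptions (and imports) as the writer sketch above, plus org.apache.iceberg.io.InputFile and org.apache.iceberg.io.CloseableIterable:

static void readOrcDeletesBack(Schema idSchema) throws IOException {
  // GenericOrcReader.buildReader resolves the projected schema against the
  // file schema, exactly as the ORC branches added to the tests do.
  InputFile inputFile = Files.localInput(new File("/tmp/eq-deletes.orc"));
  try (CloseableIterable<Record> reader = ORC.read(inputFile)
      .project(idSchema)
      .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(idSchema, fileSchema))
      .build()) {
    reader.forEach(record -> System.out.println(record));
  }
}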
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index e39d470ec7ed..ade5c28837ec 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -162,6 +162,18 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outpu
             .equalityFieldIds(equalityFieldIds)
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(outputFile.encryptingOutputFile())
+            .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
+            .withPartition(partition)
+            .overwrite()
+            .setAll(props)
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(outputFile.keyMetadata())
+            .equalityFieldIds(equalityFieldIds)
+            .buildEqualityWriter();
+
       case PARQUET:
         return Parquet.writeDeletes(outputFile.encryptingOutputFile())
             .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType))
@@ -201,6 +213,20 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(EncryptedOutputFile outp
             .withKeyMetadata(outputFile.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        RowType orcPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
+        return ORC.writeDeletes(outputFile.encryptingOutputFile())
+            .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(orcPosDeleteSchema, iSchema))
+            .withPartition(partition)
+            .overwrite()
+            .setAll(props)
+            .metricsConfig(metricsConfig)
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(outputFile.keyMetadata())
+            .transformPaths(path -> StringData.fromString(path.toString()))
+            .buildPositionWriter();
+
       case PARQUET:
         RowType flinkPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
         return Parquet.writeDeletes(outputFile.encryptingOutputFile())
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 71978fd8c453..e96e92930f6a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -65,6 +65,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
   public static Object[][] parameters() {
     return new Object[][] {
         {"avro"},
+        {"orc"},
         {"parquet"}
     };
   }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 70eb9b5cc983..9f5f1a608694 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -95,6 +95,10 @@ public static Object[][] parameters() {
         new Object[] {"avro", 1, false},
         new Object[] {"avro", 2, true},
         new Object[] {"avro", 2, false},
+        new Object[] {"orc", 1, true},
+        new Object[] {"orc", 1, false},
+        new Object[] {"orc", 2, true},
+        new Object[] {"orc", 2, false},
         new Object[] {"parquet", 1, true},
         new Object[] {"parquet", 1, false},
         new Object[] {"parquet", 2, true},
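In the Flink factory, the position-delete branch derives its ORC write schema from the optional row schema: a position delete is (file_path, pos), plus the deleted row itself when a row schema is set. And because ORC hands file paths around as CharSequence while Flink rows carry StringData, the builder installs a transformPaths conversion; the Spark factories below do the same with UTF8String. A sketch of the schema derivation, with an illustrative one-column row schema (imports as above, plus org.apache.flink.table.types.logical.RowType, org.apache.iceberg.flink.FlinkSchemaUtil, and org.apache.iceberg.io.DeleteSchemaUtil):

// posDeleteSchema(...) appends the optional row struct to (file_path, pos);
// FlinkSchemaUtil.convert maps the Iceberg schema to a Flink RowType.
Schema rowSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
Schema pathPosRowSchema = DeleteSchemaUtil.posDeleteSchema(rowSchema);
RowType orcPosDeleteSchema = FlinkSchemaUtil.convert(pathPosRowSchema);
// Paths are then written as engine strings: StringData.fromString(path.toString())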
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index e16940fc4875..69e027df4120 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -85,6 +85,7 @@ public static Object[][] parameters() {
         new Object[] {"parquet", 1},
         new Object[] {"parquet", 2},
         new Object[] {"orc", 1},
+        new Object[] {"orc", 2}
     };
   }
 
diff --git a/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index 5038d136614e..c2ad9e5ea019 100644
--- a/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -512,6 +512,8 @@ protected Object get(PositionDelete<T> delete, int index) {
     @Override
     public void write(PositionDelete<T> row, VectorizedRowBatch output) throws IOException {
       Preconditions.checkArgument(row != null, "value must not be null");
+      Preconditions.checkArgument(writers().size() == 2 || row.row() != null,
+          "The row in PositionDelete must not be null because a row schema was set for this position delete writer");
       writeRow(row, output);
     }
   }
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 29bb4edea174..4becf666ed3e 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -224,6 +224,17 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile f
             .withKeyMetadata(file.keyMetadata())
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .equalityFieldIds(equalityFieldIds)
+            .withKeyMetadata(file.keyMetadata())
+            .buildEqualityWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write equality-deletes for unsupported file format: " + format);
@@ -261,6 +272,17 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .withKeyMetadata(file.keyMetadata())
+            .transformPaths(path -> UTF8String.fromString(path.toString()))
+            .buildPositionWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write pos-deletes for unsupported file format: " + format);
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 29bb4edea174..4becf666ed3e 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -224,6 +224,17 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile f
             .withKeyMetadata(file.keyMetadata())
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .equalityFieldIds(equalityFieldIds)
+            .withKeyMetadata(file.keyMetadata())
+            .buildEqualityWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write equality-deletes for unsupported file format: " + format);
@@ -261,6 +272,17 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .withKeyMetadata(file.keyMetadata())
+            .transformPaths(path -> UTF8String.fromString(path.toString()))
+            .buildPositionWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write pos-deletes for unsupported file format: " + format);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 29bb4edea174..4becf666ed3e 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -224,6 +224,17 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile f
             .withKeyMetadata(file.keyMetadata())
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .equalityFieldIds(equalityFieldIds)
+            .withKeyMetadata(file.keyMetadata())
+            .buildEqualityWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write equality-deletes for unsupported file format: " + format);
@@ -261,6 +272,17 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .withKeyMetadata(file.keyMetadata())
+            .transformPaths(path -> UTF8String.fromString(path.toString()))
+            .buildPositionWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write pos-deletes for unsupported file format: " + format);
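The Preconditions check added to GenericOrcWriters pins down the contract behind all of these branches: a position-delete writer built without a row schema has two column writers (file_path, pos) and accepts row-less deletes, while one built with a row schema has three and requires PositionDelete.row() to be non-null. A sketch of the row-less shape through the generic writer; the paths are hypothetical, same imports as the first sketch plus org.apache.iceberg.deletes.PositionDeleteWriter, and it assumes rowSchema may simply be omitted, as the factory allows when posDeleteRowSchema is null:

static void writeOrcPosDeletes() throws IOException {
  // Without rowSchema, the file records just (file_path, pos); the new
  // precondition only fires for the three-writer (row schema) shape.
  PositionDeleteWriter<Record> posWriter =
      ORC.writeDeletes(Files.localOutput(new File("/tmp/pos-deletes.orc")))
          .createWriterFunc(GenericOrcWriter::buildWriter)
          .overwrite()
          .withSpec(PartitionSpec.unpartitioned())
          .buildPositionWriter();

  try (PositionDeleteWriter<Record> closeable = posWriter) {
    // Marks row 3 of the named data file as deleted; no row payload.
    closeable.delete("/tmp/data/file-a.orc", 3L);
  }
}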