From 186697360adaa5e5cda01a3eaaab980f11281efb Mon Sep 17 00:00:00 2001 From: liliwei Date: Mon, 28 Mar 2022 14:53:44 +0800 Subject: [PATCH] Spark 3.0: ORC: support estimated length for unclosed files. --- .../apache/iceberg/spark/source/SparkWrite.java | 9 +-------- .../iceberg/spark/source/TestSparkDataWrite.java | 16 ++++++---------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 896ba82fc7df..e0addcde7979 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -43,7 +43,6 @@ import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.ClusteredDataWriter; @@ -585,13 +584,7 @@ private static class UnpartitionedDataWriter implements DataWriter private UnpartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory, FileIO io, PartitionSpec spec, FileFormat format, long targetFileSize) { - // TODO: support ORC rolling writers - if (format == FileFormat.ORC) { - EncryptedOutputFile outputFile = fileFactory.newOutputFile(); - delegate = writerFactory.newDataWriter(outputFile, spec, null); - } else { - delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null); - } + this.delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null); this.io = io; } diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java 
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java index 5e99dac5c26a..f9241758cd11 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java @@ -365,11 +365,9 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws files.add(file); } } - // TODO: ORC file now not support target file size - if (!format.equals(FileFormat.ORC)) { - Assert.assertEquals("Should have 4 DataFiles", 4, files.size()); - Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); - } + + Assert.assertEquals("Should have 4 DataFiles", 4, files.size()); + Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); } @Test @@ -585,11 +583,9 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti files.add(file); } } - // TODO: ORC file now not support target file size - if (!format.equals(FileFormat.ORC)) { - Assert.assertEquals("Should have 8 DataFiles", 8, files.size()); - Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); - } + + Assert.assertEquals("Should have 8 DataFiles", 8, files.size()); + Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); } public enum IcebergOptionsType {