diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 8f5b17433d6c..2e6f467b6a4d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -52,8 +52,10 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -309,6 +311,14 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor
             .overwrite()
             .build();
 
+        case ORC:
+          return ORC.write(file)
+              .createWriterFunc(SparkOrcWriter::new)
+              .setAll(properties)
+              .schema(dsSchema)
+              .overwrite()
+              .build();
+
         default:
           throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
       }
@@ -389,7 +399,9 @@ private abstract static class BaseWriter implements DataWriter<InternalRow> {
     public abstract void write(InternalRow row) throws IOException;
 
     public void writeInternal(InternalRow row) throws IOException {
-      if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
+      // TODO: ORC cannot enforce the target file size until the file is closed
+      if (!format.equals(FileFormat.ORC) &&
+          currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
         closeCurrent();
         openCurrent();
       }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
similarity index 81%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
rename to spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 7fa335a13be8..f6bcb66a8458 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -23,8 +23,10 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
@@ -43,11 +45,16 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
-public class TestParquetWrite {
+@RunWith(Parameterized.class)
+public class TestSparkDataWrite {
   private static final Configuration CONF = new Configuration();
+  private final FileFormat format;
+  private static SparkSession spark = null;
   private static final Schema SCHEMA = new Schema(
       optional(1, "id", Types.IntegerType.get()),
       optional(2, "data", Types.StringType.get())
   );
@@ -56,23 +63,34 @@ public class TestParquetWrite {
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private static SparkSession spark = null;
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet" },
+        new Object[] { "avro" },
+        new Object[] { "orc" }
+    };
+  }
 
   @BeforeClass
   public static void startSpark() {
-    TestParquetWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
   @AfterClass
   public static void stopSpark() {
-    SparkSession currentSpark = TestParquetWrite.spark;
-    TestParquetWrite.spark = null;
+    SparkSession currentSpark = TestSparkDataWrite.spark;
+    TestSparkDataWrite.spark = null;
     currentSpark.stop();
   }
 
+  public TestSparkDataWrite(String format) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+  }
+
   @Test
   public void testBasicWrite() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -86,10 +104,10 @@ public void testBasicWrite() throws IOException {
     );
 
     Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
 
-    // TODO: incoming columns must be ordered according to the table's schema
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
@@ -104,20 +122,26 @@ public void testBasicWrite() throws IOException {
     Assert.assertEquals("Result rows should match", expected, actual);
     for (ManifestFile manifest : table.currentSnapshot().manifests()) {
       for (DataFile file : ManifestReader.read(manifest, table.io())) {
-        Assert.assertNotNull("Split offsets not present", file.splitOffsets());
+        // TODO: Avro does not write split offsets
+        if (!format.equals(FileFormat.AVRO)) {
+          Assert.assertNotNull("Split offsets not present", file.splitOffsets());
+        }
         Assert.assertEquals("Should have reported record count as 1", 1, file.recordCount());
-        Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
-        Assert.assertNotNull("Counts metric not present", file.valueCounts());
-        Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
-        Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
-        Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
+        // TODO: Avro and ORC do not report these metrics yet
+        if (format.equals(FileFormat.PARQUET)) {
+          Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
+          Assert.assertNotNull("Counts metric not present", file.valueCounts());
+          Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
+          Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
+          Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
+        }
       }
     }
   }
 
   @Test
   public void testAppend() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -143,11 +167,13 @@ public void testAppend() throws IOException {
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
     df.withColumn("id", df.col("id").plus(3)).select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
@@ -164,7 +190,7 @@
 
   @Test
   public void testOverwrite() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -189,12 +215,14 @@ public void testOverwrite() throws IOException {
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
     // overwrite with 2*id to replace record 2, append 4 and 6
     df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("overwrite")
         .save(location.toString());
@@ -211,7 +239,7 @@
 
   @Test
   public void testUnpartitionedOverwrite() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -228,12 +256,14 @@ public void testUnpartitionedOverwrite() throws IOException {
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
     // overwrite with the same data; should not produce two copies
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("overwrite")
         .save(location.toString());
@@ -250,7 +280,7 @@
 
   @Test
   public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -270,6 +300,7 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
@@ -289,13 +320,16 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
         files.add(file);
       }
     }
-    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
-    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    // TODO: ORC does not respect the target file size yet
+    if (!format.equals(FileFormat.ORC)) {
+      Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    }
   }
 
   @Test
   public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
    HadoopTables tables = new HadoopTables(CONF);
@@ -314,6 +348,7 @@ public void testPartitionedCreateWithTargetFileSizeViaOptio
 
     df.select("id", "data").sort("data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
         .save(location.toString());
@@ -334,13 +369,16 @@ public void testPartitionedCreateWithTargetFileSizeViaOptio
         files.add(file);
      }
     }
-    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
-    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    // TODO: ORC does not respect the target file size yet
+    if (!format.equals(FileFormat.ORC)) {
+      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    }
   }
 
   @Test
   public void testWriteProjection() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -357,6 +395,7 @@ public void testWriteProjection() throws IOException {
 
     df.select("id").write() // select only id column
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
@@ -373,7 +412,7 @@
 
   @Test
   public void testWriteProjectionWithMiddle() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -395,6 +434,7 @@ public void testWriteProjectionWithMiddle() throws IOException {
 
     df.select("c1", "c3").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
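
Note: the new `case ORC` branch is the heart of the Writer.java change; it wires Spark `InternalRow`s into the iceberg-orc appender. For reference, here is a minimal sketch of that path in isolation. The builder chain mirrors the patch; the helper method, its parameters, and the try/finally wrapper are illustrative assumptions, not code from this PR (the sketch also omits `.setAll(properties)` since no table properties are needed here).

```java
import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.spark.data.SparkOrcWriter;
import org.apache.spark.sql.catalyst.InternalRow;

public class OrcAppenderSketch {
  // Hypothetical helper: writes rows to `file` the same way the new
  // `case ORC` branch in Writer.newAppender() builds its appender.
  static void writeOrc(OutputFile file, Schema schema, Iterable<InternalRow> rows)
      throws IOException {
    FileAppender<InternalRow> appender = ORC.write(file)
        .createWriterFunc(SparkOrcWriter::new)
        .schema(schema)
        .overwrite()
        .build();
    try {
      for (InternalRow row : rows) {
        appender.add(row); // rows are buffered into ORC stripes in memory
      }
    } finally {
      // Stripes and the file footer are flushed on close; until then the
      // appender's length() is not meaningful, which is why writeInternal()
      // above skips the size-based file roll for ORC.
      appender.close();
    }
  }
}
```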
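The rename from TestParquetWrite to TestSparkDataWrite turns the same suite into a matrix over file formats using JUnit 4's Parameterized runner. Schematically, the pattern looks like this (a stripped-down sketch, not the actual test class):

```java
import java.util.Locale;
import org.apache.iceberg.FileFormat;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Each row returned by parameters() is passed to the constructor, and the
// whole class runs once per row, so every @Test executes for parquet, avro,
// and orc without duplicating any test code.
@RunWith(Parameterized.class)
public class FormatMatrixSketch {
  private final FileFormat format;

  @Parameterized.Parameters
  public static Object[][] parameters() {
    return new Object[][] {
        new Object[] { "parquet" },
        new Object[] { "avro" },
        new Object[] { "orc" }
    };
  }

  public FormatMatrixSketch(String format) {
    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
  }

  @Test
  public void resolvesFormat() {
    Assert.assertNotNull(format);
  }
}
```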
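Finally, the `write-format` option the tests now pass is the user-facing switch for this feature: it overrides the table's default file format for a single write. A minimal end-to-end sketch, assuming a local Spark session; the location is a placeholder and SimpleRecord is the existing id/data test bean from the test sources:

```java
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class OrcWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    // Create an unpartitioned Iceberg table at a placeholder location.
    Schema schema = new Schema(
        optional(1, "id", Types.IntegerType.get()),
        optional(2, "data", Types.StringType.get()));
    String location = "/tmp/iceberg-orc-example";
    new HadoopTables(new Configuration())
        .create(schema, PartitionSpec.unpartitioned(), location);

    Dataset<Row> df = spark.createDataFrame(
        Arrays.asList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")),
        SimpleRecord.class);

    // "write-format" overrides the table's default file format for this write.
    df.select("id", "data").write()
        .format("iceberg")
        .option("write-format", "orc")
        .mode("append")
        .save(location);

    spark.stop();
  }
}
```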