14 changes: 13 additions & 1 deletion spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -52,8 +52,10 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkOrcWriter;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -309,6 +311,14 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
.overwrite()
.build();

case ORC:
return ORC.write(file)
.createWriterFunc(SparkOrcWriter::new)
.setAll(properties)
.schema(dsSchema)
.overwrite()
.build();

default:
throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
}
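
For reference, the new ORC branch is reached the same way as the existing formats: callers pick the file format through the "write-format" write option, which determines the FileFormat handed to newAppender. A minimal usage sketch, taken from the tests later in this diff:

    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
    df.select("id", "data").write()
        .format("iceberg")
        .option("write-format", "orc")  // routes newAppender(...) into the ORC case above
        .mode("append")
        .save(location.toString());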
@@ -389,7 +399,9 @@ private abstract static class BaseWriter implements DataWriter<InternalRow> {
public abstract void write(InternalRow row) throws IOException;

public void writeInternal(InternalRow row) throws IOException {
if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
// TODO: the ORC appender cannot report its length before the file is closed, so the target file size is not enforced for ORC yet
if (!format.equals(FileFormat.ORC) &&
currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
closeCurrent();
openCurrent();
}
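
To make the intent of the guard above explicit, here is a hedged restatement as a standalone predicate (shouldRoll is a hypothetical helper, not part of this patch; ROWS_DIVISOR is the existing BaseWriter constant):

    // Roll to a new file only every ROWS_DIVISOR rows, to keep length() calls cheap, and
    // never for ORC, whose appender cannot report an accurate length() until it is closed.
    private boolean shouldRoll(FileFormat format, long currentRows, long currentLength, long targetFileSize) {
      return !format.equals(FileFormat.ORC)
          && currentRows % ROWS_DIVISOR == 0
          && currentLength >= targetFileSize;
    }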
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java (renamed from TestParquetWrite.java)
@@ -23,8 +23,10 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
@@ -43,11 +45,16 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestParquetWrite {
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
private static final Configuration CONF = new Configuration();
private final FileFormat format;
private static SparkSession spark = null;
private static final Schema SCHEMA = new Schema(
optional(1, "id", Types.IntegerType.get()),
optional(2, "data", Types.StringType.get())
@@ -56,23 +63,34 @@ public class TestParquetWrite {
@Rule
public TemporaryFolder temp = new TemporaryFolder();

private static SparkSession spark = null;
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { "parquet" },
new Object[] { "avro" },
new Object[] { "orc" }
};
}

@BeforeClass
public static void startSpark() {
TestParquetWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}

@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestParquetWrite.spark;
TestParquetWrite.spark = null;
SparkSession currentSpark = TestSparkDataWrite.spark;
TestSparkDataWrite.spark = null;
currentSpark.stop();
}

public TestSparkDataWrite(String format) {
this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
}

@Test
public void testBasicWrite() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -86,10 +104,10 @@ public void testBasicWrite() throws IOException {
);

Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

// TODO: incoming columns must be ordered according to the table's schema
df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());

@@ -104,20 +122,26 @@
Assert.assertEquals("Result rows should match", expected, actual);
for (ManifestFile manifest : table.currentSnapshot().manifests()) {
for (DataFile file : ManifestReader.read(manifest, table.io())) {
Assert.assertNotNull("Split offsets not present", file.splitOffsets());
// TODO: Avro does not report split offsets yet
if (!format.equals(FileFormat.AVRO)) {
Assert.assertNotNull("Split offsets not present", file.splitOffsets());
}
Assert.assertEquals("Should have reported record count as 1", 1, file.recordCount());
Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
Assert.assertNotNull("Counts metric not present", file.valueCounts());
Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
// TODO: collect these metrics for non-Parquet formats as well
if (format.equals(FileFormat.PARQUET)) {
Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
Assert.assertNotNull("Counts metric not present", file.valueCounts());
Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
}
}
}
}

@Test
public void testAppend() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -143,11 +167,13 @@ public void testAppend() throws IOException {

df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());

df.withColumn("id", df.col("id").plus(3)).select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());

@@ -164,7 +190,7 @@

@Test
public void testOverwrite() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -189,12 +215,14 @@

df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());

// overwrite with 2*id to replace record 2, append 4 and 6
df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("overwrite")
.save(location.toString());

@@ -211,7 +239,7 @@

@Test
public void testUnpartitionedOverwrite() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -228,12 +256,14 @@

df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());

// overwrite with the same data; should not produce two copies
df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("overwrite")
.save(location.toString());

@@ -250,7 +280,7 @@

@Test
public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -270,6 +300,7 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {

df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());

@@ -289,13 +320,16 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
files.add(file);
}
}
Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
// TODO: ORC does not enforce the target file size yet
if (!format.equals(FileFormat.ORC)) {
Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
}
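
The table-property variant of the target file size is set in the elided part of this test; a hedged sketch of that setup, assuming the TableProperties.WRITE_TARGET_FILE_SIZE_BYTES constant (not shown in this diff):

    // Hedged sketch, not part of this diff: cap the data file size through a table property
    // so the 4000 input rows are expected to split into files of 1000 rows each.
    table.updateProperties()
        .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; forces a roll-over at every size check
        .commit();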

@Test
public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -314,6 +348,7 @@ public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {

df.select("id", "data").sort("data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
.save(location.toString());
@@ -334,13 +369,16 @@ public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
files.add(file);
}
}
Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
// TODO: ORC does not enforce the target file size yet
if (!format.equals(FileFormat.ORC)) {
Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
}

@Test
public void testWriteProjection() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -357,6 +395,7 @@ public void testWriteProjection() throws IOException {

df.select("id").write() // select only id column
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());

@@ -373,7 +412,7 @@

@Test
public void testWriteProjectionWithMiddle() throws IOException {
File parent = temp.newFolder("parquet");
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
@@ -395,6 +434,7 @@ public void testWriteProjectionWithMiddle() throws IOException {

df.select("c1", "c3").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.save(location.toString());
