diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index e522f849f301..0d99c7ea4982 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -67,6 +67,11 @@ public class TableProperties {
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
 
+  // This only applies to files written after this property is set. Files previously written aren't relocated to
+  // reflect this parameter.
+  // If not set, defaults to a "data" folder underneath the root path of the table.
+  public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
+
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
 }
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
index bed2cf6350e2..c9d3a7b31221 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
@@ -32,6 +32,7 @@
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
 import com.netflix.iceberg.Table;
+import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.hadoop.HadoopInputFile;
@@ -164,7 +165,9 @@ private int propertyAsInt(String property, int defaultValue) {
   }
 
   private String dataLocation() {
-    return new Path(new Path(table.location()), "data").toString();
+    return table.properties().getOrDefault(
+        TableProperties.WRITE_NEW_DATA_LOCATION,
+        new Path(new Path(table.location()), "data").toString());
   }
 
   @Override
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
index f84c6fe8b763..bc74908d728d 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
@@ -38,7 +38,7 @@ public abstract class AvroDataTest {
 
   protected abstract void writeAndValidate(Schema schema) throws IOException;
 
-  private static final StructType SUPPORTED_PRIMITIVES = StructType.of(
+  protected static final StructType SUPPORTED_PRIMITIVES = StructType.of(
       required(100, "id", LongType.get()),
       optional(101, "data", Types.StringType.get()),
       required(102, "b", Types.BooleanType.get()),
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
index 3b0d32ba5876..05f8f80b9039 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
@@ -32,10 +32,12 @@
 import com.netflix.iceberg.spark.data.AvroDataTest;
 import com.netflix.iceberg.spark.data.RandomData;
 import com.netflix.iceberg.spark.data.SparkAvroReader;
+import com.netflix.iceberg.types.Types;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -43,10 +45,12 @@
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 
 import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
@@ -57,7 +61,7 @@ public class TestDataFrameWrites extends AvroDataTest {
 
   private static final Configuration CONF = new Configuration();
 
-  private String format = null;
+  private final String format;
 
   @Parameterized.Parameters
   public static Object[][] parameters() {
@@ -90,23 +94,43 @@ public static void stopSpark() {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
+    File location = createTableFolder();
+    Table table = createTable(schema, location);
+    writeAndValidateWithLocations(table, location, new File(location, "data"));
+  }
+
+  @Test
+  public void testWriteWithCustomDataLocation() throws IOException {
+    File location = createTableFolder();
+    File tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir");
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    table.updateProperties().set(
+        TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit();
+    writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
+  }
+
+  private File createTableFolder() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
     Assert.assertTrue("Mkdir should succeed", location.mkdirs());
+    return location;
+  }
 
+  private Table createTable(Schema schema, File location) {
     HadoopTables tables = new HadoopTables(CONF);
-    Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+    return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+  }
+
+  private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
 
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
     List<Record> expected = RandomData.generateList(tableSchema, 100, 0L);
     Dataset<Row> df = createDataset(expected, tableSchema);
+    DataFrameWriter<Row> writer = df.write().format("iceberg").mode("append");
 
-    df.write()
-        .format("iceberg")
-        .mode("append")
-        .save(location.toString());
+    writer.save(location.toString());
 
     table.refresh();
@@ -120,6 +144,14 @@ protected void writeAndValidate(Schema schema) throws IOException {
     for (int i = 0; i < expected.size(); i += 1) {
       assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i));
     }
+
+    table.currentSnapshot().addedFiles().forEach(dataFile ->
+        Assert.assertTrue(
+            String.format(
+                "File should have the parent directory %s, but has: %s.",
+                expectedDataDir.getAbsolutePath(),
+                dataFile.path()),
+            URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath())));
   }
 
   private Dataset<Row> createDataset(List<Record> records, Schema schema) throws IOException {
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java
index 4f71eada6392..a2d105d780e4 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java
@@ -71,7 +71,6 @@ public static void stopSpark() {
   public void testBasicWrite() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
-    location.mkdirs();
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
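
For context, here is a minimal sketch of how a table owner would opt into the new property once this lands. It mirrors `testWriteWithCustomDataLocation` above and uses only APIs that appear in this diff or its tests (`HadoopTables`, `updateProperties().set(...).commit()`); the table location and storage paths are hypothetical, and the sketch assumes the table was created through `HadoopTables` as in the tests.

```java
import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.hadoop.HadoopTables;
import org.apache.hadoop.conf.Configuration;

public class CustomDataLocationExample {
  public static void main(String[] args) {
    // Load an existing Hadoop-backed table; the location is hypothetical.
    HadoopTables tables = new HadoopTables(new Configuration());
    Table table = tables.load("hdfs://nn:8020/warehouse/db/events");

    // Redirect newly written data files away from the default
    // <table location>/data folder. Files written before this commit
    // are not relocated.
    table.updateProperties()
        .set(TableProperties.WRITE_NEW_DATA_LOCATION,
            "hdfs://nn:8020/fast-storage/events-data")
        .commit();

    // Subsequent Spark appends resolve their target directory through
    // Writer#dataLocation(), which now consults this property:
    //   df.write().format("iceberg").mode("append")
    //       .save("hdfs://nn:8020/warehouse/db/events");
  }
}
```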
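Two design points worth noting. First, `dataLocation()` falls back through `Map#getOrDefault` rather than a separate feature flag, so tables that never set `write.folder-storage.path` keep the existing `<table location>/data` behavior with no migration step. Second, because the property only affects where new files are placed, the test asserts on `currentSnapshot().addedFiles()` rather than scanning every file in the table.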