Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/java/com/netflix/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class TableProperties {

public static final String OBJECT_STORE_PATH = "write.object-storage.path";

// This property only applies to files written after it is set; files written earlier are not
// relocated to reflect the new value.
// If unset, new data files default to a "data" folder underneath the root path of the table.
public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";

public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.avro.Avro;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.hadoop.HadoopInputFile;
Expand Down Expand Up @@ -164,7 +165,9 @@ private int propertyAsInt(String property, int defaultValue) {
}

// Returns the directory where new data files should be written: the
// WRITE_NEW_DATA_LOCATION table property when set, otherwise the "data"
// folder underneath the table's root location.
private String dataLocation() {
  return table.properties().getOrDefault(
      TableProperties.WRITE_NEW_DATA_LOCATION,
      new Path(new Path(table.location()), "data").toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class AvroDataTest {

protected abstract void writeAndValidate(Schema schema) throws IOException;

private static final StructType SUPPORTED_PRIMITIVES = StructType.of(
protected static final StructType SUPPORTED_PRIMITIVES = StructType.of(
required(100, "id", LongType.get()),
optional(101, "data", Types.StringType.get()),
required(102, "b", Types.BooleanType.get()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,25 @@
import com.netflix.iceberg.spark.data.AvroDataTest;
import com.netflix.iceberg.spark.data.RandomData;
import com.netflix.iceberg.spark.data.SparkAvroReader;
import com.netflix.iceberg.types.Types;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;

import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
Expand All @@ -57,12 +61,14 @@
public class TestDataFrameWrites extends AvroDataTest {
private static final Configuration CONF = new Configuration();

private String format = null;
private final String format;

@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { "parquet" },
new Object[] { "parquet" },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add these?

new Object[] { "avro" },
new Object[] { "avro" }
};
}
Expand Down Expand Up @@ -90,23 +96,43 @@ public static void stopSpark() {

@Override
protected void writeAndValidate(Schema schema) throws IOException {
  // Default layout: data files are expected under the "data" folder inside
  // the table's root location.
  File tableLocation = createTableFolder();
  Table table = createTable(schema, tableLocation);
  File defaultDataDir = new File(tableLocation, "data");
  writeAndValidateWithLocations(table, tableLocation, defaultDataDir);
}

@Test
public void testWriteWithCustomDataLocation() throws IOException {
  // Point WRITE_NEW_DATA_LOCATION at a directory outside the table root and
  // verify that newly written data files land there instead of the default
  // "data" folder.
  File tableLocation = createTableFolder();
  File customDataDir = temp.newFolder("test-table-property-data-dir");
  Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), tableLocation);
  table.updateProperties()
      .set(TableProperties.WRITE_NEW_DATA_LOCATION, customDataDir.getAbsolutePath())
      .commit();
  writeAndValidateWithLocations(table, tableLocation, customDataDir);
}

// Creates a fresh "<tmp>/parquet/test" directory to serve as the table location.
private File createTableFolder() throws IOException {
  File tableDir = new File(temp.newFolder("parquet"), "test");
  Assert.assertTrue("Mkdir should succeed", tableDir.mkdirs());
  return tableDir;
}

// Creates an unpartitioned Hadoop table with the given schema at the given
// location. The pasted diff left both the pre-change assignment and the
// post-change return in place, which would create the table twice (the second
// create would fail on an already-existing table); only one create belongs here.
private Table createTable(Schema schema, File location) {
  HadoopTables tables = new HadoopTables(CONF);
  return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
}

private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) throws IOException {
Schema tableSchema = table.schema(); // use the table schema because ids are reassigned

table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

List<Record> expected = RandomData.generateList(tableSchema, 100, 0L);
Dataset<Row> df = createDataset(expected, tableSchema);
DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");

df.write()
.format("iceberg")
.mode("append")
.save(location.toString());
writer.save(location.toString());

table.refresh();

Expand All @@ -120,6 +146,14 @@ protected void writeAndValidate(Schema schema) throws IOException {
for (int i = 0; i < expected.size(); i += 1) {
assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i));
}

table.currentSnapshot().addedFiles().forEach(dataFile ->
Assert.assertTrue(
String.format(
"File should have the parent directory %s, but has: %s.",
expectedDataDir.getAbsolutePath(),
dataFile.path()),
URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath())));
}

private Dataset<Row> createDataset(List<Record> records, Schema schema) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public static void stopSpark() {
public void testBasicWrite() throws IOException {
File parent = temp.newFolder("parquet");
File location = new File(parent, "test");
location.mkdirs();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
Expand Down