core/src/main/java/com/netflix/iceberg/TableProperties.java (5 additions, 0 deletions)
@@ -66,4 +66,9 @@ public class TableProperties {
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;

public static final String OBJECT_STORE_PATH = "write.object-storage.path";

// This only applies to files written after this property is set. Files previously written aren't relocated to
// reflect this parameter.
// If not set, defaults to a "data" folder underneath the root path of the table.
public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
}
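For context, a minimal sketch of using this property (the table path and data folder below are hypothetical, not from this PR): new data files are routed to the configured folder, while files written before the commit stay where they are.

import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.hadoop.HadoopTables;
import org.apache.hadoop.conf.Configuration;

public class SetWriteLocation {
  public static void main(String[] args) {
    // Hypothetical table path; load an existing Hadoop table.
    Table table = new HadoopTables(new Configuration()).load("/warehouse/db/events");
    // Route files from future writes to a custom folder.
    // Previously written files are not relocated.
    table.updateProperties()
        .set(TableProperties.WRITE_NEW_DATA_LOCATION, "/warehouse/db/events_data")
        .commit();
  }
}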
@@ -23,10 +23,12 @@
import com.netflix.iceberg.FileFormat;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.hadoop.HadoopTables;
import com.netflix.iceberg.spark.SparkSchemaUtil;
import com.netflix.iceberg.types.CheckCompatibility;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.DataSourceRegister;
@@ -89,7 +91,11 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct
.toUpperCase(Locale.ENGLISH));
}

return Optional.of(new Writer(table, lazyConf(), format));
String dataLocation = options.get(TableProperties.WRITE_NEW_DATA_LOCATION)
.orElse(table.properties().getOrDefault(
TableProperties.WRITE_NEW_DATA_LOCATION,
new Path(new Path(table.location()), "data").toString()));
return Optional.of(new Writer(table, lazyConf(), format, dataLocation));
Contributor:
Instead of adding parameters to Writer whenever a change like this is made, I'd rather pass the options into Writer and handle these there. The dataLocation method could do this work instead of moving it outside the Writer class.

Contributor Author:
I think doing options processing from a Map&lt;String, String&gt; inside a constructor is a bit of an antipattern. Consider, for example, writing a unit test for this class in the future. If we pass the Writer constructor only a HashMap, the unit test would have to construct that HashMap in a specific way, i.e. it would need to know which key-value pairs the constructor expects.

Perhaps we can have a builder object that acts as a factory that accepts the Map and returns the Writer. The Writer constructor accepts the builder object and copies the set fields on the builder into its own fields.
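As a rough illustration of that idea (the names and shape here are hypothetical, not part of this PR), the builder would own the Map-to-field translation so unit tests can set fields directly:

import java.util.Map;
import com.netflix.iceberg.FileFormat;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import org.apache.hadoop.conf.Configuration;

// Hypothetical factory: owns the Map-to-field translation so tests
// can set fields directly instead of crafting a specially keyed HashMap.
class WriterBuilder {
  private final Table table;
  private Configuration conf;
  private FileFormat format;
  private String dataLocation;

  WriterBuilder(Table table) {
    this.table = table;
  }

  // Pull only the recognized keys out of the data source options.
  WriterBuilder fromOptions(Map<String, String> options) {
    this.dataLocation = options.getOrDefault(
        TableProperties.WRITE_NEW_DATA_LOCATION, this.dataLocation);
    return this;
  }

  WriterBuilder conf(Configuration conf) {
    this.conf = conf;
    return this;
  }

  WriterBuilder format(FileFormat format) {
    this.format = format;
    return this;
  }

  // Assumes a Writer(WriterBuilder) constructor that copies the set fields.
  Writer build() {
    return new Writer(this);
  }
}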

Contributor:
What I think is strange is passing the write location into the writer when we're already passing the table in. Why isn't that logic handled entirely in the writer? The normal case is for the write location to come from table config. I'm not even sure we should allow overriding the write location in Spark's write properties. What is the use case there?

I like your reasoning about not passing options as a map to make testing clear in general, but doing it here just shifts the concern to a different test. The test case is that setting "write.folder-storage.path" in Spark options changes the location of output files. A test that passes in the location can validate that the location is respected, but what we actually want to do is test that the table's location defaults, or is set by the table property, or (maybe) is set by Spark options.
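For what it's worth, a minimal sketch of that resolution order, kept inside Writer as suggested (this assumes Writer holds the Spark options map as a field; whether the option should win at all is the open question above):

// Hypothetical Writer method; `options` (Map<String, String>) and `table`
// are assumed fields. Precedence: Spark write option > table property >
// "<table location>/data".
private String dataLocation() {
  String fromOption = options.get(TableProperties.WRITE_NEW_DATA_LOCATION);
  if (fromOption != null) {
    return fromOption;
  }
  return table.properties().getOrDefault(
      TableProperties.WRITE_NEW_DATA_LOCATION,
      new Path(new Path(table.location()), "data").toString());
}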

Contributor Author:
I think for our use case we can have the write location specified in the table property. That would be sufficient. I also don't see the downside of introducing the extra flexibility of allowing the override to be specified in data source options, but we could defer the feature until later.

}

protected Table findTable(DataSourceOptions options) {
spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java (4 additions, 7 deletions)
@@ -19,7 +19,6 @@

package com.netflix.iceberg.spark.source;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -94,16 +93,18 @@ class Writer implements DataSourceWriter, SupportsWriteInternalRow {
private final Table table;
private final Configuration conf;
private final FileFormat format;
private final String dataLocation;

Writer(Table table, Configuration conf, FileFormat format) {
Writer(Table table, Configuration conf, FileFormat format, String dataLocation) {
this.table = table;
this.conf = conf;
this.format = format;
this.dataLocation = dataLocation;
}

@Override
public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf);
return new WriterFactory(table.spec(), format, dataLocation, table.properties(), conf);
}

@Override
@@ -167,10 +168,6 @@ private int propertyAsInt(String property, int defaultValue) {
return defaultValue;
}

private String dataLocation() {
return new Path(new Path(table.location()), "data").toString();
}

@Override
public String toString() {
return String.format("IcebergWrite(table=%s, type=%s, format=%s)",
@@ -32,21 +32,25 @@
import com.netflix.iceberg.spark.data.AvroDataTest;
import com.netflix.iceberg.spark.data.RandomData;
import com.netflix.iceberg.spark.data.SparkAvroReader;
import com.netflix.iceberg.types.Types;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;

import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
@@ -56,6 +60,9 @@
@RunWith(Parameterized.class)
public class TestDataFrameWrites extends AvroDataTest {
private static final Configuration CONF = new Configuration();
private static final Schema BASIC_SCHEMA = new Schema(
Types.NestedField.required(0, "id", Types.LongType.get()),
Types.NestedField.optional(1, "data", Types.ListType.ofOptional(2, Types.StringType.get())));

private String format = null;

@@ -91,23 +98,55 @@ public static void stopSpark() {

@Override
protected void writeAndValidate(Schema schema) throws IOException {
writeAndValidateWithLocations(schema, false, false);
}

@Test
public void testWrite_overridingDataLocation_tablePropertyOnly() throws IOException {
writeAndValidateWithLocations(BASIC_SCHEMA, true, false);
}

@Test
public void testWrite_overridingDataLocation_sourceOptionOnly() throws IOException {
writeAndValidateWithLocations(BASIC_SCHEMA, false, true);
}

@Test
public void testWrite_overridingDataLocation_sourceOptionTakesPrecedence() throws IOException {
writeAndValidateWithLocations(BASIC_SCHEMA, true, true);
}

private void writeAndValidateWithLocations(
Schema schema,
boolean setTablePropertyDataLocation,
Contributor Author:
Hm, rereading this, I think it tries too hard to reuse code in exchange for the antipattern of boolean switches. It could be written more idiomatically; see the sketch after this method.

boolean setWriterOptionDataLocation) throws IOException {
File parent = temp.newFolder("parquet");
File location = new File(parent, "test");
Assert.assertTrue("Mkdir should succeed", location.mkdirs());

File tablePropertyDataLocation = new File(parent, "test-table-property-data-dir");
Assert.assertTrue("Mkdir should succeed", tablePropertyDataLocation.mkdirs());
File writerPropertyDataLocation = new File(parent, "test-source-option-data-dir");
Assert.assertTrue("Mkdir should succeed", writerPropertyDataLocation.mkdirs());

HadoopTables tables = new HadoopTables(CONF);
Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
Schema tableSchema = table.schema(); // use the table schema because ids are reassigned

table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
if (setTablePropertyDataLocation) {
table.updateProperties().set(
TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit();
}

List<Record> expected = RandomData.generateList(tableSchema, 100, 0L);
Dataset<Row> df = createDataset(expected, tableSchema);
DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
if (setWriterOptionDataLocation) {
writer = writer.option(TableProperties.WRITE_NEW_DATA_LOCATION, writerPropertyDataLocation.getAbsolutePath());
}

df.write()
.format("iceberg")
.mode("append")
.save(location.toString());
writer.save(location.toString());

table.refresh();

@@ -121,6 +160,22 @@ protected void writeAndValidate(Schema schema) throws IOException {
for (int i = 0; i < expected.size(); i += 1) {
assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i));
}

File expectedDataDir;
if (setWriterOptionDataLocation) {
expectedDataDir = writerPropertyDataLocation;
} else if (setTablePropertyDataLocation) {
expectedDataDir = tablePropertyDataLocation;
} else {
expectedDataDir = new File(location, "data");
}
table.currentSnapshot().addedFiles().forEach(dataFile ->
Assert.assertTrue(
String.format(
"File should have the parent directory %s, but has: %s.",
expectedDataDir.getAbsolutePath(),
dataFile.path()),
URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath())));
}
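Following up on the comment above about boolean switches, one hypothetical, more idiomatic shape (the enum and method names are illustrative, not part of this PR):

// Each scenario gets a name instead of an opaque pair of booleans.
private enum LocationConfig { DEFAULT, TABLE_PROPERTY, SOURCE_OPTION, OPTION_OVERRIDES_PROPERTY }

@Test
public void testWriteDataLocationFromTableProperty() throws IOException {
  writeAndValidateWithLocations(BASIC_SCHEMA, LocationConfig.TABLE_PROPERTY);
}

private void writeAndValidateWithLocations(Schema schema, LocationConfig config) throws IOException {
  boolean setTableProperty = config == LocationConfig.TABLE_PROPERTY
      || config == LocationConfig.OPTION_OVERRIDES_PROPERTY;
  boolean setSourceOption = config == LocationConfig.SOURCE_OPTION
      || config == LocationConfig.OPTION_OVERRIDES_PROPERTY;
  // ...same body as above, driven by the two derived flags...
}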

private Dataset<Row> createDataset(List<Record> records, Schema schema) throws IOException {
@@ -23,6 +23,7 @@
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.hadoop.HadoopTables;
import com.netflix.iceberg.types.Types;
import org.apache.hadoop.conf.Configuration;