From 78f88477d2eced2eef72a0bcc94072f1f591157c Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 19 Nov 2018 13:32:18 -0800
Subject: [PATCH 1/5] Allow custom Hadoop properties to be loaded in the Spark
 data source.

Supporting these custom Hadoop properties should also be done in other
Iceberg integrations in subsequent patches.
---
 .../iceberg/spark/source/IcebergSource.java | 29 ++++++++++++++-----
 .../spark/source/TestIcebergSource.java     |  4 ++-
 2 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index 3db8a2236dbe..44306746a728 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -36,6 +36,7 @@
 import org.apache.spark.sql.types.StructType;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 
 import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -53,16 +54,19 @@ public String shortName() {
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Table table = findTable(options);
-    return new Reader(table, lazyConf());
+    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Table table = findTable(options, conf);
+    Configuration withTableConfs = mergeIcebergHadoopConfs(conf, table.properties());
+    return new Reader(table, withTableConfs);
   }
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                  DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-
-    Table table = findTable(options);
+    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Table table = findTable(options, conf);
+    Configuration withTableHadoopConfs = mergeIcebergHadoopConfs(conf, table.properties());
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
 
@@ -86,15 +90,15 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, lazyConf(), format));
+    return Optional.of(new Writer(table, withTableHadoopConfs, format));
   }
 
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> location = options.get("path");
     Preconditions.checkArgument(location.isPresent(),
         "Cannot open table without a location: path is not set");
 
-    HadoopTables tables = new HadoopTables(lazyConf());
+    HadoopTables tables = new HadoopTables(conf);
 
     return tables.load(location.get());
   }
@@ -106,10 +110,19 @@ protected SparkSession lazySparkSession() {
     return lazySpark;
   }
 
-  protected Configuration lazyConf() {
+  protected Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
+
+  protected Configuration mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
+    Configuration resolvedConf = new Configuration(baseConf);
+    options.keySet().stream()
+        .filter(key -> key.startsWith("iceberg.hadoop"))
+        .filter(key -> baseConf.get(key) == null)
+        .forEach(key -> resolvedConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
+    return resolvedConf;
+  }
 }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
index d61abc56b9e4..dd804f7308cb 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
@@ -17,6 +17,8 @@
 package com.netflix.iceberg.spark.source;
 
 import com.netflix.iceberg.Table;
+import org.apache.hadoop.conf.Configuration;
+
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
 public class TestIcebergSource extends IcebergSource {
@@ -26,7 +28,7 @@ public String shortName() {
   }
 
   @Override
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     return TestTables.load(options.get("iceberg.table.name").get());
   }
 }

From 66f21603c33762fede06e447c765273574b77d32 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 26 Nov 2018 18:46:08 -0800
Subject: [PATCH 2/5] Reuse Hadoop configuration in second case where it's
 thread-safe to do so.
---
 .../com/netflix/iceberg/spark/source/IcebergSource.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index 0c4402aa82c4..15b58441ea70 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -57,7 +57,7 @@ public String shortName() {
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Configuration conf = mergeIcebergHadoopConfs(new Configuration(lazyBaseConf()), options.asMap());
     Table table = findTable(options, conf);
     Configuration withTableConfs = mergeIcebergHadoopConfs(conf, table.properties());
     return new Reader(table, withTableConfs);
@@ -67,7 +67,7 @@ public DataSourceReader createReader(DataSourceOptions options) {
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                  DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Configuration conf = mergeIcebergHadoopConfs(new Configuration(lazyBaseConf()), options.asMap());
     Table table = findTable(options, conf);
     Configuration withTableHadoopConfs = mergeIcebergHadoopConfs(conf, table.properties());
@@ -121,11 +121,10 @@ protected Configuration lazyBaseConf() {
   }
 
   protected Configuration mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
-    Configuration resolvedConf = new Configuration(baseConf);
     options.keySet().stream()
         .filter(key -> key.startsWith("iceberg.hadoop"))
        .filter(key -> baseConf.get(key) == null)
-        .forEach(key -> resolvedConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
-    return resolvedConf;
+        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
+    return baseConf;
   }
 }

From 9e535084d568c5042413d5c6f938a50d2508ac9e Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 10 Dec 2018 16:15:31 -0800
Subject: [PATCH 3/5] Set order of precedence. Don't create too many
 configurations. Fix spacing.
---
 .../iceberg/spark/source/IcebergSource.java | 31 +++++++++++--------
 .../spark/source/TestIcebergSource.java     |  1 -
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index 0c4402aa82c4..4c7ce29f1e75 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -57,19 +57,25 @@ public String shortName() {
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Configuration conf = new Configuration(lazyBaseConf());
+    // Overwrite configurations from the Spark Context with configurations from the options.
+    mergeIcebergHadoopConfs(conf, options.asMap(), true);
     Table table = findTable(options, conf);
-    Configuration withTableConfs = mergeIcebergHadoopConfs(conf, table.properties());
-    return new Reader(table, withTableConfs);
+    // Do not overwrite options from the Spark Context with configurations from the table
+    mergeIcebergHadoopConfs(conf, table.properties(), false);
+    return new Reader(table, conf);
   }
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                  DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Configuration conf = new Configuration(lazyBaseConf());
+    // Overwrite configurations from the Spark Context with configurations from the options.
+    mergeIcebergHadoopConfs(conf, options.asMap(), true);
     Table table = findTable(options, conf);
-    Configuration withTableHadoopConfs = mergeIcebergHadoopConfs(conf, table.properties());
+    // Do not overwrite options from the Spark Context with configurations from the table
+    mergeIcebergHadoopConfs(conf, table.properties(), false);
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
 
@@ -93,7 +99,7 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, withTableHadoopConfs, format));
+    return Optional.of(new Writer(table, conf, format));
   }
 
   protected Table findTable(DataSourceOptions options, Configuration conf) {
@@ -106,26 +112,25 @@ protected Table findTable(DataSourceOptions options, Configuration conf) {
     return tables.load(location.get());
   }
 
-  protected SparkSession lazySparkSession() {
+  private SparkSession lazySparkSession() {
     if (lazySpark == null) {
       this.lazySpark = SparkSession.builder().getOrCreate();
     }
     return lazySpark;
   }
 
-  protected Configuration lazyBaseConf() {
+  private Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
 
-  protected Configuration mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
-    Configuration resolvedConf = new Configuration(baseConf);
+  private static void mergeIcebergHadoopConfs(
+      Configuration baseConf, Map<String, String> options, boolean overwrite) {
     options.keySet().stream()
         .filter(key -> key.startsWith("iceberg.hadoop"))
-        .filter(key -> baseConf.get(key) == null)
-        .forEach(key -> resolvedConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
-    return resolvedConf;
+        .filter(key -> overwrite || baseConf.get(key.replaceFirst("iceberg.hadoop", "")) == null)
+        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
   }
 }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
index 9989681436a0..357671b1f446 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
@@ -21,7 +21,6 @@
 
 import com.netflix.iceberg.Table;
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
 public class TestIcebergSource extends IcebergSource {

From 6f07979da5331c2e033cfdb82c69797c6fd32de1 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 10 Dec 2018 17:40:41 -0800
Subject: [PATCH 4/5] Properly resolve Hadoop configurations by doing a second
 pass over options
---
 .../iceberg/spark/source/IcebergSource.java | 26 ++++++++++++-------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index 4c7ce29f1e75..79cf0aa005fc 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -58,11 +58,8 @@ public String shortName() {
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
     Configuration conf = new Configuration(lazyBaseConf());
-    // Overwrite configurations from the Spark Context with configurations from the options.
-    mergeIcebergHadoopConfs(conf, options.asMap(), true);
-    Table table = findTable(options, conf);
-    // Do not overwrite options from the Spark Context with configurations from the table
-    mergeIcebergHadoopConfs(conf, table.properties(), false);
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+
     return new Reader(table, conf);
   }
@@ -71,11 +68,7 @@ public DataSourceReader createReader(DataSourceOptions options) {
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                  DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
-    // Overwrite configurations from the Spark Context with configurations from the options.
-    mergeIcebergHadoopConfs(conf, options.asMap(), true);
-    Table table = findTable(options, conf);
-    // Do not overwrite options from the Spark Context with configurations from the table
-    mergeIcebergHadoopConfs(conf, table.properties(), false);
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
 
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
@@ -126,6 +119,19 @@ private Configuration lazyBaseConf() {
     return lazyConf;
   }
 
+  private Table getTableAndResolveHadoopConfiguration(
+      DataSourceOptions options, Configuration conf) {
+    // Overwrite configurations from the Spark Context with configurations from the options.
+    mergeIcebergHadoopConfs(conf, options.asMap(), true);
+    Table table = findTable(options, conf);
+    // Set confs from table properties, but do not overwrite options from the Spark Context with
+    // configurations from the table
+    mergeIcebergHadoopConfs(conf, table.properties(), false);
+    // Re-overwrite values set in options and table properties but were not in the environment.
+    mergeIcebergHadoopConfs(conf, options.asMap(), true);
+    return table;
+  }
+
   private static void mergeIcebergHadoopConfs(
       Configuration baseConf, Map<String, String> options, boolean overwrite) {
     options.keySet().stream()

From 8b238ea01e43876b7c42e7c841b91f297825b101 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 11 Dec 2018 12:54:08 -0800
Subject: [PATCH 5/5] Remove overwrite flag
---
 .../netflix/iceberg/spark/source/IcebergSource.java | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index 79cf0aa005fc..cd1a0afe3622 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -122,21 +122,19 @@ private Configuration lazyBaseConf() {
   private Table getTableAndResolveHadoopConfiguration(
       DataSourceOptions options, Configuration conf) {
     // Overwrite configurations from the Spark Context with configurations from the options.
-    mergeIcebergHadoopConfs(conf, options.asMap(), true);
+    mergeIcebergHadoopConfs(conf, options.asMap());
     Table table = findTable(options, conf);
-    // Set confs from table properties, but do not overwrite options from the Spark Context with
-    // configurations from the table
-    mergeIcebergHadoopConfs(conf, table.properties(), false);
+    // Set confs from table properties
+    mergeIcebergHadoopConfs(conf, table.properties());
     // Re-overwrite values set in options and table properties but were not in the environment.
-    mergeIcebergHadoopConfs(conf, options.asMap(), true);
+    mergeIcebergHadoopConfs(conf, options.asMap());
     return table;
   }
 
   private static void mergeIcebergHadoopConfs(
-      Configuration baseConf, Map<String, String> options, boolean overwrite) {
+      Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
         .filter(key -> key.startsWith("iceberg.hadoop"))
-        .filter(key -> overwrite || baseConf.get(key.replaceFirst("iceberg.hadoop", "")) == null)
        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
   }
 }
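After PATCH 5, precedence is resolved by applying the data source options twice: the options are merged first, the table properties are merged on top (overwriting any overlapping option), and the options are merged once more so that they win overall. The net order of precedence is data source options over table properties over the base Hadoop configuration from the Spark context. The standalone sketch below is not part of the series; the class name, keys, and values are hypothetical, and it only illustrates the resolution order. Note that replaceFirst removes just the literal "iceberg.hadoop" prefix, so an option key such as "iceberg.hadoop.example.key" lands in the Configuration under ".example.key".

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

// Hypothetical demo class; mirrors the merge logic from PATCH 5.
public class MergePrecedenceSketch {

  // Same shape as IcebergSource#mergeIcebergHadoopConfs after PATCH 5: every
  // "iceberg.hadoop"-prefixed entry is copied in, overwriting existing values.
  private static void mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
    options.keySet().stream()
        .filter(key -> key.startsWith("iceberg.hadoop"))
        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false); // stands in for the Spark context conf

    Map<String, String> options = new LinkedHashMap<>(); // hypothetical data source options
    options.put("iceberg.hadoop.example.key", "from-options");

    Map<String, String> tableProps = new LinkedHashMap<>(); // hypothetical table properties
    tableProps.put("iceberg.hadoop.example.key", "from-table");

    // The three merge passes from getTableAndResolveHadoopConfiguration:
    mergeIcebergHadoopConfs(conf, options);    // 1. options over the base conf
    mergeIcebergHadoopConfs(conf, tableProps); // 2. table properties overwrite overlapping options
    mergeIcebergHadoopConfs(conf, options);    // 3. options re-applied so they win overall

    // The stripped key keeps everything after the literal prefix, here ".example.key".
    System.out.println(conf.get(".example.key")); // prints "from-options"
  }
}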