diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index 7daa330d1271..cd1a0afe3622 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -39,6 +39,7 @@ import org.apache.spark.sql.types.StructType;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 
 import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -56,16 +57,18 @@ public String shortName() {
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Table table = findTable(options);
-    return new Reader(table, lazyConf());
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+
+    return new Reader(table, conf);
   }
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                  DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-
-    Table table = findTable(options);
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
 
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
@@ -89,30 +92,49 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, lazyConf(), format));
+    return Optional.of(new Writer(table, conf, format));
   }
 
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> location = options.get("path");
     Preconditions.checkArgument(location.isPresent(),
         "Cannot open table without a location: path is not set");
 
-    HadoopTables tables = new HadoopTables(lazyConf());
+    HadoopTables tables = new HadoopTables(conf);
 
     return tables.load(location.get());
   }
 
-  protected SparkSession lazySparkSession() {
+  private SparkSession lazySparkSession() {
     if (lazySpark == null) {
       this.lazySpark = SparkSession.builder().getOrCreate();
     }
     return lazySpark;
   }
 
-  protected Configuration lazyConf() {
+  private Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
+
+  private Table getTableAndResolveHadoopConfiguration(
+      DataSourceOptions options, Configuration conf) {
+    // Overwrite configurations from the Spark context with configurations from the options.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    Table table = findTable(options, conf);
+    // Merge in configurations from the table's properties.
+    mergeIcebergHadoopConfs(conf, table.properties());
+    // Re-apply the options so they also take precedence over table properties.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    return table;
+  }
+
+  private static void mergeIcebergHadoopConfs(
+      Configuration baseConf, Map<String, String> options) {
+    options.keySet().stream()
+        .filter(key -> key.startsWith("iceberg.hadoop."))
+        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop.", ""), options.get(key)));
+  }
 }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
index a544162eca14..357671b1f446 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
@@ -20,6 +20,7 @@
 package com.netflix.iceberg.spark.source;
 
 import com.netflix.iceberg.Table;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
 public class TestIcebergSource extends IcebergSource {
@@ -29,7 +30,7 @@ public String shortName() {
   }
 
   @Override
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     return TestTables.load(options.get("iceberg.table.name").get());
   }
 }
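Usage note: with this change, any read or write option prefixed with "iceberg.hadoop." is copied into the Hadoop Configuration (prefix stripped) before the table is loaded, with precedence environment < table properties < options. A minimal sketch of how a caller might exercise this; the option key, table location, and class name below are illustrative, not part of the patch:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class IcebergHadoopConfExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("iceberg-hadoop-conf-example")
            .getOrCreate();

        // "iceberg.hadoop.fs.s3a.connection.maximum" is stripped to
        // "fs.s3a.connection.maximum" by mergeIcebergHadoopConfs and set on the
        // Configuration that HadoopTables uses to load and read the table.
        Dataset<Row> df = spark.read()
            .format("iceberg")
            .option("iceberg.hadoop.fs.s3a.connection.maximum", "100")
            .load("s3a://bucket/warehouse/db/table"); // hypothetical table location

        df.show();
      }
    }

Because getTableAndResolveHadoopConfiguration merges the options both before and after loading the table, a key set in both the options and the table properties resolves to the option's value.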