IcebergSource.java
@@ -39,6 +39,7 @@
 import org.apache.spark.sql.types.StructType;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 
 import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -56,16 +57,19 @@ public String shortName() {
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Table table = findTable(options);
-    return new Reader(table, lazyConf());
+    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Table table = findTable(options, conf);
+    Configuration withTableConfs = mergeIcebergHadoopConfs(conf, table.properties());
+    return new Reader(table, withTableConfs);
   }
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                  DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-
-    Table table = findTable(options);
+    Configuration conf = mergeIcebergHadoopConfs(lazyBaseConf(), options.asMap());
+    Table table = findTable(options, conf);
+    Configuration withTableHadoopConfs = mergeIcebergHadoopConfs(conf, table.properties());
 
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
@@ -89,15 +93,15 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, lazyConf(), format));
+    return Optional.of(new Writer(table, withTableHadoopConfs, format));
   }
 
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> location = options.get("path");
     Preconditions.checkArgument(location.isPresent(),
         "Cannot open table without a location: path is not set");
 
-    HadoopTables tables = new HadoopTables(lazyConf());
+    HadoopTables tables = new HadoopTables(conf);
 
     return tables.load(location.get());
   }
@@ -109,10 +113,19 @@ protected SparkSession lazySparkSession() {
     return lazySpark;
   }
 
-  protected Configuration lazyConf() {
+  protected Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
+
+  protected Configuration mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
+    Configuration resolvedConf = new Configuration(baseConf);
+    options.keySet().stream()
+        .filter(key -> key.startsWith("iceberg.hadoop."))
+        .filter(key -> baseConf.get(key) == null)
+        .forEach(key -> resolvedConf.set(key.replaceFirst("iceberg.hadoop.", ""), options.get(key)));
+    return resolvedConf;
+  }
 }
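
With this change, any reader or writer option whose key starts with iceberg.hadoop. is copied into the job's Hadoop Configuration with the prefix stripped (an option does not override a value already present in the base configuration), and table properties carrying the same prefix are merged in afterwards. A minimal caller-side sketch of the new option path, assuming the source is registered under the short name "iceberg"; the endpoint value and table location are illustrative, not from this patch:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IcebergHadoopConfExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-hadoop-conf-example")
        .getOrCreate();

    // The "iceberg.hadoop." prefix is stripped by mergeIcebergHadoopConfs, so this
    // option reaches the reader's Hadoop Configuration as fs.s3a.endpoint.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("iceberg.hadoop.fs.s3a.endpoint", "http://localhost:9000")
        .load("s3a://bucket/warehouse/db/table");

    df.show();
  }
}

Note that Spark's DataSourceOptions stores keys case-insensitively by lower-casing them, so mixed-case Hadoop keys would arrive lower-cased; the common fs.* keys shown here are unaffected.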
TestIcebergSource.java
@@ -20,6 +20,8 @@
 package com.netflix.iceberg.spark.source;
 
 import com.netflix.iceberg.Table;
+import org.apache.hadoop.conf.Configuration;
+
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
 public class TestIcebergSource extends IcebergSource {
@@ -29,7 +31,7 @@ public String shortName() {
   }
 
   @Override
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     return TestTables.load(options.get("iceberg.table.name").get());
   }
 }
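
The merge semantics can also be exercised in isolation. A small standalone sketch (a hypothetical harness, not part of the patch) showing what mergeIcebergHadoopConfs does with a mixed option map:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class MergeConfsSketch {
  public static void main(String[] args) {
    // Base configuration, standing in for the Spark session's Hadoop conf.
    Configuration baseConf = new Configuration(false);

    // Mixed option map, standing in for DataSourceOptions.asMap().
    Map<String, String> options = new HashMap<>();
    options.put("iceberg.hadoop.fs.s3a.connection.maximum", "50"); // forwarded
    options.put("path", "s3a://bucket/warehouse/db/table");        // ignored: no prefix

    // Same logic as mergeIcebergHadoopConfs above.
    Configuration resolvedConf = new Configuration(baseConf);
    options.keySet().stream()
        .filter(key -> key.startsWith("iceberg.hadoop."))
        .filter(key -> baseConf.get(key) == null)
        .forEach(key -> resolvedConf.set(key.replaceFirst("iceberg.hadoop.", ""), options.get(key)));

    // Prints "50": the prefix was stripped before the key was set.
    System.out.println(resolvedConf.get("fs.s3a.connection.maximum"));
  }
}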