spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java (41 additions, 0 deletions)
@@ -24,6 +24,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -37,6 +38,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.SerializableConfiguration;

public class SparkUtil {
@@ -52,6 +54,12 @@ public class SparkUtil {
public static final String USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES =
"spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables";

private static final String SPARK_CATALOG_CONF_PREFIX = "spark.sql.catalog";
// Format string used as the prefix for Spark configuration keys that override Hadoop configuration
// values for Iceberg tables from a given catalog. These keys can be specified as
// `spark.sql.catalog.$catalogName.hadoop.*`, similar to using `spark.hadoop.*` to override Hadoop
// configurations globally for a given Spark session.
private static final String SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR = SPARK_CATALOG_CONF_PREFIX + ".%s.hadoop.";

private SparkUtil() {
}

@@ -170,4 +178,37 @@ public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionCo
return false;
}

/**
* Pulls any catalog-specific overrides for the Hadoop conf from the current SparkSession, which can be
* set via `spark.sql.catalog.$catalogName.hadoop.*`.
*
* Mirrors the per-session override of Hadoop configurations via `spark.hadoop.*`.
*
* The SparkCatalog allows Hadoop configurations to be overridden per catalog by setting them
* on the SQLConf. For example, the following adds the property "fs.default.name" with value
* "hdfs://hanksnamenode:8020" to the catalog's Hadoop configuration:
* SparkSession.builder()
* .config(s"spark.sql.catalog.$catalogName.hadoop.fs.default.name", "hdfs://hanksnamenode:8020")
* .getOrCreate()
*
* @param spark the current Spark session
* @param catalogName name of the catalog to find overrides for
* @return the Hadoop configuration for this catalog, with catalog-specific overrides applied
*/
public static Configuration hadoopConfCatalogOverrides(SparkSession spark, String catalogName) {
// Find session conf keys for this catalog that are intended as Hadoop configuration overrides
final String hadoopConfCatalogPrefix = hadoopConfPrefixForCatalog(catalogName);
final Configuration conf = spark.sessionState().newHadoopConf();
spark.sqlContext().conf().settings().forEach((k, v) -> {
// These checks are copied from `spark.sessionState().newHadoopConfWithOptions()`, which we
// avoid using here so we don't have to convert back and forth between Scala and Java map types.
if (v != null && k != null && k.startsWith(hadoopConfCatalogPrefix)) {
conf.set(k.substring(hadoopConfCatalogPrefix.length()), v);
}
});
return conf;
}

private static String hadoopConfPrefixForCatalog(String catalogName) {
return String.format(SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR, catalogName);
}
}
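
For illustration beyond the diff: a minimal sketch, assuming Spark 3 with the Iceberg runtime on the classpath, of how a catalog-scoped key interacts with a global `spark.hadoop.*` key. The catalog name `my_catalog` and both directory values are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.sql.SparkSession;

public class HadoopConfOverrideExample {
  public static void main(String[] args) {
    // "my_catalog" and both directory values are illustrative only.
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        // Global override: applies to every catalog in this session
        .config("spark.hadoop.fs.s3a.buffer.dir", "/global-buffer")
        // Catalog-scoped override: applies only to my_catalog and wins over the
        // global, since hadoopConfCatalogOverrides sets it on top of newHadoopConf()
        .config("spark.sql.catalog.my_catalog.hadoop.fs.s3a.buffer.dir", "/catalog-buffer")
        .getOrCreate();

    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(spark, "my_catalog");
    System.out.println(conf.get("fs.s3a.buffer.dir"));  // prints "/catalog-buffer"
  }
}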
spark/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java (1 addition, 1 deletion)
@@ -68,7 +68,7 @@ private static Catalog buildCatalog(Pair<SparkSession, String> sparkAndName) {
SparkSession spark = sparkAndName.first();
String name = sparkAndName.second();
SparkConf sparkConf = spark.sparkContext().getConf();
-    Configuration conf = spark.sessionState().newHadoopConf();
+    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(spark, name);

String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&
spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java (2 additions, 2 deletions)
@@ -97,7 +97,7 @@ public class SparkCatalog extends BaseCatalog {
* @return an Iceberg catalog
*/
protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
-    Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name);
Map<String, String> optionsMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
optionsMap.putAll(options);
optionsMap.put(CatalogProperties.APP_ID, SparkSession.active().sparkContext().applicationId());
@@ -390,7 +390,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
SparkSession sparkSession = SparkSession.active();
this.useTimestampsWithoutZone = SparkUtil.useTimestampWithoutZoneInNewTables(sparkSession.conf());
-    this.tables = new HadoopTables(sparkSession.sessionState().newHadoopConf());
+    this.tables = new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name));
this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(catalog) : catalog;
if (catalog instanceof SupportsNamespaces) {
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
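
A companion sketch of a full catalog definition that exercises `initialize` and `buildIcebergCatalog` above; the catalog name `demo`, the warehouse path, and the override value are all hypothetical.

import org.apache.spark.sql.SparkSession;

public class DemoCatalogSetup {
  public static void main(String[] args) {
    // All names and values are illustrative. With this configuration,
    // SparkCatalog.initialize("demo", ...) builds its HadoopTables (and, through
    // buildIcebergCatalog, the underlying Iceberg catalog) from a Hadoop conf
    // that already carries fs.s3a.buffer.dir = /demo-buffer.
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.demo.type", "hadoop")
        .config("spark.sql.catalog.demo.warehouse", "/tmp/demo-warehouse")
        .config("spark.sql.catalog.demo.hadoop.fs.s3a.buffer.dir", "/demo-buffer")
        .getOrCreate();

    // Touching the catalog triggers initialize with the overrides applied
    spark.sql("CREATE NAMESPACE IF NOT EXISTS demo.db");
  }
}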
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java (new file, 133 additions)
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.KryoHelpers;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;


public class TestSparkCatalogHadoopOverrides extends SparkCatalogTestBase {

private static final String configToOverride = "fs.s3a.buffer.dir";
// prepend "hadoop." so that the test base formats SQLConf correctly
// as `spark.sql.catalogs.<catalogName>.hadoop.<configToOverride>
private static final String hadoopPrefixedConfigToOverride = "hadoop." + configToOverride;
private static final String configOverrideValue = "/tmp-overridden";

@Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
public static Object[][] parameters() {
return new Object[][] {
{ "testhive", SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hive",
"default-namespace", "default",
hadoopPrefixedConfigToOverride, configOverrideValue
) },
{ "testhadoop", SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hadoop",
hadoopPrefixedConfigToOverride, configOverrideValue
) },
{ "spark_catalog", SparkSessionCatalog.class.getName(),
ImmutableMap.of(
"type", "hive",
"default-namespace", "default",
hadoopPrefixedConfigToOverride, configOverrideValue
) }
};
}

public TestSparkCatalogHadoopOverrides(String catalogName,
String implementation,
Map<String, String> config) {
super(catalogName, implementation, config);
}

@Before
public void createTable() {
sql("CREATE TABLE IF NOT EXISTS %s (id bigint) USING iceberg", tableName(tableIdent.name()));
}

@After
public void dropTable() {
sql("DROP TABLE IF EXISTS %s", tableName(tableIdent.name()));
}

@Test
public void testTableFromCatalogHasOverrides() throws Exception {
Table table = getIcebergTableFromSparkCatalog();
Configuration conf = ((Configurable) table.io()).getConf();
String actualCatalogOverride = conf.get(configToOverride, "/whammies");
Assert.assertEquals(
"Iceberg tables from spark should have the overridden hadoop configurations from the spark config",
configOverrideValue, actualCatalogOverride);
}

@Test
public void ensureRoundTripSerializedTableRetainsHadoopConfig() throws Exception {
Table table = getIcebergTableFromSparkCatalog();
Configuration originalConf = ((Configurable) table.io()).getConf();
String actualCatalogOverride = originalConf.get(configToOverride, "/whammies");
Assert.assertEquals(
"Iceberg tables from spark should have the overridden hadoop configurations from the spark config",
configOverrideValue, actualCatalogOverride);

// Now convert to SerializableTable and ensure overridden property is still present.
Table serializableTable = SerializableTable.copyOf(table);
Table kryoSerializedTable = KryoHelpers.roundTripSerialize(SerializableTable.copyOf(table));
Configuration configFromKryoSerde = ((Configurable) kryoSerializedTable.io()).getConf();
String kryoSerializedCatalogOverride = configFromKryoSerde.get(configToOverride, "/whammies");
Assert.assertEquals(
"Tables serialized with Kryo serialization should retain overridden hadoop configuration properties",
configOverrideValue, kryoSerializedCatalogOverride);

// Do the same for Java based serde
Table javaSerializedTable = TestHelpers.roundTripSerialize(serializableTable);
Configuration configFromJavaSerde = ((Configurable) javaSerializedTable.io()).getConf();
String javaSerializedCatalogOverride = configFromJavaSerde.get(configToOverride, "/whammies");
Assert.assertEquals(
"Tables serialized with Java serialization should retain overridden hadoop configuration properties",
configOverrideValue, javaSerializedCatalogOverride);
}

@SuppressWarnings("ThrowSpecificity")
private Table getIcebergTableFromSparkCatalog() throws Exception {
Identifier identifier = Identifier.of(tableIdent.namespace().levels(), tableIdent.name());
TableCatalog catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
SparkTable sparkTable = (SparkTable) catalog.loadTable(identifier);
return sparkTable.table();
}
}
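
Outside the test harness, the same check can be sketched in application code. The catalog `demo`, namespace `db`, table `t`, and expected value are hypothetical, and the cast assumes the table's FileIO is Hadoop-backed (i.e., implements Configurable), as in the test above.

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;

public class VerifyCatalogOverride {
  // Throws if the table does not exist; all identifiers here are illustrative.
  static String bufferDir(SparkSession spark) throws Exception {
    TableCatalog catalog =
        (TableCatalog) spark.sessionState().catalogManager().catalog("demo");
    SparkTable sparkTable =
        (SparkTable) catalog.loadTable(Identifier.of(new String[] {"db"}, "t"));
    // FileIO for Hadoop-backed tables implements Configurable, as in the test above
    Configuration conf = ((Configurable) sparkTable.table().io()).getConf();
    return conf.get("fs.s3a.buffer.dir");  // expected "/demo-buffer" with the config above
  }
}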