From 0adbea534e78015efccc7b481bfe0d25a610a31b Mon Sep 17 00:00:00 2001
From: Kyle Bendickson
Date: Wed, 7 Jul 2021 14:43:50 -0700
Subject: [PATCH 1/2] [SPARK] Allow spark catalogs to have hadoop configuration overrides per catalog

---
 .../org/apache/iceberg/spark/SparkUtil.java   |  31 ++++
 .../iceberg/spark/source/CustomCatalogs.java  |   2 +-
 .../apache/iceberg/spark/SparkCatalog.java    |   4 +-
 .../TestSparkCatalogHadoopOverrides.java      | 133 ++++++++++++++++++
 4 files changed, 167 insertions(+), 3 deletions(-)
 create mode 100644 spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java

diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 4d5c3ec9e48e..72f47f05f84b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -24,6 +24,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -37,6 +38,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.util.SerializableConfiguration;
 
 public class SparkUtil {
@@ -52,6 +54,8 @@ public class SparkUtil {
   public static final String USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES =
       "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables";
 
+  private static final String SPARK_CATALOG_CONF_PREFIX = "spark.sql.catalog";
+
   private SparkUtil() {
   }
 
@@ -170,4 +174,31 @@ public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionCo
     return false;
   }
 
+  /**
+   * Pulls any catalog-specific overrides for the Hadoop conf from the current SparkSession, which can be
+   * set via spark.sql.catalog.$catalogName.hadoop.*
+   *
+   * The SparkCatalog allows Hadoop configurations to be overridden per catalog by setting them
+   * on the SQLConf. For example, the following adds the property "fs.default.name" with value
+   * "hdfs://hanksnamenode:8020" to the catalog's Hadoop configuration:
+   *     SparkSession.builder()
+   *       .config(s"spark.sql.catalog.$catalogName.hadoop.fs.default.name", "hdfs://hanksnamenode:8020")
+   *       .getOrCreate()
+   * @param spark the current Spark session
+   * @param catalogName name of the catalog to find overrides for
+   * @return the Hadoop Configuration that should be used for this catalog, with catalog-specific overrides applied
+   */
+  public static Configuration hadoopConfCatalogOverrides(SparkSession spark, String catalogName) {
+    // Find keys for the catalog that are intended as Hadoop configuration overrides
+    final String hadoopConfCatalogPrefix = String.format("%s.%s.%s", SPARK_CATALOG_CONF_PREFIX, catalogName, "hadoop.");
+    final Configuration conf = spark.sessionState().newHadoopConf();
+    spark.sqlContext().conf().settings().forEach((k, v) -> {
+      // These checks are copied from `spark.sessionState().newHadoopConfWithOptions()`, which we
+      // avoid using so that we do not have to convert back and forth between Scala and Java map types.
+      if (v != null && k != null && k.startsWith(hadoopConfCatalogPrefix)) {
+        conf.set(k.substring(hadoopConfCatalogPrefix.length()), v);
+      }
+    });
+    return conf;
+  }
 }

diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java b/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
index 54e174834301..44f2851fd258 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
@@ -68,7 +68,7 @@ private static Catalog buildCatalog(Pair<SparkSession, String> sparkAndName) {
     SparkSession spark = sparkAndName.first();
     String name = sparkAndName.second();
     SparkConf sparkConf = spark.sparkContext().getConf();
-    Configuration conf = spark.sessionState().newHadoopConf();
+    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(spark, name);
     String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
 
     if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index d099291fc353..8da4c427d115 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -97,7 +97,7 @@ public class SparkCatalog extends BaseCatalog {
    * @return an Iceberg catalog
    */
   protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
-    Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+    Configuration conf = SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name);
     Map<String, String> optionsMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
     optionsMap.putAll(options);
     optionsMap.put(CatalogProperties.APP_ID, SparkSession.active().sparkContext().applicationId());
@@ -390,7 +390,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
     this.catalogName = name;
     SparkSession sparkSession = SparkSession.active();
     this.useTimestampsWithoutZone = SparkUtil.useTimestampWithoutZoneInNewTables(sparkSession.conf());
-    this.tables = new HadoopTables(sparkSession.sessionState().newHadoopConf());
+    this.tables = new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name));
     this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(catalog) : catalog;
     if (catalog instanceof SupportsNamespaces) {
       this.asNamespaceCatalog = (SupportsNamespaces) catalog;
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
new file mode 100644
index 000000000000..fba6c2686096
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+
+public class TestSparkCatalogHadoopOverrides extends SparkCatalogTestBase {
+
+  private static final String configToOverride = "fs.s3a.buffer.dir";
+  // prepend "hadoop." so that the test base formats the SQLConf key correctly
+  // as `spark.sql.catalog.<catalogName>.hadoop.<configToOverride>`
+  private static final String hadoopPrefixedConfigToOverride = "hadoop." + configToOverride;
+  private static final String configOverrideValue = "/tmp-overridden";
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "testhive", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                hadoopPrefixedConfigToOverride, configOverrideValue
+            ) },
+        { "testhadoop", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hadoop",
+                hadoopPrefixedConfigToOverride, configOverrideValue
+            ) },
+        { "spark_catalog", SparkSessionCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                hadoopPrefixedConfigToOverride, configOverrideValue
+            ) }
+    };
+  }
+
+  public TestSparkCatalogHadoopOverrides(String catalogName,
+                                         String implementation,
+                                         Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTable() {
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint) USING iceberg", tableName(tableIdent.name()));
+  }
+
+  @After
+  public void dropTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName(tableIdent.name()));
+  }
+
+  @Test
+  public void testTableFromCatalogHasOverrides() throws Exception {
+    Table table = getIcebergTableFromSparkCatalog();
+    Configuration conf = ((Configurable) table.io()).getConf();
+    String actualCatalogOverride = conf.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Iceberg tables from spark should have the overridden hadoop configurations from the spark config",
+        configOverrideValue, actualCatalogOverride);
+  }
+
+  @Test
+  public void ensureRoundTripSerializedTableRetainsHadoopConfig() throws Exception {
+    Table table = getIcebergTableFromSparkCatalog();
+    Configuration originalConf = ((Configurable) table.io()).getConf();
+    String actualCatalogOverride = originalConf.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Iceberg tables from spark should have the overridden hadoop configurations from the spark config",
+        configOverrideValue, actualCatalogOverride);
+
+    // Now convert to SerializableTable and ensure the overridden property is still present.
+    Table serializableTable = SerializableTable.copyOf(table);
+    Table kryoSerializedTable = KryoHelpers.roundTripSerialize(SerializableTable.copyOf(table));
+    Configuration configFromKryoSerde = ((Configurable) kryoSerializedTable.io()).getConf();
+    String kryoSerializedCatalogOverride = configFromKryoSerde.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Tables serialized with Kryo serialization should retain overridden hadoop configuration properties",
+        configOverrideValue, kryoSerializedCatalogOverride);
+
+    // Do the same for Java-based serde
+    Table javaSerializedTable = TestHelpers.roundTripSerialize(serializableTable);
+    Configuration configFromJavaSerde = ((Configurable) javaSerializedTable.io()).getConf();
+    String javaSerializedCatalogOverride = configFromJavaSerde.get(configToOverride, "/whammies");
+    Assert.assertEquals(
+        "Tables serialized with Java serialization should retain overridden hadoop configuration properties",
+        configOverrideValue, javaSerializedCatalogOverride);
+  }
+
+  @SuppressWarnings("ThrowSpecificity")
+  private Table getIcebergTableFromSparkCatalog() throws Exception {
+    Identifier identifier = Identifier.of(tableIdent.namespace().levels(), tableIdent.name());
+    TableCatalog catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
+    SparkTable sparkTable = (SparkTable) catalog.loadTable(identifier);
+    return sparkTable.table();
+  }
+}

From 34f47711df7f291b3722540bea7d28eb4530922a Mon Sep 17 00:00:00 2001
From: Kyle Bendickson
Date: Fri, 16 Jul 2021 15:32:45 -0700
Subject: [PATCH 2/2] Avoid using hardcoded hadoop string, switch to private static final format string per PR feedback

---
 .../java/org/apache/iceberg/spark/SparkUtil.java | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 72f47f05f84b..a656ae67dca0 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -55,6 +55,10 @@ public class SparkUtil {
       "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables";
 
   private static final String SPARK_CATALOG_CONF_PREFIX = "spark.sql.catalog";
+  // Format string used as the prefix for Spark configuration keys that override Hadoop configuration values
+  // for Iceberg tables from a given catalog. These keys can be specified as `spark.sql.catalog.$catalogName.hadoop.*`,
+  // similar to using `spark.hadoop.*` to override Hadoop configurations globally for a given Spark session.
+  private static final String SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR = SPARK_CATALOG_CONF_PREFIX + ".%s.hadoop.";
 
   private SparkUtil() {
   }
@@ -176,21 +180,23 @@ public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionCo
 
   /**
    * Pulls any catalog-specific overrides for the Hadoop conf from the current SparkSession, which can be
-   * set via spark.sql.catalog.$catalogName.hadoop.*
+   * set via `spark.sql.catalog.$catalogName.hadoop.*`
+   *
+   * Mirrors the override of Hadoop configurations for a given Spark session using `spark.hadoop.*`.
    *
    * The SparkCatalog allows Hadoop configurations to be overridden per catalog by setting them
    * on the SQLConf. For example, the following adds the property "fs.default.name" with value
    * "hdfs://hanksnamenode:8020" to the catalog's Hadoop configuration:
    *     SparkSession.builder()
    *       .config(s"spark.sql.catalog.$catalogName.hadoop.fs.default.name", "hdfs://hanksnamenode:8020")
    *       .getOrCreate()
    * @param spark the current Spark session
    * @param catalogName name of the catalog to find overrides for
    * @return the Hadoop Configuration that should be used for this catalog, with catalog-specific overrides applied
    */
   public static Configuration hadoopConfCatalogOverrides(SparkSession spark, String catalogName) {
     // Find keys for the catalog that are intended as Hadoop configuration overrides
-    final String hadoopConfCatalogPrefix = String.format("%s.%s.%s", SPARK_CATALOG_CONF_PREFIX, catalogName, "hadoop.");
+    final String hadoopConfCatalogPrefix = hadoopConfPrefixForCatalog(catalogName);
     final Configuration conf = spark.sessionState().newHadoopConf();
     spark.sqlContext().conf().settings().forEach((k, v) -> {
       // These checks are copied from `spark.sessionState().newHadoopConfWithOptions()`, which we
@@ -201,4 +207,8 @@ public static Configuration hadoopConfCatalogOverrides(SparkSession spark, Strin
     });
     return conf;
   }
+
+  private static String hadoopConfPrefixForCatalog(String catalogName) {
+    return String.format(SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR, catalogName);
+  }
 }
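
For illustration, the sketch below shows how a user might exercise the per-catalog override these patches introduce. It is an editorial example, not part of the patch series: the catalog name "my_catalog", the warehouse path, and the "fs.s3a.endpoint" property are placeholder choices (any Hadoop property works), and it assumes an Iceberg Spark 3 runtime jar is on the classpath.

    import org.apache.spark.sql.SparkSession;

    public class CatalogHadoopOverrideExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            // Register an Iceberg catalog named "my_catalog" (a hypothetical name).
            .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.my_catalog.type", "hadoop")
            .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/warehouse")
            // Per-catalog Hadoop override added by these patches: SparkUtil.hadoopConfCatalogOverrides
            // strips the "spark.sql.catalog.my_catalog.hadoop." prefix and sets the remainder
            // ("fs.s3a.endpoint", a placeholder here) on the Configuration used by this catalog.
            .config("spark.sql.catalog.my_catalog.hadoop.fs.s3a.endpoint", "http://localhost:9000")
            .getOrCreate();

        // Tables resolved through my_catalog see the overridden value; other catalogs
        // and global `spark.hadoop.*` settings are unaffected.
        spark.sql("CREATE NAMESPACE IF NOT EXISTS my_catalog.db");
        spark.sql("CREATE TABLE IF NOT EXISTS my_catalog.db.t (id bigint) USING iceberg");
      }
    }

The design deliberately mirrors `spark.hadoop.*`: the catalog-scoped prefix is stripped and the remaining key is applied on top of `spark.sessionState().newHadoopConf()`, so an override stays isolated to tables loaded through that one catalog.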