diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index 2f5273e83929..670276282436 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -325,6 +325,19 @@ default TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
     throw new UnsupportedOperationException(this.getClass().getName() + " does not implement buildTable");
   }
 
+  /**
+   * Initialize a catalog given a custom name and a map of catalog properties.
+   * <p>
+   * A custom Catalog implementation must have a no-arg constructor.
+   * A compute engine like Spark or Flink will first instantiate the catalog without any arguments,
+   * and then call this method to complete catalog initialization with properties passed into the engine.
+   *
+   * @param name a custom name for the catalog
+   * @param properties catalog properties
+   */
+  default void initialize(String name, Map<String, String> properties) {
+  }
+
   /**
    * A builder used to create valid {@link Table tables} or start create/replace {@link Transaction transactions}.
    * <p>
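For readers of the patch, the contract added here is two-phase: the engine constructs the catalog with its no-arg constructor (reflectively in practice), then passes the user's configuration through `initialize`. A minimal sketch of the engine side, assuming a hypothetical implementation class `MyCustomCatalog`:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;

class EngineInitSketch {
  static Catalog init() {
    // Phase 1: no-arg construction (engines do this reflectively; see CatalogUtil below).
    Catalog catalog = new MyCustomCatalog(); // hypothetical custom implementation
    // Phase 2: complete initialization with the engine's catalog properties.
    Map<String, String> properties = Collections.singletonMap("warehouse", "s3://bucket/warehouse");
    catalog.initialize("prod_catalog", properties);
    return catalog;
  }
}
```

Splitting construction from configuration is what lets engines treat catalog implementations as opaque, configuration-driven plugins.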
diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
new file mode 100644
index 000000000000..fad7cb402484
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class CatalogProperties {
+
+  private CatalogProperties() {
+  }
+
+  public static final String CATALOG_IMPL = "catalog-impl";
+}
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index cb7f191dbe25..2d96d49e4f49 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -22,9 +22,14 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -117,4 +122,50 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
       }
     });
   }
+
+  /**
+   * Load a custom catalog implementation.
+   * <p>
+   * The catalog must have a no-arg constructor.
+   * If the class implements {@link Configurable},
+   * a Hadoop config will be passed using {@link Configurable#setConf(Configuration)}.
+   * {@link Catalog#initialize(String catalogName, Map options)} is called to complete the initialization.
+   *
+   * @param impl catalog implementation full class name
+   * @param catalogName catalog name
+   * @param properties catalog properties
+   * @param hadoopConf hadoop configuration if needed
+   * @return initialized catalog object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static Catalog loadCatalog(
+      String impl,
+      String catalogName,
+      Map<String, String> properties,
+      Configuration hadoopConf) {
+    Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null");
+    DynConstructors.Ctor<Catalog> ctor;
+    try {
+      ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Catalog, missing no-arg constructor: %s", impl), e);
+    }
+
+    Catalog catalog;
+    try {
+      catalog = ctor.newInstance();
+
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize Catalog, %s does not implement Catalog.", impl), e);
+    }
+
+    if (catalog instanceof Configurable) {
+      ((Configurable) catalog).setConf(hadoopConf);
+    }
+
+    catalog.initialize(catalogName, properties);
+    return catalog;
+  }
 }
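A usage sketch of the new utility; the implementation class name, catalog name, and property values below are hypothetical placeholders:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

class LoadCatalogSketch {
  static Catalog load() {
    Map<String, String> properties = Collections.singletonMap("warehouse", "s3://bucket/warehouse");
    // Throws IllegalArgumentException if the class is missing a no-arg
    // constructor or does not implement Catalog.
    return CatalogUtil.loadCatalog(
        "com.example.MyCustomCatalog", "prod_catalog", properties, new Configuration());
  }
}
```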
diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
new file mode 100644
index 000000000000..67729d19a437
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCatalogUtil {
+
+  @Test
+  public void loadCustomCatalog() {
+    Map<String, String> options = new HashMap<>();
+    options.put("key", "val");
+    Configuration hadoopConf = new Configuration();
+    String name = "custom";
+    Catalog catalog = CatalogUtil.loadCatalog(TestCatalog.class.getName(), name, options, hadoopConf);
+    Assert.assertTrue(catalog instanceof TestCatalog);
+    Assert.assertEquals(name, ((TestCatalog) catalog).catalogName);
+    Assert.assertEquals(options, ((TestCatalog) catalog).flinkOptions);
+  }
+
+  @Test
+  public void loadCustomCatalog_withHadoopConfig() {
+    Map<String, String> options = new HashMap<>();
+    options.put("key", "val");
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.set("key", "val");
+    String name = "custom";
+    Catalog catalog = CatalogUtil.loadCatalog(TestCatalogConfigurable.class.getName(), name, options, hadoopConf);
+    Assert.assertTrue(catalog instanceof TestCatalogConfigurable);
+    Assert.assertEquals(name, ((TestCatalogConfigurable) catalog).catalogName);
+    Assert.assertEquals(options, ((TestCatalogConfigurable) catalog).flinkOptions);
+    Assert.assertEquals(hadoopConf, ((TestCatalogConfigurable) catalog).configuration);
+  }
+
+  @Test
+  public void loadCustomCatalog_NoArgConstructorNotFound() {
+    Map<String, String> options = new HashMap<>();
+    options.put("key", "val");
+    Configuration hadoopConf = new Configuration();
+    String name = "custom";
+    AssertHelpers.assertThrows("must have no-arg constructor",
+        IllegalArgumentException.class,
+        "missing no-arg constructor",
+        () -> CatalogUtil.loadCatalog(TestCatalogBadConstructor.class.getName(), name, options, hadoopConf));
+  }
+
+  @Test
+  public void loadCustomCatalog_NotImplementCatalog() {
+    Map<String, String> options = new HashMap<>();
+    options.put("key", "val");
+    Configuration hadoopConf = new Configuration();
+    String name = "custom";
+
+    AssertHelpers.assertThrows("must implement catalog",
+        IllegalArgumentException.class,
+        "does not implement Catalog",
+        () -> CatalogUtil.loadCatalog(TestCatalogNoInterface.class.getName(), name, options, hadoopConf));
+  }
+
+  public static class TestCatalog extends BaseMetastoreCatalog {
+
+    private String catalogName;
+    private Map<String, String> flinkOptions;
+
+    public TestCatalog() {
+    }
+
+    @Override
+    public void initialize(String name, Map<String, String> properties) {
+      this.catalogName = name;
+      this.flinkOptions = properties;
+    }
+
+    @Override
+    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+      return null;
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+      return false;
+    }
+
+    @Override
+    public void renameTable(TableIdentifier from, TableIdentifier to) {
+
+    }
+  }
+
+  public static class TestCatalogConfigurable extends BaseMetastoreCatalog implements Configurable {
+
+    private String catalogName;
+    private Map<String, String> flinkOptions;
+    private Configuration configuration;
+
+    public TestCatalogConfigurable() {
+    }
+
+    @Override
+    public void initialize(String name, Map<String, String> properties) {
+      this.catalogName = name;
+      this.flinkOptions = properties;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.configuration = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return configuration;
+    }
+
+    @Override
+    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+      return null;
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+      return false;
+    }
+
+    @Override
+    public void renameTable(TableIdentifier from, TableIdentifier to) {
+
+    }
+  }
+
+  public static class TestCatalogBadConstructor extends BaseMetastoreCatalog {
+
+    public TestCatalogBadConstructor(String arg) {
+    }
+
+    @Override
+    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+      return null;
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+      return false;
+    }
+
+    @Override
+    public void renameTable(TableIdentifier from, TableIdentifier to) {
+
+    }
+
+    @Override
+    public void initialize(String name, Map<String, String> properties) {
+    }
+  }
+
+  public static class TestCatalogNoInterface {
+    public TestCatalogNoInterface() {
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index 4d0670cffa3a..982b3da44c16 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -20,12 +20,16 @@
 package org.apache.iceberg.flink;
 
 import java.io.Serializable;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 /**
  * Serializable loader to load an Iceberg {@link Catalog}.
@@ -49,6 +53,10 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, Str
     return new HiveCatalogLoader(name, hadoopConf, uri, warehouse, clientPoolSize);
   }
 
+  static CatalogLoader custom(String name, Map<String, String> properties, Configuration hadoopConf, String impl) {
+    return new CustomCatalogLoader(name, properties, hadoopConf, impl);
+  }
+
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
     private final SerializableConfiguration hadoopConf;
@@ -105,4 +113,37 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = Maps.newHashMap(properties); // wrap into a hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      return CatalogUtil.loadCatalog(impl, name, properties, hadoopConf.get());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("name", name)
+          .add("impl", impl)
+          .toString();
+    }
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 9484a9664543..acaece5f19fb 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -23,6 +23,7 @@
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.util.HadoopUtils;
@@ -31,6 +32,7 @@
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -58,6 +60,9 @@ public class FlinkCatalogFactory implements CatalogFactory {
   // Can not just use "type", it conflicts with CATALOG_TYPE.
   public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
   public static final String HIVE_URI = "uri";
   public static final String HIVE_CLIENT_POOL_SIZE = "clients";
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
@@ -75,9 +80,14 @@ public class FlinkCatalogFactory implements CatalogFactory {
    * @return an Iceberg catalog loader
    */
   protected CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
-    String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, "hive");
-    switch (catalogType) {
-      case "hive":
+    String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
+    }
+
+    String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+    switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+      case ICEBERG_CATALOG_TYPE_HIVE:
         // The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null, in that case it will
         // fallback to parse those values from hadoop configuration which is loaded from classpath.
         String uri = properties.get(HIVE_URI);
@@ -87,7 +97,7 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
         Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
         return CatalogLoader.hive(name, newHadoopConf, uri, warehouse, clientPoolSize);
 
-      case "hadoop":
+      case ICEBERG_CATALOG_TYPE_HADOOP:
         String warehouseLocation = properties.get(WAREHOUSE_LOCATION);
         return CatalogLoader.hadoop(name, hadoopConf, warehouseLocation);
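With the factory change above, Flink selects a custom catalog purely from the `catalog-impl` property. Below is a sketch of the equivalent programmatic path through the new `CatalogLoader.custom` entry point; the class name and property values are hypothetical placeholders:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.flink.CatalogLoader;

class FlinkCustomCatalogSketch {
  static Catalog load() {
    Map<String, String> properties = new HashMap<>();
    properties.put("warehouse", "s3://bucket/warehouse"); // hypothetical property

    // The loader is Serializable, so Flink can ship it to task managers;
    // each task then materializes its own Catalog instance via loadCatalog().
    CatalogLoader loader = CatalogLoader.custom(
        "prod_catalog", properties, new Configuration(), "com.example.MyCustomCatalog");
    return loader.loadCatalog();
  }
}
```

Deferring catalog creation to `loadCatalog()` is the reason the loader copies the properties into a plain `HashMap` and wraps the Hadoop config: only serializable state crosses the wire.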
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index e6887a14b4d0..0228eae705d5 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -20,10 +20,13 @@
 package org.apache.iceberg.spark;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
@@ -78,6 +81,10 @@ public class SparkCatalog implements StagingTableCatalog,
     org.apache.spark.sql.connector.catalog.SupportsNamespaces {
   private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
 
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
   private String catalogName = null;
   private Catalog icebergCatalog = null;
   private boolean cacheEnabled = true;
@@ -93,14 +100,20 @@ public class SparkCatalog implements StagingTableCatalog, org.apache.spark.sql.c
    */
   protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
     Configuration conf = SparkSession.active().sessionState().newHadoopConf();
-    String catalogType = options.getOrDefault("type", "hive");
-    switch (catalogType) {
-      case "hive":
+
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
+    }
+
+    String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+    switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+      case ICEBERG_CATALOG_TYPE_HIVE:
         int clientPoolSize = options.getInt("clients", 2);
         String uri = options.get("uri");
         return new HiveCatalog(name, uri, clientPoolSize, conf);
 
-      case "hadoop":
+      case ICEBERG_CATALOG_TYPE_HADOOP:
         String warehouseLocation = options.get("warehouse");
         return new HadoopCatalog(name, conf, warehouseLocation);
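On the Spark 3 side the same dispatch applies: when `catalog-impl` is set, the `type` property is ignored and the custom class is loaded instead. A hypothetical session configuration sketch (the catalog name `prod` and the implementation class are placeholders; every `spark.sql.catalog.prod.*` entry reaches the catalog as a property):

```java
import org.apache.spark.sql.SparkSession;

class SparkCustomCatalogSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        // Register SparkCatalog as the plugin backing the "prod" catalog.
        .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog")
        // catalog-impl routes buildIcebergCatalog to CatalogUtil.loadCatalog.
        .config("spark.sql.catalog.prod.catalog-impl", "com.example.MyCustomCatalog")
        .config("spark.sql.catalog.prod.warehouse", "s3://bucket/warehouse")
        .getOrCreate();

    spark.sql("SELECT * FROM prod.db.sample").show();
  }
}
```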