diff --git a/api/src/test/java/org/apache/iceberg/AssertHelpers.java b/api/src/test/java/org/apache/iceberg/AssertHelpers.java
index 0172222524a3..0ce7575b2bc2 100644
--- a/api/src/test/java/org/apache/iceberg/AssertHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/AssertHelpers.java
@@ -92,6 +92,32 @@ public static void assertThrows(String message,
assertThrows(message, expected, null, runnable);
}
+ /**
+ * A convenience method to assert the cause of a thrown exception.
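+ *
+ * <p>For example (an illustrative sketch):
+ * <pre>{@code
+ *   AssertHelpers.assertThrowsCause("Should fail with an IllegalStateException cause",
+ *       IllegalStateException.class, "invalid state",
+ *       () -> {
+ *         throw new RuntimeException(new IllegalStateException("invalid state"));
+ *       });
+ * }</pre>
+ *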
+ * @param message A String message to describe this assertion
+ * @param expected An Exception class that the cause of the thrown exception should be an instance of
+ * @param containedInMessage A String that should be contained in the message of the cause of the
+ * thrown exception
+ * @param runnable A Runnable that is expected to throw an exception whose cause matches
+ */
+ public static void assertThrowsCause(String message,
+ Class<? extends Exception> expected,
+ String containedInMessage,
+ Runnable runnable) {
+ try {
+ runnable.run();
+ Assert.fail("No exception was thrown (" + message + "), expected: " +
+ expected.getName());
+ } catch (Exception actual) {
+ Throwable cause = actual.getCause();
+ if (cause instanceof Exception) {
+ handleException(message, expected, containedInMessage, (Exception) cause);
+ } else {
+ Assert.fail("Occur non-exception cause: " + cause);
+ }
+ }
+ }
+
private static void handleException(String message,
Class<? extends Exception> expected,
String containedInMessage,
diff --git a/build.gradle b/build.gradle
index 82933cfc475c..371893e2a166 100644
--- a/build.gradle
+++ b/build.gradle
@@ -234,6 +234,7 @@ project(':iceberg-flink') {
compile project(':iceberg-data')
compile project(':iceberg-orc')
compile project(':iceberg-parquet')
+ compile project(':iceberg-hive')
compileOnly "org.apache.flink:flink-streaming-java_2.12"
compileOnly "org.apache.flink:flink-streaming-java_2.12::tests"
@@ -248,13 +249,47 @@ project(':iceberg-flink') {
testCompile "org.apache.flink:flink-core"
testCompile "org.apache.flink:flink-runtime_2.12"
+ testCompile "org.apache.flink:flink-table-planner-blink_2.12"
testCompile "org.apache.flink:flink-test-utils-junit"
testCompile("org.apache.flink:flink-test-utils_2.12") {
exclude group: "org.apache.curator", module: 'curator-test'
}
+ testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
+
+ // By default, hive-exec is a fat/uber jar and it exports a guava library
+ // that's really old. We use the core classifier to be able to override our guava
+ // version. Luckily, hive-exec seems to work okay so far with this version of guava
+ // See: https://github.com/apache/hive/blob/master/ql/pom.xml#L911 for more context.
+ testCompile("org.apache.hive:hive-exec::core") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ exclude group: 'org.pentaho' // missing dependency
+ exclude group: 'org.apache.hive', module: 'hive-llap-tez'
+ exclude group: 'org.apache.logging.log4j'
+ exclude group: 'com.google.protobuf', module: 'protobuf-java'
+ exclude group: 'org.apache.calcite'
+ exclude group: 'org.apache.calcite.avatica'
+ exclude group: 'com.google.code.findbugs', module: 'jsr305'
+ }
+
+ testCompile("org.apache.hive:hive-metastore") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ exclude group: 'org.pentaho' // missing dependency
+ exclude group: 'org.apache.hbase'
+ exclude group: 'org.apache.logging.log4j'
+ exclude group: 'co.cask.tephra'
+ exclude group: 'com.google.code.findbugs', module: 'jsr305'
+ exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all'
+ exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet'
+ exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle'
+ exclude group: 'com.tdunning', module: 'json'
+ exclude group: 'javax.transaction', module: 'transaction-api'
+ exclude group: 'com.zaxxer', module: 'HikariCP'
+ }
}
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
new file mode 100644
index 000000000000..eea4c07d1da0
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+/**
+ * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
+ *
+ * <p>The mapping between Flink databases and Iceberg namespaces: a base namespace is supplied for a given catalog,
+ * so if a catalog supports a 2-level namespace, the first level is supplied in the catalog configuration and the
+ * second level is exposed as Flink databases.
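+ *
+ * <p>For example (an illustrative sketch; the names are placeholders): with the base namespace {@code l0.l1},
+ * the Flink database {@code db} maps to the Iceberg namespace {@code l0.l1.db}, and the Flink table path
+ * {@code db.t} maps to the Iceberg table identifier {@code l0.l1.db.t}.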
+ *
+ * <p>An Iceberg table manages its partitions by itself. The partitioning of an Iceberg table is independent of
+ * Flink's partitioning.
+ */
+public class FlinkCatalog extends AbstractCatalog {
+
+ private final Catalog originalCatalog;
+ private final Catalog icebergCatalog;
+ private final String[] baseNamespace;
+ private final SupportsNamespaces asNamespaceCatalog;
+
+ public FlinkCatalog(
+ String catalogName,
+ String defaultDatabase,
+ String[] baseNamespace,
+ Catalog icebergCatalog,
+ boolean cacheEnabled) {
+ super(catalogName, defaultDatabase);
+ this.originalCatalog = icebergCatalog;
+ this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
+ this.baseNamespace = baseNamespace;
+ if (icebergCatalog instanceof SupportsNamespaces) {
+ asNamespaceCatalog = (SupportsNamespaces) icebergCatalog;
+ } else {
+ asNamespaceCatalog = null;
+ }
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (originalCatalog instanceof Closeable) {
+ try {
+ ((Closeable) originalCatalog).close();
+ } catch (IOException e) {
+ throw new CatalogException(e);
+ }
+ }
+ }
+
+ private Namespace toNamespace(String database) {
+ String[] namespace = new String[baseNamespace.length + 1];
+ System.arraycopy(baseNamespace, 0, namespace, 0, baseNamespace.length);
+ namespace[baseNamespace.length] = database;
+ return Namespace.of(namespace);
+ }
+
+ private TableIdentifier toIdentifier(ObjectPath path) {
+ return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ if (asNamespaceCatalog == null) {
+ return Collections.singletonList(getDefaultDatabase());
+ }
+
+ return asNamespaceCatalog.listNamespaces(Namespace.of(baseNamespace)).stream()
+ .map(n -> n.level(n.levels().length - 1))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog == null) {
+ if (!getDefaultDatabase().equals(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ } else {
+ return new CatalogDatabaseImpl(Maps.newHashMap(), "");
+ }
+ } else {
+ try {
+ Map<String, String> metadata =
+ Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
+ String comment = metadata.remove("comment");
+ return new CatalogDatabaseImpl(metadata, comment);
+ } catch (NoSuchNamespaceException e) {
+ throw new DatabaseNotExistException(getName(), databaseName, e);
+ }
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ try {
+ getDatabase(databaseName);
+ return true;
+ } catch (DatabaseNotExistException ignore) {
+ return false;
+ }
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ asNamespaceCatalog.createNamespace(
+ toNamespace(name),
+ mergeComment(database.getProperties(), database.getComment()));
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(getName(), name, e);
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException("Namespaces are not supported by catalog: " + getName());
+ }
+ }
+
+ private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
+ Map<String, String> ret = Maps.newHashMap(metadata);
+ if (metadata.containsKey("comment")) {
+ throw new CatalogException("Database properties should not contain key: 'comment'.");
+ }
+
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ ret.put("comment", comment);
+ }
+ return ret;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ boolean success = asNamespaceCatalog.dropNamespace(toNamespace(name));
+ if (!success && !ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ } catch (NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ } catch (NamespaceNotEmptyException e) {
+ throw new DatabaseNotEmptyException(getName(), name, e);
+ }
+ } else {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ Namespace namespace = toNamespace(name);
+ Map<String, String> updates = Maps.newHashMap();
+ Set<String> removals = Sets.newHashSet();
+
+ try {
+ Map<String, String> oldOptions = asNamespaceCatalog.loadNamespaceMetadata(namespace);
+ Map<String, String> newOptions = mergeComment(newDatabase.getProperties(), newDatabase.getComment());
+
+ for (String key : oldOptions.keySet()) {
+ if (!newOptions.containsKey(key)) {
+ removals.add(key);
+ }
+ }
+
+ for (Map.Entry<String, String> entry : newOptions.entrySet()) {
+ if (!entry.getValue().equals(oldOptions.get(entry.getKey()))) {
+ updates.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (!updates.isEmpty()) {
+ asNamespaceCatalog.setProperties(namespace, updates);
+ }
+
+ if (!removals.isEmpty()) {
+ asNamespaceCatalog.removeProperties(namespace, removals);
+ }
+
+ } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ }
+ } else {
+ if (getDefaultDatabase().equals(name)) {
+ throw new CatalogException(
+ "Can not alter the default database when the iceberg catalog doesn't support namespaces.");
+ }
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+ try {
+ return icebergCatalog.listTables(toNamespace(databaseName)).stream()
+ .map(TableIdentifier::name)
+ .collect(Collectors.toList());
+ } catch (NoSuchNamespaceException e) {
+ throw new DatabaseNotExistException(getName(), databaseName, e);
+ }
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ try {
+ Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+ TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+
+ // NOTE: We can not create an IcebergCatalogTable here, because the Flink optimizer may use CatalogTableImpl to
+ // copy a new catalog table.
+ // Instead, re-load the table from the Iceberg catalog when creating source/sink operators.
+ return new CatalogTableImpl(tableSchema, table.properties(), null);
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return icebergCatalog.tableExists(toIdentifier(tablePath));
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ icebergCatalog.dropTable(toIdentifier(tablePath));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException, CatalogException {
+ try {
+ icebergCatalog.renameTable(
+ toIdentifier(tablePath),
+ toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ }
+
+ /**
+ * TODO Add partitioning to the Flink DDL parser.
+ */
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException("Not support createTable now.");
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException("Not support alterTable now.");
+ }
+
+ // ------------------------------ Unsupported methods ---------------------------------------------
+
+ @Override
+ public List<String> listViews(String databaseName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition,
+ boolean ignoreIfExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // After partition pruning and filter push-down, the table-level statistics would be very inaccurate, so the
+ // statistics reported here are of little significance.
+ // Flink is expected to support something like SupportsReportStatistics in the future.
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
new file mode 100644
index 000000000000..59f4e8201d8b
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
+ *
+ * <p>This supports the following catalog configuration options:
+ * <ul>
+ *   <li><code>type</code> - Flink catalog factory key, should be "iceberg"</li>
+ *   <li><code>catalog-type</code> - iceberg catalog type, "hive" or "hadoop"</li>
+ *   <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
+ *   <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)</li>
+ *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
+ *   <li><code>default-database</code> - a database name to use as the default</li>
+ *   <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop catalog only)</li>
+ * </ul>
+ *
+ * <p>To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
+ * {@link #buildIcebergCatalog(String, Map)}.
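+ *
+ * <p>For example, a Hadoop-backed catalog could be created and registered like this (an illustrative sketch;
+ * the catalog name, warehouse path, and {@code tableEnv} are placeholders):
+ * <pre>{@code
+ *   Map<String, String> properties = Maps.newHashMap();
+ *   properties.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
+ *   properties.put(FlinkCatalogFactory.HADOOP_WAREHOUSE_LOCATION, "hdfs://nn:8020/warehouse/path");
+ *   Catalog catalog = new FlinkCatalogFactory().createCatalog("my_catalog", properties);
+ *   tableEnv.registerCatalog("my_catalog", catalog);
+ * }</pre>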
+ */
+public class FlinkCatalogFactory implements CatalogFactory {
+
+ // Cannot use just "type" because it conflicts with CATALOG_TYPE, the Flink catalog factory key.
+ public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+ public static final String HIVE_URI = "uri";
+ public static final String HIVE_CLIENT_POOL_SIZE = "clients";
+ public static final String HADOOP_WAREHOUSE_LOCATION = "warehouse";
+
+ public static final String DEFAULT_DATABASE = "default-database";
+ public static final String BASE_NAMESPACE = "base-namespace";
+
+ /**
+ * Build an Iceberg {@link org.apache.iceberg.catalog.Catalog} to be used by this Flink catalog adapter.
+ *
+ * @param name Flink's catalog name
+ * @param options Flink's catalog options
+ * @return an Iceberg catalog
+ */
+ protected org.apache.iceberg.catalog.Catalog buildIcebergCatalog(String name, Map<String, String> options) {
+ Configuration conf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+ return buildIcebergCatalog(name, options, conf);
+ }
+
+ /**
+ * Build an Iceberg {@link org.apache.iceberg.catalog.Catalog} to be used by this Flink catalog adapter.
+ *
+ * @param name Flink's catalog name
+ * @param options Flink's catalog options
+ * @param conf Flink's hadoop configuration
+ * @return an Iceberg catalog
+ */
+ protected org.apache.iceberg.catalog.Catalog buildIcebergCatalog(
+ String name, Map<String, String> options, Configuration conf) {
+ String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, "hive");
+ switch (catalogType) {
+ case "hive":
+ int clientPoolSize = Integer.parseInt(options.getOrDefault(HIVE_CLIENT_POOL_SIZE, "2"));
+ String uri = options.get(HIVE_URI);
+ return new HiveCatalog(name, uri, clientPoolSize, conf);
+
+ case "hadoop":
+ String warehouseLocation = options.get(HADOOP_WAREHOUSE_LOCATION);
+ return new HadoopCatalog(name, conf, warehouseLocation);
+
+ default:
+ throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
+ }
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = Maps.newHashMap();
+ context.put(CatalogDescriptorValidator.CATALOG_TYPE, "iceberg");
+ context.put(CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION, "1");
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = Lists.newArrayList();
+ properties.add(ICEBERG_CATALOG_TYPE);
+ properties.add(HIVE_URI);
+ properties.add(HIVE_CLIENT_POOL_SIZE);
+ properties.add(HADOOP_WAREHOUSE_LOCATION);
+ properties.add(DEFAULT_DATABASE);
+ properties.add(BASE_NAMESPACE);
+ return properties;
+ }
+
+ @Override
+ public Catalog createCatalog(String name, Map<String, String> properties) {
+ org.apache.iceberg.catalog.Catalog catalog = buildIcebergCatalog(name, properties);
+ String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, "default");
+ String[] baseNamespace = properties.containsKey(BASE_NAMESPACE) ?
+ Splitter.on('.').splitToList(properties.get(BASE_NAMESPACE)).toArray(new String[0]) :
+ new String[0];
+ boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault("cache-enabled", "true"));
+ return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalog, cacheEnabled);
+ }
+}
diff --git a/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 000000000000..2b6bfa3cd579
--- /dev/null
+++ b/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.iceberg.flink.FlinkCatalogFactory
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
new file mode 100644
index 000000000000..387b927ac875
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.ArrayUtils;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class FlinkCatalogTestBase extends FlinkTestBase {
+
+ protected static final String DATABASE = "db";
+ private static File warehouse = null;
+
+ @BeforeClass
+ public static void createWarehouse() throws IOException {
+ FlinkCatalogTestBase.warehouse = File.createTempFile("warehouse", null);
+ Assert.assertTrue(warehouse.delete());
+ }
+
+ @AfterClass
+ public static void dropWarehouse() {
+ if (warehouse != null && warehouse.exists()) {
+ warehouse.delete();
+ }
+ }
+
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] { "testhive", new String[0] },
+ new Object[] { "testhadoop", new String[0] },
+ new Object[] { "testhadoop_basenamespace", new String[] { "l0", "l1" }},
+ };
+ }
+
+ protected final TableEnvironment tEnv =
+ TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
+
+ protected final String catalogName;
+ protected final String[] baseNamespace;
+ protected final Catalog validationCatalog;
+ protected final SupportsNamespaces validationNamespaceCatalog;
+
+ protected final String flinkDatabase;
+ protected final Namespace icebergNamespace;
+ protected final boolean isHadoopCatalog;
+
+ public FlinkCatalogTestBase(String catalogName, String[] baseNamespace) {
+ this.catalogName = catalogName;
+ this.baseNamespace = baseNamespace;
+ this.isHadoopCatalog = catalogName.startsWith("testhadoop");
+ this.validationCatalog = isHadoopCatalog ?
+ new HadoopCatalog(hiveConf, "file:" + warehouse) :
+ catalog;
+ this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+
+ Map<String, String> config = Maps.newHashMap();
+ config.put("type", "iceberg");
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, isHadoopCatalog ? "hadoop" : "hive");
+ config.put(FlinkCatalogFactory.HADOOP_WAREHOUSE_LOCATION, "file:" + warehouse);
+ if (baseNamespace.length > 0) {
+ config.put(FlinkCatalogFactory.BASE_NAMESPACE, Joiner.on(".").join(baseNamespace));
+ }
+
+ FlinkCatalogFactory factory = new FlinkCatalogFactory() {
+ @Override
+ protected Catalog buildIcebergCatalog(String name, Map<String, String> options) {
+ // The Flink Hadoop configuration depends on system environment settings, which are quite hard to set in tests,
+ // so pass the correct Hadoop configuration directly.
+ return super.buildIcebergCatalog(name, options, hiveConf);
+ }
+ };
+ tEnv.registerCatalog(
+ catalogName,
+ flinkCatalogs.computeIfAbsent(catalogName, k -> factory.createCatalog(k, config)));
+
+ this.flinkDatabase = catalogName + "." + DATABASE;
+ this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace, new String[] { DATABASE }));
+ }
+
+ public List