diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 2060dd0d439c..2cfb194bdf53 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -360,6 +360,17 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
       throws CatalogException, TableAlreadyExistException {
+    if (Objects.equals(table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+      throw new IllegalArgumentException("Cannot create the table with 'connector'='iceberg' table property in " +
+          "an iceberg catalog. Please create the table with 'connector'='iceberg' property in a non-iceberg " +
+          "catalog, or create the table without 'connector'='iceberg' related properties in an iceberg catalog.");
+    }
+
+    createIcebergTable(tablePath, table, ignoreIfExists);
+  }
+
+  void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+      throws CatalogException, TableAlreadyExistException {
     validateFlinkTable(table);
 
     Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index e95e2580361d..7ab23c53a4b7 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -66,6 +66,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
   public static final String DEFAULT_DATABASE = "default-database";
+  public static final String DEFAULT_DATABASE_NAME = "default";
   public static final String BASE_NAMESPACE = "base-namespace";
   public static final String CACHE_ENABLED = "cache-enabled";
 
@@ -77,7 +78,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
    * @param hadoopConf Hadoop configuration for catalog
    * @return an Iceberg catalog loader
    */
-  protected CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
+  static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
     String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
     if (catalogImpl != null) {
       return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
@@ -120,7 +121,7 @@ public Catalog createCatalog(String name, Map<String, String> properties) {
   protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
     CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
 
-    String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, "default");
+    String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);
 
     Namespace baseNamespace = Namespace.empty();
     if (properties.containsKey(BASE_NAMESPACE)) {
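With the guard above in place, 'connector'='iceberg' only makes sense outside an Iceberg catalog. A minimal sketch of the resulting behavior (assuming the iceberg-flink-runtime jar is on the classpath; the catalog name and warehouse path are illustrative, not part of this patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ConnectorGuardExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Register an iceberg catalog and switch into it.
        tEnv.executeSql("CREATE CATALOG iceberg_catalog WITH (" +
            "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");
        tEnv.executeSql("USE CATALOG iceberg_catalog");

        // Plain DDL still works inside the iceberg catalog.
        tEnv.executeSql("CREATE TABLE ok_table (id BIGINT, data STRING)");

        // This now fails fast with the IllegalArgumentException added above, because
        // 'connector'='iceberg' is redundant inside an iceberg catalog.
        try {
          tEnv.executeSql("CREATE TABLE bad_table (id BIGINT) WITH ('connector'='iceberg')");
        } catch (Exception e) {
          System.out.println("Rejected as expected: " + e.getMessage());
        }
      }
    }
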
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index b51b3e09df11..6b94dadace08 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -19,56 +19,170 @@
 package org.apache.iceberg.flink;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+  static final String FACTORY_IDENTIFIER = "iceberg";
+
+  private static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  private static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  private static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  private static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
   private final FlinkCatalog catalog;
 
+  public FlinkDynamicTableFactory() {
+    this.catalog = null;
+  }
+
   public FlinkDynamicTableFactory(FlinkCatalog catalog) {
     this.catalog = catalog;
   }
 
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
-    ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
-    TableLoader tableLoader = createTableLoader(objectPath);
+    ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+    Map<String, String> tableProps = context.getCatalogTable().getOptions();
+    CatalogTable catalogTable = context.getCatalogTable();
     TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-    return new IcebergTableSource(tableLoader, tableSchema, context.getCatalogTable().getOptions(),
-        context.getConfiguration());
+
+    TableLoader tableLoader;
+    if (catalog != null) {
+      tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
+    } else {
+      tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getDatabaseName(),
+          objectIdentifier.getObjectName());
+    }
+
+    return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
   }
 
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
-    TableLoader tableLoader = createTableLoader(objectPath);
+    Map<String, String> tableProps = context.getCatalogTable().getOptions();
+    CatalogTable catalogTable = context.getCatalogTable();
     TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+    TableLoader tableLoader;
+    if (catalog != null) {
+      tableLoader = createTableLoader(catalog, objectPath);
+    } else {
+      tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
+          objectPath.getObjectName());
+    }
+
     return new IcebergTableSink(tableLoader, tableSchema);
   }
 
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
-    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+    Set<ConfigOption<?>> options = Sets.newHashSet();
+    options.add(CATALOG_TYPE);
+    options.add(CATALOG_NAME);
+    return options;
   }
 
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
-    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+    Set<ConfigOption<?>> options = Sets.newHashSet();
+    options.add(CATALOG_DATABASE);
+    options.add(CATALOG_TABLE);
+    return options;
   }
 
   @Override
   public String factoryIdentifier() {
-    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+    return FACTORY_IDENTIFIER;
+  }
+
+  private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
+                                               Map<String, String> tableProps,
+                                               String databaseName,
+                                               String tableName) {
+    Configuration flinkConf = new Configuration();
+    tableProps.forEach(flinkConf::setString);
+
+    String catalogName = flinkConf.getString(CATALOG_NAME);
+    Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+
+    String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
+    Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
+
+    String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+    Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
+
+    org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+    FlinkCatalogFactory factory = new FlinkCatalogFactory();
+    FlinkCatalog flinkCatalog = (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
+    ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
+
+    // Create the database if it does not exist in the external catalog.
+    if (!flinkCatalog.databaseExists(catalogDatabase)) {
+      try {
+        flinkCatalog.createDatabase(catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+      } catch (DatabaseAlreadyExistException e) {
+        throw new AlreadyExistsException(e, "Database %s already exists in the iceberg catalog %s.", catalogDatabase,
+            catalogName);
+      }
+    }
+
+    // Create the table if it does not exist in the external catalog.
+    if (!flinkCatalog.tableExists(objectPath)) {
+      try {
+        flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
+      } catch (TableAlreadyExistException e) {
+        throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s",
+            catalogTable, catalogDatabase, catalogName);
+      }
+    }
+
+    return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
   }
 
-  private TableLoader createTableLoader(ObjectPath objectPath) {
+  private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
+    Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
     return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
   }
 }
diff --git a/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000000..be65d974fd5e
--- /dev/null
+++ b/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.iceberg.flink.FlinkDynamicTableFactory
\ No newline at end of file
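With the factory registered through the service file above, Flink's SPI discovery resolves 'connector'='iceberg' from any non-Iceberg catalog. A sketch of the intended usage (catalog name and warehouse path are assumptions, not part of the patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ConnectorTableExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // 'catalog-name' is required; 'catalog-database' and 'catalog-table' fall back
        // to the Flink database and table names when omitted.
        tEnv.executeSql("CREATE TABLE flink_table (id BIGINT, data STRING) WITH (" +
            "'connector'='iceberg'," +
            "'catalog-name'='hadoop_prod'," +
            "'catalog-type'='hadoop'," +
            "'warehouse'='file:///tmp/warehouse')");

        tEnv.executeSql("INSERT INTO flink_table VALUES (1, 'AAA')");
        tEnv.executeSql("SELECT * FROM flink_table").print();
      }
    }

Because createTableLoader creates the backing database and table on first access, the DDL above is enough to materialize the iceberg table; no separate CREATE CATALOG statement is needed.
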
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
new file mode 100644
index 000000000000..61cf3dd0e7ed
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergConnector extends FlinkTestBase {
+
+  private static final String TABLE_NAME = "test_table";
+
+  @ClassRule
+  public static final TemporaryFolder WAREHOUSE = new TemporaryFolder();
+
+  private final String catalogName;
+  private final Map<String, String> properties;
+  private final boolean isStreaming;
+  private volatile TableEnvironment tEnv;
+
+  @Parameterized.Parameters(name = "catalogName = {0}, properties = {1}, isStreaming = {2}")
+  public static Iterable<Object[]> parameters() {
+    return Lists.newArrayList(
+        // Create iceberg table in the hadoop catalog and default database.
+        new Object[] {
+            "testhadoop",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hadoop"
+            ),
+            true
+        },
+        new Object[] {
+            "testhadoop",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hadoop",
+                "catalog-table", "not_existing_table"
+            ),
+            true
+        },
+        new Object[] {
+            "testhadoop",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hadoop"
+            ),
+            false
+        },
+        // Create iceberg table in the hadoop catalog and not_existing_db.
+        new Object[] {
+            "testhadoop",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hadoop",
+                "catalog-database", "not_existing_db"
+            ),
+            true
+        },
+        new Object[] {
+            "testhadoop",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hadoop",
+                "catalog-database", "not_existing_db",
+                "catalog-table", "not_existing_table"
+            ),
+            true
+        },
+        new Object[] {
+            "testhadoop",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hadoop",
+                "catalog-database", "not_existing_db"
+            ),
+            false
+        },
+        // Create iceberg table in the hive catalog and default database.
+        new Object[] {
+            "testhive",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hive"
+            ),
+            true
+        },
+        new Object[] {
+            "testhive",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hive",
+                "catalog-table", "not_existing_table"
+            ),
+            true
+        },
+        new Object[] {
+            "testhive",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hive"
+            ),
+            false
+        },
+        // Create iceberg table in the hive catalog and not_existing_db.
+        new Object[] {
+            "testhive",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hive",
+                "catalog-database", "not_existing_db"
+            ),
+            true
+        },
+        new Object[] {
+            "testhive",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hive",
+                "catalog-database", "not_existing_db",
+                "catalog-table", "not_existing_table"
+            ),
+            true
+        },
+        new Object[] {
+            "testhive",
+            ImmutableMap.of(
+                "connector", "iceberg",
+                "catalog-type", "hive",
+                "catalog-database", "not_existing_db"
+            ),
+            false
+        }
+    );
+  }
+
+  public TestIcebergConnector(String catalogName, Map<String, String> properties, boolean isStreaming) {
+    this.catalogName = catalogName;
+    this.properties = properties;
+    this.isStreaming = isStreaming;
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner();
+          if (isStreaming) {
+            settingsBuilder.inStreamingMode();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment
+                .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+            env.enableCheckpointing(400);
+            env.setMaxParallelism(2);
+            env.setParallelism(2);
+            tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+          } else {
+            settingsBuilder.inBatchMode();
+            tEnv = TableEnvironment.create(settingsBuilder.build());
+          }
+          // Use a parallelism of one.
+          tEnv.getConfig().getConfiguration()
+              .set(CoreOptions.DEFAULT_PARALLELISM, 1)
+              .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  @After
+  public void after() throws TException {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+
+    // Clean up the orphan databases and tables created in the hive metastore.
+    if (isHiveCatalog()) {
+      HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
+      try {
+        metaStoreClient.dropTable(databaseName(), tableName());
+        if (!isDefaultDatabaseName()) {
+          try {
+            metaStoreClient.dropDatabase(databaseName());
+          } catch (Exception ignored) {
+            // Ignore
+          }
+        }
+      } finally {
+        metaStoreClient.close();
+      }
+    }
+  }
+
+  private void testCreateConnectorTable() {
+    Map<String, String> tableProps = createTableProps();
+
+    // Create the table under Flink's current database.
+    sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, toWithClause(tableProps));
+    sql("INSERT INTO %s VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC')", TABLE_NAME);
+    Assert.assertEquals("Should have expected rows",
+        Lists.newArrayList(Row.of(1L, "AAA"), Row.of(2L, "BBB"), Row.of(3L, "CCC")),
+        sql("SELECT * FROM %s", TABLE_NAME));
+
+    FlinkCatalogFactory factory = new FlinkCatalogFactory();
+    Catalog flinkCatalog = factory.createCatalog(catalogName, tableProps, new Configuration());
+    Assert.assertTrue("Should have created the expected database",
+        flinkCatalog.databaseExists(databaseName()));
+    Assert.assertTrue("Should have created the expected table",
+        flinkCatalog.tableExists(new ObjectPath(databaseName(), tableName())));
+
+    // Drop and create it again.
+ sql("DROP TABLE %s", TABLE_NAME); + sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, toWithClause(tableProps)); + Assert.assertEquals("Should have expected rows", + Lists.newArrayList(Row.of(1L, "AAA"), Row.of(2L, "BBB"), Row.of(3L, "CCC")), + sql("SELECT * FROM %s", TABLE_NAME)); + } + + @Test + public void testCreateTableUnderDefaultDatabase() { + testCreateConnectorTable(); + } + + @Test + public void testCatalogDatabaseConflictWithFlinkDatabase() { + sql("CREATE DATABASE IF NOT EXISTS `%s`", databaseName()); + sql("USE `%s`", databaseName()); + + try { + testCreateConnectorTable(); + // Ensure that the table was created under the specific database. + AssertHelpers.assertThrows("Table should already exists", + ValidationException.class, + "Could not execute CreateTable in path", + () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME)); + } finally { + sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME); + if (!isDefaultDatabaseName()) { + sql("DROP DATABASE `%s`", databaseName()); + } + } + } + + @Test + public void testConnectorTableInIcebergCatalog() { + // Create the catalog properties + Map catalogProps = Maps.newHashMap(); + catalogProps.put("type", "iceberg"); + if (isHiveCatalog()) { + catalogProps.put("catalog-type", "hive"); + catalogProps.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf)); + } else { + catalogProps.put("catalog-type", "hadoop"); + } + catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse()); + + // Create the table properties + Map tableProps = createTableProps(); + + // Create a connector table in an iceberg catalog. + sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps)); + try { + AssertHelpers.assertThrowsCause("Cannot create the iceberg connector table in iceberg catalog", + IllegalArgumentException.class, + "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog", + () -> sql("CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s", + FlinkCatalogFactory.DEFAULT_DATABASE_NAME, + TABLE_NAME, + toWithClause(tableProps) + ) + ); + } finally { + sql("DROP CATALOG IF EXISTS `test_catalog`"); + } + } + + private Map createTableProps() { + Map tableProps = Maps.newHashMap(properties); + tableProps.put("catalog-name", catalogName); + tableProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse()); + if (isHiveCatalog()) { + tableProps.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf)); + } + return tableProps; + } + + private boolean isHiveCatalog() { + return "testhive".equalsIgnoreCase(catalogName); + } + + private boolean isDefaultDatabaseName() { + return FlinkCatalogFactory.DEFAULT_DATABASE_NAME.equalsIgnoreCase(databaseName()); + } + + private String tableName() { + return properties.getOrDefault("catalog-table", TABLE_NAME); + } + + private String databaseName() { + return properties.getOrDefault("catalog-database", "default_database"); + } + + private String toWithClause(Map props) { + return FlinkCatalogTestBase.toWithClause(props); + } + + private static String createWarehouse() { + try { + return String.format("file://%s", WAREHOUSE.newFolder().getAbsolutePath()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +}