diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 86295d78cc13..76c6dc758265 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -27,6 +27,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -91,12 +92,14 @@
  *
  * <p>The Iceberg table manages its partitions by itself. The partition of the Iceberg table is
  * independent of the partition of Flink.
  */
+@Internal
 public class FlinkCatalog extends AbstractCatalog {
   private final CatalogLoader catalogLoader;
   private final Catalog icebergCatalog;
   private final Namespace baseNamespace;
   private final SupportsNamespaces asNamespaceCatalog;
   private final Closeable closeable;
+  private final Map<String, String> catalogProps;
   private final boolean cacheEnabled;
 
   public FlinkCatalog(
@@ -104,10 +107,12 @@ public FlinkCatalog(
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
+      Map<String, String> catalogProps,
       boolean cacheEnabled,
       long cacheExpirationIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
+    this.catalogProps = catalogProps;
     this.baseNamespace = baseNamespace;
     this.cacheEnabled = cacheEnabled;
 
@@ -332,7 +337,34 @@ public List<String> listTables(String databaseName)
   public CatalogTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
     Table table = loadIcebergTable(tablePath);
-    return toCatalogTable(table);
+
+    // Flink's CREATE TABLE LIKE clause relies on the properties returned here to create the new
+    // table. To create such a table in a non-Iceberg catalog, the catalog properties need to be
+    // passed along as well. Since the Flink API accepts only a Map<String, String> for
+    // properties, the catalog properties are serialized here as a JSON string so that catalog
+    // and table properties can be distinguished in createTable.
+    String srcCatalogProps =
+        FlinkCreateTableOptions.toJson(
+            getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps);
+
+    Map<String, String> tableProps = table.properties();
+    if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
+        || tableProps.containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Source table %s contains one or more of the reserved property keys: %s, %s.",
+              tablePath,
+              FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
+              FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY));
+    }
+
+    ImmutableMap.Builder<String, String> mergedProps = ImmutableMap.builder();
+    mergedProps.put(
+        FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
+    mergedProps.put(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
+    mergedProps.putAll(tableProps);
+
+    return toCatalogTableWithProps(table, mergedProps.build());
   }
 
   private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
@@ -384,13 +416,17 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
       throws CatalogException, TableAlreadyExistException {
+    // Creating an Iceberg table via the connector is allowed only when it is created using LIKE.
     if (Objects.equals(
-        table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+            table.getOptions().get(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY),
+            FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
+        && table.getOptions().get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY) == null) {
       throw new IllegalArgumentException(
           "Cannot create the table with 'connector'='iceberg' table property in "
               + "an iceberg catalog. Please create the table with the 'connector'='iceberg' property in a non-iceberg catalog, or "
              + "create the table without 'connector'='iceberg' related properties in an iceberg catalog.");
     }
+    Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
     createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
   }
 
@@ -404,10 +440,14 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
     String location = null;
     for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
-      if ("location".equalsIgnoreCase(entry.getKey())) {
-        location = entry.getValue();
-      } else {
+      if (!isReservedProperty(entry.getKey())) {
         properties.put(entry.getKey(), entry.getValue());
+      } else {
+        // Reserved properties (the catalog properties added in getTable() to support CREATE
+        // TABLE LIKE, plus 'location') are filtered out and not persisted as table properties.
+        if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(entry.getKey())) {
+          location = entry.getValue();
+        }
       }
     }
 
@@ -421,6 +461,12 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea
     }
   }
 
+  private boolean isReservedProperty(String prop) {
+    return FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(prop)
+        || FlinkCreateTableOptions.CONNECTOR_PROPS_KEY.equalsIgnoreCase(prop)
+        || FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equalsIgnoreCase(prop);
+  }
+
   private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) {
     TableSchema ts1 = ct1.getSchema();
     TableSchema ts2 = ct2.getSchema();
@@ -501,7 +547,7 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
         continue;
       }
 
-      if ("location".equalsIgnoreCase(key)) {
+      if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(key)) {
         setLocation = value;
       } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
         setSnapshotId = value;
@@ -558,7 +604,7 @@ public void alterTable(
       if (change instanceof TableChange.SetOption) {
         TableChange.SetOption set = (TableChange.SetOption) change;
 
-        if ("location".equalsIgnoreCase(set.getKey())) {
+        if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(set.getKey())) {
           setLocation = set.getValue();
         } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) {
           setSnapshotId = set.getValue();
@@ -625,7 +671,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema)
     return partitionKeysBuilder.build();
   }
 
-  static CatalogTable toCatalogTable(Table table) {
+  static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
     TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
 
@@ -634,7 +680,11 @@ static CatalogTable toCatalogTable(Table table) {
     // CatalogTableImpl to copy a new catalog table.
     // Let's re-loading table from Iceberg catalog when creating source/sink operators.
     // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
-    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
+    return new CatalogTableImpl(schema, partitionKeys, props, null);
+  }
+
+  static CatalogTable toCatalogTable(Table table) {
+    return toCatalogTableWithProps(table, table.properties());
   }
 
   @Override
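
Reviewer note: a minimal sketch of the options map that a LIKE-created table ends up carrying after the `getTable` change above. The catalog name `iceberg_catalog`, database `db`, table `tl`, and all property values are illustrative, not taken from this patch; only the three keys (`connector`, `src-catalog`, plus the copied table properties) reflect the code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GetTableOptionsSketch {
  public static void main(String[] args) {
    Map<String, String> merged = new LinkedHashMap<>();
    // getTable() rejects source tables that already define the reserved keys, then adds them.
    merged.put("connector", "iceberg");
    // 'src-catalog' carries the JSON produced by FlinkCreateTableOptions.toJson(); the literal
    // below only illustrates its shape.
    merged.put(
        "src-catalog",
        "{\"catalog-name\":\"iceberg_catalog\",\"catalog-database\":\"db\","
            + "\"catalog-table\":\"tl\",\"catalog-props\":{\"catalog-type\":\"hadoop\"}}");
    merged.put("write.format.default", "parquet"); // an ordinary Iceberg table property
    merged.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```
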
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index fe4008a13ce5..dd065617bd88 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -168,6 +168,7 @@ protected Catalog createCatalog(
         defaultDatabase,
         baseNamespace,
         catalogLoader,
+        properties,
         cacheEnabled,
         cacheExpirationIntervalMs);
   }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
new file mode 100644
index 000000000000..ab69ec5adc7f
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.util.JsonUtil;
+
+class FlinkCreateTableOptions {
+  private final String catalogName;
+  private final String catalogDb;
+  private final String catalogTable;
+  private final Map<String, String> catalogProps;
+
+  private FlinkCreateTableOptions(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
+    this.catalogName = catalogName;
+    this.catalogDb = catalogDb;
+    this.catalogTable = catalogTable;
+    this.catalogProps = props;
+  }
+
+  public static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  public static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  public static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  public static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+  public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
+      ConfigOptions.key("catalog-props")
+          .mapType()
+          .noDefaultValue()
+          .withDescription("Properties of the underlying catalog for the iceberg table.");
+
+  public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";
+  public static final String CONNECTOR_PROPS_KEY = "connector";
+  public static final String LOCATION_KEY = "location";
+
+  static String toJson(
+      String catalogName, String catalogDb, String catalogTable, Map<String, String> catalogProps) {
+    return JsonUtil.generate(
+        gen -> {
+          gen.writeStartObject();
+          gen.writeStringField(CATALOG_NAME.key(), catalogName);
+          gen.writeStringField(CATALOG_DATABASE.key(), catalogDb);
+          gen.writeStringField(CATALOG_TABLE.key(), catalogTable);
+          JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen);
+          gen.writeEndObject();
+        },
+        false);
+  }
+
+  static FlinkCreateTableOptions fromJson(String createTableOptions) {
+    return JsonUtil.parse(
+        createTableOptions,
+        node -> {
+          String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node);
+          String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node);
+          String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node);
+          Map<String, String> catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node);
+
+          return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps);
+        });
+  }
+
+  String catalogName() {
+    return catalogName;
+  }
+
+  String catalogDb() {
+    return catalogDb;
+  }
+
+  String catalogTable() {
+    return catalogTable;
+  }
+
+  Map<String, String> catalogProps() {
+    return catalogProps;
+  }
+}
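
As a sanity check on the serialization contract above, here is a hedged round-trip sketch using plain Jackson rather than Iceberg's internal JsonUtil. The class name and all property values are hypothetical; only the four field names match FlinkCreateTableOptions:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;

public class CreateTableOptionsJsonRoundTrip {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Write the same field layout as FlinkCreateTableOptions.toJson().
    ObjectNode node = mapper.createObjectNode();
    node.put("catalog-name", "iceberg_catalog");
    node.put("catalog-database", "db");
    node.put("catalog-table", "tl");
    node.set("catalog-props", mapper.valueToTree(Map.of("catalog-type", "hadoop")));
    String json = mapper.writeValueAsString(node);
    System.out.println(json);

    // Read it back, mirroring FlinkCreateTableOptions.fromJson().
    JsonNode parsed = mapper.readTree(json);
    System.out.println(parsed.get("catalog-name").asText()); // iceberg_catalog
    System.out.println(parsed.get("catalog-props").get("catalog-type").asText()); // hadoop
  }
}
```
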
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index b7f1be4b93fb..a92848d7ccd1 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -18,10 +18,10 @@
  */
 package org.apache.iceberg.flink;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -45,31 +45,6 @@ public class FlinkDynamicTableFactory
     implements DynamicTableSinkFactory, DynamicTableSourceFactory {
   static final String FACTORY_IDENTIFIER = "iceberg";
-
-  private static final ConfigOption<String> CATALOG_NAME =
-      ConfigOptions.key("catalog-name")
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Catalog name");
-
-  private static final ConfigOption<String> CATALOG_TYPE =
-      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
-
-  private static final ConfigOption<String> CATALOG_DATABASE =
-      ConfigOptions.key("catalog-database")
-          .stringType()
-          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
-          .withDescription("Database name managed in the iceberg catalog.");
-
-  private static final ConfigOption<String> CATALOG_TABLE =
-      ConfigOptions.key("catalog-table")
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Table name managed in the underlying iceberg catalog and database.");
-
   private final FlinkCatalog catalog;
 
   public FlinkDynamicTableFactory() {
@@ -127,16 +102,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(CATALOG_TYPE);
-    options.add(CATALOG_NAME);
+    options.add(FlinkCreateTableOptions.CATALOG_TYPE);
+    options.add(FlinkCreateTableOptions.CATALOG_NAME);
     return options;
   }
 
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(CATALOG_DATABASE);
-    options.add(CATALOG_TABLE);
+    options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
+    options.add(FlinkCreateTableOptions.CATALOG_TABLE);
     return options;
   }
 
@@ -151,22 +126,28 @@ private static TableLoader createTableLoader(
       String databaseName,
       String tableName) {
     Configuration flinkConf = new Configuration();
-    tableProps.forEach(flinkConf::setString);
 
-    String catalogName = flinkConf.getString(CATALOG_NAME);
+    Map<String, String> mergedProps = mergeSrcCatalogProps(tableProps);
+
+    mergedProps.forEach(flinkConf::setString);
+
+    String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
     Preconditions.checkNotNull(
-        catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+        catalogName,
+        "Table property '%s' cannot be null",
+        FlinkCreateTableOptions.CATALOG_NAME.key());
 
-    String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
+    String catalogDatabase =
+        flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
     Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
 
-    String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+    String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
     Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
 
     org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
     FlinkCatalogFactory factory = new FlinkCatalogFactory();
     FlinkCatalog flinkCatalog =
-        (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
+        (FlinkCatalog) factory.createCatalog(catalogName, mergedProps, hadoopConf);
     ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
 
     // Create database if not exists in the external catalog.
@@ -201,6 +182,42 @@ private static TableLoader createTableLoader(
         flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
   }
 
+  /**
+   * Merges source catalog properties with connector properties. Iceberg catalog properties are
+   * serialized as JSON in FlinkCatalog#getTable so that they can be kept apart from the Iceberg
+   * table properties. Here they are flattened and merged back in order to create the catalog.
+   *
+   * @param tableProps the existing table properties
+   * @return a map of merged properties, with the explicitly set table properties taking
+   *     precedence over the source catalog properties when keys conflict
+   */
+  private static Map<String, String> mergeSrcCatalogProps(Map<String, String> tableProps) {
+    String srcCatalogProps = tableProps.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);
+    if (srcCatalogProps != null) {
+      Map<String, String> mergedProps = Maps.newHashMap();
+      FlinkCreateTableOptions createTableOptions =
+          FlinkCreateTableOptions.fromJson(srcCatalogProps);
+
+      mergedProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), createTableOptions.catalogName());
+      mergedProps.put(
+          FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb());
+      mergedProps.put(
+          FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable());
+      mergedProps.putAll(createTableOptions.catalogProps());
+
+      tableProps.forEach(
+          (k, v) -> {
+            if (!FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equals(k)) {
+              mergedProps.put(k, v);
+            }
+          });
+
+      return Collections.unmodifiableMap(mergedProps);
+    }
+
+    return tableProps;
+  }
+
   private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
     Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
     return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
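
To make the merge precedence concrete, a small self-contained sketch (all keys and values illustrative): values recovered from the `src-catalog` JSON are applied first, and options set explicitly on the Flink table overwrite them, matching the ordering in mergeSrcCatalogProps.

```java
import java.util.HashMap;
import java.util.Map;

public class MergePrecedenceSketch {
  public static void main(String[] args) {
    // Values recovered from the serialized 'src-catalog' JSON.
    Map<String, String> fromSrcCatalog = new HashMap<>();
    fromSrcCatalog.put("catalog-name", "iceberg_catalog");
    fromSrcCatalog.put("catalog-database", "db");
    fromSrcCatalog.put("catalog-table", "tl");

    // Options the user set explicitly on the Flink table.
    Map<String, String> tableProps = new HashMap<>();
    tableProps.put("catalog-table", "tl_override");

    // Same ordering as mergeSrcCatalogProps: src-catalog values first, table options second,
    // so the explicit option wins on a key conflict.
    Map<String, String> merged = new HashMap<>(fromSrcCatalog);
    merged.putAll(tableProps);

    System.out.println(merged.get("catalog-table")); // prints: tl_override
  }
}
```
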
"tl", config); + Map options = catalogTable.getOptions(); + assertThat(options.get(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)) + .isEqualTo(FlinkDynamicTableFactory.FACTORY_IDENTIFIER); + assertThat(options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) + .isEqualTo(srcCatalogProps); + } + @TestTemplate public void testCreateTableLocation() { assumeThat(isHadoopCatalog) @@ -660,10 +694,12 @@ private Table table(String name) { } private CatalogTable catalogTable(String name) throws TableNotExistException { + return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name); + } + + private CatalogTable catalogTable(String catalog, String database, String table) + throws TableNotExistException { return (CatalogTable) - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .getTable(new ObjectPath(DATABASE, name)); + getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table)); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 66bdeee1d407..c8f0b8172d45 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -162,4 +162,20 @@ public void testWatermarkOptionsDescending() throws Exception { expected, SCHEMA_TS); } + + @Test + public void testReadFlinkDynamicTable() throws Exception { + List expected = generateExpectedRecords(false); + SqlHelpers.sql( + getTableEnv(), + "create table `default_catalog`.`default_database`.flink_table LIKE iceberg_catalog.`default`.%s", + TestFixtures.TABLE); + + // Read from table in flink catalog + TestHelpers.assertRecords( + SqlHelpers.sql( + getTableEnv(), "select * from `default_catalog`.`default_database`.flink_table"), + expected, + SCHEMA_TS); + } }