diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 86295d78cc13..081f81cb8450 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -97,6 +97,7 @@ public class FlinkCatalog extends AbstractCatalog {
   private final Namespace baseNamespace;
   private final SupportsNamespaces asNamespaceCatalog;
   private final Closeable closeable;
+  private final Map<String, String> catalogProps;
   private final boolean cacheEnabled;
 
   public FlinkCatalog(
@@ -104,10 +105,12 @@ public FlinkCatalog(
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
+      Map<String, String> catalogProps,
       boolean cacheEnabled,
       long cacheExpirationIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
+    this.catalogProps = catalogProps;
     this.baseNamespace = baseNamespace;
     this.cacheEnabled = cacheEnabled;
 
@@ -332,7 +335,15 @@ public List<String> listTables(String databaseName)
   public CatalogTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
     Table table = loadIcebergTable(tablePath);
-    return toCatalogTable(table);
+    Map<String, String> catalogAndTableProps = Maps.newHashMap(catalogProps);
+    catalogAndTableProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), getName());
+    catalogAndTableProps.put(
+        FlinkCreateTableOptions.CATALOG_DATABASE.key(), tablePath.getDatabaseName());
+    catalogAndTableProps.put(
+        FlinkCreateTableOptions.CATALOG_TABLE.key(), tablePath.getObjectName());
+    catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
+    catalogAndTableProps.putAll(table.properties());
+    return toCatalogTableWithProps(table, catalogAndTableProps);
   }
 
   private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
@@ -384,13 +395,6 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
       throws CatalogException, TableAlreadyExistException {
-    if (Objects.equals(
-        table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
-      throw new IllegalArgumentException(
-          "Cannot create the table with 'connector'='iceberg' table property in "
-              + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
-              + "create table without 'connector'='iceberg' related properties in an iceberg table.");
-    }
     Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
     createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
   }
@@ -625,7 +629,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
     return partitionKeysBuilder.build();
   }
 
-  static CatalogTable toCatalogTable(Table table) {
+  static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
     TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
 
@@ -634,7 +638,11 @@ static CatalogTable toCatalogTable(Table table) {
     // CatalogTableImpl to copy a new catalog table.
     // Let's re-loading table from Iceberg catalog when creating source/sink operators.
     // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
-    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
+    return new CatalogTableImpl(schema, partitionKeys, props, null);
+  }
+
+  static CatalogTable toCatalogTable(Table table) {
+    return toCatalogTableWithProps(table, table.properties());
   }
 
   @Override
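
Note: with getTable() now returning the catalog properties plus the injected connector, catalog-name, catalog-database, and catalog-table options, a table definition copied out of an Iceberg catalog stays resolvable by FlinkDynamicTableFactory. A minimal sketch of the intended flow, assuming a local Hadoop warehouse; the names hadoop_iceberg, t, and t_like are illustrative:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
        "CREATE CATALOG hadoop_iceberg WITH ("
            + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");
    tEnv.executeSql("CREATE TABLE hadoop_iceberg.`default`.t (id BIGINT)");
    // The LIKE clause copies the schema together with the options emitted by getTable(),
    // so the copy in the generic in-memory catalog still points back at the Iceberg table.
    tEnv.executeSql(
        "CREATE TABLE `default_catalog`.`default_database`.t_like "
            + "LIKE hadoop_iceberg.`default`.t");
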
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index fe4008a13ce5..dd065617bd88 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -168,6 +168,7 @@ protected Catalog createCatalog(
         defaultDatabase,
         baseNamespace,
         catalogLoader,
+        properties,
         cacheEnabled,
         cacheExpirationIntervalMs);
   }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
new file mode 100644
index 000000000000..f0df076abe31
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkCreateTableOptions {
+
+  private FlinkCreateTableOptions() {}
+
+  public static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  public static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  public static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  public static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+}
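
Note: FlinkCreateTableOptions centralizes the option keys that were previously private to FlinkDynamicTableFactory, so FlinkCatalog.getTable() and the factory now agree on them. A small sketch of how the keys and defaults behave through the ConfigOption API the factory already uses; the property values are illustrative:

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    conf.setString(FlinkCreateTableOptions.CATALOG_NAME.key(), "hadoop_iceberg");
    String name = conf.getString(FlinkCreateTableOptions.CATALOG_NAME); // "hadoop_iceberg"
    // CATALOG_DATABASE falls back to FlinkCatalogFactory.DEFAULT_DATABASE_NAME ("default").
    String db = conf.getString(FlinkCreateTableOptions.CATALOG_DATABASE);
    // CATALOG_TABLE has no default; when unset, the passed-in fallback wins.
    String table = conf.getString(FlinkCreateTableOptions.CATALOG_TABLE, "my_flink_table_name");
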
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index b7f1be4b93fb..f49ab9c646c2 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -21,7 +21,6 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -45,31 +44,6 @@ public class FlinkDynamicTableFactory
     implements DynamicTableSinkFactory, DynamicTableSourceFactory {
   static final String FACTORY_IDENTIFIER = "iceberg";
 
-  private static final ConfigOption<String> CATALOG_NAME =
-      ConfigOptions.key("catalog-name")
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Catalog name");
-
-  private static final ConfigOption<String> CATALOG_TYPE =
-      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
-
-  private static final ConfigOption<String> CATALOG_DATABASE =
-      ConfigOptions.key("catalog-database")
-          .stringType()
-          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
-          .withDescription("Database name managed in the iceberg catalog.");
-
-  private static final ConfigOption<String> CATALOG_TABLE =
-      ConfigOptions.key("catalog-table")
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Table name managed in the underlying iceberg catalog and database.");
-
   private final FlinkCatalog catalog;
 
   public FlinkDynamicTableFactory() {
@@ -127,16 +101,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(CATALOG_TYPE);
-    options.add(CATALOG_NAME);
+    options.add(FlinkCreateTableOptions.CATALOG_TYPE);
+    options.add(FlinkCreateTableOptions.CATALOG_NAME);
     return options;
   }
 
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(CATALOG_DATABASE);
-    options.add(CATALOG_TABLE);
+    options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
+    options.add(FlinkCreateTableOptions.CATALOG_TABLE);
     return options;
   }
 
@@ -153,14 +127,17 @@ private static TableLoader createTableLoader(
     Configuration flinkConf = new Configuration();
     tableProps.forEach(flinkConf::setString);
 
-    String catalogName = flinkConf.getString(CATALOG_NAME);
+    String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
     Preconditions.checkNotNull(
-        catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+        catalogName,
+        "Table property '%s' cannot be null",
+        FlinkCreateTableOptions.CATALOG_NAME.key());
 
-    String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
+    String catalogDatabase =
+        flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
     Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
 
-    String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+    String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
    Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
 
     org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
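
Note: createTableLoader() still resolves the target table from these options, falling back to the Flink object path when catalog-database or catalog-table is absent. An illustrative connector-style DDL against the default (non-Iceberg) catalog, reusing tEnv from the sketch above; the warehouse path is an assumption:

    tEnv.executeSql(
        "CREATE TABLE `default_catalog`.`default_database`.flink_table ("
            + "  id BIGINT, data STRING"
            + ") WITH ("
            + "  'connector'='iceberg',"
            + "  'catalog-type'='hadoop',"
            + "  'catalog-name'='hadoop_iceberg',"
            // 'catalog-database' is optional and defaults to 'default';
            // 'catalog-table' is optional and defaults to the Flink table name (flink_table).
            + "  'warehouse'='file:///tmp/warehouse')");
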
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 65adce77d9f9..1325b8d8dd70 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -35,6 +35,7 @@
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
@@ -53,7 +54,8 @@ public class IcebergTableSource
     implements ScanTableSource,
         SupportsProjectionPushDown,
         SupportsFilterPushDown,
-        SupportsLimitPushDown {
+        SupportsLimitPushDown,
+        SupportsSourceWatermark {
 
   private int[] projectedFields;
   private Long limit;
@@ -175,6 +177,14 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
     return Result.of(acceptedFilters, flinkFilters);
   }
 
+  @Override
+  public void applySourceWatermark() {
+    if (!readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) {
+      throw new UnsupportedOperationException(
+          "Source watermarks are supported only in flip-27 iceberg source implementation");
+    }
+  }
+
   @Override
   public boolean supportsNestedProjection() {
     // TODO: support nested projection
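
Note: applySourceWatermark() only validates the configuration; watermark emission itself lives in the FLIP-27 IcebergSource, so the legacy source is rejected. A sketch of how this surfaces in SQL, assuming the Iceberg table has a TIMESTAMP(6) column ts and that table.exec.iceberg.use-flip27-source is enabled; table names are illustrative, and it combines with the CREATE TABLE ... LIKE support above:

    tEnv.getConfig().set("table.exec.iceberg.use-flip27-source", "true");
    // Declare a rowtime attribute on a copy of the Iceberg table; SOURCE_WATERMARK()
    // delegates watermark generation to the source via applySourceWatermark().
    tEnv.executeSql(
        "CREATE TEMPORARY TABLE t_wm ("
            + "  event_ts AS CAST(ts AS TIMESTAMP(3)),"
            + "  WATERMARK FOR event_ts AS SOURCE_WATERMARK()"
            + ") LIKE hadoop_iceberg.`default`.t");
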
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 04d7b8da6b9c..be5aa53dff75 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -188,6 +188,23 @@ public void testCreateTableLike() throws TableNotExistException {
         .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
   }
 
+  @TestTemplate
+  public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
+    sql("CREATE TABLE tl(id BIGINT)");
+
+    sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl");
+
+    CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
+    assertThat(catalogTable.getSchema())
+        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
+
+    Map<String, String> options = catalogTable.getOptions();
+    assertThat(options.entrySet().containsAll(config.entrySet())).isTrue();
+    assertThat(options.get(FlinkCreateTableOptions.CATALOG_NAME.key())).isEqualTo(catalogName);
+    assertThat(options.get(FlinkCreateTableOptions.CATALOG_DATABASE.key())).isEqualTo(DATABASE);
+    assertThat(options.get(FlinkCreateTableOptions.CATALOG_TABLE.key())).isEqualTo("tl");
+  }
+
   @TestTemplate
   public void testCreateTableLocation() {
     assumeThat(isHadoopCatalog)
@@ -660,10 +677,12 @@ private Table table(String name) {
   }
 
   private CatalogTable catalogTable(String name) throws TableNotExistException {
+    return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name);
+  }
+
+  private CatalogTable catalogTable(String catalog, String database, String table)
+      throws TableNotExistException {
     return (CatalogTable)
-        getTableEnv()
-            .getCatalog(getTableEnv().getCurrentCatalog())
-            .get()
-            .getTable(new ObjectPath(DATABASE, name));
+        getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table));
   }
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 47f5485df879..525df0e3d136 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -256,43 +256,6 @@ public void testCatalogDatabaseConflictWithFlinkDatabase() {
         .hasMessageStartingWith("Could not execute CreateTable in path");
   }
 
-  @TestTemplate
-  public void testConnectorTableInIcebergCatalog() {
-    // Create the catalog properties
-    Map<String, String> catalogProps = Maps.newHashMap();
-    catalogProps.put("type", "iceberg");
-    if (isHiveCatalog()) {
-      catalogProps.put("catalog-type", "hive");
-      catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
-    } else {
-      catalogProps.put("catalog-type", "hadoop");
-    }
-    catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse());
-
-    // Create the table properties
-    Map<String, String> tableProps = createTableProps();
-
-    // Create a connector table in an iceberg catalog.
-    sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
-    try {
-      assertThatThrownBy(
-              () ->
-                  sql(
-                      "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
-                      FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
-                      TABLE_NAME,
-                      toWithClause(tableProps)))
-          .cause()
-          .isInstanceOf(IllegalArgumentException.class)
-          .hasMessage(
-              "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, "
-                  + "Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
-                  + "create table without 'connector'='iceberg' related properties in an iceberg table.");
-    } finally {
-      sql("DROP CATALOG IF EXISTS `test_catalog`");
-    }
-  }
-
   private Map<String, String> createTableProps() {
     Map<String, String> tableProps = Maps.newHashMap(properties);
     tableProps.put("catalog-name", catalogName);