@@ -97,17 +97,20 @@ public class FlinkCatalog extends AbstractCatalog {
private final Namespace baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
private final Map<String, String> catalogProps;
private final boolean cacheEnabled;

public FlinkCatalog(
String catalogName,
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
Map<String, String> catalogProps,
boolean cacheEnabled,
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

@@ -332,7 +335,15 @@ public List<String> listTables(String databaseName)
public CatalogTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
Table table = loadIcebergTable(tablePath);
return toCatalogTable(table);
Map<String, String> catalogAndTableProps = Maps.newHashMap(catalogProps);
catalogAndTableProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), getName());
catalogAndTableProps.put(
FlinkCreateTableOptions.CATALOG_DATABASE.key(), tablePath.getDatabaseName());
catalogAndTableProps.put(
FlinkCreateTableOptions.CATALOG_TABLE.key(), tablePath.getObjectName());
catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
catalogAndTableProps.putAll(table.properties());
return toCatalogTableWithProps(table, catalogAndTableProps);
}

private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
@@ -384,13 +395,6 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws CatalogException, TableAlreadyExistException {
if (Objects.equals(
table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
throw new IllegalArgumentException(
"Cannot create the table with 'connector'='iceberg' table property in "
+ "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+ "create table without 'connector'='iceberg' related properties in an iceberg table.");
}
Comment on lines -387 to -393
Contributor:
Why do we remove this check?

Contributor Author (@swapna267, Jan 29, 2025):
Tables can be created using LIKE from:

  1. A Flink catalog - not currently supported.
  2. Another table in the Iceberg catalog itself, as detailed in the docs.

This check fails if we try to create a table using LIKE in an Iceberg catalog (case #2) when 'connector'='iceberg' is present in the options. For example, a DDL like the following:

CREATE TABLE `hive_catalog`.`default`.`sample_like`
WITH ('connector'='iceberg')
LIKE `hive_catalog`.`default`.`sample`

In order to support case #1 without the user setting any extra options in the WITH clause, we need to add the connector option in getTable:

catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);

This check was added in a very old PR, #2666 (#2666 (comment)), when Flink SQL didn't support CREATE TABLE A LIKE B where A and B are in different catalogs.

So by removing this check we are ignoring the connector option being passed, and the following DDL can create the table table_like in the Flink catalog, backed by iceberg_catalog.db.table. Since we know the source table is an Iceberg table, adding connector=iceberg would be redundant.

CREATE TABLE table_like (
      eventTS AS CAST(t1 AS TIMESTAMP(3))
) LIKE iceberg_catalog.db.table;

Contributor:
What happens when the source table is not an Iceberg table?
I'm trying to understand where we get the missing information in this case, and whether we have a way to check that we actually get it. If we can create such a check, then we can still throw an exception when we don't get this information from any source.

Contributor Author:
createTable in this catalog is called only when the source table is an Iceberg table. Currently the catalog information comes in when the catalog is created.

The following are the scenarios in which the getTable / createTable methods of this catalog are used:

  1. Create an Iceberg table in an Iceberg catalog -> only createTable is called, and the catalog instance has all catalog-related info.
  2. Create a table in Iceberg catalog catalog1 like a table in Iceberg catalog catalog1 -> getTable() sets the schema/partitioning info, which is used to create the table with the same schema/partitioning as the source, in catalog1.
  3. Create a table in Iceberg catalog catalog2 like a table in Iceberg catalog catalog1 -> getTable() sets the schema/partitioning info, which is used to create the table with the same schema/partitioning as the source, in catalog2.
  4. Create a table in the Flink catalog like a table in an Iceberg catalog -> getTable() is only called to get the source table info, and createTable is called on the Flink catalog, where the connector/Iceberg catalog properties are used to instantiate FlinkDynamicTableFactory.

When createTable is invoked, there is currently no easy way to differentiate between case 2, case 3, case 4, or the user directly doing:

CREATE TABLE  `hive_catalog`.`default`.`sample_like` 
WITH ('connector'='iceberg', 'catalog-name'='')

Contributor Author (@swapna267, Feb 8, 2025):
With the current changes in this PR there is one side effect. In cases 2 and 3 mentioned above, the extra properties added in getTable,

catalogAndTableProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), getName());
catalogAndTableProps.put(
    FlinkCreateTableOptions.CATALOG_DATABASE.key(), tablePath.getDatabaseName());
catalogAndTableProps.put(
    FlinkCreateTableOptions.CATALOG_TABLE.key(), tablePath.getObjectName());
catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);

will be added as table properties on the destination table. I'm looking into nicer ways to avoid that. One option is to add an extra property in getTable, say createtabletype -> like, which could be used to differentiate how the table is being created; the extra props could then be dropped in cases 2 and 3. A rough sketch of this idea follows below.
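
A minimal sketch of that marker-property idea, assuming a hypothetical marker key and helper (neither is part of this PR): getTable() tags the options it injects, and createTable() strips them again when both the source and the destination are Iceberg catalogs.

// Hypothetical sketch only; the marker key and helper names are illustrative.
import java.util.Map;
import org.apache.iceberg.flink.FlinkCreateTableOptions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class CreateTableLikePropsSketch {
  // Marker that getTable() could set alongside the injected options, e.g. "createtabletype" -> "like".
  static final String CREATE_TABLE_TYPE = "createtabletype";

  // createTable() could call this to drop the injected options when both the source
  // and the destination are Iceberg catalogs (cases 2 and 3 above).
  static Map<String, String> stripInjectedProps(Map<String, String> tableProps) {
    Map<String, String> cleaned = Maps.newHashMap(tableProps);
    if ("like".equals(cleaned.remove(CREATE_TABLE_TYPE))) {
      cleaned.remove(FlinkCreateTableOptions.CATALOG_NAME.key());
      cleaned.remove(FlinkCreateTableOptions.CATALOG_DATABASE.key());
      cleaned.remove(FlinkCreateTableOptions.CATALOG_TABLE.key());
      cleaned.remove("connector");
    }
    return cleaned;
  }
}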

Contributor:
I don't get this:

Currently the Catalog information comes when Catalog is created.

Let's talk offline

Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
}
@@ -625,7 +629,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
return partitionKeysBuilder.build();
}

static CatalogTable toCatalogTable(Table table) {
static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

@@ -634,7 +638,11 @@ static CatalogTable toCatalogTable(Table table) {
// CatalogTableImpl to copy a new catalog table.
// Let's re-loading table from Iceberg catalog when creating source/sink operators.
// Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
return new CatalogTableImpl(schema, partitionKeys, props, null);
}

static CatalogTable toCatalogTable(Table table) {
return toCatalogTableWithProps(table, table.properties());
}

@Override
@@ -168,6 +168,7 @@ protected Catalog createCatalog(
defaultDatabase,
baseNamespace,
catalogLoader,
properties,
cacheEnabled,
cacheExpirationIntervalMs);
}
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class FlinkCreateTableOptions {

private FlinkCreateTableOptions() {}

public static final ConfigOption<String> CATALOG_NAME =
ConfigOptions.key("catalog-name")
.stringType()
.noDefaultValue()
.withDescription("Catalog name");

public static final ConfigOption<String> CATALOG_TYPE =
ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
.stringType()
.noDefaultValue()
.withDescription("Catalog type, the optional types are: custom, hadoop, hive.");

public static final ConfigOption<String> CATALOG_DATABASE =
ConfigOptions.key("catalog-database")
.stringType()
.defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
.withDescription("Database name managed in the iceberg catalog.");

public static final ConfigOption<String> CATALOG_TABLE =
ConfigOptions.key("catalog-table")
.stringType()
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");
}
@@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -45,31 +44,6 @@
public class FlinkDynamicTableFactory
implements DynamicTableSinkFactory, DynamicTableSourceFactory {
static final String FACTORY_IDENTIFIER = "iceberg";

private static final ConfigOption<String> CATALOG_NAME =
ConfigOptions.key("catalog-name")
.stringType()
.noDefaultValue()
.withDescription("Catalog name");

private static final ConfigOption<String> CATALOG_TYPE =
ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
.stringType()
.noDefaultValue()
.withDescription("Catalog type, the optional types are: custom, hadoop, hive.");

private static final ConfigOption<String> CATALOG_DATABASE =
ConfigOptions.key("catalog-database")
.stringType()
.defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
.withDescription("Database name managed in the iceberg catalog.");

private static final ConfigOption<String> CATALOG_TABLE =
ConfigOptions.key("catalog-table")
.stringType()
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");

private final FlinkCatalog catalog;

public FlinkDynamicTableFactory() {
@@ -127,16 +101,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = Sets.newHashSet();
options.add(CATALOG_TYPE);
options.add(CATALOG_NAME);
options.add(FlinkCreateTableOptions.CATALOG_TYPE);
options.add(FlinkCreateTableOptions.CATALOG_NAME);
return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = Sets.newHashSet();
options.add(CATALOG_DATABASE);
options.add(CATALOG_TABLE);
options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
options.add(FlinkCreateTableOptions.CATALOG_TABLE);
return options;
}

@@ -153,14 +127,17 @@ private static TableLoader createTableLoader(
Configuration flinkConf = new Configuration();
tableProps.forEach(flinkConf::setString);

String catalogName = flinkConf.getString(CATALOG_NAME);
String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
Preconditions.checkNotNull(
catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
catalogName,
"Table property '%s' cannot be null",
FlinkCreateTableOptions.CATALOG_NAME.key());

String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
String catalogDatabase =
flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");

String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");

org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
@@ -35,6 +35,7 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
@@ -53,7 +54,8 @@ public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
SupportsLimitPushDown {
SupportsLimitPushDown,
SupportsSourceWatermark {
Contributor:
I think we have 2 features in a single PR:

  • CREATE TABLE LIKE
  • Watermark support

Could we separate these features out into different PRs?
Could we write tests for both features?

Contributor Author (@swapna267, Jan 29, 2025):
These features were driven mainly by a use case where an Iceberg table needs to be used in Flink window functions. This requires the incoming table to have a MILLISECOND-precision timestamp column and a watermark defined on the source table.

As Iceberg only supports MICROSECOND timestamp columns, we need a table with computed columns, and we can create these only in the Flink catalog; the Iceberg catalog doesn't support creating tables with computed columns. A rough sketch of the end-to-end flow is shown after this comment.

I am happy to split them into 2 separate PRs.
I have tests for CREATE TABLE LIKE.

As watermark support just makes the source implement the interface and falls back to #9346 for the core logic, I didn't have a test case. I can add a validation of whether the watermark column is configured, so it can fail fast, and a test case around that.
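
For context, a rough end-to-end sketch of that use case, under stated assumptions (catalog, database, table, and column names are made up; watermark behavior also depends on the source implementation in use):

// Illustrative sketch only: assumes an Iceberg catalog 'iceberg_catalog' is already registered
// and contains db.events with a microsecond-precision timestamp column 'ts'.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableLikeWatermarkExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Create a Flink-catalog table LIKE the Iceberg table, adding a TIMESTAMP(3)
    // computed column and a watermark on it.
    tEnv.executeSql(
        "CREATE TABLE `default_catalog`.`default_database`.`events_like` ("
            + " event_ts AS CAST(ts AS TIMESTAMP(3)),"
            + " WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND"
            + ") LIKE `iceberg_catalog`.`db`.`events`");

    // The derived table now has an event-time column and a watermark, so it can be
    // used in window functions, e.g. a tumbling-window count.
    tEnv.executeSql(
            "SELECT window_start, window_end, COUNT(*) AS cnt"
                + " FROM TABLE(TUMBLE(TABLE `default_catalog`.`default_database`.`events_like`,"
                + " DESCRIPTOR(event_ts), INTERVAL '1' MINUTE))"
                + " GROUP BY window_start, window_end")
        .print();
  }
}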

Contributor:
Please separate the features out into 2 PRs.


private int[] projectedFields;
private Long limit;
@@ -175,6 +177,14 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
return Result.of(acceptedFilters, flinkFilters);
}

@Override
public void applySourceWatermark() {
if (!readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) {
throw new UnsupportedOperationException(
"Source watermarks are supported only in flip-27 iceberg source implementation");
}
}

@Override
public boolean supportsNestedProjection() {
// TODO: support nested projection
@@ -188,6 +188,23 @@ public void testCreateTableLike() throws TableNotExistException {
.isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
}

@TestTemplate
public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");

sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl");

CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
assertThat(catalogTable.getSchema())
.isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());

Map<String, String> options = catalogTable.getOptions();
assertThat(options.entrySet().containsAll(config.entrySet())).isTrue();
assertThat(options.get(FlinkCreateTableOptions.CATALOG_NAME.key())).isEqualTo(catalogName);
assertThat(options.get(FlinkCreateTableOptions.CATALOG_DATABASE.key())).isEqualTo(DATABASE);
assertThat(options.get(FlinkCreateTableOptions.CATALOG_TABLE.key())).isEqualTo("tl");
}

@TestTemplate
public void testCreateTableLocation() {
assumeThat(isHadoopCatalog)
@@ -660,10 +677,12 @@ private Table table(String name) {
}

private CatalogTable catalogTable(String name) throws TableNotExistException {
return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name);
}

private CatalogTable catalogTable(String catalog, String database, String table)
throws TableNotExistException {
return (CatalogTable)
getTableEnv()
.getCatalog(getTableEnv().getCurrentCatalog())
.get()
.getTable(new ObjectPath(DATABASE, name));
getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table));
}
}
@@ -256,43 +256,6 @@ public void testCatalogDatabaseConflictWithFlinkDatabase() {
.hasMessageStartingWith("Could not execute CreateTable in path");
}

@TestTemplate
public void testConnectorTableInIcebergCatalog() {
// Create the catalog properties
Map<String, String> catalogProps = Maps.newHashMap();
Comment on lines -259 to -262
Contributor:
Why is this test removed?

Contributor Author:
This is testing the check mentioned in #12116 (comment).

It fails table creation in an Iceberg catalog if connector=iceberg is specified in the options. As that check has been deleted, I removed this test case.

Contributor:
I think that this is still a valid check in most cases. It is only invalid when the table is created with "CREATE TABLE .. LIKE" and only if the source table is an Iceberg table; see the sketch after this thread.
Am I missing something?

catalogProps.put("type", "iceberg");
if (isHiveCatalog()) {
catalogProps.put("catalog-type", "hive");
catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
} else {
catalogProps.put("catalog-type", "hadoop");
}
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse());

// Create the table properties
Map<String, String> tableProps = createTableProps();

// Create a connector table in an iceberg catalog.
sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
try {
assertThatThrownBy(
() ->
sql(
"CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
TABLE_NAME,
toWithClause(tableProps)))
.cause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, "
+ "Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+ "create table without 'connector'='iceberg' related properties in an iceberg table.");
} finally {
sql("DROP CATALOG IF EXISTS `test_catalog`");
}
}

private Map<String, String> createTableProps() {
Map<String, String> tableProps = Maps.newHashMap(properties);
tableProps.put("catalog-name", catalogName);
Expand Down