Merged
@@ -27,6 +27,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -91,23 +92,27 @@
* <p>The Iceberg table manages its partitions by itself. The partition of the Iceberg table is
* independent of the partition of Flink.
*/
+ @Internal
public class FlinkCatalog extends AbstractCatalog {
private final CatalogLoader catalogLoader;
private final Catalog icebergCatalog;
private final Namespace baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
+ private final Map<String, String> catalogProps;
private final boolean cacheEnabled;

public FlinkCatalog(
String catalogName,
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
+ Map<String, String> catalogProps,
boolean cacheEnabled,
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
+ this.catalogProps = catalogProps;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

@@ -332,7 +337,34 @@ public List<String> listTables(String databaseName)
public CatalogTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
Table table = loadIcebergTable(tablePath);
- return toCatalogTable(table);
+
+ // Flink's CREATE TABLE LIKE clause relies on the properties sent back here to create the new table.
+ // In order to create such a table in a non-Iceberg catalog, we also need to send across the
+ // catalog properties.
+ // Since the Flink API accepts only Map<String, String> for props, the catalog props are
+ // serialized here as a JSON string to distinguish catalog properties from table properties in createTable.
+ String srcCatalogProps =
+ FlinkCreateTableOptions.toJson(
+ getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps);
+
+ Map<String, String> tableProps = table.properties();
+ if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
+ || tableProps.containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Source table %s contains one or more of the reserved property keys: %s, %s.",
+ tablePath,
+ FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
+ FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY));
+ }
+
+ ImmutableMap.Builder<String, String> mergedProps = ImmutableMap.builder();
+ mergedProps.put(
+ FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
+ mergedProps.put(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
+ mergedProps.putAll(tableProps);
+
+ return toCatalogTableWithProps(table, mergedProps.build());
}

private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
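
To make the new getTable() contract concrete, here is a minimal sketch (not part of this diff) of the CREATE TABLE LIKE flow it enables; the catalog name, warehouse path, and table names are illustrative assumptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableLikeSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    // Hypothetical Hadoop-backed Iceberg catalog that already contains db.source_tbl.
    tEnv.executeSql(
        "CREATE CATALOG iceberg_catalog WITH ("
            + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");

    // Flink resolves the LIKE clause through getTable() on iceberg_catalog. With this
    // change, the returned options carry 'connector'='iceberg' plus the JSON-encoded
    // 'src-catalog' entry, so the clone created in the default (non-Iceberg) catalog
    // can still locate the source Iceberg catalog when it is read or written.
    tEnv.executeSql(
        "CREATE TABLE default_catalog.default_database.derived_tbl "
            + "LIKE iceberg_catalog.db.source_tbl");
  }
}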
@@ -384,13 +416,17 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws CatalogException, TableAlreadyExistException {
+ // Creating an Iceberg table using the connector is allowed only when the table is created using LIKE
if (Objects.equals(
- table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+ table.getOptions().get(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY),
+ FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
+ && table.getOptions().get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY) == null) {
throw new IllegalArgumentException(
"Cannot create a table with the 'connector'='iceberg' table property in "
+ "an Iceberg catalog. Please create the table with the 'connector'='iceberg' property in a non-Iceberg catalog, or "
+ "create the table without 'connector'='iceberg' related properties in an Iceberg catalog.");
}

Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
}
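
For contrast, a hedged sketch of what this guard rejects and what remains allowed, reusing the tEnv and catalog from the sketch above (all names are illustrative; the connector option keys follow the FlinkCreateTableOptions definitions further down):

// Rejected by the guard above: 'connector'='iceberg' declared directly in an Iceberg
// catalog. The options carry no 'src-catalog' entry, so this is not a LIKE clone.
// tEnv.executeSql(
//     "CREATE TABLE iceberg_catalog.db.bad_tbl (id BIGINT) WITH ('connector'='iceberg')");

// Allowed: the connector form in a non-Iceberg catalog, pointing at the underlying
// Iceberg catalog through the connector options.
tEnv.executeSql(
    "CREATE TABLE default_catalog.default_database.connector_tbl (id BIGINT) WITH ("
        + "'connector'='iceberg', 'catalog-name'='iceberg_catalog', "
        + "'catalog-type'='hadoop', 'catalog-database'='db', "
        + "'catalog-table'='source_tbl', 'warehouse'='file:///tmp/warehouse')");

// Allowed: a plain CREATE TABLE inside the Iceberg catalog, with no connector props.
tEnv.executeSql("CREATE TABLE iceberg_catalog.db.plain_tbl (id BIGINT)");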
@@ -404,10 +440,14 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea
ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
String location = null;
for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
if ("location".equalsIgnoreCase(entry.getKey())) {
location = entry.getValue();
} else {
if (!isReservedProperty(entry.getKey())) {
properties.put(entry.getKey(), entry.getValue());
} else {
// Filtering reserved properties like catalog properties(added to support CREATE TABLE LIKE
// in getTable()), location and not persisting on table properties.
if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(entry.getKey())) {
location = entry.getValue();
}
}
}

@@ -421,6 +461,12 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea
}
}

+ private boolean isReservedProperty(String prop) {
+ return FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(prop)
+ || FlinkCreateTableOptions.CONNECTOR_PROPS_KEY.equalsIgnoreCase(prop)
+ || FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equalsIgnoreCase(prop);
+ }

private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) {
TableSchema ts1 = ct1.getSchema();
TableSchema ts2 = ct2.getSchema();
@@ -501,7 +547,7 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
continue;
}

if ("location".equalsIgnoreCase(key)) {
if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(key)) {
setLocation = value;
} else if ("current-snapshot-id".equalsIgnoreCase(key)) {
setSnapshotId = value;
@@ -558,7 +604,7 @@ public void alterTable(
if (change instanceof TableChange.SetOption) {
TableChange.SetOption set = (TableChange.SetOption) change;

if ("location".equalsIgnoreCase(set.getKey())) {
if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(set.getKey())) {
setLocation = set.getValue();
} else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) {
setSnapshotId = set.getValue();
@@ -625,7 +671,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
return partitionKeysBuilder.build();
}

- static CatalogTable toCatalogTable(Table table) {
+ static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

@@ -634,7 +680,11 @@ static CatalogTable toCatalogTable(Table table) {
// CatalogTableImpl to copy a new catalog table.
// Let's re-load the table from the Iceberg catalog when creating source/sink operators.
// Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
- return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
+ return new CatalogTableImpl(schema, partitionKeys, props, null);
}

+ static CatalogTable toCatalogTable(Table table) {
+ return toCatalogTableWithProps(table, table.properties());
+ }

@Override
@@ -168,6 +168,7 @@ protected Catalog createCatalog(
defaultDatabase,
baseNamespace,
catalogLoader,
+ properties,
cacheEnabled,
cacheExpirationIntervalMs);
}
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.util.JsonUtil;

class FlinkCreateTableOptions {
private final String catalogName;
private final String catalogDb;
private final String catalogTable;
private final Map<String, String> catalogProps;

private FlinkCreateTableOptions(
String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
this.catalogName = catalogName;
this.catalogDb = catalogDb;
this.catalogTable = catalogTable;
this.catalogProps = props;
}

public static final ConfigOption<String> CATALOG_NAME =
ConfigOptions.key("catalog-name")
.stringType()
.noDefaultValue()
.withDescription("Catalog name");

public static final ConfigOption<String> CATALOG_TYPE =
ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
.stringType()
.noDefaultValue()
.withDescription("Catalog type, the optional types are: custom, hadoop, hive.");

public static final ConfigOption<String> CATALOG_DATABASE =
ConfigOptions.key("catalog-database")
.stringType()
.defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
.withDescription("Database name managed in the iceberg catalog.");

public static final ConfigOption<String> CATALOG_TABLE =
ConfigOptions.key("catalog-table")
.stringType()
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");

public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
ConfigOptions.key("catalog-props")
.mapType()
.noDefaultValue()
.withDescription("Properties for the underlying catalog for iceberg table.");

public static final String SRC_CATALOG_PROPS_KEY = "src-catalog";
Contributor:
Also, does Flink CREATE TABLE ... LIKE work with Hive tables as the source? I assume not; otherwise, we could refer to how that is implemented.

It feels tacky to use a special property key to carry over the catalog properties and the source table identifier. Does it make sense to modify the Flink API to support this more elegantly?

Contributor Author:
Right, this is not supported in Hive:
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/hive-compatibility/hive-dialect/create/

It is supported in Kafka, but there is no concept of a catalog there, mostly flat cluster properties for the Kafka connector. Sure, we can look into Flink API changes for the long term, but since not all connectors have a catalog concept or a hierarchy like Iceberg, I am not sure how that would work out.

Contributor:
Yeah, Kafka doesn't have a catalog concept. We can take this discussion separately; it won't be a blocker for this PR.

public static final String CONNECTOR_PROPS_KEY = "connector";
public static final String LOCATION_KEY = "location";

static String toJson(
String catalogName, String catalogDb, String catalogTable, Map<String, String> catalogProps) {
return JsonUtil.generate(
gen -> {
gen.writeStartObject();
gen.writeStringField(CATALOG_NAME.key(), catalogName);
gen.writeStringField(CATALOG_DATABASE.key(), catalogDb);
gen.writeStringField(CATALOG_TABLE.key(), catalogTable);
JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen);
gen.writeEndObject();
},
false);
}

static FlinkCreateTableOptions fromJson(String createTableOptions) {
return JsonUtil.parse(
createTableOptions,
node -> {
String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node);
String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node);
String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node);
Map<String, String> catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node);

return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps);
});
}

String catalogName() {
return catalogName;
}

String catalogDb() {
return catalogDb;
}

String catalogTable() {
return catalogTable;
}

Map<String, String> catalogProps() {
return catalogProps;
}
}
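
A minimal round-trip sketch of the JSON helpers above (not part of the diff); the catalog identity and properties are hypothetical, and the snippet assumes it lives in the same package because the class is package-private:

package org.apache.iceberg.flink;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class FlinkCreateTableOptionsRoundTrip {
  public static void main(String[] args) {
    // Hypothetical source-catalog properties.
    Map<String, String> props = ImmutableMap.of("type", "iceberg", "catalog-type", "hadoop");

    // Serialize the catalog identity and props into the single JSON string that
    // getTable() stores under the reserved 'src-catalog' table option.
    String json = FlinkCreateTableOptions.toJson("iceberg_catalog", "db", "orders", props);

    // Parse it back, as a CREATE TABLE ... LIKE clone would when resolving its source.
    FlinkCreateTableOptions parsed = FlinkCreateTableOptions.fromJson(json);
    System.out.println(
        parsed.catalogName() + "." + parsed.catalogDb() + "." + parsed.catalogTable());
  }
}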