diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 86295d78cc13..76c6dc758265 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -91,12 +92,14 @@ *

The Iceberg table manages its partitions by itself. The partition of the Iceberg table is * independent of the partition of Flink. */ +@Internal public class FlinkCatalog extends AbstractCatalog { private final CatalogLoader catalogLoader; private final Catalog icebergCatalog; private final Namespace baseNamespace; private final SupportsNamespaces asNamespaceCatalog; private final Closeable closeable; + private final Map catalogProps; private final boolean cacheEnabled; public FlinkCatalog( @@ -104,10 +107,12 @@ public FlinkCatalog( String defaultDatabase, Namespace baseNamespace, CatalogLoader catalogLoader, + Map catalogProps, boolean cacheEnabled, long cacheExpirationIntervalMs) { super(catalogName, defaultDatabase); this.catalogLoader = catalogLoader; + this.catalogProps = catalogProps; this.baseNamespace = baseNamespace; this.cacheEnabled = cacheEnabled; @@ -332,7 +337,34 @@ public List listTables(String databaseName) public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { Table table = loadIcebergTable(tablePath); - return toCatalogTable(table); + + // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table. + // Inorder to create such table in non iceberg catalog, we need to send across catalog + // properties also. + // As Flink API accepts only Map for props, here we are serializing catalog + // props as json string to distinguish between catalog and table properties in createTable. + String srcCatalogProps = + FlinkCreateTableOptions.toJson( + getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps); + + Map tableProps = table.properties(); + if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY) + || tableProps.containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) { + throw new IllegalArgumentException( + String.format( + "Source table %s contains one/all of the reserved property keys: %s, %s.", + tablePath, + FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, + FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)); + } + + ImmutableMap.Builder mergedProps = ImmutableMap.builder(); + mergedProps.put( + FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, FlinkDynamicTableFactory.FACTORY_IDENTIFIER); + mergedProps.put(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); + mergedProps.putAll(tableProps); + + return toCatalogTableWithProps(table, mergedProps.build()); } private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException { @@ -384,13 +416,17 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws CatalogException, TableAlreadyExistException { + // Creating Iceberg table using connector is allowed only when table is created using LIKE if (Objects.equals( - table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) { + table.getOptions().get(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY), + FlinkDynamicTableFactory.FACTORY_IDENTIFIER) + && table.getOptions().get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY) == null) { throw new IllegalArgumentException( "Cannot create the table with 'connector'='iceberg' table property in " + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or " + "create table without 'connector'='iceberg' related properties in an iceberg table."); } + Preconditions.checkArgument(table instanceof 
ResolvedCatalogTable, "table should be resolved"); createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists); } @@ -404,10 +440,14 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea ImmutableMap.Builder properties = ImmutableMap.builder(); String location = null; for (Map.Entry entry : table.getOptions().entrySet()) { - if ("location".equalsIgnoreCase(entry.getKey())) { - location = entry.getValue(); - } else { + if (!isReservedProperty(entry.getKey())) { properties.put(entry.getKey(), entry.getValue()); + } else { + // Filtering reserved properties like catalog properties(added to support CREATE TABLE LIKE + // in getTable()), location and not persisting on table properties. + if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(entry.getKey())) { + location = entry.getValue(); + } } } @@ -421,6 +461,12 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea } } + private boolean isReservedProperty(String prop) { + return FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(prop) + || FlinkCreateTableOptions.CONNECTOR_PROPS_KEY.equalsIgnoreCase(prop) + || FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equalsIgnoreCase(prop); + } + private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) { TableSchema ts1 = ct1.getSchema(); TableSchema ts2 = ct2.getSchema(); @@ -501,7 +547,7 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean continue; } - if ("location".equalsIgnoreCase(key)) { + if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(key)) { setLocation = value; } else if ("current-snapshot-id".equalsIgnoreCase(key)) { setSnapshotId = value; @@ -558,7 +604,7 @@ public void alterTable( if (change instanceof TableChange.SetOption) { TableChange.SetOption set = (TableChange.SetOption) change; - if ("location".equalsIgnoreCase(set.getKey())) { + if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(set.getKey())) { setLocation = set.getValue(); } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) { setSnapshotId = set.getValue(); @@ -625,7 +671,7 @@ private static List toPartitionKeys(PartitionSpec spec, Schema icebergSc return partitionKeysBuilder.build(); } - static CatalogTable toCatalogTable(Table table) { + static CatalogTable toCatalogTableWithProps(Table table, Map props) { TableSchema schema = FlinkSchemaUtil.toSchema(table.schema()); List partitionKeys = toPartitionKeys(table.spec(), table.schema()); @@ -634,7 +680,11 @@ static CatalogTable toCatalogTable(Table table) { // CatalogTableImpl to copy a new catalog table. // Let's re-loading table from Iceberg catalog when creating source/sink operators. // Iceberg does not have Table comment, so pass a null (Default comment value in Flink). 
- return new CatalogTableImpl(schema, partitionKeys, table.properties(), null); + return new CatalogTableImpl(schema, partitionKeys, props, null); + } + + static CatalogTable toCatalogTable(Table table) { + return toCatalogTableWithProps(table, table.properties()); } @Override diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index 9b0c7a938920..dd065617bd88 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -72,7 +72,6 @@ public class FlinkCatalogFactory implements CatalogFactory { public static final String DEFAULT_DATABASE_NAME = "default"; public static final String DEFAULT_CATALOG_NAME = "default_catalog"; public static final String BASE_NAMESPACE = "base-namespace"; - public static final String TYPE = "type"; public static final String PROPERTY_VERSION = "property-version"; @@ -169,6 +168,7 @@ protected Catalog createCatalog( defaultDatabase, baseNamespace, catalogLoader, + properties, cacheEnabled, cacheExpirationIntervalMs); } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java new file mode 100644 index 000000000000..ab69ec5adc7f --- /dev/null +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.util.JsonUtil; + +class FlinkCreateTableOptions { + private final String catalogName; + private final String catalogDb; + private final String catalogTable; + private final Map catalogProps; + + private FlinkCreateTableOptions( + String catalogName, String catalogDb, String catalogTable, Map props) { + this.catalogName = catalogName; + this.catalogDb = catalogDb; + this.catalogTable = catalogTable; + this.catalogProps = props; + } + + public static final ConfigOption CATALOG_NAME = + ConfigOptions.key("catalog-name") + .stringType() + .noDefaultValue() + .withDescription("Catalog name"); + + public static final ConfigOption CATALOG_TYPE = + ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) + .stringType() + .noDefaultValue() + .withDescription("Catalog type, the optional types are: custom, hadoop, hive."); + + public static final ConfigOption CATALOG_DATABASE = + ConfigOptions.key("catalog-database") + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) + .withDescription("Database name managed in the iceberg catalog."); + + public static final ConfigOption CATALOG_TABLE = + ConfigOptions.key("catalog-table") + .stringType() + .noDefaultValue() + .withDescription("Table name managed in the underlying iceberg catalog and database."); + + public static final ConfigOption> CATALOG_PROPS = + ConfigOptions.key("catalog-props") + .mapType() + .noDefaultValue() + .withDescription("Properties for the underlying catalog for iceberg table."); + + public static final String SRC_CATALOG_PROPS_KEY = "src-catalog"; + public static final String CONNECTOR_PROPS_KEY = "connector"; + public static final String LOCATION_KEY = "location"; + + static String toJson( + String catalogName, String catalogDb, String catalogTable, Map catalogProps) { + return JsonUtil.generate( + gen -> { + gen.writeStartObject(); + gen.writeStringField(CATALOG_NAME.key(), catalogName); + gen.writeStringField(CATALOG_DATABASE.key(), catalogDb); + gen.writeStringField(CATALOG_TABLE.key(), catalogTable); + JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen); + gen.writeEndObject(); + }, + false); + } + + static FlinkCreateTableOptions fromJson(String createTableOptions) { + return JsonUtil.parse( + createTableOptions, + node -> { + String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node); + String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node); + String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node); + Map catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node); + + return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps); + }); + } + + String catalogName() { + return catalogName; + } + + String catalogDb() { + return catalogDb; + } + + String catalogTable() { + return catalogTable; + } + + Map catalogProps() { + return catalogProps; + } +} diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java index b7f1be4b93fb..a92848d7ccd1 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java @@ -18,10 +18,10 @@ */ package org.apache.iceberg.flink; +import 
java.util.Collections; import java.util.Map; import java.util.Set; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogDatabaseImpl; @@ -45,31 +45,6 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory { static final String FACTORY_IDENTIFIER = "iceberg"; - - private static final ConfigOption CATALOG_NAME = - ConfigOptions.key("catalog-name") - .stringType() - .noDefaultValue() - .withDescription("Catalog name"); - - private static final ConfigOption CATALOG_TYPE = - ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) - .stringType() - .noDefaultValue() - .withDescription("Catalog type, the optional types are: custom, hadoop, hive."); - - private static final ConfigOption CATALOG_DATABASE = - ConfigOptions.key("catalog-database") - .stringType() - .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) - .withDescription("Database name managed in the iceberg catalog."); - - private static final ConfigOption CATALOG_TABLE = - ConfigOptions.key("catalog-table") - .stringType() - .noDefaultValue() - .withDescription("Table name managed in the underlying iceberg catalog and database."); - private final FlinkCatalog catalog; public FlinkDynamicTableFactory() { @@ -127,16 +102,16 @@ public DynamicTableSink createDynamicTableSink(Context context) { @Override public Set> requiredOptions() { Set> options = Sets.newHashSet(); - options.add(CATALOG_TYPE); - options.add(CATALOG_NAME); + options.add(FlinkCreateTableOptions.CATALOG_TYPE); + options.add(FlinkCreateTableOptions.CATALOG_NAME); return options; } @Override public Set> optionalOptions() { Set> options = Sets.newHashSet(); - options.add(CATALOG_DATABASE); - options.add(CATALOG_TABLE); + options.add(FlinkCreateTableOptions.CATALOG_DATABASE); + options.add(FlinkCreateTableOptions.CATALOG_TABLE); return options; } @@ -151,22 +126,28 @@ private static TableLoader createTableLoader( String databaseName, String tableName) { Configuration flinkConf = new Configuration(); - tableProps.forEach(flinkConf::setString); - String catalogName = flinkConf.getString(CATALOG_NAME); + Map mergedProps = mergeSrcCatalogProps(tableProps); + + mergedProps.forEach(flinkConf::setString); + + String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME); Preconditions.checkNotNull( - catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key()); + catalogName, + "Table property '%s' cannot be null", + FlinkCreateTableOptions.CATALOG_NAME.key()); - String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName); + String catalogDatabase = + flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName); Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null"); - String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName); + String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName); Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null"); org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf(); FlinkCatalogFactory factory = new FlinkCatalogFactory(); FlinkCatalog flinkCatalog = - (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf); + (FlinkCatalog) factory.createCatalog(catalogName, mergedProps, hadoopConf); 
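// Illustrative note, not part of the patch: the reserved 'src-catalog' option added by
// FlinkCatalog#getTable carries the original catalog configuration as the JSON produced by
// FlinkCreateTableOptions#toJson. Assuming a hypothetical Hive-backed source catalog, the
// payload would look roughly like:
//   {"catalog-name":"hive_prod","catalog-database":"db","catalog-table":"tl",
//    "catalog-props":{"type":"iceberg","catalog-type":"hive","uri":"thrift://metastore:9083"}}
// mergeSrcCatalogProps (defined below) flattens this back into plain string options, applying the
// table-level options last, before they are passed to FlinkCatalogFactory#createCatalog above.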
ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable); // Create database if not exists in the external catalog. @@ -201,6 +182,42 @@ private static TableLoader createTableLoader( flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable)); } + /** + * Merges source catalog properties with connector properties. Iceberg Catalog properties are + * serialized as json in FlinkCatalog#getTable to be able to isolate catalog props from iceberg + * table props, Here, we flatten and merge them back to use to create catalog. + * + * @param tableProps the existing table properties + * @return a map of merged properties, with source catalog properties taking precedence when keys + * conflict + */ + private static Map mergeSrcCatalogProps(Map tableProps) { + String srcCatalogProps = tableProps.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY); + if (srcCatalogProps != null) { + Map mergedProps = Maps.newHashMap(); + FlinkCreateTableOptions createTableOptions = + FlinkCreateTableOptions.fromJson(srcCatalogProps); + + mergedProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), createTableOptions.catalogName()); + mergedProps.put( + FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb()); + mergedProps.put( + FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable()); + mergedProps.putAll(createTableOptions.catalogProps()); + + tableProps.forEach( + (k, v) -> { + if (!FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equals(k)) { + mergedProps.put(k, v); + } + }); + + return Collections.unmodifiableMap(mergedProps); + } + + return tableProps; + } + private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) { Preconditions.checkNotNull(catalog, "Flink catalog cannot be null"); return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath)); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java index 65adce77d9f9..662dc30e27ca 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java @@ -35,12 +35,14 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.FlinkFilters; +import org.apache.iceberg.flink.FlinkReadOptions; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.assigner.SplitAssignerType; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -53,7 +55,8 @@ public class IcebergTableSource implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, - SupportsLimitPushDown { + SupportsLimitPushDown, + SupportsSourceWatermark { private int[] projectedFields; private Long limit; @@ -175,6 +178,17 @@ public Result applyFilters(List flinkFilters) { return 
Result.of(acceptedFilters, flinkFilters); } + @Override + public void applySourceWatermark() { + Preconditions.checkArgument( + readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE), + "Source watermarks are supported only in flip-27 iceberg source implementation"); + + Preconditions.checkNotNull( + properties.get(FlinkReadOptions.WATERMARK_COLUMN), + "watermark-column needs to be configured to use source watermark."); + } + @Override public boolean supportsNestedProjection() { // TODO: support nested projection diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index c07ebed8cef9..b401d2d1cf9c 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -73,7 +73,7 @@ public void before() { public void cleanNamespaces() { sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase); sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } @@ -188,6 +188,40 @@ public void testCreateTableLike() throws TableNotExistException { .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); } + @TestTemplate + public void testCreateTableLikeInDiffIcebergCatalog() throws TableNotExistException { + sql("CREATE TABLE tl(id BIGINT)"); + + String catalog2 = catalogName + "2"; + sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config)); + sql("CREATE DATABASE %s", catalog2 + ".testdb"); + sql("CREATE TABLE %s LIKE tl", catalog2 + ".testdb.tl2"); + + CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2"); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); + + dropCatalog(catalog2, true); + } + + @TestTemplate + public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException { + sql("CREATE TABLE tl(id BIGINT)"); + + sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl"); + + CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2"); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); + + String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", config); + Map options = catalogTable.getOptions(); + assertThat(options.get(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)) + .isEqualTo(FlinkDynamicTableFactory.FACTORY_IDENTIFIER); + assertThat(options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) + .isEqualTo(srcCatalogProps); + } + @TestTemplate public void testCreateTableLocation() { assumeThat(isHadoopCatalog) @@ -639,11 +673,11 @@ public void testSetCurrentAndCherryPickSnapshotId() { private void validateTableFiles(Table tbl, DataFile... 
expectedFiles) { tbl.refresh(); Set expectedFilePaths = - Arrays.stream(expectedFiles).map(DataFile::path).collect(Collectors.toSet()); + Arrays.stream(expectedFiles).map(DataFile::location).collect(Collectors.toSet()); Set actualFilePaths = StreamSupport.stream(tbl.newScan().planFiles().spliterator(), false) .map(FileScanTask::file) - .map(ContentFile::path) + .map(ContentFile::location) .collect(Collectors.toSet()); assertThat(actualFilePaths).as("Files should match").isEqualTo(expectedFilePaths); } @@ -653,10 +687,12 @@ private Table table(String name) { } private CatalogTable catalogTable(String name) throws TableNotExistException { + return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name); + } + + private CatalogTable catalogTable(String catalog, String database, String table) + throws TableNotExistException { return (CatalogTable) - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .getTable(new ObjectPath(DATABASE, name)); + getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table)); } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 66bdeee1d407..0cdaf8371cbd 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.source; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import java.time.Instant; @@ -40,6 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase { @BeforeEach @Override public void before() throws IOException { - TableEnvironment tableEnvironment = getTableEnv(); + setUpTableEnv(getTableEnv()); + setUpTableEnv(getStreamingTableEnv()); + } + + private static void setUpTableEnv(TableEnvironment tableEnvironment) { Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); // Disable inferring parallelism to avoid interfering watermark tests @@ -72,6 +78,11 @@ public void before() throws IOException { tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); } + @AfterEach + public void after() throws IOException { + CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + private Record generateRecord(Instant t1, long t2) { Record record = GenericRecord.create(SCHEMA_TS); record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime()); @@ -162,4 +173,61 @@ public void testWatermarkOptionsDescending() throws Exception { expected, SCHEMA_TS); } + + @Test + public void testReadFlinkDynamicTable() throws Exception { + List expected = generateExpectedRecords(false); + SqlHelpers.sql( + getTableEnv(), + "create table `default_catalog`.`default_database`.flink_table LIKE iceberg_catalog.`default`.%s", + TestFixtures.TABLE); + + // Read from table in flink catalog + TestHelpers.assertRecords( + 
SqlHelpers.sql( + getTableEnv(), "select * from `default_catalog`.`default_database`.flink_table"), + expected, + SCHEMA_TS); + } + + @Test + public void testWatermarkInvalidConfig() { + CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS); + + String flinkTable = "`default_catalog`.`default_database`.flink_table"; + SqlHelpers.sql( + getStreamingTableEnv(), + "CREATE TABLE %s " + + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s", + flinkTable, + TestFixtures.TABLE); + + assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable)) + .isInstanceOf(NullPointerException.class) + .hasMessage("watermark-column needs to be configured to use source watermark."); + } + + @Test + public void testWatermarkValidConfig() throws Exception { + List expected = generateExpectedRecords(true); + + String flinkTable = "`default_catalog`.`default_database`.flink_table"; + + SqlHelpers.sql( + getStreamingTableEnv(), + "CREATE TABLE %s " + + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s", + flinkTable, + TestFixtures.TABLE); + + TestHelpers.assertRecordsWithOrder( + SqlHelpers.sql( + getStreamingTableEnv(), + "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), INTERVAL '1' SECOND))", + flinkTable), + expected, + SCHEMA_TS); + } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java index f9b776397cfc..dd63154fe03b 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java @@ -63,6 +63,8 @@ public abstract class TestSqlBase { private volatile TableEnvironment tEnv; + private volatile TableEnvironment streamingTEnv; + protected TableEnvironment getTableEnv() { if (tEnv == null) { synchronized (this) { @@ -75,6 +77,19 @@ protected TableEnvironment getTableEnv() { return tEnv; } + protected TableEnvironment getStreamingTableEnv() { + if (streamingTEnv == null) { + synchronized (this) { + if (streamingTEnv == null) { + this.streamingTEnv = + TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); + } + } + } + + return streamingTEnv; + } + @BeforeEach public abstract void before() throws IOException; diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 86295d78cc13..76c6dc758265 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -91,12 +92,14 @@ *

The Iceberg table manages its partitions by itself. The partition of the Iceberg table is * independent of the partition of Flink. */ +@Internal public class FlinkCatalog extends AbstractCatalog { private final CatalogLoader catalogLoader; private final Catalog icebergCatalog; private final Namespace baseNamespace; private final SupportsNamespaces asNamespaceCatalog; private final Closeable closeable; + private final Map catalogProps; private final boolean cacheEnabled; public FlinkCatalog( @@ -104,10 +107,12 @@ public FlinkCatalog( String defaultDatabase, Namespace baseNamespace, CatalogLoader catalogLoader, + Map catalogProps, boolean cacheEnabled, long cacheExpirationIntervalMs) { super(catalogName, defaultDatabase); this.catalogLoader = catalogLoader; + this.catalogProps = catalogProps; this.baseNamespace = baseNamespace; this.cacheEnabled = cacheEnabled; @@ -332,7 +337,34 @@ public List listTables(String databaseName) public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { Table table = loadIcebergTable(tablePath); - return toCatalogTable(table); + + // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table. + // Inorder to create such table in non iceberg catalog, we need to send across catalog + // properties also. + // As Flink API accepts only Map for props, here we are serializing catalog + // props as json string to distinguish between catalog and table properties in createTable. + String srcCatalogProps = + FlinkCreateTableOptions.toJson( + getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps); + + Map tableProps = table.properties(); + if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY) + || tableProps.containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) { + throw new IllegalArgumentException( + String.format( + "Source table %s contains one/all of the reserved property keys: %s, %s.", + tablePath, + FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, + FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)); + } + + ImmutableMap.Builder mergedProps = ImmutableMap.builder(); + mergedProps.put( + FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, FlinkDynamicTableFactory.FACTORY_IDENTIFIER); + mergedProps.put(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); + mergedProps.putAll(tableProps); + + return toCatalogTableWithProps(table, mergedProps.build()); } private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException { @@ -384,13 +416,17 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws CatalogException, TableAlreadyExistException { + // Creating Iceberg table using connector is allowed only when table is created using LIKE if (Objects.equals( - table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) { + table.getOptions().get(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY), + FlinkDynamicTableFactory.FACTORY_IDENTIFIER) + && table.getOptions().get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY) == null) { throw new IllegalArgumentException( "Cannot create the table with 'connector'='iceberg' table property in " + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or " + "create table without 'connector'='iceberg' related properties in an iceberg table."); } + Preconditions.checkArgument(table instanceof 
ResolvedCatalogTable, "table should be resolved"); createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists); } @@ -404,10 +440,14 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea ImmutableMap.Builder properties = ImmutableMap.builder(); String location = null; for (Map.Entry entry : table.getOptions().entrySet()) { - if ("location".equalsIgnoreCase(entry.getKey())) { - location = entry.getValue(); - } else { + if (!isReservedProperty(entry.getKey())) { properties.put(entry.getKey(), entry.getValue()); + } else { + // Filtering reserved properties like catalog properties(added to support CREATE TABLE LIKE + // in getTable()), location and not persisting on table properties. + if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(entry.getKey())) { + location = entry.getValue(); + } } } @@ -421,6 +461,12 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea } } + private boolean isReservedProperty(String prop) { + return FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(prop) + || FlinkCreateTableOptions.CONNECTOR_PROPS_KEY.equalsIgnoreCase(prop) + || FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equalsIgnoreCase(prop); + } + private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) { TableSchema ts1 = ct1.getSchema(); TableSchema ts2 = ct2.getSchema(); @@ -501,7 +547,7 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean continue; } - if ("location".equalsIgnoreCase(key)) { + if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(key)) { setLocation = value; } else if ("current-snapshot-id".equalsIgnoreCase(key)) { setSnapshotId = value; @@ -558,7 +604,7 @@ public void alterTable( if (change instanceof TableChange.SetOption) { TableChange.SetOption set = (TableChange.SetOption) change; - if ("location".equalsIgnoreCase(set.getKey())) { + if (FlinkCreateTableOptions.LOCATION_KEY.equalsIgnoreCase(set.getKey())) { setLocation = set.getValue(); } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) { setSnapshotId = set.getValue(); @@ -625,7 +671,7 @@ private static List toPartitionKeys(PartitionSpec spec, Schema icebergSc return partitionKeysBuilder.build(); } - static CatalogTable toCatalogTable(Table table) { + static CatalogTable toCatalogTableWithProps(Table table, Map props) { TableSchema schema = FlinkSchemaUtil.toSchema(table.schema()); List partitionKeys = toPartitionKeys(table.spec(), table.schema()); @@ -634,7 +680,11 @@ static CatalogTable toCatalogTable(Table table) { // CatalogTableImpl to copy a new catalog table. // Let's re-loading table from Iceberg catalog when creating source/sink operators. // Iceberg does not have Table comment, so pass a null (Default comment value in Flink). 
- return new CatalogTableImpl(schema, partitionKeys, table.properties(), null); + return new CatalogTableImpl(schema, partitionKeys, props, null); + } + + static CatalogTable toCatalogTable(Table table) { + return toCatalogTableWithProps(table, table.properties()); } @Override diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index fe4008a13ce5..dd065617bd88 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -168,6 +168,7 @@ protected Catalog createCatalog( defaultDatabase, baseNamespace, catalogLoader, + properties, cacheEnabled, cacheExpirationIntervalMs); } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java new file mode 100644 index 000000000000..ab69ec5adc7f --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.util.JsonUtil; + +class FlinkCreateTableOptions { + private final String catalogName; + private final String catalogDb; + private final String catalogTable; + private final Map catalogProps; + + private FlinkCreateTableOptions( + String catalogName, String catalogDb, String catalogTable, Map props) { + this.catalogName = catalogName; + this.catalogDb = catalogDb; + this.catalogTable = catalogTable; + this.catalogProps = props; + } + + public static final ConfigOption CATALOG_NAME = + ConfigOptions.key("catalog-name") + .stringType() + .noDefaultValue() + .withDescription("Catalog name"); + + public static final ConfigOption CATALOG_TYPE = + ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) + .stringType() + .noDefaultValue() + .withDescription("Catalog type, the optional types are: custom, hadoop, hive."); + + public static final ConfigOption CATALOG_DATABASE = + ConfigOptions.key("catalog-database") + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) + .withDescription("Database name managed in the iceberg catalog."); + + public static final ConfigOption CATALOG_TABLE = + ConfigOptions.key("catalog-table") + .stringType() + .noDefaultValue() + .withDescription("Table name managed in the underlying iceberg catalog and database."); + + public static final ConfigOption> CATALOG_PROPS = + ConfigOptions.key("catalog-props") + .mapType() + .noDefaultValue() + .withDescription("Properties for the underlying catalog for iceberg table."); + + public static final String SRC_CATALOG_PROPS_KEY = "src-catalog"; + public static final String CONNECTOR_PROPS_KEY = "connector"; + public static final String LOCATION_KEY = "location"; + + static String toJson( + String catalogName, String catalogDb, String catalogTable, Map catalogProps) { + return JsonUtil.generate( + gen -> { + gen.writeStartObject(); + gen.writeStringField(CATALOG_NAME.key(), catalogName); + gen.writeStringField(CATALOG_DATABASE.key(), catalogDb); + gen.writeStringField(CATALOG_TABLE.key(), catalogTable); + JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen); + gen.writeEndObject(); + }, + false); + } + + static FlinkCreateTableOptions fromJson(String createTableOptions) { + return JsonUtil.parse( + createTableOptions, + node -> { + String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node); + String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node); + String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node); + Map catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node); + + return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps); + }); + } + + String catalogName() { + return catalogName; + } + + String catalogDb() { + return catalogDb; + } + + String catalogTable() { + return catalogTable; + } + + Map catalogProps() { + return catalogProps; + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java index b7f1be4b93fb..a92848d7ccd1 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java @@ -18,10 +18,10 @@ */ package org.apache.iceberg.flink; +import 
java.util.Collections; import java.util.Map; import java.util.Set; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogDatabaseImpl; @@ -45,31 +45,6 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory { static final String FACTORY_IDENTIFIER = "iceberg"; - - private static final ConfigOption CATALOG_NAME = - ConfigOptions.key("catalog-name") - .stringType() - .noDefaultValue() - .withDescription("Catalog name"); - - private static final ConfigOption CATALOG_TYPE = - ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) - .stringType() - .noDefaultValue() - .withDescription("Catalog type, the optional types are: custom, hadoop, hive."); - - private static final ConfigOption CATALOG_DATABASE = - ConfigOptions.key("catalog-database") - .stringType() - .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) - .withDescription("Database name managed in the iceberg catalog."); - - private static final ConfigOption CATALOG_TABLE = - ConfigOptions.key("catalog-table") - .stringType() - .noDefaultValue() - .withDescription("Table name managed in the underlying iceberg catalog and database."); - private final FlinkCatalog catalog; public FlinkDynamicTableFactory() { @@ -127,16 +102,16 @@ public DynamicTableSink createDynamicTableSink(Context context) { @Override public Set> requiredOptions() { Set> options = Sets.newHashSet(); - options.add(CATALOG_TYPE); - options.add(CATALOG_NAME); + options.add(FlinkCreateTableOptions.CATALOG_TYPE); + options.add(FlinkCreateTableOptions.CATALOG_NAME); return options; } @Override public Set> optionalOptions() { Set> options = Sets.newHashSet(); - options.add(CATALOG_DATABASE); - options.add(CATALOG_TABLE); + options.add(FlinkCreateTableOptions.CATALOG_DATABASE); + options.add(FlinkCreateTableOptions.CATALOG_TABLE); return options; } @@ -151,22 +126,28 @@ private static TableLoader createTableLoader( String databaseName, String tableName) { Configuration flinkConf = new Configuration(); - tableProps.forEach(flinkConf::setString); - String catalogName = flinkConf.getString(CATALOG_NAME); + Map mergedProps = mergeSrcCatalogProps(tableProps); + + mergedProps.forEach(flinkConf::setString); + + String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME); Preconditions.checkNotNull( - catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key()); + catalogName, + "Table property '%s' cannot be null", + FlinkCreateTableOptions.CATALOG_NAME.key()); - String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName); + String catalogDatabase = + flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName); Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null"); - String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName); + String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName); Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null"); org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf(); FlinkCatalogFactory factory = new FlinkCatalogFactory(); FlinkCatalog flinkCatalog = - (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf); + (FlinkCatalog) factory.createCatalog(catalogName, mergedProps, hadoopConf); 
ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable); // Create database if not exists in the external catalog. @@ -201,6 +182,42 @@ private static TableLoader createTableLoader( flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable)); } + /** + * Merges source catalog properties with connector properties. Iceberg Catalog properties are + * serialized as json in FlinkCatalog#getTable to be able to isolate catalog props from iceberg + * table props, Here, we flatten and merge them back to use to create catalog. + * + * @param tableProps the existing table properties + * @return a map of merged properties, with source catalog properties taking precedence when keys + * conflict + */ + private static Map mergeSrcCatalogProps(Map tableProps) { + String srcCatalogProps = tableProps.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY); + if (srcCatalogProps != null) { + Map mergedProps = Maps.newHashMap(); + FlinkCreateTableOptions createTableOptions = + FlinkCreateTableOptions.fromJson(srcCatalogProps); + + mergedProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), createTableOptions.catalogName()); + mergedProps.put( + FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb()); + mergedProps.put( + FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable()); + mergedProps.putAll(createTableOptions.catalogProps()); + + tableProps.forEach( + (k, v) -> { + if (!FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY.equals(k)) { + mergedProps.put(k, v); + } + }); + + return Collections.unmodifiableMap(mergedProps); + } + + return tableProps; + } + private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) { Preconditions.checkNotNull(catalog, "Flink catalog cannot be null"); return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath)); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java index 65adce77d9f9..662dc30e27ca 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java @@ -35,12 +35,14 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.FlinkFilters; +import org.apache.iceberg.flink.FlinkReadOptions; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.assigner.SplitAssignerType; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -53,7 +55,8 @@ public class IcebergTableSource implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, - SupportsLimitPushDown { + SupportsLimitPushDown, + SupportsSourceWatermark { private int[] projectedFields; private Long limit; @@ -175,6 +178,17 @@ public Result applyFilters(List flinkFilters) { return 
Result.of(acceptedFilters, flinkFilters); } + @Override + public void applySourceWatermark() { + Preconditions.checkArgument( + readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE), + "Source watermarks are supported only in flip-27 iceberg source implementation"); + + Preconditions.checkNotNull( + properties.get(FlinkReadOptions.WATERMARK_COLUMN), + "watermark-column needs to be configured to use source watermark."); + } + @Override public boolean supportsNestedProjection() { // TODO: support nested projection diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index c4f50658b7a4..b401d2d1cf9c 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -188,6 +188,40 @@ public void testCreateTableLike() throws TableNotExistException { .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); } + @TestTemplate + public void testCreateTableLikeInDiffIcebergCatalog() throws TableNotExistException { + sql("CREATE TABLE tl(id BIGINT)"); + + String catalog2 = catalogName + "2"; + sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config)); + sql("CREATE DATABASE %s", catalog2 + ".testdb"); + sql("CREATE TABLE %s LIKE tl", catalog2 + ".testdb.tl2"); + + CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2"); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); + + dropCatalog(catalog2, true); + } + + @TestTemplate + public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException { + sql("CREATE TABLE tl(id BIGINT)"); + + sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl"); + + CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2"); + assertThat(catalogTable.getSchema()) + .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build()); + + String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", config); + Map options = catalogTable.getOptions(); + assertThat(options.get(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)) + .isEqualTo(FlinkDynamicTableFactory.FACTORY_IDENTIFIER); + assertThat(options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) + .isEqualTo(srcCatalogProps); + } + @TestTemplate public void testCreateTableLocation() { assumeThat(isHadoopCatalog) @@ -653,10 +687,12 @@ private Table table(String name) { } private CatalogTable catalogTable(String name) throws TableNotExistException { + return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name); + } + + private CatalogTable catalogTable(String catalog, String database, String table) + throws TableNotExistException { return (CatalogTable) - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .getTable(new ObjectPath(DATABASE, name)); + getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table)); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 66bdeee1d407..0cdaf8371cbd 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.source; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import java.time.Instant; @@ -40,6 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase { @BeforeEach @Override public void before() throws IOException { - TableEnvironment tableEnvironment = getTableEnv(); + setUpTableEnv(getTableEnv()); + setUpTableEnv(getStreamingTableEnv()); + } + + private static void setUpTableEnv(TableEnvironment tableEnvironment) { Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); // Disable inferring parallelism to avoid interfering watermark tests @@ -72,6 +78,11 @@ public void before() throws IOException { tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); } + @AfterEach + public void after() throws IOException { + CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + private Record generateRecord(Instant t1, long t2) { Record record = GenericRecord.create(SCHEMA_TS); record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime()); @@ -162,4 +173,61 @@ public void testWatermarkOptionsDescending() throws Exception { expected, SCHEMA_TS); } + + @Test + public void testReadFlinkDynamicTable() throws Exception { + List expected = generateExpectedRecords(false); + SqlHelpers.sql( + getTableEnv(), + "create table `default_catalog`.`default_database`.flink_table LIKE iceberg_catalog.`default`.%s", + TestFixtures.TABLE); + + // Read from table in flink catalog + TestHelpers.assertRecords( + SqlHelpers.sql( + getTableEnv(), "select * from `default_catalog`.`default_database`.flink_table"), + expected, + SCHEMA_TS); + } + + @Test + public void testWatermarkInvalidConfig() { + CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS); + + String flinkTable = "`default_catalog`.`default_database`.flink_table"; + SqlHelpers.sql( + getStreamingTableEnv(), + "CREATE TABLE %s " + + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s", + flinkTable, + TestFixtures.TABLE); + + assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable)) + .isInstanceOf(NullPointerException.class) + .hasMessage("watermark-column needs to be configured to use source watermark."); + } + + @Test + public void testWatermarkValidConfig() throws Exception { + List expected = generateExpectedRecords(true); + + String flinkTable = "`default_catalog`.`default_database`.flink_table"; + + SqlHelpers.sql( + getStreamingTableEnv(), + "CREATE TABLE %s " + + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s", + flinkTable, + TestFixtures.TABLE); + + TestHelpers.assertRecordsWithOrder( + SqlHelpers.sql( + getStreamingTableEnv(), + "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, 
DESCRIPTOR(eventTS), INTERVAL '1' SECOND))", + flinkTable), + expected, + SCHEMA_TS); + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java index f9b776397cfc..dd63154fe03b 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java @@ -63,6 +63,8 @@ public abstract class TestSqlBase { private volatile TableEnvironment tEnv; + private volatile TableEnvironment streamingTEnv; + protected TableEnvironment getTableEnv() { if (tEnv == null) { synchronized (this) { @@ -75,6 +77,19 @@ protected TableEnvironment getTableEnv() { return tEnv; } + protected TableEnvironment getStreamingTableEnv() { + if (streamingTEnv == null) { + synchronized (this) { + if (streamingTEnv == null) { + this.streamingTEnv = + TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); + } + } + } + + return streamingTEnv; + } + @BeforeEach public abstract void before() throws IOException;
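
For context, a minimal sketch (not part of the patch) of the workflow these changes enable, mirroring testReadFlinkDynamicTable and testWatermarkValidConfig above. The catalog name, Hive metastore URI, source table name and timestamp column t1 are illustrative assumptions, not values taken from the patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkConfigOptions;

public class CreateTableLikeSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // SOURCE_WATERMARK() on the copied table requires the FLIP-27 source
    // (see IcebergTableSource#applySourceWatermark above).
    tEnv.getConfig()
        .getConfiguration()
        .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);

    // Register the source Iceberg catalog; connection properties are placeholders.
    tEnv.executeSql(
        "CREATE CATALOG iceberg_catalog WITH ("
            + "'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://metastore:9083')");

    // CREATE TABLE LIKE into Flink's built-in catalog: getTable() now returns the
    // 'connector'='iceberg' option plus the serialized 'src-catalog' properties, so no
    // catalog options have to be repeated in the WITH clause.
    tEnv.executeSql(
        "CREATE TABLE `default_catalog`.`default_database`.flink_table ("
            + "eventTS AS CAST(t1 AS TIMESTAMP(3)), "
            + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) "
            + "WITH ('watermark-column'='t1') "
            + "LIKE iceberg_catalog.`default`.t");

    // Reading the copied table resolves the original catalog from 'src-catalog' inside
    // FlinkDynamicTableFactory#createTableLoader.
    tEnv.executeSql("SELECT * FROM `default_catalog`.`default_database`.flink_table").print();
  }
}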