diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java index 6b94dadace08..fc0269f63160 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java @@ -39,6 +39,7 @@ import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.Preconditions; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -70,6 +71,14 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami .noDefaultValue() .withDescription("Table name managed in the underlying iceberg catalog and database."); + // Flink 1.13.x change the return type from CatalogTable interface to ResolvedCatalogTable which extends the + // CatalogTable. Here we use the dynamic method loading approach to avoid adding explicit CatalogTable or + // ResolvedCatalogTable class into the iceberg-flink-runtime jar for compatibility purpose. + private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable") + .impl(Context.class, "getCatalogTable") + .orNoop() + .build(); + private final FlinkCatalog catalog; public FlinkDynamicTableFactory() { @@ -80,12 +89,16 @@ public FlinkDynamicTableFactory(FlinkCatalog catalog) { this.catalog = catalog; } + private static CatalogTable loadCatalogTable(Context context) { + return GET_CATALOG_TABLE.invoke(context); + } + @Override public DynamicTableSource createDynamicTableSource(Context context) { ObjectIdentifier objectIdentifier = context.getObjectIdentifier(); - Map tableProps = context.getCatalogTable().getOptions(); - CatalogTable catalogTable = context.getCatalogTable(); - TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + CatalogTable catalogTable = loadCatalogTable(context); + Map tableProps = catalogTable.getOptions(); + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()); TableLoader tableLoader; if (catalog != null) { @@ -101,9 +114,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { @Override public DynamicTableSink createDynamicTableSink(Context context) { ObjectPath objectPath = context.getObjectIdentifier().toObjectPath(); - Map tableProps = context.getCatalogTable().getOptions(); - CatalogTable catalogTable = context.getCatalogTable(); - TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + CatalogTable catalogTable = loadCatalogTable(context); + Map tableProps = catalogTable.getOptions(); + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()); TableLoader tableLoader; if (catalog != null) { diff --git a/versions.props b/versions.props index c4e7043831aa..d35401fb486b 100644 --- a/versions.props +++ b/versions.props @@ -1,7 +1,7 @@ org.slf4j:* = 1.7.25 org.apache.avro:avro = 1.10.1 org.apache.calcite:* = 1.10.0 -org.apache.flink:* = 1.13.2 +org.apache.flink:* = 1.12.5 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-metastore = 2.3.8 org.apache.hive:hive-serde = 2.3.8