FlinkDynamicTableFactory.java

@@ -39,6 +39,7 @@
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -70,6 +71,14 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");

// Flink 1.13.x changes the return type from the CatalogTable interface to ResolvedCatalogTable, which extends
// CatalogTable. Here we use dynamic method loading to avoid compiling an explicit CatalogTable or
// ResolvedCatalogTable reference into the iceberg-flink-runtime jar, for compatibility purposes.
private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
.impl(Context.class, "getCatalogTable")
.orNoop()
.build();
Contributor:
Won't this fail if there is no getCatalogTable method? And if the method exists then it wouldn't need to be called dynamically. You may need a orNoop() call here.

Member Author:
And if the method exists then it wouldn't need to be called dynamically

The reason why I use the dynamic approach is to avoid compiling a reference to Flink 1.13's ResolvedCatalogTable into the runtime jar. For example, if we decompile FlinkDynamicTableFactory.class from iceberg-flink-runtime.jar (which is compiled against Flink 1.13.2) by running javap -c ./org/apache/iceberg/flink/FlinkDynamicTableFactory.class, it shows the following JVM instructions:

Compiled from "FlinkDynamicTableFactory.java"
public class org.apache.iceberg.flink.FlinkDynamicTableFactory implements org.apache.flink.table.factories.DynamicTableSinkFactory,org.apache.flink.table.factories.DynamicTableSourceFactory {
  static final java.lang.String FACTORY_IDENTIFIER;

  public org.apache.iceberg.flink.FlinkDynamicTableFactory();
    Code:
       0: aload_0
       1: invokespecial #43                 // Method java/lang/Object."<init>":()V
       4: aload_0
       5: aconst_null
       6: putfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
       9: return

  public org.apache.iceberg.flink.FlinkDynamicTableFactory(org.apache.iceberg.flink.FlinkCatalog);
    Code:
       0: aload_0
       1: invokespecial #43                 // Method java/lang/Object."<init>":()V
       4: aload_0
       5: aload_1
       6: putfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
       9: return

  public org.apache.flink.table.connector.source.DynamicTableSource createDynamicTableSource(org.apache.flink.table.factories.DynamicTableFactory$Context);
    Code:
       0: aload_1
       1: invokeinterface #54,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getObjectIdentifier:()Lorg/apache/flink/table/catalog/ObjectIdentifier;
       6: astore_2
       7: aload_1
       8: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
      13: invokevirtual #64                 // Method org/apache/flink/table/catalog/ResolvedCatalogTable.getOptions:()Ljava/util/Map;
      16: astore_3
      17: aload_1
      18: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
      23: astore        4
      25: aload_1
      26: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
      31: invokevirtual #68                 // Method org/apache/flink/table/catalog/ResolvedCatalogTable.getSchema:()Lorg/apache/flink/table/api/TableSchema;
      34: invokestatic  #74                 // Method org/apache/flink/table/utils/TableSchemaUtils.getPhysicalSchema:(Lorg/apache/flink/table/api/TableSchema;)Lorg/apache/flink/table/api/TableSchema;
      37: astore        5
      39: aload_0
      40: getfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
      43: ifnull        62
      46: aload_0
      47: getfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
      50: aload_2
      51: invokevirtual #80                 // Method org/apache/flink/table/catalog/ObjectIdentifier.toObjectPath:()Lorg/apache/flink/table/catalog/ObjectPath;
      54: invokestatic  #84                 // Method createTableLoader:(Lorg/apache/iceberg/flink/FlinkCatalog;Lorg/apache/flink/table/catalog/ObjectPath;)Lorg/apache/iceberg/flink/TableLoader;
      57: astore        6
      59: goto          78
      62: aload         4
      64: aload_3
      65: aload_2
      66: invokevirtual #94                 // Method org/apache/flink/table/catalog/ObjectIdentifier.getDatabaseName:()Ljava/lang/String;

In this line:

8: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;

It adds an explicit reference to ResolvedCatalogTable in iceberg-flink-runtime.jar, which is not what we want, because we expect the iceberg-flink-runtime jar to run correctly on both Flink 1.12 and Flink 1.13 clusters.
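To make the point concrete, here is a minimal sketch (not the PR's code; the class name is made up for the example) of the same idea with plain java.lang.reflect: the method is resolved by name at runtime, so the compiled class only ever references CatalogTable and never the 1.13-only ResolvedCatalogTable:

```java
import java.lang.reflect.Method;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.factories.DynamicTableFactory.Context;

class ReflectiveCatalogTableLoader {
  static CatalogTable loadCatalogTable(Context context) {
    try {
      // Looked up by name at runtime; the 1.13 covariant return type is never encoded at
      // this call site, so ResolvedCatalogTable does not appear in the constant pool.
      Method getCatalogTable = Context.class.getMethod("getCatalogTable");
      return (CatalogTable) getCatalogTable.invoke(context);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot load catalog table from context", e);
    }
  }
}
```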

Member Author:

You may need a orNoop() call here.

This looks good to me for handling the edge case.
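As a minimal sketch of what that guard could look like inside this class (assuming DynMethods' orNoop() fallback makes invoke(...) return null when the method is missing; the null check and message are illustrative, not necessarily the final code):

```java
private static CatalogTable loadCatalogTable(Context context) {
  // With orNoop(), a missing Context#getCatalogTable no longer fails during class
  // initialization; the noop invoke(...) simply returns null, which we surface here.
  CatalogTable catalogTable = GET_CATALOG_TABLE.invoke(context);
  Preconditions.checkNotNull(catalogTable,
      "Failed to load catalog table from the table factory context: %s", context);
  return catalogTable;
}
```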

Contributor:

Okay, so the method exists in both cases, but returns a more specific object in Flink 1.13?

Member Author:

Yes, that's correct.
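For context, a simplified, self-contained illustration of that signature difference (these are stand-in interfaces, not the real Flink types):

```java
// Illustrative stand-ins only. The covariant return type in 1.13 is what bakes
// ResolvedCatalogTable into call sites compiled against the 1.13 Context.
interface CatalogTable {}

interface ResolvedCatalogTable extends CatalogTable {}

interface Context112 {
  CatalogTable getCatalogTable();          // Flink 1.12.x signature
}

interface Context113 {
  ResolvedCatalogTable getCatalogTable();  // Flink 1.13.x narrows the return type
}
```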


private final FlinkCatalog catalog;

public FlinkDynamicTableFactory() {
@@ -80,12 +89,16 @@ public FlinkDynamicTableFactory(FlinkCatalog catalog) {
this.catalog = catalog;
}

private static CatalogTable loadCatalogTable(Context context) {
return GET_CATALOG_TABLE.invoke(context);
}

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
Map<String, String> tableProps = context.getCatalogTable().getOptions();
CatalogTable catalogTable = context.getCatalogTable();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
CatalogTable catalogTable = loadCatalogTable(context);
Map<String, String> tableProps = catalogTable.getOptions();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());

TableLoader tableLoader;
if (catalog != null) {
@@ -101,9 +114,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
Map<String, String> tableProps = context.getCatalogTable().getOptions();
CatalogTable catalogTable = context.getCatalogTable();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
CatalogTable catalogTable = loadCatalogTable(context);
Map<String, String> tableProps = catalogTable.getOptions();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());

TableLoader tableLoader;
if (catalog != null) {
versions.props (1 addition, 1 deletion)
@@ -1,7 +1,7 @@
org.slf4j:* = 1.7.25
org.apache.avro:avro = 1.10.1
org.apache.calcite:* = 1.10.0
org.apache.flink:* = 1.13.2
org.apache.flink:* = 1.12.5
Member Author:

As the comment from @pnowojski says (the comment is from this email):

We are trying to provide forward compatibility: applications using @Public APIs
compiled against Flink 1.12.x, should work fine in Flink 1.13.x. We do not
guarantee it the other way: applications compiled against Flink 1.13.x are
not intended to work with Flink 1.12.x.

So we'd better use Flink 1.12.x to build the iceberg-flink-runtime jar so that it works against the Flink 1.13.x runtime. Currently, the latest 1.12 release is 1.12.5.

Contributor (@stevenzwu, Nov 1, 2021):

@openinx @pnowojski there is a breaking change for SplitEnumerator in 1.13.
https://issues.apache.org/jira/browse/FLINK-22133

This revert to 1.12 breaks compilation of the FLIP-27 Iceberg source dev branch, which has been updated to the 1.13 SplitEnumerator API.

Member Author:

@stevenzwu, currently 1.12.5 is used to compile the common Flink module (let's say iceberg-flink) once PR #3364 is merged. Although Apache Flink introduced this breaking change, we can still make it work for both Flink 1.12.x and Flink 1.13.x by separating the differing classes into different modules (let's say iceberg-flink:iceberg-flink-1.12 and iceberg-flink:iceberg-flink-1.13).

org.apache.hadoop:* = 2.7.3
org.apache.hive:hive-metastore = 2.3.8
org.apache.hive:hive-serde = 2.3.8