
Conversation

openinx (Member) commented Oct 22, 2021

This PR tries to address #3187 using the approach from this comment: #3187 (comment)

In this way, an iceberg-flink-runtime jar compiled with Flink 1.12.x (or Flink 1.13.x) can work with Flink 1.13.x (or Flink 1.12.x). I verified this by hand on my localhost, and everything works fine.

org.apache.avro:avro = 1.10.1
org.apache.calcite:* = 1.10.0
org.apache.flink:* = 1.13.2
org.apache.flink:* = 1.12.5
openinx (Member Author) commented

As the comment from @pnowojski says (the comment from this email):

We are trying to provide forward compatibility: applications using @Public APIs
compiled against Flink 1.12.x, should work fine in Flink 1.13.x. We do not
guarantee it the other way: applications compiled against Flink 1.13.x are
not intended to work with Flink 1.12.x.

So we'd better use Flink 1.12.x to build the iceberg-flink-runtime jar so that it works against a Flink 1.13.x runtime. Currently, the latest 1.12 release is 1.12.5.

stevenzwu (Contributor) commented Nov 1, 2021

@openinx @pnowojski there is a breaking change to SplitEnumerator in 1.13:
https://issues.apache.org/jira/browse/FLINK-22133

Reverting to 1.12 breaks compilation of the FLIP-27 Iceberg source dev branch, which has already been updated to the 1.13 SplitEnumerator API.
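
For context, a minimal sketch of the incompatibility I believe this refers to, assuming it is the snapshotState signature change in 1.13; see the linked JIRA for the authoritative details:

// Flink 1.12.x (sketch): snapshotState takes no arguments.
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> {
  CheckpointT snapshotState() throws Exception;
  // ... other methods and supertypes elided
}

// Flink 1.13.x (sketch): the checkpoint id is now passed in, so an enumerator
// compiled against one version no longer satisfies the other version's interface.
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> {
  CheckpointT snapshotState(long checkpointId) throws Exception;
  // ... other methods and supertypes elided
}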

openinx (Member Author) commented

@stevenzwu, currently 1.12.5 is used to compile the common Flink module (let's say iceberg-flink) once PR #3364 is merged. Although Apache Flink introduced this breaking change, we can still make it work for both Flink 1.12.x and Flink 1.13.x by separating the classes that differ into version-specific modules (let's say iceberg-flink:iceberg-flink-1.12 and iceberg-flink:iceberg-flink-1.13).

// Calling getCatalogTable() reflectively avoids compiling a direct reference to Flink 1.13's
// ResolvedCatalogTable class into the iceberg-flink-runtime jar, for compatibility purposes.
private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
    .impl(Context.class, "getCatalogTable")
    .build();
Contributor commented

Won't this fail if there is no getCatalogTable method? And if the method exists, then it wouldn't need to be called dynamically. You may need an orNoop() call here.
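
For reference, a minimal sketch of what this suggestion might look like, assuming orNoop() on Iceberg's DynMethods builder behaves as it does elsewhere (build() returns a no-op method instead of throwing when no matching method is found):

// Sketch only: with orNoop(), a failed lookup yields a no-op method whose invoke(...)
// returns null, rather than build() throwing when the class is loaded.
private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
    .impl(Context.class, "getCatalogTable")
    .orNoop()
    .build();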

openinx (Member Author) commented

And if the method exists then it wouldn't need to be called dynamically

The reason I use the dynamic approach is to avoid adding Flink 1.13's ResolvedCatalogTable into the runtime jar. For example, if we disassemble FlinkDynamicTableFactory.class from an iceberg-flink-runtime.jar compiled against Flink 1.13.2, using javap -c ./org/apache/iceberg/flink/FlinkDynamicTableFactory.class, it has the following JVM instructions:

Compiled from "FlinkDynamicTableFactory.java"
public class org.apache.iceberg.flink.FlinkDynamicTableFactory implements org.apache.flink.table.factories.DynamicTableSinkFactory,org.apache.flink.table.factories.DynamicTableSourceFactory {
  static final java.lang.String FACTORY_IDENTIFIER;

  public org.apache.iceberg.flink.FlinkDynamicTableFactory();
    Code:
       0: aload_0
       1: invokespecial #43                 // Method java/lang/Object."<init>":()V
       4: aload_0
       5: aconst_null
       6: putfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
       9: return

  public org.apache.iceberg.flink.FlinkDynamicTableFactory(org.apache.iceberg.flink.FlinkCatalog);
    Code:
       0: aload_0
       1: invokespecial #43                 // Method java/lang/Object."<init>":()V
       4: aload_0
       5: aload_1
       6: putfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
       9: return

  public org.apache.flink.table.connector.source.DynamicTableSource createDynamicTableSource(org.apache.flink.table.factories.DynamicTableFactory$Context);
    Code:
       0: aload_1
       1: invokeinterface #54,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getObjectIdentifier:()Lorg/apache/flink/table/catalog/ObjectIdentifier;
       6: astore_2
       7: aload_1
       8: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
      13: invokevirtual #64                 // Method org/apache/flink/table/catalog/ResolvedCatalogTable.getOptions:()Ljava/util/Map;
      16: astore_3
      17: aload_1
      18: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
      23: astore        4
      25: aload_1
      26: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
      31: invokevirtual #68                 // Method org/apache/flink/table/catalog/ResolvedCatalogTable.getSchema:()Lorg/apache/flink/table/api/TableSchema;
      34: invokestatic  #74                 // Method org/apache/flink/table/utils/TableSchemaUtils.getPhysicalSchema:(Lorg/apache/flink/table/api/TableSchema;)Lorg/apache/flink/table/api/TableSchema;
      37: astore        5
      39: aload_0
      40: getfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
      43: ifnull        62
      46: aload_0
      47: getfield      #45                 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
      50: aload_2
      51: invokevirtual #80                 // Method org/apache/flink/table/catalog/ObjectIdentifier.toObjectPath:()Lorg/apache/flink/table/catalog/ObjectPath;
      54: invokestatic  #84                 // Method createTableLoader:(Lorg/apache/iceberg/flink/FlinkCatalog;Lorg/apache/flink/table/catalog/ObjectPath;)Lorg/apache/iceberg/flink/TableLoader;
      57: astore        6
      59: goto          78
      62: aload         4
      64: aload_3
      65: aload_2
      66: invokevirtual #94                 // Method org/apache/flink/table/catalog/ObjectIdentifier.getDatabaseName:()Ljava/lang/String;

This line:

8: invokeinterface #58,  1           // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;

bakes an explicit reference to ResolvedCatalogTable into the iceberg-flink-runtime.jar, which is not what we want, because we expect the iceberg-flink-runtime jar to run on both Flink 1.12 and Flink 1.13 clusters.
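
To make that concrete, a hedged sketch (assumed usage, not necessarily the exact code in this PR) of how calling the method through DynMethods keeps the compiled class free of ResolvedCatalogTable, since CatalogTable is present in both 1.12 and 1.13:

// Sketch: inside createDynamicTableSource(Context context). The reflective lookup returns
// the result as CatalogTable, so the compiled bytecode only references CatalogTable
// (available in both Flink 1.12 and 1.13) instead of ResolvedCatalogTable (1.13 only).
CatalogTable catalogTable = GET_CATALOG_TABLE.invoke(context);
Map<String, String> tableProps = catalogTable.getOptions();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());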

openinx (Member Author) commented

You may need an orNoop() call here.

This looks good to me for handling that edge case.

Contributor commented

Okay, so the method exists in both cases, but returns a more specific object in Flink 1.13?

openinx (Member Author) commented

Yes, that's correct.
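
For clarity, a sketch of the two declarations as I understand them (check the Flink javadocs for the authoritative signatures):

// DynamicTableFactory.Context in Flink 1.12.x (sketch):
CatalogTable getCatalogTable();

// DynamicTableFactory.Context in Flink 1.13.x (sketch): same method name, but the return type
// is narrowed to ResolvedCatalogTable, which implements CatalogTable. Callers compiled against
// 1.13 record the narrowed type in their bytecode, which is what triggers NoSuchMethodError on
// a 1.12 cluster and why the reflective call above is needed.
ResolvedCatalogTable getCatalogTable();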

rdblue (Contributor) commented Oct 22, 2021

@openinx, what do you think about taking an approach similar to what we've done in Spark and building a module for each supported Flink version? That would help us add support for multiple versions of Scala as well.

I don't think that should block this PR, but I'd like to have a good plan for supporting multiple Flink versions in parallel without the kinds of issues users have been reporting.

openinx (Member Author) commented Oct 25, 2021

what do you think about taking an approach similar to what we've done in Spark and building a module for each supported Flink version?

That's exactly what I'm planning to push forward as the next step. Besides separating the Flink versions into different modules (if necessary), a few other things need to be addressed:

  1. Make the Flink + Iceberg build work against both Scala 2.11 and Scala 2.12. Currently we use Scala 2.12 to build the Flink runtime jar, but the official Apache Flink releases ship with both Scala 2.11 and 2.12, so it may be worth adding a Scala 2.11 release of iceberg-flink-runtime.jar because most Flink users still default to Scala 2.11. Either way, we will need to add a Travis CI case to guarantee that the Flink release bundled with Scala 2.11 works with Apache Iceberg.
  2. Make both Flink 1.12 and Flink 1.13 work against Iceberg in the Travis CI environment.

openinx (Member Author) commented Oct 26, 2021

@rdblue, any other concerns for this PR?

@rdblue rdblue merged commit e4d841b into apache:master Oct 26, 2021

Successfully merging this pull request may close these issues:

Flink: Get NoSuchMethodError when submit flink sql by flink 1.12.x since iceberg upgrade flink dependence to 1.13.2
