
Conversation

@openinx openinx commented Nov 5, 2021

This separates the Flink 1.13 code from the Flink 1.12 code; for background, please see #3434 (comment).

The current diff between flink/v1.12 and flink/v1.13:

git diff --no-index  flink/v1.12 flink/v1.13  > ~/diff.patch
diff --git a/flink/v1.12/build.gradle b/flink/v1.13/build.gradle
index 2dd474edf..0e97de314 100644
--- a/flink/v1.12/build.gradle
+++ b/flink/v1.13/build.gradle
@@ -18,17 +18,17 @@
  */
 
 def flinkProjects = [
-    project(':iceberg-flink:iceberg-flink-1.12'),
-    project(':iceberg-flink:iceberg-flink-1.12-runtime')
+  project(':iceberg-flink:iceberg-flink-1.13'),
+  project(':iceberg-flink:iceberg-flink-1.13-runtime')
 ]
 
 configure(flinkProjects) {
   project.ext {
-    flinkVersion = '1.12.5'
+    flinkVersion = '1.13.2'
   }
 }
 
-project(':iceberg-flink:iceberg-flink-1.12') {
+project(':iceberg-flink:iceberg-flink-1.13') {
 
   dependencies {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
@@ -71,7 +71,7 @@ project(':iceberg-flink:iceberg-flink-1.12') {
     testImplementation "org.apache.flink:flink-core:${flinkVersion}"
     testImplementation "org.apache.flink:flink-runtime_2.12:${flinkVersion}"
     testImplementation "org.apache.flink:flink-table-planner-blink_2.12:${flinkVersion}"
-    testImplementation("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
+    testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
       exclude group: 'junit'
     }
     testImplementation("org.apache.flink:flink-test-utils_2.12:${flinkVersion}") {
@@ -118,7 +118,7 @@ project(':iceberg-flink:iceberg-flink-1.12') {
   }
 }
 
-project(':iceberg-flink:iceberg-flink-1.12-runtime') {
+project(':iceberg-flink:iceberg-flink-1.13-runtime') {
   apply plugin: 'com.github.johnrengelman.shadow'
 
   tasks.jar.dependsOn tasks.shadowJar
@@ -138,7 +138,7 @@ project(':iceberg-flink:iceberg-flink-1.12-runtime') {
   }
 
   dependencies {
-    implementation project(':iceberg-flink:iceberg-flink-1.12')
+    implementation project(':iceberg-flink:iceberg-flink-1.13')
     implementation project(':iceberg-aws')
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
@@ -175,4 +175,3 @@ project(':iceberg-flink:iceberg-flink-1.12-runtime') {
     enabled = false
   }
 }
-

@stevenzwu
Contributor

There is a whitespace change here:

  -    testImplementation("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
  +    testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {

@kbendick kbendick left a comment


Overall, if we're heading in the direction of using two separate copies entirely, this makes sense to me.

As a follow-up, let's remove the reflection-based call (the one noted as being required for 1.13) from the 1.13 (and even the 1.12) code base.

Thanks @openinx for taking care of this!

* To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
* {@link #createCatalogLoader(String, Map, Configuration)}.
*/
public class FlinkCatalogFactory implements CatalogFactory {
Contributor


This is trying to separate flink 1.13 code from flink 1.12, about the background, pls see: #3434 (comment)

In addition to what's mentioned about the new sink interface, this also allows us to use the new CatalogFactory API.

When we were upgrading from 1.12 to 1.13, one of the concerns was that CatalogFactory in 1.13 allows users to implement both the older deprecated interface, TableFactory, and the newer interface, Factory.

If I remember correctly, the flink catalog loader in 1.13 will use the deprecated methods from TableFactory if they're implemented.

If we separate, will we choose to implement 1.13's CatalogFactory using the Factory methods instead of continuing to implement the TableFactory interface methods? Perhaps that's what is meant in the comment about issues with the new sink interface etc?

EDIT: It seems like the Factory methods are implemented in FlinkDynamicTableFactory. I seem to recall though that it was this class that needed to also implement the new methods so that internal Flink code would instantiate via the new pathway.

I can pull up the old PR with more details if we'd like.

Member Author


Yes, we could upgrade the implementation to the newly introduced Factory API in Flink 1.13. I think we can address this in a follow-up PR.
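For reference, a minimal sketch of the shape such a Factory-based catalog factory would take. The interfaces below are simplified stand-ins written for illustration only, not Flink's real classes (the actual Flink 1.13 Factory uses Set<ConfigOption<?>> and a richer Context), and SketchIcebergCatalogFactory is a hypothetical name:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Simplified stand-ins mirroring the shape of Flink 1.13's new
// Factory/CatalogFactory contract. These are NOT the real Flink interfaces.
interface Factory {
  String factoryIdentifier();    // matched against the catalog 'type' in DDL
  Set<String> requiredOptions(); // real Flink uses Set<ConfigOption<?>>
  Set<String> optionalOptions();
}

interface CatalogFactory extends Factory {
  Object createCatalog(Map<String, String> options);
}

// Hypothetical example: discovery happens by identifier, instead of the
// deprecated TableFactory's requiredContext()/supportedProperties() matching.
class SketchIcebergCatalogFactory implements CatalogFactory {
  @Override public String factoryIdentifier() { return "iceberg"; }
  @Override public Set<String> requiredOptions() { return Collections.singleton("catalog-type"); }
  @Override public Set<String> optionalOptions() { return Collections.singleton("warehouse"); }
  @Override public Object createCatalog(Map<String, String> options) {
    // Placeholder for building a real CatalogLoader from the options.
    return "catalog:" + options.get("catalog-type");
  }
}
```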

Comment on lines +35 to +36
.withDescription("If is false, parallelism of source are set by config.\n" +
"If is true, source parallelism is inferred according to splits number.\n");
Contributor


Non-blocking Nit: Consider: When false, the parallelism of Iceberg sources comes from the config. Otherwise, source parallelism is inferred based on the number of splits.

For me, just changing "If is false" to "When false" would be more concise. But it's up to you whether to change it now or in a later PR.

ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max")
.intType()
.defaultValue(100)
.withDescription("Sets max infer parallelism for source operator.");
Contributor


Nit: Consider adding Ignored if $TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM is true. Or if we throw when TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM is set to true, I'd mention that.

But if this is pre-existing, there's no need to change in this PR to keep the changes minimal.
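The intended interplay of the two options could be sketched like this. SourceParallelism and decide are hypothetical names for illustration, and the exact semantics (minimum of split count and the max cap) are an assumption, not Iceberg's actual code:

```java
// Hypothetical helper illustrating how the two config options interact.
// Names and exact semantics are assumptions for illustration only.
final class SourceParallelism {
  private SourceParallelism() {}

  static int decide(boolean inferEnabled, int maxInferParallelism,
                    int configuredParallelism, int splitCount) {
    if (!inferEnabled) {
      // table.exec.iceberg.infer-source-parallelism = false:
      // parallelism comes from config; the .max option is effectively ignored.
      return configuredParallelism;
    }
    // infer-source-parallelism = true: inferred from the number of splits,
    // capped by table.exec.iceberg.infer-source-parallelism.max.
    return Math.max(1, Math.min(splitCount, maxInferParallelism));
  }
}
```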

Comment on lines +77 to +80
private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
.impl(Context.class, "getCatalogTable")
.orNoop()
.build();
Contributor


Will we remove this in a follow-up PR?
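Until it is removed, the pattern behind that DynMethods call — look up a method by name and fall back to a no-op when it is absent on the running Flink version — can be sketched with plain reflection. NoopFallbackMethod below is a hypothetical illustration of the idea, not Iceberg's DynMethods implementation:

```java
import java.lang.reflect.Method;

// Plain-reflection sketch of the pattern behind Iceberg's
// DynMethods.builder(...).orNoop(): resolve a method by name and, if it
// does not exist in this runtime, fall back to a no-op returning null.
final class NoopFallbackMethod {
  private final Method method; // null means the no-op fallback is active

  NoopFallbackMethod(Class<?> target, String name, Class<?>... argTypes) {
    Method m = null;
    try {
      m = target.getMethod(name, argTypes);
    } catch (NoSuchMethodException e) {
      // Method absent in this version: stay a no-op.
    }
    this.method = m;
  }

  Object invoke(Object receiver, Object... args) {
    if (method == null) {
      return null; // no-op fallback: do nothing, return null
    }
    try {
      return method.invoke(receiver, args);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  boolean isNoop() { return method == null; }
}
```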


rdblue commented Nov 7, 2021

@kbendick, I think this PR should focus on just the build and file changes. We can update the code and remove dynamic calls later. That keeps this simple so we don't have to find where 1.13 changed in a huge commit.


rdblue commented Nov 7, 2021

I'm going to merge this so that we can unblock follow-up projects, like FLIP-27 and Flink 1.14. Thanks @openinx!

@rdblue rdblue merged commit 1d81643 into apache:master Nov 7, 2021
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Nov 23, 2021