Conversation

@openinx openinx (Member) commented Nov 1, 2021

Addressing the issue: #3433

Flink 1.14 changes.

The following sub-issues need to be addressed:

  1. Apache Flink has removed the separate Blink planner module and made it the default planner, so we no longer need to add org.apache.flink:flink-table-planner-blink_2.12 explicitly to the dependencies.
  2. Flink 1.14 removed the StreamOperator#dispose method; finish now replaces the old close, and close replaces the old dispose. See this PR.
  3. Flink 1.14 moved MailboxExecutor from org.apache.flink.streaming.api.operators.MailboxExecutor to org.apache.flink.api.common.operators.MailboxExecutor, see: https://issues.apache.org/jira/browse/FLINK-23621
  4. Flink 1.14 dropped the StreamStatusMaintainer argument from the StreamSourceContexts#getSourceContext method, pls see: FLINK-18934 and the changes
  5. The flink-runtime_${scala.binary.version} dependency lost its Scala suffix in Flink 1.14 and is now named flink-runtime, because all the Scala classes have been moved into a separate dependency named flink-scala. Pls see the PR
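The rename in sub-issue 2 can be summarized with a small stand-in (not the real Flink API; the class and method bodies below are hypothetical): the 1.14 finish() takes over the 1.13 close() role (end-of-input work on the success path), while the 1.14 close() takes over the 1.13 dispose() role (unconditional resource cleanup).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stream operator, illustrating the Flink 1.14
// lifecycle rename: finish() replaces the old close() (flush on successful
// end of input), and close() replaces the old dispose() (always release
// resources, even when the job fails).
public class LifecycleSketch {
  static class SketchOperator {
    final List<String> calls = new ArrayList<>();

    // Flink 1.13: close()  ->  Flink 1.14: finish()
    void finish() {
      calls.add("finish: flush buffered records");
    }

    // Flink 1.13: dispose()  ->  Flink 1.14: close()
    void close() {
      calls.add("close: release writer and tableLoader");
    }
  }

  public static void main(String[] args) {
    SketchOperator op = new SketchOperator();
    op.finish(); // invoked only on a successful end of input
    op.close();  // invoked always, on success or failure
    System.out.println(op.calls);
  }
}
```

This is why the diff below renames dispose() to close() in IcebergFilesCommitter and IcebergStreamWriter, and turns the old close() of StreamingReaderOperator into finish().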

Considering sub-issues 3 and 4, Iceberg's StreamingReaderOperator has to be refactored so that it works with Flink 1.14. But that would force all classes in iceberg-flink that depend on StreamingReaderOperator to be moved into three modules:

  • iceberg-flink-1.12
  • iceberg-flink-1.13
  • and iceberg-flink-1.14.

Their corresponding unit tests must also be moved into the three versioned modules. This leads to more complicated maintenance work, because we would always have to decide which common classes stay in iceberg-flink and which incompatible classes move into the versioned modules. Considering the large changes in 1.14, I decided to copy all the source code into a separate module for maintenance.

Since the iceberg-flink-1.14 module doesn't depend on the iceberg-flink module, I think it's better to move the iceberg-flink module's source code into the iceberg-flink-1.12 module, and then have iceberg-flink-1.13 share the source code of iceberg-flink-1.12.
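One way to wire up that sharing in Gradle (a hypothetical sketch, not the final build file; the srcDir paths below are assumptions) is to point the 1.13 module's source sets at the 1.12 module's directories:

```groovy
// Hypothetical sketch: make iceberg-flink-1.13 compile the 1.12 sources
// instead of keeping its own copy. The paths are assumed, not taken from
// the actual build files.
project(':iceberg-flink:iceberg-flink-1.13') {
  sourceSets {
    main {
      java.srcDirs = ['../v1.12/flink/src/main/java']
      resources.srcDirs = ['../v1.12/flink/src/main/resources']
    }
    test {
      java.srcDirs = ['../v1.12/flink/src/test/java']
    }
  }
}
```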

The Iceberg diff between the flink 1.13 module and the flink 1.14 module:

git diff --no-index flink/v1.13 flink/v1.14

The diff.patch file is:

diff --git a/flink/v1.13/build.gradle b/flink/v1.14/build.gradle
index 0e97de314..2b2df13a8 100644
--- a/flink/v1.13/build.gradle
+++ b/flink/v1.14/build.gradle
@@ -18,17 +18,17 @@
  */
 
 def flinkProjects = [
-  project(':iceberg-flink:iceberg-flink-1.13'),
-  project(':iceberg-flink:iceberg-flink-1.13-runtime')
+  project(':iceberg-flink:iceberg-flink-1.14'),
+  project(':iceberg-flink:iceberg-flink-1.14-runtime')
 ]
 
 configure(flinkProjects) {
   project.ext {
-    flinkVersion = '1.13.2'
+    flinkVersion = '1.14.0'
   }
 }
 
-project(':iceberg-flink:iceberg-flink-1.13') {
+project(':iceberg-flink:iceberg-flink-1.14') {
 
   dependencies {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
@@ -43,7 +43,6 @@ project(':iceberg-flink:iceberg-flink-1.13') {
     compileOnly "org.apache.flink:flink-streaming-java_2.12:${flinkVersion}"
     compileOnly "org.apache.flink:flink-streaming-java_2.12:${flinkVersion}:tests"
     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.12:${flinkVersion}"
-    compileOnly "org.apache.flink:flink-table-planner-blink_2.12:${flinkVersion}"
     compileOnly "org.apache.flink:flink-table-planner_2.12:${flinkVersion}"
     compileOnly "org.apache.hadoop:hadoop-hdfs"
     compileOnly "org.apache.hadoop:hadoop-common"
@@ -69,8 +68,7 @@ project(':iceberg-flink:iceberg-flink-1.13') {
     }
 
     testImplementation "org.apache.flink:flink-core:${flinkVersion}"
-    testImplementation "org.apache.flink:flink-runtime_2.12:${flinkVersion}"
-    testImplementation "org.apache.flink:flink-table-planner-blink_2.12:${flinkVersion}"
+    testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
     testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
       exclude group: 'junit'
     }
@@ -118,7 +116,7 @@ project(':iceberg-flink:iceberg-flink-1.13') {
   }
 }
 
-project(':iceberg-flink:iceberg-flink-1.13-runtime') {
+project(':iceberg-flink:iceberg-flink-1.14-runtime') {
   apply plugin: 'com.github.johnrengelman.shadow'
 
   tasks.jar.dependsOn tasks.shadowJar
@@ -138,7 +136,7 @@ project(':iceberg-flink:iceberg-flink-1.13-runtime') {
   }
 
   dependencies {
-    implementation project(':iceberg-flink:iceberg-flink-1.13')
+    implementation project(':iceberg-flink:iceberg-flink-1.14')
     implementation project(':iceberg-aws')
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 010df8cf5..8f8bdad6c 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -345,7 +345,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   }
 
   @Override
-  public void dispose() throws Exception {
+  public void close() throws Exception {
     if (tableLoader != null) {
       tableLoader.close();
     }
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
index 6d12310dd..cc8e6ce82 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
@@ -73,8 +73,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
   }
 
   @Override
-  public void dispose() throws Exception {
-    super.dispose();
+  public void close() throws Exception {
+    super.close();
     if (writer != null) {
       writer.close();
       writer = null;
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
index 235b17332..c8efc2b59 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
 
 import java.io.IOException;
 import java.util.Queue;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.runtime.state.JavaSerializer;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -110,10 +110,10 @@ public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
         getOperatorConfig().getTimeCharacteristic(),
         getProcessingTimeService(),
         new Object(), // no actual locking needed
-        getContainingTask().getStreamStatusMaintainer(),
         output,
         getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
-        -1);
+        -1,
+        true);
 
     // Enqueue to process the recovered input splits.
     enqueueProcessSplits();
@@ -169,8 +169,8 @@ public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
   }
 
   @Override
-  public void dispose() throws Exception {
-    super.dispose();
+  public void close() throws Exception {
+    super.close();
 
     if (format != null) {
       format.close();
@@ -182,8 +182,8 @@ public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
   }
 
   @Override
-  public void close() throws Exception {
-    super.close();
+  public void finish() throws Exception {
+    super.finish();
     output.close();
     if (sourceContext != null) {
       sourceContext.emitWatermark(Watermark.MAX_WATERMARK);

@openinx openinx marked this pull request as draft November 1, 2021 12:56
@rdblue rdblue (Contributor) commented Nov 1, 2021

@openinx, it looks like the tests are passing for this in the latest commit. Did you solve the issues from the description? Just let us know when you're ready for a review.

@openinx openinx (Member, Author) commented Nov 2, 2021

Did you solve the issues from the description?

I'm still working on resolving the incompatibility issues between flink 1.13 and flink 1.14. The build doesn't seem to run all the unit tests on top of flink 1.14; I left another comment in this issue. I will publish another PR to address the unit-test issues across flink versions.

Once I think this PR is ready, I will ping you, @stevenzwu and @kbendick for review. Thanks.

@github-actions github-actions bot added the INFRA label Nov 3, 2021
@openinx openinx marked this pull request as ready for review November 3, 2021 10:22
@stevenzwu stevenzwu (Contributor) commented Nov 4, 2021

I am fine with copying all the code into version-specific modules so that we don't have to decide which classes are kept in the common module vs the version modules. This would be consistent with the Spark version handling too.

I would ask that we also duplicate the code in 1.12 and 1.13. The FLIP-27 Flink source will have different code in 1.13; we may even skip 1.12 for the FLIP-27 source. Then I can start with the FLIP-27 source in 1.13 only. After it is done, we can copy the code to the 1.14 module.

@rdblue rdblue (Contributor) commented Nov 4, 2021

I don't have a preference about this for Flink. I think that copying will work well for Spark, but I think it's up to the people working on Flink whether to use the same strategy.

@openinx openinx (Member, Author) commented Nov 5, 2021

I would ask we also duplicate the code in 1.12 and 1.13 too. FLIP-27 Flink source will have different code in 1.13. We may even skip 1.12 for FLIP-27 source. Then I can start with the FLIP-27 source in 1.13 only.

The FLIP-27 API change between 1.12 and 1.13 was caused by FLINK-22133, which changed the basic snapshotState API. That means when @stevenzwu develops the FLIP-27 source reader, he will encounter the same problem I encountered in this issue. If we plan to keep the common flink code in iceberg-flink, then we need to pick all the upper-layer classes built on top of the snapshotState API into :iceberg-flink:iceberg-flink-1.12 and :iceberg-flink:iceberg-flink-1.13. I tried to do that picking when supporting flink 1.14; it is very tedious and error-prone work. So finally I gave up on the picking and turned to cloning the same code base for flink 1.14.
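The kind of breakage described above can be sketched with hypothetical stand-in interfaces (the signatures below are illustrative assumptions, not copied from Flink): once a method such as snapshotState changes its signature, every class built on top of it compiles against exactly one Flink version, which is what forces the per-version copies.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins, not the real Flink interfaces: they only
// illustrate how a changed snapshotState signature splits the code base
// between two Flink versions.
public class SnapshotApiSketch {
  interface ReaderV1<SplitT> {
    List<SplitT> snapshotState(); // old-style signature
  }

  interface ReaderV2<SplitT> {
    List<SplitT> snapshotState(long checkpointId); // new-style signature
  }

  public static void main(String[] args) {
    // A reader written against one signature cannot implement the other
    // without a source change, so the classes above it must be copied
    // into per-version modules (or picked apart class by class).
    ReaderV2<String> reader = checkpointId -> Collections.emptyList();
    System.out.println(reader.snapshotState(42L).isEmpty());
  }
}
```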

As newly introduced features for the flink+iceberg integration accumulate, I think we will encounter more and more lower-layer API changes, which means we will need to copy more and more classes from the common flink module into the version-specific modules. Rather than splitting that work among different Flink contributors (it is very easy to miss classes during development and review, resulting in missing tests and features), I prefer to clone a separate code base for each flink version.

Last week, I preferred to share the common code for flink 1.12 & flink 1.13. But I changed my mind when I ported the integration work to flink 1.14 and realized that @stevenzwu's FLIP-27 work is encountering a similar issue.

@openinx openinx (Member, Author) commented Nov 8, 2021

Since we've got this #3476 merged, let's update this PR.

@openinx openinx (Member, Author) commented Nov 9, 2021

Ping @rdblue, any other concerns?

@SteNicholas SteNicholas (Member) commented

@openinx, what about the integration with the new Sink interface? FLIP-27 is being implemented by @stevenzwu, and the corresponding FLIP-143 support is necessary.

@openinx openinx (Member, Author) commented Nov 17, 2021

@openinx , what about the integration with the new Sink interface? The FLIP-27 is implemented by @stevenzwu , and the corresponding FLIP-143 support is necessary.

I think the flip-27 sink integration work will be merged into the flink 1.13 module first, and then ported to the other related flink versions. That is a bigger feature, and we need to make sure it works stably on a specific flink version.

@stevenzwu stevenzwu (Contributor) commented

@openinx I think you meant that the flip-27 source integration will be merged into the flink 1.13 module first.

@SteNicholas was asking about the new flip-143 unified sink interface, which I don't think anyone has picked up yet. We should probably create a separate github issue to track that work and the discussion.

@rdblue rdblue (Contributor) commented Dec 6, 2021

I'll take a look today. Thanks, @openinx! And thanks to everyone for reviewing!

@openinx openinx (Member, Author) commented Dec 7, 2021

I think this PR needs an update now, because some newly introduced changes have been added to flink 1.13.

@rdblue rdblue merged commit 2d4b0dd into apache:master Dec 7, 2021
@rdblue rdblue (Contributor) commented Dec 7, 2021

Thanks, @openinx!
