Skip to content

Conversation

@stevenzwu
Copy link
Contributor

No description provided.

@github-actions github-actions bot added the flink label Jan 10, 2022
@stevenzwu
Copy link
Contributor Author

Here is the output of git diff --no-index flink/v1.13/flink/src flink/v1.14/flink/src
https://gist.github.com/stevenzwu/b6f12a56a8a7cb40b27969fda2b4f03c

I confirmed that there are no files relevant to the files we touched in PR 3501

Copy link
Contributor

@kbendick kbendick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Let's just move this work to 1.14 while it's on going so we don't make conflicts.

Thanks @stevenzwu!

@kbendick
Copy link
Contributor

kbendick commented Jan 10, 2022

The diff seems to be mostly associated with things that haven't been ported to 1.14 (but aren't associated to PR 3501).

@stevenzwu
Copy link
Contributor Author

Here is the smaller diff if we are only looking at flink/source dir

git diff --no-index flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source

diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
index 235b17332..c8efc2b59 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;

 import java.io.IOException;
 import java.util.Queue;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.runtime.state.JavaSerializer;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -110,10 +110,10 @@ public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
         getOperatorConfig().getTimeCharacteristic(),
         getProcessingTimeService(),
         new Object(), // no actual locking needed
-        getContainingTask().getStreamStatusMaintainer(),
         output,
         getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
-        -1);
+        -1,
+        true);

     // Enqueue to process the recovered input splits.
     enqueueProcessSplits();
@@ -169,8 +169,8 @@ public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
   }

   @Override
-  public void dispose() throws Exception {
-    super.dispose();
+  public void close() throws Exception {
+    super.close();

     if (format != null) {
       format.close();
@@ -182,8 +182,8 @@ public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
   }

   @Override
-  public void close() throws Exception {
-    super.close();
+  public void finish() throws Exception {
+    super.finish();
     output.close();
     if (sourceContext != null) {
       sourceContext.emitWatermark(Watermark.MAX_WATERMARK);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants