Flink: Support streaming reader. #1793

Conversation
Resolved review threads (outdated): flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
```java
    ScanContext options) {
  Preconditions.checkArgument(
      options.snapshotId() == null && options.asOfTimestamp() == null,
      "The streaming reader does not support using snapshot-id or as-of-timestamp to select the table snapshot.");
```
It would be good to include what *should* be used to configure the stream. It looks like it should be startSnapshotId with no endSnapshotId.
If the user did not provide a start-snapshot-id, then this function will use the snapshot ID just before the current snapshot (a DUMMY_START_SNAPSHOT_ID in the code) as the start-snapshot-id. So we allow a null start-snapshot-id, but we do not allow a non-null end-snapshot-id.
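A toy model of that fallback might look like the following. This is a sketch, not Iceberg's actual API: the names `DUMMY_START_SNAPSHOT_ID`, `resolveStartSnapshotId`, and `firstScanRange` are assumptions for illustration, and the incremental range is modeled as "everything after the (exclusive) start snapshot".

```java
import java.util.Arrays;
import java.util.List;

// Toy model of the start-snapshot fallback described above (names assumed,
// not Iceberg's actual API): when the user gives no start-snapshot-id, start
// from a sentinel meaning "the position just before the current snapshot",
// so the first incremental scan picks up the current snapshot's data.
public class StartSnapshotSketch {
  // Sentinel meaning "start right before the current snapshot".
  static final long DUMMY_START_SNAPSHOT_ID = -1L;

  static long resolveStartSnapshotId(Long userStartSnapshotId) {
    return userStartSnapshotId != null ? userStartSnapshotId : DUMMY_START_SNAPSHOT_ID;
  }

  // Which snapshots the first incremental scan covers (exclusive start),
  // given snapshot ids ordered oldest to newest.
  static List<Long> firstScanRange(List<Long> snapshotIds, long startSnapshotId) {
    int from = startSnapshotId == DUMMY_START_SNAPSHOT_ID
        ? snapshotIds.size() - 1                      // only the current snapshot
        : snapshotIds.indexOf(startSnapshotId) + 1;   // everything after the start
    return snapshotIds.subList(from, snapshotIds.size());
  }

  public static void main(String[] args) {
    List<Long> snapshots = Arrays.asList(100L, 200L, 300L);
    long start = resolveStartSnapshotId(null);
    System.out.println(firstScanRange(snapshots, start)); // prints [300]
  }
}
```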
We should probably also check this:

```java
Preconditions.checkArgument(scanContext.asOfTimestamp() == null,
    "Can't set asOfTimestamp in ScanContext for continuous enumerator");
```
The current condition ctxt.snapshotId() == null && ctxt.asOfTimestamp() == null requires both snapshotId and asOfTimestamp to be null, so we don't have to do the extra scanContext.asOfTimestamp() == null check now. But I prefer a separate Preconditions.checkArgument for snapshotId and for asOfTimestamp, so that people can distinguish the cause quickly.
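The separate checks being suggested could be sketched like this. The option accessor names follow the discussion, and the small `checkArgument` helper stands in for Guava's `Preconditions` so the sketch is self-contained; it is not the PR's actual code.

```java
// Sketch of separate precondition checks so a failure message pinpoints which
// unsupported option was set. Accessor names are assumed from the discussion;
// checkArgument stands in for Guava's Preconditions.
public class StreamingScanValidation {

  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  // One check per unsupported option, instead of a single combined condition.
  static void validate(Long snapshotId, Long asOfTimestamp, Long endSnapshotId) {
    checkArgument(snapshotId == null,
        "The streaming reader does not support the snapshot-id option.");
    checkArgument(asOfTimestamp == null,
        "The streaming reader does not support the as-of-timestamp option.");
    checkArgument(endSnapshotId == null,
        "The streaming reader does not support the end-snapshot-id option.");
  }

  public static void main(String[] args) {
    validate(null, null, null); // a null start-snapshot-id is allowed; see discussion
    try {
      validate(null, 1607000000000L, null);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // the message names as-of-timestamp specifically
    }
  }
}
```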
Resolved review threads (outdated): flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java, flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
Looking pretty good to me, at least for the Iceberg concerns. It would be great to have @JingsongLi and @stevenzwu review as well.
```java
import static org.apache.iceberg.types.Types.NestedField.required;

public class TestStreamScanSql extends AbstractTestBase {
```
I think it would be good to make this unit test extend the FlinkTestBase class.
@rdblue @stevenzwu Would you mind taking another look? I've updated this patch. Thanks.
Resolved review thread (outdated): flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
```java
public void initializeState(StateInitializationContext context) throws Exception {
  super.initializeState(context);

  checkpointState = context.getOperatorStateStore().getListState(
```
I opened issue-1698 a while back regarding a more stable serializer (than Java serialization) for Flink checkpointing. While Java serialization works well for batch jobs, we need more stable serialization to support schema evolution for long-running streaming jobs. We need something like DeltaManifestsSerializer and ManifestFiles.encode for CombinedScanTask.
It doesn't have to be done in the initial version, so I don't think it is a blocker for this PR.
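The general pattern being asked for — a versioned, explicit binary format instead of Java serialization — can be sketched as follows. This is a toy illustration, not Iceberg's actual DeltaManifestsSerializer; the class and method names are assumptions, and a `String` payload stands in for a real CombinedScanTask.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy versioned-serializer pattern (not Iceberg's actual DeltaManifestsSerializer):
// prefixing checkpoint bytes with a version number lets a long-running streaming
// job evolve the payload format while still restoring from old checkpoints.
public class VersionedSerializerSketch {
  static final int VERSION = 1;

  static byte[] serialize(String payload) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(VERSION);   // version header goes first
      out.writeUTF(payload);   // stand-in for the real task encoding
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static String deserialize(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int version = in.readInt();
      if (version != VERSION) {
        // An upgraded job would dispatch to a per-version decoder here.
        throw new IOException("Unsupported checkpoint format version: " + version);
      }
      return in.readUTF();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(deserialize(serialize("combined-scan-task"))); // prints combined-scan-task
  }
}
```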
Sorry, I missed this issue before. Let me take a look.
```java
super.snapshotState(context);

checkpointState.clear();
for (FlinkInputSplit split : splits) {
```
Should we use addAll instead, which translates to one merge call in RocksDB?
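The difference being suggested can be illustrated with a minimal stand-in for Flink's ListState (the real interface lives in org.apache.flink.api.common.state; this toy class only mimics its shape):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Minimal stand-in for Flink's ListState (not the real API) illustrating the
// suggestion: replace the clear()-plus-per-element-add() loop with a single
// addAll(), which the RocksDB state backend can serve with one merge call
// instead of one write per split.
public class SnapshotStateSketch {
  static class InMemoryListState<T> {
    final List<T> values = new ArrayList<>();
    void clear() { values.clear(); }
    void add(T v) { values.add(v); }                 // one backend write per call
    void addAll(List<T> vs) { values.addAll(vs); }   // single bulk/merge call
  }

  static int run() {
    InMemoryListState<String> checkpointState = new InMemoryListState<>();
    Queue<String> splits = new ArrayDeque<>(List.of("split-1", "split-2", "split-3"));

    checkpointState.clear();
    checkpointState.addAll(new ArrayList<>(splits)); // suggested bulk form
    return checkpointState.values.size();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints 3
  }
}
```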
Resolved review threads: flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@rdblue, after talking with some Flink users from Asia: they have a strong demand for this feature because it's necessary for building a classic data pipeline: (kafka) -> (flink) -> (iceberg) -> (flink) -> (iceberg) -> ... . I think it would be good to merge this before the Iceberg 0.11.0 release. Would you like to take another look when you have time? Thanks.
@openinx, I will take a look. Sorry I didn't get a chance to finish the review I started a few days ago. I agree that this is important to get in and will make some time for it this week.
Resolved review threads: flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java, flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
```java
private static final Logger LOG = LoggerFactory.getLogger(StreamingReaderOperator.class);

private final MailboxExecutor executor;
```
I don't think that using MailboxExecutor is a good idea. If I understand correctly, the mailbox queue for the executor cannot be used to hold state because that state would not be checkpointed (if, for example, it held the splits waiting to be processed). The result is that this operator has an elaborate way to keep state in a queue and continuously submit stateless mailbox tasks to keep running. But a simpler option is to create a thread directly that polls the splits queue and keeps running endlessly.
We cannot use a newly created thread to process the split asynchronously, because that would break the checkpoint mechanism, which depends on the mailbox model in the Flink runtime. Suppose the asynchronous thread keeps processing the records of a split polled from the splits queue, and a Flink checkpoint barrier arrives: how would we coordinate the checkpoint barrier with the in-flight split so that the barrier takes effect (triggering persistence of all of this operator's state) once the current split is finished? Would we go back to using the checkpoint lock to synchronize between the checkpoint barrier event and the newly introduced async thread?
In the current mailbox model, both Flink's internal control actions and user-provided events are processed in the same thread (StreamTask in the Flink runtime). The StreamTask runs in an endless loop to process events from the mailbox queue. In each iteration it will:

Step 1. Try to take mails that have been enqueued in the mailbox queue; those mails are Flink's control actions, such as the action to trigger a checkpoint or to notify checkpoint completion. If no mail is enqueued, processMail does nothing.

Step 2. Read one completed record from Flink's network queue and invoke processElement(record); that is the incremental-compute step. Take sum as an example: once a record arrives, processElement increments its counter.

So all events (whether Flink's control events or the user's events) are processed in the same StreamTask thread. In our Flink streaming reader, we only need to ensure that at most one split is being processed at a time; a newly triggered checkpoint action can then simply be enqueued in the mailbox queue. Once the in-flight split is finished, the StreamTask will execute the checkpoint action in Step 1, so we don't need an extra checkpoint lock or any extra synchronization.

That's why we use the mailbox to enqueue the action (processSplits) that processes all elements of a given split.
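The single-threaded mailbox scheme described above can be sketched with a toy queue-and-loop (this is not Flink's actual StreamTask; the split and checkpoint tasks are simplified to log appends):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy single-threaded "mailbox" loop (not Flink's actual StreamTask) showing
// the scheme described above: checkpoint actions and split-reading tasks share
// one queue and one thread, so a checkpoint barrier simply waits until the
// currently running split task returns; no checkpoint lock is needed.
public class MailboxSketch {
  static String run() {
    Deque<Runnable> mailbox = new ArrayDeque<>();
    StringBuilder log = new StringBuilder();

    mailbox.add(() -> log.append("split-1 "));     // processSplits for one split
    mailbox.add(() -> log.append("checkpoint "));  // barrier enqueued mid-stream
    mailbox.add(() -> log.append("split-2 "));     // next split's processSplits

    while (!mailbox.isEmpty()) {
      mailbox.poll().run();  // one thread drains control and user actions in order
    }
    return log.toString().trim();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints: split-1 checkpoint split-2
  }
}
```

Because the checkpoint action sits between the two split tasks in the same queue, it runs exactly when split-1 has finished and before split-2 starts, which is the ordering guarantee the comment relies on.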
Okay, I think I understand now. The missing piece of information was that the MailboxExecutor where these tasks are added is the same mailbox executor that is running the operator. Is that correct?
I thought that it was a separate mailbox executor and thread, which wouldn't make much sense. I also noted the problem with checkpoints that you pointed out, but thought that it must be handled within the mailbox.
If it is the same executor, then it makes sense because the same thread processing incoming checkpoint events will also run processSplits. I think I see what you're doing now.
If this is the same mailbox executor that is running the operator, then it would be helpful to add a comment here that says it for future readers.
> The missing piece of information was that the MailboxExecutor where these tasks are added is the same mailbox executor that is running the operator. Is that correct?

Yes, you're right. Let me add a few comments to explain this more clearly in the code.
```java
}

private void enqueueProcessSplits() {
  if (currentSplitState == SplitState.IDLE) {
```
Minor: Should this only queue a task if splits is non-empty?
If splits is currently empty and this is called from processSplits, then a new task will be queued. That task will process a split if one is waiting in the mailbox queue to be processed, but often it will do nothing and set the split state back to IDLE.
I'd probably only add a new task if splits is non-empty, or update processSplits to always submit a new task and not set IDLE in the finally block.
Yeah, I like this improvement. Will update it in the next patch.
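The guard being discussed can be modeled with a toy operator (this is not the actual StreamingReaderOperator; the field names, the in-memory mailbox, and the `run` driver are assumptions for illustration):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model (not the actual StreamingReaderOperator) of the suggested guard:
// only submit a processSplits task when the splits queue is non-empty, so
// no-op tasks never enter the mailbox.
public class EnqueueGuardSketch {
  enum SplitState { IDLE, RUNNING }

  private final Queue<String> splits = new ArrayDeque<>();
  private final Queue<Runnable> mailbox = new ArrayDeque<>();
  private SplitState currentSplitState = SplitState.IDLE;
  private int tasksSubmitted = 0;

  void addSplit(String split) {
    splits.add(split);
    enqueueProcessSplits();
  }

  private void enqueueProcessSplits() {
    // The suggested guard: skip submission entirely when there is nothing to read.
    if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
      currentSplitState = SplitState.RUNNING;
      tasksSubmitted++;
      mailbox.add(this::processSplits);
    }
  }

  private void processSplits() {
    splits.poll();                  // read one split to completion
    currentSplitState = SplitState.IDLE;
    enqueueProcessSplits();         // re-enqueues only if more splits are waiting
  }

  static int run() {
    EnqueueGuardSketch op = new EnqueueGuardSketch();
    op.addSplit("split-1");
    op.addSplit("split-2");
    while (!op.mailbox.isEmpty()) {
      op.mailbox.poll().run();      // drain the single-threaded mailbox
    }
    return op.tasksSubmitted;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints 2: one task per split, no empty-queue tasks
  }
}
```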
```java
// currentSplitState will be marked as RUNNING). After finished all records processing, the currentSplitState will
// be marked as IDLE again.
// NOTICE: all the reader and writer of this variable are the same thread, so we don't need extra synchronization.
private transient SplitState currentSplitState;
```
I think I would rephrase the comment here. I didn't really understand it when I read it the first time. Here's what I would suggest, assuming that I understand what's happening here:

> Splits are read by the same thread that calls processElement. Each read task is submitted to that thread by adding it to the executor. This state is used to ensure that only one read task is in that queue at a time, so that read tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING. When there are no more files to read, this will be set to IDLE.
@openinx, I took another look at this after understanding that the mailbox executor is shared, and everything looks good to me. I made a few minor comments you could fix, but assuming that my interpretation of your last comment is correct, I think this is ready to commit. Once tests are passing, of course.
All checks passed; just merged this PR. Thanks all for reviewing.
This patch ports the Flink streaming reader from here: https://github.com/generic-datalake/iceberg-poc/pull/3