Skip to content

Conversation

@dawidwys
Copy link
Contributor

@dawidwys dawidwys commented Jul 2, 2021

What is the purpose of the change

This PR cleans up StreamOperator API in regards to the termination phase and introduces a clean finish() method for flushing all records without releasing resources.

The StreamOperator#close method which is supposed to flush all records, but at the same time, currently, it closes all resources, including connections to external systems. We need separate methods for flushing and closing resources because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method.

Verifying this change

All existing tests pass.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 2, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit c134bf9 (Thu Sep 23 17:55:59 UTC 2021)

✅no warnings

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 2, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@dawidwys
Copy link
Contributor Author

dawidwys commented Jul 2, 2021

@flinkbot run azure

Copy link
Contributor

@gaoyunhaii gaoyunhaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dawidwys very thanks for opening the PR! The PR looks in general very good to me and I only left few comments.

Besides the comments inline,

  • it seems we might also need to modify the document task_lifecycle.md
  • It seems there are also some operators: PythonTimestampsAndWatermarksOperator, CollectSinkOperator, AbstractMapBundleOperator, StreamSortOperator, RowTimeMiniBatchAssginerOperator, TestBoundedMultipleInputOperator and TimestampITCase.CustomOperator, some of their logic in close() should also need to be migrated to be in finish() and some of them would also emit records in close()

*/
@Override
public void dispose() throws Exception {
public void close() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it is definitely not introduced in this PR, I have a bit concern here in that we rely on the subclass to call super.close(), otherwise the statehandler won't get cleaned up, which might further cause resource leak (like the memory occupied by rocksdb). Currently it seems we have operators like GenericWriteAheadSink , TemporalProcessTimeJoinOperator, TemporalProcessTimeJoinOperator and WatermarkAssignerOperator that indeed do not call super.close(). Perhaps we could introduce a final method closeAndCleanupState that get called by the framework, and call close() in that method~?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's definitely a good observation. I am a bit hesitant to introducing the change as it implies changing again the API of the StreamOperator. Underneath the framework calls StreamOperator#close. If we wanted to introduce closeAndCleanupState the way you described it that it'd call close(), we'd need to do it in the StreamOperator. If we wanted to do it the other way around and make AbstractStreamOperator#close final and call abstract closeAndCleanupState or similar we'd need to change all operators and most probably all user's operators as it's virtually impossible to implement an operator without extending one of the AbstractStreamOperatorV2.

How about we create a JIRA ticket for that and we try to fix it once we work making the operator API "more public". There is a desire to expose e.g. the MailboxProcessor and similar features in a better thought through manner.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very thanks for the explanation! It indeed makes a lot of sense to me and I also agree with the plan.

And for now perhaps we first complement super.close() for those three operators? Since some of them indeed used states~

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right. Missed that bit 🤦 Yes, will update those three operators.

*/
@Override
public void dispose() throws Exception {
public void close() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here there might be a similar concern as the AbstractStreamOperator.

dawidwys added 8 commits July 5, 2021 12:34
…close and finish

This commit cleans up StreamOperator API in regards to the termination phase and introduces a clean finish() method for flushing all records without releasing resources.

The StreamOperator#close  method which is supposed to flush all records, but at the same time, currently, it closes all resources, including connections to external systems. We need separate methods for flushing and closing resources because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose  method.
@gaoyunhaii
Copy link
Contributor

Very thanks @dawidwys for the updates! The PR LGTM now~

@dawidwys dawidwys closed this in 541f430 Jul 6, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants