Skip to content

Conversation

@artembilan
Copy link
Member

The Flux.takeWhile() only works if there is data in the Publisher
to consume.
We still need to be able to cancel subscription and stop producing even if
there is no data at the moment.

  • Change takeWhile() to the doOnSubscribe() and store subscription
    in the volatile property of the MessageProducerSupport
  • Cancel such a subscription in the doStop() impl
  • Propagate doStop() to super in the ZeroMqMessageProducer
    which is only one reactive channel adapter overriding doStop()
  • Verify in the ReactiveMessageProducerTests that subscription is cancelled
    for delayed data in the Publisher

Cherry-pick to 5.5.x

The `Flux.takeWhile()` only works if there is data in the `Publisher`
to consume.
We still need to be able to cancel subscription and stop producing even if
there is no data at the moment.

* Change `takeWhile()` to the `doOnSubscribe()` and store `subscription`
in the `volatile` property of the `MessageProducerSupport`
* Cancel such a subscription in the `doStop()` impl
* Propagate `doStop()` to super in the `ZeroMqMessageProducer`
which is only one reactive channel adapter overriding `doStop()`
* Verify in the `ReactiveMessageProducerTests` that subscription is cancelled
for delayed data in the `Publisher`

**Cherry-pick to `5.5.x`**
*/
@Override
protected void doStop() {
Subscription subscriptionToCancel = this.subscription;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized ? Or is cancel idempotent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stop() is synchronized via lifecycleLock in the AbstractEndpoint.
I don't believe anyone is going to call this protected doStop() directly.

@garyrussell garyrussell merged commit e434a46 into spring-projects:main Mar 16, 2022
@garyrussell
Copy link
Contributor

...and cherry-picked.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants