Skip to content

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using Observables to cancel Observables. (Alternative to Abort Signal) #5683

Closed
benlesh opened this issue Aug 27, 2020 · 9 comments
Closed

Comments

@benlesh
Copy link
Member

benlesh commented Aug 27, 2020

Okay, this one might sound a little strange, but I was talking with @benjamingr, and he reminded me of past discussions about just using Observable as a signal by talking about what a shame it was we had AbortSignal natively, but no Observable. Like "what if we wanted to signal something other than cancellation?".

The idea

To that end, I'd like to re-explore this idea. Currently, we have the idea of takeUntil as an operator, which is effectively cancellation of the source observable, and notification of completion downstream. But what if we allowed that cancellation notification at the point of subscription, and we also provided an observable of cancellations to the observable initializer:

These two things would be very similar, only one would't cause the completion notification:

source$.pipe(
  takeUntil(notifier$)
).subscribe({ 
  next: console.log,
  complete: () => console.log('complete')
});

source$.subscribe({ 
  next: console.log,
  complete: () => console.log('complete') // never called because it was cancelled.
}, notifier$);

Let's take the example from #5649:

    let sideEffects = 0;

    const firehose = new Observable((subscriber) => {
      let n = 0;
      while (!subscriber.closed && n < 1000) {
        sideEffects++;
        subscriber.next(n++);
      }
    });

    // Let's say this observable here is the result of some sort of operator that someone made.
    // If this is hard to understand, I guess a challenge would be to have someone "roll their own"
    // concatMap operator without using our Subscribers and having working knowledge of
    // our implementation details.
    const flatteningSyncOuter = new Observable((subscriber, teardown$) => {

      // Notice how now we are threading the signal through. Pretty straight forward.
      firehose.subscribe({
        next: value => subscriber.next(value)
      },  teardown$);

      firehose.subscribe({
        next: value => subscriber.next(value),
        complete: () => subscriber.complete()
      },  teardown$);
    });

    const unsubController = new Subject();
    const teardown$ = unsubController.asObservable(); // this is really optional

    flatteningSyncOuter.pipe(
      tap(() => {
          // Give it a 1 in 10 chance of unsubbing externally.
          if (Math.random() > 0.9) unsubController.next();
      }),
      take(300)
    ).subscribe(undefined, teardown$);

    expect(sideEffects).to.equal(3);

As you can see, the API difference is minimal.

Furthermore, we could easily provide something to get an Observable from an AbortSignal, and vice versa. AND we could even allow for users to pass AbortSignal there, if it's available, and we could do the conversion for them under the hood.

I think this solves a few problems around AbortSignal, in particular, if someone wanted to use the provided teardown$ to register teardowns, it's MUCH more ergonomic than AbortSignal:

AbortSignal

const source1$ = new Observable((subscriber, signal) => {
   const resource = new ExpensiveResource();
   signal.addEventListener('abort', () => resource.close(), { once: true });
});

Observable

const source1$ = new Observable((subscriber, teardown$) => {
   const resource = new ExpensiveResource();
   teardown.subscribe(() => resource.close());
});

It's a lot cleaner. I think the confusing bit above might be that users will wonder if they need to unsubscribe from the teardown they've subscribed to. But that's unnecessary, because teardown must happen eventually. Unless the subscription runs forever.

Pros

  • Solves all of the same problems that AbortSignal does
  • We an still provide compatibility with AbortSignal
  • Teardown registration is more ergonomic (if users want to do it manually)
  • We wouldn't need to ship any code for Subscription eventually (and we'll always need to ship observables, clearly)

Cons

  • Teardown registration has the confusing element of "should I unsubscribe from this too??".
  • We end up needing to ship the code for this to the browser in the short term
  • Less convenient for sharing AbortSignals with other APIs that accept them.

Other thoughts

If Promise was designed better, and allowed sync emission, it would have been better for this particular use case. Alas. We have what we have.

@benlesh benlesh added AGENDA ITEM Flagged for discussion at core team meetings type: discussion labels Aug 27, 2020
@benjamingr
Copy link
Contributor

Hmm, this is interesting - some thoughts on this:

  • I think the important bit (to me) is being able to pass a single token to multiple .subscribes (alongside other non Rx APIs).
    • I think the most interesting (to me) use case to me is to let .subscribe take a signal (optionally).
    • This would simplify a bunch of my code that deals with both Rx and non Rx bits.
  • I think for the observable constructor - returning a disposer is fine and probably as short as it gets.
  • I don't particularly understand the merit of tokens as another argument (like you said - you just want to signal it happened)
  • I don't think I understand the use case very well.

Tangent:

If Promise was designed better, and allowed sync emission, it would have been better for this particular use case. Alas. We have what we have.

For what it's worth when promises needed to do this we used callbacks which was both the shortest and had no error handling / timing guarantees that had to be upheld.

In your case that would be:

const source1$ = new Observable((subscriber, teardown) => {
   const resource = new ExpensiveResource();
   teardown(() => resource.close());
});
// which is almost as short, and provides no benefit over the 'current'
const source1$ = new Observable((subscriber, teardown) => {
   const resource = new ExpensiveResource();
   return () => resource.close();
});

We mainly did this because cancellation (in bluebird) was an afterthought of the promise constructor which already had other guarantees and constraints.

@benjamingr
Copy link
Contributor

Also:

he reminded me of past discussions about just using Observable as a signal by talking about what a shame it was we had AbortSignal natively, but no Observable

I would very much be interested in exploring Observables in the platform again (as a much simpler primitive). I had to learn a lot just to get a (pretty basic) understanding of EventTarget and it's a very complicated primitive in some surprising ways.

@cartant
Copy link
Collaborator

cartant commented Aug 28, 2020

Interesting idea. I like it.

One observation is that using a Subject as the notifier is a little different to using an AbortSignal. To behave more similarly to AbortSignal, it would need to be a ReplaySubject(1). Passing a Subject kinda assumes that all use of the notifier within observable's implementation is going to involve synchronous calls to subscribe - i.e. no possibility of a missed notification.

What would the implementation of Observable#subscribe look like - given that (as a garden-variety observable) the notifier won't have an equivalent of Subscription#closed or AbortSignal#aborted? How would it exit early upon receipt of an already-signalled notifier?

@benlesh
Copy link
Member Author

benlesh commented Aug 29, 2020

I guess a BehaviorSubject might model that better. 🤔

@cedmandocdoc
Copy link

Exciting! I am exploring this idea too a couple of months ago (check out the article), my thoughts:

The foundation that we currently have is that Observable has two fundamental properties: data propagation and cancelation. It is evident that data propagation is fundamental since it is what makes an Observable to be observable, it is its way to express something to the subscriber and we achieve it using a callback subscriber.next. The other property is cancelation, this may seem fundamental at first but it occurred to me that cancelation may not and it may be just an emergent property. Exploring this I've found that, cancelation is a demand from the subscriber, it is purely based on the application logic when to cancel. So it is not the cancelation is fundamental but rather the ability for an Observable to accept a demand or a request from the outside and it makes sense because, in a larger scale, a subscriber could demand other than cancelation and that could emerged somewhere (depends on the application requirement). This ability of Observable to react base on the subscriber request arrives at me to the conclusion that cancelation is indeed an Observable because sending a request is very similar to what an Observable is (a data propagation).

@benlesh will the idea of accepting another Observable inside the producer function as a way for cancelation is open for the possibility to be more generic like what you have said:

Like "what if we wanted to signal something other than cancellation?".

If so then the API that I am imagining:

Observable.CANCEL = Symbol('CANCEL');

const obs = new Observable((subscriber, talkback) => {
  const id = setInterval(() => subscriber.next('hey'), 500);

  talkback.pipe(
    filter(token => token === Observable.CANCEL),
    takeWhile(token => token !== Observable.CANCEL, true)
  ).subscribe(() => {
    clearInterval(id);
    subscriber.complete();
  });
})

const cancel = fromEvent(document.body, "click").pipe(mapTo(Observable.CANCEL))

obs.subscribe(data => console.log(data), cancel);

Then maybe we can extends the Observable to have a boilerplate encapsulation for automatic cancelation like:

// CancelObservable extends Observable
new CancelObservable((subscriber, cancel) => {
  cancel.subscribe(); // filtered subscriber request down to Observable.CANCEL only
})

I hope that RxJS team will explore this idea, it is very exciting! It will open up many opportunities.

@benjamingr
Copy link
Contributor

I'd like to iterate that supporting/using AbortSignals would not mean passing the signal to the subscriber function (in the observable constructor). It would just mean subscribe accepting an optional signal second argument and if a signal is passed there invoking the unsubscribe function

const source1$ = new Observable((subscriber) => {
   const resource = new ExpensiveResource();
   // call next somewhere

   return () => resource.close();
});
const unsubscrie = source1$.subscribe(...); // today, still works
source1$.subscribe(..., signal); // I am suggesting we add support for this

This would break very little existing code while enabling interop with all other async code :]

@Jopie64
Copy link

Jopie64 commented Dec 21, 2020

Bare with me cause I'm probably missing the point here... But out of curiosity, why introduce a new cancellation mechanism when rx already has one?
Also, when AbortSignal is used for a Promise, it would reject with AbortError once it is signalled. Wouldn't it be consistent if something similar is done for Observable? In that case, support for cancellation signal can be handled with just a new operator similar to takeUntil, but instead of completing, it errors out with AbortError when abortSignal emits. Something like

source$.pipe(
  abortWhen(abortSignal)
  ).subscribe({
    next: console.log,
    complete: () => console.log('not called on abort'),
    error: e => console.error('called on abort', e);
  });

This way, upstream is cancelled the normal way (returned function from subscribe function is called).
Also, one can make abortWhen operator accept an observable to signal the cancel as well.

@Jopie64
Copy link

Jopie64 commented Dec 22, 2020

Ok I indeed missed the point. Turns out (for me) that the plan was altogether to move away from old way of cancellation to a new way. #5863. As I understand to be able to support aborting of synchronously emitting observables (like in the firehose example), you need to be able to setup cancellation ahead of time.
I'll shut up (for now 😜).

@Jopie64
Copy link

Jopie64 commented Dec 24, 2020

@benjamingr
I think I'd agree with you that current way of detecting a cancellation should still be supported, so existing code is not broken. But consider the following example as what I understand is the main reason for the idea of passing the abort signal to the subscriber function:

const firehose$ = new Observable((subscriber) => {
   let cancelled = false;
   let count = 0;
   while(!cancelled) {
     subscriber.next(count++);
   }
   return () => cancelled = true;
});

In this synchronous 'firehose' example, the subscriber function will never reach the point where the cancellation function is returned. Hence it can never be cancelled.
But in the case where an abort signal is passed, it can still be cancelled like this:

const firehose$ = new Observable((subscriber, signal) => {
   let count = 0;
   while(!signal.aborted) {
     subscriber.next(count++);
   }
});

This is because cancellation is setup prior to calling the synchronous subscription function. Where in the first example, cancellation is (tried to be) setup after calling it.

Still, my opinion is that cancellation in the first example should still be supported (as well), because I'd say that in more than 99% of the cases, observables are used in asynchronous ways, and as you say, this way it can be backward compatible with existing code.

@benlesh benlesh removed the AGENDA ITEM Flagged for discussion at core team meetings label Apr 20, 2021
@benlesh benlesh closed this as completed May 4, 2021
@ReactiveX ReactiveX locked and limited conversation to collaborators May 4, 2021

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants