Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplification of Observable API #201

Open
benlesh opened this issue Oct 14, 2019 · 69 comments
Open

Simplification of Observable API #201

benlesh opened this issue Oct 14, 2019 · 69 comments

Comments

@benlesh
Copy link

benlesh commented Oct 14, 2019

Okay, I'm going to throw my hat back in and see if I can resurrect this a little.

What I'm going to propose is slightly different than the current proposal and different than RxJS, but I strongly feel it will work.

API

interface Observable<T> {
  new (
    initialization: (
      nextHandler: (value: T) => void,
      errorHandler: (err: any) => void,
      completeHandler: () => void, 
      signal: AbortSignal
     ) => void;
  ): Observable<T>

  subscribe(
      nextHandler?: (value: T) => void,
      errorHandler?: (err: any) => void,
      completeHandler?: () => void, 
      signal?: AbortSignal
  ): void;

  forEach(nextHandler: (value: T) => void, signal: AbortSignal): Promise<void>

  first(signal: AbortSignal): Promise<T>;

  last(signal: AbortSignal): Promise<T>;
}

The idea is to remove the need to define Observer and Subscriber, as in other Observable implementations, and use simple functions instead. Also using a cancellation token (ala AbortController/AbortSignal) instead of introducing a Subscription type.

I realize that AbortController and AbortSignal are not a part of JavaScript proper. However, I strongly feel JavaScript could use a cancellation primitive, and Observable, which is also a primitive, is not as useful without it.

Defining an observable instance

Below is the simplest use case for an observable. A synchronous set of values.

  test("should at least work", () => {
    const source = new Observable((next, error, complete, signal) => {
      next(1);
      next(2);
      next(3);
      complete();
    });

    let results = [];

    source.subscribe(value => results.push(value), null, () =>
      results.push("done")
    );

    expect(results).toEqual([1, 2, 3, "done"]);
  });

Handling of "firehose" synchronous data

With a cancellation token, like AbortSignal, handling synchronous firehoses and stopping them due to external unsubscription becomes a bit more intuitive than it was with previous designs, IMO:

 test("should handle firehose", () => {
    let loops = 0;
    const source = new Observable((next, err, complete, signal) => {
      for (let i = 0; i < 1000000000 && !signal.aborted; i++) {
        next(i);
        loops++;
      }
      // this will noop due to protections after abort below
      // which is "unsubscription".
      complete();
    });

    const controller = new AbortController();
    const results = [];

    source.subscribe(
      value => {
        results.push(value);
        if (results.length === 3) {
          // "unsubscribe"
          controller.abort();
        }
      },
      null,
      // complete should not be called, because of the
      // abort (unsubscription) above
      () => results.push("done"),
      controller.signal
    );

    expect(loops).toBe(3);
    expect(results).toEqual([0, 1, 2]);
  });

Concessions

first and last may not be necessary, and are more "nice to have"s for this type. Their primary use cases would be for wrapped HTTP calls, which, in a world where AbortSignals were prolific, should probably just be done via fetch.

Cons

There are a few cons to this design. Notably, from my perspective, it's not completely compatible with current popular designs. But I'm less worried about that than getting the appropriate primitives into the language.

Other thoughts

It's possible to have this implement Symbol.asyncIterator with a known behavior, like buffering all values internally until they are read. This, of course, comes with some potential issues around back-pressure and memory pressure, but I think that's easy to understand for most people who might use this type with for await.

Another con is creating a "Subject", which is a common type created to compose with observables, becomes mildly challenging, in that it would need to be something that could be destructured into three functions and an abort signal, but again, I don't think that's really a concern for language authors. The community can take care of that.

Links

I've tossed together a demo here.

Repo: https://github.com/benlesh/tc39-observable-proposal
Codesandbox: https://codesandbox.io/s/tc39-observable-proposal-proposed-change-uxh4p

@benlesh
Copy link
Author

benlesh commented Oct 14, 2019

NOTE: AbortSignal could be replaced with any other cancellation standard that lands, provided it's a token-type cancellation.

@acutmore
Copy link

Can see that this implementation does not have the semantics of the 'safe' SubscriptionObserver i.e. guaranteed that calling nextHandler, errorHandler or completeHandler would never throw.

Is that because this issue should just focus on discussing just the API design and not other semantics, or because this new design would not come with that guarantee? Didn't want to presume either way.

@zenparsing
Copy link
Member

Cool - we probably should have went down this "function-first" road from the start, but we had a "no closure" bias, if I recall...

Arguably, the cancel token in this API would best be a "subclass" of observable (with a subscribe method, etc.).

@benjamingr
Copy link

It's alive!🥳

So what about https://github.com/tc39/proposal-emitter ?

@gre
Copy link

gre commented Oct 14, 2019

That's a great simplification. One downside I see is the need to remember the ordering of params especially for the subscribe.

Also another suggestion is that maybe you don't need a signal but just a function:

  • On create side I find it really great to return a unsubscribe func (like react effect or in rxjs)
  • On subscribe side, why not returning just a unsubscribe func?
new Observable((next, error, complete) => {
  next(1);
  next(2);
  next(3);
  complete();
  return () => {};
});
const unsub = source.subscribe();
unsub();

@benlesh
Copy link
Author

benlesh commented Oct 15, 2019

@gre: The signal also serves as a mechanism to check whether or not cancellation is closed during synchronous emission. Consider the following:

const source = new Observable((next, error, complete) => {
  let i = 0;
  while (true) { next(i++); }
  return () => { /* doesn't matter what you do here, it's never reached */ };
});

const controller = new AbortController();

source.subscribe(
  value => {
    if (value > 4) controller.abort();
  }
),
null,
null,
controller.signal
);

In the above, there's really no way to signal that we want to stop the while loop in the Observable. A token or signal solves that cleanly:

const source = new Observable((next, error, complete, signal) => {
  let i = 0;
  while (!signal.aborted) { next(i++); }
});

@zenparsing ... I'm game to explore other cancellation primitives, (AbortController isn't the best, but it's a good start), using a subclassed Observable might be nice. The main thing is that is needs to be something we can:

  1. synchronously check to see if it's "closed" or "aborted" or whatever, for the firehose case.
  2. Create "children" of, such that operators like mergeMap, etc. can be implement by the community.

@benlesh
Copy link
Author

benlesh commented Oct 15, 2019

@benjamingr I've expressed my concerns and opinions about the emitter proposal here: tc39/proposal-emitter#26 (comment)

@gre
Copy link

gre commented Oct 15, 2019

@benlesh I don't find this necessarily too verbose:

const source = new Observable((next, error, complete) => {
  let i = 0;
  let aborted;
  while (!aborted) { next(i++); }
  return () => {
    aborted = true;
  }
});

If the idea is to remove the types like Observer and Subscription, why not simplifying this too.

Moreover, your example is to me the most uncommon one, most of the time you would "clean up" things like timeout or event unlistening.
And for these, this is general much simple to me:

const source = new Observable((next, error, complete, signal) => {
  const interval = setInterval(next, 1000);
  return () => clearInterval(interval);
});

Now try to solve this with signal: it sounds like you're going to need an event listener. Who is cleaning up the listener on the signal? Is it getting called even when the observable was unsubscribe? Is it leaking if you reuse a signal at many places? what if I consume my observable more than once?

@benlesh
Copy link
Author

benlesh commented Oct 15, 2019

@gre that cannot possibly work, as it will enter an infinite loop before returning that cancellation function.

It's also not as uncommon as you might think. Converting array to observable, or creating a range observable, or an iterable (the iterable could even have side effects on each turn). All of these things require that we have the ability to check and existing flag of some sort to make sure we aren't looping unnecessarily.

@gre
Copy link

gre commented Oct 15, 2019

Good point sorry I missed half of the code point. Please ignore the first part of my answer. Second part remain. @benlesh

@gre
Copy link

gre commented Oct 15, 2019

Yet I'm curious why in your code you are not prone to the same infinite loop. No way it can works in monothreaded JS 🤔

@benlesh
Copy link
Author

benlesh commented Oct 15, 2019

@gre Sorry, I edited my other response. I guess I wasn't expecting you to respond so quickly.

@benlesh
Copy link
Author

benlesh commented Oct 15, 2019

@gre The reason it works is because the signal provided was created before the for loop was ever entered, If any action that occurs on next (called in the body of the for loop) aborts the signal, the signal will be flagged as aborted synchronously. Which means on the next pass through the loop, it will see it has been aborted, and stop the loop.

@gre
Copy link

gre commented Oct 15, 2019

Ok that's a great usecase for the sync loop then.
So now I'm curious about how the second problem I mentioned would be solved with signal (cleaning up listeners and timeouts) and so it doesn't leak listeners and call listeners once when observable is done (as it's the case in rxjs). The fact signal comes from outside makes it hard to solve.

@benjamingr
Copy link

benjamingr commented Oct 15, 2019

Would it make sense for RxJS to migrate to this API (experimentally)?


An observable is just a setter for a setter (basically why RxJS and things like MobX or Vue are fundamentally the same idea :] ). so if we want a much more minimal API the following could work:

(By the way, if anyone other than Ben (whom is probably familiar with the material) wants a good intro I recommend @headinthebox who authored Rx's https://channel9.msdn.com/Events/Lang-NEXT/Lang-NEXT-2014/Keynote-Duality )

type Iterable<T> = () => (() => T);
type Observable<T> = (observer: (arg: T) => void) => void;
// would have preffered being able to write `T => () => ()`

Then things like error handling and completion can be build on top of that. Your example could be:

  test("should at least work", () => {
    const source = (next) => { next(1); next(2); next(3); }

    let results = [];

    source(value => results.push(value));

    expect(results).toEqual([1, 2, 3]);
  });

After all in regular RxJS subscribe (without the fluff) is just:

class Observable {
  constructor(subscribe) { this.subscribe = subscribe; }
}

Although to be fair I am fine with either API and I don't think that's why this proposal has stalled :]

@SerkanSipahi
Copy link

SerkanSipahi commented Oct 15, 2019

Hello @yyx990803 (Vuejs Creator), would you like to get involved in the discussion? As far as I know, vuejs has its own Observable implementation. It would be good if we could create a standard here with RxJx, Mobx, Vue, others (I don't know the others).

The discussion/proposals should not only arise from the RxJs perspective!

@benlesh
Copy link
Author

benlesh commented Oct 15, 2019

@benjamingr the "fluff" is actually what makes Observable an important primitive.

  1. Guarantees that "teardown" occurs on error, completion, and unsubscription (consumer disinterest).
  2. Guarantees that calling next, complete and error handlers can't be called out of order or otherwise unreasonably.
  3. Guarantees that unhandled errors are propagated in a way that makes sense and doesn't cause strange, hard to debug errors.

If I thought we could do this reliably and safely with ordinary functions, I wouldn't be arguing it belonged in the language, as we already have functions.

@benlesh
Copy link
Author

benlesh commented Oct 15, 2019

Cool - we probably should have went down this "function-first" road from the start, but we had a "no closure" bias, if I recall...

@zenparsing, yeah, I recall reading something about a function-first approach before, but I think one of the problems there was we also needed a way to stop the synchronous "firehose", and subscriber.closed provided that. With a cancellation token, we no longer are hindered by that. (And honestly, from a maintainer's perspective, cancellation tokens would clean up a reactive library considerably, there's a lot of dancing around the fact that subscriptions need to exist up front)

@acutmore
Copy link

The Fetch API returns a Promise that rejects on signal.abort.

Should the promise returning methods in this proposal mirror that behaviour? Right now the promise remains in a pending state.

Perhaps the single Fetch example is not enough of a precedent to follow, but can see myself being asked to explain the difference when training developers

@appsforartists
Copy link
Contributor

I'm not a TC39 member, so I don't have enough context to know what makes this proposal easier/harder than the previous one.

As a user, I prefer Observer because named parameters have better usability than positional ones.

new Observable(
  // That's a lot to remember
  (next, error, complete, abort) => {}
)
// vs
new Observable(
  // I can use only the parts I need, and order doesn't matter
  ({ complete, next }) => {}
)

To your point, Subject is a very useful tool, and it's trivial when the shape of Observer is well-defined.

@zenparsing
Copy link
Member

@benlesh In the interest of taking a fresh look at things, let me offer my perspective on the firehose issue.

I've never liked it, although I realize that users typically don't complain about it, given typical Observable usage scenarios. But in my experience with implementing combinators, it's a reoccuring footgun. The naive solution is typically wrong due to it.

In zen-observable (which has some usage, although certainly not anywhere near as much as RxJS), I ended up disallowing firehosing completely. Instead, the "firehosed" items get queued up, and "of" and "from" don't firehose (they wait a tick before sending). The change went in almost two years ago, and although I've gotten (awesome) bug reports over that time, I've never seen a single complaint about the queuing behavior.

If you avoid the firehose problem, then you can just return a cleanup function from the Observable init function. Then you can have a clean layering: Observables don't need to assume a cancellation primitive (other than functions of course), and cancellation tokens can be built on top of Observable.

@acutmore
Copy link

Personally I quite like the AbortController-AbortSignal, though can see how that makes this proposal a little more difficult to digest on the Node side, not having those APIs already.

I have forked the original CodeSandBox with an example of how cancelation could be done using Observables to cancel Observables as @zenparsing has already mentioned (though I didn't add a subclass). I also added a few tests to cover the modifications.

https://codesandbox.io/s/tc39-observable-proposal-proposed-change-dqkqd

const source = new Observable((next, err, complete, takeUntil) => {
    let abort = false;
    takeUntil.subscribe(() => (abort = true));
    for (let i = 0; i < 100 && !abort; i++) {
        next(i);
    }
    complete();
});

let abort;
const abortWhen = new Observable(n => {
    abort = n;
});

source.subscribe(
    () => results.length === 3 && abort(),
    null,
    null,
    abortWhen
);

or a more real-life example:

const timeout = (time) => new Observable((next, err, complete, takeUntil) => {
    const id = setTimeout(() => {
        next();
        complete();
    }, time);
   takeUntil.subscribe(() => clearTimeout(id));
});

const onEvent = (elm, event) => new Observable((next, err, complete, takeUntil) => {
    elm.addEventListener(event, next);
    takeUntil.subscribe(() => elm.removeEventListener(event, next));
});

timeout(10 * 1000).subscribe(
    () => launchSatalite(),
    null,
    null,
    onEvent(document.getElementById('cancel'), 'click')
);

To make it work I did also need to change subscribe to return a unsubscribe function.

@yyx990803
Copy link

@SerkanSipahi thanks for tagging me, but Vue's "observable" is focusing on a rather different problem domain, which is why we renamed the API to "reactive" in Vue 3. In particular, modeling the stream of changes over time and dealing with cancellation etc. is not part of the concern in Vue's reactivity system. I see the two systems with slight overlap but can be easily used in a complementary fashion.

@benjamingr
Copy link

benjamingr commented Oct 15, 2019

@benlesh

@benjamingr the "fluff" is actually what makes Observable an important primitive.

Yeah I agree. I am just not convinced that the "fluff" is bad or that the original proposal was blocked on the fluff rather than the heavy lifting. I am perfectly content with a battle tested API over a more minimal one to be honest :]

(I am enjoying the mental exercise though!)

@benjamingr
Copy link

@yyx990803 it's actually exactly the same problem doing the exact same thing although I understand why it might not seem that way. That's the point the inventor of Rx makes in the talk I linked to above.

I know it might seem orthogonal but vue.observable and Rx are two sides of the same coin with observables just being setters for setters.

@yyx990803
Copy link

yyx990803 commented Oct 15, 2019

@benjamingr I agree they share the same root at the theory level. But the intention behind their specific designs are aimed at different end user scenarios. So my point is I'm not interested in expanding Vue's reactivity system to be more observable-like. I'd rather it stay minimal and serve its current purpose. Nor am I interested in making observable the default in the Vue system, due to the extra mental overhead it incurs on users. Vue's goal would be making sure its own reactivity system can play nicely with 3rd party observable systems.

@benjamingr
Copy link

@yyx990803 I think I am communicating poorly because I really don't think that it needs to be expanded.

Vue's reactivity system (like Knockout, MobX and any other getter/setter or proxy system) is very much already the same thing (not in the root or theory level but in practice) as RxJS observables just with better ergonomics and different goals.

That's what the talk I linked to by Erik shows - an observable is just a setter for a setter with error handling and completion semantics - it's the same thing :]

Regardless I commend your commitment towards a simple and user friendly API :] (which is why I use Vue in my own code)

@matthewwithanm
Copy link

matthewwithanm commented May 22, 2020

@cedmandocdoc Have you seen the rxjs takeUntil operator? It allows you to use another observable basically the same way as an AbortSignal. With a replay subject you can basically get the same result (an imperative cancelation API via subject.next()) and it's pretty ergonomic!

I think @benlesh's proposal here is more about simplifying the core Observable API by aligning with the de facto standard cancelation mechanism (AbortSignal).

@cedmandocdoc
Copy link

@acutmore The idea of cancellation being an Observable is supported by the definition of an Observable itself, that is, basically an object that emits values.
Cancellation is an action that comes from an entity and represents data in the context of canceling something. Whoever the actor is or even where or when the action takes place doesn't matter, if it represents a cancellation that is fine.

With all that in mind, we can implement different forms of cancellation. To list a few we have:

  • A cancellation that comes from DOM event (previous example). This form of cancellation shows that the actor is a button, and the event takes place from the moment the button was clicked.
interval(1000).subscribe(
  value => console.log(value),
  fromEvent(button, 'click').mapTo('CANCEL') // cancellation observable
)

  • An AbortController-like cancellation (could be an answer, to your feedback). A cancellation where the actor is a function and takes place upon the calling of that function e.g. an Observer.
class Teardown extends Observable {
  constructor(producer) {
    super(producer);
    this.run = () => {};
  }

  subscribe(next, observable) {
    this.run = () => next('CANCEL');
    this.producer(next, observable);
  }
}

const teardown = new Teardown(() => {}); // pass an empty producer

fromArray([1, 2, 3]).subscribe(
  value => value === 2 && teardown.run() // fire a cancellation from the observer
  teardown // pass the Teardown Observable
);

  • An RxJS Subject Cancellation (could be an answer, to your feedback). This implementation of cancellation is generic, Subject could be used for other contexts as well. This may not be a good solution for a one-to-one relation of Observable and Observer but on the other hand it fits with one-to-many relation. One invoke cancellation could cancel many Observables.
const subject = new Subject();

interval(100).subscribe(
  value => value === 2 && subject.next('CANCEL') // fire a cancellation
  subject // pass the subject
);

All examples show that cancellation could come from anywhere and could take place anytime. This could prove that cancellation is indeed just an Observable with the context of canceling something.

@cedmandocdoc
Copy link

@matthewwithanm The operator takeUntil shows that cancellation takes place when an Observable emits something.
I think this commend the idea of cancellation is just another form of Observable.

  • It could happen anywhere or anytime, since takeUntil knows when to emit a cancellation
  • It is a data-cancellation representation that comes from another Observable, takeUntil waits for an emission that represents a cancellation.
  • It could wrap by a Subject to add the ability to cancel imperatively

And as you have said it is pretty ergonomic and yes I agree with that.

But I think the difference from the pattern I've showed compared to takeUntil operator is implementation. As far as I know the operator takeUntil relies on the returned function of the producer to cancel things but with the pattern I've showed it cancel the Observable through an Observable (with a specific context) by default.


I think @benlesh's proposal here is more about simplifying the core Observable API by aligning with the de facto standard cancelation mechanism (AbortSignal).

What do you mean by the de facto standard cancelation mechanism? Does this mean for the whole Javascript API? For example setInterval:

const controller = new AbortController();
const callback = () => {
  // logic
  controller.abort();
};
setInterval(callback, 1000, controller.signal)

I'm not sure about that, but if Javascript will embrace AbortController as a standard to cancel things for all the callback base or async API, I would say there will be contradictions. Different types of async API have different forms of cancellations. For instance if we abort a fetch request it resolves to an error this contradicts to others like setTimeout which doesn't have an error callback.

But from the pattern I've showed we can generalized those inconsistencies. For example we can create an AbortObservable that truly aborts an ObservableFetch.

class AbortObservable extends Observable {
  constructor(producer) {
    super(producer);
    this.run = () => {};
  }

  subscribe(next, observable) {
    this.run => () => next('ABORT'); // this emission will be observed by an ObservableFetch which then aborts the request to resolve to an error
    this.producer(next, observable);
  }
}

const abort = new AbortObservable(() => {}); // pass an empty producer

fromFetch(options)
  .listen(
    value => console.log(value),
    abort
  );

abort.run(); // abort the fetch

This not just limited to just abort fetch you can pass a merged Observable and merges an AbortObservable or plain CancelObservable where just cancel an Observable.

merge(AbortObservable, CancelObservable)

I think the idea of cancellation being an Observable is more simple than the use of AbortController and AbortSignal. Because of the ability to generalized cancellation. Lastly, I think it is more primitive than AbortController because you can create an AbortController-like using an Observable. It just all depends on how Observable should communicate with each other.

@adamf92
Copy link

adamf92 commented Nov 19, 2020

@benlesh Are you planning that changes in some future RxJS release ?
From my perspective, one of the most important features in current Subscription implementation is abitity to create and manage aggregated/nested subscriptions - is it possible with signal/controller or will need additional implementation?
I'm creating a framework (Atomi-Q, I think it could be interesting for you, as it demonstrates the potential of RxJS as a base for the fastest DOM updating solution) with concept called "Reactive Virtual DOM" - it's a tree of objects like Virtual DOM, but every dynamic/changeable node is Observable - thanks to it every state change is causing updates only in connected nodes - it's many times faster than Virtual DOM diffing. So, when elements are created, I can easily add their subscriptions to the parent subscription with add() method and when some element is removed, unsubscribing its subscription,
automatically unsubscribes all children subscriptions, so it works great with my concept. From examples I suppose that passing same signal to multiple subscribe calls will allow unsubscribing multiple streams at once, but it will be nice to have it working same way as currently in RxJS

@dy
Copy link

dy commented Jan 23, 2022

Sorry for reiterating on the subject - can someone please explain (again) or give ref to:

  1. Why cancellation cannot be left up to userland libs to implement (like RxJS) and need to be part of spec? Promises don't have cancellation mechanism, and there's cancelable-promise (and other mechanisms) for that. Afaik some committee members are not much fond of AbortController.
  2. What's the point of forEach method here? Isn't that the same as subscribe(next)?

Also a point to add on leaking listeners. Idk was FinalizationRegistry a thing for the time of discussion, but as tested in sube - it's possible to subscribe leak-free without necessary unsubscription.

Feels like cramming too much stuff into spec is the main reason it's stalling.
Just want to ref #210 as minimal meaningful proposal.

@benjamingr
Copy link

Why cancellation cannot be left up to userland libs to implement (like RxJS) and need to be part of spec? Promises don't have cancellation mechanism, and there's cancelable-promise (and others) for that.

Cleanup is fundamental to observables and without cancellation their expressiveness has significantly diminished. Though node and browsers (and others) settled on AbortSignal and that Chrome is blocking any non AbortSignal primitive suggestions in TC39.

What's the point of forEach method here? Isn't that the same as subscribe(next)?

You can await it which helps ergonomically.

@dy
Copy link

dy commented Jan 23, 2022

Cleanup is fundamental to observables and without cancellation their expressiveness has significantly diminished.

Maybe I haven't met use-cases for that yet. It's just for UI purposes (subscribable-things, templize, observable-value/value-ref) it's not apparent where that's useful - things get unsubscribed when observable is garbage-collected.
@zenparsing also mentioned that as footgun. Do you possibly have more elaborate example where it is a must?

You can await forEach it which helps ergonomically.

Could you elaborate? Just want to understand where it helps and why it's unavoidable for spec. Recently I witnessed the opposite example: node collections can be detected by presence of forEach method and iterated over immediately (HTMLCollection, NodeList, Array), whereas Observable is expected to be subscribed to, but pretends to be a collection.

@benjamingr
Copy link

benjamingr commented Jan 23, 2022

things get unsubscribed when observable is garbage-collected

I don't think that's an approach any committee would like and I think deterministic cleanup is important.

Could you elaborate?

Sure, 90% of the time when writing code that uses both observables and promises I have something like:

async function doFoo() {
  // setup page
  await observable.forEach(() => {
    // update page
  });
  // do more work
}

Though I guess this can be by implementing Symbol.asyncIterator (which is probably more spec work?)


I'd like to emphasize the observable proposal isn't actually blocked on either of those things though. It's not that cleanup or forEach went to the committee and the proposal got blocked on them - the proposal just doesn't have an active champion and hasn't been presented to tc39 or actively worked on in a long while.

@dy
Copy link

dy commented Jan 23, 2022

Naive Symbol.asyncIterator:
async *[Symbol.asyncIterator]() {
  let resolve,
      buffer = [],
      unsubscribe,
      promise = new Promise(r => resolve = r)

  { unsubscribe } = this.subscribe(value => {
    buffer.push(value)
    resolve()
    promise = new Promise(r => resolve = r)
  })

  try {
    while (1) yield*  buffer.splice(0), promise
  }
  catch {}

  unsubscribe()
}

@runarberg
Copy link

For the record, I experimented with Symbol.asyncIterator back when I was trying out the simplified API for fun a couple of years ago:

https://github.com/runarberg/tc39-observable-proposal/blob/operators/src/Observable.js#L107-L156

I think there are some subtle differences between that particular implementation of Symbol.asyncIterator and the forEach. Unlike forEach the async iterator will not yield a new item until .next() is called. I don’t really know though when you would prefer one over the other in real life. However there has been a situation where I wanted my items chunked by the ticks they were pushed, so I also implemented a chunks operator for that. There could be an argument made that this is how Symbol.asyncIterator should behave.

@benjamingr
Copy link

benjamingr commented Jan 24, 2022

I don’t really know though when you would prefer one over the other in real life.

I can literally talk for 12 hours on the advantages of push streams (like observables) vs. pull streams (like async iterators) but to name a few:

  • observables guarantee they never buffer so you have a much stronger memory guarantee.

  • observables are super-strict (read: eager) which means you get data "as soon as possible" from the producer.

  • observables provide a much simpler interface since all they do is push data - the protocol is just "you subscribe to me and get updates".

  • observables (mostly) have to be "functions to actions" since because they are so eager - they require the extra step from observable to observer.

  • observables give all the rate control to the producer.

  • async iterators can buffer - if I have an iterator for click events and I do not consume it - it will "buffer" the click events for me and when I iterate it I can get events "from the past".

  • async iterators are lazy, which means they can support backpressure very easily (that is, controlling how much data you ask from the producer to even-load chained iterators).

  • async iterators are a pretty complicated interface making them harder to implement. They build on promises so their timing guarantees are different though that's just a detail and not push vs. pull.

  • If you have an async iterator - you already have a thing for consuming the resources (like an observer and unlike an observable) which you can only get away with because iteration is itself lazy.

  • with async iterators all the control is at the consumer side.

Note RxJS already supports Symbol.asyncIterator and since it's the most popular implementation anyway I recommend you experiment with that.

@spaceribs
Copy link

Jumping on to what @benjamingr said, ReactiveX has already built an interface around async iterators, and can be seen here: https://github.com/ReactiveX/IxJS

These technologies are not in opposition, they are different tools for different jobs.

@benlesh
Copy link
Author

benlesh commented Jan 24, 2022

FWIW: RxJS observables support interop with async iterables, and there's also a 3rd party library for converting observables to async iterables: https://github.com/benlesh/rxjs-for-await. In that library I identified the four most common ways developers might want to convert pushed events to async iterables.

@dy
Copy link

dy commented Jan 25, 2022

Great, that seems like exhaustive solution to the 2nd point from #201 (comment), which adds to #210 - leave .forEach / [Symbol.asyncIterable] to userland.

Remains cancellation mechanism and firehose case. Maybe there is argumentation to avoid that, like mentioned by @zenparsing? To clearly understand what purpose it properly serves, maybe some good examples (not theoretical purity).

@cedmandocdoc
Copy link

Sorry to slide in, but I don't think cancellation is fundamental to Observable. By itself, it just propagates data and nothing else.

We don't need cancellation as long as there is no observation that will occur. Observation is not equal to observable and it is the observation that creates the structure between the producer and the consumer to interact with one another.

Interaction is inevitable in observation and in this structure that cancellation emerges. It is not the cancellation that needs to be simplified in the API but should be the structure of interaction between the producer and consumer.

Cancellation is just an emergent property like anything else that an application would demand probably like a demand for the next data like iterable. These emergent properties act like just another observable, it propagates data but in reverse, that is, from the consumer to the producer.

In my opinion a much cleaner API:

new Observable((next, error, complete, observable) => {
  // producer pushes data
  // ...

  // the producer listens to external observable
  // which the observation takes place forming
  // a complete interaction
  observable.subscribe(data => {
    // listen for data cancellation?
    // listen for data pulling?
    // or any other emergent properties
  })
});

@benjamingr
Copy link

Sorry to slide in, but I don't think cancellation is fundamental to Observable. By itself, it just propagates data and nothing else.

Observable without clear cleanup and error handling semantics is greatly diminished IMO and is effectively just a function passed around (literally, on an object). You need to specify how to do stuff like "close the websocket" in order for observables to enable most use cases they are used for today.

@cedmandocdoc
Copy link

@benjamingr

Cancellation is still there is just another Observable that is passed around.

@cedmandocdoc
Copy link

@benjamingr

Observable without clear cleanup and error handling semantics is greatly diminished

With the idea that I suggest, what does it make to have not a clear cancellation? It doesn't have explicit cleanup but that doesn't mean it is not clear for the semantics to greatly diminish.

@benjamingr
Copy link

Like, to be honest these discussions are interesting but they happened before (in this repo and elsewhere, the idea of cancellation through an observable was talked about like ±3 times here I think?) and they are not the blocker.

In any case Chrome will block any cancellation scheme that isn't based on the web cancellation primitive (AbortSignal) and other parties will likely block any effort to do disposing of subscriptions that does not work with await using.

So we can discuss (and it's interesting!) why cancellation semantics are important and the issues with using an observable to signal cancellation and what happened the last attempt but I don't want to make any suggestion that it will help progress observables in JS in any way.

@cedmandocdoc
Copy link

cedmandocdoc commented Jan 25, 2022

Observable doesn't need cancellation semantics. What it needs is a structure for interaction so observation can happen. And in that structure, cancellation can happen but it is not fundamentally part of it.

In observation, interaction is the only inevitable.

Cancellation is application-dependent, observable and observation can happen even without it. It's just we have been accustomed that cancellation is a fundamental part of it. For this reason, we need to ship a cancellation semantics to support most implementation in the wild.

So the idea that I suggest is blocked because of the definition that cancellation in Observable is fundamental, which is not, and it has been widely used. But if we think fundamentally, cancellation has never been in the picture of Observable.

@benjamingr
Copy link

If we ignore how important resource cleanup and safe error handling semantics are - observables are very simple - you can see callbags for a take on it. I don't think there is any interest in standardizing semantics without cancellation and error handling semantics.

An observable with that stripped is just:

// (T => ()) => ()     or in TS syntax something like (T => void) => void
const observable = (fn) => {
  let i = 0;
  setInterval(() => fn(i++), 1000);
};

observable((data) => console.log('got data from observable', data));

Erik goes in-depth into it in his talks about Rx (About getters to getters and setters to setters).

As for why do we need everything around it? For the same reason we need clear defined semantics for iterators and why an iterator isn't just a getter to a getter the same way this is a setter to a setter:

// Note this has the exact same signature with all the arrows inverted, this is 
// what people refer to by duality in the repo or when talking about observables
// () => (() => T) or in TS syntax something like () => () => T
function iterator() {
  let i = 0;
  return () => i++;
}

iterator(); // get next value

The reason we need an iteration protocol rather than just tell people to pass functions around is twofold:

  • First because conventions are important for libraries to interact with eachother with clear error handling and resource management semantics
  • Second because it means the language and platform itself can use observables (or iterators or promises) in the language and specifications itself. For example in my case it would enable a better story for observables in Node.

@benjamingr
Copy link

But if we think fundamentally, cancellation has never been in the picture of Observable.

Also I don't think that's entirely true. The inventor of Observables (as we know them) presented them with resource management semantics and error handling from day 1 : http://csl.stanford.edu/~christos/pldi2010.fit/meijer.duality.pdf https://dl.acm.org/doi/pdf/10.1145/2168796.2169076

@cedmandocdoc
Copy link

Not cancellation semantics but interaction semantics is what is needed.

On the code you have shown:

// (T => ()) => ()     or in TS syntax something like (T => void) => void
const observable = (fn) => {
  let i = 0;
  setInterval(() => fn(i++), 1000);
};

observable((data) => console.log('got data from observable', data));

It is not just observable that takes place, it is also an observation. When we called the observable that is when the observation takes place. In an observation, interaction is inevitable. That is why we need a structure for interactive observable.

Also I don't think that's entirely true. The inventor of Observables (as we know them) presented them with resource management semantics and error handling

I am not establishing the idea from the current definition of Observable. I am more like to rediscover and find proof of why cancellation is fundamental. To me, it is not because it is simply doesn't exist to all kinds of observable. If you can point me onto where on the study the proof why the cancellation is the fundamental and not the interaction that would be great.

@benjamingr
Copy link

To me, it is not because it is simply doesn't exist to all kinds of observable.

You are right and in particular it doesn't exist for the type above (though it is very easy to add by returning a function from it). I warmly recommend that if you are interested in the theoretical bits and types to read on you check my link above in this comment #201 (comment)

@Jopie64
Copy link

Jopie64 commented Jan 25, 2022

I think cancellation should at least be standardized.
If cancellation is not standardized, and up to the user,
how would you implement an operator like switchMap?

@cedmandocdoc
Copy link

@benjamingr, sadly the link on #201 (comment) is redirecting to something else but I would love to read that.

@cedmandocdoc
Copy link

cedmandocdoc commented Jan 26, 2022

@Jopie64

On this comment I've explained that cancellation is still there. It is just another observable that is passed around. So you can implement cancellation or even standardized cancellation if needed, but it should not be directly part of the Observable. Observable and observation just manages data flow and nothing else.

How we will implement or standardize cancellation? The answer is, cancellation is observable. But this is not easy to implement primitively in JS now as have mentioned that anything that does not use the current primitive for cancellation is being blocked.

If you are interested in the implementation I've created a library, it is very simple.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests