diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index d8e10eb56e..6a304e6b6a 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -261,6 +261,8 @@ export declare type InteropObservable = { export declare function interval(period?: number, scheduler?: SchedulerLike): Observable; +export declare function isAbortError(value: any): boolean; + export declare function isObservable(obj: any): obj is Observable; export declare function lastValueFrom(source: Observable): Promise; @@ -361,8 +363,8 @@ export declare class Observable implements Subscribable { constructor(subscribe?: (this: Observable, subscriber: Subscriber) => TeardownLogic); protected _subscribe(subscriber: Subscriber): TeardownLogic; protected _trySubscribe(sink: Subscriber): TeardownLogic; - forEach(next: (value: T) => void): Promise; - forEach(next: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise; + forEach(nextHandler: (value: T) => void, signal?: AbortSignal): Promise; + forEach(nextHandler: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise; protected lift(operator?: Operator): Observable; pipe(): Observable; pipe(op1: OperatorFunction): Observable; @@ -376,6 +378,7 @@ export declare class Observable implements Subscribable { pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction): Observable; pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction, ...operations: OperatorFunction[]): Observable; subscribe(observer?: PartialObserver): Subscription; + subscribe(observer: PartialObserver | null | undefined, signal: AbortSignal | null | undefined): Subscription; subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription; subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Subscription; subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Subscription; diff --git a/package-lock.json b/package-lock.json index 62bc4486cb..066da8b7e3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2598,6 +2598,12 @@ "through": "~2.3.1" } }, + "event-target-polyfill": { + "version": "0.0.2", + "resolved": "https://registry.npmjs.org/event-target-polyfill/-/event-target-polyfill-0.0.2.tgz", + "integrity": "sha512-m7I5gwno/p6H0wRJ7jsjJnbdcA8Do3mAf57vUphb1X+DUHAEMhjUmg8MBAaFsB/BtUCXUl70mUPr0Iw3dvdaqQ==", + "dev": true + }, "events": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/events/-/events-3.0.0.tgz", @@ -8757,6 +8763,12 @@ "lodash": "^4.17.15", "yargs": "^13.3.0" } + }, + "yet-another-abortcontroller-polyfill": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/yet-another-abortcontroller-polyfill/-/yet-another-abortcontroller-polyfill-0.0.3.tgz", + "integrity": "sha512-B4O92xxxZP6QhNMVYKv4qZcFbTB1txXQT2lUPq38e9TMuGQhGl2Ow+NN/XflY7aT4v4H/NZI1VTJT7B6YHc5iw==", + "dev": true } } } diff --git a/package.json b/package.json index ee6b5ed21a..9523eae0e6 100644 --- a/package.json +++ b/package.json @@ -117,6 +117,7 @@ "escape-string-regexp": "1.0.5", "eslint": "4.18.2", "eslint-plugin-jasmine": "^2.10.1", + "event-target-polyfill": "0.0.2", "fs-extra": "^8.1.0", "glob": "7.1.2", "google-closure-compiler-js": "20170218.0.0", @@ -150,7 +151,8 @@ "typedoc": "^0.17.8", "typescript": "~3.9.2", "validate-commit-msg": "2.14.0", - "webpack": "^4.31.0" + "webpack": "^4.31.0", + "yet-another-abortcontroller-polyfill": "0.0.3" }, "files": [ "dist/bundles", diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 75ad08bc3a..bebd91fdf2 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -1,10 +1,12 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { Observer, TeardownLogic } from '../src/internal/types'; -import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs'; -import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError } from 'rxjs/operators'; +import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty, interval } from 'rxjs'; +import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, mergeMap, finalize, mergeAll } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from './helpers/observableMatcher'; +import 'event-target-polyfill'; +import 'yet-another-abortcontroller-polyfill'; function expectFullObserver(val: any) { expect(val).to.be.a('object'); @@ -65,6 +67,26 @@ describe('Observable', () => { }); describe('forEach', () => { + it('should support AbortSignal', async () => { + const results: number[] = [] + const ac = new AbortController(); + try { + await of(1, 2, 3, 4, 5).forEach( + n => { + results.push(n); + if (n === 3) { + ac.abort(); + } + }, + ac.signal + ) + } catch (err) { + expect(err.name).to.equal('AbortError'); + } + expect(results).to.deep.equal([1, 2, 3]); + }); + + it('should iterate and return a Promise', (done) => { const expected = [1, 2, 3]; const result = of(1, 2, 3) @@ -199,6 +221,84 @@ describe('Observable', () => { }); }); + describe('subscribe with abortSignal', () => { + it('should allow unsubscription with the abortSignal', (done) => { + const source = new Observable(subscriber => { + let i = 0; + const id = setInterval(() => subscriber.next(i++)); + return () => { + clearInterval(id); + expect(results).to.deep.equal([0, 1, 2, 3]); + expect(subscription.closed).to.be.true; + done(); + } + }); + + const results: number[] = []; + const ac = new AbortController(); + const subscription = source.subscribe({ + next: n => { + results.push(n); + if (n === 3) { + ac.abort(); + } + } + }, ac.signal); + }); + + it('should not subscribe if the abortSignal is already aborted', () => { + let called = false; + const source = new Observable(() => { + called = true; + throw new Error('should not be called'); + }); + const ac = new AbortController(); + ac.abort(); + const subscription = source.subscribe(undefined, ac.signal); + expect(called).to.be.false; + expect(subscription.closed).to.be.true; + }); + + it('should still chain the unsubscriptions', () => { + rxTestScheduler.run(({ hot, cold, expectObservable, expectSubscriptions, time }) => { + const inner1 = cold(' ----a----a-----a-|'); + const inner2 = cold(' ----b----b------b---|'); + const inner3 = cold(' ----c----c----c----c---|'); + const source = hot(' ---a---b---c---|', { a: inner1, b: inner2, c: inner3 }); + const sSubs = ' ^--------------!'; + const i1Subs = ' ---^----------------!'; + const i2Subs = ' -------^--------------!'; + const i3Subs = ' -----------^----------!'; + const abortAt = time('----------------------|') + const expected = ' -------a---ba--cb-a-c---'; + const result = source.pipe( + mergeAll() + ); + + const ac = new AbortController(); + + const wrapperBecauseTestSchedulerDoesntSupportAbortYet = new Observable(subscriber => { + return result.subscribe({ + next: value => { + subscriber.next(value); + }, + error: err => subscriber.error(err), + complete: () => subscriber.complete() + }, ac.signal); + }); + rxTestScheduler.schedule(() => { + ac.abort(); + }, abortAt); + + expectObservable(wrapperBecauseTestSchedulerDoesntSupportAbortYet).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sSubs); + expectSubscriptions(inner1.subscriptions).toBe(i1Subs); + expectSubscriptions(inner2.subscriptions).toBe(i2Subs); + expectSubscriptions(inner3.subscriptions).toBe(i3Subs); + }) + }); + }); + describe('subscribe', () => { it('should be synchronous', () => { let subscribed = false; diff --git a/src/index.ts b/src/index.ts index 18c2577d86..1bbb53154d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ /* Observable */ -export { Observable } from './internal/Observable'; +export { Observable, isAbortError } from './internal/Observable'; export { ConnectableObservable } from './internal/observable/ConnectableObservable'; export { GroupedObservable } from './internal/operators/groupBy'; export { Operator } from './internal/Operator'; diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 48cf3653e7..1141efaa1b 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -10,6 +10,7 @@ import { toSubscriber } from './util/toSubscriber'; import { observable as Symbol_observable } from './symbol/observable'; import { pipeFromArray } from './util/pipe'; import { config } from './config'; +import { AbortError } from './util/AbortError'; /** * A representation of any set of values over any amount of time. This is the most basic building block @@ -71,6 +72,7 @@ export class Observable implements Subscribable { } subscribe(observer?: PartialObserver): Subscription; + subscribe(observer: PartialObserver | null | undefined, signal: AbortSignal | null | undefined): Subscription; /** @deprecated Use an observer instead of a complete callback */ subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription; /** @deprecated Use an observer instead of an error callback */ @@ -205,12 +207,34 @@ export class Observable implements Subscribable { */ subscribe( observerOrNext?: PartialObserver | ((value: T) => void) | null, - error?: ((error: any) => void) | null, + errorOrSignal?: ((error: any) => void) | null | AbortSignal, complete?: (() => void) | null ): Subscription { const { operator } = this; + + let error: ((error: any) => void) | null | undefined = null; + let signal: AbortSignal | undefined = undefined; + if (isAbortSignal(errorOrSignal)) { + signal = errorOrSignal; + if (signal.aborted) { + return Subscription.EMPTY; + } + } else { + error = errorOrSignal; + } + const sink = toSubscriber(observerOrNext, error, complete); + if (signal) { + const handler = () => { + sink.unsubscribe(); + }; + signal.addEventListener('abort', handler); + sink.add(() => { + signal?.removeEventListener('abort', handler); + }); + } + if (operator) { sink.add(operator.call(sink, this.source)); } else { @@ -251,53 +275,80 @@ export class Observable implements Subscribable { } /** - * Used as a NON-CANCELLABLE means of subscribing to an observable, for use with - * APIs that expect promises, like `async/await`. You cannot unsubscribe from this. + * Subscribes to the observable in a way that returns a promise. Useful for async-await and + * other promise-based APIs. * - * **WARNING**: Only use this with observables you *know* will complete. If the source - * observable does not complete, you will end up with a promise that is hung up, and - * potentially all of the state of an async function hanging out in memory. To avoid - * this situation, look into adding something like {@link timeout}, {@link take}, - * {@link takeWhile}, or {@link takeUntil} amongst others. + * Returns a promise that will: + * + * - Resolve when the observable's producer completes and is done pushing values. + * - Rejects with the error if the observable's producer errors and can no longer produce values. + * - Rejects with an AbortError (see note below) if a passed `AbortSignal` signals. * - * ### Example: + * ### Example * * ```ts - * import { interval } from 'rxjs'; + * import { interval, isAbortError } from 'rxjs'; * import { take } from 'rxjs/operators'; * - * const source$ = interval(1000).pipe(take(4)); - * - * async function getTotal() { - * let total = 0; - * - * await source$.forEach(value => { - * total += value; - * console.log('observable -> ', value); - * }); - * - * return total; + * async function test() { + * const source = interval(200); + * + * console.log('start first forEach'); + * await source.pipe(take(4)).forEach(console.log); + * console.log('first forEach complete'); + * + * const ac = new AbortController(); + * setTimeout(() => { + * // unsubscribe after ~1 second + * ac.abort(); + * }, 1000); + * + * console.log('start second forEach'); + * try { + * await source.forEach(console.log, ac.signal); + * } catch (err) { + * if (isAbortError(err)) { + * console.log('second forEach cancelled'); + * } + * } * } * - * getTotal().then( - * total => console.log('Total:', total) - * ) - * - * // Expected: - * // "observable -> 0" - * // "observable -> 1" - * // "observable -> 2" - * // "observable -> 3" - * // "Total: 6" + * test(); + * + * // Expected output + * // "start first forEach" + * // 0 + * // 1 + * // 2 + * // 3 + * // "first forEach complete" + * // "start second forEach" + * // 0 + * // 1 + * // 2 + * // 3 + * // 4 + * // "second forEach cancelled" * ``` - * @param next a handler for each value emitted by the observable - * @return a promise that either resolves on observable completion or - * rejects with the handled error + * + * NOTE: `AbortError` isn't really a type yet. At the time of this writing, Chrome and Firefox utilize + * DOMException for `fetch` calls that are aborted via `AbortSignal`. However, MDN Documentation currently + * states that should be `AbortError`. As a middle ground, RxJS is currently just rejecting with a + * plain `Error`, that has a `name` of `"AbortError"`. RxJS provides a helper method called {@link isAbortError} + * to allow you to check to see if a promise rejection was due to cancellation via `abort` or not. + * The idea is that as this semantic evolves in various runtimes, we can evolve the inner workings of + * `isAbortError` without breaking your code. (we hope, lol). + * + * @param nextHandler A handler that is fired for each value pushed from the producer. + * @param signal A signal that can be used to unsubscribe, and tell the producer to + * stop pushing values. If the subscription ends because of this signal, the returned + * promise will reject. You can determine this rejection was from the signal using + * {@link isAbortError} */ - forEach(next: (value: T) => void): Promise; + forEach(nextHandler: (value: T) => void, signal?: AbortSignal): Promise; /** - * @param next a handler for each value emitted by the observable + * @param nextHandler a handler for each value emitted by the observable * @param promiseCtor a constructor function used to instantiate the Promise * @return a promise that either resolves on observable completion or * rejects with the handled error @@ -307,28 +358,66 @@ export class Observable implements Subscribable { * polyfill Promise, or you create an adapter to convert the returned native promise * to whatever promise implementation you wanted. */ - forEach(next: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise; + forEach(nextHandler: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise; + + forEach(nextHandler: (value: T) => void, promiseCtorOrSignal?: PromiseConstructorLike | AbortSignal): Promise { + let promiseCtor: PromiseConstructorLike | undefined; + let signal: AbortSignal | undefined; + if (isAbortSignal(promiseCtorOrSignal)) { + signal = promiseCtorOrSignal; + } else { + promiseCtor = promiseCtorOrSignal; + } - forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise { promiseCtor = getPromiseCtor(promiseCtor); return new promiseCtor((resolve, reject) => { + let cleanupSignalHandler: (() => void) | undefined; + + if (signal) { + const handler = () => { + // TODO: Chrome and Firefox both use DOMException here for now + // This should probably be something better defined. MDN documentation + // says there is supposed to be an AbortError, but I haven't found any + // implementations in any runtimes. For now, we can document this + // as the users needing to check `err.name === 'AbortError'` and + // adjust over time. + reject(new AbortError()); + cleanupSignalHandler = undefined; + }; + + cleanupSignalHandler = () => { + signal?.removeEventListener('abort', handler); + }; + + signal.addEventListener('abort', handler, { once: true }); + } + // Must be declared in a separate statement to avoid a ReferenceError when // accessing subscription below in the closure due to Temporal Dead Zone. let subscription: Subscription; subscription = this.subscribe( - (value) => { - try { - next(value); - } catch (err) { - reject(err); - if (subscription) { - subscription.unsubscribe(); + { + next(value) { + try { + nextHandler(value); + } catch (err) { + reject(err); + if (subscription) { + subscription.unsubscribe(); + } } - } + }, + error: (err) => { + cleanupSignalHandler?.(); + reject(err); + }, + complete: () => { + cleanupSignalHandler?.(); + resolve(); + }, }, - reject, - resolve + signal ); }) as Promise; } @@ -505,3 +594,33 @@ function getPromiseCtor(promiseCtor: PromiseConstructorLike | undefined) { return promiseCtor; } + +function isAbortSignal(value: any): value is AbortSignal { + return ( + (typeof AbortSignal !== 'undefined' && value instanceof AbortSignal) || + (value && 'aborted' in value && typeof value.addEventListener === 'function' && typeof value.removeEventListener === 'function') + ); +} + +/** + * A utility function to check to see if an error is an error created because a + * subscription was aborted in a method that returns a promise, such as {@link forEach}. + * + * Promises must resolve or reject, therefor aborting a subscription that was supposed to + * either resolve or reject a promise must be rejected, so the promise does not just + * hang out in memory. + * + * This is most useful in async-await. + * + * RxJS is providing this helper method because the error that is returned in + * @param value The error to test to see if you have an abort error. + */ +export function isAbortError(value: any): value is AbortError { + return ( + (value && value.name === 'AbortError') || + // It could be an abort error from an inner fetch, and Firefox and Chrome, as of this + // writing, reject with a DOMException. There's no telling what other platforms will + // do, or how this will change over time. We may need to add more do this conditional. + (typeof DOMException !== 'undefined' && value instanceof DOMException) + ); +} diff --git a/src/internal/util/AbortError.ts b/src/internal/util/AbortError.ts new file mode 100644 index 0000000000..b230d9f83b --- /dev/null +++ b/src/internal/util/AbortError.ts @@ -0,0 +1,31 @@ +export interface AbortError extends Error { +} + +export interface AbortErrorCtor { + new(): AbortError; +} + +const AbortErrorImpl = (() => { + function AbortErrorImpl(this: Error) { + Error.call(this); + this.message = 'Abort exception'; + this.name = 'AbortError'; + return this; + } + + AbortErrorImpl.prototype = Object.create(Error.prototype); + + return AbortErrorImpl; +})(); + +/** + * An error thrown when an Observable or a sequence was queried but has no + * elements. + * + * @see {@link first} + * @see {@link last} + * @see {@link single} + * + * @class AbortError + */ +export const AbortError: AbortErrorCtor = AbortErrorImpl as any; \ No newline at end of file