From 482d77c549677942ec726b01ef4f1d68d7d76431 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 14 Mar 2019 20:57:42 -0700 Subject: [PATCH] feat(forkJoin): accepts a dictionary of sources - Adds `forkJoin({ foo, bar })` such that it will return an `Observable<{ foo, bar }>` containing the values emitted from the sources held at the properties of the passed object. - DEPRECATED: `forkJoin(a, b, c, d)` should no longer be used, rather, just pass an array such as `forkJoin([a, b, c, d])`. --- package.json | 2 +- spec-dtslint/observables/forkJoin-spec.ts | 165 +++--- spec/observables/forkJoin-spec.ts | 602 ++++++++++++++-------- src/internal/observable/forkJoin.ts | 293 +++++------ src/internal/types.ts | 2 + 5 files changed, 598 insertions(+), 466 deletions(-) diff --git a/package.json b/package.json index 5d8fbeb9a7..838b6bd825 100644 --- a/package.json +++ b/package.json @@ -238,7 +238,7 @@ "tslint": "5.9.1", "tslint-etc": "1.2.6", "tslint-no-unused-expression-chai": "0.0.3", - "typescript": "^3.0.1", + "typescript": "~2.8.0", "validate-commit-msg": "2.14.0", "webpack": "1.13.1", "xmlhttprequest": "1.8.0" diff --git a/spec-dtslint/observables/forkJoin-spec.ts b/spec-dtslint/observables/forkJoin-spec.ts index 4d8936838b..238b5b283f 100644 --- a/spec-dtslint/observables/forkJoin-spec.ts +++ b/spec-dtslint/observables/forkJoin-spec.ts @@ -1,94 +1,93 @@ import { of, forkJoin } from 'rxjs'; -it('should infer correctly with 1 parameter', () => { - const a = of(1, 2, 3); - const res = forkJoin(a); // $ExpectType Observable -}); +describe('deprecated rest args', () => { + it('should infer correctly with 1 parameter', () => { + const a = of(1, 2, 3); + const res = forkJoin(a); // $ExpectType Observable<[number]> + }); -it('should infer correctly with 2 parameters', () => { - const a = of(1, 2, 3); - const b = of('a', 'b', 'c'); - const res = forkJoin(a, b); // $ExpectType Observable<[number, string]> -}); + it('should infer correctly with 2 parameters', () => { + const a = of(1, 2, 3); + const b = of('a', 'b', 'c'); + const res = forkJoin(a, b); // $ExpectType Observable<[number, string]> + }); -it('should infer correctly with 3 parameters', () => { - const a = of(1, 2, 3); - const b = of('a', 'b', 'c'); - const c = of(1, 2, 3); - const res = forkJoin(a, b, c); // $ExpectType Observable<[number, string, number]> -}); + it('should infer correctly with 3 parameters', () => { + const a = of(1, 2, 3); + const b = of('a', 'b', 'c'); + const c = of(1, 2, 3); + const res = forkJoin(a, b, c); // $ExpectType Observable<[number, string, number]> + }); -it('should infer correctly with 4 parameters', () => { - const a = of(1, 2, 3); - const b = of('a', 'b', 'c'); - const c = of(1, 2, 3); - const d = of(1, 2, 3); - const res = forkJoin(a, b, c, d); // $ExpectType Observable<[number, string, number, number]> -}); + it('should infer correctly with 4 parameters', () => { + const a = of(1, 2, 3); + const b = of('a', 'b', 'c'); + const c = of(1, 2, 3); + const d = of(1, 2, 3); + const res = forkJoin(a, b, c, d); // $ExpectType Observable<[number, string, number, number]> + }); -it('should infer correctly with 5 parameters', () => { - const a = of(1, 2, 3); - const b = of('a', 'b', 'c'); - const c = of(1, 2, 3); - const d = of(1, 2, 3); - const e = of(1, 2, 3); - const res = forkJoin(a, b, c, d, e); // $ExpectType Observable<[number, string, number, number, number]> -}); + it('should infer correctly with 5 parameters', () => { + const a = of(1, 2, 3); + const b = of('a', 'b', 'c'); + const c = of(1, 2, 3); + const d = of(1, 2, 3); + const e = of(1, 2, 3); + const res = forkJoin(a, b, c, d, e); // $ExpectType Observable<[number, string, number, number, number]> + }); -it('should infer correctly with 6 parameters', () => { - const a = of(1, 2, 3); - const b = of('a', 'b', 'c'); - const c = of(1, 2, 3); - const d = of(1, 2, 3); - const e = of(1, 2, 3); - const f = of(1, 2, 3); - const res = forkJoin(a, b, c, d, e, f); // $ExpectType Observable<[number, string, number, number, number, number]> + it('should infer correctly with 6 parameters', () => { + const a = of(1, 2, 3); + const b = of('a', 'b', 'c'); + const c = of(1, 2, 3); + const d = of(1, 2, 3); + const e = of(1, 2, 3); + const f = of(1, 2, 3); + const res = forkJoin(a, b, c, d, e, f); // $ExpectType Observable<[number, string, number, number, number, number]> + }); }); -// TODO(benlesh): this needs to be fixed as well -// it('should infer of type any for more than 6 parameters', () => { -// const a = of(1, 2, 3); -// const b = of('a', 'b', 'c'); -// const c = of(1, 2, 3); -// const d = of(1, 2, 3); -// const e = of(1, 2, 3); -// const f = of(1, 2, 3); -// const g = of(1, 2, 3); -// const res = forkJoin(a, b, c, d, e, f, g); // $ExpectType Observable<{}> -// }); - -it('should infer correctly for array of 1 observable', () => { - const a = [of(1, 2, 3)]; - const res = forkJoin(a); // $ExpectType Observable +describe('forkJoin({})', () => { + it('should properly type empty objects', () => { + const res = forkJoin({}); // $ExpectType Observable + }); + + it('should work for the simple case', () => { + const res = forkJoin({ foo: of(1), bar: of('two'), baz: of(false) }); // $ExpectType Observable<{ foo: number; bar: string; baz: boolean; }> + }); }); -// TODO(benlesh): We need to fix forkJoin so these pass -// it('should infer correctly for array of 2 observables', () => { -// const a = [of(1, 2, 3), of('a', 'b', 'c')]; -// const res = forkJoin(a); // $ExpectType Observable<[number, string]> -// }); - -// it('should infer correctly for array of 3 observables', () => { -// const a = [of(1, 2, 3), of('a', 'b', 'c'), of(true, true, false)]; -// const res = forkJoin(a); // $ExpectType Observable<[number, string, boolean]> -// }); - -// it('should infer correctly for array of 4 observables', () => { -// const a = [of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3)]; -// const res = forkJoin(a); // $ExpectType Observable<[number, string, number, number]> -// }); - -// it('should infer correctly for array of 5 observables', () => { -// const a = [of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3)]; -// const res = forkJoin(a); // $ExpectType Observable<[number, string, number, number, number]> -// }); - -// it('should infer correctly for array of 6 observables', () => { -// const a = [of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3)]; -// const res = forkJoin(a); // $ExpectType Observable<[number, string, number, number, number, number]> -// }); - -// it('should force user cast for array of 6+ observables', () => { -// const a = [of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3)]; -// const res = forkJoin(a); // $ExpectType Observable<{}> -// }); +describe('forkJoin([])', () => { + // TODO(benlesh): Uncomment for TS 3.0 + // it('should properly type empty arrays', () => { + // const res = forkJoin([]); // $ExpectType Observable + // }); + + it('should infer correctly for array of 1 observable', () => { + const res = forkJoin([of(1, 2, 3)]); // $ExpectType Observable<[number]> + }); + + it('should infer correctly for array of 2 observables', () => { + const res = forkJoin([of(1, 2, 3), of('a', 'b', 'c')]); // $ExpectType Observable<[number, string]> + }); + + it('should infer correctly for array of 3 observables', () => { + const res = forkJoin([of(1, 2, 3), of('a', 'b', 'c'), of(true, true, false)]); // $ExpectType Observable<[number, string, boolean]> + }); + + it('should infer correctly for array of 4 observables', () => { + const res = forkJoin([of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3)]); // $ExpectType Observable<[number, string, number, number]> + }); + + it('should infer correctly for array of 5 observables', () => { + const res = forkJoin([of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3)]); // $ExpectType Observable<[number, string, number, number, number]> + }); + + it('should infer correctly for array of 6 observables', () => { + const res = forkJoin([of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3)]); // $ExpectType Observable<[number, string, number, number, number, number]> + }); + + it('should force user cast for array of 6+ observables', () => { + const res = forkJoin([of(1, 2, 3), of('a', 'b', 'c'), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3), of(1, 2, 3)]); // $ExpectType Observable<(string | number)[]> + }); +}); diff --git a/spec/observables/forkJoin-spec.ts b/spec/observables/forkJoin-spec.ts index b7be9ba7e9..0bbb31211e 100644 --- a/spec/observables/forkJoin-spec.ts +++ b/spec/observables/forkJoin-spec.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { Observable, forkJoin, of } from 'rxjs'; import { lowerCaseO } from '../helpers/test-helper'; import { hot, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; +import { AssertionError } from 'assert'; declare const type: Function; declare const asDiagram: Function; @@ -67,284 +68,441 @@ describe('forkJoin', () => { expect(results).to.deep.equal([18, 'done']); }); - it('should join the last values of the provided observables into an array', () => { + it('should accept single observable', () => { const e1 = forkJoin( - hot('--a--b--c--d--|'), - hot('(b|)'), - hot('--1--2--3--|') + hot('--a--b--c--d--|') ); const expected = '--------------(x|)'; - expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); + expectObservable(e1).toBe(expected, {x: ['d']}); }); - it('should allow emit null or undefined', () => { - const e2 = forkJoin( - hot('--a--b--c--d--|', { d: null }), - hot('(b|)'), - hot('--1--2--3--|'), - hot('-----r--t--u--|', { u: undefined }) - ); - const expected2 = '--------------(x|)'; + describe('forkJoin([input1, input2, input3])', () => { + it('should join the last values of the provided observables into an array', () => { + const e1 = forkJoin([ + hot('--a--b--c--d--|'), + hot('(b|)'), + hot('--1--2--3--|') + ]); + const expected = '--------------(x|)'; - expectObservable(e2).toBe(expected2, {x: [null, 'b', '3', undefined]}); - }); + expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); + }); - it('should accept single observable', () => { - const e1 = forkJoin( - hot('--a--b--c--d--|') - ); - const expected = '--------------(x|)'; + it('should allow emit null or undefined', () => { + const e2 = forkJoin([ + hot('--a--b--c--d--|', { d: null }), + hot('(b|)'), + hot('--1--2--3--|'), + hot('-----r--t--u--|', { u: undefined }) + ]); + const expected2 = '--------------(x|)'; - expectObservable(e1).toBe(expected, {x: ['d']}); - }); + expectObservable(e2).toBe(expected2, {x: [null, 'b', '3', undefined]}); + }); - it('should accept array of observable contains single', () => { - const e1 = forkJoin( - [hot('--a--b--c--d--|')] - ); - const expected = '--------------(x|)'; + it('should accept array of observable contains single', () => { + const e1 = forkJoin([hot('--a--b--c--d--|')]); + const expected = '--------------(x|)'; - expectObservable(e1).toBe(expected, {x: ['d']}); - }); + expectObservable(e1).toBe(expected, {x: ['d']}); + }); - it('should accept lowercase-o observables', () => { - const e1 = forkJoin( - hot('--a--b--c--d--|'), - hot('(b|)'), - lowerCaseO('1', '2', '3') - ); - const expected = '--------------(x|)'; + it('should accept lowercase-o observables', () => { + const e1 = forkJoin([ + hot('--a--b--c--d--|'), + hot('(b|)'), + lowerCaseO('1', '2', '3') + ]); + const expected = '--------------(x|)'; - expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); - }); + expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); + }); - it('should accept empty lowercase-o observables', () => { - const e1 = forkJoin( - hot('--a--b--c--d--|'), - hot('(b|)'), - lowerCaseO() - ); - const expected = '|'; + it('should accept empty lowercase-o observables', () => { + const e1 = forkJoin([ + hot('--a--b--c--d--|'), + hot('(b|)'), + lowerCaseO() + ]); + const expected = '|'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should accept promise', (done: MochaDone) => { - const e1 = forkJoin( - of(1), - Promise.resolve(2) - ); + it('should accept promise', done => { + const e1 = forkJoin([ + of(1), + Promise.resolve(2) + ]); - e1.subscribe((x: number[]) => { - expect(x).to.deep.equal([1, 2]); - }, - (err: any) => { - done(new Error('should not be called')); - }, () => { - done(); + e1.subscribe({ + next: x => expect(x).to.deep.equal([1, 2]), + complete: done, + }); }); - }); - it('should accept array of observables', () => { - const e1 = forkJoin( - [hot('--a--b--c--d--|'), + it('should accept array of observables', () => { + const e1 = forkJoin([ + hot('--a--b--c--d--|'), + hot('(b|)'), + hot('--1--2--3--|')]); + const expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); + }); + + it('should not emit if any of source observable is empty', () => { + const e1 = forkJoin([ + hot('--a--b--c--d--|'), hot('(b|)'), - hot('--1--2--3--|')] - ); - const expected = '--------------(x|)'; + hot('------------------|') + ]); + const expected = '------------------|'; - expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); - }); + expectObservable(e1).toBe(expected); + }); - it('should not emit if any of source observable is empty', () => { - const e1 = forkJoin( - hot('--a--b--c--d--|'), - hot('(b|)'), - hot('------------------|') - ); - const expected = '------------------|'; + it('should complete early if any of source is empty and completes before than others', () => { + const e1 = forkJoin([ + hot('--a--b--c--d--|'), + hot('(b|)'), + hot('---------|') + ]); + const expected = '---------|'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should complete early if any of source is empty and completes before than others', () => { - const e1 = forkJoin( - hot('--a--b--c--d--|'), - hot('(b|)'), - hot('---------|') - ); - const expected = '---------|'; + it('should complete when all sources are empty', () => { + const e1 = forkJoin([ + hot('--------------|'), + hot('---------|') + ]); + const expected = '---------|'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should complete when all sources are empty', () => { - const e1 = forkJoin( - hot('--------------|'), - hot('---------|') - ); - const expected = '---------|'; + it('should not complete when only source never completes', () => { + const e1 = forkJoin([ + hot('--------------') + ]); + const expected = '-'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should not complete when only source never completes', () => { - const e1 = forkJoin( - hot('--------------') - ); - const expected = '-'; + it('should not complete when one of the sources never completes', () => { + const e1 = forkJoin([ + hot('--------------'), + hot('-a---b--c--|') + ]); + const expected = '-'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should not complete when one of the sources never completes', () => { - const e1 = forkJoin( - hot('--------------'), - hot('-a---b--c--|') - ); - const expected = '-'; + it('should complete when one of the sources never completes but other completes without values', () => { + const e1 = forkJoin([ + hot('--------------'), + hot('------|') + ]); + const expected = '------|'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should complete when one of the sources never completes but other completes without values', () => { - const e1 = forkJoin( - hot('--------------'), - hot('------|') - ); - const expected = '------|'; + it('should complete if source is not provided', () => { + const e1 = forkJoin(); + const expected = '|'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should complete if source is not provided', () => { - const e1 = forkJoin(); - const expected = '|'; + it('should complete if sources list is empty', () => { + const e1 = forkJoin([]); + const expected = '|'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should complete if sources list is empty', () => { - const e1 = forkJoin([]); - const expected = '|'; + it('should raise error when any of source raises error with empty observable', () => { + const e1 = forkJoin([ + hot('------#'), + hot('---------|')]); + const expected = '------#'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should raise error when any of source raises error with empty observable', () => { - const e1 = forkJoin( - hot('------#'), - hot('---------|')); - const expected = '------#'; + it('should raise error when any of source raises error with source that never completes', () => { + const e1 = forkJoin([ + hot('------#'), + hot('----------')]); + const expected = '------#'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should raise error when any of source raises error with source that never completes', () => { - const e1 = forkJoin( - hot('------#'), - hot('----------')); - const expected = '------#'; + it('should raise error when source raises error', () => { + const e1 = forkJoin([ + hot('------#'), + hot('---a-----|')]); + const expected = '------#'; - expectObservable(e1).toBe(expected); - }); + expectObservable(e1).toBe(expected); + }); - it('should raise error when source raises error', () => { - const e1 = forkJoin( - hot('------#'), - hot('---a-----|')); - const expected = '------#'; + it('should allow unsubscribing early and explicitly', () => { + const e1 = hot('--a--^--b--c---d-| '); + const e1subs = '^ ! '; + const e2 = hot('---e-^---f--g---h-|'); + const e2subs = '^ ! '; + const expected = '---------- '; + const unsub = ' ! '; - expectObservable(e1).toBe(expected); - }); + const result = forkJoin([e1, e2]); - it('should allow unsubscribing early and explicitly', () => { - const e1 = hot('--a--^--b--c---d-| '); - const e1subs = '^ ! '; - const e2 = hot('---e-^---f--g---h-|'); - const e2subs = '^ ! '; - const expected = '---------- '; - const unsub = ' ! '; + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); - const result = forkJoin(e1, e2); + it('should unsubscribe other Observables, when one of them errors', () => { + const e1 = hot('--a--^--b--c---d-| '); + const e1subs = '^ ! '; + const e2 = hot('---e-^---f--g-#'); + const e2subs = '^ ! '; + const expected = '---------# '; - expectObservable(result, unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - expectSubscriptions(e2.subscriptions).toBe(e2subs); + const result = forkJoin([e1, e2]); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); }); - it('should unsubscribe other Observables, when one of them errors', () => { - const e1 = hot('--a--^--b--c---d-| '); - const e1subs = '^ ! '; - const e2 = hot('---e-^---f--g-#'); - const e2subs = '^ ! '; - const expected = '---------# '; + describe('forkJoin({ foo, bar, baz })', () => { + it('should join the last values of the provided observables into an array', () => { + const e1 = forkJoin({ + foo: hot('--a--b--c--d--|'), + bar: hot('(b|)'), + baz: hot('--1--2--3--|') + }); + const expected = '--------------(x|)'; - const result = forkJoin(e1, e2); + expectObservable(e1).toBe(expected, {x: { foo: 'd', bar: 'b', baz: '3' } }); + }); - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - expectSubscriptions(e2.subscriptions).toBe(e2subs); - }); + it('should allow emit null or undefined', () => { + const e2 = forkJoin({ + foo: hot('--a--b--c--d--|', { d: null }), + bar: hot('(b|)'), + baz: hot('--1--2--3--|'), + qux: hot('-----r--t--u--|', { u: undefined }) + }); + const expected2 = '--------------(x|)'; - type('should support promises', () => { - /* tslint:disable:no-unused-variable */ - let a: Promise; - let b: Promise; - let c: Promise; - let o1: Observable<[number, string, boolean]> = forkJoin(a, b, c); - let o2: Observable = forkJoin(a, b, c, (aa: number, bb: string, cc: boolean) => !!aa && !!bb && cc); - /* tslint:enable:no-unused-variable */ - }); + expectObservable(e2).toBe(expected2, {x: { foo: null, bar: 'b', baz: '3', qux: undefined } }); + }); - type('should support observables', () => { - /* tslint:disable:no-unused-variable */ - let a: Observable; - let b: Observable; - let c: Observable; - let o1: Observable<[number, string, boolean]> = forkJoin(a, b, c); - let o2: Observable = forkJoin(a, b, c, (aa: number, bb: string, cc: boolean) => !!aa && !!bb && cc); - /* tslint:enable:no-unused-variable */ - }); + it('should accept array of observable contains single', () => { + const e1 = forkJoin({ + foo: hot('--a--b--c--d--|') + }); + const expected = '--------------(x|)'; - type('should support mixed observables and promises', () => { - /* tslint:disable:no-unused-variable */ - let a: Promise; - let b: Observable; - let c: Promise; - let d: Observable; - let o1: Observable<[number, string, boolean, string[]]> = forkJoin(a, b, c, d); - /* tslint:enable:no-unused-variable */ - }); + expectObservable(e1).toBe(expected, {x: { foo: 'd' }}); + }); - type('should support arrays of promises', () => { - /* tslint:disable:no-unused-variable */ - let a: Promise[]; - let o1: Observable = forkJoin(a); - let o2: Observable = forkJoin(...a); - /* tslint:enable:no-unused-variable */ - }); + it('should accept lowercase-o observables', () => { + const e1 = forkJoin({ + foo: hot('--a--b--c--d--|'), + bar: hot('(b|)'), + baz: lowerCaseO('1', '2', '3') + }); + const expected = '--------------(x|)'; - type('should support arrays of observables', () => { - /* tslint:disable:no-unused-variable */ - let a: Observable[]; - let o1: Observable = forkJoin(a); - let o2: Observable = forkJoin(...a); - /* tslint:enable:no-unused-variable */ - }); + expectObservable(e1).toBe(expected, {x: { foo: 'd', bar: 'b', baz: '3' }}); + }); - type('should return Array when given a single promise', () => { - /* tslint:disable:no-unused-variable */ - let a: Promise; - let o1: Observable = forkJoin(a); - /* tslint:enable:no-unused-variable */ - }); + it('should accept empty lowercase-o observables', () => { + const e1 = forkJoin({ + foo: hot('--a--b--c--d--|'), + bar: hot('(b|)'), + baz: lowerCaseO() + }); + const expected = '|'; + + expectObservable(e1).toBe(expected); + }); + + it('should accept promise', done => { + const e1 = forkJoin({ + foo: of(1), + bar: Promise.resolve(2) + }); + + e1.subscribe({ + next: x => expect(x).to.deep.equal({ foo: 1, bar: 2 }), + complete: done, + }); + }); + + it('should accept array of observables', () => { + const e1 = forkJoin({ + foo: hot('--a--b--c--d--|'), + bar: hot('(b|)'), + baz: hot('--1--2--3--|') + }); + const expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: { foo: 'd', bar: 'b', baz: '3' }}); + }); + + it('should not emit if any of source observable is empty', () => { + const e1 = forkJoin({ + foo: hot('--a--b--c--d--|'), + bar: hot('(b|)'), + baz: hot('------------------|') + }); + const expected = '------------------|'; + + expectObservable(e1).toBe(expected); + }); + + it('should complete early if any of source is empty and completes before than others', () => { + const e1 = forkJoin({ + foo: hot('--a--b--c--d--|'), + bar: hot('(b|)'), + baz: hot('---------|') + }); + const expected = '---------|'; - type('should return Array when given a single observable', () => { - /* tslint:disable:no-unused-variable */ - let a: Observable; - let o1: Observable = forkJoin(a); - /* tslint:enable:no-unused-variable */ + expectObservable(e1).toBe(expected); + }); + + it('should complete when all sources are empty', () => { + const e1 = forkJoin({ + foo: hot('--------------|'), + bar: hot('---------|') + }); + const expected = '---------|'; + + expectObservable(e1).toBe(expected); + }); + + it('should not complete when only source never completes', () => { + const e1 = forkJoin({ + foo: hot('--------------') + }); + const expected = '-'; + + expectObservable(e1).toBe(expected); + }); + + it('should not complete when one of the sources never completes', () => { + const e1 = forkJoin({ + foo: hot('--------------'), + bar: hot('-a---b--c--|') + }); + const expected = '-'; + + expectObservable(e1).toBe(expected); + }); + + it('should complete when one of the sources never completes but other completes without values', () => { + const e1 = forkJoin({ + foo: hot('--------------'), + bar: hot('------|') + }); + const expected = '------|'; + + expectObservable(e1).toBe(expected); + }); + + // TODO(benlesh): this is the wrong behavior, it should probably throw right away. + it('should have same v5/v6 throwing behavior full argument of null', (done) => { + // It doesn't throw when you pass null + expect(() => forkJoin(null)).not.to.throw(); + + // It doesn't even throw if you subscribe to forkJoin(null). + expect(() => forkJoin(null).subscribe({ + // It sends the error to the subscription. + error: err => done(), + })).not.to.throw(); + }); + + it('should complete if sources object is empty', () => { + const e1 = forkJoin({}); + const expected = '|'; + + expectObservable(e1).toBe(expected); + }); + + it('should raise error when any of source raises error with empty observable', () => { + const e1 = forkJoin({ + lol: hot('------#'), + wut: hot('---------|') + }); + const expected = '------#'; + + expectObservable(e1).toBe(expected); + }); + + it('should raise error when any of source raises error with source that never completes', () => { + const e1 = forkJoin({ + lol: hot('------#'), + wut: hot('----------') + }); + const expected = '------#'; + + expectObservable(e1).toBe(expected); + }); + + it('should raise error when source raises error', () => { + const e1 = forkJoin({ + lol: hot('------#'), + foo: hot('---a-----|') + }); + const expected = '------#'; + + expectObservable(e1).toBe(expected); + }); + + it('should allow unsubscribing early and explicitly', () => { + const e1 = hot('--a--^--b--c---d-| '); + const e1subs = '^ ! '; + const e2 = hot('---e-^---f--g---h-|'); + const e2subs = '^ ! '; + const expected = '---------- '; + const unsub = ' ! '; + + const result = forkJoin({ + e1, e2 + }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should unsubscribe other Observables, when one of them errors', () => { + const e1 = hot('--a--^--b--c---d-| '); + const e1subs = '^ ! '; + const e2 = hot('---e-^---f--g-#'); + const e2subs = '^ ! '; + const expected = '---------# '; + + const result = forkJoin({ + e1, e2 + }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); }); }); diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index cc4f24a1c2..a7a8d3453a 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -1,131 +1,130 @@ import { Observable } from '../Observable'; -import { ObservableInput } from '../types'; +import { ObservableInput, ObservedValuesFromArray, ObservedValueOf, SubscribableOrPromise } from '../types'; import { isArray } from '../util/isArray'; -import { EMPTY } from './empty'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { Subscriber } from '../Subscriber'; import { map } from '../operators/map'; +import { isObject } from '../util/isObject'; +import { isObservable } from '../util/isObservable'; +import { from } from './from'; /* tslint:disable:max-line-length */ -// forkJoin([a$, b$, c$]); -export function forkJoin(sources: [ObservableInput]): Observable; -export function forkJoin(sources: [ObservableInput, ObservableInput]): Observable<[T, T2]>; -export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput]): Observable<[T, T2, T3]>; -export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable<[T, T2, T3, T4]>; -export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable<[T, T2, T3, T4, T5]>; -export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable<[T, T2, T3, T4, T5, T6]>; -export function forkJoin(sources: Array>): Observable; // forkJoin(a$, b$, c$) -export function forkJoin(v1: ObservableInput): Observable; +/** @deprecated Use the version that takes an array of Observables instead */ +export function forkJoin(v1: SubscribableOrPromise): Observable<[T]>; +/** @deprecated Use the version that takes an array of Observables instead */ export function forkJoin(v1: ObservableInput, v2: ObservableInput): Observable<[T, T2]>; +/** @deprecated Use the version that takes an array of Observables instead */ export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput): Observable<[T, T2, T3]>; +/** @deprecated Use the version that takes an array of Observables instead */ export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput): Observable<[T, T2, T3, T4]>; +/** @deprecated Use the version that takes an array of Observables instead */ export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput): Observable<[T, T2, T3, T4, T5]>; +/** @deprecated Use the version that takes an array of Observables instead */ export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): Observable<[T, T2, T3, T4, T5, T6]>; +// forkJoin([a$, b$, c$]); +// TODO(benlesh): Uncomment for TS 3.0 +// export function forkJoin(sources: []): Observable; +export function forkJoin(sources: [ObservableInput]): Observable<[A]>; +export function forkJoin(sources: [ObservableInput, ObservableInput]): Observable<[A, B]>; +export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput]): Observable<[A, B, C]>; +export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable<[A, B, C, D]>; +export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable<[A, B, C, D, E]>; +export function forkJoin(sources: [ObservableInput, ObservableInput, ObservableInput, ObservableInput, ObservableInput, ObservableInput]): Observable<[A, B, C, D, E, F]>; +export function forkJoin[]>(sources: A): Observable[]>; + +// forkJoin({}) +export function forkJoin(sourcesObject: {}): Observable; +export function forkJoin(sourcesObject: T): Observable<{ [K in keyof T]: ObservedValueOf }>; + /** @deprecated resultSelector is deprecated, pipe to map instead */ export function forkJoin(...args: Array|Function>): Observable; +/** @deprecated Use the version that takes an array of Observables instead */ export function forkJoin(...sources: ObservableInput[]): Observable; /* tslint:enable:max-line-length */ /** - * Joins last values emitted by passed Observables. + * Accepts an `Array` of {@link ObservableInput} or a dictionary `Object` of {@link ObservableInput} and returns + * an {@link Observable} that emits either an array of values in the exact same order as the passed array, + * or a dictionary of values in the same shape as the passed dictionary. * * Wait for Observables to complete and then combine last values they emitted. * * ![](forkJoin.png) * - * `forkJoin` is an operator that takes any number of Observables which can be passed either as an array - * or directly as arguments. If no input Observables are provided, resulting stream will complete + * `forkJoin` is an operator that takes any number of input observables which can be passed either as an array + * or a dictionary of input observables. If no input observables are provided, resulting stream will complete * immediately. * - * `forkJoin` will wait for all passed Observables to complete and then it will emit an array with last - * values from corresponding Observables. So if you pass `n` Observables to the operator, resulting - * array will have `n` values, where first value is the last thing emitted by the first Observable, - * second value is the last thing emitted by the second Observable and so on. That means `forkJoin` will - * not emit more than once and it will complete after that. If you need to emit combined values not only - * at the end of lifecycle of passed Observables, but also throughout it, try out {@link combineLatest} + * `forkJoin` will wait for all passed observables to complete and then it will emit an array or an object with last + * values from corresponding observables. + * + * If you pass an array of `n` observables to the operator, resulting + * array will have `n` values, where first value is the last thing emitted by the first observable, + * second value is the last thing emitted by the second observable and so on. + * + * If you pass a dictionary of observables to the operator, resulting + * objects will have the same keys as the dictionary passed, with their last values they've emitted + * located at the corresponding key. + * + * That means `forkJoin` will not emit more than once and it will complete after that. If you need to emit combined + * values not only at the end of lifecycle of passed observables, but also throughout it, try out {@link combineLatest} * or {@link zip} instead. * - * In order for resulting array to have the same length as the number of input Observables, whenever any of - * that Observables completes without emitting any value, `forkJoin` will complete at that moment as well - * and it will not emit anything either, even if it already has some last values from other Observables. - * Conversely, if there is an Observable that never completes, `forkJoin` will never complete as well, - * unless at any point some other Observable completes without emitting value, which brings us back to - * the previous case. Overall, in order for `forkJoin` to emit a value, all Observables passed as arguments + * In order for resulting array to have the same length as the number of input observables, whenever any of + * that observables completes without emitting any value, `forkJoin` will complete at that moment as well + * and it will not emit anything either, even if it already has some last values from other observables. + * Conversely, if there is an observable that never completes, `forkJoin` will never complete as well, + * unless at any point some other observable completes without emitting value, which brings us back to + * the previous case. Overall, in order for `forkJoin` to emit a value, all observables passed as arguments * have to emit something at least once and complete. * - * If any input Observable errors at some point, `forkJoin` will error as well and all other Observables + * If any input observable errors at some point, `forkJoin` will error as well and all other observables * will be immediately unsubscribed. * * Optionally `forkJoin` accepts project function, that will be called with values which normally * would land in emitted array. Whatever is returned by project function, will appear in output - * Observable instead. This means that default project can be thought of as a function that takes + * observable instead. This means that default project can be thought of as a function that takes * all its arguments and puts them into an array. Note that project function will be called only - * when output Observable is supposed to emit a result. + * when output observable is supposed to emit a result. * * ## Examples - * ### Use forkJoin with operator emitting immediately - * ```javascript - * import { forkJoin, of } from 'rxjs'; - * - * const observable = forkJoin( - * of(1, 2, 3, 4), - * of(5, 6, 7, 8), - * ); - * observable.subscribe( - * value => console.log(value), - * err => {}, - * () => console.log('This is how it ends!'), - * ); * - * // Logs: - * // [4, 8] - * // "This is how it ends!" - * ``` + * ### Use forkJoin with a dictionary of observable inputs + * ```ts + * import { forkJoin, of, timer } from 'rxjs'; * - * ### Use forkJoin with operator emitting after some time - * ```javascript - * import { forkJoin, interval } from 'rxjs'; - * import { take } from 'rxjs/operators'; - * - * const observable = forkJoin( - * interval(1000).pipe(take(3)), // emit 0, 1, 2 every second and complete - * interval(500).pipe(take(4)), // emit 0, 1, 2, 3 every half a second and complete - * ); - * observable.subscribe( - * value => console.log(value), - * err => {}, - * () => console.log('This is how it ends!'), - * ); + * const observable = forkJoin({ + * foo: of(1, 2, 3, 4), + * bar: Promise.resolve(8), + * baz: timer(4000), + * }); + * observable.subscribe({ + * next: value => console.log(value), + * complete: () => console.log('This is how it ends!'), + * }); * * // Logs: - * // [2, 3] after 3 seconds + * // { foo: 4, bar: 8, baz: 0 } after 4 seconds * // "This is how it ends!" immediately after * ``` * - * ### Use forkJoin with project function - * ```javascript - * import { forkJoin, interval } from 'rxjs'; - * import { take } from 'rxjs/operators'; - * - * const observable = forkJoin( - * interval(1000).pipe(take(3)), // emit 0, 1, 2 every second and complete - * interval(500).pipe(take(4)), // emit 0, 1, 2, 3 every half a second and complete - * ).pipe( - * map(([n, m]) => n + m), - * ); - * observable.subscribe( - * value => console.log(value), - * err => {}, - * () => console.log('This is how it ends!'), - * ); + * ### Use forkJoin with an array of observable inputs + * ```ts + * import { forkJoin, of } from 'rxjs'; + * + * const observable = forkJoin([ + * of(1, 2, 3, 4), + * Promise.resolve(8), + * timer(4000), + * ]); + * observable.subscribe({ + * next: value => console.log(value), + * complete: () => console.log('This is how it ends!'), + * }); * * // Logs: - * // 5 after 3 seconds + * // [4, 8, 0] after 4 seconds * // "This is how it ends!" immediately after * ``` * @@ -139,93 +138,67 @@ export function forkJoin(...sources: ObservableInput[]): Observable; * @return {Observable} Observable emitting either an array of last values emitted by passed Observables * or value from project function. */ -export function forkJoin( - ...sources: Array | ObservableInput[] | Function> -): Observable { - - let resultSelector: Function; - if (typeof sources[sources.length - 1] === 'function') { - // DEPRECATED PATH - resultSelector = sources.pop() as Function; - } - - // if the first and only other argument is an array - // assume it's been called with `forkJoin([obs1, obs2, obs3])` - if (sources.length === 1 && isArray(sources[0])) { - sources = sources[0] as Array>; - } - - if (sources.length === 0) { - return EMPTY; +export function forkJoin( + ...sources: any[] +): Observable { + if (sources.length === 1) { + const first = sources[0]; + if (isArray(first)) { + return forkJoinInternal(first, null); + } + // TODO(benlesh): isObservable check will not be necessary when deprecated path is removed. + if (isObject(first) && !isObservable(first)) { + const keys = Object.keys(first); + return forkJoinInternal(keys.map(key => first[key]), keys); + } } - if (resultSelector) { - // DEPRECATED PATH - return forkJoin(sources).pipe( - map(args => resultSelector(...args)) + // DEPRECATED PATHS BELOW HERE + if (typeof sources[sources.length - 1] === 'function') { + const resultSelector = sources.pop() as Function; + sources = (sources.length === 1 && isArray(sources[0])) ? sources[0] : sources; + return forkJoinInternal(sources, null).pipe( + map((args: any[]) => resultSelector(...args)) ); } - return new Observable(subscriber => { - return new ForkJoinSubscriber(subscriber, sources as Array>); - }); + return forkJoinInternal(sources, null); } -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class ForkJoinSubscriber extends OuterSubscriber { - private completed = 0; - private values: T[]; - private haveValues = 0; - - constructor(destination: Subscriber, - private sources: Array>) { - super(destination); +function forkJoinInternal(sources: ObservableInput[], keys: string[] | null): Observable { + return new Observable(subscriber => { const len = sources.length; - this.values = new Array(len); - - for (let i = 0; i < len; i++) { - const source = sources[i]; - const innerSubscription = subscribeToResult(this, source, null, i); - - if (innerSubscription) { - this.add(innerSubscription); - } - } - } - - notifyNext(outerValue: any, innerValue: T, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.values[outerIndex] = innerValue; - if (!(innerSub as any)._hasValue) { - (innerSub as any)._hasValue = true; - this.haveValues++; - } - } - - notifyComplete(innerSub: InnerSubscriber): void { - const { destination, haveValues, values } = this; - const len = values.length; - - if (!(innerSub as any)._hasValue) { - destination.complete(); - return; - } - - this.completed++; - - if (this.completed !== len) { + if (len === 0) { + subscriber.complete(); return; } - - if (haveValues === len) { - destination.next(values); + const values = new Array(len); + let completed = 0; + let emitted = 0; + for (let i = 0; i < len; i++) { + const source = from(sources[i]); + let hasValue = false; + subscriber.add(source.subscribe({ + next: value => { + if (!hasValue) { + hasValue = true; + emitted++; + } + values[i] = value; + }, + error: err => subscriber.error(err), + complete: () => { + completed++; + if (completed === len || !hasValue) { + if (emitted === len) { + subscriber.next(keys ? + keys.reduce((result, key, i) => (result[key] = values[i], result), {}) : + values); + } + subscriber.complete(); + } + } + })); } - - destination.complete(); - } + }); } diff --git a/src/internal/types.ts b/src/internal/types.ts index 9e2d8d0912..e532dc809c 100644 --- a/src/internal/types.ts +++ b/src/internal/types.ts @@ -99,3 +99,5 @@ export interface SchedulerAction extends Subscription { } export type ObservedValueOf = O extends ObservableInput ? T : never; + +export type ObservedValuesFromArray = X extends Array> ? T : never;