diff --git a/spec/operators/audit-spec.ts b/spec/operators/audit-spec.ts index 641a41bf50..1e4abede3a 100644 --- a/spec/operators/audit-spec.ts +++ b/spec/operators/audit-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { TestScheduler } from 'rxjs/testing'; -import { of, interval, EMPTY } from 'rxjs'; +import { of, interval, EMPTY, Observable } from 'rxjs'; import { audit, take, mergeMap } from 'rxjs/operators'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -436,4 +436,23 @@ describe('audit operator', () => { } ); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + audit(() => of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/bufferCount-spec.ts b/spec/operators/bufferCount-spec.ts index 6cac72cb44..a52918701a 100644 --- a/spec/operators/bufferCount-spec.ts +++ b/spec/operators/bufferCount-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { Subject, of } from 'rxjs'; -import { bufferCount, mergeMap } from 'rxjs/operators'; +import { Subject, of, Observable } from 'rxjs'; +import { bufferCount, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -163,4 +163,23 @@ describe('bufferCount operator', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + bufferCount(1), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/bufferWhen-spec.ts b/spec/operators/bufferWhen-spec.ts index 4c98e89dd1..0799ba2834 100644 --- a/spec/operators/bufferWhen-spec.ts +++ b/spec/operators/bufferWhen-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { of, EMPTY } from 'rxjs'; -import { bufferWhen, mergeMap, takeWhile } from 'rxjs/operators'; +import { of, EMPTY, Observable } from 'rxjs'; +import { bufferWhen, mergeMap, takeWhile, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; diff --git a/spec/operators/catch-spec.ts b/spec/operators/catchError-spec.ts similarity index 95% rename from spec/operators/catch-spec.ts rename to spec/operators/catchError-spec.ts index 379509af08..e9333187c3 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catchError-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs'; -import { catchError, map, mergeMap, takeWhile, delay } from 'rxjs/operators'; +import { catchError, map, mergeMap, takeWhile, delay, take } from 'rxjs/operators'; import * as sinon from 'sinon'; import { createObservableInputs } from '../helpers/test-helper'; import { TestScheduler } from 'rxjs/testing'; @@ -457,4 +457,24 @@ describe('catchError operator', () => { ); }); + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + catchError(() => EMPTY), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); + }); diff --git a/spec/operators/concat-spec.ts b/spec/operators/concat-legacy-spec.ts similarity index 100% rename from spec/operators/concat-spec.ts rename to spec/operators/concat-legacy-spec.ts diff --git a/spec/operators/concatAll-spec.ts b/spec/operators/concatAll-spec.ts index dd242a94ef..d4ce61dd03 100644 --- a/spec/operators/concatAll-spec.ts +++ b/spec/operators/concatAll-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { from, throwError, of } from 'rxjs'; +import { from, throwError, of, Observable } from 'rxjs'; import { concatAll, take, mergeMap } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -520,4 +520,23 @@ describe('concatAll operator', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + of(synchronousObservable).pipe( + concatAll(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/concatMap-spec.ts b/spec/operators/concatMap-spec.ts index 1610d969fb..8fa3450604 100644 --- a/spec/operators/concatMap-spec.ts +++ b/spec/operators/concatMap-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { of, from, Observable } from 'rxjs'; -import { concatMap, mergeMap, map } from 'rxjs/operators'; +import { concatMap, mergeMap, map, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -808,4 +808,23 @@ describe('Observable.prototype.concatMap', () => { } ); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + concatMap(value => of(value)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/concatMapTo-spec.ts b/spec/operators/concatMapTo-spec.ts index e902903ab5..8d54851e89 100644 --- a/spec/operators/concatMapTo-spec.ts +++ b/spec/operators/concatMapTo-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of, from } from 'rxjs'; -import { concatMapTo, mergeMap } from 'rxjs/operators'; +import { of, from, Observable } from 'rxjs'; +import { concatMapTo, mergeMap, take } from 'rxjs/operators'; /** @test {concatMapTo} */ describe('Observable.prototype.concatMapTo', () => { @@ -356,4 +356,23 @@ describe('Observable.prototype.concatMapTo', () => { done(new Error('Subscriber complete handler not supposed to be called.')); }); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + concatMapTo(of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/concatWith-spec.ts b/spec/operators/concatWith-spec.ts index 7648065f9d..7dcd9762c8 100644 --- a/spec/operators/concatWith-spec.ts +++ b/spec/operators/concatWith-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { of, Observable } from 'rxjs'; -import { concatWith, mergeMap } from 'rxjs/operators'; +import { concatWith, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { assertDeepEquals, NO_SUBS } from '../helpers/test-helper'; @@ -338,4 +338,23 @@ describe('concat operator', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + concatWith(of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/debounce-spec.ts b/spec/operators/debounce-spec.ts index 57c09c3703..a8af7c209e 100644 --- a/spec/operators/debounce-spec.ts +++ b/spec/operators/debounce-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { NEVER, timer, of, EMPTY, concat, Subject } from 'rxjs'; -import { debounce, mergeMap, mapTo } from 'rxjs/operators'; +import { NEVER, timer, of, EMPTY, concat, Subject, Observable } from 'rxjs'; +import { debounce, mergeMap, mapTo, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; @@ -431,4 +431,23 @@ describe('debounce operator', () => { expect(results).to.deep.equal([1, 2]); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + debounce(() => of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/defaultIfEmpty-spec.ts b/spec/operators/defaultIfEmpty-spec.ts index 270455eb8f..28737b376b 100644 --- a/spec/operators/defaultIfEmpty-spec.ts +++ b/spec/operators/defaultIfEmpty-spec.ts @@ -1,6 +1,7 @@ +import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of } from 'rxjs'; -import { defaultIfEmpty, mergeMap } from 'rxjs/operators'; +import { of, Observable } from 'rxjs'; +import { defaultIfEmpty, mergeMap, take } from 'rxjs/operators'; /** @test {defaultIfEmpty} */ describe('Observable.prototype.defaultIfEmpty', () => { @@ -84,4 +85,23 @@ describe('Observable.prototype.defaultIfEmpty', () => { expectObservable(e1.pipe(defaultIfEmpty('x'))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + defaultIfEmpty(0), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/dematerialize-spec.ts b/spec/operators/dematerialize-spec.ts index 54b26c09bd..f399865d12 100644 --- a/spec/operators/dematerialize-spec.ts +++ b/spec/operators/dematerialize-spec.ts @@ -1,5 +1,6 @@ -import { of, Notification, ObservableNotification } from 'rxjs'; -import { dematerialize, map, mergeMap, materialize } from 'rxjs/operators'; +import { expect } from 'chai'; +import { of, Notification, ObservableNotification, Observable } from 'rxjs'; +import { dematerialize, map, mergeMap, materialize, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; const NO_VALUES: { [key: string]: ObservableNotification } = {}; @@ -174,4 +175,24 @@ describe('dematerialize operator', () => { ); expectObservable(result).toBe(expected); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + materialize(), + dematerialize(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/distinct-spec.ts b/spec/operators/distinct-spec.ts index 79390926ae..7d10c27980 100644 --- a/spec/operators/distinct-spec.ts +++ b/spec/operators/distinct-spec.ts @@ -1,6 +1,7 @@ -import { distinct, mergeMap } from 'rxjs/operators'; +import { expect } from 'chai'; +import { distinct, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; /** @test {distinct} */ describe('distinct operator', () => { @@ -212,4 +213,23 @@ describe('distinct operator', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + distinct(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/distinctUntilChanged-spec.ts b/spec/operators/distinctUntilChanged-spec.ts index 4fd32549e7..4cbc7b12c1 100644 --- a/spec/operators/distinctUntilChanged-spec.ts +++ b/spec/operators/distinctUntilChanged-spec.ts @@ -1,5 +1,6 @@ -import { distinctUntilChanged, mergeMap } from 'rxjs/operators'; -import { of } from 'rxjs'; +import { expect } from 'chai'; +import { distinctUntilChanged, mergeMap, take } from 'rxjs/operators'; +import { of, Observable } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; /** @test {distinctUntilChanged} */ @@ -213,4 +214,23 @@ describe('distinctUntilChanged operator', () => { expectObservable(e1.pipe(distinctUntilChanged(null as any, keySelector))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + distinctUntilChanged(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/distinctUntilKeyChanged-spec.ts b/spec/operators/distinctUntilKeyChanged-spec.ts index af99926438..bba890a44c 100644 --- a/spec/operators/distinctUntilKeyChanged-spec.ts +++ b/spec/operators/distinctUntilKeyChanged-spec.ts @@ -1,6 +1,7 @@ -import { distinctUntilKeyChanged, mergeMap } from 'rxjs/operators'; +import { expect } from 'chai'; +import { distinctUntilKeyChanged, mergeMap, map, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; /** @test {distinctUntilKeyChanged} */ describe('distinctUntilKeyChanged operator', () => { @@ -223,4 +224,24 @@ describe('distinctUntilKeyChanged operator', () => { expectObservable(e1.pipe(distinctUntilKeyChanged('val', selector))).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + map(value => ({ value })), + distinctUntilKeyChanged('value'), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/elementAt-spec.ts b/spec/operators/elementAt-spec.ts index aa85e85b07..18e4993cdf 100644 --- a/spec/operators/elementAt-spec.ts +++ b/spec/operators/elementAt-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { elementAt, mergeMap } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { ArgumentOutOfRangeError, of, range } from 'rxjs'; +import { ArgumentOutOfRangeError, of, range, Observable } from 'rxjs'; /** @test {elementAt} */ describe('elementAt operator', () => { @@ -120,4 +120,22 @@ describe('elementAt operator', () => { expectObservable(source.pipe(elementAt(3, defaultValue))).toBe(expected, { x: defaultValue }); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits, it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + elementAt(2), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/endWith-spec.ts b/spec/operators/endWith-spec.ts index 325b0f579e..119ad4e67b 100644 --- a/spec/operators/endWith-spec.ts +++ b/spec/operators/endWith-spec.ts @@ -1,5 +1,6 @@ -import { of } from 'rxjs'; -import { endWith, mergeMap } from 'rxjs/operators'; +import { expect } from 'chai'; +import { of, Observable } from 'rxjs'; +import { endWith, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; @@ -163,4 +164,23 @@ describe('endWith operator', () => { expectObservable(e1.pipe(endWith('y', 'z', rxTestScheduler))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + endWith(0), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/exhaust-spec.ts b/spec/operators/exhaust-spec.ts index b1dc62d82e..0347962640 100644 --- a/spec/operators/exhaust-spec.ts +++ b/spec/operators/exhaust-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; -import { exhaust, mergeMap } from 'rxjs/operators'; +import { exhaust, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; /** @test {exhaust} */ describe('exhaust operator', () => { @@ -212,4 +212,24 @@ describe('exhaust operator', () => { done(new Error('should not be called')); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + of(synchronousObservable).pipe( + exhaust(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/exhaustMap-spec.ts b/spec/operators/exhaustMap-spec.ts index 75eb5d18af..5a878801fe 100644 --- a/spec/operators/exhaustMap-spec.ts +++ b/spec/operators/exhaustMap-spec.ts @@ -1,6 +1,6 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { concat, defer, Observable, of } from 'rxjs'; -import { exhaustMap, mergeMap, takeWhile, map } from 'rxjs/operators'; +import { exhaustMap, mergeMap, takeWhile, map, take } from 'rxjs/operators'; import { expect } from 'chai'; import { asInteropObservable } from '../helpers/interop-helper'; @@ -431,4 +431,24 @@ describe('exhaustMap', () => { expectSubscriptions(x.subscriptions).toBe(xsubs); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + exhaustMap(value => of(value)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/expand-spec.ts b/spec/operators/expand-spec.ts index 330cd3f763..5086f7cd84 100644 --- a/spec/operators/expand-spec.ts +++ b/spec/operators/expand-spec.ts @@ -436,4 +436,23 @@ describe('expand operator', () => { done, done ); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + expand(() => EMPTY), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/filter-spec.ts b/spec/operators/filter-spec.ts index ba4658dc07..4b5e129269 100644 --- a/spec/operators/filter-spec.ts +++ b/spec/operators/filter-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { filter, tap, map, mergeMap } from 'rxjs/operators'; +import { filter, tap, map, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { of, Observable, from } from 'rxjs'; @@ -337,4 +337,23 @@ describe('filter operator', () => { expectObservable(source.pipe(filter(Boolean))).toBe(expected, { t: 1, f: 0 }); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + filter(() => true), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/finalize-spec.ts b/spec/operators/finalize-spec.ts index 2c463226c8..f1cfcafeb8 100644 --- a/spec/operators/finalize-spec.ts +++ b/spec/operators/finalize-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { finalize, map, share } from 'rxjs/operators'; +import { finalize, map, share, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { of, timer, interval, NEVER, Observable } from 'rxjs'; @@ -203,4 +203,23 @@ describe('finalize operator', () => { ).subscribe(); expect(order).to.deep.equal(['teardown', 'finalize']); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + finalize(() => { /* noop */}), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/find-spec.ts b/spec/operators/find-spec.ts index bae2b7b8b9..4108933a07 100644 --- a/spec/operators/find-spec.ts +++ b/spec/operators/find-spec.ts @@ -223,4 +223,22 @@ describe('find operator', () => { // tslint:disable enable }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits, it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + find(value => value === 2), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/findIndex-spec.ts b/spec/operators/findIndex-spec.ts index 36b6d00abd..5d9b423f35 100644 --- a/spec/operators/findIndex-spec.ts +++ b/spec/operators/findIndex-spec.ts @@ -1,7 +1,8 @@ -import { findIndex, mergeMap, delay } from 'rxjs/operators'; +import { expect } from 'chai'; +import { findIndex, mergeMap, delay, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; declare const rxTestScheduler: TestScheduler; @@ -165,4 +166,22 @@ describe('findIndex operator', () => { expectObservable(source.pipe(findIndex(predicate))).toBe(expected); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits, it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + findIndex(value => value === 2), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/first-spec.ts b/spec/operators/first-spec.ts index 22b027f509..00dec72a9d 100644 --- a/spec/operators/first-spec.ts +++ b/spec/operators/first-spec.ts @@ -249,4 +249,22 @@ describe('Observable.prototype.first', () => { // tslint:disable enable }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits, it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + first(value => value === 2), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/groupBy-spec.ts b/spec/operators/groupBy-spec.ts index ac27dbec93..3c98dd4721 100644 --- a/spec/operators/groupBy-spec.ts +++ b/spec/operators/groupBy-spec.ts @@ -1461,6 +1461,25 @@ describe('groupBy operator', () => { done(); }); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + groupBy(value => value), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); /** diff --git a/spec/operators/map-spec.ts b/spec/operators/map-spec.ts index b44611b3a1..691402cdc7 100644 --- a/spec/operators/map-spec.ts +++ b/spec/operators/map-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; -import { map, tap, mergeMap } from 'rxjs/operators'; +import { map, tap, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; // function shortcuts const addDrama = function (x: number | string) { return x + '!'; }; @@ -250,4 +250,23 @@ describe('map operator', () => { expectObservable(r, unsub).toBe(expected, {x: '1!', y: '2!'}); expectSubscriptions(a.subscriptions).toBe(asubs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + map(value => value), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/mapTo-spec.ts b/spec/operators/mapTo-spec.ts index 8c1d9a417d..038d7c5abe 100644 --- a/spec/operators/mapTo-spec.ts +++ b/spec/operators/mapTo-spec.ts @@ -1,7 +1,7 @@ - -import { mapTo, mergeMap } from 'rxjs/operators'; +import { expect } from 'chai'; +import { mapTo, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; /** @test {mapTo} */ describe('mapTo operator', () => { @@ -89,4 +89,23 @@ describe('mapTo operator', () => { expectObservable(r, unsub).toBe(expected); expectSubscriptions(a.subscriptions).toBe(asubs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + mapTo(0), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/materialize-spec.ts b/spec/operators/materialize-spec.ts index d41af948b1..dc1038d340 100644 --- a/spec/operators/materialize-spec.ts +++ b/spec/operators/materialize-spec.ts @@ -1,5 +1,6 @@ -import { materialize, map, mergeMap } from 'rxjs/operators'; -import { Notification, of } from 'rxjs'; +import { expect } from 'chai'; +import { materialize, map, mergeMap, take } from 'rxjs/operators'; +import { Notification, of, Observable } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; /** @test {materialize} */ @@ -135,4 +136,23 @@ describe('materialize operator', () => { expectObservable(e1.pipe(materialize())).toBe(expected, { x: Notification.createError('error') }); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + materialize(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/mergeAll-spec.ts b/spec/operators/mergeAll-spec.ts index 6be5833458..22f9f147ef 100644 --- a/spec/operators/mergeAll-spec.ts +++ b/spec/operators/mergeAll-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { mergeAll, mergeMap, take } from 'rxjs/operators'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { throwError, from, of, queueScheduler } from 'rxjs'; +import { throwError, from, of, queueScheduler, Observable } from 'rxjs'; /** @test {mergeAll} */ describe('mergeAll oeprator', () => { @@ -459,4 +459,23 @@ describe('mergeAll oeprator', () => { expect(val).to.equal(r.shift()); }, null, done); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + of(synchronousObservable).pipe( + mergeAll(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 4d36385921..bb87c1acc0 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { mergeMap, map, delay } from 'rxjs/operators'; +import { mergeMap, map, delay, take } from 'rxjs/operators'; import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { asInteropObservable } from '../helpers/interop-helper'; @@ -851,4 +851,23 @@ describe('mergeMap', () => { expectObservable(result).toBe(expected, undefined, noXError); }); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + mergeMap(value => of(value)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/mergeMapTo-spec.ts b/spec/operators/mergeMapTo-spec.ts index 3564588852..c501cf3898 100644 --- a/spec/operators/mergeMapTo-spec.ts +++ b/spec/operators/mergeMapTo-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { mergeMapTo, map } from 'rxjs/operators'; -import { from, of } from 'rxjs'; +import { mergeMapTo, map, take } from 'rxjs/operators'; +import { from, of, Observable } from 'rxjs'; /** @test {mergeMapTo} */ describe('mergeMapTo', () => { @@ -366,4 +366,23 @@ describe('mergeMapTo', () => { expect(completed).to.be.true; }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + mergeMapTo(of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/mergeScan-spec.ts b/spec/operators/mergeScan-spec.ts index a2f878cc38..53d17947d8 100644 --- a/spec/operators/mergeScan-spec.ts +++ b/spec/operators/mergeScan-spec.ts @@ -1,7 +1,7 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { TestScheduler } from 'rxjs/testing'; -import { of, defer, EMPTY, NEVER, concat, throwError } from 'rxjs'; -import { mergeScan, delay, mergeMap, takeWhile, startWith } from 'rxjs/operators'; +import { of, defer, EMPTY, NEVER, concat, throwError, Observable } from 'rxjs'; +import { mergeScan, delay, mergeMap, takeWhile, startWith, take } from 'rxjs/operators'; import { expect } from 'chai'; declare const rxTestScheduler: TestScheduler; @@ -433,4 +433,23 @@ describe('mergeScan', () => { expect(recorded).to.deep.equal([0, 1, 2, 3]); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + mergeScan((acc, value) => of(value), 0), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/mergeWith-spec.ts b/spec/operators/mergeWith-spec.ts index e1a94f4c52..267f48b38f 100644 --- a/spec/operators/mergeWith-spec.ts +++ b/spec/operators/mergeWith-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; -import { mergeWith, map, mergeAll } from 'rxjs/operators'; +import { mergeWith, map, mergeAll, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { queueScheduler, of } from 'rxjs'; +import { queueScheduler, of, Observable } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {merge} */ @@ -337,4 +337,23 @@ describe('mergeAll operator', () => { done ); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + mergeWith(of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index e6c445bf7e..1f42e3ebfc 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { multicast, tap, mergeMapTo, takeLast, mergeMap, refCount, retry, repeat, switchMap, map } from 'rxjs/operators'; +import { multicast, tap, mergeMapTo, takeLast, mergeMap, refCount, retry, repeat, switchMap, map, take } from 'rxjs/operators'; import { Subject, ReplaySubject, of, ConnectableObservable, zip, concat, Subscription, Observable, from } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { hot, cold, expectObservable, expectSubscriptions, time } from '../helpers/marble-testing'; @@ -700,4 +700,24 @@ describe('multicast operator', () => { }); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + multicast(() => new Subject(), source => source), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/observeOn-spec.ts b/spec/operators/observeOn-spec.ts index 34720e41b7..60a42d5637 100644 --- a/spec/operators/observeOn-spec.ts +++ b/spec/operators/observeOn-spec.ts @@ -1,8 +1,8 @@ -import { observeOn, mergeMap } from 'rxjs/operators'; +import { observeOn, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { expect } from 'chai'; import { hot, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { of, Observable, asapScheduler } from 'rxjs'; +import { of, Observable, asapScheduler, queueScheduler } from 'rxjs'; declare const rxTestScheduler: TestScheduler; @@ -122,4 +122,23 @@ describe('observeOn operator', () => { } ); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + observeOn(queueScheduler), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/pairwise-spec.ts b/spec/operators/pairwise-spec.ts index 2989e38b91..b68533d134 100644 --- a/spec/operators/pairwise-spec.ts +++ b/spec/operators/pairwise-spec.ts @@ -1,6 +1,6 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { pairwise, take } from 'rxjs/operators'; -import { Subject } from 'rxjs'; +import { Subject, Observable } from 'rxjs'; import { expect } from 'chai'; /** @test {pairwise} */ @@ -124,4 +124,23 @@ describe('pairwise operator', () => { expect(results).to.deep.equal([['a', 'b'], ['b', 'c'], ['c', 'c']]); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + pairwise(), + take(2), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/race-spec.ts b/spec/operators/race-legacy-spec.ts similarity index 100% rename from spec/operators/race-spec.ts rename to spec/operators/race-legacy-spec.ts diff --git a/spec/operators/raceWith-spec.ts b/spec/operators/raceWith-spec.ts index 29c8ee21ea..8aff96513d 100644 --- a/spec/operators/raceWith-spec.ts +++ b/spec/operators/raceWith-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { EMPTY, NEVER, of, timer, defer, Observable, throwError } from 'rxjs'; -import { raceWith, mergeMap, map, finalize, startWith } from 'rxjs/operators'; +import { raceWith, mergeMap, map, finalize, startWith, take } from 'rxjs/operators'; /** @test {raceWith} */ describe('raceWith operator', () => { @@ -215,4 +215,24 @@ describe('raceWith operator', () => { subscription.unsubscribe(); expect(onUnsubscribe.calledOnce).to.be.true; }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + raceWith(of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/refCount-spec.ts b/spec/operators/refCount-spec.ts index 1d70673c21..20df864ef1 100644 --- a/spec/operators/refCount-spec.ts +++ b/spec/operators/refCount-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { refCount, publish, publishReplay, first } from 'rxjs/operators'; +import { refCount, publish, publishReplay, first, multicast, take } from 'rxjs/operators'; import { NEVER, noop, Observable, Subject } from 'rxjs'; /** @test {refCount} */ @@ -114,4 +114,25 @@ describe('refCount', () => { expect(arr[0]).to.equal('the number one'); expect(arr[1]).to.equal('the number two'); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + multicast(() => new Subject()), + refCount(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/repeat-spec.ts b/spec/operators/repeat-spec.ts index 21228cf1b1..b245b35536 100644 --- a/spec/operators/repeat-spec.ts +++ b/spec/operators/repeat-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { repeat, mergeMap, map, multicast, refCount } from 'rxjs/operators'; +import { repeat, mergeMap, map, multicast, refCount, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { of, Subject, Observable } from 'rxjs'; @@ -280,4 +280,24 @@ describe('repeat operator', () => { done(); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + repeat(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/repeatWhen-spec.ts b/spec/operators/repeatWhen-spec.ts index f8af6172af..da05cc7d2c 100644 --- a/spec/operators/repeatWhen-spec.ts +++ b/spec/operators/repeatWhen-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { repeatWhen, map, mergeMap, takeUntil, takeWhile } from 'rxjs/operators'; +import { repeatWhen, map, mergeMap, takeUntil, takeWhile, take } from 'rxjs/operators'; import { of, EMPTY, Observable, Subscriber } from 'rxjs'; /** @test {repeatWhen} */ @@ -406,4 +406,24 @@ describe('repeatWhen operator', () => { expect(subscription.closed).to.be.true; expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown']) }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + repeatWhen(() => of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/retry-spec.ts b/spec/operators/retry-spec.ts index 63c4327640..29dd295783 100644 --- a/spec/operators/retry-spec.ts +++ b/spec/operators/retry-spec.ts @@ -307,4 +307,24 @@ describe('retry operator', () => { done(new Error('should not be called')); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + retry(1), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/retryWhen-spec.ts b/spec/operators/retryWhen-spec.ts index 227a595919..3c30dd4d50 100644 --- a/spec/operators/retryWhen-spec.ts +++ b/spec/operators/retryWhen-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { retryWhen, map, mergeMap, takeUntil } from 'rxjs/operators'; +import { retryWhen, map, mergeMap, takeUntil, take } from 'rxjs/operators'; import { of, EMPTY, Observable, throwError } from 'rxjs'; /** @test {retryWhen} */ @@ -350,4 +350,24 @@ describe('retryWhen operator', () => { expect(subscription.closed).to.be.true; expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'bad', 'teardown']) }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + retryWhen(() => of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/scan-spec.ts b/spec/operators/scan-spec.ts index 1a08b02816..334d109b9c 100644 --- a/spec/operators/scan-spec.ts +++ b/spec/operators/scan-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { scan, mergeMap, finalize } from 'rxjs/operators'; -import { of } from 'rxjs'; +import { scan, mergeMap, finalize, take } from 'rxjs/operators'; +import { of, Observable } from 'rxjs'; /** @test {scan} */ describe('scan operator', () => { @@ -224,4 +224,23 @@ describe('scan operator', () => { expectObservable(scanObs).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + scan((acc, value: number) => value, 0), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index 3140446461..cbb7ace6c1 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { share, retry, mergeMapTo, mergeMap, tap, repeat } from 'rxjs/operators'; +import { share, retry, mergeMapTo, mergeMap, tap, repeat, take } from 'rxjs/operators'; import { Observable, EMPTY, NEVER, of } from 'rxjs'; /** @test {share} */ @@ -308,4 +308,24 @@ describe('share operator', () => { expectObservable(e1.pipe(share())).toBe(expected); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + share(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index ac881f7cf7..be308b5784 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { shareReplay, mergeMapTo, retry } from 'rxjs/operators'; +import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { Observable, Operator, Observer, of, from, defer } from 'rxjs'; @@ -299,4 +299,24 @@ describe('shareReplay operator', () => { expectObservable(result).toBe(expected); }); + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + shareReplay(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); + }); diff --git a/spec/operators/single-spec.ts b/spec/operators/single-spec.ts index db7b720f3f..433bdc019d 100644 --- a/spec/operators/single-spec.ts +++ b/spec/operators/single-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { single, mergeMap, tap } from 'rxjs/operators'; -import { of, EmptyError, SequenceError, NotFoundError } from 'rxjs'; +import { of, EmptyError, SequenceError, NotFoundError, Observable } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { assertDeepEquals } from '../helpers/test-helper'; @@ -328,4 +328,23 @@ describe('single operator', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits, it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + single(), + ).subscribe(() => { /* noop */ }, () => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1]); + }); }); diff --git a/spec/operators/skip-spec.ts b/spec/operators/skip-spec.ts index e6109abc7a..d15eed581b 100644 --- a/spec/operators/skip-spec.ts +++ b/spec/operators/skip-spec.ts @@ -1,6 +1,7 @@ +import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { skip, mergeMap } from 'rxjs/operators'; -import { of } from 'rxjs'; +import { skip, mergeMap, take } from 'rxjs/operators'; +import { of, Observable } from 'rxjs'; /** @test {skip} */ describe('skip operator', () => { @@ -146,4 +147,23 @@ describe('skip operator', () => { expectObservable(e1.pipe(skip(3))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + skip(1), + take(2), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/skipLast-spec.ts b/spec/operators/skipLast-spec.ts index 08d5d44242..3ab6efdb9f 100644 --- a/spec/operators/skipLast-spec.ts +++ b/spec/operators/skipLast-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { skipLast, mergeMap } from 'rxjs/operators'; -import { range, ArgumentOutOfRangeError, of } from 'rxjs'; +import { skipLast, mergeMap, take } from 'rxjs/operators'; +import { range, ArgumentOutOfRangeError, of, Observable } from 'rxjs'; /** @test {takeLast} */ describe('skipLast operator', () => { @@ -152,4 +152,24 @@ describe('skipLast operator', () => { expectObservable(result, unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + skipLast(1), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/skipUntil-spec.ts b/spec/operators/skipUntil-spec.ts index 31cc85f751..ca9ca9a9c7 100644 --- a/spec/operators/skipUntil-spec.ts +++ b/spec/operators/skipUntil-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { concat, defer, of, Subject } from 'rxjs'; -import { skipUntil, mergeMap } from 'rxjs/operators'; +import { concat, defer, of, Subject, Observable } from 'rxjs'; +import { skipUntil, mergeMap, take } from 'rxjs/operators'; import { asInteropObservable } from '../helpers/interop-helper'; /** @test {skipUntil} */ @@ -290,4 +290,23 @@ describe('skipUntil', () => { of(null).pipe(skipUntil(synchronousNotifer)).subscribe(() => { /* noop */ }); expect(sideEffects).to.deep.equal([1]); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + skipUntil(of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/skipWhile-spec.ts b/spec/operators/skipWhile-spec.ts index 75153ba7ae..d8fbf496e2 100644 --- a/spec/operators/skipWhile-spec.ts +++ b/spec/operators/skipWhile-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { skipWhile, mergeMap, tap } from 'rxjs/operators'; -import { of } from 'rxjs'; +import { skipWhile, mergeMap, tap, take } from 'rxjs/operators'; +import { of, Observable } from 'rxjs'; /** @test {skipWhile} */ describe('skipWhile operator', () => { @@ -192,4 +192,23 @@ describe('skipWhile operator', () => { expectObservable(source.pipe(skipWhile(() => true))).toBe(expected); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + skipWhile(value => value < 2), + take(1), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/startWith-spec.ts b/spec/operators/startWith-spec.ts index c33be2e7f5..0ba3609fae 100644 --- a/spec/operators/startWith-spec.ts +++ b/spec/operators/startWith-spec.ts @@ -1,7 +1,8 @@ +import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { startWith, mergeMap } from 'rxjs/operators'; +import { startWith, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; declare const rxTestScheduler: TestScheduler; @@ -155,4 +156,23 @@ describe('startWith operator', () => { expectObservable(e1.pipe(startWith('y', 'z', rxTestScheduler))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + startWith(-1), + take(4), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/subscribeOn-spec.ts b/spec/operators/subscribeOn-spec.ts index 06dd8ee5f5..d064939d33 100644 --- a/spec/operators/subscribeOn-spec.ts +++ b/spec/operators/subscribeOn-spec.ts @@ -1,7 +1,8 @@ +import { expect } from 'chai'; import { hot, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { subscribeOn, mergeMap } from 'rxjs/operators'; +import { subscribeOn, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { of } from 'rxjs'; +import { of, Observable, queueScheduler } from 'rxjs'; declare const rxTestScheduler: TestScheduler; @@ -87,4 +88,23 @@ describe('subscribeOn operator', () => { expectObservable(e1.pipe(subscribeOn(rxTestScheduler, Infinity))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe([]); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + subscribeOn(queueScheduler), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/switch-spec.ts b/spec/operators/switchAll-spec.ts similarity index 93% rename from spec/operators/switch-spec.ts rename to spec/operators/switchAll-spec.ts index 5a4f98e9d2..e51b03e736 100644 --- a/spec/operators/switch-spec.ts +++ b/spec/operators/switchAll-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { Observable, of, NEVER, queueScheduler, Subject } from 'rxjs'; -import { map, switchAll, mergeMap } from 'rxjs/operators'; +import { map, switchAll, mergeMap, take } from 'rxjs/operators'; /** @test {switch} */ describe('switchAll', () => { @@ -261,4 +261,23 @@ describe('switchAll', () => { ).to.equal(2); sub.unsubscribe(); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + of(synchronousObservable).pipe( + switchAll(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/switchMap-spec.ts b/spec/operators/switchMap-spec.ts index b60c73df16..0ede016ffe 100644 --- a/spec/operators/switchMap-spec.ts +++ b/spec/operators/switchMap-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { switchMap, mergeMap, map, takeWhile } from 'rxjs/operators'; +import { switchMap, mergeMap, map, takeWhile, take } from 'rxjs/operators'; import { concat, defer, of, Observable } from 'rxjs'; import { asInteropObservable } from '../helpers/interop-helper'; @@ -441,4 +441,23 @@ describe('switchMap', () => { expectSubscriptions(x.subscriptions).toBe(xsubs); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + switchMap(value => of(value)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/switchMapTo-spec.ts b/spec/operators/switchMapTo-spec.ts index adace2fedb..ce612d3b99 100644 --- a/spec/operators/switchMapTo-spec.ts +++ b/spec/operators/switchMapTo-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { Observable, of } from 'rxjs'; -import { switchMapTo, mergeMap } from 'rxjs/operators'; +import { switchMapTo, mergeMap, take } from 'rxjs/operators'; /** @test {switchMapTo} */ describe('switchMapTo', () => { @@ -275,4 +275,23 @@ describe('switchMapTo', () => { expectObservable(e1.pipe(switchMapTo(of('foo')))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + switchMapTo(of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/take-spec.ts b/spec/operators/take-spec.ts index 1b92c957e5..b666bd3e06 100644 --- a/spec/operators/take-spec.ts +++ b/spec/operators/take-spec.ts @@ -202,4 +202,22 @@ describe('take operator', () => { source.next(); expect(completed).to.be.true; }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/takeWhile-spec.ts b/spec/operators/takeWhile-spec.ts index 9053513ac1..d45fb5d120 100644 --- a/spec/operators/takeWhile-spec.ts +++ b/spec/operators/takeWhile-spec.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { takeWhile, tap, mergeMap } from 'rxjs/operators'; import { of, Observable, from } from 'rxjs'; +import { values } from 'lodash'; /** @test {takeWhile} */ describe('takeWhile operator', () => { @@ -264,4 +265,22 @@ describe('takeWhile operator', () => { .subscribe(x => x); // x is still string | number } }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + takeWhile(value => value < 2), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/tap-spec.ts b/spec/operators/tap-spec.ts index 09345c89a8..3bb0b6f985 100644 --- a/spec/operators/tap-spec.ts +++ b/spec/operators/tap-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { tap, mergeMap } from 'rxjs/operators'; -import { Subject, of, throwError, Observer, EMPTY } from 'rxjs'; +import { tap, mergeMap, take } from 'rxjs/operators'; +import { Subject, of, throwError, Observer, EMPTY, Observable } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; /** @test {tap} */ @@ -215,4 +215,23 @@ describe('tap operator', () => { expectObservable(result).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + tap(() => { /* noop */ }), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index 8ec5b5d98e..3991021067 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { throttle, mergeMap, mapTo } from 'rxjs/operators'; -import { of, concat, timer } from 'rxjs'; +import { throttle, mergeMap, mapTo, take } from 'rxjs/operators'; +import { of, concat, timer, Observable } from 'rxjs'; /** @test {throttle} */ describe('throttle operator', () => { @@ -395,4 +395,24 @@ describe('throttle operator', () => { expectSubscriptions(n1.subscriptions).toBe(n1Subs); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + throttle(() => of(0)), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/throwIfEmpty-spec.ts b/spec/operators/throwIfEmpty-spec.ts index 1a4b9caadc..5e835a9743 100644 --- a/spec/operators/throwIfEmpty-spec.ts +++ b/spec/operators/throwIfEmpty-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { EMPTY, of, EmptyError, defer, throwError } from 'rxjs'; -import { throwIfEmpty, mergeMap, retry } from 'rxjs/operators'; +import { EMPTY, of, EmptyError, defer, throwError, Observable } from 'rxjs'; +import { throwIfEmpty, mergeMap, retry, take } from 'rxjs/operators'; /** @test {timeout} */ describe('throwIfEmpty', () => { @@ -203,4 +203,23 @@ describe('throwIfEmpty', () => { }); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + throwIfEmpty(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/timeInterval-spec.ts b/spec/operators/timeInterval-spec.ts index d835fd82e4..48ac0aa643 100644 --- a/spec/operators/timeInterval-spec.ts +++ b/spec/operators/timeInterval-spec.ts @@ -1,7 +1,8 @@ +import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { timeInterval, map, mergeMap } from 'rxjs/operators'; +import { timeInterval, map, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; import { TimeInterval } from 'rxjs/internal/operators/timeInterval'; declare const rxTestScheduler: TestScheduler; @@ -149,4 +150,23 @@ describe('timeInterval operator', () => { expectObservable((e1).pipe(timeInterval(rxTestScheduler))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + timeInterval(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/timeout-spec.ts b/spec/operators/timeout-spec.ts index e02bc6fbb1..91ff69d01b 100644 --- a/spec/operators/timeout-spec.ts +++ b/spec/operators/timeout-spec.ts @@ -1,8 +1,8 @@ /** @prettier */ import { expect } from 'chai'; -import { timeout, mergeMap } from 'rxjs/operators'; +import { timeout, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { TimeoutError, of } from 'rxjs'; +import { TimeoutError, of, Observable } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {timeout} */ @@ -679,4 +679,24 @@ describe('timeout operator', () => { }); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + timeout(0), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index 392d6911bb..1b36aa161d 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -1,7 +1,8 @@ /** @prettier */ -import { timeoutWith, mergeMap } from 'rxjs/operators'; +import { expect } from 'chai'; +import { timeoutWith, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { of } from 'rxjs'; +import { of, Observable, EMPTY } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {timeoutWith} */ @@ -274,4 +275,24 @@ describe('timeoutWith operator', () => { expectSubscriptions(switchTo.subscriptions).toBe([]); }); }); + + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + timeoutWith(0, EMPTY), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/timestamp-spec.ts b/spec/operators/timestamp-spec.ts index 9398e5557f..109a096e01 100644 --- a/spec/operators/timestamp-spec.ts +++ b/spec/operators/timestamp-spec.ts @@ -1,7 +1,8 @@ +import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { timestamp, map, mergeMap } from 'rxjs/operators'; +import { timestamp, map, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { of } from 'rxjs'; +import { of, Observable } from 'rxjs'; declare const rxTestScheduler: TestScheduler; @@ -148,4 +149,23 @@ describe('timestamp operator', () => { expectObservable(e1.pipe(timestamp(rxTestScheduler))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + timestamp(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); diff --git a/spec/operators/windowCount-spec.ts b/spec/operators/windowCount-spec.ts index 5c50113b8f..2e9319053f 100644 --- a/spec/operators/windowCount-spec.ts +++ b/spec/operators/windowCount-spec.ts @@ -1,5 +1,6 @@ +import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { windowCount, mergeMap } from 'rxjs/operators'; +import { windowCount, mergeMap, mergeAll, take } from 'rxjs/operators'; import { of, Observable } from 'rxjs'; /** @test {windowCount} */ @@ -148,4 +149,24 @@ describe('windowCount operator', () => { expectObservable(result, unsub).toBe(expected, values); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + windowCount(3), + mergeAll(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); });