Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(start): start method added to observers and subjects #3066

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import * as Rx from '../src/Rx';
import { ISubscription } from '../src/Subscription';
import { TeardownLogic } from '../src/Subscription';
import marbleTestingSignature = require('./helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { map } from '../src/operators/map';
Expand Down Expand Up @@ -534,6 +535,51 @@ describe('Observable', () => {
});

});

it('should allow a start method that provides the subscription object ' +
'prior to setting up the observable', () => {
let setup = false;
const source = new Observable<number>(observer => {
setup = true;
observer.next(1);
observer.complete();
});

let sub1: ISubscription;
let sub2: ISubscription;

sub1 = source.subscribe({
start(sub: ISubscription) {
expect(setup).to.be.false;
sub2 = sub;
},
next(value: number) {
expect(sub2).to.exist;
}
});

expect(sub1).to.equal(sub2);
});

it('should not let you call the start method in the body of the Observable', () => {
let startCalls = 0;
let nextCalls = 0;

new Observable(observer => {
observer.start(new Rx.Subscription());
observer.next(1);
observer.start(new Rx.Subscription());
observer.next(2);
observer.start(new Rx.Subscription());
})
.subscribe({
start() { startCalls++; },
next() { nextCalls++; },
});

expect(startCalls).to.equal(1);
expect(nextCalls).to.equal(2);
});
});
});

Expand Down
18 changes: 18 additions & 0 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { expect } from 'chai';
import * as Rx from '../src/Rx';
import { empty } from '../src/observable/empty';
import marbleTestingSignature = require('./helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { ISubscription } from '../src/Subscription';

declare const { time };
declare const hot: typeof marbleTestingSignature.hot;
Expand Down Expand Up @@ -45,6 +47,22 @@ describe('Subject', () => {
subject.complete();
});

it('should call start with the right Subscription', () => {
const subject = new Subject();
const observable = empty();
const sub1 = observable.subscribe(subject);
let sub2: ISubscription;
const sub3 = subject.subscribe({
start(subscription) {
sub2 = subscription;
},
complete() {/* added to meet type requirements */},
});

expect(sub1).not.to.equal(sub2);
expect(sub2).to.equal(sub3);
});

it('should handle subscribers that arrive and leave at different times, ' +
'subject does not complete', () => {
const subject = new Subject();
Expand Down
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ export class Observable<T> implements Subscribable<T> {
}

protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
sink.start(sink);
try {
return this._subscribe(sink);
} catch (err) {
Expand Down Expand Up @@ -252,6 +253,7 @@ export class Observable<T> implements Subscribable<T> {
}

protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
subscriber.start(subscriber);
return this.source.subscribe(subscriber);
}

Expand Down
9 changes: 8 additions & 1 deletion src/Observer.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
import { ISubscription } from './Subscription';

export interface NextObserver<T> {
closed?: boolean;
next: (value: T) => void;
error?: (err: any) => void;
complete?: () => void;
start?: (subscription: ISubscription) => void;
}

export interface ErrorObserver<T> {
closed?: boolean;
next?: (value: T) => void;
error: (err: any) => void;
complete?: () => void;
start?: (subscription: ISubscription) => void;
}

export interface CompletionObserver<T> {
closed?: boolean;
next?: (value: T) => void;
error?: (err: any) => void;
complete: () => void;
start?: (subscription: ISubscription) => void;
}

export type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;
Expand All @@ -26,11 +31,13 @@ export interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
start?: (subscription: ISubscription) => void;
}

export const empty: Observer<any> = {
closed: true,
next(value: any): void { /* noop */},
error(err: any): void { throw err; },
complete(): void { /*noop*/ }
complete(): void { /*noop*/ },
start(): void { /* noop */ },
};
39 changes: 38 additions & 1 deletion src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { isFunction } from './util/isFunction';
import { Observer, PartialObserver } from './Observer';
import { Subscription } from './Subscription';
import { Subscription, ISubscription } from './Subscription';
import { empty as emptyObserver } from './Observer';
import { rxSubscriber as rxSubscriberSymbol } from './symbol/rxSubscriber';
import { noop } from './util/noop';

/**
* Implements the {@link Observer} interface and extends the
Expand Down Expand Up @@ -116,6 +117,17 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}
}

/**
* The {@link Observer} callback to recieve the subscription prior to the
* observable's subscriber function being executed.
* @param subscription The same {@link Subscription} returned by subscribe.
*/
start(subscription: ISubscription) {
if (!this.isStopped) {
this._start(subscription);
}
}

unsubscribe(): void {
if (this.closed) {
return;
Expand All @@ -138,6 +150,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.unsubscribe();
}

protected _start(subscription: ISubscription): void {
const { destination } = this;
if (destination.start) {
destination.start(subscription);
}
}

protected _unsubscribeAndRecycle(): Subscriber<T> {
const { _parent, _parents } = this;
this._parent = null;
Expand All @@ -159,6 +178,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
class SafeSubscriber<T> extends Subscriber<T> {

private _context: any;
private isStarted = false;

constructor(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
Expand All @@ -167,13 +187,15 @@ class SafeSubscriber<T> extends Subscriber<T> {

let next: ((value: T) => void);
let context: any = this;
let start: ((subscription: ISubscription) => void) = noop;

if (isFunction(observerOrNext)) {
next = (<((value: T) => void)> observerOrNext);
} else if (observerOrNext) {
next = (<PartialObserver<T>> observerOrNext).next;
error = (<PartialObserver<T>> observerOrNext).error;
complete = (<PartialObserver<T>> observerOrNext).complete;
start = (<PartialObserver<T>> observerOrNext).start;
if (observerOrNext !== emptyObserver) {
context = Object.create(observerOrNext);
if (isFunction(context.unsubscribe)) {
Expand All @@ -187,6 +209,7 @@ class SafeSubscriber<T> extends Subscriber<T> {
this._next = next;
this._error = error;
this._complete = complete;
this._start = start;
}

next(value?: T): void {
Expand Down Expand Up @@ -228,6 +251,20 @@ class SafeSubscriber<T> extends Subscriber<T> {
}
}

start(subscription: ISubscription): void {
if (!this.isStopped && !this.isStarted) {
this.isStarted = true;
if (this._start) {
try {
this._start.call(this._context, subscription);
} catch (err) {
this._hostReportError(err);
this.unsubscribe();
}
}
}
}

protected _unsubscribe(): void {
this._context = null;
}
Expand Down