diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index c9fdf744b8..62786fb0ac 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -409,4 +409,15 @@ describe('Observable.prototype.publishReplay', () => { published.connect(); }); + + it('should mirror a simple source Observable with selector', () => { + const selector = observable => observable.map(v => String.fromCharCode(96 + parseInt(v))); + const source = cold('--1-2---3-4---|'); + const sourceSubs = '^ !'; + const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector); + const expected = '--a-b---c-d---|'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); diff --git a/src/operator/publishReplay.ts b/src/operator/publishReplay.ts index 715936f961..b0e05ae534 100644 --- a/src/operator/publishReplay.ts +++ b/src/operator/publishReplay.ts @@ -6,6 +6,7 @@ import { publishReplay as higherOrder } from '../operators'; /** * @param bufferSize * @param windowTime + * @param selector * @param scheduler * @return {ConnectableObservable} * @method publishReplay @@ -13,6 +14,7 @@ import { publishReplay as higherOrder } from '../operators'; */ export function publishReplay(this: Observable, bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, + selector?: (source: Observable) => Observable, scheduler?: IScheduler): ConnectableObservable { - return higherOrder(bufferSize, windowTime, scheduler)(this); + return higherOrder(bufferSize, windowTime, selector, scheduler)(this); } diff --git a/src/operators/publishReplay.ts b/src/operators/publishReplay.ts index 5943db4acb..aada0682d9 100644 --- a/src/operators/publishReplay.ts +++ b/src/operators/publishReplay.ts @@ -7,6 +7,8 @@ import { UnaryFunction } from '../interfaces'; export function publishReplay(bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, + selector: (source: Observable) => Observable, scheduler?: IScheduler): UnaryFunction, ConnectableObservable> { - return (source: Observable) => multicast(new ReplaySubject(bufferSize, windowTime, scheduler))(source) as ConnectableObservable; + const subject = new ReplaySubject(bufferSize, windowTime, scheduler); + return (source: Observable) => multicast(() => subject, selector)(source) as ConnectableObservable; }