Skip to content

Commit 653b47a

Browse files
committed
feat(mergeMapTo): add higher-order lettable version of mergeMapTo
1 parent e97530f commit 653b47a

File tree

3 files changed

+171
-113
lines changed

3 files changed

+171
-113
lines changed

Diff for: src/operator/mergeMapTo.ts

+2-113
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
import { Observable, ObservableInput } from '../Observable';
2-
import { Operator } from '../Operator';
3-
import { PartialObserver } from '../Observer';
4-
import { Subscriber } from '../Subscriber';
5-
import { Subscription } from '../Subscription';
6-
import { OuterSubscriber } from '../OuterSubscriber';
7-
import { InnerSubscriber } from '../InnerSubscriber';
8-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { mergeMapTo as higherOrder } from '../operators/mergeMapTo';
93

104
/* tslint:disable:max-line-length */
115
export function mergeMapTo<T, R>(this: Observable<T>, observable: ObservableInput<R>, concurrent?: number): Observable<R>;
@@ -58,110 +52,5 @@ export function mergeMapTo<T, I, R>(this: Observable<T>, observable: ObservableI
5852
export function mergeMapTo<T, I, R>(this: Observable<T>, innerObservable: Observable<I>,
5953
resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number,
6054
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
61-
if (typeof resultSelector === 'number') {
62-
concurrent = <number>resultSelector;
63-
resultSelector = null;
64-
}
65-
return this.lift(new MergeMapToOperator(innerObservable, <any>resultSelector, concurrent));
66-
}
67-
68-
// TODO: Figure out correct signature here: an Operator<Observable<T>, R>
69-
// needs to implement call(observer: Subscriber<R>): Subscriber<Observable<T>>
70-
export class MergeMapToOperator<T, I, R> implements Operator<Observable<T>, R> {
71-
constructor(private ish: ObservableInput<I>,
72-
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
73-
private concurrent: number = Number.POSITIVE_INFINITY) {
74-
}
75-
76-
call(observer: Subscriber<R>, source: any): any {
77-
return source.subscribe(new MergeMapToSubscriber(observer, this.ish, this.resultSelector, this.concurrent));
78-
}
79-
}
80-
81-
/**
82-
* We need this JSDoc comment for affecting ESDoc.
83-
* @ignore
84-
* @extends {Ignored}
85-
*/
86-
export class MergeMapToSubscriber<T, I, R> extends OuterSubscriber<T, I> {
87-
private hasCompleted: boolean = false;
88-
private buffer: T[] = [];
89-
private active: number = 0;
90-
protected index: number = 0;
91-
92-
constructor(destination: Subscriber<R>,
93-
private ish: ObservableInput<I>,
94-
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
95-
private concurrent: number = Number.POSITIVE_INFINITY) {
96-
super(destination);
97-
}
98-
99-
protected _next(value: T): void {
100-
if (this.active < this.concurrent) {
101-
const resultSelector = this.resultSelector;
102-
const index = this.index++;
103-
const ish = this.ish;
104-
const destination = this.destination;
105-
106-
this.active++;
107-
this._innerSub(ish, destination, resultSelector, value, index);
108-
} else {
109-
this.buffer.push(value);
110-
}
111-
}
112-
113-
private _innerSub(ish: ObservableInput<I>,
114-
destination: PartialObserver<I>,
115-
resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
116-
value: T,
117-
index: number): void {
118-
this.add(subscribeToResult<T, I>(this, ish, value, index));
119-
}
120-
121-
protected _complete(): void {
122-
this.hasCompleted = true;
123-
if (this.active === 0 && this.buffer.length === 0) {
124-
this.destination.complete();
125-
}
126-
}
127-
128-
notifyNext(outerValue: T, innerValue: I,
129-
outerIndex: number, innerIndex: number,
130-
innerSub: InnerSubscriber<T, I>): void {
131-
const { resultSelector, destination } = this;
132-
if (resultSelector) {
133-
this.trySelectResult(outerValue, innerValue, outerIndex, innerIndex);
134-
} else {
135-
destination.next(innerValue);
136-
}
137-
}
138-
139-
private trySelectResult(outerValue: T, innerValue: I,
140-
outerIndex: number, innerIndex: number): void {
141-
const { resultSelector, destination } = this;
142-
let result: R;
143-
try {
144-
result = resultSelector(outerValue, innerValue, outerIndex, innerIndex);
145-
} catch (err) {
146-
destination.error(err);
147-
return;
148-
}
149-
150-
destination.next(result);
151-
}
152-
153-
notifyError(err: any): void {
154-
this.destination.error(err);
155-
}
156-
157-
notifyComplete(innerSub: Subscription): void {
158-
const buffer = this.buffer;
159-
this.remove(innerSub);
160-
this.active--;
161-
if (buffer.length > 0) {
162-
this._next(buffer.shift());
163-
} else if (this.active === 0 && this.hasCompleted) {
164-
this.destination.complete();
165-
}
166-
}
55+
return higherOrder(innerObservable, resultSelector as any, concurrent)(this);
16756
}

Diff for: src/operators/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ export { max } from './max';
4040
export { merge } from './merge';
4141
export { mergeAll } from './mergeAll';
4242
export { mergeMap } from './mergeMap';
43+
export { mergeMapTo } from './mergeMapTo';
4344
export { min } from './min';
4445
export { multicast } from './multicast';
4546
export { observeOn } from './observeOn';

Diff for: src/operators/mergeMapTo.ts

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import { Observable, ObservableInput } from '../Observable';
2+
import { Operator } from '../Operator';
3+
import { PartialObserver } from '../Observer';
4+
import { Subscriber } from '../Subscriber';
5+
import { Subscription } from '../Subscription';
6+
import { OuterSubscriber } from '../OuterSubscriber';
7+
import { InnerSubscriber } from '../InnerSubscriber';
8+
import { subscribeToResult } from '../util/subscribeToResult';
9+
import { OperatorFunction } from '../interfaces';
10+
11+
/* tslint:disable:max-line-length */
12+
export function mergeMapTo<T, R>(observable: ObservableInput<R>, concurrent?: number): OperatorFunction<T, R>;
13+
export function mergeMapTo<T, I, R>(observable: ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction<T, R>;
14+
/* tslint:enable:max-line-length */
15+
16+
/**
17+
* Projects each source value to the same Observable which is merged multiple
18+
* times in the output Observable.
19+
*
20+
* <span class="informal">It's like {@link mergeMap}, but maps each value always
21+
* to the same inner Observable.</span>
22+
*
23+
* <img src="./img/mergeMapTo.png" width="100%">
24+
*
25+
* Maps each source value to the given Observable `innerObservable` regardless
26+
* of the source value, and then merges those resulting Observables into one
27+
* single Observable, which is the output Observable.
28+
*
29+
* @example <caption>For each click event, start an interval Observable ticking every 1 second</caption>
30+
* var clicks = Rx.Observable.fromEvent(document, 'click');
31+
* var result = clicks.mergeMapTo(Rx.Observable.interval(1000));
32+
* result.subscribe(x => console.log(x));
33+
*
34+
* @see {@link concatMapTo}
35+
* @see {@link merge}
36+
* @see {@link mergeAll}
37+
* @see {@link mergeMap}
38+
* @see {@link mergeScan}
39+
* @see {@link switchMapTo}
40+
*
41+
* @param {ObservableInput} innerObservable An Observable to replace each value from
42+
* the source Observable.
43+
* @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector]
44+
* A function to produce the value on the output Observable based on the values
45+
* and the indices of the source (outer) emission and the inner Observable
46+
* emission. The arguments passed to this function are:
47+
* - `outerValue`: the value that came from the source
48+
* - `innerValue`: the value that came from the projected Observable
49+
* - `outerIndex`: the "index" of the value that came from the source
50+
* - `innerIndex`: the "index" of the value from the projected Observable
51+
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
52+
* Observables being subscribed to concurrently.
53+
* @return {Observable} An Observable that emits items from the given
54+
* `innerObservable` (and optionally transformed through `resultSelector`) every
55+
* time a value is emitted on the source Observable.
56+
* @method mergeMapTo
57+
* @owner Observable
58+
*/
59+
export function mergeMapTo<T, I, R>(innerObservable: Observable<I>,
60+
resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number,
61+
concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, R> {
62+
if (typeof resultSelector === 'number') {
63+
concurrent = <number>resultSelector;
64+
resultSelector = null;
65+
}
66+
return (source: Observable<T>) => source.lift(new MergeMapToOperator(innerObservable, <any>resultSelector, concurrent));
67+
}
68+
69+
// TODO: Figure out correct signature here: an Operator<Observable<T>, R>
70+
// needs to implement call(observer: Subscriber<R>): Subscriber<Observable<T>>
71+
export class MergeMapToOperator<T, I, R> implements Operator<Observable<T>, R> {
72+
constructor(private ish: ObservableInput<I>,
73+
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
74+
private concurrent: number = Number.POSITIVE_INFINITY) {
75+
}
76+
77+
call(observer: Subscriber<R>, source: any): any {
78+
return source.subscribe(new MergeMapToSubscriber(observer, this.ish, this.resultSelector, this.concurrent));
79+
}
80+
}
81+
82+
/**
83+
* We need this JSDoc comment for affecting ESDoc.
84+
* @ignore
85+
* @extends {Ignored}
86+
*/
87+
export class MergeMapToSubscriber<T, I, R> extends OuterSubscriber<T, I> {
88+
private hasCompleted: boolean = false;
89+
private buffer: T[] = [];
90+
private active: number = 0;
91+
protected index: number = 0;
92+
93+
constructor(destination: Subscriber<R>,
94+
private ish: ObservableInput<I>,
95+
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
96+
private concurrent: number = Number.POSITIVE_INFINITY) {
97+
super(destination);
98+
}
99+
100+
protected _next(value: T): void {
101+
if (this.active < this.concurrent) {
102+
const resultSelector = this.resultSelector;
103+
const index = this.index++;
104+
const ish = this.ish;
105+
const destination = this.destination;
106+
107+
this.active++;
108+
this._innerSub(ish, destination, resultSelector, value, index);
109+
} else {
110+
this.buffer.push(value);
111+
}
112+
}
113+
114+
private _innerSub(ish: ObservableInput<I>,
115+
destination: PartialObserver<I>,
116+
resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
117+
value: T,
118+
index: number): void {
119+
this.add(subscribeToResult<T, I>(this, ish, value, index));
120+
}
121+
122+
protected _complete(): void {
123+
this.hasCompleted = true;
124+
if (this.active === 0 && this.buffer.length === 0) {
125+
this.destination.complete();
126+
}
127+
}
128+
129+
notifyNext(outerValue: T, innerValue: I,
130+
outerIndex: number, innerIndex: number,
131+
innerSub: InnerSubscriber<T, I>): void {
132+
const { resultSelector, destination } = this;
133+
if (resultSelector) {
134+
this.trySelectResult(outerValue, innerValue, outerIndex, innerIndex);
135+
} else {
136+
destination.next(innerValue);
137+
}
138+
}
139+
140+
private trySelectResult(outerValue: T, innerValue: I,
141+
outerIndex: number, innerIndex: number): void {
142+
const { resultSelector, destination } = this;
143+
let result: R;
144+
try {
145+
result = resultSelector(outerValue, innerValue, outerIndex, innerIndex);
146+
} catch (err) {
147+
destination.error(err);
148+
return;
149+
}
150+
151+
destination.next(result);
152+
}
153+
154+
notifyError(err: any): void {
155+
this.destination.error(err);
156+
}
157+
158+
notifyComplete(innerSub: Subscription): void {
159+
const buffer = this.buffer;
160+
this.remove(innerSub);
161+
this.active--;
162+
if (buffer.length > 0) {
163+
this._next(buffer.shift());
164+
} else if (this.active === 0 && this.hasCompleted) {
165+
this.destination.complete();
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)