Skip to content

Commit

Permalink
kefirjs#235 flatMapLatest adding overlapping option
Browse files Browse the repository at this point in the history
passing an options object of {overlapping:true} will cause the next stream to be added before any old streams are removed.
  • Loading branch information
J. Holmes committed Feb 16, 2017
1 parent b799e06 commit 822dabb
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 7 deletions.
10 changes: 9 additions & 1 deletion docs-src/descriptions/multiple-sources.jade
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ div



+descr-method('flat-map-latest', 'flatMapLatest', 'obs.flatMapLatest([fn])').
+descr-method('flat-map-latest', 'flatMapLatest', 'obs.flatMapLatest([fn], [options])').
Like #[b flatMap], but repeats events only from the latest added observable
i.e., switching from one observable to another.

Expand Down Expand Up @@ -433,6 +433,14 @@ pre(title='events in time').
spawned 3: ---3---3---3---3X

result: -------------1---1-----2---2-----3---3---3---3X

p.
By default, #[b flatMapLatest] will remove the previous observable #[i before]
adding the next. This may cause an obversable to deactivate briefly when switching
between observable, even if the observable is in the activation chain of both the
previous and next observable. If you want the next observable to be added #[i before]
removing the previous, you can pass #[tt {overlapping: true}] as #[b options].

div


Expand Down
3 changes: 2 additions & 1 deletion kefir.js.flow
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ declare class Observable<+V,+E=*> {
concat<V2,E2>(otherObs: Observable<V2,E2>): Observable<V|V2,E|E2>;

flatMap<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapLatest<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapLatest<V2,E2>(options?: {overlapping?: boolean}): Observable<V2,E|E2>;
flatMapLatest<V2,E2>(transform: (value: V) => Observable<V2,E2>, options?: {overlapping?: boolean}): Observable<V2,E|E2>;
flatMapFirst<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapConcat<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapConcurLimit<V2,E2>(transform: (value: V) => Observable<V2,E2>, limit: number): Observable<V2,E|E2>;
Expand Down
6 changes: 4 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,10 @@ import FlatMap from './many-sources/flat-map';
Observable.prototype.flatMap = function(fn) {
return new FlatMap(this, fn).setName(this, 'flatMap');
};
Observable.prototype.flatMapLatest = function(fn) {
return new FlatMap(this, fn, {concurLim: 1, drop: 'old'}).setName(this, 'flatMapLatest');

import flatMapLatest from './many-sources/flat-map-latest';
Observable.prototype.flatMapLatest = function(...args) {
return flatMapLatest(this, ...args);
};
Observable.prototype.flatMapFirst = function(fn) {
return new FlatMap(this, fn, {concurLim: 1}).setName(this, 'flatMapFirst');
Expand Down
14 changes: 11 additions & 3 deletions src/many-sources/abstract-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import {concat, forEach, findByPred, find, remove, cloneArray} from '../utils/co

const id = (x => x);

function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) {
function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new', overlapping = false} = {}) {
Stream.call(this);

this._queueLim = queueLim < 0 ? -1 : queueLim;
this._concurLim = concurLim < 0 ? -1 : concurLim;
this._drop = drop;
this._overlapping = overlapping;
this._queue = [];
this._curSources = [];
this._$handleSubAny = (event) => this._handleSubAny(event);
Expand All @@ -34,8 +35,15 @@ inherit(AbstractPool, Stream, {
if (this._queueLim === -1 || this._queue.length < this._queueLim) {
this._addToQueue(toObs(obj));
} else if (this._drop === 'old') {
this._removeOldest();
this._add(obj, toObs);
if (this._overlapping) {
this._addToCur(toObs(obj));
while (this._curSources.length > this._concurLim) {
this._removeOldest();
}
} else {
this._removeOldest();
this._add(obj, toObs);
}
}
}
},
Expand Down
11 changes: 11 additions & 0 deletions src/many-sources/flat-map-latest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import FlatMap from './flat-map';

export default function flatMapLatest(obs, fn, options) {
if (typeof fn !== 'function') {
options = fn;
fn = undefined;
}
options = options === undefined ? {} : options;
const { overlapping = false } = options;
return new FlatMap(obs, fn, {concurLim: 1, drop: 'old', overlapping }).setName(obs, 'flatMapLatest');
}
38 changes: 38 additions & 0 deletions test/specs/flat-map-latest.coffee
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{stream, prop, send, activate, deactivate, Kefir} = require('../test-helpers.coffee')
sinon = require('sinon')


describe 'flatMapLatest', ->
Expand Down Expand Up @@ -102,3 +103,40 @@ describe 'flatMapLatest', ->
a = send(prop(), [0])
b = send(prop(), [a])
expect(b.flatMapLatest()).toEmit [{current: 0}]


describe 'non-overlapping', ->
it 'should remove the previous stream before adding the next', ->
onDeactivate = sinon.spy()
a = Kefir.stream(-> onDeactivate)
b = stream()
map = b.flatMapLatest()
activate(map)
send(b, [a])
send(b, [a])
deactivate(map)
expect(onDeactivate.callCount).toBe(2)


describe 'overlapping', ->
it 'should add the next stream before removing the previous', ->
onDeactivate = sinon.spy()
a = Kefir.stream(-> onDeactivate)
b = stream()
map = b.flatMapLatest({ overlapping: true })
activate(map)
send(b, [a])
send(b, [a])
deactivate(map)
expect(onDeactivate.callCount).toBe(1)

it 'should accept optional map fn', ->
onDeactivate = sinon.spy()
a = Kefir.stream(-> onDeactivate)
b = stream()
map = b.flatMapLatest(((x) -> x.obs), { overlapping: true })
activate(map)
send(b, [{ obs: a }])
send(b, [{ obs: a }])
deactivate(map)
expect(onDeactivate.callCount).toBe(1)

0 comments on commit 822dabb

Please sign in to comment.