diff --git a/docs-src/descriptions/multiple-sources.jade b/docs-src/descriptions/multiple-sources.jade index f9f14769..ebe80dce 100644 --- a/docs-src/descriptions/multiple-sources.jade +++ b/docs-src/descriptions/multiple-sources.jade @@ -403,7 +403,7 @@ div -+descr-method('flat-map-latest', 'flatMapLatest', 'obs.flatMapLatest([fn])'). ++descr-method('flat-map-latest', 'flatMapLatest', 'obs.flatMapLatest([fn], [options])'). Like #[b flatMap], but repeats events only from the latest added observable i.e., switching from one observable to another. @@ -433,6 +433,14 @@ pre(title='events in time'). spawned 3: ---3---3---3---3X result: -------------1---1-----2---2-----3---3---3---3X + +p. + By default, #[b flatMapLatest] will remove the previous observable #[i before] + adding the next. This may cause an obversable to deactivate briefly when switching + between observable, even if the observable is in the activation chain of both the + previous and next observable. If you want the next observable to be added #[i before] + removing the previous, you can pass #[tt {overlapping: true}] as #[b options]. + div diff --git a/kefir.js.flow b/kefir.js.flow index 1fc9d24a..b9ad1c8d 100644 --- a/kefir.js.flow +++ b/kefir.js.flow @@ -89,7 +89,8 @@ declare class Observable<+V,+E=*> { concat(otherObs: Observable): Observable; flatMap(transform: (value: V) => Observable): Observable; - flatMapLatest(transform: (value: V) => Observable): Observable; + flatMapLatest(options?: {overlapping?: boolean}): Observable; + flatMapLatest(transform: (value: V) => Observable, options?: {overlapping?: boolean}): Observable; flatMapFirst(transform: (value: V) => Observable): Observable; flatMapConcat(transform: (value: V) => Observable): Observable; flatMapConcurLimit(transform: (value: V) => Observable, limit: number): Observable; diff --git a/src/index.js b/src/index.js index 559d63cd..d198b58f 100644 --- a/src/index.js +++ b/src/index.js @@ -332,8 +332,10 @@ import FlatMap from './many-sources/flat-map'; Observable.prototype.flatMap = function(fn) { return new FlatMap(this, fn).setName(this, 'flatMap'); }; -Observable.prototype.flatMapLatest = function(fn) { - return new FlatMap(this, fn, {concurLim: 1, drop: 'old'}).setName(this, 'flatMapLatest'); + +import flatMapLatest from './many-sources/flat-map-latest'; +Observable.prototype.flatMapLatest = function(...args) { + return flatMapLatest(this, ...args); }; Observable.prototype.flatMapFirst = function(fn) { return new FlatMap(this, fn, {concurLim: 1}).setName(this, 'flatMapFirst'); diff --git a/src/many-sources/abstract-pool.js b/src/many-sources/abstract-pool.js index 418244cf..00d094c8 100644 --- a/src/many-sources/abstract-pool.js +++ b/src/many-sources/abstract-pool.js @@ -5,12 +5,13 @@ import {concat, forEach, findByPred, find, remove, cloneArray} from '../utils/co const id = (x => x); -function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) { +function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new', overlapping = false} = {}) { Stream.call(this); this._queueLim = queueLim < 0 ? -1 : queueLim; this._concurLim = concurLim < 0 ? -1 : concurLim; this._drop = drop; + this._overlapping = overlapping; this._queue = []; this._curSources = []; this._$handleSubAny = (event) => this._handleSubAny(event); @@ -34,8 +35,13 @@ inherit(AbstractPool, Stream, { if (this._queueLim === -1 || this._queue.length < this._queueLim) { this._addToQueue(toObs(obj)); } else if (this._drop === 'old') { - this._removeOldest(); - this._add(obj, toObs); + if (this._overlapping) { + this._addToCur(toObs(obj)); + this._removeOldest(); + } else { + this._removeOldest(); + this._addToCur(toObs(obj)); + } } } }, diff --git a/src/many-sources/flat-map-latest.js b/src/many-sources/flat-map-latest.js new file mode 100644 index 00000000..cda68d91 --- /dev/null +++ b/src/many-sources/flat-map-latest.js @@ -0,0 +1,11 @@ +import FlatMap from './flat-map'; + +export default function flatMapLatest(obs, fn, options) { + if (typeof fn !== 'function') { + options = fn; + fn = undefined; + } + options = options === undefined ? {} : options; + const { overlapping = false } = options; + return new FlatMap(obs, fn, {concurLim: 1, drop: 'old', overlapping }).setName(obs, 'flatMapLatest'); +} diff --git a/test/specs/flat-map-latest.coffee b/test/specs/flat-map-latest.coffee index 7b3adf88..87a8efc5 100644 --- a/test/specs/flat-map-latest.coffee +++ b/test/specs/flat-map-latest.coffee @@ -1,4 +1,5 @@ {stream, prop, send, activate, deactivate, Kefir} = require('../test-helpers.coffee') +sinon = require('sinon') describe 'flatMapLatest', -> @@ -102,3 +103,52 @@ describe 'flatMapLatest', -> a = send(prop(), [0]) b = send(prop(), [a]) expect(b.flatMapLatest()).toEmit [{current: 0}] + + + describe 'non-overlapping', -> + it 'should remove the previous stream before adding the next', -> + onDeactivate = sinon.spy() + a = Kefir.stream(-> onDeactivate) + b = stream() + map = b.flatMapLatest() + activate(map) + send(b, [a]) + send(b, [a]) + deactivate(map) + expect(onDeactivate.callCount).toBe(2) + + + describe 'overlapping', -> + it 'should add the next stream before removing the previous', -> + onDeactivate = sinon.spy() + a = Kefir.stream(-> onDeactivate) + b = stream() + map = b.flatMapLatest({ overlapping: true }) + activate(map) + send(b, [a]) + send(b, [a]) + deactivate(map) + expect(onDeactivate.callCount).toBe(1) + + it 'should accept optional map fn', -> + onDeactivate = sinon.spy() + a = Kefir.stream(-> onDeactivate) + b = stream() + map = b.flatMapLatest(((x) -> x.obs), { overlapping: true }) + activate(map) + send(b, [{ obs: a }]) + send(b, [{ obs: a }]) + deactivate(map) + expect(onDeactivate.callCount).toBe(1) + + it 'should work nicely with Kefir.constant and Kefir.never', -> + a = stream() + handle = (x) -> + if x > 2 + Kefir.constant(x) + else + Kefir.never() + expect( + a.flatMapLatest(handle, { overlapping: true }) + ).toEmit [3, 4, 5], -> + send(a, [1, 2, 3, 4, 5])