diff --git a/src/index.js b/src/index.js index 87ecec03..45b3e1b1 100644 --- a/src/index.js +++ b/src/index.js @@ -327,8 +327,10 @@ import FlatMap from './many-sources/flat-map' Observable.prototype.flatMap = function(fn) { return new FlatMap(this, fn).setName(this, 'flatMap') } -Observable.prototype.flatMapLatest = function(fn) { - return new FlatMap(this, fn, {concurLim: 1, drop: 'old'}).setName(this, 'flatMapLatest') +Observable.prototype.flatMapLatest = function(fn, options = {}) { + options.concurLim = 1 + options.drop = 'old' + return new FlatMap(this, fn, options).setName(this, 'flatMapLatest') } Observable.prototype.flatMapFirst = function(fn) { return new FlatMap(this, fn, {concurLim: 1}).setName(this, 'flatMapFirst') diff --git a/src/many-sources/abstract-pool.js b/src/many-sources/abstract-pool.js index 683314c5..7f99b5a5 100644 --- a/src/many-sources/abstract-pool.js +++ b/src/many-sources/abstract-pool.js @@ -5,12 +5,13 @@ import {concat, forEach, findByPred, find, remove, cloneArray} from '../utils/co const id = x => x -function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) { +function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new', overlapping = false} = {}) { Stream.call(this) this._queueLim = queueLim < 0 ? -1 : queueLim this._concurLim = concurLim < 0 ? -1 : concurLim this._drop = drop + this._overlapping = overlapping this._queue = [] this._curSources = [] this._$handleSubAny = event => this._handleSubAny(event) @@ -25,16 +26,21 @@ function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) { inherit(AbstractPool, Stream, { _name: 'abstractPool', - _add(obj, toObs /* Function | falsey */) { + _add(obj, toObs /* Function | falsey */, allowOverflow) { toObs = toObs || id if (this._concurLim === -1 || this._curSources.length < this._concurLim) { this._addToCur(toObs(obj)) } else { - if (this._queueLim === -1 || this._queue.length < this._queueLim) { + if (this._queueLim === -1 || this._queue.length < this._queueLim || allowOverflow) { this._addToQueue(toObs(obj)) } else if (this._drop === 'old') { - this._removeOldest() - this._add(obj, toObs) + if (this._overlapping) { + this._add(obj, toObs, true) + this._removeOldest(true) + } else { + this._removeOldest() + this._add(obj, toObs) + } } } }, @@ -148,8 +154,8 @@ inherit(AbstractPool, Stream, { return index }, - _removeCur(obs) { - if (this._active) { + _removeCur(obs, after) { + if (!after && this._active) { this._unsubscribe(obs) } let index = find(this._curSources, obs) @@ -161,11 +167,14 @@ inherit(AbstractPool, Stream, { this._onEmpty() } } + if (after && this._active) { + this._unsubscribe(obs) + } return index }, - _removeOldest() { - this._removeCur(this._curSources[0]) + _removeOldest(after) { + this._removeCur(this._curSources[0], after) }, _pullQueue() { diff --git a/src/many-sources/flat-map.js b/src/many-sources/flat-map.js index 37f36ea3..e34e17fa 100644 --- a/src/many-sources/flat-map.js +++ b/src/many-sources/flat-map.js @@ -2,7 +2,12 @@ import {VALUE, ERROR, END} from '../constants' import {inherit} from '../utils/objects' import AbstractPool from './abstract-pool' -function FlatMap(source, fn, options) { +function FlatMap(source, fn, options = {}) { + if (fn && typeof fn !== 'function') { + options = fn + fn = undefined + } + AbstractPool.call(this, options) this._source = source this._fn = fn diff --git a/test/specs/flat-map-latest.js b/test/specs/flat-map-latest.js index 3893b398..a953f997 100644 --- a/test/specs/flat-map-latest.js +++ b/test/specs/flat-map-latest.js @@ -1,3 +1,4 @@ +const sinon = require('sinon') const {stream, prop, send, value, error, end, activate, deactivate, Kefir, expect} = require('../test-helpers') describe('flatMapLatest', () => { @@ -82,6 +83,62 @@ describe('flatMapLatest', () => { }) ).to.emit([value(3), value(4), value(5)], () => send(a, [value(1), value(2), value(3), value(4), value(5)])) }) + + describe('non-overlapping', () => { + it('should remove the previous stream before adding the next', () => { + onDeactivate = sinon.spy() + a = Kefir.stream(() => onDeactivate) + b = stream() + map = b.flatMapLatest() + activate(map) + send(b, [value(a)]) + send(b, [value(a)]) + deactivate(map) + expect(onDeactivate.callCount).to.equal(2) + }) + }) + + describe('overlapping', () => { + it('should add the next stream before removing the previous', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMapLatest({overlapping: true}) + activate(map) + send(a, [value(b)]) + send(a, [value(b)]) + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('should accept optional map fn', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMapLatest(x => x.obs, {overlapping: true}) + activate(map) + send(a, [value({obs: b})]) + send(a, [value({obs: b})]) + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('should work nicely with Kefir.constant and Kefir.never', () => { + const a = stream() + expect( + a.flatMapLatest( + x => { + if (x > 2) { + return Kefir.constant(x) + } else { + return Kefir.never() + } + }, + {overlapping: true} + ) + ).to.emit([value(3), value(4), value(5)], () => send(a, [value(1), value(2), value(3), value(4), value(5)])) + }) + }) }) describe('property', () => { diff --git a/test/specs/flat-map.js b/test/specs/flat-map.js index b765e774..182b2787 100644 --- a/test/specs/flat-map.js +++ b/test/specs/flat-map.js @@ -1,3 +1,4 @@ +const sinon = require('sinon') const {stream, prop, send, value, error, end, activate, deactivate, Kefir, expect} = require('../test-helpers') describe('flatMap', () => { @@ -239,4 +240,90 @@ describe('flatMap', () => { expect(result).to.flowErrors(c) }) }) + + describe('overlapping with a concurrency limit that has maxed out', () => { + describe('and with a queue limit', () => { + it('not maxed out, should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: true, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('maxed out, should add the next stream before removing the previous', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: true, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + send(a, [value(b)]) // added to queue (overflow) + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + }) + + describe('and without a queue limit', () => { + it('should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: true, queueLim: -1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // replaced current + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + }) + }) + + describe('non-overlapping with a concurrency limit that has maxed out', () => { + describe('and with a queue limit', () => { + it('not maxed out, should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: false, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('maxed out, should remove the previous stream before adding the next', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: false, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + send(a, [value(b)]) // added to queue (overflow) + deactivate(map) + expect(onDeactivate.callCount).to.equal(2) + }) + }) + + describe('and without a queue limit', () => { + it('should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: false, queueLim: -1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // replaced current + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + }) + }) })