diff --git a/module/switchlatest/index.js b/module/switchlatest/index.js index 699f73e..6d58529 100644 --- a/module/switchlatest/index.js +++ b/module/switchlatest/index.js @@ -1,12 +1,10 @@ var flyd = require('../../lib'); var takeUntil = require('../takeuntil'); -var drop = require('ramda/src/drop'); -var dropCurrentValue = flyd.transduce(drop(1)); module.exports = function(s) { return flyd.combine(function(stream$, self) { var value$ = stream$(); - flyd.on(self, takeUntil(value$, dropCurrentValue(stream$))); + flyd.on(self, takeUntil(value$, stream$)); }, [s]); }; diff --git a/module/takeuntil/index.js b/module/takeuntil/index.js index 9f378c6..70e3302 100644 --- a/module/takeuntil/index.js +++ b/module/takeuntil/index.js @@ -1,7 +1,11 @@ var flyd = require('../../lib'); +var drop = require('ramda/src/drop'); + +var dropCurrentValue = flyd.transduce(drop(1)); module.exports = flyd.curryN(2, function(src, term) { - return flyd.endsOn(flyd.merge(term, src.end), flyd.combine(function(src, self) { + var end$ = term.hasVal ? dropCurrentValue(term) : term; + return flyd.endsOn(flyd.merge(end$, src.end), flyd.combine(function(src, self) { self(src()); }, [src])); }); diff --git a/module/takeuntil/test/index.js b/module/takeuntil/test/index.js index b5be138..9779760 100644 --- a/module/takeuntil/test/index.js +++ b/module/takeuntil/test/index.js @@ -38,4 +38,24 @@ describe('takeUntil', function() { assert.deepEqual(result, [1]); assert(s.end()); }); + + it('works in nested streams', function() { + var source = stream(1); + var terminator = stream(true); + + var value = stream(1).chain(function() { + return takeUntil(source, terminator); + }) + .map(function(val) { + return val + 1; + }); + + source(2)(3)(4)(5); + + terminator(true); + + source(6)(7)(8)(9); + + assert.equal(value(), 6); + }) });