Skip to content

Commit

Permalink
add cancellation to Async (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
evilsoft authored May 21, 2017
1 parent af7e578 commit c65b930
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 17 deletions.
47 changes: 34 additions & 13 deletions crocks/Async.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const sequence = require('../pointfree/sequence')

const once = require('../helpers/once')
const mreduceMap = require('../helpers/mreduceMap')
const unit = require('../helpers/unit')

const All = require('../monoids/All')

Expand Down Expand Up @@ -79,11 +80,18 @@ function fromPromise(fn) {
}
}

function Async(fn) {
function Async(fn, parentCancel) {
if(!isFunction(fn)) {
throw new TypeError('Async: Function required')
}

var cancelled

const cancel = composeB(
() => { cancelled = true },
isFunction(parentCancel) ? parentCancel : unit
)

const type =
_type

Expand All @@ -93,12 +101,20 @@ function Async(fn) {
const inspect =
constant(`Async${_inspect(fn)}`)

function fork(reject, resolve) {
function fork(reject, resolve, cleanup) {
if(!isFunction(reject) || !isFunction(resolve)) {
throw new TypeError('Async.fork: Reject and resolve functions required')
}

fn(reject, resolve)
const forkCancel =
isFunction(cleanup) ? cleanup : unit

fn(
x => cancelled ? unit() : reject(x),
x => cancelled ? unit() : resolve(x)
)

return once(composeB(forkCancel, cancel))
}

function toPromise() {
Expand All @@ -117,7 +133,7 @@ function Async(fn) {
composeB(resolve, l),
composeB(reject, r)
)
})
}, cancel)
}

function coalesce(l, r) {
Expand All @@ -130,7 +146,7 @@ function Async(fn) {
composeB(resolve, l),
composeB(resolve, r)
)
})
}, cancel)
}

function map(fn) {
Expand All @@ -140,7 +156,7 @@ function Async(fn) {

return Async(function(reject, resolve) {
fork(reject, composeB(resolve, fn))
})
}, cancel)
}

function bimap(l, r) {
Expand All @@ -153,26 +169,29 @@ function Async(fn) {
composeB(reject, l),
composeB(resolve, r)
)
})
}, cancel)
}

function alt(m) {
var innerCancel = unit

if(!isSameType(Async, m)) {
throw new TypeError('Async.alt: Async required')
}

return Async((rej, res) => {
fork(
() => m.fork(rej, res),
() => { innerCancel = m.fork(rej, res) },
res
)
})
}, once(() => innerCancel(cancel())))
}

function ap(m) {
var fn, value
var fnDone = false
var valueDone = false
var innerCancel = unit

if(!isSameType(Async, m)) {
throw new TypeError('Async.ap: Async required')
Expand All @@ -197,15 +216,17 @@ function Async(fn) {
resolveBoth()
})

m.fork(rejectOnce, function(x) {
innerCancel = m.fork(rejectOnce, x => {
valueDone = true
value = x
resolveBoth()
})
})
}, once(() => { innerCancel(cancel()) }))
}

function chain(fn) {
var innerCancel = unit

if(!isFunction(fn)) {
throw new TypeError('Async.chain: Async returning function required')
}
Expand All @@ -218,9 +239,9 @@ function Async(fn) {
throw new TypeError('Async.chain: Function must return another Async')
}

m.fork(reject, resolve)
innerCancel = m.fork(reject, resolve)
})
})
}, once(() => { innerCancel(cancel()) }))
}

return {
Expand Down
120 changes: 116 additions & 4 deletions crocks/Async.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,6 @@ test('Async all resolution', t => {
Async.all([ Async.Resolved(val), Async.Resolved(val) ]).fork(rej(bad), res([ val, val ]))
Async.all([ Async.Rejected(bad), Async.Resolved(val) ]).fork(rej(bad), res([ val, val ]))
Async.all([]).fork(rej(bad), empty([]))

t.end()
})

test('Async inspect', t => {
Expand Down Expand Up @@ -261,23 +259,137 @@ test('Async fork', t => {
resolved.fork(noCall, res)
rejected.fork(rej, noCall)

t.ok(isFunction(Async.of(34).fork(unit, unit)), 'fork returns a function')
t.ok(res.calledWith('resolved'), 'calls resolved function when Async is resolved')
t.ok(rej.calledWith('rejected'), 'calls rejected function when Async is rejected')
t.notOk(noCall.called, 'does not call the other function')

t.end()
})

test('Async cancel chain cleanup functions', t => {
const resCleanUp = sinon.spy()
const rejCleanUp = sinon.spy()
const forkCleanUp = sinon.spy()

const cancel =
Async.of(0)
.chain(x => Async((_, res) => { res(x) }, resCleanUp))
.chain(x => Async(rej => { rej(x) }, rejCleanUp))
.fork(unit, unit, forkCleanUp)

cancel()

t.ok(rejCleanUp.calledAfter(resCleanUp), 'calls the first Async cleanup first')
t.ok(forkCleanUp.calledAfter(rejCleanUp), 'calls the fork cleanup last')

cancel()

t.ok(resCleanUp.calledOnce, 'calls the Async level cleanup only once')
t.ok(rejCleanUp.calledOnce, 'calls the Async level cleanup only once')
t.ok(forkCleanUp.calledOnce, 'calls the fork level cleanup only once')

t.end()
})

test('Async cancel ap cleanup functions', t => {
const resCleanUp = sinon.spy()
const rejCleanUp = sinon.spy()
const forkCleanUp = sinon.spy()

const cancel =
Async.of(x => y => [ x, y ])
.ap(Async((_, res) => { res(1) }, resCleanUp))
.ap(Async((_, rej) => { rej(1) }, rejCleanUp))
.fork(unit, unit, forkCleanUp)

cancel()

t.ok(rejCleanUp.calledAfter(resCleanUp), 'calls the first Async cleanup first')
t.ok(forkCleanUp.calledAfter(resCleanUp), 'calls the fork cleanup last')

cancel()

t.ok(resCleanUp.calledOnce, 'calls the resolved Async level cleanup only once')
t.ok(rejCleanUp.calledOnce, 'calls the rejected Async level cleanup only once')
t.ok(forkCleanUp.calledOnce, 'calls the fork level cleanup only once')

t.end()
})

test('Async cancel alt cleanup functions', t => {
const rejCleanUp = sinon.spy()
const resCleanUp = sinon.spy()
const forkCleanUp = sinon.spy()

const cancel =
Async.Rejected(0)
.alt(Async(rej => { rej(1) }, rejCleanUp))
.alt(Async((_, res) => { res(1) }, resCleanUp))
.fork(unit, unit, forkCleanUp)

cancel()

t.ok(resCleanUp.calledAfter(rejCleanUp), 'calls the first Async cleanup first')
t.ok(forkCleanUp.calledAfter(resCleanUp), 'calls the fork cleanup last')

cancel()

t.ok(rejCleanUp.calledOnce, 'calls the rejected Async level cleanup only once')
t.ok(resCleanUp.calledOnce, 'calls the resolved Async level cleanup only once')
t.ok(forkCleanUp.calledOnce, 'calls the fork level cleanup only once')

t.end()
})

test('Async cancel cancellation', t => {
t.plan(5)

function cancelTest(rejected, func) {
return function() {
return Async((rej, res) => setTimeout(() => rejected ? rej(0) : res(0)))[func].apply(null, arguments).fork(
t.fail.bind(t, `reject called after a ${func}`),
t.fail.bind(t, `resolve called after a ${func}`)
)
}
}

const swap = sinon.spy()
const coalesce = sinon.spy()
const map = sinon.spy()
const bimap = sinon.spy()
const chain = sinon.spy(x => Async.of(x))

cancelTest(true, 'swap')(swap, swap)()
cancelTest(false, 'swap')(swap, swap)()
cancelTest(true, 'coalesce')(coalesce, coalesce)()
cancelTest(false, 'coalesce')(coalesce, coalesce)()
cancelTest(true, 'map')(map)()
cancelTest(false, 'map')(map)()
cancelTest(true, 'bimap')(bimap, bimap)()
cancelTest(false, 'bimap')(bimap, bimap)()
cancelTest(true, 'chain')(chain)()
cancelTest(false, 'chain')(chain)()

setTimeout(() => {
t.notOk(swap.called, 'does not run swap')
t.notOk(coalesce.called, 'does not run coalesce')
t.notOk(map.called, 'does not run map')
t.notOk(bimap.called, 'does not run bimap')
t.notOk(chain.called, 'does not run chain')
})
})

test('Async toPromise', t => {
t.plan(2)

const val = 1337

const rej = y => x => t.equal(x, y, 'rejects a rejected Async')
const res = y => x => t.equal(x, y, 'resolves a resolved Async')

Async.Rejected(val).toPromise().then(res(val)).catch(rej(val))
Async.Resolved(val).toPromise().then(res(val)).catch(rej(val))

t.plan(2)
})

test('Async swap', t => {
Expand Down

0 comments on commit c65b930

Please sign in to comment.