Skip to content

Commit

Permalink
support await after / await use (#92)
Browse files Browse the repository at this point in the history
* support await of use as alternative to after

* fix version check

* only queue after in in microtask when asyncQ has items

* ci trigger

* __proto__ -> Object.create, reuse then func

* expound upon await use usage via comparison with after example

* await use comprehensive test

* assimilate

* await after

* rm unused param

* assimilate -> _loadRegistered

* ci trigger

* node 6 test compat

* rm version check in tests

* add failing test for after grouping

* set then directly on instance for chaining compatiblity

* autoStart: false support

* Avoid chainable await as it breaks override

* rm stray todo, dry code (thenify)

Co-authored-by: Matteo Collina <[email protected]>
  • Loading branch information
davidmarkclements and mcollina authored Feb 12, 2020
1 parent 2c89091 commit 04b95b9
Show file tree
Hide file tree
Showing 9 changed files with 1,118 additions and 378 deletions.
94 changes: 87 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ async function third (instance, opts) {
* <a href="#constructor"><code><b>avvio()</b></code></a>
* <a href="#use"><code>instance.<b>use()</b></code></a>
* <a href="#after"><code>instance.<b>after()</b></code></a>
* <a href="#await-after"><code>await instance.<b>after()</b></code></a>
* <a href="#ready"><code>instance.<b>ready()</b></code></a>
* <a href="#start"><code>instance.<b>start()</b></code></a>
* <a href="#override"><code>instance.<b>override()</b></code></a>
Expand Down Expand Up @@ -144,7 +145,7 @@ app.on('start', () => {
-------------------------------------------------------
<a name="use"></a>

### app.use(func, [optsOrFunc])
### app.use(func, [optsOrFunc]) => Thenable

Loads one or more functions asynchronously.

Expand All @@ -163,17 +164,62 @@ app.use(plugin)
`done` should be called only once, when your plugin is ready to go. Additional
calls to `done` are ignored.

async/await is also supported:
`use` returns a thenable wrapped instance on which `use` is called, to support a chainable API that can also be awaited.

This way, async/await is also supported and `use` can be awaited instead of using `after`.

Example using `after`:

```js
async function plugin (server, opts) {
await sleep(10)
async function main () {
console.log('begin')
app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
app.after(async (err) => {
if (err) throw err
console.log('then this')
})
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

app.use(plugin)
Example using `await after`:


```js
async function main () {
console.log('begin')
app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
await app.after()
console.log('then this')
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

`use` returns the instance on which `use` is called, to support a chainable API.
Example using `await use`:

```js
async function main () {
console.log('begin')
await app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
console.log('then this')
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

A function that returns the options argument instead of an object is supported as well:

Expand Down Expand Up @@ -242,6 +288,8 @@ in [`ready`](#ready).
Calls a function after all the previously defined plugins are loaded, including
all their dependencies. The `'start'` event is not emitted yet.

Note: `await after` can be used as an awaitable alternative to `after(func)`, or `await use` can be also as a shorthand for `use(plugin); await after()`.

The callback changes basing on the parameters your are giving:
1. If no parameter is given to the callback and there is an error, that error will be passed to the next error handler.
2. If one parameter is given to the callback, that parameter will be the `error` object.
Expand Down Expand Up @@ -290,7 +338,39 @@ app.after(async function () {

`done` must be called only once.

Returns the instance on which `after` is called, to support a chainable API.
If called with a function, it returns the instance on which `after` is called, to support a chainable API.

-------------------------------------------------------
<a name="await-after"></a>

### await app.after() | app.after() => Promise

Calling after with no function argument loads any plugins previously registered via `use` and returns a promise which resolves when all plugins registered so far have loaded.

```js
async function main () {
app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
app.use(async function (server, opts) {
await sleep(10)
console.log('this second')
})
console.log('before after')
await app.after()
console.log('after after')
app.use(async function (server, opts) {
await sleep(10)
console.log('this third')
})
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

Unlike `after` and `use`, `await after` is *not* chainable.

-------------------------------------------------------
<a name="ready"></a>
Expand Down
58 changes: 47 additions & 11 deletions boot.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ function wrap (server, opts, instance) {
throw new Error(readyKey + '() is already defined, specify an expose option')
}

server[useKey] = function (a, b, c) {
instance.use(a, b, c)
return this
server[useKey] = function (fn, opts) {
instance.use(fn, opts)
return instance
}

Object.defineProperty(server, 'then', { get: thenify.bind(instance) })

server[afterKey] = function (func) {
if (typeof func !== 'function') {
throw new Error('not a function')
return instance._loadRegistered()
}
instance.after(encapsulateThreeParam(func, this))
return this
Expand Down Expand Up @@ -105,6 +107,7 @@ function Boot (server, opts, done) {
this._current = []
this._error = null
this._isOnCloseHandlerKey = Symbol('isOnCloseHandler')
this._lastUsed = null

this.setMaxListeners(0)

Expand Down Expand Up @@ -133,18 +136,18 @@ function Boot (server, opts, done) {
}

this._doStart = null
const main = new Plugin(this, root.bind(this), opts, noop, 0)

main.once('start', (serverName, funcName, time) => {
this._root = new Plugin(this, root.bind(this), opts, noop, 0)
this._root.once('start', (serverName, funcName, time) => {
const nodeId = this.pluginTree.start(null, funcName, time)
main.once('loaded', (serverName, funcName, time) => {
this._root.once('loaded', (serverName, funcName, time) => {
this.pluginTree.stop(nodeId, time)
})
})

Plugin.loadPlugin.call(this, main, (err) => {
Plugin.loadPlugin.call(this, this._root, (err) => {
debug('root plugin ready')
this.emit('preReady')
this._root = null
if (err) {
this._error = err
if (this._readyQ.length() === 0) {
Expand Down Expand Up @@ -187,11 +190,31 @@ function assertPlugin (plugin) {

// load a plugin
Boot.prototype.use = function (plugin, opts) {
this._addPlugin(plugin, opts, false)

this._lastUsed = this._addPlugin(plugin, opts, false)
return this
}

Boot.prototype._loadRegistered = function (plugin) {
plugin = plugin || this._lastUsed
return new Promise((resolve) => {
if (plugin && !plugin.loaded) {
plugin.asyncQ.push(() => {
resolve()
})
} else {
resolve()
}

// if the root plugin is not loaded, let's resume that
// so one can use after() befor calling ready
if (!this.started && !this.booted) {
this._root.q.resume()
}
})
}

Object.defineProperty(Boot.prototype, 'then', { get: thenify })

Boot.prototype._addPlugin = function (plugin, opts, isAfter) {
assertPlugin(plugin)
opts = opts || {}
Expand Down Expand Up @@ -221,6 +244,8 @@ Boot.prototype._addPlugin = function (plugin, opts, isAfter) {
this._error = err
}
})

return obj
}

Boot.prototype.after = function (func) {
Expand Down Expand Up @@ -313,6 +338,17 @@ Boot.prototype.toJSON = function () {

function noop () { }

function thenify () {
// If the instance is ready, then there is
// nothing to await. This is true during
// await server.ready() as ready() resolves
// with the server, end we will end up here
// because of automatic promise chaining.
if (this.booted) return
const p = this._loadRegistered()
return p.then.bind(p)
}

function callWithCbOrNextTick (func, cb, context) {
context = this._server
var err = this._error
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"dependencies": {
"archy": "^1.0.0",
"debug": "^4.0.0",
"fastq": "^1.6.0"
"fastq": "^1.6.0",
"queue-microtask": "^1.1.2"
}
}
75 changes: 51 additions & 24 deletions plugin.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict'

const queueMicrotask = require('queue-microtask')
const fastq = require('fastq')
const EE = require('events').EventEmitter
const inherits = require('util').inherits
Expand Down Expand Up @@ -36,9 +36,13 @@ function Plugin (parent, func, optsOrFunc, isAfter, timeout) {
this.timeout = timeout === undefined ? parent._timeout : timeout
this.name = getName(func)
this.isAfter = isAfter

this.q = fastq(parent, loadPlugin, 1)
this.q.pause()
this.asyncQ = fastq(parent, (resolve, cb) => {
resolve(this.server)
cb()
}, 1)
this.asyncQ.pause()
this.loaded = false

// always start the queue in the next tick
Expand Down Expand Up @@ -73,6 +77,31 @@ Plugin.prototype.exec = function (server, cb) {

var timer

const done = (err) => {
if (completed) {
debug('loading complete', name)
return
}

if (err) {
debug('exec errored', name)

// In case of errors, we need to kickstart
// the asyncQ as it won't get started otherwise
this.asyncQ.resume()
} else {
debug('exec completed', name)
}

completed = true

if (timer) {
clearTimeout(timer)
}

cb(err)
}

if (this.timeout > 0) {
debug('setting up timeout', name, this.timeout)
timer = setTimeout(function () {
Expand All @@ -87,33 +116,22 @@ Plugin.prototype.exec = function (server, cb) {

this.emit('start', this.server ? this.server.name : null, this.name, Date.now())
var promise = func(this.server, this.opts, done)

if (promise && typeof promise.then === 'function') {
debug('resolving promise', name)
queueMicrotask(() => {
if (this.asyncQ.length() > 0) {
this.server.after(() => {
this.asyncQ.resume()
})
}
this.q.resume()
})

promise.then(
() => process.nextTick(done),
(e) => process.nextTick(done, e))
}

function done (err) {
if (completed) {
debug('loading complete', name)
return
}

if (err) {
debug('exec errored', name)
} else {
debug('exec completed', name)
}

completed = true

if (timer) {
clearTimeout(timer)
}

cb(err)
}
}

Plugin.prototype.enqueue = function (obj, cb) {
Expand Down Expand Up @@ -144,7 +162,16 @@ Plugin.prototype.finish = function (err, cb) {
const check = () => {
debug('check', this.name, this.q.length(), this.q.running())
if (this.q.length() === 0 && this.q.running() === 0) {
done()
if (this.asyncQ.length() > 0) {
this.asyncQ.drain = () => {
this.asyncQ.drain = noop
this.asyncQ.pause()
check()
}
this.asyncQ.resume()
} else {
done()
}
} else {
debug('delayed', this.name)
// finish when the queue of nested plugins to load is empty
Expand Down
Loading

0 comments on commit 04b95b9

Please sign in to comment.