Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support await after / await use #92

Merged
merged 19 commits into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 87 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ async function third (instance, opts) {
* <a href="#constructor"><code><b>avvio()</b></code></a>
* <a href="#use"><code>instance.<b>use()</b></code></a>
* <a href="#after"><code>instance.<b>after()</b></code></a>
* <a href="#await-after"><code>await instance.<b>after()</b></code></a>
* <a href="#ready"><code>instance.<b>ready()</b></code></a>
* <a href="#start"><code>instance.<b>start()</b></code></a>
* <a href="#override"><code>instance.<b>override()</b></code></a>
Expand Down Expand Up @@ -144,7 +145,7 @@ app.on('start', () => {
-------------------------------------------------------
<a name="use"></a>

### app.use(func, [optsOrFunc])
### app.use(func, [optsOrFunc]) => Thenable

Loads one or more functions asynchronously.

Expand All @@ -163,17 +164,62 @@ app.use(plugin)
`done` should be called only once, when your plugin is ready to go. Additional
calls to `done` are ignored.

async/await is also supported:
`use` returns a thenable wrapped instance on which `use` is called, to support a chainable API that can also be awaited.

This way, async/await is also supported and `use` can be awaited instead of using `after`.

Example using `after`:

```js
async function plugin (server, opts) {
await sleep(10)
async function main () {
console.log('begin')
app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
app.after(async (err) => {
if (err) throw err
console.log('then this')
})
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

app.use(plugin)
Example using `await after`:


```js
async function main () {
console.log('begin')
app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
await app.after()
console.log('then this')
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

`use` returns the instance on which `use` is called, to support a chainable API.
Example using `await use`:

```js
async function main () {
davidmarkclements marked this conversation as resolved.
Show resolved Hide resolved
console.log('begin')
await app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
console.log('then this')
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

A function that returns the options argument instead of an object is supported as well:

Expand Down Expand Up @@ -242,6 +288,8 @@ in [`ready`](#ready).
Calls a function after all the previously defined plugins are loaded, including
all their dependencies. The `'start'` event is not emitted yet.

Note: `await after` can be used as an awaitable alternative to `after(func)`, or `await use` can be also as a shorthand for `use(plugin); await after()`.

The callback changes basing on the parameters your are giving:
1. If no parameter is given to the callback and there is an error, that error will be passed to the next error handler.
2. If one parameter is given to the callback, that parameter will be the `error` object.
Expand Down Expand Up @@ -290,7 +338,39 @@ app.after(async function () {

`done` must be called only once.

Returns the instance on which `after` is called, to support a chainable API.
If called with a function, it returns the instance on which `after` is called, to support a chainable API.

-------------------------------------------------------
<a name="await-after"></a>

### await app.after() | app.after() => Promise

Calling after with no function argument loads any plugins previously registered via `use` and returns a promise which resolves when all plugins registered so far have loaded.

```js
async function main () {
app.use(async function (server, opts) {
await sleep(10)
console.log('this first')
})
app.use(async function (server, opts) {
await sleep(10)
console.log('this second')
})
console.log('before after')
await app.after()
console.log('after after')
app.use(async function (server, opts) {
await sleep(10)
console.log('this third')
})
await app.ready()
console.log('ready')
}
main().catch((err) => console.error(err))
```

Unlike `after` and `use`, `await after` is *not* chainable.

-------------------------------------------------------
<a name="ready"></a>
Expand Down
58 changes: 47 additions & 11 deletions boot.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ function wrap (server, opts, instance) {
throw new Error(readyKey + '() is already defined, specify an expose option')
}

server[useKey] = function (a, b, c) {
instance.use(a, b, c)
return this
server[useKey] = function (fn, opts) {
instance.use(fn, opts)
return instance
}

Object.defineProperty(server, 'then', { get: thenify.bind(instance) })

server[afterKey] = function (func) {
if (typeof func !== 'function') {
throw new Error('not a function')
return instance._loadRegistered()
}
instance.after(encapsulateThreeParam(func, this))
return this
Expand Down Expand Up @@ -105,6 +107,7 @@ function Boot (server, opts, done) {
this._current = []
this._error = null
this._isOnCloseHandlerKey = Symbol('isOnCloseHandler')
this._lastUsed = null

this.setMaxListeners(0)

Expand Down Expand Up @@ -133,18 +136,18 @@ function Boot (server, opts, done) {
}

this._doStart = null
const main = new Plugin(this, root.bind(this), opts, noop, 0)

main.once('start', (serverName, funcName, time) => {
this._root = new Plugin(this, root.bind(this), opts, noop, 0)
this._root.once('start', (serverName, funcName, time) => {
const nodeId = this.pluginTree.start(null, funcName, time)
main.once('loaded', (serverName, funcName, time) => {
this._root.once('loaded', (serverName, funcName, time) => {
this.pluginTree.stop(nodeId, time)
})
})

Plugin.loadPlugin.call(this, main, (err) => {
Plugin.loadPlugin.call(this, this._root, (err) => {
debug('root plugin ready')
this.emit('preReady')
this._root = null
if (err) {
this._error = err
if (this._readyQ.length() === 0) {
Expand Down Expand Up @@ -187,11 +190,31 @@ function assertPlugin (plugin) {

// load a plugin
Boot.prototype.use = function (plugin, opts) {
this._addPlugin(plugin, opts, false)

this._lastUsed = this._addPlugin(plugin, opts, false)
return this
}

Boot.prototype._loadRegistered = function (plugin) {
plugin = plugin || this._lastUsed
return new Promise((resolve) => {
if (plugin && !plugin.loaded) {
plugin.asyncQ.push(() => {
resolve()
})
} else {
resolve()
}

// if the root plugin is not loaded, let's resume that
// so one can use after() befor calling ready
if (!this.started && !this.booted) {
this._root.q.resume()
}
})
}
mcollina marked this conversation as resolved.
Show resolved Hide resolved

Object.defineProperty(Boot.prototype, 'then', { get: thenify })

Boot.prototype._addPlugin = function (plugin, opts, isAfter) {
assertPlugin(plugin)
opts = opts || {}
Expand Down Expand Up @@ -221,6 +244,8 @@ Boot.prototype._addPlugin = function (plugin, opts, isAfter) {
this._error = err
}
})

return obj
}

Boot.prototype.after = function (func) {
Expand Down Expand Up @@ -313,6 +338,17 @@ Boot.prototype.toJSON = function () {

function noop () { }

function thenify () {
// If the instance is ready, then there is
// nothing to await. This is true during
// await server.ready() as ready() resolves
// with the server, end we will end up here
// because of automatic promise chaining.
if (this.booted) return
const p = this._loadRegistered()
return p.then.bind(p)
}

function callWithCbOrNextTick (func, cb, context) {
context = this._server
var err = this._error
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"dependencies": {
"archy": "^1.0.0",
"debug": "^4.0.0",
"fastq": "^1.6.0"
"fastq": "^1.6.0",
"queue-microtask": "^1.1.2"
}
}
75 changes: 51 additions & 24 deletions plugin.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict'

const queueMicrotask = require('queue-microtask')
const fastq = require('fastq')
const EE = require('events').EventEmitter
const inherits = require('util').inherits
Expand Down Expand Up @@ -36,9 +36,13 @@ function Plugin (parent, func, optsOrFunc, isAfter, timeout) {
this.timeout = timeout === undefined ? parent._timeout : timeout
this.name = getName(func)
this.isAfter = isAfter

this.q = fastq(parent, loadPlugin, 1)
this.q.pause()
this.asyncQ = fastq(parent, (resolve, cb) => {
resolve(this.server)
cb()
}, 1)
this.asyncQ.pause()
this.loaded = false

// always start the queue in the next tick
Expand Down Expand Up @@ -73,6 +77,31 @@ Plugin.prototype.exec = function (server, cb) {

var timer

const done = (err) => {
if (completed) {
debug('loading complete', name)
return
}

if (err) {
debug('exec errored', name)

// In case of errors, we need to kickstart
// the asyncQ as it won't get started otherwise
this.asyncQ.resume()
} else {
debug('exec completed', name)
}

completed = true

if (timer) {
clearTimeout(timer)
}

cb(err)
}

if (this.timeout > 0) {
debug('setting up timeout', name, this.timeout)
timer = setTimeout(function () {
Expand All @@ -87,33 +116,22 @@ Plugin.prototype.exec = function (server, cb) {

this.emit('start', this.server ? this.server.name : null, this.name, Date.now())
var promise = func(this.server, this.opts, done)

if (promise && typeof promise.then === 'function') {
debug('resolving promise', name)
queueMicrotask(() => {
if (this.asyncQ.length() > 0) {
this.server.after(() => {
this.asyncQ.resume()
})
}
this.q.resume()
})

promise.then(
() => process.nextTick(done),
(e) => process.nextTick(done, e))
}

function done (err) {
if (completed) {
debug('loading complete', name)
return
}

if (err) {
debug('exec errored', name)
} else {
debug('exec completed', name)
}

completed = true

if (timer) {
clearTimeout(timer)
}

cb(err)
}
}

Plugin.prototype.enqueue = function (obj, cb) {
Expand Down Expand Up @@ -144,7 +162,16 @@ Plugin.prototype.finish = function (err, cb) {
const check = () => {
debug('check', this.name, this.q.length(), this.q.running())
if (this.q.length() === 0 && this.q.running() === 0) {
done()
if (this.asyncQ.length() > 0) {
this.asyncQ.drain = () => {
this.asyncQ.drain = noop
this.asyncQ.pause()
check()
}
this.asyncQ.resume()
} else {
done()
}
} else {
debug('delayed', this.name)
// finish when the queue of nested plugins to load is empty
Expand Down
Loading